1
// mgo - MongoDB driver for Go
3
// Copyright (c) 2010-2012 - Gustavo Niemeyer <gustavo@niemeyer.net>
5
// All rights reserved.
7
// Redistribution and use in source and binary forms, with or without
8
// modification, are permitted provided that the following conditions are met:
10
// 1. Redistributions of source code must retain the above copyright notice, this
11
// list of conditions and the following disclaimer.
12
// 2. Redistributions in binary form must reproduce the above copyright notice,
13
// this list of conditions and the following disclaimer in the documentation
14
// and/or other materials provided with the distribution.
16
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
17
// ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
// WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
19
// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
20
// ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
21
// (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
22
// LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
23
// ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
24
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
25
// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
44
"github.com/juju/loggo"
45
"gopkg.in/mgo.v2/bson"
51
// Relevant documentation on read preference modes:
53
// http://docs.mongodb.org/manual/reference/read-preference/
55
Primary Mode = 2 // Default mode. All operations read from the current replica set primary.
56
PrimaryPreferred Mode = 3 // Read from the primary if available. Read from the secondary otherwise.
57
Secondary Mode = 4 // Read from one of the nearest secondary members of the replica set.
58
SecondaryPreferred Mode = 5 // Read from one of the nearest secondaries if available. Read from primary otherwise.
59
Nearest Mode = 6 // Read from one of the nearest members, irrespective of it being primary or secondary.
61
// Read preference modes are specific to mgo:
62
Eventual Mode = 0 // Same as Nearest, but may change servers between reads.
63
Monotonic Mode = 1 // Same as SecondaryPreferred before first write. Same as Primary after first write.
64
Strong Mode = 2 // Same as Primary.
67
// mgo.v3: Drop Strong mode, suffix all modes with "Mode".
69
// When changing the Session type, check if newSession and copySession
70
// need to be updated too.
72
// Session represents a communication session with the database.
74
// All Session methods are concurrency-safe and may be called from multiple
75
// goroutines. In all session modes but Eventual, using the session from
76
// multiple goroutines will cause them to share the same underlying socket.
77
// See the documentation on Session.SetMode for more details.
80
cluster_ *mongoCluster
81
slaveSocket *mongoSocket
82
masterSocket *mongoSocket
87
syncTimeout time.Duration
88
sockTimeout time.Duration
97
type Database struct {
102
type Collection struct {
104
Name string // "collection"
105
FullName string // "db.collection"
111
query // Enables default settings in session.
120
type getLastError struct {
121
CmdName int "getLastError,omitempty"
122
W interface{} "w,omitempty"
123
WTimeout int "wtimeout,omitempty"
124
FSync bool "fsync,omitempty"
140
timeout time.Duration
146
ErrNotFound = errors.New("not found")
147
ErrCursor = errors.New("invalid cursor")
149
logPatchedOnce sync.Once
150
logger = loggo.GetLogger("mgo")
154
defaultPrefetch = 0.25
156
// How many times we will retry an upsert if it produces duplicate
161
// Dial establishes a new session to the cluster identified by the given seed
162
// server(s). The session will enable communication with all of the servers in
163
// the cluster, so the seed servers are used only to find out about the cluster
166
// Dial will timeout after 10 seconds if a server isn't reached. The returned
167
// session will timeout operations after one minute by default if servers
168
// aren't available. To customize the timeout, see DialWithTimeout,
169
// SetSyncTimeout, and SetSocketTimeout.
171
// This method is generally called just once for a given cluster. Further
172
// sessions to the same cluster are then established using the New or Copy
173
// methods on the obtained session. This will make them share the underlying
174
// cluster, and manage the pool of connections appropriately.
176
// Once the session is not useful anymore, Close must be called to release the
177
// resources appropriately.
179
// The seed servers must be provided in the following format:
181
// [mongodb://][user:pass@]host1[:port1][,host2[:port2],...][/database][?options]
183
// For example, it may be as simple as:
187
// Or more involved like:
189
// mongodb://myuser:mypass@localhost:40001,otherhost:40001/mydb
191
// If the port number is not provided for a server, it defaults to 27017.
193
// The username and password provided in the URL will be used to authenticate
194
// into the database named after the slash at the end of the host names, or
195
// into the "admin" database if none is provided. The authentication information
196
// will persist in sessions obtained through the New method as well.
198
// The following connection options are supported after the question mark:
202
// Disables the automatic replica set server discovery logic, and
203
// forces the use of servers provided only (even if secondaries).
204
// Note that to talk to a secondary the consistency requirements
205
// must be relaxed to Monotonic or Eventual via SetMode.
208
// connect=replicaSet
210
// Discover replica sets automatically. Default connection behavior.
213
// replicaSet=<setname>
215
// If specified will prevent the obtained session from communicating
216
// with any server which is not part of a replica set with the given name.
217
// The default is to communicate with any server specified or discovered
218
// via the servers contacted.
223
// Informs the database used to establish credentials and privileges
224
// with a MongoDB server. Defaults to the database name provided via
225
// the URL path, and "admin" if that's unset.
228
// authMechanism=<mechanism>
230
// Defines the protocol for credential negotiation. Defaults to "MONGODB-CR",
231
// which is the default username/password challenge-response mechanism.
234
// gssapiServiceName=<name>
236
// Defines the service name to use when authenticating with the GSSAPI
237
// mechanism. Defaults to "mongodb".
240
// maxPoolSize=<limit>
242
// Defines the per-server socket pool limit. Defaults to 4096.
243
// See Session.SetPoolLimit for details.
246
// Relevant documentation:
248
// http://docs.mongodb.org/manual/reference/connection-string/
250
func Dial(url string) (*Session, error) {
251
session, err := DialWithTimeout(url, 10*time.Second)
253
session.SetSyncTimeout(1 * time.Minute)
254
session.SetSocketTimeout(1 * time.Minute)
259
// DialWithTimeout works like Dial, but uses timeout as the amount of time to
260
// wait for a server to respond when first connecting and also on follow up
261
// operations in the session. If timeout is zero, the call may block
262
// forever waiting for a connection to be made.
264
// See SetSyncTimeout for customizing the timeout for the session.
265
func DialWithTimeout(url string, timeout time.Duration) (*Session, error) {
266
info, err := ParseURL(url)
270
info.Timeout = timeout
271
return DialWithInfo(info)
274
// ParseURL parses a MongoDB URL as accepted by the Dial function and returns
275
// a value suitable for providing into DialWithInfo.
277
// See Dial for more details on the format of url.
278
func ParseURL(url string) (*DialInfo, error) {
279
uinfo, err := extractURL(url)
289
for k, v := range uinfo.options {
293
case "authMechanism":
295
case "gssapiServiceName":
300
poolLimit, err = strconv.Atoi(v)
302
return nil, errors.New("bad value for maxPoolSize: " + v)
309
if v == "replicaSet" {
314
return nil, errors.New("unsupported connection URL option: " + k + "=" + v)
321
Username: uinfo.user,
322
Password: uinfo.pass,
323
Mechanism: mechanism,
326
PoolLimit: poolLimit,
327
ReplicaSetName: setName,
332
// DialInfo holds options for establishing a session with a MongoDB cluster.
333
// To use a URL, see the Dial function.
334
type DialInfo struct {
335
// Addrs holds the addresses for the seed servers.
338
// Direct informs whether to establish connections only with the
339
// specified seed servers, or to obtain information for the whole
340
// cluster and establish connections with further servers too.
343
// Timeout is the amount of time to wait for a server to respond when
344
// first connecting and on follow up operations in the session. If
345
// timeout is zero, the call may block forever waiting for a connection
346
// to be established. Timeout does not affect logic in DialServer.
347
Timeout time.Duration
349
// FailFast will cause connection and query attempts to fail faster when
350
// the server is unavailable, instead of retrying until the configured
351
// timeout period. Note that an unavailable server may silently drop
352
// packets instead of rejecting them, in which case it's impossible to
353
// distinguish it from a slow server, so the timeout stays relevant.
356
// Database is the default database name used when the Session.DB method
357
// is called with an empty name, and is also used during the initial
358
// authentication if Source is unset.
361
// ReplicaSetName, if specified, will prevent the obtained session from
362
// communicating with any server which is not part of a replica set
363
// with the given name. The default is to communicate with any server
364
// specified or discovered via the servers contacted.
365
ReplicaSetName string
367
// Source is the database used to establish credentials and privileges
368
// with a MongoDB server. Defaults to the value of Database, if that is
369
// set, or "admin" otherwise.
372
// Service defines the service name to use when authenticating with the GSSAPI
373
// mechanism. Defaults to "mongodb".
376
// ServiceHost defines which hostname to use when authenticating
377
// with the GSSAPI mechanism. If not specified, defaults to the MongoDB
381
// Mechanism defines the protocol for credential negotiation.
382
// Defaults to "MONGODB-CR".
385
// Username and Password inform the credentials for the initial authentication
386
// done on the database defined by the Source field. See Session.Login.
390
// PoolLimit defines the per-server socket pool limit. Defaults to 4096.
391
// See Session.SetPoolLimit for details.
394
// DialServer optionally specifies the dial function for establishing
395
// connections with the MongoDB servers.
396
DialServer func(addr *ServerAddr) (net.Conn, error)
398
// WARNING: This field is obsolete. See DialServer above.
399
Dial func(addr net.Addr) (net.Conn, error)
402
// mgo.v3: Drop DialInfo.Dial.
404
// ServerAddr represents the address for establishing a connection to an
405
// individual MongoDB server.
406
type ServerAddr struct {
411
// String returns the address that was provided for the server before resolution.
412
func (addr *ServerAddr) String() string {
416
// TCPAddr returns the resolved TCP address for the server.
417
func (addr *ServerAddr) TCPAddr() *net.TCPAddr {
421
// DialWithInfo establishes a new session to the cluster identified by info.
422
func DialWithInfo(info *DialInfo) (*Session, error) {
423
// This is using loggo because that can be done here in a
424
// localised patch, while using mgo's logging would need a change
425
// in Juju to call mgo.SetLogger. It's in this short-lived patch
426
// as a stop-gap because it's proving difficult to tell if the
427
// patch is applied in a running system. If you see it in
428
// committed code then something has gone very awry - please
429
// complain loudly! (babbageclunk)
430
logPatchedOnce.Do(func() {
431
logger.Debugf("duplicate key error patch applied")
433
addrs := make([]string, len(info.Addrs))
434
for i, addr := range info.Addrs {
435
p := strings.LastIndexAny(addr, "]:")
436
if p == -1 || addr[p] != ':' {
437
// XXX This is untested. The test suite doesn't use the standard port.
442
cluster := newCluster(addrs, info.Direct, info.FailFast, dialer{info.Dial, info.DialServer}, info.ReplicaSetName)
443
session := newSession(Eventual, cluster, info.Timeout)
444
session.defaultdb = info.Database
445
if session.defaultdb == "" {
446
session.defaultdb = "test"
448
session.sourcedb = info.Source
449
if session.sourcedb == "" {
450
session.sourcedb = info.Database
451
if session.sourcedb == "" {
452
session.sourcedb = "admin"
455
if info.Username != "" {
456
source := session.sourcedb
457
if info.Source == "" &&
458
(info.Mechanism == "GSSAPI" || info.Mechanism == "PLAIN" || info.Mechanism == "MONGODB-X509") {
461
session.dialCred = &Credential{
462
Username: info.Username,
463
Password: info.Password,
464
Mechanism: info.Mechanism,
465
Service: info.Service,
466
ServiceHost: info.ServiceHost,
469
session.creds = []Credential{*session.dialCred}
471
if info.PoolLimit > 0 {
472
session.poolLimit = info.PoolLimit
476
// People get confused when we return a session that is not actually
477
// established to any servers yet (e.g. what if url was wrong). So,
478
// ping the server to ensure there's someone there, and abort if it
480
if err := session.Ping(); err != nil {
484
session.SetMode(Strong, true)
488
// isOptSep reports whether c separates key=value pairs in the options
// portion of a MongoDB connection URL (both ';' and '&' are accepted).
func isOptSep(c rune) bool {
	switch c {
	case ';', '&':
		return true
	}
	return false
}
492
type urlInfo struct {
497
options map[string]string
500
func extractURL(s string) (*urlInfo, error) {
501
if strings.HasPrefix(s, "mongodb://") {
504
info := &urlInfo{options: make(map[string]string)}
505
if c := strings.Index(s, "?"); c != -1 {
506
for _, pair := range strings.FieldsFunc(s[c+1:], isOptSep) {
507
l := strings.SplitN(pair, "=", 2)
508
if len(l) != 2 || l[0] == "" || l[1] == "" {
509
return nil, errors.New("connection option must be key=value: " + pair)
511
info.options[l[0]] = l[1]
515
if c := strings.Index(s, "@"); c != -1 {
516
pair := strings.SplitN(s[:c], ":", 2)
517
if len(pair) > 2 || pair[0] == "" {
518
return nil, errors.New("credentials must be provided as user:pass@host")
521
info.user, err = url.QueryUnescape(pair[0])
523
return nil, fmt.Errorf("cannot unescape username in URL: %q", pair[0])
526
info.pass, err = url.QueryUnescape(pair[1])
528
return nil, fmt.Errorf("cannot unescape password in URL")
533
if c := strings.Index(s, "/"); c != -1 {
537
info.addrs = strings.Split(s, ",")
541
func newSession(consistency Mode, cluster *mongoCluster, timeout time.Duration) (session *Session) {
545
syncTimeout: timeout,
546
sockTimeout: timeout,
549
debugf("New session %p on cluster %p", session, cluster)
550
session.SetMode(consistency, true)
551
session.SetSafe(&Safe{})
552
session.queryConfig.prefetch = defaultPrefetch
556
func copySession(session *Session, keepCreds bool) (s *Session) {
557
cluster := session.cluster()
559
if session.masterSocket != nil {
560
session.masterSocket.Acquire()
562
if session.slaveSocket != nil {
563
session.slaveSocket.Acquire()
565
var creds []Credential
567
creds = make([]Credential, len(session.creds))
568
copy(creds, session.creds)
569
} else if session.dialCred != nil {
570
creds = []Credential{*session.dialCred}
573
scopy.m = sync.RWMutex{}
576
debugf("New session %p on cluster %p (copy from %p)", s, cluster, session)
580
// LiveServers returns a list of server addresses which are
581
// currently known to be alive.
582
func (s *Session) LiveServers() (addrs []string) {
584
addrs = s.cluster().LiveServers()
589
// DB returns a value representing the named database. If name
590
// is empty, the database name provided in the dialed URL is
591
// used instead. If that is also empty, "test" is used as a
592
// fallback in a way equivalent to the mongo shell.
594
// Creating this value is a very lightweight operation, and
595
// involves no network communication.
596
func (s *Session) DB(name string) *Database {
600
return &Database{s, name}
603
// C returns a value representing the named collection.
605
// Creating this value is a very lightweight operation, and
606
// involves no network communication.
607
func (db *Database) C(name string) *Collection {
608
return &Collection{db, name, db.Name + "." + name}
611
// With returns a copy of db that uses session s.
612
func (db *Database) With(s *Session) *Database {
618
// With returns a copy of c that uses session s.
619
func (c *Collection) With(s *Session) *Collection {
623
newc.Database = &newdb
627
// GridFS returns a GridFS value representing collections in db that
628
// follow the standard GridFS specification.
629
// The provided prefix (sometimes known as root) will determine which
630
// collections to use, and is usually set to "fs" when there is a
631
// single GridFS in the database.
633
// See the GridFS Create, Open, and OpenId methods for more details.
635
// Relevant documentation:
637
// http://www.mongodb.org/display/DOCS/GridFS
638
// http://www.mongodb.org/display/DOCS/GridFS+Tools
639
// http://www.mongodb.org/display/DOCS/GridFS+Specification
641
func (db *Database) GridFS(prefix string) *GridFS {
642
return newGridFS(db, prefix)
645
// Run issues the provided command on the db database and unmarshals
646
// its result in the respective argument. The cmd argument may be either
647
// a string with the command name itself, in which case an empty document of
648
// the form bson.M{cmd: 1} will be used, or it may be a full command document.
650
// Note that MongoDB considers the first marshalled key as the command
651
// name, so when providing a command with options, it's important to
652
// use an ordering-preserving document, such as a struct value or an
653
// instance of bson.D. For instance:
655
// db.Run(bson.D{{"create", "mycollection"}, {"size", 1024}})
657
// For privileged commands typically run on the "admin" database, see
658
// the Run method in the Session type.
660
// Relevant documentation:
662
// http://www.mongodb.org/display/DOCS/Commands
663
// http://www.mongodb.org/display/DOCS/List+of+Database+Commands
665
func (db *Database) Run(cmd interface{}, result interface{}) error {
666
socket, err := db.Session.acquireSocket(true)
670
defer socket.Release()
672
// This is an optimized form of db.C("$cmd").Find(cmd).One(result).
673
return db.run(socket, cmd, result)
676
// Credential holds details to authenticate with a MongoDB server.
677
type Credential struct {
678
// Username and Password hold the basic details for authentication.
679
// Password is optional with some authentication mechanisms.
683
// Source is the database used to establish credentials and privileges
684
// with a MongoDB server. Defaults to the default database provided
685
// during dial, or "admin" if that was unset.
688
// Service defines the service name to use when authenticating with the GSSAPI
689
// mechanism. Defaults to "mongodb".
692
// ServiceHost defines which hostname to use when authenticating
693
// with the GSSAPI mechanism. If not specified, defaults to the MongoDB
697
// Mechanism defines the protocol for credential negotiation.
698
// Defaults to "MONGODB-CR".
702
// Login authenticates with MongoDB using the provided credential. The
703
// authentication is valid for the whole session and will stay valid until
704
// Logout is explicitly called for the same database, or the session is
706
func (db *Database) Login(user, pass string) error {
707
return db.Session.Login(&Credential{Username: user, Password: pass, Source: db.Name})
710
// Login authenticates with MongoDB using the provided credential. The
711
// authentication is valid for the whole session and will stay valid until
712
// Logout is explicitly called for the same database, or the session is
714
func (s *Session) Login(cred *Credential) error {
715
socket, err := s.acquireSocket(true)
719
defer socket.Release()
722
if cred.Source == "" {
723
if cred.Mechanism == "GSSAPI" {
724
credCopy.Source = "$external"
726
credCopy.Source = s.sourcedb
729
err = socket.Login(credCopy)
735
s.creds = append(s.creds, credCopy)
740
func (s *Session) socketLogin(socket *mongoSocket) error {
741
for _, cred := range s.creds {
742
if err := socket.Login(cred); err != nil {
749
// Logout removes any established authentication credentials for the database.
750
func (db *Database) Logout() {
751
session := db.Session
755
for i, cred := range session.creds {
756
if cred.Source == dbname {
757
copy(session.creds[i:], session.creds[i+1:])
758
session.creds = session.creds[:len(session.creds)-1]
764
if session.masterSocket != nil {
765
session.masterSocket.Logout(dbname)
767
if session.slaveSocket != nil {
768
session.slaveSocket.Logout(dbname)
774
// LogoutAll removes all established authentication credentials for the session.
775
func (s *Session) LogoutAll() {
777
for _, cred := range s.creds {
778
if s.masterSocket != nil {
779
s.masterSocket.Logout(cred.Source)
781
if s.slaveSocket != nil {
782
s.slaveSocket.Logout(cred.Source)
785
s.creds = s.creds[0:0]
789
// User represents a MongoDB user.
791
// Relevant documentation:
793
// http://docs.mongodb.org/manual/reference/privilege-documents/
794
// http://docs.mongodb.org/manual/reference/user-privileges/
797
// Username is how the user identifies itself to the system.
798
Username string `bson:"user"`
800
// Password is the plaintext password for the user. If set,
801
// the UpsertUser method will hash it into PasswordHash and
802
// unset it before the user is added to the database.
803
Password string `bson:",omitempty"`
805
// PasswordHash is the MD5 hash of Username+":mongo:"+Password.
806
PasswordHash string `bson:"pwd,omitempty"`
808
// CustomData holds arbitrary data admins decide to associate
809
// with this user, such as the full name or employee id.
810
CustomData interface{} `bson:"customData,omitempty"`
812
// Roles indicates the set of roles the user will be provided.
813
// See the Role constants.
814
Roles []Role `bson:"roles"`
816
// OtherDBRoles allows assigning roles in other databases from
817
// user documents inserted in the admin database. This field
818
// only works in the admin database.
819
OtherDBRoles map[string][]Role `bson:"otherDBRoles,omitempty"`
821
// UserSource indicates where to look for this user's credentials.
822
// It may be set to a database name, or to "$external" for
823
// consulting an external resource such as Kerberos. UserSource
824
// must not be set if Password or PasswordHash are present.
826
// WARNING: This setting was only ever supported in MongoDB 2.4,
827
// and is now obsolete.
828
UserSource string `bson:"userSource,omitempty"`
834
// Relevant documentation:
836
// http://docs.mongodb.org/manual/reference/user-privileges/
838
RoleRoot Role = "root"
839
RoleRead Role = "read"
840
RoleReadAny Role = "readAnyDatabase"
841
RoleReadWrite Role = "readWrite"
842
RoleReadWriteAny Role = "readWriteAnyDatabase"
843
RoleDBAdmin Role = "dbAdmin"
844
RoleDBAdminAny Role = "dbAdminAnyDatabase"
845
RoleUserAdmin Role = "userAdmin"
846
RoleUserAdminAny Role = "userAdminAnyDatabase"
847
RoleClusterAdmin Role = "clusterAdmin"
850
// UpsertUser updates the authentication credentials and the roles for
851
// a MongoDB user within the db database. If the named user doesn't exist
852
// it will be created.
854
// This method should only be used from MongoDB 2.4 and on. For older
855
// MongoDB releases, use the obsolete AddUser method instead.
857
// Relevant documentation:
859
// http://docs.mongodb.org/manual/reference/user-privileges/
860
// http://docs.mongodb.org/manual/reference/privilege-documents/
862
func (db *Database) UpsertUser(user *User) error {
863
if user.Username == "" {
864
return fmt.Errorf("user has no Username")
866
if (user.Password != "" || user.PasswordHash != "") && user.UserSource != "" {
867
return fmt.Errorf("user has both Password/PasswordHash and UserSource set")
869
if len(user.OtherDBRoles) > 0 && db.Name != "admin" && db.Name != "$external" {
870
return fmt.Errorf("user with OtherDBRoles is only supported in the admin or $external databases")
873
// Attempt to run this using 2.6+ commands.
875
if user.UserSource != "" {
876
// Compatibility logic for the userSource field of MongoDB <= 2.4.X
877
rundb = db.Session.DB(user.UserSource)
879
err := rundb.runUserCmd("updateUser", user)
880
// retry with createUser when isAuthError in order to enable the "localhost exception"
881
if isNotFound(err) || isAuthError(err) {
882
return rundb.runUserCmd("createUser", user)
888
// Command does not exist. Fallback to pre-2.6 behavior.
889
var set, unset bson.D
890
if user.Password != "" {
892
psum.Write([]byte(user.Username + ":mongo:" + user.Password))
893
set = append(set, bson.DocElem{"pwd", hex.EncodeToString(psum.Sum(nil))})
894
unset = append(unset, bson.DocElem{"userSource", 1})
895
} else if user.PasswordHash != "" {
896
set = append(set, bson.DocElem{"pwd", user.PasswordHash})
897
unset = append(unset, bson.DocElem{"userSource", 1})
899
if user.UserSource != "" {
900
set = append(set, bson.DocElem{"userSource", user.UserSource})
901
unset = append(unset, bson.DocElem{"pwd", 1})
903
if user.Roles != nil || user.OtherDBRoles != nil {
904
set = append(set, bson.DocElem{"roles", user.Roles})
905
if len(user.OtherDBRoles) > 0 {
906
set = append(set, bson.DocElem{"otherDBRoles", user.OtherDBRoles})
908
unset = append(unset, bson.DocElem{"otherDBRoles", 1})
911
users := db.C("system.users")
912
err = users.Update(bson.D{{"user", user.Username}}, bson.D{{"$unset", unset}, {"$set", set}})
913
if err == ErrNotFound {
914
set = append(set, bson.DocElem{"user", user.Username})
915
if user.Roles == nil && user.OtherDBRoles == nil {
916
// Roles must be sent, as it's the way MongoDB distinguishes
917
// old-style documents from new-style documents in pre-2.6.
918
set = append(set, bson.DocElem{"roles", user.Roles})
920
err = users.Insert(set)
925
func isNoCmd(err error) bool {
926
e, ok := err.(*QueryError)
927
return ok && (e.Code == 59 || e.Code == 13390 || strings.HasPrefix(e.Message, "no such cmd:"))
930
func isNotFound(err error) bool {
931
e, ok := err.(*QueryError)
932
return ok && e.Code == 11
935
func isAuthError(err error) bool {
936
e, ok := err.(*QueryError)
937
return ok && e.Code == 13
940
func (db *Database) runUserCmd(cmdName string, user *User) error {
941
cmd := make(bson.D, 0, 16)
942
cmd = append(cmd, bson.DocElem{cmdName, user.Username})
943
if user.Password != "" {
944
cmd = append(cmd, bson.DocElem{"pwd", user.Password})
946
var roles []interface{}
947
for _, role := range user.Roles {
948
roles = append(roles, role)
950
for db, dbroles := range user.OtherDBRoles {
951
for _, role := range dbroles {
952
roles = append(roles, bson.D{{"role", role}, {"db", db}})
955
if roles != nil || user.Roles != nil || cmdName == "createUser" {
956
cmd = append(cmd, bson.DocElem{"roles", roles})
958
err := db.Run(cmd, nil)
959
if !isNoCmd(err) && user.UserSource != "" && (user.UserSource != "$external" || db.Name != "$external") {
960
return fmt.Errorf("MongoDB 2.6+ does not support the UserSource setting")
965
// AddUser creates or updates the authentication credentials of user within
968
// WARNING: This method is obsolete and should only be used with MongoDB 2.2
969
// or earlier. For MongoDB 2.4 and on, use UpsertUser instead.
970
func (db *Database) AddUser(username, password string, readOnly bool) error {
971
// Try to emulate the old behavior on 2.6+
972
user := &User{Username: username, Password: password}
973
if db.Name == "admin" {
975
user.Roles = []Role{RoleReadAny}
977
user.Roles = []Role{RoleReadWriteAny}
981
user.Roles = []Role{RoleRead}
983
user.Roles = []Role{RoleReadWrite}
986
err := db.runUserCmd("updateUser", user)
988
return db.runUserCmd("createUser", user)
994
// Command doesn't exist. Fallback to pre-2.6 behavior.
996
psum.Write([]byte(username + ":mongo:" + password))
997
digest := hex.EncodeToString(psum.Sum(nil))
998
c := db.C("system.users")
999
_, err = c.Upsert(bson.M{"user": username}, bson.M{"$set": bson.M{"user": username, "pwd": digest, "readOnly": readOnly}})
1003
// RemoveUser removes the authentication credentials of user from the database.
1004
func (db *Database) RemoveUser(user string) error {
1005
err := db.Run(bson.D{{"dropUser", user}}, nil)
1007
users := db.C("system.users")
1008
return users.Remove(bson.M{"user": user})
1010
if isNotFound(err) {
1016
type indexSpec struct {
1019
Unique bool ",omitempty"
1020
DropDups bool "dropDups,omitempty"
1021
Background bool ",omitempty"
1022
Sparse bool ",omitempty"
1023
Bits int ",omitempty"
1024
Min, Max float64 ",omitempty"
1025
BucketSize float64 "bucketSize,omitempty"
1026
ExpireAfter int "expireAfterSeconds,omitempty"
1027
Weights bson.D ",omitempty"
1028
DefaultLanguage string "default_language,omitempty"
1029
LanguageOverride string "language_override,omitempty"
1030
TextIndexVersion int "textIndexVersion,omitempty"
1034
Key []string // Index key fields; prefix name with dash (-) for descending order
1035
Unique bool // Prevent two documents from having the same index key
1036
DropDups bool // Drop documents with the same index key as a previously indexed one
1037
Background bool // Build index in background and return immediately
1038
Sparse bool // Only index documents containing the Key fields
1040
// If ExpireAfter is defined the server will periodically delete
1041
// documents with indexed time.Time older than the provided delta.
1042
ExpireAfter time.Duration
1044
// Name holds the stored index name. On creation if this field is unset it is
1045
// computed by EnsureIndex based on the index key.
1048
// Properties for spatial indexes.
1050
// Min and Max were improperly typed as int when they should have been
1051
// floats. To preserve backwards compatibility they are still typed as
1052
// int and the following two fields enable reading and writing the same
1053
// fields as float numbers. In mgo.v3, these fields will be dropped and
1054
// Min/Max will become floats.
1060
// Properties for text indexes.
1061
DefaultLanguage string
1062
LanguageOverride string
1064
// Weights defines the significance of provided fields relative to other
1065
// fields in a text index. The score for a given word in a document is derived
1066
// from the weighted sum of the frequency for each of the indexed fields in
1067
// that document. The default field weight is 1.
1068
Weights map[string]int
1071
// mgo.v3: Drop Minf and Maxf and transform Min and Max to floats.
1072
// mgo.v3: Drop DropDups as it's unsupported past 2.8.
1074
type indexKeyInfo struct {
1080
func parseIndexKey(key []string) (*indexKeyInfo, error) {
1081
var keyInfo indexKeyInfo
1083
var order interface{}
1084
for _, field := range key {
1086
if keyInfo.name != "" {
1091
if field[0] == '$' {
1092
if c := strings.Index(field, ":"); c > 1 && c < len(field)-1 {
1095
keyInfo.name += field + "_" + kind
1102
// Logic above failed. Reset and error.
1107
// The shell used to render this field as key_ instead of key_2d,
1108
// and mgo followed suit. This has been fixed in recent server
1109
// releases, and mgo followed as well.
1110
keyInfo.name += field + "_2d"
1114
keyInfo.name += field + "_-1"
1121
keyInfo.name += field + "_1"
1127
if field == "" || kind != "" && order != kind {
1128
return nil, fmt.Errorf(`invalid index key: want "[$<kind>:][-]<field name>", got %q`, raw)
1132
keyInfo.key = append(keyInfo.key, bson.DocElem{"_fts", "text"}, bson.DocElem{"_ftsx", 1})
1135
keyInfo.weights = append(keyInfo.weights, bson.DocElem{field, 1})
1137
keyInfo.key = append(keyInfo.key, bson.DocElem{field, order})
1140
if keyInfo.name == "" {
1141
return nil, errors.New("invalid index key: no fields provided")
1143
return &keyInfo, nil
1146
// EnsureIndexKey ensures an index with the given key exists, creating it
1151
// err := collection.EnsureIndexKey("a", "b")
1153
// Is equivalent to:
1155
// err := collection.EnsureIndex(mgo.Index{Key: []string{"a", "b"}})
1157
// See the EnsureIndex method for more details.
1158
func (c *Collection) EnsureIndexKey(key ...string) error {
1159
return c.EnsureIndex(Index{Key: key})
1162
// EnsureIndex ensures an index with the given key exists, creating it with
1163
// the provided parameters if necessary. EnsureIndex does not modify a previously
1164
// existent index with a matching key. The old index must be dropped first instead.
1166
// Once EnsureIndex returns successfully, following requests for the same index
1167
// will not contact the server unless Collection.DropIndex is used to drop the
1168
// same index, or Session.ResetIndexCache is called.
1173
// Key: []string{"lastname", "firstname"},
1176
// Background: true, // See notes.
1179
// err := collection.EnsureIndex(index)
1181
// The Key value determines which fields compose the index. The index ordering
1182
// will be ascending by default. To obtain an index with a descending order,
1183
// the field name should be prefixed by a dash (e.g. []string{"-time"}). It can
1184
// also be optionally prefixed by an index kind, as in "$text:summary" or
1185
// "$2d:-point". The key string format is:
1187
// [$<kind>:][-]<field name>
1189
// If the Unique field is true, the index must necessarily contain only a single
1190
// document per Key. With DropDups set to true, documents with the same key
1191
// as a previously indexed one will be dropped rather than an error returned.
1193
// If Background is true, other connections will be allowed to proceed using
1194
// the collection without the index while it's being built. Note that the
1195
// session executing EnsureIndex will be blocked for as long as it takes for
1196
// the index to be built.
1198
// If Sparse is true, only documents containing the provided Key fields will be
1199
// included in the index. When using a sparse index for sorting, only indexed
1200
// documents will be returned.
1202
// If ExpireAfter is non-zero, the server will periodically scan the collection
1203
// and remove documents containing an indexed time.Time field with a value
1204
// older than ExpireAfter. See the documentation for details:
1206
// http://docs.mongodb.org/manual/tutorial/expire-data
1208
// Other kinds of indexes are also supported through that API. Here is an example:
1211
// Key: []string{"$2d:loc"},
1214
// err := collection.EnsureIndex(index)
1216
// The example above requests the creation of a "2d" index for the "loc" field.
1218
// The 2D index bounds may be changed using the Min and Max attributes of the
1219
// Index value. The default bound setting of (-180, 180) is suitable for
1220
// latitude/longitude pairs.
1222
// The Bits parameter sets the precision of the 2D geohash values. If not
1223
// provided, 26 bits are used, which is roughly equivalent to 1 foot of
1224
// precision for the default (-180, 180) index bounds.
1226
// Relevant documentation:
1228
// http://www.mongodb.org/display/DOCS/Indexes
1229
// http://www.mongodb.org/display/DOCS/Indexing+Advice+and+FAQ
1230
// http://www.mongodb.org/display/DOCS/Indexing+as+a+Background+Operation
1231
// http://www.mongodb.org/display/DOCS/Geospatial+Indexing
1232
// http://www.mongodb.org/display/DOCS/Multikeys
1234
func (c *Collection) EnsureIndex(index Index) error {
1235
keyInfo, err := parseIndexKey(index.Key)
1240
session := c.Database.Session
1241
cacheKey := c.FullName + "\x00" + keyInfo.name
1242
if session.cluster().HasCachedIndex(cacheKey) {
1250
Unique: index.Unique,
1251
DropDups: index.DropDups,
1252
Background: index.Background,
1253
Sparse: index.Sparse,
1257
BucketSize: index.BucketSize,
1258
ExpireAfter: int(index.ExpireAfter / time.Second),
1259
Weights: keyInfo.weights,
1260
DefaultLanguage: index.DefaultLanguage,
1261
LanguageOverride: index.LanguageOverride,
1264
if spec.Min == 0 && spec.Max == 0 {
1265
spec.Min = float64(index.Min)
1266
spec.Max = float64(index.Max)
1269
if index.Name != "" {
1270
spec.Name = index.Name
1274
for name, weight := range index.Weights {
1275
for i, elem := range spec.Weights {
1276
if elem.Name == name {
1277
spec.Weights[i].Value = weight
1281
panic("weight provided for field that is not part of index key: " + name)
1284
cloned := session.Clone()
1285
defer cloned.Close()
1286
cloned.SetMode(Strong, false)
1287
cloned.EnsureSafe(&Safe{})
1288
db := c.Database.With(cloned)
1290
// Try with a command first.
1291
err = db.Run(bson.D{{"createIndexes", c.Name}, {"indexes", []indexSpec{spec}}}, nil)
1293
// Command not yet supported. Insert into the indexes collection instead.
1294
err = db.C("system.indexes").Insert(&spec)
1297
session.cluster().CacheIndex(cacheKey, true)
1302
// DropIndex drops the index with the provided key from the c collection.
1304
// See EnsureIndex for details on the accepted key variants.
1308
// err1 := collection.DropIndex("firstField", "-secondField")
1309
// err2 := collection.DropIndex("customIndexName")
1311
func (c *Collection) DropIndex(key ...string) error {
1312
keyInfo, err := parseIndexKey(key)
1317
session := c.Database.Session
1318
cacheKey := c.FullName + "\x00" + keyInfo.name
1319
session.cluster().CacheIndex(cacheKey, false)
1321
session = session.Clone()
1322
defer session.Close()
1323
session.SetMode(Strong, false)
1325
db := c.Database.With(session)
1330
err = db.Run(bson.D{{"dropIndexes", c.Name}, {"index", keyInfo.name}}, &result)
1335
return errors.New(result.ErrMsg)
1340
// DropIndexName removes the index with the provided index name.
1344
// err := collection.DropIndex("customIndexName")
1346
func (c *Collection) DropIndexName(name string) error {
1347
session := c.Database.Session
1349
session = session.Clone()
1350
defer session.Close()
1351
session.SetMode(Strong, false)
1355
indexes, err := c.Indexes()
1361
for _, idx := range indexes {
1362
if idx.Name == name {
1368
if index.Name != "" {
1369
keyInfo, err := parseIndexKey(index.Key)
1374
cacheKey := c.FullName + "\x00" + keyInfo.name
1375
session.cluster().CacheIndex(cacheKey, false)
1382
err = c.Database.Run(bson.D{{"dropIndexes", c.Name}, {"index", name}}, &result)
1387
return errors.New(result.ErrMsg)
1392
// nonEventual returns a clone of session and ensures it is not Eventual.
1393
// This guarantees that the server that is used for queries may be reused
1394
// afterwards when a cursor is received.
1395
func (session *Session) nonEventual() *Session {
1396
cloned := session.Clone()
1397
if cloned.consistency == Eventual {
1398
cloned.SetMode(Monotonic, false)
1403
// Indexes returns a list of all indexes for the collection.
1405
// For example, this snippet would drop all available indexes:
1407
// indexes, err := collection.Indexes()
1411
// for _, index := range indexes {
1412
// err = collection.DropIndex(index.Key...)
1418
// See the EnsureIndex method for more details on indexes.
1419
func (c *Collection) Indexes() (indexes []Index, err error) {
1420
cloned := c.Database.Session.nonEventual()
1421
defer cloned.Close()
1423
batchSize := int(cloned.queryConfig.op.limit)
1425
// Try with a command.
1431
err = c.Database.With(cloned).Run(bson.D{{"listIndexes", c.Name}, {"cursor", bson.D{{"batchSize", batchSize}}}}, &result)
1433
firstBatch := result.Indexes
1434
if firstBatch == nil {
1435
firstBatch = result.Cursor.FirstBatch
1437
ns := strings.SplitN(result.Cursor.NS, ".", 2)
1439
iter = c.With(cloned).NewIter(nil, firstBatch, result.Cursor.Id, nil)
1441
iter = cloned.DB(ns[0]).C(ns[1]).NewIter(nil, firstBatch, result.Cursor.Id, nil)
1443
} else if isNoCmd(err) {
1444
// Command not yet supported. Query the database instead.
1445
iter = c.Database.C("system.indexes").Find(bson.M{"ns": c.FullName}).Iter()
1451
for iter.Next(&spec) {
1452
indexes = append(indexes, indexFromSpec(spec))
1454
if err = iter.Close(); err != nil {
1457
sort.Sort(indexSlice(indexes))
1461
func indexFromSpec(spec indexSpec) Index {
1464
Key: simpleIndexKey(spec.Key),
1465
Unique: spec.Unique,
1466
DropDups: spec.DropDups,
1467
Background: spec.Background,
1468
Sparse: spec.Sparse,
1472
BucketSize: spec.BucketSize,
1473
DefaultLanguage: spec.DefaultLanguage,
1474
LanguageOverride: spec.LanguageOverride,
1475
ExpireAfter: time.Duration(spec.ExpireAfter) * time.Second,
1477
if float64(int(spec.Min)) == spec.Min && float64(int(spec.Max)) == spec.Max {
1478
index.Min = int(spec.Min)
1479
index.Max = int(spec.Max)
1481
if spec.TextIndexVersion > 0 {
1482
index.Key = make([]string, len(spec.Weights))
1483
index.Weights = make(map[string]int)
1484
for i, elem := range spec.Weights {
1485
index.Key[i] = "$text:" + elem.Name
1486
if w, ok := elem.Value.(int); ok {
1487
index.Weights[elem.Name] = w
1494
type indexSlice []Index
1496
func (idxs indexSlice) Len() int { return len(idxs) }
1497
func (idxs indexSlice) Less(i, j int) bool { return idxs[i].Name < idxs[j].Name }
1498
func (idxs indexSlice) Swap(i, j int) { idxs[i], idxs[j] = idxs[j], idxs[i] }
1500
func simpleIndexKey(realKey bson.D) (key []string) {
1501
for i := range realKey {
1502
field := realKey[i].Name
1503
vi, ok := realKey[i].Value.(int)
1505
vf, _ := realKey[i].Value.(float64)
1509
key = append(key, field)
1513
key = append(key, "-"+field)
1516
if vs, ok := realKey[i].Value.(string); ok {
1517
key = append(key, "$"+vs+":"+field)
1520
panic("Got unknown index key type for field " + field)
1525
// ResetIndexCache() clears the cache of previously ensured indexes.
1526
// Following requests to EnsureIndex will contact the server.
1527
func (s *Session) ResetIndexCache() {
1528
s.cluster().ResetIndexCache()
1531
// New creates a new session with the same parameters as the original
1532
// session, including consistency, batch size, prefetching, safety mode,
1533
// etc. The returned session will use sockets from the pool, so there's
1534
// a chance that writes just performed in another session may not yet
1537
// Login information from the original session will not be copied over
1538
// into the new session unless it was provided through the initial URL
1539
// for the Dial function.
1541
// See the Copy and Clone methods.
1543
func (s *Session) New() *Session {
1545
scopy := copySession(s, false)
1551
// Copy works just like New, but preserves the exact authentication
1552
// information from the original session.
1553
func (s *Session) Copy() *Session {
1555
scopy := copySession(s, true)
1561
// Clone works just like Copy, but also reuses the same socket as the original
1562
// session, in case it had already reserved one due to its consistency
1563
// guarantees. This behavior ensures that writes performed in the old session
1564
// are necessarily observed when using the new session, as long as it was a
1565
// strong or monotonic session. That said, it also means that long operations
1566
// may cause other goroutines using the original session to wait.
1567
func (s *Session) Clone() *Session {
1569
scopy := copySession(s, true)
1574
// Close terminates the session. It's a runtime error to use a session
1575
// after it has been closed.
1576
func (s *Session) Close() {
1578
if s.cluster_ != nil {
1579
debugf("Closing session %p", s)
1581
s.cluster_.Release()
1587
func (s *Session) cluster() *mongoCluster {
1588
if s.cluster_ == nil {
1589
panic("Session already closed")
1594
// Refresh puts back any reserved sockets in use and restarts the consistency
1595
// guarantees according to the current consistency setting for the session.
1596
func (s *Session) Refresh() {
1598
s.slaveOk = s.consistency != Strong
1603
// SetMode changes the consistency mode for the session.
1605
// In the Strong consistency mode reads and writes will always be made to
1606
// the primary server using a unique connection so that reads and writes are
1607
// fully consistent, ordered, and observing the most up-to-date data.
1608
// This offers the least benefits in terms of distributing load, but the
1609
// most guarantees. See also Monotonic and Eventual.
1611
// In the Monotonic consistency mode reads may not be entirely up-to-date,
1612
// but they will always see the history of changes moving forward, the data
1613
// read will be consistent across sequential queries in the same session,
1614
// and modifications made within the session will be observed in following
1615
// queries (read-your-writes).
1617
// In practice, the Monotonic mode is obtained by performing initial reads
1618
// on a unique connection to an arbitrary secondary, if one is available,
1619
// and once the first write happens, the session connection is switched over
1620
// to the primary server. This manages to distribute some of the reading
1621
// load with secondaries, while maintaining some useful guarantees.
1623
// In the Eventual consistency mode reads will be made to any secondary in the
1624
// cluster, if one is available, and sequential reads will not necessarily
1625
// be made with the same connection. This means that data may be observed
1626
// out of order. Writes will of course be issued to the primary, but
1627
// independent writes in the same Eventual session may also be made with
1628
// independent connections, so there are also no guarantees in terms of
1629
// write ordering (no read-your-writes guarantees either).
1631
// The Eventual mode is the fastest and most resource-friendly, but is
1632
// also the one offering the least guarantees about ordering of the data
1633
// read and written.
1635
// If refresh is true, in addition to ensuring the session is in the given
1636
// consistency mode, the consistency guarantees will also be reset (e.g.
1637
// a Monotonic session will be allowed to read from secondaries again).
1638
// This is equivalent to calling the Refresh function.
1640
// Shifting between Monotonic and Strong modes will keep a previously
1641
// reserved connection for the session unless refresh is true or the
1642
// connection is unsuitable (to a secondary server in a Strong session).
1643
func (s *Session) SetMode(consistency Mode, refresh bool) {
1645
debugf("Session %p: setting mode %d with refresh=%v (master=%p, slave=%p)", s, consistency, refresh, s.masterSocket, s.slaveSocket)
1646
s.consistency = consistency
1648
s.slaveOk = s.consistency != Strong
1650
} else if s.consistency == Strong {
1652
} else if s.masterSocket == nil {
1658
// Mode returns the current consistency mode for the session.
1659
func (s *Session) Mode() Mode {
1661
mode := s.consistency
1666
// SetSyncTimeout sets the amount of time an operation with this session
1667
// will wait before returning an error in case a connection to a usable
1668
// server can't be established. Set it to zero to wait forever. The
1669
// default value is 7 seconds.
1670
func (s *Session) SetSyncTimeout(d time.Duration) {
1676
// SetSocketTimeout sets the amount of time to wait for a non-responding
1677
// socket to the database before it is forcefully closed.
1678
func (s *Session) SetSocketTimeout(d time.Duration) {
1681
if s.masterSocket != nil {
1682
s.masterSocket.SetTimeout(d)
1684
if s.slaveSocket != nil {
1685
s.slaveSocket.SetTimeout(d)
1690
// SetCursorTimeout changes the standard timeout period that the server
1691
// enforces on created cursors. The only supported value right now is
1692
// 0, which disables the timeout. The standard server timeout is 10 minutes.
1693
func (s *Session) SetCursorTimeout(d time.Duration) {
1696
s.queryConfig.op.flags |= flagNoCursorTimeout
1698
panic("SetCursorTimeout: only 0 (disable timeout) supported for now")
1703
// SetPoolLimit sets the maximum number of sockets in use in a single server
1704
// before this session will block waiting for a socket to be available.
1705
// The default limit is 4096.
1707
// This limit must be set to cover more than any expected workload of the
1708
// application. It is a bad practice and an unsupported use case to use the
1709
// database driver to define the concurrency limit of an application. Prevent
1710
// such concurrency "at the door" instead, by properly restricting the amount
1711
// of used resources and number of goroutines before they are created.
1712
func (s *Session) SetPoolLimit(limit int) {
1718
// SetBypassValidation sets whether the server should bypass the registered
1719
// validation expressions executed when documents are inserted or modified,
1720
// in the interest of preserving invariants in the collection being modified.
1721
// The default is to not bypass, and thus to perform the validation
1722
// expressions registered for modified collections.
1724
// Document validation was introuced in MongoDB 3.2.
1726
// Relevant documentation:
1728
// https://docs.mongodb.org/manual/release-notes/3.2/#bypass-validation
1730
func (s *Session) SetBypassValidation(bypass bool) {
1732
s.bypassValidation = bypass
1736
// SetBatch sets the default batch size used when fetching documents from the
1737
// database. It's possible to change this setting on a per-query basis as
1738
// well, using the Query.Batch method.
1740
// The default batch size is defined by the database itself. As of this
1741
// writing, MongoDB will use an initial size of min(100 docs, 4MB) on the
1742
// first batch, and 4MB on remaining ones.
1743
func (s *Session) SetBatch(n int) {
1745
// Server interprets 1 as -1 and closes the cursor (!?)
1749
s.queryConfig.op.limit = int32(n)
1753
// SetPrefetch sets the default point at which the next batch of results will be
1754
// requested. When there are p*batch_size remaining documents cached in an
1755
// Iter, the next batch will be requested in background. For instance, when
1758
// session.SetBatch(200)
1759
// session.SetPrefetch(0.25)
1761
// and there are only 50 documents cached in the Iter to be processed, the
1762
// next batch of 200 will be requested. It's possible to change this setting on
1763
// a per-query basis as well, using the Prefetch method of Query.
1765
// The default prefetch value is 0.25.
1766
func (s *Session) SetPrefetch(p float64) {
1768
s.queryConfig.prefetch = p
1772
// See SetSafe for details on the Safe type.
1774
W int // Min # of servers to ack before success
1775
WMode string // Write mode for MongoDB 2.0+ (e.g. "majority")
1776
WTimeout int // Milliseconds to wait for W before timing out
1777
FSync bool // Sync via the journal if present, or via data files sync otherwise
1778
J bool // Sync via the journal if present
1781
// Safe returns the current safety mode for the session.
1782
func (s *Session) Safe() (safe *Safe) {
1785
if s.safeOp != nil {
1786
cmd := s.safeOp.query.(*getLastError)
1787
safe = &Safe{WTimeout: cmd.WTimeout, FSync: cmd.FSync, J: cmd.J}
1788
switch w := cmd.W.(type) {
1798
// SetSafe changes the session safety mode.
1800
// If the safe parameter is nil, the session is put in unsafe mode, and writes
1801
// become fire-and-forget, without error checking. The unsafe mode is faster
1802
// since operations won't hold on waiting for a confirmation.
1804
// If the safe parameter is not nil, any changing query (insert, update, ...)
1805
// will be followed by a getLastError command with the specified parameters,
1806
// to ensure the request was correctly processed.
1808
// The safe.W parameter determines how many servers should confirm a write
1809
// before the operation is considered successful. If set to 0 or 1, the
1810
// command will return as soon as the primary is done with the request.
1811
// If safe.WTimeout is greater than zero, it determines how many milliseconds
1812
// to wait for the safe.W servers to respond before returning an error.
1814
// Starting with MongoDB 2.0.0 the safe.WMode parameter can be used instead
1815
// of W to request for richer semantics. If set to "majority" the server will
1816
// wait for a majority of members from the replica set to respond before
1817
// returning. Custom modes may also be defined within the server to create
1818
// very detailed placement schemas. See the data awareness documentation in
1819
// the links below for more details (note that MongoDB internally reuses the
1820
// "w" field name for WMode).
1822
// If safe.J is true, servers will block until write operations have been
1823
// committed to the journal. Cannot be used in combination with FSync. Prior
1824
// to MongoDB 2.6 this option was ignored if the server was running without
1825
// journaling. Starting with MongoDB 2.6 write operations will fail with an
1826
// exception if this option is used when the server is running without
1829
// If safe.FSync is true and the server is running without journaling, blocks
1830
// until the server has synced all data files to disk. If the server is running
1831
// with journaling, this acts the same as the J option, blocking until write
1832
// operations have been committed to the journal. Cannot be used in
1833
// combination with J.
1835
// Since MongoDB 2.0.0, the safe.J option can also be used instead of FSync
1836
// to force the server to wait for a group commit in case journaling is
1837
// enabled. The option has no effect if the server has journaling disabled.
1839
// For example, the following statement will make the session check for
1840
// errors, without imposing further constraints:
1842
// session.SetSafe(&mgo.Safe{})
1844
// The following statement will force the server to wait for a majority of
1845
// members of a replica set to return (MongoDB 2.0+ only):
1847
// session.SetSafe(&mgo.Safe{WMode: "majority"})
1849
// The following statement, on the other hand, ensures that at least two
1850
// servers have flushed the change to disk before confirming the success
1853
// session.EnsureSafe(&mgo.Safe{W: 2, FSync: true})
1855
// The following statement, on the other hand, disables the verification
1856
// of errors entirely:
1858
// session.SetSafe(nil)
1860
// See also the EnsureSafe method.
1862
// Relevant documentation:
1864
// http://www.mongodb.org/display/DOCS/getLastError+Command
1865
// http://www.mongodb.org/display/DOCS/Verifying+Propagation+of+Writes+with+getLastError
1866
// http://www.mongodb.org/display/DOCS/Data+Center+Awareness
1868
func (s *Session) SetSafe(safe *Safe) {
1875
// EnsureSafe compares the provided safety parameters with the ones
1876
// currently in use by the session and picks the most conservative
1877
// choice for each setting.
1881
// - safe.WMode is always used if set.
1882
// - safe.W is used if larger than the current W and WMode is empty.
1883
// - safe.FSync is always used if true.
1884
// - safe.J is used if FSync is false.
1885
// - safe.WTimeout is used if set and smaller than the current WTimeout.
1887
// For example, the following statement will ensure the session is
1888
// at least checking for errors, without enforcing further constraints.
1889
// If a more conservative SetSafe or EnsureSafe call was previously done,
1890
// the following call will be ignored.
1892
// session.EnsureSafe(&mgo.Safe{})
1894
// See also the SetSafe method for details on what each option means.
1896
// Relevant documentation:
1898
// http://www.mongodb.org/display/DOCS/getLastError+Command
1899
// http://www.mongodb.org/display/DOCS/Verifying+Propagation+of+Writes+with+getLastError
1900
// http://www.mongodb.org/display/DOCS/Data+Center+Awareness
1902
func (s *Session) EnsureSafe(safe *Safe) {
1908
func (s *Session) ensureSafe(safe *Safe) {
1914
if safe.WMode != "" {
1916
} else if safe.W > 0 {
1920
var cmd getLastError
1921
if s.safeOp == nil {
1922
cmd = getLastError{1, w, safe.WTimeout, safe.FSync, safe.J}
1924
// Copy. We don't want to mutate the existing query.
1925
cmd = *(s.safeOp.query.(*getLastError))
1928
} else if safe.WMode != "" {
1930
} else if i, ok := cmd.W.(int); ok && safe.W > i {
1933
if safe.WTimeout > 0 && safe.WTimeout < cmd.WTimeout {
1934
cmd.WTimeout = safe.WTimeout
1939
} else if safe.J && !cmd.FSync {
1943
s.safeOp = &queryOp{
1945
collection: "admin.$cmd",
1950
// Run issues the provided command on the "admin" database and
1951
// and unmarshals its result in the respective argument. The cmd
1952
// argument may be either a string with the command name itself, in
1953
// which case an empty document of the form bson.M{cmd: 1} will be used,
1954
// or it may be a full command document.
1956
// Note that MongoDB considers the first marshalled key as the command
1957
// name, so when providing a command with options, it's important to
1958
// use an ordering-preserving document, such as a struct value or an
1959
// instance of bson.D. For instance:
1961
// db.Run(bson.D{{"create", "mycollection"}, {"size", 1024}})
1963
// For commands on arbitrary databases, see the Run method in
1964
// the Database type.
1966
// Relevant documentation:
1968
// http://www.mongodb.org/display/DOCS/Commands
1969
// http://www.mongodb.org/display/DOCS/List+of+Database+CommandSkips
1971
func (s *Session) Run(cmd interface{}, result interface{}) error {
1972
return s.DB("admin").Run(cmd, result)
1975
// SelectServers restricts communication to servers configured with the
1976
// given tags. For example, the following statement restricts servers
1977
// used for reading operations to those with both tag "disk" set to
1978
// "ssd" and tag "rack" set to 1:
1980
// session.SelectServers(bson.D{{"disk", "ssd"}, {"rack", 1}})
1982
// Multiple sets of tags may be provided, in which case the used server
1983
// must match all tags within any one set.
1985
// If a connection was previously assigned to the session due to the
1986
// current session mode (see Session.SetMode), the tag selection will
1987
// only be enforced after the session is refreshed.
1989
// Relevant documentation:
1991
// http://docs.mongodb.org/manual/tutorial/configure-replica-set-tag-sets
1993
func (s *Session) SelectServers(tags ...bson.D) {
1995
s.queryConfig.op.serverTags = tags
1999
// Ping runs a trivial ping command just to get in touch with the server.
2000
func (s *Session) Ping() error {
2001
return s.Run("ping", nil)
2004
// Fsync flushes in-memory writes to disk on the server the session
2005
// is established with. If async is true, the call returns immediately,
2006
// otherwise it returns after the flush has been made.
2007
func (s *Session) Fsync(async bool) error {
2008
return s.Run(bson.D{{"fsync", 1}, {"async", async}}, nil)
2011
// FsyncLock locks all writes in the specific server the session is
2012
// established with and returns. Any writes attempted to the server
2013
// after it is successfully locked will block until FsyncUnlock is
2014
// called for the same server.
2016
// This method works on secondaries as well, preventing the oplog from
2017
// being flushed while the server is locked, but since only the server
2018
// connected to is locked, for locking specific secondaries it may be
2019
// necessary to establish a connection directly to the secondary (see
2020
// Dial's connect=direct option).
2022
// As an important caveat, note that once a write is attempted and
2023
// blocks, follow up reads will block as well due to the way the
2024
// lock is internally implemented in the server. More details at:
2026
// https://jira.mongodb.org/browse/SERVER-4243
2028
// FsyncLock is often used for performing consistent backups of
2029
// the database files on disk.
2031
// Relevant documentation:
2033
// http://www.mongodb.org/display/DOCS/fsync+Command
2034
// http://www.mongodb.org/display/DOCS/Backups
2036
func (s *Session) FsyncLock() error {
2037
return s.Run(bson.D{{"fsync", 1}, {"lock", true}}, nil)
2040
// FsyncUnlock releases the server for writes. See FsyncLock for details.
2041
func (s *Session) FsyncUnlock() error {
2042
err := s.Run(bson.D{{"fsyncUnlock", 1}}, nil)
2044
err = s.DB("admin").C("$cmd.sys.unlock").Find(nil).One(nil) // WTF?
2049
// Find prepares a query using the provided document. The document may be a
2050
// map or a struct value capable of being marshalled with bson. The map
2051
// may be a generic one using interface{} for its key and/or values, such as
2052
// bson.M, or it may be a properly typed map. Providing nil as the document
2053
// is equivalent to providing an empty document such as bson.M{}.
2055
// Further details of the query may be tweaked using the resulting Query value,
2056
// and then executed to retrieve results using methods such as One, For,
2059
// In case the resulting document includes a field named $err or errmsg, which
2060
// are standard ways for MongoDB to return query errors, the returned err will
2061
// be set to a *QueryError value including the Err message and the Code. In
2062
// those cases, the result argument is still unmarshalled into with the
2063
// received document so that any other custom values may be obtained if
2066
// Relevant documentation:
2068
// http://www.mongodb.org/display/DOCS/Querying
2069
// http://www.mongodb.org/display/DOCS/Advanced+Queries
2071
func (c *Collection) Find(query interface{}) *Query {
2072
session := c.Database.Session
2074
q := &Query{session: session, query: session.queryConfig}
2077
q.op.collection = c.FullName
2081
type repairCmd struct {
2082
RepairCursor string `bson:"repairCursor"`
2083
Cursor *repairCmdCursor ",omitempty"
2086
type repairCmdCursor struct {
2087
BatchSize int `bson:"batchSize,omitempty"`
2090
// Repair returns an iterator that goes over all recovered documents in the
2091
// collection, in a best-effort manner. This is most useful when there are
2092
// damaged data files. Multiple copies of the same document may be returned
2095
// Repair is supported in MongoDB 2.7.8 and later.
2096
func (c *Collection) Repair() *Iter {
2097
// Clone session and set it to Monotonic mode so that the server
2098
// used for the query may be safely obtained afterwards, if
2099
// necessary for iteration when a cursor is received.
2100
session := c.Database.Session
2101
cloned := session.nonEventual()
2102
defer cloned.Close()
2104
batchSize := int(cloned.queryConfig.op.limit)
2106
var result struct{ Cursor cursorData }
2109
RepairCursor: c.Name,
2110
Cursor: &repairCmdCursor{batchSize},
2113
clonedc := c.With(cloned)
2114
err := clonedc.Database.Run(cmd, &result)
2115
return clonedc.NewIter(session, result.Cursor.FirstBatch, result.Cursor.Id, err)
2118
// FindId is a convenience helper equivalent to:
2120
// query := collection.Find(bson.M{"_id": id})
2122
// See the Find method for more details.
2123
func (c *Collection) FindId(id interface{}) *Query {
2124
return c.Find(bson.D{{"_id", id}})
2129
collection *Collection
2130
pipeline interface{}
2135
type pipeCmd struct {
2137
Pipeline interface{}
2138
Cursor *pipeCmdCursor ",omitempty"
2139
Explain bool ",omitempty"
2140
AllowDisk bool "allowDiskUse,omitempty"
2143
type pipeCmdCursor struct {
2144
BatchSize int `bson:"batchSize,omitempty"`
2147
// Pipe prepares a pipeline to aggregate. The pipeline document
2148
// must be a slice built in terms of the aggregation framework language.
2152
// pipe := collection.Pipe([]bson.M{{"$match": bson.M{"name": "Otavio"}}})
2153
// iter := pipe.Iter()
2155
// Relevant documentation:
2157
// http://docs.mongodb.org/manual/reference/aggregation
2158
// http://docs.mongodb.org/manual/applications/aggregation
2159
// http://docs.mongodb.org/manual/tutorial/aggregation-examples
2161
func (c *Collection) Pipe(pipeline interface{}) *Pipe {
2162
session := c.Database.Session
2164
batchSize := int(session.queryConfig.op.limit)
2170
batchSize: batchSize,
2174
// Iter executes the pipeline and returns an iterator capable of going
2175
// over all the generated results.
2176
func (p *Pipe) Iter() *Iter {
2177
// Clone session and set it to Monotonic mode so that the server
2178
// used for the query may be safely obtained afterwards, if
2179
// necessary for iteration when a cursor is received.
2180
cloned := p.session.nonEventual()
2181
defer cloned.Close()
2182
c := p.collection.With(cloned)
2185
Result []bson.Raw // 2.4, no cursors.
2186
Cursor cursorData // 2.6+, with cursors.
2191
Pipeline: p.pipeline,
2192
AllowDisk: p.allowDisk,
2193
Cursor: &pipeCmdCursor{p.batchSize},
2195
err := c.Database.Run(cmd, &result)
2196
if e, ok := err.(*QueryError); ok && e.Message == `unrecognized field "cursor` {
2198
cmd.AllowDisk = false
2199
err = c.Database.Run(cmd, &result)
2201
firstBatch := result.Result
2202
if firstBatch == nil {
2203
firstBatch = result.Cursor.FirstBatch
2205
return c.NewIter(p.session, firstBatch, result.Cursor.Id, err)
2208
// NewIter returns a newly created iterator with the provided parameters.
2209
// Using this method is not recommended unless the desired functionality
2210
// is not yet exposed via a more convenient interface (Find, Pipe, etc).
2212
// The optional session parameter associates the lifetime of the returned
2213
// iterator to an arbitrary session. If nil, the iterator will be bound to
2216
// Documents in firstBatch will be individually provided by the returned
2217
// iterator before documents from cursorId are made available. If cursorId
2218
// is zero, only the documents in firstBatch are provided.
2220
// If err is not nil, the iterator's Err method will report it after
2221
// exhausting documents in firstBatch.
2223
// NewIter must be called right after the cursor id is obtained, and must not
2224
// be called on a collection in Eventual mode, because the cursor id is
2225
// associated with the specific server that returned it. The provided session
2226
// parameter may be in any mode or state, though.
2228
func (c *Collection) NewIter(session *Session, firstBatch []bson.Raw, cursorId int64, err error) *Iter {
2229
var server *mongoServer
2230
csession := c.Database.Session
2232
socket := csession.masterSocket
2234
socket = csession.slaveSocket
2237
server = socket.Server()
2239
csession.m.RUnlock()
2242
if csession.Mode() == Eventual {
2243
panic("Collection.NewIter called in Eventual mode")
2246
err = errors.New("server not available")
2260
iter.gotReply.L = &iter.m
2261
for _, doc := range firstBatch {
2262
iter.docData.Push(doc.Data)
2265
iter.op.cursorId = cursorId
2266
iter.op.collection = c.FullName
2267
iter.op.replyFunc = iter.replyFunc()
2272
// All works like Iter.All.
2273
func (p *Pipe) All(result interface{}) error {
2274
return p.Iter().All(result)
2277
// One executes the pipeline and unmarshals the first item from the
2278
// result set into the result parameter.
2279
// It returns ErrNotFound if no items are generated by the pipeline.
2280
func (p *Pipe) One(result interface{}) error {
2282
if iter.Next(result) {
2285
if err := iter.Err(); err != nil {
2291
// Explain returns a number of details about how the MongoDB server would
2292
// execute the requested pipeline, such as the number of objects examined,
2293
// the number of times the read lock was yielded to allow writes to go in,
2299
// err := collection.Pipe(pipeline).Explain(&m)
2301
// fmt.Printf("Explain: %#v\n", m)
2304
func (p *Pipe) Explain(result interface{}) error {
2308
Pipeline: p.pipeline,
2309
AllowDisk: p.allowDisk,
2312
return c.Database.Run(cmd, result)
2315
// AllowDiskUse enables writing to the "<dbpath>/_tmp" server directory so
2316
// that aggregation pipelines do not have to be held entirely in memory.
2317
func (p *Pipe) AllowDiskUse() *Pipe {
2322
// Batch sets the batch size used when fetching documents from the database.
2323
// It's possible to change this setting on a per-session basis as well, using
2324
// the Batch method of Session.
2326
// The default batch size is defined by the database server.
2327
func (p *Pipe) Batch(n int) *Pipe {
2332
// mgo.v3: Use a single user-visible error type.
2334
type LastError struct {
2337
FSyncFiles int `bson:"fsyncFiles"`
2339
UpdatedExisting bool `bson:"updatedExisting"`
2340
UpsertedId interface{} `bson:"upserted"`
2343
ecases []BulkErrorCase
2346
func (err *LastError) Error() string {
2350
type queryError struct {
2355
AssertionCode int "assertionCode"
2356
LastError *LastError "lastErrorObject"
2359
type QueryError struct {
2365
func (err *QueryError) Error() string {
2369
// IsDup returns whether err informs of a duplicate key error because
2370
// a primary key index or a secondary unique index already has an entry
2371
// with the given value.
2372
func IsDup(err error) bool {
2373
// Besides being handy, helps with MongoDB bugs SERVER-7164 and SERVER-11493.
2374
// What follows makes me sad. Hopefully conventions will be more clear over time.
2375
switch e := err.(type) {
2377
return e.Code == 11000 || e.Code == 11001 || e.Code == 12582 || e.Code == 16460 && strings.Contains(e.Err, " E11000 ")
2379
return e.Code == 11000 || e.Code == 11001 || e.Code == 12582
2381
for _, ecase := range e.ecases {
2382
if !IsDup(ecase.Err) {
2391
// Insert inserts one or more documents in the respective collection. In
2392
// case the session is in safe mode (see the SetSafe method) and an error
2393
// happens while inserting the provided documents, the returned error will
2394
// be of type *LastError.
2395
func (c *Collection) Insert(docs ...interface{}) error {
2396
_, err := c.writeOp(&insertOp{c.FullName, docs, 0}, true)
2400
// Update finds a single document matching the provided selector document
2401
// and modifies it according to the update document.
2402
// If the session is in safe mode (see SetSafe) a ErrNotFound error is
2403
// returned if a document isn't found, or a value of type *LastError
2404
// when some other error is detected.
2406
// Relevant documentation:
2408
// http://www.mongodb.org/display/DOCS/Updating
2409
// http://www.mongodb.org/display/DOCS/Atomic+Operations
2411
func (c *Collection) Update(selector interface{}, update interface{}) error {
2412
if selector == nil {
2416
Collection: c.FullName,
2420
lerr, err := c.writeOp(&op, true)
2421
if err == nil && lerr != nil && !lerr.UpdatedExisting {
2427
// UpdateId is a convenience helper equivalent to:
2429
// err := collection.Update(bson.M{"_id": id}, update)
2431
// See the Update method for more details.
2432
func (c *Collection) UpdateId(id interface{}, update interface{}) error {
2433
return c.Update(bson.D{{"_id", id}}, update)
2436
// ChangeInfo holds details about the outcome of an update operation.
2437
type ChangeInfo struct {
2438
// Updated reports the number of existing documents modified.
2439
// Due to server limitations, this reports the same value as the Matched field when
2440
// talking to MongoDB <= 2.4 and on Upsert and Apply (findAndModify) operations.
2442
Removed int // Number of documents removed
2443
Matched int // Number of documents matched but not necessarily changed
2444
UpsertedId interface{} // Upserted _id field, when not explicitly provided
2447
// UpdateAll finds all documents matching the provided selector document
2448
// and modifies them according to the update document.
2449
// If the session is in safe mode (see SetSafe) details of the executed
2450
// operation are returned in info or an error of type *LastError when
2451
// some problem is detected. It is not an error for the update to not be
2452
// applied on any documents because the selector doesn't match.
2454
// Relevant documentation:
2456
// http://www.mongodb.org/display/DOCS/Updating
2457
// http://www.mongodb.org/display/DOCS/Atomic+Operations
2459
func (c *Collection) UpdateAll(selector interface{}, update interface{}) (info *ChangeInfo, err error) {
2460
if selector == nil {
2464
Collection: c.FullName,
2470
lerr, err := c.writeOp(&op, true)
2471
if err == nil && lerr != nil {
2472
info = &ChangeInfo{Updated: lerr.modified, Matched: lerr.N}
2477
// Upsert finds a single document matching the provided selector document
2478
// and modifies it according to the update document. If no document matching
2479
// the selector is found, the update document is applied to the selector
2480
// document and the result is inserted in the collection.
2481
// If the session is in safe mode (see SetSafe) details of the executed
2482
// operation are returned in info, or an error of type *LastError when
2483
// some problem is detected.
2485
// Relevant documentation:
2487
// http://www.mongodb.org/display/DOCS/Updating
2488
// http://www.mongodb.org/display/DOCS/Atomic+Operations
2490
func (c *Collection) Upsert(selector interface{}, update interface{}) (info *ChangeInfo, err error) {
2491
if selector == nil {
2495
Collection: c.FullName,
2502
// <= to allow for the first attempt (not a retry).
2503
for i := 0; i <= maxUpsertRetries; i++ {
2504
lerr, err = c.writeOp(&op, true)
2505
// Retry duplicate key errors on upserts.
2506
// https://docs.mongodb.com/v3.2/reference/method/db.collection.update/#use-unique-indexes
2511
if err == nil && lerr != nil {
2512
info = &ChangeInfo{}
2513
if lerr.UpdatedExisting {
2514
info.Matched = lerr.N
2515
info.Updated = lerr.modified
2517
info.UpsertedId = lerr.UpsertedId
2523
// UpsertId is a convenience helper equivalent to:
2525
// info, err := collection.Upsert(bson.M{"_id": id}, update)
2527
// See the Upsert method for more details.
2528
func (c *Collection) UpsertId(id interface{}, update interface{}) (info *ChangeInfo, err error) {
2529
return c.Upsert(bson.D{{"_id", id}}, update)
2532
// Remove finds a single document matching the provided selector document
2533
// and removes it from the database.
2534
// If the session is in safe mode (see SetSafe) a ErrNotFound error is
2535
// returned if a document isn't found, or a value of type *LastError
2536
// when some other error is detected.
2538
// Relevant documentation:
2540
// http://www.mongodb.org/display/DOCS/Removing
2542
func (c *Collection) Remove(selector interface{}) error {
2543
if selector == nil {
2546
lerr, err := c.writeOp(&deleteOp{c.FullName, selector, 1, 1}, true)
2547
if err == nil && lerr != nil && lerr.N == 0 {
2553
// RemoveId is a convenience helper equivalent to:
2555
// err := collection.Remove(bson.M{"_id": id})
2557
// See the Remove method for more details.
2558
func (c *Collection) RemoveId(id interface{}) error {
2559
return c.Remove(bson.D{{"_id", id}})
2562
// RemoveAll finds all documents matching the provided selector document
2563
// and removes them from the database. In case the session is in safe mode
2564
// (see the SetSafe method) and an error happens when attempting the change,
2565
// the returned error will be of type *LastError.
2567
// Relevant documentation:
2569
// http://www.mongodb.org/display/DOCS/Removing
2571
func (c *Collection) RemoveAll(selector interface{}) (info *ChangeInfo, err error) {
2572
if selector == nil {
2575
lerr, err := c.writeOp(&deleteOp{c.FullName, selector, 0, 0}, true)
2576
if err == nil && lerr != nil {
2577
info = &ChangeInfo{Removed: lerr.N, Matched: lerr.N}
2582
// DropDatabase removes the entire database including all of its collections.
2583
func (db *Database) DropDatabase() error {
2584
return db.Run(bson.D{{"dropDatabase", 1}}, nil)
2587
// DropCollection removes the entire collection including all of its documents.
2588
func (c *Collection) DropCollection() error {
2589
return c.Database.Run(bson.D{{"drop", c.Name}}, nil)
2592
// The CollectionInfo type holds metadata about a collection.
2594
// Relevant documentation:
2596
// http://www.mongodb.org/display/DOCS/createCollection+Command
2597
// http://www.mongodb.org/display/DOCS/Capped+Collections
2599
type CollectionInfo struct {
2600
// DisableIdIndex prevents the automatic creation of the index
2601
// on the _id field for the collection.
2604
// ForceIdIndex enforces the automatic creation of the index
2605
// on the _id field for the collection. Capped collections,
2606
// for example, do not have such an index by default.
2609
// If Capped is true new documents will replace old ones when
2610
// the collection is full. MaxBytes must necessarily be set
2611
// to define the size when the collection wraps around.
2612
// MaxDocs optionally defines the number of documents when it
2613
// wraps, but MaxBytes still needs to be set.
2618
// Validator contains a validation expression that defines which
2619
// documents should be considered valid for this collection.
2620
Validator interface{}
2622
// ValidationLevel may be set to "strict" (the default) to force
2623
// MongoDB to validate all documents on inserts and updates, to
2624
// "moderate" to apply the validation rules only to documents
2625
// that already fulfill the validation criteria, or to "off" for
2626
// disabling validation entirely.
2627
ValidationLevel string
2629
// ValidationAction determines how MongoDB handles documents that
2630
// violate the validation rules. It may be set to "error" (the default)
2631
// to reject inserts or updates that violate the rules, or to "warn"
2632
// to log invalid operations but allow them to proceed.
2633
ValidationAction string
2635
// StorageEngine allows specifying collection options for the
2636
// storage engine in use. The map keys must hold the storage engine
2637
// name for which options are being specified.
2638
StorageEngine interface{}
2641
// Create explicitly creates the c collection with details of info.
2642
// MongoDB creates collections automatically on use, so this method
2643
// is only necessary when creating collection with non-default
2644
// characteristics, such as capped collections.
2646
// Relevant documentation:
2648
// http://www.mongodb.org/display/DOCS/createCollection+Command
2649
// http://www.mongodb.org/display/DOCS/Capped+Collections
2651
func (c *Collection) Create(info *CollectionInfo) error {
2652
cmd := make(bson.D, 0, 4)
2653
cmd = append(cmd, bson.DocElem{"create", c.Name})
2655
if info.MaxBytes < 1 {
2656
return fmt.Errorf("Collection.Create: with Capped, MaxBytes must also be set")
2658
cmd = append(cmd, bson.DocElem{"capped", true})
2659
cmd = append(cmd, bson.DocElem{"size", info.MaxBytes})
2660
if info.MaxDocs > 0 {
2661
cmd = append(cmd, bson.DocElem{"max", info.MaxDocs})
2664
if info.DisableIdIndex {
2665
cmd = append(cmd, bson.DocElem{"autoIndexId", false})
2667
if info.ForceIdIndex {
2668
cmd = append(cmd, bson.DocElem{"autoIndexId", true})
2670
if info.Validator != nil {
2671
cmd = append(cmd, bson.DocElem{"validator", info.Validator})
2673
if info.ValidationLevel != "" {
2674
cmd = append(cmd, bson.DocElem{"validationLevel", info.ValidationLevel})
2676
if info.ValidationAction != "" {
2677
cmd = append(cmd, bson.DocElem{"validationAction", info.ValidationAction})
2679
if info.StorageEngine != nil {
2680
cmd = append(cmd, bson.DocElem{"storageEngine", info.StorageEngine})
2682
return c.Database.Run(cmd, nil)
2685
// Batch sets the batch size used when fetching documents from the database.
2686
// It's possible to change this setting on a per-session basis as well, using
2687
// the Batch method of Session.
2689
// The default batch size is defined by the database itself. As of this
2690
// writing, MongoDB will use an initial size of min(100 docs, 4MB) on the
2691
// first batch, and 4MB on remaining ones.
2692
func (q *Query) Batch(n int) *Query {
2694
// Server interprets 1 as -1 and closes the cursor (!?)
2698
q.op.limit = int32(n)
2703
// Prefetch sets the point at which the next batch of results will be requested.
2704
// When there are p*batch_size remaining documents cached in an Iter, the next
2705
// batch will be requested in background. For instance, when using this:
2707
// query.Batch(200).Prefetch(0.25)
2709
// and there are only 50 documents cached in the Iter to be processed, the
2710
// next batch of 200 will be requested. It's possible to change this setting on
2711
// a per-session basis as well, using the SetPrefetch method of Session.
2713
// The default prefetch value is 0.25.
2714
func (q *Query) Prefetch(p float64) *Query {
2721
// Skip skips over the n initial documents from the query results. Note that
2722
// this only makes sense with capped collections where documents are naturally
2723
// ordered by insertion time, or with sorted results.
2724
func (q *Query) Skip(n int) *Query {
2726
q.op.skip = int32(n)
2731
// Limit restricts the maximum number of documents retrieved to n, and also
2732
// changes the batch size to the same value. Once n documents have been
2733
// returned by Next, the following call will return ErrNotFound.
2734
func (q *Query) Limit(n int) *Query {
2740
case n == math.MinInt32: // -MinInt32 == -MinInt32
2741
q.limit = math.MaxInt32
2742
q.op.limit = math.MinInt32 + 1
2745
q.op.limit = int32(n)
2748
q.op.limit = int32(n)
2754
// Select enables selecting which fields should be retrieved for the results
2755
// found. For example, the following query would only retrieve the name field:
2757
// err := collection.Find(nil).Select(bson.M{"name": 1}).One(&result)
2759
// Relevant documentation:
2761
// http://www.mongodb.org/display/DOCS/Retrieving+a+Subset+of+Fields
2763
func (q *Query) Select(selector interface{}) *Query {
2765
q.op.selector = selector
2770
// Sort asks the database to order returned documents according to the
2771
// provided field names. A field name may be prefixed by - (minus) for
2772
// it to be sorted in reverse order.
2776
// query1 := collection.Find(nil).Sort("firstname", "lastname")
2777
// query2 := collection.Find(nil).Sort("-age")
2778
// query3 := collection.Find(nil).Sort("$natural")
2779
// query4 := collection.Find(nil).Select(bson.M{"score": bson.M{"$meta": "textScore"}}).Sort("$textScore:score")
2781
// Relevant documentation:
2783
// http://www.mongodb.org/display/DOCS/Sorting+and+Natural+Order
2785
func (q *Query) Sort(fields ...string) *Query {
2788
for _, field := range fields {
2792
if field[0] == '$' {
2793
if c := strings.Index(field, ":"); c > 1 && c < len(field)-1 {
2807
panic("Sort: empty field name")
2809
if kind == "textScore" {
2810
order = append(order, bson.DocElem{field, bson.M{"$meta": kind}})
2812
order = append(order, bson.DocElem{field, n})
2815
q.op.options.OrderBy = order
2816
q.op.hasOptions = true
2821
// Explain returns a number of details about how the MongoDB server would
2822
// execute the requested query, such as the number of objects examined,
2823
// the number of times the read lock was yielded to allow writes to go in,
2829
// err := collection.Find(bson.M{"filename": name}).Explain(m)
2831
// fmt.Printf("Explain: %#v\n", m)
2834
// Relevant documentation:
2836
// http://www.mongodb.org/display/DOCS/Optimization
2837
// http://www.mongodb.org/display/DOCS/Query+Optimizer
2839
func (q *Query) Explain(result interface{}) error {
2841
clone := &Query{session: q.session, query: q.query}
2843
clone.op.options.Explain = true
2844
clone.op.hasOptions = true
2845
if clone.op.limit > 0 {
2846
clone.op.limit = -q.op.limit
2848
iter := clone.Iter()
2849
if iter.Next(result) {
2855
// TODO: Add Collection.Explain. See https://goo.gl/1MDlvz.
2857
// Hint will include an explicit "hint" in the query to force the server
2858
// to use a specified index, potentially improving performance in some
2859
// situations. The provided parameters are the fields that compose the
2860
// key of the index to be used. For details on how the indexKey may be
2861
// built, see the EnsureIndex method.
2865
// query := collection.Find(bson.M{"firstname": "Joe", "lastname": "Winter"})
2866
// query.Hint("lastname", "firstname")
2868
// Relevant documentation:
2870
// http://www.mongodb.org/display/DOCS/Optimization
2871
// http://www.mongodb.org/display/DOCS/Query+Optimizer
2873
func (q *Query) Hint(indexKey ...string) *Query {
2875
keyInfo, err := parseIndexKey(indexKey)
2876
q.op.options.Hint = keyInfo.key
2877
q.op.hasOptions = true
2885
// SetMaxScan constrains the query to stop after scanning the specified
2886
// number of documents.
2888
// This modifier is generally used to prevent potentially long running
2889
// queries from disrupting performance by scanning through too much data.
2890
func (q *Query) SetMaxScan(n int) *Query {
2892
q.op.options.MaxScan = n
2893
q.op.hasOptions = true
2898
// SetMaxTime constrains the query to stop after running for the specified time.
2900
// When the time limit is reached MongoDB automatically cancels the query.
2901
// This can be used to efficiently prevent and identify unexpectedly slow queries.
2903
// A few important notes about the mechanism enforcing this limit:
2905
// - Requests can block behind locking operations on the server, and that blocking
2906
// time is not accounted for. In other words, the timer starts ticking only after
2907
// the actual start of the query when it initially acquires the appropriate lock;
2909
// - Operations are interrupted only at interrupt points where an operation can be
2910
// safely aborted – the total execution time may exceed the specified value;
2912
// - The limit can be applied to both CRUD operations and commands, but not all
2913
// commands are interruptible;
2915
// - While iterating over results, computing follow up batches is included in the
2916
// total time and the iteration continues until the alloted time is over, but
2917
// network roundtrips are not taken into account for the limit.
2919
// - This limit does not override the inactive cursor timeout for idle cursors
2920
// (default is 10 min).
2922
// This mechanism was introduced in MongoDB 2.6.
2924
// Relevant documentation:
2926
// http://blog.mongodb.org/post/83621787773/maxtimems-and-query-optimizer-introspection-in
2928
func (q *Query) SetMaxTime(d time.Duration) *Query {
2930
q.op.options.MaxTimeMS = int(d / time.Millisecond)
2931
q.op.hasOptions = true
2936
// Snapshot will force the performed query to make use of an available
2937
// index on the _id field to prevent the same document from being returned
2938
// more than once in a single iteration. This might happen without this
2939
// setting in situations when the document changes in size and thus has to
2940
// be moved while the iteration is running.
2942
// Because snapshot mode traverses the _id index, it may not be used with
2943
// sorting or explicit hints. It also cannot use any other index for the
2946
// Even with snapshot mode, items inserted or deleted during the query may
2947
// or may not be returned; that is, this mode is not a true point-in-time
2950
// The same effect of Snapshot may be obtained by using any unique index on
2951
// field(s) that will not be modified (best to use Hint explicitly too).
2952
// A non-unique index (such as creation time) may be made unique by
2953
// appending _id to the index when creating it.
2955
// Relevant documentation:
2957
// http://www.mongodb.org/display/DOCS/How+to+do+Snapshotted+Queries+in+the+Mongo+Database
2959
func (q *Query) Snapshot() *Query {
2961
q.op.options.Snapshot = true
2962
q.op.hasOptions = true
2967
// Comment adds a comment to the query to identify it in the database profiler output.
2969
// Relevant documentation:
2971
// http://docs.mongodb.org/manual/reference/operator/meta/comment
2972
// http://docs.mongodb.org/manual/reference/command/profile
2973
// http://docs.mongodb.org/manual/administration/analyzing-mongodb-performance/#database-profiling
2975
func (q *Query) Comment(comment string) *Query {
2977
q.op.options.Comment = comment
2978
q.op.hasOptions = true
2983
// LogReplay enables an option that optimizes queries that are typically
2984
// made on the MongoDB oplog for replaying it. This is an internal
2985
// implementation aspect and most likely uninteresting for other uses.
2986
// It has seen at least one use case, though, so it's exposed via the API.
2987
func (q *Query) LogReplay() *Query {
2989
q.op.flags |= flagLogReplay
2994
func checkQueryError(fullname string, d []byte) error {
2999
if d[5] == '$' && d[6] == 'e' && d[7] == 'r' && d[8] == 'r' && d[9] == '\x00' && d[4] == '\x02' {
3002
if len(fullname) < 5 || fullname[len(fullname)-5:] != ".$cmd" {
3005
for i := 0; i+8 < l; i++ {
3006
if d[i] == '\x02' && d[i+1] == 'e' && d[i+2] == 'r' && d[i+3] == 'r' && d[i+4] == 'm' && d[i+5] == 's' && d[i+6] == 'g' && d[i+7] == '\x00' {
3013
result := &queryError{}
3014
bson.Unmarshal(d, result)
3015
if result.LastError != nil {
3016
return result.LastError
3018
if result.Err == "" && result.ErrMsg == "" {
3021
if result.AssertionCode != 0 && result.Assertion != "" {
3022
return &QueryError{Code: result.AssertionCode, Message: result.Assertion, Assertion: true}
3024
if result.Err != "" {
3025
return &QueryError{Code: result.Code, Message: result.Err}
3027
return &QueryError{Code: result.Code, Message: result.ErrMsg}
3030
// One executes the query and unmarshals the first obtained document into the
3031
// result argument. The result must be a struct or map value capable of being
3032
// unmarshalled into by gobson. This function blocks until either a result
3033
// is available or an error happens. For example:
3035
// err := collection.Find(bson.M{"a": 1}).One(&result)
3037
// In case the resulting document includes a field named $err or errmsg, which
3038
// are standard ways for MongoDB to return query errors, the returned err will
3039
// be set to a *QueryError value including the Err message and the Code. In
3040
// those cases, the result argument is still unmarshalled into with the
3041
// received document so that any other custom values may be obtained if
3044
func (q *Query) One(result interface{}) (err error) {
3046
session := q.session
3050
socket, err := session.acquireSocket(true)
3054
defer socket.Release()
3058
session.prepareQuery(&op)
3060
expectFindReply := prepareFindOp(socket, &op, 1)
3062
data, err := socket.SimpleQuery(&op)
3069
if expectFindReply {
3070
var findReply struct {
3076
err = bson.Unmarshal(data, &findReply)
3080
if !findReply.Ok && findReply.Errmsg != "" {
3081
return &QueryError{Code: findReply.Code, Message: findReply.Errmsg}
3083
if len(findReply.Cursor.FirstBatch) == 0 {
3086
data = findReply.Cursor.FirstBatch[0].Data
3089
err = bson.Unmarshal(data, result)
3091
debugf("Query %p document unmarshaled: %#v", q, result)
3093
debugf("Query %p document unmarshaling failed: %#v", q, err)
3097
return checkQueryError(op.collection, data)
3100
// prepareFindOp translates op from being an old-style wire protocol query into
3101
// a new-style find command if that's supported by the MongoDB server (3.2+).
3102
// It returns whether to expect a find command result or not. Note op may be
3103
// translated into an explain command, in which case the function returns false.
3104
func prepareFindOp(socket *mongoSocket, op *queryOp, limit int32) bool {
3105
if socket.ServerInfo().MaxWireVersion < 4 || op.collection == "admin.$cmd" {
3109
nameDot := strings.Index(op.collection, ".")
3111
panic("invalid query collection name: " + op.collection)
3115
Collection: op.collection[nameDot+1:],
3117
Projection: op.selector,
3118
Sort: op.options.OrderBy,
3121
MaxTimeMS: op.options.MaxTimeMS,
3122
MaxScan: op.options.MaxScan,
3123
Hint: op.options.Hint,
3124
Comment: op.options.Comment,
3125
Snapshot: op.options.Snapshot,
3126
OplogReplay: op.flags&flagLogReplay != 0,
3129
find.BatchSize = -op.limit
3130
find.SingleBatch = true
3132
find.BatchSize = op.limit
3135
explain := op.options.Explain
3137
op.collection = op.collection[:nameDot] + ".$cmd"
3141
op.options = queryWrapper{}
3142
op.hasOptions = false
3145
op.query = bson.D{{"explain", op.query}}
3151
type cursorData struct {
3152
FirstBatch []bson.Raw "firstBatch"
3153
NextBatch []bson.Raw "nextBatch"
3158
// findCmd holds the command used for performing queries on MongoDB 3.2+.
3160
// Relevant documentation:
3162
// https://docs.mongodb.org/master/reference/command/find/#dbcmd.find
3164
type findCmd struct {
3165
Collection string `bson:"find"`
3166
Filter interface{} `bson:"filter,omitempty"`
3167
Sort interface{} `bson:"sort,omitempty"`
3168
Projection interface{} `bson:"projection,omitempty"`
3169
Hint interface{} `bson:"hint,omitempty"`
3170
Skip interface{} `bson:"skip,omitempty"`
3171
Limit int32 `bson:"limit,omitempty"`
3172
BatchSize int32 `bson:"batchSize,omitempty"`
3173
SingleBatch bool `bson:"singleBatch,omitempty"`
3174
Comment string `bson:"comment,omitempty"`
3175
MaxScan int `bson:"maxScan,omitempty"`
3176
MaxTimeMS int `bson:"maxTimeMS,omitempty"`
3177
ReadConcern interface{} `bson:"readConcern,omitempty"`
3178
Max interface{} `bson:"max,omitempty"`
3179
Min interface{} `bson:"min,omitempty"`
3180
ReturnKey bool `bson:"returnKey,omitempty"`
3181
ShowRecordId bool `bson:"showRecordId,omitempty"`
3182
Snapshot bool `bson:"snapshot,omitempty"`
3183
Tailable bool `bson:"tailable,omitempty"`
3184
AwaitData bool `bson:"awaitData,omitempty"`
3185
OplogReplay bool `bson:"oplogReplay,omitempty"`
3186
NoCursorTimeout bool `bson:"noCursorTimeout,omitempty"`
3187
AllowPartialResults bool `bson:"allowPartialResults,omitempty"`
3190
// getMoreCmd holds the command used for requesting more query results on MongoDB 3.2+.
3192
// Relevant documentation:
3194
// https://docs.mongodb.org/master/reference/command/getMore/#dbcmd.getMore
3196
type getMoreCmd struct {
3197
CursorId int64 `bson:"getMore"`
3198
Collection string `bson:"collection"`
3199
BatchSize int32 `bson:"batchSize,omitempty"`
3200
MaxTimeMS int64 `bson:"maxTimeMS,omitempty"`
3203
// run duplicates the behavior of collection.Find(query).One(&result)
3204
// as performed by Database.Run, specializing the logic for running
3205
// database commands on a given socket.
3206
func (db *Database) run(socket *mongoSocket, cmd, result interface{}) (err error) {
3208
if name, ok := cmd.(string); ok {
3209
cmd = bson.D{{name, 1}}
3213
session := db.Session
3215
op := session.queryConfig.op // Copy.
3218
op.collection = db.Name + ".$cmd"
3221
session.prepareQuery(&op)
3224
data, err := socket.SimpleQuery(&op)
3232
err = bson.Unmarshal(data, result)
3235
bson.Unmarshal(data, &res)
3236
debugf("Run command unmarshaled: %#v, result: %#v", op, res)
3238
debugf("Run command unmarshaling failed: %#v", op, err)
3242
return checkQueryError(op.collection, data)
3245
// The DBRef type implements support for the database reference MongoDB
3246
// convention as supported by multiple drivers. This convention enables
3247
// cross-referencing documents between collections and databases using
3248
// a structure which includes a collection name, a document id, and
3249
// optionally a database name.
3251
// See the FindRef methods on Session and on Database.
3253
// Relevant documentation:
3255
// http://www.mongodb.org/display/DOCS/Database+References
3258
Collection string `bson:"$ref"`
3259
Id interface{} `bson:"$id"`
3260
Database string `bson:"$db,omitempty"`
3263
// NOTE: Order of fields for DBRef above does matter, per documentation.
3265
// FindRef returns a query that looks for the document in the provided
3266
// reference. If the reference includes the DB field, the document will
3267
// be retrieved from the respective database.
3269
// See also the DBRef type and the FindRef method on Session.
3271
// Relevant documentation:
3273
// http://www.mongodb.org/display/DOCS/Database+References
3275
func (db *Database) FindRef(ref *DBRef) *Query {
3277
if ref.Database == "" {
3278
c = db.C(ref.Collection)
3280
c = db.Session.DB(ref.Database).C(ref.Collection)
3282
return c.FindId(ref.Id)
3285
// FindRef returns a query that looks for the document in the provided
3286
// reference. For a DBRef to be resolved correctly at the session level
3287
// it must necessarily have the optional DB field defined.
3289
// See also the DBRef type and the FindRef method on Database.
3291
// Relevant documentation:
3293
// http://www.mongodb.org/display/DOCS/Database+References
3295
func (s *Session) FindRef(ref *DBRef) *Query {
3296
if ref.Database == "" {
3297
panic(errors.New(fmt.Sprintf("Can't resolve database for %#v", ref)))
3299
c := s.DB(ref.Database).C(ref.Collection)
3300
return c.FindId(ref.Id)
3303
// CollectionNames returns the collection names present in the db database.
3304
func (db *Database) CollectionNames() (names []string, err error) {
3305
// Clone session and set it to Monotonic mode so that the server
3306
// used for the query may be safely obtained afterwards, if
3307
// necessary for iteration when a cursor is received.
3308
cloned := db.Session.nonEventual()
3309
defer cloned.Close()
3311
batchSize := int(cloned.queryConfig.op.limit)
3313
// Try with a command.
3315
Collections []bson.Raw
3318
err = db.With(cloned).Run(bson.D{{"listCollections", 1}, {"cursor", bson.D{{"batchSize", batchSize}}}}, &result)
3320
firstBatch := result.Collections
3321
if firstBatch == nil {
3322
firstBatch = result.Cursor.FirstBatch
3325
ns := strings.SplitN(result.Cursor.NS, ".", 2)
3327
iter = db.With(cloned).C("").NewIter(nil, firstBatch, result.Cursor.Id, nil)
3329
iter = cloned.DB(ns[0]).C(ns[1]).NewIter(nil, firstBatch, result.Cursor.Id, nil)
3331
var coll struct{ Name string }
3332
for iter.Next(&coll) {
3333
names = append(names, coll.Name)
3335
if err := iter.Close(); err != nil {
3341
if err != nil && !isNoCmd(err) {
3345
// Command not yet supported. Query the database instead.
3346
nameIndex := len(db.Name) + 1
3347
iter := db.C("system.namespaces").Find(nil).Iter()
3348
var coll struct{ Name string }
3349
for iter.Next(&coll) {
3350
if strings.Index(coll.Name, "$") < 0 || strings.Index(coll.Name, ".oplog.$") >= 0 {
3351
names = append(names, coll.Name[nameIndex:])
3354
if err := iter.Close(); err != nil {
3361
type dbNames struct {
3362
Databases []struct {
3368
// DatabaseNames returns the names of non-empty databases present in the cluster.
3369
func (s *Session) DatabaseNames() (names []string, err error) {
3371
err = s.Run("listDatabases", &result)
3375
for _, db := range result.Databases {
3377
names = append(names, db.Name)
3384
// Iter executes the query and returns an iterator capable of going over all
3385
// the results. Results will be returned in batches of configurable
3386
// size (see the Batch method) and more documents will be requested when a
3387
// configurable number of documents is iterated over (see the Prefetch method).
3388
func (q *Query) Iter() *Iter {
3390
session := q.session
3392
prefetch := q.prefetch
3402
iter.gotReply.L = &iter.m
3403
iter.op.collection = op.collection
3404
iter.op.limit = op.limit
3405
iter.op.replyFunc = iter.replyFunc()
3406
iter.docsToReceive++
3408
socket, err := session.acquireSocket(true)
3413
defer socket.Release()
3415
session.prepareQuery(&op)
3416
op.replyFunc = iter.op.replyFunc
3418
if prepareFindOp(socket, &op, limit) {
3422
iter.server = socket.Server()
3423
err = socket.Query(&op)
3425
// Must lock as the query is already out and it may call replyFunc.
3434
// Tail returns a tailable iterator. Unlike a normal iterator, a
3435
// tailable iterator may wait for new values to be inserted in the
3436
// collection once the end of the current result set is reached,
3437
// A tailable iterator may only be used with capped collections.
3439
// The timeout parameter indicates how long Next will block waiting
3440
// for a result before timing out. If set to -1, Next will not
3441
// timeout, and will continue waiting for a result for as long as
3442
// the cursor is valid and the session is not closed. If set to 0,
3443
// Next times out as soon as it reaches the end of the result set.
3444
// Otherwise, Next will wait for at least the given number of
3445
// seconds for a new document to be available before timing out.
3447
// On timeouts, Next will unblock and return false, and the Timeout
3448
// method will return true if called. In these cases, Next may still
3449
// be called again on the same iterator to check if a new value is
3450
// available at the current cursor position, and again it will block
3451
// according to the specified timeoutSecs. If the cursor becomes
3452
// invalid, though, both Next and Timeout will return false and
3453
// the query must be restarted.
3455
// The following example demonstrates timeout handling and query
3458
// iter := collection.Find(nil).Sort("$natural").Tail(5 * time.Second)
3460
// for iter.Next(&result) {
3461
// fmt.Println(result.Id)
3462
// lastId = result.Id
3464
// if iter.Err() != nil {
3465
// return iter.Close()
3467
// if iter.Timeout() {
3470
// query := collection.Find(bson.M{"_id": bson.M{"$gt": lastId}})
3471
// iter = query.Sort("$natural").Tail(5 * time.Second)
3475
// Relevant documentation:
3477
// http://www.mongodb.org/display/DOCS/Tailable+Cursors
3478
// http://www.mongodb.org/display/DOCS/Capped+Collections
3479
// http://www.mongodb.org/display/DOCS/Sorting+and+Natural+Order
3481
func (q *Query) Tail(timeout time.Duration) *Iter {
3483
session := q.session
3485
prefetch := q.prefetch
3488
iter := &Iter{session: session, prefetch: prefetch}
3489
iter.gotReply.L = &iter.m
3490
iter.timeout = timeout
3491
iter.op.collection = op.collection
3492
iter.op.limit = op.limit
3493
iter.op.replyFunc = iter.replyFunc()
3494
iter.docsToReceive++
3495
session.prepareQuery(&op)
3496
op.replyFunc = iter.op.replyFunc
3497
op.flags |= flagTailable | flagAwaitData
3499
socket, err := session.acquireSocket(true)
3503
iter.server = socket.Server()
3504
err = socket.Query(&op)
3506
// Must lock as the query is already out and it may call replyFunc.
3516
func (s *Session) prepareQuery(op *queryOp) {
3518
op.mode = s.consistency
3520
op.flags |= flagSlaveOk
3526
// Err returns nil if no errors happened during iteration, or the actual
3529
// In case a resulting document included a field named $err or errmsg, which are
3530
// standard ways for MongoDB to report an improper query, the returned value has
3531
// a *QueryError type, and includes the Err message and the Code.
3532
func (iter *Iter) Err() error {
3536
if err == ErrNotFound {
3542
// Close kills the server cursor used by the iterator, if any, and returns
3543
// nil if no errors happened during iteration, or the actual error otherwise.
3545
// Server cursors are automatically closed at the end of an iteration, which
3546
// means close will do nothing unless the iteration was interrupted before
3547
// the server finished sending results to the driver. If Close is not called
3548
// in such a situation, the cursor will remain available at the server until
3549
// the default cursor timeout period is reached. No further problems arise.
3551
// Close is idempotent. That means it can be called repeatedly and will
3552
// return the same result every time.
3554
// In case a resulting document included a field named $err or errmsg, which are
3555
// standard ways for MongoDB to report an improper query, the returned value has
3556
// a *QueryError type.
3557
func (iter *Iter) Close() error {
3559
cursorId := iter.op.cursorId
3560
iter.op.cursorId = 0
3564
if err == ErrNotFound {
3569
socket, err := iter.acquireSocket()
3571
// TODO Batch kills.
3572
err = socket.Query(&killCursorsOp{[]int64{cursorId}})
3577
if err != nil && (iter.err == nil || iter.err == ErrNotFound) {
3579
} else if iter.err != ErrNotFound {
3586
// Timeout returns true if Next returned false due to a timeout of
3587
// a tailable cursor. In those cases, Next may be called again to continue
3588
// the iteration at the previous cursor position.
3589
func (iter *Iter) Timeout() bool {
3591
result := iter.timedout
3596
// Next retrieves the next document from the result set, blocking if necessary.
3597
// This method will also automatically retrieve another batch of documents from
3598
// the server when the current one is exhausted, or before that in background
3599
// if pre-fetching is enabled (see the Query.Prefetch and Session.SetPrefetch
3602
// Next returns true if a document was successfully unmarshalled onto result,
3603
// and false at the end of the result set or if an error happened.
3604
// When Next returns false, the Err method should be called to verify if
3605
// there was an error during iteration.
3609
// iter := collection.Find(nil).Iter()
3610
// for iter.Next(&result) {
3611
// fmt.Printf("Result: %v\n", result.Id)
3613
// if err := iter.Close(); err != nil {
3617
func (iter *Iter) Next(result interface{}) bool {
3619
iter.timedout = false
3620
timeout := time.Time{}
3621
for iter.err == nil && iter.docData.Len() == 0 && (iter.docsToReceive > 0 || iter.op.cursorId != 0) {
3622
if iter.docsToReceive == 0 {
3623
if iter.timeout >= 0 {
3624
if timeout.IsZero() {
3625
timeout = time.Now().Add(iter.timeout)
3627
if time.Now().After(timeout) {
3628
iter.timedout = true
3634
if iter.err != nil {
3638
iter.gotReply.Wait()
3641
// Exhaust available data before reporting any errors.
3642
if docData, ok := iter.docData.Pop().([]byte); ok {
3646
if iter.limit == 0 {
3647
if iter.docData.Len() > 0 {
3649
panic(fmt.Errorf("data remains after limit exhausted: %d", iter.docData.Len()))
3651
iter.err = ErrNotFound
3655
if iter.op.cursorId != 0 && iter.err == nil {
3656
iter.docsBeforeMore--
3657
if iter.docsBeforeMore == -1 {
3666
err := bson.Unmarshal(docData, result)
3668
debugf("Iter %p document unmarshaling failed: %#v", iter, err)
3670
if iter.err == nil {
3676
debugf("Iter %p document unmarshaled: %#v", iter, result)
3677
// XXX Only have to check first document for a query error?
3678
err = checkQueryError(iter.op.collection, docData)
3681
if iter.err == nil {
3688
} else if iter.err != nil {
3689
debugf("Iter %p returning false: %s", iter, iter.err)
3692
} else if iter.op.cursorId == 0 {
3693
iter.err = ErrNotFound
3694
debugf("Iter %p exhausted with cursor=0", iter)
3699
panic("unreachable")
3702
// All retrieves all documents from the result set into the provided slice
3703
// and closes the iterator.
3705
// The result argument must necessarily be the address for a slice. The slice
3706
// may be nil or previously allocated.
3708
// WARNING: Obviously, All must not be used with result sets that may be
3709
// potentially large, since it may consume all memory until the system
3710
// crashes. Consider building the query with a Limit clause to ensure the
3711
// result size is bounded.
3715
// var result []struct{ Value int }
3716
// iter := collection.Find(nil).Limit(100).Iter()
3717
// err := iter.All(&result)
3722
func (iter *Iter) All(result interface{}) error {
3723
resultv := reflect.ValueOf(result)
3724
if resultv.Kind() != reflect.Ptr || resultv.Elem().Kind() != reflect.Slice {
3725
panic("result argument must be a slice address")
3727
slicev := resultv.Elem()
3728
slicev = slicev.Slice(0, slicev.Cap())
3729
elemt := slicev.Type().Elem()
3732
if slicev.Len() == i {
3733
elemp := reflect.New(elemt)
3734
if !iter.Next(elemp.Interface()) {
3737
slicev = reflect.Append(slicev, elemp.Elem())
3738
slicev = slicev.Slice(0, slicev.Cap())
3740
if !iter.Next(slicev.Index(i).Addr().Interface()) {
3746
resultv.Elem().Set(slicev.Slice(0, i))
3750
// All works like Iter.All.
3751
func (q *Query) All(result interface{}) error {
3752
return q.Iter().All(result)
3755
// The For method is obsolete and will be removed in a future release.
3756
// See Iter as an elegant replacement.
3757
func (q *Query) For(result interface{}, f func() error) error {
3758
return q.Iter().For(result, f)
3761
// The For method is obsolete and will be removed in a future release.
3762
// See Iter as an elegant replacement.
3763
func (iter *Iter) For(result interface{}, f func() error) (err error) {
3765
v := reflect.ValueOf(result)
3766
if v.Kind() == reflect.Ptr {
3769
case reflect.Map, reflect.Ptr, reflect.Interface, reflect.Slice:
3774
panic("For needs a pointer to nil reference value. See the documentation.")
3776
zero := reflect.Zero(v.Type())
3779
if !iter.Next(result) {
3790
// acquireSocket acquires a socket from the same server that the iterator
3791
// cursor was obtained from.
3793
// WARNING: This method must not be called with iter.m locked. Acquiring the
3794
// socket depends on the cluster sync loop, and the cluster sync loop might
3795
// attempt actions which cause replyFunc to be called, inducing a deadlock.
3796
func (iter *Iter) acquireSocket() (*mongoSocket, error) {
3797
socket, err := iter.session.acquireSocket(true)
3801
if socket.Server() != iter.server {
3802
// Socket server changed during iteration. This may happen
3803
// with Eventual sessions, if a Refresh is done, or if a
3804
// monotonic session gets a write and shifts from secondary
3805
// to primary. Our cursor is in a specific server, though.
3806
iter.session.m.Lock()
3807
sockTimeout := iter.session.sockTimeout
3808
iter.session.m.Unlock()
3810
socket, _, err = iter.server.AcquireSocket(0, sockTimeout)
3814
err := iter.session.socketLogin(socket)
3823
func (iter *Iter) getMore() {
3824
// Increment now so that unlocking the iterator won't cause a
3825
// different goroutine to get here as well.
3826
iter.docsToReceive++
3828
socket, err := iter.acquireSocket()
3834
defer socket.Release()
3836
debugf("Iter %p requesting more documents", iter)
3838
// The -1 below accounts for the fact docsToReceive was incremented above.
3839
limit := iter.limit - int32(iter.docsToReceive-1) - int32(iter.docData.Len())
3840
if limit < iter.op.limit {
3841
iter.op.limit = limit
3846
op = iter.getMoreCmd()
3850
if err := socket.Query(op); err != nil {
3851
iter.docsToReceive--
3856
func (iter *Iter) getMoreCmd() *queryOp {
3857
// TODO: Define the query statically in the Iter type, next to getMoreOp.
3858
nameDot := strings.Index(iter.op.collection, ".")
3860
panic("invalid query collection name: " + iter.op.collection)
3863
getMore := getMoreCmd{
3864
CursorId: iter.op.cursorId,
3865
Collection: iter.op.collection[nameDot+1:],
3866
BatchSize: iter.op.limit,
3870
op.collection = iter.op.collection[:nameDot] + ".$cmd"
3873
op.replyFunc = iter.op.replyFunc
3877
type countCmd struct {
3880
Limit int32 ",omitempty"
3881
Skip int32 ",omitempty"
3884
// Count returns the total number of documents in the result set.
3885
func (q *Query) Count() (n int, err error) {
3887
session := q.session
3892
c := strings.Index(op.collection, ".")
3894
return 0, errors.New("Bad collection name: " + op.collection)
3897
dbname := op.collection[:c]
3898
cname := op.collection[c+1:]
3903
result := struct{ N int }{}
3904
err = session.DB(dbname).Run(countCmd{cname, query, limit, op.skip}, &result)
3905
return result.N, err
3908
// Count returns the total number of documents in the collection.
3909
func (c *Collection) Count() (n int, err error) {
3910
return c.Find(nil).Count()
3913
type distinctCmd struct {
3914
Collection string "distinct"
3916
Query interface{} ",omitempty"
3919
// Distinct unmarshals into result the list of distinct values for the given key.
3924
// err := collection.Find(bson.M{"gender": "F"}).Distinct("age", &result)
3926
// Relevant documentation:
3928
// http://www.mongodb.org/display/DOCS/Aggregation
3930
func (q *Query) Distinct(key string, result interface{}) error {
3932
session := q.session
3936
c := strings.Index(op.collection, ".")
3938
return errors.New("Bad collection name: " + op.collection)
3941
dbname := op.collection[:c]
3942
cname := op.collection[c+1:]
3944
var doc struct{ Values bson.Raw }
3945
err := session.DB(dbname).Run(distinctCmd{cname, key, op.query}, &doc)
3949
return doc.Values.Unmarshal(result)
3952
type mapReduceCmd struct {
3953
Collection string "mapreduce"
3954
Map string ",omitempty"
3955
Reduce string ",omitempty"
3956
Finalize string ",omitempty"
3957
Limit int32 ",omitempty"
3959
Query interface{} ",omitempty"
3960
Sort interface{} ",omitempty"
3961
Scope interface{} ",omitempty"
3962
Verbose bool ",omitempty"
3965
type mapReduceResult struct {
3968
TimeMillis int64 "timeMillis"
3969
Counts struct{ Input, Emit, Output int }
3972
Timing *MapReduceTime
3975
type MapReduce struct {
3976
Map string // Map Javascript function code (required)
3977
Reduce string // Reduce Javascript function code (required)
3978
Finalize string // Finalize Javascript function code (optional)
3979
Out interface{} // Output collection name or document. If nil, results are inlined into the result parameter.
3980
Scope interface{} // Optional global scope for Javascript functions
3984
type MapReduceInfo struct {
3985
InputCount int // Number of documents mapped
3986
EmitCount int // Number of times reduce called emit
3987
OutputCount int // Number of documents in resulting collection
3988
Database string // Output database, if results are not inlined
3989
Collection string // Output collection, if results are not inlined
3990
Time int64 // Time to run the job, in nanoseconds
3991
VerboseTime *MapReduceTime // Only defined if Verbose was true
3994
type MapReduceTime struct {
3995
Total int64 // Total time, in nanoseconds
3996
Map int64 "mapTime" // Time within map function, in nanoseconds
3997
EmitLoop int64 "emitLoop" // Time within the emit/map loop, in nanoseconds
4000
// MapReduce executes a map/reduce job for documents covered by the query.
4001
// That kind of job is suitable for very flexible bulk aggregation of data
4002
// performed at the server side via Javascript functions.
4004
// Results from the job may be returned as a result of the query itself
4005
// through the result parameter in case they'll certainly fit in memory
4006
// and in a single document. If there's the possibility that the amount
4007
// of data might be too large, results must be stored back in an alternative
4008
// collection or even a separate database, by setting the Out field of the
4009
// provided MapReduce job. In that case, provide nil as the result parameter.
4011
// These are some of the ways to set Out:
4014
// Inline results into the result parameter.
4016
// bson.M{"replace": "mycollection"}
4017
// The output will be inserted into a collection which replaces any
4018
// existing collection with the same name.
4020
// bson.M{"merge": "mycollection"}
4021
// This option will merge new data into the old output collection. In
4022
// other words, if the same key exists in both the result set and the
4023
// old collection, the new key will overwrite the old one.
4025
// bson.M{"reduce": "mycollection"}
4026
// If documents exist for a given key in the result set and in the old
4027
// collection, then a reduce operation (using the specified reduce
4028
// function) will be performed on the two values and the result will be
4029
// written to the output collection. If a finalize function was
4030
// provided, this will be run after the reduce as well.
4032
// bson.M{...., "db": "mydb"}
4033
// Any of the above options can have the "db" key included for doing
4034
// the respective action in a separate database.
4036
// The following is a trivial example which will count the number of
4037
// occurrences of a field named n on each document in a collection, and
4038
// will return results inline:
4040
// job := &mgo.MapReduce{
4041
// Map: "function() { emit(this.n, 1) }",
4042
// Reduce: "function(key, values) { return Array.sum(values) }",
4044
// var result []struct { Id int "_id"; Value int }
4045
// _, err := collection.Find(nil).MapReduce(job, &result)
4049
// for _, item := range result {
4050
// fmt.Println(item.Value)
4053
// This function is compatible with MongoDB 1.7.4+.
4055
// Relevant documentation:
4057
// http://www.mongodb.org/display/DOCS/MapReduce
4059
func (q *Query) MapReduce(job *MapReduce, result interface{}) (info *MapReduceInfo, err error) {
4061
session := q.session
4066
c := strings.Index(op.collection, ".")
4068
return nil, errors.New("Bad collection name: " + op.collection)
4071
dbname := op.collection[:c]
4072
cname := op.collection[c+1:]
4074
cmd := mapReduceCmd{
4078
Finalize: job.Finalize,
4079
Out: fixMROut(job.Out),
4081
Verbose: job.Verbose,
4083
Sort: op.options.OrderBy,
4088
cmd.Out = bson.D{{"inline", 1}}
4091
var doc mapReduceResult
4092
err = session.DB(dbname).Run(&cmd, &doc)
4097
return nil, errors.New(doc.Err)
4100
info = &MapReduceInfo{
4101
InputCount: doc.Counts.Input,
4102
EmitCount: doc.Counts.Emit,
4103
OutputCount: doc.Counts.Output,
4104
Time: doc.TimeMillis * 1e6,
4107
if doc.Result.Kind == 0x02 {
4108
err = doc.Result.Unmarshal(&info.Collection)
4109
info.Database = dbname
4110
} else if doc.Result.Kind == 0x03 {
4111
var v struct{ Collection, Db string }
4112
err = doc.Result.Unmarshal(&v)
4113
info.Collection = v.Collection
4114
info.Database = v.Db
4117
if doc.Timing != nil {
4118
info.VerboseTime = doc.Timing
4119
info.VerboseTime.Total *= 1e6
4120
info.VerboseTime.Map *= 1e6
4121
info.VerboseTime.EmitLoop *= 1e6
4128
return info, doc.Results.Unmarshal(result)
4133
// The "out" option in the MapReduce command must be ordered. This was
4134
// found after the implementation was accepting maps for a long time,
4135
// so rather than breaking the API, we'll fix the order if necessary.
4136
// Details about the order requirement may be seen in MongoDB's code:
4138
// http://goo.gl/L8jwJX
4140
func fixMROut(out interface{}) interface{} {
4141
outv := reflect.ValueOf(out)
4142
if outv.Kind() != reflect.Map || outv.Type().Key() != reflect.TypeOf("") {
4145
outs := make(bson.D, outv.Len())
4148
for i, k := range outv.MapKeys() {
4151
outs[i].Value = outv.MapIndex(k).Interface()
4153
case "normal", "replace", "merge", "reduce", "inline":
4157
if outTypeIndex > 0 {
4158
outs[0], outs[outTypeIndex] = outs[outTypeIndex], outs[0]
4163
// Change holds fields for running a findAndModify MongoDB command via
4164
// the Query.Apply method.
4165
type Change struct {
4166
Update interface{} // The update document
4167
Upsert bool // Whether to insert in case the document isn't found
4168
Remove bool // Whether to remove the document found rather than updating
4169
ReturnNew bool // Should the modified document be returned rather than the old one
4172
type findModifyCmd struct {
4173
Collection string "findAndModify"
4174
Query, Update, Sort, Fields interface{} ",omitempty"
4175
Upsert, Remove, New bool ",omitempty"
4178
type valueResult struct {
4180
LastError LastError "lastErrorObject"
4183
// Apply runs the findAndModify MongoDB command, which allows updating, upserting
4184
// or removing a document matching a query and atomically returning either the old
4185
// version (the default) or the new version of the document (when ReturnNew is true).
4186
// If no objects are found Apply returns ErrNotFound.
4188
// The Sort and Select query methods affect the result of Apply. In case
4189
// multiple documents match the query, Sort enables selecting which document to
4190
// act upon by ordering it first. Select enables retrieving only a selection
4191
// of fields of the new or old document.
4193
// This simple example increments a counter and prints its new value:
4195
// change := mgo.Change{
4196
// Update: bson.M{"$inc": bson.M{"n": 1}},
4199
// info, err = col.Find(M{"_id": id}).Apply(change, &doc)
4200
// fmt.Println(doc.N)
4202
// This method depends on MongoDB >= 2.0 to work properly.
4204
// Relevant documentation:
4206
// http://www.mongodb.org/display/DOCS/findAndModify+Command
4207
// http://www.mongodb.org/display/DOCS/Updating
4208
// http://www.mongodb.org/display/DOCS/Atomic+Operations
4210
func (q *Query) Apply(change Change, result interface{}) (info *ChangeInfo, err error) {
4212
session := q.session
4216
c := strings.Index(op.collection, ".")
4218
return nil, errors.New("bad collection name: " + op.collection)
4221
dbname := op.collection[:c]
4222
cname := op.collection[c+1:]
4224
cmd := findModifyCmd{
4226
Update: change.Update,
4227
Upsert: change.Upsert,
4228
Remove: change.Remove,
4229
New: change.ReturnNew,
4231
Sort: op.options.OrderBy,
4232
Fields: op.selector,
4235
session = session.Clone()
4236
defer session.Close()
4237
session.SetMode(Strong, false)
4240
for retries := 0; ; retries++ {
4241
err = session.DB(dbname).Run(&cmd, &doc)
4243
if qerr, ok := err.(*QueryError); ok && qerr.Message == "No matching object found" {
4244
return nil, ErrNotFound
4246
if change.Upsert && IsDup(err) && retries < maxUpsertRetries {
4247
// Retry duplicate key errors on upserts.
4248
// https://docs.mongodb.com/v3.2/reference/method/db.collection.update/#use-unique-indexes
4253
break // No error, so don't retry.
4256
if doc.LastError.N == 0 {
4257
return nil, ErrNotFound
4259
if doc.Value.Kind != 0x0A && result != nil {
4260
err = doc.Value.Unmarshal(result)
4265
info = &ChangeInfo{}
4266
lerr := &doc.LastError
4267
if lerr.UpdatedExisting {
4268
info.Updated = lerr.N
4269
info.Matched = lerr.N
4270
} else if change.Remove {
4271
info.Removed = lerr.N
4272
info.Matched = lerr.N
4273
} else if change.Upsert {
4274
info.UpsertedId = lerr.UpsertedId
4279
// The BuildInfo type encapsulates details about the running MongoDB server.
4281
// Note that the VersionArray field was introduced in MongoDB 2.0+, but it is
4282
// internally assembled from the Version information for previous versions.
4283
// In both cases, VersionArray is guaranteed to have at least 4 entries.
4284
type BuildInfo struct {
4286
VersionArray []int `bson:"versionArray"` // On MongoDB 2.0+; assembled from Version otherwise
4287
GitVersion string `bson:"gitVersion"`
4288
OpenSSLVersion string `bson:"OpenSSLVersion"`
4289
SysInfo string `bson:"sysInfo"` // Deprecated and empty on MongoDB 3.2+.
4292
MaxObjectSize int `bson:"maxBsonObjectSize"`
4295
// VersionAtLeast returns whether the BuildInfo version is greater than or
4296
// equal to the provided version number. If more than one number is
4297
// provided, numbers will be considered as major, minor, and so on.
4298
func (bi *BuildInfo) VersionAtLeast(version ...int) bool {
4299
for i := range version {
4300
if i == len(bi.VersionArray) {
4303
if bi.VersionArray[i] < version[i] {
4310
// BuildInfo retrieves the version and other details about the
4311
// running MongoDB server.
4312
func (s *Session) BuildInfo() (info BuildInfo, err error) {
4313
err = s.Run(bson.D{{"buildInfo", "1"}}, &info)
4314
if len(info.VersionArray) == 0 {
4315
for _, a := range strings.Split(info.Version, ".") {
4316
i, err := strconv.Atoi(a)
4320
info.VersionArray = append(info.VersionArray, i)
4323
for len(info.VersionArray) < 4 {
4324
info.VersionArray = append(info.VersionArray, 0)
4326
if i := strings.IndexByte(info.GitVersion, ' '); i >= 0 {
4327
// Strip off the " modules: enterprise" suffix. This is a _git version_.
4328
// That information may be moved to another field if people need it.
4329
info.GitVersion = info.GitVersion[:i]
4331
if info.SysInfo == "deprecated" {
4337
// ---------------------------------------------------------------------------
4338
// Internal session handling helpers.
4340
func (s *Session) acquireSocket(slaveOk bool) (*mongoSocket, error) {
4342
// Read-only lock to check for previously reserved socket.
4344
// If there is a slave socket reserved and its use is acceptable, take it as long
4345
// as there isn't a master socket which would be preferred by the read preference mode.
4346
if s.slaveSocket != nil && s.slaveOk && slaveOk && (s.masterSocket == nil || s.consistency != PrimaryPreferred && s.consistency != Monotonic) {
4347
socket := s.slaveSocket
4352
if s.masterSocket != nil {
4353
socket := s.masterSocket
4360
// No go. We may have to request a new socket and change the session,
4361
// so try again but with an exclusive lock now.
4365
if s.slaveSocket != nil && s.slaveOk && slaveOk && (s.masterSocket == nil || s.consistency != PrimaryPreferred && s.consistency != Monotonic) {
4366
s.slaveSocket.Acquire()
4367
return s.slaveSocket, nil
4369
if s.masterSocket != nil {
4370
s.masterSocket.Acquire()
4371
return s.masterSocket, nil
4374
// Still not good. We need a new socket.
4375
sock, err := s.cluster().AcquireSocket(s.consistency, slaveOk && s.slaveOk, s.syncTimeout, s.sockTimeout, s.queryConfig.op.serverTags, s.poolLimit)
4380
// Authenticate the new socket.
4381
if err = s.socketLogin(sock); err != nil {
4386
// Keep track of the new socket, if necessary.
4387
// Note that, as a special case, if the Eventual session was
4388
// not refreshed (s.slaveSocket != nil), it means the developer
4389
// asked to preserve an existing reserved socket, so we'll
4390
// keep a master one around too before a Refresh happens.
4391
if s.consistency != Eventual || s.slaveSocket != nil {
4395
// Switch over a Monotonic session to the master.
4396
if !slaveOk && s.consistency == Monotonic {
4403
// setSocket binds socket to this section.
4404
func (s *Session) setSocket(socket *mongoSocket) {
4405
info := socket.Acquire()
4407
if s.masterSocket != nil {
4408
panic("setSocket(master) with existing master socket reserved")
4410
s.masterSocket = socket
4412
if s.slaveSocket != nil {
4413
panic("setSocket(slave) with existing slave socket reserved")
4415
s.slaveSocket = socket
4419
// unsetSocket releases any slave and/or master sockets reserved.
4420
func (s *Session) unsetSocket() {
4421
if s.masterSocket != nil {
4422
s.masterSocket.Release()
4424
if s.slaveSocket != nil {
4425
s.slaveSocket.Release()
4427
s.masterSocket = nil
4431
func (iter *Iter) replyFunc() replyFunc {
4432
return func(err error, op *replyOp, docNum int, docData []byte) {
4434
iter.docsToReceive--
4437
debugf("Iter %p received an error: %s", iter, err.Error())
4438
} else if docNum == -1 {
4439
debugf("Iter %p received no documents (cursor=%d).", iter, op.cursorId)
4440
if op != nil && op.cursorId != 0 {
4441
// It's a tailable cursor.
4442
iter.op.cursorId = op.cursorId
4443
} else if op != nil && op.cursorId == 0 && op.flags&1 == 1 {
4444
// Cursor likely timed out.
4445
iter.err = ErrCursor
4447
iter.err = ErrNotFound
4449
} else if iter.findCmd {
4450
debugf("Iter %p received reply document %d/%d (cursor=%d)", iter, docNum+1, int(op.replyDocs), op.cursorId)
4451
var findReply struct {
4457
if err := bson.Unmarshal(docData, &findReply); err != nil {
4459
} else if !findReply.Ok && findReply.Errmsg != "" {
4460
iter.err = &QueryError{Code: findReply.Code, Message: findReply.Errmsg}
4461
} else if len(findReply.Cursor.FirstBatch) == 0 && len(findReply.Cursor.NextBatch) == 0 {
4462
iter.err = ErrNotFound
4464
batch := findReply.Cursor.FirstBatch
4465
if len(batch) == 0 {
4466
batch = findReply.Cursor.NextBatch
4469
for _, raw := range batch {
4470
iter.docData.Push(raw.Data)
4472
iter.docsToReceive = 0
4473
docsToProcess := iter.docData.Len()
4474
if iter.limit == 0 || int32(docsToProcess) < iter.limit {
4475
iter.docsBeforeMore = docsToProcess - int(iter.prefetch*float64(rdocs))
4477
iter.docsBeforeMore = -1
4479
iter.op.cursorId = findReply.Cursor.Id
4482
rdocs := int(op.replyDocs)
4484
iter.docsToReceive += rdocs - 1
4485
docsToProcess := iter.docData.Len() + rdocs
4486
if iter.limit == 0 || int32(docsToProcess) < iter.limit {
4487
iter.docsBeforeMore = docsToProcess - int(iter.prefetch*float64(rdocs))
4489
iter.docsBeforeMore = -1
4491
iter.op.cursorId = op.cursorId
4493
debugf("Iter %p received reply document %d/%d (cursor=%d)", iter, docNum+1, rdocs, op.cursorId)
4494
iter.docData.Push(docData)
4496
iter.gotReply.Broadcast()
4501
type writeCmdResult struct {
4504
NModified int `bson:"nModified"`
4507
Id interface{} `_id`
4509
ConcernError writeConcernError `bson:"writeConcernError"`
4510
Errors []writeCmdError `bson:"writeErrors"`
4513
type writeConcernError struct {
4518
type writeCmdError struct {
4524
func (r *writeCmdResult) BulkErrorCases() []BulkErrorCase {
4525
ecases := make([]BulkErrorCase, len(r.Errors))
4526
for i, err := range r.Errors {
4527
ecases[i] = BulkErrorCase{err.Index, &QueryError{Code: err.Code, Message: err.ErrMsg}}
4532
// writeOp runs the given modifying operation, potentially followed up
4533
// by a getLastError command in case the session is in safe mode. The
4534
// LastError result is made available in lerr, and if lerr.Err is set it
4535
// will also be returned as err.
4536
func (c *Collection) writeOp(op interface{}, ordered bool) (lerr *LastError, err error) {
4537
s := c.Database.Session
4538
socket, err := s.acquireSocket(c.Database.Name == "local")
4542
defer socket.Release()
4546
bypassValidation := s.bypassValidation
4549
if socket.ServerInfo().MaxWireVersion >= 2 {
4550
// Servers with a more recent write protocol benefit from write commands.
4551
if op, ok := op.(*insertOp); ok && len(op.documents) > 1000 {
4554
// Maximum batch size is 1000. Must split out in separate operations for compatibility.
4556
for i := 0; i < len(all); i += 1000 {
4561
op.documents = all[i:l]
4562
oplerr, err := c.writeOpCommand(socket, safeOp, op, ordered, bypassValidation)
4564
lerr.modified += oplerr.modified
4566
for ei := range lerr.ecases {
4567
oplerr.ecases[ei].Index += i
4569
lerr.ecases = append(lerr.ecases, oplerr.ecases...)
4570
if op.flags&1 == 0 {
4575
if len(lerr.ecases) != 0 {
4576
return &lerr, lerr.ecases[0].Err
4580
return c.writeOpCommand(socket, safeOp, op, ordered, bypassValidation)
4581
} else if updateOps, ok := op.(bulkUpdateOp); ok {
4583
for i, updateOp := range updateOps {
4584
oplerr, err := c.writeOpQuery(socket, safeOp, updateOp, ordered)
4586
lerr.modified += oplerr.modified
4588
lerr.ecases = append(lerr.ecases, BulkErrorCase{i, err})
4594
if len(lerr.ecases) != 0 {
4595
return &lerr, lerr.ecases[0].Err
4598
} else if deleteOps, ok := op.(bulkDeleteOp); ok {
4600
for i, deleteOp := range deleteOps {
4601
oplerr, err := c.writeOpQuery(socket, safeOp, deleteOp, ordered)
4603
lerr.modified += oplerr.modified
4605
lerr.ecases = append(lerr.ecases, BulkErrorCase{i, err})
4611
if len(lerr.ecases) != 0 {
4612
return &lerr, lerr.ecases[0].Err
4616
return c.writeOpQuery(socket, safeOp, op, ordered)
4619
func (c *Collection) writeOpQuery(socket *mongoSocket, safeOp *queryOp, op interface{}, ordered bool) (lerr *LastError, err error) {
4621
return nil, socket.Query(op)
4624
var mutex sync.Mutex
4625
var replyData []byte
4628
query := *safeOp // Copy the data.
4629
query.collection = c.Database.Name + ".$cmd"
4630
query.replyFunc = func(err error, reply *replyOp, docNum int, docData []byte) {
4635
err = socket.Query(op, &query)
4639
mutex.Lock() // Wait.
4640
if replyErr != nil {
4641
return nil, replyErr // XXX TESTME
4643
if hasErrMsg(replyData) {
4644
// Looks like getLastError itself failed.
4645
err = checkQueryError(query.collection, replyData)
4650
result := &LastError{}
4651
bson.Unmarshal(replyData, &result)
4652
debugf("Result from writing query: %#v", result)
4653
if result.Err != "" {
4654
result.ecases = []BulkErrorCase{{Index: 0, Err: result}}
4655
if insert, ok := op.(*insertOp); ok && len(insert.documents) > 1 {
4656
result.ecases[0].Index = -1
4658
return result, result
4660
// With MongoDB <2.6 we don't know how many actually changed, so make it the same as matched.
4661
result.modified = result.N
4665
func (c *Collection) writeOpCommand(socket *mongoSocket, safeOp *queryOp, op interface{}, ordered, bypassValidation bool) (lerr *LastError, err error) {
4666
var writeConcern interface{}
4668
writeConcern = bson.D{{"w", 0}}
4670
writeConcern = safeOp.query.(*getLastError)
4674
switch op := op.(type) {
4676
// http://docs.mongodb.org/manual/reference/command/insert
4679
{"documents", op.documents},
4680
{"writeConcern", writeConcern},
4681
{"ordered", op.flags&1 == 0},
4684
// http://docs.mongodb.org/manual/reference/command/update
4687
{"updates", []interface{}{op}},
4688
{"writeConcern", writeConcern},
4689
{"ordered", ordered},
4692
// http://docs.mongodb.org/manual/reference/command/update
4696
{"writeConcern", writeConcern},
4697
{"ordered", ordered},
4700
// http://docs.mongodb.org/manual/reference/command/delete
4703
{"deletes", []interface{}{op}},
4704
{"writeConcern", writeConcern},
4705
{"ordered", ordered},
4708
// http://docs.mongodb.org/manual/reference/command/delete
4712
{"writeConcern", writeConcern},
4713
{"ordered", ordered},
4716
if bypassValidation {
4717
cmd = append(cmd, bson.DocElem{"bypassDocumentValidation", true})
4720
var result writeCmdResult
4721
err = c.Database.run(socket, cmd, &result)
4722
debugf("Write command result: %#v (err=%v)", result, err)
4723
ecases := result.BulkErrorCases()
4725
UpdatedExisting: result.N > 0 && len(result.Upserted) == 0,
4728
modified: result.NModified,
4731
if len(result.Upserted) > 0 {
4732
lerr.UpsertedId = result.Upserted[0].Id
4734
if len(result.Errors) > 0 {
4735
e := result.Errors[0]
4739
} else if result.ConcernError.Code != 0 {
4740
e := result.ConcernError
4746
if err == nil && safeOp == nil {
4752
func hasErrMsg(d []byte) bool {
4754
for i := 0; i+8 < l; i++ {
4755
if d[i] == '\x02' && d[i+1] == 'e' && d[i+2] == 'r' && d[i+3] == 'r' && d[i+4] == 'm' && d[i+5] == 's' && d[i+6] == 'g' && d[i+7] == '\x00' {