~juju-qa/ubuntu/yakkety/juju/2.0-rc3-again

« back to all changes in this revision

Viewing changes to src/labix.org/v2/mgo/session.go

  • Committer: Package Import Robot
  • Author(s): James Page
  • Date: 2013-04-24 22:34:47 UTC
  • Revision ID: package-import@ubuntu.com-20130424223447-f0qdji7ubnyo0s71
Tags: upstream-1.10.0.1
ImportĀ upstreamĀ versionĀ 1.10.0.1

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
// mgo - MongoDB driver for Go
 
2
//
 
3
// Copyright (c) 2010-2012 - Gustavo Niemeyer <gustavo@niemeyer.net>
 
4
//
 
5
// All rights reserved.
 
6
//
 
7
// Redistribution and use in source and binary forms, with or without
 
8
// modification, are permitted provided that the following conditions are met:
 
9
//
 
10
// 1. Redistributions of source code must retain the above copyright notice, this
 
11
//    list of conditions and the following disclaimer.
 
12
// 2. Redistributions in binary form must reproduce the above copyright notice,
 
13
//    this list of conditions and the following disclaimer in the documentation
 
14
//    and/or other materials provided with the distribution.
 
15
//
 
16
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
 
17
// ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
 
18
// WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
 
19
// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
 
20
// ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
 
21
// (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
 
22
// LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
 
23
// ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 
24
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
 
25
// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 
26
 
 
27
package mgo
 
28
 
 
29
import (
 
30
        "crypto/md5"
 
31
        "encoding/hex"
 
32
        "errors"
 
33
        "fmt"
 
34
        "labix.org/v2/mgo/bson"
 
35
        "math"
 
36
        "net"
 
37
        "reflect"
 
38
        "sort"
 
39
        "strconv"
 
40
        "strings"
 
41
        "sync"
 
42
        "time"
 
43
)
 
44
 
 
45
type mode int
 
46
 
 
47
const (
 
48
        Eventual  mode = 0
 
49
        Monotonic mode = 1
 
50
        Strong    mode = 2
 
51
)
 
52
 
 
53
// When changing the Session type, check if newSession and copySession
 
54
// need to be updated too.
 
55
 
 
56
type Session struct {
 
57
        m            sync.RWMutex
 
58
        cluster_     *mongoCluster
 
59
        slaveSocket  *mongoSocket
 
60
        masterSocket *mongoSocket
 
61
        slaveOk      bool
 
62
        consistency  mode
 
63
        queryConfig  query
 
64
        safeOp       *queryOp
 
65
        syncTimeout  time.Duration
 
66
        defaultdb    string
 
67
        dialAuth     *authInfo
 
68
        auth         []authInfo
 
69
}
 
70
 
 
71
type Database struct {
 
72
        Session *Session
 
73
        Name    string
 
74
}
 
75
 
 
76
type Collection struct {
 
77
        Database *Database
 
78
        Name     string // "collection"
 
79
        FullName string // "db.collection"
 
80
}
 
81
 
 
82
type Query struct {
 
83
        m       sync.Mutex
 
84
        session *Session
 
85
        query   // Enables default settings in session.
 
86
}
 
87
 
 
88
type query struct {
 
89
        op       queryOp
 
90
        prefetch float64
 
91
        limit    int32
 
92
}
 
93
 
 
94
type getLastError struct {
 
95
        CmdName  int         "getLastError"
 
96
        W        interface{} "w,omitempty"
 
97
        WTimeout int         "wtimeout,omitempty"
 
98
        FSync    bool        "fsync,omitempty"
 
99
        J        bool        "j,omitempty"
 
100
}
 
101
 
 
102
type Iter struct {
 
103
        m              sync.Mutex
 
104
        gotReply       sync.Cond
 
105
        session        *Session
 
106
        server         *mongoServer
 
107
        docData        queue
 
108
        err            error
 
109
        op             getMoreOp
 
110
        prefetch       float64
 
111
        limit          int32
 
112
        docsToReceive  int
 
113
        docsBeforeMore int
 
114
        timeout        time.Duration
 
115
        timedout       bool
 
116
}
 
117
 
 
118
var ErrNotFound = errors.New("not found")
 
119
 
 
120
const defaultPrefetch = 0.25
 
121
 
 
122
// Dial establishes a new session to the cluster identified by the given seed
 
123
// server(s). The session will enable communication with all of the servers in
 
124
// the cluster, so the seed servers are used only to find out about the cluster
 
125
// topology.
 
126
//
 
127
// Dial will timeout after 10 seconds if a server isn't reached. The returned
 
128
// session will timeout operations after one minute by default if servers
 
129
// aren't available. To customize the timeout, see DialWithTimeout
 
130
// and SetSyncTimeout.
 
131
//
 
132
// This method is generally called just once for a given cluster.  Further
 
133
// sessions to the same cluster are then established using the New or Copy
 
134
// methods on the obtained session. This will make them share the underlying
 
135
// cluster, and manage the pool of connections appropriately.
 
136
//
 
137
// Once the session is not useful anymore, Close must be called to release the
 
138
// resources appropriately.
 
139
//
 
140
// The seed servers must be provided in the following format:
 
141
//
 
142
//     [mongodb://][user:pass@]host1[:port1][,host2[:port2],...][/database][?options]
 
143
//
 
144
// For example, it may be as simple as:
 
145
//
 
146
//     localhost
 
147
//
 
148
// Or more involved like:
 
149
//
 
150
//     mongodb://myuser:mypass@localhost:40001,otherhost:40001/mydb
 
151
//
 
152
// If the port number is not provided for a server, it defaults to 27017.
 
153
//
 
154
// The username and password provided in the URL will be used to authenticate
 
155
// into the database named after the slash at the end of the host names, or
 
156
// into the "admin" database if none is provided.  The authentication information
 
157
// will persist in sessions obtained through the New method as well.
 
158
//
 
159
// The following connection options are supported after the question mark:
 
160
//
 
161
//     connect=direct
 
162
//
 
163
//         This option will disable the automatic replica set server
 
164
//         discovery logic, and will only use the servers provided.
 
165
//         This enables forcing the communication with a specific
 
166
//         server or set of servers (even if they are slaves).  Note
 
167
//         that to talk to a slave you'll need to relax the consistency
 
168
//         requirements using a Monotonic or Eventual mode via SetMode.
 
169
//
 
170
// Relevant documentation:
 
171
//
 
172
//     http://www.mongodb.org/display/DOCS/Connections
 
173
//
 
174
func Dial(url string) (*Session, error) {
 
175
        session, err := DialWithTimeout(url, 10*time.Second)
 
176
        if err == nil {
 
177
                session.SetSyncTimeout(time.Minute)
 
178
        }
 
179
        return session, err
 
180
}
 
181
 
 
182
// DialWithTimeout works like Dial, but uses timeout as the amount of time to
 
183
// wait for a server to respond when first connecting and also on follow up
 
184
// operations in the session. If timeout is zero, the call may block
 
185
// forever waiting for a connection to be made.
 
186
//
 
187
// See SetSyncTimeout for customizing the timeout for the session.
 
188
func DialWithTimeout(url string, timeout time.Duration) (*Session, error) {
 
189
        uinfo, err := parseURL(url)
 
190
        if err != nil {
 
191
                return nil, err
 
192
        }
 
193
        direct := false
 
194
        for k, v := range uinfo.options {
 
195
                switch k {
 
196
                case "connect":
 
197
                        if v == "direct" {
 
198
                                direct = true
 
199
                                break
 
200
                        }
 
201
                        if v == "replicaSet" {
 
202
                                break
 
203
                        }
 
204
                        fallthrough
 
205
                default:
 
206
                        return nil, errors.New("Unsupported connection URL option: " + k + "=" + v)
 
207
                }
 
208
        }
 
209
        info := DialInfo{
 
210
                Addrs:    uinfo.addrs,
 
211
                Direct:   direct,
 
212
                Timeout:  timeout,
 
213
                Username: uinfo.user,
 
214
                Password: uinfo.pass,
 
215
                Database: uinfo.db,
 
216
        }
 
217
        return DialWithInfo(&info)
 
218
}
 
219
 
 
220
// DialInfo holds options for establishing a session with a MongoDB cluster.
 
221
// To use a URL, see the Dial function.
 
222
type DialInfo struct {
 
223
        // Addrs holds the addresses for the seed servers.
 
224
        Addrs []string
 
225
 
 
226
        // Direct informs whether to establish connections only with the
 
227
        // specified seed servers, or to obtain information for the whole
 
228
        // cluster and establish connections with further servers too.
 
229
        Direct bool
 
230
 
 
231
        // Timeout is the amount of time to wait for a server to respond when
 
232
        // first connecting and on follow up operations in the session. If
 
233
        // timeout is zero, the call may block forever waiting for a connection
 
234
        // to be established.
 
235
        Timeout time.Duration
 
236
 
 
237
        // Database is the database name used during the initial authentication.
 
238
        // If set, the value is also returned as the default result from the
 
239
        // Session.DB method, in place of "test".
 
240
        Database string
 
241
 
 
242
        // Username and Password inform the credentials for the initial
 
243
        // authentication done against Database, if that is set,
 
244
        // or the "admin" database otherwise. See the Session.Login method too.
 
245
        Username string
 
246
        Password string
 
247
 
 
248
        // Dial optionally specifies the dial function for creating connections.
 
249
        // At the moment addr will have type *net.TCPAddr, but other types may
 
250
        // be provided in the future, so check and fail if necessary.
 
251
        Dial func(addr net.Addr) (net.Conn, error)
 
252
}
 
253
 
 
254
// DialWithInfo establishes a new session to the cluster identified by info.
 
255
func DialWithInfo(info *DialInfo) (*Session, error) {
 
256
        addrs := make([]string, len(info.Addrs))
 
257
        for i, addr := range info.Addrs {
 
258
                p := strings.LastIndexAny(addr, "]:")
 
259
                if p == -1 || addr[p] != ':' {
 
260
                        // XXX This is untested. The test suite doesn't use the standard port.
 
261
                        addr += ":27017"
 
262
                }
 
263
                addrs[i] = addr
 
264
        }
 
265
        cluster := newCluster(addrs, info.Direct, info.Dial)
 
266
        session := newSession(Eventual, cluster, info.Timeout)
 
267
        session.defaultdb = info.Database
 
268
        if session.defaultdb == "" {
 
269
                session.defaultdb = "test"
 
270
        }
 
271
        if info.Username != "" {
 
272
                db := info.Database
 
273
                if db == "" {
 
274
                        db = "admin"
 
275
                }
 
276
                session.dialAuth = &authInfo{db, info.Username, info.Password}
 
277
                session.auth = []authInfo{*session.dialAuth}
 
278
        }
 
279
        cluster.Release()
 
280
 
 
281
        // People get confused when we return a session that is not actually
 
282
        // established to any servers yet (e.g. what if url was wrong). So,
 
283
        // ping the server to ensure there's someone there, and abort if it
 
284
        // fails.
 
285
        if err := session.Ping(); err != nil {
 
286
                session.Close()
 
287
                return nil, err
 
288
        }
 
289
        session.SetMode(Strong, true)
 
290
        return session, nil
 
291
}
 
292
 
 
293
func isOptSep(c rune) bool {
 
294
        return c == ';' || c == '&'
 
295
}
 
296
 
 
297
type urlInfo struct {
 
298
        addrs   []string
 
299
        user    string
 
300
        pass    string
 
301
        db      string
 
302
        options map[string]string
 
303
}
 
304
 
 
305
func parseURL(url string) (*urlInfo, error) {
 
306
        if strings.HasPrefix(url, "mongodb://") {
 
307
                url = url[10:]
 
308
        }
 
309
        info := &urlInfo{options: make(map[string]string)}
 
310
        if c := strings.Index(url, "?"); c != -1 {
 
311
                for _, pair := range strings.FieldsFunc(url[c+1:], isOptSep) {
 
312
                        l := strings.SplitN(pair, "=", 2)
 
313
                        if len(l) != 2 || l[0] == "" || l[1] == "" {
 
314
                                return nil, errors.New("Connection option must be key=value: " + pair)
 
315
                        }
 
316
                        info.options[l[0]] = l[1]
 
317
                }
 
318
                url = url[:c]
 
319
        }
 
320
        if c := strings.Index(url, "@"); c != -1 {
 
321
                pair := strings.SplitN(url[:c], ":", 2)
 
322
                if len(pair) != 2 || pair[0] == "" {
 
323
                        return nil, errors.New("Credentials must be provided as user:pass@host")
 
324
                }
 
325
                info.user = pair[0]
 
326
                info.pass = pair[1]
 
327
                url = url[c+1:]
 
328
        }
 
329
        if c := strings.Index(url, "/"); c != -1 {
 
330
                info.db = url[c+1:]
 
331
                url = url[:c]
 
332
        }
 
333
        info.addrs = strings.Split(url, ",")
 
334
        return info, nil
 
335
}
 
336
 
 
337
func newSession(consistency mode, cluster *mongoCluster, syncTimeout time.Duration) (session *Session) {
 
338
        cluster.Acquire()
 
339
        session = &Session{cluster_: cluster, syncTimeout: syncTimeout}
 
340
        debugf("New session %p on cluster %p", session, cluster)
 
341
        session.SetMode(consistency, true)
 
342
        session.SetSafe(&Safe{})
 
343
        session.queryConfig.prefetch = defaultPrefetch
 
344
        return session
 
345
}
 
346
 
 
347
func copySession(session *Session, keepAuth bool) (s *Session) {
 
348
        cluster := session.cluster()
 
349
        cluster.Acquire()
 
350
        if session.masterSocket != nil {
 
351
                session.masterSocket.Acquire()
 
352
        }
 
353
        if session.slaveSocket != nil {
 
354
                session.slaveSocket.Acquire()
 
355
        }
 
356
        var auth []authInfo
 
357
        if keepAuth {
 
358
                auth = make([]authInfo, len(session.auth))
 
359
                copy(auth, session.auth)
 
360
        } else if session.dialAuth != nil {
 
361
                auth = []authInfo{*session.dialAuth}
 
362
        }
 
363
        scopy := *session
 
364
        scopy.m = sync.RWMutex{}
 
365
        scopy.auth = auth
 
366
        s = &scopy
 
367
        debugf("New session %p on cluster %p (copy from %p)", s, cluster, session)
 
368
        return s
 
369
}
 
370
 
 
371
// LiveServers returns a list of server addresses which are
 
372
// currently known to be alive.
 
373
func (s *Session) LiveServers() (addrs []string) {
 
374
        s.m.RLock()
 
375
        addrs = s.cluster().LiveServers()
 
376
        s.m.RUnlock()
 
377
        return addrs
 
378
}
 
379
 
 
380
// DB returns a value representing the named database. If name
 
381
// is empty, the database name provided in the dialed URL is
 
382
// used instead. If that is also empty, "test" is used as a
 
383
// fallback in a way equivalent to the mongo shell.
 
384
//
 
385
// Creating this value is a very lightweight operation, and
 
386
// involves no network communication.
 
387
func (s *Session) DB(name string) *Database {
 
388
        if name == "" {
 
389
                name = s.defaultdb
 
390
        }
 
391
        return &Database{s, name}
 
392
}
 
393
 
 
394
// C returns a value representing the named collection.
 
395
//
 
396
// Creating this value is a very lightweight operation, and
 
397
// involves no network communication.
 
398
func (db *Database) C(name string) *Collection {
 
399
        return &Collection{db, name, db.Name + "." + name}
 
400
}
 
401
 
 
402
// With returns a copy of db that uses session s.
 
403
func (db *Database) With(s *Session) *Database {
 
404
        newdb := *db
 
405
        newdb.Session = s
 
406
        return &newdb
 
407
}
 
408
 
 
409
// With returns a copy of c that uses session s.
 
410
func (c *Collection) With(s *Session) *Collection {
 
411
        newdb := *c.Database
 
412
        newdb.Session = s
 
413
        newc := *c
 
414
        newc.Database = &newdb
 
415
        return &newc
 
416
}
 
417
 
 
418
// GridFS returns a GridFS value representing collections in db that
 
419
// follow the standard GridFS specification.
 
420
// The provided prefix (sometimes known as root) will determine which
 
421
// collections to use, and is usually set to "fs" when there is a
 
422
// single GridFS in the database.
 
423
//
 
424
// See the GridFS Create, Open, and OpenId methods for more details.
 
425
//
 
426
// Relevant documentation:
 
427
//
 
428
//     http://www.mongodb.org/display/DOCS/GridFS
 
429
//     http://www.mongodb.org/display/DOCS/GridFS+Tools
 
430
//     http://www.mongodb.org/display/DOCS/GridFS+Specification
 
431
//
 
432
func (db *Database) GridFS(prefix string) *GridFS {
 
433
        return newGridFS(db, prefix)
 
434
}
 
435
 
 
436
// Run issues the provided command against the database and unmarshals
 
437
// its result in the respective argument. The cmd argument may be either
 
438
// a string with the command name itself, in which case an empty document of
 
439
// the form bson.M{cmd: 1} will be used, or it may be a full command document.
 
440
//
 
441
// Note that MongoDB considers the first marshalled key as the command
 
442
// name, so when providing a command with options, it's important to
 
443
// use an ordering-preserving document, such as a struct value or an
 
444
// instance of bson.D.  For instance:
 
445
//
 
446
//     db.Run(bson.D{{"create", "mycollection"}, {"size", 1024}})
 
447
//
 
448
// For privilleged commands typically run against the "admin" database, see
 
449
// the Run method in the Session type.
 
450
//
 
451
// Relevant documentation:
 
452
//
 
453
//     http://www.mongodb.org/display/DOCS/Commands
 
454
//     http://www.mongodb.org/display/DOCS/List+of+Database+CommandSkips
 
455
//
 
456
func (db *Database) Run(cmd interface{}, result interface{}) error {
 
457
        if name, ok := cmd.(string); ok {
 
458
                cmd = bson.D{{name, 1}}
 
459
        }
 
460
        return db.C("$cmd").Find(cmd).One(result)
 
461
}
 
462
 
 
463
// Login authenticates against MongoDB with the provided credentials.  The
 
464
// authentication is valid for the whole session and will stay valid until
 
465
// Logout is explicitly called for the same database, or the session is
 
466
// closed.
 
467
//
 
468
// Concurrent Login calls will work correctly.
 
469
func (db *Database) Login(user, pass string) (err error) {
 
470
        session := db.Session
 
471
        dbname := db.Name
 
472
 
 
473
        socket, err := session.acquireSocket(false)
 
474
        if err != nil {
 
475
                return err
 
476
        }
 
477
        defer socket.Release()
 
478
 
 
479
        err = socket.Login(dbname, user, pass)
 
480
        if err != nil {
 
481
                return err
 
482
        }
 
483
 
 
484
        session.m.Lock()
 
485
        defer session.m.Unlock()
 
486
 
 
487
        for _, a := range session.auth {
 
488
                if a.db == dbname {
 
489
                        a.user = user
 
490
                        a.pass = pass
 
491
                        return nil
 
492
                }
 
493
        }
 
494
        session.auth = append(session.auth, authInfo{dbname, user, pass})
 
495
        return nil
 
496
}
 
497
 
 
498
func (s *Session) socketLogin(socket *mongoSocket) error {
 
499
        for _, a := range s.auth {
 
500
                if err := socket.Login(a.db, a.user, a.pass); err != nil {
 
501
                        return err
 
502
                }
 
503
        }
 
504
        return nil
 
505
}
 
506
 
 
507
// Logout removes any established authentication credentials for the database.
 
508
func (db *Database) Logout() {
 
509
        session := db.Session
 
510
        dbname := db.Name
 
511
        session.m.Lock()
 
512
        found := false
 
513
        for i, a := range session.auth {
 
514
                if a.db == dbname {
 
515
                        copy(session.auth[i:], session.auth[i+1:])
 
516
                        session.auth = session.auth[:len(session.auth)-1]
 
517
                        found = true
 
518
                        break
 
519
                }
 
520
        }
 
521
        if found {
 
522
                if session.masterSocket != nil {
 
523
                        session.masterSocket.Logout(dbname)
 
524
                }
 
525
                if session.slaveSocket != nil {
 
526
                        session.slaveSocket.Logout(dbname)
 
527
                }
 
528
        }
 
529
        session.m.Unlock()
 
530
}
 
531
 
 
532
// LogoutAll removes all established authentication credentials for the session.
 
533
func (s *Session) LogoutAll() {
 
534
        s.m.Lock()
 
535
        for _, a := range s.auth {
 
536
                if s.masterSocket != nil {
 
537
                        s.masterSocket.Logout(a.db)
 
538
                }
 
539
                if s.slaveSocket != nil {
 
540
                        s.slaveSocket.Logout(a.db)
 
541
                }
 
542
        }
 
543
        s.auth = s.auth[0:0]
 
544
        s.m.Unlock()
 
545
}
 
546
 
 
547
// User represents a MongoDB user.
 
548
//
 
549
// Relevant documentation:
 
550
//
 
551
//     http://docs.mongodb.org/manual/reference/privilege-documents/
 
552
//     http://docs.mongodb.org/manual/reference/user-privileges/
 
553
//
 
554
type User struct {
 
555
        // Username is how the user identifies itself to the system.
 
556
        Username string `bson:"user"`
 
557
 
 
558
        // Password is the plaintext password for the user. If set,
 
559
        // the UpsertUser method will hash it into PasswordHash and
 
560
        // unset it before the user is added to the database.
 
561
        Password string `bson:",omitempty"`
 
562
 
 
563
        // PasswordHash is the MD5 hash of Username+":mongo:"+Password.
 
564
        PasswordHash string `bson:"pwd,omitempty"`
 
565
 
 
566
        // UserSource indicates where to look for this user's credentials.
 
567
        // It may be set to a database name, or to "$external" for
 
568
        // consulting an external resource such as Kerberos. UserSource
 
569
        // must not be set if Password or PasswordHash are present.
 
570
        UserSource string `bson:"userSource,omitempty"`
 
571
 
 
572
        // Roles indicates the set of roles the user will be provided.
 
573
        // See the Role constants.
 
574
        Roles []Role `bson:"roles"`
 
575
 
 
576
        // OtherDBRoles allows assigning roles in other databases from
 
577
        // user documents inserted in the admin database. This field
 
578
        // only works in the admin database.
 
579
        OtherDBRoles map[string][]Role `bson:"otherDBRoles,omitempty"`
 
580
}
 
581
 
 
582
type Role string
 
583
 
 
584
const (
 
585
        // Relevant documentation:
 
586
        //
 
587
        //     http://docs.mongodb.org/manual/reference/user-privileges/
 
588
        //
 
589
        RoleRead         Role = "read"
 
590
        RoleReadAny      Role = "readAnyDatabase"
 
591
        RoleReadWrite    Role = "readWrite"
 
592
        RoleReadWriteAny Role = "readWriteAnyDatabase"
 
593
        RoleDBAdmin      Role = "dbAdmin"
 
594
        RoleDBAdminAny   Role = "dbAdminAnyDatabase"
 
595
        RoleUserAdmin    Role = "userAdmin"
 
596
        RoleUserAdminAny Role = "UserAdminAnyDatabase"
 
597
        RoleClusterAdmin Role = "clusterAdmin"
 
598
)
 
599
 
 
600
// UpsertUser updates the authentication credentials and the roles for
 
601
// a MongoDB user within the db database. If the named user doesn't exist
 
602
// it will be created.
 
603
//
 
604
// This method should only be used from MongoDB 2.4 and on. For older
 
605
// MongoDB releases, use the obsolete AddUser method instead.
 
606
//
 
607
// Relevant documentation:
 
608
//
 
609
//     http://docs.mongodb.org/manual/reference/user-privileges/
 
610
//     http://docs.mongodb.org/manual/reference/privilege-documents/
 
611
//
 
612
func (db *Database) UpsertUser(user *User) error {
 
613
        if user.Username == "" {
 
614
                return fmt.Errorf("user has no Username")
 
615
        }
 
616
        if user.Password != "" {
 
617
                psum := md5.New()
 
618
                psum.Write([]byte(user.Username + ":mongo:" + user.Password))
 
619
                user.PasswordHash = hex.EncodeToString(psum.Sum(nil))
 
620
                user.Password = ""
 
621
        }
 
622
        if user.PasswordHash != "" && user.UserSource != "" {
 
623
                return fmt.Errorf("user has both Password/PasswordHash and UserSource set")
 
624
        }
 
625
        if len(user.OtherDBRoles) > 0 && db.Name != "admin" {
 
626
                return fmt.Errorf("user with OtherDBRoles is only supported in admin database")
 
627
        }
 
628
        var unset bson.D
 
629
        if user.PasswordHash == "" {
 
630
                unset = append(unset, bson.DocElem{"pwd", 1})
 
631
        }
 
632
        if user.UserSource == "" {
 
633
                unset = append(unset, bson.DocElem{"userSource", 1})
 
634
        }
 
635
        // user.Roles is always sent, as it's the way MongoDB distinguishes
 
636
        // old-style documents from new-style documents.
 
637
        if len(user.OtherDBRoles) == 0 {
 
638
                unset = append(unset, bson.DocElem{"otherDBRoles", 1})
 
639
        }
 
640
        c := db.C("system.users")
 
641
        _, err := c.Upsert(bson.D{{"user", user.Username}}, bson.D{{"$unset", unset}, {"$set", user}})
 
642
        return err
 
643
}
 
644
 
 
645
// AddUser creates or updates the authentication credentials of user within
 
646
// the db database.
 
647
//
 
648
// This method is obsolete and should only be used with MongoDB 2.2 or
 
649
// earlier. For MongoDB 2.4 and on, use UpsertUser instead.
 
650
func (db *Database) AddUser(user, pass string, readOnly bool) error {
 
651
        psum := md5.New()
 
652
        psum.Write([]byte(user + ":mongo:" + pass))
 
653
        digest := hex.EncodeToString(psum.Sum(nil))
 
654
        c := db.C("system.users")
 
655
        _, err := c.Upsert(bson.M{"user": user}, bson.M{"$set": bson.M{"user": user, "pwd": digest, "readOnly": readOnly}})
 
656
        return err
 
657
}
 
658
 
 
659
// RemoveUser removes the authentication credentials of user from the database.
 
660
func (db *Database) RemoveUser(user string) error {
 
661
        c := db.C("system.users")
 
662
        return c.Remove(bson.M{"user": user})
 
663
}
 
664
 
 
665
type indexSpec struct {
 
666
        Name, NS       string
 
667
        Key            bson.D
 
668
        Unique         bool ",omitempty"
 
669
        DropDups       bool "dropDups,omitempty"
 
670
        Background     bool ",omitempty"
 
671
        Sparse         bool ",omitempty"
 
672
        Bits, Min, Max int  ",omitempty"
 
673
        ExpireAfter    int  "expireAfterSeconds,omitempty"
 
674
}
 
675
 
 
676
type Index struct {
 
677
        Key        []string // Index key fields; prefix name with dash (-) for descending order
 
678
        Unique     bool     // Prevent two documents from having the same index key
 
679
        DropDups   bool     // Drop documents with the same index key as a previously indexed one
 
680
        Background bool     // Build index in background and return immediately
 
681
        Sparse     bool     // Only index documents containing the Key fields
 
682
 
 
683
        ExpireAfter time.Duration // Periodically delete docs with indexed time.Time older than that.
 
684
 
 
685
        Name string // Index name, computed by EnsureIndex
 
686
 
 
687
        Bits, Min, Max int // Properties for spatial indexes
 
688
}
 
689
 
 
690
func parseIndexKey(key []string) (name string, realKey bson.D, err error) {
 
691
        var order interface{}
 
692
        for _, field := range key {
 
693
                raw := field
 
694
                if name != "" {
 
695
                        name += "_"
 
696
                }
 
697
                var kind string
 
698
                if field != "" {
 
699
                        if field[0] == '$' {
 
700
                                if c := strings.Index(field, ":"); c > 1 && c < len(field)-1 {
 
701
                                        kind = field[1:c]
 
702
                                        field = field[c+1:]
 
703
                                }
 
704
                        }
 
705
                        switch field[0] {
 
706
                        case '$':
 
707
                                // Logic above failed. Reset and error.
 
708
                                field = ""
 
709
                        case '@':
 
710
                                order = "2d"
 
711
                                field = field[1:]
 
712
                                name += field + "_" // Why don't they put 2d here?
 
713
                        case '-':
 
714
                                order = -1
 
715
                                field = field[1:]
 
716
                                name += field + "_-1"
 
717
                        case '+':
 
718
                                field = field[1:]
 
719
                                fallthrough
 
720
                        default:
 
721
                                if kind == "" {
 
722
                                        order = 1
 
723
                                        name += field + "_1"
 
724
                                } else {
 
725
                                        order = kind
 
726
                                        name += field + "_" // Seems wrong. What about the kind?
 
727
                                }
 
728
                        }
 
729
                }
 
730
                if field == "" || kind != "" && order != kind {
 
731
                        return "", nil, fmt.Errorf(`Invalid index key: want "[$<kind>:][-]<field name>", got %q`, raw)
 
732
                }
 
733
                realKey = append(realKey, bson.DocElem{field, order})
 
734
        }
 
735
        if name == "" {
 
736
                return "", nil, errors.New("Invalid index key: no fields provided")
 
737
        }
 
738
        return
 
739
}
 
740
 
 
741
// EnsureIndexKey ensures an index with the given key exists, creating it
 
742
// if necessary.
 
743
//
 
744
// This example:
 
745
//
 
746
//     err := collection.EnsureIndexKey("a", "b")
 
747
//
 
748
// Is equivalent to:
 
749
//
 
750
//     err := collection.EnsureIndex(mgo.Index{Key: []string{"a", "b"}})
 
751
//
 
752
// See the EnsureIndex method for more details.
 
753
func (c *Collection) EnsureIndexKey(key ...string) error {
 
754
        return c.EnsureIndex(Index{Key: key})
 
755
}
 
756
 
 
757
// EnsureIndex ensures an index with the given key exists, creating it with
 
758
// the provided parameters if necessary.
 
759
//
 
760
// Once EnsureIndex returns successfully, following requests for the same index
 
761
// will not contact the server unless Collection.DropIndex is used to drop the
 
762
// same index, or Session.ResetIndexCache is called.
 
763
//
 
764
// For example:
 
765
//
 
766
//     index := Index{
 
767
//         Key: []string{"lastname", "firstname"},
 
768
//         Unique: true,
 
769
//         DropDups: true,
 
770
//         Background: true, // See notes.
 
771
//         Sparse: true,
 
772
//     }
 
773
//     err := collection.EnsureIndex(index)
 
774
//
 
775
// The Key value determines which fields compose the index. The index ordering
 
776
// will be ascending by default.  To obtain an index with a descending order,
 
777
// the field name should be prefixed by a dash (e.g. []string{"-time"}).
 
778
//
 
779
// If Unique is true, the index must necessarily contain only a single
 
780
// document per Key.  With DropDups set to true, documents with the same key
 
781
// as a previously indexed one will be dropped rather than an error returned.
 
782
//
 
783
// If Background is true, other connections will be allowed to proceed using
 
784
// the collection without the index while it's being built. Note that the
 
785
// session executing EnsureIndex will be blocked for as long as it takes for
 
786
// the index to be built.
 
787
//
 
788
// If Sparse is true, only documents containing the provided Key fields will be
 
789
// included in the index.  When using a sparse index for sorting, only indexed
 
790
// documents will be returned.
 
791
//
 
792
// If ExpireAfter is non-zero, the server will periodically scan the collection
 
793
// and remove documents containing an indexed time.Time field with a value
 
794
// older than ExpireAfter. See the documentation for details:
 
795
//
 
796
//     http://docs.mongodb.org/manual/tutorial/expire-data
 
797
//
 
798
// Other kinds of indexes are also supported through that API. Here is an example:
 
799
//
 
800
//     index := Index{
 
801
//         Key: []string{"$2d:loc"},
 
802
//         Bits: 26,
 
803
//     }
 
804
//     err := collection.EnsureIndex(index)
 
805
//
 
806
// The example above requests the creation of a "2d" index for the "loc" field.
 
807
//
 
808
// The 2D index bounds may be changed using the Min and Max attributes of the
 
809
// Index value.  The default bound setting of (-180, 180) is suitable for
 
810
// latitude/longitude pairs.
 
811
//
 
812
// The Bits parameter sets the precision of the 2D geohash values.  If not
 
813
// provided, 26 bits are used, which is roughly equivalent to 1 foot of
 
814
// precision for the default (-180, 180) index bounds.
 
815
//
 
816
// Relevant documentation:
 
817
//
 
818
//     http://www.mongodb.org/display/DOCS/Indexes
 
819
//     http://www.mongodb.org/display/DOCS/Indexing+Advice+and+FAQ
 
820
//     http://www.mongodb.org/display/DOCS/Indexing+as+a+Background+Operation
 
821
//     http://www.mongodb.org/display/DOCS/Geospatial+Indexing
 
822
//     http://www.mongodb.org/display/DOCS/Multikeys
 
823
//
 
824
func (c *Collection) EnsureIndex(index Index) error {
 
825
        name, realKey, err := parseIndexKey(index.Key)
 
826
        if err != nil {
 
827
                return err
 
828
        }
 
829
 
 
830
        session := c.Database.Session
 
831
        cacheKey := c.FullName + "\x00" + name
 
832
        if session.cluster().HasCachedIndex(cacheKey) {
 
833
                return nil
 
834
        }
 
835
 
 
836
        spec := indexSpec{
 
837
                Name:        name,
 
838
                NS:          c.FullName,
 
839
                Key:         realKey,
 
840
                Unique:      index.Unique,
 
841
                DropDups:    index.DropDups,
 
842
                Background:  index.Background,
 
843
                Sparse:      index.Sparse,
 
844
                Bits:        index.Bits,
 
845
                Min:         index.Min,
 
846
                Max:         index.Max,
 
847
                ExpireAfter: int(index.ExpireAfter / time.Second),
 
848
        }
 
849
 
 
850
        session = session.Clone()
 
851
        defer session.Close()
 
852
        session.SetMode(Strong, false)
 
853
        session.EnsureSafe(&Safe{})
 
854
 
 
855
        db := c.Database.With(session)
 
856
        err = db.C("system.indexes").Insert(&spec)
 
857
        if err == nil {
 
858
                session.cluster().CacheIndex(cacheKey, true)
 
859
        }
 
860
        session.Close()
 
861
        return err
 
862
}
 
863
 
 
864
// DropIndex removes the index with key from the collection.
 
865
//
 
866
// The key value determines which fields compose the index. The index ordering
 
867
// will be ascending by default.  To obtain an index with a descending order,
 
868
// the field name should be prefixed by a dash (e.g. []string{"-time"}).
 
869
//
 
870
// For example:
 
871
//
 
872
//     err := collection.DropIndex("lastname", "firstname")
 
873
//
 
874
// See the EnsureIndex method for more details on indexes.
 
875
func (c *Collection) DropIndex(key ...string) error {
 
876
        name, _, err := parseIndexKey(key)
 
877
        if err != nil {
 
878
                return err
 
879
        }
 
880
 
 
881
        session := c.Database.Session
 
882
        cacheKey := c.FullName + "\x00" + name
 
883
        session.cluster().CacheIndex(cacheKey, false)
 
884
 
 
885
        session = session.Clone()
 
886
        defer session.Close()
 
887
        session.SetMode(Strong, false)
 
888
 
 
889
        db := c.Database.With(session)
 
890
        result := struct {
 
891
                ErrMsg string
 
892
                Ok     bool
 
893
        }{}
 
894
        err = db.Run(bson.D{{"dropIndexes", c.Name}, {"index", name}}, &result)
 
895
        if err != nil {
 
896
                return err
 
897
        }
 
898
        if !result.Ok {
 
899
                return errors.New(result.ErrMsg)
 
900
        }
 
901
        return nil
 
902
}
 
903
 
 
904
// Indexes returns a list of all indexes for the collection.
 
905
//
 
906
// For example, this snippet would drop all available indexes:
 
907
//
 
908
//   indexes, err := collection.Indexes()
 
909
//   if err != nil {
 
910
//       return err
 
911
//   }
 
912
//   for _, index := range indexes {
 
913
//       err = collection.DropIndex(index.Key...)
 
914
//       if err != nil {
 
915
//           return err
 
916
//       }
 
917
//   }
 
918
//
 
919
// See the EnsureIndex method for more details on indexes.
 
920
func (c *Collection) Indexes() (indexes []Index, err error) {
 
921
        query := c.Database.C("system.indexes").Find(bson.M{"ns": c.FullName})
 
922
        iter := query.Sort("name").Iter()
 
923
        for {
 
924
                var spec indexSpec
 
925
                if !iter.Next(&spec) {
 
926
                        break
 
927
                }
 
928
                index := Index{
 
929
                        Name:        spec.Name,
 
930
                        Key:         simpleIndexKey(spec.Key),
 
931
                        Unique:      spec.Unique,
 
932
                        DropDups:    spec.DropDups,
 
933
                        Background:  spec.Background,
 
934
                        Sparse:      spec.Sparse,
 
935
                        ExpireAfter: time.Duration(spec.ExpireAfter) * time.Second,
 
936
                }
 
937
                indexes = append(indexes, index)
 
938
        }
 
939
        err = iter.Close()
 
940
        return
 
941
}
 
942
 
 
943
func simpleIndexKey(realKey bson.D) (key []string) {
 
944
        for i := range realKey {
 
945
                field := realKey[i].Name
 
946
                i, _ := realKey[i].Value.(int)
 
947
                if i == 1 {
 
948
                        key = append(key, field)
 
949
                        continue
 
950
                }
 
951
                if i == -1 {
 
952
                        key = append(key, "-"+field)
 
953
                        continue
 
954
                }
 
955
                if s, ok := realKey[i].Value.(string); ok {
 
956
                        key = append(key, "$"+s+":"+field)
 
957
                        continue
 
958
                }
 
959
                panic("Got unknown index key type for field " + field)
 
960
        }
 
961
        return
 
962
}
 
963
 
 
964
// ResetIndexCache() clears the cache of previously ensured indexes.
 
965
// Following requests to EnsureIndex will contact the server.
 
966
func (s *Session) ResetIndexCache() {
 
967
        s.cluster().ResetIndexCache()
 
968
}
 
969
 
 
970
// New creates a new session with the same parameters as the original
 
971
// session, including consistency, batch size, prefetching, safety mode,
 
972
// etc. The returned session will use sockets from the poll, so there's
 
973
// a chance that writes just performed in another session may not yet
 
974
// be visible.
 
975
//
 
976
// Login information from the original session will not be copied over
 
977
// into the new session unless it was provided through the initial URL
 
978
// for the Dial function.
 
979
//
 
980
// See the Copy and Clone methods.
 
981
//
 
982
func (s *Session) New() *Session {
 
983
        s.m.Lock()
 
984
        scopy := copySession(s, false)
 
985
        s.m.Unlock()
 
986
        scopy.Refresh()
 
987
        return scopy
 
988
}
 
989
 
 
990
// Copy works just like New, but preserves the exact authentication
 
991
// information from the original session.
 
992
func (s *Session) Copy() *Session {
 
993
        s.m.Lock()
 
994
        scopy := copySession(s, true)
 
995
        s.m.Unlock()
 
996
        scopy.Refresh()
 
997
        return scopy
 
998
}
 
999
 
 
1000
// Clone works just like Copy, but also reuses the same socket as the original
 
1001
// session, in case it had already reserved one due to its consistency
 
1002
// guarantees.  This behavior ensures that writes performed in the old session
 
1003
// are necessarily observed when using the new session, as long as it was a
 
1004
// strong or monotonic session.  That said, it also means that long operations
 
1005
// may cause other goroutines using the original session to wait.
 
1006
func (s *Session) Clone() *Session {
 
1007
        s.m.Lock()
 
1008
        scopy := copySession(s, true)
 
1009
        s.m.Unlock()
 
1010
        return scopy
 
1011
}
 
1012
 
 
1013
// Close terminates the session.  It's a runtime error to use a session
 
1014
// after it has been closed.
 
1015
func (s *Session) Close() {
 
1016
        s.m.Lock()
 
1017
        if s.cluster_ != nil {
 
1018
                debugf("Closing session %p", s)
 
1019
                s.unsetSocket()
 
1020
                s.cluster_.Release()
 
1021
                s.cluster_ = nil
 
1022
        }
 
1023
        s.m.Unlock()
 
1024
}
 
1025
 
 
1026
func (s *Session) cluster() *mongoCluster {
 
1027
        if s.cluster_ == nil {
 
1028
                panic("Session already closed")
 
1029
        }
 
1030
        return s.cluster_
 
1031
}
 
1032
 
 
1033
// Refresh puts back any reserved sockets in use and restarts the consistency
 
1034
// guarantees according to the current consistency setting for the session.
 
1035
func (s *Session) Refresh() {
 
1036
        s.m.Lock()
 
1037
        s.slaveOk = s.consistency != Strong
 
1038
        s.unsetSocket()
 
1039
        s.m.Unlock()
 
1040
}
 
1041
 
 
1042
// SetMode changes the consistency mode for the session.
 
1043
//
 
1044
// In the Strong consistency mode reads and writes will always be made to
 
1045
// the master server using a unique connection so that reads and writes are
 
1046
// fully consistent, ordered, and observing the most up-to-date data.
 
1047
// This offers the least benefits in terms of distributing load, but the
 
1048
// most guarantees.  See also Monotonic and Eventual.
 
1049
//
 
1050
// In the Monotonic consistency mode reads may not be entirely up-to-date,
 
1051
// but they will always see the history of changes moving forward, the data
 
1052
// read will be consistent across sequential queries in the same session,
 
1053
// and modifications made within the session will be observed in following
 
1054
// queries (read-your-writes).
 
1055
//
 
1056
// In practice, the Monotonic mode is obtained by performing initial reads
 
1057
// against a unique connection to an arbitrary slave, if one is available,
 
1058
// and once the first write happens, the session connection is switched over
 
1059
// to the master server.  This manages to distribute some of the reading
 
1060
// load with slaves, while maintaining some useful guarantees.
 
1061
//
 
1062
// In the Eventual consistency mode reads will be made to any slave in the
 
1063
// cluster, if one is available, and sequential reads will not necessarily
 
1064
// be made with the same connection.  This means that data may be observed
 
1065
// out of order.  Writes will of course be issued to the master, but
 
1066
// independent writes in the same Eventual session may also be made with
 
1067
// independent connections, so there are also no guarantees in terms of
 
1068
// write ordering (no read-your-writes guarantees either).
 
1069
//
 
1070
// The Eventual mode is the fastest and most resource-friendly, but is
 
1071
// also the one offering the least guarantees about ordering of the data
 
1072
// read and written.
 
1073
//
 
1074
// If refresh is true, in addition to ensuring the session is in the given
 
1075
// consistency mode, the consistency guarantees will also be reset (e.g.
 
1076
// a Monotonic session will be allowed to read from slaves again).  This is
 
1077
// equivalent to calling the Refresh function.
 
1078
//
 
1079
// Shifting between Monotonic and Strong modes will keep a previously
 
1080
// reserved connection for the session unless refresh is true or the
 
1081
// connection is unsuitable (to a slave server in a Strong session).
 
1082
func (s *Session) SetMode(consistency mode, refresh bool) {
 
1083
        s.m.Lock()
 
1084
        debugf("Session %p: setting mode %d with refresh=%v (master=%p, slave=%p)", s, consistency, refresh, s.masterSocket, s.slaveSocket)
 
1085
        s.consistency = consistency
 
1086
        if refresh {
 
1087
                s.slaveOk = s.consistency != Strong
 
1088
                s.unsetSocket()
 
1089
        } else if s.consistency == Strong {
 
1090
                s.slaveOk = false
 
1091
        } else if s.masterSocket == nil {
 
1092
                s.slaveOk = true
 
1093
        }
 
1094
        s.m.Unlock()
 
1095
}
 
1096
 
 
1097
// Mode returns the current consistency mode for the session.
 
1098
func (s *Session) Mode() mode {
 
1099
        s.m.RLock()
 
1100
        mode := s.consistency
 
1101
        s.m.RUnlock()
 
1102
        return mode
 
1103
}
 
1104
 
 
1105
// SetSyncTimeout sets the amount of time an operation with this session
 
1106
// will wait before returning an error in case a connection to a usable
 
1107
// server can't be established. Set it to zero to wait forever. The
 
1108
// default value is 7 seconds.
 
1109
func (s *Session) SetSyncTimeout(d time.Duration) {
 
1110
        s.m.Lock()
 
1111
        s.syncTimeout = d
 
1112
        s.m.Unlock()
 
1113
}
 
1114
 
 
1115
// SetBatch sets the default batch size used when fetching documents from the
 
1116
// database. It's possible to change this setting on a per-query basis as
 
1117
// well, using the Query.Batch method.
 
1118
//
 
1119
// The default batch size is defined by the database itself.  As of this
 
1120
// writing, MongoDB will use an initial size of min(100 docs, 4MB) on the
 
1121
// first batch, and 4MB on remaining ones.
 
1122
func (s *Session) SetBatch(n int) {
 
1123
        if n == 1 {
 
1124
                // Server interprets 1 as -1 and closes the cursor (!?)
 
1125
                n = 2
 
1126
        }
 
1127
        s.m.Lock()
 
1128
        s.queryConfig.op.limit = int32(n)
 
1129
        s.m.Unlock()
 
1130
}
 
1131
 
 
1132
// SetPrefetch sets the default point at which the next batch of results will be
 
1133
// requested.  When there are p*batch_size remaining documents cached in an
 
1134
// Iter, the next batch will be requested in background. For instance, when
 
1135
// using this:
 
1136
//
 
1137
//     session.SetBatch(200)
 
1138
//     session.SetPrefetch(0.25)
 
1139
//
 
1140
// and there are only 50 documents cached in the Iter to be processed, the
 
1141
// next batch of 200 will be requested. It's possible to change this setting on
 
1142
// a per-query basis as well, using the Prefetch method of Query.
 
1143
//
 
1144
// The default prefetch value is 0.25.
 
1145
func (s *Session) SetPrefetch(p float64) {
 
1146
        s.m.Lock()
 
1147
        s.queryConfig.prefetch = p
 
1148
        s.m.Unlock()
 
1149
}
 
1150
 
 
1151
// See SetSafe for details on the Safe type.
 
1152
type Safe struct {
 
1153
        W        int    // Min # of servers to ack before success
 
1154
        WMode    string // Write mode for MongoDB 2.0+ (e.g. "majority")
 
1155
        WTimeout int    // Milliseconds to wait for W before timing out
 
1156
        FSync    bool   // Should servers sync to disk before returning success
 
1157
        J        bool   // Wait for next group commit if journaling; no effect otherwise
 
1158
}
 
1159
 
 
1160
// Safe returns the current safety mode for the session.
 
1161
func (s *Session) Safe() (safe *Safe) {
 
1162
        s.m.Lock()
 
1163
        defer s.m.Unlock()
 
1164
        if s.safeOp != nil {
 
1165
                cmd := s.safeOp.query.(*getLastError)
 
1166
                safe = &Safe{WTimeout: cmd.WTimeout, FSync: cmd.FSync, J: cmd.J}
 
1167
                switch w := cmd.W.(type) {
 
1168
                case string:
 
1169
                        safe.WMode = w
 
1170
                case int:
 
1171
                        safe.W = w
 
1172
                }
 
1173
        }
 
1174
        return
 
1175
}
 
1176
 
 
1177
// SetSafe changes the session safety mode.
 
1178
//
 
1179
// If the safe parameter is nil, the session is put in unsafe mode, and writes
 
1180
// become fire-and-forget, without error checking.  The unsafe mode is faster
 
1181
// since operations won't hold on waiting for a confirmation.
 
1182
//
 
1183
// If the safe parameter is not nil, any changing query (insert, update, ...)
 
1184
// will be followed by a getLastError command with the specified parameters,
 
1185
// to ensure the request was correctly processed.
 
1186
//
 
1187
// The safe.W parameter determines how many servers should confirm a write
 
1188
// before the operation is considered successful.  If set to 0 or 1, the
 
1189
// command will return as soon as the master is done with the request.
 
1190
// If safe.WTimeout is greater than zero, it determines how many milliseconds
 
1191
// to wait for the safe.W servers to respond before returning an error.
 
1192
//
 
1193
// Starting with MongoDB 2.0.0 the safe.WMode parameter can be used instead
 
1194
// of W to request for richer semantics. If set to "majority" the server will
 
1195
// wait for a majority of members from the replica set to respond before
 
1196
// returning. Custom modes may also be defined within the server to create
 
1197
// very detailed placement schemas. See the data awareness documentation in
 
1198
// the links below for more details (note that MongoDB internally reuses the
 
1199
// "w" field name for WMode).
 
1200
//
 
1201
// If safe.FSync is true and journaling is disabled, the servers will be
 
1202
// forced to sync all files to disk immediately before returning. If the
 
1203
// same option is true but journaling is enabled, the server will instead
 
1204
// await for the next group commit before returning.
 
1205
//
 
1206
// Since MongoDB 2.0.0, the safe.J option can also be used instead of FSync
 
1207
// to force the server to wait for a group commit in case journaling is
 
1208
// enabled. The option has no effect if the server has journaling disabled.
 
1209
//
 
1210
// For example, the following statement will make the session check for
 
1211
// errors, without imposing further constraints:
 
1212
//
 
1213
//     session.SetSafe(&mgo.Safe{})
 
1214
//
 
1215
// The following statement will force the server to wait for a majority of
 
1216
// members of a replica set to return (MongoDB 2.0+ only):
 
1217
//
 
1218
//     session.SetSafe(&mgo.Safe{WMode: "majority"})
 
1219
//
 
1220
// The following statement, on the other hand, ensures that at least two
 
1221
// servers have flushed the change to disk before confirming the success
 
1222
// of operations:
 
1223
//
 
1224
//     session.EnsureSafe(&mgo.Safe{W: 2, FSync: true})
 
1225
//
 
1226
// The following statement, on the other hand, disables the verification
 
1227
// of errors entirely:
 
1228
//
 
1229
//     session.SetSafe(nil)
 
1230
//
 
1231
// See also the EnsureSafe method.
 
1232
//
 
1233
// Relevant documentation:
 
1234
//
 
1235
//     http://www.mongodb.org/display/DOCS/getLastError+Command
 
1236
//     http://www.mongodb.org/display/DOCS/Verifying+Propagation+of+Writes+with+getLastError
 
1237
//     http://www.mongodb.org/display/DOCS/Data+Center+Awareness
 
1238
//
 
1239
func (s *Session) SetSafe(safe *Safe) {
 
1240
        s.m.Lock()
 
1241
        s.safeOp = nil
 
1242
        s.ensureSafe(safe)
 
1243
        s.m.Unlock()
 
1244
}
 
1245
 
 
1246
// EnsureSafe compares the provided safety parameters with the ones
 
1247
// currently in use by the session and picks the most conservative
 
1248
// choice for each setting.
 
1249
//
 
1250
// That is:
 
1251
//
 
1252
//     - safe.WMode is always used if set.
 
1253
//     - safe.W is used if larger than the current W and WMode is empty.
 
1254
//     - safe.FSync is always used if true.
 
1255
//     - safe.J is used if FSync is false.
 
1256
//     - safe.WTimeout is used if set and smaller than the current WTimeout.
 
1257
//
 
1258
// For example, the following statement will ensure the session is
 
1259
// at least checking for errors, without enforcing further constraints.
 
1260
// If a more conservative SetSafe or EnsureSafe call was previously done,
 
1261
// the following call will be ignored.
 
1262
//
 
1263
//     session.EnsureSafe(&mgo.Safe{})
 
1264
//
 
1265
// See also the SetSafe method for details on what each option means.
 
1266
//
 
1267
// Relevant documentation:
 
1268
//
 
1269
//     http://www.mongodb.org/display/DOCS/getLastError+Command
 
1270
//     http://www.mongodb.org/display/DOCS/Verifying+Propagation+of+Writes+with+getLastError
 
1271
//     http://www.mongodb.org/display/DOCS/Data+Center+Awareness
 
1272
//
 
1273
func (s *Session) EnsureSafe(safe *Safe) {
 
1274
        s.m.Lock()
 
1275
        s.ensureSafe(safe)
 
1276
        s.m.Unlock()
 
1277
}
 
1278
 
 
1279
func (s *Session) ensureSafe(safe *Safe) {
 
1280
        if safe == nil {
 
1281
                return
 
1282
        }
 
1283
 
 
1284
        var w interface{}
 
1285
        if safe.WMode != "" {
 
1286
                w = safe.WMode
 
1287
        } else if safe.W > 0 {
 
1288
                w = safe.W
 
1289
        }
 
1290
 
 
1291
        var cmd getLastError
 
1292
        if s.safeOp == nil {
 
1293
                cmd = getLastError{1, w, safe.WTimeout, safe.FSync, safe.J}
 
1294
        } else {
 
1295
                // Copy.  We don't want to mutate the existing query.
 
1296
                cmd = *(s.safeOp.query.(*getLastError))
 
1297
                if cmd.W == nil {
 
1298
                        cmd.W = w
 
1299
                } else if safe.WMode != "" {
 
1300
                        cmd.W = safe.WMode
 
1301
                } else if i, ok := cmd.W.(int); ok && safe.W > i {
 
1302
                        cmd.W = safe.W
 
1303
                }
 
1304
                if safe.WTimeout > 0 && safe.WTimeout < cmd.WTimeout {
 
1305
                        cmd.WTimeout = safe.WTimeout
 
1306
                }
 
1307
                if safe.FSync {
 
1308
                        cmd.FSync = true
 
1309
                        cmd.J = false
 
1310
                } else if safe.J && !cmd.FSync {
 
1311
                        cmd.J = true
 
1312
                }
 
1313
        }
 
1314
        s.safeOp = &queryOp{
 
1315
                query:      &cmd,
 
1316
                collection: "admin.$cmd",
 
1317
                limit:      -1,
 
1318
        }
 
1319
}
 
1320
 
 
1321
// Run issues the provided command against the "admin" database and
 
1322
// and unmarshals its result in the respective argument. The cmd
 
1323
// argument may be either a string with the command name itself, in
 
1324
// which case an empty document of the form bson.M{cmd: 1} will be used,
 
1325
// or it may be a full command document.
 
1326
//
 
1327
// Note that MongoDB considers the first marshalled key as the command
 
1328
// name, so when providing a command with options, it's important to
 
1329
// use an ordering-preserving document, such as a struct value or an
 
1330
// instance of bson.D.  For instance:
 
1331
//
 
1332
//     db.Run(bson.D{{"create", "mycollection"}, {"size", 1024}})
 
1333
//
 
1334
// For commands against arbitrary databases, see the Run method in
 
1335
// the Database type.
 
1336
//
 
1337
// Relevant documentation:
 
1338
//
 
1339
//     http://www.mongodb.org/display/DOCS/Commands
 
1340
//     http://www.mongodb.org/display/DOCS/List+of+Database+CommandSkips
 
1341
//
 
1342
func (s *Session) Run(cmd interface{}, result interface{}) error {
 
1343
        return s.DB("admin").Run(cmd, result)
 
1344
}
 
1345
 
 
1346
// Ping runs a trivial ping command just to get in touch with the server.
 
1347
func (s *Session) Ping() error {
 
1348
        return s.Run("ping", nil)
 
1349
}
 
1350
 
 
1351
// Fsync flushes in-memory writes to disk on the server the session
 
1352
// is established with. If async is true, the call returns immediately,
 
1353
// otherwise it returns after the flush has been made.
 
1354
func (s *Session) Fsync(async bool) error {
 
1355
        return s.Run(bson.D{{"fsync", 1}, {"async", async}}, nil)
 
1356
}
 
1357
 
 
1358
// FsyncLock locks all writes in the specific server the session is
 
1359
// established with and returns. Any writes attempted to the server
 
1360
// after it is successfully locked will block until FsyncUnlock is
 
1361
// called for the same server.
 
1362
//
 
1363
// This method works on slaves as well, preventing the oplog from being
 
1364
// flushed while the server is locked, but since only the server
 
1365
// connected to is locked, for locking specific slaves it may be
 
1366
// necessary to establish a connection directly to the slave (see
 
1367
// Dial's connect=direct option).
 
1368
//
 
1369
// As an important caveat, note that once a write is attempted and
 
1370
// blocks, follow up reads will block as well due to the way the
 
1371
// lock is internally implemented in the server. More details at:
 
1372
//
 
1373
//     https://jira.mongodb.org/browse/SERVER-4243
 
1374
//
 
1375
// FsyncLock is often used for performing consistent backups of
 
1376
// the database files on disk.
 
1377
//
 
1378
// Relevant documentation:
 
1379
//
 
1380
//     http://www.mongodb.org/display/DOCS/fsync+Command
 
1381
//     http://www.mongodb.org/display/DOCS/Backups
 
1382
//
 
1383
func (s *Session) FsyncLock() error {
 
1384
        return s.Run(bson.D{{"fsync", 1}, {"lock", true}}, nil)
 
1385
}
 
1386
 
 
1387
// FsyncUnlock releases the server for writes. See FsyncLock for details.
 
1388
func (s *Session) FsyncUnlock() error {
 
1389
        return s.DB("admin").C("$cmd.sys.unlock").Find(nil).One(nil) // WTF?
 
1390
}
 
1391
 
 
1392
// Find prepares a query using the provided document.  The document may be a
 
1393
// map or a struct value capable of being marshalled with bson.  The map
 
1394
// may be a generic one using interface{} for its key and/or values, such as
 
1395
// bson.M, or it may be a properly typed map.  Providing nil as the document
 
1396
// is equivalent to providing an empty document such as bson.M{}.
 
1397
//
 
1398
// Further details of the query may be tweaked using the resulting Query value,
 
1399
// and then executed to retrieve results using methods such as One, For,
 
1400
// Iter, or Tail.
 
1401
//
 
1402
// In case the resulting document includes a field named $err or errmsg, which
 
1403
// are standard ways for MongoDB to return query errors, the returned err will
 
1404
// be set to a *QueryError value including the Err message and the Code.  In
 
1405
// those cases, the result argument is still unmarshalled into with the
 
1406
// received document so that any other custom values may be obtained if
 
1407
// desired.
 
1408
//
 
1409
// Relevant documentation:
 
1410
//
 
1411
//     http://www.mongodb.org/display/DOCS/Querying
 
1412
//     http://www.mongodb.org/display/DOCS/Advanced+Queries
 
1413
//
 
1414
func (c *Collection) Find(query interface{}) *Query {
 
1415
        session := c.Database.Session
 
1416
        session.m.RLock()
 
1417
        q := &Query{session: session, query: session.queryConfig}
 
1418
        session.m.RUnlock()
 
1419
        q.op.query = query
 
1420
        q.op.collection = c.FullName
 
1421
        return q
 
1422
}
 
1423
 
 
1424
// FindId is a convenience helper equivalent to:
 
1425
//
 
1426
//     query := collection.Find(bson.M{"_id": id})
 
1427
//
 
1428
// See the Find method for more details.
 
1429
func (c *Collection) FindId(id interface{}) *Query {
 
1430
        return c.Find(bson.D{{"_id", id}})
 
1431
}
 
1432
 
 
1433
type Pipe struct {
 
1434
        session    *Session
 
1435
        collection *Collection
 
1436
        pipeline   interface{}
 
1437
}
 
1438
 
 
1439
// Pipe prepares a pipeline to aggregate. The pipeline document
 
1440
// must be a slice built in terms of the aggregation framework language.
 
1441
//
 
1442
// For example:
 
1443
//
 
1444
//     pipe := collection.Pipe([]bson.M{{"$match": bson.M{"name": "Otavio"}}})
 
1445
//     iter := pipe.Iter()
 
1446
//
 
1447
// Relevant documentation:
 
1448
//
 
1449
//     http://docs.mongodb.org/manual/reference/aggregation
 
1450
//     http://docs.mongodb.org/manual/applications/aggregation
 
1451
//     http://docs.mongodb.org/manual/tutorial/aggregation-examples
 
1452
//
 
1453
func (c *Collection) Pipe(pipeline interface{}) *Pipe {
 
1454
        session := c.Database.Session
 
1455
        return &Pipe{
 
1456
                session:    session,
 
1457
                collection: c,
 
1458
                pipeline:   pipeline,
 
1459
        }
 
1460
}
 
1461
 
 
1462
// Iter executes the pipeline and returns an iterator capable of going
 
1463
// over all the generated results.
 
1464
func (p *Pipe) Iter() *Iter {
 
1465
        iter := &Iter{
 
1466
                session: p.session,
 
1467
                timeout: -1,
 
1468
        }
 
1469
        iter.gotReply.L = &iter.m
 
1470
        var result struct{ Result []bson.Raw }
 
1471
        c := p.collection
 
1472
        iter.err = c.Database.Run(bson.D{{"aggregate", c.Name}, {"pipeline", p.pipeline}}, &result)
 
1473
        if iter.err != nil {
 
1474
                return iter
 
1475
        }
 
1476
        for i := range result.Result {
 
1477
                iter.docData.Push(result.Result[i].Data)
 
1478
        }
 
1479
        return iter
 
1480
}
 
1481
 
 
1482
// All works like Iter.All.
 
1483
func (p *Pipe) All(result interface{}) error {
 
1484
        return p.Iter().All(result)
 
1485
}
 
1486
 
 
1487
// One executes the pipeline and unmarshals the first item from the
 
1488
// result set into the result parameter.
 
1489
// It returns ErrNotFound if no items are generated by the pipeline.
 
1490
func (p *Pipe) One(result interface{}) error {
 
1491
        iter := p.Iter()
 
1492
        if iter.Next(result) {
 
1493
                return nil
 
1494
        }
 
1495
        if err := iter.Err(); err != nil {
 
1496
                return err
 
1497
        }
 
1498
        return ErrNotFound
 
1499
}
 
1500
 
 
1501
type LastError struct {
 
1502
        Err             string
 
1503
        Code, N, Waited int
 
1504
        FSyncFiles      int `bson:"fsyncFiles"`
 
1505
        WTimeout        bool
 
1506
        UpdatedExisting bool        `bson:"updatedExisting"`
 
1507
        UpsertedId      interface{} `bson:"upserted"`
 
1508
}
 
1509
 
 
1510
func (err *LastError) Error() string {
 
1511
        return err.Err
 
1512
}
 
1513
 
 
1514
type queryError struct {
 
1515
        Err           string "$err"
 
1516
        ErrMsg        string
 
1517
        Assertion     string
 
1518
        Code          int
 
1519
        AssertionCode int        "assertionCode"
 
1520
        LastError     *LastError "lastErrorObject"
 
1521
}
 
1522
 
 
1523
type QueryError struct {
 
1524
        Code      int
 
1525
        Message   string
 
1526
        Assertion bool
 
1527
}
 
1528
 
 
1529
func (err *QueryError) Error() string {
 
1530
        return err.Message
 
1531
}
 
1532
 
 
1533
// IsDup returns whether err informs of a duplicate key error because
 
1534
// a primary key index or a secondary unique index already has an entry
 
1535
// with the given value.
 
1536
func IsDup(err error) bool {
 
1537
        // Besides being handy, helps with https://jira.mongodb.org/browse/SERVER-7164
 
1538
        // What follows makes me sad. Hopefully conventions will be more clear over time.
 
1539
        switch e := err.(type) {
 
1540
        case *LastError:
 
1541
                return e.Code == 11000 || e.Code == 11001 || e.Code == 12582
 
1542
        case *QueryError:
 
1543
                return e.Code == 11000 || e.Code == 11001 || e.Code == 12582
 
1544
        }
 
1545
        return false
 
1546
}
 
1547
 
 
1548
// Insert inserts one or more documents in the respective collection.  In
 
1549
// case the session is in safe mode (see the SetSafe method) and an error
 
1550
// happens while inserting the provided documents, the returned error will
 
1551
// be of type *LastError.
 
1552
func (c *Collection) Insert(docs ...interface{}) error {
 
1553
        _, err := c.writeQuery(&insertOp{c.FullName, docs})
 
1554
        return err
 
1555
}
 
1556
 
 
1557
// Update finds a single document matching the provided selector document
 
1558
// and modifies it according to the change document.
 
1559
// If the session is in safe mode (see SetSafe) a ErrNotFound error is
 
1560
// returned if a document isn't found, or a value of type *LastError
 
1561
// when some other error is detected.
 
1562
//
 
1563
// Relevant documentation:
 
1564
//
 
1565
//     http://www.mongodb.org/display/DOCS/Updating
 
1566
//     http://www.mongodb.org/display/DOCS/Atomic+Operations
 
1567
//
 
1568
func (c *Collection) Update(selector interface{}, change interface{}) error {
 
1569
        lerr, err := c.writeQuery(&updateOp{c.FullName, selector, change, 0})
 
1570
        if err == nil && lerr != nil && !lerr.UpdatedExisting {
 
1571
                return ErrNotFound
 
1572
        }
 
1573
        return err
 
1574
}
 
1575
 
 
1576
// UpdateId is a convenience helper equivalent to:
 
1577
//
 
1578
//     err := collection.Update(bson.M{"_id": id}, change)
 
1579
//
 
1580
// See the Update method for more details.
 
1581
func (c *Collection) UpdateId(id interface{}, change interface{}) error {
 
1582
        return c.Update(bson.D{{"_id", id}}, change)
 
1583
}
 
1584
 
 
1585
// ChangeInfo holds details about the outcome of a change operation.
 
1586
type ChangeInfo struct {
 
1587
        Updated    int         // Number of existing documents updated
 
1588
        Removed    int         // Number of documents removed
 
1589
        UpsertedId interface{} // Upserted _id field, when not explicitly provided
 
1590
}
 
1591
 
 
1592
// UpdateAll finds all documents matching the provided selector document
 
1593
// and modifies them according to the change document.
 
1594
// If the session is in safe mode (see SetSafe) details of the executed
 
1595
// operation are returned in info or an error of type *LastError when
 
1596
// some problem is detected. It is not an error for the update to not be
 
1597
// applied on any documents because the selector doesn't match.
 
1598
//
 
1599
// Relevant documentation:
 
1600
//
 
1601
//     http://www.mongodb.org/display/DOCS/Updating
 
1602
//     http://www.mongodb.org/display/DOCS/Atomic+Operations
 
1603
//
 
1604
func (c *Collection) UpdateAll(selector interface{}, change interface{}) (info *ChangeInfo, err error) {
 
1605
        lerr, err := c.writeQuery(&updateOp{c.FullName, selector, change, 2})
 
1606
        if err == nil && lerr != nil {
 
1607
                info = &ChangeInfo{Updated: lerr.N}
 
1608
        }
 
1609
        return info, err
 
1610
}
 
1611
 
 
1612
// Upsert finds a single document matching the provided selector document
 
1613
// and modifies it according to the change document.  If no document matching
 
1614
// the selector is found, the change document is applied to the selector
 
1615
// document and the result is inserted in the collection.
 
1616
// If the session is in safe mode (see SetSafe) details of the executed
 
1617
// operation are returned in info, or an error of type *LastError when
 
1618
// some problem is detected.
 
1619
//
 
1620
// Relevant documentation:
 
1621
//
 
1622
//     http://www.mongodb.org/display/DOCS/Updating
 
1623
//     http://www.mongodb.org/display/DOCS/Atomic+Operations
 
1624
//
 
1625
func (c *Collection) Upsert(selector interface{}, change interface{}) (info *ChangeInfo, err error) {
 
1626
        data, err := bson.Marshal(change)
 
1627
        if err != nil {
 
1628
                return nil, err
 
1629
        }
 
1630
        change = bson.Raw{0x03, data}
 
1631
        lerr, err := c.writeQuery(&updateOp{c.FullName, selector, change, 1})
 
1632
        if err == nil && lerr != nil {
 
1633
                info = &ChangeInfo{}
 
1634
                if lerr.UpdatedExisting {
 
1635
                        info.Updated = lerr.N
 
1636
                } else {
 
1637
                        info.UpsertedId = lerr.UpsertedId
 
1638
                }
 
1639
        }
 
1640
        return info, err
 
1641
}
 
1642
 
 
1643
// UpsertId is a convenience helper equivalent to:
 
1644
//
 
1645
//     info, err := collection.Upsert(bson.M{"_id": id}, change)
 
1646
//
 
1647
// See the Upsert method for more details.
 
1648
func (c *Collection) UpsertId(id interface{}, change interface{}) (info *ChangeInfo, err error) {
 
1649
        return c.Upsert(bson.D{{"_id", id}}, change)
 
1650
}
 
1651
 
 
1652
// Remove finds a single document matching the provided selector document
 
1653
// and removes it from the database.
 
1654
// If the session is in safe mode (see SetSafe) a ErrNotFound error is
 
1655
// returned if a document isn't found, or a value of type *LastError
 
1656
// when some other error is detected.
 
1657
//
 
1658
// Relevant documentation:
 
1659
//
 
1660
//     http://www.mongodb.org/display/DOCS/Removing
 
1661
//
 
1662
func (c *Collection) Remove(selector interface{}) error {
 
1663
        lerr, err := c.writeQuery(&deleteOp{c.FullName, selector, 1})
 
1664
        if err == nil && lerr != nil && lerr.N == 0 {
 
1665
                return ErrNotFound
 
1666
        }
 
1667
        return err
 
1668
}
 
1669
 
 
1670
// RemoveId is a convenience helper equivalent to:
 
1671
//
 
1672
//     err := collection.Remove(bson.M{"_id": id})
 
1673
//
 
1674
// See the Remove method for more details.
 
1675
func (c *Collection) RemoveId(id interface{}) error {
 
1676
        return c.Remove(bson.D{{"_id", id}})
 
1677
}
 
1678
 
 
1679
// RemoveAll finds all documents matching the provided selector document
 
1680
// and removes them from the database.  In case the session is in safe mode
 
1681
// (see the SetSafe method) and an error happens when attempting the change,
 
1682
// the returned error will be of type *LastError.
 
1683
//
 
1684
// Relevant documentation:
 
1685
//
 
1686
//     http://www.mongodb.org/display/DOCS/Removing
 
1687
//
 
1688
func (c *Collection) RemoveAll(selector interface{}) (info *ChangeInfo, err error) {
 
1689
        lerr, err := c.writeQuery(&deleteOp{c.FullName, selector, 0})
 
1690
        if err == nil && lerr != nil {
 
1691
                info = &ChangeInfo{Removed: lerr.N}
 
1692
        }
 
1693
        return info, err
 
1694
}
 
1695
 
 
1696
// DropDatabase removes the entire database including all of its collections.
 
1697
func (db *Database) DropDatabase() error {
 
1698
        return db.Run(bson.D{{"dropDatabase", 1}}, nil)
 
1699
}
 
1700
 
 
1701
// DropCollection removes the entire collection including all of its documents.
 
1702
func (c *Collection) DropCollection() error {
 
1703
        return c.Database.Run(bson.D{{"drop", c.Name}}, nil)
 
1704
}
 
1705
 
 
1706
// The CollectionInfo type holds metadata about a collection.
 
1707
//
 
1708
// Relevant documentation:
 
1709
//
 
1710
//     http://www.mongodb.org/display/DOCS/createCollection+Command
 
1711
//     http://www.mongodb.org/display/DOCS/Capped+Collections
 
1712
//
 
1713
type CollectionInfo struct {
 
1714
        // DisableIdIndex prevents the automatic creation of the index
 
1715
        // on the _id field for the collection.
 
1716
        DisableIdIndex bool
 
1717
 
 
1718
        // ForceIdIndex enforces the automatic creation of the index
 
1719
        // on the _id field for the collection. Capped collections,
 
1720
        // for example, do not have such an index by default.
 
1721
        ForceIdIndex bool
 
1722
 
 
1723
        // If Capped is true new documents will replace old ones when
 
1724
        // the collection is full. MaxBytes must necessarily be set
 
1725
        // to define the size when the collection wraps around.
 
1726
        // MaxDocs optionally defines the number of documents when it
 
1727
        // wraps, but MaxBytes still needs to be set.
 
1728
        Capped   bool
 
1729
        MaxBytes int
 
1730
        MaxDocs  int
 
1731
}
 
1732
 
 
1733
// Create explicitly creates the c collection with details of info.
 
1734
// MongoDB creates collections automatically on use, so this method
 
1735
// is only necessary when creating collection with non-default
 
1736
// characteristics, such as capped collections.
 
1737
//
 
1738
// Relevant documentation:
 
1739
//
 
1740
//     http://www.mongodb.org/display/DOCS/createCollection+Command
 
1741
//     http://www.mongodb.org/display/DOCS/Capped+Collections
 
1742
//
 
1743
func (c *Collection) Create(info *CollectionInfo) error {
 
1744
        cmd := make(bson.D, 0, 4)
 
1745
        cmd = append(cmd, bson.DocElem{"create", c.Name})
 
1746
        if info.Capped {
 
1747
                if info.MaxBytes < 1 {
 
1748
                        return fmt.Errorf("Collection.Create: with Capped, MaxBytes must also be set")
 
1749
                }
 
1750
                cmd = append(cmd, bson.DocElem{"capped", true})
 
1751
                cmd = append(cmd, bson.DocElem{"size", info.MaxBytes})
 
1752
                if info.MaxDocs > 0 {
 
1753
                        cmd = append(cmd, bson.DocElem{"max", info.MaxDocs})
 
1754
                }
 
1755
        }
 
1756
        if info.DisableIdIndex {
 
1757
                cmd = append(cmd, bson.DocElem{"autoIndexId", false})
 
1758
        }
 
1759
        if info.ForceIdIndex {
 
1760
                cmd = append(cmd, bson.DocElem{"autoIndexId", true})
 
1761
        }
 
1762
        return c.Database.Run(cmd, nil)
 
1763
}
 
1764
 
 
1765
// Batch sets the batch size used when fetching documents from the database.
 
1766
// It's possible to change this setting on a per-session basis as well, using
 
1767
// the Batch method of Session.
 
1768
//
 
1769
// The default batch size is defined by the database itself.  As of this
 
1770
// writing, MongoDB will use an initial size of min(100 docs, 4MB) on the
 
1771
// first batch, and 4MB on remaining ones.
 
1772
func (q *Query) Batch(n int) *Query {
 
1773
        if n == 1 {
 
1774
                // Server interprets 1 as -1 and closes the cursor (!?)
 
1775
                n = 2
 
1776
        }
 
1777
        q.m.Lock()
 
1778
        q.op.limit = int32(n)
 
1779
        q.m.Unlock()
 
1780
        return q
 
1781
}
 
1782
 
 
1783
// Prefetch sets the point at which the next batch of results will be requested.
 
1784
// When there are p*batch_size remaining documents cached in an Iter, the next
 
1785
// batch will be requested in background. For instance, when using this:
 
1786
//
 
1787
//     query.Batch(200).Prefetch(0.25)
 
1788
//
 
1789
// and there are only 50 documents cached in the Iter to be processed, the
 
1790
// next batch of 200 will be requested. It's possible to change this setting on
 
1791
// a per-session basis as well, using the SetPrefetch method of Session.
 
1792
//
 
1793
// The default prefetch value is 0.25.
 
1794
func (q *Query) Prefetch(p float64) *Query {
 
1795
        q.m.Lock()
 
1796
        q.prefetch = p
 
1797
        q.m.Unlock()
 
1798
        return q
 
1799
}
 
1800
 
 
1801
// Skip skips over the n initial documents from the query results.  Note that
 
1802
// this only makes sense with capped collections where documents are naturally
 
1803
// ordered by insertion time, or with sorted results.
 
1804
func (q *Query) Skip(n int) *Query {
 
1805
        q.m.Lock()
 
1806
        q.op.skip = int32(n)
 
1807
        q.m.Unlock()
 
1808
        return q
 
1809
}
 
1810
 
 
1811
// Limit restricts the maximum number of documents retrieved to n, and also
 
1812
// changes the batch size to the same value.  Once n documents have been
 
1813
// returned by Next, the following call will return ErrNotFound.
 
1814
func (q *Query) Limit(n int) *Query {
 
1815
        q.m.Lock()
 
1816
        switch {
 
1817
        case n == 1:
 
1818
                q.limit = 1
 
1819
                q.op.limit = -1
 
1820
        case n == math.MinInt32: // -MinInt32 == -MinInt32
 
1821
                q.limit = math.MaxInt32
 
1822
                q.op.limit = math.MinInt32 + 1
 
1823
        case n < 0:
 
1824
                q.limit = int32(-n)
 
1825
                q.op.limit = int32(n)
 
1826
        default:
 
1827
                q.limit = int32(n)
 
1828
                q.op.limit = int32(n)
 
1829
        }
 
1830
        q.m.Unlock()
 
1831
        return q
 
1832
}
 
1833
 
 
1834
// Select enables selecting which fields should be retrieved for the results
 
1835
// found. For example, the following query would only retrieve the name field:
 
1836
//
 
1837
//     err := collection.Find(nil).Select(bson.M{"name": 1}).One(&result)
 
1838
//
 
1839
// Relevant documentation:
 
1840
//
 
1841
//     http://www.mongodb.org/display/DOCS/Retrieving+a+Subset+of+Fields
 
1842
//
 
1843
func (q *Query) Select(selector interface{}) *Query {
 
1844
        q.m.Lock()
 
1845
        q.op.selector = selector
 
1846
        q.m.Unlock()
 
1847
        return q
 
1848
}
 
1849
 
 
1850
type queryWrapper struct {
 
1851
        Query    interface{} "$query"
 
1852
        OrderBy  interface{} "$orderby,omitempty"
 
1853
        Hint     interface{} "$hint,omitempty"
 
1854
        Explain  bool        "$explain,omitempty"
 
1855
        Snapshot bool        "$snapshot,omitempty"
 
1856
}
 
1857
 
 
1858
func (q *Query) wrap() *queryWrapper {
 
1859
        w, ok := q.op.query.(*queryWrapper)
 
1860
        if !ok {
 
1861
                if q.op.query == nil {
 
1862
                        var empty bson.D
 
1863
                        w = &queryWrapper{Query: empty}
 
1864
                } else {
 
1865
                        w = &queryWrapper{Query: q.op.query}
 
1866
                }
 
1867
                q.op.query = w
 
1868
        }
 
1869
        return w
 
1870
}
 
1871
 
 
1872
// Sort asks the database to order returned documents according to the
 
1873
// provided field names. A field name may be prefixed by - (minus) for
 
1874
// it to be sorted in reverse order.
 
1875
//
 
1876
// For example:
 
1877
//
 
1878
//     query1 := collection.Find(nil).Sort("firstname", "lastname")
 
1879
//     query2 := collection.Find(nil).Sort("-age")
 
1880
//     query3 := collection.Find(nil).Sort("$natural")
 
1881
//
 
1882
// Relevant documentation:
 
1883
//
 
1884
//     http://www.mongodb.org/display/DOCS/Sorting+and+Natural+Order
 
1885
//
 
1886
func (q *Query) Sort(fields ...string) *Query {
 
1887
        q.m.Lock()
 
1888
        w := q.wrap()
 
1889
        var order bson.D
 
1890
        for _, field := range fields {
 
1891
                n := 1
 
1892
                if field != "" {
 
1893
                        switch field[0] {
 
1894
                        case '+':
 
1895
                                field = field[1:]
 
1896
                        case '-':
 
1897
                                n = -1
 
1898
                                field = field[1:]
 
1899
                        }
 
1900
                }
 
1901
                if field == "" {
 
1902
                        panic("Sort: empty field name")
 
1903
                }
 
1904
                order = append(order, bson.DocElem{field, n})
 
1905
        }
 
1906
        w.OrderBy = order
 
1907
        q.m.Unlock()
 
1908
        return q
 
1909
}
 
1910
 
 
1911
// Explain returns a number of details about how the MongoDB server would
 
1912
// execute the requested query, such as the number of objects examined,
 
1913
// the number of time the read lock was yielded to allow writes to go in,
 
1914
// and so on.
 
1915
//
 
1916
// For example:
 
1917
//
 
1918
//     m := bson.M{}
 
1919
//     err := collection.Find(bson.M{"filename": name}).Explain(m)
 
1920
//     if err == nil {
 
1921
//         fmt.Printf("Explain: %#v\n", m)
 
1922
//     }
 
1923
//
 
1924
// Relevant documentation:
 
1925
//
 
1926
//     http://www.mongodb.org/display/DOCS/Optimization
 
1927
//     http://www.mongodb.org/display/DOCS/Query+Optimizer
 
1928
//
 
1929
func (q *Query) Explain(result interface{}) error {
 
1930
        q.m.Lock()
 
1931
        clone := &Query{session: q.session, query: q.query}
 
1932
        q.m.Unlock()
 
1933
        w := clone.wrap()
 
1934
        w.Explain = true
 
1935
        if clone.op.limit > 0 {
 
1936
                clone.op.limit = -q.op.limit
 
1937
        }
 
1938
        iter := clone.Iter()
 
1939
        if iter.Next(result) {
 
1940
                return nil
 
1941
        }
 
1942
        return iter.Close()
 
1943
}
 
1944
 
 
1945
// Hint will include an explicit "hint" in the query to force the server
 
1946
// to use a specified index, potentially improving performance in some
 
1947
// situations.  The provided parameters are the fields that compose the
 
1948
// key of the index to be used.  For details on how the indexKey may be
 
1949
// built, see the EnsureIndex method.
 
1950
//
 
1951
// For example:
 
1952
//
 
1953
//     query := collection.Find(bson.M{"firstname": "Joe", "lastname": "Winter"})
 
1954
//     query.Hint("lastname", "firstname")
 
1955
//
 
1956
// Relevant documentation:
 
1957
//
 
1958
//     http://www.mongodb.org/display/DOCS/Optimization
 
1959
//     http://www.mongodb.org/display/DOCS/Query+Optimizer
 
1960
//
 
1961
func (q *Query) Hint(indexKey ...string) *Query {
 
1962
        q.m.Lock()
 
1963
        _, realKey, err := parseIndexKey(indexKey)
 
1964
        w := q.wrap()
 
1965
        w.Hint = realKey
 
1966
        q.m.Unlock()
 
1967
        if err != nil {
 
1968
                panic(err)
 
1969
        }
 
1970
        return q
 
1971
}
 
1972
 
 
1973
// Snapshot will force the performed query to make use of an available
 
1974
// index on the _id field to prevent the same document from being returned
 
1975
// more than once in a single iteration. This might happen without this
 
1976
// setting in situations when the document changes in size and thus has to
 
1977
// be moved while the iteration is running.
 
1978
//
 
1979
// Because snapshot mode traverses the _id index, it may not be used with
 
1980
// sorting or explicit hints. It also cannot use any other index for the
 
1981
// query.
 
1982
//
 
1983
// Even with snapshot mode, items inserted or deleted during the query may
 
1984
// or may not be returned; that is, this mode is not a true point-in-time
 
1985
// snapshot.
 
1986
//
 
1987
// The same effect of Snapshot may be obtained by using any unique index on
 
1988
// field(s) that will not be modified (best to use Hint explicitly too).
 
1989
// A non-unique index (such as creation time) may be made unique by
 
1990
// appending _id to the index when creating it.
 
1991
//
 
1992
// Relevant documentation:
 
1993
//
 
1994
//     http://www.mongodb.org/display/DOCS/How+to+do+Snapshotted+Queries+in+the+Mongo+Database
 
1995
//
 
1996
func (q *Query) Snapshot() *Query {
 
1997
        q.m.Lock()
 
1998
        w := q.wrap()
 
1999
        w.Snapshot = true
 
2000
        q.m.Unlock()
 
2001
        return q
 
2002
}
 
2003
 
 
2004
// LogReplay enables an option that optimizes queries that are typically
 
2005
// made against the MongoDB oplog for replaying it. This is an internal
 
2006
// implementation aspect and most likely uninteresting for other uses.
 
2007
// It has seen at least one use case, though, so it's exposed via the API.
 
2008
func (q *Query) LogReplay() *Query {
 
2009
        q.m.Lock()
 
2010
        q.op.flags |= flagLogReplay
 
2011
        q.m.Unlock()
 
2012
        return q
 
2013
}
 
2014
 
 
2015
func checkQueryError(fullname string, d []byte) error {
 
2016
        l := len(d)
 
2017
        if l < 16 {
 
2018
                return nil
 
2019
        }
 
2020
        if d[5] == '$' && d[6] == 'e' && d[7] == 'r' && d[8] == 'r' && d[9] == '\x00' && d[4] == '\x02' {
 
2021
                goto Error
 
2022
        }
 
2023
        if len(fullname) < 5 || fullname[len(fullname)-5:] != ".$cmd" {
 
2024
                return nil
 
2025
        }
 
2026
        for i := 0; i+8 < l; i++ {
 
2027
                if d[i] == '\x02' && d[i+1] == 'e' && d[i+2] == 'r' && d[i+3] == 'r' && d[i+4] == 'm' && d[i+5] == 's' && d[i+6] == 'g' && d[i+7] == '\x00' {
 
2028
                        goto Error
 
2029
                }
 
2030
        }
 
2031
        return nil
 
2032
 
 
2033
Error:
 
2034
        result := &queryError{}
 
2035
        bson.Unmarshal(d, result)
 
2036
        logf("queryError: %#v\n", result)
 
2037
        if result.LastError != nil {
 
2038
                return result.LastError
 
2039
        }
 
2040
        if result.Err == "" && result.ErrMsg == "" {
 
2041
                return nil
 
2042
        }
 
2043
        if result.AssertionCode != 0 && result.Assertion != "" {
 
2044
                return &QueryError{Code: result.AssertionCode, Message: result.Assertion, Assertion: true}
 
2045
        }
 
2046
        if result.Err != "" {
 
2047
                return &QueryError{Code: result.Code, Message: result.Err}
 
2048
        }
 
2049
        return &QueryError{Code: result.Code, Message: result.ErrMsg}
 
2050
}
 
2051
 
 
2052
// One executes the query and unmarshals the first obtained document into the
 
2053
// result argument.  The result must be a struct or map value capable of being
 
2054
// unmarshalled into by gobson.  This function blocks until either a result
 
2055
// is available or an error happens.  For example:
 
2056
//
 
2057
//     err := collection.Find(bson.M{"a", 1}).One(&result)
 
2058
//
 
2059
// In case the resulting document includes a field named $err or errmsg, which
 
2060
// are standard ways for MongoDB to return query errors, the returned err will
 
2061
// be set to a *QueryError value including the Err message and the Code.  In
 
2062
// those cases, the result argument is still unmarshalled into with the
 
2063
// received document so that any other custom values may be obtained if
 
2064
// desired.
 
2065
//
 
2066
func (q *Query) One(result interface{}) (err error) {
 
2067
        q.m.Lock()
 
2068
        session := q.session
 
2069
        op := q.op // Copy.
 
2070
        q.m.Unlock()
 
2071
 
 
2072
        socket, err := session.acquireSocket(true)
 
2073
        if err != nil {
 
2074
                return err
 
2075
        }
 
2076
        defer socket.Release()
 
2077
 
 
2078
        op.flags |= session.slaveOkFlag()
 
2079
        op.limit = -1
 
2080
 
 
2081
        data, err := socket.SimpleQuery(&op)
 
2082
        if err != nil {
 
2083
                return err
 
2084
        }
 
2085
        if data == nil {
 
2086
                return ErrNotFound
 
2087
        }
 
2088
        if result != nil {
 
2089
                err = bson.Unmarshal(data, result)
 
2090
                if err == nil {
 
2091
                        debugf("Query %p document unmarshaled: %#v", q, result)
 
2092
                } else {
 
2093
                        debugf("Query %p document unmarshaling failed: %#v", q, err)
 
2094
                        return err
 
2095
                }
 
2096
        }
 
2097
        return checkQueryError(op.collection, data)
 
2098
}
 
2099
 
 
2100
// The DBRef type implements support for the database reference MongoDB
 
2101
// convention as supported by multiple drivers.  This convention enables
 
2102
// cross-referencing documents between collections and databases using
 
2103
// a structure which includes a collection name, a document id, and
 
2104
// optionally a database name.
 
2105
//
 
2106
// See the FindRef methods on Session and on Database.
 
2107
//
 
2108
// Relevant documentation:
 
2109
//
 
2110
//     http://www.mongodb.org/display/DOCS/Database+References
 
2111
//
 
2112
type DBRef struct {
 
2113
        Collection string      `bson:"$ref"`
 
2114
        Id         interface{} `bson:"$id"`
 
2115
        Database   string      `bson:"$db,omitempty"`
 
2116
}
 
2117
 
 
2118
// NOTE: Order of fields for DBRef above does matter, per documentation.
 
2119
 
 
2120
// FindRef returns a query that looks for the document in the provided
 
2121
// reference. If the reference includes the DB field, the document will
 
2122
// be retrieved from the respective database.
 
2123
//
 
2124
// See also the DBRef type and the FindRef method on Session.
 
2125
//
 
2126
// Relevant documentation:
 
2127
//
 
2128
//     http://www.mongodb.org/display/DOCS/Database+References
 
2129
//
 
2130
func (db *Database) FindRef(ref *DBRef) *Query {
 
2131
        var c *Collection
 
2132
        if ref.Database == "" {
 
2133
                c = db.C(ref.Collection)
 
2134
        } else {
 
2135
                c = db.Session.DB(ref.Database).C(ref.Collection)
 
2136
        }
 
2137
        return c.FindId(ref.Id)
 
2138
}
 
2139
 
 
2140
// FindRef returns a query that looks for the document in the provided
 
2141
// reference. For a DBRef to be resolved correctly at the session level
 
2142
// it must necessarily have the optional DB field defined.
 
2143
//
 
2144
// See also the DBRef type and the FindRef method on Database.
 
2145
//
 
2146
// Relevant documentation:
 
2147
//
 
2148
//     http://www.mongodb.org/display/DOCS/Database+References
 
2149
//
 
2150
func (s *Session) FindRef(ref *DBRef) *Query {
 
2151
        if ref.Database == "" {
 
2152
                panic(errors.New(fmt.Sprintf("Can't resolve database for %#v", ref)))
 
2153
        }
 
2154
        c := s.DB(ref.Database).C(ref.Collection)
 
2155
        return c.FindId(ref.Id)
 
2156
}
 
2157
 
 
2158
// CollectionNames returns the collection names present in database.
 
2159
func (db *Database) CollectionNames() (names []string, err error) {
 
2160
        c := len(db.Name) + 1
 
2161
        iter := db.C("system.namespaces").Find(nil).Iter()
 
2162
        var result *struct{ Name string }
 
2163
        for iter.Next(&result) {
 
2164
                if strings.Index(result.Name, "$") < 0 || strings.Index(result.Name, ".oplog.$") >= 0 {
 
2165
                        names = append(names, result.Name[c:])
 
2166
                }
 
2167
        }
 
2168
        if err := iter.Close(); err != nil {
 
2169
                return nil, err
 
2170
        }
 
2171
        sort.Strings(names)
 
2172
        return names, nil
 
2173
}
 
2174
 
 
2175
type dbNames struct {
 
2176
        Databases []struct {
 
2177
                Name  string
 
2178
                Empty bool
 
2179
        }
 
2180
}
 
2181
 
 
2182
// DatabaseNames returns the names of non-empty databases present in the cluster.
 
2183
func (s *Session) DatabaseNames() (names []string, err error) {
 
2184
        var result dbNames
 
2185
        err = s.Run("listDatabases", &result)
 
2186
        if err != nil {
 
2187
                return nil, err
 
2188
        }
 
2189
        for _, db := range result.Databases {
 
2190
                if !db.Empty {
 
2191
                        names = append(names, db.Name)
 
2192
                }
 
2193
        }
 
2194
        sort.Strings(names)
 
2195
        return names, nil
 
2196
}
 
2197
 
 
2198
// Iter executes the query and returns an iterator capable of going over all
 
2199
// the results. Results will be returned in batches of configurable
 
2200
// size (see the Batch method) and more documents will be requested when a
 
2201
// configurable number of documents is iterated over (see the Prefetch method).
 
2202
func (q *Query) Iter() *Iter {
 
2203
        q.m.Lock()
 
2204
        session := q.session
 
2205
        op := q.op
 
2206
        prefetch := q.prefetch
 
2207
        limit := q.limit
 
2208
        q.m.Unlock()
 
2209
 
 
2210
        iter := &Iter{
 
2211
                session:  session,
 
2212
                prefetch: prefetch,
 
2213
                limit:    limit,
 
2214
                timeout:  -1,
 
2215
        }
 
2216
        iter.gotReply.L = &iter.m
 
2217
        iter.op.collection = op.collection
 
2218
        iter.op.limit = op.limit
 
2219
        iter.op.replyFunc = iter.replyFunc()
 
2220
        iter.docsToReceive++
 
2221
        op.replyFunc = iter.op.replyFunc
 
2222
        op.flags |= session.slaveOkFlag()
 
2223
 
 
2224
        socket, err := session.acquireSocket(true)
 
2225
        if err != nil {
 
2226
                iter.err = err
 
2227
        } else {
 
2228
                iter.err = socket.Query(&op)
 
2229
                iter.server = socket.Server()
 
2230
                socket.Release()
 
2231
        }
 
2232
        return iter
 
2233
}
 
2234
 
 
2235
// Tail returns a tailable iterator. Unlike a normal iterator, a
 
2236
// tailable iterator may wait for new values to be inserted in the
 
2237
// collection once the end of the current result set is reached,
 
2238
// A tailable iterator may only be used with capped collections.
 
2239
//
 
2240
// The timeout parameter indicates how long Next will block waiting
 
2241
// for a result before timing out.  If set to -1, Next will not
 
2242
// timeout, and will continue waiting for a result for as long as
 
2243
// the cursor is valid and the session is not closed. If set to 0,
 
2244
// Next times out as soon as it reaches the end of the result set.
 
2245
// Otherwise, Next will wait for at least the given number of
 
2246
// seconds for a new document to be available before timing out.
 
2247
//
 
2248
// On timeouts, Next will unblock and return false, and the Timeout
 
2249
// method will return true if called. In these cases, Next may still
 
2250
// be called again on the same iterator to check if a new value is
 
2251
// available at the current cursor position, and again it will block
 
2252
// according to the specified timeoutSecs. If the cursor becomes
 
2253
// invalid, though, both Next and Timeout will return false and
 
2254
// the query must be restarted.
 
2255
//
 
2256
// The following example demonstrates timeout handling and query
 
2257
// restarting:
 
2258
//
 
2259
//    iter := collection.Find(nil).Sort("$natural").Tail(5 * time.Second)
 
2260
//    for {
 
2261
//         for iter.Next(&result) {
 
2262
//             fmt.Println(result.Id)
 
2263
//             lastId = result.Id
 
2264
//         }
 
2265
//         if err := iter.Close(); err != nil {
 
2266
//             return err
 
2267
//         }
 
2268
//         if iter.Timeout() {
 
2269
//             continue
 
2270
//         }
 
2271
//         query := collection.Find(bson.M{"_id": bson.M{"$gt": lastId}})
 
2272
//         iter = query.Sort("$natural").Tail(5 * time.Second)
 
2273
//    }
 
2274
//
 
2275
// Relevant documentation:
 
2276
//
 
2277
//     http://www.mongodb.org/display/DOCS/Tailable+Cursors
 
2278
//     http://www.mongodb.org/display/DOCS/Capped+Collections
 
2279
//     http://www.mongodb.org/display/DOCS/Sorting+and+Natural+Order
 
2280
//
 
2281
func (q *Query) Tail(timeout time.Duration) *Iter {
 
2282
        q.m.Lock()
 
2283
        session := q.session
 
2284
        op := q.op
 
2285
        prefetch := q.prefetch
 
2286
        q.m.Unlock()
 
2287
 
 
2288
        iter := &Iter{session: session, prefetch: prefetch}
 
2289
        iter.gotReply.L = &iter.m
 
2290
        iter.timeout = timeout
 
2291
        iter.op.collection = op.collection
 
2292
        iter.op.limit = op.limit
 
2293
        iter.op.replyFunc = iter.replyFunc()
 
2294
        iter.docsToReceive++
 
2295
        op.replyFunc = iter.op.replyFunc
 
2296
        op.flags |= flagTailable | flagAwaitData | session.slaveOkFlag()
 
2297
 
 
2298
        socket, err := session.acquireSocket(true)
 
2299
        if err != nil {
 
2300
                iter.err = err
 
2301
        } else {
 
2302
                iter.err = socket.Query(&op)
 
2303
                iter.server = socket.Server()
 
2304
                socket.Release()
 
2305
        }
 
2306
        return iter
 
2307
}
 
2308
 
 
2309
const (
 
2310
        flagTailable  = 1 << 1
 
2311
        flagSlaveOk   = 1 << 2
 
2312
        flagLogReplay = 1 << 3
 
2313
        flagAwaitData = 1 << 5
 
2314
)
 
2315
 
 
2316
func (s *Session) slaveOkFlag() (flag uint32) {
 
2317
        s.m.RLock()
 
2318
        if s.slaveOk {
 
2319
                flag = flagSlaveOk
 
2320
        }
 
2321
        s.m.RUnlock()
 
2322
        return
 
2323
}
 
2324
 
 
2325
// Err returns nil if no errors happened during iteration, or the actual
 
2326
// error otherwise.
 
2327
//
 
2328
// In case a resulting document included a field named $err or errmsg, which are
 
2329
// standard ways for MongoDB to report an improper query, the returned value has
 
2330
// a *QueryError type, and includes the Err message and the Code.
 
2331
func (iter *Iter) Err() error {
 
2332
        iter.m.Lock()
 
2333
        err := iter.err
 
2334
        iter.m.Unlock()
 
2335
        if err == ErrNotFound {
 
2336
                return nil
 
2337
        }
 
2338
        return err
 
2339
}
 
2340
 
 
2341
// Close kills the server cursor used by the iterator, if any, and returns
 
2342
// nil if no errors happened during iteration, or the actual error otherwise.
 
2343
//
 
2344
// Server cursors are automatically closed at the end of an iteration, which
 
2345
// means close will do nothing unless the iteration was interrupted before
 
2346
// the server finished sending results to the driver. If Close is not called
 
2347
// in such a situation, the cursor will remain available at the server until
 
2348
// the default cursor timeout period is reached. No further problems arise.
 
2349
//
 
2350
// Close is idempotent. That means it can be called repeatedly and will
 
2351
// return the same result every time.
 
2352
//
 
2353
// In case a resulting document included a field named $err or errmsg, which are
 
2354
// standard ways for MongoDB to report an improper query, the returned value has
 
2355
// a *QueryError type.
 
2356
func (iter *Iter) Close() error {
 
2357
        iter.m.Lock()
 
2358
        iter.killCursor()
 
2359
        err := iter.err
 
2360
        iter.m.Unlock()
 
2361
        if err == ErrNotFound {
 
2362
                return nil
 
2363
        }
 
2364
        return err
 
2365
}
 
2366
 
 
2367
func (iter *Iter) killCursor() error {
 
2368
        if iter.op.cursorId != 0 {
 
2369
                socket, err := iter.acquireSocket()
 
2370
                if err == nil {
 
2371
                        // TODO Batch kills.
 
2372
                        err = socket.Query(&killCursorsOp{[]int64{iter.op.cursorId}})
 
2373
                        socket.Release()
 
2374
                }
 
2375
                if err != nil && (iter.err == nil || iter.err == ErrNotFound) {
 
2376
                        iter.err = err
 
2377
                }
 
2378
                iter.op.cursorId = 0
 
2379
                return err
 
2380
        }
 
2381
        return nil
 
2382
}
 
2383
 
 
2384
// Timeout returns true if Next returned false due to a timeout of
 
2385
// a tailable cursor. In those cases, Next may be called again to continue
 
2386
// the iteration at the previous cursor position.
 
2387
func (iter *Iter) Timeout() bool {
 
2388
        iter.m.Lock()
 
2389
        result := iter.timedout
 
2390
        iter.m.Unlock()
 
2391
        return result
 
2392
}
 
2393
 
 
2394
// Next retrieves the next document from the result set, blocking if necessary.
 
2395
// This method will also automatically retrieve another batch of documents from
 
2396
// the server when the current one is exhausted, or before that in background
 
2397
// if pre-fetching is enabled (see the Query.Prefetch and Session.SetPrefetch
 
2398
// methods).
 
2399
//
 
2400
// Next returns true if a document was successfully unmarshalled onto result,
 
2401
// and false at the end of the result set or if an error happened.
 
2402
// When Next returns false, the Err method should be called to verify if
 
2403
// there was an error during iteration.
 
2404
//
 
2405
// For example:
 
2406
//
 
2407
//    iter := collection.Find(nil).Iter()
 
2408
//    for iter.Next(&result) {
 
2409
//        fmt.Printf("Result: %v\n", result.Id)
 
2410
//    }
 
2411
//    if err := iter.Close(); err != nil {
 
2412
//        return err
 
2413
//    }
 
2414
//
 
2415
func (iter *Iter) Next(result interface{}) bool {
 
2416
        iter.m.Lock()
 
2417
        iter.timedout = false
 
2418
        timeout := time.Time{}
 
2419
        for iter.err == nil && iter.docData.Len() == 0 && (iter.docsToReceive > 0 || iter.op.cursorId != 0) {
 
2420
                if iter.docsToReceive == 0 {
 
2421
                        if iter.timeout >= 0 {
 
2422
                                if timeout.IsZero() {
 
2423
                                        timeout = time.Now().Add(iter.timeout)
 
2424
                                }
 
2425
                                if time.Now().After(timeout) {
 
2426
                                        iter.timedout = true
 
2427
                                        iter.m.Unlock()
 
2428
                                        return false
 
2429
                                }
 
2430
                        }
 
2431
                        iter.getMore()
 
2432
                }
 
2433
                iter.gotReply.Wait()
 
2434
        }
 
2435
 
 
2436
        // Exhaust available data before reporting any errors.
 
2437
        if docData, ok := iter.docData.Pop().([]byte); ok {
 
2438
                if iter.limit > 0 {
 
2439
                        iter.limit--
 
2440
                        if iter.limit == 0 {
 
2441
                                if iter.docData.Len() > 0 {
 
2442
                                        panic(fmt.Errorf("data remains after limit exhausted: %d", iter.docData.Len()))
 
2443
                                }
 
2444
                                iter.err = ErrNotFound
 
2445
                                if iter.killCursor() != nil {
 
2446
                                        return false
 
2447
                                }
 
2448
                        }
 
2449
                }
 
2450
                if iter.op.cursorId != 0 && iter.err == nil {
 
2451
                        if iter.docsBeforeMore == 0 {
 
2452
                                iter.getMore()
 
2453
                        }
 
2454
                        iter.docsBeforeMore-- // Goes negative.
 
2455
                }
 
2456
                iter.m.Unlock()
 
2457
                err := bson.Unmarshal(docData, result)
 
2458
                if err != nil {
 
2459
                        debugf("Iter %p document unmarshaling failed: %#v", iter, err)
 
2460
                        iter.err = err
 
2461
                        return false
 
2462
                }
 
2463
                debugf("Iter %p document unmarshaled: %#v", iter, result)
 
2464
                // XXX Only have to check first document for a query error?
 
2465
                err = checkQueryError(iter.op.collection, docData)
 
2466
                if err != nil {
 
2467
                        iter.m.Lock()
 
2468
                        if iter.err == nil {
 
2469
                                iter.err = err
 
2470
                        }
 
2471
                        iter.m.Unlock()
 
2472
                        return false
 
2473
                }
 
2474
                return true
 
2475
        } else if iter.err != nil {
 
2476
                debugf("Iter %p returning false: %s", iter, iter.err)
 
2477
                iter.m.Unlock()
 
2478
                return false
 
2479
        } else if iter.op.cursorId == 0 {
 
2480
                iter.err = ErrNotFound
 
2481
                debugf("Iter %p exhausted with cursor=0", iter)
 
2482
                iter.m.Unlock()
 
2483
                return false
 
2484
        }
 
2485
 
 
2486
        panic("unreachable")
 
2487
}
 
2488
 
 
2489
// All retrieves all documents from the result set into the provided slice
 
2490
// and closes the iterator.
 
2491
//
 
2492
// The result argument must necessarily be the address for a slice. The slice
 
2493
// may be nil or previously allocated.
 
2494
//
 
2495
// WARNING: Obviously, All must not be used with result sets that may be
 
2496
// potentially large, since it may consume all memory until the system
 
2497
// crashes. Consider building the query with a Limit clause to ensure the
 
2498
// result size is bounded.
 
2499
//
 
2500
// For instance:
 
2501
//
 
2502
//    var result []struct{ Value int }
 
2503
//    iter := collection.Find(nil).Limit(100).Iter()
 
2504
//    err := iter.All(&result)
 
2505
//    if err != nil {
 
2506
//        return err
 
2507
//    }
 
2508
//
 
2509
func (iter *Iter) All(result interface{}) error {
 
2510
        resultv := reflect.ValueOf(result)
 
2511
        if resultv.Kind() != reflect.Ptr || resultv.Elem().Kind() != reflect.Slice {
 
2512
                panic("result argument must be a slice address")
 
2513
        }
 
2514
        slicev := resultv.Elem()
 
2515
        slicev = slicev.Slice(0, slicev.Cap())
 
2516
        elemt := slicev.Type().Elem()
 
2517
        i := 0
 
2518
        for {
 
2519
                if slicev.Len() == i {
 
2520
                        elemp := reflect.New(elemt)
 
2521
                        if !iter.Next(elemp.Interface()) {
 
2522
                                break
 
2523
                        }
 
2524
                        slicev = reflect.Append(slicev, elemp.Elem())
 
2525
                        slicev = slicev.Slice(0, slicev.Cap())
 
2526
                } else {
 
2527
                        if !iter.Next(slicev.Index(i).Addr().Interface()) {
 
2528
                                break
 
2529
                        }
 
2530
                }
 
2531
                i++
 
2532
        }
 
2533
        resultv.Elem().Set(slicev.Slice(0, i))
 
2534
        return iter.Close()
 
2535
}
 
2536
 
 
2537
// All works like Iter.All.
 
2538
func (q *Query) All(result interface{}) error {
 
2539
        return q.Iter().All(result)
 
2540
}
 
2541
 
 
2542
// The For method is obsolete and will be removed in a future release.
 
2543
// See Iter as an elegant replacement.
 
2544
func (q *Query) For(result interface{}, f func() error) error {
 
2545
        return q.Iter().For(result, f)
 
2546
}
 
2547
 
 
2548
// The For method is obsolete and will be removed in a future release.
 
2549
// See Iter as an elegant replacement.
 
2550
func (iter *Iter) For(result interface{}, f func() error) (err error) {
 
2551
        valid := false
 
2552
        v := reflect.ValueOf(result)
 
2553
        if v.Kind() == reflect.Ptr {
 
2554
                v = v.Elem()
 
2555
                switch v.Kind() {
 
2556
                case reflect.Map, reflect.Ptr, reflect.Interface, reflect.Slice:
 
2557
                        valid = v.IsNil()
 
2558
                }
 
2559
        }
 
2560
        if !valid {
 
2561
                panic("For needs a pointer to nil reference value.  See the documentation.")
 
2562
        }
 
2563
        zero := reflect.Zero(v.Type())
 
2564
        for {
 
2565
                v.Set(zero)
 
2566
                if !iter.Next(result) {
 
2567
                        break
 
2568
                }
 
2569
                err = f()
 
2570
                if err != nil {
 
2571
                        return err
 
2572
                }
 
2573
        }
 
2574
        return iter.Err()
 
2575
}
 
2576
 
 
2577
func (iter *Iter) acquireSocket() (*mongoSocket, error) {
 
2578
        socket, err := iter.session.acquireSocket(true)
 
2579
        if err != nil {
 
2580
                return nil, err
 
2581
        }
 
2582
        if socket.Server() != iter.server {
 
2583
                // Socket server changed during iteration. This may happen
 
2584
                // with Eventual sessions, if a Refresh is done, or if a
 
2585
                // monotonic session gets a write and shifts from secondary
 
2586
                // to primary. Our cursor is in a specific server, though.
 
2587
                socket.Release()
 
2588
                socket, _, err = iter.server.AcquireSocket(0)
 
2589
                if err != nil {
 
2590
                        return nil, err
 
2591
                }
 
2592
                err := iter.session.socketLogin(socket)
 
2593
                if err != nil {
 
2594
                        socket.Release()
 
2595
                        return nil, err
 
2596
                }
 
2597
        }
 
2598
        return socket, nil
 
2599
}
 
2600
 
 
2601
func (iter *Iter) getMore() {
 
2602
        socket, err := iter.acquireSocket()
 
2603
        if err != nil {
 
2604
                iter.err = err
 
2605
                return
 
2606
        }
 
2607
        defer socket.Release()
 
2608
 
 
2609
        debugf("Iter %p requesting more documents", iter)
 
2610
        if iter.limit > 0 {
 
2611
                limit := iter.limit - int32(iter.docsToReceive) - int32(iter.docData.Len())
 
2612
                if limit < iter.op.limit {
 
2613
                        iter.op.limit = limit
 
2614
                }
 
2615
        }
 
2616
        if err := socket.Query(&iter.op); err != nil {
 
2617
                iter.err = err
 
2618
        }
 
2619
        iter.docsToReceive++
 
2620
}
 
2621
 
 
2622
type countCmd struct {
 
2623
        Count string
 
2624
        Query interface{}
 
2625
        Limit int32 ",omitempty"
 
2626
        Skip  int32 ",omitempty"
 
2627
}
 
2628
 
 
2629
// Count returns the total number of documents in the result set.
 
2630
func (q *Query) Count() (n int, err error) {
 
2631
        q.m.Lock()
 
2632
        session := q.session
 
2633
        op := q.op
 
2634
        limit := q.limit
 
2635
        q.m.Unlock()
 
2636
 
 
2637
        c := strings.Index(op.collection, ".")
 
2638
        if c < 0 {
 
2639
                return 0, errors.New("Bad collection name: " + op.collection)
 
2640
        }
 
2641
 
 
2642
        dbname := op.collection[:c]
 
2643
        cname := op.collection[c+1:]
 
2644
 
 
2645
        qdoc := op.query
 
2646
        if wrapper, ok := qdoc.(*queryWrapper); ok {
 
2647
                qdoc = wrapper.Query
 
2648
        }
 
2649
 
 
2650
        result := struct{ N int }{}
 
2651
        err = session.DB(dbname).Run(countCmd{cname, qdoc, limit, op.skip}, &result)
 
2652
        return result.N, err
 
2653
}
 
2654
 
 
2655
// Count returns the total number of documents in the collection.
 
2656
func (c *Collection) Count() (n int, err error) {
 
2657
        return c.Find(nil).Count()
 
2658
}
 
2659
 
 
2660
type distinctCmd struct {
 
2661
        Collection string "distinct"
 
2662
        Key        string
 
2663
        Query      interface{} ",omitempty"
 
2664
}
 
2665
 
 
2666
// Distinct returns a list of distinct values for the given key within
 
2667
// the result set.  The list of distinct values will be unmarshalled
 
2668
// in the "values" key of the provided result parameter.
 
2669
//
 
2670
// For example:
 
2671
//
 
2672
//     var result []int
 
2673
//     err := collection.Find(bson.M{"gender": "F"}).Distinct("age", &result)
 
2674
//
 
2675
// Relevant documentation:
 
2676
//
 
2677
//     http://www.mongodb.org/display/DOCS/Aggregation
 
2678
//
 
2679
func (q *Query) Distinct(key string, result interface{}) error {
 
2680
        q.m.Lock()
 
2681
        session := q.session
 
2682
        op := q.op // Copy.
 
2683
        q.m.Unlock()
 
2684
 
 
2685
        c := strings.Index(op.collection, ".")
 
2686
        if c < 0 {
 
2687
                return errors.New("Bad collection name: " + op.collection)
 
2688
        }
 
2689
 
 
2690
        dbname := op.collection[:c]
 
2691
        cname := op.collection[c+1:]
 
2692
 
 
2693
        qdoc := op.query
 
2694
        if wrapper, ok := qdoc.(*queryWrapper); ok {
 
2695
                qdoc = wrapper.Query
 
2696
        }
 
2697
 
 
2698
        var doc struct{ Values bson.Raw }
 
2699
        err := session.DB(dbname).Run(distinctCmd{cname, key, qdoc}, &doc)
 
2700
        if err != nil {
 
2701
                return err
 
2702
        }
 
2703
        return doc.Values.Unmarshal(result)
 
2704
}
 
2705
 
 
2706
type mapReduceCmd struct {
 
2707
        Collection string "mapreduce"
 
2708
        Map        string ",omitempty"
 
2709
        Reduce     string ",omitempty"
 
2710
        Finalize   string ",omitempty"
 
2711
        Limit      int32  ",omitempty"
 
2712
        Out        interface{}
 
2713
        Query      interface{} ",omitempty"
 
2714
        Sort       interface{} ",omitempty"
 
2715
        Scope      interface{} ",omitempty"
 
2716
        Verbose    bool        ",omitempty"
 
2717
}
 
2718
 
 
2719
type mapReduceResult struct {
 
2720
        Results    bson.Raw
 
2721
        Result     bson.Raw
 
2722
        TimeMillis int64 "timeMillis"
 
2723
        Counts     struct{ Input, Emit, Output int }
 
2724
        Ok         bool
 
2725
        Err        string
 
2726
        Timing     *MapReduceTime
 
2727
}
 
2728
 
 
2729
type MapReduce struct {
 
2730
        Map      string      // Map Javascript function code (required)
 
2731
        Reduce   string      // Reduce Javascript function code (required)
 
2732
        Finalize string      // Finalize Javascript function code (optional)
 
2733
        Out      interface{} // Output collection name or document. If nil, results are inlined into the result parameter.
 
2734
        Scope    interface{} // Optional global scope for Javascript functions
 
2735
        Verbose  bool
 
2736
}
 
2737
 
 
2738
type MapReduceInfo struct {
 
2739
        InputCount  int            // Number of documents mapped
 
2740
        EmitCount   int            // Number of times reduce called emit
 
2741
        OutputCount int            // Number of documents in resulting collection
 
2742
        Database    string         // Output database, if results are not inlined
 
2743
        Collection  string         // Output collection, if results are not inlined
 
2744
        Time        int64          // Time to run the job, in nanoseconds
 
2745
        VerboseTime *MapReduceTime // Only defined if Verbose was true
 
2746
}
 
2747
 
 
2748
type MapReduceTime struct {
 
2749
        Total    int64 // Total time, in nanoseconds
 
2750
        Map      int64 "mapTime"  // Time within map function, in nanoseconds
 
2751
        EmitLoop int64 "emitLoop" // Time within the emit/map loop, in nanoseconds
 
2752
}
 
2753
 
 
2754
// MapReduce executes a map/reduce job for documents covered by the query.
 
2755
// That kind of job is suitable for very flexible bulk aggregation of data
 
2756
// performed at the server side via Javascript functions.
 
2757
//
 
2758
// Results from the job may be returned as a result of the query itself
 
2759
// through the result parameter in case they'll certainly fit in memory
 
2760
// and in a single document.  If there's the possibility that the amount
 
2761
// of data might be too large, results must be stored back in an alternative
 
2762
// collection or even a separate database, by setting the Out field of the
 
2763
// provided MapReduce job.  In that case, provide nil as the result parameter.
 
2764
//
 
2765
// These are some of the ways to set Out:
 
2766
//
 
2767
//     nil
 
2768
//         Inline results into the result parameter.
 
2769
//
 
2770
//     bson.M{"replace": "mycollection"}
 
2771
//         The output will be inserted into a collection which replaces any
 
2772
//         existing collection with the same name.
 
2773
//
 
2774
//     bson.M{"merge": "mycollection"}
 
2775
//         This option will merge new data into the old output collection. In
 
2776
//         other words, if the same key exists in both the result set and the
 
2777
//         old collection, the new key will overwrite the old one.
 
2778
//
 
2779
//     bson.M{"reduce": "mycollection"}
 
2780
//         If documents exist for a given key in the result set and in the old
 
2781
//         collection, then a reduce operation (using the specified reduce
 
2782
//         function) will be performed on the two values and the result will be
 
2783
//         written to the output collection. If a finalize function was
 
2784
//         provided, this will be run after the reduce as well.
 
2785
//
 
2786
//     bson.M{...., "db": "mydb"}
 
2787
//         Any of the above options can have the "db" key included for doing
 
2788
//         the respective action in a separate database.
 
2789
//
 
2790
// The following is a trivial example which will count the number of
 
2791
// occurrences of a field named n on each document in a collection, and
 
2792
// will return results inline:
 
2793
//
 
2794
//     job := &mgo.MapReduce{
 
2795
//             Map:      "function() { emit(this.n, 1) }",
 
2796
//             Reduce:   "function(key, values) { return Array.sum(values) }",
 
2797
//     }
 
2798
//     var result []struct { Id int "_id"; Value int }
 
2799
//     _, err := collection.Find(nil).MapReduce(job, &result)
 
2800
//     if err != nil {
 
2801
//         return err
 
2802
//     }
 
2803
//     for _, item := range result {
 
2804
//         fmt.Println(item.Value)
 
2805
//     }
 
2806
//
 
2807
// This function is compatible with MongoDB 1.7.4+.
 
2808
//
 
2809
// Relevant documentation:
 
2810
//
 
2811
//     http://www.mongodb.org/display/DOCS/MapReduce
 
2812
//
 
2813
func (q *Query) MapReduce(job *MapReduce, result interface{}) (info *MapReduceInfo, err error) {
 
2814
        q.m.Lock()
 
2815
        session := q.session
 
2816
        op := q.op // Copy.
 
2817
        limit := q.limit
 
2818
        q.m.Unlock()
 
2819
 
 
2820
        c := strings.Index(op.collection, ".")
 
2821
        if c < 0 {
 
2822
                return nil, errors.New("Bad collection name: " + op.collection)
 
2823
        }
 
2824
 
 
2825
        dbname := op.collection[:c]
 
2826
        cname := op.collection[c+1:]
 
2827
 
 
2828
        qdoc := op.query
 
2829
        var sort interface{}
 
2830
        if wrapper, ok := qdoc.(*queryWrapper); ok {
 
2831
                qdoc = wrapper.Query
 
2832
                sort = wrapper.OrderBy
 
2833
        }
 
2834
 
 
2835
        cmd := mapReduceCmd{
 
2836
                Collection: cname,
 
2837
                Map:        job.Map,
 
2838
                Reduce:     job.Reduce,
 
2839
                Finalize:   job.Finalize,
 
2840
                Out:        job.Out,
 
2841
                Scope:      job.Scope,
 
2842
                Verbose:    job.Verbose,
 
2843
                Query:      qdoc,
 
2844
                Sort:       sort,
 
2845
                Limit:      limit,
 
2846
        }
 
2847
 
 
2848
        if cmd.Out == nil {
 
2849
                cmd.Out = bson.M{"inline": 1}
 
2850
        }
 
2851
 
 
2852
        var doc mapReduceResult
 
2853
        err = session.DB(dbname).Run(&cmd, &doc)
 
2854
        if err != nil {
 
2855
                return nil, err
 
2856
        }
 
2857
        if doc.Err != "" {
 
2858
                return nil, errors.New(doc.Err)
 
2859
        }
 
2860
 
 
2861
        info = &MapReduceInfo{
 
2862
                InputCount:  doc.Counts.Input,
 
2863
                EmitCount:   doc.Counts.Emit,
 
2864
                OutputCount: doc.Counts.Output,
 
2865
                Time:        doc.TimeMillis * 1e6,
 
2866
        }
 
2867
 
 
2868
        if doc.Result.Kind == 0x02 {
 
2869
                err = doc.Result.Unmarshal(&info.Collection)
 
2870
                info.Database = dbname
 
2871
        } else if doc.Result.Kind == 0x03 {
 
2872
                var v struct{ Collection, Db string }
 
2873
                err = doc.Result.Unmarshal(&v)
 
2874
                info.Collection = v.Collection
 
2875
                info.Database = v.Db
 
2876
        }
 
2877
 
 
2878
        if doc.Timing != nil {
 
2879
                info.VerboseTime = doc.Timing
 
2880
                info.VerboseTime.Total *= 1e6
 
2881
                info.VerboseTime.Map *= 1e6
 
2882
                info.VerboseTime.EmitLoop *= 1e6
 
2883
        }
 
2884
 
 
2885
        if err != nil {
 
2886
                return nil, err
 
2887
        }
 
2888
        if result != nil {
 
2889
                return info, doc.Results.Unmarshal(result)
 
2890
        }
 
2891
        return info, nil
 
2892
}
 
2893
 
 
2894
type Change struct {
 
2895
        Update    interface{} // The change document
 
2896
        Upsert    bool        // Whether to insert in case the document isn't found
 
2897
        Remove    bool        // Whether to remove the document found rather than updating
 
2898
        ReturnNew bool        // Should the modified document be returned rather than the old one
 
2899
}
 
2900
 
 
2901
type findModifyCmd struct {
 
2902
        Collection                  string      "findAndModify"
 
2903
        Query, Update, Sort, Fields interface{} ",omitempty"
 
2904
        Upsert, Remove, New         bool        ",omitempty"
 
2905
}
 
2906
 
 
2907
type valueResult struct {
 
2908
        Value     bson.Raw
 
2909
        LastError LastError "lastErrorObject"
 
2910
}
 
2911
 
 
2912
// Apply allows updating, upserting or removing a document matching a query
 
2913
// and atomically returning either the old version (the default) or the new
 
2914
// version of the document (when ReturnNew is true). If no objects are
 
2915
// found Apply returns ErrNotFound.
 
2916
//
 
2917
// The Sort and Select query methods affect the result of Apply.  In case
 
2918
// multiple documents match the query, Sort enables selecting which document to
 
2919
// act upon by ordering it first.  Select enables retrieving only a selection
 
2920
// of fields of the new or old document.
 
2921
//
 
2922
// This simple example increments a counter and prints its new value:
 
2923
//
 
2924
//     change := mgo.Change{
 
2925
//             Update: bson.M{"$inc": bson.M{"n": 1}},
 
2926
//             ReturnNew: true,
 
2927
//     }
 
2928
//     info, err = col.Find(M{"_id": id}).Apply(change, &doc)
 
2929
//     fmt.Println(doc.N)
 
2930
//
 
2931
// Relevant documentation:
 
2932
//
 
2933
//     http://www.mongodb.org/display/DOCS/findAndModify+Command
 
2934
//     http://www.mongodb.org/display/DOCS/Updating
 
2935
//     http://www.mongodb.org/display/DOCS/Atomic+Operations
 
2936
//
 
2937
func (q *Query) Apply(change Change, result interface{}) (info *ChangeInfo, err error) {
 
2938
        q.m.Lock()
 
2939
        session := q.session
 
2940
        op := q.op // Copy.
 
2941
        q.m.Unlock()
 
2942
 
 
2943
        c := strings.Index(op.collection, ".")
 
2944
        if c < 0 {
 
2945
                return nil, errors.New("bad collection name: " + op.collection)
 
2946
        }
 
2947
 
 
2948
        dbname := op.collection[:c]
 
2949
        cname := op.collection[c+1:]
 
2950
 
 
2951
        qdoc := op.query
 
2952
        var sort interface{}
 
2953
        if wrapper, ok := qdoc.(*queryWrapper); ok {
 
2954
                qdoc = wrapper.Query
 
2955
                sort = wrapper.OrderBy
 
2956
        }
 
2957
 
 
2958
        cmd := findModifyCmd{
 
2959
                Collection: cname,
 
2960
                Update:     change.Update,
 
2961
                Upsert:     change.Upsert,
 
2962
                Remove:     change.Remove,
 
2963
                New:        change.ReturnNew,
 
2964
                Query:      qdoc,
 
2965
                Sort:       sort,
 
2966
                Fields:     op.selector,
 
2967
        }
 
2968
 
 
2969
        session = session.Clone()
 
2970
        defer session.Close()
 
2971
        session.SetMode(Strong, false)
 
2972
 
 
2973
        var doc valueResult
 
2974
        err = session.DB(dbname).Run(&cmd, &doc)
 
2975
        if err != nil {
 
2976
                if qerr, ok := err.(*QueryError); ok && qerr.Message == "No matching object found" {
 
2977
                        return nil, ErrNotFound
 
2978
                }
 
2979
                return nil, err
 
2980
        }
 
2981
        if doc.LastError.N == 0 {
 
2982
                return nil, ErrNotFound
 
2983
        }
 
2984
        if doc.Value.Kind != 0x0A {
 
2985
                err = doc.Value.Unmarshal(result)
 
2986
                if err != nil {
 
2987
                        return nil, err
 
2988
                }
 
2989
        }
 
2990
        info = &ChangeInfo{}
 
2991
        lerr := &doc.LastError
 
2992
        if lerr.UpdatedExisting {
 
2993
                info.Updated = lerr.N
 
2994
        } else if change.Remove {
 
2995
                info.Removed = lerr.N
 
2996
        } else if change.Upsert {
 
2997
                info.UpsertedId = lerr.UpsertedId
 
2998
        }
 
2999
        return info, nil
 
3000
}
 
3001
 
 
3002
// The BuildInfo type encapsulates details about the running MongoDB server.
 
3003
//
 
3004
// Note that the VersionArray field was introduced in MongoDB 2.0+, but it is
 
3005
// internally assembled from the Version information for previous versions.
 
3006
// In both cases, VersionArray is guaranteed to have at least 4 entries.
 
3007
type BuildInfo struct {
 
3008
        Version       string
 
3009
        VersionArray  []int  `bson:"versionArray"` // On MongoDB 2.0+; assembled from Version otherwise
 
3010
        GitVersion    string `bson:"gitVersion"`
 
3011
        SysInfo       string `bson:"sysInfo"`
 
3012
        Bits          int
 
3013
        Debug         bool
 
3014
        MaxObjectSize int `bson:"maxBsonObjectSize"`
 
3015
}
 
3016
 
 
3017
// BuildInfo retrieves the version and other details about the
 
3018
// running MongoDB server.
 
3019
func (s *Session) BuildInfo() (info BuildInfo, err error) {
 
3020
        err = s.Run(bson.D{{"buildInfo", "1"}}, &info)
 
3021
        if len(info.VersionArray) == 0 {
 
3022
                for _, a := range strings.Split(info.Version, ".") {
 
3023
                        i, err := strconv.Atoi(a)
 
3024
                        if err != nil {
 
3025
                                break
 
3026
                        }
 
3027
                        info.VersionArray = append(info.VersionArray, i)
 
3028
                }
 
3029
        }
 
3030
        for len(info.VersionArray) < 4 {
 
3031
                info.VersionArray = append(info.VersionArray, 0)
 
3032
        }
 
3033
        return
 
3034
}
 
3035
 
 
3036
// ---------------------------------------------------------------------------
 
3037
// Internal session handling helpers.
 
3038
 
 
3039
func (s *Session) acquireSocket(slaveOk bool) (*mongoSocket, error) {
 
3040
 
 
3041
        // Read-only lock to check for previously reserved socket.
 
3042
        s.m.RLock()
 
3043
        if s.masterSocket != nil {
 
3044
                socket := s.masterSocket
 
3045
                socket.Acquire()
 
3046
                s.m.RUnlock()
 
3047
                return socket, nil
 
3048
        }
 
3049
        if s.slaveSocket != nil && s.slaveOk && slaveOk {
 
3050
                socket := s.slaveSocket
 
3051
                socket.Acquire()
 
3052
                s.m.RUnlock()
 
3053
                return socket, nil
 
3054
        }
 
3055
        s.m.RUnlock()
 
3056
 
 
3057
        // No go.  We may have to request a new socket and change the session,
 
3058
        // so try again but with an exclusive lock now.
 
3059
        s.m.Lock()
 
3060
        defer s.m.Unlock()
 
3061
 
 
3062
        if s.masterSocket != nil {
 
3063
                s.masterSocket.Acquire()
 
3064
                return s.masterSocket, nil
 
3065
        }
 
3066
        if s.slaveSocket != nil && s.slaveOk && slaveOk {
 
3067
                s.slaveSocket.Acquire()
 
3068
                return s.slaveSocket, nil
 
3069
        }
 
3070
 
 
3071
        // Still not good.  We need a new socket.
 
3072
        sock, err := s.cluster().AcquireSocket(slaveOk && s.slaveOk, s.syncTimeout)
 
3073
        if err != nil {
 
3074
                return nil, err
 
3075
        }
 
3076
 
 
3077
        // Authenticate the new socket.
 
3078
        if err = s.socketLogin(sock); err != nil {
 
3079
                sock.Release()
 
3080
                return nil, err
 
3081
        }
 
3082
 
 
3083
        // Keep track of the new socket, if necessary.
 
3084
        // Note that, as a special case, if the Eventual session was
 
3085
        // not refreshed (s.slaveSocket != nil), it means the developer
 
3086
        // asked to preserve an existing reserved socket, so we'll
 
3087
        // keep a master one around too before a Refresh happens.
 
3088
        if s.consistency != Eventual || s.slaveSocket != nil {
 
3089
                s.setSocket(sock)
 
3090
        }
 
3091
 
 
3092
        // Switch over a Monotonic session to the master.
 
3093
        if !slaveOk && s.consistency == Monotonic {
 
3094
                s.slaveOk = false
 
3095
        }
 
3096
 
 
3097
        return sock, nil
 
3098
}
 
3099
 
 
3100
// setSocket binds socket to this section.
 
3101
func (s *Session) setSocket(socket *mongoSocket) {
 
3102
        if socket.Acquire() {
 
3103
                if s.masterSocket != nil {
 
3104
                        panic("setSocket(master) with existing master socket reserved")
 
3105
                }
 
3106
                s.masterSocket = socket
 
3107
        } else {
 
3108
                if s.slaveSocket != nil {
 
3109
                        panic("setSocket(slave) with existing slave socket reserved")
 
3110
                }
 
3111
                s.slaveSocket = socket
 
3112
        }
 
3113
}
 
3114
 
 
3115
// unsetSocket releases any slave and/or master sockets reserved.
 
3116
func (s *Session) unsetSocket() {
 
3117
        if s.masterSocket != nil {
 
3118
                s.masterSocket.Release()
 
3119
        }
 
3120
        if s.slaveSocket != nil {
 
3121
                s.slaveSocket.Release()
 
3122
        }
 
3123
        s.masterSocket = nil
 
3124
        s.slaveSocket = nil
 
3125
}
 
3126
 
 
3127
func (iter *Iter) replyFunc() replyFunc {
 
3128
        return func(err error, op *replyOp, docNum int, docData []byte) {
 
3129
                iter.m.Lock()
 
3130
                iter.docsToReceive--
 
3131
                if err != nil {
 
3132
                        iter.err = err
 
3133
                        debugf("Iter %p received an error: %s", iter, err.Error())
 
3134
                } else if docNum == -1 {
 
3135
                        debugf("Iter %p received no documents (cursor=%d).", iter, op.cursorId)
 
3136
                        if op != nil && op.cursorId != 0 {
 
3137
                                // It's a tailable cursor.
 
3138
                                iter.op.cursorId = op.cursorId
 
3139
                        } else {
 
3140
                                iter.err = ErrNotFound
 
3141
                        }
 
3142
                } else {
 
3143
                        rdocs := int(op.replyDocs)
 
3144
                        if docNum == 0 {
 
3145
                                iter.docsToReceive += rdocs - 1
 
3146
                                docsToProcess := iter.docData.Len() + rdocs
 
3147
                                if iter.limit == 0 || int32(docsToProcess) < iter.limit {
 
3148
                                        iter.docsBeforeMore = docsToProcess - int(iter.prefetch*float64(rdocs))
 
3149
                                } else {
 
3150
                                        iter.docsBeforeMore = -1
 
3151
                                }
 
3152
                                iter.op.cursorId = op.cursorId
 
3153
                        }
 
3154
                        // XXX Handle errors and flags.
 
3155
                        debugf("Iter %p received reply document %d/%d (cursor=%d)", iter, docNum+1, rdocs, op.cursorId)
 
3156
                        iter.docData.Push(docData)
 
3157
                }
 
3158
                iter.gotReply.Broadcast()
 
3159
                iter.m.Unlock()
 
3160
        }
 
3161
}
 
3162
 
 
3163
// writeQuery runs the given modifying operation, potentially followed up
 
3164
// by a getLastError command in case the session is in safe mode.  The
 
3165
// LastError result is made available in lerr, and if lerr.Err is set it
 
3166
// will also be returned as err.
 
3167
func (c *Collection) writeQuery(op interface{}) (lerr *LastError, err error) {
 
3168
        s := c.Database.Session
 
3169
        socket, err := s.acquireSocket(false)
 
3170
        if err != nil {
 
3171
                return nil, err
 
3172
        }
 
3173
        defer socket.Release()
 
3174
 
 
3175
        s.m.RLock()
 
3176
        safeOp := s.safeOp
 
3177
        s.m.RUnlock()
 
3178
 
 
3179
        if safeOp == nil {
 
3180
                return nil, socket.Query(op)
 
3181
        } else {
 
3182
                var mutex sync.Mutex
 
3183
                var replyData []byte
 
3184
                var replyErr error
 
3185
                mutex.Lock()
 
3186
                query := *safeOp // Copy the data.
 
3187
                query.collection = c.Database.Name + ".$cmd"
 
3188
                query.replyFunc = func(err error, reply *replyOp, docNum int, docData []byte) {
 
3189
                        replyData = docData
 
3190
                        replyErr = err
 
3191
                        mutex.Unlock()
 
3192
                }
 
3193
                err = socket.Query(op, &query)
 
3194
                if err != nil {
 
3195
                        return nil, err
 
3196
                }
 
3197
                mutex.Lock() // Wait.
 
3198
                if replyErr != nil {
 
3199
                        return nil, replyErr // XXX TESTME
 
3200
                }
 
3201
                if hasErrMsg(replyData) {
 
3202
                        // Looks like getLastError itself failed.
 
3203
                        err = checkQueryError(query.collection, replyData)
 
3204
                        if err != nil {
 
3205
                                return nil, err
 
3206
                        }
 
3207
                }
 
3208
                result := &LastError{}
 
3209
                bson.Unmarshal(replyData, &result)
 
3210
                debugf("Result from writing query: %#v", result)
 
3211
                if result.Err != "" {
 
3212
                        return result, result
 
3213
                }
 
3214
                return result, nil
 
3215
        }
 
3216
        panic("unreachable")
 
3217
}
 
3218
 
 
3219
func hasErrMsg(d []byte) bool {
 
3220
        l := len(d)
 
3221
        for i := 0; i+8 < l; i++ {
 
3222
                if d[i] == '\x02' && d[i+1] == 'e' && d[i+2] == 'r' && d[i+3] == 'r' && d[i+4] == 'm' && d[i+5] == 's' && d[i+6] == 'g' && d[i+7] == '\x00' {
 
3223
                        return true
 
3224
                }
 
3225
        }
 
3226
        return false
 
3227
}