~nskaggs/+junk/xenial-test

« back to all changes in this revision

Viewing changes to src/gopkg.in/mgo.v2/session.go

  • Committer: Nicholas Skaggs
  • Date: 2016-10-24 20:56:05 UTC
  • Revision ID: nicholas.skaggs@canonical.com-20161024205605-z8lta0uvuhtxwzwl
Initi with beta15

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
// mgo - MongoDB driver for Go
 
2
//
 
3
// Copyright (c) 2010-2012 - Gustavo Niemeyer <gustavo@niemeyer.net>
 
4
//
 
5
// All rights reserved.
 
6
//
 
7
// Redistribution and use in source and binary forms, with or without
 
8
// modification, are permitted provided that the following conditions are met:
 
9
//
 
10
// 1. Redistributions of source code must retain the above copyright notice, this
 
11
//    list of conditions and the following disclaimer.
 
12
// 2. Redistributions in binary form must reproduce the above copyright notice,
 
13
//    this list of conditions and the following disclaimer in the documentation
 
14
//    and/or other materials provided with the distribution.
 
15
//
 
16
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
 
17
// ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
 
18
// WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
 
19
// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
 
20
// ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
 
21
// (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
 
22
// LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
 
23
// ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 
24
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
 
25
// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 
26
 
 
27
package mgo
 
28
 
 
29
import (
 
30
        "crypto/md5"
 
31
        "encoding/hex"
 
32
        "errors"
 
33
        "fmt"
 
34
        "math"
 
35
        "net"
 
36
        "net/url"
 
37
        "reflect"
 
38
        "sort"
 
39
        "strconv"
 
40
        "strings"
 
41
        "sync"
 
42
        "time"
 
43
 
 
44
        "github.com/juju/loggo"
 
45
        "gopkg.in/mgo.v2/bson"
 
46
)
 
47
 
 
48
type Mode int
 
49
 
 
50
const (
 
51
        // Relevant documentation on read preference modes:
 
52
        //
 
53
        //     http://docs.mongodb.org/manual/reference/read-preference/
 
54
        //
 
55
        Primary            Mode = 2 // Default mode. All operations read from the current replica set primary.
 
56
        PrimaryPreferred   Mode = 3 // Read from the primary if available. Read from the secondary otherwise.
 
57
        Secondary          Mode = 4 // Read from one of the nearest secondary members of the replica set.
 
58
        SecondaryPreferred Mode = 5 // Read from one of the nearest secondaries if available. Read from primary otherwise.
 
59
        Nearest            Mode = 6 // Read from one of the nearest members, irrespective of it being primary or secondary.
 
60
 
 
61
        // Read preference modes are specific to mgo:
 
62
        Eventual  Mode = 0 // Same as Nearest, but may change servers between reads.
 
63
        Monotonic Mode = 1 // Same as SecondaryPreferred before first write. Same as Primary after first write.
 
64
        Strong    Mode = 2 // Same as Primary.
 
65
)
 
66
 
 
67
// mgo.v3: Drop Strong mode, suffix all modes with "Mode".
 
68
 
 
69
// When changing the Session type, check if newSession and copySession
 
70
// need to be updated too.
 
71
 
 
72
// Session represents a communication session with the database.
 
73
//
 
74
// All Session methods are concurrency-safe and may be called from multiple
 
75
// goroutines. In all session modes but Eventual, using the session from
 
76
// multiple goroutines will cause them to share the same underlying socket.
 
77
// See the documentation on Session.SetMode for more details.
 
78
type Session struct {
 
79
        m                sync.RWMutex
 
80
        cluster_         *mongoCluster
 
81
        slaveSocket      *mongoSocket
 
82
        masterSocket     *mongoSocket
 
83
        slaveOk          bool
 
84
        consistency      Mode
 
85
        queryConfig      query
 
86
        safeOp           *queryOp
 
87
        syncTimeout      time.Duration
 
88
        sockTimeout      time.Duration
 
89
        defaultdb        string
 
90
        sourcedb         string
 
91
        dialCred         *Credential
 
92
        creds            []Credential
 
93
        poolLimit        int
 
94
        bypassValidation bool
 
95
}
 
96
 
 
97
type Database struct {
 
98
        Session *Session
 
99
        Name    string
 
100
}
 
101
 
 
102
type Collection struct {
 
103
        Database *Database
 
104
        Name     string // "collection"
 
105
        FullName string // "db.collection"
 
106
}
 
107
 
 
108
type Query struct {
 
109
        m       sync.Mutex
 
110
        session *Session
 
111
        query   // Enables default settings in session.
 
112
}
 
113
 
 
114
type query struct {
 
115
        op       queryOp
 
116
        prefetch float64
 
117
        limit    int32
 
118
}
 
119
 
 
120
type getLastError struct {
 
121
        CmdName  int         "getLastError,omitempty"
 
122
        W        interface{} "w,omitempty"
 
123
        WTimeout int         "wtimeout,omitempty"
 
124
        FSync    bool        "fsync,omitempty"
 
125
        J        bool        "j,omitempty"
 
126
}
 
127
 
 
128
type Iter struct {
 
129
        m              sync.Mutex
 
130
        gotReply       sync.Cond
 
131
        session        *Session
 
132
        server         *mongoServer
 
133
        docData        queue
 
134
        err            error
 
135
        op             getMoreOp
 
136
        prefetch       float64
 
137
        limit          int32
 
138
        docsToReceive  int
 
139
        docsBeforeMore int
 
140
        timeout        time.Duration
 
141
        timedout       bool
 
142
        findCmd        bool
 
143
}
 
144
 
 
145
var (
 
146
        ErrNotFound = errors.New("not found")
 
147
        ErrCursor   = errors.New("invalid cursor")
 
148
 
 
149
        logPatchedOnce sync.Once
 
150
        logger         = loggo.GetLogger("mgo")
 
151
)
 
152
 
 
153
const (
 
154
        defaultPrefetch = 0.25
 
155
 
 
156
        // How many times we will retry an upsert if it produces duplicate
 
157
        // key errors.
 
158
        maxUpsertRetries = 5
 
159
)
 
160
 
 
161
// Dial establishes a new session to the cluster identified by the given seed
 
162
// server(s). The session will enable communication with all of the servers in
 
163
// the cluster, so the seed servers are used only to find out about the cluster
 
164
// topology.
 
165
//
 
166
// Dial will timeout after 10 seconds if a server isn't reached. The returned
 
167
// session will timeout operations after one minute by default if servers
 
168
// aren't available. To customize the timeout, see DialWithTimeout,
 
169
// SetSyncTimeout, and SetSocketTimeout.
 
170
//
 
171
// This method is generally called just once for a given cluster.  Further
 
172
// sessions to the same cluster are then established using the New or Copy
 
173
// methods on the obtained session. This will make them share the underlying
 
174
// cluster, and manage the pool of connections appropriately.
 
175
//
 
176
// Once the session is not useful anymore, Close must be called to release the
 
177
// resources appropriately.
 
178
//
 
179
// The seed servers must be provided in the following format:
 
180
//
 
181
//     [mongodb://][user:pass@]host1[:port1][,host2[:port2],...][/database][?options]
 
182
//
 
183
// For example, it may be as simple as:
 
184
//
 
185
//     localhost
 
186
//
 
187
// Or more involved like:
 
188
//
 
189
//     mongodb://myuser:mypass@localhost:40001,otherhost:40001/mydb
 
190
//
 
191
// If the port number is not provided for a server, it defaults to 27017.
 
192
//
 
193
// The username and password provided in the URL will be used to authenticate
 
194
// into the database named after the slash at the end of the host names, or
 
195
// into the "admin" database if none is provided.  The authentication information
 
196
// will persist in sessions obtained through the New method as well.
 
197
//
 
198
// The following connection options are supported after the question mark:
 
199
//
 
200
//     connect=direct
 
201
//
 
202
//         Disables the automatic replica set server discovery logic, and
 
203
//         forces the use of servers provided only (even if secondaries).
 
204
//         Note that to talk to a secondary the consistency requirements
 
205
//         must be relaxed to Monotonic or Eventual via SetMode.
 
206
//
 
207
//
 
208
//     connect=replicaSet
 
209
//
 
210
//         Discover replica sets automatically. Default connection behavior.
 
211
//
 
212
//
 
213
//     replicaSet=<setname>
 
214
//
 
215
//         If specified will prevent the obtained session from communicating
 
216
//         with any server which is not part of a replica set with the given name.
 
217
//         The default is to communicate with any server specified or discovered
 
218
//         via the servers contacted.
 
219
//
 
220
//
 
221
//     authSource=<db>
 
222
//
 
223
//         Informs the database used to establish credentials and privileges
 
224
//         with a MongoDB server. Defaults to the database name provided via
 
225
//         the URL path, and "admin" if that's unset.
 
226
//
 
227
//
 
228
//     authMechanism=<mechanism>
 
229
//
 
230
//        Defines the protocol for credential negotiation. Defaults to "MONGODB-CR",
 
231
//        which is the default username/password challenge-response mechanism.
 
232
//
 
233
//
 
234
//     gssapiServiceName=<name>
 
235
//
 
236
//        Defines the service name to use when authenticating with the GSSAPI
 
237
//        mechanism. Defaults to "mongodb".
 
238
//
 
239
//
 
240
//     maxPoolSize=<limit>
 
241
//
 
242
//        Defines the per-server socket pool limit. Defaults to 4096.
 
243
//        See Session.SetPoolLimit for details.
 
244
//
 
245
//
 
246
// Relevant documentation:
 
247
//
 
248
//     http://docs.mongodb.org/manual/reference/connection-string/
 
249
//
 
250
func Dial(url string) (*Session, error) {
 
251
        session, err := DialWithTimeout(url, 10*time.Second)
 
252
        if err == nil {
 
253
                session.SetSyncTimeout(1 * time.Minute)
 
254
                session.SetSocketTimeout(1 * time.Minute)
 
255
        }
 
256
        return session, err
 
257
}
 
258
 
 
259
// DialWithTimeout works like Dial, but uses timeout as the amount of time to
 
260
// wait for a server to respond when first connecting and also on follow up
 
261
// operations in the session. If timeout is zero, the call may block
 
262
// forever waiting for a connection to be made.
 
263
//
 
264
// See SetSyncTimeout for customizing the timeout for the session.
 
265
func DialWithTimeout(url string, timeout time.Duration) (*Session, error) {
 
266
        info, err := ParseURL(url)
 
267
        if err != nil {
 
268
                return nil, err
 
269
        }
 
270
        info.Timeout = timeout
 
271
        return DialWithInfo(info)
 
272
}
 
273
 
 
274
// ParseURL parses a MongoDB URL as accepted by the Dial function and returns
 
275
// a value suitable for providing into DialWithInfo.
 
276
//
 
277
// See Dial for more details on the format of url.
 
278
func ParseURL(url string) (*DialInfo, error) {
 
279
        uinfo, err := extractURL(url)
 
280
        if err != nil {
 
281
                return nil, err
 
282
        }
 
283
        direct := false
 
284
        mechanism := ""
 
285
        service := ""
 
286
        source := ""
 
287
        setName := ""
 
288
        poolLimit := 0
 
289
        for k, v := range uinfo.options {
 
290
                switch k {
 
291
                case "authSource":
 
292
                        source = v
 
293
                case "authMechanism":
 
294
                        mechanism = v
 
295
                case "gssapiServiceName":
 
296
                        service = v
 
297
                case "replicaSet":
 
298
                        setName = v
 
299
                case "maxPoolSize":
 
300
                        poolLimit, err = strconv.Atoi(v)
 
301
                        if err != nil {
 
302
                                return nil, errors.New("bad value for maxPoolSize: " + v)
 
303
                        }
 
304
                case "connect":
 
305
                        if v == "direct" {
 
306
                                direct = true
 
307
                                break
 
308
                        }
 
309
                        if v == "replicaSet" {
 
310
                                break
 
311
                        }
 
312
                        fallthrough
 
313
                default:
 
314
                        return nil, errors.New("unsupported connection URL option: " + k + "=" + v)
 
315
                }
 
316
        }
 
317
        info := DialInfo{
 
318
                Addrs:          uinfo.addrs,
 
319
                Direct:         direct,
 
320
                Database:       uinfo.db,
 
321
                Username:       uinfo.user,
 
322
                Password:       uinfo.pass,
 
323
                Mechanism:      mechanism,
 
324
                Service:        service,
 
325
                Source:         source,
 
326
                PoolLimit:      poolLimit,
 
327
                ReplicaSetName: setName,
 
328
        }
 
329
        return &info, nil
 
330
}
 
331
 
 
332
// DialInfo holds options for establishing a session with a MongoDB cluster.
 
333
// To use a URL, see the Dial function.
 
334
type DialInfo struct {
 
335
        // Addrs holds the addresses for the seed servers.
 
336
        Addrs []string
 
337
 
 
338
        // Direct informs whether to establish connections only with the
 
339
        // specified seed servers, or to obtain information for the whole
 
340
        // cluster and establish connections with further servers too.
 
341
        Direct bool
 
342
 
 
343
        // Timeout is the amount of time to wait for a server to respond when
 
344
        // first connecting and on follow up operations in the session. If
 
345
        // timeout is zero, the call may block forever waiting for a connection
 
346
        // to be established. Timeout does not affect logic in DialServer.
 
347
        Timeout time.Duration
 
348
 
 
349
        // FailFast will cause connection and query attempts to fail faster when
 
350
        // the server is unavailable, instead of retrying until the configured
 
351
        // timeout period. Note that an unavailable server may silently drop
 
352
        // packets instead of rejecting them, in which case it's impossible to
 
353
        // distinguish it from a slow server, so the timeout stays relevant.
 
354
        FailFast bool
 
355
 
 
356
        // Database is the default database name used when the Session.DB method
 
357
        // is called with an empty name, and is also used during the initial
 
358
        // authentication if Source is unset.
 
359
        Database string
 
360
 
 
361
        // ReplicaSetName, if specified, will prevent the obtained session from
 
362
        // communicating with any server which is not part of a replica set
 
363
        // with the given name. The default is to communicate with any server
 
364
        // specified or discovered via the servers contacted.
 
365
        ReplicaSetName string
 
366
 
 
367
        // Source is the database used to establish credentials and privileges
 
368
        // with a MongoDB server. Defaults to the value of Database, if that is
 
369
        // set, or "admin" otherwise.
 
370
        Source string
 
371
 
 
372
        // Service defines the service name to use when authenticating with the GSSAPI
 
373
        // mechanism. Defaults to "mongodb".
 
374
        Service string
 
375
 
 
376
        // ServiceHost defines which hostname to use when authenticating
 
377
        // with the GSSAPI mechanism. If not specified, defaults to the MongoDB
 
378
        // server's address.
 
379
        ServiceHost string
 
380
 
 
381
        // Mechanism defines the protocol for credential negotiation.
 
382
        // Defaults to "MONGODB-CR".
 
383
        Mechanism string
 
384
 
 
385
        // Username and Password inform the credentials for the initial authentication
 
386
        // done on the database defined by the Source field. See Session.Login.
 
387
        Username string
 
388
        Password string
 
389
 
 
390
        // PoolLimit defines the per-server socket pool limit. Defaults to 4096.
 
391
        // See Session.SetPoolLimit for details.
 
392
        PoolLimit int
 
393
 
 
394
        // DialServer optionally specifies the dial function for establishing
 
395
        // connections with the MongoDB servers.
 
396
        DialServer func(addr *ServerAddr) (net.Conn, error)
 
397
 
 
398
        // WARNING: This field is obsolete. See DialServer above.
 
399
        Dial func(addr net.Addr) (net.Conn, error)
 
400
}
 
401
 
 
402
// mgo.v3: Drop DialInfo.Dial.
 
403
 
 
404
// ServerAddr represents the address for establishing a connection to an
 
405
// individual MongoDB server.
 
406
type ServerAddr struct {
 
407
        str string
 
408
        tcp *net.TCPAddr
 
409
}
 
410
 
 
411
// String returns the address that was provided for the server before resolution.
 
412
func (addr *ServerAddr) String() string {
 
413
        return addr.str
 
414
}
 
415
 
 
416
// TCPAddr returns the resolved TCP address for the server.
 
417
func (addr *ServerAddr) TCPAddr() *net.TCPAddr {
 
418
        return addr.tcp
 
419
}
 
420
 
 
421
// DialWithInfo establishes a new session to the cluster identified by info.
 
422
func DialWithInfo(info *DialInfo) (*Session, error) {
 
423
        // This is using loggo because that can be done here in a
 
424
        // localised patch, while using mgo's logging would need a change
 
425
        // in Juju to call mgo.SetLogger. It's in this short-lived patch
 
426
        // as a stop-gap because it's proving difficult to tell if the
 
427
        // patch is applied in a running system. If you see it in
 
428
        // committed code then something has gone very awry - please
 
429
        // complain loudly! (babbageclunk)
 
430
        logPatchedOnce.Do(func() {
 
431
                logger.Debugf("duplicate key error patch applied")
 
432
        })
 
433
        addrs := make([]string, len(info.Addrs))
 
434
        for i, addr := range info.Addrs {
 
435
                p := strings.LastIndexAny(addr, "]:")
 
436
                if p == -1 || addr[p] != ':' {
 
437
                        // XXX This is untested. The test suite doesn't use the standard port.
 
438
                        addr += ":27017"
 
439
                }
 
440
                addrs[i] = addr
 
441
        }
 
442
        cluster := newCluster(addrs, info.Direct, info.FailFast, dialer{info.Dial, info.DialServer}, info.ReplicaSetName)
 
443
        session := newSession(Eventual, cluster, info.Timeout)
 
444
        session.defaultdb = info.Database
 
445
        if session.defaultdb == "" {
 
446
                session.defaultdb = "test"
 
447
        }
 
448
        session.sourcedb = info.Source
 
449
        if session.sourcedb == "" {
 
450
                session.sourcedb = info.Database
 
451
                if session.sourcedb == "" {
 
452
                        session.sourcedb = "admin"
 
453
                }
 
454
        }
 
455
        if info.Username != "" {
 
456
                source := session.sourcedb
 
457
                if info.Source == "" &&
 
458
                        (info.Mechanism == "GSSAPI" || info.Mechanism == "PLAIN" || info.Mechanism == "MONGODB-X509") {
 
459
                        source = "$external"
 
460
                }
 
461
                session.dialCred = &Credential{
 
462
                        Username:    info.Username,
 
463
                        Password:    info.Password,
 
464
                        Mechanism:   info.Mechanism,
 
465
                        Service:     info.Service,
 
466
                        ServiceHost: info.ServiceHost,
 
467
                        Source:      source,
 
468
                }
 
469
                session.creds = []Credential{*session.dialCred}
 
470
        }
 
471
        if info.PoolLimit > 0 {
 
472
                session.poolLimit = info.PoolLimit
 
473
        }
 
474
        cluster.Release()
 
475
 
 
476
        // People get confused when we return a session that is not actually
 
477
        // established to any servers yet (e.g. what if url was wrong). So,
 
478
        // ping the server to ensure there's someone there, and abort if it
 
479
        // fails.
 
480
        if err := session.Ping(); err != nil {
 
481
                session.Close()
 
482
                return nil, err
 
483
        }
 
484
        session.SetMode(Strong, true)
 
485
        return session, nil
 
486
}
 
487
 
 
488
func isOptSep(c rune) bool {
 
489
        return c == ';' || c == '&'
 
490
}
 
491
 
 
492
type urlInfo struct {
 
493
        addrs   []string
 
494
        user    string
 
495
        pass    string
 
496
        db      string
 
497
        options map[string]string
 
498
}
 
499
 
 
500
func extractURL(s string) (*urlInfo, error) {
 
501
        if strings.HasPrefix(s, "mongodb://") {
 
502
                s = s[10:]
 
503
        }
 
504
        info := &urlInfo{options: make(map[string]string)}
 
505
        if c := strings.Index(s, "?"); c != -1 {
 
506
                for _, pair := range strings.FieldsFunc(s[c+1:], isOptSep) {
 
507
                        l := strings.SplitN(pair, "=", 2)
 
508
                        if len(l) != 2 || l[0] == "" || l[1] == "" {
 
509
                                return nil, errors.New("connection option must be key=value: " + pair)
 
510
                        }
 
511
                        info.options[l[0]] = l[1]
 
512
                }
 
513
                s = s[:c]
 
514
        }
 
515
        if c := strings.Index(s, "@"); c != -1 {
 
516
                pair := strings.SplitN(s[:c], ":", 2)
 
517
                if len(pair) > 2 || pair[0] == "" {
 
518
                        return nil, errors.New("credentials must be provided as user:pass@host")
 
519
                }
 
520
                var err error
 
521
                info.user, err = url.QueryUnescape(pair[0])
 
522
                if err != nil {
 
523
                        return nil, fmt.Errorf("cannot unescape username in URL: %q", pair[0])
 
524
                }
 
525
                if len(pair) > 1 {
 
526
                        info.pass, err = url.QueryUnescape(pair[1])
 
527
                        if err != nil {
 
528
                                return nil, fmt.Errorf("cannot unescape password in URL")
 
529
                        }
 
530
                }
 
531
                s = s[c+1:]
 
532
        }
 
533
        if c := strings.Index(s, "/"); c != -1 {
 
534
                info.db = s[c+1:]
 
535
                s = s[:c]
 
536
        }
 
537
        info.addrs = strings.Split(s, ",")
 
538
        return info, nil
 
539
}
 
540
 
 
541
func newSession(consistency Mode, cluster *mongoCluster, timeout time.Duration) (session *Session) {
 
542
        cluster.Acquire()
 
543
        session = &Session{
 
544
                cluster_:    cluster,
 
545
                syncTimeout: timeout,
 
546
                sockTimeout: timeout,
 
547
                poolLimit:   4096,
 
548
        }
 
549
        debugf("New session %p on cluster %p", session, cluster)
 
550
        session.SetMode(consistency, true)
 
551
        session.SetSafe(&Safe{})
 
552
        session.queryConfig.prefetch = defaultPrefetch
 
553
        return session
 
554
}
 
555
 
 
556
func copySession(session *Session, keepCreds bool) (s *Session) {
 
557
        cluster := session.cluster()
 
558
        cluster.Acquire()
 
559
        if session.masterSocket != nil {
 
560
                session.masterSocket.Acquire()
 
561
        }
 
562
        if session.slaveSocket != nil {
 
563
                session.slaveSocket.Acquire()
 
564
        }
 
565
        var creds []Credential
 
566
        if keepCreds {
 
567
                creds = make([]Credential, len(session.creds))
 
568
                copy(creds, session.creds)
 
569
        } else if session.dialCred != nil {
 
570
                creds = []Credential{*session.dialCred}
 
571
        }
 
572
        scopy := *session
 
573
        scopy.m = sync.RWMutex{}
 
574
        scopy.creds = creds
 
575
        s = &scopy
 
576
        debugf("New session %p on cluster %p (copy from %p)", s, cluster, session)
 
577
        return s
 
578
}
 
579
 
 
580
// LiveServers returns a list of server addresses which are
 
581
// currently known to be alive.
 
582
func (s *Session) LiveServers() (addrs []string) {
 
583
        s.m.RLock()
 
584
        addrs = s.cluster().LiveServers()
 
585
        s.m.RUnlock()
 
586
        return addrs
 
587
}
 
588
 
 
589
// DB returns a value representing the named database. If name
 
590
// is empty, the database name provided in the dialed URL is
 
591
// used instead. If that is also empty, "test" is used as a
 
592
// fallback in a way equivalent to the mongo shell.
 
593
//
 
594
// Creating this value is a very lightweight operation, and
 
595
// involves no network communication.
 
596
func (s *Session) DB(name string) *Database {
 
597
        if name == "" {
 
598
                name = s.defaultdb
 
599
        }
 
600
        return &Database{s, name}
 
601
}
 
602
 
 
603
// C returns a value representing the named collection.
 
604
//
 
605
// Creating this value is a very lightweight operation, and
 
606
// involves no network communication.
 
607
func (db *Database) C(name string) *Collection {
 
608
        return &Collection{db, name, db.Name + "." + name}
 
609
}
 
610
 
 
611
// With returns a copy of db that uses session s.
 
612
func (db *Database) With(s *Session) *Database {
 
613
        newdb := *db
 
614
        newdb.Session = s
 
615
        return &newdb
 
616
}
 
617
 
 
618
// With returns a copy of c that uses session s.
 
619
func (c *Collection) With(s *Session) *Collection {
 
620
        newdb := *c.Database
 
621
        newdb.Session = s
 
622
        newc := *c
 
623
        newc.Database = &newdb
 
624
        return &newc
 
625
}
 
626
 
 
627
// GridFS returns a GridFS value representing collections in db that
 
628
// follow the standard GridFS specification.
 
629
// The provided prefix (sometimes known as root) will determine which
 
630
// collections to use, and is usually set to "fs" when there is a
 
631
// single GridFS in the database.
 
632
//
 
633
// See the GridFS Create, Open, and OpenId methods for more details.
 
634
//
 
635
// Relevant documentation:
 
636
//
 
637
//     http://www.mongodb.org/display/DOCS/GridFS
 
638
//     http://www.mongodb.org/display/DOCS/GridFS+Tools
 
639
//     http://www.mongodb.org/display/DOCS/GridFS+Specification
 
640
//
 
641
func (db *Database) GridFS(prefix string) *GridFS {
 
642
        return newGridFS(db, prefix)
 
643
}
 
644
 
 
645
// Run issues the provided command on the db database and unmarshals
 
646
// its result in the respective argument. The cmd argument may be either
 
647
// a string with the command name itself, in which case an empty document of
 
648
// the form bson.M{cmd: 1} will be used, or it may be a full command document.
 
649
//
 
650
// Note that MongoDB considers the first marshalled key as the command
 
651
// name, so when providing a command with options, it's important to
 
652
// use an ordering-preserving document, such as a struct value or an
 
653
// instance of bson.D.  For instance:
 
654
//
 
655
//     db.Run(bson.D{{"create", "mycollection"}, {"size", 1024}})
 
656
//
 
657
// For privilleged commands typically run on the "admin" database, see
 
658
// the Run method in the Session type.
 
659
//
 
660
// Relevant documentation:
 
661
//
 
662
//     http://www.mongodb.org/display/DOCS/Commands
 
663
//     http://www.mongodb.org/display/DOCS/List+of+Database+CommandSkips
 
664
//
 
665
func (db *Database) Run(cmd interface{}, result interface{}) error {
 
666
        socket, err := db.Session.acquireSocket(true)
 
667
        if err != nil {
 
668
                return err
 
669
        }
 
670
        defer socket.Release()
 
671
 
 
672
        // This is an optimized form of db.C("$cmd").Find(cmd).One(result).
 
673
        return db.run(socket, cmd, result)
 
674
}
 
675
 
 
676
// Credential holds details to authenticate with a MongoDB server.
 
677
type Credential struct {
 
678
        // Username and Password hold the basic details for authentication.
 
679
        // Password is optional with some authentication mechanisms.
 
680
        Username string
 
681
        Password string
 
682
 
 
683
        // Source is the database used to establish credentials and privileges
 
684
        // with a MongoDB server. Defaults to the default database provided
 
685
        // during dial, or "admin" if that was unset.
 
686
        Source string
 
687
 
 
688
        // Service defines the service name to use when authenticating with the GSSAPI
 
689
        // mechanism. Defaults to "mongodb".
 
690
        Service string
 
691
 
 
692
        // ServiceHost defines which hostname to use when authenticating
 
693
        // with the GSSAPI mechanism. If not specified, defaults to the MongoDB
 
694
        // server's address.
 
695
        ServiceHost string
 
696
 
 
697
        // Mechanism defines the protocol for credential negotiation.
 
698
        // Defaults to "MONGODB-CR".
 
699
        Mechanism string
 
700
}
 
701
 
 
702
// Login authenticates with MongoDB using the provided credential.  The
 
703
// authentication is valid for the whole session and will stay valid until
 
704
// Logout is explicitly called for the same database, or the session is
 
705
// closed.
 
706
func (db *Database) Login(user, pass string) error {
 
707
        return db.Session.Login(&Credential{Username: user, Password: pass, Source: db.Name})
 
708
}
 
709
 
 
710
// Login authenticates with MongoDB using the provided credential.  The
 
711
// authentication is valid for the whole session and will stay valid until
 
712
// Logout is explicitly called for the same database, or the session is
 
713
// closed.
 
714
func (s *Session) Login(cred *Credential) error {
 
715
        socket, err := s.acquireSocket(true)
 
716
        if err != nil {
 
717
                return err
 
718
        }
 
719
        defer socket.Release()
 
720
 
 
721
        credCopy := *cred
 
722
        if cred.Source == "" {
 
723
                if cred.Mechanism == "GSSAPI" {
 
724
                        credCopy.Source = "$external"
 
725
                } else {
 
726
                        credCopy.Source = s.sourcedb
 
727
                }
 
728
        }
 
729
        err = socket.Login(credCopy)
 
730
        if err != nil {
 
731
                return err
 
732
        }
 
733
 
 
734
        s.m.Lock()
 
735
        s.creds = append(s.creds, credCopy)
 
736
        s.m.Unlock()
 
737
        return nil
 
738
}
 
739
 
 
740
func (s *Session) socketLogin(socket *mongoSocket) error {
 
741
        for _, cred := range s.creds {
 
742
                if err := socket.Login(cred); err != nil {
 
743
                        return err
 
744
                }
 
745
        }
 
746
        return nil
 
747
}
 
748
 
 
749
// Logout removes any established authentication credentials for the database.
 
750
func (db *Database) Logout() {
 
751
        session := db.Session
 
752
        dbname := db.Name
 
753
        session.m.Lock()
 
754
        found := false
 
755
        for i, cred := range session.creds {
 
756
                if cred.Source == dbname {
 
757
                        copy(session.creds[i:], session.creds[i+1:])
 
758
                        session.creds = session.creds[:len(session.creds)-1]
 
759
                        found = true
 
760
                        break
 
761
                }
 
762
        }
 
763
        if found {
 
764
                if session.masterSocket != nil {
 
765
                        session.masterSocket.Logout(dbname)
 
766
                }
 
767
                if session.slaveSocket != nil {
 
768
                        session.slaveSocket.Logout(dbname)
 
769
                }
 
770
        }
 
771
        session.m.Unlock()
 
772
}
 
773
 
 
774
// LogoutAll removes all established authentication credentials for the session.
 
775
func (s *Session) LogoutAll() {
 
776
        s.m.Lock()
 
777
        for _, cred := range s.creds {
 
778
                if s.masterSocket != nil {
 
779
                        s.masterSocket.Logout(cred.Source)
 
780
                }
 
781
                if s.slaveSocket != nil {
 
782
                        s.slaveSocket.Logout(cred.Source)
 
783
                }
 
784
        }
 
785
        s.creds = s.creds[0:0]
 
786
        s.m.Unlock()
 
787
}
 
788
 
 
789
// User represents a MongoDB user.
 
790
//
 
791
// Relevant documentation:
 
792
//
 
793
//     http://docs.mongodb.org/manual/reference/privilege-documents/
 
794
//     http://docs.mongodb.org/manual/reference/user-privileges/
 
795
//
 
796
type User struct {
 
797
        // Username is how the user identifies itself to the system.
 
798
        Username string `bson:"user"`
 
799
 
 
800
        // Password is the plaintext password for the user. If set,
 
801
        // the UpsertUser method will hash it into PasswordHash and
 
802
        // unset it before the user is added to the database.
 
803
        Password string `bson:",omitempty"`
 
804
 
 
805
        // PasswordHash is the MD5 hash of Username+":mongo:"+Password.
 
806
        PasswordHash string `bson:"pwd,omitempty"`
 
807
 
 
808
        // CustomData holds arbitrary data admins decide to associate
 
809
        // with this user, such as the full name or employee id.
 
810
        CustomData interface{} `bson:"customData,omitempty"`
 
811
 
 
812
        // Roles indicates the set of roles the user will be provided.
 
813
        // See the Role constants.
 
814
        Roles []Role `bson:"roles"`
 
815
 
 
816
        // OtherDBRoles allows assigning roles in other databases from
 
817
        // user documents inserted in the admin database. This field
 
818
        // only works in the admin database.
 
819
        OtherDBRoles map[string][]Role `bson:"otherDBRoles,omitempty"`
 
820
 
 
821
        // UserSource indicates where to look for this user's credentials.
 
822
        // It may be set to a database name, or to "$external" for
 
823
        // consulting an external resource such as Kerberos. UserSource
 
824
        // must not be set if Password or PasswordHash are present.
 
825
        //
 
826
        // WARNING: This setting was only ever supported in MongoDB 2.4,
 
827
        // and is now obsolete.
 
828
        UserSource string `bson:"userSource,omitempty"`
 
829
}
 
830
 
 
831
type Role string
 
832
 
 
833
const (
 
834
        // Relevant documentation:
 
835
        //
 
836
        //     http://docs.mongodb.org/manual/reference/user-privileges/
 
837
        //
 
838
        RoleRoot         Role = "root"
 
839
        RoleRead         Role = "read"
 
840
        RoleReadAny      Role = "readAnyDatabase"
 
841
        RoleReadWrite    Role = "readWrite"
 
842
        RoleReadWriteAny Role = "readWriteAnyDatabase"
 
843
        RoleDBAdmin      Role = "dbAdmin"
 
844
        RoleDBAdminAny   Role = "dbAdminAnyDatabase"
 
845
        RoleUserAdmin    Role = "userAdmin"
 
846
        RoleUserAdminAny Role = "userAdminAnyDatabase"
 
847
        RoleClusterAdmin Role = "clusterAdmin"
 
848
)
 
849
 
 
850
// UpsertUser updates the authentication credentials and the roles for
 
851
// a MongoDB user within the db database. If the named user doesn't exist
 
852
// it will be created.
 
853
//
 
854
// This method should only be used from MongoDB 2.4 and on. For older
 
855
// MongoDB releases, use the obsolete AddUser method instead.
 
856
//
 
857
// Relevant documentation:
 
858
//
 
859
//     http://docs.mongodb.org/manual/reference/user-privileges/
 
860
//     http://docs.mongodb.org/manual/reference/privilege-documents/
 
861
//
 
862
func (db *Database) UpsertUser(user *User) error {
 
863
        if user.Username == "" {
 
864
                return fmt.Errorf("user has no Username")
 
865
        }
 
866
        if (user.Password != "" || user.PasswordHash != "") && user.UserSource != "" {
 
867
                return fmt.Errorf("user has both Password/PasswordHash and UserSource set")
 
868
        }
 
869
        if len(user.OtherDBRoles) > 0 && db.Name != "admin" && db.Name != "$external" {
 
870
                return fmt.Errorf("user with OtherDBRoles is only supported in the admin or $external databases")
 
871
        }
 
872
 
 
873
        // Attempt to run this using 2.6+ commands.
 
874
        rundb := db
 
875
        if user.UserSource != "" {
 
876
                // Compatibility logic for the userSource field of MongoDB <= 2.4.X
 
877
                rundb = db.Session.DB(user.UserSource)
 
878
        }
 
879
        err := rundb.runUserCmd("updateUser", user)
 
880
        // retry with createUser when isAuthError in order to enable the "localhost exception"
 
881
        if isNotFound(err) || isAuthError(err) {
 
882
                return rundb.runUserCmd("createUser", user)
 
883
        }
 
884
        if !isNoCmd(err) {
 
885
                return err
 
886
        }
 
887
 
 
888
        // Command does not exist. Fallback to pre-2.6 behavior.
 
889
        var set, unset bson.D
 
890
        if user.Password != "" {
 
891
                psum := md5.New()
 
892
                psum.Write([]byte(user.Username + ":mongo:" + user.Password))
 
893
                set = append(set, bson.DocElem{"pwd", hex.EncodeToString(psum.Sum(nil))})
 
894
                unset = append(unset, bson.DocElem{"userSource", 1})
 
895
        } else if user.PasswordHash != "" {
 
896
                set = append(set, bson.DocElem{"pwd", user.PasswordHash})
 
897
                unset = append(unset, bson.DocElem{"userSource", 1})
 
898
        }
 
899
        if user.UserSource != "" {
 
900
                set = append(set, bson.DocElem{"userSource", user.UserSource})
 
901
                unset = append(unset, bson.DocElem{"pwd", 1})
 
902
        }
 
903
        if user.Roles != nil || user.OtherDBRoles != nil {
 
904
                set = append(set, bson.DocElem{"roles", user.Roles})
 
905
                if len(user.OtherDBRoles) > 0 {
 
906
                        set = append(set, bson.DocElem{"otherDBRoles", user.OtherDBRoles})
 
907
                } else {
 
908
                        unset = append(unset, bson.DocElem{"otherDBRoles", 1})
 
909
                }
 
910
        }
 
911
        users := db.C("system.users")
 
912
        err = users.Update(bson.D{{"user", user.Username}}, bson.D{{"$unset", unset}, {"$set", set}})
 
913
        if err == ErrNotFound {
 
914
                set = append(set, bson.DocElem{"user", user.Username})
 
915
                if user.Roles == nil && user.OtherDBRoles == nil {
 
916
                        // Roles must be sent, as it's the way MongoDB distinguishes
 
917
                        // old-style documents from new-style documents in pre-2.6.
 
918
                        set = append(set, bson.DocElem{"roles", user.Roles})
 
919
                }
 
920
                err = users.Insert(set)
 
921
        }
 
922
        return err
 
923
}
 
924
 
 
925
func isNoCmd(err error) bool {
 
926
        e, ok := err.(*QueryError)
 
927
        return ok && (e.Code == 59 || e.Code == 13390 || strings.HasPrefix(e.Message, "no such cmd:"))
 
928
}
 
929
 
 
930
func isNotFound(err error) bool {
 
931
        e, ok := err.(*QueryError)
 
932
        return ok && e.Code == 11
 
933
}
 
934
 
 
935
func isAuthError(err error) bool {
 
936
        e, ok := err.(*QueryError)
 
937
        return ok && e.Code == 13
 
938
}
 
939
 
 
940
func (db *Database) runUserCmd(cmdName string, user *User) error {
 
941
        cmd := make(bson.D, 0, 16)
 
942
        cmd = append(cmd, bson.DocElem{cmdName, user.Username})
 
943
        if user.Password != "" {
 
944
                cmd = append(cmd, bson.DocElem{"pwd", user.Password})
 
945
        }
 
946
        var roles []interface{}
 
947
        for _, role := range user.Roles {
 
948
                roles = append(roles, role)
 
949
        }
 
950
        for db, dbroles := range user.OtherDBRoles {
 
951
                for _, role := range dbroles {
 
952
                        roles = append(roles, bson.D{{"role", role}, {"db", db}})
 
953
                }
 
954
        }
 
955
        if roles != nil || user.Roles != nil || cmdName == "createUser" {
 
956
                cmd = append(cmd, bson.DocElem{"roles", roles})
 
957
        }
 
958
        err := db.Run(cmd, nil)
 
959
        if !isNoCmd(err) && user.UserSource != "" && (user.UserSource != "$external" || db.Name != "$external") {
 
960
                return fmt.Errorf("MongoDB 2.6+ does not support the UserSource setting")
 
961
        }
 
962
        return err
 
963
}
 
964
 
 
965
// AddUser creates or updates the authentication credentials of user within
 
966
// the db database.
 
967
//
 
968
// WARNING: This method is obsolete and should only be used with MongoDB 2.2
 
969
// or earlier. For MongoDB 2.4 and on, use UpsertUser instead.
 
970
func (db *Database) AddUser(username, password string, readOnly bool) error {
 
971
        // Try to emulate the old behavior on 2.6+
 
972
        user := &User{Username: username, Password: password}
 
973
        if db.Name == "admin" {
 
974
                if readOnly {
 
975
                        user.Roles = []Role{RoleReadAny}
 
976
                } else {
 
977
                        user.Roles = []Role{RoleReadWriteAny}
 
978
                }
 
979
        } else {
 
980
                if readOnly {
 
981
                        user.Roles = []Role{RoleRead}
 
982
                } else {
 
983
                        user.Roles = []Role{RoleReadWrite}
 
984
                }
 
985
        }
 
986
        err := db.runUserCmd("updateUser", user)
 
987
        if isNotFound(err) {
 
988
                return db.runUserCmd("createUser", user)
 
989
        }
 
990
        if !isNoCmd(err) {
 
991
                return err
 
992
        }
 
993
 
 
994
        // Command doesn't exist. Fallback to pre-2.6 behavior.
 
995
        psum := md5.New()
 
996
        psum.Write([]byte(username + ":mongo:" + password))
 
997
        digest := hex.EncodeToString(psum.Sum(nil))
 
998
        c := db.C("system.users")
 
999
        _, err = c.Upsert(bson.M{"user": username}, bson.M{"$set": bson.M{"user": username, "pwd": digest, "readOnly": readOnly}})
 
1000
        return err
 
1001
}
 
1002
 
 
1003
// RemoveUser removes the authentication credentials of user from the database.
 
1004
func (db *Database) RemoveUser(user string) error {
 
1005
        err := db.Run(bson.D{{"dropUser", user}}, nil)
 
1006
        if isNoCmd(err) {
 
1007
                users := db.C("system.users")
 
1008
                return users.Remove(bson.M{"user": user})
 
1009
        }
 
1010
        if isNotFound(err) {
 
1011
                return ErrNotFound
 
1012
        }
 
1013
        return err
 
1014
}
 
1015
 
 
1016
type indexSpec struct {
 
1017
        Name, NS         string
 
1018
        Key              bson.D
 
1019
        Unique           bool    ",omitempty"
 
1020
        DropDups         bool    "dropDups,omitempty"
 
1021
        Background       bool    ",omitempty"
 
1022
        Sparse           bool    ",omitempty"
 
1023
        Bits             int     ",omitempty"
 
1024
        Min, Max         float64 ",omitempty"
 
1025
        BucketSize       float64 "bucketSize,omitempty"
 
1026
        ExpireAfter      int     "expireAfterSeconds,omitempty"
 
1027
        Weights          bson.D  ",omitempty"
 
1028
        DefaultLanguage  string  "default_language,omitempty"
 
1029
        LanguageOverride string  "language_override,omitempty"
 
1030
        TextIndexVersion int     "textIndexVersion,omitempty"
 
1031
}
 
1032
 
 
1033
type Index struct {
 
1034
        Key        []string // Index key fields; prefix name with dash (-) for descending order
 
1035
        Unique     bool     // Prevent two documents from having the same index key
 
1036
        DropDups   bool     // Drop documents with the same index key as a previously indexed one
 
1037
        Background bool     // Build index in background and return immediately
 
1038
        Sparse     bool     // Only index documents containing the Key fields
 
1039
 
 
1040
        // If ExpireAfter is defined the server will periodically delete
 
1041
        // documents with indexed time.Time older than the provided delta.
 
1042
        ExpireAfter time.Duration
 
1043
 
 
1044
        // Name holds the stored index name. On creation if this field is unset it is
 
1045
        // computed by EnsureIndex based on the index key.
 
1046
        Name string
 
1047
 
 
1048
        // Properties for spatial indexes.
 
1049
        //
 
1050
        // Min and Max were improperly typed as int when they should have been
 
1051
        // floats.  To preserve backwards compatibility they are still typed as
 
1052
        // int and the following two fields enable reading and writing the same
 
1053
        // fields as float numbers. In mgo.v3, these fields will be dropped and
 
1054
        // Min/Max will become floats.
 
1055
        Min, Max   int
 
1056
        Minf, Maxf float64
 
1057
        BucketSize float64
 
1058
        Bits       int
 
1059
 
 
1060
        // Properties for text indexes.
 
1061
        DefaultLanguage  string
 
1062
        LanguageOverride string
 
1063
 
 
1064
        // Weights defines the significance of provided fields relative to other
 
1065
        // fields in a text index. The score for a given word in a document is derived
 
1066
        // from the weighted sum of the frequency for each of the indexed fields in
 
1067
        // that document. The default field weight is 1.
 
1068
        Weights map[string]int
 
1069
}
 
1070
 
 
1071
// mgo.v3: Drop Minf and Maxf and transform Min and Max to floats.
 
1072
// mgo.v3: Drop DropDups as it's unsupported past 2.8.
 
1073
 
 
1074
type indexKeyInfo struct {
 
1075
        name    string
 
1076
        key     bson.D
 
1077
        weights bson.D
 
1078
}
 
1079
 
 
1080
func parseIndexKey(key []string) (*indexKeyInfo, error) {
 
1081
        var keyInfo indexKeyInfo
 
1082
        isText := false
 
1083
        var order interface{}
 
1084
        for _, field := range key {
 
1085
                raw := field
 
1086
                if keyInfo.name != "" {
 
1087
                        keyInfo.name += "_"
 
1088
                }
 
1089
                var kind string
 
1090
                if field != "" {
 
1091
                        if field[0] == '$' {
 
1092
                                if c := strings.Index(field, ":"); c > 1 && c < len(field)-1 {
 
1093
                                        kind = field[1:c]
 
1094
                                        field = field[c+1:]
 
1095
                                        keyInfo.name += field + "_" + kind
 
1096
                                } else {
 
1097
                                        field = "\x00"
 
1098
                                }
 
1099
                        }
 
1100
                        switch field[0] {
 
1101
                        case 0:
 
1102
                                // Logic above failed. Reset and error.
 
1103
                                field = ""
 
1104
                        case '@':
 
1105
                                order = "2d"
 
1106
                                field = field[1:]
 
1107
                                // The shell used to render this field as key_ instead of key_2d,
 
1108
                                // and mgo followed suit. This has been fixed in recent server
 
1109
                                // releases, and mgo followed as well.
 
1110
                                keyInfo.name += field + "_2d"
 
1111
                        case '-':
 
1112
                                order = -1
 
1113
                                field = field[1:]
 
1114
                                keyInfo.name += field + "_-1"
 
1115
                        case '+':
 
1116
                                field = field[1:]
 
1117
                                fallthrough
 
1118
                        default:
 
1119
                                if kind == "" {
 
1120
                                        order = 1
 
1121
                                        keyInfo.name += field + "_1"
 
1122
                                } else {
 
1123
                                        order = kind
 
1124
                                }
 
1125
                        }
 
1126
                }
 
1127
                if field == "" || kind != "" && order != kind {
 
1128
                        return nil, fmt.Errorf(`invalid index key: want "[$<kind>:][-]<field name>", got %q`, raw)
 
1129
                }
 
1130
                if kind == "text" {
 
1131
                        if !isText {
 
1132
                                keyInfo.key = append(keyInfo.key, bson.DocElem{"_fts", "text"}, bson.DocElem{"_ftsx", 1})
 
1133
                                isText = true
 
1134
                        }
 
1135
                        keyInfo.weights = append(keyInfo.weights, bson.DocElem{field, 1})
 
1136
                } else {
 
1137
                        keyInfo.key = append(keyInfo.key, bson.DocElem{field, order})
 
1138
                }
 
1139
        }
 
1140
        if keyInfo.name == "" {
 
1141
                return nil, errors.New("invalid index key: no fields provided")
 
1142
        }
 
1143
        return &keyInfo, nil
 
1144
}
 
1145
 
 
1146
// EnsureIndexKey ensures an index with the given key exists, creating it
 
1147
// if necessary.
 
1148
//
 
1149
// This example:
 
1150
//
 
1151
//     err := collection.EnsureIndexKey("a", "b")
 
1152
//
 
1153
// Is equivalent to:
 
1154
//
 
1155
//     err := collection.EnsureIndex(mgo.Index{Key: []string{"a", "b"}})
 
1156
//
 
1157
// See the EnsureIndex method for more details.
 
1158
func (c *Collection) EnsureIndexKey(key ...string) error {
 
1159
        return c.EnsureIndex(Index{Key: key})
 
1160
}
 
1161
 
 
1162
// EnsureIndex ensures an index with the given key exists, creating it with
 
1163
// the provided parameters if necessary. EnsureIndex does not modify a previously
 
1164
// existent index with a matching key. The old index must be dropped first instead.
 
1165
//
 
1166
// Once EnsureIndex returns successfully, following requests for the same index
 
1167
// will not contact the server unless Collection.DropIndex is used to drop the
 
1168
// same index, or Session.ResetIndexCache is called.
 
1169
//
 
1170
// For example:
 
1171
//
 
1172
//     index := Index{
 
1173
//         Key: []string{"lastname", "firstname"},
 
1174
//         Unique: true,
 
1175
//         DropDups: true,
 
1176
//         Background: true, // See notes.
 
1177
//         Sparse: true,
 
1178
//     }
 
1179
//     err := collection.EnsureIndex(index)
 
1180
//
 
1181
// The Key value determines which fields compose the index. The index ordering
 
1182
// will be ascending by default.  To obtain an index with a descending order,
 
1183
// the field name should be prefixed by a dash (e.g. []string{"-time"}). It can
 
1184
// also be optionally prefixed by an index kind, as in "$text:summary" or
 
1185
// "$2d:-point". The key string format is:
 
1186
//
 
1187
//     [$<kind>:][-]<field name>
 
1188
//
 
1189
// If the Unique field is true, the index must necessarily contain only a single
 
1190
// document per Key.  With DropDups set to true, documents with the same key
 
1191
// as a previously indexed one will be dropped rather than an error returned.
 
1192
//
 
1193
// If Background is true, other connections will be allowed to proceed using
 
1194
// the collection without the index while it's being built. Note that the
 
1195
// session executing EnsureIndex will be blocked for as long as it takes for
 
1196
// the index to be built.
 
1197
//
 
1198
// If Sparse is true, only documents containing the provided Key fields will be
 
1199
// included in the index.  When using a sparse index for sorting, only indexed
 
1200
// documents will be returned.
 
1201
//
 
1202
// If ExpireAfter is non-zero, the server will periodically scan the collection
 
1203
// and remove documents containing an indexed time.Time field with a value
 
1204
// older than ExpireAfter. See the documentation for details:
 
1205
//
 
1206
//     http://docs.mongodb.org/manual/tutorial/expire-data
 
1207
//
 
1208
// Other kinds of indexes are also supported through that API. Here is an example:
 
1209
//
 
1210
//     index := Index{
 
1211
//         Key: []string{"$2d:loc"},
 
1212
//         Bits: 26,
 
1213
//     }
 
1214
//     err := collection.EnsureIndex(index)
 
1215
//
 
1216
// The example above requests the creation of a "2d" index for the "loc" field.
 
1217
//
 
1218
// The 2D index bounds may be changed using the Min and Max attributes of the
 
1219
// Index value.  The default bound setting of (-180, 180) is suitable for
 
1220
// latitude/longitude pairs.
 
1221
//
 
1222
// The Bits parameter sets the precision of the 2D geohash values.  If not
 
1223
// provided, 26 bits are used, which is roughly equivalent to 1 foot of
 
1224
// precision for the default (-180, 180) index bounds.
 
1225
//
 
1226
// Relevant documentation:
 
1227
//
 
1228
//     http://www.mongodb.org/display/DOCS/Indexes
 
1229
//     http://www.mongodb.org/display/DOCS/Indexing+Advice+and+FAQ
 
1230
//     http://www.mongodb.org/display/DOCS/Indexing+as+a+Background+Operation
 
1231
//     http://www.mongodb.org/display/DOCS/Geospatial+Indexing
 
1232
//     http://www.mongodb.org/display/DOCS/Multikeys
 
1233
//
 
1234
func (c *Collection) EnsureIndex(index Index) error {
 
1235
        keyInfo, err := parseIndexKey(index.Key)
 
1236
        if err != nil {
 
1237
                return err
 
1238
        }
 
1239
 
 
1240
        session := c.Database.Session
 
1241
        cacheKey := c.FullName + "\x00" + keyInfo.name
 
1242
        if session.cluster().HasCachedIndex(cacheKey) {
 
1243
                return nil
 
1244
        }
 
1245
 
 
1246
        spec := indexSpec{
 
1247
                Name:             keyInfo.name,
 
1248
                NS:               c.FullName,
 
1249
                Key:              keyInfo.key,
 
1250
                Unique:           index.Unique,
 
1251
                DropDups:         index.DropDups,
 
1252
                Background:       index.Background,
 
1253
                Sparse:           index.Sparse,
 
1254
                Bits:             index.Bits,
 
1255
                Min:              index.Minf,
 
1256
                Max:              index.Maxf,
 
1257
                BucketSize:       index.BucketSize,
 
1258
                ExpireAfter:      int(index.ExpireAfter / time.Second),
 
1259
                Weights:          keyInfo.weights,
 
1260
                DefaultLanguage:  index.DefaultLanguage,
 
1261
                LanguageOverride: index.LanguageOverride,
 
1262
        }
 
1263
 
 
1264
        if spec.Min == 0 && spec.Max == 0 {
 
1265
                spec.Min = float64(index.Min)
 
1266
                spec.Max = float64(index.Max)
 
1267
        }
 
1268
 
 
1269
        if index.Name != "" {
 
1270
                spec.Name = index.Name
 
1271
        }
 
1272
 
 
1273
NextField:
 
1274
        for name, weight := range index.Weights {
 
1275
                for i, elem := range spec.Weights {
 
1276
                        if elem.Name == name {
 
1277
                                spec.Weights[i].Value = weight
 
1278
                                continue NextField
 
1279
                        }
 
1280
                }
 
1281
                panic("weight provided for field that is not part of index key: " + name)
 
1282
        }
 
1283
 
 
1284
        cloned := session.Clone()
 
1285
        defer cloned.Close()
 
1286
        cloned.SetMode(Strong, false)
 
1287
        cloned.EnsureSafe(&Safe{})
 
1288
        db := c.Database.With(cloned)
 
1289
 
 
1290
        // Try with a command first.
 
1291
        err = db.Run(bson.D{{"createIndexes", c.Name}, {"indexes", []indexSpec{spec}}}, nil)
 
1292
        if isNoCmd(err) {
 
1293
                // Command not yet supported. Insert into the indexes collection instead.
 
1294
                err = db.C("system.indexes").Insert(&spec)
 
1295
        }
 
1296
        if err == nil {
 
1297
                session.cluster().CacheIndex(cacheKey, true)
 
1298
        }
 
1299
        return err
 
1300
}
 
1301
 
 
1302
// DropIndex drops the index with the provided key from the c collection.
 
1303
//
 
1304
// See EnsureIndex for details on the accepted key variants.
 
1305
//
 
1306
// For example:
 
1307
//
 
1308
//     err1 := collection.DropIndex("firstField", "-secondField")
 
1309
//     err2 := collection.DropIndex("customIndexName")
 
1310
//
 
1311
func (c *Collection) DropIndex(key ...string) error {
 
1312
        keyInfo, err := parseIndexKey(key)
 
1313
        if err != nil {
 
1314
                return err
 
1315
        }
 
1316
 
 
1317
        session := c.Database.Session
 
1318
        cacheKey := c.FullName + "\x00" + keyInfo.name
 
1319
        session.cluster().CacheIndex(cacheKey, false)
 
1320
 
 
1321
        session = session.Clone()
 
1322
        defer session.Close()
 
1323
        session.SetMode(Strong, false)
 
1324
 
 
1325
        db := c.Database.With(session)
 
1326
        result := struct {
 
1327
                ErrMsg string
 
1328
                Ok     bool
 
1329
        }{}
 
1330
        err = db.Run(bson.D{{"dropIndexes", c.Name}, {"index", keyInfo.name}}, &result)
 
1331
        if err != nil {
 
1332
                return err
 
1333
        }
 
1334
        if !result.Ok {
 
1335
                return errors.New(result.ErrMsg)
 
1336
        }
 
1337
        return nil
 
1338
}
 
1339
 
 
1340
// DropIndexName removes the index with the provided index name.
 
1341
//
 
1342
// For example:
 
1343
//
 
1344
//     err := collection.DropIndex("customIndexName")
 
1345
//
 
1346
func (c *Collection) DropIndexName(name string) error {
 
1347
        session := c.Database.Session
 
1348
 
 
1349
        session = session.Clone()
 
1350
        defer session.Close()
 
1351
        session.SetMode(Strong, false)
 
1352
 
 
1353
        c = c.With(session)
 
1354
 
 
1355
        indexes, err := c.Indexes()
 
1356
        if err != nil {
 
1357
                return err
 
1358
        }
 
1359
 
 
1360
        var index Index
 
1361
        for _, idx := range indexes {
 
1362
                if idx.Name == name {
 
1363
                        index = idx
 
1364
                        break
 
1365
                }
 
1366
        }
 
1367
 
 
1368
        if index.Name != "" {
 
1369
                keyInfo, err := parseIndexKey(index.Key)
 
1370
                if err != nil {
 
1371
                        return err
 
1372
                }
 
1373
 
 
1374
                cacheKey := c.FullName + "\x00" + keyInfo.name
 
1375
                session.cluster().CacheIndex(cacheKey, false)
 
1376
        }
 
1377
 
 
1378
        result := struct {
 
1379
                ErrMsg string
 
1380
                Ok     bool
 
1381
        }{}
 
1382
        err = c.Database.Run(bson.D{{"dropIndexes", c.Name}, {"index", name}}, &result)
 
1383
        if err != nil {
 
1384
                return err
 
1385
        }
 
1386
        if !result.Ok {
 
1387
                return errors.New(result.ErrMsg)
 
1388
        }
 
1389
        return nil
 
1390
}
 
1391
 
 
1392
// nonEventual returns a clone of session and ensures it is not Eventual.
 
1393
// This guarantees that the server that is used for queries may be reused
 
1394
// afterwards when a cursor is received.
 
1395
func (session *Session) nonEventual() *Session {
 
1396
        cloned := session.Clone()
 
1397
        if cloned.consistency == Eventual {
 
1398
                cloned.SetMode(Monotonic, false)
 
1399
        }
 
1400
        return cloned
 
1401
}
 
1402
 
 
1403
// Indexes returns a list of all indexes for the collection.
 
1404
//
 
1405
// For example, this snippet would drop all available indexes:
 
1406
//
 
1407
//   indexes, err := collection.Indexes()
 
1408
//   if err != nil {
 
1409
//       return err
 
1410
//   }
 
1411
//   for _, index := range indexes {
 
1412
//       err = collection.DropIndex(index.Key...)
 
1413
//       if err != nil {
 
1414
//           return err
 
1415
//       }
 
1416
//   }
 
1417
//
 
1418
// See the EnsureIndex method for more details on indexes.
 
1419
func (c *Collection) Indexes() (indexes []Index, err error) {
 
1420
        cloned := c.Database.Session.nonEventual()
 
1421
        defer cloned.Close()
 
1422
 
 
1423
        batchSize := int(cloned.queryConfig.op.limit)
 
1424
 
 
1425
        // Try with a command.
 
1426
        var result struct {
 
1427
                Indexes []bson.Raw
 
1428
                Cursor  cursorData
 
1429
        }
 
1430
        var iter *Iter
 
1431
        err = c.Database.With(cloned).Run(bson.D{{"listIndexes", c.Name}, {"cursor", bson.D{{"batchSize", batchSize}}}}, &result)
 
1432
        if err == nil {
 
1433
                firstBatch := result.Indexes
 
1434
                if firstBatch == nil {
 
1435
                        firstBatch = result.Cursor.FirstBatch
 
1436
                }
 
1437
                ns := strings.SplitN(result.Cursor.NS, ".", 2)
 
1438
                if len(ns) < 2 {
 
1439
                        iter = c.With(cloned).NewIter(nil, firstBatch, result.Cursor.Id, nil)
 
1440
                } else {
 
1441
                        iter = cloned.DB(ns[0]).C(ns[1]).NewIter(nil, firstBatch, result.Cursor.Id, nil)
 
1442
                }
 
1443
        } else if isNoCmd(err) {
 
1444
                // Command not yet supported. Query the database instead.
 
1445
                iter = c.Database.C("system.indexes").Find(bson.M{"ns": c.FullName}).Iter()
 
1446
        } else {
 
1447
                return nil, err
 
1448
        }
 
1449
 
 
1450
        var spec indexSpec
 
1451
        for iter.Next(&spec) {
 
1452
                indexes = append(indexes, indexFromSpec(spec))
 
1453
        }
 
1454
        if err = iter.Close(); err != nil {
 
1455
                return nil, err
 
1456
        }
 
1457
        sort.Sort(indexSlice(indexes))
 
1458
        return indexes, nil
 
1459
}
 
1460
 
 
1461
func indexFromSpec(spec indexSpec) Index {
 
1462
        index := Index{
 
1463
                Name:             spec.Name,
 
1464
                Key:              simpleIndexKey(spec.Key),
 
1465
                Unique:           spec.Unique,
 
1466
                DropDups:         spec.DropDups,
 
1467
                Background:       spec.Background,
 
1468
                Sparse:           spec.Sparse,
 
1469
                Minf:             spec.Min,
 
1470
                Maxf:             spec.Max,
 
1471
                Bits:             spec.Bits,
 
1472
                BucketSize:       spec.BucketSize,
 
1473
                DefaultLanguage:  spec.DefaultLanguage,
 
1474
                LanguageOverride: spec.LanguageOverride,
 
1475
                ExpireAfter:      time.Duration(spec.ExpireAfter) * time.Second,
 
1476
        }
 
1477
        if float64(int(spec.Min)) == spec.Min && float64(int(spec.Max)) == spec.Max {
 
1478
                index.Min = int(spec.Min)
 
1479
                index.Max = int(spec.Max)
 
1480
        }
 
1481
        if spec.TextIndexVersion > 0 {
 
1482
                index.Key = make([]string, len(spec.Weights))
 
1483
                index.Weights = make(map[string]int)
 
1484
                for i, elem := range spec.Weights {
 
1485
                        index.Key[i] = "$text:" + elem.Name
 
1486
                        if w, ok := elem.Value.(int); ok {
 
1487
                                index.Weights[elem.Name] = w
 
1488
                        }
 
1489
                }
 
1490
        }
 
1491
        return index
 
1492
}
 
1493
 
 
1494
type indexSlice []Index
 
1495
 
 
1496
func (idxs indexSlice) Len() int           { return len(idxs) }
 
1497
func (idxs indexSlice) Less(i, j int) bool { return idxs[i].Name < idxs[j].Name }
 
1498
func (idxs indexSlice) Swap(i, j int)      { idxs[i], idxs[j] = idxs[j], idxs[i] }
 
1499
 
 
1500
func simpleIndexKey(realKey bson.D) (key []string) {
 
1501
        for i := range realKey {
 
1502
                field := realKey[i].Name
 
1503
                vi, ok := realKey[i].Value.(int)
 
1504
                if !ok {
 
1505
                        vf, _ := realKey[i].Value.(float64)
 
1506
                        vi = int(vf)
 
1507
                }
 
1508
                if vi == 1 {
 
1509
                        key = append(key, field)
 
1510
                        continue
 
1511
                }
 
1512
                if vi == -1 {
 
1513
                        key = append(key, "-"+field)
 
1514
                        continue
 
1515
                }
 
1516
                if vs, ok := realKey[i].Value.(string); ok {
 
1517
                        key = append(key, "$"+vs+":"+field)
 
1518
                        continue
 
1519
                }
 
1520
                panic("Got unknown index key type for field " + field)
 
1521
        }
 
1522
        return
 
1523
}
 
1524
 
 
1525
// ResetIndexCache() clears the cache of previously ensured indexes.
 
1526
// Following requests to EnsureIndex will contact the server.
 
1527
func (s *Session) ResetIndexCache() {
 
1528
        s.cluster().ResetIndexCache()
 
1529
}
 
1530
 
 
1531
// New creates a new session with the same parameters as the original
 
1532
// session, including consistency, batch size, prefetching, safety mode,
 
1533
// etc. The returned session will use sockets from the pool, so there's
 
1534
// a chance that writes just performed in another session may not yet
 
1535
// be visible.
 
1536
//
 
1537
// Login information from the original session will not be copied over
 
1538
// into the new session unless it was provided through the initial URL
 
1539
// for the Dial function.
 
1540
//
 
1541
// See the Copy and Clone methods.
 
1542
//
 
1543
func (s *Session) New() *Session {
 
1544
        s.m.Lock()
 
1545
        scopy := copySession(s, false)
 
1546
        s.m.Unlock()
 
1547
        scopy.Refresh()
 
1548
        return scopy
 
1549
}
 
1550
 
 
1551
// Copy works just like New, but preserves the exact authentication
 
1552
// information from the original session.
 
1553
func (s *Session) Copy() *Session {
 
1554
        s.m.Lock()
 
1555
        scopy := copySession(s, true)
 
1556
        s.m.Unlock()
 
1557
        scopy.Refresh()
 
1558
        return scopy
 
1559
}
 
1560
 
 
1561
// Clone works just like Copy, but also reuses the same socket as the original
 
1562
// session, in case it had already reserved one due to its consistency
 
1563
// guarantees.  This behavior ensures that writes performed in the old session
 
1564
// are necessarily observed when using the new session, as long as it was a
 
1565
// strong or monotonic session.  That said, it also means that long operations
 
1566
// may cause other goroutines using the original session to wait.
 
1567
func (s *Session) Clone() *Session {
 
1568
        s.m.Lock()
 
1569
        scopy := copySession(s, true)
 
1570
        s.m.Unlock()
 
1571
        return scopy
 
1572
}
 
1573
 
 
1574
// Close terminates the session.  It's a runtime error to use a session
 
1575
// after it has been closed.
 
1576
func (s *Session) Close() {
 
1577
        s.m.Lock()
 
1578
        if s.cluster_ != nil {
 
1579
                debugf("Closing session %p", s)
 
1580
                s.unsetSocket()
 
1581
                s.cluster_.Release()
 
1582
                s.cluster_ = nil
 
1583
        }
 
1584
        s.m.Unlock()
 
1585
}
 
1586
 
 
1587
func (s *Session) cluster() *mongoCluster {
 
1588
        if s.cluster_ == nil {
 
1589
                panic("Session already closed")
 
1590
        }
 
1591
        return s.cluster_
 
1592
}
 
1593
 
 
1594
// Refresh puts back any reserved sockets in use and restarts the consistency
 
1595
// guarantees according to the current consistency setting for the session.
 
1596
func (s *Session) Refresh() {
 
1597
        s.m.Lock()
 
1598
        s.slaveOk = s.consistency != Strong
 
1599
        s.unsetSocket()
 
1600
        s.m.Unlock()
 
1601
}
 
1602
 
 
1603
// SetMode changes the consistency mode for the session.
 
1604
//
 
1605
// In the Strong consistency mode reads and writes will always be made to
 
1606
// the primary server using a unique connection so that reads and writes are
 
1607
// fully consistent, ordered, and observing the most up-to-date data.
 
1608
// This offers the least benefits in terms of distributing load, but the
 
1609
// most guarantees.  See also Monotonic and Eventual.
 
1610
//
 
1611
// In the Monotonic consistency mode reads may not be entirely up-to-date,
 
1612
// but they will always see the history of changes moving forward, the data
 
1613
// read will be consistent across sequential queries in the same session,
 
1614
// and modifications made within the session will be observed in following
 
1615
// queries (read-your-writes).
 
1616
//
 
1617
// In practice, the Monotonic mode is obtained by performing initial reads
 
1618
// on a unique connection to an arbitrary secondary, if one is available,
 
1619
// and once the first write happens, the session connection is switched over
 
1620
// to the primary server.  This manages to distribute some of the reading
 
1621
// load with secondaries, while maintaining some useful guarantees.
 
1622
//
 
1623
// In the Eventual consistency mode reads will be made to any secondary in the
 
1624
// cluster, if one is available, and sequential reads will not necessarily
 
1625
// be made with the same connection.  This means that data may be observed
 
1626
// out of order.  Writes will of course be issued to the primary, but
 
1627
// independent writes in the same Eventual session may also be made with
 
1628
// independent connections, so there are also no guarantees in terms of
 
1629
// write ordering (no read-your-writes guarantees either).
 
1630
//
 
1631
// The Eventual mode is the fastest and most resource-friendly, but is
 
1632
// also the one offering the least guarantees about ordering of the data
 
1633
// read and written.
 
1634
//
 
1635
// If refresh is true, in addition to ensuring the session is in the given
 
1636
// consistency mode, the consistency guarantees will also be reset (e.g.
 
1637
// a Monotonic session will be allowed to read from secondaries again).
 
1638
// This is equivalent to calling the Refresh function.
 
1639
//
 
1640
// Shifting between Monotonic and Strong modes will keep a previously
 
1641
// reserved connection for the session unless refresh is true or the
 
1642
// connection is unsuitable (to a secondary server in a Strong session).
 
1643
func (s *Session) SetMode(consistency Mode, refresh bool) {
 
1644
        s.m.Lock()
 
1645
        debugf("Session %p: setting mode %d with refresh=%v (master=%p, slave=%p)", s, consistency, refresh, s.masterSocket, s.slaveSocket)
 
1646
        s.consistency = consistency
 
1647
        if refresh {
 
1648
                s.slaveOk = s.consistency != Strong
 
1649
                s.unsetSocket()
 
1650
        } else if s.consistency == Strong {
 
1651
                s.slaveOk = false
 
1652
        } else if s.masterSocket == nil {
 
1653
                s.slaveOk = true
 
1654
        }
 
1655
        s.m.Unlock()
 
1656
}
 
1657
 
 
1658
// Mode returns the current consistency mode for the session.
 
1659
func (s *Session) Mode() Mode {
 
1660
        s.m.RLock()
 
1661
        mode := s.consistency
 
1662
        s.m.RUnlock()
 
1663
        return mode
 
1664
}
 
1665
 
 
1666
// SetSyncTimeout sets the amount of time an operation with this session
 
1667
// will wait before returning an error in case a connection to a usable
 
1668
// server can't be established. Set it to zero to wait forever. The
 
1669
// default value is 7 seconds.
 
1670
func (s *Session) SetSyncTimeout(d time.Duration) {
 
1671
        s.m.Lock()
 
1672
        s.syncTimeout = d
 
1673
        s.m.Unlock()
 
1674
}
 
1675
 
 
1676
// SetSocketTimeout sets the amount of time to wait for a non-responding
 
1677
// socket to the database before it is forcefully closed.
 
1678
func (s *Session) SetSocketTimeout(d time.Duration) {
 
1679
        s.m.Lock()
 
1680
        s.sockTimeout = d
 
1681
        if s.masterSocket != nil {
 
1682
                s.masterSocket.SetTimeout(d)
 
1683
        }
 
1684
        if s.slaveSocket != nil {
 
1685
                s.slaveSocket.SetTimeout(d)
 
1686
        }
 
1687
        s.m.Unlock()
 
1688
}
 
1689
 
 
1690
// SetCursorTimeout changes the standard timeout period that the server
 
1691
// enforces on created cursors. The only supported value right now is
 
1692
// 0, which disables the timeout. The standard server timeout is 10 minutes.
 
1693
func (s *Session) SetCursorTimeout(d time.Duration) {
 
1694
        s.m.Lock()
 
1695
        if d == 0 {
 
1696
                s.queryConfig.op.flags |= flagNoCursorTimeout
 
1697
        } else {
 
1698
                panic("SetCursorTimeout: only 0 (disable timeout) supported for now")
 
1699
        }
 
1700
        s.m.Unlock()
 
1701
}
 
1702
 
 
1703
// SetPoolLimit sets the maximum number of sockets in use in a single server
 
1704
// before this session will block waiting for a socket to be available.
 
1705
// The default limit is 4096.
 
1706
//
 
1707
// This limit must be set to cover more than any expected workload of the
 
1708
// application. It is a bad practice and an unsupported use case to use the
 
1709
// database driver to define the concurrency limit of an application. Prevent
 
1710
// such concurrency "at the door" instead, by properly restricting the amount
 
1711
// of used resources and number of goroutines before they are created.
 
1712
func (s *Session) SetPoolLimit(limit int) {
 
1713
        s.m.Lock()
 
1714
        s.poolLimit = limit
 
1715
        s.m.Unlock()
 
1716
}
 
1717
 
 
1718
// SetBypassValidation sets whether the server should bypass the registered
 
1719
// validation expressions executed when documents are inserted or modified,
 
1720
// in the interest of preserving invariants in the collection being modified.
 
1721
// The default is to not bypass, and thus to perform the validation
 
1722
// expressions registered for modified collections.
 
1723
//
 
1724
// Document validation was introuced in MongoDB 3.2.
 
1725
//
 
1726
// Relevant documentation:
 
1727
//
 
1728
//   https://docs.mongodb.org/manual/release-notes/3.2/#bypass-validation
 
1729
//
 
1730
func (s *Session) SetBypassValidation(bypass bool) {
 
1731
        s.m.Lock()
 
1732
        s.bypassValidation = bypass
 
1733
        s.m.Unlock()
 
1734
}
 
1735
 
 
1736
// SetBatch sets the default batch size used when fetching documents from the
 
1737
// database. It's possible to change this setting on a per-query basis as
 
1738
// well, using the Query.Batch method.
 
1739
//
 
1740
// The default batch size is defined by the database itself.  As of this
 
1741
// writing, MongoDB will use an initial size of min(100 docs, 4MB) on the
 
1742
// first batch, and 4MB on remaining ones.
 
1743
func (s *Session) SetBatch(n int) {
 
1744
        if n == 1 {
 
1745
                // Server interprets 1 as -1 and closes the cursor (!?)
 
1746
                n = 2
 
1747
        }
 
1748
        s.m.Lock()
 
1749
        s.queryConfig.op.limit = int32(n)
 
1750
        s.m.Unlock()
 
1751
}
 
1752
 
 
1753
// SetPrefetch sets the default point at which the next batch of results will be
 
1754
// requested.  When there are p*batch_size remaining documents cached in an
 
1755
// Iter, the next batch will be requested in background. For instance, when
 
1756
// using this:
 
1757
//
 
1758
//     session.SetBatch(200)
 
1759
//     session.SetPrefetch(0.25)
 
1760
//
 
1761
// and there are only 50 documents cached in the Iter to be processed, the
 
1762
// next batch of 200 will be requested. It's possible to change this setting on
 
1763
// a per-query basis as well, using the Prefetch method of Query.
 
1764
//
 
1765
// The default prefetch value is 0.25.
 
1766
func (s *Session) SetPrefetch(p float64) {
 
1767
        s.m.Lock()
 
1768
        s.queryConfig.prefetch = p
 
1769
        s.m.Unlock()
 
1770
}
 
1771
 
 
1772
// See SetSafe for details on the Safe type.
 
1773
type Safe struct {
 
1774
        W        int    // Min # of servers to ack before success
 
1775
        WMode    string // Write mode for MongoDB 2.0+ (e.g. "majority")
 
1776
        WTimeout int    // Milliseconds to wait for W before timing out
 
1777
        FSync    bool   // Sync via the journal if present, or via data files sync otherwise
 
1778
        J        bool   // Sync via the journal if present
 
1779
}
 
1780
 
 
1781
// Safe returns the current safety mode for the session.
 
1782
func (s *Session) Safe() (safe *Safe) {
 
1783
        s.m.Lock()
 
1784
        defer s.m.Unlock()
 
1785
        if s.safeOp != nil {
 
1786
                cmd := s.safeOp.query.(*getLastError)
 
1787
                safe = &Safe{WTimeout: cmd.WTimeout, FSync: cmd.FSync, J: cmd.J}
 
1788
                switch w := cmd.W.(type) {
 
1789
                case string:
 
1790
                        safe.WMode = w
 
1791
                case int:
 
1792
                        safe.W = w
 
1793
                }
 
1794
        }
 
1795
        return
 
1796
}
 
1797
 
 
1798
// SetSafe changes the session safety mode.
 
1799
//
 
1800
// If the safe parameter is nil, the session is put in unsafe mode, and writes
 
1801
// become fire-and-forget, without error checking.  The unsafe mode is faster
 
1802
// since operations won't hold on waiting for a confirmation.
 
1803
//
 
1804
// If the safe parameter is not nil, any changing query (insert, update, ...)
 
1805
// will be followed by a getLastError command with the specified parameters,
 
1806
// to ensure the request was correctly processed.
 
1807
//
 
1808
// The safe.W parameter determines how many servers should confirm a write
 
1809
// before the operation is considered successful.  If set to 0 or 1, the
 
1810
// command will return as soon as the primary is done with the request.
 
1811
// If safe.WTimeout is greater than zero, it determines how many milliseconds
 
1812
// to wait for the safe.W servers to respond before returning an error.
 
1813
//
 
1814
// Starting with MongoDB 2.0.0 the safe.WMode parameter can be used instead
 
1815
// of W to request for richer semantics. If set to "majority" the server will
 
1816
// wait for a majority of members from the replica set to respond before
 
1817
// returning. Custom modes may also be defined within the server to create
 
1818
// very detailed placement schemas. See the data awareness documentation in
 
1819
// the links below for more details (note that MongoDB internally reuses the
 
1820
// "w" field name for WMode).
 
1821
//
 
1822
// If safe.J is true, servers will block until write operations have been
 
1823
// committed to the journal. Cannot be used in combination with FSync. Prior
 
1824
// to MongoDB 2.6 this option was ignored if the server was running without
 
1825
// journaling. Starting with MongoDB 2.6 write operations will fail with an
 
1826
// exception if this option is used when the server is running without
 
1827
// journaling.
 
1828
//
 
1829
// If safe.FSync is true and the server is running without journaling, blocks
 
1830
// until the server has synced all data files to disk. If the server is running
 
1831
// with journaling, this acts the same as the J option, blocking until write
 
1832
// operations have been committed to the journal. Cannot be used in
 
1833
// combination with J.
 
1834
//
 
1835
// Since MongoDB 2.0.0, the safe.J option can also be used instead of FSync
 
1836
// to force the server to wait for a group commit in case journaling is
 
1837
// enabled. The option has no effect if the server has journaling disabled.
 
1838
//
 
1839
// For example, the following statement will make the session check for
 
1840
// errors, without imposing further constraints:
 
1841
//
 
1842
//     session.SetSafe(&mgo.Safe{})
 
1843
//
 
1844
// The following statement will force the server to wait for a majority of
 
1845
// members of a replica set to return (MongoDB 2.0+ only):
 
1846
//
 
1847
//     session.SetSafe(&mgo.Safe{WMode: "majority"})
 
1848
//
 
1849
// The following statement, on the other hand, ensures that at least two
 
1850
// servers have flushed the change to disk before confirming the success
 
1851
// of operations:
 
1852
//
 
1853
//     session.EnsureSafe(&mgo.Safe{W: 2, FSync: true})
 
1854
//
 
1855
// The following statement, on the other hand, disables the verification
 
1856
// of errors entirely:
 
1857
//
 
1858
//     session.SetSafe(nil)
 
1859
//
 
1860
// See also the EnsureSafe method.
 
1861
//
 
1862
// Relevant documentation:
 
1863
//
 
1864
//     http://www.mongodb.org/display/DOCS/getLastError+Command
 
1865
//     http://www.mongodb.org/display/DOCS/Verifying+Propagation+of+Writes+with+getLastError
 
1866
//     http://www.mongodb.org/display/DOCS/Data+Center+Awareness
 
1867
//
 
1868
func (s *Session) SetSafe(safe *Safe) {
 
1869
        s.m.Lock()
 
1870
        s.safeOp = nil
 
1871
        s.ensureSafe(safe)
 
1872
        s.m.Unlock()
 
1873
}
 
1874
 
 
1875
// EnsureSafe compares the provided safety parameters with the ones
 
1876
// currently in use by the session and picks the most conservative
 
1877
// choice for each setting.
 
1878
//
 
1879
// That is:
 
1880
//
 
1881
//     - safe.WMode is always used if set.
 
1882
//     - safe.W is used if larger than the current W and WMode is empty.
 
1883
//     - safe.FSync is always used if true.
 
1884
//     - safe.J is used if FSync is false.
 
1885
//     - safe.WTimeout is used if set and smaller than the current WTimeout.
 
1886
//
 
1887
// For example, the following statement will ensure the session is
 
1888
// at least checking for errors, without enforcing further constraints.
 
1889
// If a more conservative SetSafe or EnsureSafe call was previously done,
 
1890
// the following call will be ignored.
 
1891
//
 
1892
//     session.EnsureSafe(&mgo.Safe{})
 
1893
//
 
1894
// See also the SetSafe method for details on what each option means.
 
1895
//
 
1896
// Relevant documentation:
 
1897
//
 
1898
//     http://www.mongodb.org/display/DOCS/getLastError+Command
 
1899
//     http://www.mongodb.org/display/DOCS/Verifying+Propagation+of+Writes+with+getLastError
 
1900
//     http://www.mongodb.org/display/DOCS/Data+Center+Awareness
 
1901
//
 
1902
func (s *Session) EnsureSafe(safe *Safe) {
 
1903
        s.m.Lock()
 
1904
        s.ensureSafe(safe)
 
1905
        s.m.Unlock()
 
1906
}
 
1907
 
 
1908
func (s *Session) ensureSafe(safe *Safe) {
 
1909
        if safe == nil {
 
1910
                return
 
1911
        }
 
1912
 
 
1913
        var w interface{}
 
1914
        if safe.WMode != "" {
 
1915
                w = safe.WMode
 
1916
        } else if safe.W > 0 {
 
1917
                w = safe.W
 
1918
        }
 
1919
 
 
1920
        var cmd getLastError
 
1921
        if s.safeOp == nil {
 
1922
                cmd = getLastError{1, w, safe.WTimeout, safe.FSync, safe.J}
 
1923
        } else {
 
1924
                // Copy.  We don't want to mutate the existing query.
 
1925
                cmd = *(s.safeOp.query.(*getLastError))
 
1926
                if cmd.W == nil {
 
1927
                        cmd.W = w
 
1928
                } else if safe.WMode != "" {
 
1929
                        cmd.W = safe.WMode
 
1930
                } else if i, ok := cmd.W.(int); ok && safe.W > i {
 
1931
                        cmd.W = safe.W
 
1932
                }
 
1933
                if safe.WTimeout > 0 && safe.WTimeout < cmd.WTimeout {
 
1934
                        cmd.WTimeout = safe.WTimeout
 
1935
                }
 
1936
                if safe.FSync {
 
1937
                        cmd.FSync = true
 
1938
                        cmd.J = false
 
1939
                } else if safe.J && !cmd.FSync {
 
1940
                        cmd.J = true
 
1941
                }
 
1942
        }
 
1943
        s.safeOp = &queryOp{
 
1944
                query:      &cmd,
 
1945
                collection: "admin.$cmd",
 
1946
                limit:      -1,
 
1947
        }
 
1948
}
 
1949
 
 
1950
// Run issues the provided command on the "admin" database and
 
1951
// and unmarshals its result in the respective argument. The cmd
 
1952
// argument may be either a string with the command name itself, in
 
1953
// which case an empty document of the form bson.M{cmd: 1} will be used,
 
1954
// or it may be a full command document.
 
1955
//
 
1956
// Note that MongoDB considers the first marshalled key as the command
 
1957
// name, so when providing a command with options, it's important to
 
1958
// use an ordering-preserving document, such as a struct value or an
 
1959
// instance of bson.D.  For instance:
 
1960
//
 
1961
//     db.Run(bson.D{{"create", "mycollection"}, {"size", 1024}})
 
1962
//
 
1963
// For commands on arbitrary databases, see the Run method in
 
1964
// the Database type.
 
1965
//
 
1966
// Relevant documentation:
 
1967
//
 
1968
//     http://www.mongodb.org/display/DOCS/Commands
 
1969
//     http://www.mongodb.org/display/DOCS/List+of+Database+CommandSkips
 
1970
//
 
1971
func (s *Session) Run(cmd interface{}, result interface{}) error {
 
1972
        return s.DB("admin").Run(cmd, result)
 
1973
}
 
1974
 
 
1975
// SelectServers restricts communication to servers configured with the
 
1976
// given tags. For example, the following statement restricts servers
 
1977
// used for reading operations to those with both tag "disk" set to
 
1978
// "ssd" and tag "rack" set to 1:
 
1979
//
 
1980
//     session.SelectServers(bson.D{{"disk", "ssd"}, {"rack", 1}})
 
1981
//
 
1982
// Multiple sets of tags may be provided, in which case the used server
 
1983
// must match all tags within any one set.
 
1984
//
 
1985
// If a connection was previously assigned to the session due to the
 
1986
// current session mode (see Session.SetMode), the tag selection will
 
1987
// only be enforced after the session is refreshed.
 
1988
//
 
1989
// Relevant documentation:
 
1990
//
 
1991
//     http://docs.mongodb.org/manual/tutorial/configure-replica-set-tag-sets
 
1992
//
 
1993
func (s *Session) SelectServers(tags ...bson.D) {
 
1994
        s.m.Lock()
 
1995
        s.queryConfig.op.serverTags = tags
 
1996
        s.m.Unlock()
 
1997
}
 
1998
 
 
1999
// Ping runs a trivial ping command just to get in touch with the server.
 
2000
func (s *Session) Ping() error {
 
2001
        return s.Run("ping", nil)
 
2002
}
 
2003
 
 
2004
// Fsync flushes in-memory writes to disk on the server the session
 
2005
// is established with. If async is true, the call returns immediately,
 
2006
// otherwise it returns after the flush has been made.
 
2007
func (s *Session) Fsync(async bool) error {
 
2008
        return s.Run(bson.D{{"fsync", 1}, {"async", async}}, nil)
 
2009
}
 
2010
 
 
2011
// FsyncLock locks all writes in the specific server the session is
 
2012
// established with and returns. Any writes attempted to the server
 
2013
// after it is successfully locked will block until FsyncUnlock is
 
2014
// called for the same server.
 
2015
//
 
2016
// This method works on secondaries as well, preventing the oplog from
 
2017
// being flushed while the server is locked, but since only the server
 
2018
// connected to is locked, for locking specific secondaries it may be
 
2019
// necessary to establish a connection directly to the secondary (see
 
2020
// Dial's connect=direct option).
 
2021
//
 
2022
// As an important caveat, note that once a write is attempted and
 
2023
// blocks, follow up reads will block as well due to the way the
 
2024
// lock is internally implemented in the server. More details at:
 
2025
//
 
2026
//     https://jira.mongodb.org/browse/SERVER-4243
 
2027
//
 
2028
// FsyncLock is often used for performing consistent backups of
 
2029
// the database files on disk.
 
2030
//
 
2031
// Relevant documentation:
 
2032
//
 
2033
//     http://www.mongodb.org/display/DOCS/fsync+Command
 
2034
//     http://www.mongodb.org/display/DOCS/Backups
 
2035
//
 
2036
func (s *Session) FsyncLock() error {
 
2037
        return s.Run(bson.D{{"fsync", 1}, {"lock", true}}, nil)
 
2038
}
 
2039
 
 
2040
// FsyncUnlock releases the server for writes. See FsyncLock for details.
 
2041
func (s *Session) FsyncUnlock() error {
 
2042
        err := s.Run(bson.D{{"fsyncUnlock", 1}}, nil)
 
2043
        if isNoCmd(err) {
 
2044
                err = s.DB("admin").C("$cmd.sys.unlock").Find(nil).One(nil) // WTF?
 
2045
        }
 
2046
        return err
 
2047
}
 
2048
 
 
2049
// Find prepares a query using the provided document.  The document may be a
 
2050
// map or a struct value capable of being marshalled with bson.  The map
 
2051
// may be a generic one using interface{} for its key and/or values, such as
 
2052
// bson.M, or it may be a properly typed map.  Providing nil as the document
 
2053
// is equivalent to providing an empty document such as bson.M{}.
 
2054
//
 
2055
// Further details of the query may be tweaked using the resulting Query value,
 
2056
// and then executed to retrieve results using methods such as One, For,
 
2057
// Iter, or Tail.
 
2058
//
 
2059
// In case the resulting document includes a field named $err or errmsg, which
 
2060
// are standard ways for MongoDB to return query errors, the returned err will
 
2061
// be set to a *QueryError value including the Err message and the Code.  In
 
2062
// those cases, the result argument is still unmarshalled into with the
 
2063
// received document so that any other custom values may be obtained if
 
2064
// desired.
 
2065
//
 
2066
// Relevant documentation:
 
2067
//
 
2068
//     http://www.mongodb.org/display/DOCS/Querying
 
2069
//     http://www.mongodb.org/display/DOCS/Advanced+Queries
 
2070
//
 
2071
func (c *Collection) Find(query interface{}) *Query {
 
2072
        session := c.Database.Session
 
2073
        session.m.RLock()
 
2074
        q := &Query{session: session, query: session.queryConfig}
 
2075
        session.m.RUnlock()
 
2076
        q.op.query = query
 
2077
        q.op.collection = c.FullName
 
2078
        return q
 
2079
}
 
2080
 
 
2081
type repairCmd struct {
 
2082
        RepairCursor string           `bson:"repairCursor"`
 
2083
        Cursor       *repairCmdCursor ",omitempty"
 
2084
}
 
2085
 
 
2086
type repairCmdCursor struct {
 
2087
        BatchSize int `bson:"batchSize,omitempty"`
 
2088
}
 
2089
 
 
2090
// Repair returns an iterator that goes over all recovered documents in the
 
2091
// collection, in a best-effort manner. This is most useful when there are
 
2092
// damaged data files. Multiple copies of the same document may be returned
 
2093
// by the iterator.
 
2094
//
 
2095
// Repair is supported in MongoDB 2.7.8 and later.
 
2096
func (c *Collection) Repair() *Iter {
 
2097
        // Clone session and set it to Monotonic mode so that the server
 
2098
        // used for the query may be safely obtained afterwards, if
 
2099
        // necessary for iteration when a cursor is received.
 
2100
        session := c.Database.Session
 
2101
        cloned := session.nonEventual()
 
2102
        defer cloned.Close()
 
2103
 
 
2104
        batchSize := int(cloned.queryConfig.op.limit)
 
2105
 
 
2106
        var result struct{ Cursor cursorData }
 
2107
 
 
2108
        cmd := repairCmd{
 
2109
                RepairCursor: c.Name,
 
2110
                Cursor:       &repairCmdCursor{batchSize},
 
2111
        }
 
2112
 
 
2113
        clonedc := c.With(cloned)
 
2114
        err := clonedc.Database.Run(cmd, &result)
 
2115
        return clonedc.NewIter(session, result.Cursor.FirstBatch, result.Cursor.Id, err)
 
2116
}
 
2117
 
 
2118
// FindId is a convenience helper equivalent to:
 
2119
//
 
2120
//     query := collection.Find(bson.M{"_id": id})
 
2121
//
 
2122
// See the Find method for more details.
 
2123
func (c *Collection) FindId(id interface{}) *Query {
 
2124
        return c.Find(bson.D{{"_id", id}})
 
2125
}
 
2126
 
 
2127
type Pipe struct {
 
2128
        session    *Session
 
2129
        collection *Collection
 
2130
        pipeline   interface{}
 
2131
        allowDisk  bool
 
2132
        batchSize  int
 
2133
}
 
2134
 
 
2135
type pipeCmd struct {
 
2136
        Aggregate string
 
2137
        Pipeline  interface{}
 
2138
        Cursor    *pipeCmdCursor ",omitempty"
 
2139
        Explain   bool           ",omitempty"
 
2140
        AllowDisk bool           "allowDiskUse,omitempty"
 
2141
}
 
2142
 
 
2143
type pipeCmdCursor struct {
 
2144
        BatchSize int `bson:"batchSize,omitempty"`
 
2145
}
 
2146
 
 
2147
// Pipe prepares a pipeline to aggregate. The pipeline document
 
2148
// must be a slice built in terms of the aggregation framework language.
 
2149
//
 
2150
// For example:
 
2151
//
 
2152
//     pipe := collection.Pipe([]bson.M{{"$match": bson.M{"name": "Otavio"}}})
 
2153
//     iter := pipe.Iter()
 
2154
//
 
2155
// Relevant documentation:
 
2156
//
 
2157
//     http://docs.mongodb.org/manual/reference/aggregation
 
2158
//     http://docs.mongodb.org/manual/applications/aggregation
 
2159
//     http://docs.mongodb.org/manual/tutorial/aggregation-examples
 
2160
//
 
2161
func (c *Collection) Pipe(pipeline interface{}) *Pipe {
 
2162
        session := c.Database.Session
 
2163
        session.m.RLock()
 
2164
        batchSize := int(session.queryConfig.op.limit)
 
2165
        session.m.RUnlock()
 
2166
        return &Pipe{
 
2167
                session:    session,
 
2168
                collection: c,
 
2169
                pipeline:   pipeline,
 
2170
                batchSize:  batchSize,
 
2171
        }
 
2172
}
 
2173
 
 
2174
// Iter executes the pipeline and returns an iterator capable of going
 
2175
// over all the generated results.
 
2176
func (p *Pipe) Iter() *Iter {
 
2177
        // Clone session and set it to Monotonic mode so that the server
 
2178
        // used for the query may be safely obtained afterwards, if
 
2179
        // necessary for iteration when a cursor is received.
 
2180
        cloned := p.session.nonEventual()
 
2181
        defer cloned.Close()
 
2182
        c := p.collection.With(cloned)
 
2183
 
 
2184
        var result struct {
 
2185
                Result []bson.Raw // 2.4, no cursors.
 
2186
                Cursor cursorData // 2.6+, with cursors.
 
2187
        }
 
2188
 
 
2189
        cmd := pipeCmd{
 
2190
                Aggregate: c.Name,
 
2191
                Pipeline:  p.pipeline,
 
2192
                AllowDisk: p.allowDisk,
 
2193
                Cursor:    &pipeCmdCursor{p.batchSize},
 
2194
        }
 
2195
        err := c.Database.Run(cmd, &result)
 
2196
        if e, ok := err.(*QueryError); ok && e.Message == `unrecognized field "cursor` {
 
2197
                cmd.Cursor = nil
 
2198
                cmd.AllowDisk = false
 
2199
                err = c.Database.Run(cmd, &result)
 
2200
        }
 
2201
        firstBatch := result.Result
 
2202
        if firstBatch == nil {
 
2203
                firstBatch = result.Cursor.FirstBatch
 
2204
        }
 
2205
        return c.NewIter(p.session, firstBatch, result.Cursor.Id, err)
 
2206
}
 
2207
 
 
2208
// NewIter returns a newly created iterator with the provided parameters.
 
2209
// Using this method is not recommended unless the desired functionality
 
2210
// is not yet exposed via a more convenient interface (Find, Pipe, etc).
 
2211
//
 
2212
// The optional session parameter associates the lifetime of the returned
 
2213
// iterator to an arbitrary session. If nil, the iterator will be bound to
 
2214
// c's session.
 
2215
//
 
2216
// Documents in firstBatch will be individually provided by the returned
 
2217
// iterator before documents from cursorId are made available. If cursorId
 
2218
// is zero, only the documents in firstBatch are provided.
 
2219
//
 
2220
// If err is not nil, the iterator's Err method will report it after
 
2221
// exhausting documents in firstBatch.
 
2222
//
 
2223
// NewIter must be called right after the cursor id is obtained, and must not
 
2224
// be called on a collection in Eventual mode, because the cursor id is
 
2225
// associated with the specific server that returned it. The provided session
 
2226
// parameter may be in any mode or state, though.
 
2227
//
 
2228
func (c *Collection) NewIter(session *Session, firstBatch []bson.Raw, cursorId int64, err error) *Iter {
 
2229
        var server *mongoServer
 
2230
        csession := c.Database.Session
 
2231
        csession.m.RLock()
 
2232
        socket := csession.masterSocket
 
2233
        if socket == nil {
 
2234
                socket = csession.slaveSocket
 
2235
        }
 
2236
        if socket != nil {
 
2237
                server = socket.Server()
 
2238
        }
 
2239
        csession.m.RUnlock()
 
2240
 
 
2241
        if server == nil {
 
2242
                if csession.Mode() == Eventual {
 
2243
                        panic("Collection.NewIter called in Eventual mode")
 
2244
                }
 
2245
                if err == nil {
 
2246
                        err = errors.New("server not available")
 
2247
                }
 
2248
        }
 
2249
 
 
2250
        if session == nil {
 
2251
                session = csession
 
2252
        }
 
2253
 
 
2254
        iter := &Iter{
 
2255
                session: session,
 
2256
                server:  server,
 
2257
                timeout: -1,
 
2258
                err:     err,
 
2259
        }
 
2260
        iter.gotReply.L = &iter.m
 
2261
        for _, doc := range firstBatch {
 
2262
                iter.docData.Push(doc.Data)
 
2263
        }
 
2264
        if cursorId != 0 {
 
2265
                iter.op.cursorId = cursorId
 
2266
                iter.op.collection = c.FullName
 
2267
                iter.op.replyFunc = iter.replyFunc()
 
2268
        }
 
2269
        return iter
 
2270
}
 
2271
 
 
2272
// All works like Iter.All.
 
2273
func (p *Pipe) All(result interface{}) error {
 
2274
        return p.Iter().All(result)
 
2275
}
 
2276
 
 
2277
// One executes the pipeline and unmarshals the first item from the
 
2278
// result set into the result parameter.
 
2279
// It returns ErrNotFound if no items are generated by the pipeline.
 
2280
func (p *Pipe) One(result interface{}) error {
 
2281
        iter := p.Iter()
 
2282
        if iter.Next(result) {
 
2283
                return nil
 
2284
        }
 
2285
        if err := iter.Err(); err != nil {
 
2286
                return err
 
2287
        }
 
2288
        return ErrNotFound
 
2289
}
 
2290
 
 
2291
// Explain returns a number of details about how the MongoDB server would
 
2292
// execute the requested pipeline, such as the number of objects examined,
 
2293
// the number of times the read lock was yielded to allow writes to go in,
 
2294
// and so on.
 
2295
//
 
2296
// For example:
 
2297
//
 
2298
//     var m bson.M
 
2299
//     err := collection.Pipe(pipeline).Explain(&m)
 
2300
//     if err == nil {
 
2301
//         fmt.Printf("Explain: %#v\n", m)
 
2302
//     }
 
2303
//
 
2304
func (p *Pipe) Explain(result interface{}) error {
 
2305
        c := p.collection
 
2306
        cmd := pipeCmd{
 
2307
                Aggregate: c.Name,
 
2308
                Pipeline:  p.pipeline,
 
2309
                AllowDisk: p.allowDisk,
 
2310
                Explain:   true,
 
2311
        }
 
2312
        return c.Database.Run(cmd, result)
 
2313
}
 
2314
 
 
2315
// AllowDiskUse enables writing to the "<dbpath>/_tmp" server directory so
 
2316
// that aggregation pipelines do not have to be held entirely in memory.
 
2317
func (p *Pipe) AllowDiskUse() *Pipe {
 
2318
        p.allowDisk = true
 
2319
        return p
 
2320
}
 
2321
 
 
2322
// Batch sets the batch size used when fetching documents from the database.
 
2323
// It's possible to change this setting on a per-session basis as well, using
 
2324
// the Batch method of Session.
 
2325
//
 
2326
// The default batch size is defined by the database server.
 
2327
func (p *Pipe) Batch(n int) *Pipe {
 
2328
        p.batchSize = n
 
2329
        return p
 
2330
}
 
2331
 
 
2332
// mgo.v3: Use a single user-visible error type.
 
2333
 
 
2334
type LastError struct {
 
2335
        Err             string
 
2336
        Code, N, Waited int
 
2337
        FSyncFiles      int `bson:"fsyncFiles"`
 
2338
        WTimeout        bool
 
2339
        UpdatedExisting bool        `bson:"updatedExisting"`
 
2340
        UpsertedId      interface{} `bson:"upserted"`
 
2341
 
 
2342
        modified int
 
2343
        ecases   []BulkErrorCase
 
2344
}
 
2345
 
 
2346
func (err *LastError) Error() string {
 
2347
        return err.Err
 
2348
}
 
2349
 
 
2350
type queryError struct {
 
2351
        Err           string "$err"
 
2352
        ErrMsg        string
 
2353
        Assertion     string
 
2354
        Code          int
 
2355
        AssertionCode int        "assertionCode"
 
2356
        LastError     *LastError "lastErrorObject"
 
2357
}
 
2358
 
 
2359
type QueryError struct {
 
2360
        Code      int
 
2361
        Message   string
 
2362
        Assertion bool
 
2363
}
 
2364
 
 
2365
func (err *QueryError) Error() string {
 
2366
        return err.Message
 
2367
}
 
2368
 
 
2369
// IsDup returns whether err informs of a duplicate key error because
 
2370
// a primary key index or a secondary unique index already has an entry
 
2371
// with the given value.
 
2372
func IsDup(err error) bool {
 
2373
        // Besides being handy, helps with MongoDB bugs SERVER-7164 and SERVER-11493.
 
2374
        // What follows makes me sad. Hopefully conventions will be more clear over time.
 
2375
        switch e := err.(type) {
 
2376
        case *LastError:
 
2377
                return e.Code == 11000 || e.Code == 11001 || e.Code == 12582 || e.Code == 16460 && strings.Contains(e.Err, " E11000 ")
 
2378
        case *QueryError:
 
2379
                return e.Code == 11000 || e.Code == 11001 || e.Code == 12582
 
2380
        case *BulkError:
 
2381
                for _, ecase := range e.ecases {
 
2382
                        if !IsDup(ecase.Err) {
 
2383
                                return false
 
2384
                        }
 
2385
                }
 
2386
                return true
 
2387
        }
 
2388
        return false
 
2389
}
 
2390
 
 
2391
// Insert inserts one or more documents in the respective collection.  In
 
2392
// case the session is in safe mode (see the SetSafe method) and an error
 
2393
// happens while inserting the provided documents, the returned error will
 
2394
// be of type *LastError.
 
2395
func (c *Collection) Insert(docs ...interface{}) error {
 
2396
        _, err := c.writeOp(&insertOp{c.FullName, docs, 0}, true)
 
2397
        return err
 
2398
}
 
2399
 
 
2400
// Update finds a single document matching the provided selector document
 
2401
// and modifies it according to the update document.
 
2402
// If the session is in safe mode (see SetSafe) a ErrNotFound error is
 
2403
// returned if a document isn't found, or a value of type *LastError
 
2404
// when some other error is detected.
 
2405
//
 
2406
// Relevant documentation:
 
2407
//
 
2408
//     http://www.mongodb.org/display/DOCS/Updating
 
2409
//     http://www.mongodb.org/display/DOCS/Atomic+Operations
 
2410
//
 
2411
func (c *Collection) Update(selector interface{}, update interface{}) error {
 
2412
        if selector == nil {
 
2413
                selector = bson.D{}
 
2414
        }
 
2415
        op := updateOp{
 
2416
                Collection: c.FullName,
 
2417
                Selector:   selector,
 
2418
                Update:     update,
 
2419
        }
 
2420
        lerr, err := c.writeOp(&op, true)
 
2421
        if err == nil && lerr != nil && !lerr.UpdatedExisting {
 
2422
                return ErrNotFound
 
2423
        }
 
2424
        return err
 
2425
}
 
2426
 
 
2427
// UpdateId is a convenience helper equivalent to:
 
2428
//
 
2429
//     err := collection.Update(bson.M{"_id": id}, update)
 
2430
//
 
2431
// See the Update method for more details.
 
2432
func (c *Collection) UpdateId(id interface{}, update interface{}) error {
 
2433
        return c.Update(bson.D{{"_id", id}}, update)
 
2434
}
 
2435
 
 
2436
// ChangeInfo holds details about the outcome of an update operation.
 
2437
type ChangeInfo struct {
 
2438
        // Updated reports the number of existing documents modified.
 
2439
        // Due to server limitations, this reports the same value as the Matched field when
 
2440
        // talking to MongoDB <= 2.4 and on Upsert and Apply (findAndModify) operations.
 
2441
        Updated    int
 
2442
        Removed    int         // Number of documents removed
 
2443
        Matched    int         // Number of documents matched but not necessarily changed
 
2444
        UpsertedId interface{} // Upserted _id field, when not explicitly provided
 
2445
}
 
2446
 
 
2447
// UpdateAll finds all documents matching the provided selector document
 
2448
// and modifies them according to the update document.
 
2449
// If the session is in safe mode (see SetSafe) details of the executed
 
2450
// operation are returned in info or an error of type *LastError when
 
2451
// some problem is detected. It is not an error for the update to not be
 
2452
// applied on any documents because the selector doesn't match.
 
2453
//
 
2454
// Relevant documentation:
 
2455
//
 
2456
//     http://www.mongodb.org/display/DOCS/Updating
 
2457
//     http://www.mongodb.org/display/DOCS/Atomic+Operations
 
2458
//
 
2459
func (c *Collection) UpdateAll(selector interface{}, update interface{}) (info *ChangeInfo, err error) {
 
2460
        if selector == nil {
 
2461
                selector = bson.D{}
 
2462
        }
 
2463
        op := updateOp{
 
2464
                Collection: c.FullName,
 
2465
                Selector:   selector,
 
2466
                Update:     update,
 
2467
                Flags:      2,
 
2468
                Multi:      true,
 
2469
        }
 
2470
        lerr, err := c.writeOp(&op, true)
 
2471
        if err == nil && lerr != nil {
 
2472
                info = &ChangeInfo{Updated: lerr.modified, Matched: lerr.N}
 
2473
        }
 
2474
        return info, err
 
2475
}
 
2476
 
 
2477
// Upsert finds a single document matching the provided selector document
 
2478
// and modifies it according to the update document.  If no document matching
 
2479
// the selector is found, the update document is applied to the selector
 
2480
// document and the result is inserted in the collection.
 
2481
// If the session is in safe mode (see SetSafe) details of the executed
 
2482
// operation are returned in info, or an error of type *LastError when
 
2483
// some problem is detected.
 
2484
//
 
2485
// Relevant documentation:
 
2486
//
 
2487
//     http://www.mongodb.org/display/DOCS/Updating
 
2488
//     http://www.mongodb.org/display/DOCS/Atomic+Operations
 
2489
//
 
2490
func (c *Collection) Upsert(selector interface{}, update interface{}) (info *ChangeInfo, err error) {
 
2491
        if selector == nil {
 
2492
                selector = bson.D{}
 
2493
        }
 
2494
        op := updateOp{
 
2495
                Collection: c.FullName,
 
2496
                Selector:   selector,
 
2497
                Update:     update,
 
2498
                Flags:      1,
 
2499
                Upsert:     true,
 
2500
        }
 
2501
        var lerr *LastError
 
2502
        // <= to allow for the first attempt (not a retry).
 
2503
        for i := 0; i <= maxUpsertRetries; i++ {
 
2504
                lerr, err = c.writeOp(&op, true)
 
2505
                // Retry duplicate key errors on upserts.
 
2506
                // https://docs.mongodb.com/v3.2/reference/method/db.collection.update/#use-unique-indexes
 
2507
                if !IsDup(err) {
 
2508
                        break
 
2509
                }
 
2510
        }
 
2511
        if err == nil && lerr != nil {
 
2512
                info = &ChangeInfo{}
 
2513
                if lerr.UpdatedExisting {
 
2514
                        info.Matched = lerr.N
 
2515
                        info.Updated = lerr.modified
 
2516
                } else {
 
2517
                        info.UpsertedId = lerr.UpsertedId
 
2518
                }
 
2519
        }
 
2520
        return info, err
 
2521
}
 
2522
 
 
2523
// UpsertId is a convenience helper equivalent to:
 
2524
//
 
2525
//     info, err := collection.Upsert(bson.M{"_id": id}, update)
 
2526
//
 
2527
// See the Upsert method for more details.
 
2528
func (c *Collection) UpsertId(id interface{}, update interface{}) (info *ChangeInfo, err error) {
 
2529
        return c.Upsert(bson.D{{"_id", id}}, update)
 
2530
}
 
2531
 
 
2532
// Remove finds a single document matching the provided selector document
 
2533
// and removes it from the database.
 
2534
// If the session is in safe mode (see SetSafe) a ErrNotFound error is
 
2535
// returned if a document isn't found, or a value of type *LastError
 
2536
// when some other error is detected.
 
2537
//
 
2538
// Relevant documentation:
 
2539
//
 
2540
//     http://www.mongodb.org/display/DOCS/Removing
 
2541
//
 
2542
func (c *Collection) Remove(selector interface{}) error {
 
2543
        if selector == nil {
 
2544
                selector = bson.D{}
 
2545
        }
 
2546
        lerr, err := c.writeOp(&deleteOp{c.FullName, selector, 1, 1}, true)
 
2547
        if err == nil && lerr != nil && lerr.N == 0 {
 
2548
                return ErrNotFound
 
2549
        }
 
2550
        return err
 
2551
}
 
2552
 
 
2553
// RemoveId is a convenience helper equivalent to:
 
2554
//
 
2555
//     err := collection.Remove(bson.M{"_id": id})
 
2556
//
 
2557
// See the Remove method for more details.
 
2558
func (c *Collection) RemoveId(id interface{}) error {
 
2559
        return c.Remove(bson.D{{"_id", id}})
 
2560
}
 
2561
 
 
2562
// RemoveAll finds all documents matching the provided selector document
 
2563
// and removes them from the database.  In case the session is in safe mode
 
2564
// (see the SetSafe method) and an error happens when attempting the change,
 
2565
// the returned error will be of type *LastError.
 
2566
//
 
2567
// Relevant documentation:
 
2568
//
 
2569
//     http://www.mongodb.org/display/DOCS/Removing
 
2570
//
 
2571
func (c *Collection) RemoveAll(selector interface{}) (info *ChangeInfo, err error) {
 
2572
        if selector == nil {
 
2573
                selector = bson.D{}
 
2574
        }
 
2575
        lerr, err := c.writeOp(&deleteOp{c.FullName, selector, 0, 0}, true)
 
2576
        if err == nil && lerr != nil {
 
2577
                info = &ChangeInfo{Removed: lerr.N, Matched: lerr.N}
 
2578
        }
 
2579
        return info, err
 
2580
}
 
2581
 
 
2582
// DropDatabase removes the entire database including all of its collections.
 
2583
func (db *Database) DropDatabase() error {
 
2584
        return db.Run(bson.D{{"dropDatabase", 1}}, nil)
 
2585
}
 
2586
 
 
2587
// DropCollection removes the entire collection including all of its documents.
 
2588
func (c *Collection) DropCollection() error {
 
2589
        return c.Database.Run(bson.D{{"drop", c.Name}}, nil)
 
2590
}
 
2591
 
 
2592
// The CollectionInfo type holds metadata about a collection.
 
2593
//
 
2594
// Relevant documentation:
 
2595
//
 
2596
//     http://www.mongodb.org/display/DOCS/createCollection+Command
 
2597
//     http://www.mongodb.org/display/DOCS/Capped+Collections
 
2598
//
 
2599
type CollectionInfo struct {
 
2600
        // DisableIdIndex prevents the automatic creation of the index
 
2601
        // on the _id field for the collection.
 
2602
        DisableIdIndex bool
 
2603
 
 
2604
        // ForceIdIndex enforces the automatic creation of the index
 
2605
        // on the _id field for the collection. Capped collections,
 
2606
        // for example, do not have such an index by default.
 
2607
        ForceIdIndex bool
 
2608
 
 
2609
        // If Capped is true new documents will replace old ones when
 
2610
        // the collection is full. MaxBytes must necessarily be set
 
2611
        // to define the size when the collection wraps around.
 
2612
        // MaxDocs optionally defines the number of documents when it
 
2613
        // wraps, but MaxBytes still needs to be set.
 
2614
        Capped   bool
 
2615
        MaxBytes int
 
2616
        MaxDocs  int
 
2617
 
 
2618
        // Validator contains a validation expression that defines which
 
2619
        // documents should be considered valid for this collection.
 
2620
        Validator interface{}
 
2621
 
 
2622
        // ValidationLevel may be set to "strict" (the default) to force
 
2623
        // MongoDB to validate all documents on inserts and updates, to
 
2624
        // "moderate" to apply the validation rules only to documents
 
2625
        // that already fulfill the validation criteria, or to "off" for
 
2626
        // disabling validation entirely.
 
2627
        ValidationLevel string
 
2628
 
 
2629
        // ValidationAction determines how MongoDB handles documents that
 
2630
        // violate the validation rules. It may be set to "error" (the default)
 
2631
        // to reject inserts or updates that violate the rules, or to "warn"
 
2632
        // to log invalid operations but allow them to proceed.
 
2633
        ValidationAction string
 
2634
 
 
2635
        // StorageEngine allows specifying collection options for the
 
2636
        // storage engine in use. The map keys must hold the storage engine
 
2637
        // name for which options are being specified.
 
2638
        StorageEngine interface{}
 
2639
}
 
2640
 
 
2641
// Create explicitly creates the c collection with details of info.
 
2642
// MongoDB creates collections automatically on use, so this method
 
2643
// is only necessary when creating collection with non-default
 
2644
// characteristics, such as capped collections.
 
2645
//
 
2646
// Relevant documentation:
 
2647
//
 
2648
//     http://www.mongodb.org/display/DOCS/createCollection+Command
 
2649
//     http://www.mongodb.org/display/DOCS/Capped+Collections
 
2650
//
 
2651
func (c *Collection) Create(info *CollectionInfo) error {
 
2652
        cmd := make(bson.D, 0, 4)
 
2653
        cmd = append(cmd, bson.DocElem{"create", c.Name})
 
2654
        if info.Capped {
 
2655
                if info.MaxBytes < 1 {
 
2656
                        return fmt.Errorf("Collection.Create: with Capped, MaxBytes must also be set")
 
2657
                }
 
2658
                cmd = append(cmd, bson.DocElem{"capped", true})
 
2659
                cmd = append(cmd, bson.DocElem{"size", info.MaxBytes})
 
2660
                if info.MaxDocs > 0 {
 
2661
                        cmd = append(cmd, bson.DocElem{"max", info.MaxDocs})
 
2662
                }
 
2663
        }
 
2664
        if info.DisableIdIndex {
 
2665
                cmd = append(cmd, bson.DocElem{"autoIndexId", false})
 
2666
        }
 
2667
        if info.ForceIdIndex {
 
2668
                cmd = append(cmd, bson.DocElem{"autoIndexId", true})
 
2669
        }
 
2670
        if info.Validator != nil {
 
2671
                cmd = append(cmd, bson.DocElem{"validator", info.Validator})
 
2672
        }
 
2673
        if info.ValidationLevel != "" {
 
2674
                cmd = append(cmd, bson.DocElem{"validationLevel", info.ValidationLevel})
 
2675
        }
 
2676
        if info.ValidationAction != "" {
 
2677
                cmd = append(cmd, bson.DocElem{"validationAction", info.ValidationAction})
 
2678
        }
 
2679
        if info.StorageEngine != nil {
 
2680
                cmd = append(cmd, bson.DocElem{"storageEngine", info.StorageEngine})
 
2681
        }
 
2682
        return c.Database.Run(cmd, nil)
 
2683
}
 
2684
 
 
2685
// Batch sets the batch size used when fetching documents from the database.
 
2686
// It's possible to change this setting on a per-session basis as well, using
 
2687
// the Batch method of Session.
 
2688
 
 
2689
// The default batch size is defined by the database itself.  As of this
 
2690
// writing, MongoDB will use an initial size of min(100 docs, 4MB) on the
 
2691
// first batch, and 4MB on remaining ones.
 
2692
func (q *Query) Batch(n int) *Query {
 
2693
        if n == 1 {
 
2694
                // Server interprets 1 as -1 and closes the cursor (!?)
 
2695
                n = 2
 
2696
        }
 
2697
        q.m.Lock()
 
2698
        q.op.limit = int32(n)
 
2699
        q.m.Unlock()
 
2700
        return q
 
2701
}
 
2702
 
 
2703
// Prefetch sets the point at which the next batch of results will be requested.
 
2704
// When there are p*batch_size remaining documents cached in an Iter, the next
 
2705
// batch will be requested in background. For instance, when using this:
 
2706
//
 
2707
//     query.Batch(200).Prefetch(0.25)
 
2708
//
 
2709
// and there are only 50 documents cached in the Iter to be processed, the
 
2710
// next batch of 200 will be requested. It's possible to change this setting on
 
2711
// a per-session basis as well, using the SetPrefetch method of Session.
 
2712
//
 
2713
// The default prefetch value is 0.25.
 
2714
func (q *Query) Prefetch(p float64) *Query {
 
2715
        q.m.Lock()
 
2716
        q.prefetch = p
 
2717
        q.m.Unlock()
 
2718
        return q
 
2719
}
 
2720
 
 
2721
// Skip skips over the n initial documents from the query results.  Note that
 
2722
// this only makes sense with capped collections where documents are naturally
 
2723
// ordered by insertion time, or with sorted results.
 
2724
func (q *Query) Skip(n int) *Query {
 
2725
        q.m.Lock()
 
2726
        q.op.skip = int32(n)
 
2727
        q.m.Unlock()
 
2728
        return q
 
2729
}
 
2730
 
 
2731
// Limit restricts the maximum number of documents retrieved to n, and also
 
2732
// changes the batch size to the same value.  Once n documents have been
 
2733
// returned by Next, the following call will return ErrNotFound.
 
2734
func (q *Query) Limit(n int) *Query {
 
2735
        q.m.Lock()
 
2736
        switch {
 
2737
        case n == 1:
 
2738
                q.limit = 1
 
2739
                q.op.limit = -1
 
2740
        case n == math.MinInt32: // -MinInt32 == -MinInt32
 
2741
                q.limit = math.MaxInt32
 
2742
                q.op.limit = math.MinInt32 + 1
 
2743
        case n < 0:
 
2744
                q.limit = int32(-n)
 
2745
                q.op.limit = int32(n)
 
2746
        default:
 
2747
                q.limit = int32(n)
 
2748
                q.op.limit = int32(n)
 
2749
        }
 
2750
        q.m.Unlock()
 
2751
        return q
 
2752
}
 
2753
 
 
2754
// Select enables selecting which fields should be retrieved for the results
 
2755
// found. For example, the following query would only retrieve the name field:
 
2756
//
 
2757
//     err := collection.Find(nil).Select(bson.M{"name": 1}).One(&result)
 
2758
//
 
2759
// Relevant documentation:
 
2760
//
 
2761
//     http://www.mongodb.org/display/DOCS/Retrieving+a+Subset+of+Fields
 
2762
//
 
2763
func (q *Query) Select(selector interface{}) *Query {
 
2764
        q.m.Lock()
 
2765
        q.op.selector = selector
 
2766
        q.m.Unlock()
 
2767
        return q
 
2768
}
 
2769
 
 
2770
// Sort asks the database to order returned documents according to the
 
2771
// provided field names. A field name may be prefixed by - (minus) for
 
2772
// it to be sorted in reverse order.
 
2773
//
 
2774
// For example:
 
2775
//
 
2776
//     query1 := collection.Find(nil).Sort("firstname", "lastname")
 
2777
//     query2 := collection.Find(nil).Sort("-age")
 
2778
//     query3 := collection.Find(nil).Sort("$natural")
 
2779
//     query4 := collection.Find(nil).Select(bson.M{"score": bson.M{"$meta": "textScore"}}).Sort("$textScore:score")
 
2780
//
 
2781
// Relevant documentation:
 
2782
//
 
2783
//     http://www.mongodb.org/display/DOCS/Sorting+and+Natural+Order
 
2784
//
 
2785
func (q *Query) Sort(fields ...string) *Query {
 
2786
        q.m.Lock()
 
2787
        var order bson.D
 
2788
        for _, field := range fields {
 
2789
                n := 1
 
2790
                var kind string
 
2791
                if field != "" {
 
2792
                        if field[0] == '$' {
 
2793
                                if c := strings.Index(field, ":"); c > 1 && c < len(field)-1 {
 
2794
                                        kind = field[1:c]
 
2795
                                        field = field[c+1:]
 
2796
                                }
 
2797
                        }
 
2798
                        switch field[0] {
 
2799
                        case '+':
 
2800
                                field = field[1:]
 
2801
                        case '-':
 
2802
                                n = -1
 
2803
                                field = field[1:]
 
2804
                        }
 
2805
                }
 
2806
                if field == "" {
 
2807
                        panic("Sort: empty field name")
 
2808
                }
 
2809
                if kind == "textScore" {
 
2810
                        order = append(order, bson.DocElem{field, bson.M{"$meta": kind}})
 
2811
                } else {
 
2812
                        order = append(order, bson.DocElem{field, n})
 
2813
                }
 
2814
        }
 
2815
        q.op.options.OrderBy = order
 
2816
        q.op.hasOptions = true
 
2817
        q.m.Unlock()
 
2818
        return q
 
2819
}
 
2820
 
 
2821
// Explain returns a number of details about how the MongoDB server would
 
2822
// execute the requested query, such as the number of objects examined,
 
2823
// the number of times the read lock was yielded to allow writes to go in,
 
2824
// and so on.
 
2825
//
 
2826
// For example:
 
2827
//
 
2828
//     m := bson.M{}
 
2829
//     err := collection.Find(bson.M{"filename": name}).Explain(m)
 
2830
//     if err == nil {
 
2831
//         fmt.Printf("Explain: %#v\n", m)
 
2832
//     }
 
2833
//
 
2834
// Relevant documentation:
 
2835
//
 
2836
//     http://www.mongodb.org/display/DOCS/Optimization
 
2837
//     http://www.mongodb.org/display/DOCS/Query+Optimizer
 
2838
//
 
2839
func (q *Query) Explain(result interface{}) error {
 
2840
        q.m.Lock()
 
2841
        clone := &Query{session: q.session, query: q.query}
 
2842
        q.m.Unlock()
 
2843
        clone.op.options.Explain = true
 
2844
        clone.op.hasOptions = true
 
2845
        if clone.op.limit > 0 {
 
2846
                clone.op.limit = -q.op.limit
 
2847
        }
 
2848
        iter := clone.Iter()
 
2849
        if iter.Next(result) {
 
2850
                return nil
 
2851
        }
 
2852
        return iter.Close()
 
2853
}
 
2854
 
 
2855
// TODO: Add Collection.Explain. See https://goo.gl/1MDlvz.
 
2856
 
 
2857
// Hint will include an explicit "hint" in the query to force the server
 
2858
// to use a specified index, potentially improving performance in some
 
2859
// situations.  The provided parameters are the fields that compose the
 
2860
// key of the index to be used.  For details on how the indexKey may be
 
2861
// built, see the EnsureIndex method.
 
2862
//
 
2863
// For example:
 
2864
//
 
2865
//     query := collection.Find(bson.M{"firstname": "Joe", "lastname": "Winter"})
 
2866
//     query.Hint("lastname", "firstname")
 
2867
//
 
2868
// Relevant documentation:
 
2869
//
 
2870
//     http://www.mongodb.org/display/DOCS/Optimization
 
2871
//     http://www.mongodb.org/display/DOCS/Query+Optimizer
 
2872
//
 
2873
func (q *Query) Hint(indexKey ...string) *Query {
 
2874
        q.m.Lock()
 
2875
        keyInfo, err := parseIndexKey(indexKey)
 
2876
        q.op.options.Hint = keyInfo.key
 
2877
        q.op.hasOptions = true
 
2878
        q.m.Unlock()
 
2879
        if err != nil {
 
2880
                panic(err)
 
2881
        }
 
2882
        return q
 
2883
}
 
2884
 
 
2885
// SetMaxScan constrains the query to stop after scanning the specified
 
2886
// number of documents.
 
2887
//
 
2888
// This modifier is generally used to prevent potentially long running
 
2889
// queries from disrupting performance by scanning through too much data.
 
2890
func (q *Query) SetMaxScan(n int) *Query {
 
2891
        q.m.Lock()
 
2892
        q.op.options.MaxScan = n
 
2893
        q.op.hasOptions = true
 
2894
        q.m.Unlock()
 
2895
        return q
 
2896
}
 
2897
 
 
2898
// SetMaxTime constrains the query to stop after running for the specified time.
 
2899
//
 
2900
// When the time limit is reached MongoDB automatically cancels the query.
 
2901
// This can be used to efficiently prevent and identify unexpectedly slow queries.
 
2902
//
 
2903
// A few important notes about the mechanism enforcing this limit:
 
2904
//
 
2905
//  - Requests can block behind locking operations on the server, and that blocking
 
2906
//    time is not accounted for. In other words, the timer starts ticking only after
 
2907
//    the actual start of the query when it initially acquires the appropriate lock;
 
2908
//
 
2909
//  - Operations are interrupted only at interrupt points where an operation can be
 
2910
//    safely aborted – the total execution time may exceed the specified value;
 
2911
//
 
2912
//  - The limit can be applied to both CRUD operations and commands, but not all
 
2913
//    commands are interruptible;
 
2914
//
 
2915
//  - While iterating over results, computing follow up batches is included in the
 
2916
//    total time and the iteration continues until the alloted time is over, but
 
2917
//    network roundtrips are not taken into account for the limit.
 
2918
//
 
2919
//  - This limit does not override the inactive cursor timeout for idle cursors
 
2920
//    (default is 10 min).
 
2921
//
 
2922
// This mechanism was introduced in MongoDB 2.6.
 
2923
//
 
2924
// Relevant documentation:
 
2925
//
 
2926
//   http://blog.mongodb.org/post/83621787773/maxtimems-and-query-optimizer-introspection-in
 
2927
//
 
2928
func (q *Query) SetMaxTime(d time.Duration) *Query {
 
2929
        q.m.Lock()
 
2930
        q.op.options.MaxTimeMS = int(d / time.Millisecond)
 
2931
        q.op.hasOptions = true
 
2932
        q.m.Unlock()
 
2933
        return q
 
2934
}
 
2935
 
 
2936
// Snapshot will force the performed query to make use of an available
 
2937
// index on the _id field to prevent the same document from being returned
 
2938
// more than once in a single iteration. This might happen without this
 
2939
// setting in situations when the document changes in size and thus has to
 
2940
// be moved while the iteration is running.
 
2941
//
 
2942
// Because snapshot mode traverses the _id index, it may not be used with
 
2943
// sorting or explicit hints. It also cannot use any other index for the
 
2944
// query.
 
2945
//
 
2946
// Even with snapshot mode, items inserted or deleted during the query may
 
2947
// or may not be returned; that is, this mode is not a true point-in-time
 
2948
// snapshot.
 
2949
//
 
2950
// The same effect of Snapshot may be obtained by using any unique index on
 
2951
// field(s) that will not be modified (best to use Hint explicitly too).
 
2952
// A non-unique index (such as creation time) may be made unique by
 
2953
// appending _id to the index when creating it.
 
2954
//
 
2955
// Relevant documentation:
 
2956
//
 
2957
//     http://www.mongodb.org/display/DOCS/How+to+do+Snapshotted+Queries+in+the+Mongo+Database
 
2958
//
 
2959
func (q *Query) Snapshot() *Query {
 
2960
        q.m.Lock()
 
2961
        q.op.options.Snapshot = true
 
2962
        q.op.hasOptions = true
 
2963
        q.m.Unlock()
 
2964
        return q
 
2965
}
 
2966
 
 
2967
// Comment adds a comment to the query to identify it in the database profiler output.
 
2968
//
 
2969
// Relevant documentation:
 
2970
//
 
2971
//     http://docs.mongodb.org/manual/reference/operator/meta/comment
 
2972
//     http://docs.mongodb.org/manual/reference/command/profile
 
2973
//     http://docs.mongodb.org/manual/administration/analyzing-mongodb-performance/#database-profiling
 
2974
//
 
2975
func (q *Query) Comment(comment string) *Query {
 
2976
        q.m.Lock()
 
2977
        q.op.options.Comment = comment
 
2978
        q.op.hasOptions = true
 
2979
        q.m.Unlock()
 
2980
        return q
 
2981
}
 
2982
 
 
2983
// LogReplay enables an option that optimizes queries that are typically
 
2984
// made on the MongoDB oplog for replaying it. This is an internal
 
2985
// implementation aspect and most likely uninteresting for other uses.
 
2986
// It has seen at least one use case, though, so it's exposed via the API.
 
2987
func (q *Query) LogReplay() *Query {
 
2988
        q.m.Lock()
 
2989
        q.op.flags |= flagLogReplay
 
2990
        q.m.Unlock()
 
2991
        return q
 
2992
}
 
2993
 
 
2994
func checkQueryError(fullname string, d []byte) error {
 
2995
        l := len(d)
 
2996
        if l < 16 {
 
2997
                return nil
 
2998
        }
 
2999
        if d[5] == '$' && d[6] == 'e' && d[7] == 'r' && d[8] == 'r' && d[9] == '\x00' && d[4] == '\x02' {
 
3000
                goto Error
 
3001
        }
 
3002
        if len(fullname) < 5 || fullname[len(fullname)-5:] != ".$cmd" {
 
3003
                return nil
 
3004
        }
 
3005
        for i := 0; i+8 < l; i++ {
 
3006
                if d[i] == '\x02' && d[i+1] == 'e' && d[i+2] == 'r' && d[i+3] == 'r' && d[i+4] == 'm' && d[i+5] == 's' && d[i+6] == 'g' && d[i+7] == '\x00' {
 
3007
                        goto Error
 
3008
                }
 
3009
        }
 
3010
        return nil
 
3011
 
 
3012
Error:
 
3013
        result := &queryError{}
 
3014
        bson.Unmarshal(d, result)
 
3015
        if result.LastError != nil {
 
3016
                return result.LastError
 
3017
        }
 
3018
        if result.Err == "" && result.ErrMsg == "" {
 
3019
                return nil
 
3020
        }
 
3021
        if result.AssertionCode != 0 && result.Assertion != "" {
 
3022
                return &QueryError{Code: result.AssertionCode, Message: result.Assertion, Assertion: true}
 
3023
        }
 
3024
        if result.Err != "" {
 
3025
                return &QueryError{Code: result.Code, Message: result.Err}
 
3026
        }
 
3027
        return &QueryError{Code: result.Code, Message: result.ErrMsg}
 
3028
}
 
3029
 
 
3030
// One executes the query and unmarshals the first obtained document into the
 
3031
// result argument.  The result must be a struct or map value capable of being
 
3032
// unmarshalled into by gobson.  This function blocks until either a result
 
3033
// is available or an error happens.  For example:
 
3034
//
 
3035
//     err := collection.Find(bson.M{"a": 1}).One(&result)
 
3036
//
 
3037
// In case the resulting document includes a field named $err or errmsg, which
 
3038
// are standard ways for MongoDB to return query errors, the returned err will
 
3039
// be set to a *QueryError value including the Err message and the Code.  In
 
3040
// those cases, the result argument is still unmarshalled into with the
 
3041
// received document so that any other custom values may be obtained if
 
3042
// desired.
 
3043
//
 
3044
func (q *Query) One(result interface{}) (err error) {
 
3045
        q.m.Lock()
 
3046
        session := q.session
 
3047
        op := q.op // Copy.
 
3048
        q.m.Unlock()
 
3049
 
 
3050
        socket, err := session.acquireSocket(true)
 
3051
        if err != nil {
 
3052
                return err
 
3053
        }
 
3054
        defer socket.Release()
 
3055
 
 
3056
        op.limit = -1
 
3057
 
 
3058
        session.prepareQuery(&op)
 
3059
 
 
3060
        expectFindReply := prepareFindOp(socket, &op, 1)
 
3061
 
 
3062
        data, err := socket.SimpleQuery(&op)
 
3063
        if err != nil {
 
3064
                return err
 
3065
        }
 
3066
        if data == nil {
 
3067
                return ErrNotFound
 
3068
        }
 
3069
        if expectFindReply {
 
3070
                var findReply struct {
 
3071
                        Ok     bool
 
3072
                        Code   int
 
3073
                        Errmsg string
 
3074
                        Cursor cursorData
 
3075
                }
 
3076
                err = bson.Unmarshal(data, &findReply)
 
3077
                if err != nil {
 
3078
                        return err
 
3079
                }
 
3080
                if !findReply.Ok && findReply.Errmsg != "" {
 
3081
                        return &QueryError{Code: findReply.Code, Message: findReply.Errmsg}
 
3082
                }
 
3083
                if len(findReply.Cursor.FirstBatch) == 0 {
 
3084
                        return ErrNotFound
 
3085
                }
 
3086
                data = findReply.Cursor.FirstBatch[0].Data
 
3087
        }
 
3088
        if result != nil {
 
3089
                err = bson.Unmarshal(data, result)
 
3090
                if err == nil {
 
3091
                        debugf("Query %p document unmarshaled: %#v", q, result)
 
3092
                } else {
 
3093
                        debugf("Query %p document unmarshaling failed: %#v", q, err)
 
3094
                        return err
 
3095
                }
 
3096
        }
 
3097
        return checkQueryError(op.collection, data)
 
3098
}
 
3099
 
 
3100
// prepareFindOp translates op from being an old-style wire protocol query into
 
3101
// a new-style find command if that's supported by the MongoDB server (3.2+).
 
3102
// It returns whether to expect a find command result or not. Note op may be
 
3103
// translated into an explain command, in which case the function returns false.
 
3104
func prepareFindOp(socket *mongoSocket, op *queryOp, limit int32) bool {
 
3105
        if socket.ServerInfo().MaxWireVersion < 4 || op.collection == "admin.$cmd" {
 
3106
                return false
 
3107
        }
 
3108
 
 
3109
        nameDot := strings.Index(op.collection, ".")
 
3110
        if nameDot < 0 {
 
3111
                panic("invalid query collection name: " + op.collection)
 
3112
        }
 
3113
 
 
3114
        find := findCmd{
 
3115
                Collection:  op.collection[nameDot+1:],
 
3116
                Filter:      op.query,
 
3117
                Projection:  op.selector,
 
3118
                Sort:        op.options.OrderBy,
 
3119
                Skip:        op.skip,
 
3120
                Limit:       limit,
 
3121
                MaxTimeMS:   op.options.MaxTimeMS,
 
3122
                MaxScan:     op.options.MaxScan,
 
3123
                Hint:        op.options.Hint,
 
3124
                Comment:     op.options.Comment,
 
3125
                Snapshot:    op.options.Snapshot,
 
3126
                OplogReplay: op.flags&flagLogReplay != 0,
 
3127
        }
 
3128
        if op.limit < 0 {
 
3129
                find.BatchSize = -op.limit
 
3130
                find.SingleBatch = true
 
3131
        } else {
 
3132
                find.BatchSize = op.limit
 
3133
        }
 
3134
 
 
3135
        explain := op.options.Explain
 
3136
 
 
3137
        op.collection = op.collection[:nameDot] + ".$cmd"
 
3138
        op.query = &find
 
3139
        op.skip = 0
 
3140
        op.limit = -1
 
3141
        op.options = queryWrapper{}
 
3142
        op.hasOptions = false
 
3143
 
 
3144
        if explain {
 
3145
                op.query = bson.D{{"explain", op.query}}
 
3146
                return false
 
3147
        }
 
3148
        return true
 
3149
}
 
3150
 
 
3151
type cursorData struct {
 
3152
        FirstBatch []bson.Raw "firstBatch"
 
3153
        NextBatch  []bson.Raw "nextBatch"
 
3154
        NS         string
 
3155
        Id         int64
 
3156
}
 
3157
 
 
3158
// findCmd holds the command used for performing queries on MongoDB 3.2+.
 
3159
//
 
3160
// Relevant documentation:
 
3161
//
 
3162
//     https://docs.mongodb.org/master/reference/command/find/#dbcmd.find
 
3163
//
 
3164
type findCmd struct {
 
3165
        Collection          string      `bson:"find"`
 
3166
        Filter              interface{} `bson:"filter,omitempty"`
 
3167
        Sort                interface{} `bson:"sort,omitempty"`
 
3168
        Projection          interface{} `bson:"projection,omitempty"`
 
3169
        Hint                interface{} `bson:"hint,omitempty"`
 
3170
        Skip                interface{} `bson:"skip,omitempty"`
 
3171
        Limit               int32       `bson:"limit,omitempty"`
 
3172
        BatchSize           int32       `bson:"batchSize,omitempty"`
 
3173
        SingleBatch         bool        `bson:"singleBatch,omitempty"`
 
3174
        Comment             string      `bson:"comment,omitempty"`
 
3175
        MaxScan             int         `bson:"maxScan,omitempty"`
 
3176
        MaxTimeMS           int         `bson:"maxTimeMS,omitempty"`
 
3177
        ReadConcern         interface{} `bson:"readConcern,omitempty"`
 
3178
        Max                 interface{} `bson:"max,omitempty"`
 
3179
        Min                 interface{} `bson:"min,omitempty"`
 
3180
        ReturnKey           bool        `bson:"returnKey,omitempty"`
 
3181
        ShowRecordId        bool        `bson:"showRecordId,omitempty"`
 
3182
        Snapshot            bool        `bson:"snapshot,omitempty"`
 
3183
        Tailable            bool        `bson:"tailable,omitempty"`
 
3184
        AwaitData           bool        `bson:"awaitData,omitempty"`
 
3185
        OplogReplay         bool        `bson:"oplogReplay,omitempty"`
 
3186
        NoCursorTimeout     bool        `bson:"noCursorTimeout,omitempty"`
 
3187
        AllowPartialResults bool        `bson:"allowPartialResults,omitempty"`
 
3188
}
 
3189
 
 
3190
// getMoreCmd holds the command used for requesting more query results on MongoDB 3.2+.
 
3191
//
 
3192
// Relevant documentation:
 
3193
//
 
3194
//     https://docs.mongodb.org/master/reference/command/getMore/#dbcmd.getMore
 
3195
//
 
3196
type getMoreCmd struct {
 
3197
        CursorId   int64  `bson:"getMore"`
 
3198
        Collection string `bson:"collection"`
 
3199
        BatchSize  int32  `bson:"batchSize,omitempty"`
 
3200
        MaxTimeMS  int64  `bson:"maxTimeMS,omitempty"`
 
3201
}
 
3202
 
 
3203
// run duplicates the behavior of collection.Find(query).One(&result)
 
3204
// as performed by Database.Run, specializing the logic for running
 
3205
// database commands on a given socket.
 
3206
func (db *Database) run(socket *mongoSocket, cmd, result interface{}) (err error) {
 
3207
        // Database.Run:
 
3208
        if name, ok := cmd.(string); ok {
 
3209
                cmd = bson.D{{name, 1}}
 
3210
        }
 
3211
 
 
3212
        // Collection.Find:
 
3213
        session := db.Session
 
3214
        session.m.RLock()
 
3215
        op := session.queryConfig.op // Copy.
 
3216
        session.m.RUnlock()
 
3217
        op.query = cmd
 
3218
        op.collection = db.Name + ".$cmd"
 
3219
 
 
3220
        // Query.One:
 
3221
        session.prepareQuery(&op)
 
3222
        op.limit = -1
 
3223
 
 
3224
        data, err := socket.SimpleQuery(&op)
 
3225
        if err != nil {
 
3226
                return err
 
3227
        }
 
3228
        if data == nil {
 
3229
                return ErrNotFound
 
3230
        }
 
3231
        if result != nil {
 
3232
                err = bson.Unmarshal(data, result)
 
3233
                if err == nil {
 
3234
                        var res bson.M
 
3235
                        bson.Unmarshal(data, &res)
 
3236
                        debugf("Run command unmarshaled: %#v, result: %#v", op, res)
 
3237
                } else {
 
3238
                        debugf("Run command unmarshaling failed: %#v", op, err)
 
3239
                        return err
 
3240
                }
 
3241
        }
 
3242
        return checkQueryError(op.collection, data)
 
3243
}
 
3244
 
 
3245
// The DBRef type implements support for the database reference MongoDB
 
3246
// convention as supported by multiple drivers.  This convention enables
 
3247
// cross-referencing documents between collections and databases using
 
3248
// a structure which includes a collection name, a document id, and
 
3249
// optionally a database name.
 
3250
//
 
3251
// See the FindRef methods on Session and on Database.
 
3252
//
 
3253
// Relevant documentation:
 
3254
//
 
3255
//     http://www.mongodb.org/display/DOCS/Database+References
 
3256
//
 
3257
type DBRef struct {
 
3258
        Collection string      `bson:"$ref"`
 
3259
        Id         interface{} `bson:"$id"`
 
3260
        Database   string      `bson:"$db,omitempty"`
 
3261
}
 
3262
 
 
3263
// NOTE: Order of fields for DBRef above does matter, per documentation.
 
3264
 
 
3265
// FindRef returns a query that looks for the document in the provided
 
3266
// reference. If the reference includes the DB field, the document will
 
3267
// be retrieved from the respective database.
 
3268
//
 
3269
// See also the DBRef type and the FindRef method on Session.
 
3270
//
 
3271
// Relevant documentation:
 
3272
//
 
3273
//     http://www.mongodb.org/display/DOCS/Database+References
 
3274
//
 
3275
func (db *Database) FindRef(ref *DBRef) *Query {
 
3276
        var c *Collection
 
3277
        if ref.Database == "" {
 
3278
                c = db.C(ref.Collection)
 
3279
        } else {
 
3280
                c = db.Session.DB(ref.Database).C(ref.Collection)
 
3281
        }
 
3282
        return c.FindId(ref.Id)
 
3283
}
 
3284
 
 
3285
// FindRef returns a query that looks for the document in the provided
 
3286
// reference. For a DBRef to be resolved correctly at the session level
 
3287
// it must necessarily have the optional DB field defined.
 
3288
//
 
3289
// See also the DBRef type and the FindRef method on Database.
 
3290
//
 
3291
// Relevant documentation:
 
3292
//
 
3293
//     http://www.mongodb.org/display/DOCS/Database+References
 
3294
//
 
3295
func (s *Session) FindRef(ref *DBRef) *Query {
 
3296
        if ref.Database == "" {
 
3297
                panic(errors.New(fmt.Sprintf("Can't resolve database for %#v", ref)))
 
3298
        }
 
3299
        c := s.DB(ref.Database).C(ref.Collection)
 
3300
        return c.FindId(ref.Id)
 
3301
}
 
3302
 
 
3303
// CollectionNames returns the collection names present in the db database.
 
3304
func (db *Database) CollectionNames() (names []string, err error) {
 
3305
        // Clone session and set it to Monotonic mode so that the server
 
3306
        // used for the query may be safely obtained afterwards, if
 
3307
        // necessary for iteration when a cursor is received.
 
3308
        cloned := db.Session.nonEventual()
 
3309
        defer cloned.Close()
 
3310
 
 
3311
        batchSize := int(cloned.queryConfig.op.limit)
 
3312
 
 
3313
        // Try with a command.
 
3314
        var result struct {
 
3315
                Collections []bson.Raw
 
3316
                Cursor      cursorData
 
3317
        }
 
3318
        err = db.With(cloned).Run(bson.D{{"listCollections", 1}, {"cursor", bson.D{{"batchSize", batchSize}}}}, &result)
 
3319
        if err == nil {
 
3320
                firstBatch := result.Collections
 
3321
                if firstBatch == nil {
 
3322
                        firstBatch = result.Cursor.FirstBatch
 
3323
                }
 
3324
                var iter *Iter
 
3325
                ns := strings.SplitN(result.Cursor.NS, ".", 2)
 
3326
                if len(ns) < 2 {
 
3327
                        iter = db.With(cloned).C("").NewIter(nil, firstBatch, result.Cursor.Id, nil)
 
3328
                } else {
 
3329
                        iter = cloned.DB(ns[0]).C(ns[1]).NewIter(nil, firstBatch, result.Cursor.Id, nil)
 
3330
                }
 
3331
                var coll struct{ Name string }
 
3332
                for iter.Next(&coll) {
 
3333
                        names = append(names, coll.Name)
 
3334
                }
 
3335
                if err := iter.Close(); err != nil {
 
3336
                        return nil, err
 
3337
                }
 
3338
                sort.Strings(names)
 
3339
                return names, err
 
3340
        }
 
3341
        if err != nil && !isNoCmd(err) {
 
3342
                return nil, err
 
3343
        }
 
3344
 
 
3345
        // Command not yet supported. Query the database instead.
 
3346
        nameIndex := len(db.Name) + 1
 
3347
        iter := db.C("system.namespaces").Find(nil).Iter()
 
3348
        var coll struct{ Name string }
 
3349
        for iter.Next(&coll) {
 
3350
                if strings.Index(coll.Name, "$") < 0 || strings.Index(coll.Name, ".oplog.$") >= 0 {
 
3351
                        names = append(names, coll.Name[nameIndex:])
 
3352
                }
 
3353
        }
 
3354
        if err := iter.Close(); err != nil {
 
3355
                return nil, err
 
3356
        }
 
3357
        sort.Strings(names)
 
3358
        return names, nil
 
3359
}
 
3360
 
 
3361
type dbNames struct {
 
3362
        Databases []struct {
 
3363
                Name  string
 
3364
                Empty bool
 
3365
        }
 
3366
}
 
3367
 
 
3368
// DatabaseNames returns the names of non-empty databases present in the cluster.
 
3369
func (s *Session) DatabaseNames() (names []string, err error) {
 
3370
        var result dbNames
 
3371
        err = s.Run("listDatabases", &result)
 
3372
        if err != nil {
 
3373
                return nil, err
 
3374
        }
 
3375
        for _, db := range result.Databases {
 
3376
                if !db.Empty {
 
3377
                        names = append(names, db.Name)
 
3378
                }
 
3379
        }
 
3380
        sort.Strings(names)
 
3381
        return names, nil
 
3382
}
 
3383
 
 
3384
// Iter executes the query and returns an iterator capable of going over all
 
3385
// the results. Results will be returned in batches of configurable
 
3386
// size (see the Batch method) and more documents will be requested when a
 
3387
// configurable number of documents is iterated over (see the Prefetch method).
 
3388
func (q *Query) Iter() *Iter {
 
3389
        q.m.Lock()
 
3390
        session := q.session
 
3391
        op := q.op
 
3392
        prefetch := q.prefetch
 
3393
        limit := q.limit
 
3394
        q.m.Unlock()
 
3395
 
 
3396
        iter := &Iter{
 
3397
                session:  session,
 
3398
                prefetch: prefetch,
 
3399
                limit:    limit,
 
3400
                timeout:  -1,
 
3401
        }
 
3402
        iter.gotReply.L = &iter.m
 
3403
        iter.op.collection = op.collection
 
3404
        iter.op.limit = op.limit
 
3405
        iter.op.replyFunc = iter.replyFunc()
 
3406
        iter.docsToReceive++
 
3407
 
 
3408
        socket, err := session.acquireSocket(true)
 
3409
        if err != nil {
 
3410
                iter.err = err
 
3411
                return iter
 
3412
        }
 
3413
        defer socket.Release()
 
3414
 
 
3415
        session.prepareQuery(&op)
 
3416
        op.replyFunc = iter.op.replyFunc
 
3417
 
 
3418
        if prepareFindOp(socket, &op, limit) {
 
3419
                iter.findCmd = true
 
3420
        }
 
3421
 
 
3422
        iter.server = socket.Server()
 
3423
        err = socket.Query(&op)
 
3424
        if err != nil {
 
3425
                // Must lock as the query is already out and it may call replyFunc.
 
3426
                iter.m.Lock()
 
3427
                iter.err = err
 
3428
                iter.m.Unlock()
 
3429
        }
 
3430
 
 
3431
        return iter
 
3432
}
 
3433
 
 
3434
// Tail returns a tailable iterator. Unlike a normal iterator, a
 
3435
// tailable iterator may wait for new values to be inserted in the
 
3436
// collection once the end of the current result set is reached,
 
3437
// A tailable iterator may only be used with capped collections.
 
3438
//
 
3439
// The timeout parameter indicates how long Next will block waiting
 
3440
// for a result before timing out.  If set to -1, Next will not
 
3441
// timeout, and will continue waiting for a result for as long as
 
3442
// the cursor is valid and the session is not closed. If set to 0,
 
3443
// Next times out as soon as it reaches the end of the result set.
 
3444
// Otherwise, Next will wait for at least the given number of
 
3445
// seconds for a new document to be available before timing out.
 
3446
//
 
3447
// On timeouts, Next will unblock and return false, and the Timeout
 
3448
// method will return true if called. In these cases, Next may still
 
3449
// be called again on the same iterator to check if a new value is
 
3450
// available at the current cursor position, and again it will block
 
3451
// according to the specified timeoutSecs. If the cursor becomes
 
3452
// invalid, though, both Next and Timeout will return false and
 
3453
// the query must be restarted.
 
3454
//
 
3455
// The following example demonstrates timeout handling and query
 
3456
// restarting:
 
3457
//
 
3458
//    iter := collection.Find(nil).Sort("$natural").Tail(5 * time.Second)
 
3459
//    for {
 
3460
//         for iter.Next(&result) {
 
3461
//             fmt.Println(result.Id)
 
3462
//             lastId = result.Id
 
3463
//         }
 
3464
//         if iter.Err() != nil {
 
3465
//             return iter.Close()
 
3466
//         }
 
3467
//         if iter.Timeout() {
 
3468
//             continue
 
3469
//         }
 
3470
//         query := collection.Find(bson.M{"_id": bson.M{"$gt": lastId}})
 
3471
//         iter = query.Sort("$natural").Tail(5 * time.Second)
 
3472
//    }
 
3473
//    iter.Close()
 
3474
//
 
3475
// Relevant documentation:
 
3476
//
 
3477
//     http://www.mongodb.org/display/DOCS/Tailable+Cursors
 
3478
//     http://www.mongodb.org/display/DOCS/Capped+Collections
 
3479
//     http://www.mongodb.org/display/DOCS/Sorting+and+Natural+Order
 
3480
//
 
3481
func (q *Query) Tail(timeout time.Duration) *Iter {
 
3482
        q.m.Lock()
 
3483
        session := q.session
 
3484
        op := q.op
 
3485
        prefetch := q.prefetch
 
3486
        q.m.Unlock()
 
3487
 
 
3488
        iter := &Iter{session: session, prefetch: prefetch}
 
3489
        iter.gotReply.L = &iter.m
 
3490
        iter.timeout = timeout
 
3491
        iter.op.collection = op.collection
 
3492
        iter.op.limit = op.limit
 
3493
        iter.op.replyFunc = iter.replyFunc()
 
3494
        iter.docsToReceive++
 
3495
        session.prepareQuery(&op)
 
3496
        op.replyFunc = iter.op.replyFunc
 
3497
        op.flags |= flagTailable | flagAwaitData
 
3498
 
 
3499
        socket, err := session.acquireSocket(true)
 
3500
        if err != nil {
 
3501
                iter.err = err
 
3502
        } else {
 
3503
                iter.server = socket.Server()
 
3504
                err = socket.Query(&op)
 
3505
                if err != nil {
 
3506
                        // Must lock as the query is already out and it may call replyFunc.
 
3507
                        iter.m.Lock()
 
3508
                        iter.err = err
 
3509
                        iter.m.Unlock()
 
3510
                }
 
3511
                socket.Release()
 
3512
        }
 
3513
        return iter
 
3514
}
 
3515
 
 
3516
func (s *Session) prepareQuery(op *queryOp) {
 
3517
        s.m.RLock()
 
3518
        op.mode = s.consistency
 
3519
        if s.slaveOk {
 
3520
                op.flags |= flagSlaveOk
 
3521
        }
 
3522
        s.m.RUnlock()
 
3523
        return
 
3524
}
 
3525
 
 
3526
// Err returns nil if no errors happened during iteration, or the actual
 
3527
// error otherwise.
 
3528
//
 
3529
// In case a resulting document included a field named $err or errmsg, which are
 
3530
// standard ways for MongoDB to report an improper query, the returned value has
 
3531
// a *QueryError type, and includes the Err message and the Code.
 
3532
func (iter *Iter) Err() error {
 
3533
        iter.m.Lock()
 
3534
        err := iter.err
 
3535
        iter.m.Unlock()
 
3536
        if err == ErrNotFound {
 
3537
                return nil
 
3538
        }
 
3539
        return err
 
3540
}
 
3541
 
 
3542
// Close kills the server cursor used by the iterator, if any, and returns
 
3543
// nil if no errors happened during iteration, or the actual error otherwise.
 
3544
//
 
3545
// Server cursors are automatically closed at the end of an iteration, which
 
3546
// means close will do nothing unless the iteration was interrupted before
 
3547
// the server finished sending results to the driver. If Close is not called
 
3548
// in such a situation, the cursor will remain available at the server until
 
3549
// the default cursor timeout period is reached. No further problems arise.
 
3550
//
 
3551
// Close is idempotent. That means it can be called repeatedly and will
 
3552
// return the same result every time.
 
3553
//
 
3554
// In case a resulting document included a field named $err or errmsg, which are
 
3555
// standard ways for MongoDB to report an improper query, the returned value has
 
3556
// a *QueryError type.
 
3557
func (iter *Iter) Close() error {
 
3558
        iter.m.Lock()
 
3559
        cursorId := iter.op.cursorId
 
3560
        iter.op.cursorId = 0
 
3561
        err := iter.err
 
3562
        iter.m.Unlock()
 
3563
        if cursorId == 0 {
 
3564
                if err == ErrNotFound {
 
3565
                        return nil
 
3566
                }
 
3567
                return err
 
3568
        }
 
3569
        socket, err := iter.acquireSocket()
 
3570
        if err == nil {
 
3571
                // TODO Batch kills.
 
3572
                err = socket.Query(&killCursorsOp{[]int64{cursorId}})
 
3573
                socket.Release()
 
3574
        }
 
3575
 
 
3576
        iter.m.Lock()
 
3577
        if err != nil && (iter.err == nil || iter.err == ErrNotFound) {
 
3578
                iter.err = err
 
3579
        } else if iter.err != ErrNotFound {
 
3580
                err = iter.err
 
3581
        }
 
3582
        iter.m.Unlock()
 
3583
        return err
 
3584
}
 
3585
 
 
3586
// Timeout returns true if Next returned false due to a timeout of
 
3587
// a tailable cursor. In those cases, Next may be called again to continue
 
3588
// the iteration at the previous cursor position.
 
3589
func (iter *Iter) Timeout() bool {
 
3590
        iter.m.Lock()
 
3591
        result := iter.timedout
 
3592
        iter.m.Unlock()
 
3593
        return result
 
3594
}
 
3595
 
 
3596
// Next retrieves the next document from the result set, blocking if necessary.
 
3597
// This method will also automatically retrieve another batch of documents from
 
3598
// the server when the current one is exhausted, or before that in background
 
3599
// if pre-fetching is enabled (see the Query.Prefetch and Session.SetPrefetch
 
3600
// methods).
 
3601
//
 
3602
// Next returns true if a document was successfully unmarshalled onto result,
 
3603
// and false at the end of the result set or if an error happened.
 
3604
// When Next returns false, the Err method should be called to verify if
 
3605
// there was an error during iteration.
 
3606
//
 
3607
// For example:
 
3608
//
 
3609
//    iter := collection.Find(nil).Iter()
 
3610
//    for iter.Next(&result) {
 
3611
//        fmt.Printf("Result: %v\n", result.Id)
 
3612
//    }
 
3613
//    if err := iter.Close(); err != nil {
 
3614
//        return err
 
3615
//    }
 
3616
//
 
3617
func (iter *Iter) Next(result interface{}) bool {
 
3618
        iter.m.Lock()
 
3619
        iter.timedout = false
 
3620
        timeout := time.Time{}
 
3621
        for iter.err == nil && iter.docData.Len() == 0 && (iter.docsToReceive > 0 || iter.op.cursorId != 0) {
 
3622
                if iter.docsToReceive == 0 {
 
3623
                        if iter.timeout >= 0 {
 
3624
                                if timeout.IsZero() {
 
3625
                                        timeout = time.Now().Add(iter.timeout)
 
3626
                                }
 
3627
                                if time.Now().After(timeout) {
 
3628
                                        iter.timedout = true
 
3629
                                        iter.m.Unlock()
 
3630
                                        return false
 
3631
                                }
 
3632
                        }
 
3633
                        iter.getMore()
 
3634
                        if iter.err != nil {
 
3635
                                break
 
3636
                        }
 
3637
                }
 
3638
                iter.gotReply.Wait()
 
3639
        }
 
3640
 
 
3641
        // Exhaust available data before reporting any errors.
 
3642
        if docData, ok := iter.docData.Pop().([]byte); ok {
 
3643
                close := false
 
3644
                if iter.limit > 0 {
 
3645
                        iter.limit--
 
3646
                        if iter.limit == 0 {
 
3647
                                if iter.docData.Len() > 0 {
 
3648
                                        iter.m.Unlock()
 
3649
                                        panic(fmt.Errorf("data remains after limit exhausted: %d", iter.docData.Len()))
 
3650
                                }
 
3651
                                iter.err = ErrNotFound
 
3652
                                close = true
 
3653
                        }
 
3654
                }
 
3655
                if iter.op.cursorId != 0 && iter.err == nil {
 
3656
                        iter.docsBeforeMore--
 
3657
                        if iter.docsBeforeMore == -1 {
 
3658
                                iter.getMore()
 
3659
                        }
 
3660
                }
 
3661
                iter.m.Unlock()
 
3662
 
 
3663
                if close {
 
3664
                        iter.Close()
 
3665
                }
 
3666
                err := bson.Unmarshal(docData, result)
 
3667
                if err != nil {
 
3668
                        debugf("Iter %p document unmarshaling failed: %#v", iter, err)
 
3669
                        iter.m.Lock()
 
3670
                        if iter.err == nil {
 
3671
                                iter.err = err
 
3672
                        }
 
3673
                        iter.m.Unlock()
 
3674
                        return false
 
3675
                }
 
3676
                debugf("Iter %p document unmarshaled: %#v", iter, result)
 
3677
                // XXX Only have to check first document for a query error?
 
3678
                err = checkQueryError(iter.op.collection, docData)
 
3679
                if err != nil {
 
3680
                        iter.m.Lock()
 
3681
                        if iter.err == nil {
 
3682
                                iter.err = err
 
3683
                        }
 
3684
                        iter.m.Unlock()
 
3685
                        return false
 
3686
                }
 
3687
                return true
 
3688
        } else if iter.err != nil {
 
3689
                debugf("Iter %p returning false: %s", iter, iter.err)
 
3690
                iter.m.Unlock()
 
3691
                return false
 
3692
        } else if iter.op.cursorId == 0 {
 
3693
                iter.err = ErrNotFound
 
3694
                debugf("Iter %p exhausted with cursor=0", iter)
 
3695
                iter.m.Unlock()
 
3696
                return false
 
3697
        }
 
3698
 
 
3699
        panic("unreachable")
 
3700
}
 
3701
 
 
3702
// All retrieves all documents from the result set into the provided slice
 
3703
// and closes the iterator.
 
3704
//
 
3705
// The result argument must necessarily be the address for a slice. The slice
 
3706
// may be nil or previously allocated.
 
3707
//
 
3708
// WARNING: Obviously, All must not be used with result sets that may be
 
3709
// potentially large, since it may consume all memory until the system
 
3710
// crashes. Consider building the query with a Limit clause to ensure the
 
3711
// result size is bounded.
 
3712
//
 
3713
// For instance:
 
3714
//
 
3715
//    var result []struct{ Value int }
 
3716
//    iter := collection.Find(nil).Limit(100).Iter()
 
3717
//    err := iter.All(&result)
 
3718
//    if err != nil {
 
3719
//        return err
 
3720
//    }
 
3721
//
 
3722
func (iter *Iter) All(result interface{}) error {
 
3723
        resultv := reflect.ValueOf(result)
 
3724
        if resultv.Kind() != reflect.Ptr || resultv.Elem().Kind() != reflect.Slice {
 
3725
                panic("result argument must be a slice address")
 
3726
        }
 
3727
        slicev := resultv.Elem()
 
3728
        slicev = slicev.Slice(0, slicev.Cap())
 
3729
        elemt := slicev.Type().Elem()
 
3730
        i := 0
 
3731
        for {
 
3732
                if slicev.Len() == i {
 
3733
                        elemp := reflect.New(elemt)
 
3734
                        if !iter.Next(elemp.Interface()) {
 
3735
                                break
 
3736
                        }
 
3737
                        slicev = reflect.Append(slicev, elemp.Elem())
 
3738
                        slicev = slicev.Slice(0, slicev.Cap())
 
3739
                } else {
 
3740
                        if !iter.Next(slicev.Index(i).Addr().Interface()) {
 
3741
                                break
 
3742
                        }
 
3743
                }
 
3744
                i++
 
3745
        }
 
3746
        resultv.Elem().Set(slicev.Slice(0, i))
 
3747
        return iter.Close()
 
3748
}
 
3749
 
 
3750
// All works like Iter.All.
 
3751
func (q *Query) All(result interface{}) error {
 
3752
        return q.Iter().All(result)
 
3753
}
 
3754
 
 
3755
// The For method is obsolete and will be removed in a future release.
 
3756
// See Iter as an elegant replacement.
 
3757
func (q *Query) For(result interface{}, f func() error) error {
 
3758
        return q.Iter().For(result, f)
 
3759
}
 
3760
 
 
3761
// The For method is obsolete and will be removed in a future release.
 
3762
// See Iter as an elegant replacement.
 
3763
func (iter *Iter) For(result interface{}, f func() error) (err error) {
 
3764
        valid := false
 
3765
        v := reflect.ValueOf(result)
 
3766
        if v.Kind() == reflect.Ptr {
 
3767
                v = v.Elem()
 
3768
                switch v.Kind() {
 
3769
                case reflect.Map, reflect.Ptr, reflect.Interface, reflect.Slice:
 
3770
                        valid = v.IsNil()
 
3771
                }
 
3772
        }
 
3773
        if !valid {
 
3774
                panic("For needs a pointer to nil reference value.  See the documentation.")
 
3775
        }
 
3776
        zero := reflect.Zero(v.Type())
 
3777
        for {
 
3778
                v.Set(zero)
 
3779
                if !iter.Next(result) {
 
3780
                        break
 
3781
                }
 
3782
                err = f()
 
3783
                if err != nil {
 
3784
                        return err
 
3785
                }
 
3786
        }
 
3787
        return iter.Err()
 
3788
}
 
3789
 
 
3790
// acquireSocket acquires a socket from the same server that the iterator
 
3791
// cursor was obtained from.
 
3792
//
 
3793
// WARNING: This method must not be called with iter.m locked. Acquiring the
 
3794
// socket depends on the cluster sync loop, and the cluster sync loop might
 
3795
// attempt actions which cause replyFunc to be called, inducing a deadlock.
 
3796
func (iter *Iter) acquireSocket() (*mongoSocket, error) {
 
3797
        socket, err := iter.session.acquireSocket(true)
 
3798
        if err != nil {
 
3799
                return nil, err
 
3800
        }
 
3801
        if socket.Server() != iter.server {
 
3802
                // Socket server changed during iteration. This may happen
 
3803
                // with Eventual sessions, if a Refresh is done, or if a
 
3804
                // monotonic session gets a write and shifts from secondary
 
3805
                // to primary. Our cursor is in a specific server, though.
 
3806
                iter.session.m.Lock()
 
3807
                sockTimeout := iter.session.sockTimeout
 
3808
                iter.session.m.Unlock()
 
3809
                socket.Release()
 
3810
                socket, _, err = iter.server.AcquireSocket(0, sockTimeout)
 
3811
                if err != nil {
 
3812
                        return nil, err
 
3813
                }
 
3814
                err := iter.session.socketLogin(socket)
 
3815
                if err != nil {
 
3816
                        socket.Release()
 
3817
                        return nil, err
 
3818
                }
 
3819
        }
 
3820
        return socket, nil
 
3821
}
 
3822
 
 
3823
func (iter *Iter) getMore() {
 
3824
        // Increment now so that unlocking the iterator won't cause a
 
3825
        // different goroutine to get here as well.
 
3826
        iter.docsToReceive++
 
3827
        iter.m.Unlock()
 
3828
        socket, err := iter.acquireSocket()
 
3829
        iter.m.Lock()
 
3830
        if err != nil {
 
3831
                iter.err = err
 
3832
                return
 
3833
        }
 
3834
        defer socket.Release()
 
3835
 
 
3836
        debugf("Iter %p requesting more documents", iter)
 
3837
        if iter.limit > 0 {
 
3838
                // The -1 below accounts for the fact docsToReceive was incremented above.
 
3839
                limit := iter.limit - int32(iter.docsToReceive-1) - int32(iter.docData.Len())
 
3840
                if limit < iter.op.limit {
 
3841
                        iter.op.limit = limit
 
3842
                }
 
3843
        }
 
3844
        var op interface{}
 
3845
        if iter.findCmd {
 
3846
                op = iter.getMoreCmd()
 
3847
        } else {
 
3848
                op = &iter.op
 
3849
        }
 
3850
        if err := socket.Query(op); err != nil {
 
3851
                iter.docsToReceive--
 
3852
                iter.err = err
 
3853
        }
 
3854
}
 
3855
 
 
3856
func (iter *Iter) getMoreCmd() *queryOp {
 
3857
        // TODO: Define the query statically in the Iter type, next to getMoreOp.
 
3858
        nameDot := strings.Index(iter.op.collection, ".")
 
3859
        if nameDot < 0 {
 
3860
                panic("invalid query collection name: " + iter.op.collection)
 
3861
        }
 
3862
 
 
3863
        getMore := getMoreCmd{
 
3864
                CursorId:   iter.op.cursorId,
 
3865
                Collection: iter.op.collection[nameDot+1:],
 
3866
                BatchSize:  iter.op.limit,
 
3867
        }
 
3868
 
 
3869
        var op queryOp
 
3870
        op.collection = iter.op.collection[:nameDot] + ".$cmd"
 
3871
        op.query = &getMore
 
3872
        op.limit = -1
 
3873
        op.replyFunc = iter.op.replyFunc
 
3874
        return &op
 
3875
}
 
3876
 
 
3877
type countCmd struct {
 
3878
        Count string
 
3879
        Query interface{}
 
3880
        Limit int32 ",omitempty"
 
3881
        Skip  int32 ",omitempty"
 
3882
}
 
3883
 
 
3884
// Count returns the total number of documents in the result set.
 
3885
func (q *Query) Count() (n int, err error) {
 
3886
        q.m.Lock()
 
3887
        session := q.session
 
3888
        op := q.op
 
3889
        limit := q.limit
 
3890
        q.m.Unlock()
 
3891
 
 
3892
        c := strings.Index(op.collection, ".")
 
3893
        if c < 0 {
 
3894
                return 0, errors.New("Bad collection name: " + op.collection)
 
3895
        }
 
3896
 
 
3897
        dbname := op.collection[:c]
 
3898
        cname := op.collection[c+1:]
 
3899
        query := op.query
 
3900
        if query == nil {
 
3901
                query = bson.D{}
 
3902
        }
 
3903
        result := struct{ N int }{}
 
3904
        err = session.DB(dbname).Run(countCmd{cname, query, limit, op.skip}, &result)
 
3905
        return result.N, err
 
3906
}
 
3907
 
 
3908
// Count returns the total number of documents in the collection.
 
3909
func (c *Collection) Count() (n int, err error) {
 
3910
        return c.Find(nil).Count()
 
3911
}
 
3912
 
 
3913
type distinctCmd struct {
 
3914
        Collection string "distinct"
 
3915
        Key        string
 
3916
        Query      interface{} ",omitempty"
 
3917
}
 
3918
 
 
3919
// Distinct unmarshals into result the list of distinct values for the given key.
 
3920
//
 
3921
// For example:
 
3922
//
 
3923
//     var result []int
 
3924
//     err := collection.Find(bson.M{"gender": "F"}).Distinct("age", &result)
 
3925
//
 
3926
// Relevant documentation:
 
3927
//
 
3928
//     http://www.mongodb.org/display/DOCS/Aggregation
 
3929
//
 
3930
func (q *Query) Distinct(key string, result interface{}) error {
 
3931
        q.m.Lock()
 
3932
        session := q.session
 
3933
        op := q.op // Copy.
 
3934
        q.m.Unlock()
 
3935
 
 
3936
        c := strings.Index(op.collection, ".")
 
3937
        if c < 0 {
 
3938
                return errors.New("Bad collection name: " + op.collection)
 
3939
        }
 
3940
 
 
3941
        dbname := op.collection[:c]
 
3942
        cname := op.collection[c+1:]
 
3943
 
 
3944
        var doc struct{ Values bson.Raw }
 
3945
        err := session.DB(dbname).Run(distinctCmd{cname, key, op.query}, &doc)
 
3946
        if err != nil {
 
3947
                return err
 
3948
        }
 
3949
        return doc.Values.Unmarshal(result)
 
3950
}
 
3951
 
 
3952
type mapReduceCmd struct {
 
3953
        Collection string "mapreduce"
 
3954
        Map        string ",omitempty"
 
3955
        Reduce     string ",omitempty"
 
3956
        Finalize   string ",omitempty"
 
3957
        Limit      int32  ",omitempty"
 
3958
        Out        interface{}
 
3959
        Query      interface{} ",omitempty"
 
3960
        Sort       interface{} ",omitempty"
 
3961
        Scope      interface{} ",omitempty"
 
3962
        Verbose    bool        ",omitempty"
 
3963
}
 
3964
 
 
3965
type mapReduceResult struct {
 
3966
        Results    bson.Raw
 
3967
        Result     bson.Raw
 
3968
        TimeMillis int64 "timeMillis"
 
3969
        Counts     struct{ Input, Emit, Output int }
 
3970
        Ok         bool
 
3971
        Err        string
 
3972
        Timing     *MapReduceTime
 
3973
}
 
3974
 
 
3975
type MapReduce struct {
 
3976
        Map      string      // Map Javascript function code (required)
 
3977
        Reduce   string      // Reduce Javascript function code (required)
 
3978
        Finalize string      // Finalize Javascript function code (optional)
 
3979
        Out      interface{} // Output collection name or document. If nil, results are inlined into the result parameter.
 
3980
        Scope    interface{} // Optional global scope for Javascript functions
 
3981
        Verbose  bool
 
3982
}
 
3983
 
 
3984
type MapReduceInfo struct {
 
3985
        InputCount  int            // Number of documents mapped
 
3986
        EmitCount   int            // Number of times reduce called emit
 
3987
        OutputCount int            // Number of documents in resulting collection
 
3988
        Database    string         // Output database, if results are not inlined
 
3989
        Collection  string         // Output collection, if results are not inlined
 
3990
        Time        int64          // Time to run the job, in nanoseconds
 
3991
        VerboseTime *MapReduceTime // Only defined if Verbose was true
 
3992
}
 
3993
 
 
3994
type MapReduceTime struct {
 
3995
        Total    int64 // Total time, in nanoseconds
 
3996
        Map      int64 "mapTime"  // Time within map function, in nanoseconds
 
3997
        EmitLoop int64 "emitLoop" // Time within the emit/map loop, in nanoseconds
 
3998
}
 
3999
 
 
4000
// MapReduce executes a map/reduce job for documents covered by the query.
 
4001
// That kind of job is suitable for very flexible bulk aggregation of data
 
4002
// performed at the server side via Javascript functions.
 
4003
//
 
4004
// Results from the job may be returned as a result of the query itself
 
4005
// through the result parameter in case they'll certainly fit in memory
 
4006
// and in a single document.  If there's the possibility that the amount
 
4007
// of data might be too large, results must be stored back in an alternative
 
4008
// collection or even a separate database, by setting the Out field of the
 
4009
// provided MapReduce job.  In that case, provide nil as the result parameter.
 
4010
//
 
4011
// These are some of the ways to set Out:
 
4012
//
 
4013
//     nil
 
4014
//         Inline results into the result parameter.
 
4015
//
 
4016
//     bson.M{"replace": "mycollection"}
 
4017
//         The output will be inserted into a collection which replaces any
 
4018
//         existing collection with the same name.
 
4019
//
 
4020
//     bson.M{"merge": "mycollection"}
 
4021
//         This option will merge new data into the old output collection. In
 
4022
//         other words, if the same key exists in both the result set and the
 
4023
//         old collection, the new key will overwrite the old one.
 
4024
//
 
4025
//     bson.M{"reduce": "mycollection"}
 
4026
//         If documents exist for a given key in the result set and in the old
 
4027
//         collection, then a reduce operation (using the specified reduce
 
4028
//         function) will be performed on the two values and the result will be
 
4029
//         written to the output collection. If a finalize function was
 
4030
//         provided, this will be run after the reduce as well.
 
4031
//
 
4032
//     bson.M{...., "db": "mydb"}
 
4033
//         Any of the above options can have the "db" key included for doing
 
4034
//         the respective action in a separate database.
 
4035
//
 
4036
// The following is a trivial example which will count the number of
 
4037
// occurrences of a field named n on each document in a collection, and
 
4038
// will return results inline:
 
4039
//
 
4040
//     job := &mgo.MapReduce{
 
4041
//             Map:      "function() { emit(this.n, 1) }",
 
4042
//             Reduce:   "function(key, values) { return Array.sum(values) }",
 
4043
//     }
 
4044
//     var result []struct { Id int "_id"; Value int }
 
4045
//     _, err := collection.Find(nil).MapReduce(job, &result)
 
4046
//     if err != nil {
 
4047
//         return err
 
4048
//     }
 
4049
//     for _, item := range result {
 
4050
//         fmt.Println(item.Value)
 
4051
//     }
 
4052
//
 
4053
// This function is compatible with MongoDB 1.7.4+.
 
4054
//
 
4055
// Relevant documentation:
 
4056
//
 
4057
//     http://www.mongodb.org/display/DOCS/MapReduce
 
4058
//
 
4059
func (q *Query) MapReduce(job *MapReduce, result interface{}) (info *MapReduceInfo, err error) {
 
4060
        q.m.Lock()
 
4061
        session := q.session
 
4062
        op := q.op // Copy.
 
4063
        limit := q.limit
 
4064
        q.m.Unlock()
 
4065
 
 
4066
        c := strings.Index(op.collection, ".")
 
4067
        if c < 0 {
 
4068
                return nil, errors.New("Bad collection name: " + op.collection)
 
4069
        }
 
4070
 
 
4071
        dbname := op.collection[:c]
 
4072
        cname := op.collection[c+1:]
 
4073
 
 
4074
        cmd := mapReduceCmd{
 
4075
                Collection: cname,
 
4076
                Map:        job.Map,
 
4077
                Reduce:     job.Reduce,
 
4078
                Finalize:   job.Finalize,
 
4079
                Out:        fixMROut(job.Out),
 
4080
                Scope:      job.Scope,
 
4081
                Verbose:    job.Verbose,
 
4082
                Query:      op.query,
 
4083
                Sort:       op.options.OrderBy,
 
4084
                Limit:      limit,
 
4085
        }
 
4086
 
 
4087
        if cmd.Out == nil {
 
4088
                cmd.Out = bson.D{{"inline", 1}}
 
4089
        }
 
4090
 
 
4091
        var doc mapReduceResult
 
4092
        err = session.DB(dbname).Run(&cmd, &doc)
 
4093
        if err != nil {
 
4094
                return nil, err
 
4095
        }
 
4096
        if doc.Err != "" {
 
4097
                return nil, errors.New(doc.Err)
 
4098
        }
 
4099
 
 
4100
        info = &MapReduceInfo{
 
4101
                InputCount:  doc.Counts.Input,
 
4102
                EmitCount:   doc.Counts.Emit,
 
4103
                OutputCount: doc.Counts.Output,
 
4104
                Time:        doc.TimeMillis * 1e6,
 
4105
        }
 
4106
 
 
4107
        if doc.Result.Kind == 0x02 {
 
4108
                err = doc.Result.Unmarshal(&info.Collection)
 
4109
                info.Database = dbname
 
4110
        } else if doc.Result.Kind == 0x03 {
 
4111
                var v struct{ Collection, Db string }
 
4112
                err = doc.Result.Unmarshal(&v)
 
4113
                info.Collection = v.Collection
 
4114
                info.Database = v.Db
 
4115
        }
 
4116
 
 
4117
        if doc.Timing != nil {
 
4118
                info.VerboseTime = doc.Timing
 
4119
                info.VerboseTime.Total *= 1e6
 
4120
                info.VerboseTime.Map *= 1e6
 
4121
                info.VerboseTime.EmitLoop *= 1e6
 
4122
        }
 
4123
 
 
4124
        if err != nil {
 
4125
                return nil, err
 
4126
        }
 
4127
        if result != nil {
 
4128
                return info, doc.Results.Unmarshal(result)
 
4129
        }
 
4130
        return info, nil
 
4131
}
 
4132
 
 
4133
// The "out" option in the MapReduce command must be ordered. This was
 
4134
// found after the implementation was accepting maps for a long time,
 
4135
// so rather than breaking the API, we'll fix the order if necessary.
 
4136
// Details about the order requirement may be seen in MongoDB's code:
 
4137
//
 
4138
//     http://goo.gl/L8jwJX
 
4139
//
 
4140
func fixMROut(out interface{}) interface{} {
 
4141
        outv := reflect.ValueOf(out)
 
4142
        if outv.Kind() != reflect.Map || outv.Type().Key() != reflect.TypeOf("") {
 
4143
                return out
 
4144
        }
 
4145
        outs := make(bson.D, outv.Len())
 
4146
 
 
4147
        outTypeIndex := -1
 
4148
        for i, k := range outv.MapKeys() {
 
4149
                ks := k.String()
 
4150
                outs[i].Name = ks
 
4151
                outs[i].Value = outv.MapIndex(k).Interface()
 
4152
                switch ks {
 
4153
                case "normal", "replace", "merge", "reduce", "inline":
 
4154
                        outTypeIndex = i
 
4155
                }
 
4156
        }
 
4157
        if outTypeIndex > 0 {
 
4158
                outs[0], outs[outTypeIndex] = outs[outTypeIndex], outs[0]
 
4159
        }
 
4160
        return outs
 
4161
}
 
4162
 
 
4163
// Change holds fields for running a findAndModify MongoDB command via
 
4164
// the Query.Apply method.
 
4165
type Change struct {
 
4166
        Update    interface{} // The update document
 
4167
        Upsert    bool        // Whether to insert in case the document isn't found
 
4168
        Remove    bool        // Whether to remove the document found rather than updating
 
4169
        ReturnNew bool        // Should the modified document be returned rather than the old one
 
4170
}
 
4171
 
 
4172
type findModifyCmd struct {
 
4173
        Collection                  string      "findAndModify"
 
4174
        Query, Update, Sort, Fields interface{} ",omitempty"
 
4175
        Upsert, Remove, New         bool        ",omitempty"
 
4176
}
 
4177
 
 
4178
type valueResult struct {
 
4179
        Value     bson.Raw
 
4180
        LastError LastError "lastErrorObject"
 
4181
}
 
4182
 
 
4183
// Apply runs the findAndModify MongoDB command, which allows updating, upserting
 
4184
// or removing a document matching a query and atomically returning either the old
 
4185
// version (the default) or the new version of the document (when ReturnNew is true).
 
4186
// If no objects are found Apply returns ErrNotFound.
 
4187
//
 
4188
// The Sort and Select query methods affect the result of Apply.  In case
 
4189
// multiple documents match the query, Sort enables selecting which document to
 
4190
// act upon by ordering it first.  Select enables retrieving only a selection
 
4191
// of fields of the new or old document.
 
4192
//
 
4193
// This simple example increments a counter and prints its new value:
 
4194
//
 
4195
//     change := mgo.Change{
 
4196
//             Update: bson.M{"$inc": bson.M{"n": 1}},
 
4197
//             ReturnNew: true,
 
4198
//     }
 
4199
//     info, err = col.Find(M{"_id": id}).Apply(change, &doc)
 
4200
//     fmt.Println(doc.N)
 
4201
//
 
4202
// This method depends on MongoDB >= 2.0 to work properly.
 
4203
//
 
4204
// Relevant documentation:
 
4205
//
 
4206
//     http://www.mongodb.org/display/DOCS/findAndModify+Command
 
4207
//     http://www.mongodb.org/display/DOCS/Updating
 
4208
//     http://www.mongodb.org/display/DOCS/Atomic+Operations
 
4209
//
 
4210
func (q *Query) Apply(change Change, result interface{}) (info *ChangeInfo, err error) {
 
4211
        q.m.Lock()
 
4212
        session := q.session
 
4213
        op := q.op // Copy.
 
4214
        q.m.Unlock()
 
4215
 
 
4216
        c := strings.Index(op.collection, ".")
 
4217
        if c < 0 {
 
4218
                return nil, errors.New("bad collection name: " + op.collection)
 
4219
        }
 
4220
 
 
4221
        dbname := op.collection[:c]
 
4222
        cname := op.collection[c+1:]
 
4223
 
 
4224
        cmd := findModifyCmd{
 
4225
                Collection: cname,
 
4226
                Update:     change.Update,
 
4227
                Upsert:     change.Upsert,
 
4228
                Remove:     change.Remove,
 
4229
                New:        change.ReturnNew,
 
4230
                Query:      op.query,
 
4231
                Sort:       op.options.OrderBy,
 
4232
                Fields:     op.selector,
 
4233
        }
 
4234
 
 
4235
        session = session.Clone()
 
4236
        defer session.Close()
 
4237
        session.SetMode(Strong, false)
 
4238
 
 
4239
        var doc valueResult
 
4240
        for retries := 0; ; retries++ {
 
4241
                err = session.DB(dbname).Run(&cmd, &doc)
 
4242
                if err != nil {
 
4243
                        if qerr, ok := err.(*QueryError); ok && qerr.Message == "No matching object found" {
 
4244
                                return nil, ErrNotFound
 
4245
                        }
 
4246
                        if change.Upsert && IsDup(err) && retries < maxUpsertRetries {
 
4247
                                // Retry duplicate key errors on upserts.
 
4248
                                // https://docs.mongodb.com/v3.2/reference/method/db.collection.update/#use-unique-indexes
 
4249
                                continue
 
4250
                        }
 
4251
                        return nil, err
 
4252
                }
 
4253
                break // No error, so don't retry.
 
4254
        }
 
4255
 
 
4256
        if doc.LastError.N == 0 {
 
4257
                return nil, ErrNotFound
 
4258
        }
 
4259
        if doc.Value.Kind != 0x0A && result != nil {
 
4260
                err = doc.Value.Unmarshal(result)
 
4261
                if err != nil {
 
4262
                        return nil, err
 
4263
                }
 
4264
        }
 
4265
        info = &ChangeInfo{}
 
4266
        lerr := &doc.LastError
 
4267
        if lerr.UpdatedExisting {
 
4268
                info.Updated = lerr.N
 
4269
                info.Matched = lerr.N
 
4270
        } else if change.Remove {
 
4271
                info.Removed = lerr.N
 
4272
                info.Matched = lerr.N
 
4273
        } else if change.Upsert {
 
4274
                info.UpsertedId = lerr.UpsertedId
 
4275
        }
 
4276
        return info, nil
 
4277
}
 
4278
 
 
4279
// The BuildInfo type encapsulates details about the running MongoDB server.
 
4280
//
 
4281
// Note that the VersionArray field was introduced in MongoDB 2.0+, but it is
 
4282
// internally assembled from the Version information for previous versions.
 
4283
// In both cases, VersionArray is guaranteed to have at least 4 entries.
 
4284
type BuildInfo struct {
 
4285
        Version        string
 
4286
        VersionArray   []int  `bson:"versionArray"` // On MongoDB 2.0+; assembled from Version otherwise
 
4287
        GitVersion     string `bson:"gitVersion"`
 
4288
        OpenSSLVersion string `bson:"OpenSSLVersion"`
 
4289
        SysInfo        string `bson:"sysInfo"` // Deprecated and empty on MongoDB 3.2+.
 
4290
        Bits           int
 
4291
        Debug          bool
 
4292
        MaxObjectSize  int `bson:"maxBsonObjectSize"`
 
4293
}
 
4294
 
 
4295
// VersionAtLeast returns whether the BuildInfo version is greater than or
 
4296
// equal to the provided version number. If more than one number is
 
4297
// provided, numbers will be considered as major, minor, and so on.
 
4298
func (bi *BuildInfo) VersionAtLeast(version ...int) bool {
 
4299
        for i := range version {
 
4300
                if i == len(bi.VersionArray) {
 
4301
                        return false
 
4302
                }
 
4303
                if bi.VersionArray[i] < version[i] {
 
4304
                        return false
 
4305
                }
 
4306
        }
 
4307
        return true
 
4308
}
 
4309
 
 
4310
// BuildInfo retrieves the version and other details about the
 
4311
// running MongoDB server.
 
4312
func (s *Session) BuildInfo() (info BuildInfo, err error) {
 
4313
        err = s.Run(bson.D{{"buildInfo", "1"}}, &info)
 
4314
        if len(info.VersionArray) == 0 {
 
4315
                for _, a := range strings.Split(info.Version, ".") {
 
4316
                        i, err := strconv.Atoi(a)
 
4317
                        if err != nil {
 
4318
                                break
 
4319
                        }
 
4320
                        info.VersionArray = append(info.VersionArray, i)
 
4321
                }
 
4322
        }
 
4323
        for len(info.VersionArray) < 4 {
 
4324
                info.VersionArray = append(info.VersionArray, 0)
 
4325
        }
 
4326
        if i := strings.IndexByte(info.GitVersion, ' '); i >= 0 {
 
4327
                // Strip off the " modules: enterprise" suffix. This is a _git version_.
 
4328
                // That information may be moved to another field if people need it.
 
4329
                info.GitVersion = info.GitVersion[:i]
 
4330
        }
 
4331
        if info.SysInfo == "deprecated" {
 
4332
                info.SysInfo = ""
 
4333
        }
 
4334
        return
 
4335
}
 
4336
 
 
4337
// ---------------------------------------------------------------------------
 
4338
// Internal session handling helpers.
 
4339
 
 
4340
func (s *Session) acquireSocket(slaveOk bool) (*mongoSocket, error) {
 
4341
 
 
4342
        // Read-only lock to check for previously reserved socket.
 
4343
        s.m.RLock()
 
4344
        // If there is a slave socket reserved and its use is acceptable, take it as long
 
4345
        // as there isn't a master socket which would be preferred by the read preference mode.
 
4346
        if s.slaveSocket != nil && s.slaveOk && slaveOk && (s.masterSocket == nil || s.consistency != PrimaryPreferred && s.consistency != Monotonic) {
 
4347
                socket := s.slaveSocket
 
4348
                socket.Acquire()
 
4349
                s.m.RUnlock()
 
4350
                return socket, nil
 
4351
        }
 
4352
        if s.masterSocket != nil {
 
4353
                socket := s.masterSocket
 
4354
                socket.Acquire()
 
4355
                s.m.RUnlock()
 
4356
                return socket, nil
 
4357
        }
 
4358
        s.m.RUnlock()
 
4359
 
 
4360
        // No go.  We may have to request a new socket and change the session,
 
4361
        // so try again but with an exclusive lock now.
 
4362
        s.m.Lock()
 
4363
        defer s.m.Unlock()
 
4364
 
 
4365
        if s.slaveSocket != nil && s.slaveOk && slaveOk && (s.masterSocket == nil || s.consistency != PrimaryPreferred && s.consistency != Monotonic) {
 
4366
                s.slaveSocket.Acquire()
 
4367
                return s.slaveSocket, nil
 
4368
        }
 
4369
        if s.masterSocket != nil {
 
4370
                s.masterSocket.Acquire()
 
4371
                return s.masterSocket, nil
 
4372
        }
 
4373
 
 
4374
        // Still not good.  We need a new socket.
 
4375
        sock, err := s.cluster().AcquireSocket(s.consistency, slaveOk && s.slaveOk, s.syncTimeout, s.sockTimeout, s.queryConfig.op.serverTags, s.poolLimit)
 
4376
        if err != nil {
 
4377
                return nil, err
 
4378
        }
 
4379
 
 
4380
        // Authenticate the new socket.
 
4381
        if err = s.socketLogin(sock); err != nil {
 
4382
                sock.Release()
 
4383
                return nil, err
 
4384
        }
 
4385
 
 
4386
        // Keep track of the new socket, if necessary.
 
4387
        // Note that, as a special case, if the Eventual session was
 
4388
        // not refreshed (s.slaveSocket != nil), it means the developer
 
4389
        // asked to preserve an existing reserved socket, so we'll
 
4390
        // keep a master one around too before a Refresh happens.
 
4391
        if s.consistency != Eventual || s.slaveSocket != nil {
 
4392
                s.setSocket(sock)
 
4393
        }
 
4394
 
 
4395
        // Switch over a Monotonic session to the master.
 
4396
        if !slaveOk && s.consistency == Monotonic {
 
4397
                s.slaveOk = false
 
4398
        }
 
4399
 
 
4400
        return sock, nil
 
4401
}
 
4402
 
 
4403
// setSocket binds socket to this section.
 
4404
func (s *Session) setSocket(socket *mongoSocket) {
 
4405
        info := socket.Acquire()
 
4406
        if info.Master {
 
4407
                if s.masterSocket != nil {
 
4408
                        panic("setSocket(master) with existing master socket reserved")
 
4409
                }
 
4410
                s.masterSocket = socket
 
4411
        } else {
 
4412
                if s.slaveSocket != nil {
 
4413
                        panic("setSocket(slave) with existing slave socket reserved")
 
4414
                }
 
4415
                s.slaveSocket = socket
 
4416
        }
 
4417
}
 
4418
 
 
4419
// unsetSocket releases any slave and/or master sockets reserved.
 
4420
func (s *Session) unsetSocket() {
 
4421
        if s.masterSocket != nil {
 
4422
                s.masterSocket.Release()
 
4423
        }
 
4424
        if s.slaveSocket != nil {
 
4425
                s.slaveSocket.Release()
 
4426
        }
 
4427
        s.masterSocket = nil
 
4428
        s.slaveSocket = nil
 
4429
}
 
4430
 
 
4431
func (iter *Iter) replyFunc() replyFunc {
 
4432
        return func(err error, op *replyOp, docNum int, docData []byte) {
 
4433
                iter.m.Lock()
 
4434
                iter.docsToReceive--
 
4435
                if err != nil {
 
4436
                        iter.err = err
 
4437
                        debugf("Iter %p received an error: %s", iter, err.Error())
 
4438
                } else if docNum == -1 {
 
4439
                        debugf("Iter %p received no documents (cursor=%d).", iter, op.cursorId)
 
4440
                        if op != nil && op.cursorId != 0 {
 
4441
                                // It's a tailable cursor.
 
4442
                                iter.op.cursorId = op.cursorId
 
4443
                        } else if op != nil && op.cursorId == 0 && op.flags&1 == 1 {
 
4444
                                // Cursor likely timed out.
 
4445
                                iter.err = ErrCursor
 
4446
                        } else {
 
4447
                                iter.err = ErrNotFound
 
4448
                        }
 
4449
                } else if iter.findCmd {
 
4450
                        debugf("Iter %p received reply document %d/%d (cursor=%d)", iter, docNum+1, int(op.replyDocs), op.cursorId)
 
4451
                        var findReply struct {
 
4452
                                Ok     bool
 
4453
                                Code   int
 
4454
                                Errmsg string
 
4455
                                Cursor cursorData
 
4456
                        }
 
4457
                        if err := bson.Unmarshal(docData, &findReply); err != nil {
 
4458
                                iter.err = err
 
4459
                        } else if !findReply.Ok && findReply.Errmsg != "" {
 
4460
                                iter.err = &QueryError{Code: findReply.Code, Message: findReply.Errmsg}
 
4461
                        } else if len(findReply.Cursor.FirstBatch) == 0 && len(findReply.Cursor.NextBatch) == 0 {
 
4462
                                iter.err = ErrNotFound
 
4463
                        } else {
 
4464
                                batch := findReply.Cursor.FirstBatch
 
4465
                                if len(batch) == 0 {
 
4466
                                        batch = findReply.Cursor.NextBatch
 
4467
                                }
 
4468
                                rdocs := len(batch)
 
4469
                                for _, raw := range batch {
 
4470
                                        iter.docData.Push(raw.Data)
 
4471
                                }
 
4472
                                iter.docsToReceive = 0
 
4473
                                docsToProcess := iter.docData.Len()
 
4474
                                if iter.limit == 0 || int32(docsToProcess) < iter.limit {
 
4475
                                        iter.docsBeforeMore = docsToProcess - int(iter.prefetch*float64(rdocs))
 
4476
                                } else {
 
4477
                                        iter.docsBeforeMore = -1
 
4478
                                }
 
4479
                                iter.op.cursorId = findReply.Cursor.Id
 
4480
                        }
 
4481
                } else {
 
4482
                        rdocs := int(op.replyDocs)
 
4483
                        if docNum == 0 {
 
4484
                                iter.docsToReceive += rdocs - 1
 
4485
                                docsToProcess := iter.docData.Len() + rdocs
 
4486
                                if iter.limit == 0 || int32(docsToProcess) < iter.limit {
 
4487
                                        iter.docsBeforeMore = docsToProcess - int(iter.prefetch*float64(rdocs))
 
4488
                                } else {
 
4489
                                        iter.docsBeforeMore = -1
 
4490
                                }
 
4491
                                iter.op.cursorId = op.cursorId
 
4492
                        }
 
4493
                        debugf("Iter %p received reply document %d/%d (cursor=%d)", iter, docNum+1, rdocs, op.cursorId)
 
4494
                        iter.docData.Push(docData)
 
4495
                }
 
4496
                iter.gotReply.Broadcast()
 
4497
                iter.m.Unlock()
 
4498
        }
 
4499
}
 
4500
 
 
4501
type writeCmdResult struct {
 
4502
        Ok        bool
 
4503
        N         int
 
4504
        NModified int `bson:"nModified"`
 
4505
        Upserted  []struct {
 
4506
                Index int
 
4507
                Id    interface{} `_id`
 
4508
        }
 
4509
        ConcernError writeConcernError `bson:"writeConcernError"`
 
4510
        Errors       []writeCmdError   `bson:"writeErrors"`
 
4511
}
 
4512
 
 
4513
type writeConcernError struct {
 
4514
        Code   int
 
4515
        ErrMsg string
 
4516
}
 
4517
 
 
4518
type writeCmdError struct {
 
4519
        Index  int
 
4520
        Code   int
 
4521
        ErrMsg string
 
4522
}
 
4523
 
 
4524
func (r *writeCmdResult) BulkErrorCases() []BulkErrorCase {
 
4525
        ecases := make([]BulkErrorCase, len(r.Errors))
 
4526
        for i, err := range r.Errors {
 
4527
                ecases[i] = BulkErrorCase{err.Index, &QueryError{Code: err.Code, Message: err.ErrMsg}}
 
4528
        }
 
4529
        return ecases
 
4530
}
 
4531
 
 
4532
// writeOp runs the given modifying operation, potentially followed up
 
4533
// by a getLastError command in case the session is in safe mode.  The
 
4534
// LastError result is made available in lerr, and if lerr.Err is set it
 
4535
// will also be returned as err.
 
4536
func (c *Collection) writeOp(op interface{}, ordered bool) (lerr *LastError, err error) {
 
4537
        s := c.Database.Session
 
4538
        socket, err := s.acquireSocket(c.Database.Name == "local")
 
4539
        if err != nil {
 
4540
                return nil, err
 
4541
        }
 
4542
        defer socket.Release()
 
4543
 
 
4544
        s.m.RLock()
 
4545
        safeOp := s.safeOp
 
4546
        bypassValidation := s.bypassValidation
 
4547
        s.m.RUnlock()
 
4548
 
 
4549
        if socket.ServerInfo().MaxWireVersion >= 2 {
 
4550
                // Servers with a more recent write protocol benefit from write commands.
 
4551
                if op, ok := op.(*insertOp); ok && len(op.documents) > 1000 {
 
4552
                        var lerr LastError
 
4553
 
 
4554
                        // Maximum batch size is 1000. Must split out in separate operations for compatibility.
 
4555
                        all := op.documents
 
4556
                        for i := 0; i < len(all); i += 1000 {
 
4557
                                l := i + 1000
 
4558
                                if l > len(all) {
 
4559
                                        l = len(all)
 
4560
                                }
 
4561
                                op.documents = all[i:l]
 
4562
                                oplerr, err := c.writeOpCommand(socket, safeOp, op, ordered, bypassValidation)
 
4563
                                lerr.N += oplerr.N
 
4564
                                lerr.modified += oplerr.modified
 
4565
                                if err != nil {
 
4566
                                        for ei := range lerr.ecases {
 
4567
                                                oplerr.ecases[ei].Index += i
 
4568
                                        }
 
4569
                                        lerr.ecases = append(lerr.ecases, oplerr.ecases...)
 
4570
                                        if op.flags&1 == 0 {
 
4571
                                                return &lerr, err
 
4572
                                        }
 
4573
                                }
 
4574
                        }
 
4575
                        if len(lerr.ecases) != 0 {
 
4576
                                return &lerr, lerr.ecases[0].Err
 
4577
                        }
 
4578
                        return &lerr, nil
 
4579
                }
 
4580
                return c.writeOpCommand(socket, safeOp, op, ordered, bypassValidation)
 
4581
        } else if updateOps, ok := op.(bulkUpdateOp); ok {
 
4582
                var lerr LastError
 
4583
                for i, updateOp := range updateOps {
 
4584
                        oplerr, err := c.writeOpQuery(socket, safeOp, updateOp, ordered)
 
4585
                        lerr.N += oplerr.N
 
4586
                        lerr.modified += oplerr.modified
 
4587
                        if err != nil {
 
4588
                                lerr.ecases = append(lerr.ecases, BulkErrorCase{i, err})
 
4589
                                if ordered {
 
4590
                                        break
 
4591
                                }
 
4592
                        }
 
4593
                }
 
4594
                if len(lerr.ecases) != 0 {
 
4595
                        return &lerr, lerr.ecases[0].Err
 
4596
                }
 
4597
                return &lerr, nil
 
4598
        } else if deleteOps, ok := op.(bulkDeleteOp); ok {
 
4599
                var lerr LastError
 
4600
                for i, deleteOp := range deleteOps {
 
4601
                        oplerr, err := c.writeOpQuery(socket, safeOp, deleteOp, ordered)
 
4602
                        lerr.N += oplerr.N
 
4603
                        lerr.modified += oplerr.modified
 
4604
                        if err != nil {
 
4605
                                lerr.ecases = append(lerr.ecases, BulkErrorCase{i, err})
 
4606
                                if ordered {
 
4607
                                        break
 
4608
                                }
 
4609
                        }
 
4610
                }
 
4611
                if len(lerr.ecases) != 0 {
 
4612
                        return &lerr, lerr.ecases[0].Err
 
4613
                }
 
4614
                return &lerr, nil
 
4615
        }
 
4616
        return c.writeOpQuery(socket, safeOp, op, ordered)
 
4617
}
 
4618
 
 
4619
func (c *Collection) writeOpQuery(socket *mongoSocket, safeOp *queryOp, op interface{}, ordered bool) (lerr *LastError, err error) {
 
4620
        if safeOp == nil {
 
4621
                return nil, socket.Query(op)
 
4622
        }
 
4623
 
 
4624
        var mutex sync.Mutex
 
4625
        var replyData []byte
 
4626
        var replyErr error
 
4627
        mutex.Lock()
 
4628
        query := *safeOp // Copy the data.
 
4629
        query.collection = c.Database.Name + ".$cmd"
 
4630
        query.replyFunc = func(err error, reply *replyOp, docNum int, docData []byte) {
 
4631
                replyData = docData
 
4632
                replyErr = err
 
4633
                mutex.Unlock()
 
4634
        }
 
4635
        err = socket.Query(op, &query)
 
4636
        if err != nil {
 
4637
                return nil, err
 
4638
        }
 
4639
        mutex.Lock() // Wait.
 
4640
        if replyErr != nil {
 
4641
                return nil, replyErr // XXX TESTME
 
4642
        }
 
4643
        if hasErrMsg(replyData) {
 
4644
                // Looks like getLastError itself failed.
 
4645
                err = checkQueryError(query.collection, replyData)
 
4646
                if err != nil {
 
4647
                        return nil, err
 
4648
                }
 
4649
        }
 
4650
        result := &LastError{}
 
4651
        bson.Unmarshal(replyData, &result)
 
4652
        debugf("Result from writing query: %#v", result)
 
4653
        if result.Err != "" {
 
4654
                result.ecases = []BulkErrorCase{{Index: 0, Err: result}}
 
4655
                if insert, ok := op.(*insertOp); ok && len(insert.documents) > 1 {
 
4656
                        result.ecases[0].Index = -1
 
4657
                }
 
4658
                return result, result
 
4659
        }
 
4660
        // With MongoDB <2.6 we don't know how many actually changed, so make it the same as matched.
 
4661
        result.modified = result.N
 
4662
        return result, nil
 
4663
}
 
4664
 
 
4665
func (c *Collection) writeOpCommand(socket *mongoSocket, safeOp *queryOp, op interface{}, ordered, bypassValidation bool) (lerr *LastError, err error) {
 
4666
        var writeConcern interface{}
 
4667
        if safeOp == nil {
 
4668
                writeConcern = bson.D{{"w", 0}}
 
4669
        } else {
 
4670
                writeConcern = safeOp.query.(*getLastError)
 
4671
        }
 
4672
 
 
4673
        var cmd bson.D
 
4674
        switch op := op.(type) {
 
4675
        case *insertOp:
 
4676
                // http://docs.mongodb.org/manual/reference/command/insert
 
4677
                cmd = bson.D{
 
4678
                        {"insert", c.Name},
 
4679
                        {"documents", op.documents},
 
4680
                        {"writeConcern", writeConcern},
 
4681
                        {"ordered", op.flags&1 == 0},
 
4682
                }
 
4683
        case *updateOp:
 
4684
                // http://docs.mongodb.org/manual/reference/command/update
 
4685
                cmd = bson.D{
 
4686
                        {"update", c.Name},
 
4687
                        {"updates", []interface{}{op}},
 
4688
                        {"writeConcern", writeConcern},
 
4689
                        {"ordered", ordered},
 
4690
                }
 
4691
        case bulkUpdateOp:
 
4692
                // http://docs.mongodb.org/manual/reference/command/update
 
4693
                cmd = bson.D{
 
4694
                        {"update", c.Name},
 
4695
                        {"updates", op},
 
4696
                        {"writeConcern", writeConcern},
 
4697
                        {"ordered", ordered},
 
4698
                }
 
4699
        case *deleteOp:
 
4700
                // http://docs.mongodb.org/manual/reference/command/delete
 
4701
                cmd = bson.D{
 
4702
                        {"delete", c.Name},
 
4703
                        {"deletes", []interface{}{op}},
 
4704
                        {"writeConcern", writeConcern},
 
4705
                        {"ordered", ordered},
 
4706
                }
 
4707
        case bulkDeleteOp:
 
4708
                // http://docs.mongodb.org/manual/reference/command/delete
 
4709
                cmd = bson.D{
 
4710
                        {"delete", c.Name},
 
4711
                        {"deletes", op},
 
4712
                        {"writeConcern", writeConcern},
 
4713
                        {"ordered", ordered},
 
4714
                }
 
4715
        }
 
4716
        if bypassValidation {
 
4717
                cmd = append(cmd, bson.DocElem{"bypassDocumentValidation", true})
 
4718
        }
 
4719
 
 
4720
        var result writeCmdResult
 
4721
        err = c.Database.run(socket, cmd, &result)
 
4722
        debugf("Write command result: %#v (err=%v)", result, err)
 
4723
        ecases := result.BulkErrorCases()
 
4724
        lerr = &LastError{
 
4725
                UpdatedExisting: result.N > 0 && len(result.Upserted) == 0,
 
4726
                N:               result.N,
 
4727
 
 
4728
                modified: result.NModified,
 
4729
                ecases:   ecases,
 
4730
        }
 
4731
        if len(result.Upserted) > 0 {
 
4732
                lerr.UpsertedId = result.Upserted[0].Id
 
4733
        }
 
4734
        if len(result.Errors) > 0 {
 
4735
                e := result.Errors[0]
 
4736
                lerr.Code = e.Code
 
4737
                lerr.Err = e.ErrMsg
 
4738
                err = lerr
 
4739
        } else if result.ConcernError.Code != 0 {
 
4740
                e := result.ConcernError
 
4741
                lerr.Code = e.Code
 
4742
                lerr.Err = e.ErrMsg
 
4743
                err = lerr
 
4744
        }
 
4745
 
 
4746
        if err == nil && safeOp == nil {
 
4747
                return nil, nil
 
4748
        }
 
4749
        return lerr, err
 
4750
}
 
4751
 
 
4752
func hasErrMsg(d []byte) bool {
 
4753
        l := len(d)
 
4754
        for i := 0; i+8 < l; i++ {
 
4755
                if d[i] == '\x02' && d[i+1] == 'e' && d[i+2] == 'r' && d[i+3] == 'r' && d[i+4] == 'm' && d[i+5] == 's' && d[i+6] == 'g' && d[i+7] == '\x00' {
 
4756
                        return true
 
4757
                }
 
4758
        }
 
4759
        return false
 
4760
}