~rogpeppe/+junk/mgo-tagged-log-messages

« back to all changes in this revision

Viewing changes to session.go

  • Committer: Roger Peppe
  • Date: 2014-03-14 18:11:33 UTC
  • mfrom: (263.1.8 master)
  • Revision ID: roger.peppe@canonical.com-20140314181133-107ag3xpitk9682u
merge trunk

Show diffs side-by-side

added added

removed removed

Lines of Context:
34
34
        "local/runtime/debug"
35
35
        "math"
36
36
        "net"
 
37
        "net/url"
37
38
        "reflect"
38
39
        "sort"
39
40
        "strconv"
68
69
        syncTimeout  time.Duration
69
70
        sockTimeout  time.Duration
70
71
        defaultdb    string
71
 
        dialAuth     *authInfo
72
 
        auth         []authInfo
 
72
        sourcedb     string
 
73
        dialCred     *Credential
 
74
        creds        []Credential
73
75
}
74
76
 
75
77
type Database struct {
164
166
//
165
167
//     connect=direct
166
168
//
167
 
//         This option will disable the automatic replica set server
168
 
//         discovery logic, and will only use the servers provided.
169
 
//         This enables forcing the communication with a specific
170
 
//         server or set of servers (even if they are slaves).  Note
171
 
//         that to talk to a slave you'll need to relax the consistency
172
 
//         requirements using a Monotonic or Eventual mode via SetMode.
 
169
//         Disables the automatic replica set server discovery logic, and
 
170
//         forces the use of servers provided only (even if secondaries).
 
171
//         Note that to talk to a secondary the consistency requirements
 
172
//         must be relaxed to Monotonic or Eventual via SetMode.
 
173
//
 
174
//
 
175
//     authSource=<db>
 
176
//
 
177
//         Informs the database used to establish credentials and privileges
 
178
//         with a MongoDB server. Defaults to the database name provided via
 
179
//         the URL path, and "admin" if that's unset.
 
180
//
 
181
//
 
182
//     authMechanism=<mechanism>
 
183
//
 
184
//        Defines the protocol for credential negotiation. Defaults to "MONGODB-CR",
 
185
//        which is the default username/password challenge-response mechanism.
 
186
//
 
187
//
 
188
//     gssapiServiceName=<name>
 
189
//
 
190
//           Defines the service name to use when authenticating with the GSSAPI
 
191
//           mechanism. Defaults to "mongodb".
 
192
//
173
193
//
174
194
// Relevant documentation:
175
195
//
176
 
//     http://www.mongodb.org/display/DOCS/Connections
 
196
//     http://docs.mongodb.org/manual/reference/connection-string/
177
197
//
178
198
func Dial(url string) (*Session, error) {
179
199
        session, err := DialWithTimeout(url, 10*time.Second)
196
216
                return nil, err
197
217
        }
198
218
        direct := false
 
219
        mechanism := ""
 
220
        service := ""
 
221
        source := ""
199
222
        for k, v := range uinfo.options {
200
223
                switch k {
 
224
                case "authSource":
 
225
                        source = v
 
226
                case "authMechanism":
 
227
                        mechanism = v
 
228
                case "gssapiServiceName":
 
229
                        service = v
201
230
                case "connect":
202
231
                        if v == "direct" {
203
232
                                direct = true
208
237
                        }
209
238
                        fallthrough
210
239
                default:
211
 
                        return nil, errors.New("Unsupported connection URL option: " + k + "=" + v)
 
240
                        return nil, errors.New("unsupported connection URL option: " + k + "=" + v)
212
241
                }
213
242
        }
214
243
        info := DialInfo{
215
 
                Addrs:    uinfo.addrs,
216
 
                Direct:   direct,
217
 
                Timeout:  timeout,
218
 
                Username: uinfo.user,
219
 
                Password: uinfo.pass,
220
 
                Database: uinfo.db,
 
244
                Addrs:     uinfo.addrs,
 
245
                Direct:    direct,
 
246
                Timeout:   timeout,
 
247
                Database:  uinfo.db,
 
248
                Username:  uinfo.user,
 
249
                Password:  uinfo.pass,
 
250
                Mechanism: mechanism,
 
251
                Service:   service,
 
252
                Source:    source,
221
253
        }
222
254
        return DialWithInfo(&info)
223
255
}
246
278
        // distinguish it from a slow server, so the timeout stays relevant.
247
279
        FailFast bool
248
280
 
249
 
        // Database is the database name used during the initial authentication.
250
 
        // If set, the value is also returned as the default result from the
251
 
        // Session.DB method, in place of "test".
 
281
        // Database is the default database name used when the Session.DB method
 
282
        // is called with an empty name, and is also used during the intial
 
283
        // authenticatoin if Source is unset.
252
284
        Database string
253
285
 
254
 
        // Username and Password inform the credentials for the initial
255
 
        // authentication done against Database, if that is set,
256
 
        // or the "admin" database otherwise. See the Session.Login method too.
 
286
        // Source is the database used to establish credentials and privileges
 
287
        // with a MongoDB server. Defaults to the value of Database, if that is
 
288
        // set, or "admin" otherwise.
 
289
        Source string
 
290
 
 
291
        // Service defines the service name to use when authenticating with the GSSAPI
 
292
        // mechanism. Defaults to "mongodb".
 
293
        Service string
 
294
 
 
295
        // Mechanism defines the protocol for credential negotiation.
 
296
        // Defaults to "MONGODB-CR".
 
297
        Mechanism string
 
298
 
 
299
        // Username and Password inform the credentials for the initial authentication
 
300
        // done on the database defined by the Source field. See Session.Login.
257
301
        Username string
258
302
        Password string
259
303
 
302
346
        if session.defaultdb == "" {
303
347
                session.defaultdb = "test"
304
348
        }
 
349
        session.sourcedb = info.Source
 
350
        if session.sourcedb == "" {
 
351
                session.sourcedb = info.Database
 
352
                if session.sourcedb == "" {
 
353
                        session.sourcedb = "admin"
 
354
                }
 
355
        }
305
356
        if info.Username != "" {
306
 
                db := info.Database
307
 
                if db == "" {
308
 
                        db = "admin"
309
 
                }
310
 
                session.dialAuth = &authInfo{db, info.Username, info.Password}
311
 
                session.auth = []authInfo{*session.dialAuth}
 
357
                source := session.sourcedb
 
358
                if info.Source == "" && info.Mechanism == "GSSAPI" {
 
359
                        source = "$external"
 
360
                }
 
361
                session.dialCred = &Credential{
 
362
                        Username:  info.Username,
 
363
                        Password:  info.Password,
 
364
                        Mechanism: info.Mechanism,
 
365
                        Service:   info.Service,
 
366
                        Source:    source,
 
367
                }
 
368
                session.creds = []Credential{*session.dialCred}
312
369
        }
313
370
        cluster.Release()
314
371
 
336
393
        options map[string]string
337
394
}
338
395
 
339
 
func parseURL(url string) (*urlInfo, error) {
340
 
        if strings.HasPrefix(url, "mongodb://") {
341
 
                url = url[10:]
 
396
func parseURL(s string) (*urlInfo, error) {
 
397
        if strings.HasPrefix(s, "mongodb://") {
 
398
                s = s[10:]
342
399
        }
343
400
        info := &urlInfo{options: make(map[string]string)}
344
 
        if c := strings.Index(url, "?"); c != -1 {
345
 
                for _, pair := range strings.FieldsFunc(url[c+1:], isOptSep) {
 
401
        if c := strings.Index(s, "?"); c != -1 {
 
402
                for _, pair := range strings.FieldsFunc(s[c+1:], isOptSep) {
346
403
                        l := strings.SplitN(pair, "=", 2)
347
404
                        if len(l) != 2 || l[0] == "" || l[1] == "" {
348
 
                                return nil, errors.New("Connection option must be key=value: " + pair)
 
405
                                return nil, errors.New("connection option must be key=value: " + pair)
349
406
                        }
350
407
                        info.options[l[0]] = l[1]
351
408
                }
352
 
                url = url[:c]
353
 
        }
354
 
        if c := strings.Index(url, "@"); c != -1 {
355
 
                pair := strings.SplitN(url[:c], ":", 2)
356
 
                if len(pair) != 2 || pair[0] == "" {
357
 
                        return nil, errors.New("Credentials must be provided as user:pass@host")
358
 
                }
359
 
                info.user = pair[0]
360
 
                info.pass = pair[1]
361
 
                url = url[c+1:]
362
 
        }
363
 
        if c := strings.Index(url, "/"); c != -1 {
364
 
                info.db = url[c+1:]
365
 
                url = url[:c]
366
 
        }
367
 
        info.addrs = strings.Split(url, ",")
 
409
                s = s[:c]
 
410
        }
 
411
        if c := strings.Index(s, "@"); c != -1 {
 
412
                pair := strings.SplitN(s[:c], ":", 2)
 
413
                if len(pair) > 2 || pair[0] == "" {
 
414
                        return nil, errors.New("credentials must be provided as user:pass@host")
 
415
                }
 
416
                var err error
 
417
                info.user, err = url.QueryUnescape(pair[0])
 
418
                if err != nil {
 
419
                        return nil, fmt.Errorf("cannot unescape username in URL: %q", pair[0])
 
420
                }
 
421
                if len(pair) > 1 {
 
422
                        info.pass, err = url.QueryUnescape(pair[1])
 
423
                        if err != nil {
 
424
                                return nil, fmt.Errorf("cannot unescape password in URL")
 
425
                        }
 
426
                }
 
427
                s = s[c+1:]
 
428
        }
 
429
        if c := strings.Index(s, "/"); c != -1 {
 
430
                info.db = s[c+1:]
 
431
                s = s[:c]
 
432
        }
 
433
        info.addrs = strings.Split(s, ",")
368
434
        return info, nil
369
435
}
370
436
 
383
449
        return session
384
450
}
385
451
 
386
 
func copySession(session *Session, keepAuth bool) (s *Session) {
 
452
func copySession(session *Session, keepCreds bool) (s *Session) {
387
453
        cluster := session.cluster()
388
454
        cluster.Acquire()
389
455
        if session.masterSocket != nil {
392
458
        if session.slaveSocket != nil {
393
459
                session.slaveSocket.Acquire()
394
460
        }
395
 
        var auth []authInfo
396
 
        if keepAuth {
397
 
                auth = make([]authInfo, len(session.auth))
398
 
                copy(auth, session.auth)
399
 
        } else if session.dialAuth != nil {
400
 
                auth = []authInfo{*session.dialAuth}
 
461
        var creds []Credential
 
462
        if keepCreds {
 
463
                creds = make([]Credential, len(session.creds))
 
464
                copy(creds, session.creds)
 
465
        } else if session.dialCred != nil {
 
466
                creds = []Credential{*session.dialCred}
401
467
        }
402
468
        scopy := *session
403
469
        scopy.m = sync.RWMutex{}
404
 
        scopy.auth = auth
 
470
        scopy.creds = creds
405
471
        s = &scopy
406
472
        session.debugf("New session %p on cluster %p (copy from %p)", s, cluster, session)
407
473
        return s
472
538
        return newGridFS(db, prefix)
473
539
}
474
540
 
475
 
// Run issues the provided command against the database and unmarshals
 
541
// Run issues the provided command on the db database and unmarshals
476
542
// its result in the respective argument. The cmd argument may be either
477
543
// a string with the command name itself, in which case an empty document of
478
544
// the form bson.M{cmd: 1} will be used, or it may be a full command document.
484
550
//
485
551
//     db.Run(bson.D{{"create", "mycollection"}, {"size", 1024}})
486
552
//
487
 
// For privilleged commands typically run against the "admin" database, see
 
553
// For privilleged commands typically run on the "admin" database, see
488
554
// the Run method in the Session type.
489
555
//
490
556
// Relevant documentation:
499
565
        return db.C("$cmd").Find(cmd).One(result)
500
566
}
501
567
 
502
 
// Login authenticates against MongoDB with the provided credentials.  The
503
 
// authentication is valid for the whole session and will stay valid until
504
 
// Logout is explicitly called for the same database, or the session is
505
 
// closed.
506
 
//
507
 
// Concurrent Login calls will work correctly.
508
 
func (db *Database) Login(user, pass string) (err error) {
509
 
        session := db.Session
510
 
        dbname := db.Name
511
 
 
512
 
        socket, err := session.acquireSocket(true)
 
568
// Credential holds details to authenticate with a MongoDB server.
 
569
type Credential struct {
 
570
        // Username and Password hold the basic details for authentication.
 
571
        // Password is optional with some authentication mechanisms.
 
572
        Username string
 
573
        Password string
 
574
 
 
575
        // Source is the database used to establish credentials and privileges
 
576
        // with a MongoDB server. Defaults to the default database provided
 
577
        // during dial, or "admin" if that was unset.
 
578
        Source string
 
579
 
 
580
        // Service defines the service name to use when authenticating with the GSSAPI
 
581
        // mechanism. Defaults to "mongodb".
 
582
        Service string
 
583
 
 
584
        // Mechanism defines the protocol for credential negotiation.
 
585
        // Defaults to "MONGODB-CR".
 
586
        Mechanism string
 
587
}
 
588
 
 
589
// Login authenticates with MongoDB using the provided credential.  The
 
590
// authentication is valid for the whole session and will stay valid until
 
591
// Logout is explicitly called for the same database, or the session is
 
592
// closed.
 
593
func (db *Database) Login(user, pass string) error {
 
594
        return db.Session.Login(&Credential{Username: user, Password: pass, Source: db.Name})
 
595
}
 
596
 
 
597
// Login authenticates with MongoDB using the provided credential.  The
 
598
// authentication is valid for the whole session and will stay valid until
 
599
// Logout is explicitly called for the same database, or the session is
 
600
// closed.
 
601
func (s *Session) Login(cred *Credential) error {
 
602
        socket, err := s.acquireSocket(true)
513
603
        if err != nil {
514
604
                return err
515
605
        }
516
606
        defer socket.Release()
517
607
 
518
 
        err = socket.Login(dbname, user, pass)
519
 
        if err != nil {
520
 
                return err
521
 
        }
522
 
 
523
 
        session.m.Lock()
524
 
        defer session.m.Unlock()
525
 
 
526
 
        for _, a := range session.auth {
527
 
                if a.db == dbname {
528
 
                        a.user = user
529
 
                        a.pass = pass
530
 
                        return nil
 
608
        credCopy := *cred
 
609
        if cred.Source == "" {
 
610
                if cred.Mechanism == "GSSAPI" {
 
611
                        credCopy.Source = "$external"
 
612
                } else {
 
613
                        credCopy.Source = s.sourcedb
531
614
                }
532
615
        }
533
 
        session.auth = append(session.auth, authInfo{dbname, user, pass})
 
616
        err = socket.Login(credCopy)
 
617
        if err != nil {
 
618
                return err
 
619
        }
 
620
 
 
621
        s.m.Lock()
 
622
        s.creds = append(s.creds, credCopy)
 
623
        s.m.Unlock()
534
624
        return nil
535
625
}
536
626
 
537
627
func (s *Session) socketLogin(socket *mongoSocket) error {
538
 
        for _, a := range s.auth {
539
 
                if err := socket.Login(a.db, a.user, a.pass); err != nil {
 
628
        for _, cred := range s.creds {
 
629
                if err := socket.Login(cred); err != nil {
540
630
                        return err
541
631
                }
542
632
        }
549
639
        dbname := db.Name
550
640
        session.m.Lock()
551
641
        found := false
552
 
        for i, a := range session.auth {
553
 
                if a.db == dbname {
554
 
                        copy(session.auth[i:], session.auth[i+1:])
555
 
                        session.auth = session.auth[:len(session.auth)-1]
 
642
        for i, cred := range session.creds {
 
643
                if cred.Source == dbname {
 
644
                        copy(session.creds[i:], session.creds[i+1:])
 
645
                        session.creds = session.creds[:len(session.creds)-1]
556
646
                        found = true
557
647
                        break
558
648
                }
571
661
// LogoutAll removes all established authentication credentials for the session.
572
662
func (s *Session) LogoutAll() {
573
663
        s.m.Lock()
574
 
        for _, a := range s.auth {
 
664
        for _, cred := range s.creds {
575
665
                if s.masterSocket != nil {
576
 
                        s.masterSocket.Logout(a.db)
 
666
                        s.masterSocket.Logout(cred.Source)
577
667
                }
578
668
                if s.slaveSocket != nil {
579
 
                        s.slaveSocket.Logout(a.db)
 
669
                        s.slaveSocket.Logout(cred.Source)
580
670
                }
581
671
        }
582
 
        s.auth = s.auth[0:0]
 
672
        s.creds = s.creds[0:0]
583
673
        s.m.Unlock()
584
674
}
585
675
 
770
860
                        }
771
861
                }
772
862
                if field == "" || kind != "" && order != kind {
773
 
                        return "", nil, fmt.Errorf(`Invalid index key: want "[$<kind>:][-]<field name>", got %q`, raw)
 
863
                        return "", nil, fmt.Errorf(`invalid index key: want "[$<kind>:][-]<field name>", got %q`, raw)
774
864
                }
775
865
                realKey = append(realKey, bson.DocElem{field, order})
776
866
        }
777
867
        if name == "" {
778
 
                return "", nil, errors.New("Invalid index key: no fields provided")
 
868
                return "", nil, errors.New("invalid index key: no fields provided")
779
869
        }
780
870
        return
781
871
}
1088
1178
// SetMode changes the consistency mode for the session.
1089
1179
//
1090
1180
// In the Strong consistency mode reads and writes will always be made to
1091
 
// the master server using a unique connection so that reads and writes are
 
1181
// the primary server using a unique connection so that reads and writes are
1092
1182
// fully consistent, ordered, and observing the most up-to-date data.
1093
1183
// This offers the least benefits in terms of distributing load, but the
1094
1184
// most guarantees.  See also Monotonic and Eventual.
1100
1190
// queries (read-your-writes).
1101
1191
//
1102
1192
// In practice, the Monotonic mode is obtained by performing initial reads
1103
 
// against a unique connection to an arbitrary slave, if one is available,
 
1193
// on a unique connection to an arbitrary secondary, if one is available,
1104
1194
// and once the first write happens, the session connection is switched over
1105
 
// to the master server.  This manages to distribute some of the reading
1106
 
// load with slaves, while maintaining some useful guarantees.
 
1195
// to the primary server.  This manages to distribute some of the reading
 
1196
// load with secondaries, while maintaining some useful guarantees.
1107
1197
//
1108
 
// In the Eventual consistency mode reads will be made to any slave in the
 
1198
// In the Eventual consistency mode reads will be made to any secondary in the
1109
1199
// cluster, if one is available, and sequential reads will not necessarily
1110
1200
// be made with the same connection.  This means that data may be observed
1111
 
// out of order.  Writes will of course be issued to the master, but
 
1201
// out of order.  Writes will of course be issued to the primary, but
1112
1202
// independent writes in the same Eventual session may also be made with
1113
1203
// independent connections, so there are also no guarantees in terms of
1114
1204
// write ordering (no read-your-writes guarantees either).
1119
1209
//
1120
1210
// If refresh is true, in addition to ensuring the session is in the given
1121
1211
// consistency mode, the consistency guarantees will also be reset (e.g.
1122
 
// a Monotonic session will be allowed to read from slaves again).  This is
1123
 
// equivalent to calling the Refresh function.
 
1212
// a Monotonic session will be allowed to read from secondaries again).
 
1213
// This is equivalent to calling the Refresh function.
1124
1214
//
1125
1215
// Shifting between Monotonic and Strong modes will keep a previously
1126
1216
// reserved connection for the session unless refresh is true or the
1127
 
// connection is unsuitable (to a slave server in a Strong session).
 
1217
// connection is unsuitable (to a secondary server in a Strong session).
1128
1218
func (s *Session) SetMode(consistency mode, refresh bool) {
1129
1219
        s.m.Lock()
1130
1220
        s.debugf("Session %p: setting mode %d with refresh=%v (master=%p, slave=%p)", s, consistency, refresh, s.masterSocket, s.slaveSocket)
1259
1349
//
1260
1350
// The safe.W parameter determines how many servers should confirm a write
1261
1351
// before the operation is considered successful.  If set to 0 or 1, the
1262
 
// command will return as soon as the master is done with the request.
 
1352
// command will return as soon as the primary is done with the request.
1263
1353
// If safe.WTimeout is greater than zero, it determines how many milliseconds
1264
1354
// to wait for the safe.W servers to respond before returning an error.
1265
1355
//
1391
1481
        }
1392
1482
}
1393
1483
 
1394
 
// Run issues the provided command against the "admin" database and
 
1484
// Run issues the provided command on the "admin" database and
1395
1485
// and unmarshals its result in the respective argument. The cmd
1396
1486
// argument may be either a string with the command name itself, in
1397
1487
// which case an empty document of the form bson.M{cmd: 1} will be used,
1404
1494
//
1405
1495
//     db.Run(bson.D{{"create", "mycollection"}, {"size", 1024}})
1406
1496
//
1407
 
// For commands against arbitrary databases, see the Run method in
 
1497
// For commands on arbitrary databases, see the Run method in
1408
1498
// the Database type.
1409
1499
//
1410
1500
// Relevant documentation:
1457
1547
// after it is successfully locked will block until FsyncUnlock is
1458
1548
// called for the same server.
1459
1549
//
1460
 
// This method works on slaves as well, preventing the oplog from being
1461
 
// flushed while the server is locked, but since only the server
1462
 
// connected to is locked, for locking specific slaves it may be
1463
 
// necessary to establish a connection directly to the slave (see
 
1550
// This method works on secondaries as well, preventing the oplog from
 
1551
// being flushed while the server is locked, but since only the server
 
1552
// connected to is locked, for locking specific secondaries it may be
 
1553
// necessary to establish a connection directly to the secondary (see
1464
1554
// Dial's connect=direct option).
1465
1555
//
1466
1556
// As an important caveat, note that once a write is attempted and
1959
2049
//     http://www.mongodb.org/display/DOCS/Sorting+and+Natural+Order
1960
2050
//
1961
2051
func (q *Query) Sort(fields ...string) *Query {
 
2052
        // TODO //     query4 := collection.Find(nil).Sort("score:{$meta:textScore}")
1962
2053
        q.m.Lock()
1963
2054
        var order bson.D
1964
2055
        for _, field := range fields {
2077
2168
}
2078
2169
 
2079
2170
// LogReplay enables an option that optimizes queries that are typically
2080
 
// made against the MongoDB oplog for replaying it. This is an internal
 
2171
// made on the MongoDB oplog for replaying it. This is an internal
2081
2172
// implementation aspect and most likely uninteresting for other uses.
2082
2173
// It has seen at least one use case, though, so it's exposed via the API.
2083
2174
func (q *Query) LogReplay() *Query {