1
// mgo - MongoDB driver for Go
3
// Copyright (c) 2010-2012 - Gustavo Niemeyer <gustavo@niemeyer.net>
5
// All rights reserved.
7
// Redistribution and use in source and binary forms, with or without
8
// modification, are permitted provided that the following conditions are met:
10
// 1. Redistributions of source code must retain the above copyright notice, this
11
// list of conditions and the following disclaimer.
12
// 2. Redistributions in binary form must reproduce the above copyright notice,
13
// this list of conditions and the following disclaimer in the documentation
14
// and/or other materials provided with the distribution.
16
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
17
// ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
// WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
19
// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
20
// ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
21
// (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
22
// LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
23
// ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
24
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
25
// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
36
"gopkg.in/mgo.v2/bson"
39
type replyFunc func(err error, reply *replyOp, docNum int, docData []byte)
41
type mongoSocket struct {
43
server *mongoServer // nil when cached
46
addr string // For debugging only.
48
replyFuncs map[uint32]replyFunc
55
serverInfo *mongoServerInfo
58
type queryOpFlags uint32
61
_ queryOpFlags = 1 << iota
84
type queryWrapper struct {
85
Query interface{} "$query"
86
OrderBy interface{} "$orderby,omitempty"
87
Hint interface{} "$hint,omitempty"
88
Explain bool "$explain,omitempty"
89
Snapshot bool "$snapshot,omitempty"
90
ReadPreference bson.D "$readPreference,omitempty"
91
MaxScan int "$maxScan,omitempty"
92
MaxTimeMS int "$maxTimeMS,omitempty"
93
Comment string "$comment,omitempty"
96
func (op *queryOp) finalQuery(socket *mongoSocket) interface{} {
97
if op.flags&flagSlaveOk != 0 && socket.ServerInfo().Mongos {
102
case Monotonic, Eventual:
103
modeName = "secondaryPreferred"
104
case PrimaryPreferred:
105
modeName = "primaryPreferred"
107
modeName = "secondary"
108
case SecondaryPreferred:
109
modeName = "secondaryPreferred"
113
panic(fmt.Sprintf("unsupported read mode: %d", op.mode))
116
op.options.ReadPreference = make(bson.D, 0, 2)
117
op.options.ReadPreference = append(op.options.ReadPreference, bson.DocElem{"mode", modeName})
118
if len(op.serverTags) > 0 {
119
op.options.ReadPreference = append(op.options.ReadPreference, bson.DocElem{"tags", op.serverTags})
125
op.options.Query = empty
127
op.options.Query = op.query
129
debugf("final query is %#v\n", &op.options)
135
type getMoreOp struct {
142
type replyOp struct {
149
type insertOp struct {
150
collection string // "database.collection"
151
documents []interface{} // One or more documents to insert
155
type updateOp struct {
156
Collection string `bson:"-"` // "database.collection"
157
Selector interface{} `bson:"q"`
158
Update interface{} `bson:"u"`
159
Flags uint32 `bson:"-"`
160
Multi bool `bson:"multi,omitempty"`
161
Upsert bool `bson:"upsert,omitempty"`
164
type deleteOp struct {
165
Collection string `bson:"-"` // "database.collection"
166
Selector interface{} `bson:"q"`
167
Flags uint32 `bson:"-"`
168
Limit int `bson:"limit"`
171
type killCursorsOp struct {
175
type requestInfo struct {
180
func newSocket(server *mongoServer, conn net.Conn, timeout time.Duration) *mongoSocket {
181
socket := &mongoSocket{
185
replyFuncs: make(map[uint32]replyFunc),
187
socket.gotNonce.L = &socket.Mutex
188
if err := socket.InitialAcquire(server.Info(), timeout); err != nil {
189
panic("newSocket: InitialAcquire returned error: " + err.Error())
191
stats.socketsAlive(+1)
192
debugf("Socket %p to %s: initialized", socket, socket.addr)
198
// Server returns the server that the socket is associated with.
199
// It returns nil while the socket is cached in its respective server.
200
func (socket *mongoSocket) Server() *mongoServer {
202
server := socket.server
207
// ServerInfo returns details for the server at the time the socket
208
// was initially acquired.
209
func (socket *mongoSocket) ServerInfo() *mongoServerInfo {
211
serverInfo := socket.serverInfo
216
// InitialAcquire obtains the first reference to the socket, either
217
// right after the connection is made or once a recycled socket is
218
// being put back in use.
219
func (socket *mongoSocket) InitialAcquire(serverInfo *mongoServerInfo, timeout time.Duration) error {
221
if socket.references > 0 {
222
panic("Socket acquired out of cache with references")
224
if socket.dead != nil {
230
socket.serverInfo = serverInfo
231
socket.timeout = timeout
232
stats.socketsInUse(+1)
238
// Acquire obtains an additional reference to the socket.
239
// The socket will only be recycled when it's released as many
240
// times as it's been acquired.
241
func (socket *mongoSocket) Acquire() (info *mongoServerInfo) {
243
if socket.references == 0 {
244
panic("Socket got non-initial acquire with references == 0")
246
// We'll track references to dead sockets as well.
247
// Caller is still supposed to release the socket.
250
serverInfo := socket.serverInfo
255
// Release decrements a socket reference. The socket will be
256
// recycled once its released as many times as it's been acquired.
257
func (socket *mongoSocket) Release() {
259
if socket.references == 0 {
260
panic("socket.Release() with references == 0")
264
if socket.references == 0 {
265
stats.socketsInUse(-1)
266
server := socket.server
269
// If the socket is dead server is nil.
271
server.RecycleSocket(socket)
278
// SetTimeout changes the timeout used on socket operations.
279
func (socket *mongoSocket) SetTimeout(d time.Duration) {
285
type deadlineType int
288
readDeadline deadlineType = 1
289
writeDeadline deadlineType = 2
292
func (socket *mongoSocket) updateDeadline(which deadlineType) {
294
if socket.timeout > 0 {
295
when = time.Now().Add(socket.timeout)
299
case readDeadline | writeDeadline:
300
whichstr = "read/write"
301
socket.conn.SetDeadline(when)
304
socket.conn.SetReadDeadline(when)
307
socket.conn.SetWriteDeadline(when)
309
panic("invalid parameter to updateDeadline")
311
debugf("Socket %p to %s: updated %s deadline to %s ahead (%s)", socket, socket.addr, whichstr, socket.timeout, when)
314
// Close terminates the socket use.
315
func (socket *mongoSocket) Close() {
316
socket.kill(errors.New("Closed explicitly"), false)
319
func (socket *mongoSocket) kill(err error, abend bool) {
321
if socket.dead != nil {
322
debugf("Socket %p to %s: killed again: %s (previously: %s)", socket, socket.addr, err.Error(), socket.dead.Error())
326
logf("Socket %p to %s: closing: %s (abend=%v)", socket, socket.addr, err.Error(), abend)
329
stats.socketsAlive(-1)
330
replyFuncs := socket.replyFuncs
331
socket.replyFuncs = make(map[uint32]replyFunc)
332
server := socket.server
334
socket.gotNonce.Broadcast()
336
for _, replyFunc := range replyFuncs {
337
logf("Socket %p to %s: notifying replyFunc of closed socket: %s", socket, socket.addr, err.Error())
338
replyFunc(err, nil, -1, nil)
341
server.AbendSocket(socket)
345
func (socket *mongoSocket) SimpleQuery(op *queryOp) (data []byte, err error) {
346
var wait, change sync.Mutex
351
op.replyFunc = func(err error, reply *replyOp, docNum int, docData []byte) {
363
err = socket.Query(op)
375
func (socket *mongoSocket) Query(ops ...interface{}) (err error) {
377
if lops := socket.flushLogout(); len(lops) > 0 {
378
ops = append(lops, ops...)
381
buf := make([]byte, 0, 256)
383
// Serialize operations synchronously to avoid interrupting
384
// other goroutines while we can't really be sending data.
385
// Also, record id positions so that we can compute request
386
// ids at once later with the lock already held.
387
requests := make([]requestInfo, len(ops))
390
for _, op := range ops {
391
debugf("Socket %p to %s: serializing op: %#v", socket, socket.addr, op)
392
if qop, ok := op.(*queryOp); ok {
393
if cmd, ok := qop.query.(*findCmd); ok {
394
debugf("Socket %p to %s: find command: %#v", socket, socket.addr, cmd)
398
var replyFunc replyFunc
399
switch op := op.(type) {
402
buf = addHeader(buf, 2001)
403
buf = addInt32(buf, 0) // Reserved
404
buf = addCString(buf, op.Collection)
405
buf = addInt32(buf, int32(op.Flags))
406
debugf("Socket %p to %s: serializing selector document: %#v", socket, socket.addr, op.Selector)
407
buf, err = addBSON(buf, op.Selector)
411
debugf("Socket %p to %s: serializing update document: %#v", socket, socket.addr, op.Update)
412
buf, err = addBSON(buf, op.Update)
418
buf = addHeader(buf, 2002)
419
buf = addInt32(buf, int32(op.flags))
420
buf = addCString(buf, op.collection)
421
for _, doc := range op.documents {
422
debugf("Socket %p to %s: serializing document for insertion: %#v", socket, socket.addr, doc)
423
buf, err = addBSON(buf, doc)
430
buf = addHeader(buf, 2004)
431
buf = addInt32(buf, int32(op.flags))
432
buf = addCString(buf, op.collection)
433
buf = addInt32(buf, op.skip)
434
buf = addInt32(buf, op.limit)
435
buf, err = addBSON(buf, op.finalQuery(socket))
439
if op.selector != nil {
440
buf, err = addBSON(buf, op.selector)
445
replyFunc = op.replyFunc
448
buf = addHeader(buf, 2005)
449
buf = addInt32(buf, 0) // Reserved
450
buf = addCString(buf, op.collection)
451
buf = addInt32(buf, op.limit)
452
buf = addInt64(buf, op.cursorId)
453
replyFunc = op.replyFunc
456
buf = addHeader(buf, 2006)
457
buf = addInt32(buf, 0) // Reserved
458
buf = addCString(buf, op.Collection)
459
buf = addInt32(buf, int32(op.Flags))
460
debugf("Socket %p to %s: serializing selector document: %#v", socket, socket.addr, op.Selector)
461
buf, err = addBSON(buf, op.Selector)
467
buf = addHeader(buf, 2007)
468
buf = addInt32(buf, 0) // Reserved
469
buf = addInt32(buf, int32(len(op.cursorIds)))
470
for _, cursorId := range op.cursorIds {
471
buf = addInt64(buf, cursorId)
475
panic("internal error: unknown operation type")
478
setInt32(buf, start, int32(len(buf)-start))
480
if replyFunc != nil {
481
request := &requests[requestCount]
482
request.replyFunc = replyFunc
483
request.bufferPos = start
488
// Buffer is ready for the pipe. Lock, allocate ids, and enqueue.
491
if socket.dead != nil {
494
debugf("Socket %p to %s: failing query, already closed: %s", socket, socket.addr, socket.dead.Error())
495
// XXX This seems necessary in case the session is closed concurrently
496
// with a query being performed, but it's not yet tested:
497
for i := 0; i != requestCount; i++ {
498
request := &requests[i]
499
if request.replyFunc != nil {
500
request.replyFunc(dead, nil, -1, nil)
506
wasWaiting := len(socket.replyFuncs) > 0
508
// Reserve id 0 for requests which should have no responses.
509
requestId := socket.nextRequestId + 1
513
socket.nextRequestId = requestId + uint32(requestCount)
514
for i := 0; i != requestCount; i++ {
515
request := &requests[i]
516
setInt32(buf, request.bufferPos+4, int32(requestId))
517
socket.replyFuncs[requestId] = request.replyFunc
521
debugf("Socket %p to %s: sending %d op(s) (%d bytes)", socket, socket.addr, len(ops), len(buf))
522
stats.sentOps(len(ops))
524
socket.updateDeadline(writeDeadline)
525
_, err = socket.conn.Write(buf)
526
if !wasWaiting && requestCount > 0 {
527
socket.updateDeadline(readDeadline)
533
func fill(r net.Conn, b []byte) error {
536
for n != l && err == nil {
538
ni, err = r.Read(b[n:])
544
// Estimated minimum cost per socket: 1 goroutine + memory for the largest
545
// document ever seen.
546
func (socket *mongoSocket) readLoop() {
547
p := make([]byte, 36) // 16 from header + 20 from OP_REPLY fixed fields
549
conn := socket.conn // No locking, conn never changes.
553
socket.kill(err, true)
557
totalLen := getInt32(p, 0)
558
responseTo := getInt32(p, 8)
559
opCode := getInt32(p, 12)
561
// Don't use socket.server.Addr here. socket is not
562
// locked and socket.server may go away.
563
debugf("Socket %p to %s: got reply (%d bytes)", socket, socket.addr, totalLen)
568
socket.kill(errors.New("opcode != 1, corrupted data?"), true)
573
flags: uint32(getInt32(p, 16)),
574
cursorId: getInt64(p, 20),
575
firstDoc: getInt32(p, 28),
576
replyDocs: getInt32(p, 32),
579
stats.receivedOps(+1)
580
stats.receivedDocs(int(reply.replyDocs))
583
replyFunc, ok := socket.replyFuncs[uint32(responseTo)]
585
delete(socket.replyFuncs, uint32(responseTo))
589
if replyFunc != nil && reply.replyDocs == 0 {
590
replyFunc(nil, &reply, -1, nil)
592
for i := 0; i != int(reply.replyDocs); i++ {
595
if replyFunc != nil {
596
replyFunc(err, nil, -1, nil)
598
socket.kill(err, true)
602
b := make([]byte, int(getInt32(s, 0)))
604
// copy(b, s) in an efficient way.
610
err = fill(conn, b[4:])
612
if replyFunc != nil {
613
replyFunc(err, nil, -1, nil)
615
socket.kill(err, true)
619
if globalDebug && globalLogger != nil {
621
if err := bson.Unmarshal(b, m); err == nil {
622
debugf("Socket %p to %s: received document: %#v", socket, socket.addr, m)
626
if replyFunc != nil {
627
replyFunc(nil, &reply, i, b)
630
// XXX Do bound checking against totalLen.
635
if len(socket.replyFuncs) == 0 {
636
// Nothing else to read for now. Disable deadline.
637
socket.conn.SetReadDeadline(time.Time{})
639
socket.updateDeadline(readDeadline)
643
// XXX Do bound checking against totalLen.
647
var emptyHeader = []byte{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}
649
func addHeader(b []byte, opcode int) []byte {
651
b = append(b, emptyHeader...)
652
// Enough for current opcodes.
653
b[i+12] = byte(opcode)
654
b[i+13] = byte(opcode >> 8)
658
func addInt32(b []byte, i int32) []byte {
659
return append(b, byte(i), byte(i>>8), byte(i>>16), byte(i>>24))
662
func addInt64(b []byte, i int64) []byte {
663
return append(b, byte(i), byte(i>>8), byte(i>>16), byte(i>>24),
664
byte(i>>32), byte(i>>40), byte(i>>48), byte(i>>56))
667
func addCString(b []byte, s string) []byte {
668
b = append(b, []byte(s)...)
673
func addBSON(b []byte, doc interface{}) ([]byte, error) {
675
return append(b, 5, 0, 0, 0, 0), nil
677
data, err := bson.Marshal(doc)
681
return append(b, data...), nil
684
func setInt32(b []byte, pos int, i int32) {
686
b[pos+1] = byte(i >> 8)
687
b[pos+2] = byte(i >> 16)
688
b[pos+3] = byte(i >> 24)
691
func getInt32(b []byte, pos int) int32 {
692
return (int32(b[pos+0])) |
693
(int32(b[pos+1]) << 8) |
694
(int32(b[pos+2]) << 16) |
695
(int32(b[pos+3]) << 24)
698
func getInt64(b []byte, pos int) int64 {
699
return (int64(b[pos+0])) |
700
(int64(b[pos+1]) << 8) |
701
(int64(b[pos+2]) << 16) |
702
(int64(b[pos+3]) << 24) |
703
(int64(b[pos+4]) << 32) |
704
(int64(b[pos+5]) << 40) |
705
(int64(b[pos+6]) << 48) |
706
(int64(b[pos+7]) << 56)