~nskaggs/+junk/xenial-test

« back to all changes in this revision

Viewing changes to src/gopkg.in/mgo.v2/cluster.go

  • Committer: Nicholas Skaggs
  • Date: 2016-10-24 20:56:05 UTC
  • Revision ID: nicholas.skaggs@canonical.com-20161024205605-z8lta0uvuhtxwzwl
Initi with beta15

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
// mgo - MongoDB driver for Go
 
2
//
 
3
// Copyright (c) 2010-2012 - Gustavo Niemeyer <gustavo@niemeyer.net>
 
4
//
 
5
// All rights reserved.
 
6
//
 
7
// Redistribution and use in source and binary forms, with or without
 
8
// modification, are permitted provided that the following conditions are met:
 
9
//
 
10
// 1. Redistributions of source code must retain the above copyright notice, this
 
11
//    list of conditions and the following disclaimer.
 
12
// 2. Redistributions in binary form must reproduce the above copyright notice,
 
13
//    this list of conditions and the following disclaimer in the documentation
 
14
//    and/or other materials provided with the distribution.
 
15
//
 
16
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
 
17
// ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
 
18
// WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
 
19
// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
 
20
// ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
 
21
// (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
 
22
// LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
 
23
// ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 
24
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
 
25
// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 
26
 
 
27
package mgo
 
28
 
 
29
import (
 
30
        "errors"
 
31
        "fmt"
 
32
        "net"
 
33
        "strconv"
 
34
        "strings"
 
35
        "sync"
 
36
        "time"
 
37
 
 
38
        "gopkg.in/mgo.v2/bson"
 
39
)
 
40
 
 
41
// ---------------------------------------------------------------------------
 
42
// Mongo cluster encapsulation.
 
43
//
 
44
// A cluster enables the communication with one or more servers participating
 
45
// in a mongo cluster.  This works with individual servers, a replica set,
 
46
// a replica pair, one or multiple mongos routers, etc.
 
47
 
 
48
type mongoCluster struct {
 
49
        sync.RWMutex
 
50
        serverSynced sync.Cond
 
51
        userSeeds    []string
 
52
        dynaSeeds    []string
 
53
        servers      mongoServers
 
54
        masters      mongoServers
 
55
        references   int
 
56
        syncing      bool
 
57
        direct       bool
 
58
        failFast     bool
 
59
        syncCount    uint
 
60
        setName      string
 
61
        cachedIndex  map[string]bool
 
62
        sync         chan bool
 
63
        dial         dialer
 
64
}
 
65
 
 
66
func newCluster(userSeeds []string, direct, failFast bool, dial dialer, setName string) *mongoCluster {
 
67
        cluster := &mongoCluster{
 
68
                userSeeds:  userSeeds,
 
69
                references: 1,
 
70
                direct:     direct,
 
71
                failFast:   failFast,
 
72
                dial:       dial,
 
73
                setName:    setName,
 
74
        }
 
75
        cluster.serverSynced.L = cluster.RWMutex.RLocker()
 
76
        cluster.sync = make(chan bool, 1)
 
77
        stats.cluster(+1)
 
78
        go cluster.syncServersLoop()
 
79
        return cluster
 
80
}
 
81
 
 
82
// Acquire increases the reference count for the cluster.
 
83
func (cluster *mongoCluster) Acquire() {
 
84
        cluster.Lock()
 
85
        cluster.references++
 
86
        debugf("Cluster %p acquired (refs=%d)", cluster, cluster.references)
 
87
        cluster.Unlock()
 
88
}
 
89
 
 
90
// Release decreases the reference count for the cluster. Once
 
91
// it reaches zero, all servers will be closed.
 
92
func (cluster *mongoCluster) Release() {
 
93
        cluster.Lock()
 
94
        if cluster.references == 0 {
 
95
                panic("cluster.Release() with references == 0")
 
96
        }
 
97
        cluster.references--
 
98
        debugf("Cluster %p released (refs=%d)", cluster, cluster.references)
 
99
        if cluster.references == 0 {
 
100
                for _, server := range cluster.servers.Slice() {
 
101
                        server.Close()
 
102
                }
 
103
                // Wake up the sync loop so it can die.
 
104
                cluster.syncServers()
 
105
                stats.cluster(-1)
 
106
        }
 
107
        cluster.Unlock()
 
108
}
 
109
 
 
110
func (cluster *mongoCluster) LiveServers() (servers []string) {
 
111
        cluster.RLock()
 
112
        for _, serv := range cluster.servers.Slice() {
 
113
                servers = append(servers, serv.Addr)
 
114
        }
 
115
        cluster.RUnlock()
 
116
        return servers
 
117
}
 
118
 
 
119
func (cluster *mongoCluster) removeServer(server *mongoServer) {
 
120
        cluster.Lock()
 
121
        cluster.masters.Remove(server)
 
122
        other := cluster.servers.Remove(server)
 
123
        cluster.Unlock()
 
124
        if other != nil {
 
125
                other.Close()
 
126
                log("Removed server ", server.Addr, " from cluster.")
 
127
        }
 
128
        server.Close()
 
129
}
 
130
 
 
131
type isMasterResult struct {
 
132
        IsMaster       bool
 
133
        Secondary      bool
 
134
        Primary        string
 
135
        Hosts          []string
 
136
        Passives       []string
 
137
        Tags           bson.D
 
138
        Msg            string
 
139
        SetName        string `bson:"setName"`
 
140
        MaxWireVersion int    `bson:"maxWireVersion"`
 
141
}
 
142
 
 
143
func (cluster *mongoCluster) isMaster(socket *mongoSocket, result *isMasterResult) error {
 
144
        // Monotonic let's it talk to a slave and still hold the socket.
 
145
        session := newSession(Monotonic, cluster, 10*time.Second)
 
146
        session.setSocket(socket)
 
147
        err := session.Run("ismaster", result)
 
148
        session.Close()
 
149
        return err
 
150
}
 
151
 
 
152
type possibleTimeout interface {
 
153
        Timeout() bool
 
154
}
 
155
 
 
156
var syncSocketTimeout = 5 * time.Second
 
157
 
 
158
func (cluster *mongoCluster) syncServer(server *mongoServer) (info *mongoServerInfo, hosts []string, err error) {
 
159
        var syncTimeout time.Duration
 
160
        if raceDetector {
 
161
                // This variable is only ever touched by tests.
 
162
                globalMutex.Lock()
 
163
                syncTimeout = syncSocketTimeout
 
164
                globalMutex.Unlock()
 
165
        } else {
 
166
                syncTimeout = syncSocketTimeout
 
167
        }
 
168
 
 
169
        addr := server.Addr
 
170
        log("SYNC Processing ", addr, "...")
 
171
 
 
172
        // Retry a few times to avoid knocking a server down for a hiccup.
 
173
        var result isMasterResult
 
174
        var tryerr error
 
175
        for retry := 0; ; retry++ {
 
176
                if retry == 3 || retry == 1 && cluster.failFast {
 
177
                        return nil, nil, tryerr
 
178
                }
 
179
                if retry > 0 {
 
180
                        // Don't abuse the server needlessly if there's something actually wrong.
 
181
                        if err, ok := tryerr.(possibleTimeout); ok && err.Timeout() {
 
182
                                // Give a chance for waiters to timeout as well.
 
183
                                cluster.serverSynced.Broadcast()
 
184
                        }
 
185
                        time.Sleep(syncShortDelay)
 
186
                }
 
187
 
 
188
                // It's not clear what would be a good timeout here. Is it
 
189
                // better to wait longer or to retry?
 
190
                socket, _, err := server.AcquireSocket(0, syncTimeout)
 
191
                if err != nil {
 
192
                        tryerr = err
 
193
                        logf("SYNC Failed to get socket to %s: %v", addr, err)
 
194
                        continue
 
195
                }
 
196
                err = cluster.isMaster(socket, &result)
 
197
                socket.Release()
 
198
                if err != nil {
 
199
                        tryerr = err
 
200
                        logf("SYNC Command 'ismaster' to %s failed: %v", addr, err)
 
201
                        continue
 
202
                }
 
203
                debugf("SYNC Result of 'ismaster' from %s: %#v", addr, result)
 
204
                break
 
205
        }
 
206
 
 
207
        if cluster.setName != "" && result.SetName != cluster.setName {
 
208
                logf("SYNC Server %s is not a member of replica set %q", addr, cluster.setName)
 
209
                return nil, nil, fmt.Errorf("server %s is not a member of replica set %q", addr, cluster.setName)
 
210
        }
 
211
 
 
212
        if result.IsMaster {
 
213
                debugf("SYNC %s is a master.", addr)
 
214
                if !server.info.Master {
 
215
                        // Made an incorrect assumption above, so fix stats.
 
216
                        stats.conn(-1, false)
 
217
                        stats.conn(+1, true)
 
218
                }
 
219
        } else if result.Secondary {
 
220
                debugf("SYNC %s is a slave.", addr)
 
221
        } else if cluster.direct {
 
222
                logf("SYNC %s in unknown state. Pretending it's a slave due to direct connection.", addr)
 
223
        } else {
 
224
                logf("SYNC %s is neither a master nor a slave.", addr)
 
225
                // Let stats track it as whatever was known before.
 
226
                return nil, nil, errors.New(addr + " is not a master nor slave")
 
227
        }
 
228
 
 
229
        info = &mongoServerInfo{
 
230
                Master:         result.IsMaster,
 
231
                Mongos:         result.Msg == "isdbgrid",
 
232
                Tags:           result.Tags,
 
233
                SetName:        result.SetName,
 
234
                MaxWireVersion: result.MaxWireVersion,
 
235
        }
 
236
 
 
237
        hosts = make([]string, 0, 1+len(result.Hosts)+len(result.Passives))
 
238
        if result.Primary != "" {
 
239
                // First in the list to speed up master discovery.
 
240
                hosts = append(hosts, result.Primary)
 
241
        }
 
242
        hosts = append(hosts, result.Hosts...)
 
243
        hosts = append(hosts, result.Passives...)
 
244
 
 
245
        debugf("SYNC %s knows about the following peers: %#v", addr, hosts)
 
246
        return info, hosts, nil
 
247
}
 
248
 
 
249
type syncKind bool
 
250
 
 
251
const (
 
252
        completeSync syncKind = true
 
253
        partialSync  syncKind = false
 
254
)
 
255
 
 
256
func (cluster *mongoCluster) addServer(server *mongoServer, info *mongoServerInfo, syncKind syncKind) {
 
257
        cluster.Lock()
 
258
        current := cluster.servers.Search(server.ResolvedAddr)
 
259
        if current == nil {
 
260
                if syncKind == partialSync {
 
261
                        cluster.Unlock()
 
262
                        server.Close()
 
263
                        log("SYNC Discarding unknown server ", server.Addr, " due to partial sync.")
 
264
                        return
 
265
                }
 
266
                cluster.servers.Add(server)
 
267
                if info.Master {
 
268
                        cluster.masters.Add(server)
 
269
                        log("SYNC Adding ", server.Addr, " to cluster as a master.")
 
270
                } else {
 
271
                        log("SYNC Adding ", server.Addr, " to cluster as a slave.")
 
272
                }
 
273
        } else {
 
274
                if server != current {
 
275
                        panic("addServer attempting to add duplicated server")
 
276
                }
 
277
                if server.Info().Master != info.Master {
 
278
                        if info.Master {
 
279
                                log("SYNC Server ", server.Addr, " is now a master.")
 
280
                                cluster.masters.Add(server)
 
281
                        } else {
 
282
                                log("SYNC Server ", server.Addr, " is now a slave.")
 
283
                                cluster.masters.Remove(server)
 
284
                        }
 
285
                }
 
286
        }
 
287
        server.SetInfo(info)
 
288
        debugf("SYNC Broadcasting availability of server %s", server.Addr)
 
289
        cluster.serverSynced.Broadcast()
 
290
        cluster.Unlock()
 
291
}
 
292
 
 
293
func (cluster *mongoCluster) getKnownAddrs() []string {
 
294
        cluster.RLock()
 
295
        max := len(cluster.userSeeds) + len(cluster.dynaSeeds) + cluster.servers.Len()
 
296
        seen := make(map[string]bool, max)
 
297
        known := make([]string, 0, max)
 
298
 
 
299
        add := func(addr string) {
 
300
                if _, found := seen[addr]; !found {
 
301
                        seen[addr] = true
 
302
                        known = append(known, addr)
 
303
                }
 
304
        }
 
305
 
 
306
        for _, addr := range cluster.userSeeds {
 
307
                add(addr)
 
308
        }
 
309
        for _, addr := range cluster.dynaSeeds {
 
310
                add(addr)
 
311
        }
 
312
        for _, serv := range cluster.servers.Slice() {
 
313
                add(serv.Addr)
 
314
        }
 
315
        cluster.RUnlock()
 
316
 
 
317
        return known
 
318
}
 
319
 
 
320
// syncServers injects a value into the cluster.sync channel to force
 
321
// an iteration of the syncServersLoop function.
 
322
func (cluster *mongoCluster) syncServers() {
 
323
        select {
 
324
        case cluster.sync <- true:
 
325
        default:
 
326
        }
 
327
}
 
328
 
 
329
// How long to wait for a checkup of the cluster topology if nothing
 
330
// else kicks a synchronization before that.
 
331
const syncServersDelay = 30 * time.Second
 
332
const syncShortDelay = 500 * time.Millisecond
 
333
 
 
334
// syncServersLoop loops while the cluster is alive to keep its idea of
 
335
// the server topology up-to-date. It must be called just once from
 
336
// newCluster.  The loop iterates once syncServersDelay has passed, or
 
337
// if somebody injects a value into the cluster.sync channel to force a
 
338
// synchronization.  A loop iteration will contact all servers in
 
339
// parallel, ask them about known peers and their own role within the
 
340
// cluster, and then attempt to do the same with all the peers
 
341
// retrieved.
 
342
func (cluster *mongoCluster) syncServersLoop() {
 
343
        for {
 
344
                debugf("SYNC Cluster %p is starting a sync loop iteration.", cluster)
 
345
 
 
346
                cluster.Lock()
 
347
                if cluster.references == 0 {
 
348
                        cluster.Unlock()
 
349
                        break
 
350
                }
 
351
                cluster.references++ // Keep alive while syncing.
 
352
                direct := cluster.direct
 
353
                cluster.Unlock()
 
354
 
 
355
                cluster.syncServersIteration(direct)
 
356
 
 
357
                // We just synchronized, so consume any outstanding requests.
 
358
                select {
 
359
                case <-cluster.sync:
 
360
                default:
 
361
                }
 
362
 
 
363
                cluster.Release()
 
364
 
 
365
                // Hold off before allowing another sync. No point in
 
366
                // burning CPU looking for down servers.
 
367
                if !cluster.failFast {
 
368
                        time.Sleep(syncShortDelay)
 
369
                }
 
370
 
 
371
                cluster.Lock()
 
372
                if cluster.references == 0 {
 
373
                        cluster.Unlock()
 
374
                        break
 
375
                }
 
376
                cluster.syncCount++
 
377
                // Poke all waiters so they have a chance to timeout or
 
378
                // restart syncing if they wish to.
 
379
                cluster.serverSynced.Broadcast()
 
380
                // Check if we have to restart immediately either way.
 
381
                restart := !direct && cluster.masters.Empty() || cluster.servers.Empty()
 
382
                cluster.Unlock()
 
383
 
 
384
                if restart {
 
385
                        log("SYNC No masters found. Will synchronize again.")
 
386
                        time.Sleep(syncShortDelay)
 
387
                        continue
 
388
                }
 
389
 
 
390
                debugf("SYNC Cluster %p waiting for next requested or scheduled sync.", cluster)
 
391
 
 
392
                // Hold off until somebody explicitly requests a synchronization
 
393
                // or it's time to check for a cluster topology change again.
 
394
                select {
 
395
                case <-cluster.sync:
 
396
                case <-time.After(syncServersDelay):
 
397
                }
 
398
        }
 
399
        debugf("SYNC Cluster %p is stopping its sync loop.", cluster)
 
400
}
 
401
 
 
402
func (cluster *mongoCluster) server(addr string, tcpaddr *net.TCPAddr) *mongoServer {
 
403
        cluster.RLock()
 
404
        server := cluster.servers.Search(tcpaddr.String())
 
405
        cluster.RUnlock()
 
406
        if server != nil {
 
407
                return server
 
408
        }
 
409
        return newServer(addr, tcpaddr, cluster.sync, cluster.dial)
 
410
}
 
411
 
 
412
func resolveAddr(addr string) (*net.TCPAddr, error) {
 
413
        // Simple cases that do not need actual resolution. Works with IPv4 and v6.
 
414
        if host, port, err := net.SplitHostPort(addr); err == nil {
 
415
                if port, _ := strconv.Atoi(port); port > 0 {
 
416
                        zone := ""
 
417
                        if i := strings.LastIndex(host, "%"); i >= 0 {
 
418
                                zone = host[i+1:]
 
419
                                host = host[:i]
 
420
                        }
 
421
                        ip := net.ParseIP(host)
 
422
                        if ip != nil {
 
423
                                return &net.TCPAddr{IP: ip, Port: port, Zone: zone}, nil
 
424
                        }
 
425
                }
 
426
        }
 
427
 
 
428
        // Attempt to resolve IPv4 and v6 concurrently.
 
429
        addrChan := make(chan *net.TCPAddr, 2)
 
430
        for _, network := range []string{"udp4", "udp6"} {
 
431
                network := network
 
432
                go func() {
 
433
                        // The unfortunate UDP dialing hack allows having a timeout on address resolution.
 
434
                        conn, err := net.DialTimeout(network, addr, 10*time.Second)
 
435
                        if err != nil {
 
436
                                addrChan <- nil
 
437
                        } else {
 
438
                                addrChan <- (*net.TCPAddr)(conn.RemoteAddr().(*net.UDPAddr))
 
439
                                conn.Close()
 
440
                        }
 
441
                }()
 
442
        }
 
443
 
 
444
        // Wait for the result of IPv4 and v6 resolution. Use IPv4 if available.
 
445
        tcpaddr := <-addrChan
 
446
        if tcpaddr == nil || len(tcpaddr.IP) != 4 {
 
447
                var timeout <-chan time.Time
 
448
                if tcpaddr != nil {
 
449
                        // Don't wait too long if an IPv6 address is known.
 
450
                        timeout = time.After(50 * time.Millisecond)
 
451
                }
 
452
                select {
 
453
                case <-timeout:
 
454
                case tcpaddr2 := <-addrChan:
 
455
                        if tcpaddr == nil || tcpaddr2 != nil {
 
456
                                // It's an IPv4 address or the only known address. Use it.
 
457
                                tcpaddr = tcpaddr2
 
458
                        }
 
459
                }
 
460
        }
 
461
 
 
462
        if tcpaddr == nil {
 
463
                log("SYNC Failed to resolve server address: ", addr)
 
464
                return nil, errors.New("failed to resolve server address: " + addr)
 
465
        }
 
466
        if tcpaddr.String() != addr {
 
467
                debug("SYNC Address ", addr, " resolved as ", tcpaddr.String())
 
468
        }
 
469
        return tcpaddr, nil
 
470
}
 
471
 
 
472
type pendingAdd struct {
 
473
        server *mongoServer
 
474
        info   *mongoServerInfo
 
475
}
 
476
 
 
477
func (cluster *mongoCluster) syncServersIteration(direct bool) {
 
478
        log("SYNC Starting full topology synchronization...")
 
479
 
 
480
        var wg sync.WaitGroup
 
481
        var m sync.Mutex
 
482
        notYetAdded := make(map[string]pendingAdd)
 
483
        addIfFound := make(map[string]bool)
 
484
        seen := make(map[string]bool)
 
485
        syncKind := partialSync
 
486
 
 
487
        var spawnSync func(addr string, byMaster bool)
 
488
        spawnSync = func(addr string, byMaster bool) {
 
489
                wg.Add(1)
 
490
                go func() {
 
491
                        defer wg.Done()
 
492
 
 
493
                        tcpaddr, err := resolveAddr(addr)
 
494
                        if err != nil {
 
495
                                log("SYNC Failed to start sync of ", addr, ": ", err.Error())
 
496
                                return
 
497
                        }
 
498
                        resolvedAddr := tcpaddr.String()
 
499
 
 
500
                        m.Lock()
 
501
                        if byMaster {
 
502
                                if pending, ok := notYetAdded[resolvedAddr]; ok {
 
503
                                        delete(notYetAdded, resolvedAddr)
 
504
                                        m.Unlock()
 
505
                                        cluster.addServer(pending.server, pending.info, completeSync)
 
506
                                        return
 
507
                                }
 
508
                                addIfFound[resolvedAddr] = true
 
509
                        }
 
510
                        if seen[resolvedAddr] {
 
511
                                m.Unlock()
 
512
                                return
 
513
                        }
 
514
                        seen[resolvedAddr] = true
 
515
                        m.Unlock()
 
516
 
 
517
                        server := cluster.server(addr, tcpaddr)
 
518
                        info, hosts, err := cluster.syncServer(server)
 
519
                        if err != nil {
 
520
                                cluster.removeServer(server)
 
521
                                return
 
522
                        }
 
523
 
 
524
                        m.Lock()
 
525
                        add := direct || info.Master || addIfFound[resolvedAddr]
 
526
                        if add {
 
527
                                syncKind = completeSync
 
528
                        } else {
 
529
                                notYetAdded[resolvedAddr] = pendingAdd{server, info}
 
530
                        }
 
531
                        m.Unlock()
 
532
                        if add {
 
533
                                cluster.addServer(server, info, completeSync)
 
534
                        }
 
535
                        if !direct {
 
536
                                for _, addr := range hosts {
 
537
                                        spawnSync(addr, info.Master)
 
538
                                }
 
539
                        }
 
540
                }()
 
541
        }
 
542
 
 
543
        knownAddrs := cluster.getKnownAddrs()
 
544
        for _, addr := range knownAddrs {
 
545
                spawnSync(addr, false)
 
546
        }
 
547
        wg.Wait()
 
548
 
 
549
        if syncKind == completeSync {
 
550
                logf("SYNC Synchronization was complete (got data from primary).")
 
551
                for _, pending := range notYetAdded {
 
552
                        cluster.removeServer(pending.server)
 
553
                }
 
554
        } else {
 
555
                logf("SYNC Synchronization was partial (cannot talk to primary).")
 
556
                for _, pending := range notYetAdded {
 
557
                        cluster.addServer(pending.server, pending.info, partialSync)
 
558
                }
 
559
        }
 
560
 
 
561
        cluster.Lock()
 
562
        mastersLen := cluster.masters.Len()
 
563
        logf("SYNC Synchronization completed: %d master(s) and %d slave(s) alive.", mastersLen, cluster.servers.Len()-mastersLen)
 
564
 
 
565
        // Update dynamic seeds, but only if we have any good servers. Otherwise,
 
566
        // leave them alone for better chances of a successful sync in the future.
 
567
        if syncKind == completeSync {
 
568
                dynaSeeds := make([]string, cluster.servers.Len())
 
569
                for i, server := range cluster.servers.Slice() {
 
570
                        dynaSeeds[i] = server.Addr
 
571
                }
 
572
                cluster.dynaSeeds = dynaSeeds
 
573
                debugf("SYNC New dynamic seeds: %#v\n", dynaSeeds)
 
574
        }
 
575
        cluster.Unlock()
 
576
}
 
577
 
 
578
// AcquireSocket returns a socket to a server in the cluster.  If slaveOk is
 
579
// true, it will attempt to return a socket to a slave server.  If it is
 
580
// false, the socket will necessarily be to a master server.
 
581
func (cluster *mongoCluster) AcquireSocket(mode Mode, slaveOk bool, syncTimeout time.Duration, socketTimeout time.Duration, serverTags []bson.D, poolLimit int) (s *mongoSocket, err error) {
 
582
        var started time.Time
 
583
        var syncCount uint
 
584
        warnedLimit := false
 
585
        for {
 
586
                cluster.RLock()
 
587
                for {
 
588
                        mastersLen := cluster.masters.Len()
 
589
                        slavesLen := cluster.servers.Len() - mastersLen
 
590
                        debugf("Cluster has %d known masters and %d known slaves.", mastersLen, slavesLen)
 
591
                        if !(slaveOk && mode == Secondary) && mastersLen > 0 || slaveOk && slavesLen > 0 {
 
592
                                break
 
593
                        }
 
594
                        if started.IsZero() {
 
595
                                // Initialize after fast path above.
 
596
                                started = time.Now()
 
597
                                syncCount = cluster.syncCount
 
598
                        } else if syncTimeout != 0 && started.Before(time.Now().Add(-syncTimeout)) || cluster.failFast && cluster.syncCount != syncCount {
 
599
                                cluster.RUnlock()
 
600
                                return nil, errors.New("no reachable servers")
 
601
                        }
 
602
                        log("Waiting for servers to synchronize...")
 
603
                        cluster.syncServers()
 
604
 
 
605
                        // Remember: this will release and reacquire the lock.
 
606
                        cluster.serverSynced.Wait()
 
607
                }
 
608
 
 
609
                var server *mongoServer
 
610
                if slaveOk {
 
611
                        server = cluster.servers.BestFit(mode, serverTags)
 
612
                } else {
 
613
                        server = cluster.masters.BestFit(mode, nil)
 
614
                }
 
615
                cluster.RUnlock()
 
616
 
 
617
                if server == nil {
 
618
                        // Must have failed the requested tags. Sleep to avoid spinning.
 
619
                        time.Sleep(1e8)
 
620
                        continue
 
621
                }
 
622
 
 
623
                s, abended, err := server.AcquireSocket(poolLimit, socketTimeout)
 
624
                if err == errPoolLimit {
 
625
                        if !warnedLimit {
 
626
                                warnedLimit = true
 
627
                                log("WARNING: Per-server connection limit reached.")
 
628
                        }
 
629
                        time.Sleep(100 * time.Millisecond)
 
630
                        continue
 
631
                }
 
632
                if err != nil {
 
633
                        cluster.removeServer(server)
 
634
                        cluster.syncServers()
 
635
                        continue
 
636
                }
 
637
                if abended && !slaveOk {
 
638
                        var result isMasterResult
 
639
                        err := cluster.isMaster(s, &result)
 
640
                        if err != nil || !result.IsMaster {
 
641
                                logf("Cannot confirm server %s as master (%v)", server.Addr, err)
 
642
                                s.Release()
 
643
                                cluster.syncServers()
 
644
                                time.Sleep(100 * time.Millisecond)
 
645
                                continue
 
646
                        }
 
647
                }
 
648
                return s, nil
 
649
        }
 
650
        panic("unreached")
 
651
}
 
652
 
 
653
func (cluster *mongoCluster) CacheIndex(cacheKey string, exists bool) {
 
654
        cluster.Lock()
 
655
        if cluster.cachedIndex == nil {
 
656
                cluster.cachedIndex = make(map[string]bool)
 
657
        }
 
658
        if exists {
 
659
                cluster.cachedIndex[cacheKey] = true
 
660
        } else {
 
661
                delete(cluster.cachedIndex, cacheKey)
 
662
        }
 
663
        cluster.Unlock()
 
664
}
 
665
 
 
666
func (cluster *mongoCluster) HasCachedIndex(cacheKey string) (result bool) {
 
667
        cluster.RLock()
 
668
        if cluster.cachedIndex != nil {
 
669
                result = cluster.cachedIndex[cacheKey]
 
670
        }
 
671
        cluster.RUnlock()
 
672
        return
 
673
}
 
674
 
 
675
func (cluster *mongoCluster) ResetIndexCache() {
 
676
        cluster.Lock()
 
677
        cluster.cachedIndex = make(map[string]bool)
 
678
        cluster.Unlock()
 
679
}