1
var ReadPreference = require('../read_preference').ReadPreference
2
, DbCommand = require('../../commands/db_command').DbCommand
3
, inherits = require('util').inherits
4
, format = require('util').format
5
, timers = require('timers')
6
, Server = require('../server').Server
7
, utils = require('../../utils')
8
, PingStrategy = require('./strategies/ping_strategy').PingStrategy
9
, StatisticsStrategy = require('./strategies/statistics_strategy').StatisticsStrategy
10
, Options = require('./options').Options
11
, ReplSetState = require('./repl_set_state').ReplSetState
12
, HighAvailabilityProcess = require('./ha').HighAvailabilityProcess
13
, Base = require('../base').Base;
15
// Numeric member-state codes.
// NOTE(review): presumably these mirror the server's replica set member
// states - confirm. None of them is referenced in the visible part of this file.
var STATE_STARTING_PHASE_1 = 0;
16
var STATE_PRIMARY = 1;
17
var STATE_SECONDARY = 2;
18
var STATE_RECOVERING = 3;
19
var STATE_FATAL_ERROR = 4;
20
var STATE_STARTING_PHASE_2 = 5;
21
var STATE_UNKNOWN = 6;
22
var STATE_ARBITER = 7;
24
var STATE_ROLLBACK = 9;
26
// Scheduler used to defer the event emission in connect() and close():
// setImmediate on node 0.10, otherwise nextTick
27
var processor = require('../../utils').processor();
30
* ReplSet constructor provides replicaset functionality
33
* - **ha** {Boolean, default:true}, turn on high availability.
34
* - **haInterval** {Number, default:2000}, time between each replicaset status check.
35
* - **reconnectWait** {Number, default:1000}, time to wait in milliseconds before attempting reconnect.
36
* - **retries** {Number, default:30}, number of times to attempt a replicaset reconnect.
37
* - **rs_name** {String}, the name of the replicaset to connect to.
38
* - **socketOptions** {Object, default:null}, an object containing socket options to use (noDelay:(boolean), keepAlive:(number), connectTimeoutMS:(number), socketTimeoutMS:(number))
39
* - **strategy** {String, default:'ping'}, selection strategy for reads choose between (ping, statistical and none, default is ping)
40
* - **secondaryAcceptableLatencyMS** {Number, default:15}, sets the range of servers to pick when using NEAREST (lowest ping ms + the latency fence, ex: range of 1 to (1 + 15) ms)
41
* - **connectWithNoPrimary** {Boolean, default:false}, sets if the driver should connect even if no primary is available
42
* - **connectArbiter** {Boolean, default:false}, sets if the driver should connect to arbiters or not.
43
* - **logger** {Object, default:null}, an object representing a logger that you want to use, needs to support functions debug, log, error **({error:function(message, object) {}, log:function(message, object) {}, debug:function(message, object) {}})**.
44
* - **poolSize** {Number, default:5}, number of connections in the connection pool for each server instance, set to 5 as default for legacy reasons.
45
* - **ssl** {Boolean, default:false}, use ssl connection (needs to have a mongod server with ssl support)
46
* - **sslValidate** {Boolean, default:false}, validate mongod server certificate against ca (needs to have a mongod server with ssl support, 2.4 or higher)
47
* - **sslCA** {Array, default:null}, Array of valid certificates either as Buffers or Strings (needs to have a mongod server with ssl support, 2.4 or higher)
48
* - **sslCert** {Buffer/String, default:null}, String or buffer containing the certificate we wish to present (needs to have a mongod server with ssl support, 2.4 or higher)
49
* - **sslKey** {Buffer/String, default:null}, String or buffer containing the certificate private key we wish to present (needs to have a mongod server with ssl support, 2.4 or higher)
50
* - **sslPass** {Buffer/String, default:null}, String or buffer containing the certificate password (needs to have a mongod server with ssl support, 2.4 or higher)
53
Replicaset Configuration
54
* @param {Array} list of server objects participating in the replicaset.
55
* @param {Object} [options] additional options for the replicaset connection.
57
// ReplSet constructor: validates the seed servers, builds the options object,
// the high availability process, the optional read strategy and the initial
// state, then decorates every seed server with the shared replica set state.
// Can be called with or without `new`.
var ReplSet = exports.ReplSet = function(servers, options) {
59
if(!(this instanceof ReplSet))
60
return new ReplSet(servers, options);
62
// Set up event emitter
65
// Ensure we have a list of servers
66
if(!Array.isArray(servers)) throw Error("The parameter must be an array of servers and contain at least one server");
68
// Every seed must be a Server instance
for(var i = 0; i < servers.length; i++) {
69
if(!(servers[i] instanceof Server)) throw new Error("list of servers must be of type Server");
73
this.options = new Options(options);
74
// Ensure basic validation of options
78
// The set starts out disconnected; connect() refuses to run in any other state
this._serverState = ReplSet.REPLSET_DISCONNECTED;
79
// Add high availability process
80
this._haProcess = new HighAvailabilityProcess(this, this.options);
82
// Let's iterate over all the provided server objects and decorate them
83
this.servers = this.options.decorateAndClean(servers, this._callBackStore);
84
// Throw error if no seed servers
85
if(this.servers.length == 0) throw new Error("No valid seed servers in the array");
87
// Let's set up our strategy object for picking secondaries
88
if(this.options.strategy == 'ping') {
89
// Create a new instance
90
this.strategyInstance = new PingStrategy(this, this.options.secondaryAcceptableLatencyMS);
91
} else if(this.options.strategy == 'statistical') {
92
// Set strategy as statistical
93
this.strategyInstance = new StatisticsStrategy(this);
94
// The statistical strategy needs query statistics recorded on every server
95
this.enableRecordQueryStats(true);
98
// NOTE(review): `x || true` is always truthy, so an explicit
// `emitOpen: false` option can never take effect here - confirm intent.
this.emitOpen = this.options.emitOpen || true;
99
// Set up a clean state
100
this._state = new ReplSetState(this);
101
// Current round robin selected server
102
this._currentServerChoice = 0;
103
// Share the callback store and replica set settings with every seed server
104
for(var i = 0; i < this.servers.length; i++) {
105
this.servers[i]._callBackStore = this._callBackStore;
106
this.servers[i].name = format("%s:%s", this.servers[i].host, this.servers[i].port)
107
this.servers[i].replicasetInstance = this;
108
// Individual servers must not reconnect on their own
this.servers[i].options.auto_reconnect = false;
109
this.servers[i].inheritReplSetOptionsFrom(this);
112
// Allow setting the socketTimeoutMS on all connections
113
// to work around issues such as secondaries blocking due to compaction
114
utils.setSocketTimeoutProperty(this, this.options.socketOptions);
120
// ReplSet inherits from Base via util.inherits
inherits(ReplSet, Base);
123
// Connection state labels stored in this._serverState
ReplSet.REPLSET_CONNECTING = 'connecting';
124
ReplSet.REPLSET_DISCONNECTED = 'disconnected';
125
ReplSet.REPLSET_CONNECTED = 'connected';
126
ReplSet.REPLSET_RECONNECTING = 'reconnecting';
127
// Set by close(); a destroyed set is never flipped back to disconnected by the server event handler
ReplSet.REPLSET_DESTROYED = 'destroyed';
128
// Set by the server event handler when the primary is lost
ReplSet.REPLSET_READ_ONLY = 'readonly';
130
// Auto-reconnect query for the replica set connection.
// NOTE(review): the body of this method is not visible here - confirm the value it returns.
ReplSet.prototype.isAutoReconnect = function() {
134
/**
 * Checks whether a write can currently be issued.
 *
 * @return {Boolean} truthy when a primary is known and its connection is up;
 *   otherwise the falsy value of `this._state.master` or of `isConnected()`.
 */
ReplSet.prototype.canWrite = function() {
  var primary = this._state.master;
  return primary && primary.isConnected();
};
138
/**
 * Checks whether a read with the given preference can currently be served.
 *
 * A primary read (PRIMARY as string or ReadPreference object, or no
 * preference at all) is refused when no connected primary exists. In every
 * other case the answer is whether at least one secondary is known.
 *
 * @param {String|Object|Boolean} [read] the read preference to check.
 * @return {Boolean} true when the read can be served.
 */
ReplSet.prototype.canRead = function(read) {
  // The null/false checks must come first: `typeof null` is 'object', so
  // probing `read.mode` before them threw a TypeError for canRead(null).
  var wantsPrimary = read == null
    || read == false
    || read == ReadPreference.PRIMARY
    || (typeof read == 'object' && read.mode == ReadPreference.PRIMARY);

  var primary = this._state.master;
  if(wantsPrimary && (primary == null || !primary.isConnected())) return false;

  return Object.keys(this._state.secondaries).length > 0;
};
148
/**
 * Turns recording of query statistics on or off for the set and for every
 * server in `this.servers`.
 *
 * @param {Boolean} enable whether query statistics should be recorded.
 */
ReplSet.prototype.enableRecordQueryStats = function(enable) {
  // Remember the setting so servers created later can pick it up
  this.recordQueryStats = enable;

  // Propagate to all the servers
  for(var i = 0; i < this.servers.length; i++) {
    this.servers[i].enableRecordQueryStats(enable);
  }
};
161
/**
 * Applies socket options to the set and to every server instance.
 *
 * Only numeric `socketTimeoutMS` / `connectTimeoutMS` values are stored on
 * `this.options.socketOptions`; the complete `options` object is handed to
 * each server unchanged.
 *
 * @param {Object} options socket options to apply.
 */
ReplSet.prototype.setSocketOptions = function(options) {
  var servers = this.allServerInstances();

  if(typeof options.socketTimeoutMS == 'number') {
    this.options.socketOptions.socketTimeoutMS = options.socketTimeoutMS;
  }

  if(typeof options.connectTimeoutMS == 'number') {
    this.options.socketOptions.connectTimeoutMS = options.connectTimeoutMS;
  }

  for(var i = 0; i < servers.length; i++) {
    servers[i].setSocketOptions(options);
  }
};
179
/**
 * Stores the read preference on the options object; checkoutReader falls
 * back to this value when it is called without a preference.
 *
 * @param {String|Object} preference the read preference to use by default.
 */
ReplSet.prototype.setReadPreference = function(preference) {
  this.options.readPreference = preference;
};
183
// Starts connecting the replica set: takes one seed server, connects it and
// lets _connectHandler walk the remaining candidates. The callback is invoked
// from the one-shot "fullsetup" or "connectionError" listeners registered below.
// NOTE(review): `self` and `opts` are used below but their declarations are
// not visible in this view - confirm they exist.
ReplSet.prototype.connect = function(parent, options, callback) {
184
// NOTE(review): callback is invoked here before the typeof check below, so a
// missing callback on a non-disconnected set raises a TypeError instead of
// the intended error message.
if(this._serverState != ReplSet.REPLSET_DISCONNECTED)
185
return callback(new Error("in process of connection"));
187
// If no callback throw
188
if(!(typeof callback == 'function'))
189
throw new Error("cannot call ReplSet.prototype.connect with no callback function");
193
// Keep the parent db; it is passed to every server.connect call
this.options.db = parent;
194
// Set replicaset as connecting
195
this._serverState = ReplSet.REPLSET_CONNECTING
196
// Copy all the servers to our list of seeds
197
var candidateServers = this.servers.slice(0);
198
// Take the last seed off the list (Array.prototype.pop removes the final element)
199
var server = candidateServers.pop();
200
server.name = format("%s:%s", server.host, server.port);
201
// Set up the options
203
returnIsMasterResults: true,
204
eventReceiver: server
207
// Register some event listeners
208
this.once("fullsetup", function(err, db, replset) {
209
// Set state to connected
210
self._serverState = ReplSet.REPLSET_CONNECTED;
211
// Stop any process running
212
if(self._haProcess) self._haProcess.stop();
213
// Start the HA process
214
self._haProcess.start();
217
// Emit the events through the deferred processor
processor(function() {
219
self._emitAcrossAllDbInstances(self, null, "open", null, null, null);
221
self._emitAcrossAllDbInstances(self, null, "fullsetup", null, null, null);
224
// If we have a strategy defined start it
225
if(self.strategyInstance) {
226
self.strategyInstance.start();
229
// Finishing up the call
230
callback(err, db, replset);
234
// Report a failed connection attempt to the caller
this.once("connectionError", function(err, result) {
235
callback(err, result);
238
// Attempt to connect to the server
239
server.connect(this.options.db, opts, _connectHandler(this, candidateServers, server));
242
/**
 * Closes the replica set connection: marks the set destroyed, stops the
 * high availability process and the read strategy, closes every known
 * server, resets the state, emits "close" and fails all pending callbacks.
 *
 * @param {Function} [callback] invoked as callback(null, null) when done.
 */
ReplSet.prototype.close = function(callback) {
  var self = this;

  // Mark as destroyed first so the server event handlers do not flip the
  // state back to disconnected while the servers are being closed
  this._serverState = ReplSet.REPLSET_DESTROYED;
  // Stop the high availability process
  this._haProcess.stop();

  // If we have a strategy stop it
  if(this.strategyInstance) {
    this.strategyInstance.stop();
  }

  // Kill all servers available
  for(var name in this._state.addresses) {
    this._state.addresses[name].close();
  }

  // Clean out the state
  this._state = new ReplSetState(this);

  // Emit close through the deferred processor
  processor(function() {
    self._emitAcrossAllDbInstances(self, null, "close", null, null, true);
  });

  // Flush out any remaining call handlers
  self._flushAllCallHandlers(utils.toError("Connection Closed By Application"));

  if(typeof callback == 'function')
    return callback(null, null);
};
276
* Creates a new server for the `replset` based on `host`.
278
* @param {String} host - host:port pair (localhost:27017)
279
* @param {ReplSet} replset - the ReplSet instance
283
/**
 * Creates a new Server for the replica set from a `host[:port]` string and
 * wires it into the set's shared state and event handling.
 *
 * @param {ReplSet} self the ReplSet instance the server will belong to.
 * @param {String} host host:port pair (localhost:27017); the port is optional.
 * @param {Object} options replica set options (socketOptions, poolSize, ssl*, ...).
 * @return {Server} the configured, not yet connected, server.
 */
var createServer = function(self, host, options) {
  // MongoDB's default port, used when `host` carries no explicit port.
  // This was read from `Connection.DEFAULT_PORT`, but no `Connection` binding
  // is required in the visible part of this file, so a port-less host raised
  // a ReferenceError.
  var DEFAULT_PORT = 27017;

  // Copy existing socket options so the new server does not share the object
  var socketOptions = {};
  if(options.socketOptions) {
    var keys = Object.keys(options.socketOptions);
    for(var k = 0; k < keys.length; k++) {
      socketOptions[keys[k]] = options.socketOptions[keys[k]];
    }
  }

  var parts = host.split(/:/);
  if(1 === parts.length) {
    parts[1] = DEFAULT_PORT;
  }

  socketOptions.host = parts[0];
  socketOptions.port = parseInt(parts[1], 10);

  var serverOptions = {
    readPreference: options.readPreference,
    socketOptions: socketOptions,
    poolSize: options.poolSize,
    logger: options.logger,
    // The replica set drives reconnection, never the individual server
    auto_reconnect: false,
    // NOTE(review): this entry was not visible in the damaged source; it is
    // restored next to the other ssl* options - confirm.
    ssl: options.ssl,
    sslValidate: options.sslValidate,
    sslCA: options.sslCA,
    sslCert: options.sslCert,
    sslKey: options.sslKey,
    sslPass: options.sslPass
  };

  var server = new Server(socketOptions.host, socketOptions.port, serverOptions);
  // Set up shared state
  server._callBackStore = self._callBackStore;
  server.replicasetInstance = self;
  server.enableRecordQueryStats(self.recordQueryStats);
  // Set up event handlers
  server.on("close", _handler("close", self, server));
  server.on("error", _handler("error", self, server));
  server.on("timeout", _handler("timeout", self, server));

  // locateNewServers collects the returned instances
  return server;
};
327
/**
 * Builds the listener attached to a server's "close", "error" and "timeout"
 * events. The listener removes the server from play, updates the set's
 * connection state and fails the callbacks that were waiting on the server.
 *
 * @param {String} event name of the event the listener is registered for
 *   (not read by the listener itself).
 * @param {ReplSet} self the ReplSet instance owning the server.
 * @param {Server} server the server the listener is attached to.
 * @return {Function} listener taking (err, doc).
 */
var _handler = function(event, self, server) {
  return function(err, doc) {
    if(self._state.isPrimary(server)) {
      // The event happened to the primary: remove it from play
      self.emit('left', 'primary', server);

      var previousMaster = self._state.master;
      self._state.master = null;
      // Without a primary the set can only serve reads
      self._serverState = ReplSet.REPLSET_READ_ONLY;

      if(previousMaster != null) {
        // Fire error on any unknown callbacks
        self.__executeAllServerSpecificErrorCallbacks(
            previousMaster.socketOptions.host
          , previousMaster.socketOptions.port
          , err);
      }
    } else if(self._state.isSecondary(server)) {
      // Emit that a secondary left the replicaset
      self.emit('left', 'secondary', server);
      // Delete from the list
      delete self._state.secondaries[server.name];
    }

    // If there are no more connections left and the set is not destroyed
    // set to disconnected
    if(Object.keys(self._state.addresses).length == 0
      && self._serverState != ReplSet.REPLSET_DESTROYED) {
      self._serverState = ReplSet.REPLSET_DISCONNECTED;

      // Emit close across all the attached db instances
      self._dbStore.emit("close", new Error("replicaset disconnected, no valid servers contactable over tcp"), null, true);
    }

    // Fire error on any unknown callbacks registered against this server
    self.__executeAllServerSpecificErrorCallbacks(
        server.socketOptions.host
      , server.socketOptions.port
      , err);
  };
};
373
/**
 * Creates Server instances for the hosts reported by an ismaster result that
 * are neither part of the current state nor already queued as candidates.
 *
 * @param {ReplSet} self the ReplSet instance (its options configure the new servers).
 * @param {Object} state replica set state exposing `contains(host)`.
 * @param {Array} candidateServers servers still waiting to be connected.
 * @param {Object} ismaster ismaster result; only `hosts` is read.
 * @return {Array} newly created servers, empty when `hosts` is not an array.
 */
var locateNewServers = function(self, state, candidateServers, ismaster) {
  var hosts = ismaster.hosts;

  // Is a server with this name already queued as a candidate?
  var inCandidateServers = function(name, candidateServers) {
    for(var i = 0; i < candidateServers.length; i++) {
      if(candidateServers[i].name == name) return true;
    }

    return false;
  };

  var newServers = [];

  if(Array.isArray(hosts)) {
    // Let's go over all the hosts
    for(var i = 0; i < hosts.length; i++) {
      if(!state.contains(hosts[i])
        && !inCandidateServers(hosts[i], candidateServers)) {
        newServers.push(createServer(self, hosts[i], self.options));
      }
    }
  }

  // Return list of possible new servers
  return newServers;
};
401
// Builds the callback passed to server.connect during initial set discovery.
// For each answer it records the server as errored or adds it to the state,
// queues any newly discovered members, and then either emits
// "connectionError" / "fullsetup" or connects the next candidate.
// NOTE(review): the conditions guarding the first statements and the `opts`
// declaration are not visible in this view - confirm against the full file.
var _connectHandler = function(self, candidateServers, instanceServer) {
402
return function(err, doc) {
403
// If we have an error add to the list
405
self._state.errors[instanceServer.name] = instanceServer;
407
delete self._state.errors[instanceServer.name];
411
var ismaster = doc.documents[0]
413
// Mark the server as errored if it is neither primary nor secondary
414
if(!ismaster.ismaster
415
&& !ismaster.secondary) {
416
self._state.errors[instanceServer.name] = instanceServer;
421
// No error let's analyse the ismaster command
422
if(!err && self._state.errors[instanceServer.name] == null) {
423
var ismaster = doc.documents[0]
425
// If no replicaset name exists set the current one
426
if(self.options.rs_name == null) {
427
self.options.rs_name = ismaster.setName;
430
// If we have a member that is not part of the set let's finish up
431
if(typeof ismaster.setName == 'string' && ismaster.setName != self.options.rs_name) {
432
return self.emit("connectionError", new Error("Replicaset name " + ismaster.setName + " does not match specified name " + self.options.rs_name));
435
// Add the error handlers
436
instanceServer.on("close", _handler("close", self, instanceServer));
437
instanceServer.on("error", _handler("error", self, instanceServer));
438
instanceServer.on("timeout", _handler("timeout", self, instanceServer));
440
// Take the name and tags the member reports about itself
441
instanceServer.name = ismaster.me;
442
instanceServer.tags = ismaster.tags;
444
// Add the server to the list
445
self._state.addServer(instanceServer, ismaster);
447
// Check if we have more servers to add (only check when done with initial set)
448
if(candidateServers.length == 0) {
449
// Get additional new servers that are not currently in set
450
var new_servers = locateNewServers(self, self._state, candidateServers, ismaster);
452
// Locate any new servers that have not errored out yet
453
for(var i = 0; i < new_servers.length; i++) {
454
if(self._state.errors[new_servers[i].name] == null) {
455
candidateServers.push(new_servers[i])
461
// If the candidate server list is empty and no valid servers
462
if(candidateServers.length == 0 &&
463
!self._state.hasValidServers()) {
464
return self.emit("connectionError", new Error("No valid replicaset instance servers found"));
465
} else if(candidateServers.length == 0) {
466
// All candidates processed: a missing primary is only an error when connectWithNoPrimary is not set
if(!self.options.connectWithNoPrimary && (self._state.master == null || !self._state.master.isConnected())) {
467
return self.emit("connectionError", new Error("No primary found in set"));
469
return self.emit("fullsetup", null, self.options.db, self);
472
// Let's connect the next server
473
var nextServer = candidateServers.pop();
475
// Set up the options
477
returnIsMasterResults: true,
478
eventReceiver: nextServer
481
// Attempt to connect to the server
482
nextServer.connect(self.options.db, opts, _connectHandler(self, candidateServers, nextServer));
486
/**
 * Tells whether close() has been called on this replica set connection.
 *
 * @return {Boolean} true when the server state is REPLSET_DESTROYED.
 */
ReplSet.prototype.isDestroyed = function() {
  return this._serverState == ReplSet.REPLSET_DESTROYED;
};
490
/**
 * Checks whether a connection suitable for the given read preference exists.
 *
 * - no preference / PRIMARY / false: a connected primary is required.
 * - PRIMARY_PREFERRED / SECONDARY_PREFERRED / NEAREST: a connected primary
 *   or at least one known secondary is enough.
 * - SECONDARY: at least one known secondary is required.
 *
 * @param {String|Boolean} [read] read preference mode to check for.
 * @return {Boolean} whether a matching connection is available.
 */
ReplSet.prototype.isConnected = function(read) {
  var isConnected = false;

  if(read == null || read == ReadPreference.PRIMARY || read == false)
    isConnected = this._state.master != null && this._state.master.isConnected();

  if((read == ReadPreference.PRIMARY_PREFERRED || read == ReadPreference.SECONDARY_PREFERRED || read == ReadPreference.NEAREST)
    && ((this._state.master != null && this._state.master.isConnected())
    || (this._state && this._state.secondaries && Object.keys(this._state.secondaries).length > 0))) {
    isConnected = true;
  } else if(read == ReadPreference.SECONDARY) {
    isConnected = this._state && this._state.secondaries && Object.keys(this._state.secondaries).length > 0;
  }

  // No valid connection return false
  return isConnected;
};
508
// Mongos query for this connection type.
// NOTE(review): the body of this method is not visible here - confirm the value it returns.
ReplSet.prototype.isMongos = function() {
512
/**
 * Checks out a connection to the primary.
 *
 * @return {Object|Error} the result of the primary's checkoutWriter(), or an
 *   Error instance (returned, not thrown) when no primary is known.
 */
ReplSet.prototype.checkoutWriter = function() {
  var primary = this._state.master;
  if(primary) return primary.checkoutWriter();
  return new Error("no writer connection available");
};
517
/**
 * Inspects an ismaster result for a server and takes the server out of play
 * when it is neither primary nor secondary (e.g. recovering).
 *
 * @param {Server} _server the server the result belongs to (looked up by name).
 * @param {Object} _ismaster the ismaster document returned by the server.
 */
ReplSet.prototype.processIsMaster = function(_server, _ismaster) {
  // Healthy member, nothing to do
  if(_ismaster.ismaster || _ismaster.secondary) return;

  // Locate the actual server
  var server = this._state.addresses[_server.name];
  // Unknown server: nothing to remove. The removal handler dereferences
  // server.socketOptions, so calling it without a server raised a TypeError.
  if(!server) return;

  // Close the server, simulating the closing of the connection
  // to get right removal semantics
  server.close();
  // Execute any callback errors
  _handler(null, this, server)(new Error("server is in recovery mode"));
};
530
/**
 * Collects the raw connections of every server known by address.
 *
 * @return {Array} concatenation of each server's allRawConnections() result.
 */
ReplSet.prototype.allRawConnections = function() {
  var connections = [];

  for(var name in this._state.addresses) {
    connections = connections.concat(this._state.addresses[name].allRawConnections());
  }

  return connections;
};
543
/**
 * Lists the primary (when known) followed by every secondary.
 *
 * @return {Array} server instances; empty when no state exists yet.
 */
ReplSet.prototype.allServerInstances = function() {
  var self = this;
  // If no state yet return empty
  if(!self._state) return [];

  // Start with the primary, if there is one
  var allServers = self._state.master != null ? [self._state.master] : [];

  // Add all secondaries
  var keys = Object.keys(self._state.secondaries);
  for(var i = 0; i < keys.length; i++) {
    allServers.push(self._state.secondaries[keys[i]]);
  }

  // Return complete list of all servers
  return allServers;
};
564
// Picks a connection for a read according to the read preference (string
// mode or ReadPreference object) and optional tags. Invalid modes throw;
// "nothing available" conditions are reported by returning an Error instance.
// NOTE(review): several closing braces / else branches and the final return
// are not visible in this view - confirm structure against the full file.
ReplSet.prototype.checkoutReader = function(readPreference, tags) {
565
var connection = null;
567
// If we have a read preference object unpack it
// NOTE(review): `typeof null` is 'object', so a null readPreference would hit
// `readPreference['_type']` and throw - confirm callers never pass null.
568
if(typeof readPreference == 'object' && readPreference['_type'] == 'ReadPreference') {
569
// Validate if the object is using a valid mode
570
if(!readPreference.isValid()) throw new Error("Illegal readPreference mode specified, " + JSON.stringify(readPreference.mode));
572
tags = readPreference.tags;
573
readPreference = readPreference.mode;
574
} else if(typeof readPreference == 'object' && readPreference['_type'] != 'ReadPreference') {
575
return new Error("read preferences must be either a string or an instance of ReadPreference");
578
// Set up our read Preference, allowing us to override the readPreference
579
var finalReadPreference = readPreference != null ? readPreference : this.options.readPreference;
581
// Ensure we unpack a reference
582
if(finalReadPreference != null && typeof finalReadPreference == 'object' && finalReadPreference['_type'] == 'ReadPreference') {
583
// Validate if the object is using a valid mode
584
if(!finalReadPreference.isValid()) throw new Error("Illegal readPreference mode specified, " + JSON.stringify(finalReadPreference.mode));
586
tags = finalReadPreference.tags;
587
readPreference = finalReadPreference.mode;
590
// Finalize the read preference setup: `true` means SECONDARY_PREFERRED, nothing means PRIMARY
591
finalReadPreference = finalReadPreference == true ? ReadPreference.SECONDARY_PREFERRED : finalReadPreference;
592
finalReadPreference = finalReadPreference == null ? ReadPreference.PRIMARY : finalReadPreference;
594
// If we are reading from a primary
595
if(finalReadPreference == 'primary') {
596
// If we provide a tags set send an error
597
if(typeof tags == 'object' && tags != null) {
598
return new Error("PRIMARY cannot be combined with tags");
601
// If there is no primary send an error
602
if(this._state.master == null) {
603
return new Error("No replica set primary available for query with ReadPreference PRIMARY");
607
return this.checkoutWriter();
610
// If we have specified to read from a secondary server grab a random one and read
611
// from it, otherwise just pass the primary connection
612
if((this.options.readSecondary || finalReadPreference == ReadPreference.SECONDARY_PREFERRED || finalReadPreference == ReadPreference.SECONDARY) && Object.keys(this._state.secondaries).length > 0) {
613
// If we have a strategy, let it choose among the secondaries
614
if(this.strategyInstance != null) {
615
// Only pick from secondaries
616
var _secondaries = [];
617
for(var key in this._state.secondaries) {
618
_secondaries.push(this._state.secondaries[key]);
621
if(finalReadPreference == ReadPreference.SECONDARY) {
622
// Check out the nearest from only the secondaries
623
connection = this.strategyInstance.checkoutConnection(tags, _secondaries);
625
connection = this.strategyInstance.checkoutConnection(tags, _secondaries);
626
// No candidate servers that match the tags, error
627
if(connection == null || connection instanceof Error) {
628
// No secondary server available, attempt to checkout a primary server
629
connection = this.checkoutWriter();
630
// If no connection return an error
631
if(connection == null || connection instanceof Error) {
632
return new Error("No replica set members available for query");
636
} else if(tags != null && typeof tags == 'object') {
638
connection = _pickFromTags(this, tags);
639
// No candidate servers that match the tags, error
640
if(connection == null) {
641
return new Error("No replica set members available for query");
644
connection = _roundRobin(this, tags);
646
} else if(finalReadPreference == ReadPreference.PRIMARY_PREFERRED) {
647
// Check if there is a primary available and return that if possible
648
connection = this.checkoutWriter();
649
// If no connection available checkout a secondary
650
if(connection == null || connection instanceof Error) {
651
// If we have tags, look for servers matching the specific tag
652
if(tags != null && typeof tags == 'object') {
654
connection = _pickFromTags(this, tags);
655
// No candidate servers that match the tags, error
656
if(connection == null) {
657
return new Error("No replica set members available for query");
660
connection = _roundRobin(this, tags);
663
} else if(finalReadPreference == ReadPreference.SECONDARY_PREFERRED) {
664
// If we have a strategy, let it choose the connection
665
if(this.strategyInstance != null) {
666
connection = this.strategyInstance.checkoutConnection(tags);
668
// No candidate servers that match the tags, error
669
if(connection == null || connection instanceof Error) {
670
// No secondary server available, attempt to checkout a primary server
671
connection = this.checkoutWriter();
672
// If no connection return an error
673
if(connection == null || connection instanceof Error) {
674
var preferenceName = finalReadPreference == ReadPreference.SECONDARY ? 'secondary' : finalReadPreference;
675
return new Error("No replica set member available for query with ReadPreference " + preferenceName + " and tags " + JSON.stringify(tags));
678
} else if(tags != null && typeof tags == 'object') {
680
connection = _pickFromTags(this, tags);
681
// No candidate servers that match the tags, error
682
if(connection == null) {
683
// No secondary server available, attempt to checkout a primary server
684
connection = this.checkoutWriter();
685
// If no connection return an error
686
if(connection == null || connection instanceof Error) {
687
var preferenceName = finalReadPreference == ReadPreference.SECONDARY ? 'secondary' : finalReadPreference;
688
return new Error("No replica set member available for query with ReadPreference " + preferenceName + " and tags " + JSON.stringify(tags));
692
} else if(finalReadPreference == ReadPreference.NEAREST && this.strategyInstance != null) {
693
connection = this.strategyInstance.checkoutConnection(tags);
694
} else if(finalReadPreference == ReadPreference.NEAREST && this.strategyInstance == null) {
695
return new Error("A strategy for calculating nearness must be enabled such as ping or statistical");
696
} else if(finalReadPreference == ReadPreference.SECONDARY && Object.keys(this._state.secondaries).length == 0) {
697
if(tags != null && typeof tags == 'object') {
698
var preferenceName = finalReadPreference == ReadPreference.SECONDARY ? 'secondary' : finalReadPreference;
699
return new Error("No replica set member available for query with ReadPreference " + preferenceName + " and tags " + JSON.stringify(tags));
701
return new Error("No replica set secondary available for query with ReadPreference SECONDARY");
704
connection = this.checkoutWriter();
707
// Return the connection
714
/**
 * Picks a reader connection from the secondaries matching a tag set.
 *
 * Tag sets are tried in order; the first one matched by at least one
 * secondary decides. A secondary matches when every key of the tag set has
 * an equal value in the server's tags.
 *
 * @param {ReplSet} self the ReplSet instance.
 * @param {Object|Array} tags a single tag set or an array of tag sets.
 * @return {Object|null} the connection chosen by the strategy (if any) or
 *   by random pick, or null when no secondary matches any tag set.
 */
var _pickFromTags = function(self, tags) {
  // If we have an array or single tag selection
  var tagObjects = Array.isArray(tags) ? tags : [tags];

  // Iterate over all tags until we find a candidate server
  for(var _i = 0; _i < tagObjects.length; _i++) {
    var tagObject = tagObjects[_i];
    var matchingKeys = Object.keys(tagObject);
    // Collect all the servers that match the provided tags
    var keys = Object.keys(self._state.secondaries);
    var candidateServers = [];

    for(var i = 0; i < keys.length; i++) {
      var server = self._state.secondaries[keys[i]];
      // Servers without tags can never match
      if(server.tags != null) {
        var matching = true;
        // Ensure we have all the values
        for(var j = 0; j < matchingKeys.length; j++) {
          if(server.tags[matchingKeys[j]] != tagObject[matchingKeys[j]]) {
            matching = false;
            break;
          }
        }

        // If we have a match add it to the list of matching servers
        if(matching) {
          candidateServers.push(server);
        }
      }
    }

    // If we have a candidate server return
    if(candidateServers.length > 0) {
      if(self.strategyInstance) return self.strategyInstance.checkoutConnection(tags, candidateServers);
      // No strategy: pick one of the matching servers at random
      return candidateServers[Math.floor(Math.random() * candidateServers.length)].checkoutReader();
    }
  }

  // No connection found
  return null;
};
760
* Pick a secondary using round robin
764
/**
 * Pick a secondary using round robin.
 *
 * Advances `replset._currentServerChoice` and checks out a reader from the
 * secondary at that position; when that yields no connection it falls back
 * to the first secondary that can provide one.
 *
 * @param {ReplSet} replset the ReplSet instance.
 * @param {Object} tags tags of the read (only forwarded to the fallback).
 * @return {Object|Error} a reader connection, or whatever the fallback returns.
 */
function _roundRobin (replset, tags) {
  var keys = Object.keys(replset._state.secondaries);
  // Update index
  replset._currentServerChoice = replset._currentServerChoice + 1;
  // Pick a server (with no secondaries the key is undefined and we fall through)
  var key = keys[replset._currentServerChoice % keys.length];

  var conn = null != replset._state.secondaries[key]
    ? replset._state.secondaries[key].checkoutReader()
    : null;

  // If connection is null fallback to first available secondary
  if(null == conn) {
    conn = pickFirstConnectedSecondary(replset, tags);
  }

  return conn;
}
786
/**
 * Returns a reader from the first secondary able to provide one. When
 * `self._readPreference` is SECONDARY_PREFERRED it falls back to the primary.
 *
 * @param {ReplSet} self the ReplSet instance.
 * @param {Object} tags tags of the read; only used in the error message.
 * @return {Object|Error} a connection, or an Error instance (returned, not
 *   thrown) when no member can serve the read.
 */
var pickFirstConnectedSecondary = function pickFirstConnectedSecondary(self, tags) {
  var keys = Object.keys(self._state.secondaries);
  // Declared locally; without a declaration the assignments below leak a global
  var connection;

  // Find first available reader if any
  for(var i = 0; i < keys.length; i++) {
    connection = self._state.secondaries[keys[i]].checkoutReader();
    if(connection) return connection;
  }

  // If we still have a null, read from primary if it's not secondary only.
  // The primary may be gone as well, so guard against a null master.
  if(self._readPreference == ReadPreference.SECONDARY_PREFERRED
    && self._state.master != null) {
    connection = self._state.master.checkoutReader();
    if(connection) return connection;
  }

  // NOTE(review): the true-branch value was not visible in the damaged
  // source; 'secondary' mirrors the naming used by the sibling error
  // messages in checkoutReader - confirm.
  var preferenceName = self._readPreference == ReadPreference.SECONDARY_PREFERRED
    ? 'secondary'
    : self._readPreference;

  return new Error("No replica set member available for query with ReadPreference "
                  + preferenceName + " and tags " + JSON.stringify(tags));
};
811
* Get list of secondaries
814
// Enumerable read-only accessor: the current secondaries as an array
Object.defineProperty(ReplSet.prototype, "secondaries", {enumerable: true
  , get: function() {
      return utils.objectToArray(this._state.secondaries);
    }
});
821
* Get list of arbiters
824
// Enumerable read-only accessor: the current arbiters as an array
Object.defineProperty(ReplSet.prototype, "arbiters", {enumerable: true
  , get: function() {
      return utils.objectToArray(this._state.arbiters);
    }
});