~ps-jenkins/ubuntu-push/ubuntu-vivid-proposed

« back to all changes in this revision

Viewing changes to docs/example-server/node_modules/mongodb/lib/mongodb/connection/repl_set/repl_set.js

  • Committer: Roberto Alsina
  • Date: 2014-09-05 14:57:17 UTC
  • mto: (91.179.25 automatic)
  • mto: This revision was merged to the branch mainline in revision 129.
  • Revision ID: roberto.alsina@canonical.com-20140905145717-0ufcsv27w25i1nnu
added example app server

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
var ReadPreference = require('../read_preference').ReadPreference
 
2
  , DbCommand = require('../../commands/db_command').DbCommand
 
3
  , inherits = require('util').inherits
 
4
  , format = require('util').format
 
5
  , timers = require('timers')
 
6
  , Server = require('../server').Server
 
7
  , utils = require('../../utils')
 
8
  , PingStrategy = require('./strategies/ping_strategy').PingStrategy
 
9
  , StatisticsStrategy = require('./strategies/statistics_strategy').StatisticsStrategy
 
10
  , Options = require('./options').Options
 
11
  , ReplSetState = require('./repl_set_state').ReplSetState
 
12
  , HighAvailabilityProcess = require('./ha').HighAvailabilityProcess
 
13
  , Base = require('../base').Base;
 
14
 
 
15
var STATE_STARTING_PHASE_1 = 0;
 
16
var STATE_PRIMARY = 1;
 
17
var STATE_SECONDARY = 2;
 
18
var STATE_RECOVERING = 3;
 
19
var STATE_FATAL_ERROR = 4;
 
20
var STATE_STARTING_PHASE_2 = 5;
 
21
var STATE_UNKNOWN = 6;
 
22
var STATE_ARBITER = 7;
 
23
var STATE_DOWN = 8;
 
24
var STATE_ROLLBACK = 9;
 
25
 
 
26
// Set processor, setImmediate if 0.10 otherwise nextTick
 
27
var processor = require('../../utils').processor();
 
28
 
 
29
/**
 
30
 * ReplSet constructor provides replicaset functionality
 
31
 *
 
32
 * Options
 
33
 *  - **ha** {Boolean, default:true}, turn on high availability.
 
34
 *  - **haInterval** {Number, default:2000}, time between each replicaset status check.
 
35
 *  - **reconnectWait** {Number, default:1000}, time to wait in miliseconds before attempting reconnect.
 
36
 *  - **retries** {Number, default:30}, number of times to attempt a replicaset reconnect.
 
37
 *  - **rs_name** {String}, the name of the replicaset to connect to.
 
38
 *  - **socketOptions** {Object, default:null}, an object containing socket options to use (noDelay:(boolean), keepAlive:(number), connectTimeoutMS:(number), socketTimeoutMS:(number))
 
39
 *  - **strategy** {String, default:'ping'}, selection strategy for reads choose between (ping, statistical and none, default is ping)
 
40
 *  - **secondaryAcceptableLatencyMS** {Number, default:15}, sets the range of servers to pick when using NEAREST (lowest ping ms + the latency fence, ex: range of 1 to (1 + 15) ms)
 
41
 *  - **connectWithNoPrimary** {Boolean, default:false}, sets if the driver should connect even if no primary is available
 
42
 *  - **connectArbiter** {Boolean, default:false}, sets if the driver should connect to arbiters or not.
 
43
 *  - **logger** {Object, default:null}, an object representing a logger that you want to use, needs to support functions debug, log, error **({error:function(message, object) {}, log:function(message, object) {}, debug:function(message, object) {}})**.
 
44
 *  - **poolSize** {Number, default:5}, number of connections in the connection pool for each server instance, set to 5 as default for legacy reasons.
 
45
 *  - **ssl** {Boolean, default:false}, use ssl connection (needs to have a mongod server with ssl support)
 
46
 *  - **sslValidate** {Boolean, default:false}, validate mongod server certificate against ca (needs to have a mongod server with ssl support, 2.4 or higher)
 
47
 *  - **sslCA** {Array, default:null}, Array of valid certificates either as Buffers or Strings (needs to have a mongod server with ssl support, 2.4 or higher)
 
48
 *  - **sslCert** {Buffer/String, default:null}, String or buffer containing the certificate we wish to present (needs to have a mongod server with ssl support, 2.4 or higher)
 
49
 *  - **sslKey** {Buffer/String, default:null}, String or buffer containing the certificate private key we wish to present (needs to have a mongod server with ssl support, 2.4 or higher)
 
50
 *  - **sslPass** {Buffer/String, default:null}, String or buffer containing the certificate password (needs to have a mongod server with ssl support, 2.4 or higher)
 
51
 *
 
52
 * @class Represents a 
 
53
 Replicaset Configuration
 
54
 * @param {Array} list of server objects participating in the replicaset.
 
55
 * @param {Object} [options] additional options for the replicaset connection.
 
56
 */
 
57
var ReplSet = exports.ReplSet = function(servers, options) {
 
58
  // Set up basic
 
59
  if(!(this instanceof ReplSet))
 
60
    return new ReplSet(servers, options);
 
61
 
 
62
  // Set up event emitter
 
63
  Base.call(this);
 
64
 
 
65
  // Ensure we have a list of servers
 
66
  if(!Array.isArray(servers)) throw Error("The parameter must be an array of servers and contain at least one server");
 
67
  // Ensure no Mongos's
 
68
  for(var i = 0; i < servers.length; i++) {
 
69
    if(!(servers[i] instanceof Server)) throw new Error("list of servers must be of type Server");
 
70
  }
 
71
 
 
72
  // Save the options
 
73
  this.options = new Options(options);
 
74
  // Ensure basic validation of options
 
75
  this.options.init();
 
76
 
 
77
  // Server state
 
78
  this._serverState = ReplSet.REPLSET_DISCONNECTED;
 
79
  // Add high availability process
 
80
  this._haProcess = new HighAvailabilityProcess(this, this.options);
 
81
 
 
82
  // Let's iterate over all the provided server objects and decorate them
 
83
  this.servers = this.options.decorateAndClean(servers, this._callBackStore);
 
84
  // Throw error if no seed servers
 
85
  if(this.servers.length == 0) throw new Error("No valid seed servers in the array");
 
86
 
 
87
  // Let's set up our strategy object for picking secondaries
 
88
  if(this.options.strategy == 'ping') {
 
89
    // Create a new instance
 
90
    this.strategyInstance = new PingStrategy(this, this.options.secondaryAcceptableLatencyMS);
 
91
  } else if(this.options.strategy == 'statistical') {
 
92
    // Set strategy as statistical
 
93
    this.strategyInstance = new StatisticsStrategy(this);
 
94
    // Add enable query information
 
95
    this.enableRecordQueryStats(true);
 
96
  }
 
97
 
 
98
  this.emitOpen = this.options.emitOpen || true;
 
99
  // Set up a clean state
 
100
  this._state = new ReplSetState(this);
 
101
  // Current round robin selected server
 
102
  this._currentServerChoice = 0;
 
103
  // Ensure up the server callbacks
 
104
  for(var i = 0; i < this.servers.length; i++) {
 
105
    this.servers[i]._callBackStore = this._callBackStore;
 
106
    this.servers[i].name = format("%s:%s", this.servers[i].host, this.servers[i].port)
 
107
    this.servers[i].replicasetInstance = this;
 
108
    this.servers[i].options.auto_reconnect = false;
 
109
    this.servers[i].inheritReplSetOptionsFrom(this);
 
110
  }
 
111
 
 
112
  // Allow setting the socketTimeoutMS on all connections
 
113
  // to work around issues such as secondaries blocking due to compaction
 
114
  utils.setSocketTimeoutProperty(this, this.options.socketOptions);
 
115
}
 
116
 
 
117
/**
 
118
 * @ignore
 
119
 */
 
120
inherits(ReplSet, Base);
 
121
 
 
122
// Replicaset states
 
123
ReplSet.REPLSET_CONNECTING = 'connecting';
 
124
ReplSet.REPLSET_DISCONNECTED = 'disconnected';
 
125
ReplSet.REPLSET_CONNECTED = 'connected';
 
126
ReplSet.REPLSET_RECONNECTING = 'reconnecting';
 
127
ReplSet.REPLSET_DESTROYED = 'destroyed';
 
128
ReplSet.REPLSET_READ_ONLY = 'readonly';
 
129
 
 
130
ReplSet.prototype.isAutoReconnect = function() {
 
131
  return true;
 
132
}
 
133
 
 
134
ReplSet.prototype.canWrite = function() {
 
135
  return this._state.master && this._state.master.isConnected();
 
136
}
 
137
 
 
138
ReplSet.prototype.canRead = function(read) {
 
139
  if((read == ReadPreference.PRIMARY 
 
140
      || (typeof read == 'object' && read.mode == ReadPreference.PRIMARY)
 
141
      || read == null || read == false) && (this._state.master == null || !this._state.master.isConnected())) return false;
 
142
  return Object.keys(this._state.secondaries).length > 0;
 
143
}
 
144
 
 
145
/**
 
146
 * @ignore
 
147
 */
 
148
ReplSet.prototype.enableRecordQueryStats = function(enable) {
 
149
  // Set the global enable record query stats
 
150
  this.recordQueryStats = enable;
 
151
 
 
152
  // Enable all the servers
 
153
  for(var i = 0; i < this.servers.length; i++) {
 
154
    this.servers[i].enableRecordQueryStats(enable);
 
155
  }
 
156
}
 
157
 
 
158
/**
 
159
 * @ignore
 
160
 */
 
161
ReplSet.prototype.setSocketOptions = function(options) {
 
162
  var servers = this.allServerInstances();
 
163
  
 
164
  if(typeof options.socketTimeoutMS == 'number') {
 
165
    this.options.socketOptions.socketTimeoutMS = options.socketTimeoutMS;
 
166
  }
 
167
 
 
168
  if(typeof options.connectTimeoutMS == 'number')
 
169
    this.options.socketOptions.connectTimeoutMS = options.connectTimeoutMS;
 
170
 
 
171
  for(var i = 0; i < servers.length; i++) {
 
172
    servers[i].setSocketOptions(options);
 
173
  }
 
174
}
 
175
 
 
176
/**
 
177
 * @ignore
 
178
 */
 
179
ReplSet.prototype.setReadPreference = function(preference) {
 
180
  this.options.readPreference = preference;
 
181
}
 
182
 
 
183
ReplSet.prototype.connect = function(parent, options, callback) {
 
184
  if(this._serverState != ReplSet.REPLSET_DISCONNECTED) 
 
185
    return callback(new Error("in process of connection"));
 
186
 
 
187
  // If no callback throw
 
188
  if(!(typeof callback == 'function')) 
 
189
    throw new Error("cannot call ReplSet.prototype.connect with no callback function");
 
190
 
 
191
  var self = this;
 
192
  // Save db reference
 
193
  this.options.db = parent;
 
194
  // Set replicaset as connecting
 
195
  this._serverState = ReplSet.REPLSET_CONNECTING
 
196
  // Copy all the servers to our list of seeds
 
197
  var candidateServers = this.servers.slice(0);
 
198
  // Pop the first server
 
199
  var server = candidateServers.pop();
 
200
  server.name = format("%s:%s", server.host, server.port);
 
201
  // Set up the options
 
202
  var opts = {
 
203
    returnIsMasterResults: true,
 
204
    eventReceiver: server
 
205
  }
 
206
 
 
207
  // Register some event listeners
 
208
  this.once("fullsetup", function(err, db, replset) {
 
209
    // Set state to connected
 
210
    self._serverState = ReplSet.REPLSET_CONNECTED;
 
211
    // Stop any process running
 
212
    if(self._haProcess) self._haProcess.stop();
 
213
    // Start the HA process
 
214
    self._haProcess.start();
 
215
 
 
216
    // Emit fullsetup
 
217
    processor(function() {
 
218
      if(self.emitOpen)
 
219
        self._emitAcrossAllDbInstances(self, null, "open", null, null, null);        
 
220
 
 
221
      self._emitAcrossAllDbInstances(self, null, "fullsetup", null, null, null);        
 
222
    });
 
223
 
 
224
    // If we have a strategy defined start it
 
225
    if(self.strategyInstance) {
 
226
      self.strategyInstance.start();
 
227
    }
 
228
 
 
229
    // Finishing up the call
 
230
    callback(err, db, replset);
 
231
  });
 
232
 
 
233
  // Errors
 
234
  this.once("connectionError", function(err, result) {
 
235
    callback(err, result);
 
236
  });
 
237
 
 
238
  // Attempt to connect to the server
 
239
  server.connect(this.options.db, opts, _connectHandler(this, candidateServers, server));
 
240
}
 
241
 
 
242
ReplSet.prototype.close = function(callback) {  
 
243
  var self = this;
 
244
  // Set as destroyed
 
245
  this._serverState = ReplSet.REPLSET_DESTROYED;
 
246
  // Stop the ha
 
247
  this._haProcess.stop();
 
248
  
 
249
  // If we have a strategy stop it
 
250
  if(this.strategyInstance) {
 
251
    this.strategyInstance.stop();
 
252
  }
 
253
 
 
254
  // Kill all servers available
 
255
  for(var name in this._state.addresses) {
 
256
    this._state.addresses[name].close();
 
257
  }
 
258
 
 
259
  // Clean out the state
 
260
  this._state = new ReplSetState(this); 
 
261
  
 
262
  // Emit close event
 
263
  processor(function() {
 
264
    self._emitAcrossAllDbInstances(self, null, "close", null, null, true)    
 
265
  });
 
266
 
 
267
  // Flush out any remaining call handlers
 
268
  self._flushAllCallHandlers(utils.toError("Connection Closed By Application"));
 
269
 
 
270
  // Callback
 
271
  if(typeof callback == 'function') 
 
272
    return callback(null, null);
 
273
}
 
274
 
 
275
/**
 
276
 * Creates a new server for the `replset` based on `host`.
 
277
 *
 
278
 * @param {String} host - host:port pair (localhost:27017)
 
279
 * @param {ReplSet} replset - the ReplSet instance
 
280
 * @return {Server}
 
281
 * @ignore
 
282
 */
 
283
var createServer = function(self, host, options) {
 
284
  // copy existing socket options to new server
 
285
  var socketOptions = {}
 
286
  if(options.socketOptions) {
 
287
    var keys = Object.keys(options.socketOptions);
 
288
    for(var k = 0; k < keys.length; k++) {
 
289
      socketOptions[keys[k]] = options.socketOptions[keys[k]];
 
290
    }
 
291
  }
 
292
 
 
293
  var parts = host.split(/:/);
 
294
  if(1 === parts.length) {
 
295
    parts[1] = Connection.DEFAULT_PORT;
 
296
  }
 
297
 
 
298
  socketOptions.host = parts[0];
 
299
  socketOptions.port = parseInt(parts[1], 10);
 
300
 
 
301
  var serverOptions = {
 
302
    readPreference: options.readPreference,
 
303
    socketOptions: socketOptions,
 
304
    poolSize: options.poolSize,
 
305
    logger: options.logger,
 
306
    auto_reconnect: false,
 
307
    ssl: options.ssl,
 
308
    sslValidate: options.sslValidate,
 
309
    sslCA: options.sslCA,
 
310
    sslCert: options.sslCert,
 
311
    sslKey: options.sslKey,
 
312
    sslPass: options.sslPass
 
313
  }
 
314
 
 
315
  var server = new Server(socketOptions.host, socketOptions.port, serverOptions);
 
316
  // Set up shared state
 
317
  server._callBackStore = self._callBackStore;
 
318
  server.replicasetInstance = self;
 
319
  server.enableRecordQueryStats(self.recordQueryStats);
 
320
  // Set up event handlers
 
321
  server.on("close", _handler("close", self, server));
 
322
  server.on("error", _handler("error", self, server));
 
323
  server.on("timeout", _handler("timeout", self, server));
 
324
  return server;
 
325
}
 
326
 
 
327
var _handler = function(event, self, server) {
 
328
  return function(err, doc) {
 
329
    // The event happened to a primary
 
330
    // Remove it from play
 
331
    if(self._state.isPrimary(server)) {    
 
332
      // Emit that the primary left the replicaset
 
333
      self.emit('left', 'primary', server);
 
334
      // Get the current master
 
335
      var current_master = self._state.master;
 
336
      self._state.master = null;
 
337
      self._serverState = ReplSet.REPLSET_READ_ONLY;
 
338
    
 
339
      if(current_master != null) {
 
340
        // Unpack variables
 
341
        var host = current_master.socketOptions.host;
 
342
        var port = current_master.socketOptions.port;
 
343
 
 
344
        // Fire error on any unknown callbacks
 
345
        self.__executeAllServerSpecificErrorCallbacks(host, port, err);        
 
346
      }
 
347
    } else if(self._state.isSecondary(server)) {
 
348
      // Emit that a secondary left the replicaset
 
349
      self.emit('left', 'secondary', server);
 
350
      // Delete from the list
 
351
      delete self._state.secondaries[server.name];
 
352
    }
 
353
 
 
354
    // If there is no more connections left and the setting is not destroyed
 
355
    // set to disconnected
 
356
    if(Object.keys(self._state.addresses).length == 0 
 
357
      && self._serverState != ReplSet.REPLSET_DESTROYED) {
 
358
        self._serverState = ReplSet.REPLSET_DISCONNECTED;
 
359
 
 
360
        // Emit close across all the attached db instances
 
361
        self._dbStore.emit("close", new Error("replicaset disconnected, no valid servers contactable over tcp"), null, true);
 
362
    }
 
363
 
 
364
    // Unpack variables
 
365
    var host = server.socketOptions.host;
 
366
    var port = server.socketOptions.port;
 
367
 
 
368
    // Fire error on any unknown callbacks
 
369
    self.__executeAllServerSpecificErrorCallbacks(host, port, err);
 
370
  }
 
371
}
 
372
 
 
373
var locateNewServers = function(self, state, candidateServers, ismaster) {
 
374
  // Retrieve the host
 
375
  var hosts = ismaster.hosts;
 
376
  // In candidate servers
 
377
  var inCandidateServers = function(name, candidateServers) {
 
378
    for(var i = 0; i < candidateServers.length; i++) {
 
379
      if(candidateServers[i].name == name) return true;
 
380
    }
 
381
 
 
382
    return false;
 
383
  }
 
384
 
 
385
  // New servers
 
386
  var newServers = [];
 
387
  if(Array.isArray(hosts)) {
 
388
    // Let's go over all the hosts
 
389
    for(var i = 0; i < hosts.length; i++) {
 
390
      if(!state.contains(hosts[i]) 
 
391
        && !inCandidateServers(hosts[i], candidateServers)) {
 
392
          newServers.push(createServer(self, hosts[i], self.options));
 
393
      }
 
394
    }    
 
395
  }
 
396
 
 
397
  // Return list of possible new servers
 
398
  return newServers;
 
399
}
 
400
 
 
401
var _connectHandler = function(self, candidateServers, instanceServer) {
 
402
  return function(err, doc) {
 
403
    // If we have an error add to the list
 
404
    if(err) {
 
405
      self._state.errors[instanceServer.name] = instanceServer;
 
406
    } else {
 
407
      delete self._state.errors[instanceServer.name];
 
408
    }
 
409
 
 
410
    if(!err) {      
 
411
      var ismaster = doc.documents[0]
 
412
 
 
413
      // Error the server if 
 
414
      if(!ismaster.ismaster
 
415
        && !ismaster.secondary) {
 
416
        self._state.errors[instanceServer.name] = instanceServer;
 
417
      }
 
418
    }
 
419
 
 
420
 
 
421
    // No error let's analyse the ismaster command
 
422
    if(!err && self._state.errors[instanceServer.name] == null) {
 
423
      var ismaster = doc.documents[0]
 
424
 
 
425
      // If no replicaset name exists set the current one
 
426
      if(self.options.rs_name == null) {
 
427
        self.options.rs_name = ismaster.setName;
 
428
      }
 
429
 
 
430
      // If we have a member that is not part of the set let's finish up
 
431
      if(typeof ismaster.setName == 'string' && ismaster.setName != self.options.rs_name) {
 
432
        return self.emit("connectionError", new Error("Replicaset name " + ismaster.setName + " does not match specified name " + self.options.rs_name));
 
433
      }
 
434
 
 
435
      // Add the error handlers
 
436
      instanceServer.on("close", _handler("close", self, instanceServer));
 
437
      instanceServer.on("error", _handler("error", self, instanceServer));
 
438
      instanceServer.on("timeout", _handler("timeout", self, instanceServer));
 
439
      
 
440
      // Set any tags on the instance server
 
441
      instanceServer.name = ismaster.me;
 
442
      instanceServer.tags = ismaster.tags;
 
443
 
 
444
      // Add the server to the list
 
445
      self._state.addServer(instanceServer, ismaster);
 
446
 
 
447
      // Check if we have more servers to add (only check when done with initial set)
 
448
      if(candidateServers.length == 0) {
 
449
        // Get additional new servers that are not currently in set
 
450
        var new_servers = locateNewServers(self, self._state, candidateServers, ismaster);
 
451
 
 
452
        // Locate any new servers that have not errored out yet
 
453
        for(var i = 0; i < new_servers.length; i++) {
 
454
          if(self._state.errors[new_servers[i].name] == null) {
 
455
            candidateServers.push(new_servers[i])            
 
456
          }
 
457
        }
 
458
      }
 
459
    }
 
460
 
 
461
    // If the candidate server list is empty and no valid servers
 
462
    if(candidateServers.length == 0 &&
 
463
      !self._state.hasValidServers()) {
 
464
        return self.emit("connectionError", new Error("No valid replicaset instance servers found"));
 
465
    } else if(candidateServers.length == 0) {      
 
466
      if(!self.options.connectWithNoPrimary && (self._state.master == null || !self._state.master.isConnected())) {
 
467
        return self.emit("connectionError", new Error("No primary found in set"));
 
468
      }
 
469
      return self.emit("fullsetup", null, self.options.db, self);
 
470
    }
 
471
        
 
472
    // Let's connect the next server    
 
473
    var nextServer = candidateServers.pop();
 
474
  
 
475
    // Set up the options
 
476
    var opts = {
 
477
      returnIsMasterResults: true,
 
478
      eventReceiver: nextServer
 
479
    }
 
480
 
 
481
    // Attempt to connect to the server
 
482
    nextServer.connect(self.options.db, opts, _connectHandler(self, candidateServers, nextServer));
 
483
  }
 
484
}
 
485
 
 
486
ReplSet.prototype.isDestroyed = function() {
 
487
  return this._serverState == ReplSet.REPLSET_DESTROYED;
 
488
}
 
489
 
 
490
ReplSet.prototype.isConnected = function(read) {
 
491
  var isConnected = false;  
 
492
 
 
493
  if(read == null || read == ReadPreference.PRIMARY || read == false)
 
494
    isConnected = this._state.master != null && this._state.master.isConnected();
 
495
 
 
496
  if((read == ReadPreference.PRIMARY_PREFERRED || read == ReadPreference.SECONDARY_PREFERRED || read == ReadPreference.NEAREST)
 
497
    && ((this._state.master != null && this._state.master.isConnected())
 
498
    || (this._state && this._state.secondaries && Object.keys(this._state.secondaries).length > 0))) {
 
499
      isConnected = true;
 
500
  } else if(read == ReadPreference.SECONDARY) {
 
501
    isConnected = this._state && this._state.secondaries && Object.keys(this._state.secondaries).length > 0;
 
502
  }
 
503
 
 
504
  // No valid connection return false
 
505
  return isConnected;
 
506
}
 
507
 
 
508
ReplSet.prototype.isMongos = function() {
 
509
  return false;
 
510
}
 
511
 
 
512
ReplSet.prototype.checkoutWriter = function() {
 
513
  if(this._state.master) return this._state.master.checkoutWriter();
 
514
  return new Error("no writer connection available");
 
515
}
 
516
 
 
517
ReplSet.prototype.processIsMaster = function(_server, _ismaster) {
 
518
  // Server in recovery mode, remove it from available servers
 
519
  if(!_ismaster.ismaster && !_ismaster.secondary) {
 
520
    // Locate the actual server
 
521
    var server = this._state.addresses[_server.name];
 
522
    // Close the server, simulating the closing of the connection
 
523
    // to get right removal semantics
 
524
    if(server) server.close();
 
525
    // Execute any callback errors
 
526
    _handler(null, this, server)(new Error("server is in recovery mode"));
 
527
  }
 
528
}
 
529
 
 
530
ReplSet.prototype.allRawConnections = function() {
 
531
  var connections = [];
 
532
 
 
533
  for(var name in this._state.addresses) {
 
534
    connections = connections.concat(this._state.addresses[name].allRawConnections());
 
535
  }
 
536
 
 
537
  return connections;
 
538
}
 
539
 
 
540
/**
 
541
 * @ignore
 
542
 */
 
543
ReplSet.prototype.allServerInstances = function() {
 
544
  var self = this;
 
545
  // If no state yet return empty
 
546
  if(!self._state) return [];
 
547
  // Close all the servers (concatenate entire list of servers first for ease)
 
548
  var allServers = self._state.master != null ? [self._state.master] : [];
 
549
 
 
550
  // Secondary keys
 
551
  var keys = Object.keys(self._state.secondaries);
 
552
  // Add all secondaries
 
553
  for(var i = 0; i < keys.length; i++) {
 
554
    allServers.push(self._state.secondaries[keys[i]]);
 
555
  }
 
556
 
 
557
  // Return complete list of all servers
 
558
  return allServers;
 
559
}
 
560
 
 
561
/**
 
562
 * @ignore
 
563
 */
 
564
ReplSet.prototype.checkoutReader = function(readPreference, tags) {
 
565
  var connection = null;
 
566
 
 
567
  // If we have a read preference object unpack it
 
568
  if(typeof readPreference == 'object' && readPreference['_type'] == 'ReadPreference') {
 
569
    // Validate if the object is using a valid mode
 
570
    if(!readPreference.isValid()) throw new Error("Illegal readPreference mode specified, " + JSON.stringify(readPreference.mode));
 
571
    // Set the tag
 
572
    tags = readPreference.tags;
 
573
    readPreference = readPreference.mode;
 
574
  } else if(typeof readPreference == 'object' && readPreference['_type'] != 'ReadPreference') {
 
575
    return new Error("read preferences must be either a string or an instance of ReadPreference");
 
576
  }
 
577
 
 
578
  // Set up our read Preference, allowing us to override the readPreference
 
579
  var finalReadPreference = readPreference != null ? readPreference : this.options.readPreference;
 
580
 
 
581
  // Ensure we unpack a reference
 
582
  if(finalReadPreference != null && typeof finalReadPreference == 'object' && finalReadPreference['_type'] == 'ReadPreference') {
 
583
    // Validate if the object is using a valid mode
 
584
    if(!finalReadPreference.isValid()) throw new Error("Illegal readPreference mode specified, " + JSON.stringify(finalReadPreference.mode));
 
585
    // Set the tag
 
586
    tags = finalReadPreference.tags;
 
587
    readPreference = finalReadPreference.mode;
 
588
  }
 
589
 
 
590
  // Finalize the read preference setup
 
591
  finalReadPreference = finalReadPreference == true ? ReadPreference.SECONDARY_PREFERRED : finalReadPreference;
 
592
  finalReadPreference = finalReadPreference == null ? ReadPreference.PRIMARY : finalReadPreference;
 
593
 
 
594
  // If we are reading from a primary
 
595
  if(finalReadPreference == 'primary') {
 
596
    // If we provide a tags set send an error
 
597
    if(typeof tags == 'object' && tags != null) {
 
598
      return new Error("PRIMARY cannot be combined with tags");
 
599
    }
 
600
 
 
601
    // If we provide a tags set send an error
 
602
    if(this._state.master == null) {
 
603
      return new Error("No replica set primary available for query with ReadPreference PRIMARY");
 
604
    }
 
605
 
 
606
    // Checkout a writer
 
607
    return this.checkoutWriter();
 
608
  }
 
609
 
 
610
  // If we have specified to read from a secondary server grab a random one and read
 
611
  // from it, otherwise just pass the primary connection
 
612
  if((this.options.readSecondary || finalReadPreference == ReadPreference.SECONDARY_PREFERRED || finalReadPreference == ReadPreference.SECONDARY) && Object.keys(this._state.secondaries).length > 0) {
 
613
    // If we have tags, look for servers matching the specific tag
 
614
    if(this.strategyInstance != null) {
 
615
      // Only pick from secondaries
 
616
      var _secondaries = [];
 
617
      for(var key in this._state.secondaries) {
 
618
        _secondaries.push(this._state.secondaries[key]);
 
619
      }
 
620
 
 
621
      if(finalReadPreference == ReadPreference.SECONDARY) {
 
622
        // Check out the nearest from only the secondaries
 
623
        connection = this.strategyInstance.checkoutConnection(tags, _secondaries);
 
624
      } else {
 
625
        connection = this.strategyInstance.checkoutConnection(tags, _secondaries);
 
626
        // No candidate servers that match the tags, error
 
627
        if(connection == null || connection instanceof Error) {
 
628
          // No secondary server avilable, attemp to checkout a primary server
 
629
          connection = this.checkoutWriter();
 
630
          // If no connection return an error
 
631
          if(connection == null || connection instanceof Error) {
 
632
            return new Error("No replica set members available for query");
 
633
          }
 
634
        }
 
635
      }
 
636
    } else if(tags != null && typeof tags == 'object') {
 
637
      // Get connection
 
638
      connection = _pickFromTags(this, tags);// = function(self, readPreference, tags) {
 
639
      // No candidate servers that match the tags, error
 
640
      if(connection == null) {
 
641
        return new Error("No replica set members available for query");
 
642
      }
 
643
    } else {
 
644
      connection = _roundRobin(this, tags);
 
645
    }
 
646
  } else if(finalReadPreference == ReadPreference.PRIMARY_PREFERRED) {
 
647
    // Check if there is a primary available and return that if possible
 
648
    connection = this.checkoutWriter();
 
649
    // If no connection available checkout a secondary
 
650
    if(connection == null || connection instanceof Error) {
 
651
      // If we have tags, look for servers matching the specific tag
 
652
      if(tags != null && typeof tags == 'object') {
 
653
        // Get connection
 
654
        connection = _pickFromTags(this, tags);// = function(self, readPreference, tags) {
 
655
        // No candidate servers that match the tags, error
 
656
        if(connection == null) {
 
657
          return new Error("No replica set members available for query");
 
658
        }
 
659
      } else {
 
660
        connection = _roundRobin(this, tags);
 
661
      }
 
662
    }
 
663
  } else if(finalReadPreference == ReadPreference.SECONDARY_PREFERRED) {
 
664
    // If we have tags, look for servers matching the specific tag
 
665
    if(this.strategyInstance != null) {
 
666
      connection = this.strategyInstance.checkoutConnection(tags);
 
667
      
 
668
      // No candidate servers that match the tags, error
 
669
      if(connection == null || connection instanceof Error) {
 
670
        // No secondary server avilable, attemp to checkout a primary server
 
671
        connection = this.checkoutWriter();
 
672
        // If no connection return an error
 
673
        if(connection == null || connection instanceof Error) {
 
674
          var preferenceName = finalReadPreference == ReadPreference.SECONDARY ? 'secondary' : finalReadPreference;
 
675
          return new Error("No replica set member available for query with ReadPreference " + preferenceName + " and tags " + JSON.stringify(tags));
 
676
        }
 
677
      }
 
678
    } else if(tags != null && typeof tags == 'object') {
 
679
      // Get connection
 
680
      connection = _pickFromTags(this, tags);// = function(self, readPreference, tags) {
 
681
      // No candidate servers that match the tags, error
 
682
      if(connection == null) {
 
683
        // No secondary server avilable, attemp to checkout a primary server
 
684
        connection = this.checkoutWriter();
 
685
        // If no connection return an error
 
686
        if(connection == null || connection instanceof Error) {
 
687
          var preferenceName = finalReadPreference == ReadPreference.SECONDARY ? 'secondary' : finalReadPreference;
 
688
          return new Error("No replica set member available for query with ReadPreference " + preferenceName + " and tags " + JSON.stringify(tags));
 
689
        }
 
690
      }
 
691
    }
 
692
  } else if(finalReadPreference == ReadPreference.NEAREST && this.strategyInstance != null) {
 
693
    connection = this.strategyInstance.checkoutConnection(tags);
 
694
  } else if(finalReadPreference == ReadPreference.NEAREST && this.strategyInstance == null) {
 
695
    return new Error("A strategy for calculating nearness must be enabled such as ping or statistical");
 
696
  } else if(finalReadPreference == ReadPreference.SECONDARY && Object.keys(this._state.secondaries).length == 0) {
 
697
    if(tags != null && typeof tags == 'object') {
 
698
      var preferenceName = finalReadPreference == ReadPreference.SECONDARY ? 'secondary' : finalReadPreference;
 
699
      return new Error("No replica set member available for query with ReadPreference " + preferenceName + " and tags " + JSON.stringify(tags));
 
700
    } else {
 
701
      return new Error("No replica set secondary available for query with ReadPreference SECONDARY");
 
702
    }
 
703
  } else {
 
704
    connection = this.checkoutWriter();
 
705
  }
 
706
 
 
707
  // Return the connection
 
708
  return connection;
 
709
}
 
710
 
 
711
/**
 
712
 * @ignore
 
713
 */
 
714
var _pickFromTags = function(self, tags) {
 
715
  // If we have an array or single tag selection
 
716
  var tagObjects = Array.isArray(tags) ? tags : [tags];
 
717
  // Iterate over all tags until we find a candidate server
 
718
  for(var _i = 0; _i < tagObjects.length; _i++) {
 
719
    // Grab a tag object
 
720
    var tagObject = tagObjects[_i];
 
721
    // Matching keys
 
722
    var matchingKeys = Object.keys(tagObject);
 
723
    // Match all the servers that match the provdided tags
 
724
    var keys = Object.keys(self._state.secondaries);
 
725
    var candidateServers = [];
 
726
 
 
727
    for(var i = 0; i < keys.length; i++) {
 
728
      var server = self._state.secondaries[keys[i]];
 
729
      // If we have tags match
 
730
      if(server.tags != null) {
 
731
        var matching = true;
 
732
        // Ensure we have all the values
 
733
        for(var j = 0; j < matchingKeys.length; j++) {
 
734
          if(server.tags[matchingKeys[j]] != tagObject[matchingKeys[j]]) {
 
735
            matching = false;
 
736
            break;
 
737
          }
 
738
        }
 
739
 
 
740
        // If we have a match add it to the list of matching servers
 
741
        if(matching) {
 
742
          candidateServers.push(server);
 
743
        }
 
744
      }
 
745
    }
 
746
 
 
747
    // If we have a candidate server return
 
748
    if(candidateServers.length > 0) {
 
749
      if(self.strategyInstance) return self.strategyInstance.checkoutConnection(tags, candidateServers);
 
750
      // Set instance to return
 
751
      return candidateServers[Math.floor(Math.random() * candidateServers.length)].checkoutReader();
 
752
    }
 
753
  }
 
754
 
 
755
  // No connection found
 
756
  return null;
 
757
}
 
758
 
 
759
/**
 
760
 * Pick a secondary using round robin
 
761
 *
 
762
 * @ignore
 
763
 */
 
764
function _roundRobin (replset, tags) {
 
765
  var keys = Object.keys(replset._state.secondaries);
 
766
  // Update index
 
767
  replset._currentServerChoice = replset._currentServerChoice + 1;
 
768
  // Pick a server
 
769
  var key = keys[replset._currentServerChoice % keys.length];
 
770
 
 
771
  var conn = null != replset._state.secondaries[key]
 
772
    ? replset._state.secondaries[key].checkoutReader()
 
773
    : null;
 
774
 
 
775
  // If connection is null fallback to first available secondary
 
776
  if(null == conn) {
 
777
    conn = pickFirstConnectedSecondary(replset, tags);
 
778
  }
 
779
 
 
780
  return conn;
 
781
}
 
782
 
 
783
/**
 
784
 * @ignore
 
785
 */
 
786
var pickFirstConnectedSecondary = function pickFirstConnectedSecondary(self, tags) {
 
787
  var keys = Object.keys(self._state.secondaries);
 
788
  var connection;
 
789
 
 
790
  // Find first available reader if any
 
791
  for(var i = 0; i < keys.length; i++) {
 
792
    connection = self._state.secondaries[keys[i]].checkoutReader();
 
793
    if(connection) return connection;
 
794
  }
 
795
 
 
796
  // If we still have a null, read from primary if it's not secondary only
 
797
  if(self._readPreference == ReadPreference.SECONDARY_PREFERRED) {
 
798
    connection = self._state.master.checkoutReader();
 
799
    if(connection) return connection;
 
800
  }
 
801
 
 
802
  var preferenceName = self._readPreference == ReadPreference.SECONDARY_PREFERRED
 
803
    ? 'secondary'
 
804
    : self._readPreference;
 
805
 
 
806
  return new Error("No replica set member available for query with ReadPreference "
 
807
                  + preferenceName + " and tags " + JSON.stringify(tags));
 
808
}
 
809
 
 
810
/**
 
811
 * Get list of secondaries
 
812
 * @ignore
 
813
 */
 
814
Object.defineProperty(ReplSet.prototype, "secondaries", {enumerable: true
 
815
  , get: function() {
 
816
      return utils.objectToArray(this._state.secondaries);
 
817
    }
 
818
});
 
819
 
 
820
/**
 
821
 * Get list of secondaries
 
822
 * @ignore
 
823
 */
 
824
Object.defineProperty(ReplSet.prototype, "arbiters", {enumerable: true
 
825
  , get: function() {
 
826
      return utils.objectToArray(this._state.arbiters);
 
827
    }
 
828
});
 
829