2
require 'net/ssh/gateway'
3
require 'net/ssh/multi/server'
4
require 'net/ssh/multi/dynamic_server'
5
require 'net/ssh/multi/server_list'
6
require 'net/ssh/multi/channel'
7
require 'net/ssh/multi/pending_connection'
8
require 'net/ssh/multi/session_actions'
9
require 'net/ssh/multi/subsession'
11
module Net; module SSH; module Multi
12
# Represents a collection of connections to various servers. It provides an
13
# interface for organizing the connections (#group), as well as a way to
14
# scope commands to a subset of all connections (#with). You can also provide
15
# a default gateway connection that servers should use when connecting
16
# (#via). It exposes an interface similar to Net::SSH::Connection::Session
17
# for opening SSH channels and executing commands, allowing for these
18
# operations to be done in parallel across multiple connections.
20
# Net::SSH::Multi.start do |session|
21
# # access servers via a gateway
22
# session.via 'gateway', 'gateway-user'
24
# # define the servers we want to use
25
# session.use 'user1@host1'
26
# session.use 'user2@host2'
28
# # define servers in groups for more granular access
29
# session.group :app do
30
# session.use 'user@app1'
31
# session.use 'user@app2'
34
# # execute commands on all servers
35
# session.exec "uptime"
37
# # execute commands on a subset of servers
38
# session.with(:app).exec "hostname"
40
# # run the aggregated event loop
44
# Note that connections are established lazily, as soon as they are needed.
45
# You can force the connections to be opened immediately, though, using the
48
# == Concurrent Connection Limiting
50
# Sometimes you may be dealing with a large number of servers, and if you
51
# try to have connections open to all of them simultaneously you'll run into
52
# open file handle limitations and such. If this happens to you, you can set
53
# the #concurrent_connections property of the session. Net::SSH::Multi will
54
# then ensure that no more than this number of connections are ever open
57
# Net::SSH::Multi.start(:concurrent_connections => 5) do |session|
61
# Opening channels and executing commands will still work exactly as before,
62
# but Net::SSH::Multi will transparently close finished connections and open
65
# == Controlling Connection Errors
67
# By default, Net::SSH::Multi will raise an exception if a connection error
68
# occurs when connecting to a server. This will typically bubble up and abort
69
# the entire connection process. Sometimes, however, you might wish to ignore
70
# connection errors, for instance when starting a daemon on a large number of
71
# boxes and you know that some of the boxes are going to be unavailable.
73
# To do this, simply set the #on_error property of the session to :ignore
74
# (or to :warn, if you want a warning message when a connection attempt
77
# Net::SSH::Multi.start(:on_error => :ignore) do |session|
81
# The default is :fail, which causes the exception to bubble up. Additionally,
82
# you can specify a Proc object as the value for #on_error, which will be
83
# invoked with the server in question if the connection attempt fails. You
84
# can force the connection attempt to retry by throwing the :go symbol, with
85
# :retry as the payload, or force the exception to be reraised by throwing
86
# :go with :raise as the payload:
88
# handler = Proc.new do |server|
89
# server[:connection_attempts] ||= 0
90
# if server[:connection_attempts] < 3
91
# server[:connection_attempts] += 1
98
# Net::SSH::Multi.start(:on_error => handler) do |session|
102
# Any other thrown value (or no thrown value at all) will result in the
103
# failure being ignored.
105
# == Lazily Evaluated Server Definitions
107
# Sometimes you might be dealing with an environment where you don't know the
108
# names or addresses of the servers until runtime. You can certainly dynamically
109
# build server names and pass them to #use, but if the operation to determine
110
# the server names is expensive, you might want to defer it until the server
111
# is actually needed (especially if the logic of your program is such that
112
# you might not even need to connect to that server every time the program
115
# You can do this by passing a block to #use:
117
# session.use do |opt|
118
# lookup_ip_address_of_remote_host
121
# See #use for more information about this usage.
123
include SessionActions

# The Net::SSH::Multi::ServerList managed by this session.
attr_reader :server_list

# The default Net::SSH::Gateway instance to use to connect to the servers.
# If +nil+, no default gateway will be used.
attr_reader :default_gateway

# The hash of group definitions, mapping each group name to a corresponding
# Net::SSH::Multi::ServerList.
# NOTE(review): this reader was absent from this copy of the file even though
# its documentation was present and #group / #servers_for call +groups+;
# restored here -- confirm against upstream.
attr_reader :groups

# The number of allowed concurrent connections. No more than this number
# of sessions will be open at any given time.
attr_accessor :concurrent_connections

# How connection errors should be handled. This defaults to :fail, but
# may be set to :ignore if connection errors should be ignored, or
# :warn if connection errors should cause a warning.
attr_accessor :on_error

# The default user name to use when connecting to a server. If a user name
# is not given for a particular server, this value will be used. It defaults
# to ENV['USER'] || ENV['USERNAME'], or "unknown" if neither of those are
# set.
attr_accessor :default_user

# The number of connections that are currently open.
attr_reader :open_connections #:nodoc:

# The list of "open" groups, which will receive subsequent server definitions.
# See #use and #group.
attr_reader :open_groups #:nodoc:
158
# Creates a new Net::SSH::Multi::Session instance. Initially, it contains
159
# no server definitions, no group definitions, and no default gateway.
161
# You can set the #concurrent_connections property in the options. Setting
162
# it to +nil+ (the default) will cause Net::SSH::Multi to ignore any
163
# concurrent connection limit and allow all defined sessions to be open
164
# simultaneously. Setting it to an integer will cause Net::SSH::Multi to
165
# allow no more than that number of concurrently open sessions, opening
166
# subsequent sessions only when other sessions finish and close.
168
# Net::SSH::Multi.start(:concurrent_connections => 10) do |session|
171
def initialize(options={})
  @server_list = ServerList.new
  # Looking up an unknown group name lazily creates an empty ServerList for it.
  @groups = Hash.new { |h,k| h[k] = ServerList.new }
  # NOTE(review): the next two defaults were missing from this copy of the
  # file. Restored from the attribute docs (+nil+ means "no default gateway")
  # and from #group, which calls open_groups.dup -- confirm against upstream.
  @default_gateway = nil
  @open_groups = []
  @connect_threads = []
  # NOTE(review): restored; the on_error documentation says it defaults to :fail.
  @on_error = :fail
  @default_user = ENV['USER'] || ENV['USERNAME'] || "unknown"

  @open_connections = 0
  @pending_sessions = []
  @session_mutex = Mutex.new

  # Every option key must name a writable property of the session (for
  # example :concurrent_connections or :on_error); it is applied through the
  # matching "key=" writer, so options override the defaults set above.
  options.each { |opt, value| send("#{opt}=", value) }
end
187
# At its simplest, this associates a named group with a server definition.
188
# It can be used in either of two ways:
190
# First, you can use it to associate a group (or array of groups) with a
191
# server definition (or array of server definitions). The server definitions
192
# must already exist in the #server_list array (typically by calling #use):
194
# server1 = session.use('host1', 'user1')
195
# server2 = session.use('host2', 'user2')
196
# session.group :app => server1, :web => server2
197
# session.group :staging => [server1, server2]
198
# session.group %w(xen linux) => server2
199
# session.group %w(rackspace backup) => [server1, server2]
201
# Secondly, instead of a mapping of groups to servers, you can just
202
# provide a list of group names, and then a block. Inside the block, any
203
# calls to #use will automatically associate the new server definition with
204
# those groups. You can nest #group calls, too, which will aggregate the
207
# session.group :rackspace, :backup do
208
# session.use 'host1', 'user1'
209
# session.group :xen do
210
# session.use 'host2', 'user2'
214
# NOTE(review): the `def` line, the `elsif`/`else` branches, the yield and the
# closing `end`s were missing from this copy of the file. They are restored
# here from the surviving statements and the #group documentation; confirm
# against upstream.
def group(*args)
  mapping = args.last.is_a?(Hash) ? args.pop : {}

  if mapping.any? && block_given?
    raise ArgumentError, "must provide group mapping OR block, not both"
  elsif block_given?
    # Block form: the named groups are "open" for the duration of the block,
    # so nested #use / #group calls pick them up.
    saved_groups = open_groups.dup
    begin
      open_groups.concat(args.map { |a| a.to_sym }).uniq!
      yield self
    ensure
      # Always restore the previous open groups, even if the block raises.
      open_groups.replace(saved_groups)
    end
  else
    # Mapping form: each key is a group name (or array of names) and each
    # value a server (or array of servers). Any currently open groups are
    # added to the key's groups.
    mapping.each do |key, value|
      (open_groups + Array(key)).uniq.each do |grp|
        groups[grp.to_sym].concat(Array(value))
      end
    end
  end
end
235
# Sets up a default gateway to use when establishing connections to servers.
236
# Note that any servers defined prior to this invocation will not use the
237
# default gateway; it only affects servers defined subsequently.
239
# session.via 'gateway.host', 'user'
241
# You may override the default gateway on a per-server basis by passing the
242
# :via key to the #use method; see #use for details.
243
def via(host, user, options={})
  # Replaces any previously configured default gateway; only servers defined
  # after this call pick it up (see #use, which reads default_gateway).
  @default_gateway = Net::SSH::Gateway.new(host, user, options)
  # NOTE(review): a short statement and the closing `end` were missing from
  # this copy of the file. Returning the session itself is assumed here --
  # confirm against upstream.
  self
end
248
# Defines a new server definition, to be managed by this session. The
249
# server is at the given +host+, and will be connected to as the given
250
# +user+. The other options are passed as-is to the Net::SSH session
253
# If a default gateway has been specified previously (with #via) it will
254
# be passed to the new server definition. You can override this by passing
255
# a different Net::SSH::Gateway instance (or +nil+) with the :via key in
259
# session.use 'user@host2', :via => nil
260
# session.use 'host3', :user => "user3", :via => Net::SSH::Gateway.new('gateway.host', 'user')
262
# If only a single host is given, the new server instance is returned. You
263
# can give multiple hosts at a time, though, in which case an array of
264
# server instances will be returned.
266
# server1, server2 = session.use "host1", "host2"
268
# If given a block, this will save the block as a Net::SSH::Multi::DynamicServer
269
# definition, to be evaluated lazily the first time the server is needed.
270
# The block will receive any options hash given to #use, and should return
271
# +nil+ (if no servers are to be added), a String or an array of Strings
272
# (to be interpreted as a connection specification), or a Server or an
274
def use(*hosts, &block)
  options = hosts.last.is_a?(Hash) ? hosts.pop : {}
  # An explicit :via (even +nil+) wins over the session's default gateway.
  options = { :via => default_gateway }.merge(options)

  results = hosts.map do |host|
    server_list.add(Server.new(self, host, options))
  end

  # A block defines one additional, lazily evaluated server definition.
  if block
    results << server_list.add(DynamicServer.new(self, options, block))
  end

  # NOTE(review): this line was missing from this copy of the file. The #group
  # docs say servers defined inside a group block join the open groups, and an
  # empty group list in the mapping form does exactly that -- confirm upstream.
  group [] => results

  # A single definition is returned bare; several are returned as an array.
  results.length > 1 ? results : results.first
end
290
# Essentially an alias for #servers_for without any arguments. This is used
291
# primarily to satisfy the expectations of the Net::SSH::Multi::SessionActions
# module.
297
# Returns the set of servers that match the given criteria. It can be used
298
# in any (or all) of three ways.
300
# First, you can omit any arguments. In this case, the full list of servers
303
# all = session.servers_for
305
# Second, you can simply specify a list of group names. All servers in all
306
# named groups will be returned. If a server belongs to multiple matching
307
# groups, then it will appear only once in the list (the resulting list
308
# will contain only unique servers).
310
# servers = session.servers_for(:app, :db)
312
# Last, you can specify a hash with group names as keys, and property
313
# constraints as the values. These property constraints are either "only"
314
# constraints (which restrict the set of servers to "only" those that match
315
# the given properties) or "except" constraints (which restrict the set of
316
# servers to those whose properties do _not_ match). Properties are described
317
# when the server is defined (via the :properties key):
319
# session.group :db do
320
# session.use 'dbmain', 'user', :properties => { :primary => true }
321
# session.use 'dbslave', 'user2'
322
# session.use 'dbslve2', 'user2'
325
# # return ONLY on the servers in the :db group which have the :primary
326
# # property set to true.
327
# primary = session.servers_for(:db => { :only => { :primary => true } })
329
# You can, naturally, combine these methods:
331
# # all servers in :app and :web, and all servers in :db with the
332
# # :primary property set to true
333
# servers = session.servers_for(:app, :web, :db => { :only => { :primary => true } })
334
def servers_for(*criteria)
  # With no criteria the full list of servers is returned.
  # NOTE(review): this branch was missing from this copy of the file;
  # `flatten` is assumed to be how the server list yields a plain array of
  # servers -- confirm against upstream.
  return server_list.flatten if criteria.empty?

  # normalize the criteria list, so that every entry is a key to a
  # criteria hash (possibly empty).
  normalized = criteria.inject({}) do |hash, entry|
    case entry
    when Hash then hash.merge(entry)
    else hash.merge(entry => {})
    end
  end

  list = normalized.inject([]) do |aggregator, (group, properties)|
    raise ArgumentError, "the value for any group must be a Hash, but got a #{properties.class} for #{group.inspect}" unless properties.is_a?(Hash)
    bad_keys = properties.keys - [:only, :except]
    raise ArgumentError, "unknown constraint(s) #{bad_keys.inspect} for #{group.inspect}" unless bad_keys.empty?

    # Keep servers matching every :only property and none of the :except ones.
    servers = groups[group].select do |server|
      (properties[:only] || {}).all? { |prop, value| server[prop] == value } &&
        (properties[:except] || {}).none? { |prop, value| server[prop] == value }
    end

    aggregator.concat(servers)
  end

  # A server that belongs to several matching groups appears only once.
  list.uniq
end
364
# Returns a new Net::SSH::Multi::Subsession instance consisting of the
365
# servers that meet the given criteria. If a block is given, the
366
# subsession will be yielded to it. See #servers_for for a discussion of
367
# how these criteria are interpreted.
369
# session.with(:app).exec('hostname')
371
# session.with(:app, :db => { :primary => true }) do |s|
376
# NOTE(review): the `def` line, the return value and the closing `end` were
# missing from this copy of the file. They are restored from the #with
# documentation ("Returns a new ... Subsession"); confirm against upstream.
def with(*groups)
  subsession = Subsession.new(self, servers_for(*groups))
  yield subsession if block_given?
  # Return the subsession whether or not a block was given.
  subsession
end
381
# Works as #with, but for specific servers rather than groups. It will
382
# return a new subsession (Net::SSH::Multi::Subsession) consisting of
383
# the given servers. (Note that it requires that the servers in question
384
# have been created via calls to #use on this session object, or things
385
# will not work quite right.) If a block is given, the new subsession
386
# will also be yielded to the block.
388
# srv1 = session.use('host1', 'user')
389
# srv2 = session.use('host2', 'user')
391
# session.on(srv1, srv2).exec('hostname')
393
# NOTE(review): the `def` line, the return value and the closing `end` were
# missing from this copy of the file. They are restored from the #on
# documentation ("return a new subsession ... consisting of the given
# servers"); confirm against upstream.
def on(*servers)
  subsession = Subsession.new(self, servers)
  yield subsession if block_given?
  # Return the subsession whether or not a block was given.
  subsession
end
398
# Closes the multi-session by shutting down all open server sessions, and
399
# the default gateway (if one was specified using #via). Note that other
400
# gateway connections (e.g., those passed to #use directly) will _not_ be
401
# closed by this method, and must be managed externally.
403
# NOTE(review): the `def` line and the closing `end` of this method are not
# present in this copy of the file; only its body statements survive.
# Step 1: ask every server to close its channels.
server_list.each { |server| server.close_channels }
404
# Step 2: run the event loop without blocking (wait = 0) for as long as
# busy?(true) keeps returning a true value.
loop(0) { busy?(true) }
405
# Step 3: close each server's session.
server_list.each { |server| server.close }
406
# Step 4: only the session's default gateway is shut down here.
default_gateway.shutdown! if default_gateway
409
# Keep the inherited #loop (presumably Kernel#loop -- confirm) reachable under
# another name before it is overridden below; the override uses it to drive
# the event loop.
alias :loop_forever :loop

# Run the aggregated event loop for all open server sessions, until the given
# block returns +false+. If no block is given, the loop will run for as
# long as #busy? returns +true+ (in other words, for as long as there are
# any (non-invisible) channels open).
#
# +wait+ is handed to #process on every iteration.
def loop(wait=nil, &block)
  running = block || Proc.new { |c| busy? }
  # #process returns false once +running+ reports false, which ends the loop.
  loop_forever { break unless process(wait, &running) }
end
420
# Run a single iteration of the aggregated event loop for all open server
421
# sessions. The +wait+ parameter indicates how long to wait for an event
422
# to appear on any of the different sessions; +nil+ (the default) means
423
# "wait forever". If the block is given, then it will be used to determine
424
# whether #process returns +true+ (the block did not return +false+), or
425
# +false+ (the block returned +false+).
426
def process(wait=nil, &block)
  realize_pending_connections!
  # While connection threads are outstanding, poll (timeout 0) instead of
  # blocking in select.
  wait = @connect_threads.any? ? 0 : wait

  return false unless preprocess(&block)

  readers = server_list.map { |s| s.readers }.flatten
  writers = server_list.map { |s| s.writers }.flatten

  readers, writers, = IO.select(readers, writers, nil, wait)

  # IO.select returns nil when the timeout expires with nothing ready. There
  # is then nothing to postprocess, and the iteration still counts as a
  # success (the block did not return false).
  # NOTE(review): this guard was missing from this copy of the file; confirm
  # against upstream.
  return true unless readers

  postprocess(readers, writers)
end
444
# Runs the preprocess stage on all servers. Returns false if the block
445
# returns false, and true if there either is no block, or it returns true.
446
# This is called as part of the #process method.
447
def preprocess(&block) #:nodoc:
  # Ask the block before doing any work: a false answer stops right away.
  return false if block && !block[self]
  server_list.each { |server| server.preprocess }
  # Ask again afterwards, since preprocessing the servers may have changed
  # what the block reports.
  block.nil? || block[self]
end
453
# Runs the postprocess stage on all servers. Always returns true. This is
454
# called as part of the #process method.
455
def postprocess(readers, writers) #:nodoc:
  server_list.each { |server| server.postprocess(readers, writers) }
  # Documented to always return true. Without this explicit value the method
  # would return whatever +each+ returns.
  # NOTE(review): this line was missing from this copy of the file.
  true
end
460
# Takes the #concurrent_connections property into account, and tries to
461
# return a new session for the given server. If the concurrent connections
462
# limit has been reached, then a Net::SSH::Multi::PendingConnection instance
463
# will be returned instead, which will be realized into an actual session
464
# as soon as a slot opens up.
466
# If +force+ is true, the concurrent_connections check is skipped and a real
467
# connection is always returned.
468
# NOTE(review): this copy of the file is missing several lines of this method
# (among them the `begin`, the `case`/`when` headers and every `end`). The
# comments below describe only the statements that are visible.
def next_session(server, force=false) #:nodoc:
469
# don't retry a failed attempt
470
return nil if server.failed?
472
# The limit check and the open-connection counter are handled while holding
# the session mutex.
@session_mutex.synchronize do
473
# Unless forced, a session at (or over) its concurrent_connections limit
# queues a PendingConnection placeholder for the server instead.
if !force && concurrent_connections && concurrent_connections <= open_connections
474
connection = PendingConnection.new(server)
475
@pending_sessions << connection
479
# NOTE(review): the lines that end the `if` above are not visible here;
# presumably the placeholder is returned before this counter is touched.
@open_connections += 1
485
# I don't understand why this should be necessary--StandardError is a
486
# subclass of Exception, after all--but without explicitly rescuing
487
# StandardError, things like Errno::* and SocketError don't get caught
489
# NOTE(review): rescuing Exception also traps SignalException and SystemExit.
rescue Exception, StandardError => e
491
# The attempt failed, so give the connection slot back.
@session_mutex.synchronize { @open_connections -= 1 }
497
# NOTE(review): presumably the :warn branch of the on_error handling; the
# branch header is not visible here.
warn("error connecting to #{server}: #{e.class} (#{e.message})")
499
# Call the on_error handler with the server. +go+ is whatever the handler
# throws to :go, or nil when it returns without throwing.
go = catch(:go) { on_error.call(server); nil }
501
when nil, :ignore then # nothing
502
# :retry re-runs the connection attempt.
when :retry then retry
503
# :raise re-raises the rescued exception.
when :raise then raise
504
else warn "unknown 'go' command: #{go.inspect}"
514
# Tells the session that the given server has closed its connection. The
515
# session indicates that a new connection slot is available, which may be
516
# filled by the next pending connection on the next event loop iteration.
517
def server_closed(server) #:nodoc:
  @session_mutex.synchronize do
    # If the server's session was still queued as pending, removing it from
    # the queue is all that is needed. Otherwise it was counted as an open
    # connection, so its slot is released.
    unless @pending_sessions.delete(server.session)
      @open_connections -= 1
    end
  end
end
525
# Invoked by the event loop. If there is a concurrent_connections limit in
526
# effect, this will close any non-busy sessions and try to open as many
527
# new sessions as it can. It does this in threads, so that existing processing
530
# If there is no concurrent_connections limit in effect, then this method
532
def realize_pending_connections! #:nodoc:
533
return unless concurrent_connections
535
server_list.each do |server|
536
server.close if !server.busy?(true)
537
server.update_session!
540
@connect_threads.delete_if { |t| !t.alive? }
542
count = concurrent_connections ? (concurrent_connections - open_connections) : @pending_sessions.length
544
session = @pending_sessions.pop or break
545
@connect_threads << Thread.new do
546
session.replace_with(next_session(session.server, true))