~ubuntu-branches/ubuntu/vivid/ruby-net-ssh-multi/vivid

« back to all changes in this revision

Viewing changes to lib/net/ssh/multi/session.rb

  • Committer: Bazaar Package Importer
  • Author(s): Lucas Nussbaum
  • Date: 2011-04-16 09:56:28 UTC
  • Revision ID: james.westby@ubuntu.com-20110416095628-fqa3s532t1y89siq
Tags: upstream-1.1
ImportĀ upstreamĀ versionĀ 1.1

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
require 'thread'
 
2
require 'net/ssh/gateway'
 
3
require 'net/ssh/multi/server'
 
4
require 'net/ssh/multi/dynamic_server'
 
5
require 'net/ssh/multi/server_list'
 
6
require 'net/ssh/multi/channel'
 
7
require 'net/ssh/multi/pending_connection'
 
8
require 'net/ssh/multi/session_actions'
 
9
require 'net/ssh/multi/subsession'
 
10
 
 
11
module Net; module SSH; module Multi
 
12
  # Represents a collection of connections to various servers. It provides an
 
13
  # interface for organizing the connections (#group), as well as a way to
 
14
  # scope commands to a subset of all connections (#with). You can also provide
 
15
  # a default gateway connection that servers should use when connecting
 
16
  # (#via). It exposes an interface similar to Net::SSH::Connection::Session
 
17
  # for opening SSH channels and executing commands, allowing for these
 
18
  # operations to be done in parallel across multiple connections.
 
19
  #
 
20
  #   Net::SSH::Multi.start do |session|
 
21
  #     # access servers via a gateway
 
22
  #     session.via 'gateway', 'gateway-user'
 
23
  # 
 
24
  #     # define the servers we want to use
 
25
  #     session.use 'user1@host1'
 
26
  #     session.use 'user2@host2'
 
27
  # 
 
28
  #     # define servers in groups for more granular access
 
29
  #     session.group :app do
 
30
  #       session.use 'user@app1'
 
31
  #       session.use 'user@app2'
 
32
  #     end
 
33
  # 
 
34
  #     # execute commands on all servers
 
35
  #     session.exec "uptime"
 
36
  # 
 
37
  #     # execute commands on a subset of servers
 
38
  #     session.with(:app).exec "hostname"
 
39
  # 
 
40
  #     # run the aggregated event loop
 
41
  #     session.loop
 
42
  #   end
 
43
  #
 
44
  # Note that connections are established lazily, as soon as they are needed.
 
45
  # You can force the connections to be opened immediately, though, using the
 
46
  # #connect! method.
 
47
  #
 
48
  # == Concurrent Connection Limiting
 
49
  #
 
50
  # Sometimes you may be dealing with a large number of servers, and if you
 
51
  # try to have connections open to all of them simultaneously you'll run into
 
52
  # open file handle limitations and such. If this happens to you, you can set
 
53
  # the #concurrent_connections property of the session. Net::SSH::Multi will
 
54
  # then ensure that no more than this number of connections are ever open
 
55
  # simultaneously.
 
56
  #
 
57
  #   Net::SSH::Multi.start(:concurrent_connections => 5) do |session|
 
58
  #     # ...
 
59
  #   end
 
60
  #
 
61
  # Opening channels and executing commands will still work exactly as before,
 
62
  # but Net::SSH::Multi will transparently close finished connections and open
 
63
  # pending ones.
 
64
  #
 
65
  # == Controlling Connection Errors
 
66
  #
 
67
  # By default, Net::SSH::Multi will raise an exception if a connection error
 
68
  # occurs when connecting to a server. This will typically bubble up and abort
 
69
  # the entire connection process. Sometimes, however, you might wish to ignore
 
70
  # connection errors, for instance when starting a daemon on a large number of
 
71
  # boxes and you know that some of the boxes are going to be unavailable.
 
72
  #
 
73
  # To do this, simply set the #on_error property of the session to :ignore
 
74
  # (or to :warn, if you want a warning message when a connection attempt
 
75
  # fails):
 
76
  #
 
77
  #   Net::SSH::Multi.start(:on_error => :ignore) do |session|
 
78
  #     # ...
 
79
  #   end
 
80
  #
 
81
  # The default is :fail, which causes the exception to bubble up. Additionally,
 
82
  # you can specify a Proc object as the value for #on_error, which will be
 
83
  # invoked with the server in question if the connection attempt fails. You
 
84
  # can force the connection attempt to retry by throwing the :go symbol, with
 
85
  # :retry as the payload, or force the exception to be reraised by throwing
 
86
  # :go with :raise as the payload:
 
87
  #
 
88
  #   handler = Proc.new do |server|
 
89
  #     server[:connection_attempts] ||= 0
 
90
  #     if server[:connection_attempts] < 3
 
91
  #       server[:connection_attempts] += 1
 
92
  #       throw :go, :retry
 
93
  #     else
 
94
  #       throw :go, :raise
 
95
  #     end
 
96
  #   end
 
97
  #
 
98
  #   Net::SSH::Multi.start(:on_error => handler) do |session|
 
99
  #     # ...
 
100
  #   end
 
101
  #
 
102
  # Any other thrown value (or no thrown value at all) will result in the
 
103
  # failure being ignored.
 
104
  #
 
105
  # == Lazily Evaluated Server Definitions
 
106
  #
 
107
  # Sometimes you might be dealing with an environment where you don't know the
 
108
  # names or addresses of the servers until runtime. You can certainly dynamically
 
109
  # build server names and pass them to #use, but if the operation to determine
 
110
  # the server names is expensive, you might want to defer it until the server
 
111
  # is actually needed (especially if the logic of your program is such that
 
112
  # you might not even need to connect to that server every time the program
 
113
  # runs).
 
114
  #
 
115
  # You can do this by passing a block to #use:
 
116
  #
 
117
  #   session.use do |opt|
 
118
  #     lookup_ip_address_of_remote_host
 
119
  #   end
 
120
  #
 
121
  # See #use for more information about this usage.
 
122
  class Session
 
123
    include SessionActions
 
124
 
 
125
    # The Net::SSH::Multi::ServerList managed by this session.
 
126
    attr_reader :server_list
 
127
 
 
128
    # The default Net::SSH::Gateway instance to use to connect to the servers.
 
129
    # If +nil+, no default gateway will be used.
 
130
    attr_reader :default_gateway
 
131
 
 
132
    # The hash of group definitions, mapping each group name to a corresponding
 
133
    # Net::SSH::Multi::ServerList.
 
134
    attr_reader :groups
 
135
 
 
136
    # The number of allowed concurrent connections. No more than this number
 
137
    # of sessions will be open at any given time.
 
138
    attr_accessor :concurrent_connections
 
139
 
 
140
    # How connection errors should be handled. This defaults to :fail, but
 
141
    # may be set to :ignore if connection errors should be ignored, or
 
142
    # :warn if connection errors should cause a warning.
 
143
    attr_accessor :on_error
 
144
 
 
145
    # The default user name to use when connecting to a server. If a user name
 
146
    # is not given for a particular server, this value will be used. It defaults
 
147
    # to ENV['USER'] || ENV['USERNAME'], or "unknown" if neither of those are
 
148
    # set.
 
149
    attr_accessor :default_user
 
150
 
 
151
    # The number of connections that are currently open.
 
152
    attr_reader :open_connections #:nodoc:
 
153
 
 
154
    # The list of "open" groups, which will receive subsequent server definitions.
 
155
    # See #use and #group.
 
156
    attr_reader :open_groups #:nodoc:
 
157
 
 
158
    # Creates a new Net::SSH::Multi::Session instance. Initially, it contains
 
159
    # no server definitions, no group definitions, and no default gateway.
 
160
    #
 
161
    # You can set the #concurrent_connections property in the options. Setting
 
162
    # it to +nil+ (the default) will cause Net::SSH::Multi to ignore any
 
163
    # concurrent connection limit and allow all defined sessions to be open
 
164
    # simultaneously. Setting it to an integer will cause Net::SSH::Multi to
 
165
    # allow no more than that number of concurrently open sessions, opening
 
166
    # subsequent sessions only when other sessions finish and close.
 
167
    #
 
168
    #   Net::SSH::Multi.start(:concurrent_connections => 10) do |session|
 
169
    #     session.use ...
 
170
    #   end
 
171
    def initialize(options={})
 
172
      @server_list = ServerList.new
 
173
      @groups = Hash.new { |h,k| h[k] = ServerList.new }
 
174
      @gateway = nil
 
175
      @open_groups = []
 
176
      @connect_threads = []
 
177
      @on_error = :fail
 
178
      @default_user = ENV['USER'] || ENV['USERNAME'] || "unknown"
 
179
 
 
180
      @open_connections = 0
 
181
      @pending_sessions = []
 
182
      @session_mutex = Mutex.new
 
183
 
 
184
      options.each { |opt, value| send("#{opt}=", value) }
 
185
    end
 
186
 
 
187
    # At its simplest, this associates a named group with a server definition.
 
188
    # It can be used in either of two ways:
 
189
    #
 
190
    # First, you can use it to associate a group (or array of groups) with a
 
191
    # server definition (or array of server definitions). The server definitions
 
192
    # must already exist in the #server_list array (typically by calling #use):
 
193
    #
 
194
    #   server1 = session.use('host1', 'user1')
 
195
    #   server2 = session.use('host2', 'user2')
 
196
    #   session.group :app => server1, :web => server2
 
197
    #   session.group :staging => [server1, server2]
 
198
    #   session.group %w(xen linux) => server2
 
199
    #   session.group %w(rackspace backup) => [server1, server2]
 
200
    #
 
201
    # Secondly, instead of a mapping of groups to servers, you can just
 
202
    # provide a list of group names, and then a block. Inside the block, any
 
203
    # calls to #use will automatically associate the new server definition with
 
204
    # those groups. You can nest #group calls, too, which will aggregate the
 
205
    # group definitions.
 
206
    #
 
207
    #   session.group :rackspace, :backup do
 
208
    #     session.use 'host1', 'user1'
 
209
    #     session.group :xen do
 
210
    #       session.use 'host2', 'user2'
 
211
    #     end
 
212
    #   end
 
213
    def group(*args)
 
214
      mapping = args.last.is_a?(Hash) ? args.pop : {}
 
215
 
 
216
      if mapping.any? && block_given?
 
217
        raise ArgumentError, "must provide group mapping OR block, not both"
 
218
      elsif block_given?
 
219
        begin
 
220
          saved_groups = open_groups.dup
 
221
          open_groups.concat(args.map { |a| a.to_sym }).uniq!
 
222
          yield self
 
223
        ensure
 
224
          open_groups.replace(saved_groups)
 
225
        end
 
226
      else
 
227
        mapping.each do |key, value|
 
228
          (open_groups + Array(key)).uniq.each do |grp|
 
229
            groups[grp.to_sym].concat(Array(value))
 
230
          end
 
231
        end
 
232
      end
 
233
    end
 
234
 
 
235
    # Sets up a default gateway to use when establishing connections to servers.
 
236
    # Note that any servers defined prior to this invocation will not use the
 
237
    # default gateway; it only affects servers defined subsequently.
 
238
    #
 
239
    #   session.via 'gateway.host', 'user'
 
240
    #
 
241
    # You may override the default gateway on a per-server basis by passing the
 
242
    # :via key to the #use method; see #use for details.
 
243
    def via(host, user, options={})
 
244
      @default_gateway = Net::SSH::Gateway.new(host, user, options)
 
245
      self
 
246
    end
 
247
 
 
248
    # Defines a new server definition, to be managed by this session. The
 
249
    # server is at the given +host+, and will be connected to as the given
 
250
    # +user+. The other options are passed as-is to the Net::SSH session
 
251
    # constructor.
 
252
    #
 
253
    # If a default gateway has been specified previously (with #via) it will
 
254
    # be passed to the new server definition. You can override this by passing
 
255
    # a different Net::SSH::Gateway instance (or +nil+) with the :via key in
 
256
    # the +options+.
 
257
    #
 
258
    #   session.use 'host'
 
259
    #   session.use 'user@host2', :via => nil
 
260
    #   session.use 'host3', :user => "user3", :via => Net::SSH::Gateway.new('gateway.host', 'user')
 
261
    #
 
262
    # If only a single host is given, the new server instance is returned. You
 
263
    # can give multiple hosts at a time, though, in which case an array of
 
264
    # server instances will be returned.
 
265
    #
 
266
    #   server1, server2 = session.use "host1", "host2"
 
267
    #
 
268
    # If given a block, this will save the block as a Net::SSH::Multi::DynamicServer
 
269
    # definition, to be evaluated lazily the first time the server is needed.
 
270
    # The block will recive any options hash given to #use, and should return
 
271
    # +nil+ (if no servers are to be added), a String or an array of Strings
 
272
    # (to be interpreted as a connection specification), or a Server or an
 
273
    # array of Servers.
 
274
    def use(*hosts, &block)
 
275
      options = hosts.last.is_a?(Hash) ? hosts.pop : {}
 
276
      options = { :via => default_gateway }.merge(options)
 
277
 
 
278
      results = hosts.map do |host|
 
279
        server_list.add(Server.new(self, host, options))
 
280
      end
 
281
 
 
282
      if block
 
283
        results << server_list.add(DynamicServer.new(self, options, block))
 
284
      end
 
285
 
 
286
      group [] => results
 
287
      results.length > 1 ? results : results.first
 
288
    end
 
289
 
 
290
    # Essentially an alias for #servers_for without any arguments. This is used
 
291
    # primarily to satistfy the expectations of the Net::SSH::Multi::SessionActions
 
292
    # module.
 
293
    def servers
 
294
      servers_for
 
295
    end
 
296
 
 
297
    # Returns the set of servers that match the given criteria. It can be used
 
298
    # in any (or all) of three ways.
 
299
    #
 
300
    # First, you can omit any arguments. In this case, the full list of servers
 
301
    # will be returned.
 
302
    #
 
303
    #   all = session.servers_for
 
304
    #
 
305
    # Second, you can simply specify a list of group names. All servers in all
 
306
    # named groups will be returned. If a server belongs to multiple matching
 
307
    # groups, then it will appear only once in the list (the resulting list
 
308
    # will contain only unique servers).
 
309
    #
 
310
    #   servers = session.servers_for(:app, :db)
 
311
    #
 
312
    # Last, you can specify a hash with group names as keys, and property
 
313
    # constraints as the values. These property constraints are either "only"
 
314
    # constraints (which restrict the set of servers to "only" those that match
 
315
    # the given properties) or "except" constraints (which restrict the set of
 
316
    # servers to those whose properties do _not_ match). Properties are described
 
317
    # when the server is defined (via the :properties key):
 
318
    #
 
319
    #   session.group :db do
 
320
    #     session.use 'dbmain', 'user', :properties => { :primary => true }
 
321
    #     session.use 'dbslave', 'user2'
 
322
    #     session.use 'dbslve2', 'user2'
 
323
    #   end
 
324
    #
 
325
    #   # return ONLY on the servers in the :db group which have the :primary
 
326
    #   # property set to true.
 
327
    #   primary = session.servers_for(:db => { :only => { :primary => true } })
 
328
    #
 
329
    # You can, naturally, combine these methods:
 
330
    #
 
331
    #   # all servers in :app and :web, and all servers in :db with the
 
332
    #   # :primary property set to true
 
333
    #   servers = session.servers_for(:app, :web, :db => { :only => { :primary => true } })
 
334
    def servers_for(*criteria)
 
335
      if criteria.empty?
 
336
        server_list.flatten
 
337
      else
 
338
        # normalize the criteria list, so that every entry is a key to a
 
339
        # criteria hash (possibly empty).
 
340
        criteria = criteria.inject({}) do |hash, entry|
 
341
          case entry
 
342
          when Hash then hash.merge(entry)
 
343
          else hash.merge(entry => {})
 
344
          end
 
345
        end
 
346
 
 
347
        list = criteria.inject([]) do |aggregator, (group, properties)|
 
348
          raise ArgumentError, "the value for any group must be a Hash, but got a #{properties.class} for #{group.inspect}" unless properties.is_a?(Hash)
 
349
          bad_keys = properties.keys - [:only, :except]
 
350
          raise ArgumentError, "unknown constraint(s) #{bad_keys.inspect} for #{group.inspect}" unless bad_keys.empty?
 
351
 
 
352
          servers = groups[group].select do |server|
 
353
            (properties[:only] || {}).all? { |prop, value| server[prop] == value } &&
 
354
            !(properties[:except] || {}).any? { |prop, value| server[prop] == value }
 
355
          end
 
356
 
 
357
          aggregator.concat(servers)
 
358
        end
 
359
 
 
360
        list.uniq
 
361
      end
 
362
    end
 
363
 
 
364
    # Returns a new Net::SSH::Multi::Subsession instance consisting of the
 
365
    # servers that meet the given criteria. If a block is given, the
 
366
    # subsession will be yielded to it. See #servers_for for a discussion of
 
367
    # how these criteria are interpreted.
 
368
    #
 
369
    #   session.with(:app).exec('hostname')
 
370
    #
 
371
    #   session.with(:app, :db => { :primary => true }) do |s|
 
372
    #     s.exec 'date'
 
373
    #     s.exec 'uptime'
 
374
    #   end
 
375
    def with(*groups)
 
376
      subsession = Subsession.new(self, servers_for(*groups))
 
377
      yield subsession if block_given?
 
378
      subsession
 
379
    end
 
380
 
 
381
    # Works as #with, but for specific servers rather than groups. It will
 
382
    # return a new subsession (Net::SSH::Multi::Subsession) consisting of
 
383
    # the given servers. (Note that it requires that the servers in question
 
384
    # have been created via calls to #use on this session object, or things
 
385
    # will not work quite right.) If a block is given, the new subsession
 
386
    # will also be yielded to the block.
 
387
    #
 
388
    #   srv1 = session.use('host1', 'user')
 
389
    #   srv2 = session.use('host2', 'user')
 
390
    #   # ...
 
391
    #   session.on(srv1, srv2).exec('hostname')
 
392
    def on(*servers)
 
393
      subsession = Subsession.new(self, servers)
 
394
      yield subsession if block_given?
 
395
      subsession
 
396
    end
 
397
 
 
398
    # Closes the multi-session by shutting down all open server sessions, and
 
399
    # the default gateway (if one was specified using #via). Note that other
 
400
    # gateway connections (e.g., those passed to #use directly) will _not_ be
 
401
    # closed by this method, and must be managed externally.
 
402
    def close
 
403
      server_list.each { |server| server.close_channels }
 
404
      loop(0) { busy?(true) }
 
405
      server_list.each { |server| server.close }
 
406
      default_gateway.shutdown! if default_gateway
 
407
    end
 
408
 
 
409
    alias :loop_forever :loop
 
410
 
 
411
    # Run the aggregated event loop for all open server sessions, until the given
 
412
    # block returns +false+. If no block is given, the loop will run for as
 
413
    # long as #busy? returns +true+ (in other words, for as long as there are
 
414
    # any (non-invisible) channels open).
 
415
    def loop(wait=nil, &block)
 
416
      running = block || Proc.new { |c| busy? }
 
417
      loop_forever { break unless process(wait, &running) }
 
418
    end
 
419
 
 
420
    # Run a single iteration of the aggregated event loop for all open server
 
421
    # sessions. The +wait+ parameter indicates how long to wait for an event
 
422
    # to appear on any of the different sessions; +nil+ (the default) means
 
423
    # "wait forever". If the block is given, then it will be used to determine
 
424
    # whether #process returns +true+ (the block did not return +false+), or
 
425
    # +false+ (the block returned +false+).
 
426
    def process(wait=nil, &block)
 
427
      realize_pending_connections!
 
428
      wait = @connect_threads.any? ? 0 : wait
 
429
 
 
430
      return false unless preprocess(&block)
 
431
 
 
432
      readers = server_list.map { |s| s.readers }.flatten
 
433
      writers = server_list.map { |s| s.writers }.flatten
 
434
 
 
435
      readers, writers, = IO.select(readers, writers, nil, wait)
 
436
 
 
437
      if readers
 
438
        return postprocess(readers, writers)
 
439
      else
 
440
        return true
 
441
      end
 
442
    end
 
443
 
 
444
    # Runs the preprocess stage on all servers. Returns false if the block
 
445
    # returns false, and true if there either is no block, or it returns true.
 
446
    # This is called as part of the #process method.
 
447
    def preprocess(&block) #:nodoc:
 
448
      return false if block && !block[self]
 
449
      server_list.each { |server| server.preprocess }
 
450
      block.nil? || block[self]
 
451
    end
 
452
 
 
453
    # Runs the postprocess stage on all servers. Always returns true. This is
 
454
    # called as part of the #process method.
 
455
    def postprocess(readers, writers) #:nodoc:
 
456
      server_list.each { |server| server.postprocess(readers, writers) }
 
457
      true
 
458
    end
 
459
 
 
460
    # Takes the #concurrent_connections property into account, and tries to
 
461
    # return a new session for the given server. If the concurrent connections
 
462
    # limit has been reached, then a Net::SSH::Multi::PendingConnection instance
 
463
    # will be returned instead, which will be realized into an actual session
 
464
    # as soon as a slot opens up.
 
465
    #
 
466
    # If +force+ is true, the concurrent_connections check is skipped and a real
 
467
    # connection is always returned.
 
468
    def next_session(server, force=false) #:nodoc:
 
469
      # don't retry a failed attempt
 
470
      return nil if server.failed?
 
471
 
 
472
      @session_mutex.synchronize do
 
473
        if !force && concurrent_connections && concurrent_connections <= open_connections
 
474
          connection = PendingConnection.new(server)
 
475
          @pending_sessions << connection
 
476
          return connection
 
477
        end
 
478
 
 
479
        @open_connections += 1
 
480
      end
 
481
 
 
482
      begin
 
483
        server.new_session
 
484
 
 
485
      # I don't understand why this should be necessary--StandardError is a
 
486
      # subclass of Exception, after all--but without explicitly rescuing
 
487
      # StandardError, things like Errno::* and SocketError don't get caught
 
488
      # here!
 
489
      rescue Exception, StandardError => e
 
490
        server.fail!
 
491
        @session_mutex.synchronize { @open_connections -= 1 }
 
492
 
 
493
        case on_error
 
494
        when :ignore then
 
495
          # do nothing
 
496
        when :warn then
 
497
          warn("error connecting to #{server}: #{e.class} (#{e.message})")
 
498
        when Proc then
 
499
          go = catch(:go) { on_error.call(server); nil }
 
500
          case go
 
501
          when nil, :ignore then # nothing
 
502
          when :retry then retry
 
503
          when :raise then raise
 
504
          else warn "unknown 'go' command: #{go.inspect}"
 
505
          end
 
506
        else
 
507
          raise
 
508
        end
 
509
 
 
510
        return nil
 
511
      end
 
512
    end
 
513
 
 
514
    # Tells the session that the given server has closed its connection. The
 
515
    # session indicates that a new connection slot is available, which may be
 
516
    # filled by the next pending connection on the next event loop iteration.
 
517
    def server_closed(server) #:nodoc:
 
518
      @session_mutex.synchronize do
 
519
        unless @pending_sessions.delete(server.session)
 
520
          @open_connections -= 1
 
521
        end
 
522
      end
 
523
    end
 
524
 
 
525
    # Invoked by the event loop. If there is a concurrent_connections limit in
 
526
    # effect, this will close any non-busy sessions and try to open as many
 
527
    # new sessions as it can. It does this in threads, so that existing processing
 
528
    # can continue.
 
529
    #
 
530
    # If there is no concurrent_connections limit in effect, then this method
 
531
    # does nothing.
 
532
    def realize_pending_connections! #:nodoc:
 
533
      return unless concurrent_connections
 
534
 
 
535
      server_list.each do |server|
 
536
        server.close if !server.busy?(true)
 
537
        server.update_session!
 
538
      end
 
539
 
 
540
      @connect_threads.delete_if { |t| !t.alive? }
 
541
 
 
542
      count = concurrent_connections ? (concurrent_connections - open_connections) : @pending_sessions.length
 
543
      count.times do
 
544
        session = @pending_sessions.pop or break
 
545
        @connect_threads << Thread.new do
 
546
          session.replace_with(next_session(session.server, true))
 
547
        end
 
548
      end
 
549
    end
 
550
  end
 
551
end; end; end