~annegentle/nova/extdoc

« back to all changes in this revision

Viewing changes to nova/rpc.py

  • Committer: termie
  • Date: 2011-04-20 19:08:22 UTC
  • mto: This revision was merged to the branch mainline in revision 1013.
  • Revision ID: github@anarkystic.com-20110420190822-x5v1sr3x9uy822hl
docstring cleanup, nova dir

Show diffs side-by-side

added added

removed removed

Lines of Context:
16
16
#    License for the specific language governing permissions and limitations
17
17
#    under the License.
18
18
 
19
 
"""
20
 
AMQP-based RPC. Queues have consumers and publishers.
 
19
"""AMQP-based RPC.
 
20
 
 
21
Queues have consumers and publishers.
 
22
 
21
23
No fan-out support yet.
 
24
 
22
25
"""
23
26
 
24
27
import json
40
43
from nova import utils
41
44
 
42
45
 
 
46
LOG = logging.getLogger('nova.rpc')
 
47
 
 
48
 
43
49
FLAGS = flags.FLAGS
44
 
LOG = logging.getLogger('nova.rpc')
45
 
 
46
50
flags.DEFINE_integer('rpc_thread_pool_size', 1024, 'Size of RPC thread pool')
47
51
 
48
52
 
49
53
class Connection(carrot_connection.BrokerConnection):
50
 
    """Connection instance object"""
 
54
    """Connection instance object."""
 
55
 
51
56
    @classmethod
52
57
    def instance(cls, new=True):
53
 
        """Returns the instance"""
 
58
        """Returns the instance."""
54
59
        if new or not hasattr(cls, '_instance'):
55
60
            params = dict(hostname=FLAGS.rabbit_host,
56
61
                          port=FLAGS.rabbit_port,
71
76
 
72
77
    @classmethod
73
78
    def recreate(cls):
74
 
        """Recreates the connection instance
75
 
 
76
 
        This is necessary to recover from some network errors/disconnects"""
 
79
        """Recreates the connection instance.
 
80
 
 
81
        This is necessary to recover from some network errors/disconnects.
 
82
 
 
83
        """
77
84
        try:
78
85
            del cls._instance
79
86
        except AttributeError, e:
84
91
 
85
92
 
86
93
class Consumer(messaging.Consumer):
87
 
    """Consumer base class
88
 
 
89
 
    Contains methods for connecting the fetch method to async loops
 
94
    """Consumer base class.
 
95
 
 
96
    Contains methods for connecting the fetch method to async loops.
 
97
 
90
98
    """
 
99
 
91
100
    def __init__(self, *args, **kwargs):
92
101
        for i in xrange(FLAGS.rabbit_max_retries):
93
102
            if i > 0:
100
109
                fl_host = FLAGS.rabbit_host
101
110
                fl_port = FLAGS.rabbit_port
102
111
                fl_intv = FLAGS.rabbit_retry_interval
103
 
                LOG.error(_("AMQP server on %(fl_host)s:%(fl_port)d is"
104
 
                        " unreachable: %(e)s. Trying again in %(fl_intv)d"
105
 
                        " seconds.")
106
 
                        % locals())
 
112
                LOG.error(_('AMQP server on %(fl_host)s:%(fl_port)d is'
 
113
                            ' unreachable: %(e)s. Trying again in %(fl_intv)d'
 
114
                            ' seconds.') % locals())
107
115
                self.failed_connection = True
108
116
        if self.failed_connection:
109
 
            LOG.error(_("Unable to connect to AMQP server "
110
 
                        "after %d tries. Shutting down."),
 
117
            LOG.error(_('Unable to connect to AMQP server '
 
118
                        'after %d tries. Shutting down.'),
111
119
                      FLAGS.rabbit_max_retries)
112
120
            sys.exit(1)
113
121
 
114
122
    def fetch(self, no_ack=None, auto_ack=None, enable_callbacks=False):
115
 
        """Wraps the parent fetch with some logic for failed connections"""
 
123
        """Wraps the parent fetch with some logic for failed connection."""
116
124
        # TODO(vish): the logic for failed connections and logging should be
117
125
        #             refactored into some sort of connection manager object
118
126
        try:
125
133
                self.declare()
126
134
            super(Consumer, self).fetch(no_ack, auto_ack, enable_callbacks)
127
135
            if self.failed_connection:
128
 
                LOG.error(_("Reconnected to queue"))
 
136
                LOG.error(_('Reconnected to queue'))
129
137
                self.failed_connection = False
130
138
        # NOTE(vish): This is catching all errors because we really don't
131
139
        #             want exceptions to be logged 10 times a second if some
132
140
        #             persistent failure occurs.
133
141
        except Exception, e:  # pylint: disable=W0703
134
142
            if not self.failed_connection:
135
 
                LOG.exception(_("Failed to fetch message from queue: %s" % e))
 
143
                LOG.exception(_('Failed to fetch message from queue: %s' % e))
136
144
                self.failed_connection = True
137
145
 
138
146
    def attach_to_eventlet(self):
143
151
 
144
152
 
145
153
class AdapterConsumer(Consumer):
146
 
    """Calls methods on a proxy object based on method and args"""
147
 
    def __init__(self, connection=None, topic="broadcast", proxy=None):
 
154
    """Calls methods on a proxy object based on method and args."""
 
155
 
 
156
    def __init__(self, connection=None, topic='broadcast', proxy=None):
148
157
        LOG.debug(_('Initing the Adapter Consumer for %s') % topic)
149
158
        self.proxy = proxy
150
159
        self.pool = greenpool.GreenPool(FLAGS.rpc_thread_pool_size)
156
165
 
157
166
    @exception.wrap_exception
158
167
    def _receive(self, message_data, message):
159
 
        """Magically looks for a method on the proxy object and calls it
 
168
        """Magically looks for a method on the proxy object and calls it.
160
169
 
161
170
        Message data should be a dictionary with two keys:
162
171
            method: string representing the method to call
163
172
            args: dictionary of arg: value
164
173
 
165
174
        Example: {'method': 'echo', 'args': {'value': 42}}
 
175
 
166
176
        """
167
177
        LOG.debug(_('received %s') % message_data)
168
178
        msg_id = message_data.pop('_msg_id', None)
189
199
            if msg_id:
190
200
                msg_reply(msg_id, rval, None)
191
201
        except Exception as e:
192
 
            logging.exception("Exception during message handling")
 
202
            logging.exception('Exception during message handling')
193
203
            if msg_id:
194
204
                msg_reply(msg_id, None, sys.exc_info())
195
205
        return
196
206
 
197
207
 
198
208
class Publisher(messaging.Publisher):
199
 
    """Publisher base class"""
 
209
    """Publisher base class."""
200
210
    pass
201
211
 
202
212
 
203
213
class TopicAdapterConsumer(AdapterConsumer):
204
 
    """Consumes messages on a specific topic"""
205
 
    exchange_type = "topic"
206
 
 
207
 
    def __init__(self, connection=None, topic="broadcast", proxy=None):
 
214
    """Consumes messages on a specific topic."""
 
215
 
 
216
    exchange_type = 'topic'
 
217
 
 
218
    def __init__(self, connection=None, topic='broadcast', proxy=None):
208
219
        self.queue = topic
209
220
        self.routing_key = topic
210
221
        self.exchange = FLAGS.control_exchange
214
225
 
215
226
 
216
227
class FanoutAdapterConsumer(AdapterConsumer):
217
 
    """Consumes messages from a fanout exchange"""
218
 
    exchange_type = "fanout"
219
 
 
220
 
    def __init__(self, connection=None, topic="broadcast", proxy=None):
221
 
        self.exchange = "%s_fanout" % topic
 
228
    """Consumes messages from a fanout exchange."""
 
229
 
 
230
    exchange_type = 'fanout'
 
231
 
 
232
    def __init__(self, connection=None, topic='broadcast', proxy=None):
 
233
        self.exchange = '%s_fanout' % topic
222
234
        self.routing_key = topic
223
235
        unique = uuid.uuid4().hex
224
 
        self.queue = "%s_fanout_%s" % (topic, unique)
 
236
        self.queue = '%s_fanout_%s' % (topic, unique)
225
237
        self.durable = False
226
 
        LOG.info(_("Created '%(exchange)s' fanout exchange "
227
 
                   "with '%(key)s' routing key"),
228
 
                dict(exchange=self.exchange, key=self.routing_key))
 
238
        LOG.info(_('Created "%(exchange)s" fanout exchange '
 
239
                   'with "%(key)s" routing key'),
 
240
                 dict(exchange=self.exchange, key=self.routing_key))
229
241
        super(FanoutAdapterConsumer, self).__init__(connection=connection,
230
242
                                    topic=topic, proxy=proxy)
231
243
 
232
244
 
233
245
class TopicPublisher(Publisher):
234
 
    """Publishes messages on a specific topic"""
235
 
    exchange_type = "topic"
236
 
 
237
 
    def __init__(self, connection=None, topic="broadcast"):
 
246
    """Publishes messages on a specific topic."""
 
247
 
 
248
    exchange_type = 'topic'
 
249
 
 
250
    def __init__(self, connection=None, topic='broadcast'):
238
251
        self.routing_key = topic
239
252
        self.exchange = FLAGS.control_exchange
240
253
        self.durable = False
243
256
 
244
257
class FanoutPublisher(Publisher):
245
258
    """Publishes messages to a fanout exchange."""
246
 
    exchange_type = "fanout"
 
259
 
 
260
    exchange_type = 'fanout'
247
261
 
248
262
    def __init__(self, topic, connection=None):
249
 
        self.exchange = "%s_fanout" % topic
250
 
        self.queue = "%s_fanout" % topic
 
263
        self.exchange = '%s_fanout' % topic
 
264
        self.queue = '%s_fanout' % topic
251
265
        self.durable = False
252
 
        LOG.info(_("Creating '%(exchange)s' fanout exchange"),
253
 
                            dict(exchange=self.exchange))
 
266
        LOG.info(_('Creating "%(exchange)s" fanout exchange'),
 
267
                 dict(exchange=self.exchange))
254
268
        super(FanoutPublisher, self).__init__(connection=connection)
255
269
 
256
270
 
257
271
class DirectConsumer(Consumer):
258
 
    """Consumes messages directly on a channel specified by msg_id"""
259
 
    exchange_type = "direct"
 
272
    """Consumes messages directly on a channel specified by msg_id."""
 
273
 
 
274
    exchange_type = 'direct'
260
275
 
261
276
    def __init__(self, connection=None, msg_id=None):
262
277
        self.queue = msg_id
268
283
 
269
284
 
270
285
class DirectPublisher(Publisher):
271
 
    """Publishes messages directly on a channel specified by msg_id"""
272
 
    exchange_type = "direct"
 
286
    """Publishes messages directly on a channel specified by msg_id."""
 
287
 
 
288
    exchange_type = 'direct'
273
289
 
274
290
    def __init__(self, connection=None, msg_id=None):
275
291
        self.routing_key = msg_id
279
295
 
280
296
 
281
297
def msg_reply(msg_id, reply=None, failure=None):
282
 
    """Sends a reply or an error on the channel signified by msg_id
 
298
    """Sends a reply or an error on the channel signified by msg_id.
283
299
 
284
 
    failure should be a sys.exc_info() tuple.
 
300
    Failure should be a sys.exc_info() tuple.
285
301
 
286
302
    """
287
303
    if failure:
303
319
 
304
320
 
305
321
class RemoteError(exception.Error):
306
 
    """Signifies that a remote class has raised an exception
 
322
    """Signifies that a remote class has raised an exception.
307
323
 
308
324
    Containes a string representation of the type of the original exception,
309
325
    the value of the original exception, and the traceback.  These are
310
326
    sent to the parent as a joined string so printing the exception
311
 
    contains all of the relevent info."""
 
327
    contains all of the relevent info.
 
328
 
 
329
    """
 
330
 
312
331
    def __init__(self, exc_type, value, traceback):
313
332
        self.exc_type = exc_type
314
333
        self.value = value
315
334
        self.traceback = traceback
316
 
        super(RemoteError, self).__init__("%s %s\n%s" % (exc_type,
 
335
        super(RemoteError, self).__init__('%s %s\n%s' % (exc_type,
317
336
                                                         value,
318
337
                                                         traceback))
319
338
 
339
358
    context out into a bunch of separate keys. If we want to support
340
359
    more arguments in rabbit messages, we may want to do the same
341
360
    for args at some point.
 
361
 
342
362
    """
343
363
    context = dict([('_context_%s' % key, value)
344
364
                   for (key, value) in context.to_dict().iteritems()])
346
366
 
347
367
 
348
368
def call(context, topic, msg):
349
 
    """Sends a message on a topic and wait for a response"""
350
 
    LOG.debug(_("Making asynchronous call on %s ..."), topic)
 
369
    """Sends a message on a topic and wait for a response."""
 
370
    LOG.debug(_('Making asynchronous call on %s ...'), topic)
351
371
    msg_id = uuid.uuid4().hex
352
372
    msg.update({'_msg_id': msg_id})
353
 
    LOG.debug(_("MSG_ID is %s") % (msg_id))
 
373
    LOG.debug(_('MSG_ID is %s') % (msg_id))
354
374
    _pack_context(msg, context)
355
375
 
356
376
    class WaitMessage(object):
387
407
 
388
408
 
389
409
def cast(context, topic, msg):
390
 
    """Sends a message on a topic without waiting for a response"""
391
 
    LOG.debug(_("Making asynchronous cast on %s..."), topic)
 
410
    """Sends a message on a topic without waiting for a response."""
 
411
    LOG.debug(_('Making asynchronous cast on %s...'), topic)
392
412
    _pack_context(msg, context)
393
413
    conn = Connection.instance()
394
414
    publisher = TopicPublisher(connection=conn, topic=topic)
397
417
 
398
418
 
399
419
def fanout_cast(context, topic, msg):
400
 
    """Sends a message on a fanout exchange without waiting for a response"""
401
 
    LOG.debug(_("Making asynchronous fanout cast..."))
 
420
    """Sends a message on a fanout exchange without waiting for a response."""
 
421
    LOG.debug(_('Making asynchronous fanout cast...'))
402
422
    _pack_context(msg, context)
403
423
    conn = Connection.instance()
404
424
    publisher = FanoutPublisher(topic, connection=conn)
407
427
 
408
428
 
409
429
def generic_response(message_data, message):
410
 
    """Logs a result and exits"""
 
430
    """Logs a result and exits."""
411
431
    LOG.debug(_('response %s'), message_data)
412
432
    message.ack()
413
433
    sys.exit(0)
414
434
 
415
435
 
416
436
def send_message(topic, message, wait=True):
417
 
    """Sends a message for testing"""
 
437
    """Sends a message for testing."""
418
438
    msg_id = uuid.uuid4().hex
419
439
    message.update({'_msg_id': msg_id})
420
440
    LOG.debug(_('topic is %s'), topic)
425
445
                                      queue=msg_id,
426
446
                                      exchange=msg_id,
427
447
                                      auto_delete=True,
428
 
                                      exchange_type="direct",
 
448
                                      exchange_type='direct',
429
449
                                      routing_key=msg_id)
430
450
        consumer.register_callback(generic_response)
431
451
 
432
452
    publisher = messaging.Publisher(connection=Connection.instance(),
433
453
                                    exchange=FLAGS.control_exchange,
434
454
                                    durable=False,
435
 
                                    exchange_type="topic",
 
455
                                    exchange_type='topic',
436
456
                                    routing_key=topic)
437
457
    publisher.send(message)
438
458
    publisher.close()
441
461
        consumer.wait()
442
462
 
443
463
 
444
 
if __name__ == "__main__":
445
 
    # NOTE(vish): you can send messages from the command line using
446
 
    #             topic and a json sting representing a dictionary
447
 
    #             for the method
 
464
if __name__ == '__main__':
 
465
    # You can send messages from the command line using
 
466
    # topic and a json string representing a dictionary
 
467
    # for the method
448
468
    send_message(sys.argv[1], json.loads(sys.argv[2]))