40
43
from nova import utils
46
LOG = logging.getLogger('nova.rpc')
43
49
FLAGS = flags.FLAGS
44
LOG = logging.getLogger('nova.rpc')
46
50
flags.DEFINE_integer('rpc_thread_pool_size', 1024, 'Size of RPC thread pool')
49
53
class Connection(carrot_connection.BrokerConnection):
50
"""Connection instance object"""
54
"""Connection instance object."""
52
57
def instance(cls, new=True):
53
"""Returns the instance"""
58
"""Returns the instance."""
54
59
if new or not hasattr(cls, '_instance'):
55
60
params = dict(hostname=FLAGS.rabbit_host,
56
61
port=FLAGS.rabbit_port,
74
"""Recreates the connection instance
76
This is necessary to recover from some network errors/disconnects"""
79
"""Recreates the connection instance.
81
This is necessary to recover from some network errors/disconnects.
79
86
except AttributeError, e:
86
93
class Consumer(messaging.Consumer):
87
"""Consumer base class
89
Contains methods for connecting the fetch method to async loops
94
"""Consumer base class.
96
Contains methods for connecting the fetch method to async loops.
91
100
def __init__(self, *args, **kwargs):
92
101
for i in xrange(FLAGS.rabbit_max_retries):
100
109
fl_host = FLAGS.rabbit_host
101
110
fl_port = FLAGS.rabbit_port
102
111
fl_intv = FLAGS.rabbit_retry_interval
103
LOG.error(_("AMQP server on %(fl_host)s:%(fl_port)d is"
104
" unreachable: %(e)s. Trying again in %(fl_intv)d"
112
LOG.error(_('AMQP server on %(fl_host)s:%(fl_port)d is'
113
' unreachable: %(e)s. Trying again in %(fl_intv)d'
114
' seconds.') % locals())
107
115
self.failed_connection = True
108
116
if self.failed_connection:
109
LOG.error(_("Unable to connect to AMQP server "
110
"after %d tries. Shutting down."),
117
LOG.error(_('Unable to connect to AMQP server '
118
'after %d tries. Shutting down.'),
111
119
FLAGS.rabbit_max_retries)
114
122
def fetch(self, no_ack=None, auto_ack=None, enable_callbacks=False):
115
"""Wraps the parent fetch with some logic for failed connections"""
123
"""Wraps the parent fetch with some logic for failed connection."""
116
124
# TODO(vish): the logic for failed connections and logging should be
117
125
# refactored into some sort of connection manager object
126
134
super(Consumer, self).fetch(no_ack, auto_ack, enable_callbacks)
127
135
if self.failed_connection:
128
LOG.error(_("Reconnected to queue"))
136
LOG.error(_('Reconnected to queue'))
129
137
self.failed_connection = False
130
138
# NOTE(vish): This is catching all errors because we really don't
131
139
# want exceptions to be logged 10 times a second if some
132
140
# persistent failure occurs.
133
141
except Exception, e: # pylint: disable=W0703
134
142
if not self.failed_connection:
135
LOG.exception(_("Failed to fetch message from queue: %s" % e))
143
LOG.exception(_('Failed to fetch message from queue: %s' % e))
136
144
self.failed_connection = True
138
146
def attach_to_eventlet(self):
145
153
class AdapterConsumer(Consumer):
146
"""Calls methods on a proxy object based on method and args"""
147
def __init__(self, connection=None, topic="broadcast", proxy=None):
154
"""Calls methods on a proxy object based on method and args."""
156
def __init__(self, connection=None, topic='broadcast', proxy=None):
148
157
LOG.debug(_('Initing the Adapter Consumer for %s') % topic)
149
158
self.proxy = proxy
150
159
self.pool = greenpool.GreenPool(FLAGS.rpc_thread_pool_size)
157
166
@exception.wrap_exception
158
167
def _receive(self, message_data, message):
159
"""Magically looks for a method on the proxy object and calls it
168
"""Magically looks for a method on the proxy object and calls it.
161
170
Message data should be a dictionary with two keys:
162
171
method: string representing the method to call
163
172
args: dictionary of arg: value
165
174
Example: {'method': 'echo', 'args': {'value': 42}}
167
177
LOG.debug(_('received %s') % message_data)
168
178
msg_id = message_data.pop('_msg_id', None)
190
200
msg_reply(msg_id, rval, None)
191
201
except Exception as e:
192
logging.exception("Exception during message handling")
202
logging.exception('Exception during message handling')
194
204
msg_reply(msg_id, None, sys.exc_info())
198
208
class Publisher(messaging.Publisher):
199
"""Publisher base class"""
209
"""Publisher base class."""
203
213
class TopicAdapterConsumer(AdapterConsumer):
204
"""Consumes messages on a specific topic"""
205
exchange_type = "topic"
207
def __init__(self, connection=None, topic="broadcast", proxy=None):
214
"""Consumes messages on a specific topic."""
216
exchange_type = 'topic'
218
def __init__(self, connection=None, topic='broadcast', proxy=None):
208
219
self.queue = topic
209
220
self.routing_key = topic
210
221
self.exchange = FLAGS.control_exchange
216
227
class FanoutAdapterConsumer(AdapterConsumer):
217
"""Consumes messages from a fanout exchange"""
218
exchange_type = "fanout"
220
def __init__(self, connection=None, topic="broadcast", proxy=None):
221
self.exchange = "%s_fanout" % topic
228
"""Consumes messages from a fanout exchange."""
230
exchange_type = 'fanout'
232
def __init__(self, connection=None, topic='broadcast', proxy=None):
233
self.exchange = '%s_fanout' % topic
222
234
self.routing_key = topic
223
235
unique = uuid.uuid4().hex
224
self.queue = "%s_fanout_%s" % (topic, unique)
236
self.queue = '%s_fanout_%s' % (topic, unique)
225
237
self.durable = False
226
LOG.info(_("Created '%(exchange)s' fanout exchange "
227
"with '%(key)s' routing key"),
228
dict(exchange=self.exchange, key=self.routing_key))
238
LOG.info(_('Created "%(exchange)s" fanout exchange '
239
'with "%(key)s" routing key'),
240
dict(exchange=self.exchange, key=self.routing_key))
229
241
super(FanoutAdapterConsumer, self).__init__(connection=connection,
230
242
topic=topic, proxy=proxy)
233
245
class TopicPublisher(Publisher):
234
"""Publishes messages on a specific topic"""
235
exchange_type = "topic"
237
def __init__(self, connection=None, topic="broadcast"):
246
"""Publishes messages on a specific topic."""
248
exchange_type = 'topic'
250
def __init__(self, connection=None, topic='broadcast'):
238
251
self.routing_key = topic
239
252
self.exchange = FLAGS.control_exchange
240
253
self.durable = False
244
257
class FanoutPublisher(Publisher):
245
258
"""Publishes messages to a fanout exchange."""
246
exchange_type = "fanout"
260
exchange_type = 'fanout'
248
262
def __init__(self, topic, connection=None):
249
self.exchange = "%s_fanout" % topic
250
self.queue = "%s_fanout" % topic
263
self.exchange = '%s_fanout' % topic
264
self.queue = '%s_fanout' % topic
251
265
self.durable = False
252
LOG.info(_("Creating '%(exchange)s' fanout exchange"),
253
dict(exchange=self.exchange))
266
LOG.info(_('Creating "%(exchange)s" fanout exchange'),
267
dict(exchange=self.exchange))
254
268
super(FanoutPublisher, self).__init__(connection=connection)
257
271
class DirectConsumer(Consumer):
258
"""Consumes messages directly on a channel specified by msg_id"""
259
exchange_type = "direct"
272
"""Consumes messages directly on a channel specified by msg_id."""
274
exchange_type = 'direct'
261
276
def __init__(self, connection=None, msg_id=None):
262
277
self.queue = msg_id
270
285
class DirectPublisher(Publisher):
271
"""Publishes messages directly on a channel specified by msg_id"""
272
exchange_type = "direct"
286
"""Publishes messages directly on a channel specified by msg_id."""
288
exchange_type = 'direct'
274
290
def __init__(self, connection=None, msg_id=None):
275
291
self.routing_key = msg_id
281
297
def msg_reply(msg_id, reply=None, failure=None):
282
"""Sends a reply or an error on the channel signified by msg_id
298
"""Sends a reply or an error on the channel signified by msg_id.
284
failure should be a sys.exc_info() tuple.
300
Failure should be a sys.exc_info() tuple.
305
321
class RemoteError(exception.Error):
306
"""Signifies that a remote class has raised an exception
322
"""Signifies that a remote class has raised an exception.
308
324
Containes a string representation of the type of the original exception,
309
325
the value of the original exception, and the traceback. These are
310
326
sent to the parent as a joined string so printing the exception
311
contains all of the relevent info."""
327
contains all of the relevent info.
312
331
def __init__(self, exc_type, value, traceback):
313
332
self.exc_type = exc_type
314
333
self.value = value
315
334
self.traceback = traceback
316
super(RemoteError, self).__init__("%s %s\n%s" % (exc_type,
335
super(RemoteError, self).__init__('%s %s\n%s' % (exc_type,
339
358
context out into a bunch of separate keys. If we want to support
340
359
more arguments in rabbit messages, we may want to do the same
341
360
for args at some point.
343
363
context = dict([('_context_%s' % key, value)
344
364
for (key, value) in context.to_dict().iteritems()])
348
368
def call(context, topic, msg):
349
"""Sends a message on a topic and wait for a response"""
350
LOG.debug(_("Making asynchronous call on %s ..."), topic)
369
"""Sends a message on a topic and wait for a response."""
370
LOG.debug(_('Making asynchronous call on %s ...'), topic)
351
371
msg_id = uuid.uuid4().hex
352
372
msg.update({'_msg_id': msg_id})
353
LOG.debug(_("MSG_ID is %s") % (msg_id))
373
LOG.debug(_('MSG_ID is %s') % (msg_id))
354
374
_pack_context(msg, context)
356
376
class WaitMessage(object):
389
409
def cast(context, topic, msg):
390
"""Sends a message on a topic without waiting for a response"""
391
LOG.debug(_("Making asynchronous cast on %s..."), topic)
410
"""Sends a message on a topic without waiting for a response."""
411
LOG.debug(_('Making asynchronous cast on %s...'), topic)
392
412
_pack_context(msg, context)
393
413
conn = Connection.instance()
394
414
publisher = TopicPublisher(connection=conn, topic=topic)
399
419
def fanout_cast(context, topic, msg):
400
"""Sends a message on a fanout exchange without waiting for a response"""
401
LOG.debug(_("Making asynchronous fanout cast..."))
420
"""Sends a message on a fanout exchange without waiting for a response."""
421
LOG.debug(_('Making asynchronous fanout cast...'))
402
422
_pack_context(msg, context)
403
423
conn = Connection.instance()
404
424
publisher = FanoutPublisher(topic, connection=conn)
409
429
def generic_response(message_data, message):
410
"""Logs a result and exits"""
430
"""Logs a result and exits."""
411
431
LOG.debug(_('response %s'), message_data)
416
436
def send_message(topic, message, wait=True):
417
"""Sends a message for testing"""
437
"""Sends a message for testing."""
418
438
msg_id = uuid.uuid4().hex
419
439
message.update({'_msg_id': msg_id})
420
440
LOG.debug(_('topic is %s'), topic)
444
if __name__ == "__main__":
445
# NOTE(vish): you can send messages from the command line using
446
# topic and a json sting representing a dictionary
464
if __name__ == '__main__':
465
# You can send messages from the command line using
466
# topic and a json string representing a dictionary
448
468
send_message(sys.argv[1], json.loads(sys.argv[2]))