30
31
from carrot import connection as carrot_connection
31
32
from carrot import messaging
32
33
from eventlet import greenthread
33
from twisted.internet import defer
34
from twisted.internet import task
35
from nova import context
36
36
from nova import exception
37
37
from nova import fakerabbit
38
38
from nova import flags
39
from nova import context
39
from nova import utils
42
42
FLAGS = flags.FLAGS
91
91
self.failed_connection = False
93
93
except: # Catching all because carrot sucks
94
logging.exception("AMQP server on %s:%d is unreachable." \
95
" Trying again in %d seconds." % (
94
logging.exception(_("AMQP server on %s:%d is unreachable."
95
" Trying again in %d seconds.") % (
98
98
FLAGS.rabbit_retry_interval))
99
99
self.failed_connection = True
100
100
if self.failed_connection:
101
logging.exception("Unable to connect to AMQP server" \
102
" after %d tries. Shutting down." % FLAGS.rabbit_max_retries)
101
logging.exception(_("Unable to connect to AMQP server"
102
" after %d tries. Shutting down.") % FLAGS.rabbit_max_retries)
105
105
def fetch(self, no_ack=None, auto_ack=None, enable_callbacks=False):
117
117
super(Consumer, self).fetch(no_ack, auto_ack, enable_callbacks)
118
118
if self.failed_connection:
119
logging.error("Reconnected to queue")
119
logging.error(_("Reconnected to queue"))
120
120
self.failed_connection = False
121
121
# NOTE(vish): This is catching all errors because we really don't
122
122
# exceptions to be logged 10 times a second if some
123
123
# persistent failure occurs.
124
124
except Exception: # pylint: disable-msg=W0703
125
125
if not self.failed_connection:
126
logging.exception("Failed to fetch message from queue")
126
logging.exception(_("Failed to fetch message from queue"))
127
127
self.failed_connection = True
129
129
def attach_to_eventlet(self):
130
130
"""Only needed for unit tests!"""
131
def fetch_repeatedly():
133
self.fetch(enable_callbacks=True)
134
greenthread.sleep(0.1)
135
greenthread.spawn(fetch_repeatedly)
137
def attach_to_twisted(self):
138
"""Attach a callback to twisted that fires 10 times a second"""
139
loop = task.LoopingCall(self.fetch, enable_callbacks=True)
140
loop.start(interval=0.1)
131
timer = utils.LoopingCall(self.fetch, enable_callbacks=True)
144
136
class Publisher(messaging.Publisher):
161
153
class AdapterConsumer(TopicConsumer):
162
154
"""Calls methods on a proxy object based on method and args"""
163
155
def __init__(self, connection=None, topic="broadcast", proxy=None):
164
LOG.debug('Initing the Adapter Consumer for %s' % (topic))
156
LOG.debug(_('Initing the Adapter Consumer for %s') % (topic))
165
157
self.proxy = proxy
166
158
super(AdapterConsumer, self).__init__(connection=connection,
177
169
Example: {'method': 'echo', 'args': {'value': 42}}
179
LOG.debug('received %s' % (message_data))
171
LOG.debug(_('received %s') % (message_data))
180
172
msg_id = message_data.pop('_msg_id', None)
182
174
ctxt = _unpack_context(message_data)
189
181
# messages stay in the queue indefinitely, so for now
190
182
# we just log the message and send an error string
191
183
# back to the caller
192
LOG.warn('no method for message: %s' % (message_data))
193
msg_reply(msg_id, 'No method for message: %s' % message_data)
184
LOG.warn(_('no method for message: %s') % (message_data))
185
msg_reply(msg_id, _('No method for message: %s') % message_data)
196
188
node_func = getattr(self.proxy, str(method))
197
189
node_args = dict((str(k), v) for k, v in args.iteritems())
198
190
# NOTE(vish): magic is fun!
199
# pylint: disable-msg=W0142
200
d = defer.maybeDeferred(node_func, context=ctxt, **node_args)
202
d.addCallback(lambda rval: msg_reply(msg_id, rval, None))
203
d.addErrback(lambda e: msg_reply(msg_id, None, e))
192
rval = node_func(context=ctxt, **node_args)
194
msg_reply(msg_id, rval, None)
195
except Exception as e:
197
msg_reply(msg_id, None, sys.exc_info())
242
236
def msg_reply(msg_id, reply=None, failure=None):
243
237
"""Sends a reply or an error on the channel signified by msg_id
245
failure should be a twisted failure object"""
239
failure should be a sys.exc_info() tuple.
247
message = failure.getErrorMessage()
248
traceback = failure.getTraceback()
249
logging.error("Returning exception %s to caller", message)
250
logging.error(traceback)
251
failure = (failure.type.__name__, str(failure.value), traceback)
252
conn = Connection.instance()
243
message = str(failure[1])
244
tb = traceback.format_exception(*failure)
245
logging.error(_("Returning exception %s to caller"), message)
247
failure = (failure[0].__name__, str(failure[1]), tb)
248
conn = Connection.instance(True)
253
249
publisher = DirectPublisher(connection=conn, msg_id=msg_id)
255
251
publisher.send({'result': reply, 'failure': failure})
287
283
if key.startswith('_context_'):
288
284
value = msg.pop(key)
289
285
context_dict[key[9:]] = value
290
LOG.debug('unpacked context: %s', context_dict)
286
LOG.debug(_('unpacked context: %s'), context_dict)
291
287
return context.RequestContext.from_dict(context_dict)
307
303
def call(context, topic, msg):
308
304
"""Sends a message on a topic and wait for a response"""
309
LOG.debug("Making asynchronous call...")
305
LOG.debug(_("Making asynchronous call..."))
310
306
msg_id = uuid.uuid4().hex
311
307
msg.update({'_msg_id': msg_id})
312
LOG.debug("MSG_ID is %s" % (msg_id))
308
LOG.debug(_("MSG_ID is %s") % (msg_id))
313
309
_pack_context(msg, context)
315
311
class WaitMessage(object):
317
312
def __call__(self, data, message):
318
313
"""Acks message and sets result."""
337
332
except StopIteration:
335
# NOTE(termie): this is a little bit of a change from the original
336
# non-eventlet code where returning a Failure
337
# instance from a deferred call is very similar to
338
# raising an exception
339
if isinstance(wait_msg.result, Exception):
340
raise wait_msg.result
340
341
return wait_msg.result
343
def call_twisted(context, topic, msg):
344
"""Sends a message on a topic and wait for a response"""
345
LOG.debug("Making asynchronous call...")
346
msg_id = uuid.uuid4().hex
347
msg.update({'_msg_id': msg_id})
348
LOG.debug("MSG_ID is %s" % (msg_id))
349
_pack_context(msg, context)
351
conn = Connection.instance()
353
consumer = DirectConsumer(connection=conn, msg_id=msg_id)
355
def deferred_receive(data, message):
356
"""Acks message and callbacks or errbacks"""
359
return d.errback(RemoteError(*data['failure']))
361
return d.callback(data['result'])
363
consumer.register_callback(deferred_receive)
364
injected = consumer.attach_to_twisted()
366
# clean up after the injected listened and return x
367
d.addCallback(lambda x: injected.stop() and x or x)
369
publisher = TopicPublisher(connection=conn, topic=topic)
375
344
def cast(context, topic, msg):
376
345
"""Sends a message on a topic without waiting for a response"""
377
346
LOG.debug("Making asynchronous cast...")
385
354
def generic_response(message_data, message):
386
355
"""Logs a result and exits"""
387
LOG.debug('response %s', message_data)
356
LOG.debug(_('response %s'), message_data)
393
362
"""Sends a message for testing"""
394
363
msg_id = uuid.uuid4().hex
395
364
message.update({'_msg_id': msg_id})
396
LOG.debug('topic is %s', topic)
397
LOG.debug('message %s', message)
365
LOG.debug(_('topic is %s'), topic)
366
LOG.debug(_('message %s'), message)
400
369
consumer = messaging.Consumer(connection=Connection.instance(),