~soren/nova/iptables-security-groups

« back to all changes in this revision

Viewing changes to nova/rpc.py

  • Committer: Soren Hansen
  • Date: 2011-01-03 09:56:21 UTC
  • mfrom: (430.2.79 nova)
  • Revision ID: soren@linux2go.dk-20110103095621-qy398qk1uk8o7cy3
Merge trunk

Show diffs side-by-side

added added

removed removed

Lines of Context:
25
25
import logging
26
26
import sys
27
27
import time
 
28
import traceback
28
29
import uuid
29
30
 
30
31
from carrot import connection as carrot_connection
31
32
from carrot import messaging
32
33
from eventlet import greenthread
33
 
from twisted.internet import defer
34
 
from twisted.internet import task
35
34
 
 
35
from nova import context
36
36
from nova import exception
37
37
from nova import fakerabbit
38
38
from nova import flags
39
 
from nova import context
 
39
from nova import utils
40
40
 
41
41
 
42
42
FLAGS = flags.FLAGS
91
91
                self.failed_connection = False
92
92
                break
93
93
            except:  # Catching all because carrot sucks
94
 
                logging.exception("AMQP server on %s:%d is unreachable." \
95
 
                    " Trying again in %d seconds." % (
 
94
                logging.exception(_("AMQP server on %s:%d is unreachable."
 
95
                    " Trying again in %d seconds.") % (
96
96
                    FLAGS.rabbit_host,
97
97
                    FLAGS.rabbit_port,
98
98
                    FLAGS.rabbit_retry_interval))
99
99
                self.failed_connection = True
100
100
        if self.failed_connection:
101
 
            logging.exception("Unable to connect to AMQP server" \
102
 
                " after %d tries. Shutting down." % FLAGS.rabbit_max_retries)
 
101
            logging.exception(_("Unable to connect to AMQP server"
 
102
                " after %d tries. Shutting down.") % FLAGS.rabbit_max_retries)
103
103
            sys.exit(1)
104
104
 
105
105
    def fetch(self, no_ack=None, auto_ack=None, enable_callbacks=False):
116
116
                self.declare()
117
117
            super(Consumer, self).fetch(no_ack, auto_ack, enable_callbacks)
118
118
            if self.failed_connection:
119
 
                logging.error("Reconnected to queue")
 
119
                logging.error(_("Reconnected to queue"))
120
120
                self.failed_connection = False
121
121
        # NOTE(vish): This is catching all errors because we really don't
122
122
        #             exceptions to be logged 10 times a second if some
123
123
        #             persistent failure occurs.
124
124
        except Exception:  # pylint: disable-msg=W0703
125
125
            if not self.failed_connection:
126
 
                logging.exception("Failed to fetch message from queue")
 
126
                logging.exception(_("Failed to fetch message from queue"))
127
127
                self.failed_connection = True
128
128
 
129
129
    def attach_to_eventlet(self):
130
130
        """Only needed for unit tests!"""
131
 
        def fetch_repeatedly():
132
 
            while True:
133
 
                self.fetch(enable_callbacks=True)
134
 
                greenthread.sleep(0.1)
135
 
        greenthread.spawn(fetch_repeatedly)
136
 
 
137
 
    def attach_to_twisted(self):
138
 
        """Attach a callback to twisted that fires 10 times a second"""
139
 
        loop = task.LoopingCall(self.fetch, enable_callbacks=True)
140
 
        loop.start(interval=0.1)
141
 
        return loop
 
131
        timer = utils.LoopingCall(self.fetch, enable_callbacks=True)
 
132
        timer.start(0.1)
 
133
        return timer
142
134
 
143
135
 
144
136
class Publisher(messaging.Publisher):
161
153
class AdapterConsumer(TopicConsumer):
162
154
    """Calls methods on a proxy object based on method and args"""
163
155
    def __init__(self, connection=None, topic="broadcast", proxy=None):
164
 
        LOG.debug('Initing the Adapter Consumer for %s' % (topic))
 
156
        LOG.debug(_('Initing the Adapter Consumer for %s') % (topic))
165
157
        self.proxy = proxy
166
158
        super(AdapterConsumer, self).__init__(connection=connection,
167
159
                                              topic=topic)
176
168
 
177
169
        Example: {'method': 'echo', 'args': {'value': 42}}
178
170
        """
179
 
        LOG.debug('received %s' % (message_data))
 
171
        LOG.debug(_('received %s') % (message_data))
180
172
        msg_id = message_data.pop('_msg_id', None)
181
173
 
182
174
        ctxt = _unpack_context(message_data)
189
181
            #             messages stay in the queue indefinitely, so for now
190
182
            #             we just log the message and send an error string
191
183
            #             back to the caller
192
 
            LOG.warn('no method for message: %s' % (message_data))
193
 
            msg_reply(msg_id, 'No method for message: %s' % message_data)
 
184
            LOG.warn(_('no method for message: %s') % (message_data))
 
185
            msg_reply(msg_id, _('No method for message: %s') % message_data)
194
186
            return
195
187
 
196
188
        node_func = getattr(self.proxy, str(method))
197
189
        node_args = dict((str(k), v) for k, v in args.iteritems())
198
190
        # NOTE(vish): magic is fun!
199
 
        # pylint: disable-msg=W0142
200
 
        d = defer.maybeDeferred(node_func, context=ctxt, **node_args)
201
 
        if msg_id:
202
 
            d.addCallback(lambda rval: msg_reply(msg_id, rval, None))
203
 
            d.addErrback(lambda e: msg_reply(msg_id, None, e))
 
191
        try:
 
192
            rval = node_func(context=ctxt, **node_args)
 
193
            if msg_id:
 
194
                msg_reply(msg_id, rval, None)
 
195
        except Exception as e:
 
196
            if msg_id:
 
197
                msg_reply(msg_id, None, sys.exc_info())
204
198
        return
205
199
 
206
200
 
242
236
def msg_reply(msg_id, reply=None, failure=None):
243
237
    """Sends a reply or an error on the channel signified by msg_id
244
238
 
245
 
    failure should be a twisted failure object"""
 
239
    failure should be a sys.exc_info() tuple.
 
240
 
 
241
    """
246
242
    if failure:
247
 
        message = failure.getErrorMessage()
248
 
        traceback = failure.getTraceback()
249
 
        logging.error("Returning exception %s to caller", message)
250
 
        logging.error(traceback)
251
 
        failure = (failure.type.__name__, str(failure.value), traceback)
252
 
    conn = Connection.instance()
 
243
        message = str(failure[1])
 
244
        tb = traceback.format_exception(*failure)
 
245
        logging.error(_("Returning exception %s to caller"), message)
 
246
        logging.error(tb)
 
247
        failure = (failure[0].__name__, str(failure[1]), tb)
 
248
    conn = Connection.instance(True)
253
249
    publisher = DirectPublisher(connection=conn, msg_id=msg_id)
254
250
    try:
255
251
        publisher.send({'result': reply, 'failure': failure})
287
283
        if key.startswith('_context_'):
288
284
            value = msg.pop(key)
289
285
            context_dict[key[9:]] = value
290
 
    LOG.debug('unpacked context: %s', context_dict)
 
286
    LOG.debug(_('unpacked context: %s'), context_dict)
291
287
    return context.RequestContext.from_dict(context_dict)
292
288
 
293
289
 
306
302
 
307
303
def call(context, topic, msg):
308
304
    """Sends a message on a topic and wait for a response"""
309
 
    LOG.debug("Making asynchronous call...")
 
305
    LOG.debug(_("Making asynchronous call..."))
310
306
    msg_id = uuid.uuid4().hex
311
307
    msg.update({'_msg_id': msg_id})
312
 
    LOG.debug("MSG_ID is %s" % (msg_id))
 
308
    LOG.debug(_("MSG_ID is %s") % (msg_id))
313
309
    _pack_context(msg, context)
314
310
 
315
311
    class WaitMessage(object):
316
 
 
317
312
        def __call__(self, data, message):
318
313
            """Acks message and sets result."""
319
314
            message.ack()
337
332
    except StopIteration:
338
333
        pass
339
334
    consumer.close()
 
335
    # NOTE(termie): this is a little bit of a change from the original
 
336
    #               non-eventlet code where returning a Failure
 
337
    #               instance from a deferred call is very similar to
 
338
    #               raising an exception
 
339
    if isinstance(wait_msg.result, Exception):
 
340
        raise wait_msg.result
340
341
    return wait_msg.result
341
342
 
342
343
 
343
 
def call_twisted(context, topic, msg):
344
 
    """Sends a message on a topic and wait for a response"""
345
 
    LOG.debug("Making asynchronous call...")
346
 
    msg_id = uuid.uuid4().hex
347
 
    msg.update({'_msg_id': msg_id})
348
 
    LOG.debug("MSG_ID is %s" % (msg_id))
349
 
    _pack_context(msg, context)
350
 
 
351
 
    conn = Connection.instance()
352
 
    d = defer.Deferred()
353
 
    consumer = DirectConsumer(connection=conn, msg_id=msg_id)
354
 
 
355
 
    def deferred_receive(data, message):
356
 
        """Acks message and callbacks or errbacks"""
357
 
        message.ack()
358
 
        if data['failure']:
359
 
            return d.errback(RemoteError(*data['failure']))
360
 
        else:
361
 
            return d.callback(data['result'])
362
 
 
363
 
    consumer.register_callback(deferred_receive)
364
 
    injected = consumer.attach_to_twisted()
365
 
 
366
 
    # clean up after the injected listened and return x
367
 
    d.addCallback(lambda x: injected.stop() and x or x)
368
 
 
369
 
    publisher = TopicPublisher(connection=conn, topic=topic)
370
 
    publisher.send(msg)
371
 
    publisher.close()
372
 
    return d
373
 
 
374
 
 
375
344
def cast(context, topic, msg):
376
345
    """Sends a message on a topic without waiting for a response"""
377
346
    LOG.debug("Making asynchronous cast...")
384
353
 
385
354
def generic_response(message_data, message):
386
355
    """Logs a result and exits"""
387
 
    LOG.debug('response %s', message_data)
 
356
    LOG.debug(_('response %s'), message_data)
388
357
    message.ack()
389
358
    sys.exit(0)
390
359
 
393
362
    """Sends a message for testing"""
394
363
    msg_id = uuid.uuid4().hex
395
364
    message.update({'_msg_id': msg_id})
396
 
    LOG.debug('topic is %s', topic)
397
 
    LOG.debug('message %s', message)
 
365
    LOG.debug(_('topic is %s'), topic)
 
366
    LOG.debug(_('message %s'), message)
398
367
 
399
368
    if wait:
400
369
        consumer = messaging.Consumer(connection=Connection.instance(),