~ubuntu-branches/ubuntu/raring/nova/raring-proposed

« back to all changes in this revision

Viewing changes to nova/openstack/common/rpc/amqp.py

  • Committer: Package Import Robot
  • Author(s): Chuck Short, Chuck Short, James Page
  • Date: 2013-03-20 12:59:22 UTC
  • mfrom: (1.1.69)
  • Revision ID: package-import@ubuntu.com-20130320125922-ohvfav96lemn9wlz
Tags: 1:2013.1~rc1-0ubuntu1
[ Chuck Short ]
* New upstream release.
* debian/patches/avoid_setuptools_git_dependency.patch: Refreshed.
* debian/control: Clean up dependencies:
  - Dropped python-gflags no longer needed.
  - Dropped python-daemon no longer needed.
  - Dropped python-glance no longer needed.
  - Dropped python-lockfile no longer needed.
  - Dropped python-simplejson no longer needed.
  - Dropped python-tempita no longer needed.
  - Dropped python-xattr no longer needed.
  - Add sqlite3 required for the testsuite.

[ James Page ]
* d/watch: Update uversionmangle to deal with upstream versioning
  changes, remove tarballs.openstack.org. 

Show diffs side-by-side

added added

removed removed

Lines of Context:
25
25
AMQP, but is deprecated and predates this code.
26
26
"""
27
27
 
 
28
import collections
28
29
import inspect
29
30
import sys
30
31
import uuid
31
32
 
32
33
from eventlet import greenpool
33
34
from eventlet import pools
 
35
from eventlet import queue
34
36
from eventlet import semaphore
35
 
from eventlet import queue
36
 
 
37
37
# TODO(pekowsk): Remove import cfg and below comment in Havana.
38
38
# This import should no longer be needed when the amqp_rpc_single_reply_queue
39
39
# option is removed.
40
40
from oslo.config import cfg
 
41
 
41
42
from nova.openstack.common import excutils
42
43
from nova.openstack.common.gettextutils import _
43
44
from nova.openstack.common import local
44
45
from nova.openstack.common import log as logging
45
46
from nova.openstack.common.rpc import common as rpc_common
46
47
 
 
48
 
47
49
# TODO(pekowski): Remove this option in Havana.
48
50
amqp_opts = [
49
51
    cfg.BoolOpt('amqp_rpc_single_reply_queue',
54
56
 
55
57
cfg.CONF.register_opts(amqp_opts)
56
58
 
 
59
UNIQUE_ID = '_unique_id'
57
60
LOG = logging.getLogger(__name__)
58
61
 
59
62
 
236
239
                   'failure': failure}
237
240
        if ending:
238
241
            msg['ending'] = True
 
242
        _add_unique_id(msg)
239
243
        # If a reply_q exists, add the msg_id to the reply and pass the
240
244
        # reply_q to direct_send() to use it as the response queue.
241
245
        # Otherwise use the msg_id for backward compatibilty.
302
306
    msg.update(context_d)
303
307
 
304
308
 
 
309
class _MsgIdCache(object):
 
310
    """This class checks any duplicate messages."""
 
311
 
 
312
    # NOTE: This value is considered can be a configuration item, but
 
313
    #       it is not necessary to change its value in most cases,
 
314
    #       so let this value as static for now.
 
315
    DUP_MSG_CHECK_SIZE = 16
 
316
 
 
317
    def __init__(self, **kwargs):
 
318
        self.prev_msgids = collections.deque([],
 
319
                                             maxlen=self.DUP_MSG_CHECK_SIZE)
 
320
 
 
321
    def check_duplicate_message(self, message_data):
 
322
        """AMQP consumers may read same message twice when exceptions occur
 
323
           before ack is returned. This method prevents doing it.
 
324
        """
 
325
        if UNIQUE_ID in message_data:
 
326
            msg_id = message_data[UNIQUE_ID]
 
327
            if msg_id not in self.prev_msgids:
 
328
                self.prev_msgids.append(msg_id)
 
329
            else:
 
330
                raise rpc_common.DuplicateMessageError(msg_id=msg_id)
 
331
 
 
332
 
 
333
def _add_unique_id(msg):
 
334
    """Add unique_id for checking duplicate messages."""
 
335
    unique_id = uuid.uuid4().hex
 
336
    msg.update({UNIQUE_ID: unique_id})
 
337
    LOG.debug(_('UNIQUE_ID is %s.') % (unique_id))
 
338
 
 
339
 
305
340
class _ThreadPoolWithWait(object):
306
341
    """Base class for a delayed invocation manager used by
307
342
    the Connection class to start up green threads
349
384
            connection_pool=connection_pool,
350
385
        )
351
386
        self.proxy = proxy
 
387
        self.msg_id_cache = _MsgIdCache()
352
388
 
353
389
    def __call__(self, message_data):
354
390
        """Consumer callback to call a method on a proxy object.
368
404
        if hasattr(local.store, 'context'):
369
405
            del local.store.context
370
406
        rpc_common._safe_log(LOG.debug, _('received %s'), message_data)
 
407
        self.msg_id_cache.check_duplicate_message(message_data)
371
408
        ctxt = unpack_context(self.conf, message_data)
372
409
        method = message_data.get('method')
373
410
        args = message_data.get('args', {})
406
443
                       connection_pool=self.connection_pool,
407
444
                       log_failure=False)
408
445
        except Exception:
409
 
            LOG.exception(_('Exception during message handling'))
410
 
            ctxt.reply(None, sys.exc_info(),
411
 
                       connection_pool=self.connection_pool)
 
446
            # sys.exc_info() is deleted by LOG.exception().
 
447
            exc_info = sys.exc_info()
 
448
            LOG.error(_('Exception during message handling'),
 
449
                      exc_info=exc_info)
 
450
            ctxt.reply(None, exc_info, connection_pool=self.connection_pool)
412
451
 
413
452
 
414
453
class MulticallProxyWaiter(object):
422
461
        self._dataqueue = queue.LightQueue()
423
462
        # Add this caller to the reply proxy's call_waiters
424
463
        self._reply_proxy.add_call_waiter(self, self._msg_id)
 
464
        self.msg_id_cache = _MsgIdCache()
425
465
 
426
466
    def put(self, data):
427
467
        self._dataqueue.put(data)
435
475
 
436
476
    def _process_data(self, data):
437
477
        result = None
 
478
        self.msg_id_cache.check_duplicate_message(data)
438
479
        if data['failure']:
439
480
            failure = data['failure']
440
481
            result = rpc_common.deserialize_remote_exception(self._conf,
479
520
        self._done = False
480
521
        self._got_ending = False
481
522
        self._conf = conf
 
523
        self.msg_id_cache = _MsgIdCache()
482
524
 
483
525
    def done(self):
484
526
        if self._done:
490
532
 
491
533
    def __call__(self, data):
492
534
        """The consume() callback will call this.  Store the result."""
 
535
        self.msg_id_cache.check_duplicate_message(data)
493
536
        if data['failure']:
494
537
            failure = data['failure']
495
538
            self._result = rpc_common.deserialize_remote_exception(self._conf,
542
585
    msg_id = uuid.uuid4().hex
543
586
    msg.update({'_msg_id': msg_id})
544
587
    LOG.debug(_('MSG_ID is %s') % (msg_id))
 
588
    _add_unique_id(msg)
545
589
    pack_context(msg, context)
546
590
 
547
591
    # TODO(pekowski): Remove this flag and the code under the if clause
575
619
def cast(conf, context, topic, msg, connection_pool):
576
620
    """Sends a message on a topic without waiting for a response."""
577
621
    LOG.debug(_('Making asynchronous cast on %s...'), topic)
 
622
    _add_unique_id(msg)
578
623
    pack_context(msg, context)
579
624
    with ConnectionContext(conf, connection_pool) as conn:
580
625
        conn.topic_send(topic, rpc_common.serialize_msg(msg))
583
628
def fanout_cast(conf, context, topic, msg, connection_pool):
584
629
    """Sends a message on a fanout exchange without waiting for a response."""
585
630
    LOG.debug(_('Making asynchronous fanout cast...'))
 
631
    _add_unique_id(msg)
586
632
    pack_context(msg, context)
587
633
    with ConnectionContext(conf, connection_pool) as conn:
588
634
        conn.fanout_send(topic, rpc_common.serialize_msg(msg))
590
636
 
591
637
def cast_to_server(conf, context, server_params, topic, msg, connection_pool):
592
638
    """Sends a message on a topic to a specific server."""
 
639
    _add_unique_id(msg)
593
640
    pack_context(msg, context)
594
641
    with ConnectionContext(conf, connection_pool, pooled=False,
595
642
                           server_params=server_params) as conn:
599
646
def fanout_cast_to_server(conf, context, server_params, topic, msg,
600
647
                          connection_pool):
601
648
    """Sends a message on a fanout exchange to a specific server."""
 
649
    _add_unique_id(msg)
602
650
    pack_context(msg, context)
603
651
    with ConnectionContext(conf, connection_pool, pooled=False,
604
652
                           server_params=server_params) as conn:
610
658
    LOG.debug(_('Sending %(event_type)s on %(topic)s'),
611
659
              dict(event_type=msg.get('event_type'),
612
660
                   topic=topic))
 
661
    _add_unique_id(msg)
613
662
    pack_context(msg, context)
614
663
    with ConnectionContext(conf, connection_pool) as conn:
615
664
        if envelope: