~ubuntu-branches/ubuntu/raring/nova/raring-proposed

« back to all changes in this revision

Viewing changes to nova/openstack/common/rpc/impl_zmq.py

  • Committer: Package Import Robot
  • Author(s): Chuck Short, Chuck Short, James Page
  • Date: 2013-03-20 12:59:22 UTC
  • mfrom: (1.1.69)
  • Revision ID: package-import@ubuntu.com-20130320125922-ohvfav96lemn9wlz
Tags: 1:2013.1~rc1-0ubuntu1
[ Chuck Short ]
* New upstream release.
* debian/patches/avoid_setuptools_git_dependency.patch: Refreshed.
* debian/control: Clean up dependencies:
  - Dropped python-gflags no longer needed.
  - Dropped python-daemon no longer needed.
  - Dropped python-glance no longer needed.
  - Dropped python-lockfile no longer needed.
  - Dropped python-simplejson no longer needed.
  - Dropped python-tempita no longer needed.
  - Dropped python-xattr no longer needed.
  - Add sqlite3 required for the testsuite.

[ James Page ]
* d/watch: Update uversionmangle to deal with upstream versioning
  changes, remove tarballs.openstack.org. 

Show diffs side-by-side

added added

removed removed

Lines of Context:
25
25
import greenlet
26
26
from oslo.config import cfg
27
27
 
 
28
from nova.openstack.common import excutils
28
29
from nova.openstack.common.gettextutils import _
29
30
from nova.openstack.common import importutils
30
31
from nova.openstack.common import jsonutils
91
92
    try:
92
93
        return jsonutils.dumps(data, ensure_ascii=True)
93
94
    except TypeError:
94
 
        LOG.error(_("JSON serialization failed."))
95
 
        raise
 
95
        with excutils.save_and_reraise_exception():
 
96
            LOG.error(_("JSON serialization failed."))
96
97
 
97
98
 
98
99
def _deserialize(data):
216
217
            socket_type = zmq.PUSH
217
218
        self.outq = ZmqSocket(addr, socket_type, bind=bind)
218
219
 
219
 
    def cast(self, msg_id, topic, data, serialize=True, force_envelope=False):
 
220
    def cast(self, msg_id, topic, data, envelope=False):
220
221
        msg_id = msg_id or 0
221
222
 
222
 
        if serialize:
223
 
            data = rpc_common.serialize_msg(data, force_envelope)
224
 
        self.outq.send(map(bytes, (msg_id, topic, 'cast', _serialize(data))))
 
223
        if not (envelope or rpc_common._SEND_RPC_ENVELOPE):
 
224
            self.outq.send(map(bytes,
 
225
                           (msg_id, topic, 'cast', _serialize(data))))
 
226
            return
 
227
 
 
228
        rpc_envelope = rpc_common.serialize_msg(data[1], envelope)
 
229
        zmq_msg = reduce(lambda x, y: x + y, rpc_envelope.items())
 
230
        self.outq.send(map(bytes,
 
231
                       (msg_id, topic, 'impl_zmq_v2', data[0]) + zmq_msg))
225
232
 
226
233
    def close(self):
227
234
        self.outq.close()
320
327
        else:
321
328
            return [result]
322
329
 
323
 
    def process(self, style, target, proxy, ctx, data):
 
330
    def process(self, proxy, ctx, data):
324
331
        data.setdefault('version', None)
325
332
        data.setdefault('args', {})
326
333
 
432
439
 
433
440
        #TODO(ewindisch): use zero-copy (i.e. references, not copying)
434
441
        data = sock.recv()
435
 
        msg_id, topic, style, in_msg = data
436
 
        topic = topic.split('.', 1)[0]
 
442
        topic = data[1]
437
443
 
438
444
        LOG.debug(_("CONSUMER GOT %s"), ' '.join(map(pformat, data)))
439
445
 
440
 
        if topic.startswith('fanout~') or topic.startswith('zmq_replies'):
 
446
        if topic.startswith('fanout~'):
 
447
            sock_type = zmq.PUB
 
448
            topic = topic.split('.', 1)[0]
 
449
        elif topic.startswith('zmq_replies'):
441
450
            sock_type = zmq.PUB
442
451
        else:
443
452
            sock_type = zmq.PUSH
503
512
                              ipc_dir, run_as_root=True)
504
513
                utils.execute('chmod', '750', ipc_dir, run_as_root=True)
505
514
            except utils.ProcessExecutionError:
506
 
                LOG.error(_("Could not create IPC directory %s") %
507
 
                          (ipc_dir, ))
508
 
                raise
 
515
                with excutils.save_and_reraise_exception():
 
516
                    LOG.error(_("Could not create IPC directory %s") %
 
517
                              (ipc_dir, ))
509
518
 
510
519
        try:
511
520
            self.register(consumption_proxy,
513
522
                          zmq.PULL,
514
523
                          out_bind=True)
515
524
        except zmq.ZMQError:
516
 
            LOG.error(_("Could not create ZeroMQ receiver daemon. "
517
 
                        "Socket may already be in use."))
518
 
            raise
 
525
            with excutils.save_and_reraise_exception():
 
526
                LOG.error(_("Could not create ZeroMQ receiver daemon. "
 
527
                            "Socket may already be in use."))
519
528
 
520
529
        super(ZmqProxy, self).consume_in_thread()
521
530
 
522
531
 
 
532
def unflatten_envelope(packenv):
 
533
    """Unflattens the RPC envelope.
 
534
       Takes a list and returns a dictionary.
 
535
       i.e. [1,2,3,4] => {1: 2, 3: 4}
 
536
    """
 
537
    i = iter(packenv)
 
538
    h = {}
 
539
    try:
 
540
        while True:
 
541
            k = i.next()
 
542
            h[k] = i.next()
 
543
    except StopIteration:
 
544
        return h
 
545
 
 
546
 
523
547
class ZmqReactor(ZmqBaseReactor):
524
548
    """
525
549
    A consumer class implementing a
540
564
            self.mapping[sock].send(data)
541
565
            return
542
566
 
543
 
        msg_id, topic, style, in_msg = data
544
 
 
545
 
        ctx, request = rpc_common.deserialize_msg(_deserialize(in_msg))
546
 
        ctx = RpcContext.unmarshal(ctx)
547
 
 
548
567
        proxy = self.proxies[sock]
549
568
 
550
 
        self.pool.spawn_n(self.process, style, topic,
551
 
                          proxy, ctx, request)
 
569
        if data[2] == 'cast':  # Legacy protocol
 
570
            packenv = data[3]
 
571
 
 
572
            ctx, msg = _deserialize(packenv)
 
573
            request = rpc_common.deserialize_msg(msg)
 
574
            ctx = RpcContext.unmarshal(ctx)
 
575
        elif data[2] == 'impl_zmq_v2':
 
576
            packenv = data[4:]
 
577
 
 
578
            msg = unflatten_envelope(packenv)
 
579
            request = rpc_common.deserialize_msg(msg)
 
580
 
 
581
            # Unmarshal only after verifying the message.
 
582
            ctx = RpcContext.unmarshal(data[3])
 
583
        else:
 
584
            LOG.error(_("ZMQ Envelope version unsupported or unknown."))
 
585
            return
 
586
 
 
587
        self.pool.spawn_n(self.process, proxy, ctx, request)
552
588
 
553
589
 
554
590
class Connection(rpc_common.Connection):
555
591
    """Manages connections and threads."""
556
592
 
557
593
    def __init__(self, conf):
 
594
        self.topics = []
558
595
        self.reactor = ZmqReactor(conf)
559
596
 
560
597
    def create_consumer(self, topic, proxy, fanout=False):
561
 
        # Only consume on the base topic name.
562
 
        topic = topic.split('.', 1)[0]
563
 
 
564
 
        LOG.info(_("Create Consumer for topic (%(topic)s)") %
565
 
                 {'topic': topic})
 
598
        # Register with matchmaker.
 
599
        _get_matchmaker().register(topic, CONF.rpc_zmq_host)
566
600
 
567
601
        # Subscription scenarios
568
602
        if fanout:
 
603
            sock_type = zmq.SUB
569
604
            subscribe = ('', fanout)[type(fanout) == str]
570
 
            sock_type = zmq.SUB
571
 
            topic = 'fanout~' + topic
 
605
            topic = 'fanout~' + topic.split('.', 1)[0]
572
606
        else:
573
607
            sock_type = zmq.PULL
574
608
            subscribe = None
 
609
            topic = '.'.join((topic.split('.', 1)[0], CONF.rpc_zmq_host))
 
610
 
 
611
        if topic in self.topics:
 
612
            LOG.info(_("Skipping topic registration. Already registered."))
 
613
            return
575
614
 
576
615
        # Receive messages from (local) proxy
577
616
        inaddr = "ipc://%s/zmq_topic_%s" % \
582
621
 
583
622
        self.reactor.register(proxy, inaddr, sock_type,
584
623
                              subscribe=subscribe, in_bind=False)
 
624
        self.topics.append(topic)
585
625
 
586
626
    def close(self):
 
627
        _get_matchmaker().stop_heartbeat()
 
628
        for topic in self.topics:
 
629
            _get_matchmaker().unregister(topic, CONF.rpc_zmq_host)
 
630
 
587
631
        self.reactor.close()
 
632
        self.topics = []
588
633
 
589
634
    def wait(self):
590
635
        self.reactor.wait()
591
636
 
592
637
    def consume_in_thread(self):
 
638
        _get_matchmaker().start_heartbeat()
593
639
        self.reactor.consume_in_thread()
594
640
 
595
641
 
596
 
def _cast(addr, context, topic, msg, timeout=None, serialize=True,
597
 
          force_envelope=False, _msg_id=None):
 
642
def _cast(addr, context, topic, msg, timeout=None, envelope=False,
 
643
          _msg_id=None):
598
644
    timeout_cast = timeout or CONF.rpc_cast_timeout
599
645
    payload = [RpcContext.marshal(context), msg]
600
646
 
603
649
            conn = ZmqClient(addr)
604
650
 
605
651
            # assumes cast can't return an exception
606
 
            conn.cast(_msg_id, topic, payload, serialize, force_envelope)
 
652
            conn.cast(_msg_id, topic, payload, envelope)
607
653
        except zmq.ZMQError:
608
654
            raise RPCException("Cast failed. ZMQ Socket Exception")
609
655
        finally:
612
658
 
613
659
 
614
660
def _call(addr, context, topic, msg, timeout=None,
615
 
          serialize=True, force_envelope=False):
 
661
          envelope=False):
616
662
    # timeout_response is how long we wait for a response
617
663
    timeout = timeout or CONF.rpc_response_timeout
618
664
 
642
688
    with Timeout(timeout, exception=rpc_common.Timeout):
643
689
        try:
644
690
            msg_waiter = ZmqSocket(
645
 
                "ipc://%s/zmq_topic_zmq_replies" % CONF.rpc_zmq_ipc_dir,
 
691
                "ipc://%s/zmq_topic_zmq_replies.%s" %
 
692
                (CONF.rpc_zmq_ipc_dir,
 
693
                 CONF.rpc_zmq_host),
646
694
                zmq.SUB, subscribe=msg_id, bind=False
647
695
            )
648
696
 
649
697
            LOG.debug(_("Sending cast"))
650
 
            _cast(addr, context, topic, payload,
651
 
                  serialize=serialize, force_envelope=force_envelope)
 
698
            _cast(addr, context, topic, payload, envelope)
652
699
 
653
700
            LOG.debug(_("Cast sent; Waiting reply"))
654
701
            # Blocks until receives reply
655
702
            msg = msg_waiter.recv()
656
703
            LOG.debug(_("Received message: %s"), msg)
657
704
            LOG.debug(_("Unpacking response"))
658
 
            responses = _deserialize(msg[-1])[-1]['args']['response']
 
705
 
 
706
            if msg[2] == 'cast':  # Legacy version
 
707
                raw_msg = _deserialize(msg[-1])[-1]
 
708
            elif msg[2] == 'impl_zmq_v2':
 
709
                rpc_envelope = unflatten_envelope(msg[4:])
 
710
                raw_msg = rpc_common.deserialize_msg(rpc_envelope)
 
711
            else:
 
712
                raise rpc_common.UnsupportedRpcEnvelopeVersion(
 
713
                    _("Unsupported or unknown ZMQ envelope returned."))
 
714
 
 
715
            responses = raw_msg['args']['response']
659
716
        # ZMQError trumps the Timeout error.
660
717
        except zmq.ZMQError:
661
718
            raise RPCException("ZMQ Socket Error")
676
733
    return responses[-1]
677
734
 
678
735
 
679
 
def _multi_send(method, context, topic, msg, timeout=None, serialize=True,
680
 
                force_envelope=False, _msg_id=None):
 
736
def _multi_send(method, context, topic, msg, timeout=None,
 
737
                envelope=False, _msg_id=None):
681
738
    """
682
739
    Wraps the sending of messages,
683
740
    dispatches to the matchmaker and sends
694
751
        LOG.warn(_("No matchmaker results. Not casting."))
695
752
        # While not strictly a timeout, callers know how to handle
696
753
        # this exception and a timeout isn't too big a lie.
697
 
        raise rpc_common.Timeout, "No match from matchmaker."
 
754
        raise rpc_common.Timeout(_("No match from matchmaker."))
698
755
 
699
756
    # This supports brokerless fanout (addresses > 1)
700
757
    for queue in queues:
703
760
 
704
761
        if method.__name__ == '_cast':
705
762
            eventlet.spawn_n(method, _addr, context,
706
 
                             _topic, msg, timeout, serialize,
707
 
                             force_envelope, _msg_id)
 
763
                             _topic, msg, timeout, envelope,
 
764
                             _msg_id)
708
765
            return
709
766
        return method(_addr, context, _topic, msg, timeout,
710
 
                      serialize, force_envelope)
 
767
                      envelope)
711
768
 
712
769
 
713
770
def create_connection(conf, new=True):
737
794
    _multi_send(_cast, context, 'fanout~' + str(topic), msg, **kwargs)
738
795
 
739
796
 
740
 
def notify(conf, context, topic, msg, **kwargs):
 
797
def notify(conf, context, topic, msg, envelope):
741
798
    """
742
799
    Send notification event.
743
800
    Notifications are sent to topic-priority.
745
802
    """
746
803
    # NOTE(ewindisch): dot-priority in rpc notifier does not
747
804
    # work with our assumptions.
748
 
    topic.replace('.', '-')
749
 
    kwargs['serialize'] = kwargs.pop('envelope')
750
 
    kwargs['force_envelope'] = True
751
 
    cast(conf, context, topic, msg, **kwargs)
 
805
    topic = topic.replace('.', '-')
 
806
    cast(conf, context, topic, msg, envelope=envelope)
752
807
 
753
808
 
754
809
def cleanup():