~ubuntu-branches/ubuntu/trusty/heat/trusty-security

« back to all changes in this revision

Viewing changes to heat/openstack/common/rpc/impl_zmq.py

  • Committer: Package Import Robot
  • Author(s): Chuck Short, Yolanda Robla, Chuck Short
  • Date: 2013-07-22 16:22:29 UTC
  • mfrom: (1.1.2)
  • Revision ID: package-import@ubuntu.com-20130722162229-zzvfu40id94ii0hc
Tags: 2013.2~b2-0ubuntu1
[ Yolanda Robla ]
* debian/tests: added autopkg tests

[ Chuck Short ]
* New upstream release
* debian/control:
  - Add python-pbr to build-depends.
  - Add python-d2to to build-depends.
  - Dropped python-argparse.
  - Add python-six to build-depends.
  - Dropped python-sendfile.
  - Dropped python-nose.
  - Added testrepository.
  - Added python-testtools.
* debian/rules: Run testrepository instead of nosetets.
* debian/patches/removes-lxml-version-limitation-from-pip-requires.patch: Dropped
  no longer needed.
* debian/patches/fix-package-version-detection-when-building-doc.patch: Dropped
  no longer needed.

Show diffs side-by-side

added added

removed removed

Lines of Context:
16
16
 
17
17
import os
18
18
import pprint
 
19
import re
19
20
import socket
20
21
import sys
21
22
import types
25
26
import greenlet
26
27
from oslo.config import cfg
27
28
 
 
29
from heat.openstack.common import excutils
28
30
from heat.openstack.common.gettextutils import _
29
31
from heat.openstack.common import importutils
30
32
from heat.openstack.common import jsonutils
31
 
from heat.openstack.common import processutils as utils
32
33
from heat.openstack.common.rpc import common as rpc_common
33
34
 
34
35
zmq = importutils.try_import('eventlet.green.zmq')
83
84
 
84
85
 
85
86
def _serialize(data):
86
 
    """
87
 
    Serialization wrapper
 
87
    """Serialization wrapper.
 
88
 
88
89
    We prefer using JSON, but it cannot encode all types.
89
90
    Error if a developer passes us bad data.
90
91
    """
91
92
    try:
92
 
        return str(jsonutils.dumps(data, ensure_ascii=True))
 
93
        return jsonutils.dumps(data, ensure_ascii=True)
93
94
    except TypeError:
94
 
        LOG.error(_("JSON serialization failed."))
95
 
        raise
 
95
        with excutils.save_and_reraise_exception():
 
96
            LOG.error(_("JSON serialization failed."))
96
97
 
97
98
 
98
99
def _deserialize(data):
99
 
    """
100
 
    Deserialization wrapper
101
 
    """
 
100
    """Deserialization wrapper."""
102
101
    LOG.debug(_("Deserializing: %s"), data)
103
102
    return jsonutils.loads(data)
104
103
 
105
104
 
106
105
class ZmqSocket(object):
107
 
    """
108
 
    A tiny wrapper around ZeroMQ to simplify the send/recv protocol
109
 
    and connection management.
 
106
    """A tiny wrapper around ZeroMQ.
110
107
 
 
108
    Simplifies the send/recv protocol and connection management.
111
109
    Can be used as a Context (supports the 'with' statement).
112
110
    """
113
111
 
178
176
            return
179
177
 
180
178
        # We must unsubscribe, or we'll leak descriptors.
181
 
        if len(self.subscriptions) > 0:
 
179
        if self.subscriptions:
182
180
            for f in self.subscriptions:
183
181
                try:
184
182
                    self.sock.setsockopt(zmq.UNSUBSCRIBE, f)
197
195
            LOG.error("ZeroMQ socket could not be closed.")
198
196
        self.sock = None
199
197
 
200
 
    def recv(self):
 
198
    def recv(self, **kwargs):
201
199
        if not self.can_recv:
202
200
            raise RPCException(_("You cannot recv on this socket."))
203
 
        return self.sock.recv_multipart()
 
201
        return self.sock.recv_multipart(**kwargs)
204
202
 
205
 
    def send(self, data):
 
203
    def send(self, data, **kwargs):
206
204
        if not self.can_send:
207
205
            raise RPCException(_("You cannot send on this socket."))
208
 
        self.sock.send_multipart(data)
 
206
        self.sock.send_multipart(data, **kwargs)
209
207
 
210
208
 
211
209
class ZmqClient(object):
212
210
    """Client for ZMQ sockets."""
213
211
 
214
 
    def __init__(self, addr, socket_type=None, bind=False):
215
 
        if socket_type is None:
216
 
            socket_type = zmq.PUSH
217
 
        self.outq = ZmqSocket(addr, socket_type, bind=bind)
218
 
 
219
 
    def cast(self, msg_id, topic, data, serialize=True, force_envelope=False):
220
 
        if serialize:
221
 
            data = rpc_common.serialize_msg(data, force_envelope)
222
 
        self.outq.send([str(msg_id), str(topic), str('cast'),
223
 
                        _serialize(data)])
 
212
    def __init__(self, addr):
 
213
        self.outq = ZmqSocket(addr, zmq.PUSH, bind=False)
 
214
 
 
215
    def cast(self, msg_id, topic, data, envelope):
 
216
        msg_id = msg_id or 0
 
217
 
 
218
        if not envelope:
 
219
            self.outq.send(map(bytes,
 
220
                           (msg_id, topic, 'cast', _serialize(data))))
 
221
            return
 
222
 
 
223
        rpc_envelope = rpc_common.serialize_msg(data[1], envelope)
 
224
        zmq_msg = reduce(lambda x, y: x + y, rpc_envelope.items())
 
225
        self.outq.send(map(bytes,
 
226
                       (msg_id, topic, 'impl_zmq_v2', data[0]) + zmq_msg))
224
227
 
225
228
    def close(self):
226
229
        self.outq.close()
267
270
 
268
271
        try:
269
272
            result = proxy.dispatch(
270
 
                ctx, data['version'], data['method'], **data['args'])
 
273
                ctx, data['version'], data['method'],
 
274
                data.get('namespace'), **data['args'])
271
275
            return ConsumerBase.normalize_reply(result, ctx.replies)
272
276
        except greenlet.GreenletExit:
273
277
            # ignore these since they are just from shutdowns
274
278
            pass
275
 
        except rpc_common.ClientException, e:
 
279
        except rpc_common.ClientException as e:
276
280
            LOG.debug(_("Expected exception during message handling (%s)") %
277
281
                      e._exc_info[1])
278
282
            return {'exc':
286
290
    def reply(self, ctx, proxy,
287
291
              msg_id=None, context=None, topic=None, msg=None):
288
292
        """Reply to a casted call."""
289
 
        # Our real method is curried into msg['args']
 
293
        # NOTE(ewindisch): context kwarg exists for Grizzly compat.
 
294
        #                  this may be able to be removed earlier than
 
295
        #                  'I' if ConsumerBase.process were refactored.
 
296
        if type(msg) is list:
 
297
            payload = msg[-1]
 
298
        else:
 
299
            payload = msg
290
300
 
291
 
        child_ctx = RpcContext.unmarshal(msg[0])
292
301
        response = ConsumerBase.normalize_reply(
293
 
            self._get_response(child_ctx, proxy, topic, msg[1]),
 
302
            self._get_response(ctx, proxy, topic, payload),
294
303
            ctx.replies)
295
304
 
296
305
        LOG.debug(_("Sending reply"))
297
 
        cast(CONF, ctx, topic, {
 
306
        _multi_send(_cast, ctx, topic, {
298
307
            'method': '-process_reply',
299
308
            'args': {
300
 
                'msg_id': msg_id,
 
309
                'msg_id': msg_id,  # Include for Folsom compat.
301
310
                'response': response
302
311
            }
303
 
        })
 
312
        }, _msg_id=msg_id)
304
313
 
305
314
 
306
315
class ConsumerBase(object):
319
328
        else:
320
329
            return [result]
321
330
 
322
 
    def process(self, style, target, proxy, ctx, data):
 
331
    def process(self, proxy, ctx, data):
323
332
        data.setdefault('version', None)
324
333
        data.setdefault('args', {})
325
334
 
337
346
            return
338
347
 
339
348
        proxy.dispatch(ctx, data['version'],
340
 
                       data['method'], **data['args'])
 
349
                       data['method'], data.get('namespace'), **data['args'])
341
350
 
342
351
 
343
352
class ZmqBaseReactor(ConsumerBase):
344
 
    """
345
 
    A consumer class implementing a
346
 
    centralized casting broker (PULL-PUSH)
347
 
    for RoundRobin requests.
 
353
    """A consumer class implementing a centralized casting broker (PULL-PUSH).
 
354
 
 
355
    Used for RoundRobin requests.
348
356
    """
349
357
 
350
358
    def __init__(self, conf):
415
423
 
416
424
 
417
425
class ZmqProxy(ZmqBaseReactor):
418
 
    """
419
 
    A consumer class implementing a
420
 
    topic-based proxy, forwarding to
421
 
    IPC sockets.
 
426
    """A consumer class implementing a topic-based proxy.
 
427
 
 
428
    Forwards to IPC sockets.
422
429
    """
423
430
 
424
431
    def __init__(self, conf):
425
432
        super(ZmqProxy, self).__init__(conf)
 
433
        pathsep = set((os.path.sep or '', os.path.altsep or '', '/', '\\'))
 
434
        self.badchars = re.compile(r'[%s]' % re.escape(''.join(pathsep)))
426
435
 
427
436
        self.topic_proxy = {}
428
437
 
429
438
    def consume(self, sock):
430
439
        ipc_dir = CONF.rpc_zmq_ipc_dir
431
440
 
432
 
        #TODO(ewindisch): use zero-copy (i.e. references, not copying)
433
 
        data = sock.recv()
434
 
        msg_id, topic, style, in_msg = data
435
 
        topic = topic.split('.', 1)[0]
436
 
 
437
 
        LOG.debug(_("CONSUMER GOT %s"), ' '.join(map(pformat, data)))
438
 
 
439
 
        # Handle zmq_replies magic
 
441
        data = sock.recv(copy=False)
 
442
        topic = data[1].bytes
 
443
 
440
444
        if topic.startswith('fanout~'):
441
445
            sock_type = zmq.PUB
 
446
            topic = topic.split('.', 1)[0]
442
447
        elif topic.startswith('zmq_replies'):
443
448
            sock_type = zmq.PUB
444
 
            inside = rpc_common.deserialize_msg(_deserialize(in_msg))
445
 
            msg_id = inside[-1]['args']['msg_id']
446
 
            response = inside[-1]['args']['response']
447
 
            LOG.debug(_("->response->%s"), response)
448
 
            data = [str(msg_id), _serialize(response)]
449
449
        else:
450
450
            sock_type = zmq.PUSH
451
451
 
454
454
                LOG.info(_("Creating proxy for topic: %s"), topic)
455
455
 
456
456
                try:
 
457
                    # The topic is received over the network,
 
458
                    # don't trust this input.
 
459
                    if self.badchars.search(topic) is not None:
 
460
                        emsg = _("Topic contained dangerous characters.")
 
461
                        LOG.warn(emsg)
 
462
                        raise RPCException(emsg)
 
463
 
457
464
                    out_sock = ZmqSocket("ipc://%s/zmq_topic_%s" %
458
465
                                         (ipc_dir, topic),
459
466
                                         sock_type, bind=True)
474
481
 
475
482
                while(True):
476
483
                    data = self.topic_proxy[topic].get()
477
 
                    out_sock.send(data)
478
 
                    LOG.debug(_("ROUTER RELAY-OUT SUCCEEDED %(data)s") %
479
 
                              {'data': data})
 
484
                    out_sock.send(data, copy=False)
480
485
 
481
486
            wait_sock_creation = eventlet.event.Event()
482
487
            eventlet.spawn(publisher, wait_sock_creation)
489
494
 
490
495
        try:
491
496
            self.topic_proxy[topic].put_nowait(data)
492
 
            LOG.debug(_("ROUTER RELAY-OUT QUEUED %(data)s") %
493
 
                      {'data': data})
494
497
        except eventlet.queue.Full:
495
498
            LOG.error(_("Local per-topic backlog buffer full for topic "
496
499
                        "%(topic)s. Dropping message.") % {'topic': topic})
497
500
 
498
501
    def consume_in_thread(self):
499
 
        """Runs the ZmqProxy service"""
 
502
        """Runs the ZmqProxy service."""
500
503
        ipc_dir = CONF.rpc_zmq_ipc_dir
501
504
        consume_in = "tcp://%s:%s" % \
502
505
            (CONF.rpc_zmq_bind_address,
503
506
             CONF.rpc_zmq_port)
504
507
        consumption_proxy = InternalContext(None)
505
508
 
506
 
        if not os.path.isdir(ipc_dir):
507
 
            try:
508
 
                utils.execute('mkdir', '-p', ipc_dir, run_as_root=True)
509
 
                utils.execute('chown', "%s:%s" % (os.getuid(), os.getgid()),
510
 
                              ipc_dir, run_as_root=True)
511
 
                utils.execute('chmod', '750', ipc_dir, run_as_root=True)
512
 
            except utils.ProcessExecutionError:
513
 
                LOG.error(_("Could not create IPC directory %s") %
514
 
                          (ipc_dir, ))
515
 
                raise
516
 
 
 
509
        try:
 
510
            os.makedirs(ipc_dir)
 
511
        except os.error:
 
512
            if not os.path.isdir(ipc_dir):
 
513
                with excutils.save_and_reraise_exception():
 
514
                    LOG.error(_("Required IPC directory does not exist at"
 
515
                                " %s") % (ipc_dir, ))
517
516
        try:
518
517
            self.register(consumption_proxy,
519
518
                          consume_in,
520
519
                          zmq.PULL,
521
520
                          out_bind=True)
522
521
        except zmq.ZMQError:
523
 
            LOG.error(_("Could not create ZeroMQ receiver daemon. "
524
 
                        "Socket may already be in use."))
525
 
            raise
 
522
            if os.access(ipc_dir, os.X_OK):
 
523
                with excutils.save_and_reraise_exception():
 
524
                    LOG.error(_("Permission denied to IPC directory at"
 
525
                                " %s") % (ipc_dir, ))
 
526
            with excutils.save_and_reraise_exception():
 
527
                LOG.error(_("Could not create ZeroMQ receiver daemon. "
 
528
                            "Socket may already be in use."))
526
529
 
527
530
        super(ZmqProxy, self).consume_in_thread()
528
531
 
529
532
 
 
533
def unflatten_envelope(packenv):
 
534
    """Unflattens the RPC envelope.
 
535
 
 
536
    Takes a list and returns a dictionary.
 
537
    i.e. [1,2,3,4] => {1: 2, 3: 4}
 
538
    """
 
539
    i = iter(packenv)
 
540
    h = {}
 
541
    try:
 
542
        while True:
 
543
            k = i.next()
 
544
            h[k] = i.next()
 
545
    except StopIteration:
 
546
        return h
 
547
 
 
548
 
530
549
class ZmqReactor(ZmqBaseReactor):
531
 
    """
532
 
    A consumer class implementing a
533
 
    consumer for messages. Can also be
534
 
    used as a 1:1 proxy
 
550
    """A consumer class implementing a consumer for messages.
 
551
 
 
552
    Can also be used as a 1:1 proxy
535
553
    """
536
554
 
537
555
    def __init__(self, conf):
547
565
            self.mapping[sock].send(data)
548
566
            return
549
567
 
550
 
        msg_id, topic, style, in_msg = data
551
 
 
552
 
        ctx, request = rpc_common.deserialize_msg(_deserialize(in_msg))
553
 
        ctx = RpcContext.unmarshal(ctx)
554
 
 
555
568
        proxy = self.proxies[sock]
556
569
 
557
 
        self.pool.spawn_n(self.process, style, topic,
558
 
                          proxy, ctx, request)
 
570
        if data[2] == 'cast':  # Legacy protocol
 
571
            packenv = data[3]
 
572
 
 
573
            ctx, msg = _deserialize(packenv)
 
574
            request = rpc_common.deserialize_msg(msg)
 
575
            ctx = RpcContext.unmarshal(ctx)
 
576
        elif data[2] == 'impl_zmq_v2':
 
577
            packenv = data[4:]
 
578
 
 
579
            msg = unflatten_envelope(packenv)
 
580
            request = rpc_common.deserialize_msg(msg)
 
581
 
 
582
            # Unmarshal only after verifying the message.
 
583
            ctx = RpcContext.unmarshal(data[3])
 
584
        else:
 
585
            LOG.error(_("ZMQ Envelope version unsupported or unknown."))
 
586
            return
 
587
 
 
588
        self.pool.spawn_n(self.process, proxy, ctx, request)
559
589
 
560
590
 
561
591
class Connection(rpc_common.Connection):
562
592
    """Manages connections and threads."""
563
593
 
564
594
    def __init__(self, conf):
 
595
        self.topics = []
565
596
        self.reactor = ZmqReactor(conf)
566
597
 
567
598
    def create_consumer(self, topic, proxy, fanout=False):
568
 
        # Only consume on the base topic name.
569
 
        topic = topic.split('.', 1)[0]
570
 
 
571
 
        LOG.info(_("Create Consumer for topic (%(topic)s)") %
572
 
                 {'topic': topic})
 
599
        # Register with matchmaker.
 
600
        _get_matchmaker().register(topic, CONF.rpc_zmq_host)
573
601
 
574
602
        # Subscription scenarios
575
603
        if fanout:
 
604
            sock_type = zmq.SUB
576
605
            subscribe = ('', fanout)[type(fanout) == str]
577
 
            sock_type = zmq.SUB
578
 
            topic = 'fanout~' + topic
 
606
            topic = 'fanout~' + topic.split('.', 1)[0]
579
607
        else:
580
608
            sock_type = zmq.PULL
581
609
            subscribe = None
 
610
            topic = '.'.join((topic.split('.', 1)[0], CONF.rpc_zmq_host))
 
611
 
 
612
        if topic in self.topics:
 
613
            LOG.info(_("Skipping topic registration. Already registered."))
 
614
            return
582
615
 
583
616
        # Receive messages from (local) proxy
584
617
        inaddr = "ipc://%s/zmq_topic_%s" % \
589
622
 
590
623
        self.reactor.register(proxy, inaddr, sock_type,
591
624
                              subscribe=subscribe, in_bind=False)
 
625
        self.topics.append(topic)
592
626
 
593
627
    def close(self):
 
628
        _get_matchmaker().stop_heartbeat()
 
629
        for topic in self.topics:
 
630
            _get_matchmaker().unregister(topic, CONF.rpc_zmq_host)
 
631
 
594
632
        self.reactor.close()
 
633
        self.topics = []
595
634
 
596
635
    def wait(self):
597
636
        self.reactor.wait()
598
637
 
599
638
    def consume_in_thread(self):
 
639
        _get_matchmaker().start_heartbeat()
600
640
        self.reactor.consume_in_thread()
601
641
 
602
642
 
603
 
def _cast(addr, context, msg_id, topic, msg, timeout=None, serialize=True,
604
 
          force_envelope=False):
 
643
def _cast(addr, context, topic, msg, timeout=None, envelope=False,
 
644
          _msg_id=None):
605
645
    timeout_cast = timeout or CONF.rpc_cast_timeout
606
646
    payload = [RpcContext.marshal(context), msg]
607
647
 
610
650
            conn = ZmqClient(addr)
611
651
 
612
652
            # assumes cast can't return an exception
613
 
            conn.cast(msg_id, topic, payload, serialize, force_envelope)
 
653
            conn.cast(_msg_id, topic, payload, envelope)
614
654
        except zmq.ZMQError:
615
655
            raise RPCException("Cast failed. ZMQ Socket Exception")
616
656
        finally:
618
658
                conn.close()
619
659
 
620
660
 
621
 
def _call(addr, context, msg_id, topic, msg, timeout=None,
622
 
          serialize=True, force_envelope=False):
 
661
def _call(addr, context, topic, msg, timeout=None,
 
662
          envelope=False):
623
663
    # timeout_response is how long we wait for a response
624
664
    timeout = timeout or CONF.rpc_response_timeout
625
665
 
636
676
        'method': '-reply',
637
677
        'args': {
638
678
            'msg_id': msg_id,
639
 
            'context': mcontext,
640
679
            'topic': reply_topic,
 
680
            # TODO(ewindisch): safe to remove mcontext in I.
641
681
            'msg': [mcontext, msg]
642
682
        }
643
683
    }
649
689
    with Timeout(timeout, exception=rpc_common.Timeout):
650
690
        try:
651
691
            msg_waiter = ZmqSocket(
652
 
                "ipc://%s/zmq_topic_zmq_replies" % CONF.rpc_zmq_ipc_dir,
 
692
                "ipc://%s/zmq_topic_zmq_replies.%s" %
 
693
                (CONF.rpc_zmq_ipc_dir,
 
694
                 CONF.rpc_zmq_host),
653
695
                zmq.SUB, subscribe=msg_id, bind=False
654
696
            )
655
697
 
656
698
            LOG.debug(_("Sending cast"))
657
 
            _cast(addr, context, msg_id, topic, payload,
658
 
                  serialize=serialize, force_envelope=force_envelope)
 
699
            _cast(addr, context, topic, payload, envelope)
659
700
 
660
701
            LOG.debug(_("Cast sent; Waiting reply"))
661
702
            # Blocks until receives reply
662
703
            msg = msg_waiter.recv()
663
704
            LOG.debug(_("Received message: %s"), msg)
664
705
            LOG.debug(_("Unpacking response"))
665
 
            responses = _deserialize(msg[-1])
 
706
 
 
707
            if msg[2] == 'cast':  # Legacy version
 
708
                raw_msg = _deserialize(msg[-1])[-1]
 
709
            elif msg[2] == 'impl_zmq_v2':
 
710
                rpc_envelope = unflatten_envelope(msg[4:])
 
711
                raw_msg = rpc_common.deserialize_msg(rpc_envelope)
 
712
            else:
 
713
                raise rpc_common.UnsupportedRpcEnvelopeVersion(
 
714
                    _("Unsupported or unknown ZMQ envelope returned."))
 
715
 
 
716
            responses = raw_msg['args']['response']
666
717
        # ZMQError trumps the Timeout error.
667
718
        except zmq.ZMQError:
668
719
            raise RPCException("ZMQ Socket Error")
 
720
        except (IndexError, KeyError):
 
721
            raise RPCException(_("RPC Message Invalid."))
669
722
        finally:
670
723
            if 'msg_waiter' in vars():
671
724
                msg_waiter.close()
681
734
    return responses[-1]
682
735
 
683
736
 
684
 
def _multi_send(method, context, topic, msg, timeout=None, serialize=True,
685
 
                force_envelope=False):
686
 
    """
687
 
    Wraps the sending of messages,
688
 
    dispatches to the matchmaker and sends
689
 
    message to all relevant hosts.
 
737
def _multi_send(method, context, topic, msg, timeout=None,
 
738
                envelope=False, _msg_id=None):
 
739
    """Wraps the sending of messages.
 
740
 
 
741
    Dispatches to the matchmaker and sends message to all relevant hosts.
690
742
    """
691
743
    conf = CONF
692
744
    LOG.debug(_("%(msg)s") % {'msg': ' '.join(map(pformat, (topic, msg)))})
695
747
    LOG.debug(_("Sending message(s) to: %s"), queues)
696
748
 
697
749
    # Don't stack if we have no matchmaker results
698
 
    if len(queues) == 0:
 
750
    if not queues:
699
751
        LOG.warn(_("No matchmaker results. Not casting."))
700
752
        # While not strictly a timeout, callers know how to handle
701
753
        # this exception and a timeout isn't too big a lie.
702
 
        raise rpc_common.Timeout, "No match from matchmaker."
 
754
        raise rpc_common.Timeout(_("No match from matchmaker."))
703
755
 
704
756
    # This supports brokerless fanout (addresses > 1)
705
757
    for queue in queues:
708
760
 
709
761
        if method.__name__ == '_cast':
710
762
            eventlet.spawn_n(method, _addr, context,
711
 
                             _topic, _topic, msg, timeout, serialize,
712
 
                             force_envelope)
 
763
                             _topic, msg, timeout, envelope,
 
764
                             _msg_id)
713
765
            return
714
 
        return method(_addr, context, _topic, _topic, msg, timeout,
715
 
                      serialize, force_envelope)
 
766
        return method(_addr, context, _topic, msg, timeout,
 
767
                      envelope)
716
768
 
717
769
 
718
770
def create_connection(conf, new=True):
742
794
    _multi_send(_cast, context, 'fanout~' + str(topic), msg, **kwargs)
743
795
 
744
796
 
745
 
def notify(conf, context, topic, msg, **kwargs):
746
 
    """
747
 
    Send notification event.
 
797
def notify(conf, context, topic, msg, envelope):
 
798
    """Send notification event.
 
799
 
748
800
    Notifications are sent to topic-priority.
749
801
    This differs from the AMQP drivers which send to topic.priority.
750
802
    """
751
803
    # NOTE(ewindisch): dot-priority in rpc notifier does not
752
804
    # work with our assumptions.
753
 
    topic.replace('.', '-')
754
 
    kwargs['serialize'] = kwargs.pop('envelope')
755
 
    kwargs['force_envelope'] = True
756
 
    cast(conf, context, topic, msg, **kwargs)
 
805
    topic = topic.replace('.', '-')
 
806
    cast(conf, context, topic, msg, envelope=envelope)
757
807
 
758
808
 
759
809
def cleanup():
777
827
    return ZMQ_CTX
778
828
 
779
829
 
780
 
def _get_matchmaker():
 
830
def _get_matchmaker(*args, **kwargs):
781
831
    global matchmaker
782
832
    if not matchmaker:
783
 
        matchmaker = importutils.import_object(CONF.rpc_zmq_matchmaker)
 
833
        mm = CONF.rpc_zmq_matchmaker
 
834
        if mm.endswith('matchmaker.MatchMakerRing'):
 
835
            mm.replace('matchmaker', 'matchmaker_ring')
 
836
            LOG.warn(_('rpc_zmq_matchmaker = %(orig)s is deprecated; use'
 
837
                       ' %(new)s instead') % dict(
 
838
                     orig=CONF.rpc_zmq_matchmaker, new=mm))
 
839
        matchmaker = importutils.import_object(mm, *args, **kwargs)
784
840
    return matchmaker