1
# Copyright 2011 Cloudscaling Group, Inc
3
# Licensed under the Apache License, Version 2.0 (the "License"); you may
4
# not use this file except in compliance with the License. You may obtain
5
# a copy of the License at
7
# http://www.apache.org/licenses/LICENSE-2.0
9
# Unless required by applicable law or agreed to in writing, software
10
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12
# License for the specific language governing permissions and limitations
31
from oslo.config import cfg
32
from oslo.messaging._drivers import base
33
from oslo.messaging._drivers import common as rpc_common
34
from oslo.messaging._executors import impl_eventlet # FIXME(markmc)
35
from oslo.messaging.openstack.common.gettextutils import _
36
from oslo.messaging.openstack.common import jsonutils
37
from oslo.utils import excutils
38
from oslo.utils import importutils
41
zmq = importutils.try_import('eventlet.green.zmq')
43
# for convenience, are not modified.
44
pformat = pprint.pformat
45
Timeout = eventlet.timeout.Timeout
46
LOG = logging.getLogger(__name__)
47
RPCException = rpc_common.RPCException
50
cfg.StrOpt('rpc_zmq_bind_address', default='*',
51
help='ZeroMQ bind address. Should be a wildcard (*), '
52
'an ethernet interface, or IP. '
53
'The "host" option should point or resolve to this '
56
# The module.Class to use for matchmaking.
59
default=('oslo.messaging._drivers.'
60
'matchmaker.MatchMakerLocalhost'),
61
help='MatchMaker driver.',
64
# The following port is unassigned by IANA as of 2012-05-21
65
cfg.IntOpt('rpc_zmq_port', default=9501,
66
help='ZeroMQ receiver listening port.'),
68
cfg.IntOpt('rpc_zmq_contexts', default=1,
69
help='Number of ZeroMQ contexts, defaults to 1.'),
71
cfg.IntOpt('rpc_zmq_topic_backlog',
72
help='Maximum number of ingress messages to locally buffer '
73
'per topic. Default is unlimited.'),
75
cfg.StrOpt('rpc_zmq_ipc_dir', default='/var/run/openstack',
76
help='Directory for holding IPC sockets.'),
78
cfg.StrOpt('rpc_zmq_host', default=socket.gethostname(),
79
sample_default='localhost',
80
help='Name of this node. Must be a valid hostname, FQDN, or '
81
'IP address. Must match "host" option, if running Nova.'),
83
cfg.IntOpt('rpc_cast_timeout',
85
help='Seconds to wait before a cast expires (TTL). '
86
'Only supported by impl_zmq.'),
91
ZMQ_CTX = None # ZeroMQ Context, must be global.
92
matchmaker = None # memoized matchmaker object
96
"""Serialization wrapper.
98
We prefer using JSON, but it cannot encode all types.
99
Error if a developer passes us bad data.
102
return jsonutils.dumps(data, ensure_ascii=True)
104
with excutils.save_and_reraise_exception():
105
LOG.error(_("JSON serialization failed."))
108
def _deserialize(data):
    """Decode a JSON payload received from a peer.

    :param data: JSON text to decode.
    :returns: the object built by ``jsonutils.loads`` from ``data``.
    """
    # Log the raw payload first so it is on record before decoding starts.
    LOG.debug("Deserializing: %s", data)
    decoded = jsonutils.loads(data)
    return decoded
114
class ZmqSocket(object):
115
"""A tiny wrapper around ZeroMQ.
117
Simplifies the send/recv protocol and connection management.
118
Can be used as a Context (supports the 'with' statement).
121
def __init__(self, addr, zmq_type, bind=True, subscribe=None):
122
self.sock = _get_ctxt().socket(zmq_type)
125
self.subscriptions = []
127
# Support failures on sending/receiving on wrong socket type.
128
self.can_recv = zmq_type in (zmq.PULL, zmq.SUB)
129
self.can_send = zmq_type in (zmq.PUSH, zmq.PUB)
130
self.can_sub = zmq_type in (zmq.SUB, )
132
# Support list, str, & None for subscribe arg (cast to list)
142
str_data = {'addr': addr, 'type': self.socket_s(),
143
'subscribe': subscribe, 'bind': bind}
145
LOG.debug("Connecting to %(addr)s with %(type)s", str_data)
146
LOG.debug("-> Subscribed to %(subscribe)s", str_data)
147
LOG.debug("-> bind: %(bind)s", str_data)
153
self.sock.connect(addr)
155
raise RPCException(_("Could not open socket."))
158
"""Get socket type as string."""
159
t_enum = ('PUSH', 'PULL', 'PUB', 'SUB', 'REP', 'REQ', 'ROUTER',
161
return dict(map(lambda t: (getattr(zmq, t), t), t_enum))[self.type]
163
def subscribe(self, msg_filter):
166
raise RPCException("Cannot subscribe on this socket.")
167
LOG.debug("Subscribing to %s", msg_filter)
170
self.sock.setsockopt(zmq.SUBSCRIBE, msg_filter)
174
self.subscriptions.append(msg_filter)
176
def unsubscribe(self, msg_filter):
    """Remove ``msg_filter`` from this socket's subscriptions.

    Filters that were never recorded in ``self.subscriptions`` are ignored,
    so the socket option is only touched for filters that are tracked
    (and ``list.remove`` can never raise ValueError).

    :param msg_filter: the subscription prefix to drop.
    """
    if msg_filter not in self.subscriptions:
        # Nothing to undo for an unknown filter.
        return
    self.sock.setsockopt(zmq.UNSUBSCRIBE, msg_filter)
    self.subscriptions.remove(msg_filter)
184
if self.sock is None or self.sock.closed:
187
# We must unsubscribe, or we'll leak descriptors.
188
if self.subscriptions:
189
for f in self.subscriptions:
191
self.sock.setsockopt(zmq.UNSUBSCRIBE, f)
194
self.subscriptions = []
197
# Default is to linger
200
# While this is a bad thing to happen,
201
# it would be much worse if some of the code calling this
202
# were to fail. For now, lets log, and later evaluate
203
# if we can safely raise here.
204
LOG.error("ZeroMQ socket could not be closed.")
207
def recv(self, **kwargs):
    """Receive one multipart message from the wrapped socket.

    Keyword arguments are passed through unchanged to ``recv_multipart``.

    :returns: whatever ``self.sock.recv_multipart`` returns.
    :raises RPCException: if ``self.can_recv`` is false for this socket.
    """
    if self.can_recv:
        return self.sock.recv_multipart(**kwargs)
    raise RPCException(_("You cannot recv on this socket."))
212
def send(self, data, **kwargs):
    """Send ``data`` as one multipart message on the wrapped socket.

    Keyword arguments are passed through unchanged to ``send_multipart``.

    :param data: the message parts handed to ``send_multipart``.
    :raises RPCException: if ``self.can_send`` is false for this socket.
    """
    if self.can_send:
        self.sock.send_multipart(data, **kwargs)
        return
    raise RPCException(_("You cannot send on this socket."))
218
class ZmqClient(object):
219
"""Client for ZMQ sockets."""
221
def __init__(self, addr):
222
self.outq = ZmqSocket(addr, zmq.PUSH, bind=False)
224
def cast(self, msg_id, topic, data, envelope):
228
self.outq.send(map(bytes,
229
(msg_id, topic, 'cast', _serialize(data))))
232
rpc_envelope = rpc_common.serialize_msg(data[1])
233
zmq_msg = moves.reduce(lambda x, y: x + y, rpc_envelope.items())
234
self.outq.send(map(bytes,
235
(msg_id, topic, 'impl_zmq_v2', data[0]) + zmq_msg))
241
class RpcContext(rpc_common.CommonRpcContext):
242
"""Context that supports replying to a rpc.call."""
243
def __init__(self, **kwargs):
245
super(RpcContext, self).__init__(**kwargs)
248
values = self.to_dict()
249
values['replies'] = self.replies
250
return self.__class__(**values)
252
def reply(self, reply=None, failure=None, ending=False):
255
self.replies.append(reply)
258
def marshal(self, ctx):
    """Serialize the dictionary form of ``ctx``.

    :param ctx: an object exposing ``to_dict()``.
    :returns: the result of ``_serialize`` applied to ``ctx.to_dict()``.
    """
    return _serialize(ctx.to_dict())
263
def unmarshal(self, data):
    """Rebuild an ``RpcContext`` from its serialized form.

    :param data: payload accepted by ``_deserialize``.
    :returns: ``RpcContext.from_dict`` applied to the decoded values.
    """
    ctx_values = _deserialize(data)
    return RpcContext.from_dict(ctx_values)
267
class InternalContext(object):
268
"""Used by ConsumerBase as a private context for - methods."""
270
def __init__(self, proxy):
272
self.msg_waiter = None
274
def _get_response(self, ctx, proxy, topic, data):
275
"""Process a curried message and cast the result to topic."""
276
LOG.debug("Running func with context: %s", ctx.to_dict())
277
data.setdefault('version', None)
278
data.setdefault('args', {})
281
result = proxy.dispatch(
282
ctx, data['version'], data['method'],
283
data.get('namespace'), **data['args'])
284
return ConsumerBase.normalize_reply(result, ctx.replies)
285
except greenlet.GreenletExit:
286
# ignore these since they are just from shutdowns
288
except rpc_common.ClientException as e:
289
LOG.debug("Expected exception during message handling (%s)",
292
rpc_common.serialize_remote_exception(e._exc_info,
295
LOG.error(_("Exception during message handling"))
297
rpc_common.serialize_remote_exception(sys.exc_info())}
299
def reply(self, ctx, proxy,
300
msg_id=None, context=None, topic=None, msg=None):
301
"""Reply to a casted call."""
302
# NOTE(ewindisch): context kwarg exists for Grizzly compat.
303
# this may be able to be removed earlier than
304
# 'I' if ConsumerBase.process were refactored.
305
if type(msg) is list:
310
response = ConsumerBase.normalize_reply(
311
self._get_response(ctx, proxy, topic, payload),
314
LOG.debug("Sending reply")
315
_multi_send(_cast, ctx, topic, {
316
'method': '-process_reply',
318
'msg_id': msg_id, # Include for Folsom compat.
324
class ConsumerBase(object):
328
self.private_ctx = InternalContext(None)
331
def normalize_reply(self, result, replies):
332
# TODO(ewindisch): re-evaluate and document this method.
333
if isinstance(result, types.GeneratorType):
340
def process(self, proxy, ctx, data):
    """Route one decoded RPC message to its handler.

    ``data`` is updated in place so that it always carries a ``version``
    (default ``None``) and ``args`` (default ``{}``) entry.

    :param proxy: object whose ``dispatch`` handles ordinary methods.
    :param ctx: context passed through to the handler.
    :param data: message dict; reads ``method``, ``version``,
                 ``namespace`` and ``args``.
    """
    data.setdefault('version', None)
    call_args = data.setdefault('args', {})

    # Method starting with - are
    # processed internally. (non-valid method name)
    method = data.get('method')
    if not method:
        # NOTE(review): this guard and the early returns below are not
        # visible in the extracted text; they are restored so the error is
        # only logged for messages that really lack a method, and so an
        # internal '-reply' is not also handed to proxy.dispatch.
        LOG.error(_("RPC message did not include method."))
        return

    # Internal method
    # uses internal context for safety.
    if method == '-reply':
        self.private_ctx.reply(ctx, proxy, **call_args)
        return

    proxy.dispatch(ctx, data['version'],
                   method, data.get('namespace'), **call_args)
361
class ZmqBaseReactor(ConsumerBase):
362
"""A consumer class implementing a centralized casting broker (PULL-PUSH).
364
Used for RoundRobin requests.
367
def __init__(self, conf):
368
super(ZmqBaseReactor, self).__init__()
375
self.pool = eventlet.greenpool.GreenPool(conf.rpc_thread_pool_size)
377
def register(self, proxy, in_addr, zmq_type_in,
378
in_bind=True, subscribe=None):
380
LOG.info(_("Registering reactor"))
382
if zmq_type_in not in (zmq.PULL, zmq.SUB):
383
raise RPCException("Bad input socktype")
386
inq = ZmqSocket(in_addr, zmq_type_in, bind=in_bind,
389
self.proxies[inq] = proxy
390
self.sockets.append(inq)
392
LOG.info(_("In reactor registered"))
394
def consume_in_thread(self):
396
LOG.info(_("Consuming socket"))
400
for k in self.proxies.keys():
402
self.pool.spawn(_consume, k)
406
for t in self.threads:
410
for s in self.sockets:
413
for t in self.threads:
417
class ZmqProxy(ZmqBaseReactor):
418
"""A consumer class implementing a topic-based proxy.
420
Forwards to IPC sockets.
423
def __init__(self, conf):
424
super(ZmqProxy, self).__init__(conf)
425
pathsep = set((os.path.sep or '', os.path.altsep or '', '/', '\\'))
426
self.badchars = re.compile(r'[%s]' % re.escape(''.join(pathsep)))
428
self.topic_proxy = {}
430
def consume(self, sock):
431
ipc_dir = CONF.rpc_zmq_ipc_dir
433
data = sock.recv(copy=False)
434
topic = data[1].bytes
436
if topic.startswith('fanout~'):
438
topic = topic.split('.', 1)[0]
439
elif topic.startswith('zmq_replies'):
444
if topic not in self.topic_proxy:
445
def publisher(waiter):
446
LOG.info(_("Creating proxy for topic: %s"), topic)
449
# The topic is received over the network,
450
# don't trust this input.
451
if self.badchars.search(topic) is not None:
452
emsg = _("Topic contained dangerous characters.")
454
raise RPCException(emsg)
456
out_sock = ZmqSocket("ipc://%s/zmq_topic_%s" %
458
sock_type, bind=True)
460
waiter.send_exception(*sys.exc_info())
463
self.topic_proxy[topic] = eventlet.queue.LightQueue(
464
CONF.rpc_zmq_topic_backlog)
465
self.sockets.append(out_sock)
467
# It takes some time for a pub socket to open,
468
# before we can have any faith in doing a send() to it.
469
if sock_type == zmq.PUB:
475
data = self.topic_proxy[topic].get()
476
out_sock.send(data, copy=False)
478
wait_sock_creation = eventlet.event.Event()
479
eventlet.spawn(publisher, wait_sock_creation)
482
wait_sock_creation.wait()
484
LOG.error(_("Topic socket file creation failed."))
488
self.topic_proxy[topic].put_nowait(data)
489
except eventlet.queue.Full:
490
LOG.error(_("Local per-topic backlog buffer full for topic "
491
"%s. Dropping message."), topic)
493
def consume_in_thread(self):
494
"""Runs the ZmqProxy service."""
495
ipc_dir = CONF.rpc_zmq_ipc_dir
496
consume_in = "tcp://%s:%s" % \
497
(CONF.rpc_zmq_bind_address,
499
consumption_proxy = InternalContext(None)
504
if not os.path.isdir(ipc_dir):
505
with excutils.save_and_reraise_exception():
506
LOG.error(_("Required IPC directory does not exist at"
509
self.register(consumption_proxy,
513
if os.access(ipc_dir, os.X_OK):
514
with excutils.save_and_reraise_exception():
515
LOG.error(_("Permission denied to IPC directory at"
517
with excutils.save_and_reraise_exception():
518
LOG.error(_("Could not create ZeroMQ receiver daemon. "
519
"Socket may already be in use."))
521
super(ZmqProxy, self).consume_in_thread()
524
def unflatten_envelope(packenv):
    """Unflattens the RPC envelope.

    Takes a list and returns a dictionary.
    i.e. [1,2,3,4] => {1: 2, 3: 4}

    Items are consumed pairwise as (key, value). A trailing key that has
    no value after it is dropped.

    :param packenv: iterable of alternating keys and values.
    :returns: dict mapping each key to the item that follows it.
    """
    items = iter(packenv)
    # zip() draws alternately from the one shared iterator, so neighbours
    # are paired up and iteration stops cleanly when the items run out.
    return dict(zip(items, items))
540
class ZmqReactor(ZmqBaseReactor):
541
"""A consumer class implementing a consumer for messages.
543
Can also be used as a 1:1 proxy
546
def __init__(self, conf):
547
super(ZmqReactor, self).__init__(conf)
549
def consume(self, sock):
550
# TODO(ewindisch): use zero-copy (i.e. references, not copying)
552
LOG.debug("CONSUMER RECEIVED DATA: %s", data)
554
proxy = self.proxies[sock]
556
if data[2] == 'cast': # Legacy protocol
559
ctx, msg = _deserialize(packenv)
560
request = rpc_common.deserialize_msg(msg)
561
ctx = RpcContext.unmarshal(ctx)
562
elif data[2] == 'impl_zmq_v2':
565
msg = unflatten_envelope(packenv)
566
request = rpc_common.deserialize_msg(msg)
568
# Unmarshal only after verifying the message.
569
ctx = RpcContext.unmarshal(data[3])
571
LOG.error(_("ZMQ Envelope version unsupported or unknown."))
574
self.pool.spawn_n(self.process, proxy, ctx, request)
577
class Connection(rpc_common.Connection):
578
"""Manages connections and threads."""
580
def __init__(self, conf):
582
self.reactor = ZmqReactor(conf)
584
def create_consumer(self, topic, proxy, fanout=False):
585
# Register with matchmaker.
586
_get_matchmaker().register(topic, CONF.rpc_zmq_host)
588
# Subscription scenarios
591
subscribe = ('', fanout)[type(fanout) == str]
592
topic = 'fanout~' + topic.split('.', 1)[0]
596
topic = '.'.join((topic.split('.', 1)[0], CONF.rpc_zmq_host))
598
if topic in self.topics:
599
LOG.info(_("Skipping topic registration. Already registered."))
602
# Receive messages from (local) proxy
603
inaddr = "ipc://%s/zmq_topic_%s" % \
604
(CONF.rpc_zmq_ipc_dir, topic)
606
LOG.debug("Consumer is a zmq.%s",
607
['PULL', 'SUB'][sock_type == zmq.SUB])
609
self.reactor.register(proxy, inaddr, sock_type,
610
subscribe=subscribe, in_bind=False)
611
self.topics.append(topic)
614
_get_matchmaker().stop_heartbeat()
615
for topic in self.topics:
616
_get_matchmaker().unregister(topic, CONF.rpc_zmq_host)
624
def consume_in_thread(self):
    """Start the matchmaker heartbeat, then start the reactor consuming."""
    # Order matters: the heartbeat is started before any consumer runs.
    mm = _get_matchmaker()
    mm.start_heartbeat()
    self.reactor.consume_in_thread()
629
def _cast(addr, context, topic, msg, timeout=None, envelope=False,
630
_msg_id=None, allowed_remote_exmods=None):
631
allowed_remote_exmods = allowed_remote_exmods or []
632
timeout_cast = timeout or CONF.rpc_cast_timeout
633
payload = [RpcContext.marshal(context), msg]
635
with Timeout(timeout_cast, exception=rpc_common.Timeout):
637
conn = ZmqClient(addr)
639
# assumes cast can't return an exception
640
conn.cast(_msg_id, topic, payload, envelope)
642
raise RPCException("Cast failed. ZMQ Socket Exception")
648
def _call(addr, context, topic, msg, timeout=None,
649
envelope=False, allowed_remote_exmods=None):
650
allowed_remote_exmods = allowed_remote_exmods or []
651
# timeout_response is how long we wait for a response
652
timeout = timeout or CONF.rpc_response_timeout
654
# The msg_id is used to track replies.
655
msg_id = uuid.uuid4().hex
657
# Replies always come into the reply service.
658
reply_topic = "zmq_replies.%s" % CONF.rpc_zmq_host
660
LOG.debug("Creating payload")
661
# Curry the original request into a reply method.
662
mcontext = RpcContext.marshal(context)
667
'topic': reply_topic,
668
# TODO(ewindisch): safe to remove mcontext in I.
669
'msg': [mcontext, msg]
673
LOG.debug("Creating queue socket for reply waiter")
675
# Messages arriving async.
676
# TODO(ewindisch): have reply consumer with dynamic subscription mgmt
677
with Timeout(timeout, exception=rpc_common.Timeout):
679
msg_waiter = ZmqSocket(
680
"ipc://%s/zmq_topic_zmq_replies.%s" %
681
(CONF.rpc_zmq_ipc_dir,
683
zmq.SUB, subscribe=msg_id, bind=False
686
LOG.debug("Sending cast")
687
_cast(addr, context, topic, payload, envelope=envelope)
689
LOG.debug("Cast sent; Waiting reply")
690
# Blocks until receives reply
691
msg = msg_waiter.recv()
692
LOG.debug("Received message: %s", msg)
693
LOG.debug("Unpacking response")
695
if msg[2] == 'cast': # Legacy version
696
raw_msg = _deserialize(msg[-1])[-1]
697
elif msg[2] == 'impl_zmq_v2':
698
rpc_envelope = unflatten_envelope(msg[4:])
699
raw_msg = rpc_common.deserialize_msg(rpc_envelope)
701
raise rpc_common.UnsupportedRpcEnvelopeVersion(
702
_("Unsupported or unknown ZMQ envelope returned."))
704
responses = raw_msg['args']['response']
705
# ZMQError trumps the Timeout error.
707
raise RPCException("ZMQ Socket Error")
708
except (IndexError, KeyError):
709
raise RPCException(_("RPC Message Invalid."))
711
if 'msg_waiter' in vars():
714
# It seems we don't need to do all of the following,
715
# but perhaps it would be useful for multicall?
716
# One effect of this is that we're checking all
717
# responses for Exceptions.
718
for resp in responses:
719
if isinstance(resp, types.DictType) and 'exc' in resp:
720
raise rpc_common.deserialize_remote_exception(
721
resp['exc'], allowed_remote_exmods)
726
def _multi_send(method, context, topic, msg, timeout=None,
727
envelope=False, _msg_id=None, allowed_remote_exmods=None):
728
"""Wraps the sending of messages.
730
Dispatches to the matchmaker and sends message to all relevant hosts.
732
allowed_remote_exmods = allowed_remote_exmods or []
734
LOG.debug(' '.join(map(pformat, (topic, msg))))
736
queues = _get_matchmaker().queues(topic)
737
LOG.debug("Sending message(s) to: %s", queues)
739
# Don't stack if we have no matchmaker results
741
LOG.warn(_("No matchmaker results. Not casting."))
742
# While not strictly a timeout, callers know how to handle
743
# this exception and a timeout isn't too big a lie.
744
raise rpc_common.Timeout(_("No match from matchmaker."))
746
# This supports brokerless fanout (addresses > 1)
749
_topic, ip_addr = queue
750
_addr = "tcp://%s:%s" % (ip_addr, conf.rpc_zmq_port)
752
if method.__name__ == '_cast':
753
eventlet.spawn_n(method, _addr, context,
754
_topic, msg, timeout, envelope, _msg_id)
756
return_val = method(_addr, context, _topic, msg, timeout,
757
envelope, allowed_remote_exmods)
762
def create_connection(conf, new=True):
    """Build a ``Connection`` for ``conf``.

    :param conf: configuration object handed to ``Connection``.
    :param new: accepted but not used by this function.
    :returns: the newly constructed ``Connection``.
    """
    connection = Connection(conf)
    return connection
766
def multicall(conf, *args, **kwargs):
    """Multiple calls.

    Forwards every positional and keyword argument after ``conf`` to
    ``_multi_send`` with ``_call`` as the send method; ``conf`` itself is
    not used here.

    :returns: whatever ``_multi_send`` returns.
    """
    replies = _multi_send(_call, *args, **kwargs)
    return replies
771
def call(conf, *args, **kwargs):
772
"""Send a message, expect a response."""
773
data = _multi_send(_call, *args, **kwargs)
777
def cast(conf, *args, **kwargs):
    """Send a message expecting no reply.

    Forwards every positional and keyword argument after ``conf`` to
    ``_multi_send`` with ``_cast`` as the send method; ``conf`` itself is
    not used here and nothing is returned.
    """
    send_method = _cast
    _multi_send(send_method, *args, **kwargs)
782
def fanout_cast(conf, context, topic, msg, **kwargs):
    """Send a message to all listening and expect no reply.

    :param conf: accepted but not used by this function.
    :param context: request context forwarded to ``_multi_send``.
    :param topic: base topic; it is prefixed with ``fanout~``.
    :param msg: message payload forwarded to ``_multi_send``.
    :param kwargs: extra keyword arguments forwarded to ``_multi_send``.
    """
    # NOTE(ewindisch): fanout~ is used because it avoids splitting on '.'
    #                  and acts as a non-subtle hint to the matchmaker
    #                  and ZmqProxy.
    fanout_topic = 'fanout~' + six.text_type(topic)
    # NOTE(review): the tail of this call was lost in the extracted text;
    # msg and **kwargs are forwarded so no caller argument is dropped.
    _multi_send(_cast, context, fanout_topic, msg, **kwargs)
790
def notify(conf, context, topic, msg, envelope):
    """Send notification event.

    Notifications are sent to topic-priority.
    This differs from the AMQP drivers which send to topic.priority.

    :param envelope: forwarded to ``cast`` as the ``envelope`` keyword.
    """
    # NOTE(ewindisch): dot-priority in rpc notifier does not
    #                  work with our assumptions.
    dashed_topic = '-'.join(topic.split('.'))
    cast(conf, context, dashed_topic, msg, envelope=envelope)
803
"""Clean up resources in use by implementation."""
815
raise ImportError("Failed to import eventlet.green.zmq")
819
ZMQ_CTX = zmq.Context(CONF.rpc_zmq_contexts)
823
def _get_matchmaker(*args, **kwargs):
    """Return the module-level matchmaker, importing it on first use.

    The class path is read from ``CONF.rpc_zmq_matchmaker``. A path ending
    in ``matchmaker.MatchMakerRing`` is deprecated: it is rewritten to the
    ``matchmaker_ring`` module and a warning is logged.

    :param args: positional arguments for the matchmaker constructor.
    :param kwargs: keyword arguments for the matchmaker constructor.
    :returns: the matchmaker object.
    """
    # NOTE(review): the global/guard/return lines are not visible in the
    # extracted text; they follow the module-level "memoized matchmaker
    # object" variable and the callers that use the return value.
    global matchmaker
    if not matchmaker:
        mm = CONF.rpc_zmq_matchmaker
        legacy_suffix = 'matchmaker.MatchMakerRing'
        if mm.endswith(legacy_suffix):
            # Strings are immutable: the rewritten path must be assigned
            # back. Only the trailing module name is rewritten so an
            # earlier 'matchmaker' in the dotted path is left alone.
            mm = mm[:-len(legacy_suffix)] + 'matchmaker_ring.MatchMakerRing'
            LOG.warn(_('rpc_zmq_matchmaker = %(orig)s is deprecated; use'
                       ' %(new)s instead') % dict(
                     orig=CONF.rpc_zmq_matchmaker, new=mm))
        matchmaker = importutils.import_object(mm, *args, **kwargs)
    return matchmaker
836
class ZmqIncomingMessage(base.IncomingMessage):
838
ReceivedReply = collections.namedtuple(
839
'ReceivedReply', ['reply', 'failure', 'log_failure'])
841
def __init__(self, listener, ctxt, message):
842
super(ZmqIncomingMessage, self).__init__(listener, ctxt, message)
843
self.condition = threading.Condition()
846
def reply(self, reply=None, failure=None, log_failure=True):
847
self.received = self.ReceivedReply(reply, failure, log_failure)
849
self.condition.notify()
855
class ZmqListener(base.Listener):
857
def __init__(self, driver):
858
super(ZmqListener, self).__init__(driver)
859
self.incoming_queue = moves.queue.Queue()
861
def dispatch(self, ctxt, version, method, namespace, **kwargs):
    """Queue a call for a poller and block until it has been answered.

    The call is wrapped in a ``ZmqIncomingMessage`` and put on
    ``self.incoming_queue``; this method then waits on the message's
    condition until a reply has been recorded on it.

    :returns: the ``reply`` field of the recorded reply.
    :raises: the recorded ``failure`` when one was supplied.
    """
    # NOTE(review): the lines building ``message`` and the last two
    # constructor arguments are not visible in the extracted text; the
    # keys mirror what ConsumerBase.process reads -- confirm upstream.
    message = {'method': method, 'args': kwargs}
    if version:
        message['version'] = version
    if namespace:
        message['namespace'] = namespace

    incoming = ZmqIncomingMessage(self, ctxt.to_dict(), message)
    self.incoming_queue.put(incoming)

    with incoming.condition:
        # Wait on a predicate, not a bare wait(): if the reply was
        # recorded before we got here its notify() has already happened
        # and a bare wait() would never return.
        while getattr(incoming, 'received', None) is None:
            incoming.condition.wait()

    outcome = incoming.received
    if outcome.failure:
        raise outcome.failure
    return outcome.reply
887
def poll(self, timeout=None):
    """Return the next queued incoming message.

    :param timeout: passed to ``self.incoming_queue.get`` as ``timeout``.
    :returns: the dequeued message, or ``None`` when the queue raised
              ``Empty`` (nothing arrived in time).
    """
    try:
        return self.incoming_queue.get(timeout=timeout)
    except six.moves.queue.Empty:
        # Timed out: report "no message" instead of raising.
        return None
895
class ZmqDriver(base.BaseDriver):
897
# FIXME(markmc): allow this driver to be used without eventlet
899
def __init__(self, conf, url, default_exchange=None,
900
allowed_remote_exmods=None):
901
conf.register_opts(zmq_opts)
902
conf.register_opts(impl_eventlet._eventlet_opts)
904
super(ZmqDriver, self).__init__(conf, url, default_exchange,
905
allowed_remote_exmods)
907
# FIXME(markmc): handle default_exchange
909
# FIXME(markmc): handle transport URL
911
raise NotImplementedError('The ZeroMQ driver does not yet support '
914
# FIXME(markmc): use self.conf everywhere
915
if self.conf is not CONF:
916
raise NotImplementedError('The ZeroMQ driver currently only works '
917
'with oslo.config.cfg.CONF')
919
def _send(self, target, ctxt, message,
920
wait_for_reply=None, timeout=None, envelope=False):
922
# FIXME(markmc): remove this temporary hack
923
class Context(object):
924
def __init__(self, d):
930
context = Context(ctxt)
939
# NOTE(ewindisch): fanout~ is used because it avoids splitting on '.'
940
# and acts as a non-subtle hint to the matchmaker and ZmqProxy.
941
topic = 'fanout~' + topic
943
reply = _multi_send(method, context, topic, message,
945
allowed_remote_exmods=self._allowed_remote_exmods)
950
def send(self, target, ctxt, message, wait_for_reply=None, timeout=None,
         retry=None):
    """Send ``message`` to ``target`` through ``self._send``.

    :param wait_for_reply: forwarded positionally to ``_send``.
    :param timeout: forwarded positionally to ``_send``.
    :param retry: accepted for interface compatibility only; it is not
                  forwarded because this driver does not implement it.
    :returns: whatever ``self._send`` returns.
    """
    # NOTE(sileht): retry is not implemented by this driver.
    return self._send(target, ctxt, message, wait_for_reply, timeout)
956
def send_notification(self, target, ctxt, message, version, retry=None):
    """Send a notification through ``self._send``.

    Dots in the target topic are replaced with dashes before sending, and
    the envelope is requested only when ``version`` equals ``2.0``.

    :param retry: accepted but not used by this method.
    :returns: whatever ``self._send`` returns.
    """
    # NOTE(ewindisch): dot-priority in rpc notifier does not
    #                  work with our assumptions.
    # NOTE(sileht): retry is not implemented by this driver.
    dashed_topic = target.topic.replace('.', '-')
    use_envelope = version == 2.0
    return self._send(target(topic=dashed_topic), ctxt, message,
                      envelope=use_envelope)
964
def listen(self, target):
965
conn = create_connection(self.conf)
967
listener = ZmqListener(self)
969
conn.create_consumer(target.topic, listener)
970
conn.create_consumer('%s.%s' % (target.topic, target.server),
972
conn.create_consumer(target.topic, listener, fanout=True)
974
conn.consume_in_thread()
978
def listen_for_notifications(self, targets_and_priorities):
979
# NOTE(sileht): this listener implementation is limited
980
# because zeromq doesn't support requeuing messages
981
conn = create_connection(self.conf)
983
listener = ZmqListener(self, None)
984
for target, priority in targets_and_priorities:
985
# NOTE(ewindisch): dot-priority in rpc notifier does not
986
# work with our assumptions.
987
# NOTE(sileht): create_consumer doesn't support target.exchange
988
conn.create_consumer('%s-%s' % (target.topic, priority),
990
conn.consume_in_thread()