216
217
socket_type = zmq.PUSH
217
218
self.outq = ZmqSocket(addr, socket_type, bind=bind)
219
def cast(self, msg_id, topic, data, serialize=True, force_envelope=False):
220
def cast(self, msg_id, topic, data, envelope=False):
220
221
msg_id = msg_id or 0
223
data = rpc_common.serialize_msg(data, force_envelope)
224
self.outq.send(map(bytes, (msg_id, topic, 'cast', _serialize(data))))
223
if not (envelope or rpc_common._SEND_RPC_ENVELOPE):
224
self.outq.send(map(bytes,
225
(msg_id, topic, 'cast', _serialize(data))))
228
rpc_envelope = rpc_common.serialize_msg(data[1], envelope)
229
zmq_msg = reduce(lambda x, y: x + y, rpc_envelope.items())
230
self.outq.send(map(bytes,
231
(msg_id, topic, 'impl_zmq_v2', data[0]) + zmq_msg))
227
234
self.outq.close()
323
def process(self, style, target, proxy, ctx, data):
330
def process(self, proxy, ctx, data):
324
331
data.setdefault('version', None)
325
332
data.setdefault('args', {})
433
440
#TODO(ewindisch): use zero-copy (i.e. references, not copying)
434
441
data = sock.recv()
435
msg_id, topic, style, in_msg = data
436
topic = topic.split('.', 1)[0]
438
444
LOG.debug(_("CONSUMER GOT %s"), ' '.join(map(pformat, data)))
440
if topic.startswith('fanout~') or topic.startswith('zmq_replies'):
446
if topic.startswith('fanout~'):
448
topic = topic.split('.', 1)[0]
449
elif topic.startswith('zmq_replies'):
441
450
sock_type = zmq.PUB
443
452
sock_type = zmq.PUSH
503
512
ipc_dir, run_as_root=True)
504
513
utils.execute('chmod', '750', ipc_dir, run_as_root=True)
505
514
except utils.ProcessExecutionError:
506
LOG.error(_("Could not create IPC directory %s") %
515
with excutils.save_and_reraise_exception():
516
LOG.error(_("Could not create IPC directory %s") %
511
520
self.register(consumption_proxy,
515
524
except zmq.ZMQError:
516
LOG.error(_("Could not create ZeroMQ receiver daemon. "
517
"Socket may already be in use."))
525
with excutils.save_and_reraise_exception():
526
LOG.error(_("Could not create ZeroMQ receiver daemon. "
527
"Socket may already be in use."))
520
529
super(ZmqProxy, self).consume_in_thread()
532
def unflatten_envelope(packenv):
533
"""Unflattens the RPC envelope.
534
Takes a list and returns a dictionary.
535
i.e. [1,2,3,4] => {1: 2, 3: 4}
543
except StopIteration:
523
547
class ZmqReactor(ZmqBaseReactor):
525
549
A consumer class implementing a
540
564
self.mapping[sock].send(data)
543
msg_id, topic, style, in_msg = data
545
ctx, request = rpc_common.deserialize_msg(_deserialize(in_msg))
546
ctx = RpcContext.unmarshal(ctx)
548
567
proxy = self.proxies[sock]
550
self.pool.spawn_n(self.process, style, topic,
569
if data[2] == 'cast': # Legacy protocol
572
ctx, msg = _deserialize(packenv)
573
request = rpc_common.deserialize_msg(msg)
574
ctx = RpcContext.unmarshal(ctx)
575
elif data[2] == 'impl_zmq_v2':
578
msg = unflatten_envelope(packenv)
579
request = rpc_common.deserialize_msg(msg)
581
# Unmarshal only after verifying the message.
582
ctx = RpcContext.unmarshal(data[3])
584
LOG.error(_("ZMQ Envelope version unsupported or unknown."))
587
self.pool.spawn_n(self.process, proxy, ctx, request)
554
590
class Connection(rpc_common.Connection):
555
591
"""Manages connections and threads."""
557
593
def __init__(self, conf):
558
595
self.reactor = ZmqReactor(conf)
560
597
def create_consumer(self, topic, proxy, fanout=False):
561
# Only consume on the base topic name.
562
topic = topic.split('.', 1)[0]
564
LOG.info(_("Create Consumer for topic (%(topic)s)") %
598
# Register with matchmaker.
599
_get_matchmaker().register(topic, CONF.rpc_zmq_host)
567
601
# Subscription scenarios
569
604
subscribe = ('', fanout)[type(fanout) == str]
571
topic = 'fanout~' + topic
605
topic = 'fanout~' + topic.split('.', 1)[0]
573
607
sock_type = zmq.PULL
609
topic = '.'.join((topic.split('.', 1)[0], CONF.rpc_zmq_host))
611
if topic in self.topics:
612
LOG.info(_("Skipping topic registration. Already registered."))
576
615
# Receive messages from (local) proxy
577
616
inaddr = "ipc://%s/zmq_topic_%s" % \
583
622
self.reactor.register(proxy, inaddr, sock_type,
584
623
subscribe=subscribe, in_bind=False)
624
self.topics.append(topic)
627
_get_matchmaker().stop_heartbeat()
628
for topic in self.topics:
629
_get_matchmaker().unregister(topic, CONF.rpc_zmq_host)
587
631
self.reactor.close()
590
635
self.reactor.wait()
592
637
def consume_in_thread(self):
638
_get_matchmaker().start_heartbeat()
593
639
self.reactor.consume_in_thread()
596
def _cast(addr, context, topic, msg, timeout=None, serialize=True,
597
force_envelope=False, _msg_id=None):
642
def _cast(addr, context, topic, msg, timeout=None, envelope=False,
598
644
timeout_cast = timeout or CONF.rpc_cast_timeout
599
645
payload = [RpcContext.marshal(context), msg]
603
649
conn = ZmqClient(addr)
605
651
# assumes cast can't return an exception
606
conn.cast(_msg_id, topic, payload, serialize, force_envelope)
652
conn.cast(_msg_id, topic, payload, envelope)
607
653
except zmq.ZMQError:
608
654
raise RPCException("Cast failed. ZMQ Socket Exception")
614
660
def _call(addr, context, topic, msg, timeout=None,
615
serialize=True, force_envelope=False):
616
662
# timeout_response is how long we wait for a response
617
663
timeout = timeout or CONF.rpc_response_timeout
642
688
with Timeout(timeout, exception=rpc_common.Timeout):
644
690
msg_waiter = ZmqSocket(
645
"ipc://%s/zmq_topic_zmq_replies" % CONF.rpc_zmq_ipc_dir,
691
"ipc://%s/zmq_topic_zmq_replies.%s" %
692
(CONF.rpc_zmq_ipc_dir,
646
694
zmq.SUB, subscribe=msg_id, bind=False
649
697
LOG.debug(_("Sending cast"))
650
_cast(addr, context, topic, payload,
651
serialize=serialize, force_envelope=force_envelope)
698
_cast(addr, context, topic, payload, envelope)
653
700
LOG.debug(_("Cast sent; Waiting reply"))
654
701
# Blocks until receives reply
655
702
msg = msg_waiter.recv()
656
703
LOG.debug(_("Received message: %s"), msg)
657
704
LOG.debug(_("Unpacking response"))
658
responses = _deserialize(msg[-1])[-1]['args']['response']
706
if msg[2] == 'cast': # Legacy version
707
raw_msg = _deserialize(msg[-1])[-1]
708
elif msg[2] == 'impl_zmq_v2':
709
rpc_envelope = unflatten_envelope(msg[4:])
710
raw_msg = rpc_common.deserialize_msg(rpc_envelope)
712
raise rpc_common.UnsupportedRpcEnvelopeVersion(
713
_("Unsupported or unknown ZMQ envelope returned."))
715
responses = raw_msg['args']['response']
659
716
# ZMQError trumps the Timeout error.
660
717
except zmq.ZMQError:
661
718
raise RPCException("ZMQ Socket Error")
676
733
return responses[-1]
679
def _multi_send(method, context, topic, msg, timeout=None, serialize=True,
680
force_envelope=False, _msg_id=None):
736
def _multi_send(method, context, topic, msg, timeout=None,
737
envelope=False, _msg_id=None):
682
739
Wraps the sending of messages,
683
740
dispatches to the matchmaker and sends
694
751
LOG.warn(_("No matchmaker results. Not casting."))
695
752
# While not strictly a timeout, callers know how to handle
696
753
# this exception and a timeout isn't too big a lie.
697
raise rpc_common.Timeout, "No match from matchmaker."
754
raise rpc_common.Timeout(_("No match from matchmaker."))
699
756
# This supports brokerless fanout (addresses > 1)
700
757
for queue in queues:
704
761
if method.__name__ == '_cast':
705
762
eventlet.spawn_n(method, _addr, context,
706
_topic, msg, timeout, serialize,
707
force_envelope, _msg_id)
763
_topic, msg, timeout, envelope,
709
766
return method(_addr, context, _topic, msg, timeout,
710
serialize, force_envelope)
713
770
def create_connection(conf, new=True):
737
794
_multi_send(_cast, context, 'fanout~' + str(topic), msg, **kwargs)
740
def notify(conf, context, topic, msg, **kwargs):
797
def notify(conf, context, topic, msg, envelope):
742
799
Send notification event.
743
800
Notifications are sent to topic-priority.
746
803
# NOTE(ewindisch): dot-priority in rpc notifier does not
747
804
# work with our assumptions.
748
topic.replace('.', '-')
749
kwargs['serialize'] = kwargs.pop('envelope')
750
kwargs['force_envelope'] = True
751
cast(conf, context, topic, msg, **kwargs)
805
topic = topic.replace('.', '-')
806
cast(conf, context, topic, msg, envelope=envelope)