26
27
from oslo.config import cfg
29
from heat.openstack.common import excutils
28
30
from heat.openstack.common.gettextutils import _
29
31
from heat.openstack.common import importutils
30
32
from heat.openstack.common import jsonutils
31
from heat.openstack.common import processutils as utils
32
33
from heat.openstack.common.rpc import common as rpc_common
34
35
zmq = importutils.try_import('eventlet.green.zmq')
85
86
def _serialize(data):
87
"""Serialization wrapper.
88
89
We prefer using JSON, but it cannot encode all types.
89
90
Error if a developer passes us bad data.
92
return str(jsonutils.dumps(data, ensure_ascii=True))
93
return jsonutils.dumps(data, ensure_ascii=True)
94
LOG.error(_("JSON serialization failed."))
95
with excutils.save_and_reraise_exception():
96
LOG.error(_("JSON serialization failed."))
98
99
def _deserialize(data):
100
Deserialization wrapper
100
"""Deserialization wrapper."""
102
101
LOG.debug(_("Deserializing: %s"), data)
103
102
return jsonutils.loads(data)
106
105
class ZmqSocket(object):
108
A tiny wrapper around ZeroMQ to simplify the send/recv protocol
109
and connection management.
106
"""A tiny wrapper around ZeroMQ.
108
Simplifies the send/recv protocol and connection management.
111
109
Can be used as a Context (supports the 'with' statement).
197
195
LOG.error("ZeroMQ socket could not be closed.")
198
def recv(self, **kwargs):
201
199
if not self.can_recv:
202
200
raise RPCException(_("You cannot recv on this socket."))
203
return self.sock.recv_multipart()
201
return self.sock.recv_multipart(**kwargs)
205
def send(self, data):
203
def send(self, data, **kwargs):
206
204
if not self.can_send:
207
205
raise RPCException(_("You cannot send on this socket."))
208
self.sock.send_multipart(data)
206
self.sock.send_multipart(data, **kwargs)
211
209
class ZmqClient(object):
212
210
"""Client for ZMQ sockets."""
214
def __init__(self, addr, socket_type=None, bind=False):
215
if socket_type is None:
216
socket_type = zmq.PUSH
217
self.outq = ZmqSocket(addr, socket_type, bind=bind)
219
def cast(self, msg_id, topic, data, serialize=True, force_envelope=False):
221
data = rpc_common.serialize_msg(data, force_envelope)
222
self.outq.send([str(msg_id), str(topic), str('cast'),
212
def __init__(self, addr):
213
self.outq = ZmqSocket(addr, zmq.PUSH, bind=False)
215
def cast(self, msg_id, topic, data, envelope):
219
self.outq.send(map(bytes,
220
(msg_id, topic, 'cast', _serialize(data))))
223
rpc_envelope = rpc_common.serialize_msg(data[1], envelope)
224
zmq_msg = reduce(lambda x, y: x + y, rpc_envelope.items())
225
self.outq.send(map(bytes,
226
(msg_id, topic, 'impl_zmq_v2', data[0]) + zmq_msg))
226
229
self.outq.close()
269
272
result = proxy.dispatch(
270
ctx, data['version'], data['method'], **data['args'])
273
ctx, data['version'], data['method'],
274
data.get('namespace'), **data['args'])
271
275
return ConsumerBase.normalize_reply(result, ctx.replies)
272
276
except greenlet.GreenletExit:
273
277
# ignore these since they are just from shutdowns
275
except rpc_common.ClientException, e:
279
except rpc_common.ClientException as e:
276
280
LOG.debug(_("Expected exception during message handling (%s)") %
286
290
def reply(self, ctx, proxy,
287
291
msg_id=None, context=None, topic=None, msg=None):
288
292
"""Reply to a casted call."""
289
# Our real method is curried into msg['args']
293
# NOTE(ewindisch): context kwarg exists for Grizzly compat.
294
# this may be able to be removed earlier than
295
# 'I' if ConsumerBase.process were refactored.
296
if type(msg) is list:
291
child_ctx = RpcContext.unmarshal(msg[0])
292
301
response = ConsumerBase.normalize_reply(
293
self._get_response(child_ctx, proxy, topic, msg[1]),
302
self._get_response(ctx, proxy, topic, payload),
296
305
LOG.debug(_("Sending reply"))
297
cast(CONF, ctx, topic, {
306
_multi_send(_cast, ctx, topic, {
298
307
'method': '-process_reply',
309
'msg_id': msg_id, # Include for Folsom compat.
301
310
'response': response
306
315
class ConsumerBase(object):
322
def process(self, style, target, proxy, ctx, data):
331
def process(self, proxy, ctx, data):
323
332
data.setdefault('version', None)
324
333
data.setdefault('args', {})
339
348
proxy.dispatch(ctx, data['version'],
340
data['method'], **data['args'])
349
data['method'], data.get('namespace'), **data['args'])
343
352
class ZmqBaseReactor(ConsumerBase):
345
A consumer class implementing a
346
centralized casting broker (PULL-PUSH)
347
for RoundRobin requests.
353
"""A consumer class implementing a centralized casting broker (PULL-PUSH).
355
Used for RoundRobin requests.
350
358
def __init__(self, conf):
417
425
class ZmqProxy(ZmqBaseReactor):
419
A consumer class implementing a
420
topic-based proxy, forwarding to
426
"""A consumer class implementing a topic-based proxy.
428
Forwards to IPC sockets.
424
431
def __init__(self, conf):
425
432
super(ZmqProxy, self).__init__(conf)
433
pathsep = set((os.path.sep or '', os.path.altsep or '', '/', '\\'))
434
self.badchars = re.compile(r'[%s]' % re.escape(''.join(pathsep)))
427
436
self.topic_proxy = {}
429
438
def consume(self, sock):
430
439
ipc_dir = CONF.rpc_zmq_ipc_dir
432
#TODO(ewindisch): use zero-copy (i.e. references, not copying)
434
msg_id, topic, style, in_msg = data
435
topic = topic.split('.', 1)[0]
437
LOG.debug(_("CONSUMER GOT %s"), ' '.join(map(pformat, data)))
439
# Handle zmq_replies magic
441
data = sock.recv(copy=False)
442
topic = data[1].bytes
440
444
if topic.startswith('fanout~'):
441
445
sock_type = zmq.PUB
446
topic = topic.split('.', 1)[0]
442
447
elif topic.startswith('zmq_replies'):
443
448
sock_type = zmq.PUB
444
inside = rpc_common.deserialize_msg(_deserialize(in_msg))
445
msg_id = inside[-1]['args']['msg_id']
446
response = inside[-1]['args']['response']
447
LOG.debug(_("->response->%s"), response)
448
data = [str(msg_id), _serialize(response)]
450
450
sock_type = zmq.PUSH
454
454
LOG.info(_("Creating proxy for topic: %s"), topic)
457
# The topic is received over the network,
458
# don't trust this input.
459
if self.badchars.search(topic) is not None:
460
emsg = _("Topic contained dangerous characters.")
462
raise RPCException(emsg)
457
464
out_sock = ZmqSocket("ipc://%s/zmq_topic_%s" %
458
465
(ipc_dir, topic),
459
466
sock_type, bind=True)
491
496
self.topic_proxy[topic].put_nowait(data)
492
LOG.debug(_("ROUTER RELAY-OUT QUEUED %(data)s") %
494
497
except eventlet.queue.Full:
495
498
LOG.error(_("Local per-topic backlog buffer full for topic "
496
499
"%(topic)s. Dropping message.") % {'topic': topic})
498
501
def consume_in_thread(self):
499
"""Runs the ZmqProxy service"""
502
"""Runs the ZmqProxy service."""
500
503
ipc_dir = CONF.rpc_zmq_ipc_dir
501
504
consume_in = "tcp://%s:%s" % \
502
505
(CONF.rpc_zmq_bind_address,
503
506
CONF.rpc_zmq_port)
504
507
consumption_proxy = InternalContext(None)
506
if not os.path.isdir(ipc_dir):
508
utils.execute('mkdir', '-p', ipc_dir, run_as_root=True)
509
utils.execute('chown', "%s:%s" % (os.getuid(), os.getgid()),
510
ipc_dir, run_as_root=True)
511
utils.execute('chmod', '750', ipc_dir, run_as_root=True)
512
except utils.ProcessExecutionError:
513
LOG.error(_("Could not create IPC directory %s") %
512
if not os.path.isdir(ipc_dir):
513
with excutils.save_and_reraise_exception():
514
LOG.error(_("Required IPC directory does not exist at"
515
" %s") % (ipc_dir, ))
518
517
self.register(consumption_proxy,
522
521
except zmq.ZMQError:
523
LOG.error(_("Could not create ZeroMQ receiver daemon. "
524
"Socket may already be in use."))
522
if os.access(ipc_dir, os.X_OK):
523
with excutils.save_and_reraise_exception():
524
LOG.error(_("Permission denied to IPC directory at"
525
" %s") % (ipc_dir, ))
526
with excutils.save_and_reraise_exception():
527
LOG.error(_("Could not create ZeroMQ receiver daemon. "
528
"Socket may already be in use."))
527
530
super(ZmqProxy, self).consume_in_thread()
533
def unflatten_envelope(packenv):
534
"""Unflattens the RPC envelope.
536
Takes a list and returns a dictionary.
537
i.e. [1,2,3,4] => {1: 2, 3: 4}
545
except StopIteration:
530
549
class ZmqReactor(ZmqBaseReactor):
532
A consumer class implementing a
533
consumer for messages. Can also be
550
"""A consumer class implementing a consumer for messages.
552
Can also be used as a 1:1 proxy
537
555
def __init__(self, conf):
547
565
self.mapping[sock].send(data)
550
msg_id, topic, style, in_msg = data
552
ctx, request = rpc_common.deserialize_msg(_deserialize(in_msg))
553
ctx = RpcContext.unmarshal(ctx)
555
568
proxy = self.proxies[sock]
557
self.pool.spawn_n(self.process, style, topic,
570
if data[2] == 'cast': # Legacy protocol
573
ctx, msg = _deserialize(packenv)
574
request = rpc_common.deserialize_msg(msg)
575
ctx = RpcContext.unmarshal(ctx)
576
elif data[2] == 'impl_zmq_v2':
579
msg = unflatten_envelope(packenv)
580
request = rpc_common.deserialize_msg(msg)
582
# Unmarshal only after verifying the message.
583
ctx = RpcContext.unmarshal(data[3])
585
LOG.error(_("ZMQ Envelope version unsupported or unknown."))
588
self.pool.spawn_n(self.process, proxy, ctx, request)
561
591
class Connection(rpc_common.Connection):
562
592
"""Manages connections and threads."""
564
594
def __init__(self, conf):
565
596
self.reactor = ZmqReactor(conf)
567
598
def create_consumer(self, topic, proxy, fanout=False):
568
# Only consume on the base topic name.
569
topic = topic.split('.', 1)[0]
571
LOG.info(_("Create Consumer for topic (%(topic)s)") %
599
# Register with matchmaker.
600
_get_matchmaker().register(topic, CONF.rpc_zmq_host)
574
602
# Subscription scenarios
576
605
subscribe = ('', fanout)[type(fanout) == str]
578
topic = 'fanout~' + topic
606
topic = 'fanout~' + topic.split('.', 1)[0]
580
608
sock_type = zmq.PULL
610
topic = '.'.join((topic.split('.', 1)[0], CONF.rpc_zmq_host))
612
if topic in self.topics:
613
LOG.info(_("Skipping topic registration. Already registered."))
583
616
# Receive messages from (local) proxy
584
617
inaddr = "ipc://%s/zmq_topic_%s" % \
590
623
self.reactor.register(proxy, inaddr, sock_type,
591
624
subscribe=subscribe, in_bind=False)
625
self.topics.append(topic)
628
_get_matchmaker().stop_heartbeat()
629
for topic in self.topics:
630
_get_matchmaker().unregister(topic, CONF.rpc_zmq_host)
594
632
self.reactor.close()
597
636
self.reactor.wait()
599
638
def consume_in_thread(self):
639
_get_matchmaker().start_heartbeat()
600
640
self.reactor.consume_in_thread()
603
def _cast(addr, context, msg_id, topic, msg, timeout=None, serialize=True,
604
force_envelope=False):
643
def _cast(addr, context, topic, msg, timeout=None, envelope=False,
605
645
timeout_cast = timeout or CONF.rpc_cast_timeout
606
646
payload = [RpcContext.marshal(context), msg]
610
650
conn = ZmqClient(addr)
612
652
# assumes cast can't return an exception
613
conn.cast(msg_id, topic, payload, serialize, force_envelope)
653
conn.cast(_msg_id, topic, payload, envelope)
614
654
except zmq.ZMQError:
615
655
raise RPCException("Cast failed. ZMQ Socket Exception")
621
def _call(addr, context, msg_id, topic, msg, timeout=None,
622
serialize=True, force_envelope=False):
661
def _call(addr, context, topic, msg, timeout=None,
623
663
# timeout_response is how long we wait for a response
624
664
timeout = timeout or CONF.rpc_response_timeout
649
689
with Timeout(timeout, exception=rpc_common.Timeout):
651
691
msg_waiter = ZmqSocket(
652
"ipc://%s/zmq_topic_zmq_replies" % CONF.rpc_zmq_ipc_dir,
692
"ipc://%s/zmq_topic_zmq_replies.%s" %
693
(CONF.rpc_zmq_ipc_dir,
653
695
zmq.SUB, subscribe=msg_id, bind=False
656
698
LOG.debug(_("Sending cast"))
657
_cast(addr, context, msg_id, topic, payload,
658
serialize=serialize, force_envelope=force_envelope)
699
_cast(addr, context, topic, payload, envelope)
660
701
LOG.debug(_("Cast sent; Waiting reply"))
661
702
# Blocks until receives reply
662
703
msg = msg_waiter.recv()
663
704
LOG.debug(_("Received message: %s"), msg)
664
705
LOG.debug(_("Unpacking response"))
665
responses = _deserialize(msg[-1])
707
if msg[2] == 'cast': # Legacy version
708
raw_msg = _deserialize(msg[-1])[-1]
709
elif msg[2] == 'impl_zmq_v2':
710
rpc_envelope = unflatten_envelope(msg[4:])
711
raw_msg = rpc_common.deserialize_msg(rpc_envelope)
713
raise rpc_common.UnsupportedRpcEnvelopeVersion(
714
_("Unsupported or unknown ZMQ envelope returned."))
716
responses = raw_msg['args']['response']
666
717
# ZMQError trumps the Timeout error.
667
718
except zmq.ZMQError:
668
719
raise RPCException("ZMQ Socket Error")
720
except (IndexError, KeyError):
721
raise RPCException(_("RPC Message Invalid."))
670
723
if 'msg_waiter' in vars():
671
724
msg_waiter.close()
681
734
return responses[-1]
684
def _multi_send(method, context, topic, msg, timeout=None, serialize=True,
685
force_envelope=False):
687
Wraps the sending of messages,
688
dispatches to the matchmaker and sends
689
message to all relevant hosts.
737
def _multi_send(method, context, topic, msg, timeout=None,
738
envelope=False, _msg_id=None):
739
"""Wraps the sending of messages.
741
Dispatches to the matchmaker and sends message to all relevant hosts.
692
744
LOG.debug(_("%(msg)s") % {'msg': ' '.join(map(pformat, (topic, msg)))})
695
747
LOG.debug(_("Sending message(s) to: %s"), queues)
697
749
# Don't stack if we have no matchmaker results
699
751
LOG.warn(_("No matchmaker results. Not casting."))
700
752
# While not strictly a timeout, callers know how to handle
701
753
# this exception and a timeout isn't too big a lie.
702
raise rpc_common.Timeout, "No match from matchmaker."
754
raise rpc_common.Timeout(_("No match from matchmaker."))
704
756
# This supports brokerless fanout (addresses > 1)
705
757
for queue in queues:
709
761
if method.__name__ == '_cast':
710
762
eventlet.spawn_n(method, _addr, context,
711
_topic, _topic, msg, timeout, serialize,
763
_topic, msg, timeout, envelope,
714
return method(_addr, context, _topic, _topic, msg, timeout,
715
serialize, force_envelope)
766
return method(_addr, context, _topic, msg, timeout,
718
770
def create_connection(conf, new=True):
742
794
_multi_send(_cast, context, 'fanout~' + str(topic), msg, **kwargs)
745
def notify(conf, context, topic, msg, **kwargs):
747
Send notification event.
797
def notify(conf, context, topic, msg, envelope):
798
"""Send notification event.
748
800
Notifications are sent to topic-priority.
749
801
This differs from the AMQP drivers which send to topic.priority.
751
803
# NOTE(ewindisch): dot-priority in rpc notifier does not
752
804
# work with our assumptions.
753
topic.replace('.', '-')
754
kwargs['serialize'] = kwargs.pop('envelope')
755
kwargs['force_envelope'] = True
756
cast(conf, context, topic, msg, **kwargs)
805
topic = topic.replace('.', '-')
806
cast(conf, context, topic, msg, envelope=envelope)
780
def _get_matchmaker():
830
def _get_matchmaker(*args, **kwargs):
781
831
global matchmaker
782
832
if not matchmaker:
783
matchmaker = importutils.import_object(CONF.rpc_zmq_matchmaker)
833
mm = CONF.rpc_zmq_matchmaker
834
if mm.endswith('matchmaker.MatchMakerRing'):
835
mm.replace('matchmaker', 'matchmaker_ring')
836
LOG.warn(_('rpc_zmq_matchmaker = %(orig)s is deprecated; use'
837
' %(new)s instead') % dict(
838
orig=CONF.rpc_zmq_matchmaker, new=mm))
839
matchmaker = importutils.import_object(mm, *args, **kwargs)
784
840
return matchmaker