14
14
# License for the specific language governing permissions and limitations
15
15
# under the License.
25
from eventlet.green import zmq
26
from oslo.config import cfg
28
from nova.openstack.common import cfg
29
28
from nova.openstack.common.gettextutils import _
30
29
from nova.openstack.common import importutils
31
30
from nova.openstack.common import jsonutils
31
from nova.openstack.common import processutils as utils
32
32
from nova.openstack.common.rpc import common as rpc_common
34
zmq = importutils.try_import('eventlet.green.zmq')
35
36
# for convenience, are not modified.
36
37
pformat = pprint.pformat
61
62
cfg.IntOpt('rpc_zmq_contexts', default=1,
62
63
help='Number of ZeroMQ contexts, defaults to 1'),
65
cfg.IntOpt('rpc_zmq_topic_backlog', default=None,
66
help='Maximum number of ingress messages to locally buffer '
67
'per topic. Default is unlimited.'),
64
69
cfg.StrOpt('rpc_zmq_ipc_dir', default='/var/run/openstack',
65
70
help='Directory for holding IPC sockets'),
202
211
class ZmqClient(object):
203
212
"""Client for ZMQ sockets."""
205
def __init__(self, addr, socket_type=zmq.PUSH, bind=False):
214
def __init__(self, addr, socket_type=None, bind=False):
215
if socket_type is None:
216
socket_type = zmq.PUSH
206
217
self.outq = ZmqSocket(addr, socket_type, bind=bind)
208
219
def cast(self, msg_id, topic, data, serialize=True, force_envelope=False):
210
223
data = rpc_common.serialize_msg(data, force_envelope)
211
self.outq.send([str(msg_id), str(topic), str('cast'),
224
self.outq.send(map(bytes, (msg_id, topic, 'cast', _serialize(data))))
215
227
self.outq.close()
311
323
def process(self, style, target, proxy, ctx, data):
324
data.setdefault('version', None)
325
data.setdefault('args', {})
312
327
# Method starting with - are
313
328
# processed internally. (non-valid method name)
314
method = data['method']
329
method = data.get('method')
331
LOG.error(_("RPC message did not include method."))
316
334
# Internal method
317
335
# uses internal context for safety.
318
if data['method'][0] == '-':
319
# For reply / process_reply
321
if method == 'reply':
322
self.private_ctx.reply(ctx, proxy, **data['args'])
336
if method == '-reply':
337
self.private_ctx.reply(ctx, proxy, **data['args'])
325
data.setdefault('version', None)
326
data.setdefault('args', {})
327
340
proxy.dispatch(ctx, data['version'],
328
341
data['method'], **data['args'])
431
438
LOG.debug(_("CONSUMER GOT %s"), ' '.join(map(pformat, data)))
433
# Handle zmq_replies magic
434
if topic.startswith('fanout~'):
436
elif topic.startswith('zmq_replies'):
438
inside = rpc_common.deserialize_msg(_deserialize(in_msg))
439
msg_id = inside[-1]['args']['msg_id']
440
response = inside[-1]['args']['response']
441
LOG.debug(_("->response->%s"), response)
442
data = [str(msg_id), _serialize(response)]
440
if topic.startswith('fanout~') or topic.startswith('zmq_replies'):
444
443
sock_type = zmq.PUSH
446
if not topic in self.topic_proxy:
447
outq = ZmqSocket("ipc://%s/zmq_topic_%s" % (ipc_dir, topic),
448
sock_type, bind=True)
449
self.topic_proxy[topic] = outq
450
self.sockets.append(outq)
451
LOG.info(_("Created topic proxy: %s"), topic)
453
# It takes some time for a pub socket to open,
454
# before we can have any faith in doing a send() to it.
455
if sock_type == zmq.PUB:
458
LOG.debug(_("ROUTER RELAY-OUT START %(data)s") % {'data': data})
459
self.topic_proxy[topic].send(data)
460
LOG.debug(_("ROUTER RELAY-OUT SUCCEEDED %(data)s") % {'data': data})
445
if topic not in self.topic_proxy:
446
def publisher(waiter):
447
LOG.info(_("Creating proxy for topic: %s"), topic)
450
out_sock = ZmqSocket("ipc://%s/zmq_topic_%s" %
452
sock_type, bind=True)
454
waiter.send_exception(*sys.exc_info())
457
self.topic_proxy[topic] = eventlet.queue.LightQueue(
458
CONF.rpc_zmq_topic_backlog)
459
self.sockets.append(out_sock)
461
# It takes some time for a pub socket to open,
462
# before we can have any faith in doing a send() to it.
463
if sock_type == zmq.PUB:
469
data = self.topic_proxy[topic].get()
471
LOG.debug(_("ROUTER RELAY-OUT SUCCEEDED %(data)s") %
474
wait_sock_creation = eventlet.event.Event()
475
eventlet.spawn(publisher, wait_sock_creation)
478
wait_sock_creation.wait()
480
LOG.error(_("Topic socket file creation failed."))
484
self.topic_proxy[topic].put_nowait(data)
485
LOG.debug(_("ROUTER RELAY-OUT QUEUED %(data)s") %
487
except eventlet.queue.Full:
488
LOG.error(_("Local per-topic backlog buffer full for topic "
489
"%(topic)s. Dropping message.") % {'topic': topic})
491
def consume_in_thread(self):
492
"""Runs the ZmqProxy service"""
493
ipc_dir = CONF.rpc_zmq_ipc_dir
494
consume_in = "tcp://%s:%s" % \
495
(CONF.rpc_zmq_bind_address,
497
consumption_proxy = InternalContext(None)
499
if not os.path.isdir(ipc_dir):
501
utils.execute('mkdir', '-p', ipc_dir, run_as_root=True)
502
utils.execute('chown', "%s:%s" % (os.getuid(), os.getgid()),
503
ipc_dir, run_as_root=True)
504
utils.execute('chmod', '750', ipc_dir, run_as_root=True)
505
except utils.ProcessExecutionError:
506
LOG.error(_("Could not create IPC directory %s") %
511
self.register(consumption_proxy,
516
LOG.error(_("Could not create ZeroMQ receiver daemon. "
517
"Socket may already be in use."))
520
super(ZmqProxy, self).consume_in_thread()
463
523
class ZmqReactor(ZmqBaseReactor):
533
593
self.reactor.consume_in_thread()
536
def _cast(addr, context, msg_id, topic, msg, timeout=None, serialize=True,
537
force_envelope=False):
596
def _cast(addr, context, topic, msg, timeout=None, serialize=True,
597
force_envelope=False, _msg_id=None):
538
598
timeout_cast = timeout or CONF.rpc_cast_timeout
539
599
payload = [RpcContext.marshal(context), msg]
588
649
LOG.debug(_("Sending cast"))
589
_cast(addr, context, msg_id, topic, payload)
650
_cast(addr, context, topic, payload,
651
serialize=serialize, force_envelope=force_envelope)
591
653
LOG.debug(_("Cast sent; Waiting reply"))
592
654
# Blocks until receives reply
593
655
msg = msg_waiter.recv()
594
656
LOG.debug(_("Received message: %s"), msg)
595
657
LOG.debug(_("Unpacking response"))
596
responses = _deserialize(msg[-1])
658
responses = _deserialize(msg[-1])[-1]['args']['response']
597
659
# ZMQError trumps the Timeout error.
598
660
except zmq.ZMQError:
599
661
raise RPCException("ZMQ Socket Error")
662
except (IndexError, KeyError):
663
raise RPCException(_("RPC Message Invalid."))
601
665
if 'msg_waiter' in vars():
602
666
msg_waiter.close()
640
704
if method.__name__ == '_cast':
641
705
eventlet.spawn_n(method, _addr, context,
642
_topic, _topic, msg, timeout, serialize,
706
_topic, msg, timeout, serialize,
707
force_envelope, _msg_id)
645
return method(_addr, context, _topic, _topic, msg, timeout)
709
return method(_addr, context, _topic, msg, timeout,
710
serialize, force_envelope)
648
713
def create_connection(conf, new=True):
690
755
"""Clean up resources in use by implementation."""
692
761
global matchmaker
693
762
matchmaker = None
698
def register_opts(conf):
699
"""Registration of options for this driver."""
700
#NOTE(ewindisch): ZMQ_CTX and matchmaker
701
# are initialized here as this is as good
702
# an initialization method as any.
704
# We memoize through these globals
767
raise ImportError("Failed to import eventlet.green.zmq")
710
conf.register_opts(zmq_opts)
712
# Don't re-set, if this method is called twice.
714
ZMQ_CTX = zmq.Context(conf.rpc_zmq_contexts)
771
ZMQ_CTX = zmq.Context(CONF.rpc_zmq_contexts)
775
def _get_matchmaker(*args, **kwargs):
715
777
if not matchmaker:
716
# rpc_zmq_matchmaker should be set to a 'module.Class'
717
mm_path = conf.rpc_zmq_matchmaker.split('.')
718
mm_module = '.'.join(mm_path[:-1])
719
mm_class = mm_path[-1]
721
# Only initialize a class.
722
if mm_path[-1][0] not in string.ascii_uppercase:
723
LOG.error(_("Matchmaker could not be loaded.\n"
724
"rpc_zmq_matchmaker is not a class."))
725
raise RPCException(_("Error loading Matchmaker."))
727
mm_impl = importutils.import_module(mm_module)
728
mm_constructor = getattr(mm_impl, mm_class)
729
matchmaker = mm_constructor()
732
register_opts(cfg.CONF)
778
matchmaker = importutils.import_object(
779
CONF.rpc_zmq_matchmaker, *args, **kwargs)