42
42
cfg.StrOpt('rpc_zmq_bind_address', default='*',
43
help='ZeroMQ bind address. Should be a wildcard (*), '
44
'an ethernet interface, or IP. '
45
'The "host" option should point or resolve to this address.'),
43
help='ZeroMQ bind address. Should be a wildcard (*), '
44
'an ethernet interface, or IP. '
45
'The "host" option should point or resolve to this '
47
48
# The module.Class to use for matchmaking.
48
49
cfg.StrOpt('rpc_zmq_matchmaker',
49
default='openstack.common.rpc.matchmaker.MatchMakerLocalhost',
50
default='nova.openstack.common.rpc.matchmaker.MatchMakerLocalhost',
50
51
help='MatchMaker driver'),
52
53
# The following port is unassigned by IANA as of 2012-05-21
53
54
cfg.IntOpt('rpc_zmq_port', default=9501,
54
help='ZeroMQ receiver listening port'),
55
help='ZeroMQ receiver listening port'),
56
57
cfg.IntOpt('rpc_zmq_contexts', default=1,
57
help='Number of ZeroMQ contexts, defaults to 1'),
58
help='Number of ZeroMQ contexts, defaults to 1'),
59
60
cfg.StrOpt('rpc_zmq_ipc_dir', default='/var/run/openstack',
60
help='Directory for holding IPC sockets'),
61
help='Directory for holding IPC sockets'),
64
65
# These globals are defined in register_opts(conf),
119
120
self.subscribe(f)
121
122
LOG.debug(_("Connecting to %{addr}s with %{type}s"
122
"\n-> Subscribed to %{subscribe}s"
123
"\n-> bind: %{bind}s"),
124
{'addr': addr, 'type': self.socket_s(),
125
'subscribe': subscribe, 'bind': bind})
123
"\n-> Subscribed to %{subscribe}s"
124
"\n-> bind: %{bind}s"),
125
{'addr': addr, 'type': self.socket_s(),
126
'subscribe': subscribe, 'bind': bind})
198
199
def cast(self, msg_id, topic, data):
199
200
self.outq.send([str(msg_id), str(topic), str('cast'),
203
204
self.outq.close()
306
307
data.setdefault('version', None)
307
308
data.setdefault('args', [])
308
309
proxy.dispatch(ctx, data['version'],
309
data['method'], **data['args'])
310
data['method'], **data['args'])
312
313
class ZmqBaseReactor(ConsumerBase):
353
354
raise RPCException("Bad output socktype")
355
356
# Items push out.
356
outq = ZmqSocket(out_addr, zmq_type_out,
357
outq = ZmqSocket(out_addr, zmq_type_out, bind=out_bind)
359
359
self.mapping[inq] = outq
360
360
self.mapping[outq] = inq
429
429
if not topic in self.topic_proxy:
430
430
outq = ZmqSocket("ipc://%s/zmq_topic_%s" % (ipc_dir, topic),
431
sock_type, bind=True)
431
sock_type, bind=True)
432
432
self.topic_proxy[topic] = outq
433
433
self.sockets.append(outq)
434
434
LOG.info(_("Created topic proxy: %s"), topic)
502
502
(self.conf.rpc_zmq_ipc_dir, topic)
504
504
LOG.debug(_("Consumer is a zmq.%s"),
505
['PULL', 'SUB'][sock_type == zmq.SUB])
505
['PULL', 'SUB'][sock_type == zmq.SUB])
507
507
self.reactor.register(proxy, inaddr, sock_type,
508
508
subscribe=subscribe, in_bind=False)