~ubuntu-branches/ubuntu/vivid/oslo.messaging/vivid-proposed

« back to all changes in this revision

Viewing changes to .pc/zmq-server-routing.patch/oslo/messaging/_drivers/impl_zmq.py

  • Committer: Package Import Robot
  • Author(s): James Page
  • Date: 2014-12-11 14:34:04 UTC
  • mfrom: (1.1.8)
  • Revision ID: package-import@ubuntu.com-20141211143404-jmuzfr2q6k06wqxc
Tags: 1.5.1-0ubuntu1
* New upstream release.
  - d/p/*: Dropped, patches accepted upstream.
  - d/control: Add new dependencies, update minimum version
    requirements inline with new release.
  - d/p/enable-zmq-tests.patch: Cherry pick patch to fix misc
    ZeroMQ issues and enable test suite.
  - d/p/matchmaker-redis-fix.patch: Cherry pick fix for Redis
    matchmaker.
  - d/p/disable-zmq-tests.patch: Make conditional execution of
    zmq tests a little more intelligent.
* Enable easier use of ZeroMQ receiver daemon:
  - d/control: Add new oslo-messaging-zmq-receiver package.
  - d/oslo-messaging-zmq-receiver.{upstart,service}: Add upstart
    and systemd configuration files for receiver.
  - d/oslo-messaging-zmq-receiver.postinst: Create oslo user and
    group for daemon processes to run under.
  - d/etc/oslo-messaging.conf: Add default configuration file.
* d/control: Add Suggests on python-zmq to python-oslo.messaging. 
* d/control,compat: Bump compat level to 9.
* d/control,rules: Add subunit to BD's, generate pretty test output.
* d/control: Align dependency version requirements with upstream.
* d/control: Bumped Standards-Version to 3.9.6, no changes.
* d/control: Add dh-python to BD's.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
#    Copyright 2011 Cloudscaling Group, Inc
2
 
#
3
 
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
4
 
#    not use this file except in compliance with the License. You may obtain
5
 
#    a copy of the License at
6
 
#
7
 
#         http://www.apache.org/licenses/LICENSE-2.0
8
 
#
9
 
#    Unless required by applicable law or agreed to in writing, software
10
 
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11
 
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12
 
#    License for the specific language governing permissions and limitations
13
 
#    under the License.
14
 
 
15
 
import collections
16
 
import logging
17
 
import os
18
 
import pprint
19
 
import re
20
 
import socket
21
 
import sys
22
 
import threading
23
 
import types
24
 
import uuid
25
 
 
26
 
import eventlet
27
 
import greenlet
28
 
import six
29
 
from six import moves
30
 
 
31
 
from oslo.config import cfg
32
 
from oslo.messaging._drivers import base
33
 
from oslo.messaging._drivers import common as rpc_common
34
 
from oslo.messaging._executors import impl_eventlet  # FIXME(markmc)
35
 
from oslo.messaging.openstack.common.gettextutils import _
36
 
from oslo.messaging.openstack.common import jsonutils
37
 
from oslo.utils import excutils
38
 
from oslo.utils import importutils
39
 
 
40
 
 
41
 
zmq = importutils.try_import('eventlet.green.zmq')
42
 
 
43
 
# for convenience, are not modified.
44
 
pformat = pprint.pformat
45
 
Timeout = eventlet.timeout.Timeout
46
 
LOG = logging.getLogger(__name__)
47
 
RPCException = rpc_common.RPCException
48
 
 
49
 
zmq_opts = [
50
 
    cfg.StrOpt('rpc_zmq_bind_address', default='*',
51
 
               help='ZeroMQ bind address. Should be a wildcard (*), '
52
 
                    'an ethernet interface, or IP. '
53
 
                    'The "host" option should point or resolve to this '
54
 
                    'address.'),
55
 
 
56
 
    # The module.Class to use for matchmaking.
57
 
    cfg.StrOpt(
58
 
        'rpc_zmq_matchmaker',
59
 
        default=('oslo.messaging._drivers.'
60
 
                 'matchmaker.MatchMakerLocalhost'),
61
 
        help='MatchMaker driver.',
62
 
    ),
63
 
 
64
 
    # The following port is unassigned by IANA as of 2012-05-21
65
 
    cfg.IntOpt('rpc_zmq_port', default=9501,
66
 
               help='ZeroMQ receiver listening port.'),
67
 
 
68
 
    cfg.IntOpt('rpc_zmq_contexts', default=1,
69
 
               help='Number of ZeroMQ contexts, defaults to 1.'),
70
 
 
71
 
    cfg.IntOpt('rpc_zmq_topic_backlog',
72
 
               help='Maximum number of ingress messages to locally buffer '
73
 
                    'per topic. Default is unlimited.'),
74
 
 
75
 
    cfg.StrOpt('rpc_zmq_ipc_dir', default='/var/run/openstack',
76
 
               help='Directory for holding IPC sockets.'),
77
 
 
78
 
    cfg.StrOpt('rpc_zmq_host', default=socket.gethostname(),
79
 
               sample_default='localhost',
80
 
               help='Name of this node. Must be a valid hostname, FQDN, or '
81
 
                    'IP address. Must match "host" option, if running Nova.'),
82
 
 
83
 
    cfg.IntOpt('rpc_cast_timeout',
84
 
               default=30,
85
 
               help='Seconds to wait before a cast expires (TTL). '
86
 
                    'Only supported by impl_zmq.'),
87
 
]
88
 
 
89
 
CONF = cfg.CONF
90
 
 
91
 
ZMQ_CTX = None  # ZeroMQ Context, must be global.
92
 
matchmaker = None  # memoized matchmaker object
93
 
 
94
 
 
95
 
def _serialize(data):
96
 
    """Serialization wrapper.
97
 
 
98
 
    We prefer using JSON, but it cannot encode all types.
99
 
    Error if a developer passes us bad data.
100
 
    """
101
 
    try:
102
 
        return jsonutils.dumps(data, ensure_ascii=True)
103
 
    except TypeError:
104
 
        with excutils.save_and_reraise_exception():
105
 
            LOG.error(_("JSON serialization failed."))
106
 
 
107
 
 
108
 
def _deserialize(data):
109
 
    """Deserialization wrapper."""
110
 
    LOG.debug("Deserializing: %s", data)
111
 
    return jsonutils.loads(data)
112
 
 
113
 
 
114
 
class ZmqSocket(object):
115
 
    """A tiny wrapper around ZeroMQ.
116
 
 
117
 
    Simplifies the send/recv protocol and connection management.
118
 
    Can be used as a Context (supports the 'with' statement).
119
 
    """
120
 
 
121
 
    def __init__(self, addr, zmq_type, bind=True, subscribe=None):
122
 
        self.sock = _get_ctxt().socket(zmq_type)
123
 
        self.addr = addr
124
 
        self.type = zmq_type
125
 
        self.subscriptions = []
126
 
 
127
 
        # Support failures on sending/receiving on wrong socket type.
128
 
        self.can_recv = zmq_type in (zmq.PULL, zmq.SUB)
129
 
        self.can_send = zmq_type in (zmq.PUSH, zmq.PUB)
130
 
        self.can_sub = zmq_type in (zmq.SUB, )
131
 
 
132
 
        # Support list, str, & None for subscribe arg (cast to list)
133
 
        do_sub = {
134
 
            list: subscribe,
135
 
            str: [subscribe],
136
 
            type(None): []
137
 
        }[type(subscribe)]
138
 
 
139
 
        for f in do_sub:
140
 
            self.subscribe(f)
141
 
 
142
 
        str_data = {'addr': addr, 'type': self.socket_s(),
143
 
                    'subscribe': subscribe, 'bind': bind}
144
 
 
145
 
        LOG.debug("Connecting to %(addr)s with %(type)s", str_data)
146
 
        LOG.debug("-> Subscribed to %(subscribe)s", str_data)
147
 
        LOG.debug("-> bind: %(bind)s", str_data)
148
 
 
149
 
        try:
150
 
            if bind:
151
 
                self.sock.bind(addr)
152
 
            else:
153
 
                self.sock.connect(addr)
154
 
        except Exception:
155
 
            raise RPCException(_("Could not open socket."))
156
 
 
157
 
    def socket_s(self):
158
 
        """Get socket type as string."""
159
 
        t_enum = ('PUSH', 'PULL', 'PUB', 'SUB', 'REP', 'REQ', 'ROUTER',
160
 
                  'DEALER')
161
 
        return dict(map(lambda t: (getattr(zmq, t), t), t_enum))[self.type]
162
 
 
163
 
    def subscribe(self, msg_filter):
164
 
        """Subscribe."""
165
 
        if not self.can_sub:
166
 
            raise RPCException("Cannot subscribe on this socket.")
167
 
        LOG.debug("Subscribing to %s", msg_filter)
168
 
 
169
 
        try:
170
 
            self.sock.setsockopt(zmq.SUBSCRIBE, msg_filter)
171
 
        except Exception:
172
 
            return
173
 
 
174
 
        self.subscriptions.append(msg_filter)
175
 
 
176
 
    def unsubscribe(self, msg_filter):
177
 
        """Unsubscribe."""
178
 
        if msg_filter not in self.subscriptions:
179
 
            return
180
 
        self.sock.setsockopt(zmq.UNSUBSCRIBE, msg_filter)
181
 
        self.subscriptions.remove(msg_filter)
182
 
 
183
 
    def close(self):
184
 
        if self.sock is None or self.sock.closed:
185
 
            return
186
 
 
187
 
        # We must unsubscribe, or we'll leak descriptors.
188
 
        if self.subscriptions:
189
 
            for f in self.subscriptions:
190
 
                try:
191
 
                    self.sock.setsockopt(zmq.UNSUBSCRIBE, f)
192
 
                except Exception:
193
 
                    pass
194
 
            self.subscriptions = []
195
 
 
196
 
        try:
197
 
            # Default is to linger
198
 
            self.sock.close()
199
 
        except Exception:
200
 
            # While this is a bad thing to happen,
201
 
            # it would be much worse if some of the code calling this
202
 
            # were to fail. For now, lets log, and later evaluate
203
 
            # if we can safely raise here.
204
 
            LOG.error("ZeroMQ socket could not be closed.")
205
 
        self.sock = None
206
 
 
207
 
    def recv(self, **kwargs):
208
 
        if not self.can_recv:
209
 
            raise RPCException(_("You cannot recv on this socket."))
210
 
        return self.sock.recv_multipart(**kwargs)
211
 
 
212
 
    def send(self, data, **kwargs):
213
 
        if not self.can_send:
214
 
            raise RPCException(_("You cannot send on this socket."))
215
 
        self.sock.send_multipart(data, **kwargs)
216
 
 
217
 
 
218
 
class ZmqClient(object):
219
 
    """Client for ZMQ sockets."""
220
 
 
221
 
    def __init__(self, addr):
222
 
        self.outq = ZmqSocket(addr, zmq.PUSH, bind=False)
223
 
 
224
 
    def cast(self, msg_id, topic, data, envelope):
225
 
        msg_id = msg_id or 0
226
 
 
227
 
        if not envelope:
228
 
            self.outq.send(map(bytes,
229
 
                           (msg_id, topic, 'cast', _serialize(data))))
230
 
            return
231
 
 
232
 
        rpc_envelope = rpc_common.serialize_msg(data[1])
233
 
        zmq_msg = moves.reduce(lambda x, y: x + y, rpc_envelope.items())
234
 
        self.outq.send(map(bytes,
235
 
                       (msg_id, topic, 'impl_zmq_v2', data[0]) + zmq_msg))
236
 
 
237
 
    def close(self):
238
 
        self.outq.close()
239
 
 
240
 
 
241
 
class RpcContext(rpc_common.CommonRpcContext):
242
 
    """Context that supports replying to a rpc.call."""
243
 
    def __init__(self, **kwargs):
244
 
        self.replies = []
245
 
        super(RpcContext, self).__init__(**kwargs)
246
 
 
247
 
    def deepcopy(self):
248
 
        values = self.to_dict()
249
 
        values['replies'] = self.replies
250
 
        return self.__class__(**values)
251
 
 
252
 
    def reply(self, reply=None, failure=None, ending=False):
253
 
        if ending:
254
 
            return
255
 
        self.replies.append(reply)
256
 
 
257
 
    @classmethod
258
 
    def marshal(self, ctx):
259
 
        ctx_data = ctx.to_dict()
260
 
        return _serialize(ctx_data)
261
 
 
262
 
    @classmethod
263
 
    def unmarshal(self, data):
264
 
        return RpcContext.from_dict(_deserialize(data))
265
 
 
266
 
 
267
 
class InternalContext(object):
268
 
    """Used by ConsumerBase as a private context for - methods."""
269
 
 
270
 
    def __init__(self, proxy):
271
 
        self.proxy = proxy
272
 
        self.msg_waiter = None
273
 
 
274
 
    def _get_response(self, ctx, proxy, topic, data):
275
 
        """Process a curried message and cast the result to topic."""
276
 
        LOG.debug("Running func with context: %s", ctx.to_dict())
277
 
        data.setdefault('version', None)
278
 
        data.setdefault('args', {})
279
 
 
280
 
        try:
281
 
            result = proxy.dispatch(
282
 
                ctx, data['version'], data['method'],
283
 
                data.get('namespace'), **data['args'])
284
 
            return ConsumerBase.normalize_reply(result, ctx.replies)
285
 
        except greenlet.GreenletExit:
286
 
            # ignore these since they are just from shutdowns
287
 
            pass
288
 
        except rpc_common.ClientException as e:
289
 
            LOG.debug("Expected exception during message handling (%s)",
290
 
                      e._exc_info[1])
291
 
            return {'exc':
292
 
                    rpc_common.serialize_remote_exception(e._exc_info,
293
 
                                                          log_failure=False)}
294
 
        except Exception:
295
 
            LOG.error(_("Exception during message handling"))
296
 
            return {'exc':
297
 
                    rpc_common.serialize_remote_exception(sys.exc_info())}
298
 
 
299
 
    def reply(self, ctx, proxy,
300
 
              msg_id=None, context=None, topic=None, msg=None):
301
 
        """Reply to a casted call."""
302
 
        # NOTE(ewindisch): context kwarg exists for Grizzly compat.
303
 
        #                  this may be able to be removed earlier than
304
 
        #                  'I' if ConsumerBase.process were refactored.
305
 
        if type(msg) is list:
306
 
            payload = msg[-1]
307
 
        else:
308
 
            payload = msg
309
 
 
310
 
        response = ConsumerBase.normalize_reply(
311
 
            self._get_response(ctx, proxy, topic, payload),
312
 
            ctx.replies)
313
 
 
314
 
        LOG.debug("Sending reply")
315
 
        _multi_send(_cast, ctx, topic, {
316
 
            'method': '-process_reply',
317
 
            'args': {
318
 
                'msg_id': msg_id,  # Include for Folsom compat.
319
 
                'response': response
320
 
            }
321
 
        }, _msg_id=msg_id)
322
 
 
323
 
 
324
 
class ConsumerBase(object):
325
 
    """Base Consumer."""
326
 
 
327
 
    def __init__(self):
328
 
        self.private_ctx = InternalContext(None)
329
 
 
330
 
    @classmethod
331
 
    def normalize_reply(self, result, replies):
332
 
        # TODO(ewindisch): re-evaluate and document this method.
333
 
        if isinstance(result, types.GeneratorType):
334
 
            return list(result)
335
 
        elif replies:
336
 
            return replies
337
 
        else:
338
 
            return [result]
339
 
 
340
 
    def process(self, proxy, ctx, data):
341
 
        data.setdefault('version', None)
342
 
        data.setdefault('args', {})
343
 
 
344
 
        # Method starting with - are
345
 
        # processed internally. (non-valid method name)
346
 
        method = data.get('method')
347
 
        if not method:
348
 
            LOG.error(_("RPC message did not include method."))
349
 
            return
350
 
 
351
 
        # Internal method
352
 
        # uses internal context for safety.
353
 
        if method == '-reply':
354
 
            self.private_ctx.reply(ctx, proxy, **data['args'])
355
 
            return
356
 
 
357
 
        proxy.dispatch(ctx, data['version'],
358
 
                       data['method'], data.get('namespace'), **data['args'])
359
 
 
360
 
 
361
 
class ZmqBaseReactor(ConsumerBase):
362
 
    """A consumer class implementing a centralized casting broker (PULL-PUSH).
363
 
 
364
 
    Used for RoundRobin requests.
365
 
    """
366
 
 
367
 
    def __init__(self, conf):
368
 
        super(ZmqBaseReactor, self).__init__()
369
 
 
370
 
        self.proxies = {}
371
 
        self.threads = []
372
 
        self.sockets = []
373
 
        self.subscribe = {}
374
 
 
375
 
        self.pool = eventlet.greenpool.GreenPool(conf.rpc_thread_pool_size)
376
 
 
377
 
    def register(self, proxy, in_addr, zmq_type_in,
378
 
                 in_bind=True, subscribe=None):
379
 
 
380
 
        LOG.info(_("Registering reactor"))
381
 
 
382
 
        if zmq_type_in not in (zmq.PULL, zmq.SUB):
383
 
            raise RPCException("Bad input socktype")
384
 
 
385
 
        # Items push in.
386
 
        inq = ZmqSocket(in_addr, zmq_type_in, bind=in_bind,
387
 
                        subscribe=subscribe)
388
 
 
389
 
        self.proxies[inq] = proxy
390
 
        self.sockets.append(inq)
391
 
 
392
 
        LOG.info(_("In reactor registered"))
393
 
 
394
 
    def consume_in_thread(self):
395
 
        def _consume(sock):
396
 
            LOG.info(_("Consuming socket"))
397
 
            while True:
398
 
                self.consume(sock)
399
 
 
400
 
        for k in self.proxies.keys():
401
 
            self.threads.append(
402
 
                self.pool.spawn(_consume, k)
403
 
            )
404
 
 
405
 
    def wait(self):
406
 
        for t in self.threads:
407
 
            t.wait()
408
 
 
409
 
    def close(self):
410
 
        for s in self.sockets:
411
 
            s.close()
412
 
 
413
 
        for t in self.threads:
414
 
            t.kill()
415
 
 
416
 
 
417
 
class ZmqProxy(ZmqBaseReactor):
418
 
    """A consumer class implementing a topic-based proxy.
419
 
 
420
 
    Forwards to IPC sockets.
421
 
    """
422
 
 
423
 
    def __init__(self, conf):
424
 
        super(ZmqProxy, self).__init__(conf)
425
 
        pathsep = set((os.path.sep or '', os.path.altsep or '', '/', '\\'))
426
 
        self.badchars = re.compile(r'[%s]' % re.escape(''.join(pathsep)))
427
 
 
428
 
        self.topic_proxy = {}
429
 
 
430
 
    def consume(self, sock):
431
 
        ipc_dir = CONF.rpc_zmq_ipc_dir
432
 
 
433
 
        data = sock.recv(copy=False)
434
 
        topic = data[1].bytes
435
 
 
436
 
        if topic.startswith('fanout~'):
437
 
            sock_type = zmq.PUB
438
 
            topic = topic.split('.', 1)[0]
439
 
        elif topic.startswith('zmq_replies'):
440
 
            sock_type = zmq.PUB
441
 
        else:
442
 
            sock_type = zmq.PUSH
443
 
 
444
 
        if topic not in self.topic_proxy:
445
 
            def publisher(waiter):
446
 
                LOG.info(_("Creating proxy for topic: %s"), topic)
447
 
 
448
 
                try:
449
 
                    # The topic is received over the network,
450
 
                    # don't trust this input.
451
 
                    if self.badchars.search(topic) is not None:
452
 
                        emsg = _("Topic contained dangerous characters.")
453
 
                        LOG.warn(emsg)
454
 
                        raise RPCException(emsg)
455
 
 
456
 
                    out_sock = ZmqSocket("ipc://%s/zmq_topic_%s" %
457
 
                                         (ipc_dir, topic),
458
 
                                         sock_type, bind=True)
459
 
                except RPCException:
460
 
                    waiter.send_exception(*sys.exc_info())
461
 
                    return
462
 
 
463
 
                self.topic_proxy[topic] = eventlet.queue.LightQueue(
464
 
                    CONF.rpc_zmq_topic_backlog)
465
 
                self.sockets.append(out_sock)
466
 
 
467
 
                # It takes some time for a pub socket to open,
468
 
                # before we can have any faith in doing a send() to it.
469
 
                if sock_type == zmq.PUB:
470
 
                    eventlet.sleep(.5)
471
 
 
472
 
                waiter.send(True)
473
 
 
474
 
                while(True):
475
 
                    data = self.topic_proxy[topic].get()
476
 
                    out_sock.send(data, copy=False)
477
 
 
478
 
            wait_sock_creation = eventlet.event.Event()
479
 
            eventlet.spawn(publisher, wait_sock_creation)
480
 
 
481
 
            try:
482
 
                wait_sock_creation.wait()
483
 
            except RPCException:
484
 
                LOG.error(_("Topic socket file creation failed."))
485
 
                return
486
 
 
487
 
        try:
488
 
            self.topic_proxy[topic].put_nowait(data)
489
 
        except eventlet.queue.Full:
490
 
            LOG.error(_("Local per-topic backlog buffer full for topic "
491
 
                        "%s. Dropping message."), topic)
492
 
 
493
 
    def consume_in_thread(self):
494
 
        """Runs the ZmqProxy service."""
495
 
        ipc_dir = CONF.rpc_zmq_ipc_dir
496
 
        consume_in = "tcp://%s:%s" % \
497
 
            (CONF.rpc_zmq_bind_address,
498
 
             CONF.rpc_zmq_port)
499
 
        consumption_proxy = InternalContext(None)
500
 
 
501
 
        try:
502
 
            os.makedirs(ipc_dir)
503
 
        except os.error:
504
 
            if not os.path.isdir(ipc_dir):
505
 
                with excutils.save_and_reraise_exception():
506
 
                    LOG.error(_("Required IPC directory does not exist at"
507
 
                                " %s"), ipc_dir)
508
 
        try:
509
 
            self.register(consumption_proxy,
510
 
                          consume_in,
511
 
                          zmq.PULL)
512
 
        except zmq.ZMQError:
513
 
            if os.access(ipc_dir, os.X_OK):
514
 
                with excutils.save_and_reraise_exception():
515
 
                    LOG.error(_("Permission denied to IPC directory at"
516
 
                                " %s"), ipc_dir)
517
 
            with excutils.save_and_reraise_exception():
518
 
                LOG.error(_("Could not create ZeroMQ receiver daemon. "
519
 
                            "Socket may already be in use."))
520
 
 
521
 
        super(ZmqProxy, self).consume_in_thread()
522
 
 
523
 
 
524
 
def unflatten_envelope(packenv):
525
 
    """Unflattens the RPC envelope.
526
 
 
527
 
    Takes a list and returns a dictionary.
528
 
    i.e. [1,2,3,4] => {1: 2, 3: 4}
529
 
    """
530
 
    i = iter(packenv)
531
 
    h = {}
532
 
    try:
533
 
        while True:
534
 
            k = six.next(i)
535
 
            h[k] = six.next(i)
536
 
    except StopIteration:
537
 
        return h
538
 
 
539
 
 
540
 
class ZmqReactor(ZmqBaseReactor):
541
 
    """A consumer class implementing a consumer for messages.
542
 
 
543
 
    Can also be used as a 1:1 proxy
544
 
    """
545
 
 
546
 
    def __init__(self, conf):
547
 
        super(ZmqReactor, self).__init__(conf)
548
 
 
549
 
    def consume(self, sock):
550
 
        # TODO(ewindisch): use zero-copy (i.e. references, not copying)
551
 
        data = sock.recv()
552
 
        LOG.debug("CONSUMER RECEIVED DATA: %s", data)
553
 
 
554
 
        proxy = self.proxies[sock]
555
 
 
556
 
        if data[2] == 'cast':  # Legacy protocol
557
 
            packenv = data[3]
558
 
 
559
 
            ctx, msg = _deserialize(packenv)
560
 
            request = rpc_common.deserialize_msg(msg)
561
 
            ctx = RpcContext.unmarshal(ctx)
562
 
        elif data[2] == 'impl_zmq_v2':
563
 
            packenv = data[4:]
564
 
 
565
 
            msg = unflatten_envelope(packenv)
566
 
            request = rpc_common.deserialize_msg(msg)
567
 
 
568
 
            # Unmarshal only after verifying the message.
569
 
            ctx = RpcContext.unmarshal(data[3])
570
 
        else:
571
 
            LOG.error(_("ZMQ Envelope version unsupported or unknown."))
572
 
            return
573
 
 
574
 
        self.pool.spawn_n(self.process, proxy, ctx, request)
575
 
 
576
 
 
577
 
class Connection(rpc_common.Connection):
578
 
    """Manages connections and threads."""
579
 
 
580
 
    def __init__(self, conf):
581
 
        self.topics = []
582
 
        self.reactor = ZmqReactor(conf)
583
 
 
584
 
    def create_consumer(self, topic, proxy, fanout=False):
585
 
        # Register with matchmaker.
586
 
        _get_matchmaker().register(topic, CONF.rpc_zmq_host)
587
 
 
588
 
        # Subscription scenarios
589
 
        if fanout:
590
 
            sock_type = zmq.SUB
591
 
            subscribe = ('', fanout)[type(fanout) == str]
592
 
            topic = 'fanout~' + topic.split('.', 1)[0]
593
 
        else:
594
 
            sock_type = zmq.PULL
595
 
            subscribe = None
596
 
            topic = '.'.join((topic.split('.', 1)[0], CONF.rpc_zmq_host))
597
 
 
598
 
        if topic in self.topics:
599
 
            LOG.info(_("Skipping topic registration. Already registered."))
600
 
            return
601
 
 
602
 
        # Receive messages from (local) proxy
603
 
        inaddr = "ipc://%s/zmq_topic_%s" % \
604
 
            (CONF.rpc_zmq_ipc_dir, topic)
605
 
 
606
 
        LOG.debug("Consumer is a zmq.%s",
607
 
                  ['PULL', 'SUB'][sock_type == zmq.SUB])
608
 
 
609
 
        self.reactor.register(proxy, inaddr, sock_type,
610
 
                              subscribe=subscribe, in_bind=False)
611
 
        self.topics.append(topic)
612
 
 
613
 
    def close(self):
614
 
        _get_matchmaker().stop_heartbeat()
615
 
        for topic in self.topics:
616
 
            _get_matchmaker().unregister(topic, CONF.rpc_zmq_host)
617
 
 
618
 
        self.reactor.close()
619
 
        self.topics = []
620
 
 
621
 
    def wait(self):
622
 
        self.reactor.wait()
623
 
 
624
 
    def consume_in_thread(self):
625
 
        _get_matchmaker().start_heartbeat()
626
 
        self.reactor.consume_in_thread()
627
 
 
628
 
 
629
 
def _cast(addr, context, topic, msg, timeout=None, envelope=False,
630
 
          _msg_id=None, allowed_remote_exmods=None):
631
 
    allowed_remote_exmods = allowed_remote_exmods or []
632
 
    timeout_cast = timeout or CONF.rpc_cast_timeout
633
 
    payload = [RpcContext.marshal(context), msg]
634
 
 
635
 
    with Timeout(timeout_cast, exception=rpc_common.Timeout):
636
 
        try:
637
 
            conn = ZmqClient(addr)
638
 
 
639
 
            # assumes cast can't return an exception
640
 
            conn.cast(_msg_id, topic, payload, envelope)
641
 
        except zmq.ZMQError:
642
 
            raise RPCException("Cast failed. ZMQ Socket Exception")
643
 
        finally:
644
 
            if 'conn' in vars():
645
 
                conn.close()
646
 
 
647
 
 
648
 
def _call(addr, context, topic, msg, timeout=None,
649
 
          envelope=False, allowed_remote_exmods=None):
650
 
    allowed_remote_exmods = allowed_remote_exmods or []
651
 
    # timeout_response is how long we wait for a response
652
 
    timeout = timeout or CONF.rpc_response_timeout
653
 
 
654
 
    # The msg_id is used to track replies.
655
 
    msg_id = uuid.uuid4().hex
656
 
 
657
 
    # Replies always come into the reply service.
658
 
    reply_topic = "zmq_replies.%s" % CONF.rpc_zmq_host
659
 
 
660
 
    LOG.debug("Creating payload")
661
 
    # Curry the original request into a reply method.
662
 
    mcontext = RpcContext.marshal(context)
663
 
    payload = {
664
 
        'method': '-reply',
665
 
        'args': {
666
 
            'msg_id': msg_id,
667
 
            'topic': reply_topic,
668
 
            # TODO(ewindisch): safe to remove mcontext in I.
669
 
            'msg': [mcontext, msg]
670
 
        }
671
 
    }
672
 
 
673
 
    LOG.debug("Creating queue socket for reply waiter")
674
 
 
675
 
    # Messages arriving async.
676
 
    # TODO(ewindisch): have reply consumer with dynamic subscription mgmt
677
 
    with Timeout(timeout, exception=rpc_common.Timeout):
678
 
        try:
679
 
            msg_waiter = ZmqSocket(
680
 
                "ipc://%s/zmq_topic_zmq_replies.%s" %
681
 
                (CONF.rpc_zmq_ipc_dir,
682
 
                 CONF.rpc_zmq_host),
683
 
                zmq.SUB, subscribe=msg_id, bind=False
684
 
            )
685
 
 
686
 
            LOG.debug("Sending cast")
687
 
            _cast(addr, context, topic, payload, envelope=envelope)
688
 
 
689
 
            LOG.debug("Cast sent; Waiting reply")
690
 
            # Blocks until receives reply
691
 
            msg = msg_waiter.recv()
692
 
            LOG.debug("Received message: %s", msg)
693
 
            LOG.debug("Unpacking response")
694
 
 
695
 
            if msg[2] == 'cast':  # Legacy version
696
 
                raw_msg = _deserialize(msg[-1])[-1]
697
 
            elif msg[2] == 'impl_zmq_v2':
698
 
                rpc_envelope = unflatten_envelope(msg[4:])
699
 
                raw_msg = rpc_common.deserialize_msg(rpc_envelope)
700
 
            else:
701
 
                raise rpc_common.UnsupportedRpcEnvelopeVersion(
702
 
                    _("Unsupported or unknown ZMQ envelope returned."))
703
 
 
704
 
            responses = raw_msg['args']['response']
705
 
        # ZMQError trumps the Timeout error.
706
 
        except zmq.ZMQError:
707
 
            raise RPCException("ZMQ Socket Error")
708
 
        except (IndexError, KeyError):
709
 
            raise RPCException(_("RPC Message Invalid."))
710
 
        finally:
711
 
            if 'msg_waiter' in vars():
712
 
                msg_waiter.close()
713
 
 
714
 
    # It seems we don't need to do all of the following,
715
 
    # but perhaps it would be useful for multicall?
716
 
    # One effect of this is that we're checking all
717
 
    # responses for Exceptions.
718
 
    for resp in responses:
719
 
        if isinstance(resp, types.DictType) and 'exc' in resp:
720
 
            raise rpc_common.deserialize_remote_exception(
721
 
                resp['exc'], allowed_remote_exmods)
722
 
 
723
 
    return responses[-1]
724
 
 
725
 
 
726
 
def _multi_send(method, context, topic, msg, timeout=None,
727
 
                envelope=False, _msg_id=None, allowed_remote_exmods=None):
728
 
    """Wraps the sending of messages.
729
 
 
730
 
    Dispatches to the matchmaker and sends message to all relevant hosts.
731
 
    """
732
 
    allowed_remote_exmods = allowed_remote_exmods or []
733
 
    conf = CONF
734
 
    LOG.debug(' '.join(map(pformat, (topic, msg))))
735
 
 
736
 
    queues = _get_matchmaker().queues(topic)
737
 
    LOG.debug("Sending message(s) to: %s", queues)
738
 
 
739
 
    # Don't stack if we have no matchmaker results
740
 
    if not queues:
741
 
        LOG.warn(_("No matchmaker results. Not casting."))
742
 
        # While not strictly a timeout, callers know how to handle
743
 
        # this exception and a timeout isn't too big a lie.
744
 
        raise rpc_common.Timeout(_("No match from matchmaker."))
745
 
 
746
 
    # This supports brokerless fanout (addresses > 1)
747
 
    return_val = None
748
 
    for queue in queues:
749
 
        _topic, ip_addr = queue
750
 
        _addr = "tcp://%s:%s" % (ip_addr, conf.rpc_zmq_port)
751
 
 
752
 
        if method.__name__ == '_cast':
753
 
            eventlet.spawn_n(method, _addr, context,
754
 
                             _topic, msg, timeout, envelope, _msg_id)
755
 
        else:
756
 
            return_val = method(_addr, context, _topic, msg, timeout,
757
 
                                envelope, allowed_remote_exmods)
758
 
 
759
 
    return return_val
760
 
 
761
 
 
762
 
def create_connection(conf, new=True):
763
 
    return Connection(conf)
764
 
 
765
 
 
766
 
def multicall(conf, *args, **kwargs):
767
 
    """Multiple calls."""
768
 
    return _multi_send(_call, *args, **kwargs)
769
 
 
770
 
 
771
 
def call(conf, *args, **kwargs):
772
 
    """Send a message, expect a response."""
773
 
    data = _multi_send(_call, *args, **kwargs)
774
 
    return data[-1]
775
 
 
776
 
 
777
 
def cast(conf, *args, **kwargs):
778
 
    """Send a message expecting no reply."""
779
 
    _multi_send(_cast, *args, **kwargs)
780
 
 
781
 
 
782
 
def fanout_cast(conf, context, topic, msg, **kwargs):
783
 
    """Send a message to all listening and expect no reply."""
784
 
    # NOTE(ewindisch): fanout~ is used because it avoid splitting on .
785
 
    # and acts as a non-subtle hint to the matchmaker and ZmqProxy.
786
 
    _multi_send(_cast, context, 'fanout~' + six.text_type(topic),
787
 
                msg, **kwargs)
788
 
 
789
 
 
790
 
def notify(conf, context, topic, msg, envelope):
791
 
    """Send notification event.
792
 
 
793
 
    Notifications are sent to topic-priority.
794
 
    This differs from the AMQP drivers which send to topic.priority.
795
 
    """
796
 
    # NOTE(ewindisch): dot-priority in rpc notifier does not
797
 
    # work with our assumptions.
798
 
    topic = topic.replace('.', '-')
799
 
    cast(conf, context, topic, msg, envelope=envelope)
800
 
 
801
 
 
802
 
def cleanup():
803
 
    """Clean up resources in use by implementation."""
804
 
    global ZMQ_CTX
805
 
    if ZMQ_CTX:
806
 
        ZMQ_CTX.term()
807
 
    ZMQ_CTX = None
808
 
 
809
 
    global matchmaker
810
 
    matchmaker = None
811
 
 
812
 
 
813
 
def _get_ctxt():
814
 
    if not zmq:
815
 
        raise ImportError("Failed to import eventlet.green.zmq")
816
 
 
817
 
    global ZMQ_CTX
818
 
    if not ZMQ_CTX:
819
 
        ZMQ_CTX = zmq.Context(CONF.rpc_zmq_contexts)
820
 
    return ZMQ_CTX
821
 
 
822
 
 
823
 
def _get_matchmaker(*args, **kwargs):
824
 
    global matchmaker
825
 
    if not matchmaker:
826
 
        mm = CONF.rpc_zmq_matchmaker
827
 
        if mm.endswith('matchmaker.MatchMakerRing'):
828
 
            mm.replace('matchmaker', 'matchmaker_ring')
829
 
            LOG.warn(_('rpc_zmq_matchmaker = %(orig)s is deprecated; use'
830
 
                       ' %(new)s instead') % dict(
831
 
                     orig=CONF.rpc_zmq_matchmaker, new=mm))
832
 
        matchmaker = importutils.import_object(mm, *args, **kwargs)
833
 
    return matchmaker
834
 
 
835
 
 
836
 
class ZmqIncomingMessage(base.IncomingMessage):
837
 
 
838
 
    ReceivedReply = collections.namedtuple(
839
 
        'ReceivedReply', ['reply', 'failure', 'log_failure'])
840
 
 
841
 
    def __init__(self, listener, ctxt, message):
842
 
        super(ZmqIncomingMessage, self).__init__(listener, ctxt, message)
843
 
        self.condition = threading.Condition()
844
 
        self.received = None
845
 
 
846
 
    def reply(self, reply=None, failure=None, log_failure=True):
847
 
        self.received = self.ReceivedReply(reply, failure, log_failure)
848
 
        with self.condition:
849
 
            self.condition.notify()
850
 
 
851
 
    def requeue(self):
852
 
        pass
853
 
 
854
 
 
855
 
class ZmqListener(base.Listener):
856
 
 
857
 
    def __init__(self, driver):
858
 
        super(ZmqListener, self).__init__(driver)
859
 
        self.incoming_queue = moves.queue.Queue()
860
 
 
861
 
    def dispatch(self, ctxt, version, method, namespace, **kwargs):
862
 
        message = {
863
 
            'method': method,
864
 
            'args': kwargs
865
 
        }
866
 
        if version:
867
 
            message['version'] = version
868
 
        if namespace:
869
 
            message['namespace'] = namespace
870
 
 
871
 
        incoming = ZmqIncomingMessage(self,
872
 
                                      ctxt.to_dict(),
873
 
                                      message)
874
 
 
875
 
        self.incoming_queue.put(incoming)
876
 
 
877
 
        with incoming.condition:
878
 
            incoming.condition.wait()
879
 
 
880
 
        assert incoming.received
881
 
 
882
 
        if incoming.received.failure:
883
 
            raise incoming.received.failure
884
 
        else:
885
 
            return incoming.received.reply
886
 
 
887
 
    def poll(self, timeout=None):
888
 
        try:
889
 
            return self.incoming_queue.get(timeout=timeout)
890
 
        except six.moves.queue.Empty:
891
 
            # timeout
892
 
            return None
893
 
 
894
 
 
895
 
class ZmqDriver(base.BaseDriver):
896
 
 
897
 
    # FIXME(markmc): allow this driver to be used without eventlet
898
 
 
899
 
    def __init__(self, conf, url, default_exchange=None,
900
 
                 allowed_remote_exmods=None):
901
 
        conf.register_opts(zmq_opts)
902
 
        conf.register_opts(impl_eventlet._eventlet_opts)
903
 
 
904
 
        super(ZmqDriver, self).__init__(conf, url, default_exchange,
905
 
                                        allowed_remote_exmods)
906
 
 
907
 
        # FIXME(markmc): handle default_exchange
908
 
 
909
 
        # FIXME(markmc): handle transport URL
910
 
        if self._url.hosts:
911
 
            raise NotImplementedError('The ZeroMQ driver does not yet support '
912
 
                                      'transport URLs')
913
 
 
914
 
        # FIXME(markmc): use self.conf everywhere
915
 
        if self.conf is not CONF:
916
 
            raise NotImplementedError('The ZeroMQ driver currently only works '
917
 
                                      'with oslo.config.cfg.CONF')
918
 
 
919
 
    def _send(self, target, ctxt, message,
920
 
              wait_for_reply=None, timeout=None, envelope=False):
921
 
 
922
 
        # FIXME(markmc): remove this temporary hack
923
 
        class Context(object):
924
 
            def __init__(self, d):
925
 
                self.d = d
926
 
 
927
 
            def to_dict(self):
928
 
                return self.d
929
 
 
930
 
        context = Context(ctxt)
931
 
 
932
 
        if wait_for_reply:
933
 
            method = _call
934
 
        else:
935
 
            method = _cast
936
 
 
937
 
        topic = target.topic
938
 
        if target.fanout:
939
 
            # NOTE(ewindisch): fanout~ is used because it avoid splitting on
940
 
            # and acts as a non-subtle hint to the matchmaker and ZmqProxy.
941
 
            topic = 'fanout~' + topic
942
 
 
943
 
        reply = _multi_send(method, context, topic, message,
944
 
                            envelope=envelope,
945
 
                            allowed_remote_exmods=self._allowed_remote_exmods)
946
 
 
947
 
        if wait_for_reply:
948
 
            return reply[-1]
949
 
 
950
 
    def send(self, target, ctxt, message, wait_for_reply=None, timeout=None,
951
 
             retry=None):
952
 
        # NOTE(sileht): retry is not implemented because this driver never
953
 
        # retry anything
954
 
        return self._send(target, ctxt, message, wait_for_reply, timeout)
955
 
 
956
 
    def send_notification(self, target, ctxt, message, version, retry=None):
957
 
        # NOTE(ewindisch): dot-priority in rpc notifier does not
958
 
        # work with our assumptions.
959
 
        # NOTE(sileht): retry is not implemented because this driver never
960
 
        # retry anything
961
 
        target = target(topic=target.topic.replace('.', '-'))
962
 
        return self._send(target, ctxt, message, envelope=(version == 2.0))
963
 
 
964
 
    def listen(self, target):
965
 
        conn = create_connection(self.conf)
966
 
 
967
 
        listener = ZmqListener(self)
968
 
 
969
 
        conn.create_consumer(target.topic, listener)
970
 
        conn.create_consumer('%s.%s' % (target.topic, target.server),
971
 
                             listener)
972
 
        conn.create_consumer(target.topic, listener, fanout=True)
973
 
 
974
 
        conn.consume_in_thread()
975
 
 
976
 
        return listener
977
 
 
978
 
    def listen_for_notifications(self, targets_and_priorities):
979
 
        # NOTE(sileht): this listener implementation is limited
980
 
        # because zeromq doesn't support requeing message
981
 
        conn = create_connection(self.conf)
982
 
 
983
 
        listener = ZmqListener(self, None)
984
 
        for target, priority in targets_and_priorities:
985
 
            # NOTE(ewindisch): dot-priority in rpc notifier does not
986
 
            # work with our assumptions.
987
 
            # NOTE(sileht): create_consumer doesn't support target.exchange
988
 
            conn.create_consumer('%s-%s' % (target.topic, priority),
989
 
                                 listener)
990
 
        conn.consume_in_thread()
991
 
 
992
 
        return listener
993
 
 
994
 
    def cleanup(self):
995
 
        cleanup()