~ubuntu-branches/ubuntu/saucy/nova/saucy-proposed

« back to all changes in this revision

Viewing changes to nova/openstack/common/rpc/impl_zmq.py

  • Committer: Package Import Robot
  • Author(s): Chuck Short, Chuck Short, Adam Gandelman
  • Date: 2013-02-22 09:27:29 UTC
  • mfrom: (1.1.68)
  • Revision ID: package-import@ubuntu.com-20130222092729-nn3gt8rf97uvts77
Tags: 2013.1.g3-0ubuntu1
[ Chuck Short ]
* New usptream release. 
* debian/patches/debian/patches/fix-ubuntu-tests.patch: Refreshed.
* debian/nova-baremetal.logrotate: Fix logfile path.
* debian/control, debian/nova-spiceproxy.{install, logrotate, upstart}:
  Add spice html5 proxy support.
* debian/nova-novncproxy.upstart: Start on runlevel [2345]
* debian/rules: Call testr directly since run_tests.sh -N gives weird return
  value when tests pass.
* debian/pyddist-overrides: Add websockify.
* debian/nova-common.postinst: Removed config file conversion, since
  the option is no longer available. (LP: #1110567)
* debian/control: Add python-pyasn1 as a dependency.
* debian/control: Add python-oslo-config as a dependency.
* debian/control: Suggest sysfsutils, sg3-utils, multipath-tools for fibre
  channel support.

[ Adam Gandelman ]
* debian/control: Fix typo (websocikfy -> websockify).

Show diffs side-by-side

added added

removed removed

Lines of Context:
14
14
#    License for the specific language governing permissions and limitations
15
15
#    under the License.
16
16
 
 
17
import os
17
18
import pprint
18
19
import socket
19
 
import string
20
20
import sys
21
21
import types
22
22
import uuid
23
23
 
24
24
import eventlet
25
 
from eventlet.green import zmq
26
25
import greenlet
 
26
from oslo.config import cfg
27
27
 
28
 
from nova.openstack.common import cfg
29
28
from nova.openstack.common.gettextutils import _
30
29
from nova.openstack.common import importutils
31
30
from nova.openstack.common import jsonutils
 
31
from nova.openstack.common import processutils as utils
32
32
from nova.openstack.common.rpc import common as rpc_common
33
33
 
 
34
zmq = importutils.try_import('eventlet.green.zmq')
34
35
 
35
36
# for convenience, are not modified.
36
37
pformat = pprint.pformat
61
62
    cfg.IntOpt('rpc_zmq_contexts', default=1,
62
63
               help='Number of ZeroMQ contexts, defaults to 1'),
63
64
 
 
65
    cfg.IntOpt('rpc_zmq_topic_backlog', default=None,
 
66
               help='Maximum number of ingress messages to locally buffer '
 
67
                    'per topic. Default is unlimited.'),
 
68
 
64
69
    cfg.StrOpt('rpc_zmq_ipc_dir', default='/var/run/openstack',
65
70
               help='Directory for holding IPC sockets'),
66
71
 
70
75
]
71
76
 
72
77
 
73
 
# These globals are defined in register_opts(conf),
74
 
# a mandatory initialization call
75
 
CONF = None
 
78
CONF = cfg.CONF
 
79
CONF.register_opts(zmq_opts)
 
80
 
76
81
ZMQ_CTX = None  # ZeroMQ Context, must be global.
77
82
matchmaker = None  # memoized matchmaker object
78
83
 
84
89
    Error if a developer passes us bad data.
85
90
    """
86
91
    try:
87
 
        return str(jsonutils.dumps(data, ensure_ascii=True))
 
92
        return jsonutils.dumps(data, ensure_ascii=True)
88
93
    except TypeError:
89
94
        LOG.error(_("JSON serialization failed."))
90
95
        raise
107
112
    """
108
113
 
109
114
    def __init__(self, addr, zmq_type, bind=True, subscribe=None):
110
 
        self.sock = ZMQ_CTX.socket(zmq_type)
 
115
        self.sock = _get_ctxt().socket(zmq_type)
111
116
        self.addr = addr
112
117
        self.type = zmq_type
113
118
        self.subscriptions = []
181
186
                    pass
182
187
            self.subscriptions = []
183
188
 
184
 
        # Linger -1 prevents lost/dropped messages
185
189
        try:
186
 
            self.sock.close(linger=-1)
 
190
            # Default is to linger
 
191
            self.sock.close()
187
192
        except Exception:
188
 
            pass
 
193
            # While this is a bad thing to happen,
 
194
            # it would be much worse if some of the code calling this
 
195
            # were to fail. For now, lets log, and later evaluate
 
196
            # if we can safely raise here.
 
197
            LOG.error("ZeroMQ socket could not be closed.")
189
198
        self.sock = None
190
199
 
191
200
    def recv(self):
202
211
class ZmqClient(object):
203
212
    """Client for ZMQ sockets."""
204
213
 
205
 
    def __init__(self, addr, socket_type=zmq.PUSH, bind=False):
 
214
    def __init__(self, addr, socket_type=None, bind=False):
 
215
        if socket_type is None:
 
216
            socket_type = zmq.PUSH
206
217
        self.outq = ZmqSocket(addr, socket_type, bind=bind)
207
218
 
208
219
    def cast(self, msg_id, topic, data, serialize=True, force_envelope=False):
 
220
        msg_id = msg_id or 0
 
221
 
209
222
        if serialize:
210
223
            data = rpc_common.serialize_msg(data, force_envelope)
211
 
        self.outq.send([str(msg_id), str(topic), str('cast'),
212
 
                        _serialize(data)])
 
224
        self.outq.send(map(bytes, (msg_id, topic, 'cast', _serialize(data))))
213
225
 
214
226
    def close(self):
215
227
        self.outq.close()
283
295
            ctx.replies)
284
296
 
285
297
        LOG.debug(_("Sending reply"))
286
 
        cast(CONF, ctx, topic, {
 
298
        _multi_send(_cast, ctx, topic, {
287
299
            'method': '-process_reply',
288
300
            'args': {
289
 
                'msg_id': msg_id,
 
301
                'msg_id': msg_id,  # Include for Folsom compat.
290
302
                'response': response
291
303
            }
292
 
        })
 
304
        }, _msg_id=msg_id)
293
305
 
294
306
 
295
307
class ConsumerBase(object):
309
321
            return [result]
310
322
 
311
323
    def process(self, style, target, proxy, ctx, data):
 
324
        data.setdefault('version', None)
 
325
        data.setdefault('args', {})
 
326
 
312
327
        # Method starting with - are
313
328
        # processed internally. (non-valid method name)
314
 
        method = data['method']
 
329
        method = data.get('method')
 
330
        if not method:
 
331
            LOG.error(_("RPC message did not include method."))
 
332
            return
315
333
 
316
334
        # Internal method
317
335
        # uses internal context for safety.
318
 
        if data['method'][0] == '-':
319
 
            # For reply / process_reply
320
 
            method = method[1:]
321
 
            if method == 'reply':
322
 
                self.private_ctx.reply(ctx, proxy, **data['args'])
 
336
        if method == '-reply':
 
337
            self.private_ctx.reply(ctx, proxy, **data['args'])
323
338
            return
324
339
 
325
 
        data.setdefault('version', None)
326
 
        data.setdefault('args', {})
327
340
        proxy.dispatch(ctx, data['version'],
328
341
                       data['method'], **data['args'])
329
342
 
413
426
        super(ZmqProxy, self).__init__(conf)
414
427
 
415
428
        self.topic_proxy = {}
416
 
        ipc_dir = CONF.rpc_zmq_ipc_dir
417
 
 
418
 
        self.topic_proxy['zmq_replies'] = \
419
 
            ZmqSocket("ipc://%s/zmq_topic_zmq_replies" % (ipc_dir, ),
420
 
                      zmq.PUB, bind=True)
421
 
        self.sockets.append(self.topic_proxy['zmq_replies'])
422
429
 
423
430
    def consume(self, sock):
424
431
        ipc_dir = CONF.rpc_zmq_ipc_dir
430
437
 
431
438
        LOG.debug(_("CONSUMER GOT %s"), ' '.join(map(pformat, data)))
432
439
 
433
 
        # Handle zmq_replies magic
434
 
        if topic.startswith('fanout~'):
435
 
            sock_type = zmq.PUB
436
 
        elif topic.startswith('zmq_replies'):
437
 
            sock_type = zmq.PUB
438
 
            inside = rpc_common.deserialize_msg(_deserialize(in_msg))
439
 
            msg_id = inside[-1]['args']['msg_id']
440
 
            response = inside[-1]['args']['response']
441
 
            LOG.debug(_("->response->%s"), response)
442
 
            data = [str(msg_id), _serialize(response)]
 
440
        if topic.startswith('fanout~') or topic.startswith('zmq_replies'):
 
441
            sock_type = zmq.PUB
443
442
        else:
444
443
            sock_type = zmq.PUSH
445
444
 
446
 
        if not topic in self.topic_proxy:
447
 
            outq = ZmqSocket("ipc://%s/zmq_topic_%s" % (ipc_dir, topic),
448
 
                             sock_type, bind=True)
449
 
            self.topic_proxy[topic] = outq
450
 
            self.sockets.append(outq)
451
 
            LOG.info(_("Created topic proxy: %s"), topic)
452
 
 
453
 
            # It takes some time for a pub socket to open,
454
 
            # before we can have any faith in doing a send() to it.
455
 
            if sock_type == zmq.PUB:
456
 
                eventlet.sleep(.5)
457
 
 
458
 
        LOG.debug(_("ROUTER RELAY-OUT START %(data)s") % {'data': data})
459
 
        self.topic_proxy[topic].send(data)
460
 
        LOG.debug(_("ROUTER RELAY-OUT SUCCEEDED %(data)s") % {'data': data})
 
445
        if topic not in self.topic_proxy:
 
446
            def publisher(waiter):
 
447
                LOG.info(_("Creating proxy for topic: %s"), topic)
 
448
 
 
449
                try:
 
450
                    out_sock = ZmqSocket("ipc://%s/zmq_topic_%s" %
 
451
                                         (ipc_dir, topic),
 
452
                                         sock_type, bind=True)
 
453
                except RPCException:
 
454
                    waiter.send_exception(*sys.exc_info())
 
455
                    return
 
456
 
 
457
                self.topic_proxy[topic] = eventlet.queue.LightQueue(
 
458
                    CONF.rpc_zmq_topic_backlog)
 
459
                self.sockets.append(out_sock)
 
460
 
 
461
                # It takes some time for a pub socket to open,
 
462
                # before we can have any faith in doing a send() to it.
 
463
                if sock_type == zmq.PUB:
 
464
                    eventlet.sleep(.5)
 
465
 
 
466
                waiter.send(True)
 
467
 
 
468
                while(True):
 
469
                    data = self.topic_proxy[topic].get()
 
470
                    out_sock.send(data)
 
471
                    LOG.debug(_("ROUTER RELAY-OUT SUCCEEDED %(data)s") %
 
472
                              {'data': data})
 
473
 
 
474
            wait_sock_creation = eventlet.event.Event()
 
475
            eventlet.spawn(publisher, wait_sock_creation)
 
476
 
 
477
            try:
 
478
                wait_sock_creation.wait()
 
479
            except RPCException:
 
480
                LOG.error(_("Topic socket file creation failed."))
 
481
                return
 
482
 
 
483
        try:
 
484
            self.topic_proxy[topic].put_nowait(data)
 
485
            LOG.debug(_("ROUTER RELAY-OUT QUEUED %(data)s") %
 
486
                      {'data': data})
 
487
        except eventlet.queue.Full:
 
488
            LOG.error(_("Local per-topic backlog buffer full for topic "
 
489
                        "%(topic)s. Dropping message.") % {'topic': topic})
 
490
 
 
491
    def consume_in_thread(self):
 
492
        """Runs the ZmqProxy service"""
 
493
        ipc_dir = CONF.rpc_zmq_ipc_dir
 
494
        consume_in = "tcp://%s:%s" % \
 
495
            (CONF.rpc_zmq_bind_address,
 
496
             CONF.rpc_zmq_port)
 
497
        consumption_proxy = InternalContext(None)
 
498
 
 
499
        if not os.path.isdir(ipc_dir):
 
500
            try:
 
501
                utils.execute('mkdir', '-p', ipc_dir, run_as_root=True)
 
502
                utils.execute('chown', "%s:%s" % (os.getuid(), os.getgid()),
 
503
                              ipc_dir, run_as_root=True)
 
504
                utils.execute('chmod', '750', ipc_dir, run_as_root=True)
 
505
            except utils.ProcessExecutionError:
 
506
                LOG.error(_("Could not create IPC directory %s") %
 
507
                          (ipc_dir, ))
 
508
                raise
 
509
 
 
510
        try:
 
511
            self.register(consumption_proxy,
 
512
                          consume_in,
 
513
                          zmq.PULL,
 
514
                          out_bind=True)
 
515
        except zmq.ZMQError:
 
516
            LOG.error(_("Could not create ZeroMQ receiver daemon. "
 
517
                        "Socket may already be in use."))
 
518
            raise
 
519
 
 
520
        super(ZmqProxy, self).consume_in_thread()
461
521
 
462
522
 
463
523
class ZmqReactor(ZmqBaseReactor):
533
593
        self.reactor.consume_in_thread()
534
594
 
535
595
 
536
 
def _cast(addr, context, msg_id, topic, msg, timeout=None, serialize=True,
537
 
          force_envelope=False):
 
596
def _cast(addr, context, topic, msg, timeout=None, serialize=True,
 
597
          force_envelope=False, _msg_id=None):
538
598
    timeout_cast = timeout or CONF.rpc_cast_timeout
539
599
    payload = [RpcContext.marshal(context), msg]
540
600
 
543
603
            conn = ZmqClient(addr)
544
604
 
545
605
            # assumes cast can't return an exception
546
 
            conn.cast(msg_id, topic, payload, serialize, force_envelope)
 
606
            conn.cast(_msg_id, topic, payload, serialize, force_envelope)
547
607
        except zmq.ZMQError:
548
608
            raise RPCException("Cast failed. ZMQ Socket Exception")
549
609
        finally:
551
611
                conn.close()
552
612
 
553
613
 
554
 
def _call(addr, context, msg_id, topic, msg, timeout=None):
 
614
def _call(addr, context, topic, msg, timeout=None,
 
615
          serialize=True, force_envelope=False):
555
616
    # timeout_response is how long we wait for a response
556
617
    timeout = timeout or CONF.rpc_response_timeout
557
618
 
586
647
            )
587
648
 
588
649
            LOG.debug(_("Sending cast"))
589
 
            _cast(addr, context, msg_id, topic, payload)
 
650
            _cast(addr, context, topic, payload,
 
651
                  serialize=serialize, force_envelope=force_envelope)
590
652
 
591
653
            LOG.debug(_("Cast sent; Waiting reply"))
592
654
            # Blocks until receives reply
593
655
            msg = msg_waiter.recv()
594
656
            LOG.debug(_("Received message: %s"), msg)
595
657
            LOG.debug(_("Unpacking response"))
596
 
            responses = _deserialize(msg[-1])
 
658
            responses = _deserialize(msg[-1])[-1]['args']['response']
597
659
        # ZMQError trumps the Timeout error.
598
660
        except zmq.ZMQError:
599
661
            raise RPCException("ZMQ Socket Error")
 
662
        except (IndexError, KeyError):
 
663
            raise RPCException(_("RPC Message Invalid."))
600
664
        finally:
601
665
            if 'msg_waiter' in vars():
602
666
                msg_waiter.close()
613
677
 
614
678
 
615
679
def _multi_send(method, context, topic, msg, timeout=None, serialize=True,
616
 
                force_envelope=False):
 
680
                force_envelope=False, _msg_id=None):
617
681
    """
618
682
    Wraps the sending of messages,
619
683
    dispatches to the matchmaker and sends
622
686
    conf = CONF
623
687
    LOG.debug(_("%(msg)s") % {'msg': ' '.join(map(pformat, (topic, msg)))})
624
688
 
625
 
    queues = matchmaker.queues(topic)
 
689
    queues = _get_matchmaker().queues(topic)
626
690
    LOG.debug(_("Sending message(s) to: %s"), queues)
627
691
 
628
692
    # Don't stack if we have no matchmaker results
639
703
 
640
704
        if method.__name__ == '_cast':
641
705
            eventlet.spawn_n(method, _addr, context,
642
 
                             _topic, _topic, msg, timeout, serialize,
643
 
                             force_envelope)
 
706
                             _topic, msg, timeout, serialize,
 
707
                             force_envelope, _msg_id)
644
708
            return
645
 
        return method(_addr, context, _topic, _topic, msg, timeout)
 
709
        return method(_addr, context, _topic, msg, timeout,
 
710
                      serialize, force_envelope)
646
711
 
647
712
 
648
713
def create_connection(conf, new=True):
689
754
def cleanup():
690
755
    """Clean up resources in use by implementation."""
691
756
    global ZMQ_CTX
 
757
    if ZMQ_CTX:
 
758
        ZMQ_CTX.term()
 
759
    ZMQ_CTX = None
 
760
 
692
761
    global matchmaker
693
762
    matchmaker = None
694
 
    ZMQ_CTX.term()
695
 
    ZMQ_CTX = None
696
 
 
697
 
 
698
 
def register_opts(conf):
699
 
    """Registration of options for this driver."""
700
 
    #NOTE(ewindisch): ZMQ_CTX and matchmaker
701
 
    # are initialized here as this is as good
702
 
    # an initialization method as any.
703
 
 
704
 
    # We memoize through these globals
 
763
 
 
764
 
 
765
def _get_ctxt():
 
766
    if not zmq:
 
767
        raise ImportError("Failed to import eventlet.green.zmq")
 
768
 
705
769
    global ZMQ_CTX
706
 
    global matchmaker
707
 
    global CONF
708
 
 
709
 
    if not CONF:
710
 
        conf.register_opts(zmq_opts)
711
 
        CONF = conf
712
 
    # Don't re-set, if this method is called twice.
713
770
    if not ZMQ_CTX:
714
 
        ZMQ_CTX = zmq.Context(conf.rpc_zmq_contexts)
 
771
        ZMQ_CTX = zmq.Context(CONF.rpc_zmq_contexts)
 
772
    return ZMQ_CTX
 
773
 
 
774
 
 
775
def _get_matchmaker(*args, **kwargs):
 
776
    global matchmaker
715
777
    if not matchmaker:
716
 
        # rpc_zmq_matchmaker should be set to a 'module.Class'
717
 
        mm_path = conf.rpc_zmq_matchmaker.split('.')
718
 
        mm_module = '.'.join(mm_path[:-1])
719
 
        mm_class = mm_path[-1]
720
 
 
721
 
        # Only initialize a class.
722
 
        if mm_path[-1][0] not in string.ascii_uppercase:
723
 
            LOG.error(_("Matchmaker could not be loaded.\n"
724
 
                      "rpc_zmq_matchmaker is not a class."))
725
 
            raise RPCException(_("Error loading Matchmaker."))
726
 
 
727
 
        mm_impl = importutils.import_module(mm_module)
728
 
        mm_constructor = getattr(mm_impl, mm_class)
729
 
        matchmaker = mm_constructor()
730
 
 
731
 
 
732
 
register_opts(cfg.CONF)
 
778
        matchmaker = importutils.import_object(
 
779
            CONF.rpc_zmq_matchmaker, *args, **kwargs)
 
780
    return matchmaker