~ubuntu-branches/ubuntu/quantal/nova/quantal-proposed

« back to all changes in this revision

Viewing changes to nova/openstack/common/rpc/impl_zmq.py

  • Committer: Package Import Robot
  • Author(s): Chuck Short
  • Date: 2012-08-16 14:04:11 UTC
  • mto: This revision was merged to the branch mainline in revision 84.
  • Revision ID: package-import@ubuntu.com-20120816140411-0mr4n241wmk30t9l
Tags: upstream-2012.2~f3
ImportĀ upstreamĀ versionĀ 2012.2~f3

Show diffs side-by-side

added added

removed removed

Lines of Context:
14
14
#    License for the specific language governing permissions and limitations
15
15
#    under the License.
16
16
 
17
 
import json
18
17
import pprint
 
18
import socket
19
19
import string
20
20
import sys
21
21
import types
28
28
from nova.openstack.common import cfg
29
29
from nova.openstack.common.gettextutils import _
30
30
from nova.openstack.common import importutils
 
31
from nova.openstack.common import jsonutils
31
32
from nova.openstack.common.rpc import common as rpc_common
32
33
 
33
34
 
46
47
                    'address.'),
47
48
 
48
49
    # The module.Class to use for matchmaking.
49
 
    cfg.StrOpt('rpc_zmq_matchmaker',
50
 
        default='nova.openstack.common.rpc.matchmaker.MatchMakerLocalhost',
51
 
        help='MatchMaker driver'),
 
50
    cfg.StrOpt(
 
51
        'rpc_zmq_matchmaker',
 
52
        default=('nova.openstack.common.rpc.'
 
53
                 'matchmaker.MatchMakerLocalhost'),
 
54
        help='MatchMaker driver',
 
55
    ),
52
56
 
53
57
    # The following port is unassigned by IANA as of 2012-05-21
54
58
    cfg.IntOpt('rpc_zmq_port', default=9501,
59
63
 
60
64
    cfg.StrOpt('rpc_zmq_ipc_dir', default='/var/run/openstack',
61
65
               help='Directory for holding IPC sockets'),
 
66
 
 
67
    cfg.StrOpt('rpc_zmq_host', default=socket.gethostname(),
 
68
               help='Name of this node. Must be a valid hostname, FQDN, or '
 
69
                    'IP address. Must match "host" option, if running Nova.')
62
70
]
63
71
 
64
72
 
65
73
# These globals are defined in register_opts(conf),
66
74
# a mandatory initialization call
67
 
FLAGS = None
 
75
CONF = None
68
76
ZMQ_CTX = None  # ZeroMQ Context, must be global.
69
77
matchmaker = None  # memoized matchmaker object
70
78
 
76
84
    Error if a developer passes us bad data.
77
85
    """
78
86
    try:
79
 
        return str(json.dumps(data, ensure_ascii=True))
 
87
        return str(jsonutils.dumps(data, ensure_ascii=True))
80
88
    except TypeError:
81
89
        LOG.error(_("JSON serialization failed."))
82
90
        raise
87
95
    Deserialization wrapper
88
96
    """
89
97
    LOG.debug(_("Deserializing: %s"), data)
90
 
    return json.loads(data)
 
98
    return jsonutils.loads(data)
91
99
 
92
100
 
93
101
class ZmqSocket(object):
119
127
        for f in do_sub:
120
128
            self.subscribe(f)
121
129
 
122
 
        LOG.debug(_("Connecting to %{addr}s with %{type}s"
123
 
                    "\n-> Subscribed to %{subscribe}s"
124
 
                    "\n-> bind: %{bind}s"),
125
 
                  {'addr': addr, 'type': self.socket_s(),
126
 
                   'subscribe': subscribe, 'bind': bind})
 
130
        str_data = {'addr': addr, 'type': self.socket_s(),
 
131
                    'subscribe': subscribe, 'bind': bind}
 
132
 
 
133
        LOG.debug(_("Connecting to %(addr)s with %(type)s"), str_data)
 
134
        LOG.debug(_("-> Subscribed to %(subscribe)s"), str_data)
 
135
        LOG.debug(_("-> bind: %(bind)s"), str_data)
127
136
 
128
137
        try:
129
138
            if bind:
265
274
            ctx.replies)
266
275
 
267
276
        LOG.debug(_("Sending reply"))
268
 
        cast(FLAGS, ctx, topic, {
 
277
        cast(CONF, ctx, topic, {
269
278
            'method': '-process_reply',
270
279
            'args': {
271
280
                'msg_id': msg_id,
320
329
    def __init__(self, conf):
321
330
        super(ZmqBaseReactor, self).__init__()
322
331
 
323
 
        self.conf = conf
324
332
        self.mapping = {}
325
333
        self.proxies = {}
326
334
        self.threads = []
396
404
        super(ZmqProxy, self).__init__(conf)
397
405
 
398
406
        self.topic_proxy = {}
399
 
        ipc_dir = conf.rpc_zmq_ipc_dir
 
407
        ipc_dir = CONF.rpc_zmq_ipc_dir
400
408
 
401
409
        self.topic_proxy['zmq_replies'] = \
402
410
            ZmqSocket("ipc://%s/zmq_topic_zmq_replies" % (ipc_dir, ),
404
412
        self.sockets.append(self.topic_proxy['zmq_replies'])
405
413
 
406
414
    def consume(self, sock):
407
 
        ipc_dir = self.conf.rpc_zmq_ipc_dir
 
415
        ipc_dir = CONF.rpc_zmq_ipc_dir
408
416
 
409
417
        #TODO(ewindisch): use zero-copy (i.e. references, not copying)
410
418
        data = sock.recv()
478
486
    """Manages connections and threads."""
479
487
 
480
488
    def __init__(self, conf):
481
 
        self.conf = conf
482
489
        self.reactor = ZmqReactor(conf)
483
490
 
484
491
    def create_consumer(self, topic, proxy, fanout=False):
499
506
 
500
507
        # Receive messages from (local) proxy
501
508
        inaddr = "ipc://%s/zmq_topic_%s" % \
502
 
            (self.conf.rpc_zmq_ipc_dir, topic)
 
509
            (CONF.rpc_zmq_ipc_dir, topic)
503
510
 
504
511
        LOG.debug(_("Consumer is a zmq.%s"),
505
512
                  ['PULL', 'SUB'][sock_type == zmq.SUB])
518
525
 
519
526
 
520
527
def _cast(addr, context, msg_id, topic, msg, timeout=None):
521
 
    timeout_cast = timeout or FLAGS.rpc_cast_timeout
 
528
    timeout_cast = timeout or CONF.rpc_cast_timeout
522
529
    payload = [RpcContext.marshal(context), msg]
523
530
 
524
531
    with Timeout(timeout_cast, exception=rpc_common.Timeout):
536
543
 
537
544
def _call(addr, context, msg_id, topic, msg, timeout=None):
538
545
    # timeout_response is how long we wait for a response
539
 
    timeout = timeout or FLAGS.rpc_response_timeout
 
546
    timeout = timeout or CONF.rpc_response_timeout
540
547
 
541
548
    # The msg_id is used to track replies.
542
549
    msg_id = str(uuid.uuid4().hex)
543
550
 
544
551
    # Replies always come into the reply service.
545
 
    # We require that FLAGS.host is a FQDN, IP, or resolvable hostname.
546
 
    reply_topic = "zmq_replies.%s" % FLAGS.host
 
552
    reply_topic = "zmq_replies.%s" % CONF.rpc_zmq_host
547
553
 
548
554
    LOG.debug(_("Creating payload"))
549
555
    # Curry the original request into a reply method.
565
571
    with Timeout(timeout, exception=rpc_common.Timeout):
566
572
        try:
567
573
            msg_waiter = ZmqSocket(
568
 
                "ipc://%s/zmq_topic_zmq_replies" % FLAGS.rpc_zmq_ipc_dir,
 
574
                "ipc://%s/zmq_topic_zmq_replies" % CONF.rpc_zmq_ipc_dir,
569
575
                zmq.SUB, subscribe=msg_id, bind=False
570
576
            )
571
577
 
591
597
    # responses for Exceptions.
592
598
    for resp in responses:
593
599
        if isinstance(resp, types.DictType) and 'exc' in resp:
594
 
            raise rpc_common.deserialize_remote_exception(FLAGS, resp['exc'])
 
600
            raise rpc_common.deserialize_remote_exception(CONF, resp['exc'])
595
601
 
596
602
    return responses[-1]
597
603
 
602
608
    dispatches to the matchmaker and sends
603
609
    message to all relevant hosts.
604
610
    """
605
 
    conf = FLAGS
 
611
    conf = CONF
606
612
    LOG.debug(_("%(msg)s") % {'msg': ' '.join(map(pformat, (topic, msg)))})
607
613
 
608
614
    queues = matchmaker.queues(topic)
689
695
    # We memoize through these globals
690
696
    global ZMQ_CTX
691
697
    global matchmaker
692
 
    global FLAGS
 
698
    global CONF
693
699
 
694
 
    if not FLAGS:
 
700
    if not CONF:
695
701
        conf.register_opts(zmq_opts)
696
 
        FLAGS = conf
 
702
        CONF = conf
697
703
    # Don't re-set, if this method is called twice.
698
704
    if not ZMQ_CTX:
699
705
        ZMQ_CTX = zmq.Context(conf.rpc_zmq_contexts)
707
713
        if mm_path[-1][0] not in string.ascii_uppercase:
708
714
            LOG.error(_("Matchmaker could not be loaded.\n"
709
715
                      "rpc_zmq_matchmaker is not a class."))
710
 
            raise
 
716
            raise RPCException(_("Error loading Matchmaker."))
711
717
 
712
718
        mm_impl = importutils.import_module(mm_module)
713
719
        mm_constructor = getattr(mm_impl, mm_class)
714
720
        matchmaker = mm_constructor()
 
721
 
 
722
 
 
723
register_opts(cfg.CONF)