~ubuntu-cloud-archive/ubuntu/precise/nova/trunk

« back to all changes in this revision

Viewing changes to nova/rpc/impl_qpid.py

  • Committer: Package Import Robot
  • Author(s): Chuck Short, Chuck Short, Adam Gandelman
  • Date: 2012-06-22 12:39:57 UTC
  • mfrom: (1.1.57)
  • Revision ID: package-import@ubuntu.com-20120622123957-hbzwg84nt9rqwg8r
Tags: 2012.2~f2~20120621.14517-0ubuntu1
[ Chuck Short ]
* New upstream version.

[ Adam Gandelman ]
* debian/rules: Temporarily disable test suite while blocking
  tests are investigated. 
* debian/patches/kombu_tests_timeout.patch: Dropped.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# vim: tabstop=4 shiftwidth=4 softtabstop=4
2
 
 
3
 
#    Copyright 2011 OpenStack LLC
4
 
#    Copyright 2011 - 2012, Red Hat, Inc.
5
 
#
6
 
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
7
 
#    not use this file except in compliance with the License. You may obtain
8
 
#    a copy of the License at
9
 
#
10
 
#         http://www.apache.org/licenses/LICENSE-2.0
11
 
#
12
 
#    Unless required by applicable law or agreed to in writing, software
13
 
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14
 
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15
 
#    License for the specific language governing permissions and limitations
16
 
#    under the License.
17
 
 
18
 
import functools
19
 
import itertools
20
 
import json
21
 
import logging
22
 
import time
23
 
import uuid
24
 
 
25
 
import eventlet
26
 
import greenlet
27
 
import qpid.messaging
28
 
import qpid.messaging.exceptions
29
 
 
30
 
from nova.openstack.common import cfg
31
 
from nova.rpc import amqp as rpc_amqp
32
 
from nova.rpc import common as rpc_common
33
 
 
34
 
# Module-level logger, named after this module.
LOG = logging.getLogger(__name__)

# Configuration options understood by the Qpid RPC driver.  They are
# attached to a config object by register_opts() at the bottom of this
# module and read back through ``conf.<name>`` in Connection.__init__.
qpid_opts = [
    # Broker endpoint.  Note that the port is declared as a string option;
    # Connection.__init__ joins it with the hostname as "host:port".
    cfg.StrOpt('qpid_hostname',
               default='localhost',
               help='Qpid broker hostname'),
    cfg.StrOpt('qpid_port',
               default='5672',
               help='Qpid broker port'),
    # Credentials; both default to the empty string.
    cfg.StrOpt('qpid_username',
               default='',
               help='Username for qpid connection'),
    cfg.StrOpt('qpid_password',
               default='',
               help='Password for qpid connection'),
    cfg.StrOpt('qpid_sasl_mechanisms',
               default='',
               help='Space separated list of SASL mechanisms to use for auth'),
    # Reconnection behaviour.  The integer options below default to 0 and
    # Connection.__init__ only copies them onto the connection when they
    # are non-zero, so 0 means "leave the connection's own default alone".
    cfg.BoolOpt('qpid_reconnect',
                default=True,
                help='Automatically reconnect'),
    cfg.IntOpt('qpid_reconnect_timeout',
               default=0,
               help='Reconnection timeout in seconds'),
    cfg.IntOpt('qpid_reconnect_limit',
               default=0,
               help='Max reconnections before giving up'),
    cfg.IntOpt('qpid_reconnect_interval_min',
               default=0,
               help='Minimum seconds between reconnection attempts'),
    cfg.IntOpt('qpid_reconnect_interval_max',
               default=0,
               help='Maximum seconds between reconnection attempts'),
    # Also used by Connection.reconnect() as the sleep between failed
    # open() attempts (falling back to 1 second when it is 0).
    cfg.IntOpt('qpid_reconnect_interval',
               default=0,
               help='Equivalent to setting max and min to the same value'),
    # Transport-level settings.
    cfg.IntOpt('qpid_heartbeat',
               default=5,
               help='Seconds between connection keepalive heartbeats'),
    cfg.StrOpt('qpid_protocol',
               default='tcp',
               help="Transport to use, either 'tcp' or 'ssl'"),
    cfg.BoolOpt('qpid_tcp_nodelay',
                default=True,
                help='Disable Nagle algorithm'),
    ]
80
 
 
81
 
 
82
 
class ConsumerBase(object):
    """Consumer base class.

    Builds a Qpid address string from a node name plus node/link options,
    opens a receiver for it on a session, and hands the content of each
    fetched message to a callback.
    """

    def __init__(self, session, callback, node_name, node_opts,
                 link_name, link_opts):
        """Declare a queue on an amqp session.

        'session' is the amqp session to use
        'callback' is the callback to call when messages are received
        'node_name' is the first part of the Qpid address string, before ';'
        'node_opts' will be applied to the "x-declare" section of "node"
                    in the address string.
        'link_name' goes into the "name" field of the "link" in the address
                    string
        'link_opts' will be applied to the "x-declare" section of "link"
                    in the address string.
        """
        self.callback = callback
        self.receiver = None
        self.session = None

        addr_opts = {
            "create": "always",
            "node": {
                "type": "topic",
                "x-declare": {
                    "durable": True,
                    "auto-delete": True,
                },
            },
            "link": {
                "name": link_name,
                "durable": True,
                "x-declare": {
                    "durable": False,
                    "auto-delete": True,
                    "exclusive": False,
                },
            },
        }
        # Caller-supplied options override the defaults above.
        addr_opts["node"]["x-declare"].update(node_opts)
        addr_opts["link"]["x-declare"].update(link_opts)

        self.address = "%s ; %s" % (node_name, json.dumps(addr_opts))

        self.reconnect(session)

    def reconnect(self, session):
        """Re-declare the receiver after a qpid reconnect"""
        self.session = session
        self.receiver = session.receiver(self.address)
        self.receiver.capacity = 1

    def consume(self):
        """Fetch the message and pass it to the callback object.

        The message is acknowledged on the session once the callback has
        run, whether or not the callback raised; any exception from the
        callback still propagates to the caller.
        """
        message = self.receiver.fetch()
        try:
            self.callback(message.content)
        finally:
            # Without an explicit acknowledge, every fetched message stays
            # on the session's unacknowledged list for the lifetime of the
            # session.
            self.session.acknowledge(message)

    def get_receiver(self):
        """Return the receiver currently backing this consumer."""
        return self.receiver
142
 
 
143
 
 
144
 
class DirectConsumer(ConsumerBase):
    """Queue/consumer class for 'direct'"""

    def __init__(self, conf, session, msg_id, callback):
        """Set up a consumer on a 'direct' exchange named after msg_id.

        'conf' is accepted for a uniform constructor signature; it is not
                used here
        'session' is the amqp session to use
        'msg_id' is the msg_id to listen on; it is used as exchange name,
                 routing key and link name alike
        'callback' is the callback to call when messages are received
        """
        node_name = "%s/%s" % (msg_id, msg_id)
        node_opts = {"type": "direct"}
        link_opts = {"exclusive": True}

        super(DirectConsumer, self).__init__(
            session, callback, node_name, node_opts, msg_id, link_opts)
160
 
 
161
 
 
162
 
class TopicConsumer(ConsumerBase):
    """Consumer class for 'topic'"""

    def __init__(self, conf, session, topic, callback, name=None):
        """Set up a consumer for a topic on the control exchange.

        :param conf: config object; ``conf.control_exchange`` names the
                     exchange
        :param session: the amqp session to use
        :param topic: is the topic to listen on
        :paramtype topic: str
        :param callback: the callback to call when messages are received
        :param name: optional queue name, defaults to topic
        """
        node_name = "%s/%s" % (conf.control_exchange, topic)
        # Any falsy name (None, '') falls back to the topic itself.
        link_name = name or topic

        super(TopicConsumer, self).__init__(
            session, callback, node_name, {}, link_name, {})
178
 
 
179
 
 
180
 
class FanoutConsumer(ConsumerBase):
    """Consumer class for 'fanout'"""

    def __init__(self, conf, session, topic, callback):
        """Set up a consumer on the '<topic>_fanout' exchange.

        'conf' is accepted for a uniform constructor signature; it is not
                used here
        'session' is the amqp session to use
        'topic' is the topic to listen on
        'callback' is the callback to call when messages are received
        """
        node_name = "%s_fanout" % topic
        node_opts = {"durable": False, "type": "fanout"}
        # A random suffix gives every consumer its own exclusive link name.
        link_name = "%s_fanout_%s" % (topic, uuid.uuid4().hex)
        link_opts = {"exclusive": True}

        super(FanoutConsumer, self).__init__(
            session, callback, node_name, node_opts, link_name, link_opts)
196
 
 
197
 
 
198
 
class Publisher(object):
    """Base Publisher class"""

    def __init__(self, session, node_name, node_opts=None):
        """Build the address for node_name and open a sender for it.

        'session' is the amqp session to use
        'node_name' is the first part of the Qpid address string, before ';'
        'node_opts' optionally overrides entries of the "x-declare" section
                    of "node" in the address string
        """
        self.sender = None
        self.session = session

        declare_opts = {
            "durable": False,
            # auto-delete isn't implemented for exchanges in qpid,
            # but put in here anyway
            "auto-delete": True,
        }
        if node_opts:
            declare_opts.update(node_opts)

        node = {"type": "topic", "x-declare": declare_opts}
        addr_opts = {"create": "always", "node": node}

        self.address = "%s ; %s" % (node_name, json.dumps(addr_opts))

        self.reconnect(session)

    def reconnect(self, session):
        """Re-establish the Sender after a reconnection"""
        new_sender = session.sender(self.address)
        self.sender = new_sender

    def send(self, msg):
        """Send a message"""
        sender = self.sender
        sender.send(msg)
234
 
 
235
 
 
236
 
class DirectPublisher(Publisher):
    """Publisher class for 'direct'"""

    def __init__(self, conf, session, msg_id):
        """Set up a publisher whose node name is msg_id.

        'conf' is accepted for a uniform constructor signature; it is not
        used here.
        """
        node_opts = {"type": "Direct"}
        super(DirectPublisher, self).__init__(session, msg_id, node_opts)
242
 
 
243
 
 
244
 
class TopicPublisher(Publisher):
    """Publisher class for 'topic'"""

    def __init__(self, conf, session, topic):
        """Set up a publisher for '<control_exchange>/<topic>'.

        The exchange name is read from ``conf.control_exchange``.
        """
        node_name = "%s/%s" % (conf.control_exchange, topic)
        super(TopicPublisher, self).__init__(session, node_name)
251
 
 
252
 
 
253
 
class FanoutPublisher(Publisher):
    """Publisher class for 'fanout'"""

    def __init__(self, conf, session, topic):
        """Set up a publisher on the '<topic>_fanout' exchange.

        'conf' is accepted for a uniform constructor signature; it is not
        used here.
        """
        node_name = "%s_fanout" % topic
        node_opts = {"type": "fanout"}
        super(FanoutPublisher, self).__init__(session, node_name, node_opts)
260
 
 
261
 
 
262
 
class NotifyPublisher(Publisher):
    """Publisher class for notifications"""

    def __init__(self, conf, session, topic):
        """Set up a durable publisher for '<control_exchange>/<topic>'.

        The exchange name is read from ``conf.control_exchange``.
        """
        node_name = "%s/%s" % (conf.control_exchange, topic)
        node_opts = {"durable": True}
        super(NotifyPublisher, self).__init__(session, node_name, node_opts)
270
 
 
271
 
 
272
 
class Connection(object):
    """Connection object.

    Wraps one qpid.messaging connection and its session, keeps track of
    the consumers declared on that session, and offers helpers to declare
    consumers, publish messages and consume (optionally in a greenthread).
    """

    # Shared connection pool slot; read by the module-level cleanup().
    pool = None

    def __init__(self, conf, server_params=None):
        """Configure a qpid connection from conf and open it.

        'conf' is the config object carrying the qpid_* options
        'server_params' optionally overrides 'hostname', 'port',
                        'username' and 'password'; the mapping passed in
                        is not modified
        """
        self.session = None
        self.consumers = {}
        self.consumer_thread = None
        self.conf = conf

        # Start from the configured defaults and overlay the caller's
        # values on a fresh dict, so the caller's mapping is left alone.
        params = {
            'hostname': self.conf.qpid_hostname,
            'port': self.conf.qpid_port,
            'username': self.conf.qpid_username,
            'password': self.conf.qpid_password,
        }
        params.update(server_params or {})

        self.broker = params['hostname'] + ":" + str(params['port'])
        # Create the connection - this does not open the connection
        self.connection = qpid.messaging.Connection(self.broker)

        # Check if flags are set and if so set them for the connection
        # before we call open
        self.connection.username = params['username']
        self.connection.password = params['password']
        self.connection.sasl_mechanisms = self.conf.qpid_sasl_mechanisms
        self.connection.reconnect = self.conf.qpid_reconnect
        # The reconnect tuning options default to 0; only non-zero values
        # are pushed onto the connection.
        if self.conf.qpid_reconnect_timeout:
            self.connection.reconnect_timeout = (
                    self.conf.qpid_reconnect_timeout)
        if self.conf.qpid_reconnect_limit:
            self.connection.reconnect_limit = self.conf.qpid_reconnect_limit
        if self.conf.qpid_reconnect_interval_max:
            self.connection.reconnect_interval_max = (
                    self.conf.qpid_reconnect_interval_max)
        if self.conf.qpid_reconnect_interval_min:
            self.connection.reconnect_interval_min = (
                    self.conf.qpid_reconnect_interval_min)
        if self.conf.qpid_reconnect_interval:
            self.connection.reconnect_interval = (
                    self.conf.qpid_reconnect_interval)
        # 'heartbeat' and 'transport' are the attribute names the
        # qpid.messaging connection reads for these settings.
        self.connection.heartbeat = self.conf.qpid_heartbeat
        self.connection.transport = self.conf.qpid_protocol
        self.connection.tcp_nodelay = self.conf.qpid_tcp_nodelay

        # Open is part of reconnect -
        # NOTE(WGH) not sure we need this with the reconnect flags
        self.reconnect()

    def _register_consumer(self, consumer):
        """Index a consumer by the string form of its receiver."""
        self.consumers[str(consumer.get_receiver())] = consumer

    def _lookup_consumer(self, receiver):
        """Return the consumer registered for receiver (KeyError if none)."""
        return self.consumers[str(receiver)]

    def reconnect(self):
        """Handles reconnecting and re-establishing sessions and queues"""
        if self.connection.opened():
            try:
                self.connection.close()
            except qpid.messaging.exceptions.ConnectionError:
                pass

        # Retry open() until it succeeds, sleeping between attempts.
        while True:
            try:
                self.connection.open()
            except qpid.messaging.exceptions.ConnectionError as e:
                LOG.error(_('Unable to connect to AMQP server: %s'), e)
                time.sleep(self.conf.qpid_reconnect_interval or 1)
            else:
                break

        LOG.info(_('Connected to AMQP server on %s'), self.broker)

        self.session = self.connection.session()

        if self.consumers:
            # Each consumer gets a new receiver on the new session, so the
            # registry (keyed by receiver) has to be rebuilt as well.
            consumers = self.consumers
            self.consumers = {}

            for consumer in consumers.values():
                consumer.reconnect(self.session)
                self._register_consumer(consumer)

            LOG.debug(_("Re-established AMQP queues"))

    def ensure(self, error_callback, method, *args, **kwargs):
        """Call method(*args, **kwargs) until it returns, and return that.

        On qpid Empty or ConnectionError the optional error_callback is
        invoked with the exception, the connection is re-established and
        the call is retried.  An exception raised by error_callback itself
        propagates and ends the retry loop.
        """
        while True:
            try:
                return method(*args, **kwargs)
            except (qpid.messaging.exceptions.Empty,
                    qpid.messaging.exceptions.ConnectionError) as e:
                if error_callback:
                    error_callback(e)
                self.reconnect()

    def close(self):
        """Close/release this connection"""
        self.cancel_consumer_thread()
        self.connection.close()
        self.connection = None

    def reset(self):
        """Reset a connection so it can be used again"""
        self.cancel_consumer_thread()
        self.session.close()
        self.session = self.connection.session()
        self.consumers = {}

    def declare_consumer(self, consumer_cls, topic, callback):
        """Create a Consumer using the class that was passed in and
        add it to our list of consumers
        """
        def _connect_error(exc):
            log_info = {'topic': topic, 'err_str': str(exc)}
            LOG.error(_("Failed to declare consumer for topic '%(topic)s': "
                "%(err_str)s") % log_info)

        def _declare_consumer():
            consumer = consumer_cls(self.conf, self.session, topic, callback)
            self._register_consumer(consumer)
            return consumer

        return self.ensure(_connect_error, _declare_consumer)

    def iterconsume(self, limit=None, timeout=None):
        """Return an iterator that will consume from all queues/consumers

        'limit' optionally bounds the number of iterations
        'timeout' is passed to the session's next_receiver(); when it
                  expires rpc_common.Timeout is raised
        """

        def _error_callback(exc):
            if isinstance(exc, qpid.messaging.exceptions.Empty):
                LOG.exception(_('Timed out waiting for RPC response: %s') %
                        str(exc))
                raise rpc_common.Timeout()
            else:
                LOG.exception(_('Failed to consume message from queue: %s') %
                        str(exc))

        def _consume():
            nxt_receiver = self.session.next_receiver(timeout=timeout)
            try:
                self._lookup_consumer(nxt_receiver).consume()
            except Exception:
                LOG.exception(_("Error processing message.  Skipping it."))

        for iteration in itertools.count(0):
            if limit and iteration >= limit:
                # A plain return ends the generator; raising StopIteration
                # inside a generator is an error on newer interpreters.
                return
            yield self.ensure(_error_callback, _consume)

    def cancel_consumer_thread(self):
        """Cancel a consumer thread"""
        if self.consumer_thread is not None:
            self.consumer_thread.kill()
            try:
                self.consumer_thread.wait()
            except greenlet.GreenletExit:
                pass
            self.consumer_thread = None

    def publisher_send(self, cls, topic, msg):
        """Send to a publisher based on the publisher class"""

        def _connect_error(exc):
            log_info = {'topic': topic, 'err_str': str(exc)}
            LOG.exception(_("Failed to publish message to topic "
                "'%(topic)s': %(err_str)s") % log_info)

        def _publisher_send():
            publisher = cls(self.conf, self.session, topic)
            publisher.send(msg)

        return self.ensure(_connect_error, _publisher_send)

    def declare_direct_consumer(self, topic, callback):
        """Create a 'direct' queue.
        In nova's use, this is generally a msg_id queue used for
        responses for call/multicall
        """
        self.declare_consumer(DirectConsumer, topic, callback)

    def declare_topic_consumer(self, topic, callback=None, queue_name=None):
        """Create a 'topic' consumer."""
        self.declare_consumer(functools.partial(TopicConsumer,
                                                name=queue_name,
                                                ),
                              topic, callback)

    def declare_fanout_consumer(self, topic, callback):
        """Create a 'fanout' consumer"""
        self.declare_consumer(FanoutConsumer, topic, callback)

    def direct_send(self, msg_id, msg):
        """Send a 'direct' message"""
        self.publisher_send(DirectPublisher, msg_id, msg)

    def topic_send(self, topic, msg):
        """Send a 'topic' message"""
        self.publisher_send(TopicPublisher, topic, msg)

    def fanout_send(self, topic, msg):
        """Send a 'fanout' message"""
        self.publisher_send(FanoutPublisher, topic, msg)

    def notify_send(self, topic, msg, **kwargs):
        """Send a notify message on a topic

        Extra keyword arguments are accepted and ignored.
        """
        self.publisher_send(NotifyPublisher, topic, msg)

    def consume(self, limit=None):
        """Consume from all queues/consumers"""
        it = self.iterconsume(limit=limit)
        while True:
            try:
                next(it)
            except StopIteration:
                return

    def consume_in_thread(self):
        """Consumer from all queues/consumers in a greenthread"""
        def _consumer_thread():
            try:
                self.consume()
            except greenlet.GreenletExit:
                return
        # Only one consumer greenthread is ever spawned per connection.
        if self.consumer_thread is None:
            self.consumer_thread = eventlet.spawn(_consumer_thread)
        return self.consumer_thread

    def create_consumer(self, topic, proxy, fanout=False):
        """Create a consumer that calls a method in a proxy object"""
        proxy_cb = rpc_amqp.ProxyCallback(self.conf, proxy,
                rpc_amqp.get_connection_pool(self.conf, Connection))

        if fanout:
            consumer = FanoutConsumer(self.conf, self.session, topic, proxy_cb)
        else:
            consumer = TopicConsumer(self.conf, self.session, topic, proxy_cb)

        self._register_consumer(consumer)

        return consumer

    def create_worker(self, topic, proxy, pool_name):
        """Create a worker that calls a method in a proxy object"""
        proxy_cb = rpc_amqp.ProxyCallback(self.conf, proxy,
                rpc_amqp.get_connection_pool(self.conf, Connection))

        consumer = TopicConsumer(self.conf, self.session, topic, proxy_cb,
                                 name=pool_name)

        self._register_consumer(consumer)

        return consumer
526
 
 
527
 
 
528
 
def create_connection(conf, new=True):
    """Create a connection

    Delegates to rpc_amqp.create_connection using the pool obtained for
    this driver's Connection class.
    """
    connection_pool = rpc_amqp.get_connection_pool(conf, Connection)
    return rpc_amqp.create_connection(conf, new, connection_pool)
532
 
 
533
 
 
534
 
def multicall(conf, context, topic, msg, timeout=None):
    """Make a call that returns multiple times.

    Delegates to rpc_amqp.multicall using the pool obtained for this
    driver's Connection class.
    """
    connection_pool = rpc_amqp.get_connection_pool(conf, Connection)
    return rpc_amqp.multicall(conf, context, topic, msg, timeout,
                              connection_pool)
538
 
 
539
 
 
540
 
def call(conf, context, topic, msg, timeout=None):
    """Sends a message on a topic and wait for a response.

    Delegates to rpc_amqp.call using the pool obtained for this driver's
    Connection class.
    """
    connection_pool = rpc_amqp.get_connection_pool(conf, Connection)
    return rpc_amqp.call(conf, context, topic, msg, timeout,
                         connection_pool)
544
 
 
545
 
 
546
 
def cast(conf, context, topic, msg):
    """Sends a message on a topic without waiting for a response.

    Delegates to rpc_amqp.cast using the pool obtained for this driver's
    Connection class.
    """
    connection_pool = rpc_amqp.get_connection_pool(conf, Connection)
    return rpc_amqp.cast(conf, context, topic, msg, connection_pool)
550
 
 
551
 
 
552
 
def fanout_cast(conf, context, topic, msg):
    """Sends a message on a fanout exchange without waiting for a response.

    Delegates to rpc_amqp.fanout_cast using the pool obtained for this
    driver's Connection class.
    """
    connection_pool = rpc_amqp.get_connection_pool(conf, Connection)
    return rpc_amqp.fanout_cast(conf, context, topic, msg, connection_pool)
556
 
 
557
 
 
558
 
def cast_to_server(conf, context, server_params, topic, msg):
    """Sends a message on a topic to a specific server.

    Delegates to rpc_amqp.cast_to_server using the pool obtained for this
    driver's Connection class.
    """
    connection_pool = rpc_amqp.get_connection_pool(conf, Connection)
    return rpc_amqp.cast_to_server(conf, context, server_params, topic, msg,
                                   connection_pool)
562
 
 
563
 
 
564
 
def fanout_cast_to_server(conf, context, server_params, topic, msg):
    """Sends a message on a fanout exchange to a specific server.

    Delegates to rpc_amqp.fanout_cast_to_server using the pool obtained
    for this driver's Connection class.
    """
    connection_pool = rpc_amqp.get_connection_pool(conf, Connection)
    return rpc_amqp.fanout_cast_to_server(conf, context, server_params,
                                          topic, msg, connection_pool)
568
 
 
569
 
 
570
 
def notify(conf, context, topic, msg):
    """Sends a notification event on a topic.

    Delegates to rpc_amqp.notify using the pool obtained for this driver's
    Connection class.
    """
    connection_pool = rpc_amqp.get_connection_pool(conf, Connection)
    return rpc_amqp.notify(conf, context, topic, msg, connection_pool)
574
 
 
575
 
 
576
 
def cleanup():
    """Hand the Connection class's pool to rpc_amqp.cleanup."""
    connection_pool = Connection.pool
    return rpc_amqp.cleanup(connection_pool)
578
 
 
579
 
 
580
 
def register_opts(conf):
    """Register this module's qpid_opts on the given config object."""
    options = qpid_opts
    conf.register_opts(options)