~ubuntu-cloud-archive/ubuntu/precise/nova/trunk

« back to all changes in this revision

Viewing changes to nova/openstack/common/rpc/impl_kombu.py

  • Committer: Package Import Robot
  • Author(s): Chuck Short, Chuck Short, Adam Gandelman
  • Date: 2012-06-22 12:39:57 UTC
  • mfrom: (1.1.57)
  • Revision ID: package-import@ubuntu.com-20120622123957-hbzwg84nt9rqwg8r
Tags: 2012.2~f2~20120621.14517-0ubuntu1
[ Chuck Short ]
* New upstream version.

[ Adam Gandelman ]
* debian/rules: Temporarily disable test suite while blocking
  tests are investigated. 
* debian/patches/kombu_tests_timeout.patch: Dropped.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# vim: tabstop=4 shiftwidth=4 softtabstop=4
 
2
 
 
3
#    Copyright 2011 OpenStack LLC
 
4
#
 
5
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
 
6
#    not use this file except in compliance with the License. You may obtain
 
7
#    a copy of the License at
 
8
#
 
9
#         http://www.apache.org/licenses/LICENSE-2.0
 
10
#
 
11
#    Unless required by applicable law or agreed to in writing, software
 
12
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 
13
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 
14
#    License for the specific language governing permissions and limitations
 
15
#    under the License.
 
16
 
 
17
import functools
 
18
import itertools
 
19
import socket
 
20
import ssl
 
21
import sys
 
22
import time
 
23
import uuid
 
24
 
 
25
import eventlet
 
26
import greenlet
 
27
import kombu
 
28
import kombu.connection
 
29
import kombu.entity
 
30
import kombu.messaging
 
31
 
 
32
from nova.openstack.common import cfg
 
33
from nova.openstack.common.rpc import amqp as rpc_amqp
 
34
from nova.openstack.common.rpc import common as rpc_common
 
35
 
 
36
kombu_opts = [
 
37
    cfg.StrOpt('kombu_ssl_version',
 
38
               default='',
 
39
               help='SSL version to use (valid only if SSL enabled)'),
 
40
    cfg.StrOpt('kombu_ssl_keyfile',
 
41
               default='',
 
42
               help='SSL key file (valid only if SSL enabled)'),
 
43
    cfg.StrOpt('kombu_ssl_certfile',
 
44
               default='',
 
45
               help='SSL cert file (valid only if SSL enabled)'),
 
46
    cfg.StrOpt('kombu_ssl_ca_certs',
 
47
               default='',
 
48
               help=('SSL certification authority file '
 
49
                    '(valid only if SSL enabled)')),
 
50
    cfg.StrOpt('rabbit_host',
 
51
               default='localhost',
 
52
               help='the RabbitMQ host'),
 
53
    cfg.IntOpt('rabbit_port',
 
54
               default=5672,
 
55
               help='the RabbitMQ port'),
 
56
    cfg.BoolOpt('rabbit_use_ssl',
 
57
                default=False,
 
58
                help='connect over SSL for RabbitMQ'),
 
59
    cfg.StrOpt('rabbit_userid',
 
60
               default='guest',
 
61
               help='the RabbitMQ userid'),
 
62
    cfg.StrOpt('rabbit_password',
 
63
               default='guest',
 
64
               help='the RabbitMQ password'),
 
65
    cfg.StrOpt('rabbit_virtual_host',
 
66
               default='/',
 
67
               help='the RabbitMQ virtual host'),
 
68
    cfg.IntOpt('rabbit_retry_interval',
 
69
               default=1,
 
70
               help='how frequently to retry connecting with RabbitMQ'),
 
71
    cfg.IntOpt('rabbit_retry_backoff',
 
72
               default=2,
 
73
               help='how long to backoff for between retries when connecting '
 
74
                    'to RabbitMQ'),
 
75
    cfg.IntOpt('rabbit_max_retries',
 
76
               default=0,
 
77
               help='maximum retries with trying to connect to RabbitMQ '
 
78
                    '(the default of 0 implies an infinite retry count)'),
 
79
    cfg.BoolOpt('rabbit_durable_queues',
 
80
                default=False,
 
81
                help='use durable queues in RabbitMQ'),
 
82
 
 
83
    ]
 
84
 
 
85
cfg.CONF.register_opts(kombu_opts)
 
86
 
 
87
LOG = rpc_common.LOG
 
88
 
 
89
 
 
90
class ConsumerBase(object):
 
91
    """Consumer base class."""
 
92
 
 
93
    def __init__(self, channel, callback, tag, **kwargs):
 
94
        """Declare a queue on an amqp channel.
 
95
 
 
96
        'channel' is the amqp channel to use
 
97
        'callback' is the callback to call when messages are received
 
98
        'tag' is a unique ID for the consumer on the channel
 
99
 
 
100
        queue name, exchange name, and other kombu options are
 
101
        passed in here as a dictionary.
 
102
        """
 
103
        self.callback = callback
 
104
        self.tag = str(tag)
 
105
        self.kwargs = kwargs
 
106
        self.queue = None
 
107
        self.reconnect(channel)
 
108
 
 
109
    def reconnect(self, channel):
 
110
        """Re-declare the queue after a rabbit reconnect"""
 
111
        self.channel = channel
 
112
        self.kwargs['channel'] = channel
 
113
        self.queue = kombu.entity.Queue(**self.kwargs)
 
114
        self.queue.declare()
 
115
 
 
116
    def consume(self, *args, **kwargs):
 
117
        """Actually declare the consumer on the amqp channel.  This will
 
118
        start the flow of messages from the queue.  Using the
 
119
        Connection.iterconsume() iterator will process the messages,
 
120
        calling the appropriate callback.
 
121
 
 
122
        If a callback is specified in kwargs, use that.  Otherwise,
 
123
        use the callback passed during __init__()
 
124
 
 
125
        If kwargs['nowait'] is True, then this call will block until
 
126
        a message is read.
 
127
 
 
128
        Messages will automatically be acked if the callback doesn't
 
129
        raise an exception
 
130
        """
 
131
 
 
132
        options = {'consumer_tag': self.tag}
 
133
        options['nowait'] = kwargs.get('nowait', False)
 
134
        callback = kwargs.get('callback', self.callback)
 
135
        if not callback:
 
136
            raise ValueError("No callback defined")
 
137
 
 
138
        def _callback(raw_message):
 
139
            message = self.channel.message_to_python(raw_message)
 
140
            try:
 
141
                callback(message.payload)
 
142
                message.ack()
 
143
            except Exception:
 
144
                LOG.exception(_("Failed to process message... skipping it."))
 
145
 
 
146
        self.queue.consume(*args, callback=_callback, **options)
 
147
 
 
148
    def cancel(self):
 
149
        """Cancel the consuming from the queue, if it has started"""
 
150
        try:
 
151
            self.queue.cancel(self.tag)
 
152
        except KeyError, e:
 
153
            # NOTE(comstud): Kludge to get around a amqplib bug
 
154
            if str(e) != "u'%s'" % self.tag:
 
155
                raise
 
156
        self.queue = None
 
157
 
 
158
 
 
159
class DirectConsumer(ConsumerBase):
 
160
    """Queue/consumer class for 'direct'"""
 
161
 
 
162
    def __init__(self, conf, channel, msg_id, callback, tag, **kwargs):
 
163
        """Init a 'direct' queue.
 
164
 
 
165
        'channel' is the amqp channel to use
 
166
        'msg_id' is the msg_id to listen on
 
167
        'callback' is the callback to call when messages are received
 
168
        'tag' is a unique ID for the consumer on the channel
 
169
 
 
170
        Other kombu options may be passed
 
171
        """
 
172
        # Default options
 
173
        options = {'durable': False,
 
174
                'auto_delete': True,
 
175
                'exclusive': True}
 
176
        options.update(kwargs)
 
177
        exchange = kombu.entity.Exchange(
 
178
                name=msg_id,
 
179
                type='direct',
 
180
                durable=options['durable'],
 
181
                auto_delete=options['auto_delete'])
 
182
        super(DirectConsumer, self).__init__(
 
183
                channel,
 
184
                callback,
 
185
                tag,
 
186
                name=msg_id,
 
187
                exchange=exchange,
 
188
                routing_key=msg_id,
 
189
                **options)
 
190
 
 
191
 
 
192
class TopicConsumer(ConsumerBase):
 
193
    """Consumer class for 'topic'"""
 
194
 
 
195
    def __init__(self, conf, channel, topic, callback, tag, name=None,
 
196
                 **kwargs):
 
197
        """Init a 'topic' queue.
 
198
 
 
199
        :param channel: the amqp channel to use
 
200
        :param topic: the topic to listen on
 
201
        :paramtype topic: str
 
202
        :param callback: the callback to call when messages are received
 
203
        :param tag: a unique ID for the consumer on the channel
 
204
        :param name: optional queue name, defaults to topic
 
205
        :paramtype name: str
 
206
 
 
207
        Other kombu options may be passed as keyword arguments
 
208
        """
 
209
        # Default options
 
210
        options = {'durable': conf.rabbit_durable_queues,
 
211
                'auto_delete': False,
 
212
                'exclusive': False}
 
213
        options.update(kwargs)
 
214
        exchange = kombu.entity.Exchange(
 
215
                name=conf.control_exchange,
 
216
                type='topic',
 
217
                durable=options['durable'],
 
218
                auto_delete=options['auto_delete'])
 
219
        super(TopicConsumer, self).__init__(
 
220
                channel,
 
221
                callback,
 
222
                tag,
 
223
                name=name or topic,
 
224
                exchange=exchange,
 
225
                routing_key=topic,
 
226
                **options)
 
227
 
 
228
 
 
229
class FanoutConsumer(ConsumerBase):
 
230
    """Consumer class for 'fanout'"""
 
231
 
 
232
    def __init__(self, conf, channel, topic, callback, tag, **kwargs):
 
233
        """Init a 'fanout' queue.
 
234
 
 
235
        'channel' is the amqp channel to use
 
236
        'topic' is the topic to listen on
 
237
        'callback' is the callback to call when messages are received
 
238
        'tag' is a unique ID for the consumer on the channel
 
239
 
 
240
        Other kombu options may be passed
 
241
        """
 
242
        unique = uuid.uuid4().hex
 
243
        exchange_name = '%s_fanout' % topic
 
244
        queue_name = '%s_fanout_%s' % (topic, unique)
 
245
 
 
246
        # Default options
 
247
        options = {'durable': False,
 
248
                'auto_delete': True,
 
249
                'exclusive': True}
 
250
        options.update(kwargs)
 
251
        exchange = kombu.entity.Exchange(
 
252
                name=exchange_name,
 
253
                type='fanout',
 
254
                durable=options['durable'],
 
255
                auto_delete=options['auto_delete'])
 
256
        super(FanoutConsumer, self).__init__(
 
257
                channel,
 
258
                callback,
 
259
                tag,
 
260
                name=queue_name,
 
261
                exchange=exchange,
 
262
                routing_key=topic,
 
263
                **options)
 
264
 
 
265
 
 
266
class Publisher(object):
 
267
    """Base Publisher class"""
 
268
 
 
269
    def __init__(self, channel, exchange_name, routing_key, **kwargs):
 
270
        """Init the Publisher class with the exchange_name, routing_key,
 
271
        and other options
 
272
        """
 
273
        self.exchange_name = exchange_name
 
274
        self.routing_key = routing_key
 
275
        self.kwargs = kwargs
 
276
        self.reconnect(channel)
 
277
 
 
278
    def reconnect(self, channel):
 
279
        """Re-establish the Producer after a rabbit reconnection"""
 
280
        self.exchange = kombu.entity.Exchange(name=self.exchange_name,
 
281
                **self.kwargs)
 
282
        self.producer = kombu.messaging.Producer(exchange=self.exchange,
 
283
                channel=channel, routing_key=self.routing_key)
 
284
 
 
285
    def send(self, msg):
 
286
        """Send a message"""
 
287
        self.producer.publish(msg)
 
288
 
 
289
 
 
290
class DirectPublisher(Publisher):
 
291
    """Publisher class for 'direct'"""
 
292
    def __init__(self, conf, channel, msg_id, **kwargs):
 
293
        """init a 'direct' publisher.
 
294
 
 
295
        Kombu options may be passed as keyword args to override defaults
 
296
        """
 
297
 
 
298
        options = {'durable': False,
 
299
                'auto_delete': True,
 
300
                'exclusive': True}
 
301
        options.update(kwargs)
 
302
        super(DirectPublisher, self).__init__(channel,
 
303
                msg_id,
 
304
                msg_id,
 
305
                type='direct',
 
306
                **options)
 
307
 
 
308
 
 
309
class TopicPublisher(Publisher):
 
310
    """Publisher class for 'topic'"""
 
311
    def __init__(self, conf, channel, topic, **kwargs):
 
312
        """init a 'topic' publisher.
 
313
 
 
314
        Kombu options may be passed as keyword args to override defaults
 
315
        """
 
316
        options = {'durable': conf.rabbit_durable_queues,
 
317
                'auto_delete': False,
 
318
                'exclusive': False}
 
319
        options.update(kwargs)
 
320
        super(TopicPublisher, self).__init__(channel,
 
321
                conf.control_exchange,
 
322
                topic,
 
323
                type='topic',
 
324
                **options)
 
325
 
 
326
 
 
327
class FanoutPublisher(Publisher):
 
328
    """Publisher class for 'fanout'"""
 
329
    def __init__(self, conf, channel, topic, **kwargs):
 
330
        """init a 'fanout' publisher.
 
331
 
 
332
        Kombu options may be passed as keyword args to override defaults
 
333
        """
 
334
        options = {'durable': False,
 
335
                'auto_delete': True,
 
336
                'exclusive': True}
 
337
        options.update(kwargs)
 
338
        super(FanoutPublisher, self).__init__(channel,
 
339
                '%s_fanout' % topic,
 
340
                None,
 
341
                type='fanout',
 
342
                **options)
 
343
 
 
344
 
 
345
class NotifyPublisher(TopicPublisher):
 
346
    """Publisher class for 'notify'"""
 
347
 
 
348
    def __init__(self, conf, channel, topic, **kwargs):
 
349
        self.durable = kwargs.pop('durable', conf.rabbit_durable_queues)
 
350
        super(NotifyPublisher, self).__init__(conf, channel, topic, **kwargs)
 
351
 
 
352
    def reconnect(self, channel):
 
353
        super(NotifyPublisher, self).reconnect(channel)
 
354
 
 
355
        # NOTE(jerdfelt): Normally the consumer would create the queue, but
 
356
        # we do this to ensure that messages don't get dropped if the
 
357
        # consumer is started after we do
 
358
        queue = kombu.entity.Queue(channel=channel,
 
359
                exchange=self.exchange,
 
360
                durable=self.durable,
 
361
                name=self.routing_key,
 
362
                routing_key=self.routing_key)
 
363
        queue.declare()
 
364
 
 
365
 
 
366
class Connection(object):
 
367
    """Connection object."""
 
368
 
 
369
    pool = None
 
370
 
 
371
    def __init__(self, conf, server_params=None):
 
372
        self.consumers = []
 
373
        self.consumer_thread = None
 
374
        self.conf = conf
 
375
        self.max_retries = self.conf.rabbit_max_retries
 
376
        # Try forever?
 
377
        if self.max_retries <= 0:
 
378
            self.max_retries = None
 
379
        self.interval_start = self.conf.rabbit_retry_interval
 
380
        self.interval_stepping = self.conf.rabbit_retry_backoff
 
381
        # max retry-interval = 30 seconds
 
382
        self.interval_max = 30
 
383
        self.memory_transport = False
 
384
 
 
385
        if server_params is None:
 
386
            server_params = {}
 
387
 
 
388
        # Keys to translate from server_params to kombu params
 
389
        server_params_to_kombu_params = {'username': 'userid'}
 
390
 
 
391
        params = {}
 
392
        for sp_key, value in server_params.iteritems():
 
393
            p_key = server_params_to_kombu_params.get(sp_key, sp_key)
 
394
            params[p_key] = value
 
395
 
 
396
        params.setdefault('hostname', self.conf.rabbit_host)
 
397
        params.setdefault('port', self.conf.rabbit_port)
 
398
        params.setdefault('userid', self.conf.rabbit_userid)
 
399
        params.setdefault('password', self.conf.rabbit_password)
 
400
        params.setdefault('virtual_host', self.conf.rabbit_virtual_host)
 
401
 
 
402
        self.params = params
 
403
 
 
404
        if self.conf.fake_rabbit:
 
405
            self.params['transport'] = 'memory'
 
406
            self.memory_transport = True
 
407
        else:
 
408
            self.memory_transport = False
 
409
 
 
410
        if self.conf.rabbit_use_ssl:
 
411
            self.params['ssl'] = self._fetch_ssl_params()
 
412
 
 
413
        self.connection = None
 
414
        self.reconnect()
 
415
 
 
416
    def _fetch_ssl_params(self):
 
417
        """Handles fetching what ssl params
 
418
        should be used for the connection (if any)"""
 
419
        ssl_params = dict()
 
420
 
 
421
        # http://docs.python.org/library/ssl.html - ssl.wrap_socket
 
422
        if self.conf.kombu_ssl_version:
 
423
            ssl_params['ssl_version'] = self.conf.kombu_ssl_version
 
424
        if self.conf.kombu_ssl_keyfile:
 
425
            ssl_params['keyfile'] = self.conf.kombu_ssl_keyfile
 
426
        if self.conf.kombu_ssl_certfile:
 
427
            ssl_params['certfile'] = self.conf.kombu_ssl_certfile
 
428
        if self.conf.kombu_ssl_ca_certs:
 
429
            ssl_params['ca_certs'] = self.conf.kombu_ssl_ca_certs
 
430
            # We might want to allow variations in the
 
431
            # future with this?
 
432
            ssl_params['cert_reqs'] = ssl.CERT_REQUIRED
 
433
 
 
434
        if not ssl_params:
 
435
            # Just have the default behavior
 
436
            return True
 
437
        else:
 
438
            # Return the extended behavior
 
439
            return ssl_params
 
440
 
 
441
    def _connect(self):
 
442
        """Connect to rabbit.  Re-establish any queues that may have
 
443
        been declared before if we are reconnecting.  Exceptions should
 
444
        be handled by the caller.
 
445
        """
 
446
        if self.connection:
 
447
            LOG.info(_("Reconnecting to AMQP server on "
 
448
                    "%(hostname)s:%(port)d") % self.params)
 
449
            try:
 
450
                self.connection.close()
 
451
            except self.connection_errors:
 
452
                pass
 
453
            # Setting this in case the next statement fails, though
 
454
            # it shouldn't be doing any network operations, yet.
 
455
            self.connection = None
 
456
        self.connection = kombu.connection.BrokerConnection(
 
457
                **self.params)
 
458
        self.connection_errors = self.connection.connection_errors
 
459
        if self.memory_transport:
 
460
            # Kludge to speed up tests.
 
461
            self.connection.transport.polling_interval = 0.0
 
462
        self.consumer_num = itertools.count(1)
 
463
        self.connection.connect()
 
464
        self.channel = self.connection.channel()
 
465
        # work around 'memory' transport bug in 1.1.3
 
466
        if self.memory_transport:
 
467
            self.channel._new_queue('ae.undeliver')
 
468
        for consumer in self.consumers:
 
469
            consumer.reconnect(self.channel)
 
470
        LOG.info(_('Connected to AMQP server on %(hostname)s:%(port)d'),
 
471
                 self.params)
 
472
 
 
473
    def reconnect(self):
 
474
        """Handles reconnecting and re-establishing queues.
 
475
        Will retry up to self.max_retries number of times.
 
476
        self.max_retries = 0 means to retry forever.
 
477
        Sleep between tries, starting at self.interval_start
 
478
        seconds, backing off self.interval_stepping number of seconds
 
479
        each attempt.
 
480
        """
 
481
 
 
482
        attempt = 0
 
483
        while True:
 
484
            attempt += 1
 
485
            try:
 
486
                self._connect()
 
487
                return
 
488
            except (self.connection_errors, IOError), e:
 
489
                pass
 
490
            except Exception, e:
 
491
                # NOTE(comstud): Unfortunately it's possible for amqplib
 
492
                # to return an error not covered by its transport
 
493
                # connection_errors in the case of a timeout waiting for
 
494
                # a protocol response.  (See paste link in LP888621)
 
495
                # So, we check all exceptions for 'timeout' in them
 
496
                # and try to reconnect in this case.
 
497
                if 'timeout' not in str(e):
 
498
                    raise
 
499
 
 
500
            log_info = {}
 
501
            log_info['err_str'] = str(e)
 
502
            log_info['max_retries'] = self.max_retries
 
503
            log_info.update(self.params)
 
504
 
 
505
            if self.max_retries and attempt == self.max_retries:
 
506
                LOG.exception(_('Unable to connect to AMQP server on '
 
507
                        '%(hostname)s:%(port)d after %(max_retries)d '
 
508
                        'tries: %(err_str)s') % log_info)
 
509
                # NOTE(comstud): Copied from original code.  There's
 
510
                # really no better recourse because if this was a queue we
 
511
                # need to consume on, we have no way to consume anymore.
 
512
                sys.exit(1)
 
513
 
 
514
            if attempt == 1:
 
515
                sleep_time = self.interval_start or 1
 
516
            elif attempt > 1:
 
517
                sleep_time += self.interval_stepping
 
518
            if self.interval_max:
 
519
                sleep_time = min(sleep_time, self.interval_max)
 
520
 
 
521
            log_info['sleep_time'] = sleep_time
 
522
            LOG.warn(_('AMQP server on %(hostname)s:%(port)d is'
 
523
                    ' unreachable: %(err_str)s. Trying again in '
 
524
                    '%(sleep_time)d seconds.') % log_info)
 
525
            time.sleep(sleep_time)
 
526
 
 
527
    def ensure(self, error_callback, method, *args, **kwargs):
 
528
        while True:
 
529
            try:
 
530
                return method(*args, **kwargs)
 
531
            except (self.connection_errors, socket.timeout, IOError), e:
 
532
                pass
 
533
            except Exception, e:
 
534
                # NOTE(comstud): Unfortunately it's possible for amqplib
 
535
                # to return an error not covered by its transport
 
536
                # connection_errors in the case of a timeout waiting for
 
537
                # a protocol response.  (See paste link in LP888621)
 
538
                # So, we check all exceptions for 'timeout' in them
 
539
                # and try to reconnect in this case.
 
540
                if 'timeout' not in str(e):
 
541
                    raise
 
542
            if error_callback:
 
543
                error_callback(e)
 
544
            self.reconnect()
 
545
 
 
546
    def get_channel(self):
 
547
        """Convenience call for bin/clear_rabbit_queues"""
 
548
        return self.channel
 
549
 
 
550
    def close(self):
 
551
        """Close/release this connection"""
 
552
        self.cancel_consumer_thread()
 
553
        self.connection.release()
 
554
        self.connection = None
 
555
 
 
556
    def reset(self):
 
557
        """Reset a connection so it can be used again"""
 
558
        self.cancel_consumer_thread()
 
559
        self.channel.close()
 
560
        self.channel = self.connection.channel()
 
561
        # work around 'memory' transport bug in 1.1.3
 
562
        if self.memory_transport:
 
563
            self.channel._new_queue('ae.undeliver')
 
564
        self.consumers = []
 
565
 
 
566
    def declare_consumer(self, consumer_cls, topic, callback):
 
567
        """Create a Consumer using the class that was passed in and
 
568
        add it to our list of consumers
 
569
        """
 
570
 
 
571
        def _connect_error(exc):
 
572
            log_info = {'topic': topic, 'err_str': str(exc)}
 
573
            LOG.error(_("Failed to declare consumer for topic '%(topic)s': "
 
574
                "%(err_str)s") % log_info)
 
575
 
 
576
        def _declare_consumer():
 
577
            consumer = consumer_cls(self.conf, self.channel, topic, callback,
 
578
                    self.consumer_num.next())
 
579
            self.consumers.append(consumer)
 
580
            return consumer
 
581
 
 
582
        return self.ensure(_connect_error, _declare_consumer)
 
583
 
 
584
    def iterconsume(self, limit=None, timeout=None):
 
585
        """Return an iterator that will consume from all queues/consumers"""
 
586
 
 
587
        info = {'do_consume': True}
 
588
 
 
589
        def _error_callback(exc):
 
590
            if isinstance(exc, socket.timeout):
 
591
                LOG.exception(_('Timed out waiting for RPC response: %s') %
 
592
                        str(exc))
 
593
                raise rpc_common.Timeout()
 
594
            else:
 
595
                LOG.exception(_('Failed to consume message from queue: %s') %
 
596
                        str(exc))
 
597
                info['do_consume'] = True
 
598
 
 
599
        def _consume():
 
600
            if info['do_consume']:
 
601
                queues_head = self.consumers[:-1]
 
602
                queues_tail = self.consumers[-1]
 
603
                for queue in queues_head:
 
604
                    queue.consume(nowait=True)
 
605
                queues_tail.consume(nowait=False)
 
606
                info['do_consume'] = False
 
607
            return self.connection.drain_events(timeout=timeout)
 
608
 
 
609
        for iteration in itertools.count(0):
 
610
            if limit and iteration >= limit:
 
611
                raise StopIteration
 
612
            yield self.ensure(_error_callback, _consume)
 
613
 
 
614
    def cancel_consumer_thread(self):
 
615
        """Cancel a consumer thread"""
 
616
        if self.consumer_thread is not None:
 
617
            self.consumer_thread.kill()
 
618
            try:
 
619
                self.consumer_thread.wait()
 
620
            except greenlet.GreenletExit:
 
621
                pass
 
622
            self.consumer_thread = None
 
623
 
 
624
    def publisher_send(self, cls, topic, msg, **kwargs):
 
625
        """Send to a publisher based on the publisher class"""
 
626
 
 
627
        def _error_callback(exc):
 
628
            log_info = {'topic': topic, 'err_str': str(exc)}
 
629
            LOG.exception(_("Failed to publish message to topic "
 
630
                "'%(topic)s': %(err_str)s") % log_info)
 
631
 
 
632
        def _publish():
 
633
            publisher = cls(self.conf, self.channel, topic, **kwargs)
 
634
            publisher.send(msg)
 
635
 
 
636
        self.ensure(_error_callback, _publish)
 
637
 
 
638
    def declare_direct_consumer(self, topic, callback):
 
639
        """Create a 'direct' queue.
 
640
        In nova's use, this is generally a msg_id queue used for
 
641
        responses for call/multicall
 
642
        """
 
643
        self.declare_consumer(DirectConsumer, topic, callback)
 
644
 
 
645
    def declare_topic_consumer(self, topic, callback=None, queue_name=None):
 
646
        """Create a 'topic' consumer."""
 
647
        self.declare_consumer(functools.partial(TopicConsumer,
 
648
                                                name=queue_name,
 
649
                                                ),
 
650
                              topic, callback)
 
651
 
 
652
    def declare_fanout_consumer(self, topic, callback):
 
653
        """Create a 'fanout' consumer"""
 
654
        self.declare_consumer(FanoutConsumer, topic, callback)
 
655
 
 
656
    def direct_send(self, msg_id, msg):
 
657
        """Send a 'direct' message"""
 
658
        self.publisher_send(DirectPublisher, msg_id, msg)
 
659
 
 
660
    def topic_send(self, topic, msg):
 
661
        """Send a 'topic' message"""
 
662
        self.publisher_send(TopicPublisher, topic, msg)
 
663
 
 
664
    def fanout_send(self, topic, msg):
 
665
        """Send a 'fanout' message"""
 
666
        self.publisher_send(FanoutPublisher, topic, msg)
 
667
 
 
668
    def notify_send(self, topic, msg, **kwargs):
 
669
        """Send a notify message on a topic"""
 
670
        self.publisher_send(NotifyPublisher, topic, msg, **kwargs)
 
671
 
 
672
    def consume(self, limit=None):
 
673
        """Consume from all queues/consumers"""
 
674
        it = self.iterconsume(limit=limit)
 
675
        while True:
 
676
            try:
 
677
                it.next()
 
678
            except StopIteration:
 
679
                return
 
680
 
 
681
    def consume_in_thread(self):
 
682
        """Consumer from all queues/consumers in a greenthread"""
 
683
        def _consumer_thread():
 
684
            try:
 
685
                self.consume()
 
686
            except greenlet.GreenletExit:
 
687
                return
 
688
        if self.consumer_thread is None:
 
689
            self.consumer_thread = eventlet.spawn(_consumer_thread)
 
690
        return self.consumer_thread
 
691
 
 
692
    def create_consumer(self, topic, proxy, fanout=False):
 
693
        """Create a consumer that calls a method in a proxy object"""
 
694
        proxy_cb = rpc_amqp.ProxyCallback(self.conf, proxy,
 
695
                rpc_amqp.get_connection_pool(self.conf, Connection))
 
696
 
 
697
        if fanout:
 
698
            self.declare_fanout_consumer(topic, proxy_cb)
 
699
        else:
 
700
            self.declare_topic_consumer(topic, proxy_cb)
 
701
 
 
702
    def create_worker(self, topic, proxy, pool_name):
 
703
        """Create a worker that calls a method in a proxy object"""
 
704
        proxy_cb = rpc_amqp.ProxyCallback(self.conf, proxy,
 
705
                rpc_amqp.get_connection_pool(self.conf, Connection))
 
706
        self.declare_topic_consumer(topic, proxy_cb, pool_name)
 
707
 
 
708
 
 
709
def create_connection(conf, new=True):
 
710
    """Create a connection"""
 
711
    return rpc_amqp.create_connection(conf, new,
 
712
            rpc_amqp.get_connection_pool(conf, Connection))
 
713
 
 
714
 
 
715
def multicall(conf, context, topic, msg, timeout=None):
 
716
    """Make a call that returns multiple times."""
 
717
    return rpc_amqp.multicall(conf, context, topic, msg, timeout,
 
718
            rpc_amqp.get_connection_pool(conf, Connection))
 
719
 
 
720
 
 
721
def call(conf, context, topic, msg, timeout=None):
 
722
    """Sends a message on a topic and wait for a response."""
 
723
    return rpc_amqp.call(conf, context, topic, msg, timeout,
 
724
            rpc_amqp.get_connection_pool(conf, Connection))
 
725
 
 
726
 
 
727
def cast(conf, context, topic, msg):
 
728
    """Sends a message on a topic without waiting for a response."""
 
729
    return rpc_amqp.cast(conf, context, topic, msg,
 
730
            rpc_amqp.get_connection_pool(conf, Connection))
 
731
 
 
732
 
 
733
def fanout_cast(conf, context, topic, msg):
 
734
    """Sends a message on a fanout exchange without waiting for a response."""
 
735
    return rpc_amqp.fanout_cast(conf, context, topic, msg,
 
736
            rpc_amqp.get_connection_pool(conf, Connection))
 
737
 
 
738
 
 
739
def cast_to_server(conf, context, server_params, topic, msg):
 
740
    """Sends a message on a topic to a specific server."""
 
741
    return rpc_amqp.cast_to_server(conf, context, server_params, topic, msg,
 
742
            rpc_amqp.get_connection_pool(conf, Connection))
 
743
 
 
744
 
 
745
def fanout_cast_to_server(conf, context, server_params, topic, msg):
 
746
    """Sends a message on a fanout exchange to a specific server."""
 
747
    return rpc_amqp.cast_to_server(conf, context, server_params, topic, msg,
 
748
            rpc_amqp.get_connection_pool(conf, Connection))
 
749
 
 
750
 
 
751
def notify(conf, context, topic, msg):
 
752
    """Sends a notification event on a topic."""
 
753
    return rpc_amqp.notify(conf, context, topic, msg,
 
754
            rpc_amqp.get_connection_pool(conf, Connection))
 
755
 
 
756
 
 
757
def cleanup():
 
758
    return rpc_amqp.cleanup(Connection.pool)