~ubuntu-branches/ubuntu/trusty/heat/trusty-security

« back to all changes in this revision

Viewing changes to heat/openstack/common/rpc/impl_qpid.py

  • Committer: Package Import Robot
  • Author(s): Chuck Short, Yolanda Robla, Chuck Short
  • Date: 2013-07-22 16:22:29 UTC
  • mfrom: (1.1.2)
  • Revision ID: package-import@ubuntu.com-20130722162229-zzvfu40id94ii0hc
Tags: 2013.2~b2-0ubuntu1
[ Yolanda Robla ]
* debian/tests: added autopkg tests

[ Chuck Short ]
* New upstream release
* debian/control:
  - Add python-pbr to build-depends.
  - Add python-d2to to build-depends.
  - Dropped python-argparse.
  - Add python-six to build-depends.
  - Dropped python-sendfile.
  - Dropped python-nose.
  - Added testrepository.
  - Added python-testtools.
* debian/rules: Run testrepository instead of nosetets.
* debian/patches/removes-lxml-version-limitation-from-pip-requires.patch: Dropped
  no longer needed.
* debian/patches/fix-package-version-detection-when-building-doc.patch: Dropped
  no longer needed.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
1
# vim: tabstop=4 shiftwidth=4 softtabstop=4
2
2
 
3
 
#    Copyright 2011 OpenStack LLC
 
3
#    Copyright 2011 OpenStack Foundation
4
4
#    Copyright 2011 - 2012, Red Hat, Inc.
5
5
#
6
6
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
31
31
from heat.openstack.common.rpc import amqp as rpc_amqp
32
32
from heat.openstack.common.rpc import common as rpc_common
33
33
 
 
34
qpid_codec = importutils.try_import("qpid.codec010")
34
35
qpid_messaging = importutils.try_import("qpid.messaging")
35
36
qpid_exceptions = importutils.try_import("qpid.messaging.exceptions")
36
37
 
40
41
    cfg.StrOpt('qpid_hostname',
41
42
               default='localhost',
42
43
               help='Qpid broker hostname'),
43
 
    cfg.StrOpt('qpid_port',
44
 
               default='5672',
 
44
    cfg.IntOpt('qpid_port',
 
45
               default=5672,
45
46
               help='Qpid broker port'),
46
47
    cfg.ListOpt('qpid_hosts',
47
48
                default=['$qpid_hostname:$qpid_port'],
51
52
               help='Username for qpid connection'),
52
53
    cfg.StrOpt('qpid_password',
53
54
               default='',
54
 
               help='Password for qpid connection'),
 
55
               help='Password for qpid connection',
 
56
               secret=True),
55
57
    cfg.StrOpt('qpid_sasl_mechanisms',
56
58
               default='',
57
59
               help='Space separated list of SASL mechanisms to use for auth'),
68
70
 
69
71
cfg.CONF.register_opts(qpid_opts)
70
72
 
 
73
JSON_CONTENT_TYPE = 'application/json; charset=utf8'
 
74
 
71
75
 
72
76
class ConsumerBase(object):
73
77
    """Consumer base class."""
117
121
        self.reconnect(session)
118
122
 
119
123
    def reconnect(self, session):
120
 
        """Re-declare the receiver after a qpid reconnect"""
 
124
        """Re-declare the receiver after a qpid reconnect."""
121
125
        self.session = session
122
126
        self.receiver = session.receiver(self.address)
123
127
        self.receiver.capacity = 1
124
128
 
 
129
    def _unpack_json_msg(self, msg):
 
130
        """Load the JSON data in msg if msg.content_type indicates that it
 
131
           is necessary.  Put the loaded data back into msg.content and
 
132
           update msg.content_type appropriately.
 
133
 
 
134
        A Qpid Message containing a dict will have a content_type of
 
135
        'amqp/map', whereas one containing a string that needs to be converted
 
136
        back from JSON will have a content_type of JSON_CONTENT_TYPE.
 
137
 
 
138
        :param msg: a Qpid Message object
 
139
        :returns: None
 
140
        """
 
141
        if msg.content_type == JSON_CONTENT_TYPE:
 
142
            msg.content = jsonutils.loads(msg.content)
 
143
            msg.content_type = 'amqp/map'
 
144
 
125
145
    def consume(self):
126
 
        """Fetch the message and pass it to the callback object"""
 
146
        """Fetch the message and pass it to the callback object."""
127
147
        message = self.receiver.fetch()
128
148
        try:
 
149
            self._unpack_json_msg(message)
129
150
            msg = rpc_common.deserialize_msg(message.content)
130
151
            self.callback(msg)
131
152
        except Exception:
138
159
 
139
160
 
140
161
class DirectConsumer(ConsumerBase):
141
 
    """Queue/consumer class for 'direct'"""
 
162
    """Queue/consumer class for 'direct'."""
142
163
 
143
164
    def __init__(self, conf, session, msg_id, callback):
144
165
        """Init a 'direct' queue.
156
177
 
157
178
 
158
179
class TopicConsumer(ConsumerBase):
159
 
    """Consumer class for 'topic'"""
 
180
    """Consumer class for 'topic'."""
160
181
 
161
182
    def __init__(self, conf, session, topic, callback, name=None,
162
183
                 exchange_name=None):
176
197
 
177
198
 
178
199
class FanoutConsumer(ConsumerBase):
179
 
    """Consumer class for 'fanout'"""
 
200
    """Consumer class for 'fanout'."""
180
201
 
181
202
    def __init__(self, conf, session, topic, callback):
182
203
        """Init a 'fanout' queue.
195
216
 
196
217
 
197
218
class Publisher(object):
198
 
    """Base Publisher class"""
 
219
    """Base Publisher class."""
199
220
 
200
221
    def __init__(self, session, node_name, node_opts=None):
201
222
        """Init the Publisher class with the exchange_name, routing_key,
224
245
        self.reconnect(session)
225
246
 
226
247
    def reconnect(self, session):
227
 
        """Re-establish the Sender after a reconnection"""
 
248
        """Re-establish the Sender after a reconnection."""
228
249
        self.sender = session.sender(self.address)
229
250
 
 
251
    def _pack_json_msg(self, msg):
 
252
        """Qpid cannot serialize dicts containing strings longer than 65535
 
253
           characters.  This function dumps the message content to a JSON
 
254
           string, which Qpid is able to handle.
 
255
 
 
256
        :param msg: May be either a Qpid Message object or a bare dict.
 
257
        :returns: A Qpid Message with its content field JSON encoded.
 
258
        """
 
259
        try:
 
260
            msg.content = jsonutils.dumps(msg.content)
 
261
        except AttributeError:
 
262
            # Need to have a Qpid message so we can set the content_type.
 
263
            msg = qpid_messaging.Message(jsonutils.dumps(msg))
 
264
        msg.content_type = JSON_CONTENT_TYPE
 
265
        return msg
 
266
 
230
267
    def send(self, msg):
231
 
        """Send a message"""
 
268
        """Send a message."""
 
269
        try:
 
270
            # Check if Qpid can encode the message
 
271
            check_msg = msg
 
272
            if not hasattr(check_msg, 'content_type'):
 
273
                check_msg = qpid_messaging.Message(msg)
 
274
            content_type = check_msg.content_type
 
275
            enc, dec = qpid_messaging.message.get_codec(content_type)
 
276
            enc(check_msg.content)
 
277
        except qpid_codec.CodecException:
 
278
            # This means the message couldn't be serialized as a dict.
 
279
            msg = self._pack_json_msg(msg)
232
280
        self.sender.send(msg)
233
281
 
234
282
 
235
283
class DirectPublisher(Publisher):
236
 
    """Publisher class for 'direct'"""
 
284
    """Publisher class for 'direct'."""
237
285
    def __init__(self, conf, session, msg_id):
238
286
        """Init a 'direct' publisher."""
239
287
        super(DirectPublisher, self).__init__(session, msg_id,
241
289
 
242
290
 
243
291
class TopicPublisher(Publisher):
244
 
    """Publisher class for 'topic'"""
 
292
    """Publisher class for 'topic'."""
245
293
    def __init__(self, conf, session, topic):
246
294
        """init a 'topic' publisher.
247
295
        """
251
299
 
252
300
 
253
301
class FanoutPublisher(Publisher):
254
 
    """Publisher class for 'fanout'"""
 
302
    """Publisher class for 'fanout'."""
255
303
    def __init__(self, conf, session, topic):
256
304
        """init a 'fanout' publisher.
257
305
        """
261
309
 
262
310
 
263
311
class NotifyPublisher(Publisher):
264
 
    """Publisher class for notifications"""
 
312
    """Publisher class for notifications."""
265
313
    def __init__(self, conf, session, topic):
266
314
        """init a 'topic' publisher.
267
315
        """
319
367
        # Reconnection is done by self.reconnect()
320
368
        self.connection.reconnect = False
321
369
        self.connection.heartbeat = self.conf.qpid_heartbeat
322
 
        self.connection.protocol = self.conf.qpid_protocol
 
370
        self.connection.transport = self.conf.qpid_protocol
323
371
        self.connection.tcp_nodelay = self.conf.qpid_tcp_nodelay
324
372
 
325
373
    def _register_consumer(self, consumer):
329
377
        return self.consumers[str(receiver)]
330
378
 
331
379
    def reconnect(self):
332
 
        """Handles reconnecting and re-establishing sessions and queues"""
333
 
        if self.connection.opened():
334
 
            try:
335
 
                self.connection.close()
336
 
            except qpid_exceptions.ConnectionError:
337
 
                pass
338
 
 
 
380
        """Handles reconnecting and re-establishing sessions and queues."""
339
381
        attempt = 0
340
382
        delay = 1
341
383
        while True:
 
384
            # Close the session if necessary
 
385
            if self.connection.opened():
 
386
                try:
 
387
                    self.connection.close()
 
388
                except qpid_exceptions.ConnectionError:
 
389
                    pass
 
390
 
342
391
            broker = self.brokers[attempt % len(self.brokers)]
343
392
            attempt += 1
344
393
 
345
394
            try:
346
395
                self.connection_create(broker)
347
396
                self.connection.open()
348
 
            except qpid_exceptions.ConnectionError, e:
 
397
            except qpid_exceptions.ConnectionError as e:
349
398
                msg_dict = dict(e=e, delay=delay)
350
399
                msg = _("Unable to connect to AMQP server: %(e)s. "
351
400
                        "Sleeping %(delay)s seconds") % msg_dict
373
422
            try:
374
423
                return method(*args, **kwargs)
375
424
            except (qpid_exceptions.Empty,
376
 
                    qpid_exceptions.ConnectionError), e:
 
425
                    qpid_exceptions.ConnectionError) as e:
377
426
                if error_callback:
378
427
                    error_callback(e)
379
428
                self.reconnect()
380
429
 
381
430
    def close(self):
382
 
        """Close/release this connection"""
 
431
        """Close/release this connection."""
383
432
        self.cancel_consumer_thread()
384
433
        self.wait_on_proxy_callbacks()
385
 
        self.connection.close()
 
434
        try:
 
435
            self.connection.close()
 
436
        except Exception:
 
437
            # NOTE(dripton) Logging exceptions that happen during cleanup just
 
438
            # causes confusion; there's really nothing useful we can do with
 
439
            # them.
 
440
            pass
386
441
        self.connection = None
387
442
 
388
443
    def reset(self):
389
 
        """Reset a connection so it can be used again"""
 
444
        """Reset a connection so it can be used again."""
390
445
        self.cancel_consumer_thread()
391
446
        self.wait_on_proxy_callbacks()
392
447
        self.session.close()
410
465
        return self.ensure(_connect_error, _declare_consumer)
411
466
 
412
467
    def iterconsume(self, limit=None, timeout=None):
413
 
        """Return an iterator that will consume from all queues/consumers"""
 
468
        """Return an iterator that will consume from all queues/consumers."""
414
469
 
415
470
        def _error_callback(exc):
416
471
            if isinstance(exc, qpid_exceptions.Empty):
417
 
                LOG.exception(_('Timed out waiting for RPC response: %s') %
418
 
                              str(exc))
 
472
                LOG.debug(_('Timed out waiting for RPC response: %s') %
 
473
                          str(exc))
419
474
                raise rpc_common.Timeout()
420
475
            else:
421
476
                LOG.exception(_('Failed to consume message from queue: %s') %
434
489
            yield self.ensure(_error_callback, _consume)
435
490
 
436
491
    def cancel_consumer_thread(self):
437
 
        """Cancel a consumer thread"""
 
492
        """Cancel a consumer thread."""
438
493
        if self.consumer_thread is not None:
439
494
            self.consumer_thread.kill()
440
495
            try:
449
504
            proxy_cb.wait()
450
505
 
451
506
    def publisher_send(self, cls, topic, msg):
452
 
        """Send to a publisher based on the publisher class"""
 
507
        """Send to a publisher based on the publisher class."""
453
508
 
454
509
        def _connect_error(exc):
455
510
            log_info = {'topic': topic, 'err_str': str(exc)}
479
534
                              topic, callback)
480
535
 
481
536
    def declare_fanout_consumer(self, topic, callback):
482
 
        """Create a 'fanout' consumer"""
 
537
        """Create a 'fanout' consumer."""
483
538
        self.declare_consumer(FanoutConsumer, topic, callback)
484
539
 
485
540
    def direct_send(self, msg_id, msg):
486
 
        """Send a 'direct' message"""
 
541
        """Send a 'direct' message."""
487
542
        self.publisher_send(DirectPublisher, msg_id, msg)
488
543
 
489
544
    def topic_send(self, topic, msg, timeout=None):
490
 
        """Send a 'topic' message"""
 
545
        """Send a 'topic' message."""
491
546
        #
492
547
        # We want to create a message with attributes, e.g. a TTL. We
493
548
        # don't really need to keep 'msg' in its JSON format any longer
502
557
        self.publisher_send(TopicPublisher, topic, qpid_message)
503
558
 
504
559
    def fanout_send(self, topic, msg):
505
 
        """Send a 'fanout' message"""
 
560
        """Send a 'fanout' message."""
506
561
        self.publisher_send(FanoutPublisher, topic, msg)
507
562
 
508
563
    def notify_send(self, topic, msg, **kwargs):
509
 
        """Send a notify message on a topic"""
 
564
        """Send a notify message on a topic."""
510
565
        self.publisher_send(NotifyPublisher, topic, msg)
511
566
 
512
567
    def consume(self, limit=None):
513
 
        """Consume from all queues/consumers"""
 
568
        """Consume from all queues/consumers."""
514
569
        it = self.iterconsume(limit=limit)
515
570
        while True:
516
571
            try:
519
574
                return
520
575
 
521
576
    def consume_in_thread(self):
522
 
        """Consumer from all queues/consumers in a greenthread"""
 
577
        """Consumer from all queues/consumers in a greenthread."""
523
578
        def _consumer_thread():
524
579
            try:
525
580
                self.consume()
530
585
        return self.consumer_thread
531
586
 
532
587
    def create_consumer(self, topic, proxy, fanout=False):
533
 
        """Create a consumer that calls a method in a proxy object"""
 
588
        """Create a consumer that calls a method in a proxy object."""
534
589
        proxy_cb = rpc_amqp.ProxyCallback(
535
590
            self.conf, proxy,
536
591
            rpc_amqp.get_connection_pool(self.conf, Connection))
546
601
        return consumer
547
602
 
548
603
    def create_worker(self, topic, proxy, pool_name):
549
 
        """Create a worker that calls a method in a proxy object"""
 
604
        """Create a worker that calls a method in a proxy object."""
550
605
        proxy_cb = rpc_amqp.ProxyCallback(
551
606
            self.conf, proxy,
552
607
            rpc_amqp.get_connection_pool(self.conf, Connection))
559
614
 
560
615
        return consumer
561
616
 
 
617
    def join_consumer_pool(self, callback, pool_name, topic,
 
618
                           exchange_name=None):
 
619
        """Register as a member of a group of consumers for a given topic from
 
620
        the specified exchange.
 
621
 
 
622
        Exactly one member of a given pool will receive each message.
 
623
 
 
624
        A message will be delivered to multiple pools, if more than
 
625
        one is created.
 
626
        """
 
627
        callback_wrapper = rpc_amqp.CallbackWrapper(
 
628
            conf=self.conf,
 
629
            callback=callback,
 
630
            connection_pool=rpc_amqp.get_connection_pool(self.conf,
 
631
                                                         Connection),
 
632
        )
 
633
        self.proxy_callbacks.append(callback_wrapper)
 
634
 
 
635
        consumer = TopicConsumer(conf=self.conf,
 
636
                                 session=self.session,
 
637
                                 topic=topic,
 
638
                                 callback=callback_wrapper,
 
639
                                 name=pool_name,
 
640
                                 exchange_name=exchange_name)
 
641
 
 
642
        self._register_consumer(consumer)
 
643
        return consumer
 
644
 
562
645
 
563
646
def create_connection(conf, new=True):
564
 
    """Create a connection"""
 
647
    """Create a connection."""
565
648
    return rpc_amqp.create_connection(
566
649
        conf, new,
567
650
        rpc_amqp.get_connection_pool(conf, Connection))