~ubuntu-branches/ubuntu/raring/nova/raring-proposed

« back to all changes in this revision

Viewing changes to nova/openstack/common/rpc/impl_qpid.py

  • Committer: Package Import Robot
  • Author(s): Chuck Short
  • Date: 2013-01-11 13:06:56 UTC
  • mto: This revision was merged to the branch mainline in revision 96.
  • Revision ID: package-import@ubuntu.com-20130111130656-z9mceux6qpkqomma
Tags: upstream-2013.1~g2
ImportĀ upstreamĀ versionĀ 2013.1~g2

Show diffs side-by-side

added added

removed removed

Lines of Context:
17
17
 
18
18
import functools
19
19
import itertools
20
 
import logging
21
20
import time
22
21
import uuid
23
22
 
29
28
from nova.openstack.common import cfg
30
29
from nova.openstack.common.gettextutils import _
31
30
from nova.openstack.common import jsonutils
 
31
from nova.openstack.common import log as logging
32
32
from nova.openstack.common.rpc import amqp as rpc_amqp
33
33
from nova.openstack.common.rpc import common as rpc_common
34
34
 
41
41
    cfg.StrOpt('qpid_port',
42
42
               default='5672',
43
43
               help='Qpid broker port'),
 
44
    cfg.ListOpt('qpid_hosts',
 
45
                default=['$qpid_hostname:$qpid_port'],
 
46
                help='Qpid HA cluster host:port pairs'),
44
47
    cfg.StrOpt('qpid_username',
45
48
               default='',
46
49
               help='Username for qpid connection'),
121
124
        """Fetch the message and pass it to the callback object"""
122
125
        message = self.receiver.fetch()
123
126
        try:
124
 
            self.callback(message.content)
 
127
            msg = rpc_common.deserialize_msg(message.content)
 
128
            self.callback(msg)
125
129
        except Exception:
126
130
            LOG.exception(_("Failed to process message... skipping it."))
127
131
        finally:
274
278
        self.session = None
275
279
        self.consumers = {}
276
280
        self.consumer_thread = None
 
281
        self.proxy_callbacks = []
277
282
        self.conf = conf
278
283
 
 
284
        if server_params and 'hostname' in server_params:
 
285
            # NOTE(russellb) This enables support for cast_to_server.
 
286
            server_params['qpid_hosts'] = [
 
287
                '%s:%d' % (server_params['hostname'],
 
288
                           server_params.get('port', 5672))
 
289
            ]
 
290
 
279
291
        params = {
280
 
            'hostname': self.conf.qpid_hostname,
281
 
            'port': self.conf.qpid_port,
 
292
            'qpid_hosts': self.conf.qpid_hosts,
282
293
            'username': self.conf.qpid_username,
283
294
            'password': self.conf.qpid_password,
284
295
        }
285
296
        params.update(server_params or {})
286
297
 
287
 
        self.broker = params['hostname'] + ":" + str(params['port'])
 
298
        self.brokers = params['qpid_hosts']
288
299
        self.username = params['username']
289
300
        self.password = params['password']
290
 
        self.connection_create()
 
301
        self.connection_create(self.brokers[0])
291
302
        self.reconnect()
292
303
 
293
 
    def connection_create(self):
 
304
    def connection_create(self, broker):
294
305
        # Create the connection - this does not open the connection
295
 
        self.connection = qpid.messaging.Connection(self.broker)
 
306
        self.connection = qpid.messaging.Connection(broker)
296
307
 
297
308
        # Check if flags are set and if so set them for the connection
298
309
        # before we call open
320
331
            except qpid.messaging.exceptions.ConnectionError:
321
332
                pass
322
333
 
 
334
        attempt = 0
323
335
        delay = 1
324
336
        while True:
 
337
            broker = self.brokers[attempt % len(self.brokers)]
 
338
            attempt += 1
 
339
 
325
340
            try:
326
 
                self.connection_create()
 
341
                self.connection_create(broker)
327
342
                self.connection.open()
328
343
            except qpid.messaging.exceptions.ConnectionError, e:
329
344
                msg_dict = dict(e=e, delay=delay)
333
348
                time.sleep(delay)
334
349
                delay = min(2 * delay, 60)
335
350
            else:
 
351
                LOG.info(_('Connected to AMQP server on %s'), broker)
336
352
                break
337
353
 
338
 
        LOG.info(_('Connected to AMQP server on %s'), self.broker)
339
 
 
340
354
        self.session = self.connection.session()
341
355
 
342
356
        if self.consumers:
362
376
    def close(self):
363
377
        """Close/release this connection"""
364
378
        self.cancel_consumer_thread()
 
379
        self.wait_on_proxy_callbacks()
365
380
        self.connection.close()
366
381
        self.connection = None
367
382
 
368
383
    def reset(self):
369
384
        """Reset a connection so it can be used again"""
370
385
        self.cancel_consumer_thread()
 
386
        self.wait_on_proxy_callbacks()
371
387
        self.session.close()
372
388
        self.session = self.connection.session()
373
389
        self.consumers = {}
422
438
                pass
423
439
            self.consumer_thread = None
424
440
 
 
441
    def wait_on_proxy_callbacks(self):
 
442
        """Wait for all proxy callback threads to exit."""
 
443
        for proxy_cb in self.proxy_callbacks:
 
444
            proxy_cb.wait()
 
445
 
425
446
    def publisher_send(self, cls, topic, msg):
426
447
        """Send to a publisher based on the publisher class"""
427
448
 
497
518
        proxy_cb = rpc_amqp.ProxyCallback(
498
519
            self.conf, proxy,
499
520
            rpc_amqp.get_connection_pool(self.conf, Connection))
 
521
        self.proxy_callbacks.append(proxy_cb)
500
522
 
501
523
        if fanout:
502
524
            consumer = FanoutConsumer(self.conf, self.session, topic, proxy_cb)
512
534
        proxy_cb = rpc_amqp.ProxyCallback(
513
535
            self.conf, proxy,
514
536
            rpc_amqp.get_connection_pool(self.conf, Connection))
 
537
        self.proxy_callbacks.append(proxy_cb)
515
538
 
516
539
        consumer = TopicConsumer(self.conf, self.session, topic, proxy_cb,
517
540
                                 name=pool_name)
570
593
        rpc_amqp.get_connection_pool(conf, Connection))
571
594
 
572
595
 
573
 
def notify(conf, context, topic, msg):
 
596
def notify(conf, context, topic, msg, envelope):
574
597
    """Sends a notification event on a topic."""
575
598
    return rpc_amqp.notify(conf, context, topic, msg,
576
 
                           rpc_amqp.get_connection_pool(conf, Connection))
 
599
                           rpc_amqp.get_connection_pool(conf, Connection),
 
600
                           envelope)
577
601
 
578
602
 
579
603
def cleanup():