~cbehrens/nova/rpc-kombu

« back to all changes in this revision

Viewing changes to nova/rpc/impl_kombu.py

  • Committer: Chris Behrens
  • Date: 2011-08-31 18:54:19 UTC
  • Revision ID: cbehrens@codestud.com-20110831185419-3j19qwmn66fho4is
kludge for kombu 1.1.3 memory transport bug

Show diffs side-by-side

added

removed

Lines of Context:
303
303
        self.interval_stepping = FLAGS.rabbit_retry_backoff
304
304
        # max retry-interval = 30 seconds
305
305
        self.interval_max = 30
 
306
        self.memory_transport = False
306
307
 
307
308
        self.params = dict(hostname=FLAGS.rabbit_host,
308
309
                          port=FLAGS.rabbit_port,
311
312
                          virtual_host=FLAGS.rabbit_virtual_host)
312
313
        if FLAGS.fake_rabbit:
313
314
            self.params['transport'] = 'memory'
 
315
            self.memory_transport = True
 
316
        else:
 
317
            self.memory_transport = False
314
318
        self.connection = None
315
319
        self.reconnect()
316
320
 
323
327
                pass
324
328
            time.sleep(1)
325
329
        self.connection = kombu.connection.BrokerConnection(**self.params)
326
 
        if FLAGS.fake_rabbit:
 
330
        if self.memory_transport:
327
331
            # Kludge to speed up tests.
328
332
            self.connection.transport.polling_interval = 0.0
329
333
        self.consumer_num = itertools.count(1)
345
349
        LOG.info(_('Connected to AMQP server on %(hostname)s:%(port)d' %
346
350
                self.params))
347
351
        self.channel = self.connection.channel()
 
352
        # work around 'memory' transport bug in 1.1.3
 
353
        if self.memory_transport:
 
354
            self.channel._new_queue('ae.undeliver')
348
355
        for consumer in self.consumers:
349
356
            consumer.reconnect(self.channel)
350
357
        if self.consumers:
374
381
        self.cancel_consumer_thread()
375
382
        self.channel.close()
376
383
        self.channel = self.connection.channel()
 
384
        # work around 'memory' transport bug in 1.1.3
 
385
        if self.memory_transport:
 
386
            self.channel._new_queue('ae.undeliver')
377
387
        self.consumers = []
378
388
 
379
389
    def declare_consumer(self, consumer_cls, topic, callback):