433
437
self.interval_stepping = self.conf.rabbit_retry_backoff
434
438
# max retry-interval = 30 seconds
435
439
self.interval_max = 30
436
self.memory_transport = False
438
ssl_params = self._fetch_ssl_params()
441
self._ssl_params = self._fetch_ssl_params()
442
self._login_method = self.conf.rabbit_login_method
440
444
if url.virtual_host is not None:
441
445
virtual_host = url.virtual_host
443
447
virtual_host = self.conf.rabbit_virtual_host
445
self.brokers_params = []
450
if self.conf.fake_rabbit:
451
LOG.warn("Deprecated: fake_rabbit option is deprecated, set "
452
"rpc_backend to kombu+memory or use the fake "
454
self._url = 'memory://%s/' % virtual_host
447
456
for host in url.hosts:
449
'hostname': host.hostname,
450
'port': host.port or 5672,
451
'userid': host.username or '',
452
'password': host.password or '',
453
'login_method': self.conf.rabbit_login_method,
454
'virtual_host': virtual_host
456
if self.conf.fake_rabbit:
457
params['transport'] = 'memory'
458
if self.conf.rabbit_use_ssl:
459
params['ssl'] = ssl_params
461
self.brokers_params.append(params)
457
transport = url.transport.replace('kombu+', '')
458
# Chain on the already-stripped name: re-reading url.transport here would
# discard the 'kombu+' removal above and leak the prefix into the kombu URL.
transport = transport.replace('rabbit', 'amqp')
459
self._url += '%s%s://%s:%s@%s:%s/%s' % (
460
";" if self._url else '',
462
parse.quote(host.username or ''),
463
parse.quote(host.password or ''),
464
host.hostname or '', str(host.port or 5672),
466
elif url.transport.startswith('kombu+'):
467
# NOTE(sileht): url has a + but no hosts
468
# (like kombu+memory:///), pass it to kombu as-is
469
transport = url.transport.replace('kombu+', '')
470
self._url = "%s://%s" % (transport, virtual_host)
463
# Old configuration format
464
472
for adr in self.conf.rabbit_hosts:
465
473
hostname, port = netutils.parse_host_port(
466
474
adr, default_port=self.conf.rabbit_port)
469
'hostname': hostname,
471
'userid': self.conf.rabbit_userid,
472
'password': self.conf.rabbit_password,
473
'login_method': self.conf.rabbit_login_method,
474
'virtual_host': virtual_host
477
if self.conf.fake_rabbit:
478
params['transport'] = 'memory'
479
if self.conf.rabbit_use_ssl:
480
params['ssl'] = ssl_params
482
self.brokers_params.append(params)
484
random.shuffle(self.brokers_params)
485
self.brokers = itertools.cycle(self.brokers_params)
487
self.memory_transport = self.conf.fake_rabbit
489
self.connection = None
490
self.do_consume = None
475
self._url += '%samqp://%s:%s@%s:%s/%s' % (
476
";" if self._url else '',
477
parse.quote(self.conf.rabbit_userid),
478
parse.quote(self.conf.rabbit_password),
482
self.do_consume = True
485
self.connection = kombu.connection.Connection(
486
self._url, ssl=self._ssl_params, login_method=self._login_method,
487
failover_strategy="shuffle")
489
LOG.info(_('Connecting to AMQP server on %(hostname)s:%(port)d'),
490
{'hostname': self.connection.hostname,
491
'port': self.connection.port})
492
# NOTE(sileht): just ensure the connection is set up at startup
493
self.ensure(error_callback=None,
494
method=lambda channel: True)
495
LOG.info(_('Connected to AMQP server on %(hostname)s:%(port)d'),
496
{'hostname': self.connection.hostname,
497
'port': self.connection.port})
499
if self._url.startswith('memory://'):
500
# Kludge to speed up tests.
501
self.connection.transport.polling_interval = 0.0
493
503
# FIXME(markmc): use oslo sslutils when it is available as a library
494
504
_SSL_PROTOCOLS = {
495
505
"tlsv1": ssl.PROTOCOL_TLSv1,
496
"sslv23": ssl.PROTOCOL_SSLv23,
497
"sslv3": ssl.PROTOCOL_SSLv3
506
"sslv23": ssl.PROTOCOL_SSLv23
531
545
ssl_params['cert_reqs'] = ssl.CERT_REQUIRED
533
547
# Return the extended behavior or just have the default behavior
534
return ssl_params or True
548
return ssl_params or None
536
def _connect(self, broker):
537
"""Connect to rabbit. Re-establish any queues that may have
538
been declared before if we are reconnecting. Exceptions should
539
be handled by the caller.
550
def ensure(self, error_callback, method, retry=None,
551
timeout_is_error=True):
552
"""Will retry up to retry number of times.
553
retry = None means use the value of rabbit_max_retries
554
retry = -1 means to retry forever
555
retry = 0 means no retry
556
retry = N means N retries
541
LOG.info(_("Connecting to AMQP server on "
542
"%(hostname)s:%(port)d"), broker)
543
self.connection = kombu.connection.BrokerConnection(**broker)
544
self.connection_errors = self.connection.connection_errors
545
self.channel_errors = self.connection.channel_errors
546
if self.memory_transport:
547
# Kludge to speed up tests.
548
self.connection.transport.polling_interval = 0.0
549
self.do_consume = True
550
self.consumer_num = itertools.count(1)
551
self.connection.connect()
552
self.channel = self.connection.channel()
553
# work around 'memory' transport bug in 1.1.3
554
if self.memory_transport:
555
self.channel._new_queue('ae.undeliver')
556
for consumer in self.consumers:
557
consumer.reconnect(self.channel)
558
LOG.info(_('Connected to AMQP server on %(hostname)s:%(port)d'),
561
def _disconnect(self):
560
retry = self.max_retries
561
if retry is None or retry < 0:
564
def on_error(exc, interval):
567
error_callback and error_callback(exc)
569
interval = (self.conf.kombu_reconnect_delay + interval
570
if self.conf.kombu_reconnect_delay > 0 else interval)
572
info = {'hostname': self.connection.hostname,
573
'port': self.connection.port,
574
'err_str': exc, 'sleep_time': interval}
576
if 'Socket closed' in six.text_type(exc):
577
LOG.error(_('AMQP server %(hostname)s:%(port)s closed'
578
' the connection. Check login credentials:'
579
' %(err_str)s'), info)
581
LOG.error(_('AMQP server on %(hostname)s:%(port)s is '
582
'unreachable: %(err_str)s. Trying again in '
583
'%(sleep_time)d seconds.'), info)
563
585
# XXX(nic): when reconnecting to a RabbitMQ cluster
564
586
# with mirrored queues in use, the attempt to release the
565
587
# connection can hang "indefinitely" somewhere deep down
566
588
# in Kombu. Blocking the thread for a bit prior to
567
589
# release seems to kludge around the problem where it is
568
590
# otherwise reproducible.
591
# TODO(sileht): Check if this is useful since we
592
# use kombu for HA connection, the interval_step
593
# should be sufficient, because the underlying kombu transport
594
# connection object is freed.
569
595
if self.conf.kombu_reconnect_delay > 0:
570
LOG.info(_("Delaying reconnect for %1.1f seconds...") %
571
self.conf.kombu_reconnect_delay)
572
596
time.sleep(self.conf.kombu_reconnect_delay)
575
self.connection.release()
576
except self.connection_errors:
578
self.connection = None
580
def reconnect(self, retry=None):
581
"""Handles reconnecting and re-establishing queues.
582
Will retry up to retry number of times.
583
retry = None means use the value of rabbit_max_retries
584
retry = -1 means to retry forever
585
retry = 0 means no retry
586
retry = N means N retries
587
Sleep between tries, starting at self.interval_start
588
seconds, backing off self.interval_stepping number of seconds
595
retry = self.max_retries
596
if retry is None or retry < 0:
602
broker = six.next(self.brokers)
605
self._connect(broker)
607
except IOError as ex:
609
except self.connection_errors as ex:
611
except Exception as ex:
612
# NOTE(comstud): Unfortunately it's possible for amqplib
613
# to return an error not covered by its transport
614
# connection_errors in the case of a timeout waiting for
615
# a protocol response. (See paste link in LP888621)
616
# So, we check all exceptions for 'timeout' in them
617
# and try to reconnect in this case.
618
if 'timeout' not in six.text_type(e):
623
log_info['err_str'] = e
624
log_info['retry'] = retry or 0
625
log_info.update(broker)
627
if not loop_forever and attempt > retry:
628
msg = _('Unable to connect to AMQP server on '
629
'%(hostname)s:%(port)d after %(retry)d '
630
'tries: %(err_str)s') % log_info
632
raise exceptions.MessageDeliveryFailure(msg)
635
sleep_time = self.interval_start or 1
637
sleep_time += self.interval_stepping
639
sleep_time = min(sleep_time, self.interval_max)
641
log_info['sleep_time'] = sleep_time
642
if 'Socket closed' in six.text_type(e):
643
LOG.error(_('AMQP server %(hostname)s:%(port)d closed'
644
' the connection. Check login credentials:'
645
' %(err_str)s'), log_info)
647
LOG.error(_('AMQP server on %(hostname)s:%(port)d is '
648
'unreachable: %(err_str)s. Trying again in '
649
'%(sleep_time)d seconds.'), log_info)
650
time.sleep(sleep_time)
652
def ensure(self, error_callback, method, retry=None):
656
except self.connection_errors as e:
659
except self.channel_errors as e:
662
except (socket.timeout, IOError) as e:
665
except Exception as e:
666
# NOTE(comstud): Unfortunately it's possible for amqplib
667
# to return an error not covered by its transport
668
# connection_errors in the case of a timeout waiting for
669
# a protocol response. (See paste link in LP888621)
670
# So, we check all exceptions for 'timeout' in them
671
# and try to reconnect in this case.
672
if 'timeout' not in six.text_type(e):
676
self.reconnect(retry=retry)
678
def get_channel(self):
679
"""Convenience call for bin/clear_rabbit_queues."""
598
def on_reconnection(new_channel):
599
"""Callback invoked when the kombu reconnects and creates
600
a new channel, we use it to reconfigure our consumers.
602
self.consumer_num = itertools.count(1)
603
for consumer in self.consumers:
604
consumer.reconnect(new_channel)
606
recoverable_errors = (self.connection.recoverable_channel_errors +
607
self.connection.recoverable_connection_errors)
609
autoretry_method = self.connection.autoretry(
610
method, channel=self.channel,
613
interval_start=self.interval_start or 1,
614
interval_step=self.interval_stepping,
615
on_revive=on_reconnection,
617
ret, channel = autoretry_method()
618
self.channel = channel
620
except recoverable_errors as exc:
622
# NOTE(sileht): number of retry exceeded and the connection
624
msg = _('Unable to connect to AMQP server on '
625
'%(hostname)s:%(port)d after %(retry)d '
626
'tries: %(err_str)s') % {
627
'hostname': self.connection.hostname,
628
'port': self.connection.port,
632
raise exceptions.MessageDeliveryFailure(msg)
683
635
"""Close/release this connection."""