602
604
self.log.debug('Ceph {} samples (OK): '
603
605
'{}'.format(sample_type, samples))
608
# rabbitmq/amqp specific helpers:
610
def rmq_wait_for_cluster(self, deployment, init_sleep=15, timeout=1200):
611
"""Wait for rmq units extended status to show cluster readiness,
612
after an optional initial sleep period. Initial sleep is likely
613
necessary to be effective following a config change, as status
614
message may not instantly update to non-ready."""
617
time.sleep(init_sleep)
619
message = re.compile('^Unit is ready and clustered$')
620
deployment._auto_wait_for_status(message=message,
622
include_only=['rabbitmq-server'])
624
def add_rmq_test_user(self, sentry_units,
625
username="testuser1", password="changeme"):
626
"""Add a test user via the first rmq juju unit, check connection as
627
the new user against all sentry units.
629
:param sentry_units: list of sentry unit pointers
630
:param username: amqp user name, default to testuser1
631
:param password: amqp user password
632
:returns: None if successful. Raise on error.
634
self.log.debug('Adding rmq user ({})...'.format(username))
636
# Check that user does not already exist
637
cmd_user_list = 'rabbitmqctl list_users'
638
output, _ = self.run_cmd_unit(sentry_units[0], cmd_user_list)
639
if username in output:
640
self.log.warning('User ({}) already exists, returning '
641
'gracefully.'.format(username))
644
perms = '".*" ".*" ".*"'
645
cmds = ['rabbitmqctl add_user {} {}'.format(username, password),
646
'rabbitmqctl set_permissions {} {}'.format(username, perms)]
648
# Add user via first unit
650
output, _ = self.run_cmd_unit(sentry_units[0], cmd)
652
# Check connection against the other sentry_units
653
self.log.debug('Checking user connect against units...')
654
for sentry_unit in sentry_units:
655
connection = self.connect_amqp_by_unit(sentry_unit, ssl=False,
660
def delete_rmq_test_user(self, sentry_units, username="testuser1"):
661
"""Delete a rabbitmq user via the first rmq juju unit.
663
:param sentry_units: list of sentry unit pointers
664
:param username: amqp user name, default to testuser1
665
:param password: amqp user password
666
:returns: None if successful or no such user.
668
self.log.debug('Deleting rmq user ({})...'.format(username))
670
# Check that the user exists
671
cmd_user_list = 'rabbitmqctl list_users'
672
output, _ = self.run_cmd_unit(sentry_units[0], cmd_user_list)
674
if username not in output:
675
self.log.warning('User ({}) does not exist, returning '
676
'gracefully.'.format(username))
680
cmd_user_del = 'rabbitmqctl delete_user {}'.format(username)
681
output, _ = self.run_cmd_unit(sentry_units[0], cmd_user_del)
683
def get_rmq_cluster_status(self, sentry_unit):
684
"""Execute rabbitmq cluster status command on a unit and return
687
:param unit: sentry unit
688
:returns: String containing console output of cluster status command
690
cmd = 'rabbitmqctl cluster_status'
691
output, _ = self.run_cmd_unit(sentry_unit, cmd)
692
self.log.debug('{} cluster_status:\n{}'.format(
693
sentry_unit.info['unit_name'], output))
696
def get_rmq_cluster_running_nodes(self, sentry_unit):
697
"""Parse rabbitmqctl cluster_status output string, return list of
698
running rabbitmq cluster nodes.
700
:param unit: sentry unit
701
:returns: List containing node names of running nodes
703
# NOTE(beisner): rabbitmqctl cluster_status output is not
704
# json-parsable, do string chop foo, then json.loads that.
705
str_stat = self.get_rmq_cluster_status(sentry_unit)
706
if 'running_nodes' in str_stat:
707
pos_start = str_stat.find("{running_nodes,") + 15
708
pos_end = str_stat.find("]},", pos_start) + 1
709
str_run_nodes = str_stat[pos_start:pos_end].replace("'", '"')
710
run_nodes = json.loads(str_run_nodes)
715
def validate_rmq_cluster_running_nodes(self, sentry_units):
716
"""Check that all rmq unit hostnames are represented in the
717
cluster_status output of all units.
719
:param host_names: dict of juju unit names to host names
720
:param units: list of sentry unit pointers (all rmq units)
721
:returns: None if successful, otherwise return error message
723
host_names = self.get_unit_hostnames(sentry_units)
726
# Query every unit for cluster_status running nodes
727
for query_unit in sentry_units:
728
query_unit_name = query_unit.info['unit_name']
729
running_nodes = self.get_rmq_cluster_running_nodes(query_unit)
731
# Confirm that every unit is represented in the queried unit's
732
# cluster_status running nodes output.
733
for validate_unit in sentry_units:
734
val_host_name = host_names[validate_unit.info['unit_name']]
735
val_node_name = 'rabbit@{}'.format(val_host_name)
737
if val_node_name not in running_nodes:
738
errors.append('Cluster member check failed on {}: {} not '
739
'in {}\n'.format(query_unit_name,
743
return ''.join(errors)
745
def rmq_ssl_is_enabled_on_unit(self, sentry_unit, port=None):
746
"""Check a single juju rmq unit for ssl and port in the config file."""
747
host = sentry_unit.info['public-address']
748
unit_name = sentry_unit.info['unit_name']
750
conf_file = '/etc/rabbitmq/rabbitmq.config'
751
conf_contents = str(self.file_contents_safe(sentry_unit,
752
conf_file, max_wait=16))
754
conf_ssl = 'ssl' in conf_contents
755
conf_port = str(port) in conf_contents
757
# Port explicitly checked in config
758
if port and conf_port and conf_ssl:
759
self.log.debug('SSL is enabled @{}:{} '
760
'({})'.format(host, port, unit_name))
762
elif port and not conf_port and conf_ssl:
763
self.log.debug('SSL is enabled @{} but not on port {} '
764
'({})'.format(host, port, unit_name))
766
# Port not checked (useful when checking that ssl is disabled)
767
elif not port and conf_ssl:
768
self.log.debug('SSL is enabled @{}:{} '
769
'({})'.format(host, port, unit_name))
772
self.log.debug('SSL not enabled @{}:{} '
773
'({})'.format(host, port, unit_name))
776
msg = ('Unknown condition when checking SSL status @{}:{} '
777
'({})'.format(host, port, unit_name))
778
amulet.raise_status(amulet.FAIL, msg)
780
def validate_rmq_ssl_enabled_units(self, sentry_units, port=None):
781
"""Check that ssl is enabled on rmq juju sentry units.
783
:param sentry_units: list of all rmq sentry units
784
:param port: optional ssl port override to validate
785
:returns: None if successful, otherwise return error message
787
for sentry_unit in sentry_units:
788
if not self.rmq_ssl_is_enabled_on_unit(sentry_unit, port=port):
789
return ('Unexpected condition: ssl is disabled on unit '
790
'({})'.format(sentry_unit.info['unit_name']))
793
def validate_rmq_ssl_disabled_units(self, sentry_units):
794
"""Check that ssl is enabled on listed rmq juju sentry units.
796
:param sentry_units: list of all rmq sentry units
797
:returns: True if successful. Raise on error.
799
for sentry_unit in sentry_units:
800
if self.rmq_ssl_is_enabled_on_unit(sentry_unit):
801
return ('Unexpected condition: ssl is enabled on unit '
802
'({})'.format(sentry_unit.info['unit_name']))
805
def configure_rmq_ssl_on(self, sentry_units, deployment,
806
port=None, max_wait=60):
807
"""Turn ssl charm config option on, with optional non-default
808
ssl port specification. Confirm that it is enabled on every
811
:param sentry_units: list of sentry units
812
:param deployment: amulet deployment object pointer
813
:param port: amqp port, use defaults if None
814
:param max_wait: maximum time to wait in seconds to confirm
815
:returns: None if successful. Raise on error.
817
self.log.debug('Setting ssl charm config option: on')
820
config = {'ssl': 'on'}
822
config['ssl_port'] = port
824
deployment.d.configure('rabbitmq-server', config)
826
# Wait for unit status
827
self.rmq_wait_for_cluster(deployment)
831
ret = self.validate_rmq_ssl_enabled_units(sentry_units, port=port)
832
while ret and tries < (max_wait / 4):
834
self.log.debug('Attempt {}: {}'.format(tries, ret))
835
ret = self.validate_rmq_ssl_enabled_units(sentry_units, port=port)
839
amulet.raise_status(amulet.FAIL, ret)
841
def configure_rmq_ssl_off(self, sentry_units, deployment, max_wait=60):
842
"""Turn ssl charm config option off, confirm that it is disabled
845
:param sentry_units: list of sentry units
846
:param deployment: amulet deployment object pointer
847
:param max_wait: maximum time to wait in seconds to confirm
848
:returns: None if successful. Raise on error.
850
self.log.debug('Setting ssl charm config option: off')
853
config = {'ssl': 'off'}
854
deployment.d.configure('rabbitmq-server', config)
856
# Wait for unit status
857
self.rmq_wait_for_cluster(deployment)
861
ret = self.validate_rmq_ssl_disabled_units(sentry_units)
862
while ret and tries < (max_wait / 4):
864
self.log.debug('Attempt {}: {}'.format(tries, ret))
865
ret = self.validate_rmq_ssl_disabled_units(sentry_units)
869
amulet.raise_status(amulet.FAIL, ret)
871
def connect_amqp_by_unit(self, sentry_unit, ssl=False,
872
port=None, fatal=True,
873
username="testuser1", password="changeme"):
874
"""Establish and return a pika amqp connection to the rabbitmq service
875
running on a rmq juju unit.
877
:param sentry_unit: sentry unit pointer
878
:param ssl: boolean, default to False
879
:param port: amqp port, use defaults if None
880
:param fatal: boolean, default to True (raises on connect error)
881
:param username: amqp user name, default to testuser1
882
:param password: amqp user password
883
:returns: pika amqp connection pointer or None if failed and non-fatal
885
host = sentry_unit.info['public-address']
886
unit_name = sentry_unit.info['unit_name']
888
# Default port logic if port is not specified
891
elif not ssl and not port:
894
self.log.debug('Connecting to amqp on {}:{} ({}) as '
895
'{}...'.format(host, port, unit_name, username))
898
credentials = pika.PlainCredentials(username, password)
899
parameters = pika.ConnectionParameters(host=host, port=port,
900
credentials=credentials,
902
connection_attempts=3,
905
connection = pika.BlockingConnection(parameters)
906
assert connection.server_properties['product'] == 'RabbitMQ'
907
self.log.debug('Connect OK')
909
except Exception as e:
910
msg = ('amqp connection failed to {}:{} as '
911
'{} ({})'.format(host, port, username, str(e)))
913
amulet.raise_status(amulet.FAIL, msg)
918
def publish_amqp_message_by_unit(self, sentry_unit, message,
919
queue="test", ssl=False,
920
username="testuser1",
923
"""Publish an amqp message to a rmq juju unit.
925
:param sentry_unit: sentry unit pointer
926
:param message: amqp message string
927
:param queue: message queue, default to test
928
:param username: amqp user name, default to testuser1
929
:param password: amqp user password
930
:param ssl: boolean, default to False
931
:param port: amqp port, use defaults if None
932
:returns: None. Raises exception if publish failed.
934
self.log.debug('Publishing message to {} queue:\n{}'.format(queue,
936
connection = self.connect_amqp_by_unit(sentry_unit, ssl=ssl,
941
# NOTE(beisner): extra debug here re: pika hang potential:
942
# https://github.com/pika/pika/issues/297
943
# https://groups.google.com/forum/#!topic/rabbitmq-users/Ja0iyfF0Szw
944
self.log.debug('Defining channel...')
945
channel = connection.channel()
946
self.log.debug('Declaring queue...')
947
channel.queue_declare(queue=queue, auto_delete=False, durable=True)
948
self.log.debug('Publishing message...')
949
channel.basic_publish(exchange='', routing_key=queue, body=message)
950
self.log.debug('Closing channel...')
952
self.log.debug('Closing connection...')
955
def get_amqp_message_by_unit(self, sentry_unit, queue="test",
956
username="testuser1",
958
ssl=False, port=None):
959
"""Get an amqp message from a rmq juju unit.
961
:param sentry_unit: sentry unit pointer
962
:param queue: message queue, default to test
963
:param username: amqp user name, default to testuser1
964
:param password: amqp user password
965
:param ssl: boolean, default to False
966
:param port: amqp port, use defaults if None
967
:returns: amqp message body as string. Raise if get fails.
969
connection = self.connect_amqp_by_unit(sentry_unit, ssl=ssl,
973
channel = connection.channel()
974
method_frame, _, body = channel.basic_get(queue)
977
self.log.debug('Retreived message from {} queue:\n{}'.format(queue,
979
channel.basic_ack(method_frame.delivery_tag)
984
msg = 'No message retrieved.'
985
amulet.raise_status(amulet.FAIL, msg)