602
631
self.log.debug('Ceph {} samples (OK): '
603
632
'{}'.format(sample_type, samples))
635
# rabbitmq/amqp specific helpers:
637
def rmq_wait_for_cluster(self, deployment, init_sleep=15, timeout=1200):
638
"""Wait for rmq units extended status to show cluster readiness,
639
after an optional initial sleep period. Initial sleep is likely
640
necessary to be effective following a config change, as status
641
message may not instantly update to non-ready."""
644
time.sleep(init_sleep)
646
message = re.compile('^Unit is ready and clustered$')
647
deployment._auto_wait_for_status(message=message,
649
include_only=['rabbitmq-server'])
651
def add_rmq_test_user(self, sentry_units,
652
username="testuser1", password="changeme"):
653
"""Add a test user via the first rmq juju unit, check connection as
654
the new user against all sentry units.
656
:param sentry_units: list of sentry unit pointers
657
:param username: amqp user name, default to testuser1
658
:param password: amqp user password
659
:returns: None if successful. Raise on error.
661
self.log.debug('Adding rmq user ({})...'.format(username))
663
# Check that user does not already exist
664
cmd_user_list = 'rabbitmqctl list_users'
665
output, _ = self.run_cmd_unit(sentry_units[0], cmd_user_list)
666
if username in output:
667
self.log.warning('User ({}) already exists, returning '
668
'gracefully.'.format(username))
671
perms = '".*" ".*" ".*"'
672
cmds = ['rabbitmqctl add_user {} {}'.format(username, password),
673
'rabbitmqctl set_permissions {} {}'.format(username, perms)]
675
# Add user via first unit
677
output, _ = self.run_cmd_unit(sentry_units[0], cmd)
679
# Check connection against the other sentry_units
680
self.log.debug('Checking user connect against units...')
681
for sentry_unit in sentry_units:
682
connection = self.connect_amqp_by_unit(sentry_unit, ssl=False,
687
def delete_rmq_test_user(self, sentry_units, username="testuser1"):
688
"""Delete a rabbitmq user via the first rmq juju unit.
690
:param sentry_units: list of sentry unit pointers
691
:param username: amqp user name, default to testuser1
692
:param password: amqp user password
693
:returns: None if successful or no such user.
695
self.log.debug('Deleting rmq user ({})...'.format(username))
697
# Check that the user exists
698
cmd_user_list = 'rabbitmqctl list_users'
699
output, _ = self.run_cmd_unit(sentry_units[0], cmd_user_list)
701
if username not in output:
702
self.log.warning('User ({}) does not exist, returning '
703
'gracefully.'.format(username))
707
cmd_user_del = 'rabbitmqctl delete_user {}'.format(username)
708
output, _ = self.run_cmd_unit(sentry_units[0], cmd_user_del)
710
def get_rmq_cluster_status(self, sentry_unit):
711
"""Execute rabbitmq cluster status command on a unit and return
714
:param unit: sentry unit
715
:returns: String containing console output of cluster status command
717
cmd = 'rabbitmqctl cluster_status'
718
output, _ = self.run_cmd_unit(sentry_unit, cmd)
719
self.log.debug('{} cluster_status:\n{}'.format(
720
sentry_unit.info['unit_name'], output))
723
def get_rmq_cluster_running_nodes(self, sentry_unit):
724
"""Parse rabbitmqctl cluster_status output string, return list of
725
running rabbitmq cluster nodes.
727
:param unit: sentry unit
728
:returns: List containing node names of running nodes
730
# NOTE(beisner): rabbitmqctl cluster_status output is not
731
# json-parsable, do string chop foo, then json.loads that.
732
str_stat = self.get_rmq_cluster_status(sentry_unit)
733
if 'running_nodes' in str_stat:
734
pos_start = str_stat.find("{running_nodes,") + 15
735
pos_end = str_stat.find("]},", pos_start) + 1
736
str_run_nodes = str_stat[pos_start:pos_end].replace("'", '"')
737
run_nodes = json.loads(str_run_nodes)
742
def validate_rmq_cluster_running_nodes(self, sentry_units):
743
"""Check that all rmq unit hostnames are represented in the
744
cluster_status output of all units.
746
:param host_names: dict of juju unit names to host names
747
:param units: list of sentry unit pointers (all rmq units)
748
:returns: None if successful, otherwise return error message
750
host_names = self.get_unit_hostnames(sentry_units)
753
# Query every unit for cluster_status running nodes
754
for query_unit in sentry_units:
755
query_unit_name = query_unit.info['unit_name']
756
running_nodes = self.get_rmq_cluster_running_nodes(query_unit)
758
# Confirm that every unit is represented in the queried unit's
759
# cluster_status running nodes output.
760
for validate_unit in sentry_units:
761
val_host_name = host_names[validate_unit.info['unit_name']]
762
val_node_name = 'rabbit@{}'.format(val_host_name)
764
if val_node_name not in running_nodes:
765
errors.append('Cluster member check failed on {}: {} not '
766
'in {}\n'.format(query_unit_name,
770
return ''.join(errors)
772
def rmq_ssl_is_enabled_on_unit(self, sentry_unit, port=None):
773
"""Check a single juju rmq unit for ssl and port in the config file."""
774
host = sentry_unit.info['public-address']
775
unit_name = sentry_unit.info['unit_name']
777
conf_file = '/etc/rabbitmq/rabbitmq.config'
778
conf_contents = str(self.file_contents_safe(sentry_unit,
779
conf_file, max_wait=16))
781
conf_ssl = 'ssl' in conf_contents
782
conf_port = str(port) in conf_contents
784
# Port explicitly checked in config
785
if port and conf_port and conf_ssl:
786
self.log.debug('SSL is enabled @{}:{} '
787
'({})'.format(host, port, unit_name))
789
elif port and not conf_port and conf_ssl:
790
self.log.debug('SSL is enabled @{} but not on port {} '
791
'({})'.format(host, port, unit_name))
793
# Port not checked (useful when checking that ssl is disabled)
794
elif not port and conf_ssl:
795
self.log.debug('SSL is enabled @{}:{} '
796
'({})'.format(host, port, unit_name))
799
self.log.debug('SSL not enabled @{}:{} '
800
'({})'.format(host, port, unit_name))
803
msg = ('Unknown condition when checking SSL status @{}:{} '
804
'({})'.format(host, port, unit_name))
805
amulet.raise_status(amulet.FAIL, msg)
807
def validate_rmq_ssl_enabled_units(self, sentry_units, port=None):
808
"""Check that ssl is enabled on rmq juju sentry units.
810
:param sentry_units: list of all rmq sentry units
811
:param port: optional ssl port override to validate
812
:returns: None if successful, otherwise return error message
814
for sentry_unit in sentry_units:
815
if not self.rmq_ssl_is_enabled_on_unit(sentry_unit, port=port):
816
return ('Unexpected condition: ssl is disabled on unit '
817
'({})'.format(sentry_unit.info['unit_name']))
820
def validate_rmq_ssl_disabled_units(self, sentry_units):
821
"""Check that ssl is enabled on listed rmq juju sentry units.
823
:param sentry_units: list of all rmq sentry units
824
:returns: True if successful. Raise on error.
826
for sentry_unit in sentry_units:
827
if self.rmq_ssl_is_enabled_on_unit(sentry_unit):
828
return ('Unexpected condition: ssl is enabled on unit '
829
'({})'.format(sentry_unit.info['unit_name']))
832
def configure_rmq_ssl_on(self, sentry_units, deployment,
833
port=None, max_wait=60):
834
"""Turn ssl charm config option on, with optional non-default
835
ssl port specification. Confirm that it is enabled on every
838
:param sentry_units: list of sentry units
839
:param deployment: amulet deployment object pointer
840
:param port: amqp port, use defaults if None
841
:param max_wait: maximum time to wait in seconds to confirm
842
:returns: None if successful. Raise on error.
844
self.log.debug('Setting ssl charm config option: on')
847
config = {'ssl': 'on'}
849
config['ssl_port'] = port
851
deployment.d.configure('rabbitmq-server', config)
853
# Wait for unit status
854
self.rmq_wait_for_cluster(deployment)
858
ret = self.validate_rmq_ssl_enabled_units(sentry_units, port=port)
859
while ret and tries < (max_wait / 4):
861
self.log.debug('Attempt {}: {}'.format(tries, ret))
862
ret = self.validate_rmq_ssl_enabled_units(sentry_units, port=port)
866
amulet.raise_status(amulet.FAIL, ret)
868
def configure_rmq_ssl_off(self, sentry_units, deployment, max_wait=60):
869
"""Turn ssl charm config option off, confirm that it is disabled
872
:param sentry_units: list of sentry units
873
:param deployment: amulet deployment object pointer
874
:param max_wait: maximum time to wait in seconds to confirm
875
:returns: None if successful. Raise on error.
877
self.log.debug('Setting ssl charm config option: off')
880
config = {'ssl': 'off'}
881
deployment.d.configure('rabbitmq-server', config)
883
# Wait for unit status
884
self.rmq_wait_for_cluster(deployment)
888
ret = self.validate_rmq_ssl_disabled_units(sentry_units)
889
while ret and tries < (max_wait / 4):
891
self.log.debug('Attempt {}: {}'.format(tries, ret))
892
ret = self.validate_rmq_ssl_disabled_units(sentry_units)
896
amulet.raise_status(amulet.FAIL, ret)
898
def connect_amqp_by_unit(self, sentry_unit, ssl=False,
899
port=None, fatal=True,
900
username="testuser1", password="changeme"):
901
"""Establish and return a pika amqp connection to the rabbitmq service
902
running on a rmq juju unit.
904
:param sentry_unit: sentry unit pointer
905
:param ssl: boolean, default to False
906
:param port: amqp port, use defaults if None
907
:param fatal: boolean, default to True (raises on connect error)
908
:param username: amqp user name, default to testuser1
909
:param password: amqp user password
910
:returns: pika amqp connection pointer or None if failed and non-fatal
912
host = sentry_unit.info['public-address']
913
unit_name = sentry_unit.info['unit_name']
915
# Default port logic if port is not specified
918
elif not ssl and not port:
921
self.log.debug('Connecting to amqp on {}:{} ({}) as '
922
'{}...'.format(host, port, unit_name, username))
925
credentials = pika.PlainCredentials(username, password)
926
parameters = pika.ConnectionParameters(host=host, port=port,
927
credentials=credentials,
929
connection_attempts=3,
932
connection = pika.BlockingConnection(parameters)
933
assert connection.server_properties['product'] == 'RabbitMQ'
934
self.log.debug('Connect OK')
936
except Exception as e:
937
msg = ('amqp connection failed to {}:{} as '
938
'{} ({})'.format(host, port, username, str(e)))
940
amulet.raise_status(amulet.FAIL, msg)
945
def publish_amqp_message_by_unit(self, sentry_unit, message,
946
queue="test", ssl=False,
947
username="testuser1",
950
"""Publish an amqp message to a rmq juju unit.
952
:param sentry_unit: sentry unit pointer
953
:param message: amqp message string
954
:param queue: message queue, default to test
955
:param username: amqp user name, default to testuser1
956
:param password: amqp user password
957
:param ssl: boolean, default to False
958
:param port: amqp port, use defaults if None
959
:returns: None. Raises exception if publish failed.
961
self.log.debug('Publishing message to {} queue:\n{}'.format(queue,
963
connection = self.connect_amqp_by_unit(sentry_unit, ssl=ssl,
968
# NOTE(beisner): extra debug here re: pika hang potential:
969
# https://github.com/pika/pika/issues/297
970
# https://groups.google.com/forum/#!topic/rabbitmq-users/Ja0iyfF0Szw
971
self.log.debug('Defining channel...')
972
channel = connection.channel()
973
self.log.debug('Declaring queue...')
974
channel.queue_declare(queue=queue, auto_delete=False, durable=True)
975
self.log.debug('Publishing message...')
976
channel.basic_publish(exchange='', routing_key=queue, body=message)
977
self.log.debug('Closing channel...')
979
self.log.debug('Closing connection...')
982
def get_amqp_message_by_unit(self, sentry_unit, queue="test",
983
username="testuser1",
985
ssl=False, port=None):
986
"""Get an amqp message from a rmq juju unit.
988
:param sentry_unit: sentry unit pointer
989
:param queue: message queue, default to test
990
:param username: amqp user name, default to testuser1
991
:param password: amqp user password
992
:param ssl: boolean, default to False
993
:param port: amqp port, use defaults if None
994
:returns: amqp message body as string. Raise if get fails.
996
connection = self.connect_amqp_by_unit(sentry_unit, ssl=ssl,
1000
channel = connection.channel()
1001
method_frame, _, body = channel.basic_get(queue)
1004
self.log.debug('Retreived message from {} queue:\n{}'.format(queue,
1006
channel.basic_ack(method_frame.delivery_tag)
1011
msg = 'No message retrieved.'
1012
amulet.raise_status(amulet.FAIL, msg)