602
603
self.log.debug('Ceph {} samples (OK): '
603
604
'{}'.format(sample_type, samples))
607
# rabbitmq/amqp specific helpers:
608
def add_rmq_test_user(self, sentry_units,
                      username="testuser1", password="changeme"):
    """Add a test user via the first rmq juju unit, check connection as
    the new user against all sentry units.

    If ``rabbitmqctl list_users`` on the first unit already mentions
    the user name, a warning is logged and nothing else is done.

    :param sentry_units: list of sentry unit pointers
    :param username: amqp user name, default to testuser1
    :param password: amqp user password
    :returns: None if successful.  Raise on error.
    """
    self.log.debug('Adding rmq user ({})...'.format(username))

    # Check that user does not already exist
    cmd_user_list = 'rabbitmqctl list_users'
    output, _ = self.run_cmd_unit(sentry_units[0], cmd_user_list)
    if username in output:
        self.log.warning('User ({}) already exists, returning '
                         'gracefully.'.format(username))
        return

    perms = '".*" ".*" ".*"'
    cmds = ['rabbitmqctl add_user {} {}'.format(username, password),
            'rabbitmqctl set_permissions {} {}'.format(username, perms)]

    # Add user via first unit
    for cmd in cmds:
        self.run_cmd_unit(sentry_units[0], cmd)

    # Check connection against every sentry unit (including the first)
    self.log.debug('Checking user connect against units...')
    for sentry_unit in sentry_units:
        connection = self.connect_amqp_by_unit(sentry_unit, ssl=False,
                                               username=username,
                                               password=password)
        # The connection is only a probe; release it straight away.
        connection.close()
def delete_rmq_test_user(self, sentry_units, username="testuser1"):
    """Delete a rabbitmq user via the first rmq juju unit.

    If ``rabbitmqctl list_users`` on the first unit does not mention
    the user name, a warning is logged and nothing is deleted.

    :param sentry_units: list of sentry unit pointers
    :param username: amqp user name, default to testuser1
    :returns: None if successful or no such user.
    """
    self.log.debug('Deleting rmq user ({})...'.format(username))

    # Check that the user exists
    cmd_user_list = 'rabbitmqctl list_users'
    output, _ = self.run_cmd_unit(sentry_units[0], cmd_user_list)

    if username not in output:
        self.log.warning('User ({}) does not exist, returning '
                         'gracefully.'.format(username))
        return

    # Delete the user via the first unit
    cmd_user_del = 'rabbitmqctl delete_user {}'.format(username)
    self.run_cmd_unit(sentry_units[0], cmd_user_del)
def get_rmq_cluster_status(self, sentry_unit):
    """Execute rabbitmq cluster status command on a unit and return
    the full output.

    :param sentry_unit: sentry unit pointer
    :returns: String containing console output of cluster status command
    """
    cmd = 'rabbitmqctl cluster_status'
    output, _ = self.run_cmd_unit(sentry_unit, cmd)
    self.log.debug('{} cluster_status:\n{}'.format(
        sentry_unit.info['unit_name'], output))
    # Callers run substring searches on the result, so always hand
    # back a str.
    return str(output)
def get_rmq_cluster_running_nodes(self, sentry_unit):
    """Parse rabbitmqctl cluster_status output string, return list of
    running rabbitmq cluster nodes.

    :param sentry_unit: sentry unit pointer
    :returns: List containing node names of running nodes; an empty
              list when the output has no parsable running_nodes entry
    """
    # NOTE(beisner): rabbitmqctl cluster_status output is not
    # json-parsable, do string chop foo, then json.loads that.
    str_stat = self.get_rmq_cluster_status(sentry_unit)

    marker = "{running_nodes,"
    marker_pos = str_stat.find(marker)
    if marker_pos == -1:
        return []

    # The node list is the bracketed term right after the marker and
    # ends at the first "]}," that follows it.
    pos_start = marker_pos + len(marker)
    list_close = str_stat.find("]},", pos_start)
    if list_close == -1:
        return []

    # Erlang quotes atoms with single quotes; swap them so that the
    # list is valid JSON.
    str_run_nodes = str_stat[pos_start:list_close + 1].replace("'", '"')
    return json.loads(str_run_nodes)
def validate_rmq_cluster_running_nodes(self, sentry_units):
    """Check that all rmq unit hostnames are represented in the
    cluster_status output of all units.

    Host names are looked up with ``self.get_unit_hostnames`` and each
    unit is expected to appear as ``rabbit@<host name>`` in the running
    nodes reported by every unit.

    :param sentry_units: list of sentry unit pointers (all rmq units)
    :returns: None if successful, otherwise return error message
    """
    host_names = self.get_unit_hostnames(sentry_units)
    errors = []

    # Query every unit for cluster_status running nodes
    for query_unit in sentry_units:
        query_unit_name = query_unit.info['unit_name']
        running_nodes = self.get_rmq_cluster_running_nodes(query_unit)

        # Confirm that every unit is represented in the queried unit's
        # cluster_status running nodes output.
        for validate_unit in sentry_units:
            val_host_name = host_names[validate_unit.info['unit_name']]
            val_node_name = 'rabbit@{}'.format(val_host_name)

            if val_node_name not in running_nodes:
                errors.append('Cluster member check failed on {}: {} not '
                              'in {}\n'.format(query_unit_name,
                                               val_node_name,
                                               running_nodes))

    # An empty error list means success, which is reported as None.
    if errors:
        return ''.join(errors)
    return None
def rmq_ssl_is_enabled_on_unit(self, sentry_unit, port=None):
    """Check a single juju rmq unit for ssl and port in the config file.

    The check is textual: ssl counts as enabled when the string 'ssl'
    occurs in /etc/rabbitmq/rabbitmq.config on the unit, and a port
    counts as configured when its decimal form occurs in that file.

    :param sentry_unit: sentry unit pointer
    :param port: optional ssl port that must also appear in the config
    :returns: True if ssl is enabled (on ``port`` when one is given),
              otherwise False
    """
    host = sentry_unit.info['public-address']
    unit_name = sentry_unit.info['unit_name']

    conf_file = '/etc/rabbitmq/rabbitmq.config'
    conf_contents = str(self.file_contents_safe(sentry_unit,
                                                conf_file, max_wait=16))

    # Without ssl in the config the answer is False whether or not a
    # port was requested, so that callers polling for ssl to come up
    # can keep retrying instead of hitting a failure.
    if 'ssl' not in conf_contents:
        self.log.debug('SSL not enabled @{}:{} '
                       '({})'.format(host, port, unit_name))
        return False

    # Port not checked (useful when checking that ssl is disabled), or
    # port explicitly found in config
    if not port or str(port) in conf_contents:
        self.log.debug('SSL is enabled @{}:{} '
                       '({})'.format(host, port, unit_name))
        return True

    self.log.debug('SSL is enabled @{} but not on port {} '
                   '({})'.format(host, port, unit_name))
    return False
def validate_rmq_ssl_enabled_units(self, sentry_units, port=None):
    """Check that ssl is enabled on rmq juju sentry units.

    Units are checked in order and the first unit without ssl ends
    the check.

    :param sentry_units: list of all rmq sentry units
    :param port: optional ssl port override to validate
    :returns: None if successful, otherwise return error message
    """
    for sentry_unit in sentry_units:
        if not self.rmq_ssl_is_enabled_on_unit(sentry_unit, port=port):
            return ('Unexpected condition: ssl is disabled on unit '
                    '({})'.format(sentry_unit.info['unit_name']))
    return None
def validate_rmq_ssl_disabled_units(self, sentry_units):
    """Check that ssl is disabled on listed rmq juju sentry units.

    Units are checked in order and the first unit with ssl enabled
    ends the check.

    :param sentry_units: list of all rmq sentry units
    :returns: None if successful, otherwise return error message
    """
    for sentry_unit in sentry_units:
        if self.rmq_ssl_is_enabled_on_unit(sentry_unit):
            return ('Unexpected condition: ssl is enabled on unit '
                    '({})'.format(sentry_unit.info['unit_name']))
    return None
def configure_rmq_ssl_on(self, sentry_units, deployment,
                         port=None, max_wait=60):
    """Turn ssl charm config option on, with optional non-default
    ssl port specification.  Confirm that it is enabled on every
    unit.

    :param sentry_units: list of sentry units
    :param deployment: amulet deployment object pointer
    :param port: amqp port, use defaults if None
    :param max_wait: maximum time to wait in seconds to confirm
    :returns: None if successful.  Raise on error.
    """
    # Imported locally so that this helper does not depend on a
    # module-level import.
    import time

    self.log.debug('Setting ssl charm config option: on')

    # Enable RMQ SSL; only override the port when one was requested
    config = {'ssl': 'on'}
    if port:
        config['ssl_port'] = port

    deployment.configure('rabbitmq-server', config)

    # Confirm: poll until every unit reports ssl enabled or the
    # attempt budget derived from max_wait is used up.
    # NOTE(review): the 4 second interval is inferred from the
    # ``max_wait / 4`` attempt limit - confirm.
    poll_interval = 4
    tries = 0
    ret = self.validate_rmq_ssl_enabled_units(sentry_units, port=port)
    while ret and tries < (max_wait / poll_interval):
        time.sleep(poll_interval)
        self.log.debug('Attempt {}: {}'.format(tries, ret))
        ret = self.validate_rmq_ssl_enabled_units(sentry_units, port=port)
        tries += 1

    # Still an error message after the last attempt: fail the test
    if ret:
        amulet.raise_status(amulet.FAIL, ret)
def configure_rmq_ssl_off(self, sentry_units, deployment, max_wait=60):
    """Turn ssl charm config option off, confirm that it is disabled
    on every unit.

    :param sentry_units: list of sentry units
    :param deployment: amulet deployment object pointer
    :param max_wait: maximum time to wait in seconds to confirm
    :returns: None if successful.  Raise on error.
    """
    # Imported locally so that this helper does not depend on a
    # module-level import.
    import time

    self.log.debug('Setting ssl charm config option: off')

    # Disable RMQ SSL
    config = {'ssl': 'off'}
    deployment.configure('rabbitmq-server', config)

    # Confirm: poll until every unit reports ssl disabled or the
    # attempt budget derived from max_wait is used up.
    # NOTE(review): the 4 second interval is inferred from the
    # ``max_wait / 4`` attempt limit - confirm.
    poll_interval = 4
    tries = 0
    ret = self.validate_rmq_ssl_disabled_units(sentry_units)
    while ret and tries < (max_wait / poll_interval):
        time.sleep(poll_interval)
        self.log.debug('Attempt {}: {}'.format(tries, ret))
        ret = self.validate_rmq_ssl_disabled_units(sentry_units)
        tries += 1

    # Still an error message after the last attempt: fail the test
    if ret:
        amulet.raise_status(amulet.FAIL, ret)
def connect_amqp_by_unit(self, sentry_unit, ssl=False,
                         port=None, fatal=True,
                         username="testuser1", password="changeme"):
    """Establish and return a pika amqp connection to the rabbitmq service
    running on a rmq juju unit.

    :param sentry_unit: sentry unit pointer
    :param ssl: boolean, default to False
    :param port: amqp port, use defaults if None
    :param fatal: boolean, default to True (raises on connect error)
    :param username: amqp user name, default to testuser1
    :param password: amqp user password
    :returns: pika amqp connection pointer or None if failed and non-fatal
    """
    host = sentry_unit.info['public-address']
    unit_name = sentry_unit.info['unit_name']

    # Default port logic if port is not specified: the registered
    # amqps (5671) and amqp (5672) ports.
    if not port:
        port = 5671 if ssl else 5672

    self.log.debug('Connecting to amqp on {}:{} ({}) as '
                   '{}...'.format(host, port, unit_name, username))

    try:
        credentials = pika.PlainCredentials(username, password)
        # NOTE(review): the ssl, retry_delay and socket_timeout
        # arguments are assumed - confirm the values and that the
        # installed pika release still accepts the ``ssl`` keyword.
        parameters = pika.ConnectionParameters(host=host, port=port,
                                               credentials=credentials,
                                               ssl=ssl,
                                               connection_attempts=3,
                                               retry_delay=5,
                                               socket_timeout=1)
        connection = pika.BlockingConnection(parameters)
        # Explicit check instead of ``assert`` so that it still runs
        # under ``python -O``; a mismatch is reported through the same
        # failure path as a connect error.
        product = connection.server_properties['product']
        if product != 'RabbitMQ':
            raise ValueError('unexpected server product: '
                             '{}'.format(product))
        self.log.debug('Connect OK')
        return connection
    except Exception as e:
        msg = ('amqp connection failed to {}:{} as '
               '{} ({})'.format(host, port, username, str(e)))
        if fatal:
            amulet.raise_status(amulet.FAIL, msg)
        else:
            self.log.warning(msg)
            return None
def publish_amqp_message_by_unit(self, sentry_unit, message,
                                 queue="test", ssl=False,
                                 username="testuser1",
                                 password="changeme",
                                 port=None):
    """Publish an amqp message to a rmq juju unit.

    The queue is declared durable and not auto-deleted, and the message
    is published through the default exchange with the queue name as
    routing key.

    :param sentry_unit: sentry unit pointer
    :param message: amqp message string
    :param queue: message queue, default to test
    :param username: amqp user name, default to testuser1
    :param password: amqp user password
    :param ssl: boolean, default to False
    :param port: amqp port, use defaults if None
    :returns: None.  Raises exception if publish failed.
    """
    self.log.debug('Publishing message to {} queue:\n{}'.format(queue,
                                                                message))
    connection = self.connect_amqp_by_unit(sentry_unit, ssl=ssl,
                                           port=port,
                                           username=username,
                                           password=password)

    # NOTE(beisner): extra debug here re: pika hang potential:
    #   https://github.com/pika/pika/issues/297
    #   https://groups.google.com/forum/#!topic/rabbitmq-users/Ja0iyfF0Szw
    try:
        self.log.debug('Defining channel...')
        channel = connection.channel()
        self.log.debug('Declaring queue...')
        channel.queue_declare(queue=queue, auto_delete=False, durable=True)
        self.log.debug('Publishing message...')
        channel.basic_publish(exchange='', routing_key=queue, body=message)
        self.log.debug('Closing channel...')
        channel.close()
    finally:
        # Always release the connection, also when declare or publish
        # raised.
        self.log.debug('Closing connection...')
        connection.close()
def get_amqp_message_by_unit(self, sentry_unit, queue="test",
                             username="testuser1",
                             password="changeme",
                             ssl=False, port=None):
    """Get an amqp message from a rmq juju unit.

    A single message is fetched from the queue and acknowledged.

    :param sentry_unit: sentry unit pointer
    :param queue: message queue, default to test
    :param username: amqp user name, default to testuser1
    :param password: amqp user password
    :param ssl: boolean, default to False
    :param port: amqp port, use defaults if None
    :returns: amqp message body as string.  Raise if get fails.
    """
    connection = self.connect_amqp_by_unit(sentry_unit, ssl=ssl,
                                           port=port,
                                           username=username,
                                           password=password)
    try:
        channel = connection.channel()
        method_frame, _, body = channel.basic_get(queue)

        # A falsy method frame means that the queue had nothing to
        # hand out.
        if method_frame:
            self.log.debug('Retreived message from {} queue:\n{}'.format(queue,
                                                                         body))
            channel.basic_ack(method_frame.delivery_tag)
        channel.close()
    finally:
        # Always release the connection, also on the failure paths.
        connection.close()

    if method_frame:
        return body

    msg = 'No message retrieved.'
    amulet.raise_status(amulet.FAIL, msg)