~hopem/charms/trusty/swift-proxy/self-healing-cluster-sync

« back to all changes in this revision

Viewing changes to tests/charmhelpers/contrib/openstack/amulet/utils.py

  • Committer: Edward Hope-Morley
  • Date: 2016-01-12 12:12:12 UTC
  • mfrom: (95.1.38 swift-proxy)
  • Revision ID: edward.hope-morley@canonical.com-20160112121212-ws31fznkuo9t4dpw
synced /next

Show diffs side-by-side

added added

removed removed

Lines of Context:
18
18
import json
19
19
import logging
20
20
import os
 
21
import re
21
22
import six
22
23
import time
23
24
import urllib
27
28
import heatclient.v1.client as heat_client
28
29
import keystoneclient.v2_0 as keystone_client
29
30
import novaclient.v1_1.client as nova_client
 
31
import pika
30
32
import swiftclient
31
33
 
32
34
from charmhelpers.contrib.amulet.utils import (
602
604
            self.log.debug('Ceph {} samples (OK): '
603
605
                           '{}'.format(sample_type, samples))
604
606
            return None
 
607
 
 
608
    # rabbitmq/amqp specific helpers:
 
609
 
 
610
    def rmq_wait_for_cluster(self, deployment, init_sleep=15, timeout=1200):
 
611
        """Wait for rmq units extended status to show cluster readiness,
 
612
        after an optional initial sleep period.  Initial sleep is likely
 
613
        necessary to be effective following a config change, as status
 
614
        message may not instantly update to non-ready."""
 
615
 
 
616
        if init_sleep:
 
617
            time.sleep(init_sleep)
 
618
 
 
619
        message = re.compile('^Unit is ready and clustered$')
 
620
        deployment._auto_wait_for_status(message=message,
 
621
                                         timeout=timeout,
 
622
                                         include_only=['rabbitmq-server'])
 
623
 
 
624
    def add_rmq_test_user(self, sentry_units,
 
625
                          username="testuser1", password="changeme"):
 
626
        """Add a test user via the first rmq juju unit, check connection as
 
627
        the new user against all sentry units.
 
628
 
 
629
        :param sentry_units: list of sentry unit pointers
 
630
        :param username: amqp user name, default to testuser1
 
631
        :param password: amqp user password
 
632
        :returns: None if successful.  Raise on error.
 
633
        """
 
634
        self.log.debug('Adding rmq user ({})...'.format(username))
 
635
 
 
636
        # Check that user does not already exist
 
637
        cmd_user_list = 'rabbitmqctl list_users'
 
638
        output, _ = self.run_cmd_unit(sentry_units[0], cmd_user_list)
 
639
        if username in output:
 
640
            self.log.warning('User ({}) already exists, returning '
 
641
                             'gracefully.'.format(username))
 
642
            return
 
643
 
 
644
        perms = '".*" ".*" ".*"'
 
645
        cmds = ['rabbitmqctl add_user {} {}'.format(username, password),
 
646
                'rabbitmqctl set_permissions {} {}'.format(username, perms)]
 
647
 
 
648
        # Add user via first unit
 
649
        for cmd in cmds:
 
650
            output, _ = self.run_cmd_unit(sentry_units[0], cmd)
 
651
 
 
652
        # Check connection against the other sentry_units
 
653
        self.log.debug('Checking user connect against units...')
 
654
        for sentry_unit in sentry_units:
 
655
            connection = self.connect_amqp_by_unit(sentry_unit, ssl=False,
 
656
                                                   username=username,
 
657
                                                   password=password)
 
658
            connection.close()
 
659
 
 
660
    def delete_rmq_test_user(self, sentry_units, username="testuser1"):
 
661
        """Delete a rabbitmq user via the first rmq juju unit.
 
662
 
 
663
        :param sentry_units: list of sentry unit pointers
 
664
        :param username: amqp user name, default to testuser1
 
665
        :param password: amqp user password
 
666
        :returns: None if successful or no such user.
 
667
        """
 
668
        self.log.debug('Deleting rmq user ({})...'.format(username))
 
669
 
 
670
        # Check that the user exists
 
671
        cmd_user_list = 'rabbitmqctl list_users'
 
672
        output, _ = self.run_cmd_unit(sentry_units[0], cmd_user_list)
 
673
 
 
674
        if username not in output:
 
675
            self.log.warning('User ({}) does not exist, returning '
 
676
                             'gracefully.'.format(username))
 
677
            return
 
678
 
 
679
        # Delete the user
 
680
        cmd_user_del = 'rabbitmqctl delete_user {}'.format(username)
 
681
        output, _ = self.run_cmd_unit(sentry_units[0], cmd_user_del)
 
682
 
 
683
    def get_rmq_cluster_status(self, sentry_unit):
 
684
        """Execute rabbitmq cluster status command on a unit and return
 
685
        the full output.
 
686
 
 
687
        :param unit: sentry unit
 
688
        :returns: String containing console output of cluster status command
 
689
        """
 
690
        cmd = 'rabbitmqctl cluster_status'
 
691
        output, _ = self.run_cmd_unit(sentry_unit, cmd)
 
692
        self.log.debug('{} cluster_status:\n{}'.format(
 
693
            sentry_unit.info['unit_name'], output))
 
694
        return str(output)
 
695
 
 
696
    def get_rmq_cluster_running_nodes(self, sentry_unit):
 
697
        """Parse rabbitmqctl cluster_status output string, return list of
 
698
        running rabbitmq cluster nodes.
 
699
 
 
700
        :param unit: sentry unit
 
701
        :returns: List containing node names of running nodes
 
702
        """
 
703
        # NOTE(beisner): rabbitmqctl cluster_status output is not
 
704
        # json-parsable, do string chop foo, then json.loads that.
 
705
        str_stat = self.get_rmq_cluster_status(sentry_unit)
 
706
        if 'running_nodes' in str_stat:
 
707
            pos_start = str_stat.find("{running_nodes,") + 15
 
708
            pos_end = str_stat.find("]},", pos_start) + 1
 
709
            str_run_nodes = str_stat[pos_start:pos_end].replace("'", '"')
 
710
            run_nodes = json.loads(str_run_nodes)
 
711
            return run_nodes
 
712
        else:
 
713
            return []
 
714
 
 
715
    def validate_rmq_cluster_running_nodes(self, sentry_units):
 
716
        """Check that all rmq unit hostnames are represented in the
 
717
        cluster_status output of all units.
 
718
 
 
719
        :param host_names: dict of juju unit names to host names
 
720
        :param units: list of sentry unit pointers (all rmq units)
 
721
        :returns: None if successful, otherwise return error message
 
722
        """
 
723
        host_names = self.get_unit_hostnames(sentry_units)
 
724
        errors = []
 
725
 
 
726
        # Query every unit for cluster_status running nodes
 
727
        for query_unit in sentry_units:
 
728
            query_unit_name = query_unit.info['unit_name']
 
729
            running_nodes = self.get_rmq_cluster_running_nodes(query_unit)
 
730
 
 
731
            # Confirm that every unit is represented in the queried unit's
 
732
            # cluster_status running nodes output.
 
733
            for validate_unit in sentry_units:
 
734
                val_host_name = host_names[validate_unit.info['unit_name']]
 
735
                val_node_name = 'rabbit@{}'.format(val_host_name)
 
736
 
 
737
                if val_node_name not in running_nodes:
 
738
                    errors.append('Cluster member check failed on {}: {} not '
 
739
                                  'in {}\n'.format(query_unit_name,
 
740
                                                   val_node_name,
 
741
                                                   running_nodes))
 
742
        if errors:
 
743
            return ''.join(errors)
 
744
 
 
745
    def rmq_ssl_is_enabled_on_unit(self, sentry_unit, port=None):
 
746
        """Check a single juju rmq unit for ssl and port in the config file."""
 
747
        host = sentry_unit.info['public-address']
 
748
        unit_name = sentry_unit.info['unit_name']
 
749
 
 
750
        conf_file = '/etc/rabbitmq/rabbitmq.config'
 
751
        conf_contents = str(self.file_contents_safe(sentry_unit,
 
752
                                                    conf_file, max_wait=16))
 
753
        # Checks
 
754
        conf_ssl = 'ssl' in conf_contents
 
755
        conf_port = str(port) in conf_contents
 
756
 
 
757
        # Port explicitly checked in config
 
758
        if port and conf_port and conf_ssl:
 
759
            self.log.debug('SSL is enabled  @{}:{} '
 
760
                           '({})'.format(host, port, unit_name))
 
761
            return True
 
762
        elif port and not conf_port and conf_ssl:
 
763
            self.log.debug('SSL is enabled @{} but not on port {} '
 
764
                           '({})'.format(host, port, unit_name))
 
765
            return False
 
766
        # Port not checked (useful when checking that ssl is disabled)
 
767
        elif not port and conf_ssl:
 
768
            self.log.debug('SSL is enabled  @{}:{} '
 
769
                           '({})'.format(host, port, unit_name))
 
770
            return True
 
771
        elif not conf_ssl:
 
772
            self.log.debug('SSL not enabled @{}:{} '
 
773
                           '({})'.format(host, port, unit_name))
 
774
            return False
 
775
        else:
 
776
            msg = ('Unknown condition when checking SSL status @{}:{} '
 
777
                   '({})'.format(host, port, unit_name))
 
778
            amulet.raise_status(amulet.FAIL, msg)
 
779
 
 
780
    def validate_rmq_ssl_enabled_units(self, sentry_units, port=None):
 
781
        """Check that ssl is enabled on rmq juju sentry units.
 
782
 
 
783
        :param sentry_units: list of all rmq sentry units
 
784
        :param port: optional ssl port override to validate
 
785
        :returns: None if successful, otherwise return error message
 
786
        """
 
787
        for sentry_unit in sentry_units:
 
788
            if not self.rmq_ssl_is_enabled_on_unit(sentry_unit, port=port):
 
789
                return ('Unexpected condition:  ssl is disabled on unit '
 
790
                        '({})'.format(sentry_unit.info['unit_name']))
 
791
        return None
 
792
 
 
793
    def validate_rmq_ssl_disabled_units(self, sentry_units):
 
794
        """Check that ssl is enabled on listed rmq juju sentry units.
 
795
 
 
796
        :param sentry_units: list of all rmq sentry units
 
797
        :returns: True if successful.  Raise on error.
 
798
        """
 
799
        for sentry_unit in sentry_units:
 
800
            if self.rmq_ssl_is_enabled_on_unit(sentry_unit):
 
801
                return ('Unexpected condition:  ssl is enabled on unit '
 
802
                        '({})'.format(sentry_unit.info['unit_name']))
 
803
        return None
 
804
 
 
805
    def configure_rmq_ssl_on(self, sentry_units, deployment,
 
806
                             port=None, max_wait=60):
 
807
        """Turn ssl charm config option on, with optional non-default
 
808
        ssl port specification.  Confirm that it is enabled on every
 
809
        unit.
 
810
 
 
811
        :param sentry_units: list of sentry units
 
812
        :param deployment: amulet deployment object pointer
 
813
        :param port: amqp port, use defaults if None
 
814
        :param max_wait: maximum time to wait in seconds to confirm
 
815
        :returns: None if successful.  Raise on error.
 
816
        """
 
817
        self.log.debug('Setting ssl charm config option:  on')
 
818
 
 
819
        # Enable RMQ SSL
 
820
        config = {'ssl': 'on'}
 
821
        if port:
 
822
            config['ssl_port'] = port
 
823
 
 
824
        deployment.d.configure('rabbitmq-server', config)
 
825
 
 
826
        # Wait for unit status
 
827
        self.rmq_wait_for_cluster(deployment)
 
828
 
 
829
        # Confirm
 
830
        tries = 0
 
831
        ret = self.validate_rmq_ssl_enabled_units(sentry_units, port=port)
 
832
        while ret and tries < (max_wait / 4):
 
833
            time.sleep(4)
 
834
            self.log.debug('Attempt {}: {}'.format(tries, ret))
 
835
            ret = self.validate_rmq_ssl_enabled_units(sentry_units, port=port)
 
836
            tries += 1
 
837
 
 
838
        if ret:
 
839
            amulet.raise_status(amulet.FAIL, ret)
 
840
 
 
841
    def configure_rmq_ssl_off(self, sentry_units, deployment, max_wait=60):
 
842
        """Turn ssl charm config option off, confirm that it is disabled
 
843
        on every unit.
 
844
 
 
845
        :param sentry_units: list of sentry units
 
846
        :param deployment: amulet deployment object pointer
 
847
        :param max_wait: maximum time to wait in seconds to confirm
 
848
        :returns: None if successful.  Raise on error.
 
849
        """
 
850
        self.log.debug('Setting ssl charm config option:  off')
 
851
 
 
852
        # Disable RMQ SSL
 
853
        config = {'ssl': 'off'}
 
854
        deployment.d.configure('rabbitmq-server', config)
 
855
 
 
856
        # Wait for unit status
 
857
        self.rmq_wait_for_cluster(deployment)
 
858
 
 
859
        # Confirm
 
860
        tries = 0
 
861
        ret = self.validate_rmq_ssl_disabled_units(sentry_units)
 
862
        while ret and tries < (max_wait / 4):
 
863
            time.sleep(4)
 
864
            self.log.debug('Attempt {}: {}'.format(tries, ret))
 
865
            ret = self.validate_rmq_ssl_disabled_units(sentry_units)
 
866
            tries += 1
 
867
 
 
868
        if ret:
 
869
            amulet.raise_status(amulet.FAIL, ret)
 
870
 
 
871
    def connect_amqp_by_unit(self, sentry_unit, ssl=False,
 
872
                             port=None, fatal=True,
 
873
                             username="testuser1", password="changeme"):
 
874
        """Establish and return a pika amqp connection to the rabbitmq service
 
875
        running on a rmq juju unit.
 
876
 
 
877
        :param sentry_unit: sentry unit pointer
 
878
        :param ssl: boolean, default to False
 
879
        :param port: amqp port, use defaults if None
 
880
        :param fatal: boolean, default to True (raises on connect error)
 
881
        :param username: amqp user name, default to testuser1
 
882
        :param password: amqp user password
 
883
        :returns: pika amqp connection pointer or None if failed and non-fatal
 
884
        """
 
885
        host = sentry_unit.info['public-address']
 
886
        unit_name = sentry_unit.info['unit_name']
 
887
 
 
888
        # Default port logic if port is not specified
 
889
        if ssl and not port:
 
890
            port = 5671
 
891
        elif not ssl and not port:
 
892
            port = 5672
 
893
 
 
894
        self.log.debug('Connecting to amqp on {}:{} ({}) as '
 
895
                       '{}...'.format(host, port, unit_name, username))
 
896
 
 
897
        try:
 
898
            credentials = pika.PlainCredentials(username, password)
 
899
            parameters = pika.ConnectionParameters(host=host, port=port,
 
900
                                                   credentials=credentials,
 
901
                                                   ssl=ssl,
 
902
                                                   connection_attempts=3,
 
903
                                                   retry_delay=5,
 
904
                                                   socket_timeout=1)
 
905
            connection = pika.BlockingConnection(parameters)
 
906
            assert connection.server_properties['product'] == 'RabbitMQ'
 
907
            self.log.debug('Connect OK')
 
908
            return connection
 
909
        except Exception as e:
 
910
            msg = ('amqp connection failed to {}:{} as '
 
911
                   '{} ({})'.format(host, port, username, str(e)))
 
912
            if fatal:
 
913
                amulet.raise_status(amulet.FAIL, msg)
 
914
            else:
 
915
                self.log.warn(msg)
 
916
                return None
 
917
 
 
918
    def publish_amqp_message_by_unit(self, sentry_unit, message,
 
919
                                     queue="test", ssl=False,
 
920
                                     username="testuser1",
 
921
                                     password="changeme",
 
922
                                     port=None):
 
923
        """Publish an amqp message to a rmq juju unit.
 
924
 
 
925
        :param sentry_unit: sentry unit pointer
 
926
        :param message: amqp message string
 
927
        :param queue: message queue, default to test
 
928
        :param username: amqp user name, default to testuser1
 
929
        :param password: amqp user password
 
930
        :param ssl: boolean, default to False
 
931
        :param port: amqp port, use defaults if None
 
932
        :returns: None.  Raises exception if publish failed.
 
933
        """
 
934
        self.log.debug('Publishing message to {} queue:\n{}'.format(queue,
 
935
                                                                    message))
 
936
        connection = self.connect_amqp_by_unit(sentry_unit, ssl=ssl,
 
937
                                               port=port,
 
938
                                               username=username,
 
939
                                               password=password)
 
940
 
 
941
        # NOTE(beisner): extra debug here re: pika hang potential:
 
942
        #   https://github.com/pika/pika/issues/297
 
943
        #   https://groups.google.com/forum/#!topic/rabbitmq-users/Ja0iyfF0Szw
 
944
        self.log.debug('Defining channel...')
 
945
        channel = connection.channel()
 
946
        self.log.debug('Declaring queue...')
 
947
        channel.queue_declare(queue=queue, auto_delete=False, durable=True)
 
948
        self.log.debug('Publishing message...')
 
949
        channel.basic_publish(exchange='', routing_key=queue, body=message)
 
950
        self.log.debug('Closing channel...')
 
951
        channel.close()
 
952
        self.log.debug('Closing connection...')
 
953
        connection.close()
 
954
 
 
955
    def get_amqp_message_by_unit(self, sentry_unit, queue="test",
 
956
                                 username="testuser1",
 
957
                                 password="changeme",
 
958
                                 ssl=False, port=None):
 
959
        """Get an amqp message from a rmq juju unit.
 
960
 
 
961
        :param sentry_unit: sentry unit pointer
 
962
        :param queue: message queue, default to test
 
963
        :param username: amqp user name, default to testuser1
 
964
        :param password: amqp user password
 
965
        :param ssl: boolean, default to False
 
966
        :param port: amqp port, use defaults if None
 
967
        :returns: amqp message body as string.  Raise if get fails.
 
968
        """
 
969
        connection = self.connect_amqp_by_unit(sentry_unit, ssl=ssl,
 
970
                                               port=port,
 
971
                                               username=username,
 
972
                                               password=password)
 
973
        channel = connection.channel()
 
974
        method_frame, _, body = channel.basic_get(queue)
 
975
 
 
976
        if method_frame:
 
977
            self.log.debug('Retreived message from {} queue:\n{}'.format(queue,
 
978
                                                                         body))
 
979
            channel.basic_ack(method_frame.delivery_tag)
 
980
            channel.close()
 
981
            connection.close()
 
982
            return body
 
983
        else:
 
984
            msg = 'No message retrieved.'
 
985
            amulet.raise_status(amulet.FAIL, msg)