~openstack-charmers-archive/charms/precise/ceilometer-agent/trunk

« back to all changes in this revision

Viewing changes to tests/charmhelpers/contrib/openstack/amulet/utils.py

  • Committer: James Page
  • Date: 2015-10-22 13:17:28 UTC
  • Revision ID: james.page@ubuntu.com-20151022131728-0030g2iln4rw6tk4
Tags: 15.10
15.10 Charm release

Show diffs side-by-side

added added

removed removed

Lines of Context:
27
27
import heatclient.v1.client as heat_client
28
28
import keystoneclient.v2_0 as keystone_client
29
29
import novaclient.v1_1.client as nova_client
 
30
import pika
30
31
import swiftclient
31
32
 
32
33
from charmhelpers.contrib.amulet.utils import (
602
603
            self.log.debug('Ceph {} samples (OK): '
603
604
                           '{}'.format(sample_type, samples))
604
605
            return None
 
606
 
 
607
# rabbitmq/amqp specific helpers:
 
608
    def add_rmq_test_user(self, sentry_units,
 
609
                          username="testuser1", password="changeme"):
 
610
        """Add a test user via the first rmq juju unit, check connection as
 
611
        the new user against all sentry units.
 
612
 
 
613
        :param sentry_units: list of sentry unit pointers
 
614
        :param username: amqp user name, default to testuser1
 
615
        :param password: amqp user password
 
616
        :returns: None if successful.  Raise on error.
 
617
        """
 
618
        self.log.debug('Adding rmq user ({})...'.format(username))
 
619
 
 
620
        # Check that user does not already exist
 
621
        cmd_user_list = 'rabbitmqctl list_users'
 
622
        output, _ = self.run_cmd_unit(sentry_units[0], cmd_user_list)
 
623
        if username in output:
 
624
            self.log.warning('User ({}) already exists, returning '
 
625
                             'gracefully.'.format(username))
 
626
            return
 
627
 
 
628
        perms = '".*" ".*" ".*"'
 
629
        cmds = ['rabbitmqctl add_user {} {}'.format(username, password),
 
630
                'rabbitmqctl set_permissions {} {}'.format(username, perms)]
 
631
 
 
632
        # Add user via first unit
 
633
        for cmd in cmds:
 
634
            output, _ = self.run_cmd_unit(sentry_units[0], cmd)
 
635
 
 
636
        # Check connection against the other sentry_units
 
637
        self.log.debug('Checking user connect against units...')
 
638
        for sentry_unit in sentry_units:
 
639
            connection = self.connect_amqp_by_unit(sentry_unit, ssl=False,
 
640
                                                   username=username,
 
641
                                                   password=password)
 
642
            connection.close()
 
643
 
 
644
    def delete_rmq_test_user(self, sentry_units, username="testuser1"):
 
645
        """Delete a rabbitmq user via the first rmq juju unit.
 
646
 
 
647
        :param sentry_units: list of sentry unit pointers
 
648
        :param username: amqp user name, default to testuser1
 
649
        :param password: amqp user password
 
650
        :returns: None if successful or no such user.
 
651
        """
 
652
        self.log.debug('Deleting rmq user ({})...'.format(username))
 
653
 
 
654
        # Check that the user exists
 
655
        cmd_user_list = 'rabbitmqctl list_users'
 
656
        output, _ = self.run_cmd_unit(sentry_units[0], cmd_user_list)
 
657
 
 
658
        if username not in output:
 
659
            self.log.warning('User ({}) does not exist, returning '
 
660
                             'gracefully.'.format(username))
 
661
            return
 
662
 
 
663
        # Delete the user
 
664
        cmd_user_del = 'rabbitmqctl delete_user {}'.format(username)
 
665
        output, _ = self.run_cmd_unit(sentry_units[0], cmd_user_del)
 
666
 
 
667
    def get_rmq_cluster_status(self, sentry_unit):
 
668
        """Execute rabbitmq cluster status command on a unit and return
 
669
        the full output.
 
670
 
 
671
        :param unit: sentry unit
 
672
        :returns: String containing console output of cluster status command
 
673
        """
 
674
        cmd = 'rabbitmqctl cluster_status'
 
675
        output, _ = self.run_cmd_unit(sentry_unit, cmd)
 
676
        self.log.debug('{} cluster_status:\n{}'.format(
 
677
            sentry_unit.info['unit_name'], output))
 
678
        return str(output)
 
679
 
 
680
    def get_rmq_cluster_running_nodes(self, sentry_unit):
 
681
        """Parse rabbitmqctl cluster_status output string, return list of
 
682
        running rabbitmq cluster nodes.
 
683
 
 
684
        :param unit: sentry unit
 
685
        :returns: List containing node names of running nodes
 
686
        """
 
687
        # NOTE(beisner): rabbitmqctl cluster_status output is not
 
688
        # json-parsable, do string chop foo, then json.loads that.
 
689
        str_stat = self.get_rmq_cluster_status(sentry_unit)
 
690
        if 'running_nodes' in str_stat:
 
691
            pos_start = str_stat.find("{running_nodes,") + 15
 
692
            pos_end = str_stat.find("]},", pos_start) + 1
 
693
            str_run_nodes = str_stat[pos_start:pos_end].replace("'", '"')
 
694
            run_nodes = json.loads(str_run_nodes)
 
695
            return run_nodes
 
696
        else:
 
697
            return []
 
698
 
 
699
    def validate_rmq_cluster_running_nodes(self, sentry_units):
 
700
        """Check that all rmq unit hostnames are represented in the
 
701
        cluster_status output of all units.
 
702
 
 
703
        :param host_names: dict of juju unit names to host names
 
704
        :param units: list of sentry unit pointers (all rmq units)
 
705
        :returns: None if successful, otherwise return error message
 
706
        """
 
707
        host_names = self.get_unit_hostnames(sentry_units)
 
708
        errors = []
 
709
 
 
710
        # Query every unit for cluster_status running nodes
 
711
        for query_unit in sentry_units:
 
712
            query_unit_name = query_unit.info['unit_name']
 
713
            running_nodes = self.get_rmq_cluster_running_nodes(query_unit)
 
714
 
 
715
            # Confirm that every unit is represented in the queried unit's
 
716
            # cluster_status running nodes output.
 
717
            for validate_unit in sentry_units:
 
718
                val_host_name = host_names[validate_unit.info['unit_name']]
 
719
                val_node_name = 'rabbit@{}'.format(val_host_name)
 
720
 
 
721
                if val_node_name not in running_nodes:
 
722
                    errors.append('Cluster member check failed on {}: {} not '
 
723
                                  'in {}\n'.format(query_unit_name,
 
724
                                                   val_node_name,
 
725
                                                   running_nodes))
 
726
        if errors:
 
727
            return ''.join(errors)
 
728
 
 
729
    def rmq_ssl_is_enabled_on_unit(self, sentry_unit, port=None):
 
730
        """Check a single juju rmq unit for ssl and port in the config file."""
 
731
        host = sentry_unit.info['public-address']
 
732
        unit_name = sentry_unit.info['unit_name']
 
733
 
 
734
        conf_file = '/etc/rabbitmq/rabbitmq.config'
 
735
        conf_contents = str(self.file_contents_safe(sentry_unit,
 
736
                                                    conf_file, max_wait=16))
 
737
        # Checks
 
738
        conf_ssl = 'ssl' in conf_contents
 
739
        conf_port = str(port) in conf_contents
 
740
 
 
741
        # Port explicitly checked in config
 
742
        if port and conf_port and conf_ssl:
 
743
            self.log.debug('SSL is enabled  @{}:{} '
 
744
                           '({})'.format(host, port, unit_name))
 
745
            return True
 
746
        elif port and not conf_port and conf_ssl:
 
747
            self.log.debug('SSL is enabled @{} but not on port {} '
 
748
                           '({})'.format(host, port, unit_name))
 
749
            return False
 
750
        # Port not checked (useful when checking that ssl is disabled)
 
751
        elif not port and conf_ssl:
 
752
            self.log.debug('SSL is enabled  @{}:{} '
 
753
                           '({})'.format(host, port, unit_name))
 
754
            return True
 
755
        elif not conf_ssl:
 
756
            self.log.debug('SSL not enabled @{}:{} '
 
757
                           '({})'.format(host, port, unit_name))
 
758
            return False
 
759
        else:
 
760
            msg = ('Unknown condition when checking SSL status @{}:{} '
 
761
                   '({})'.format(host, port, unit_name))
 
762
            amulet.raise_status(amulet.FAIL, msg)
 
763
 
 
764
    def validate_rmq_ssl_enabled_units(self, sentry_units, port=None):
 
765
        """Check that ssl is enabled on rmq juju sentry units.
 
766
 
 
767
        :param sentry_units: list of all rmq sentry units
 
768
        :param port: optional ssl port override to validate
 
769
        :returns: None if successful, otherwise return error message
 
770
        """
 
771
        for sentry_unit in sentry_units:
 
772
            if not self.rmq_ssl_is_enabled_on_unit(sentry_unit, port=port):
 
773
                return ('Unexpected condition:  ssl is disabled on unit '
 
774
                        '({})'.format(sentry_unit.info['unit_name']))
 
775
        return None
 
776
 
 
777
    def validate_rmq_ssl_disabled_units(self, sentry_units):
 
778
        """Check that ssl is enabled on listed rmq juju sentry units.
 
779
 
 
780
        :param sentry_units: list of all rmq sentry units
 
781
        :returns: True if successful.  Raise on error.
 
782
        """
 
783
        for sentry_unit in sentry_units:
 
784
            if self.rmq_ssl_is_enabled_on_unit(sentry_unit):
 
785
                return ('Unexpected condition:  ssl is enabled on unit '
 
786
                        '({})'.format(sentry_unit.info['unit_name']))
 
787
        return None
 
788
 
 
789
    def configure_rmq_ssl_on(self, sentry_units, deployment,
 
790
                             port=None, max_wait=60):
 
791
        """Turn ssl charm config option on, with optional non-default
 
792
        ssl port specification.  Confirm that it is enabled on every
 
793
        unit.
 
794
 
 
795
        :param sentry_units: list of sentry units
 
796
        :param deployment: amulet deployment object pointer
 
797
        :param port: amqp port, use defaults if None
 
798
        :param max_wait: maximum time to wait in seconds to confirm
 
799
        :returns: None if successful.  Raise on error.
 
800
        """
 
801
        self.log.debug('Setting ssl charm config option:  on')
 
802
 
 
803
        # Enable RMQ SSL
 
804
        config = {'ssl': 'on'}
 
805
        if port:
 
806
            config['ssl_port'] = port
 
807
 
 
808
        deployment.configure('rabbitmq-server', config)
 
809
 
 
810
        # Confirm
 
811
        tries = 0
 
812
        ret = self.validate_rmq_ssl_enabled_units(sentry_units, port=port)
 
813
        while ret and tries < (max_wait / 4):
 
814
            time.sleep(4)
 
815
            self.log.debug('Attempt {}: {}'.format(tries, ret))
 
816
            ret = self.validate_rmq_ssl_enabled_units(sentry_units, port=port)
 
817
            tries += 1
 
818
 
 
819
        if ret:
 
820
            amulet.raise_status(amulet.FAIL, ret)
 
821
 
 
822
    def configure_rmq_ssl_off(self, sentry_units, deployment, max_wait=60):
 
823
        """Turn ssl charm config option off, confirm that it is disabled
 
824
        on every unit.
 
825
 
 
826
        :param sentry_units: list of sentry units
 
827
        :param deployment: amulet deployment object pointer
 
828
        :param max_wait: maximum time to wait in seconds to confirm
 
829
        :returns: None if successful.  Raise on error.
 
830
        """
 
831
        self.log.debug('Setting ssl charm config option:  off')
 
832
 
 
833
        # Disable RMQ SSL
 
834
        config = {'ssl': 'off'}
 
835
        deployment.configure('rabbitmq-server', config)
 
836
 
 
837
        # Confirm
 
838
        tries = 0
 
839
        ret = self.validate_rmq_ssl_disabled_units(sentry_units)
 
840
        while ret and tries < (max_wait / 4):
 
841
            time.sleep(4)
 
842
            self.log.debug('Attempt {}: {}'.format(tries, ret))
 
843
            ret = self.validate_rmq_ssl_disabled_units(sentry_units)
 
844
            tries += 1
 
845
 
 
846
        if ret:
 
847
            amulet.raise_status(amulet.FAIL, ret)
 
848
 
 
849
    def connect_amqp_by_unit(self, sentry_unit, ssl=False,
 
850
                             port=None, fatal=True,
 
851
                             username="testuser1", password="changeme"):
 
852
        """Establish and return a pika amqp connection to the rabbitmq service
 
853
        running on a rmq juju unit.
 
854
 
 
855
        :param sentry_unit: sentry unit pointer
 
856
        :param ssl: boolean, default to False
 
857
        :param port: amqp port, use defaults if None
 
858
        :param fatal: boolean, default to True (raises on connect error)
 
859
        :param username: amqp user name, default to testuser1
 
860
        :param password: amqp user password
 
861
        :returns: pika amqp connection pointer or None if failed and non-fatal
 
862
        """
 
863
        host = sentry_unit.info['public-address']
 
864
        unit_name = sentry_unit.info['unit_name']
 
865
 
 
866
        # Default port logic if port is not specified
 
867
        if ssl and not port:
 
868
            port = 5671
 
869
        elif not ssl and not port:
 
870
            port = 5672
 
871
 
 
872
        self.log.debug('Connecting to amqp on {}:{} ({}) as '
 
873
                       '{}...'.format(host, port, unit_name, username))
 
874
 
 
875
        try:
 
876
            credentials = pika.PlainCredentials(username, password)
 
877
            parameters = pika.ConnectionParameters(host=host, port=port,
 
878
                                                   credentials=credentials,
 
879
                                                   ssl=ssl,
 
880
                                                   connection_attempts=3,
 
881
                                                   retry_delay=5,
 
882
                                                   socket_timeout=1)
 
883
            connection = pika.BlockingConnection(parameters)
 
884
            assert connection.server_properties['product'] == 'RabbitMQ'
 
885
            self.log.debug('Connect OK')
 
886
            return connection
 
887
        except Exception as e:
 
888
            msg = ('amqp connection failed to {}:{} as '
 
889
                   '{} ({})'.format(host, port, username, str(e)))
 
890
            if fatal:
 
891
                amulet.raise_status(amulet.FAIL, msg)
 
892
            else:
 
893
                self.log.warn(msg)
 
894
                return None
 
895
 
 
896
    def publish_amqp_message_by_unit(self, sentry_unit, message,
 
897
                                     queue="test", ssl=False,
 
898
                                     username="testuser1",
 
899
                                     password="changeme",
 
900
                                     port=None):
 
901
        """Publish an amqp message to a rmq juju unit.
 
902
 
 
903
        :param sentry_unit: sentry unit pointer
 
904
        :param message: amqp message string
 
905
        :param queue: message queue, default to test
 
906
        :param username: amqp user name, default to testuser1
 
907
        :param password: amqp user password
 
908
        :param ssl: boolean, default to False
 
909
        :param port: amqp port, use defaults if None
 
910
        :returns: None.  Raises exception if publish failed.
 
911
        """
 
912
        self.log.debug('Publishing message to {} queue:\n{}'.format(queue,
 
913
                                                                    message))
 
914
        connection = self.connect_amqp_by_unit(sentry_unit, ssl=ssl,
 
915
                                               port=port,
 
916
                                               username=username,
 
917
                                               password=password)
 
918
 
 
919
        # NOTE(beisner): extra debug here re: pika hang potential:
 
920
        #   https://github.com/pika/pika/issues/297
 
921
        #   https://groups.google.com/forum/#!topic/rabbitmq-users/Ja0iyfF0Szw
 
922
        self.log.debug('Defining channel...')
 
923
        channel = connection.channel()
 
924
        self.log.debug('Declaring queue...')
 
925
        channel.queue_declare(queue=queue, auto_delete=False, durable=True)
 
926
        self.log.debug('Publishing message...')
 
927
        channel.basic_publish(exchange='', routing_key=queue, body=message)
 
928
        self.log.debug('Closing channel...')
 
929
        channel.close()
 
930
        self.log.debug('Closing connection...')
 
931
        connection.close()
 
932
 
 
933
    def get_amqp_message_by_unit(self, sentry_unit, queue="test",
 
934
                                 username="testuser1",
 
935
                                 password="changeme",
 
936
                                 ssl=False, port=None):
 
937
        """Get an amqp message from a rmq juju unit.
 
938
 
 
939
        :param sentry_unit: sentry unit pointer
 
940
        :param queue: message queue, default to test
 
941
        :param username: amqp user name, default to testuser1
 
942
        :param password: amqp user password
 
943
        :param ssl: boolean, default to False
 
944
        :param port: amqp port, use defaults if None
 
945
        :returns: amqp message body as string.  Raise if get fails.
 
946
        """
 
947
        connection = self.connect_amqp_by_unit(sentry_unit, ssl=ssl,
 
948
                                               port=port,
 
949
                                               username=username,
 
950
                                               password=password)
 
951
        channel = connection.channel()
 
952
        method_frame, _, body = channel.basic_get(queue)
 
953
 
 
954
        if method_frame:
 
955
            self.log.debug('Retreived message from {} queue:\n{}'.format(queue,
 
956
                                                                         body))
 
957
            channel.basic_ack(method_frame.delivery_tag)
 
958
            channel.close()
 
959
            connection.close()
 
960
            return body
 
961
        else:
 
962
            msg = 'No message retrieved.'
 
963
            amulet.raise_status(amulet.FAIL, msg)