~junaidali/charms/trusty/plumgrid-gateway/pg-restart

« back to all changes in this revision

Viewing changes to hooks/charmhelpers/contrib/openstack/amulet/utils.py

  • Committer: bbaqar at plumgrid
  • Date: 2016-04-25 09:21:09 UTC
  • mfrom: (26.1.2 plumgrid-gateway)
  • Revision ID: bbaqar@plumgrid.com-20160425092109-kweey25bx97pmj80
Merge: Liberty/Mitaka support

Show diffs side-by-side

added added

removed removed

Lines of Context:
18
18
import json
19
19
import logging
20
20
import os
 
21
import re
21
22
import six
22
23
import time
23
24
import urllib
26
27
import glanceclient.v1.client as glance_client
27
28
import heatclient.v1.client as heat_client
28
29
import keystoneclient.v2_0 as keystone_client
29
 
import novaclient.v1_1.client as nova_client
 
30
from keystoneclient.auth.identity import v3 as keystone_id_v3
 
31
from keystoneclient import session as keystone_session
 
32
from keystoneclient.v3 import client as keystone_client_v3
 
33
 
 
34
import novaclient.client as nova_client
 
35
import pika
30
36
import swiftclient
31
37
 
32
38
from charmhelpers.contrib.amulet.utils import (
36
42
DEBUG = logging.DEBUG
37
43
ERROR = logging.ERROR
38
44
 
 
45
NOVA_CLIENT_VERSION = "2"
 
46
 
39
47
 
40
48
class OpenStackAmuletUtils(AmuletUtils):
41
49
    """OpenStack amulet utilities.
137
145
                return "role {} does not exist".format(e['name'])
138
146
        return ret
139
147
 
140
 
    def validate_user_data(self, expected, actual):
 
148
    def validate_user_data(self, expected, actual, api_version=None):
141
149
        """Validate user data.
142
150
 
143
151
           Validate a list of actual user data vs a list of expected user
148
156
        for e in expected:
149
157
            found = False
150
158
            for act in actual:
151
 
                a = {'enabled': act.enabled, 'name': act.name,
152
 
                     'email': act.email, 'tenantId': act.tenantId,
153
 
                     'id': act.id}
154
 
                if e['name'] == a['name']:
 
159
                if e['name'] == act.name:
 
160
                    a = {'enabled': act.enabled, 'name': act.name,
 
161
                         'email': act.email, 'id': act.id}
 
162
                    if api_version == 3:
 
163
                        a['default_project_id'] = getattr(act,
 
164
                                                          'default_project_id',
 
165
                                                          'none')
 
166
                    else:
 
167
                        a['tenantId'] = act.tenantId
155
168
                    found = True
156
169
                    ret = self._validate_dict_data(e, a)
157
170
                    if ret:
186
199
        return cinder_client.Client(username, password, tenant, ept)
187
200
 
188
201
    def authenticate_keystone_admin(self, keystone_sentry, user, password,
189
 
                                    tenant):
 
202
                                    tenant=None, api_version=None,
 
203
                                    keystone_ip=None):
190
204
        """Authenticates admin user with the keystone admin endpoint."""
191
205
        self.log.debug('Authenticating keystone admin...')
192
206
        unit = keystone_sentry
193
 
        service_ip = unit.relation('shared-db',
194
 
                                   'mysql:shared-db')['private-address']
195
 
        ep = "http://{}:35357/v2.0".format(service_ip.strip().decode('utf-8'))
196
 
        return keystone_client.Client(username=user, password=password,
197
 
                                      tenant_name=tenant, auth_url=ep)
 
207
        if not keystone_ip:
 
208
            keystone_ip = unit.relation('shared-db',
 
209
                                        'mysql:shared-db')['private-address']
 
210
        base_ep = "http://{}:35357".format(keystone_ip.strip().decode('utf-8'))
 
211
        if not api_version or api_version == 2:
 
212
            ep = base_ep + "/v2.0"
 
213
            return keystone_client.Client(username=user, password=password,
 
214
                                          tenant_name=tenant, auth_url=ep)
 
215
        else:
 
216
            ep = base_ep + "/v3"
 
217
            auth = keystone_id_v3.Password(
 
218
                user_domain_name='admin_domain',
 
219
                username=user,
 
220
                password=password,
 
221
                domain_name='admin_domain',
 
222
                auth_url=ep,
 
223
            )
 
224
            sess = keystone_session.Session(auth=auth)
 
225
            return keystone_client_v3.Client(session=sess)
198
226
 
199
227
    def authenticate_keystone_user(self, keystone, user, password, tenant):
200
228
        """Authenticates a regular user with the keystone public endpoint."""
223
251
        self.log.debug('Authenticating nova user ({})...'.format(user))
224
252
        ep = keystone.service_catalog.url_for(service_type='identity',
225
253
                                              endpoint_type='publicURL')
226
 
        return nova_client.Client(username=user, api_key=password,
 
254
        return nova_client.Client(NOVA_CLIENT_VERSION,
 
255
                                  username=user, api_key=password,
227
256
                                  project_id=tenant, auth_url=ep)
228
257
 
229
258
    def authenticate_swift_user(self, keystone, user, password, tenant):
602
631
            self.log.debug('Ceph {} samples (OK): '
603
632
                           '{}'.format(sample_type, samples))
604
633
            return None
 
634
 
 
635
    # rabbitmq/amqp specific helpers:
 
636
 
 
637
    def rmq_wait_for_cluster(self, deployment, init_sleep=15, timeout=1200):
 
638
        """Wait for rmq units extended status to show cluster readiness,
 
639
        after an optional initial sleep period.  Initial sleep is likely
 
640
        necessary to be effective following a config change, as status
 
641
        message may not instantly update to non-ready."""
 
642
 
 
643
        if init_sleep:
 
644
            time.sleep(init_sleep)
 
645
 
 
646
        message = re.compile('^Unit is ready and clustered$')
 
647
        deployment._auto_wait_for_status(message=message,
 
648
                                         timeout=timeout,
 
649
                                         include_only=['rabbitmq-server'])
 
650
 
 
651
    def add_rmq_test_user(self, sentry_units,
 
652
                          username="testuser1", password="changeme"):
 
653
        """Add a test user via the first rmq juju unit, check connection as
 
654
        the new user against all sentry units.
 
655
 
 
656
        :param sentry_units: list of sentry unit pointers
 
657
        :param username: amqp user name, default to testuser1
 
658
        :param password: amqp user password
 
659
        :returns: None if successful.  Raise on error.
 
660
        """
 
661
        self.log.debug('Adding rmq user ({})...'.format(username))
 
662
 
 
663
        # Check that user does not already exist
 
664
        cmd_user_list = 'rabbitmqctl list_users'
 
665
        output, _ = self.run_cmd_unit(sentry_units[0], cmd_user_list)
 
666
        if username in output:
 
667
            self.log.warning('User ({}) already exists, returning '
 
668
                             'gracefully.'.format(username))
 
669
            return
 
670
 
 
671
        perms = '".*" ".*" ".*"'
 
672
        cmds = ['rabbitmqctl add_user {} {}'.format(username, password),
 
673
                'rabbitmqctl set_permissions {} {}'.format(username, perms)]
 
674
 
 
675
        # Add user via first unit
 
676
        for cmd in cmds:
 
677
            output, _ = self.run_cmd_unit(sentry_units[0], cmd)
 
678
 
 
679
        # Check connection against the other sentry_units
 
680
        self.log.debug('Checking user connect against units...')
 
681
        for sentry_unit in sentry_units:
 
682
            connection = self.connect_amqp_by_unit(sentry_unit, ssl=False,
 
683
                                                   username=username,
 
684
                                                   password=password)
 
685
            connection.close()
 
686
 
 
687
    def delete_rmq_test_user(self, sentry_units, username="testuser1"):
 
688
        """Delete a rabbitmq user via the first rmq juju unit.
 
689
 
 
690
        :param sentry_units: list of sentry unit pointers
 
691
        :param username: amqp user name, default to testuser1
 
692
        :param password: amqp user password
 
693
        :returns: None if successful or no such user.
 
694
        """
 
695
        self.log.debug('Deleting rmq user ({})...'.format(username))
 
696
 
 
697
        # Check that the user exists
 
698
        cmd_user_list = 'rabbitmqctl list_users'
 
699
        output, _ = self.run_cmd_unit(sentry_units[0], cmd_user_list)
 
700
 
 
701
        if username not in output:
 
702
            self.log.warning('User ({}) does not exist, returning '
 
703
                             'gracefully.'.format(username))
 
704
            return
 
705
 
 
706
        # Delete the user
 
707
        cmd_user_del = 'rabbitmqctl delete_user {}'.format(username)
 
708
        output, _ = self.run_cmd_unit(sentry_units[0], cmd_user_del)
 
709
 
 
710
    def get_rmq_cluster_status(self, sentry_unit):
 
711
        """Execute rabbitmq cluster status command on a unit and return
 
712
        the full output.
 
713
 
 
714
        :param unit: sentry unit
 
715
        :returns: String containing console output of cluster status command
 
716
        """
 
717
        cmd = 'rabbitmqctl cluster_status'
 
718
        output, _ = self.run_cmd_unit(sentry_unit, cmd)
 
719
        self.log.debug('{} cluster_status:\n{}'.format(
 
720
            sentry_unit.info['unit_name'], output))
 
721
        return str(output)
 
722
 
 
723
    def get_rmq_cluster_running_nodes(self, sentry_unit):
 
724
        """Parse rabbitmqctl cluster_status output string, return list of
 
725
        running rabbitmq cluster nodes.
 
726
 
 
727
        :param unit: sentry unit
 
728
        :returns: List containing node names of running nodes
 
729
        """
 
730
        # NOTE(beisner): rabbitmqctl cluster_status output is not
 
731
        # json-parsable, do string chop foo, then json.loads that.
 
732
        str_stat = self.get_rmq_cluster_status(sentry_unit)
 
733
        if 'running_nodes' in str_stat:
 
734
            pos_start = str_stat.find("{running_nodes,") + 15
 
735
            pos_end = str_stat.find("]},", pos_start) + 1
 
736
            str_run_nodes = str_stat[pos_start:pos_end].replace("'", '"')
 
737
            run_nodes = json.loads(str_run_nodes)
 
738
            return run_nodes
 
739
        else:
 
740
            return []
 
741
 
 
742
    def validate_rmq_cluster_running_nodes(self, sentry_units):
 
743
        """Check that all rmq unit hostnames are represented in the
 
744
        cluster_status output of all units.
 
745
 
 
746
        :param host_names: dict of juju unit names to host names
 
747
        :param units: list of sentry unit pointers (all rmq units)
 
748
        :returns: None if successful, otherwise return error message
 
749
        """
 
750
        host_names = self.get_unit_hostnames(sentry_units)
 
751
        errors = []
 
752
 
 
753
        # Query every unit for cluster_status running nodes
 
754
        for query_unit in sentry_units:
 
755
            query_unit_name = query_unit.info['unit_name']
 
756
            running_nodes = self.get_rmq_cluster_running_nodes(query_unit)
 
757
 
 
758
            # Confirm that every unit is represented in the queried unit's
 
759
            # cluster_status running nodes output.
 
760
            for validate_unit in sentry_units:
 
761
                val_host_name = host_names[validate_unit.info['unit_name']]
 
762
                val_node_name = 'rabbit@{}'.format(val_host_name)
 
763
 
 
764
                if val_node_name not in running_nodes:
 
765
                    errors.append('Cluster member check failed on {}: {} not '
 
766
                                  'in {}\n'.format(query_unit_name,
 
767
                                                   val_node_name,
 
768
                                                   running_nodes))
 
769
        if errors:
 
770
            return ''.join(errors)
 
771
 
 
772
    def rmq_ssl_is_enabled_on_unit(self, sentry_unit, port=None):
 
773
        """Check a single juju rmq unit for ssl and port in the config file."""
 
774
        host = sentry_unit.info['public-address']
 
775
        unit_name = sentry_unit.info['unit_name']
 
776
 
 
777
        conf_file = '/etc/rabbitmq/rabbitmq.config'
 
778
        conf_contents = str(self.file_contents_safe(sentry_unit,
 
779
                                                    conf_file, max_wait=16))
 
780
        # Checks
 
781
        conf_ssl = 'ssl' in conf_contents
 
782
        conf_port = str(port) in conf_contents
 
783
 
 
784
        # Port explicitly checked in config
 
785
        if port and conf_port and conf_ssl:
 
786
            self.log.debug('SSL is enabled  @{}:{} '
 
787
                           '({})'.format(host, port, unit_name))
 
788
            return True
 
789
        elif port and not conf_port and conf_ssl:
 
790
            self.log.debug('SSL is enabled @{} but not on port {} '
 
791
                           '({})'.format(host, port, unit_name))
 
792
            return False
 
793
        # Port not checked (useful when checking that ssl is disabled)
 
794
        elif not port and conf_ssl:
 
795
            self.log.debug('SSL is enabled  @{}:{} '
 
796
                           '({})'.format(host, port, unit_name))
 
797
            return True
 
798
        elif not conf_ssl:
 
799
            self.log.debug('SSL not enabled @{}:{} '
 
800
                           '({})'.format(host, port, unit_name))
 
801
            return False
 
802
        else:
 
803
            msg = ('Unknown condition when checking SSL status @{}:{} '
 
804
                   '({})'.format(host, port, unit_name))
 
805
            amulet.raise_status(amulet.FAIL, msg)
 
806
 
 
807
    def validate_rmq_ssl_enabled_units(self, sentry_units, port=None):
 
808
        """Check that ssl is enabled on rmq juju sentry units.
 
809
 
 
810
        :param sentry_units: list of all rmq sentry units
 
811
        :param port: optional ssl port override to validate
 
812
        :returns: None if successful, otherwise return error message
 
813
        """
 
814
        for sentry_unit in sentry_units:
 
815
            if not self.rmq_ssl_is_enabled_on_unit(sentry_unit, port=port):
 
816
                return ('Unexpected condition:  ssl is disabled on unit '
 
817
                        '({})'.format(sentry_unit.info['unit_name']))
 
818
        return None
 
819
 
 
820
    def validate_rmq_ssl_disabled_units(self, sentry_units):
 
821
        """Check that ssl is enabled on listed rmq juju sentry units.
 
822
 
 
823
        :param sentry_units: list of all rmq sentry units
 
824
        :returns: True if successful.  Raise on error.
 
825
        """
 
826
        for sentry_unit in sentry_units:
 
827
            if self.rmq_ssl_is_enabled_on_unit(sentry_unit):
 
828
                return ('Unexpected condition:  ssl is enabled on unit '
 
829
                        '({})'.format(sentry_unit.info['unit_name']))
 
830
        return None
 
831
 
 
832
    def configure_rmq_ssl_on(self, sentry_units, deployment,
 
833
                             port=None, max_wait=60):
 
834
        """Turn ssl charm config option on, with optional non-default
 
835
        ssl port specification.  Confirm that it is enabled on every
 
836
        unit.
 
837
 
 
838
        :param sentry_units: list of sentry units
 
839
        :param deployment: amulet deployment object pointer
 
840
        :param port: amqp port, use defaults if None
 
841
        :param max_wait: maximum time to wait in seconds to confirm
 
842
        :returns: None if successful.  Raise on error.
 
843
        """
 
844
        self.log.debug('Setting ssl charm config option:  on')
 
845
 
 
846
        # Enable RMQ SSL
 
847
        config = {'ssl': 'on'}
 
848
        if port:
 
849
            config['ssl_port'] = port
 
850
 
 
851
        deployment.d.configure('rabbitmq-server', config)
 
852
 
 
853
        # Wait for unit status
 
854
        self.rmq_wait_for_cluster(deployment)
 
855
 
 
856
        # Confirm
 
857
        tries = 0
 
858
        ret = self.validate_rmq_ssl_enabled_units(sentry_units, port=port)
 
859
        while ret and tries < (max_wait / 4):
 
860
            time.sleep(4)
 
861
            self.log.debug('Attempt {}: {}'.format(tries, ret))
 
862
            ret = self.validate_rmq_ssl_enabled_units(sentry_units, port=port)
 
863
            tries += 1
 
864
 
 
865
        if ret:
 
866
            amulet.raise_status(amulet.FAIL, ret)
 
867
 
 
868
    def configure_rmq_ssl_off(self, sentry_units, deployment, max_wait=60):
 
869
        """Turn ssl charm config option off, confirm that it is disabled
 
870
        on every unit.
 
871
 
 
872
        :param sentry_units: list of sentry units
 
873
        :param deployment: amulet deployment object pointer
 
874
        :param max_wait: maximum time to wait in seconds to confirm
 
875
        :returns: None if successful.  Raise on error.
 
876
        """
 
877
        self.log.debug('Setting ssl charm config option:  off')
 
878
 
 
879
        # Disable RMQ SSL
 
880
        config = {'ssl': 'off'}
 
881
        deployment.d.configure('rabbitmq-server', config)
 
882
 
 
883
        # Wait for unit status
 
884
        self.rmq_wait_for_cluster(deployment)
 
885
 
 
886
        # Confirm
 
887
        tries = 0
 
888
        ret = self.validate_rmq_ssl_disabled_units(sentry_units)
 
889
        while ret and tries < (max_wait / 4):
 
890
            time.sleep(4)
 
891
            self.log.debug('Attempt {}: {}'.format(tries, ret))
 
892
            ret = self.validate_rmq_ssl_disabled_units(sentry_units)
 
893
            tries += 1
 
894
 
 
895
        if ret:
 
896
            amulet.raise_status(amulet.FAIL, ret)
 
897
 
 
898
    def connect_amqp_by_unit(self, sentry_unit, ssl=False,
 
899
                             port=None, fatal=True,
 
900
                             username="testuser1", password="changeme"):
 
901
        """Establish and return a pika amqp connection to the rabbitmq service
 
902
        running on a rmq juju unit.
 
903
 
 
904
        :param sentry_unit: sentry unit pointer
 
905
        :param ssl: boolean, default to False
 
906
        :param port: amqp port, use defaults if None
 
907
        :param fatal: boolean, default to True (raises on connect error)
 
908
        :param username: amqp user name, default to testuser1
 
909
        :param password: amqp user password
 
910
        :returns: pika amqp connection pointer or None if failed and non-fatal
 
911
        """
 
912
        host = sentry_unit.info['public-address']
 
913
        unit_name = sentry_unit.info['unit_name']
 
914
 
 
915
        # Default port logic if port is not specified
 
916
        if ssl and not port:
 
917
            port = 5671
 
918
        elif not ssl and not port:
 
919
            port = 5672
 
920
 
 
921
        self.log.debug('Connecting to amqp on {}:{} ({}) as '
 
922
                       '{}...'.format(host, port, unit_name, username))
 
923
 
 
924
        try:
 
925
            credentials = pika.PlainCredentials(username, password)
 
926
            parameters = pika.ConnectionParameters(host=host, port=port,
 
927
                                                   credentials=credentials,
 
928
                                                   ssl=ssl,
 
929
                                                   connection_attempts=3,
 
930
                                                   retry_delay=5,
 
931
                                                   socket_timeout=1)
 
932
            connection = pika.BlockingConnection(parameters)
 
933
            assert connection.server_properties['product'] == 'RabbitMQ'
 
934
            self.log.debug('Connect OK')
 
935
            return connection
 
936
        except Exception as e:
 
937
            msg = ('amqp connection failed to {}:{} as '
 
938
                   '{} ({})'.format(host, port, username, str(e)))
 
939
            if fatal:
 
940
                amulet.raise_status(amulet.FAIL, msg)
 
941
            else:
 
942
                self.log.warn(msg)
 
943
                return None
 
944
 
 
945
    def publish_amqp_message_by_unit(self, sentry_unit, message,
 
946
                                     queue="test", ssl=False,
 
947
                                     username="testuser1",
 
948
                                     password="changeme",
 
949
                                     port=None):
 
950
        """Publish an amqp message to a rmq juju unit.
 
951
 
 
952
        :param sentry_unit: sentry unit pointer
 
953
        :param message: amqp message string
 
954
        :param queue: message queue, default to test
 
955
        :param username: amqp user name, default to testuser1
 
956
        :param password: amqp user password
 
957
        :param ssl: boolean, default to False
 
958
        :param port: amqp port, use defaults if None
 
959
        :returns: None.  Raises exception if publish failed.
 
960
        """
 
961
        self.log.debug('Publishing message to {} queue:\n{}'.format(queue,
 
962
                                                                    message))
 
963
        connection = self.connect_amqp_by_unit(sentry_unit, ssl=ssl,
 
964
                                               port=port,
 
965
                                               username=username,
 
966
                                               password=password)
 
967
 
 
968
        # NOTE(beisner): extra debug here re: pika hang potential:
 
969
        #   https://github.com/pika/pika/issues/297
 
970
        #   https://groups.google.com/forum/#!topic/rabbitmq-users/Ja0iyfF0Szw
 
971
        self.log.debug('Defining channel...')
 
972
        channel = connection.channel()
 
973
        self.log.debug('Declaring queue...')
 
974
        channel.queue_declare(queue=queue, auto_delete=False, durable=True)
 
975
        self.log.debug('Publishing message...')
 
976
        channel.basic_publish(exchange='', routing_key=queue, body=message)
 
977
        self.log.debug('Closing channel...')
 
978
        channel.close()
 
979
        self.log.debug('Closing connection...')
 
980
        connection.close()
 
981
 
 
982
    def get_amqp_message_by_unit(self, sentry_unit, queue="test",
 
983
                                 username="testuser1",
 
984
                                 password="changeme",
 
985
                                 ssl=False, port=None):
 
986
        """Get an amqp message from a rmq juju unit.
 
987
 
 
988
        :param sentry_unit: sentry unit pointer
 
989
        :param queue: message queue, default to test
 
990
        :param username: amqp user name, default to testuser1
 
991
        :param password: amqp user password
 
992
        :param ssl: boolean, default to False
 
993
        :param port: amqp port, use defaults if None
 
994
        :returns: amqp message body as string.  Raise if get fails.
 
995
        """
 
996
        connection = self.connect_amqp_by_unit(sentry_unit, ssl=ssl,
 
997
                                               port=port,
 
998
                                               username=username,
 
999
                                               password=password)
 
1000
        channel = connection.channel()
 
1001
        method_frame, _, body = channel.basic_get(queue)
 
1002
 
 
1003
        if method_frame:
 
1004
            self.log.debug('Retreived message from {} queue:\n{}'.format(queue,
 
1005
                                                                         body))
 
1006
            channel.basic_ack(method_frame.delivery_tag)
 
1007
            channel.close()
 
1008
            connection.close()
 
1009
            return body
 
1010
        else:
 
1011
            msg = 'No message retrieved.'
 
1012
            amulet.raise_status(amulet.FAIL, msg)