~james-page/charms/trusty/swift-proxy/trunk

« back to all changes in this revision

Viewing changes to charmhelpers/contrib/openstack/amulet/utils.py

  • Committer: Ryan Beisner
  • Date: 2016-01-19 12:47:49 UTC
  • mto: This revision was merged to the branch mainline in revision 135.
  • Revision ID: ryan.beisner@canonical.com-20160119124749-5uj102427wonbfmx
Fix typo in mitaka amulet test definition

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright 2014-2015 Canonical Limited.
 
2
#
 
3
# This file is part of charm-helpers.
 
4
#
 
5
# charm-helpers is free software: you can redistribute it and/or modify
 
6
# it under the terms of the GNU Lesser General Public License version 3 as
 
7
# published by the Free Software Foundation.
 
8
#
 
9
# charm-helpers is distributed in the hope that it will be useful,
 
10
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
11
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
12
# GNU Lesser General Public License for more details.
 
13
#
 
14
# You should have received a copy of the GNU Lesser General Public License
 
15
# along with charm-helpers.  If not, see <http://www.gnu.org/licenses/>.
 
16
 
 
17
import amulet
 
18
import json
 
19
import logging
 
20
import os
 
21
import re
 
22
import six
 
23
import time
 
24
import urllib
 
25
 
 
26
import cinderclient.v1.client as cinder_client
 
27
import glanceclient.v1.client as glance_client
 
28
import heatclient.v1.client as heat_client
 
29
import keystoneclient.v2_0 as keystone_client
 
30
import novaclient.v1_1.client as nova_client
 
31
import pika
 
32
import swiftclient
 
33
 
 
34
from charmhelpers.contrib.amulet.utils import (
 
35
    AmuletUtils
 
36
)
 
37
 
 
38
DEBUG = logging.DEBUG
 
39
ERROR = logging.ERROR
 
40
 
 
41
 
 
42
class OpenStackAmuletUtils(AmuletUtils):
 
43
    """OpenStack amulet utilities.
 
44
 
 
45
       This class inherits from AmuletUtils and has additional support
 
46
       that is specifically for use by OpenStack charm tests.
 
47
       """
 
48
 
 
49
    def __init__(self, log_level=ERROR):
 
50
        """Initialize the deployment environment."""
 
51
        super(OpenStackAmuletUtils, self).__init__(log_level)
 
52
 
 
53
    def validate_endpoint_data(self, endpoints, admin_port, internal_port,
 
54
                               public_port, expected):
 
55
        """Validate endpoint data.
 
56
 
 
57
           Validate actual endpoint data vs expected endpoint data. The ports
 
58
           are used to find the matching endpoint.
 
59
           """
 
60
        self.log.debug('Validating endpoint data...')
 
61
        self.log.debug('actual: {}'.format(repr(endpoints)))
 
62
        found = False
 
63
        for ep in endpoints:
 
64
            self.log.debug('endpoint: {}'.format(repr(ep)))
 
65
            if (admin_port in ep.adminurl and
 
66
                    internal_port in ep.internalurl and
 
67
                    public_port in ep.publicurl):
 
68
                found = True
 
69
                actual = {'id': ep.id,
 
70
                          'region': ep.region,
 
71
                          'adminurl': ep.adminurl,
 
72
                          'internalurl': ep.internalurl,
 
73
                          'publicurl': ep.publicurl,
 
74
                          'service_id': ep.service_id}
 
75
                ret = self._validate_dict_data(expected, actual)
 
76
                if ret:
 
77
                    return 'unexpected endpoint data - {}'.format(ret)
 
78
 
 
79
        if not found:
 
80
            return 'endpoint not found'
 
81
 
 
82
    def validate_svc_catalog_endpoint_data(self, expected, actual):
 
83
        """Validate service catalog endpoint data.
 
84
 
 
85
           Validate a list of actual service catalog endpoints vs a list of
 
86
           expected service catalog endpoints.
 
87
           """
 
88
        self.log.debug('Validating service catalog endpoint data...')
 
89
        self.log.debug('actual: {}'.format(repr(actual)))
 
90
        for k, v in six.iteritems(expected):
 
91
            if k in actual:
 
92
                ret = self._validate_dict_data(expected[k][0], actual[k][0])
 
93
                if ret:
 
94
                    return self.endpoint_error(k, ret)
 
95
            else:
 
96
                return "endpoint {} does not exist".format(k)
 
97
        return ret
 
98
 
 
99
    def validate_tenant_data(self, expected, actual):
 
100
        """Validate tenant data.
 
101
 
 
102
           Validate a list of actual tenant data vs list of expected tenant
 
103
           data.
 
104
           """
 
105
        self.log.debug('Validating tenant data...')
 
106
        self.log.debug('actual: {}'.format(repr(actual)))
 
107
        for e in expected:
 
108
            found = False
 
109
            for act in actual:
 
110
                a = {'enabled': act.enabled, 'description': act.description,
 
111
                     'name': act.name, 'id': act.id}
 
112
                if e['name'] == a['name']:
 
113
                    found = True
 
114
                    ret = self._validate_dict_data(e, a)
 
115
                    if ret:
 
116
                        return "unexpected tenant data - {}".format(ret)
 
117
            if not found:
 
118
                return "tenant {} does not exist".format(e['name'])
 
119
        return ret
 
120
 
 
121
    def validate_role_data(self, expected, actual):
 
122
        """Validate role data.
 
123
 
 
124
           Validate a list of actual role data vs a list of expected role
 
125
           data.
 
126
           """
 
127
        self.log.debug('Validating role data...')
 
128
        self.log.debug('actual: {}'.format(repr(actual)))
 
129
        for e in expected:
 
130
            found = False
 
131
            for act in actual:
 
132
                a = {'name': act.name, 'id': act.id}
 
133
                if e['name'] == a['name']:
 
134
                    found = True
 
135
                    ret = self._validate_dict_data(e, a)
 
136
                    if ret:
 
137
                        return "unexpected role data - {}".format(ret)
 
138
            if not found:
 
139
                return "role {} does not exist".format(e['name'])
 
140
        return ret
 
141
 
 
142
    def validate_user_data(self, expected, actual):
 
143
        """Validate user data.
 
144
 
 
145
           Validate a list of actual user data vs a list of expected user
 
146
           data.
 
147
           """
 
148
        self.log.debug('Validating user data...')
 
149
        self.log.debug('actual: {}'.format(repr(actual)))
 
150
        for e in expected:
 
151
            found = False
 
152
            for act in actual:
 
153
                a = {'enabled': act.enabled, 'name': act.name,
 
154
                     'email': act.email, 'tenantId': act.tenantId,
 
155
                     'id': act.id}
 
156
                if e['name'] == a['name']:
 
157
                    found = True
 
158
                    ret = self._validate_dict_data(e, a)
 
159
                    if ret:
 
160
                        return "unexpected user data - {}".format(ret)
 
161
            if not found:
 
162
                return "user {} does not exist".format(e['name'])
 
163
        return ret
 
164
 
 
165
    def validate_flavor_data(self, expected, actual):
 
166
        """Validate flavor data.
 
167
 
 
168
           Validate a list of actual flavors vs a list of expected flavors.
 
169
           """
 
170
        self.log.debug('Validating flavor data...')
 
171
        self.log.debug('actual: {}'.format(repr(actual)))
 
172
        act = [a.name for a in actual]
 
173
        return self._validate_list_data(expected, act)
 
174
 
 
175
    def tenant_exists(self, keystone, tenant):
 
176
        """Return True if tenant exists."""
 
177
        self.log.debug('Checking if tenant exists ({})...'.format(tenant))
 
178
        return tenant in [t.name for t in keystone.tenants.list()]
 
179
 
 
180
    def authenticate_cinder_admin(self, keystone_sentry, username,
 
181
                                  password, tenant):
 
182
        """Authenticates admin user with cinder."""
 
183
        # NOTE(beisner): cinder python client doesn't accept tokens.
 
184
        service_ip = \
 
185
            keystone_sentry.relation('shared-db',
 
186
                                     'mysql:shared-db')['private-address']
 
187
        ept = "http://{}:5000/v2.0".format(service_ip.strip().decode('utf-8'))
 
188
        return cinder_client.Client(username, password, tenant, ept)
 
189
 
 
190
    def authenticate_keystone_admin(self, keystone_sentry, user, password,
 
191
                                    tenant):
 
192
        """Authenticates admin user with the keystone admin endpoint."""
 
193
        self.log.debug('Authenticating keystone admin...')
 
194
        unit = keystone_sentry
 
195
        service_ip = unit.relation('shared-db',
 
196
                                   'mysql:shared-db')['private-address']
 
197
        ep = "http://{}:35357/v2.0".format(service_ip.strip().decode('utf-8'))
 
198
        return keystone_client.Client(username=user, password=password,
 
199
                                      tenant_name=tenant, auth_url=ep)
 
200
 
 
201
    def authenticate_keystone_user(self, keystone, user, password, tenant):
 
202
        """Authenticates a regular user with the keystone public endpoint."""
 
203
        self.log.debug('Authenticating keystone user ({})...'.format(user))
 
204
        ep = keystone.service_catalog.url_for(service_type='identity',
 
205
                                              endpoint_type='publicURL')
 
206
        return keystone_client.Client(username=user, password=password,
 
207
                                      tenant_name=tenant, auth_url=ep)
 
208
 
 
209
    def authenticate_glance_admin(self, keystone):
 
210
        """Authenticates admin user with glance."""
 
211
        self.log.debug('Authenticating glance admin...')
 
212
        ep = keystone.service_catalog.url_for(service_type='image',
 
213
                                              endpoint_type='adminURL')
 
214
        return glance_client.Client(ep, token=keystone.auth_token)
 
215
 
 
216
    def authenticate_heat_admin(self, keystone):
 
217
        """Authenticates the admin user with heat."""
 
218
        self.log.debug('Authenticating heat admin...')
 
219
        ep = keystone.service_catalog.url_for(service_type='orchestration',
 
220
                                              endpoint_type='publicURL')
 
221
        return heat_client.Client(endpoint=ep, token=keystone.auth_token)
 
222
 
 
223
    def authenticate_nova_user(self, keystone, user, password, tenant):
 
224
        """Authenticates a regular user with nova-api."""
 
225
        self.log.debug('Authenticating nova user ({})...'.format(user))
 
226
        ep = keystone.service_catalog.url_for(service_type='identity',
 
227
                                              endpoint_type='publicURL')
 
228
        return nova_client.Client(username=user, api_key=password,
 
229
                                  project_id=tenant, auth_url=ep)
 
230
 
 
231
    def authenticate_swift_user(self, keystone, user, password, tenant):
 
232
        """Authenticates a regular user with swift api."""
 
233
        self.log.debug('Authenticating swift user ({})...'.format(user))
 
234
        ep = keystone.service_catalog.url_for(service_type='identity',
 
235
                                              endpoint_type='publicURL')
 
236
        return swiftclient.Connection(authurl=ep,
 
237
                                      user=user,
 
238
                                      key=password,
 
239
                                      tenant_name=tenant,
 
240
                                      auth_version='2.0')
 
241
 
 
242
    def create_cirros_image(self, glance, image_name):
 
243
        """Download the latest cirros image and upload it to glance,
 
244
        validate and return a resource pointer.
 
245
 
 
246
        :param glance: pointer to authenticated glance connection
 
247
        :param image_name: display name for new image
 
248
        :returns: glance image pointer
 
249
        """
 
250
        self.log.debug('Creating glance cirros image '
 
251
                       '({})...'.format(image_name))
 
252
 
 
253
        # Download cirros image
 
254
        http_proxy = os.getenv('AMULET_HTTP_PROXY')
 
255
        self.log.debug('AMULET_HTTP_PROXY: {}'.format(http_proxy))
 
256
        if http_proxy:
 
257
            proxies = {'http': http_proxy}
 
258
            opener = urllib.FancyURLopener(proxies)
 
259
        else:
 
260
            opener = urllib.FancyURLopener()
 
261
 
 
262
        f = opener.open('http://download.cirros-cloud.net/version/released')
 
263
        version = f.read().strip()
 
264
        cirros_img = 'cirros-{}-x86_64-disk.img'.format(version)
 
265
        local_path = os.path.join('tests', cirros_img)
 
266
 
 
267
        if not os.path.exists(local_path):
 
268
            cirros_url = 'http://{}/{}/{}'.format('download.cirros-cloud.net',
 
269
                                                  version, cirros_img)
 
270
            opener.retrieve(cirros_url, local_path)
 
271
        f.close()
 
272
 
 
273
        # Create glance image
 
274
        with open(local_path) as f:
 
275
            image = glance.images.create(name=image_name, is_public=True,
 
276
                                         disk_format='qcow2',
 
277
                                         container_format='bare', data=f)
 
278
 
 
279
        # Wait for image to reach active status
 
280
        img_id = image.id
 
281
        ret = self.resource_reaches_status(glance.images, img_id,
 
282
                                           expected_stat='active',
 
283
                                           msg='Image status wait')
 
284
        if not ret:
 
285
            msg = 'Glance image failed to reach expected state.'
 
286
            amulet.raise_status(amulet.FAIL, msg=msg)
 
287
 
 
288
        # Re-validate new image
 
289
        self.log.debug('Validating image attributes...')
 
290
        val_img_name = glance.images.get(img_id).name
 
291
        val_img_stat = glance.images.get(img_id).status
 
292
        val_img_pub = glance.images.get(img_id).is_public
 
293
        val_img_cfmt = glance.images.get(img_id).container_format
 
294
        val_img_dfmt = glance.images.get(img_id).disk_format
 
295
        msg_attr = ('Image attributes - name:{} public:{} id:{} stat:{} '
 
296
                    'container fmt:{} disk fmt:{}'.format(
 
297
                        val_img_name, val_img_pub, img_id,
 
298
                        val_img_stat, val_img_cfmt, val_img_dfmt))
 
299
 
 
300
        if val_img_name == image_name and val_img_stat == 'active' \
 
301
                and val_img_pub is True and val_img_cfmt == 'bare' \
 
302
                and val_img_dfmt == 'qcow2':
 
303
            self.log.debug(msg_attr)
 
304
        else:
 
305
            msg = ('Volume validation failed, {}'.format(msg_attr))
 
306
            amulet.raise_status(amulet.FAIL, msg=msg)
 
307
 
 
308
        return image
 
309
 
 
310
    def delete_image(self, glance, image):
 
311
        """Delete the specified image."""
 
312
 
 
313
        # /!\ DEPRECATION WARNING
 
314
        self.log.warn('/!\\ DEPRECATION WARNING:  use '
 
315
                      'delete_resource instead of delete_image.')
 
316
        self.log.debug('Deleting glance image ({})...'.format(image))
 
317
        return self.delete_resource(glance.images, image, msg='glance image')
 
318
 
 
319
    def create_instance(self, nova, image_name, instance_name, flavor):
 
320
        """Create the specified instance."""
 
321
        self.log.debug('Creating instance '
 
322
                       '({}|{}|{})'.format(instance_name, image_name, flavor))
 
323
        image = nova.images.find(name=image_name)
 
324
        flavor = nova.flavors.find(name=flavor)
 
325
        instance = nova.servers.create(name=instance_name, image=image,
 
326
                                       flavor=flavor)
 
327
 
 
328
        count = 1
 
329
        status = instance.status
 
330
        while status != 'ACTIVE' and count < 60:
 
331
            time.sleep(3)
 
332
            instance = nova.servers.get(instance.id)
 
333
            status = instance.status
 
334
            self.log.debug('instance status: {}'.format(status))
 
335
            count += 1
 
336
 
 
337
        if status != 'ACTIVE':
 
338
            self.log.error('instance creation timed out')
 
339
            return None
 
340
 
 
341
        return instance
 
342
 
 
343
    def delete_instance(self, nova, instance):
 
344
        """Delete the specified instance."""
 
345
 
 
346
        # /!\ DEPRECATION WARNING
 
347
        self.log.warn('/!\\ DEPRECATION WARNING:  use '
 
348
                      'delete_resource instead of delete_instance.')
 
349
        self.log.debug('Deleting instance ({})...'.format(instance))
 
350
        return self.delete_resource(nova.servers, instance,
 
351
                                    msg='nova instance')
 
352
 
 
353
    def create_or_get_keypair(self, nova, keypair_name="testkey"):
 
354
        """Create a new keypair, or return pointer if it already exists."""
 
355
        try:
 
356
            _keypair = nova.keypairs.get(keypair_name)
 
357
            self.log.debug('Keypair ({}) already exists, '
 
358
                           'using it.'.format(keypair_name))
 
359
            return _keypair
 
360
        except:
 
361
            self.log.debug('Keypair ({}) does not exist, '
 
362
                           'creating it.'.format(keypair_name))
 
363
 
 
364
        _keypair = nova.keypairs.create(name=keypair_name)
 
365
        return _keypair
 
366
 
 
367
    def create_cinder_volume(self, cinder, vol_name="demo-vol", vol_size=1,
 
368
                             img_id=None, src_vol_id=None, snap_id=None):
 
369
        """Create cinder volume, optionally from a glance image, OR
 
370
        optionally as a clone of an existing volume, OR optionally
 
371
        from a snapshot.  Wait for the new volume status to reach
 
372
        the expected status, validate and return a resource pointer.
 
373
 
 
374
        :param vol_name: cinder volume display name
 
375
        :param vol_size: size in gigabytes
 
376
        :param img_id: optional glance image id
 
377
        :param src_vol_id: optional source volume id to clone
 
378
        :param snap_id: optional snapshot id to use
 
379
        :returns: cinder volume pointer
 
380
        """
 
381
        # Handle parameter input and avoid impossible combinations
 
382
        if img_id and not src_vol_id and not snap_id:
 
383
            # Create volume from image
 
384
            self.log.debug('Creating cinder volume from glance image...')
 
385
            bootable = 'true'
 
386
        elif src_vol_id and not img_id and not snap_id:
 
387
            # Clone an existing volume
 
388
            self.log.debug('Cloning cinder volume...')
 
389
            bootable = cinder.volumes.get(src_vol_id).bootable
 
390
        elif snap_id and not src_vol_id and not img_id:
 
391
            # Create volume from snapshot
 
392
            self.log.debug('Creating cinder volume from snapshot...')
 
393
            snap = cinder.volume_snapshots.find(id=snap_id)
 
394
            vol_size = snap.size
 
395
            snap_vol_id = cinder.volume_snapshots.get(snap_id).volume_id
 
396
            bootable = cinder.volumes.get(snap_vol_id).bootable
 
397
        elif not img_id and not src_vol_id and not snap_id:
 
398
            # Create volume
 
399
            self.log.debug('Creating cinder volume...')
 
400
            bootable = 'false'
 
401
        else:
 
402
            # Impossible combination of parameters
 
403
            msg = ('Invalid method use - name:{} size:{} img_id:{} '
 
404
                   'src_vol_id:{} snap_id:{}'.format(vol_name, vol_size,
 
405
                                                     img_id, src_vol_id,
 
406
                                                     snap_id))
 
407
            amulet.raise_status(amulet.FAIL, msg=msg)
 
408
 
 
409
        # Create new volume
 
410
        try:
 
411
            vol_new = cinder.volumes.create(display_name=vol_name,
 
412
                                            imageRef=img_id,
 
413
                                            size=vol_size,
 
414
                                            source_volid=src_vol_id,
 
415
                                            snapshot_id=snap_id)
 
416
            vol_id = vol_new.id
 
417
        except Exception as e:
 
418
            msg = 'Failed to create volume: {}'.format(e)
 
419
            amulet.raise_status(amulet.FAIL, msg=msg)
 
420
 
 
421
        # Wait for volume to reach available status
 
422
        ret = self.resource_reaches_status(cinder.volumes, vol_id,
 
423
                                           expected_stat="available",
 
424
                                           msg="Volume status wait")
 
425
        if not ret:
 
426
            msg = 'Cinder volume failed to reach expected state.'
 
427
            amulet.raise_status(amulet.FAIL, msg=msg)
 
428
 
 
429
        # Re-validate new volume
 
430
        self.log.debug('Validating volume attributes...')
 
431
        val_vol_name = cinder.volumes.get(vol_id).display_name
 
432
        val_vol_boot = cinder.volumes.get(vol_id).bootable
 
433
        val_vol_stat = cinder.volumes.get(vol_id).status
 
434
        val_vol_size = cinder.volumes.get(vol_id).size
 
435
        msg_attr = ('Volume attributes - name:{} id:{} stat:{} boot:'
 
436
                    '{} size:{}'.format(val_vol_name, vol_id,
 
437
                                        val_vol_stat, val_vol_boot,
 
438
                                        val_vol_size))
 
439
 
 
440
        if val_vol_boot == bootable and val_vol_stat == 'available' \
 
441
                and val_vol_name == vol_name and val_vol_size == vol_size:
 
442
            self.log.debug(msg_attr)
 
443
        else:
 
444
            msg = ('Volume validation failed, {}'.format(msg_attr))
 
445
            amulet.raise_status(amulet.FAIL, msg=msg)
 
446
 
 
447
        return vol_new
 
448
 
 
449
    def delete_resource(self, resource, resource_id,
 
450
                        msg="resource", max_wait=120):
 
451
        """Delete one openstack resource, such as one instance, keypair,
 
452
        image, volume, stack, etc., and confirm deletion within max wait time.
 
453
 
 
454
        :param resource: pointer to os resource type, ex:glance_client.images
 
455
        :param resource_id: unique name or id for the openstack resource
 
456
        :param msg: text to identify purpose in logging
 
457
        :param max_wait: maximum wait time in seconds
 
458
        :returns: True if successful, otherwise False
 
459
        """
 
460
        self.log.debug('Deleting OpenStack resource '
 
461
                       '{} ({})'.format(resource_id, msg))
 
462
        num_before = len(list(resource.list()))
 
463
        resource.delete(resource_id)
 
464
 
 
465
        tries = 0
 
466
        num_after = len(list(resource.list()))
 
467
        while num_after != (num_before - 1) and tries < (max_wait / 4):
 
468
            self.log.debug('{} delete check: '
 
469
                           '{} [{}:{}] {}'.format(msg, tries,
 
470
                                                  num_before,
 
471
                                                  num_after,
 
472
                                                  resource_id))
 
473
            time.sleep(4)
 
474
            num_after = len(list(resource.list()))
 
475
            tries += 1
 
476
 
 
477
        self.log.debug('{}:  expected, actual count = {}, '
 
478
                       '{}'.format(msg, num_before - 1, num_after))
 
479
 
 
480
        if num_after == (num_before - 1):
 
481
            return True
 
482
        else:
 
483
            self.log.error('{} delete timed out'.format(msg))
 
484
            return False
 
485
 
 
486
    def resource_reaches_status(self, resource, resource_id,
 
487
                                expected_stat='available',
 
488
                                msg='resource', max_wait=120):
 
489
        """Wait for an openstack resources status to reach an
 
490
           expected status within a specified time.  Useful to confirm that
 
491
           nova instances, cinder vols, snapshots, glance images, heat stacks
 
492
           and other resources eventually reach the expected status.
 
493
 
 
494
        :param resource: pointer to os resource type, ex: heat_client.stacks
 
495
        :param resource_id: unique id for the openstack resource
 
496
        :param expected_stat: status to expect resource to reach
 
497
        :param msg: text to identify purpose in logging
 
498
        :param max_wait: maximum wait time in seconds
 
499
        :returns: True if successful, False if status is not reached
 
500
        """
 
501
 
 
502
        tries = 0
 
503
        resource_stat = resource.get(resource_id).status
 
504
        while resource_stat != expected_stat and tries < (max_wait / 4):
 
505
            self.log.debug('{} status check: '
 
506
                           '{} [{}:{}] {}'.format(msg, tries,
 
507
                                                  resource_stat,
 
508
                                                  expected_stat,
 
509
                                                  resource_id))
 
510
            time.sleep(4)
 
511
            resource_stat = resource.get(resource_id).status
 
512
            tries += 1
 
513
 
 
514
        self.log.debug('{}:  expected, actual status = {}, '
 
515
                       '{}'.format(msg, resource_stat, expected_stat))
 
516
 
 
517
        if resource_stat == expected_stat:
 
518
            return True
 
519
        else:
 
520
            self.log.debug('{} never reached expected status: '
 
521
                           '{}'.format(resource_id, expected_stat))
 
522
            return False
 
523
 
 
524
    def get_ceph_osd_id_cmd(self, index):
 
525
        """Produce a shell command that will return a ceph-osd id."""
 
526
        return ("`initctl list | grep 'ceph-osd ' | "
 
527
                "awk 'NR=={} {{ print $2 }}' | "
 
528
                "grep -o '[0-9]*'`".format(index + 1))
 
529
 
 
530
    def get_ceph_pools(self, sentry_unit):
 
531
        """Return a dict of ceph pools from a single ceph unit, with
 
532
        pool name as keys, pool id as vals."""
 
533
        pools = {}
 
534
        cmd = 'sudo ceph osd lspools'
 
535
        output, code = sentry_unit.run(cmd)
 
536
        if code != 0:
 
537
            msg = ('{} `{}` returned {} '
 
538
                   '{}'.format(sentry_unit.info['unit_name'],
 
539
                               cmd, code, output))
 
540
            amulet.raise_status(amulet.FAIL, msg=msg)
 
541
 
 
542
        # Example output: 0 data,1 metadata,2 rbd,3 cinder,4 glance,
 
543
        for pool in str(output).split(','):
 
544
            pool_id_name = pool.split(' ')
 
545
            if len(pool_id_name) == 2:
 
546
                pool_id = pool_id_name[0]
 
547
                pool_name = pool_id_name[1]
 
548
                pools[pool_name] = int(pool_id)
 
549
 
 
550
        self.log.debug('Pools on {}: {}'.format(sentry_unit.info['unit_name'],
 
551
                                                pools))
 
552
        return pools
 
553
 
 
554
    def get_ceph_df(self, sentry_unit):
 
555
        """Return dict of ceph df json output, including ceph pool state.
 
556
 
 
557
        :param sentry_unit: Pointer to amulet sentry instance (juju unit)
 
558
        :returns: Dict of ceph df output
 
559
        """
 
560
        cmd = 'sudo ceph df --format=json'
 
561
        output, code = sentry_unit.run(cmd)
 
562
        if code != 0:
 
563
            msg = ('{} `{}` returned {} '
 
564
                   '{}'.format(sentry_unit.info['unit_name'],
 
565
                               cmd, code, output))
 
566
            amulet.raise_status(amulet.FAIL, msg=msg)
 
567
        return json.loads(output)
 
568
 
 
569
    def get_ceph_pool_sample(self, sentry_unit, pool_id=0):
 
570
        """Take a sample of attributes of a ceph pool, returning ceph
 
571
        pool name, object count and disk space used for the specified
 
572
        pool ID number.
 
573
 
 
574
        :param sentry_unit: Pointer to amulet sentry instance (juju unit)
 
575
        :param pool_id: Ceph pool ID
 
576
        :returns: List of pool name, object count, kb disk space used
 
577
        """
 
578
        df = self.get_ceph_df(sentry_unit)
 
579
        pool_name = df['pools'][pool_id]['name']
 
580
        obj_count = df['pools'][pool_id]['stats']['objects']
 
581
        kb_used = df['pools'][pool_id]['stats']['kb_used']
 
582
        self.log.debug('Ceph {} pool (ID {}): {} objects, '
 
583
                       '{} kb used'.format(pool_name, pool_id,
 
584
                                           obj_count, kb_used))
 
585
        return pool_name, obj_count, kb_used
 
586
 
 
587
    def validate_ceph_pool_samples(self, samples, sample_type="resource pool"):
 
588
        """Validate ceph pool samples taken over time, such as pool
 
589
        object counts or pool kb used, before adding, after adding, and
 
590
        after deleting items which affect those pool attributes.  The
 
591
        2nd element is expected to be greater than the 1st; 3rd is expected
 
592
        to be less than the 2nd.
 
593
 
 
594
        :param samples: List containing 3 data samples
 
595
        :param sample_type: String for logging and usage context
 
596
        :returns: None if successful, Failure message otherwise
 
597
        """
 
598
        original, created, deleted = range(3)
 
599
        if samples[created] <= samples[original] or \
 
600
                samples[deleted] >= samples[created]:
 
601
            return ('Ceph {} samples ({}) '
 
602
                    'unexpected.'.format(sample_type, samples))
 
603
        else:
 
604
            self.log.debug('Ceph {} samples (OK): '
 
605
                           '{}'.format(sample_type, samples))
 
606
            return None
 
607
 
 
608
    # rabbitmq/amqp specific helpers:
 
609
 
 
610
    def rmq_wait_for_cluster(self, deployment, init_sleep=15, timeout=1200):
 
611
        """Wait for rmq units extended status to show cluster readiness,
 
612
        after an optional initial sleep period.  Initial sleep is likely
 
613
        necessary to be effective following a config change, as status
 
614
        message may not instantly update to non-ready."""
 
615
 
 
616
        if init_sleep:
 
617
            time.sleep(init_sleep)
 
618
 
 
619
        message = re.compile('^Unit is ready and clustered$')
 
620
        deployment._auto_wait_for_status(message=message,
 
621
                                         timeout=timeout,
 
622
                                         include_only=['rabbitmq-server'])
 
623
 
 
624
    def add_rmq_test_user(self, sentry_units,
 
625
                          username="testuser1", password="changeme"):
 
626
        """Add a test user via the first rmq juju unit, check connection as
 
627
        the new user against all sentry units.
 
628
 
 
629
        :param sentry_units: list of sentry unit pointers
 
630
        :param username: amqp user name, default to testuser1
 
631
        :param password: amqp user password
 
632
        :returns: None if successful.  Raise on error.
 
633
        """
 
634
        self.log.debug('Adding rmq user ({})...'.format(username))
 
635
 
 
636
        # Check that user does not already exist
 
637
        cmd_user_list = 'rabbitmqctl list_users'
 
638
        output, _ = self.run_cmd_unit(sentry_units[0], cmd_user_list)
 
639
        if username in output:
 
640
            self.log.warning('User ({}) already exists, returning '
 
641
                             'gracefully.'.format(username))
 
642
            return
 
643
 
 
644
        perms = '".*" ".*" ".*"'
 
645
        cmds = ['rabbitmqctl add_user {} {}'.format(username, password),
 
646
                'rabbitmqctl set_permissions {} {}'.format(username, perms)]
 
647
 
 
648
        # Add user via first unit
 
649
        for cmd in cmds:
 
650
            output, _ = self.run_cmd_unit(sentry_units[0], cmd)
 
651
 
 
652
        # Check connection against the other sentry_units
 
653
        self.log.debug('Checking user connect against units...')
 
654
        for sentry_unit in sentry_units:
 
655
            connection = self.connect_amqp_by_unit(sentry_unit, ssl=False,
 
656
                                                   username=username,
 
657
                                                   password=password)
 
658
            connection.close()
 
659
 
 
660
    def delete_rmq_test_user(self, sentry_units, username="testuser1"):
 
661
        """Delete a rabbitmq user via the first rmq juju unit.
 
662
 
 
663
        :param sentry_units: list of sentry unit pointers
 
664
        :param username: amqp user name, default to testuser1
 
665
        :param password: amqp user password
 
666
        :returns: None if successful or no such user.
 
667
        """
 
668
        self.log.debug('Deleting rmq user ({})...'.format(username))
 
669
 
 
670
        # Check that the user exists
 
671
        cmd_user_list = 'rabbitmqctl list_users'
 
672
        output, _ = self.run_cmd_unit(sentry_units[0], cmd_user_list)
 
673
 
 
674
        if username not in output:
 
675
            self.log.warning('User ({}) does not exist, returning '
 
676
                             'gracefully.'.format(username))
 
677
            return
 
678
 
 
679
        # Delete the user
 
680
        cmd_user_del = 'rabbitmqctl delete_user {}'.format(username)
 
681
        output, _ = self.run_cmd_unit(sentry_units[0], cmd_user_del)
 
682
 
 
683
    def get_rmq_cluster_status(self, sentry_unit):
 
684
        """Execute rabbitmq cluster status command on a unit and return
 
685
        the full output.
 
686
 
 
687
        :param unit: sentry unit
 
688
        :returns: String containing console output of cluster status command
 
689
        """
 
690
        cmd = 'rabbitmqctl cluster_status'
 
691
        output, _ = self.run_cmd_unit(sentry_unit, cmd)
 
692
        self.log.debug('{} cluster_status:\n{}'.format(
 
693
            sentry_unit.info['unit_name'], output))
 
694
        return str(output)
 
695
 
 
696
    def get_rmq_cluster_running_nodes(self, sentry_unit):
 
697
        """Parse rabbitmqctl cluster_status output string, return list of
 
698
        running rabbitmq cluster nodes.
 
699
 
 
700
        :param unit: sentry unit
 
701
        :returns: List containing node names of running nodes
 
702
        """
 
703
        # NOTE(beisner): rabbitmqctl cluster_status output is not
 
704
        # json-parsable, do string chop foo, then json.loads that.
 
705
        str_stat = self.get_rmq_cluster_status(sentry_unit)
 
706
        if 'running_nodes' in str_stat:
 
707
            pos_start = str_stat.find("{running_nodes,") + 15
 
708
            pos_end = str_stat.find("]},", pos_start) + 1
 
709
            str_run_nodes = str_stat[pos_start:pos_end].replace("'", '"')
 
710
            run_nodes = json.loads(str_run_nodes)
 
711
            return run_nodes
 
712
        else:
 
713
            return []
 
714
 
 
715
    def validate_rmq_cluster_running_nodes(self, sentry_units):
 
716
        """Check that all rmq unit hostnames are represented in the
 
717
        cluster_status output of all units.
 
718
 
 
719
        :param host_names: dict of juju unit names to host names
 
720
        :param units: list of sentry unit pointers (all rmq units)
 
721
        :returns: None if successful, otherwise return error message
 
722
        """
 
723
        host_names = self.get_unit_hostnames(sentry_units)
 
724
        errors = []
 
725
 
 
726
        # Query every unit for cluster_status running nodes
 
727
        for query_unit in sentry_units:
 
728
            query_unit_name = query_unit.info['unit_name']
 
729
            running_nodes = self.get_rmq_cluster_running_nodes(query_unit)
 
730
 
 
731
            # Confirm that every unit is represented in the queried unit's
 
732
            # cluster_status running nodes output.
 
733
            for validate_unit in sentry_units:
 
734
                val_host_name = host_names[validate_unit.info['unit_name']]
 
735
                val_node_name = 'rabbit@{}'.format(val_host_name)
 
736
 
 
737
                if val_node_name not in running_nodes:
 
738
                    errors.append('Cluster member check failed on {}: {} not '
 
739
                                  'in {}\n'.format(query_unit_name,
 
740
                                                   val_node_name,
 
741
                                                   running_nodes))
 
742
        if errors:
 
743
            return ''.join(errors)
 
744
 
 
745
    def rmq_ssl_is_enabled_on_unit(self, sentry_unit, port=None):
 
746
        """Check a single juju rmq unit for ssl and port in the config file."""
 
747
        host = sentry_unit.info['public-address']
 
748
        unit_name = sentry_unit.info['unit_name']
 
749
 
 
750
        conf_file = '/etc/rabbitmq/rabbitmq.config'
 
751
        conf_contents = str(self.file_contents_safe(sentry_unit,
 
752
                                                    conf_file, max_wait=16))
 
753
        # Checks
 
754
        conf_ssl = 'ssl' in conf_contents
 
755
        conf_port = str(port) in conf_contents
 
756
 
 
757
        # Port explicitly checked in config
 
758
        if port and conf_port and conf_ssl:
 
759
            self.log.debug('SSL is enabled  @{}:{} '
 
760
                           '({})'.format(host, port, unit_name))
 
761
            return True
 
762
        elif port and not conf_port and conf_ssl:
 
763
            self.log.debug('SSL is enabled @{} but not on port {} '
 
764
                           '({})'.format(host, port, unit_name))
 
765
            return False
 
766
        # Port not checked (useful when checking that ssl is disabled)
 
767
        elif not port and conf_ssl:
 
768
            self.log.debug('SSL is enabled  @{}:{} '
 
769
                           '({})'.format(host, port, unit_name))
 
770
            return True
 
771
        elif not conf_ssl:
 
772
            self.log.debug('SSL not enabled @{}:{} '
 
773
                           '({})'.format(host, port, unit_name))
 
774
            return False
 
775
        else:
 
776
            msg = ('Unknown condition when checking SSL status @{}:{} '
 
777
                   '({})'.format(host, port, unit_name))
 
778
            amulet.raise_status(amulet.FAIL, msg)
 
779
 
 
780
    def validate_rmq_ssl_enabled_units(self, sentry_units, port=None):
 
781
        """Check that ssl is enabled on rmq juju sentry units.
 
782
 
 
783
        :param sentry_units: list of all rmq sentry units
 
784
        :param port: optional ssl port override to validate
 
785
        :returns: None if successful, otherwise return error message
 
786
        """
 
787
        for sentry_unit in sentry_units:
 
788
            if not self.rmq_ssl_is_enabled_on_unit(sentry_unit, port=port):
 
789
                return ('Unexpected condition:  ssl is disabled on unit '
 
790
                        '({})'.format(sentry_unit.info['unit_name']))
 
791
        return None
 
792
 
 
793
    def validate_rmq_ssl_disabled_units(self, sentry_units):
 
794
        """Check that ssl is enabled on listed rmq juju sentry units.
 
795
 
 
796
        :param sentry_units: list of all rmq sentry units
 
797
        :returns: True if successful.  Raise on error.
 
798
        """
 
799
        for sentry_unit in sentry_units:
 
800
            if self.rmq_ssl_is_enabled_on_unit(sentry_unit):
 
801
                return ('Unexpected condition:  ssl is enabled on unit '
 
802
                        '({})'.format(sentry_unit.info['unit_name']))
 
803
        return None
 
804
 
 
805
    def configure_rmq_ssl_on(self, sentry_units, deployment,
 
806
                             port=None, max_wait=60):
 
807
        """Turn ssl charm config option on, with optional non-default
 
808
        ssl port specification.  Confirm that it is enabled on every
 
809
        unit.
 
810
 
 
811
        :param sentry_units: list of sentry units
 
812
        :param deployment: amulet deployment object pointer
 
813
        :param port: amqp port, use defaults if None
 
814
        :param max_wait: maximum time to wait in seconds to confirm
 
815
        :returns: None if successful.  Raise on error.
 
816
        """
 
817
        self.log.debug('Setting ssl charm config option:  on')
 
818
 
 
819
        # Enable RMQ SSL
 
820
        config = {'ssl': 'on'}
 
821
        if port:
 
822
            config['ssl_port'] = port
 
823
 
 
824
        deployment.d.configure('rabbitmq-server', config)
 
825
 
 
826
        # Wait for unit status
 
827
        self.rmq_wait_for_cluster(deployment)
 
828
 
 
829
        # Confirm
 
830
        tries = 0
 
831
        ret = self.validate_rmq_ssl_enabled_units(sentry_units, port=port)
 
832
        while ret and tries < (max_wait / 4):
 
833
            time.sleep(4)
 
834
            self.log.debug('Attempt {}: {}'.format(tries, ret))
 
835
            ret = self.validate_rmq_ssl_enabled_units(sentry_units, port=port)
 
836
            tries += 1
 
837
 
 
838
        if ret:
 
839
            amulet.raise_status(amulet.FAIL, ret)
 
840
 
 
841
    def configure_rmq_ssl_off(self, sentry_units, deployment, max_wait=60):
 
842
        """Turn ssl charm config option off, confirm that it is disabled
 
843
        on every unit.
 
844
 
 
845
        :param sentry_units: list of sentry units
 
846
        :param deployment: amulet deployment object pointer
 
847
        :param max_wait: maximum time to wait in seconds to confirm
 
848
        :returns: None if successful.  Raise on error.
 
849
        """
 
850
        self.log.debug('Setting ssl charm config option:  off')
 
851
 
 
852
        # Disable RMQ SSL
 
853
        config = {'ssl': 'off'}
 
854
        deployment.d.configure('rabbitmq-server', config)
 
855
 
 
856
        # Wait for unit status
 
857
        self.rmq_wait_for_cluster(deployment)
 
858
 
 
859
        # Confirm
 
860
        tries = 0
 
861
        ret = self.validate_rmq_ssl_disabled_units(sentry_units)
 
862
        while ret and tries < (max_wait / 4):
 
863
            time.sleep(4)
 
864
            self.log.debug('Attempt {}: {}'.format(tries, ret))
 
865
            ret = self.validate_rmq_ssl_disabled_units(sentry_units)
 
866
            tries += 1
 
867
 
 
868
        if ret:
 
869
            amulet.raise_status(amulet.FAIL, ret)
 
870
 
 
871
    def connect_amqp_by_unit(self, sentry_unit, ssl=False,
 
872
                             port=None, fatal=True,
 
873
                             username="testuser1", password="changeme"):
 
874
        """Establish and return a pika amqp connection to the rabbitmq service
 
875
        running on a rmq juju unit.
 
876
 
 
877
        :param sentry_unit: sentry unit pointer
 
878
        :param ssl: boolean, default to False
 
879
        :param port: amqp port, use defaults if None
 
880
        :param fatal: boolean, default to True (raises on connect error)
 
881
        :param username: amqp user name, default to testuser1
 
882
        :param password: amqp user password
 
883
        :returns: pika amqp connection pointer or None if failed and non-fatal
 
884
        """
 
885
        host = sentry_unit.info['public-address']
 
886
        unit_name = sentry_unit.info['unit_name']
 
887
 
 
888
        # Default port logic if port is not specified
 
889
        if ssl and not port:
 
890
            port = 5671
 
891
        elif not ssl and not port:
 
892
            port = 5672
 
893
 
 
894
        self.log.debug('Connecting to amqp on {}:{} ({}) as '
 
895
                       '{}...'.format(host, port, unit_name, username))
 
896
 
 
897
        try:
 
898
            credentials = pika.PlainCredentials(username, password)
 
899
            parameters = pika.ConnectionParameters(host=host, port=port,
 
900
                                                   credentials=credentials,
 
901
                                                   ssl=ssl,
 
902
                                                   connection_attempts=3,
 
903
                                                   retry_delay=5,
 
904
                                                   socket_timeout=1)
 
905
            connection = pika.BlockingConnection(parameters)
 
906
            assert connection.server_properties['product'] == 'RabbitMQ'
 
907
            self.log.debug('Connect OK')
 
908
            return connection
 
909
        except Exception as e:
 
910
            msg = ('amqp connection failed to {}:{} as '
 
911
                   '{} ({})'.format(host, port, username, str(e)))
 
912
            if fatal:
 
913
                amulet.raise_status(amulet.FAIL, msg)
 
914
            else:
 
915
                self.log.warn(msg)
 
916
                return None
 
917
 
 
918
    def publish_amqp_message_by_unit(self, sentry_unit, message,
 
919
                                     queue="test", ssl=False,
 
920
                                     username="testuser1",
 
921
                                     password="changeme",
 
922
                                     port=None):
 
923
        """Publish an amqp message to a rmq juju unit.
 
924
 
 
925
        :param sentry_unit: sentry unit pointer
 
926
        :param message: amqp message string
 
927
        :param queue: message queue, default to test
 
928
        :param username: amqp user name, default to testuser1
 
929
        :param password: amqp user password
 
930
        :param ssl: boolean, default to False
 
931
        :param port: amqp port, use defaults if None
 
932
        :returns: None.  Raises exception if publish failed.
 
933
        """
 
934
        self.log.debug('Publishing message to {} queue:\n{}'.format(queue,
 
935
                                                                    message))
 
936
        connection = self.connect_amqp_by_unit(sentry_unit, ssl=ssl,
 
937
                                               port=port,
 
938
                                               username=username,
 
939
                                               password=password)
 
940
 
 
941
        # NOTE(beisner): extra debug here re: pika hang potential:
 
942
        #   https://github.com/pika/pika/issues/297
 
943
        #   https://groups.google.com/forum/#!topic/rabbitmq-users/Ja0iyfF0Szw
 
944
        self.log.debug('Defining channel...')
 
945
        channel = connection.channel()
 
946
        self.log.debug('Declaring queue...')
 
947
        channel.queue_declare(queue=queue, auto_delete=False, durable=True)
 
948
        self.log.debug('Publishing message...')
 
949
        channel.basic_publish(exchange='', routing_key=queue, body=message)
 
950
        self.log.debug('Closing channel...')
 
951
        channel.close()
 
952
        self.log.debug('Closing connection...')
 
953
        connection.close()
 
954
 
 
955
    def get_amqp_message_by_unit(self, sentry_unit, queue="test",
 
956
                                 username="testuser1",
 
957
                                 password="changeme",
 
958
                                 ssl=False, port=None):
 
959
        """Get an amqp message from a rmq juju unit.
 
960
 
 
961
        :param sentry_unit: sentry unit pointer
 
962
        :param queue: message queue, default to test
 
963
        :param username: amqp user name, default to testuser1
 
964
        :param password: amqp user password
 
965
        :param ssl: boolean, default to False
 
966
        :param port: amqp port, use defaults if None
 
967
        :returns: amqp message body as string.  Raise if get fails.
 
968
        """
 
969
        connection = self.connect_amqp_by_unit(sentry_unit, ssl=ssl,
 
970
                                               port=port,
 
971
                                               username=username,
 
972
                                               password=password)
 
973
        channel = connection.channel()
 
974
        method_frame, _, body = channel.basic_get(queue)
 
975
 
 
976
        if method_frame:
 
977
            self.log.debug('Retreived message from {} queue:\n{}'.format(queue,
 
978
                                                                         body))
 
979
            channel.basic_ack(method_frame.delivery_tag)
 
980
            channel.close()
 
981
            connection.close()
 
982
            return body
 
983
        else:
 
984
            msg = 'No message retrieved.'
 
985
            amulet.raise_status(amulet.FAIL, msg)