~canonical-hw-cert/charms/xenial/snappy-device-agent/trunk

« back to all changes in this revision

Viewing changes to hooks/charmhelpers/contrib/openstack/amulet/utils.py

  • Committer: Paul Larson
  • Date: 2016-05-16 20:27:32 UTC
  • Revision ID: paul.larson@canonical.com-20160516202732-9r4nkyl2f91w9xo3
Add support for xenial

Show diffs side-by-side

added added

removed removed

Lines of Context:
14
14
# You should have received a copy of the GNU Lesser General Public License
15
15
# along with charm-helpers.  If not, see <http://www.gnu.org/licenses/>.
16
16
 
 
17
import amulet
 
18
import json
17
19
import logging
18
20
import os
 
21
import re
 
22
import six
19
23
import time
20
24
import urllib
21
25
 
 
26
import cinderclient.v1.client as cinder_client
22
27
import glanceclient.v1.client as glance_client
 
28
import heatclient.v1.client as heat_client
23
29
import keystoneclient.v2_0 as keystone_client
24
 
import novaclient.v1_1.client as nova_client
 
30
from keystoneclient.auth.identity import v3 as keystone_id_v3
 
31
from keystoneclient import session as keystone_session
 
32
from keystoneclient.v3 import client as keystone_client_v3
25
33
 
26
 
import six
 
34
import novaclient.client as nova_client
 
35
import pika
 
36
import swiftclient
27
37
 
28
38
from charmhelpers.contrib.amulet.utils import (
29
39
    AmuletUtils
32
42
DEBUG = logging.DEBUG
33
43
ERROR = logging.ERROR
34
44
 
 
45
NOVA_CLIENT_VERSION = "2"
 
46
 
35
47
 
36
48
class OpenStackAmuletUtils(AmuletUtils):
37
49
    """OpenStack amulet utilities.
38
50
 
39
51
       This class inherits from AmuletUtils and has additional support
40
 
       that is specifically for use by OpenStack charms.
 
52
       that is specifically for use by OpenStack charm tests.
41
53
       """
42
54
 
43
55
    def __init__(self, log_level=ERROR):
51
63
           Validate actual endpoint data vs expected endpoint data. The ports
52
64
           are used to find the matching endpoint.
53
65
           """
 
66
        self.log.debug('Validating endpoint data...')
 
67
        self.log.debug('actual: {}'.format(repr(endpoints)))
54
68
        found = False
55
69
        for ep in endpoints:
56
70
            self.log.debug('endpoint: {}'.format(repr(ep)))
77
91
           Validate a list of actual service catalog endpoints vs a list of
78
92
           expected service catalog endpoints.
79
93
           """
 
94
        self.log.debug('Validating service catalog endpoint data...')
80
95
        self.log.debug('actual: {}'.format(repr(actual)))
81
96
        for k, v in six.iteritems(expected):
82
97
            if k in actual:
93
108
           Validate a list of actual tenant data vs list of expected tenant
94
109
           data.
95
110
           """
 
111
        self.log.debug('Validating tenant data...')
96
112
        self.log.debug('actual: {}'.format(repr(actual)))
97
113
        for e in expected:
98
114
            found = False
114
130
           Validate a list of actual role data vs a list of expected role
115
131
           data.
116
132
           """
 
133
        self.log.debug('Validating role data...')
117
134
        self.log.debug('actual: {}'.format(repr(actual)))
118
135
        for e in expected:
119
136
            found = False
128
145
                return "role {} does not exist".format(e['name'])
129
146
        return ret
130
147
 
131
 
    def validate_user_data(self, expected, actual):
 
148
    def validate_user_data(self, expected, actual, api_version=None):
132
149
        """Validate user data.
133
150
 
134
151
           Validate a list of actual user data vs a list of expected user
135
152
           data.
136
153
           """
 
154
        self.log.debug('Validating user data...')
137
155
        self.log.debug('actual: {}'.format(repr(actual)))
138
156
        for e in expected:
139
157
            found = False
140
158
            for act in actual:
141
 
                a = {'enabled': act.enabled, 'name': act.name,
142
 
                     'email': act.email, 'tenantId': act.tenantId,
143
 
                     'id': act.id}
144
 
                if e['name'] == a['name']:
 
159
                if e['name'] == act.name:
 
160
                    a = {'enabled': act.enabled, 'name': act.name,
 
161
                         'email': act.email, 'id': act.id}
 
162
                    if api_version == 3:
 
163
                        a['default_project_id'] = getattr(act,
 
164
                                                          'default_project_id',
 
165
                                                          'none')
 
166
                    else:
 
167
                        a['tenantId'] = act.tenantId
145
168
                    found = True
146
169
                    ret = self._validate_dict_data(e, a)
147
170
                    if ret:
155
178
 
156
179
           Validate a list of actual flavors vs a list of expected flavors.
157
180
           """
 
181
        self.log.debug('Validating flavor data...')
158
182
        self.log.debug('actual: {}'.format(repr(actual)))
159
183
        act = [a.name for a in actual]
160
184
        return self._validate_list_data(expected, act)
161
185
 
162
186
    def tenant_exists(self, keystone, tenant):
163
187
        """Return True if tenant exists."""
 
188
        self.log.debug('Checking if tenant exists ({})...'.format(tenant))
164
189
        return tenant in [t.name for t in keystone.tenants.list()]
165
190
 
 
191
    def authenticate_cinder_admin(self, keystone_sentry, username,
 
192
                                  password, tenant):
 
193
        """Authenticates admin user with cinder."""
 
194
        # NOTE(beisner): cinder python client doesn't accept tokens.
 
195
        service_ip = \
 
196
            keystone_sentry.relation('shared-db',
 
197
                                     'mysql:shared-db')['private-address']
 
198
        ept = "http://{}:5000/v2.0".format(service_ip.strip().decode('utf-8'))
 
199
        return cinder_client.Client(username, password, tenant, ept)
 
200
 
166
201
    def authenticate_keystone_admin(self, keystone_sentry, user, password,
167
 
                                    tenant):
 
202
                                    tenant=None, api_version=None,
 
203
                                    keystone_ip=None):
168
204
        """Authenticates admin user with the keystone admin endpoint."""
 
205
        self.log.debug('Authenticating keystone admin...')
169
206
        unit = keystone_sentry
170
 
        service_ip = unit.relation('shared-db',
171
 
                                   'mysql:shared-db')['private-address']
172
 
        ep = "http://{}:35357/v2.0".format(service_ip.strip().decode('utf-8'))
173
 
        return keystone_client.Client(username=user, password=password,
174
 
                                      tenant_name=tenant, auth_url=ep)
 
207
        if not keystone_ip:
 
208
            keystone_ip = unit.relation('shared-db',
 
209
                                        'mysql:shared-db')['private-address']
 
210
        base_ep = "http://{}:35357".format(keystone_ip.strip().decode('utf-8'))
 
211
        if not api_version or api_version == 2:
 
212
            ep = base_ep + "/v2.0"
 
213
            return keystone_client.Client(username=user, password=password,
 
214
                                          tenant_name=tenant, auth_url=ep)
 
215
        else:
 
216
            ep = base_ep + "/v3"
 
217
            auth = keystone_id_v3.Password(
 
218
                user_domain_name='admin_domain',
 
219
                username=user,
 
220
                password=password,
 
221
                domain_name='admin_domain',
 
222
                auth_url=ep,
 
223
            )
 
224
            sess = keystone_session.Session(auth=auth)
 
225
            return keystone_client_v3.Client(session=sess)
175
226
 
176
227
    def authenticate_keystone_user(self, keystone, user, password, tenant):
177
228
        """Authenticates a regular user with the keystone public endpoint."""
 
229
        self.log.debug('Authenticating keystone user ({})...'.format(user))
178
230
        ep = keystone.service_catalog.url_for(service_type='identity',
179
231
                                              endpoint_type='publicURL')
180
232
        return keystone_client.Client(username=user, password=password,
182
234
 
183
235
    def authenticate_glance_admin(self, keystone):
184
236
        """Authenticates admin user with glance."""
 
237
        self.log.debug('Authenticating glance admin...')
185
238
        ep = keystone.service_catalog.url_for(service_type='image',
186
239
                                              endpoint_type='adminURL')
187
240
        return glance_client.Client(ep, token=keystone.auth_token)
188
241
 
 
242
    def authenticate_heat_admin(self, keystone):
 
243
        """Authenticates the admin user with heat."""
 
244
        self.log.debug('Authenticating heat admin...')
 
245
        ep = keystone.service_catalog.url_for(service_type='orchestration',
 
246
                                              endpoint_type='publicURL')
 
247
        return heat_client.Client(endpoint=ep, token=keystone.auth_token)
 
248
 
189
249
    def authenticate_nova_user(self, keystone, user, password, tenant):
190
250
        """Authenticates a regular user with nova-api."""
 
251
        self.log.debug('Authenticating nova user ({})...'.format(user))
191
252
        ep = keystone.service_catalog.url_for(service_type='identity',
192
253
                                              endpoint_type='publicURL')
193
 
        return nova_client.Client(username=user, api_key=password,
 
254
        return nova_client.Client(NOVA_CLIENT_VERSION,
 
255
                                  username=user, api_key=password,
194
256
                                  project_id=tenant, auth_url=ep)
195
257
 
 
258
    def authenticate_swift_user(self, keystone, user, password, tenant):
 
259
        """Authenticates a regular user with swift api."""
 
260
        self.log.debug('Authenticating swift user ({})...'.format(user))
 
261
        ep = keystone.service_catalog.url_for(service_type='identity',
 
262
                                              endpoint_type='publicURL')
 
263
        return swiftclient.Connection(authurl=ep,
 
264
                                      user=user,
 
265
                                      key=password,
 
266
                                      tenant_name=tenant,
 
267
                                      auth_version='2.0')
 
268
 
196
269
    def create_cirros_image(self, glance, image_name):
197
 
        """Download the latest cirros image and upload it to glance."""
 
270
        """Download the latest cirros image and upload it to glance,
 
271
        validate and return a resource pointer.
 
272
 
 
273
        :param glance: pointer to authenticated glance connection
 
274
        :param image_name: display name for new image
 
275
        :returns: glance image pointer
 
276
        """
 
277
        self.log.debug('Creating glance cirros image '
 
278
                       '({})...'.format(image_name))
 
279
 
 
280
        # Download cirros image
198
281
        http_proxy = os.getenv('AMULET_HTTP_PROXY')
199
282
        self.log.debug('AMULET_HTTP_PROXY: {}'.format(http_proxy))
200
283
        if http_proxy:
203
286
        else:
204
287
            opener = urllib.FancyURLopener()
205
288
 
206
 
        f = opener.open("http://download.cirros-cloud.net/version/released")
 
289
        f = opener.open('http://download.cirros-cloud.net/version/released')
207
290
        version = f.read().strip()
208
 
        cirros_img = "cirros-{}-x86_64-disk.img".format(version)
 
291
        cirros_img = 'cirros-{}-x86_64-disk.img'.format(version)
209
292
        local_path = os.path.join('tests', cirros_img)
210
293
 
211
294
        if not os.path.exists(local_path):
212
 
            cirros_url = "http://{}/{}/{}".format("download.cirros-cloud.net",
 
295
            cirros_url = 'http://{}/{}/{}'.format('download.cirros-cloud.net',
213
296
                                                  version, cirros_img)
214
297
            opener.retrieve(cirros_url, local_path)
215
298
        f.close()
216
299
 
 
300
        # Create glance image
217
301
        with open(local_path) as f:
218
302
            image = glance.images.create(name=image_name, is_public=True,
219
303
                                         disk_format='qcow2',
220
304
                                         container_format='bare', data=f)
221
 
        count = 1
222
 
        status = image.status
223
 
        while status != 'active' and count < 10:
224
 
            time.sleep(3)
225
 
            image = glance.images.get(image.id)
226
 
            status = image.status
227
 
            self.log.debug('image status: {}'.format(status))
228
 
            count += 1
229
 
 
230
 
        if status != 'active':
231
 
            self.log.error('image creation timed out')
232
 
            return None
 
305
 
 
306
        # Wait for image to reach active status
 
307
        img_id = image.id
 
308
        ret = self.resource_reaches_status(glance.images, img_id,
 
309
                                           expected_stat='active',
 
310
                                           msg='Image status wait')
 
311
        if not ret:
 
312
            msg = 'Glance image failed to reach expected state.'
 
313
            amulet.raise_status(amulet.FAIL, msg=msg)
 
314
 
 
315
        # Re-validate new image
 
316
        self.log.debug('Validating image attributes...')
 
317
        val_img_name = glance.images.get(img_id).name
 
318
        val_img_stat = glance.images.get(img_id).status
 
319
        val_img_pub = glance.images.get(img_id).is_public
 
320
        val_img_cfmt = glance.images.get(img_id).container_format
 
321
        val_img_dfmt = glance.images.get(img_id).disk_format
 
322
        msg_attr = ('Image attributes - name:{} public:{} id:{} stat:{} '
 
323
                    'container fmt:{} disk fmt:{}'.format(
 
324
                        val_img_name, val_img_pub, img_id,
 
325
                        val_img_stat, val_img_cfmt, val_img_dfmt))
 
326
 
 
327
        if val_img_name == image_name and val_img_stat == 'active' \
 
328
                and val_img_pub is True and val_img_cfmt == 'bare' \
 
329
                and val_img_dfmt == 'qcow2':
 
330
            self.log.debug(msg_attr)
 
331
        else:
 
332
            msg = ('Volume validation failed, {}'.format(msg_attr))
 
333
            amulet.raise_status(amulet.FAIL, msg=msg)
233
334
 
234
335
        return image
235
336
 
236
337
    def delete_image(self, glance, image):
237
338
        """Delete the specified image."""
238
 
        num_before = len(list(glance.images.list()))
239
 
        glance.images.delete(image)
240
 
 
241
 
        count = 1
242
 
        num_after = len(list(glance.images.list()))
243
 
        while num_after != (num_before - 1) and count < 10:
244
 
            time.sleep(3)
245
 
            num_after = len(list(glance.images.list()))
246
 
            self.log.debug('number of images: {}'.format(num_after))
247
 
            count += 1
248
 
 
249
 
        if num_after != (num_before - 1):
250
 
            self.log.error('image deletion timed out')
251
 
            return False
252
 
 
253
 
        return True
 
339
 
 
340
        # /!\ DEPRECATION WARNING
 
341
        self.log.warn('/!\\ DEPRECATION WARNING:  use '
 
342
                      'delete_resource instead of delete_image.')
 
343
        self.log.debug('Deleting glance image ({})...'.format(image))
 
344
        return self.delete_resource(glance.images, image, msg='glance image')
254
345
 
255
346
    def create_instance(self, nova, image_name, instance_name, flavor):
256
347
        """Create the specified instance."""
 
348
        self.log.debug('Creating instance '
 
349
                       '({}|{}|{})'.format(instance_name, image_name, flavor))
257
350
        image = nova.images.find(name=image_name)
258
351
        flavor = nova.flavors.find(name=flavor)
259
352
        instance = nova.servers.create(name=instance_name, image=image,
276
369
 
277
370
    def delete_instance(self, nova, instance):
278
371
        """Delete the specified instance."""
279
 
        num_before = len(list(nova.servers.list()))
280
 
        nova.servers.delete(instance)
281
 
 
282
 
        count = 1
283
 
        num_after = len(list(nova.servers.list()))
284
 
        while num_after != (num_before - 1) and count < 10:
285
 
            time.sleep(3)
286
 
            num_after = len(list(nova.servers.list()))
287
 
            self.log.debug('number of instances: {}'.format(num_after))
288
 
            count += 1
289
 
 
290
 
        if num_after != (num_before - 1):
291
 
            self.log.error('instance deletion timed out')
292
 
            return False
293
 
 
294
 
        return True
 
372
 
 
373
        # /!\ DEPRECATION WARNING
 
374
        self.log.warn('/!\\ DEPRECATION WARNING:  use '
 
375
                      'delete_resource instead of delete_instance.')
 
376
        self.log.debug('Deleting instance ({})...'.format(instance))
 
377
        return self.delete_resource(nova.servers, instance,
 
378
                                    msg='nova instance')
 
379
 
 
380
    def create_or_get_keypair(self, nova, keypair_name="testkey"):
 
381
        """Create a new keypair, or return pointer if it already exists."""
 
382
        try:
 
383
            _keypair = nova.keypairs.get(keypair_name)
 
384
            self.log.debug('Keypair ({}) already exists, '
 
385
                           'using it.'.format(keypair_name))
 
386
            return _keypair
 
387
        except:
 
388
            self.log.debug('Keypair ({}) does not exist, '
 
389
                           'creating it.'.format(keypair_name))
 
390
 
 
391
        _keypair = nova.keypairs.create(name=keypair_name)
 
392
        return _keypair
 
393
 
 
394
    def create_cinder_volume(self, cinder, vol_name="demo-vol", vol_size=1,
 
395
                             img_id=None, src_vol_id=None, snap_id=None):
 
396
        """Create cinder volume, optionally from a glance image, OR
 
397
        optionally as a clone of an existing volume, OR optionally
 
398
        from a snapshot.  Wait for the new volume status to reach
 
399
        the expected status, validate and return a resource pointer.
 
400
 
 
401
        :param vol_name: cinder volume display name
 
402
        :param vol_size: size in gigabytes
 
403
        :param img_id: optional glance image id
 
404
        :param src_vol_id: optional source volume id to clone
 
405
        :param snap_id: optional snapshot id to use
 
406
        :returns: cinder volume pointer
 
407
        """
 
408
        # Handle parameter input and avoid impossible combinations
 
409
        if img_id and not src_vol_id and not snap_id:
 
410
            # Create volume from image
 
411
            self.log.debug('Creating cinder volume from glance image...')
 
412
            bootable = 'true'
 
413
        elif src_vol_id and not img_id and not snap_id:
 
414
            # Clone an existing volume
 
415
            self.log.debug('Cloning cinder volume...')
 
416
            bootable = cinder.volumes.get(src_vol_id).bootable
 
417
        elif snap_id and not src_vol_id and not img_id:
 
418
            # Create volume from snapshot
 
419
            self.log.debug('Creating cinder volume from snapshot...')
 
420
            snap = cinder.volume_snapshots.find(id=snap_id)
 
421
            vol_size = snap.size
 
422
            snap_vol_id = cinder.volume_snapshots.get(snap_id).volume_id
 
423
            bootable = cinder.volumes.get(snap_vol_id).bootable
 
424
        elif not img_id and not src_vol_id and not snap_id:
 
425
            # Create volume
 
426
            self.log.debug('Creating cinder volume...')
 
427
            bootable = 'false'
 
428
        else:
 
429
            # Impossible combination of parameters
 
430
            msg = ('Invalid method use - name:{} size:{} img_id:{} '
 
431
                   'src_vol_id:{} snap_id:{}'.format(vol_name, vol_size,
 
432
                                                     img_id, src_vol_id,
 
433
                                                     snap_id))
 
434
            amulet.raise_status(amulet.FAIL, msg=msg)
 
435
 
 
436
        # Create new volume
 
437
        try:
 
438
            vol_new = cinder.volumes.create(display_name=vol_name,
 
439
                                            imageRef=img_id,
 
440
                                            size=vol_size,
 
441
                                            source_volid=src_vol_id,
 
442
                                            snapshot_id=snap_id)
 
443
            vol_id = vol_new.id
 
444
        except Exception as e:
 
445
            msg = 'Failed to create volume: {}'.format(e)
 
446
            amulet.raise_status(amulet.FAIL, msg=msg)
 
447
 
 
448
        # Wait for volume to reach available status
 
449
        ret = self.resource_reaches_status(cinder.volumes, vol_id,
 
450
                                           expected_stat="available",
 
451
                                           msg="Volume status wait")
 
452
        if not ret:
 
453
            msg = 'Cinder volume failed to reach expected state.'
 
454
            amulet.raise_status(amulet.FAIL, msg=msg)
 
455
 
 
456
        # Re-validate new volume
 
457
        self.log.debug('Validating volume attributes...')
 
458
        val_vol_name = cinder.volumes.get(vol_id).display_name
 
459
        val_vol_boot = cinder.volumes.get(vol_id).bootable
 
460
        val_vol_stat = cinder.volumes.get(vol_id).status
 
461
        val_vol_size = cinder.volumes.get(vol_id).size
 
462
        msg_attr = ('Volume attributes - name:{} id:{} stat:{} boot:'
 
463
                    '{} size:{}'.format(val_vol_name, vol_id,
 
464
                                        val_vol_stat, val_vol_boot,
 
465
                                        val_vol_size))
 
466
 
 
467
        if val_vol_boot == bootable and val_vol_stat == 'available' \
 
468
                and val_vol_name == vol_name and val_vol_size == vol_size:
 
469
            self.log.debug(msg_attr)
 
470
        else:
 
471
            msg = ('Volume validation failed, {}'.format(msg_attr))
 
472
            amulet.raise_status(amulet.FAIL, msg=msg)
 
473
 
 
474
        return vol_new
 
475
 
 
476
    def delete_resource(self, resource, resource_id,
 
477
                        msg="resource", max_wait=120):
 
478
        """Delete one openstack resource, such as one instance, keypair,
 
479
        image, volume, stack, etc., and confirm deletion within max wait time.
 
480
 
 
481
        :param resource: pointer to os resource type, ex:glance_client.images
 
482
        :param resource_id: unique name or id for the openstack resource
 
483
        :param msg: text to identify purpose in logging
 
484
        :param max_wait: maximum wait time in seconds
 
485
        :returns: True if successful, otherwise False
 
486
        """
 
487
        self.log.debug('Deleting OpenStack resource '
 
488
                       '{} ({})'.format(resource_id, msg))
 
489
        num_before = len(list(resource.list()))
 
490
        resource.delete(resource_id)
 
491
 
 
492
        tries = 0
 
493
        num_after = len(list(resource.list()))
 
494
        while num_after != (num_before - 1) and tries < (max_wait / 4):
 
495
            self.log.debug('{} delete check: '
 
496
                           '{} [{}:{}] {}'.format(msg, tries,
 
497
                                                  num_before,
 
498
                                                  num_after,
 
499
                                                  resource_id))
 
500
            time.sleep(4)
 
501
            num_after = len(list(resource.list()))
 
502
            tries += 1
 
503
 
 
504
        self.log.debug('{}:  expected, actual count = {}, '
 
505
                       '{}'.format(msg, num_before - 1, num_after))
 
506
 
 
507
        if num_after == (num_before - 1):
 
508
            return True
 
509
        else:
 
510
            self.log.error('{} delete timed out'.format(msg))
 
511
            return False
 
512
 
 
513
    def resource_reaches_status(self, resource, resource_id,
 
514
                                expected_stat='available',
 
515
                                msg='resource', max_wait=120):
 
516
        """Wait for an openstack resources status to reach an
 
517
           expected status within a specified time.  Useful to confirm that
 
518
           nova instances, cinder vols, snapshots, glance images, heat stacks
 
519
           and other resources eventually reach the expected status.
 
520
 
 
521
        :param resource: pointer to os resource type, ex: heat_client.stacks
 
522
        :param resource_id: unique id for the openstack resource
 
523
        :param expected_stat: status to expect resource to reach
 
524
        :param msg: text to identify purpose in logging
 
525
        :param max_wait: maximum wait time in seconds
 
526
        :returns: True if successful, False if status is not reached
 
527
        """
 
528
 
 
529
        tries = 0
 
530
        resource_stat = resource.get(resource_id).status
 
531
        while resource_stat != expected_stat and tries < (max_wait / 4):
 
532
            self.log.debug('{} status check: '
 
533
                           '{} [{}:{}] {}'.format(msg, tries,
 
534
                                                  resource_stat,
 
535
                                                  expected_stat,
 
536
                                                  resource_id))
 
537
            time.sleep(4)
 
538
            resource_stat = resource.get(resource_id).status
 
539
            tries += 1
 
540
 
 
541
        self.log.debug('{}:  expected, actual status = {}, '
 
542
                       '{}'.format(msg, resource_stat, expected_stat))
 
543
 
 
544
        if resource_stat == expected_stat:
 
545
            return True
 
546
        else:
 
547
            self.log.debug('{} never reached expected status: '
 
548
                           '{}'.format(resource_id, expected_stat))
 
549
            return False
 
550
 
 
551
    def get_ceph_osd_id_cmd(self, index):
 
552
        """Produce a shell command that will return a ceph-osd id."""
 
553
        return ("`initctl list | grep 'ceph-osd ' | "
 
554
                "awk 'NR=={} {{ print $2 }}' | "
 
555
                "grep -o '[0-9]*'`".format(index + 1))
 
556
 
 
557
    def get_ceph_pools(self, sentry_unit):
 
558
        """Return a dict of ceph pools from a single ceph unit, with
 
559
        pool name as keys, pool id as vals."""
 
560
        pools = {}
 
561
        cmd = 'sudo ceph osd lspools'
 
562
        output, code = sentry_unit.run(cmd)
 
563
        if code != 0:
 
564
            msg = ('{} `{}` returned {} '
 
565
                   '{}'.format(sentry_unit.info['unit_name'],
 
566
                               cmd, code, output))
 
567
            amulet.raise_status(amulet.FAIL, msg=msg)
 
568
 
 
569
        # Example output: 0 data,1 metadata,2 rbd,3 cinder,4 glance,
 
570
        for pool in str(output).split(','):
 
571
            pool_id_name = pool.split(' ')
 
572
            if len(pool_id_name) == 2:
 
573
                pool_id = pool_id_name[0]
 
574
                pool_name = pool_id_name[1]
 
575
                pools[pool_name] = int(pool_id)
 
576
 
 
577
        self.log.debug('Pools on {}: {}'.format(sentry_unit.info['unit_name'],
 
578
                                                pools))
 
579
        return pools
 
580
 
 
581
    def get_ceph_df(self, sentry_unit):
 
582
        """Return dict of ceph df json output, including ceph pool state.
 
583
 
 
584
        :param sentry_unit: Pointer to amulet sentry instance (juju unit)
 
585
        :returns: Dict of ceph df output
 
586
        """
 
587
        cmd = 'sudo ceph df --format=json'
 
588
        output, code = sentry_unit.run(cmd)
 
589
        if code != 0:
 
590
            msg = ('{} `{}` returned {} '
 
591
                   '{}'.format(sentry_unit.info['unit_name'],
 
592
                               cmd, code, output))
 
593
            amulet.raise_status(amulet.FAIL, msg=msg)
 
594
        return json.loads(output)
 
595
 
 
596
    def get_ceph_pool_sample(self, sentry_unit, pool_id=0):
 
597
        """Take a sample of attributes of a ceph pool, returning ceph
 
598
        pool name, object count and disk space used for the specified
 
599
        pool ID number.
 
600
 
 
601
        :param sentry_unit: Pointer to amulet sentry instance (juju unit)
 
602
        :param pool_id: Ceph pool ID
 
603
        :returns: List of pool name, object count, kb disk space used
 
604
        """
 
605
        df = self.get_ceph_df(sentry_unit)
 
606
        pool_name = df['pools'][pool_id]['name']
 
607
        obj_count = df['pools'][pool_id]['stats']['objects']
 
608
        kb_used = df['pools'][pool_id]['stats']['kb_used']
 
609
        self.log.debug('Ceph {} pool (ID {}): {} objects, '
 
610
                       '{} kb used'.format(pool_name, pool_id,
 
611
                                           obj_count, kb_used))
 
612
        return pool_name, obj_count, kb_used
 
613
 
 
614
    def validate_ceph_pool_samples(self, samples, sample_type="resource pool"):
 
615
        """Validate ceph pool samples taken over time, such as pool
 
616
        object counts or pool kb used, before adding, after adding, and
 
617
        after deleting items which affect those pool attributes.  The
 
618
        2nd element is expected to be greater than the 1st; 3rd is expected
 
619
        to be less than the 2nd.
 
620
 
 
621
        :param samples: List containing 3 data samples
 
622
        :param sample_type: String for logging and usage context
 
623
        :returns: None if successful, Failure message otherwise
 
624
        """
 
625
        original, created, deleted = range(3)
 
626
        if samples[created] <= samples[original] or \
 
627
                samples[deleted] >= samples[created]:
 
628
            return ('Ceph {} samples ({}) '
 
629
                    'unexpected.'.format(sample_type, samples))
 
630
        else:
 
631
            self.log.debug('Ceph {} samples (OK): '
 
632
                           '{}'.format(sample_type, samples))
 
633
            return None
 
634
 
 
635
    # rabbitmq/amqp specific helpers:
 
636
 
 
637
    def rmq_wait_for_cluster(self, deployment, init_sleep=15, timeout=1200):
 
638
        """Wait for rmq units extended status to show cluster readiness,
 
639
        after an optional initial sleep period.  Initial sleep is likely
 
640
        necessary to be effective following a config change, as status
 
641
        message may not instantly update to non-ready."""
 
642
 
 
643
        if init_sleep:
 
644
            time.sleep(init_sleep)
 
645
 
 
646
        message = re.compile('^Unit is ready and clustered$')
 
647
        deployment._auto_wait_for_status(message=message,
 
648
                                         timeout=timeout,
 
649
                                         include_only=['rabbitmq-server'])
 
650
 
 
651
    def add_rmq_test_user(self, sentry_units,
 
652
                          username="testuser1", password="changeme"):
 
653
        """Add a test user via the first rmq juju unit, check connection as
 
654
        the new user against all sentry units.
 
655
 
 
656
        :param sentry_units: list of sentry unit pointers
 
657
        :param username: amqp user name, default to testuser1
 
658
        :param password: amqp user password
 
659
        :returns: None if successful.  Raise on error.
 
660
        """
 
661
        self.log.debug('Adding rmq user ({})...'.format(username))
 
662
 
 
663
        # Check that user does not already exist
 
664
        cmd_user_list = 'rabbitmqctl list_users'
 
665
        output, _ = self.run_cmd_unit(sentry_units[0], cmd_user_list)
 
666
        if username in output:
 
667
            self.log.warning('User ({}) already exists, returning '
 
668
                             'gracefully.'.format(username))
 
669
            return
 
670
 
 
671
        perms = '".*" ".*" ".*"'
 
672
        cmds = ['rabbitmqctl add_user {} {}'.format(username, password),
 
673
                'rabbitmqctl set_permissions {} {}'.format(username, perms)]
 
674
 
 
675
        # Add user via first unit
 
676
        for cmd in cmds:
 
677
            output, _ = self.run_cmd_unit(sentry_units[0], cmd)
 
678
 
 
679
        # Check connection against the other sentry_units
 
680
        self.log.debug('Checking user connect against units...')
 
681
        for sentry_unit in sentry_units:
 
682
            connection = self.connect_amqp_by_unit(sentry_unit, ssl=False,
 
683
                                                   username=username,
 
684
                                                   password=password)
 
685
            connection.close()
 
686
 
 
687
    def delete_rmq_test_user(self, sentry_units, username="testuser1"):
 
688
        """Delete a rabbitmq user via the first rmq juju unit.
 
689
 
 
690
        :param sentry_units: list of sentry unit pointers
 
691
        :param username: amqp user name, default to testuser1
 
692
        :param password: amqp user password
 
693
        :returns: None if successful or no such user.
 
694
        """
 
695
        self.log.debug('Deleting rmq user ({})...'.format(username))
 
696
 
 
697
        # Check that the user exists
 
698
        cmd_user_list = 'rabbitmqctl list_users'
 
699
        output, _ = self.run_cmd_unit(sentry_units[0], cmd_user_list)
 
700
 
 
701
        if username not in output:
 
702
            self.log.warning('User ({}) does not exist, returning '
 
703
                             'gracefully.'.format(username))
 
704
            return
 
705
 
 
706
        # Delete the user
 
707
        cmd_user_del = 'rabbitmqctl delete_user {}'.format(username)
 
708
        output, _ = self.run_cmd_unit(sentry_units[0], cmd_user_del)
 
709
 
 
710
    def get_rmq_cluster_status(self, sentry_unit):
 
711
        """Execute rabbitmq cluster status command on a unit and return
 
712
        the full output.
 
713
 
 
714
        :param unit: sentry unit
 
715
        :returns: String containing console output of cluster status command
 
716
        """
 
717
        cmd = 'rabbitmqctl cluster_status'
 
718
        output, _ = self.run_cmd_unit(sentry_unit, cmd)
 
719
        self.log.debug('{} cluster_status:\n{}'.format(
 
720
            sentry_unit.info['unit_name'], output))
 
721
        return str(output)
 
722
 
 
723
    def get_rmq_cluster_running_nodes(self, sentry_unit):
 
724
        """Parse rabbitmqctl cluster_status output string, return list of
 
725
        running rabbitmq cluster nodes.
 
726
 
 
727
        :param unit: sentry unit
 
728
        :returns: List containing node names of running nodes
 
729
        """
 
730
        # NOTE(beisner): rabbitmqctl cluster_status output is not
 
731
        # json-parsable, do string chop foo, then json.loads that.
 
732
        str_stat = self.get_rmq_cluster_status(sentry_unit)
 
733
        if 'running_nodes' in str_stat:
 
734
            pos_start = str_stat.find("{running_nodes,") + 15
 
735
            pos_end = str_stat.find("]},", pos_start) + 1
 
736
            str_run_nodes = str_stat[pos_start:pos_end].replace("'", '"')
 
737
            run_nodes = json.loads(str_run_nodes)
 
738
            return run_nodes
 
739
        else:
 
740
            return []
 
741
 
 
742
    def validate_rmq_cluster_running_nodes(self, sentry_units):
 
743
        """Check that all rmq unit hostnames are represented in the
 
744
        cluster_status output of all units.
 
745
 
 
746
        :param host_names: dict of juju unit names to host names
 
747
        :param units: list of sentry unit pointers (all rmq units)
 
748
        :returns: None if successful, otherwise return error message
 
749
        """
 
750
        host_names = self.get_unit_hostnames(sentry_units)
 
751
        errors = []
 
752
 
 
753
        # Query every unit for cluster_status running nodes
 
754
        for query_unit in sentry_units:
 
755
            query_unit_name = query_unit.info['unit_name']
 
756
            running_nodes = self.get_rmq_cluster_running_nodes(query_unit)
 
757
 
 
758
            # Confirm that every unit is represented in the queried unit's
 
759
            # cluster_status running nodes output.
 
760
            for validate_unit in sentry_units:
 
761
                val_host_name = host_names[validate_unit.info['unit_name']]
 
762
                val_node_name = 'rabbit@{}'.format(val_host_name)
 
763
 
 
764
                if val_node_name not in running_nodes:
 
765
                    errors.append('Cluster member check failed on {}: {} not '
 
766
                                  'in {}\n'.format(query_unit_name,
 
767
                                                   val_node_name,
 
768
                                                   running_nodes))
 
769
        if errors:
 
770
            return ''.join(errors)
 
771
 
 
772
    def rmq_ssl_is_enabled_on_unit(self, sentry_unit, port=None):
 
773
        """Check a single juju rmq unit for ssl and port in the config file."""
 
774
        host = sentry_unit.info['public-address']
 
775
        unit_name = sentry_unit.info['unit_name']
 
776
 
 
777
        conf_file = '/etc/rabbitmq/rabbitmq.config'
 
778
        conf_contents = str(self.file_contents_safe(sentry_unit,
 
779
                                                    conf_file, max_wait=16))
 
780
        # Checks
 
781
        conf_ssl = 'ssl' in conf_contents
 
782
        conf_port = str(port) in conf_contents
 
783
 
 
784
        # Port explicitly checked in config
 
785
        if port and conf_port and conf_ssl:
 
786
            self.log.debug('SSL is enabled  @{}:{} '
 
787
                           '({})'.format(host, port, unit_name))
 
788
            return True
 
789
        elif port and not conf_port and conf_ssl:
 
790
            self.log.debug('SSL is enabled @{} but not on port {} '
 
791
                           '({})'.format(host, port, unit_name))
 
792
            return False
 
793
        # Port not checked (useful when checking that ssl is disabled)
 
794
        elif not port and conf_ssl:
 
795
            self.log.debug('SSL is enabled  @{}:{} '
 
796
                           '({})'.format(host, port, unit_name))
 
797
            return True
 
798
        elif not conf_ssl:
 
799
            self.log.debug('SSL not enabled @{}:{} '
 
800
                           '({})'.format(host, port, unit_name))
 
801
            return False
 
802
        else:
 
803
            msg = ('Unknown condition when checking SSL status @{}:{} '
 
804
                   '({})'.format(host, port, unit_name))
 
805
            amulet.raise_status(amulet.FAIL, msg)
 
806
 
 
807
    def validate_rmq_ssl_enabled_units(self, sentry_units, port=None):
 
808
        """Check that ssl is enabled on rmq juju sentry units.
 
809
 
 
810
        :param sentry_units: list of all rmq sentry units
 
811
        :param port: optional ssl port override to validate
 
812
        :returns: None if successful, otherwise return error message
 
813
        """
 
814
        for sentry_unit in sentry_units:
 
815
            if not self.rmq_ssl_is_enabled_on_unit(sentry_unit, port=port):
 
816
                return ('Unexpected condition:  ssl is disabled on unit '
 
817
                        '({})'.format(sentry_unit.info['unit_name']))
 
818
        return None
 
819
 
 
820
    def validate_rmq_ssl_disabled_units(self, sentry_units):
 
821
        """Check that ssl is enabled on listed rmq juju sentry units.
 
822
 
 
823
        :param sentry_units: list of all rmq sentry units
 
824
        :returns: True if successful.  Raise on error.
 
825
        """
 
826
        for sentry_unit in sentry_units:
 
827
            if self.rmq_ssl_is_enabled_on_unit(sentry_unit):
 
828
                return ('Unexpected condition:  ssl is enabled on unit '
 
829
                        '({})'.format(sentry_unit.info['unit_name']))
 
830
        return None
 
831
 
 
832
    def configure_rmq_ssl_on(self, sentry_units, deployment,
 
833
                             port=None, max_wait=60):
 
834
        """Turn ssl charm config option on, with optional non-default
 
835
        ssl port specification.  Confirm that it is enabled on every
 
836
        unit.
 
837
 
 
838
        :param sentry_units: list of sentry units
 
839
        :param deployment: amulet deployment object pointer
 
840
        :param port: amqp port, use defaults if None
 
841
        :param max_wait: maximum time to wait in seconds to confirm
 
842
        :returns: None if successful.  Raise on error.
 
843
        """
 
844
        self.log.debug('Setting ssl charm config option:  on')
 
845
 
 
846
        # Enable RMQ SSL
 
847
        config = {'ssl': 'on'}
 
848
        if port:
 
849
            config['ssl_port'] = port
 
850
 
 
851
        deployment.d.configure('rabbitmq-server', config)
 
852
 
 
853
        # Wait for unit status
 
854
        self.rmq_wait_for_cluster(deployment)
 
855
 
 
856
        # Confirm
 
857
        tries = 0
 
858
        ret = self.validate_rmq_ssl_enabled_units(sentry_units, port=port)
 
859
        while ret and tries < (max_wait / 4):
 
860
            time.sleep(4)
 
861
            self.log.debug('Attempt {}: {}'.format(tries, ret))
 
862
            ret = self.validate_rmq_ssl_enabled_units(sentry_units, port=port)
 
863
            tries += 1
 
864
 
 
865
        if ret:
 
866
            amulet.raise_status(amulet.FAIL, ret)
 
867
 
 
868
    def configure_rmq_ssl_off(self, sentry_units, deployment, max_wait=60):
 
869
        """Turn ssl charm config option off, confirm that it is disabled
 
870
        on every unit.
 
871
 
 
872
        :param sentry_units: list of sentry units
 
873
        :param deployment: amulet deployment object pointer
 
874
        :param max_wait: maximum time to wait in seconds to confirm
 
875
        :returns: None if successful.  Raise on error.
 
876
        """
 
877
        self.log.debug('Setting ssl charm config option:  off')
 
878
 
 
879
        # Disable RMQ SSL
 
880
        config = {'ssl': 'off'}
 
881
        deployment.d.configure('rabbitmq-server', config)
 
882
 
 
883
        # Wait for unit status
 
884
        self.rmq_wait_for_cluster(deployment)
 
885
 
 
886
        # Confirm
 
887
        tries = 0
 
888
        ret = self.validate_rmq_ssl_disabled_units(sentry_units)
 
889
        while ret and tries < (max_wait / 4):
 
890
            time.sleep(4)
 
891
            self.log.debug('Attempt {}: {}'.format(tries, ret))
 
892
            ret = self.validate_rmq_ssl_disabled_units(sentry_units)
 
893
            tries += 1
 
894
 
 
895
        if ret:
 
896
            amulet.raise_status(amulet.FAIL, ret)
 
897
 
 
898
    def connect_amqp_by_unit(self, sentry_unit, ssl=False,
 
899
                             port=None, fatal=True,
 
900
                             username="testuser1", password="changeme"):
 
901
        """Establish and return a pika amqp connection to the rabbitmq service
 
902
        running on a rmq juju unit.
 
903
 
 
904
        :param sentry_unit: sentry unit pointer
 
905
        :param ssl: boolean, default to False
 
906
        :param port: amqp port, use defaults if None
 
907
        :param fatal: boolean, default to True (raises on connect error)
 
908
        :param username: amqp user name, default to testuser1
 
909
        :param password: amqp user password
 
910
        :returns: pika amqp connection pointer or None if failed and non-fatal
 
911
        """
 
912
        host = sentry_unit.info['public-address']
 
913
        unit_name = sentry_unit.info['unit_name']
 
914
 
 
915
        # Default port logic if port is not specified
 
916
        if ssl and not port:
 
917
            port = 5671
 
918
        elif not ssl and not port:
 
919
            port = 5672
 
920
 
 
921
        self.log.debug('Connecting to amqp on {}:{} ({}) as '
 
922
                       '{}...'.format(host, port, unit_name, username))
 
923
 
 
924
        try:
 
925
            credentials = pika.PlainCredentials(username, password)
 
926
            parameters = pika.ConnectionParameters(host=host, port=port,
 
927
                                                   credentials=credentials,
 
928
                                                   ssl=ssl,
 
929
                                                   connection_attempts=3,
 
930
                                                   retry_delay=5,
 
931
                                                   socket_timeout=1)
 
932
            connection = pika.BlockingConnection(parameters)
 
933
            assert connection.server_properties['product'] == 'RabbitMQ'
 
934
            self.log.debug('Connect OK')
 
935
            return connection
 
936
        except Exception as e:
 
937
            msg = ('amqp connection failed to {}:{} as '
 
938
                   '{} ({})'.format(host, port, username, str(e)))
 
939
            if fatal:
 
940
                amulet.raise_status(amulet.FAIL, msg)
 
941
            else:
 
942
                self.log.warn(msg)
 
943
                return None
 
944
 
 
945
    def publish_amqp_message_by_unit(self, sentry_unit, message,
 
946
                                     queue="test", ssl=False,
 
947
                                     username="testuser1",
 
948
                                     password="changeme",
 
949
                                     port=None):
 
950
        """Publish an amqp message to a rmq juju unit.
 
951
 
 
952
        :param sentry_unit: sentry unit pointer
 
953
        :param message: amqp message string
 
954
        :param queue: message queue, default to test
 
955
        :param username: amqp user name, default to testuser1
 
956
        :param password: amqp user password
 
957
        :param ssl: boolean, default to False
 
958
        :param port: amqp port, use defaults if None
 
959
        :returns: None.  Raises exception if publish failed.
 
960
        """
 
961
        self.log.debug('Publishing message to {} queue:\n{}'.format(queue,
 
962
                                                                    message))
 
963
        connection = self.connect_amqp_by_unit(sentry_unit, ssl=ssl,
 
964
                                               port=port,
 
965
                                               username=username,
 
966
                                               password=password)
 
967
 
 
968
        # NOTE(beisner): extra debug here re: pika hang potential:
 
969
        #   https://github.com/pika/pika/issues/297
 
970
        #   https://groups.google.com/forum/#!topic/rabbitmq-users/Ja0iyfF0Szw
 
971
        self.log.debug('Defining channel...')
 
972
        channel = connection.channel()
 
973
        self.log.debug('Declaring queue...')
 
974
        channel.queue_declare(queue=queue, auto_delete=False, durable=True)
 
975
        self.log.debug('Publishing message...')
 
976
        channel.basic_publish(exchange='', routing_key=queue, body=message)
 
977
        self.log.debug('Closing channel...')
 
978
        channel.close()
 
979
        self.log.debug('Closing connection...')
 
980
        connection.close()
 
981
 
 
982
    def get_amqp_message_by_unit(self, sentry_unit, queue="test",
 
983
                                 username="testuser1",
 
984
                                 password="changeme",
 
985
                                 ssl=False, port=None):
 
986
        """Get an amqp message from a rmq juju unit.
 
987
 
 
988
        :param sentry_unit: sentry unit pointer
 
989
        :param queue: message queue, default to test
 
990
        :param username: amqp user name, default to testuser1
 
991
        :param password: amqp user password
 
992
        :param ssl: boolean, default to False
 
993
        :param port: amqp port, use defaults if None
 
994
        :returns: amqp message body as string.  Raise if get fails.
 
995
        """
 
996
        connection = self.connect_amqp_by_unit(sentry_unit, ssl=ssl,
 
997
                                               port=port,
 
998
                                               username=username,
 
999
                                               password=password)
 
1000
        channel = connection.channel()
 
1001
        method_frame, _, body = channel.basic_get(queue)
 
1002
 
 
1003
        if method_frame:
 
1004
            self.log.debug('Retreived message from {} queue:\n{}'.format(queue,
 
1005
                                                                         body))
 
1006
            channel.basic_ack(method_frame.delivery_tag)
 
1007
            channel.close()
 
1008
            connection.close()
 
1009
            return body
 
1010
        else:
 
1011
            msg = 'No message retrieved.'
 
1012
            amulet.raise_status(amulet.FAIL, msg)