~gnuoy/charms/trusty/percona-cluster/1454317

« back to all changes in this revision

Viewing changes to tests/charmhelpers/contrib/openstack/amulet/utils.py

  • Committer: Liam Young
  • Date: 2015-09-30 08:06:12 UTC
  • mfrom: (60.2.16 percona-cluster)
  • Revision ID: liam.young@canonical.com-20150930080612-37dvdelm9iykj3as
Merged next in

Show diffs side-by-side

added added

removed removed

Lines of Context:
14
14
# You should have received a copy of the GNU Lesser General Public License
15
15
# along with charm-helpers.  If not, see <http://www.gnu.org/licenses/>.
16
16
 
 
17
import amulet
 
18
import json
17
19
import logging
18
20
import os
 
21
import six
19
22
import time
20
23
import urllib
21
24
 
 
25
import cinderclient.v1.client as cinder_client
22
26
import glanceclient.v1.client as glance_client
 
27
import heatclient.v1.client as heat_client
23
28
import keystoneclient.v2_0 as keystone_client
24
29
import novaclient.v1_1.client as nova_client
25
 
 
26
 
import six
 
30
import pika
 
31
import swiftclient
27
32
 
28
33
from charmhelpers.contrib.amulet.utils import (
29
34
    AmuletUtils
37
42
    """OpenStack amulet utilities.
38
43
 
39
44
       This class inherits from AmuletUtils and has additional support
40
 
       that is specifically for use by OpenStack charms.
 
45
       that is specifically for use by OpenStack charm tests.
41
46
       """
42
47
 
43
48
    def __init__(self, log_level=ERROR):
51
56
           Validate actual endpoint data vs expected endpoint data. The ports
52
57
           are used to find the matching endpoint.
53
58
           """
 
59
        self.log.debug('Validating endpoint data...')
 
60
        self.log.debug('actual: {}'.format(repr(endpoints)))
54
61
        found = False
55
62
        for ep in endpoints:
56
63
            self.log.debug('endpoint: {}'.format(repr(ep)))
77
84
           Validate a list of actual service catalog endpoints vs a list of
78
85
           expected service catalog endpoints.
79
86
           """
 
87
        self.log.debug('Validating service catalog endpoint data...')
80
88
        self.log.debug('actual: {}'.format(repr(actual)))
81
89
        for k, v in six.iteritems(expected):
82
90
            if k in actual:
93
101
           Validate a list of actual tenant data vs list of expected tenant
94
102
           data.
95
103
           """
 
104
        self.log.debug('Validating tenant data...')
96
105
        self.log.debug('actual: {}'.format(repr(actual)))
97
106
        for e in expected:
98
107
            found = False
114
123
           Validate a list of actual role data vs a list of expected role
115
124
           data.
116
125
           """
 
126
        self.log.debug('Validating role data...')
117
127
        self.log.debug('actual: {}'.format(repr(actual)))
118
128
        for e in expected:
119
129
            found = False
134
144
           Validate a list of actual user data vs a list of expected user
135
145
           data.
136
146
           """
 
147
        self.log.debug('Validating user data...')
137
148
        self.log.debug('actual: {}'.format(repr(actual)))
138
149
        for e in expected:
139
150
            found = False
155
166
 
156
167
           Validate a list of actual flavors vs a list of expected flavors.
157
168
           """
 
169
        self.log.debug('Validating flavor data...')
158
170
        self.log.debug('actual: {}'.format(repr(actual)))
159
171
        act = [a.name for a in actual]
160
172
        return self._validate_list_data(expected, act)
161
173
 
162
174
    def tenant_exists(self, keystone, tenant):
163
175
        """Return True if tenant exists."""
 
176
        self.log.debug('Checking if tenant exists ({})...'.format(tenant))
164
177
        return tenant in [t.name for t in keystone.tenants.list()]
165
178
 
 
179
    def authenticate_cinder_admin(self, keystone_sentry, username,
 
180
                                  password, tenant):
 
181
        """Authenticates admin user with cinder."""
 
182
        # NOTE(beisner): cinder python client doesn't accept tokens.
 
183
        service_ip = \
 
184
            keystone_sentry.relation('shared-db',
 
185
                                     'mysql:shared-db')['private-address']
 
186
        ept = "http://{}:5000/v2.0".format(service_ip.strip().decode('utf-8'))
 
187
        return cinder_client.Client(username, password, tenant, ept)
 
188
 
166
189
    def authenticate_keystone_admin(self, keystone_sentry, user, password,
167
190
                                    tenant):
168
191
        """Authenticates admin user with the keystone admin endpoint."""
 
192
        self.log.debug('Authenticating keystone admin...')
169
193
        unit = keystone_sentry
170
194
        service_ip = unit.relation('shared-db',
171
195
                                   'mysql:shared-db')['private-address']
175
199
 
176
200
    def authenticate_keystone_user(self, keystone, user, password, tenant):
177
201
        """Authenticates a regular user with the keystone public endpoint."""
 
202
        self.log.debug('Authenticating keystone user ({})...'.format(user))
178
203
        ep = keystone.service_catalog.url_for(service_type='identity',
179
204
                                              endpoint_type='publicURL')
180
205
        return keystone_client.Client(username=user, password=password,
182
207
 
183
208
    def authenticate_glance_admin(self, keystone):
184
209
        """Authenticates admin user with glance."""
 
210
        self.log.debug('Authenticating glance admin...')
185
211
        ep = keystone.service_catalog.url_for(service_type='image',
186
212
                                              endpoint_type='adminURL')
187
213
        return glance_client.Client(ep, token=keystone.auth_token)
188
214
 
 
215
    def authenticate_heat_admin(self, keystone):
 
216
        """Authenticates the admin user with heat."""
 
217
        self.log.debug('Authenticating heat admin...')
 
218
        ep = keystone.service_catalog.url_for(service_type='orchestration',
 
219
                                              endpoint_type='publicURL')
 
220
        return heat_client.Client(endpoint=ep, token=keystone.auth_token)
 
221
 
189
222
    def authenticate_nova_user(self, keystone, user, password, tenant):
190
223
        """Authenticates a regular user with nova-api."""
 
224
        self.log.debug('Authenticating nova user ({})...'.format(user))
191
225
        ep = keystone.service_catalog.url_for(service_type='identity',
192
226
                                              endpoint_type='publicURL')
193
227
        return nova_client.Client(username=user, api_key=password,
194
228
                                  project_id=tenant, auth_url=ep)
195
229
 
 
230
    def authenticate_swift_user(self, keystone, user, password, tenant):
 
231
        """Authenticates a regular user with swift api."""
 
232
        self.log.debug('Authenticating swift user ({})...'.format(user))
 
233
        ep = keystone.service_catalog.url_for(service_type='identity',
 
234
                                              endpoint_type='publicURL')
 
235
        return swiftclient.Connection(authurl=ep,
 
236
                                      user=user,
 
237
                                      key=password,
 
238
                                      tenant_name=tenant,
 
239
                                      auth_version='2.0')
 
240
 
196
241
    def create_cirros_image(self, glance, image_name):
197
 
        """Download the latest cirros image and upload it to glance."""
 
242
        """Download the latest cirros image and upload it to glance,
 
243
        validate and return a resource pointer.
 
244
 
 
245
        :param glance: pointer to authenticated glance connection
 
246
        :param image_name: display name for new image
 
247
        :returns: glance image pointer
 
248
        """
 
249
        self.log.debug('Creating glance cirros image '
 
250
                       '({})...'.format(image_name))
 
251
 
 
252
        # Download cirros image
198
253
        http_proxy = os.getenv('AMULET_HTTP_PROXY')
199
254
        self.log.debug('AMULET_HTTP_PROXY: {}'.format(http_proxy))
200
255
        if http_proxy:
203
258
        else:
204
259
            opener = urllib.FancyURLopener()
205
260
 
206
 
        f = opener.open("http://download.cirros-cloud.net/version/released")
 
261
        f = opener.open('http://download.cirros-cloud.net/version/released')
207
262
        version = f.read().strip()
208
 
        cirros_img = "cirros-{}-x86_64-disk.img".format(version)
 
263
        cirros_img = 'cirros-{}-x86_64-disk.img'.format(version)
209
264
        local_path = os.path.join('tests', cirros_img)
210
265
 
211
266
        if not os.path.exists(local_path):
212
 
            cirros_url = "http://{}/{}/{}".format("download.cirros-cloud.net",
 
267
            cirros_url = 'http://{}/{}/{}'.format('download.cirros-cloud.net',
213
268
                                                  version, cirros_img)
214
269
            opener.retrieve(cirros_url, local_path)
215
270
        f.close()
216
271
 
 
272
        # Create glance image
217
273
        with open(local_path) as f:
218
274
            image = glance.images.create(name=image_name, is_public=True,
219
275
                                         disk_format='qcow2',
220
276
                                         container_format='bare', data=f)
221
 
        count = 1
222
 
        status = image.status
223
 
        while status != 'active' and count < 10:
224
 
            time.sleep(3)
225
 
            image = glance.images.get(image.id)
226
 
            status = image.status
227
 
            self.log.debug('image status: {}'.format(status))
228
 
            count += 1
229
 
 
230
 
        if status != 'active':
231
 
            self.log.error('image creation timed out')
232
 
            return None
 
277
 
 
278
        # Wait for image to reach active status
 
279
        img_id = image.id
 
280
        ret = self.resource_reaches_status(glance.images, img_id,
 
281
                                           expected_stat='active',
 
282
                                           msg='Image status wait')
 
283
        if not ret:
 
284
            msg = 'Glance image failed to reach expected state.'
 
285
            amulet.raise_status(amulet.FAIL, msg=msg)
 
286
 
 
287
        # Re-validate new image
 
288
        self.log.debug('Validating image attributes...')
 
289
        val_img_name = glance.images.get(img_id).name
 
290
        val_img_stat = glance.images.get(img_id).status
 
291
        val_img_pub = glance.images.get(img_id).is_public
 
292
        val_img_cfmt = glance.images.get(img_id).container_format
 
293
        val_img_dfmt = glance.images.get(img_id).disk_format
 
294
        msg_attr = ('Image attributes - name:{} public:{} id:{} stat:{} '
 
295
                    'container fmt:{} disk fmt:{}'.format(
 
296
                        val_img_name, val_img_pub, img_id,
 
297
                        val_img_stat, val_img_cfmt, val_img_dfmt))
 
298
 
 
299
        if val_img_name == image_name and val_img_stat == 'active' \
 
300
                and val_img_pub is True and val_img_cfmt == 'bare' \
 
301
                and val_img_dfmt == 'qcow2':
 
302
            self.log.debug(msg_attr)
 
303
        else:
 
304
            msg = ('Volume validation failed, {}'.format(msg_attr))
 
305
            amulet.raise_status(amulet.FAIL, msg=msg)
233
306
 
234
307
        return image
235
308
 
236
309
    def delete_image(self, glance, image):
237
310
        """Delete the specified image."""
238
 
        num_before = len(list(glance.images.list()))
239
 
        glance.images.delete(image)
240
 
 
241
 
        count = 1
242
 
        num_after = len(list(glance.images.list()))
243
 
        while num_after != (num_before - 1) and count < 10:
244
 
            time.sleep(3)
245
 
            num_after = len(list(glance.images.list()))
246
 
            self.log.debug('number of images: {}'.format(num_after))
247
 
            count += 1
248
 
 
249
 
        if num_after != (num_before - 1):
250
 
            self.log.error('image deletion timed out')
251
 
            return False
252
 
 
253
 
        return True
 
311
 
 
312
        # /!\ DEPRECATION WARNING
 
313
        self.log.warn('/!\\ DEPRECATION WARNING:  use '
 
314
                      'delete_resource instead of delete_image.')
 
315
        self.log.debug('Deleting glance image ({})...'.format(image))
 
316
        return self.delete_resource(glance.images, image, msg='glance image')
254
317
 
255
318
    def create_instance(self, nova, image_name, instance_name, flavor):
256
319
        """Create the specified instance."""
 
320
        self.log.debug('Creating instance '
 
321
                       '({}|{}|{})'.format(instance_name, image_name, flavor))
257
322
        image = nova.images.find(name=image_name)
258
323
        flavor = nova.flavors.find(name=flavor)
259
324
        instance = nova.servers.create(name=instance_name, image=image,
276
341
 
277
342
    def delete_instance(self, nova, instance):
278
343
        """Delete the specified instance."""
279
 
        num_before = len(list(nova.servers.list()))
280
 
        nova.servers.delete(instance)
281
 
 
282
 
        count = 1
283
 
        num_after = len(list(nova.servers.list()))
284
 
        while num_after != (num_before - 1) and count < 10:
285
 
            time.sleep(3)
286
 
            num_after = len(list(nova.servers.list()))
287
 
            self.log.debug('number of instances: {}'.format(num_after))
288
 
            count += 1
289
 
 
290
 
        if num_after != (num_before - 1):
291
 
            self.log.error('instance deletion timed out')
292
 
            return False
293
 
 
294
 
        return True
 
344
 
 
345
        # /!\ DEPRECATION WARNING
 
346
        self.log.warn('/!\\ DEPRECATION WARNING:  use '
 
347
                      'delete_resource instead of delete_instance.')
 
348
        self.log.debug('Deleting instance ({})...'.format(instance))
 
349
        return self.delete_resource(nova.servers, instance,
 
350
                                    msg='nova instance')
 
351
 
 
352
    def create_or_get_keypair(self, nova, keypair_name="testkey"):
 
353
        """Create a new keypair, or return pointer if it already exists."""
 
354
        try:
 
355
            _keypair = nova.keypairs.get(keypair_name)
 
356
            self.log.debug('Keypair ({}) already exists, '
 
357
                           'using it.'.format(keypair_name))
 
358
            return _keypair
 
359
        except:
 
360
            self.log.debug('Keypair ({}) does not exist, '
 
361
                           'creating it.'.format(keypair_name))
 
362
 
 
363
        _keypair = nova.keypairs.create(name=keypair_name)
 
364
        return _keypair
 
365
 
 
366
    def create_cinder_volume(self, cinder, vol_name="demo-vol", vol_size=1,
 
367
                             img_id=None, src_vol_id=None, snap_id=None):
 
368
        """Create cinder volume, optionally from a glance image, OR
 
369
        optionally as a clone of an existing volume, OR optionally
 
370
        from a snapshot.  Wait for the new volume status to reach
 
371
        the expected status, validate and return a resource pointer.
 
372
 
 
373
        :param vol_name: cinder volume display name
 
374
        :param vol_size: size in gigabytes
 
375
        :param img_id: optional glance image id
 
376
        :param src_vol_id: optional source volume id to clone
 
377
        :param snap_id: optional snapshot id to use
 
378
        :returns: cinder volume pointer
 
379
        """
 
380
        # Handle parameter input and avoid impossible combinations
 
381
        if img_id and not src_vol_id and not snap_id:
 
382
            # Create volume from image
 
383
            self.log.debug('Creating cinder volume from glance image...')
 
384
            bootable = 'true'
 
385
        elif src_vol_id and not img_id and not snap_id:
 
386
            # Clone an existing volume
 
387
            self.log.debug('Cloning cinder volume...')
 
388
            bootable = cinder.volumes.get(src_vol_id).bootable
 
389
        elif snap_id and not src_vol_id and not img_id:
 
390
            # Create volume from snapshot
 
391
            self.log.debug('Creating cinder volume from snapshot...')
 
392
            snap = cinder.volume_snapshots.find(id=snap_id)
 
393
            vol_size = snap.size
 
394
            snap_vol_id = cinder.volume_snapshots.get(snap_id).volume_id
 
395
            bootable = cinder.volumes.get(snap_vol_id).bootable
 
396
        elif not img_id and not src_vol_id and not snap_id:
 
397
            # Create volume
 
398
            self.log.debug('Creating cinder volume...')
 
399
            bootable = 'false'
 
400
        else:
 
401
            # Impossible combination of parameters
 
402
            msg = ('Invalid method use - name:{} size:{} img_id:{} '
 
403
                   'src_vol_id:{} snap_id:{}'.format(vol_name, vol_size,
 
404
                                                     img_id, src_vol_id,
 
405
                                                     snap_id))
 
406
            amulet.raise_status(amulet.FAIL, msg=msg)
 
407
 
 
408
        # Create new volume
 
409
        try:
 
410
            vol_new = cinder.volumes.create(display_name=vol_name,
 
411
                                            imageRef=img_id,
 
412
                                            size=vol_size,
 
413
                                            source_volid=src_vol_id,
 
414
                                            snapshot_id=snap_id)
 
415
            vol_id = vol_new.id
 
416
        except Exception as e:
 
417
            msg = 'Failed to create volume: {}'.format(e)
 
418
            amulet.raise_status(amulet.FAIL, msg=msg)
 
419
 
 
420
        # Wait for volume to reach available status
 
421
        ret = self.resource_reaches_status(cinder.volumes, vol_id,
 
422
                                           expected_stat="available",
 
423
                                           msg="Volume status wait")
 
424
        if not ret:
 
425
            msg = 'Cinder volume failed to reach expected state.'
 
426
            amulet.raise_status(amulet.FAIL, msg=msg)
 
427
 
 
428
        # Re-validate new volume
 
429
        self.log.debug('Validating volume attributes...')
 
430
        val_vol_name = cinder.volumes.get(vol_id).display_name
 
431
        val_vol_boot = cinder.volumes.get(vol_id).bootable
 
432
        val_vol_stat = cinder.volumes.get(vol_id).status
 
433
        val_vol_size = cinder.volumes.get(vol_id).size
 
434
        msg_attr = ('Volume attributes - name:{} id:{} stat:{} boot:'
 
435
                    '{} size:{}'.format(val_vol_name, vol_id,
 
436
                                        val_vol_stat, val_vol_boot,
 
437
                                        val_vol_size))
 
438
 
 
439
        if val_vol_boot == bootable and val_vol_stat == 'available' \
 
440
                and val_vol_name == vol_name and val_vol_size == vol_size:
 
441
            self.log.debug(msg_attr)
 
442
        else:
 
443
            msg = ('Volume validation failed, {}'.format(msg_attr))
 
444
            amulet.raise_status(amulet.FAIL, msg=msg)
 
445
 
 
446
        return vol_new
 
447
 
 
448
    def delete_resource(self, resource, resource_id,
 
449
                        msg="resource", max_wait=120):
 
450
        """Delete one openstack resource, such as one instance, keypair,
 
451
        image, volume, stack, etc., and confirm deletion within max wait time.
 
452
 
 
453
        :param resource: pointer to os resource type, ex:glance_client.images
 
454
        :param resource_id: unique name or id for the openstack resource
 
455
        :param msg: text to identify purpose in logging
 
456
        :param max_wait: maximum wait time in seconds
 
457
        :returns: True if successful, otherwise False
 
458
        """
 
459
        self.log.debug('Deleting OpenStack resource '
 
460
                       '{} ({})'.format(resource_id, msg))
 
461
        num_before = len(list(resource.list()))
 
462
        resource.delete(resource_id)
 
463
 
 
464
        tries = 0
 
465
        num_after = len(list(resource.list()))
 
466
        while num_after != (num_before - 1) and tries < (max_wait / 4):
 
467
            self.log.debug('{} delete check: '
 
468
                           '{} [{}:{}] {}'.format(msg, tries,
 
469
                                                  num_before,
 
470
                                                  num_after,
 
471
                                                  resource_id))
 
472
            time.sleep(4)
 
473
            num_after = len(list(resource.list()))
 
474
            tries += 1
 
475
 
 
476
        self.log.debug('{}:  expected, actual count = {}, '
 
477
                       '{}'.format(msg, num_before - 1, num_after))
 
478
 
 
479
        if num_after == (num_before - 1):
 
480
            return True
 
481
        else:
 
482
            self.log.error('{} delete timed out'.format(msg))
 
483
            return False
 
484
 
 
485
    def resource_reaches_status(self, resource, resource_id,
 
486
                                expected_stat='available',
 
487
                                msg='resource', max_wait=120):
 
488
        """Wait for an openstack resources status to reach an
 
489
           expected status within a specified time.  Useful to confirm that
 
490
           nova instances, cinder vols, snapshots, glance images, heat stacks
 
491
           and other resources eventually reach the expected status.
 
492
 
 
493
        :param resource: pointer to os resource type, ex: heat_client.stacks
 
494
        :param resource_id: unique id for the openstack resource
 
495
        :param expected_stat: status to expect resource to reach
 
496
        :param msg: text to identify purpose in logging
 
497
        :param max_wait: maximum wait time in seconds
 
498
        :returns: True if successful, False if status is not reached
 
499
        """
 
500
 
 
501
        tries = 0
 
502
        resource_stat = resource.get(resource_id).status
 
503
        while resource_stat != expected_stat and tries < (max_wait / 4):
 
504
            self.log.debug('{} status check: '
 
505
                           '{} [{}:{}] {}'.format(msg, tries,
 
506
                                                  resource_stat,
 
507
                                                  expected_stat,
 
508
                                                  resource_id))
 
509
            time.sleep(4)
 
510
            resource_stat = resource.get(resource_id).status
 
511
            tries += 1
 
512
 
 
513
        self.log.debug('{}:  expected, actual status = {}, '
 
514
                       '{}'.format(msg, resource_stat, expected_stat))
 
515
 
 
516
        if resource_stat == expected_stat:
 
517
            return True
 
518
        else:
 
519
            self.log.debug('{} never reached expected status: '
 
520
                           '{}'.format(resource_id, expected_stat))
 
521
            return False
 
522
 
 
523
    def get_ceph_osd_id_cmd(self, index):
 
524
        """Produce a shell command that will return a ceph-osd id."""
 
525
        return ("`initctl list | grep 'ceph-osd ' | "
 
526
                "awk 'NR=={} {{ print $2 }}' | "
 
527
                "grep -o '[0-9]*'`".format(index + 1))
 
528
 
 
529
    def get_ceph_pools(self, sentry_unit):
 
530
        """Return a dict of ceph pools from a single ceph unit, with
 
531
        pool name as keys, pool id as vals."""
 
532
        pools = {}
 
533
        cmd = 'sudo ceph osd lspools'
 
534
        output, code = sentry_unit.run(cmd)
 
535
        if code != 0:
 
536
            msg = ('{} `{}` returned {} '
 
537
                   '{}'.format(sentry_unit.info['unit_name'],
 
538
                               cmd, code, output))
 
539
            amulet.raise_status(amulet.FAIL, msg=msg)
 
540
 
 
541
        # Example output: 0 data,1 metadata,2 rbd,3 cinder,4 glance,
 
542
        for pool in str(output).split(','):
 
543
            pool_id_name = pool.split(' ')
 
544
            if len(pool_id_name) == 2:
 
545
                pool_id = pool_id_name[0]
 
546
                pool_name = pool_id_name[1]
 
547
                pools[pool_name] = int(pool_id)
 
548
 
 
549
        self.log.debug('Pools on {}: {}'.format(sentry_unit.info['unit_name'],
 
550
                                                pools))
 
551
        return pools
 
552
 
 
553
    def get_ceph_df(self, sentry_unit):
 
554
        """Return dict of ceph df json output, including ceph pool state.
 
555
 
 
556
        :param sentry_unit: Pointer to amulet sentry instance (juju unit)
 
557
        :returns: Dict of ceph df output
 
558
        """
 
559
        cmd = 'sudo ceph df --format=json'
 
560
        output, code = sentry_unit.run(cmd)
 
561
        if code != 0:
 
562
            msg = ('{} `{}` returned {} '
 
563
                   '{}'.format(sentry_unit.info['unit_name'],
 
564
                               cmd, code, output))
 
565
            amulet.raise_status(amulet.FAIL, msg=msg)
 
566
        return json.loads(output)
 
567
 
 
568
    def get_ceph_pool_sample(self, sentry_unit, pool_id=0):
 
569
        """Take a sample of attributes of a ceph pool, returning ceph
 
570
        pool name, object count and disk space used for the specified
 
571
        pool ID number.
 
572
 
 
573
        :param sentry_unit: Pointer to amulet sentry instance (juju unit)
 
574
        :param pool_id: Ceph pool ID
 
575
        :returns: List of pool name, object count, kb disk space used
 
576
        """
 
577
        df = self.get_ceph_df(sentry_unit)
 
578
        pool_name = df['pools'][pool_id]['name']
 
579
        obj_count = df['pools'][pool_id]['stats']['objects']
 
580
        kb_used = df['pools'][pool_id]['stats']['kb_used']
 
581
        self.log.debug('Ceph {} pool (ID {}): {} objects, '
 
582
                       '{} kb used'.format(pool_name, pool_id,
 
583
                                           obj_count, kb_used))
 
584
        return pool_name, obj_count, kb_used
 
585
 
 
586
    def validate_ceph_pool_samples(self, samples, sample_type="resource pool"):
 
587
        """Validate ceph pool samples taken over time, such as pool
 
588
        object counts or pool kb used, before adding, after adding, and
 
589
        after deleting items which affect those pool attributes.  The
 
590
        2nd element is expected to be greater than the 1st; 3rd is expected
 
591
        to be less than the 2nd.
 
592
 
 
593
        :param samples: List containing 3 data samples
 
594
        :param sample_type: String for logging and usage context
 
595
        :returns: None if successful, Failure message otherwise
 
596
        """
 
597
        original, created, deleted = range(3)
 
598
        if samples[created] <= samples[original] or \
 
599
                samples[deleted] >= samples[created]:
 
600
            return ('Ceph {} samples ({}) '
 
601
                    'unexpected.'.format(sample_type, samples))
 
602
        else:
 
603
            self.log.debug('Ceph {} samples (OK): '
 
604
                           '{}'.format(sample_type, samples))
 
605
            return None
 
606
 
 
607
# rabbitmq/amqp specific helpers:
 
608
    def add_rmq_test_user(self, sentry_units,
 
609
                          username="testuser1", password="changeme"):
 
610
        """Add a test user via the first rmq juju unit, check connection as
 
611
        the new user against all sentry units.
 
612
 
 
613
        :param sentry_units: list of sentry unit pointers
 
614
        :param username: amqp user name, default to testuser1
 
615
        :param password: amqp user password
 
616
        :returns: None if successful.  Raise on error.
 
617
        """
 
618
        self.log.debug('Adding rmq user ({})...'.format(username))
 
619
 
 
620
        # Check that user does not already exist
 
621
        cmd_user_list = 'rabbitmqctl list_users'
 
622
        output, _ = self.run_cmd_unit(sentry_units[0], cmd_user_list)
 
623
        if username in output:
 
624
            self.log.warning('User ({}) already exists, returning '
 
625
                             'gracefully.'.format(username))
 
626
            return
 
627
 
 
628
        perms = '".*" ".*" ".*"'
 
629
        cmds = ['rabbitmqctl add_user {} {}'.format(username, password),
 
630
                'rabbitmqctl set_permissions {} {}'.format(username, perms)]
 
631
 
 
632
        # Add user via first unit
 
633
        for cmd in cmds:
 
634
            output, _ = self.run_cmd_unit(sentry_units[0], cmd)
 
635
 
 
636
        # Check connection against the other sentry_units
 
637
        self.log.debug('Checking user connect against units...')
 
638
        for sentry_unit in sentry_units:
 
639
            connection = self.connect_amqp_by_unit(sentry_unit, ssl=False,
 
640
                                                   username=username,
 
641
                                                   password=password)
 
642
            connection.close()
 
643
 
 
644
    def delete_rmq_test_user(self, sentry_units, username="testuser1"):
 
645
        """Delete a rabbitmq user via the first rmq juju unit.
 
646
 
 
647
        :param sentry_units: list of sentry unit pointers
 
648
        :param username: amqp user name, default to testuser1
 
649
        :param password: amqp user password
 
650
        :returns: None if successful or no such user.
 
651
        """
 
652
        self.log.debug('Deleting rmq user ({})...'.format(username))
 
653
 
 
654
        # Check that the user exists
 
655
        cmd_user_list = 'rabbitmqctl list_users'
 
656
        output, _ = self.run_cmd_unit(sentry_units[0], cmd_user_list)
 
657
 
 
658
        if username not in output:
 
659
            self.log.warning('User ({}) does not exist, returning '
 
660
                             'gracefully.'.format(username))
 
661
            return
 
662
 
 
663
        # Delete the user
 
664
        cmd_user_del = 'rabbitmqctl delete_user {}'.format(username)
 
665
        output, _ = self.run_cmd_unit(sentry_units[0], cmd_user_del)
 
666
 
 
667
    def get_rmq_cluster_status(self, sentry_unit):
 
668
        """Execute rabbitmq cluster status command on a unit and return
 
669
        the full output.
 
670
 
 
671
        :param unit: sentry unit
 
672
        :returns: String containing console output of cluster status command
 
673
        """
 
674
        cmd = 'rabbitmqctl cluster_status'
 
675
        output, _ = self.run_cmd_unit(sentry_unit, cmd)
 
676
        self.log.debug('{} cluster_status:\n{}'.format(
 
677
            sentry_unit.info['unit_name'], output))
 
678
        return str(output)
 
679
 
 
680
    def get_rmq_cluster_running_nodes(self, sentry_unit):
 
681
        """Parse rabbitmqctl cluster_status output string, return list of
 
682
        running rabbitmq cluster nodes.
 
683
 
 
684
        :param unit: sentry unit
 
685
        :returns: List containing node names of running nodes
 
686
        """
 
687
        # NOTE(beisner): rabbitmqctl cluster_status output is not
 
688
        # json-parsable, do string chop foo, then json.loads that.
 
689
        str_stat = self.get_rmq_cluster_status(sentry_unit)
 
690
        if 'running_nodes' in str_stat:
 
691
            pos_start = str_stat.find("{running_nodes,") + 15
 
692
            pos_end = str_stat.find("]},", pos_start) + 1
 
693
            str_run_nodes = str_stat[pos_start:pos_end].replace("'", '"')
 
694
            run_nodes = json.loads(str_run_nodes)
 
695
            return run_nodes
 
696
        else:
 
697
            return []
 
698
 
 
699
    def validate_rmq_cluster_running_nodes(self, sentry_units):
 
700
        """Check that all rmq unit hostnames are represented in the
 
701
        cluster_status output of all units.
 
702
 
 
703
        :param host_names: dict of juju unit names to host names
 
704
        :param units: list of sentry unit pointers (all rmq units)
 
705
        :returns: None if successful, otherwise return error message
 
706
        """
 
707
        host_names = self.get_unit_hostnames(sentry_units)
 
708
        errors = []
 
709
 
 
710
        # Query every unit for cluster_status running nodes
 
711
        for query_unit in sentry_units:
 
712
            query_unit_name = query_unit.info['unit_name']
 
713
            running_nodes = self.get_rmq_cluster_running_nodes(query_unit)
 
714
 
 
715
            # Confirm that every unit is represented in the queried unit's
 
716
            # cluster_status running nodes output.
 
717
            for validate_unit in sentry_units:
 
718
                val_host_name = host_names[validate_unit.info['unit_name']]
 
719
                val_node_name = 'rabbit@{}'.format(val_host_name)
 
720
 
 
721
                if val_node_name not in running_nodes:
 
722
                    errors.append('Cluster member check failed on {}: {} not '
 
723
                                  'in {}\n'.format(query_unit_name,
 
724
                                                   val_node_name,
 
725
                                                   running_nodes))
 
726
        if errors:
 
727
            return ''.join(errors)
 
728
 
 
729
    def rmq_ssl_is_enabled_on_unit(self, sentry_unit, port=None):
 
730
        """Check a single juju rmq unit for ssl and port in the config file."""
 
731
        host = sentry_unit.info['public-address']
 
732
        unit_name = sentry_unit.info['unit_name']
 
733
 
 
734
        conf_file = '/etc/rabbitmq/rabbitmq.config'
 
735
        conf_contents = str(self.file_contents_safe(sentry_unit,
 
736
                                                    conf_file, max_wait=16))
 
737
        # Checks
 
738
        conf_ssl = 'ssl' in conf_contents
 
739
        conf_port = str(port) in conf_contents
 
740
 
 
741
        # Port explicitly checked in config
 
742
        if port and conf_port and conf_ssl:
 
743
            self.log.debug('SSL is enabled  @{}:{} '
 
744
                           '({})'.format(host, port, unit_name))
 
745
            return True
 
746
        elif port and not conf_port and conf_ssl:
 
747
            self.log.debug('SSL is enabled @{} but not on port {} '
 
748
                           '({})'.format(host, port, unit_name))
 
749
            return False
 
750
        # Port not checked (useful when checking that ssl is disabled)
 
751
        elif not port and conf_ssl:
 
752
            self.log.debug('SSL is enabled  @{}:{} '
 
753
                           '({})'.format(host, port, unit_name))
 
754
            return True
 
755
        elif not conf_ssl:
 
756
            self.log.debug('SSL not enabled @{}:{} '
 
757
                           '({})'.format(host, port, unit_name))
 
758
            return False
 
759
        else:
 
760
            msg = ('Unknown condition when checking SSL status @{}:{} '
 
761
                   '({})'.format(host, port, unit_name))
 
762
            amulet.raise_status(amulet.FAIL, msg)
 
763
 
 
764
    def validate_rmq_ssl_enabled_units(self, sentry_units, port=None):
 
765
        """Check that ssl is enabled on rmq juju sentry units.
 
766
 
 
767
        :param sentry_units: list of all rmq sentry units
 
768
        :param port: optional ssl port override to validate
 
769
        :returns: None if successful, otherwise return error message
 
770
        """
 
771
        for sentry_unit in sentry_units:
 
772
            if not self.rmq_ssl_is_enabled_on_unit(sentry_unit, port=port):
 
773
                return ('Unexpected condition:  ssl is disabled on unit '
 
774
                        '({})'.format(sentry_unit.info['unit_name']))
 
775
        return None
 
776
 
 
777
    def validate_rmq_ssl_disabled_units(self, sentry_units):
 
778
        """Check that ssl is enabled on listed rmq juju sentry units.
 
779
 
 
780
        :param sentry_units: list of all rmq sentry units
 
781
        :returns: True if successful.  Raise on error.
 
782
        """
 
783
        for sentry_unit in sentry_units:
 
784
            if self.rmq_ssl_is_enabled_on_unit(sentry_unit):
 
785
                return ('Unexpected condition:  ssl is enabled on unit '
 
786
                        '({})'.format(sentry_unit.info['unit_name']))
 
787
        return None
 
788
 
 
789
    def configure_rmq_ssl_on(self, sentry_units, deployment,
 
790
                             port=None, max_wait=60):
 
791
        """Turn ssl charm config option on, with optional non-default
 
792
        ssl port specification.  Confirm that it is enabled on every
 
793
        unit.
 
794
 
 
795
        :param sentry_units: list of sentry units
 
796
        :param deployment: amulet deployment object pointer
 
797
        :param port: amqp port, use defaults if None
 
798
        :param max_wait: maximum time to wait in seconds to confirm
 
799
        :returns: None if successful.  Raise on error.
 
800
        """
 
801
        self.log.debug('Setting ssl charm config option:  on')
 
802
 
 
803
        # Enable RMQ SSL
 
804
        config = {'ssl': 'on'}
 
805
        if port:
 
806
            config['ssl_port'] = port
 
807
 
 
808
        deployment.configure('rabbitmq-server', config)
 
809
 
 
810
        # Confirm
 
811
        tries = 0
 
812
        ret = self.validate_rmq_ssl_enabled_units(sentry_units, port=port)
 
813
        while ret and tries < (max_wait / 4):
 
814
            time.sleep(4)
 
815
            self.log.debug('Attempt {}: {}'.format(tries, ret))
 
816
            ret = self.validate_rmq_ssl_enabled_units(sentry_units, port=port)
 
817
            tries += 1
 
818
 
 
819
        if ret:
 
820
            amulet.raise_status(amulet.FAIL, ret)
 
821
 
 
822
    def configure_rmq_ssl_off(self, sentry_units, deployment, max_wait=60):
 
823
        """Turn ssl charm config option off, confirm that it is disabled
 
824
        on every unit.
 
825
 
 
826
        :param sentry_units: list of sentry units
 
827
        :param deployment: amulet deployment object pointer
 
828
        :param max_wait: maximum time to wait in seconds to confirm
 
829
        :returns: None if successful.  Raise on error.
 
830
        """
 
831
        self.log.debug('Setting ssl charm config option:  off')
 
832
 
 
833
        # Disable RMQ SSL
 
834
        config = {'ssl': 'off'}
 
835
        deployment.configure('rabbitmq-server', config)
 
836
 
 
837
        # Confirm
 
838
        tries = 0
 
839
        ret = self.validate_rmq_ssl_disabled_units(sentry_units)
 
840
        while ret and tries < (max_wait / 4):
 
841
            time.sleep(4)
 
842
            self.log.debug('Attempt {}: {}'.format(tries, ret))
 
843
            ret = self.validate_rmq_ssl_disabled_units(sentry_units)
 
844
            tries += 1
 
845
 
 
846
        if ret:
 
847
            amulet.raise_status(amulet.FAIL, ret)
 
848
 
 
849
    def connect_amqp_by_unit(self, sentry_unit, ssl=False,
 
850
                             port=None, fatal=True,
 
851
                             username="testuser1", password="changeme"):
 
852
        """Establish and return a pika amqp connection to the rabbitmq service
 
853
        running on a rmq juju unit.
 
854
 
 
855
        :param sentry_unit: sentry unit pointer
 
856
        :param ssl: boolean, default to False
 
857
        :param port: amqp port, use defaults if None
 
858
        :param fatal: boolean, default to True (raises on connect error)
 
859
        :param username: amqp user name, default to testuser1
 
860
        :param password: amqp user password
 
861
        :returns: pika amqp connection pointer or None if failed and non-fatal
 
862
        """
 
863
        host = sentry_unit.info['public-address']
 
864
        unit_name = sentry_unit.info['unit_name']
 
865
 
 
866
        # Default port logic if port is not specified
 
867
        if ssl and not port:
 
868
            port = 5671
 
869
        elif not ssl and not port:
 
870
            port = 5672
 
871
 
 
872
        self.log.debug('Connecting to amqp on {}:{} ({}) as '
 
873
                       '{}...'.format(host, port, unit_name, username))
 
874
 
 
875
        try:
 
876
            credentials = pika.PlainCredentials(username, password)
 
877
            parameters = pika.ConnectionParameters(host=host, port=port,
 
878
                                                   credentials=credentials,
 
879
                                                   ssl=ssl,
 
880
                                                   connection_attempts=3,
 
881
                                                   retry_delay=5,
 
882
                                                   socket_timeout=1)
 
883
            connection = pika.BlockingConnection(parameters)
 
884
            assert connection.server_properties['product'] == 'RabbitMQ'
 
885
            self.log.debug('Connect OK')
 
886
            return connection
 
887
        except Exception as e:
 
888
            msg = ('amqp connection failed to {}:{} as '
 
889
                   '{} ({})'.format(host, port, username, str(e)))
 
890
            if fatal:
 
891
                amulet.raise_status(amulet.FAIL, msg)
 
892
            else:
 
893
                self.log.warn(msg)
 
894
                return None
 
895
 
 
896
    def publish_amqp_message_by_unit(self, sentry_unit, message,
 
897
                                     queue="test", ssl=False,
 
898
                                     username="testuser1",
 
899
                                     password="changeme",
 
900
                                     port=None):
 
901
        """Publish an amqp message to a rmq juju unit.
 
902
 
 
903
        :param sentry_unit: sentry unit pointer
 
904
        :param message: amqp message string
 
905
        :param queue: message queue, default to test
 
906
        :param username: amqp user name, default to testuser1
 
907
        :param password: amqp user password
 
908
        :param ssl: boolean, default to False
 
909
        :param port: amqp port, use defaults if None
 
910
        :returns: None.  Raises exception if publish failed.
 
911
        """
 
912
        self.log.debug('Publishing message to {} queue:\n{}'.format(queue,
 
913
                                                                    message))
 
914
        connection = self.connect_amqp_by_unit(sentry_unit, ssl=ssl,
 
915
                                               port=port,
 
916
                                               username=username,
 
917
                                               password=password)
 
918
 
 
919
        # NOTE(beisner): extra debug here re: pika hang potential:
 
920
        #   https://github.com/pika/pika/issues/297
 
921
        #   https://groups.google.com/forum/#!topic/rabbitmq-users/Ja0iyfF0Szw
 
922
        self.log.debug('Defining channel...')
 
923
        channel = connection.channel()
 
924
        self.log.debug('Declaring queue...')
 
925
        channel.queue_declare(queue=queue, auto_delete=False, durable=True)
 
926
        self.log.debug('Publishing message...')
 
927
        channel.basic_publish(exchange='', routing_key=queue, body=message)
 
928
        self.log.debug('Closing channel...')
 
929
        channel.close()
 
930
        self.log.debug('Closing connection...')
 
931
        connection.close()
 
932
 
 
933
    def get_amqp_message_by_unit(self, sentry_unit, queue="test",
 
934
                                 username="testuser1",
 
935
                                 password="changeme",
 
936
                                 ssl=False, port=None):
 
937
        """Get an amqp message from a rmq juju unit.
 
938
 
 
939
        :param sentry_unit: sentry unit pointer
 
940
        :param queue: message queue, default to test
 
941
        :param username: amqp user name, default to testuser1
 
942
        :param password: amqp user password
 
943
        :param ssl: boolean, default to False
 
944
        :param port: amqp port, use defaults if None
 
945
        :returns: amqp message body as string.  Raise if get fails.
 
946
        """
 
947
        connection = self.connect_amqp_by_unit(sentry_unit, ssl=ssl,
 
948
                                               port=port,
 
949
                                               username=username,
 
950
                                               password=password)
 
951
        channel = connection.channel()
 
952
        method_frame, _, body = channel.basic_get(queue)
 
953
 
 
954
        if method_frame:
 
955
            self.log.debug('Retreived message from {} queue:\n{}'.format(queue,
 
956
                                                                         body))
 
957
            channel.basic_ack(method_frame.delivery_tag)
 
958
            channel.close()
 
959
            connection.close()
 
960
            return body
 
961
        else:
 
962
            msg = 'No message retrieved.'
 
963
            amulet.raise_status(amulet.FAIL, msg)