1
# Copyright 2014-2015 Canonical Limited.
3
# This file is part of charm-helpers.
5
# charm-helpers is free software: you can redistribute it and/or modify
6
# it under the terms of the GNU Lesser General Public License version 3 as
7
# published by the Free Software Foundation.
9
# charm-helpers is distributed in the hope that it will be useful,
10
# but WITHOUT ANY WARRANTY; without even the implied warranty of
11
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12
# GNU Lesser General Public License for more details.
14
# You should have received a copy of the GNU Lesser General Public License
15
# along with charm-helpers. If not, see <http://www.gnu.org/licenses/>.
26
import cinderclient.v1.client as cinder_client
27
import glanceclient.v1.client as glance_client
28
import heatclient.v1.client as heat_client
29
import keystoneclient.v2_0 as keystone_client
30
import novaclient.v1_1.client as nova_client
34
from charmhelpers.contrib.amulet.utils import (
42
class OpenStackAmuletUtils(AmuletUtils):
43
"""OpenStack amulet utilities.
45
This class inherits from AmuletUtils and has additional support
46
that is specifically for use by OpenStack charm tests.
49
def __init__(self, log_level=ERROR):
    """Set up the utility object by delegating to the parent initializer.

    :param log_level: logging level handed unchanged to the parent class
    """
    parent = super(OpenStackAmuletUtils, self)
    parent.__init__(log_level)
53
def validate_endpoint_data(self, endpoints, admin_port, internal_port,
54
public_port, expected):
55
"""Validate endpoint data.
57
Validate actual endpoint data vs expected endpoint data. The ports
58
are used to find the matching endpoint.
60
self.log.debug('Validating endpoint data...')
61
self.log.debug('actual: {}'.format(repr(endpoints)))
64
self.log.debug('endpoint: {}'.format(repr(ep)))
65
if (admin_port in ep.adminurl and
66
internal_port in ep.internalurl and
67
public_port in ep.publicurl):
69
actual = {'id': ep.id,
71
'adminurl': ep.adminurl,
72
'internalurl': ep.internalurl,
73
'publicurl': ep.publicurl,
74
'service_id': ep.service_id}
75
ret = self._validate_dict_data(expected, actual)
77
return 'unexpected endpoint data - {}'.format(ret)
80
return 'endpoint not found'
82
def validate_svc_catalog_endpoint_data(self, expected, actual):
83
"""Validate service catalog endpoint data.
85
Validate a list of actual service catalog endpoints vs a list of
86
expected service catalog endpoints.
88
self.log.debug('Validating service catalog endpoint data...')
89
self.log.debug('actual: {}'.format(repr(actual)))
90
for k, v in six.iteritems(expected):
92
ret = self._validate_dict_data(expected[k][0], actual[k][0])
94
return self.endpoint_error(k, ret)
96
return "endpoint {} does not exist".format(k)
99
def validate_tenant_data(self, expected, actual):
100
"""Validate tenant data.
102
Validate a list of actual tenant data vs list of expected tenant
105
self.log.debug('Validating tenant data...')
106
self.log.debug('actual: {}'.format(repr(actual)))
110
a = {'enabled': act.enabled, 'description': act.description,
111
'name': act.name, 'id': act.id}
112
if e['name'] == a['name']:
114
ret = self._validate_dict_data(e, a)
116
return "unexpected tenant data - {}".format(ret)
118
return "tenant {} does not exist".format(e['name'])
121
def validate_role_data(self, expected, actual):
122
"""Validate role data.
124
Validate a list of actual role data vs a list of expected role
127
self.log.debug('Validating role data...')
128
self.log.debug('actual: {}'.format(repr(actual)))
132
a = {'name': act.name, 'id': act.id}
133
if e['name'] == a['name']:
135
ret = self._validate_dict_data(e, a)
137
return "unexpected role data - {}".format(ret)
139
return "role {} does not exist".format(e['name'])
142
def validate_user_data(self, expected, actual):
143
"""Validate user data.
145
Validate a list of actual user data vs a list of expected user
148
self.log.debug('Validating user data...')
149
self.log.debug('actual: {}'.format(repr(actual)))
153
a = {'enabled': act.enabled, 'name': act.name,
154
'email': act.email, 'tenantId': act.tenantId,
156
if e['name'] == a['name']:
158
ret = self._validate_dict_data(e, a)
160
return "unexpected user data - {}".format(ret)
162
return "user {} does not exist".format(e['name'])
165
def validate_flavor_data(self, expected, actual):
    """Check the names of the actual flavors against the expected flavors.

    :param expected: expected flavor data, passed unchanged to
                     ``self._validate_list_data``
    :param actual: iterable of flavor objects exposing a ``name`` attribute
    :returns: the result of ``self._validate_list_data`` for the
              expected data and the collected flavor names
    """
    debug = self.log.debug
    debug('Validating flavor data...')
    debug('actual: {}'.format(repr(actual)))

    # Only the flavor names take part in the comparison.
    flavor_names = []
    for flavor in actual:
        flavor_names.append(flavor.name)
    return self._validate_list_data(expected, flavor_names)
175
def tenant_exists(self, keystone, tenant):
    """Report whether keystone lists a tenant with the given name.

    :param keystone: object whose ``tenants.list()`` yields entries with
                     a ``name`` attribute
    :param tenant: tenant name to look for
    :returns: True if one of the listed names equals ``tenant``, else False
    """
    self.log.debug('Checking if tenant exists ({})...'.format(tenant))

    # Collect every listed name first, then do a single membership test.
    known_names = []
    for entry in keystone.tenants.list():
        known_names.append(entry.name)
    return tenant in known_names
180
def authenticate_cinder_admin(self, keystone_sentry, username,
182
"""Authenticates admin user with cinder."""
183
# NOTE(beisner): cinder python client doesn't accept tokens.
185
keystone_sentry.relation('shared-db',
186
'mysql:shared-db')['private-address']
187
ept = "http://{}:5000/v2.0".format(service_ip.strip().decode('utf-8'))
188
return cinder_client.Client(username, password, tenant, ept)
190
def authenticate_keystone_admin(self, keystone_sentry, user, password,
192
"""Authenticates admin user with the keystone admin endpoint."""
193
self.log.debug('Authenticating keystone admin...')
194
unit = keystone_sentry
195
service_ip = unit.relation('shared-db',
196
'mysql:shared-db')['private-address']
197
ep = "http://{}:35357/v2.0".format(service_ip.strip().decode('utf-8'))
198
return keystone_client.Client(username=user, password=password,
199
tenant_name=tenant, auth_url=ep)
201
def authenticate_keystone_user(self, keystone, user, password, tenant):
    """Build a keystone client for a regular user via the public endpoint.

    :param keystone: object whose ``service_catalog.url_for`` is queried
                     for the public identity URL
    :param user: user name to authenticate as
    :param password: password for that user
    :param tenant: tenant name passed as ``tenant_name``
    :returns: ``keystone_client.Client`` built from those credentials
    """
    self.log.debug('Authenticating keystone user ({})...'.format(user))

    catalog = keystone.service_catalog
    auth_url = catalog.url_for(service_type='identity',
                               endpoint_type='publicURL')
    credentials = {
        'username': user,
        'password': password,
        'tenant_name': tenant,
        'auth_url': auth_url,
    }
    return keystone_client.Client(**credentials)
209
def authenticate_glance_admin(self, keystone):
    """Build a glance client against the admin image endpoint.

    :param keystone: object providing ``service_catalog.url_for`` and an
                     ``auth_token`` attribute
    :returns: ``glance_client.Client`` bound to the admin image URL and
              authenticated with the keystone token
    """
    self.log.debug('Authenticating glance admin...')

    lookup = {'service_type': 'image', 'endpoint_type': 'adminURL'}
    image_url = keystone.service_catalog.url_for(**lookup)
    token = keystone.auth_token
    return glance_client.Client(image_url, token=token)
216
def authenticate_heat_admin(self, keystone):
    """Build a heat client against the public orchestration endpoint.

    :param keystone: object providing ``service_catalog.url_for`` and an
                     ``auth_token`` attribute
    :returns: ``heat_client.Client`` bound to the public orchestration URL
              and authenticated with the keystone token
    """
    self.log.debug('Authenticating heat admin...')

    lookup = {'service_type': 'orchestration', 'endpoint_type': 'publicURL'}
    heat_url = keystone.service_catalog.url_for(**lookup)
    token = keystone.auth_token
    return heat_client.Client(endpoint=heat_url, token=token)
223
def authenticate_nova_user(self, keystone, user, password, tenant):
    """Build a nova client for a regular user.

    The client is pointed at the public identity URL taken from the
    keystone service catalog.

    :param keystone: object whose ``service_catalog.url_for`` is queried
                     for the public identity URL
    :param user: user name passed as ``username``
    :param password: password passed as ``api_key``
    :param tenant: tenant name passed as ``project_id``
    :returns: ``nova_client.Client`` built from those credentials
    """
    self.log.debug('Authenticating nova user ({})...'.format(user))

    catalog = keystone.service_catalog
    auth_url = catalog.url_for(service_type='identity',
                               endpoint_type='publicURL')
    credentials = {
        'username': user,
        'api_key': password,
        'project_id': tenant,
        'auth_url': auth_url,
    }
    return nova_client.Client(**credentials)
231
def authenticate_swift_user(self, keystone, user, password, tenant):
232
"""Authenticates a regular user with swift api."""
233
self.log.debug('Authenticating swift user ({})...'.format(user))
234
ep = keystone.service_catalog.url_for(service_type='identity',
235
endpoint_type='publicURL')
236
return swiftclient.Connection(authurl=ep,
242
def create_cirros_image(self, glance, image_name):
243
"""Download the latest cirros image and upload it to glance,
244
validate and return a resource pointer.
246
:param glance: pointer to authenticated glance connection
247
:param image_name: display name for new image
248
:returns: glance image pointer
250
self.log.debug('Creating glance cirros image '
251
'({})...'.format(image_name))
253
# Download cirros image
254
http_proxy = os.getenv('AMULET_HTTP_PROXY')
255
self.log.debug('AMULET_HTTP_PROXY: {}'.format(http_proxy))
257
proxies = {'http': http_proxy}
258
opener = urllib.FancyURLopener(proxies)
260
opener = urllib.FancyURLopener()
262
f = opener.open('http://download.cirros-cloud.net/version/released')
263
version = f.read().strip()
264
cirros_img = 'cirros-{}-x86_64-disk.img'.format(version)
265
local_path = os.path.join('tests', cirros_img)
267
if not os.path.exists(local_path):
268
cirros_url = 'http://{}/{}/{}'.format('download.cirros-cloud.net',
270
opener.retrieve(cirros_url, local_path)
273
# Create glance image
274
with open(local_path) as f:
275
image = glance.images.create(name=image_name, is_public=True,
277
container_format='bare', data=f)
279
# Wait for image to reach active status
281
ret = self.resource_reaches_status(glance.images, img_id,
282
expected_stat='active',
283
msg='Image status wait')
285
msg = 'Glance image failed to reach expected state.'
286
amulet.raise_status(amulet.FAIL, msg=msg)
288
# Re-validate new image
289
self.log.debug('Validating image attributes...')
290
val_img_name = glance.images.get(img_id).name
291
val_img_stat = glance.images.get(img_id).status
292
val_img_pub = glance.images.get(img_id).is_public
293
val_img_cfmt = glance.images.get(img_id).container_format
294
val_img_dfmt = glance.images.get(img_id).disk_format
295
msg_attr = ('Image attributes - name:{} public:{} id:{} stat:{} '
296
'container fmt:{} disk fmt:{}'.format(
297
val_img_name, val_img_pub, img_id,
298
val_img_stat, val_img_cfmt, val_img_dfmt))
300
if val_img_name == image_name and val_img_stat == 'active' \
301
and val_img_pub is True and val_img_cfmt == 'bare' \
302
and val_img_dfmt == 'qcow2':
303
self.log.debug(msg_attr)
305
msg = ('Volume validation failed, {}'.format(msg_attr))
306
amulet.raise_status(amulet.FAIL, msg=msg)
310
def delete_image(self, glance, image):
    """Delete the specified glance image (deprecated wrapper).

    Logs a deprecation warning and then delegates to
    ``self.delete_resource`` with ``glance.images`` as the resource.

    :param glance: object exposing an ``images`` attribute
    :param image: image identifier handed to ``delete_resource``
    :returns: whatever ``self.delete_resource`` returns
    """
    # /!\ DEPRECATION WARNING
    # Logger.warn is a deprecated alias (removed in Python 3.13); use
    # warning(), which the rest of this file already relies on.
    self.log.warning('/!\\ DEPRECATION WARNING: use '
                     'delete_resource instead of delete_image.')
    self.log.debug('Deleting glance image ({})...'.format(image))
    return self.delete_resource(glance.images, image, msg='glance image')
319
def create_instance(self, nova, image_name, instance_name, flavor):
320
"""Create the specified instance."""
321
self.log.debug('Creating instance '
322
'({}|{}|{})'.format(instance_name, image_name, flavor))
323
image = nova.images.find(name=image_name)
324
flavor = nova.flavors.find(name=flavor)
325
instance = nova.servers.create(name=instance_name, image=image,
329
status = instance.status
330
while status != 'ACTIVE' and count < 60:
332
instance = nova.servers.get(instance.id)
333
status = instance.status
334
self.log.debug('instance status: {}'.format(status))
337
if status != 'ACTIVE':
338
self.log.error('instance creation timed out')
343
def delete_instance(self, nova, instance):
344
"""Delete the specified instance."""
346
# /!\ DEPRECATION WARNING
347
self.log.warn('/!\\ DEPRECATION WARNING: use '
348
'delete_resource instead of delete_instance.')
349
self.log.debug('Deleting instance ({})...'.format(instance))
350
return self.delete_resource(nova.servers, instance,
353
def create_or_get_keypair(self, nova, keypair_name="testkey"):
354
"""Create a new keypair, or return pointer if it already exists."""
356
_keypair = nova.keypairs.get(keypair_name)
357
self.log.debug('Keypair ({}) already exists, '
358
'using it.'.format(keypair_name))
361
self.log.debug('Keypair ({}) does not exist, '
362
'creating it.'.format(keypair_name))
364
_keypair = nova.keypairs.create(name=keypair_name)
367
def create_cinder_volume(self, cinder, vol_name="demo-vol", vol_size=1,
368
img_id=None, src_vol_id=None, snap_id=None):
369
"""Create cinder volume, optionally from a glance image, OR
370
optionally as a clone of an existing volume, OR optionally
371
from a snapshot. Wait for the new volume status to reach
372
the expected status, validate and return a resource pointer.
374
:param vol_name: cinder volume display name
375
:param vol_size: size in gigabytes
376
:param img_id: optional glance image id
377
:param src_vol_id: optional source volume id to clone
378
:param snap_id: optional snapshot id to use
379
:returns: cinder volume pointer
381
# Handle parameter input and avoid impossible combinations
382
if img_id and not src_vol_id and not snap_id:
383
# Create volume from image
384
self.log.debug('Creating cinder volume from glance image...')
386
elif src_vol_id and not img_id and not snap_id:
387
# Clone an existing volume
388
self.log.debug('Cloning cinder volume...')
389
bootable = cinder.volumes.get(src_vol_id).bootable
390
elif snap_id and not src_vol_id and not img_id:
391
# Create volume from snapshot
392
self.log.debug('Creating cinder volume from snapshot...')
393
snap = cinder.volume_snapshots.find(id=snap_id)
395
snap_vol_id = cinder.volume_snapshots.get(snap_id).volume_id
396
bootable = cinder.volumes.get(snap_vol_id).bootable
397
elif not img_id and not src_vol_id and not snap_id:
399
self.log.debug('Creating cinder volume...')
402
# Impossible combination of parameters
403
msg = ('Invalid method use - name:{} size:{} img_id:{} '
404
'src_vol_id:{} snap_id:{}'.format(vol_name, vol_size,
407
amulet.raise_status(amulet.FAIL, msg=msg)
411
vol_new = cinder.volumes.create(display_name=vol_name,
414
source_volid=src_vol_id,
417
except Exception as e:
418
msg = 'Failed to create volume: {}'.format(e)
419
amulet.raise_status(amulet.FAIL, msg=msg)
421
# Wait for volume to reach available status
422
ret = self.resource_reaches_status(cinder.volumes, vol_id,
423
expected_stat="available",
424
msg="Volume status wait")
426
msg = 'Cinder volume failed to reach expected state.'
427
amulet.raise_status(amulet.FAIL, msg=msg)
429
# Re-validate new volume
430
self.log.debug('Validating volume attributes...')
431
val_vol_name = cinder.volumes.get(vol_id).display_name
432
val_vol_boot = cinder.volumes.get(vol_id).bootable
433
val_vol_stat = cinder.volumes.get(vol_id).status
434
val_vol_size = cinder.volumes.get(vol_id).size
435
msg_attr = ('Volume attributes - name:{} id:{} stat:{} boot:'
436
'{} size:{}'.format(val_vol_name, vol_id,
437
val_vol_stat, val_vol_boot,
440
if val_vol_boot == bootable and val_vol_stat == 'available' \
441
and val_vol_name == vol_name and val_vol_size == vol_size:
442
self.log.debug(msg_attr)
444
msg = ('Volume validation failed, {}'.format(msg_attr))
445
amulet.raise_status(amulet.FAIL, msg=msg)
449
def delete_resource(self, resource, resource_id,
450
msg="resource", max_wait=120):
451
"""Delete one openstack resource, such as one instance, keypair,
452
image, volume, stack, etc., and confirm deletion within max wait time.
454
:param resource: pointer to os resource type, ex:glance_client.images
455
:param resource_id: unique name or id for the openstack resource
456
:param msg: text to identify purpose in logging
457
:param max_wait: maximum wait time in seconds
458
:returns: True if successful, otherwise False
460
self.log.debug('Deleting OpenStack resource '
461
'{} ({})'.format(resource_id, msg))
462
num_before = len(list(resource.list()))
463
resource.delete(resource_id)
466
num_after = len(list(resource.list()))
467
while num_after != (num_before - 1) and tries < (max_wait / 4):
468
self.log.debug('{} delete check: '
469
'{} [{}:{}] {}'.format(msg, tries,
474
num_after = len(list(resource.list()))
477
self.log.debug('{}: expected, actual count = {}, '
478
'{}'.format(msg, num_before - 1, num_after))
480
if num_after == (num_before - 1):
483
self.log.error('{} delete timed out'.format(msg))
486
def resource_reaches_status(self, resource, resource_id,
487
expected_stat='available',
488
msg='resource', max_wait=120):
489
"""Wait for an openstack resources status to reach an
490
expected status within a specified time. Useful to confirm that
491
nova instances, cinder vols, snapshots, glance images, heat stacks
492
and other resources eventually reach the expected status.
494
:param resource: pointer to os resource type, ex: heat_client.stacks
495
:param resource_id: unique id for the openstack resource
496
:param expected_stat: status to expect resource to reach
497
:param msg: text to identify purpose in logging
498
:param max_wait: maximum wait time in seconds
499
:returns: True if successful, False if status is not reached
503
resource_stat = resource.get(resource_id).status
504
while resource_stat != expected_stat and tries < (max_wait / 4):
505
self.log.debug('{} status check: '
506
'{} [{}:{}] {}'.format(msg, tries,
511
resource_stat = resource.get(resource_id).status
514
self.log.debug('{}: expected, actual status = {}, '
515
'{}'.format(msg, resource_stat, expected_stat))
517
if resource_stat == expected_stat:
520
self.log.debug('{} never reached expected status: '
521
'{}'.format(resource_id, expected_stat))
524
def get_ceph_osd_id_cmd(self, index):
    """Build a back-quoted shell pipeline that yields a ceph-osd id.

    :param index: zero-based position of the ceph-osd entry; the awk stage
                  selects row ``index + 1``
    :returns: shell command string wrapped in backticks
    """
    row = index + 1
    stages = [
        "initctl list",
        "grep 'ceph-osd '",
        "awk 'NR=={} {{ print $2 }}'".format(row),
        "grep -o '[0-9]*'",
    ]
    return "`" + " | ".join(stages) + "`"
530
def get_ceph_pools(self, sentry_unit):
531
"""Return a dict of ceph pools from a single ceph unit, with
532
pool name as keys, pool id as vals."""
534
cmd = 'sudo ceph osd lspools'
535
output, code = sentry_unit.run(cmd)
537
msg = ('{} `{}` returned {} '
538
'{}'.format(sentry_unit.info['unit_name'],
540
amulet.raise_status(amulet.FAIL, msg=msg)
542
# Example output: 0 data,1 metadata,2 rbd,3 cinder,4 glance,
543
for pool in str(output).split(','):
544
pool_id_name = pool.split(' ')
545
if len(pool_id_name) == 2:
546
pool_id = pool_id_name[0]
547
pool_name = pool_id_name[1]
548
pools[pool_name] = int(pool_id)
550
self.log.debug('Pools on {}: {}'.format(sentry_unit.info['unit_name'],
554
def get_ceph_df(self, sentry_unit):
555
"""Return dict of ceph df json output, including ceph pool state.
557
:param sentry_unit: Pointer to amulet sentry instance (juju unit)
558
:returns: Dict of ceph df output
560
cmd = 'sudo ceph df --format=json'
561
output, code = sentry_unit.run(cmd)
563
msg = ('{} `{}` returned {} '
564
'{}'.format(sentry_unit.info['unit_name'],
566
amulet.raise_status(amulet.FAIL, msg=msg)
567
return json.loads(output)
569
def get_ceph_pool_sample(self, sentry_unit, pool_id=0):
570
"""Take a sample of attributes of a ceph pool, returning ceph
571
pool name, object count and disk space used for the specified
574
:param sentry_unit: Pointer to amulet sentry instance (juju unit)
575
:param pool_id: Ceph pool ID
576
:returns: List of pool name, object count, kb disk space used
578
df = self.get_ceph_df(sentry_unit)
579
pool_name = df['pools'][pool_id]['name']
580
obj_count = df['pools'][pool_id]['stats']['objects']
581
kb_used = df['pools'][pool_id]['stats']['kb_used']
582
self.log.debug('Ceph {} pool (ID {}): {} objects, '
583
'{} kb used'.format(pool_name, pool_id,
585
return pool_name, obj_count, kb_used
587
def validate_ceph_pool_samples(self, samples, sample_type="resource pool"):
    """Check that three pool samples rise and then fall.

    The samples are read by index: the second must be greater than the
    first, and the third must be less than the second.

    :param samples: indexable holding the three samples at positions 0-2
    :param sample_type: text used in the log and failure messages
    :returns: None if the samples look as expected, otherwise a failure
              message string
    """
    first, second, third = 0, 1, 2

    # The third sample is only inspected once the first comparison passes.
    grew = not (samples[second] <= samples[first])
    if grew and not (samples[third] >= samples[second]):
        self.log.debug('Ceph {} samples (OK): '
                       '{}'.format(sample_type, samples))
        return None

    return ('Ceph {} samples ({}) '
            'unexpected.'.format(sample_type, samples))
608
# rabbitmq/amqp specific helpers:
610
def rmq_wait_for_cluster(self, deployment, init_sleep=15, timeout=1200):
611
"""Wait for rmq units extended status to show cluster readiness,
612
after an optional initial sleep period. Initial sleep is likely
613
necessary to be effective following a config change, as status
614
message may not instantly update to non-ready."""
617
time.sleep(init_sleep)
619
message = re.compile('^Unit is ready and clustered$')
620
deployment._auto_wait_for_status(message=message,
622
include_only=['rabbitmq-server'])
624
def add_rmq_test_user(self, sentry_units,
625
username="testuser1", password="changeme"):
626
"""Add a test user via the first rmq juju unit, check connection as
627
the new user against all sentry units.
629
:param sentry_units: list of sentry unit pointers
630
:param username: amqp user name, default to testuser1
631
:param password: amqp user password
632
:returns: None if successful. Raise on error.
634
self.log.debug('Adding rmq user ({})...'.format(username))
636
# Check that user does not already exist
637
cmd_user_list = 'rabbitmqctl list_users'
638
output, _ = self.run_cmd_unit(sentry_units[0], cmd_user_list)
639
if username in output:
640
self.log.warning('User ({}) already exists, returning '
641
'gracefully.'.format(username))
644
perms = '".*" ".*" ".*"'
645
cmds = ['rabbitmqctl add_user {} {}'.format(username, password),
646
'rabbitmqctl set_permissions {} {}'.format(username, perms)]
648
# Add user via first unit
650
output, _ = self.run_cmd_unit(sentry_units[0], cmd)
652
# Check connection against the other sentry_units
653
self.log.debug('Checking user connect against units...')
654
for sentry_unit in sentry_units:
655
connection = self.connect_amqp_by_unit(sentry_unit, ssl=False,
660
def delete_rmq_test_user(self, sentry_units, username="testuser1"):
661
"""Delete a rabbitmq user via the first rmq juju unit.
663
:param sentry_units: list of sentry unit pointers
664
:param username: amqp user name, default to testuser1
665
:param password: amqp user password
666
:returns: None if successful or no such user.
668
self.log.debug('Deleting rmq user ({})...'.format(username))
670
# Check that the user exists
671
cmd_user_list = 'rabbitmqctl list_users'
672
output, _ = self.run_cmd_unit(sentry_units[0], cmd_user_list)
674
if username not in output:
675
self.log.warning('User ({}) does not exist, returning '
676
'gracefully.'.format(username))
680
cmd_user_del = 'rabbitmqctl delete_user {}'.format(username)
681
output, _ = self.run_cmd_unit(sentry_units[0], cmd_user_del)
683
def get_rmq_cluster_status(self, sentry_unit):
684
"""Execute rabbitmq cluster status command on a unit and return
687
:param unit: sentry unit
688
:returns: String containing console output of cluster status command
690
cmd = 'rabbitmqctl cluster_status'
691
output, _ = self.run_cmd_unit(sentry_unit, cmd)
692
self.log.debug('{} cluster_status:\n{}'.format(
693
sentry_unit.info['unit_name'], output))
696
def get_rmq_cluster_running_nodes(self, sentry_unit):
697
"""Parse rabbitmqctl cluster_status output string, return list of
698
running rabbitmq cluster nodes.
700
:param unit: sentry unit
701
:returns: List containing node names of running nodes
703
# NOTE(beisner): rabbitmqctl cluster_status output is not
704
# json-parsable, do string chop foo, then json.loads that.
705
str_stat = self.get_rmq_cluster_status(sentry_unit)
706
if 'running_nodes' in str_stat:
707
pos_start = str_stat.find("{running_nodes,") + 15
708
pos_end = str_stat.find("]},", pos_start) + 1
709
str_run_nodes = str_stat[pos_start:pos_end].replace("'", '"')
710
run_nodes = json.loads(str_run_nodes)
715
def validate_rmq_cluster_running_nodes(self, sentry_units):
716
"""Check that all rmq unit hostnames are represented in the
717
cluster_status output of all units.
719
:param host_names: dict of juju unit names to host names
720
:param units: list of sentry unit pointers (all rmq units)
721
:returns: None if successful, otherwise return error message
723
host_names = self.get_unit_hostnames(sentry_units)
726
# Query every unit for cluster_status running nodes
727
for query_unit in sentry_units:
728
query_unit_name = query_unit.info['unit_name']
729
running_nodes = self.get_rmq_cluster_running_nodes(query_unit)
731
# Confirm that every unit is represented in the queried unit's
732
# cluster_status running nodes output.
733
for validate_unit in sentry_units:
734
val_host_name = host_names[validate_unit.info['unit_name']]
735
val_node_name = 'rabbit@{}'.format(val_host_name)
737
if val_node_name not in running_nodes:
738
errors.append('Cluster member check failed on {}: {} not '
739
'in {}\n'.format(query_unit_name,
743
return ''.join(errors)
745
def rmq_ssl_is_enabled_on_unit(self, sentry_unit, port=None):
746
"""Check a single juju rmq unit for ssl and port in the config file."""
747
host = sentry_unit.info['public-address']
748
unit_name = sentry_unit.info['unit_name']
750
conf_file = '/etc/rabbitmq/rabbitmq.config'
751
conf_contents = str(self.file_contents_safe(sentry_unit,
752
conf_file, max_wait=16))
754
conf_ssl = 'ssl' in conf_contents
755
conf_port = str(port) in conf_contents
757
# Port explicitly checked in config
758
if port and conf_port and conf_ssl:
759
self.log.debug('SSL is enabled @{}:{} '
760
'({})'.format(host, port, unit_name))
762
elif port and not conf_port and conf_ssl:
763
self.log.debug('SSL is enabled @{} but not on port {} '
764
'({})'.format(host, port, unit_name))
766
# Port not checked (useful when checking that ssl is disabled)
767
elif not port and conf_ssl:
768
self.log.debug('SSL is enabled @{}:{} '
769
'({})'.format(host, port, unit_name))
772
self.log.debug('SSL not enabled @{}:{} '
773
'({})'.format(host, port, unit_name))
776
msg = ('Unknown condition when checking SSL status @{}:{} '
777
'({})'.format(host, port, unit_name))
778
amulet.raise_status(amulet.FAIL, msg)
780
def validate_rmq_ssl_enabled_units(self, sentry_units, port=None):
    """Confirm that ssl is enabled on each of the given rmq sentry units.

    Units are checked in order with ``self.rmq_ssl_is_enabled_on_unit``;
    the first unit that fails the check ends the scan.

    :param sentry_units: iterable of rmq sentry units to check
    :param port: optional ssl port forwarded to the per-unit check
    :returns: None if every unit passes, otherwise an error message naming
              the first unit that did not
    """
    for unit in sentry_units:
        ssl_on = self.rmq_ssl_is_enabled_on_unit(unit, port=port)
        if ssl_on:
            continue
        return ('Unexpected condition: ssl is disabled on unit '
                '({})'.format(unit.info['unit_name']))
    return None
793
def validate_rmq_ssl_disabled_units(self, sentry_units):
    """Check that ssl is disabled on the listed rmq juju sentry units.

    Units are checked in order with ``self.rmq_ssl_is_enabled_on_unit``
    (no port given); the first unit reporting ssl as enabled ends the scan.

    :param sentry_units: list of all rmq sentry units
    :returns: None if ssl is disabled on every unit, otherwise an error
              message naming the first unit that has ssl enabled
    """
    for sentry_unit in sentry_units:
        if self.rmq_ssl_is_enabled_on_unit(sentry_unit):
            return ('Unexpected condition: ssl is enabled on unit '
                    '({})'.format(sentry_unit.info['unit_name']))
    # A falsy result signals success: callers retry while the return
    # value is truthy and report it as the failure message.
    return None
805
def configure_rmq_ssl_on(self, sentry_units, deployment,
806
port=None, max_wait=60):
807
"""Turn ssl charm config option on, with optional non-default
808
ssl port specification. Confirm that it is enabled on every
811
:param sentry_units: list of sentry units
812
:param deployment: amulet deployment object pointer
813
:param port: amqp port, use defaults if None
814
:param max_wait: maximum time to wait in seconds to confirm
815
:returns: None if successful. Raise on error.
817
self.log.debug('Setting ssl charm config option: on')
820
config = {'ssl': 'on'}
822
config['ssl_port'] = port
824
deployment.d.configure('rabbitmq-server', config)
826
# Wait for unit status
827
self.rmq_wait_for_cluster(deployment)
831
ret = self.validate_rmq_ssl_enabled_units(sentry_units, port=port)
832
while ret and tries < (max_wait / 4):
834
self.log.debug('Attempt {}: {}'.format(tries, ret))
835
ret = self.validate_rmq_ssl_enabled_units(sentry_units, port=port)
839
amulet.raise_status(amulet.FAIL, ret)
841
def configure_rmq_ssl_off(self, sentry_units, deployment, max_wait=60):
842
"""Turn ssl charm config option off, confirm that it is disabled
845
:param sentry_units: list of sentry units
846
:param deployment: amulet deployment object pointer
847
:param max_wait: maximum time to wait in seconds to confirm
848
:returns: None if successful. Raise on error.
850
self.log.debug('Setting ssl charm config option: off')
853
config = {'ssl': 'off'}
854
deployment.d.configure('rabbitmq-server', config)
856
# Wait for unit status
857
self.rmq_wait_for_cluster(deployment)
861
ret = self.validate_rmq_ssl_disabled_units(sentry_units)
862
while ret and tries < (max_wait / 4):
864
self.log.debug('Attempt {}: {}'.format(tries, ret))
865
ret = self.validate_rmq_ssl_disabled_units(sentry_units)
869
amulet.raise_status(amulet.FAIL, ret)
871
def connect_amqp_by_unit(self, sentry_unit, ssl=False,
872
port=None, fatal=True,
873
username="testuser1", password="changeme"):
874
"""Establish and return a pika amqp connection to the rabbitmq service
875
running on a rmq juju unit.
877
:param sentry_unit: sentry unit pointer
878
:param ssl: boolean, default to False
879
:param port: amqp port, use defaults if None
880
:param fatal: boolean, default to True (raises on connect error)
881
:param username: amqp user name, default to testuser1
882
:param password: amqp user password
883
:returns: pika amqp connection pointer or None if failed and non-fatal
885
host = sentry_unit.info['public-address']
886
unit_name = sentry_unit.info['unit_name']
888
# Default port logic if port is not specified
891
elif not ssl and not port:
894
self.log.debug('Connecting to amqp on {}:{} ({}) as '
895
'{}...'.format(host, port, unit_name, username))
898
credentials = pika.PlainCredentials(username, password)
899
parameters = pika.ConnectionParameters(host=host, port=port,
900
credentials=credentials,
902
connection_attempts=3,
905
connection = pika.BlockingConnection(parameters)
906
assert connection.server_properties['product'] == 'RabbitMQ'
907
self.log.debug('Connect OK')
909
except Exception as e:
910
msg = ('amqp connection failed to {}:{} as '
911
'{} ({})'.format(host, port, username, str(e)))
913
amulet.raise_status(amulet.FAIL, msg)
918
def publish_amqp_message_by_unit(self, sentry_unit, message,
919
queue="test", ssl=False,
920
username="testuser1",
923
"""Publish an amqp message to a rmq juju unit.
925
:param sentry_unit: sentry unit pointer
926
:param message: amqp message string
927
:param queue: message queue, default to test
928
:param username: amqp user name, default to testuser1
929
:param password: amqp user password
930
:param ssl: boolean, default to False
931
:param port: amqp port, use defaults if None
932
:returns: None. Raises exception if publish failed.
934
self.log.debug('Publishing message to {} queue:\n{}'.format(queue,
936
connection = self.connect_amqp_by_unit(sentry_unit, ssl=ssl,
941
# NOTE(beisner): extra debug here re: pika hang potential:
942
# https://github.com/pika/pika/issues/297
943
# https://groups.google.com/forum/#!topic/rabbitmq-users/Ja0iyfF0Szw
944
self.log.debug('Defining channel...')
945
channel = connection.channel()
946
self.log.debug('Declaring queue...')
947
channel.queue_declare(queue=queue, auto_delete=False, durable=True)
948
self.log.debug('Publishing message...')
949
channel.basic_publish(exchange='', routing_key=queue, body=message)
950
self.log.debug('Closing channel...')
952
self.log.debug('Closing connection...')
955
def get_amqp_message_by_unit(self, sentry_unit, queue="test",
956
username="testuser1",
958
ssl=False, port=None):
959
"""Get an amqp message from a rmq juju unit.
961
:param sentry_unit: sentry unit pointer
962
:param queue: message queue, default to test
963
:param username: amqp user name, default to testuser1
964
:param password: amqp user password
965
:param ssl: boolean, default to False
966
:param port: amqp port, use defaults if None
967
:returns: amqp message body as string. Raise if get fails.
969
connection = self.connect_amqp_by_unit(sentry_unit, ssl=ssl,
973
channel = connection.channel()
974
method_frame, _, body = channel.basic_get(queue)
977
self.log.debug('Retreived message from {} queue:\n{}'.format(queue,
979
channel.basic_ack(method_frame.delivery_tag)
984
msg = 'No message retrieved.'
985
amulet.raise_status(amulet.FAIL, msg)