277
342
def delete_instance(self, nova, instance):
278
343
"""Delete the specified instance."""
279
num_before = len(list(nova.servers.list()))
280
nova.servers.delete(instance)
283
num_after = len(list(nova.servers.list()))
284
while num_after != (num_before - 1) and count < 10:
286
num_after = len(list(nova.servers.list()))
287
self.log.debug('number of instances: {}'.format(num_after))
290
if num_after != (num_before - 1):
291
self.log.error('instance deletion timed out')
345
# /!\ DEPRECATION WARNING
346
self.log.warn('/!\\ DEPRECATION WARNING: use '
347
'delete_resource instead of delete_instance.')
348
self.log.debug('Deleting instance ({})...'.format(instance))
349
return self.delete_resource(nova.servers, instance,
352
def create_or_get_keypair(self, nova, keypair_name="testkey"):
353
"""Create a new keypair, or return pointer if it already exists."""
355
_keypair = nova.keypairs.get(keypair_name)
356
self.log.debug('Keypair ({}) already exists, '
357
'using it.'.format(keypair_name))
360
self.log.debug('Keypair ({}) does not exist, '
361
'creating it.'.format(keypair_name))
363
_keypair = nova.keypairs.create(name=keypair_name)
366
def create_cinder_volume(self, cinder, vol_name="demo-vol", vol_size=1,
367
img_id=None, src_vol_id=None, snap_id=None):
368
"""Create cinder volume, optionally from a glance image, OR
369
optionally as a clone of an existing volume, OR optionally
370
from a snapshot. Wait for the new volume status to reach
371
the expected status, validate and return a resource pointer.
373
:param vol_name: cinder volume display name
374
:param vol_size: size in gigabytes
375
:param img_id: optional glance image id
376
:param src_vol_id: optional source volume id to clone
377
:param snap_id: optional snapshot id to use
378
:returns: cinder volume pointer
380
# Handle parameter input and avoid impossible combinations
381
if img_id and not src_vol_id and not snap_id:
382
# Create volume from image
383
self.log.debug('Creating cinder volume from glance image...')
385
elif src_vol_id and not img_id and not snap_id:
386
# Clone an existing volume
387
self.log.debug('Cloning cinder volume...')
388
bootable = cinder.volumes.get(src_vol_id).bootable
389
elif snap_id and not src_vol_id and not img_id:
390
# Create volume from snapshot
391
self.log.debug('Creating cinder volume from snapshot...')
392
snap = cinder.volume_snapshots.find(id=snap_id)
394
snap_vol_id = cinder.volume_snapshots.get(snap_id).volume_id
395
bootable = cinder.volumes.get(snap_vol_id).bootable
396
elif not img_id and not src_vol_id and not snap_id:
398
self.log.debug('Creating cinder volume...')
401
# Impossible combination of parameters
402
msg = ('Invalid method use - name:{} size:{} img_id:{} '
403
'src_vol_id:{} snap_id:{}'.format(vol_name, vol_size,
406
amulet.raise_status(amulet.FAIL, msg=msg)
410
vol_new = cinder.volumes.create(display_name=vol_name,
413
source_volid=src_vol_id,
416
except Exception as e:
417
msg = 'Failed to create volume: {}'.format(e)
418
amulet.raise_status(amulet.FAIL, msg=msg)
420
# Wait for volume to reach available status
421
ret = self.resource_reaches_status(cinder.volumes, vol_id,
422
expected_stat="available",
423
msg="Volume status wait")
425
msg = 'Cinder volume failed to reach expected state.'
426
amulet.raise_status(amulet.FAIL, msg=msg)
428
# Re-validate new volume
429
self.log.debug('Validating volume attributes...')
430
val_vol_name = cinder.volumes.get(vol_id).display_name
431
val_vol_boot = cinder.volumes.get(vol_id).bootable
432
val_vol_stat = cinder.volumes.get(vol_id).status
433
val_vol_size = cinder.volumes.get(vol_id).size
434
msg_attr = ('Volume attributes - name:{} id:{} stat:{} boot:'
435
'{} size:{}'.format(val_vol_name, vol_id,
436
val_vol_stat, val_vol_boot,
439
if val_vol_boot == bootable and val_vol_stat == 'available' \
440
and val_vol_name == vol_name and val_vol_size == vol_size:
441
self.log.debug(msg_attr)
443
msg = ('Volume validation failed, {}'.format(msg_attr))
444
amulet.raise_status(amulet.FAIL, msg=msg)
448
def delete_resource(self, resource, resource_id,
449
msg="resource", max_wait=120):
450
"""Delete one openstack resource, such as one instance, keypair,
451
image, volume, stack, etc., and confirm deletion within max wait time.
453
:param resource: pointer to os resource type, ex:glance_client.images
454
:param resource_id: unique name or id for the openstack resource
455
:param msg: text to identify purpose in logging
456
:param max_wait: maximum wait time in seconds
457
:returns: True if successful, otherwise False
459
self.log.debug('Deleting OpenStack resource '
460
'{} ({})'.format(resource_id, msg))
461
num_before = len(list(resource.list()))
462
resource.delete(resource_id)
465
num_after = len(list(resource.list()))
466
while num_after != (num_before - 1) and tries < (max_wait / 4):
467
self.log.debug('{} delete check: '
468
'{} [{}:{}] {}'.format(msg, tries,
473
num_after = len(list(resource.list()))
476
self.log.debug('{}: expected, actual count = {}, '
477
'{}'.format(msg, num_before - 1, num_after))
479
if num_after == (num_before - 1):
482
self.log.error('{} delete timed out'.format(msg))
485
def resource_reaches_status(self, resource, resource_id,
486
expected_stat='available',
487
msg='resource', max_wait=120):
488
"""Wait for an openstack resources status to reach an
489
expected status within a specified time. Useful to confirm that
490
nova instances, cinder vols, snapshots, glance images, heat stacks
491
and other resources eventually reach the expected status.
493
:param resource: pointer to os resource type, ex: heat_client.stacks
494
:param resource_id: unique id for the openstack resource
495
:param expected_stat: status to expect resource to reach
496
:param msg: text to identify purpose in logging
497
:param max_wait: maximum wait time in seconds
498
:returns: True if successful, False if status is not reached
502
resource_stat = resource.get(resource_id).status
503
while resource_stat != expected_stat and tries < (max_wait / 4):
504
self.log.debug('{} status check: '
505
'{} [{}:{}] {}'.format(msg, tries,
510
resource_stat = resource.get(resource_id).status
513
self.log.debug('{}: expected, actual status = {}, '
514
'{}'.format(msg, resource_stat, expected_stat))
516
if resource_stat == expected_stat:
519
self.log.debug('{} never reached expected status: '
520
'{}'.format(resource_id, expected_stat))
523
def get_ceph_osd_id_cmd(self, index):
524
"""Produce a shell command that will return a ceph-osd id."""
525
return ("`initctl list | grep 'ceph-osd ' | "
526
"awk 'NR=={} {{ print $2 }}' | "
527
"grep -o '[0-9]*'`".format(index + 1))
529
def get_ceph_pools(self, sentry_unit):
530
"""Return a dict of ceph pools from a single ceph unit, with
531
pool name as keys, pool id as vals."""
533
cmd = 'sudo ceph osd lspools'
534
output, code = sentry_unit.run(cmd)
536
msg = ('{} `{}` returned {} '
537
'{}'.format(sentry_unit.info['unit_name'],
539
amulet.raise_status(amulet.FAIL, msg=msg)
541
# Example output: 0 data,1 metadata,2 rbd,3 cinder,4 glance,
542
for pool in str(output).split(','):
543
pool_id_name = pool.split(' ')
544
if len(pool_id_name) == 2:
545
pool_id = pool_id_name[0]
546
pool_name = pool_id_name[1]
547
pools[pool_name] = int(pool_id)
549
self.log.debug('Pools on {}: {}'.format(sentry_unit.info['unit_name'],
553
def get_ceph_df(self, sentry_unit):
554
"""Return dict of ceph df json output, including ceph pool state.
556
:param sentry_unit: Pointer to amulet sentry instance (juju unit)
557
:returns: Dict of ceph df output
559
cmd = 'sudo ceph df --format=json'
560
output, code = sentry_unit.run(cmd)
562
msg = ('{} `{}` returned {} '
563
'{}'.format(sentry_unit.info['unit_name'],
565
amulet.raise_status(amulet.FAIL, msg=msg)
566
return json.loads(output)
568
def get_ceph_pool_sample(self, sentry_unit, pool_id=0):
569
"""Take a sample of attributes of a ceph pool, returning ceph
570
pool name, object count and disk space used for the specified
573
:param sentry_unit: Pointer to amulet sentry instance (juju unit)
574
:param pool_id: Ceph pool ID
575
:returns: List of pool name, object count, kb disk space used
577
df = self.get_ceph_df(sentry_unit)
578
pool_name = df['pools'][pool_id]['name']
579
obj_count = df['pools'][pool_id]['stats']['objects']
580
kb_used = df['pools'][pool_id]['stats']['kb_used']
581
self.log.debug('Ceph {} pool (ID {}): {} objects, '
582
'{} kb used'.format(pool_name, pool_id,
584
return pool_name, obj_count, kb_used
586
def validate_ceph_pool_samples(self, samples, sample_type="resource pool"):
587
"""Validate ceph pool samples taken over time, such as pool
588
object counts or pool kb used, before adding, after adding, and
589
after deleting items which affect those pool attributes. The
590
2nd element is expected to be greater than the 1st; 3rd is expected
591
to be less than the 2nd.
593
:param samples: List containing 3 data samples
594
:param sample_type: String for logging and usage context
595
:returns: None if successful, Failure message otherwise
597
original, created, deleted = range(3)
598
if samples[created] <= samples[original] or \
599
samples[deleted] >= samples[created]:
600
return ('Ceph {} samples ({}) '
601
'unexpected.'.format(sample_type, samples))
603
self.log.debug('Ceph {} samples (OK): '
604
'{}'.format(sample_type, samples))
607
# rabbitmq/amqp specific helpers:
608
def add_rmq_test_user(self, sentry_units,
609
username="testuser1", password="changeme"):
610
"""Add a test user via the first rmq juju unit, check connection as
611
the new user against all sentry units.
613
:param sentry_units: list of sentry unit pointers
614
:param username: amqp user name, default to testuser1
615
:param password: amqp user password
616
:returns: None if successful. Raise on error.
618
self.log.debug('Adding rmq user ({})...'.format(username))
620
# Check that user does not already exist
621
cmd_user_list = 'rabbitmqctl list_users'
622
output, _ = self.run_cmd_unit(sentry_units[0], cmd_user_list)
623
if username in output:
624
self.log.warning('User ({}) already exists, returning '
625
'gracefully.'.format(username))
628
perms = '".*" ".*" ".*"'
629
cmds = ['rabbitmqctl add_user {} {}'.format(username, password),
630
'rabbitmqctl set_permissions {} {}'.format(username, perms)]
632
# Add user via first unit
634
output, _ = self.run_cmd_unit(sentry_units[0], cmd)
636
# Check connection against the other sentry_units
637
self.log.debug('Checking user connect against units...')
638
for sentry_unit in sentry_units:
639
connection = self.connect_amqp_by_unit(sentry_unit, ssl=False,
644
def delete_rmq_test_user(self, sentry_units, username="testuser1"):
645
"""Delete a rabbitmq user via the first rmq juju unit.
647
:param sentry_units: list of sentry unit pointers
648
:param username: amqp user name, default to testuser1
649
:param password: amqp user password
650
:returns: None if successful or no such user.
652
self.log.debug('Deleting rmq user ({})...'.format(username))
654
# Check that the user exists
655
cmd_user_list = 'rabbitmqctl list_users'
656
output, _ = self.run_cmd_unit(sentry_units[0], cmd_user_list)
658
if username not in output:
659
self.log.warning('User ({}) does not exist, returning '
660
'gracefully.'.format(username))
664
cmd_user_del = 'rabbitmqctl delete_user {}'.format(username)
665
output, _ = self.run_cmd_unit(sentry_units[0], cmd_user_del)
667
def get_rmq_cluster_status(self, sentry_unit):
668
"""Execute rabbitmq cluster status command on a unit and return
671
:param unit: sentry unit
672
:returns: String containing console output of cluster status command
674
cmd = 'rabbitmqctl cluster_status'
675
output, _ = self.run_cmd_unit(sentry_unit, cmd)
676
self.log.debug('{} cluster_status:\n{}'.format(
677
sentry_unit.info['unit_name'], output))
680
def get_rmq_cluster_running_nodes(self, sentry_unit):
681
"""Parse rabbitmqctl cluster_status output string, return list of
682
running rabbitmq cluster nodes.
684
:param unit: sentry unit
685
:returns: List containing node names of running nodes
687
# NOTE(beisner): rabbitmqctl cluster_status output is not
688
# json-parsable, do string chop foo, then json.loads that.
689
str_stat = self.get_rmq_cluster_status(sentry_unit)
690
if 'running_nodes' in str_stat:
691
pos_start = str_stat.find("{running_nodes,") + 15
692
pos_end = str_stat.find("]},", pos_start) + 1
693
str_run_nodes = str_stat[pos_start:pos_end].replace("'", '"')
694
run_nodes = json.loads(str_run_nodes)
699
def validate_rmq_cluster_running_nodes(self, sentry_units):
700
"""Check that all rmq unit hostnames are represented in the
701
cluster_status output of all units.
703
:param host_names: dict of juju unit names to host names
704
:param units: list of sentry unit pointers (all rmq units)
705
:returns: None if successful, otherwise return error message
707
host_names = self.get_unit_hostnames(sentry_units)
710
# Query every unit for cluster_status running nodes
711
for query_unit in sentry_units:
712
query_unit_name = query_unit.info['unit_name']
713
running_nodes = self.get_rmq_cluster_running_nodes(query_unit)
715
# Confirm that every unit is represented in the queried unit's
716
# cluster_status running nodes output.
717
for validate_unit in sentry_units:
718
val_host_name = host_names[validate_unit.info['unit_name']]
719
val_node_name = 'rabbit@{}'.format(val_host_name)
721
if val_node_name not in running_nodes:
722
errors.append('Cluster member check failed on {}: {} not '
723
'in {}\n'.format(query_unit_name,
727
return ''.join(errors)
729
def rmq_ssl_is_enabled_on_unit(self, sentry_unit, port=None):
730
"""Check a single juju rmq unit for ssl and port in the config file."""
731
host = sentry_unit.info['public-address']
732
unit_name = sentry_unit.info['unit_name']
734
conf_file = '/etc/rabbitmq/rabbitmq.config'
735
conf_contents = str(self.file_contents_safe(sentry_unit,
736
conf_file, max_wait=16))
738
conf_ssl = 'ssl' in conf_contents
739
conf_port = str(port) in conf_contents
741
# Port explicitly checked in config
742
if port and conf_port and conf_ssl:
743
self.log.debug('SSL is enabled @{}:{} '
744
'({})'.format(host, port, unit_name))
746
elif port and not conf_port and conf_ssl:
747
self.log.debug('SSL is enabled @{} but not on port {} '
748
'({})'.format(host, port, unit_name))
750
# Port not checked (useful when checking that ssl is disabled)
751
elif not port and conf_ssl:
752
self.log.debug('SSL is enabled @{}:{} '
753
'({})'.format(host, port, unit_name))
756
self.log.debug('SSL not enabled @{}:{} '
757
'({})'.format(host, port, unit_name))
760
msg = ('Unknown condition when checking SSL status @{}:{} '
761
'({})'.format(host, port, unit_name))
762
amulet.raise_status(amulet.FAIL, msg)
764
def validate_rmq_ssl_enabled_units(self, sentry_units, port=None):
765
"""Check that ssl is enabled on rmq juju sentry units.
767
:param sentry_units: list of all rmq sentry units
768
:param port: optional ssl port override to validate
769
:returns: None if successful, otherwise return error message
771
for sentry_unit in sentry_units:
772
if not self.rmq_ssl_is_enabled_on_unit(sentry_unit, port=port):
773
return ('Unexpected condition: ssl is disabled on unit '
774
'({})'.format(sentry_unit.info['unit_name']))
777
def validate_rmq_ssl_disabled_units(self, sentry_units):
778
"""Check that ssl is enabled on listed rmq juju sentry units.
780
:param sentry_units: list of all rmq sentry units
781
:returns: True if successful. Raise on error.
783
for sentry_unit in sentry_units:
784
if self.rmq_ssl_is_enabled_on_unit(sentry_unit):
785
return ('Unexpected condition: ssl is enabled on unit '
786
'({})'.format(sentry_unit.info['unit_name']))
789
def configure_rmq_ssl_on(self, sentry_units, deployment,
790
port=None, max_wait=60):
791
"""Turn ssl charm config option on, with optional non-default
792
ssl port specification. Confirm that it is enabled on every
795
:param sentry_units: list of sentry units
796
:param deployment: amulet deployment object pointer
797
:param port: amqp port, use defaults if None
798
:param max_wait: maximum time to wait in seconds to confirm
799
:returns: None if successful. Raise on error.
801
self.log.debug('Setting ssl charm config option: on')
804
config = {'ssl': 'on'}
806
config['ssl_port'] = port
808
deployment.configure('rabbitmq-server', config)
812
ret = self.validate_rmq_ssl_enabled_units(sentry_units, port=port)
813
while ret and tries < (max_wait / 4):
815
self.log.debug('Attempt {}: {}'.format(tries, ret))
816
ret = self.validate_rmq_ssl_enabled_units(sentry_units, port=port)
820
amulet.raise_status(amulet.FAIL, ret)
822
def configure_rmq_ssl_off(self, sentry_units, deployment, max_wait=60):
823
"""Turn ssl charm config option off, confirm that it is disabled
826
:param sentry_units: list of sentry units
827
:param deployment: amulet deployment object pointer
828
:param max_wait: maximum time to wait in seconds to confirm
829
:returns: None if successful. Raise on error.
831
self.log.debug('Setting ssl charm config option: off')
834
config = {'ssl': 'off'}
835
deployment.configure('rabbitmq-server', config)
839
ret = self.validate_rmq_ssl_disabled_units(sentry_units)
840
while ret and tries < (max_wait / 4):
842
self.log.debug('Attempt {}: {}'.format(tries, ret))
843
ret = self.validate_rmq_ssl_disabled_units(sentry_units)
847
amulet.raise_status(amulet.FAIL, ret)
849
def connect_amqp_by_unit(self, sentry_unit, ssl=False,
850
port=None, fatal=True,
851
username="testuser1", password="changeme"):
852
"""Establish and return a pika amqp connection to the rabbitmq service
853
running on a rmq juju unit.
855
:param sentry_unit: sentry unit pointer
856
:param ssl: boolean, default to False
857
:param port: amqp port, use defaults if None
858
:param fatal: boolean, default to True (raises on connect error)
859
:param username: amqp user name, default to testuser1
860
:param password: amqp user password
861
:returns: pika amqp connection pointer or None if failed and non-fatal
863
host = sentry_unit.info['public-address']
864
unit_name = sentry_unit.info['unit_name']
866
# Default port logic if port is not specified
869
elif not ssl and not port:
872
self.log.debug('Connecting to amqp on {}:{} ({}) as '
873
'{}...'.format(host, port, unit_name, username))
876
credentials = pika.PlainCredentials(username, password)
877
parameters = pika.ConnectionParameters(host=host, port=port,
878
credentials=credentials,
880
connection_attempts=3,
883
connection = pika.BlockingConnection(parameters)
884
assert connection.server_properties['product'] == 'RabbitMQ'
885
self.log.debug('Connect OK')
887
except Exception as e:
888
msg = ('amqp connection failed to {}:{} as '
889
'{} ({})'.format(host, port, username, str(e)))
891
amulet.raise_status(amulet.FAIL, msg)
896
def publish_amqp_message_by_unit(self, sentry_unit, message,
897
queue="test", ssl=False,
898
username="testuser1",
901
"""Publish an amqp message to a rmq juju unit.
903
:param sentry_unit: sentry unit pointer
904
:param message: amqp message string
905
:param queue: message queue, default to test
906
:param username: amqp user name, default to testuser1
907
:param password: amqp user password
908
:param ssl: boolean, default to False
909
:param port: amqp port, use defaults if None
910
:returns: None. Raises exception if publish failed.
912
self.log.debug('Publishing message to {} queue:\n{}'.format(queue,
914
connection = self.connect_amqp_by_unit(sentry_unit, ssl=ssl,
919
# NOTE(beisner): extra debug here re: pika hang potential:
920
# https://github.com/pika/pika/issues/297
921
# https://groups.google.com/forum/#!topic/rabbitmq-users/Ja0iyfF0Szw
922
self.log.debug('Defining channel...')
923
channel = connection.channel()
924
self.log.debug('Declaring queue...')
925
channel.queue_declare(queue=queue, auto_delete=False, durable=True)
926
self.log.debug('Publishing message...')
927
channel.basic_publish(exchange='', routing_key=queue, body=message)
928
self.log.debug('Closing channel...')
930
self.log.debug('Closing connection...')
933
def get_amqp_message_by_unit(self, sentry_unit, queue="test",
934
username="testuser1",
936
ssl=False, port=None):
937
"""Get an amqp message from a rmq juju unit.
939
:param sentry_unit: sentry unit pointer
940
:param queue: message queue, default to test
941
:param username: amqp user name, default to testuser1
942
:param password: amqp user password
943
:param ssl: boolean, default to False
944
:param port: amqp port, use defaults if None
945
:returns: amqp message body as string. Raise if get fails.
947
connection = self.connect_amqp_by_unit(sentry_unit, ssl=ssl,
951
channel = connection.channel()
952
method_frame, _, body = channel.basic_get(queue)
955
self.log.debug('Retreived message from {} queue:\n{}'.format(queue,
957
channel.basic_ack(method_frame.delivery_tag)
962
msg = 'No message retrieved.'
963
amulet.raise_status(amulet.FAIL, msg)