277
370
def delete_instance(self, nova, instance):
    """Delete the specified instance (deprecated).

    Kept for backward compatibility: logs a deprecation warning and
    delegates the deletion to ``self.delete_resource``.

    :param nova: nova client; its ``servers`` manager is passed on
    :param instance: instance (or instance id) to delete
    :returns: the value returned by ``delete_resource``
    """
    # /!\ DEPRECATION WARNING
    self.log.warning('/!\\ DEPRECATION WARNING: use '
                     'delete_resource instead of delete_instance.')
    self.log.debug('Deleting instance ({})...'.format(instance))
    # NOTE(review): the tail of this call was cut off in the reviewed
    # source; the msg label below is reconstructed - confirm upstream.
    return self.delete_resource(nova.servers, instance,
                                msg='nova instance')
380
def create_or_get_keypair(self, nova, keypair_name="testkey"):
    """Create a new keypair, or return pointer if it already exists.

    :param nova: nova client; its ``keypairs`` manager is used
    :param keypair_name: name of the keypair to look up or create
    :returns: the existing or newly created keypair object
    """
    try:
        _keypair = nova.keypairs.get(keypair_name)
    except Exception:
        # Any lookup failure is treated as "not found": fall through
        # and create the keypair.
        self.log.debug('Keypair ({}) does not exist, '
                       'creating it.'.format(keypair_name))
    else:
        self.log.debug('Keypair ({}) already exists, '
                       'using it.'.format(keypair_name))
        return _keypair

    return nova.keypairs.create(name=keypair_name)
394
def create_cinder_volume(self, cinder, vol_name="demo-vol", vol_size=1,
                         img_id=None, src_vol_id=None, snap_id=None):
    """Create cinder volume, optionally from a glance image, OR
    optionally as a clone of an existing volume, OR optionally
    from a snapshot.  Wait for the new volume status to reach
    the expected status, validate and return a resource pointer.

    At most one of img_id, src_vol_id and snap_id may be given.

    :param cinder: cinder client (``volumes`` / ``volume_snapshots``)
    :param vol_name: cinder volume display name
    :param vol_size: size in gigabytes
    :param img_id: optional glance image id
    :param src_vol_id: optional source volume id to clone
    :param snap_id: optional snapshot id to use
    :returns: cinder volume pointer
    """
    # NOTE(review): several short lines (the literal bootable values,
    # the snapshot size override and three create() keywords) were
    # missing from the reviewed source and are reconstructed here -
    # confirm against the upstream file.

    # Handle parameter input and avoid impossible combinations
    if img_id and not src_vol_id and not snap_id:
        # Create volume from image
        self.log.debug('Creating cinder volume from glance image...')
        bootable = 'true'
    elif src_vol_id and not img_id and not snap_id:
        # Clone an existing volume
        self.log.debug('Cloning cinder volume...')
        bootable = cinder.volumes.get(src_vol_id).bootable
    elif snap_id and not src_vol_id and not img_id:
        # Create volume from snapshot
        self.log.debug('Creating cinder volume from snapshot...')
        snap = cinder.volume_snapshots.find(id=snap_id)
        vol_size = snap.size
        snap_vol_id = cinder.volume_snapshots.get(snap_id).volume_id
        bootable = cinder.volumes.get(snap_vol_id).bootable
    elif not img_id and not src_vol_id and not snap_id:
        # Create a plain, empty volume
        self.log.debug('Creating cinder volume...')
        bootable = 'false'
    else:
        # Impossible combination of parameters
        msg = ('Invalid method use - name:{} size:{} img_id:{} '
               'src_vol_id:{} snap_id:{}'.format(vol_name, vol_size,
                                                 img_id, src_vol_id,
                                                 snap_id))
        amulet.raise_status(amulet.FAIL, msg=msg)

    # Create new volume
    try:
        vol_new = cinder.volumes.create(display_name=vol_name,
                                        imageRef=img_id,
                                        size=vol_size,
                                        source_volid=src_vol_id,
                                        snapshot_id=snap_id)
        vol_id = vol_new.id
    except Exception as e:
        msg = 'Failed to create volume: {}'.format(e)
        amulet.raise_status(amulet.FAIL, msg=msg)

    # Wait for volume to reach available status
    ret = self.resource_reaches_status(cinder.volumes, vol_id,
                                       expected_stat="available",
                                       msg="Volume status wait")
    if not ret:
        msg = 'Cinder volume failed to reach expected state.'
        amulet.raise_status(amulet.FAIL, msg=msg)

    # Re-validate new volume.  Fetch it once so all four attributes
    # come from the same snapshot of the volume's state.
    self.log.debug('Validating volume attributes...')
    current = cinder.volumes.get(vol_id)
    val_vol_name = current.display_name
    val_vol_boot = current.bootable
    val_vol_stat = current.status
    val_vol_size = current.size
    msg_attr = ('Volume attributes - name:{} id:{} stat:{} boot:'
                '{} size:{}'.format(val_vol_name, vol_id,
                                    val_vol_stat, val_vol_boot,
                                    val_vol_size))

    if val_vol_boot == bootable and val_vol_stat == 'available' \
            and val_vol_name == vol_name and val_vol_size == vol_size:
        self.log.debug(msg_attr)
    else:
        msg = ('Volume validation failed, {}'.format(msg_attr))
        amulet.raise_status(amulet.FAIL, msg=msg)

    return vol_new
476
def delete_resource(self, resource, resource_id,
                    msg="resource", max_wait=120):
    """Delete one openstack resource, such as one instance, keypair,
    image, volume, stack, etc., and confirm deletion within max wait time.

    Deletion is confirmed by the resource list shrinking by exactly one
    entry; the list is re-checked every 4 seconds.

    :param resource: pointer to os resource type, ex:glance_client.images
    :param resource_id: unique name or id for the openstack resource
    :param msg: text to identify purpose in logging
    :param max_wait: maximum wait time in seconds
    :returns: True if successful, otherwise False
    """
    self.log.debug('Deleting OpenStack resource '
                   '{} ({})'.format(resource_id, msg))
    num_before = len(list(resource.list()))
    resource.delete(resource_id)

    expected = num_before - 1
    tries = 0
    num_after = len(list(resource.list()))
    while num_after != expected and tries < (max_wait / 4):
        # NOTE(review): the last three format arguments were missing
        # from the reviewed source and are reconstructed.
        self.log.debug('{} delete check: '
                       '{} [{}:{}] {}'.format(msg, tries,
                                              num_before,
                                              num_after,
                                              resource_id))
        time.sleep(4)
        num_after = len(list(resource.list()))
        tries += 1

    self.log.debug('{}: expected, actual count = {}, '
                   '{}'.format(msg, expected, num_after))

    if num_after == expected:
        return True

    self.log.error('{} delete timed out'.format(msg))
    return False
513
def resource_reaches_status(self, resource, resource_id,
                            expected_stat='available',
                            msg='resource', max_wait=120):
    """Wait for an openstack resources status to reach an
    expected status within a specified time.  Useful to confirm that
    nova instances, cinder vols, snapshots, glance images, heat stacks
    and other resources eventually reach the expected status.

    The status is re-read every 4 seconds.

    :param resource: pointer to os resource type, ex: heat_client.stacks
    :param resource_id: unique id for the openstack resource
    :param expected_stat: status to expect resource to reach
    :param msg: text to identify purpose in logging
    :param max_wait: maximum wait time in seconds
    :returns: True if successful, False if status is not reached
    """
    tries = 0
    resource_stat = resource.get(resource_id).status
    while resource_stat != expected_stat and tries < (max_wait / 4):
        # NOTE(review): the last three format arguments were missing
        # from the reviewed source and are reconstructed.
        self.log.debug('{} status check: '
                       '{} [{}:{}] {}'.format(msg, tries,
                                              resource_stat,
                                              expected_stat,
                                              resource_id))
        time.sleep(4)
        resource_stat = resource.get(resource_id).status
        tries += 1

    # Argument order matches the "expected, actual" wording of the
    # message (the original passed them the other way round).
    self.log.debug('{}: expected, actual status = {}, '
                   '{}'.format(msg, expected_stat, resource_stat))

    if resource_stat == expected_stat:
        return True

    self.log.debug('{} never reached expected status: '
                   '{}'.format(resource_id, expected_stat))
    return False
551
def get_ceph_osd_id_cmd(self, index):
    """Produce a shell command that will return a ceph-osd id.

    The command is wrapped in backticks so it can be embedded in a
    larger shell command line.

    :param index: zero-based index of the ``ceph-osd`` line to pick
                  from the ``initctl list`` output
    :returns: shell command string
    """
    # awk's NR is 1-based, hence index + 1; the doubled braces are
    # literal braces escaped for str.format.
    return ("`initctl list | grep 'ceph-osd ' | "
            "awk 'NR=={} {{ print $2 }}' | "
            "grep -o '[0-9]*'`".format(index + 1))
557
def get_ceph_pools(self, sentry_unit):
    """Return a dict of ceph pools from a single ceph unit, with
    pool name as keys, pool id as vals.

    :param sentry_unit: sentry unit on which ``ceph osd lspools`` is run
    :returns: dict mapping pool name to integer pool id
    """
    pools = {}
    cmd = 'sudo ceph osd lspools'
    output, code = sentry_unit.run(cmd)
    if code != 0:
        msg = ('{} `{}` returned {} '
               '{}'.format(sentry_unit.info['unit_name'],
                           cmd, code, output))
        amulet.raise_status(amulet.FAIL, msg=msg)

    # Example output: 0 data,1 metadata,2 rbd,3 cinder,4 glance,
    for pool in str(output).split(','):
        # Split on any whitespace so stray spaces/newlines around an
        # entry do not cause it to be skipped; the empty entry after
        # the trailing comma yields no fields and is ignored.
        pool_id_name = pool.split()
        if len(pool_id_name) == 2:
            pool_id, pool_name = pool_id_name
            pools[pool_name] = int(pool_id)

    self.log.debug('Pools on {}: {}'.format(sentry_unit.info['unit_name'],
                                            pools))
    return pools
581
def get_ceph_df(self, sentry_unit):
    """Return dict of ceph df json output, including ceph pool state.

    :param sentry_unit: Pointer to amulet sentry instance (juju unit)
    :returns: Dict of ceph df output
    """
    cmd = 'sudo ceph df --format=json'
    output, code = sentry_unit.run(cmd)
    if code != 0:
        msg = ('{} `{}` returned {} '
               '{}'.format(sentry_unit.info['unit_name'],
                           cmd, code, output))
        amulet.raise_status(amulet.FAIL, msg=msg)
    return json.loads(output)
596
def get_ceph_pool_sample(self, sentry_unit, pool_id=0):
    """Take a sample of attributes of a ceph pool, returning ceph
    pool name, object count and disk space used for the specified
    pool.

    :param sentry_unit: Pointer to amulet sentry instance (juju unit)
    :param pool_id: Ceph pool ID; used as an index into the ``pools``
                    list of the ``ceph df`` output
    :returns: List of pool name, object count, kb disk space used
    """
    df = self.get_ceph_df(sentry_unit)
    pool = df['pools'][pool_id]
    pool_name = pool['name']
    obj_count = pool['stats']['objects']
    kb_used = pool['stats']['kb_used']
    self.log.debug('Ceph {} pool (ID {}): {} objects, '
                   '{} kb used'.format(pool_name, pool_id,
                                       obj_count, kb_used))
    return pool_name, obj_count, kb_used
614
def validate_ceph_pool_samples(self, samples, sample_type="resource pool"):
    """Validate ceph pool samples taken over time, such as pool
    object counts or pool kb used, before adding, after adding, and
    after deleting items which affect those pool attributes.  The
    2nd element is expected to be greater than the 1st; 3rd is expected
    to be less than the 2nd.

    :param samples: List containing 3 data samples
    :param sample_type: String for logging and usage context
    :returns: None if successful, Failure message otherwise
    """
    failure = ('Ceph {} samples ({}) '
               'unexpected.'.format(sample_type, samples))

    # Fewer than three samples cannot be validated; report that as a
    # failure rather than raising IndexError.
    if len(samples) < 3:
        return failure

    original, created, deleted = range(3)
    if samples[created] <= samples[original] or \
            samples[deleted] >= samples[created]:
        return failure

    self.log.debug('Ceph {} samples (OK): '
                   '{}'.format(sample_type, samples))
    return None
635
# rabbitmq/amqp specific helpers:
637
def rmq_wait_for_cluster(self, deployment, init_sleep=15, timeout=1200):
    """Wait for rmq units extended status to show cluster readiness,
    after an optional initial sleep period.  Initial sleep is likely
    necessary to be effective following a config change, as status
    message may not instantly update to non-ready.

    :param deployment: amulet deployment object pointer
    :param init_sleep: seconds to sleep before polling; falsy to skip
    :param timeout: seconds passed on to the status wait
    :returns: None
    """
    if init_sleep:
        time.sleep(init_sleep)

    message = re.compile(r'^Unit is ready and clustered$')
    deployment._auto_wait_for_status(message=message,
                                     timeout=timeout,
                                     include_only=['rabbitmq-server'])
651
def add_rmq_test_user(self, sentry_units,
                      username="testuser1", password="changeme"):
    """Add a test user via the first rmq juju unit, check connection as
    the new user against all sentry units.

    If the user name already appears in the ``rabbitmqctl list_users``
    output, nothing is changed.

    :param sentry_units: list of sentry unit pointers
    :param username: amqp user name, default to testuser1
    :param password: amqp user password
    :returns: None if successful.  Raise on error.
    """
    self.log.debug('Adding rmq user ({})...'.format(username))

    # Check that user does not already exist
    cmd_user_list = 'rabbitmqctl list_users'
    output, _ = self.run_cmd_unit(sentry_units[0], cmd_user_list)
    if username in output:
        self.log.warning('User ({}) already exists, returning '
                         'gracefully.'.format(username))
        return

    perms = '".*" ".*" ".*"'
    cmds = ['rabbitmqctl add_user {} {}'.format(username, password),
            'rabbitmqctl set_permissions {} {}'.format(username, perms)]

    # Add user via first unit
    for cmd in cmds:
        self.run_cmd_unit(sentry_units[0], cmd)

    # Check connection against the other sentry_units
    self.log.debug('Checking user connect against units...')
    for sentry_unit in sentry_units:
        connection = self.connect_amqp_by_unit(sentry_unit, ssl=False,
                                               username=username,
                                               password=password)
        connection.close()
687
def delete_rmq_test_user(self, sentry_units, username="testuser1"):
    """Delete a rabbitmq user via the first rmq juju unit.

    :param sentry_units: list of sentry unit pointers
    :param username: amqp user name, default to testuser1
    :returns: None if successful or no such user.
    """
    self.log.debug('Deleting rmq user ({})...'.format(username))

    # Check that the user exists
    cmd_user_list = 'rabbitmqctl list_users'
    output, _ = self.run_cmd_unit(sentry_units[0], cmd_user_list)

    if username not in output:
        self.log.warning('User ({}) does not exist, returning '
                         'gracefully.'.format(username))
        return

    # Delete the user
    cmd_user_del = 'rabbitmqctl delete_user {}'.format(username)
    self.run_cmd_unit(sentry_units[0], cmd_user_del)
710
def get_rmq_cluster_status(self, sentry_unit):
    """Execute rabbitmq cluster status command on a unit and return
    the full output.

    :param sentry_unit: sentry unit
    :returns: String containing console output of cluster status command
    """
    cmd = 'rabbitmqctl cluster_status'
    output, _ = self.run_cmd_unit(sentry_unit, cmd)
    self.log.debug('{} cluster_status:\n{}'.format(
        sentry_unit.info['unit_name'], output))
    return str(output)
723
def get_rmq_cluster_running_nodes(self, sentry_unit):
    """Parse rabbitmqctl cluster_status output string, return list of
    running rabbitmq cluster nodes.

    :param sentry_unit: sentry unit
    :returns: List containing node names of running nodes; empty list
              if no running_nodes section can be located
    """
    # NOTE(beisner): rabbitmqctl cluster_status output is not
    # json-parsable, do string chop foo, then json.loads that.
    str_stat = self.get_rmq_cluster_status(sentry_unit)

    marker = '{running_nodes,'
    pos_start = str_stat.find(marker)
    if pos_start == -1:
        return []
    pos_start += len(marker)

    # The node list ends at the first "]}" after the marker; this also
    # covers running_nodes being the last tuple (no trailing comma).
    pos_end = str_stat.find(']}', pos_start)
    if pos_end == -1:
        return []

    # Single-quoted names become double-quoted so the list is valid
    # JSON.
    str_run_nodes = str_stat[pos_start:pos_end + 1].replace("'", '"')
    return json.loads(str_run_nodes)
742
def validate_rmq_cluster_running_nodes(self, sentry_units):
    """Check that all rmq unit hostnames are represented in the
    cluster_status output of all units.

    :param sentry_units: list of sentry unit pointers (all rmq units)
    :returns: None if successful, otherwise return error message
    """
    host_names = self.get_unit_hostnames(sentry_units)
    errors = []

    # Query every unit for cluster_status running nodes
    for query_unit in sentry_units:
        query_unit_name = query_unit.info['unit_name']
        running_nodes = self.get_rmq_cluster_running_nodes(query_unit)

        # Confirm that every unit is represented in the queried unit's
        # cluster_status running nodes output.
        for validate_unit in sentry_units:
            val_host_name = host_names[validate_unit.info['unit_name']]
            val_node_name = 'rabbit@{}'.format(val_host_name)

            if val_node_name not in running_nodes:
                errors.append('Cluster member check failed on {}: {} not '
                              'in {}\n'.format(query_unit_name,
                                               val_node_name,
                                               running_nodes))
    if errors:
        return ''.join(errors)
    return None
772
def rmq_ssl_is_enabled_on_unit(self, sentry_unit, port=None):
    """Check a single juju rmq unit for ssl and port in the config file.

    The check is textual: ssl counts as enabled when the string 'ssl'
    appears in the rabbitmq config file, and a port counts as
    configured when its decimal string appears anywhere in that file.

    :param sentry_unit: sentry unit pointer
    :param port: optional port that must also appear in the config
    :returns: True if ssl is enabled (and on ``port``, when given),
              otherwise False
    """
    host = sentry_unit.info['public-address']
    unit_name = sentry_unit.info['unit_name']

    conf_file = '/etc/rabbitmq/rabbitmq.config'
    conf_contents = str(self.file_contents_safe(sentry_unit,
                                                conf_file, max_wait=16))
    conf_ssl = 'ssl' in conf_contents
    conf_port = str(port) in conf_contents

    if not conf_ssl:
        self.log.debug('SSL not enabled @{}:{} '
                       '({})'.format(host, port, unit_name))
        return False

    # Port explicitly checked in config
    if port and not conf_port:
        self.log.debug('SSL is enabled @{} but not on port {} '
                       '({})'.format(host, port, unit_name))
        return False

    # Either the requested port is present, or no port was requested
    # (useful when checking that ssl is disabled).
    self.log.debug('SSL is enabled @{}:{} '
                   '({})'.format(host, port, unit_name))
    return True
807
def validate_rmq_ssl_enabled_units(self, sentry_units, port=None):
    """Check that ssl is enabled on rmq juju sentry units.

    Stops at the first unit on which ssl is not enabled.

    :param sentry_units: list of all rmq sentry units
    :param port: optional ssl port override to validate
    :returns: None if successful, otherwise return error message
    """
    for sentry_unit in sentry_units:
        if not self.rmq_ssl_is_enabled_on_unit(sentry_unit, port=port):
            return ('Unexpected condition: ssl is disabled on unit '
                    '({})'.format(sentry_unit.info['unit_name']))
    return None
820
def validate_rmq_ssl_disabled_units(self, sentry_units):
    """Check that ssl is disabled on listed rmq juju sentry units.

    Stops at the first unit on which ssl is still enabled.

    :param sentry_units: list of all rmq sentry units
    :returns: None if successful, otherwise return error message
    """
    for sentry_unit in sentry_units:
        if self.rmq_ssl_is_enabled_on_unit(sentry_unit):
            return ('Unexpected condition: ssl is enabled on unit '
                    '({})'.format(sentry_unit.info['unit_name']))
    return None
832
def configure_rmq_ssl_on(self, sentry_units, deployment,
                         port=None, max_wait=60):
    """Turn ssl charm config option on, with optional non-default
    ssl port specification.  Confirm that it is enabled on every
    unit.

    :param sentry_units: list of sentry units
    :param deployment: amulet deployment object pointer
    :param port: amqp port, use defaults if None
    :param max_wait: maximum time to wait in seconds to confirm
    :returns: None if successful.  Raise on error.
    """
    self.log.debug('Setting ssl charm config option: on')

    # Enable RMQ SSL; only override the port when one was requested
    config = {'ssl': 'on'}
    if port:
        config['ssl_port'] = port

    deployment.d.configure('rabbitmq-server', config)

    # Wait for unit status
    self.rmq_wait_for_cluster(deployment)

    # Confirm, re-checking every 4 seconds; ret is an error message
    # while any unit is still unconfirmed and None once all pass.
    tries = 0
    ret = self.validate_rmq_ssl_enabled_units(sentry_units, port=port)
    while ret and tries < (max_wait / 4):
        time.sleep(4)
        self.log.debug('Attempt {}: {}'.format(tries, ret))
        ret = self.validate_rmq_ssl_enabled_units(sentry_units, port=port)
        tries += 1

    if ret:
        amulet.raise_status(amulet.FAIL, ret)
868
def configure_rmq_ssl_off(self, sentry_units, deployment, max_wait=60):
    """Turn ssl charm config option off, confirm that it is disabled
    on every unit.

    :param sentry_units: list of sentry units
    :param deployment: amulet deployment object pointer
    :param max_wait: maximum time to wait in seconds to confirm
    :returns: None if successful.  Raise on error.
    """
    self.log.debug('Setting ssl charm config option: off')

    # Disable RMQ SSL
    config = {'ssl': 'off'}
    deployment.d.configure('rabbitmq-server', config)

    # Wait for unit status
    self.rmq_wait_for_cluster(deployment)

    # Confirm, re-checking every 4 seconds; ret is an error message
    # while any unit still has ssl enabled and None once all pass.
    tries = 0
    ret = self.validate_rmq_ssl_disabled_units(sentry_units)
    while ret and tries < (max_wait / 4):
        time.sleep(4)
        self.log.debug('Attempt {}: {}'.format(tries, ret))
        ret = self.validate_rmq_ssl_disabled_units(sentry_units)
        tries += 1

    if ret:
        amulet.raise_status(amulet.FAIL, ret)
898
def connect_amqp_by_unit(self, sentry_unit, ssl=False,
                         port=None, fatal=True,
                         username="testuser1", password="changeme"):
    """Establish and return a pika amqp connection to the rabbitmq service
    running on a rmq juju unit.

    :param sentry_unit: sentry unit pointer
    :param ssl: boolean, default to False
    :param port: amqp port, use defaults if None
    :param fatal: boolean, default to True (raises on connect error)
    :param username: amqp user name, default to testuser1
    :param password: amqp user password
    :returns: pika amqp connection pointer or None if failed and non-fatal
    """
    host = sentry_unit.info['public-address']
    unit_name = sentry_unit.info['unit_name']

    # Default port logic if port is not specified
    # NOTE(review): the assignments were missing from the reviewed
    # source; the standard AMQP ports (5671 TLS, 5672 plain) are
    # assumed - confirm upstream.
    if ssl and not port:
        port = 5671
    elif not ssl and not port:
        port = 5672

    self.log.debug('Connecting to amqp on {}:{} ({}) as '
                   '{}...'.format(host, port, unit_name, username))

    try:
        credentials = pika.PlainCredentials(username, password)
        # NOTE(review): keyword lines around connection_attempts were
        # missing from the reviewed source; ssl=ssl is restored because
        # the ssl argument would otherwise be ignored.  Any other
        # dropped keywords (e.g. retry/timeout tuning) are NOT
        # reconstructed here - confirm upstream.
        parameters = pika.ConnectionParameters(host=host, port=port,
                                               credentials=credentials,
                                               ssl=ssl,
                                               connection_attempts=3)
        connection = pika.BlockingConnection(parameters)
        # Explicit check rather than assert, so it still runs under -O.
        product = connection.server_properties['product']
        if product != 'RabbitMQ':
            raise ValueError('unexpected server product: '
                             '{}'.format(product))
        self.log.debug('Connect OK')
        return connection
    except Exception as e:
        msg = ('amqp connection failed to {}:{} as '
               '{} ({})'.format(host, port, username, str(e)))
        if fatal:
            amulet.raise_status(amulet.FAIL, msg)
        else:
            self.log.warning('{}'.format(msg))
            return None
945
def publish_amqp_message_by_unit(self, sentry_unit, message,
                                 queue="test", ssl=False,
                                 username="testuser1",
                                 password="changeme",
                                 port=None):
    """Publish an amqp message to a rmq juju unit.

    :param sentry_unit: sentry unit pointer
    :param message: amqp message string
    :param queue: message queue, default to test
    :param username: amqp user name, default to testuser1
    :param password: amqp user password
    :param ssl: boolean, default to False
    :param port: amqp port, use defaults if None
    :returns: None.  Raises exception if publish failed.
    """
    # NOTE(review): the last two signature lines were missing from the
    # reviewed source; they are reconstructed from the docstring and
    # from get_amqp_message_by_unit's parallel signature.
    self.log.debug('Publishing message to {} queue:\n{}'.format(queue,
                                                                message))
    connection = self.connect_amqp_by_unit(sentry_unit, ssl=ssl,
                                           port=port,
                                           username=username,
                                           password=password)

    # NOTE(beisner): extra debug here re: pika hang potential:
    #   https://github.com/pika/pika/issues/297
    #   https://groups.google.com/forum/#!topic/rabbitmq-users/Ja0iyfF0Szw
    try:
        self.log.debug('Defining channel...')
        channel = connection.channel()
        self.log.debug('Declaring queue...')
        channel.queue_declare(queue=queue, auto_delete=False, durable=True)
        self.log.debug('Publishing message...')
        channel.basic_publish(exchange='', routing_key=queue, body=message)
        self.log.debug('Closing channel...')
        channel.close()
    finally:
        # Always release the connection, even if a channel operation
        # above raised.
        self.log.debug('Closing connection...')
        connection.close()
982
def get_amqp_message_by_unit(self, sentry_unit, queue="test",
                             username="testuser1",
                             password="changeme",
                             ssl=False, port=None):
    """Get an amqp message from a rmq juju unit.

    :param sentry_unit: sentry unit pointer
    :param queue: message queue, default to test
    :param username: amqp user name, default to testuser1
    :param password: amqp user password
    :param ssl: boolean, default to False
    :param port: amqp port, use defaults if None
    :returns: amqp message body as string.  Raise if get fails.
    """
    # NOTE(review): the password parameter line was missing from the
    # reviewed source; it is reconstructed from the docstring.
    connection = self.connect_amqp_by_unit(sentry_unit, ssl=ssl,
                                           port=port,
                                           username=username,
                                           password=password)
    try:
        channel = connection.channel()
        method_frame, _, body = channel.basic_get(queue)

        if method_frame:
            self.log.debug('Retreived message from {} queue:\n{}'.format(queue,
                                                                         body))
            channel.basic_ack(method_frame.delivery_tag)
        channel.close()
    finally:
        # Release the connection on every path, including the
        # no-message failure below.
        connection.close()

    if not method_frame:
        msg = 'No message retrieved.'
        amulet.raise_status(amulet.FAIL, msg)

    return body