22
22
from oslo_log import log as logging
23
23
import oslo_messaging as messaging
24
24
from oslo_serialization import jsonutils
25
from oslo_service import service
26
from oslo_service import threadgroup
25
27
from oslo_utils import uuidutils
26
28
from osprofiler import profiler
58
60
from heat.objects import stack as stack_object
59
61
from heat.objects import watch_data
60
62
from heat.objects import watch_rule
61
from heat.openstack.common import service
62
from heat.openstack.common import threadgroup
63
63
from heat.rpc import api as rpc_api
64
64
from heat.rpc import worker_api as rpc_worker_api
135
135
:param args: Args to be passed to func
136
136
:param kwargs: Keyword-args to be passed to func.
138
lock = stack_lock.StackLock(cnxt, stack, engine_id)
139
with lock.thread_lock(stack.id):
138
lock = stack_lock.StackLock(cnxt, stack.id, engine_id)
139
with lock.thread_lock():
140
140
th = self.start_with_acquired_lock(stack, lock,
141
141
func, *args, **kwargs)
156
156
:param kwargs: Keyword-args to be passed to func
159
def release(gt, *args):
161
161
Callback function that will be passed to GreenThread.link().
165
165
th = self.start(stack.id, func, *args, **kwargs)
166
th.link(release, stack.id)
169
169
def add_timer(self, stack_id, func, *args, **kwargs):
378
379
# Wait for all active threads to be finished
379
for stack_id in self.thread_group_mgr.groups.keys():
380
for stack_id in list(self.thread_group_mgr.groups.keys()):
380
381
# Ignore dummy service task
381
382
if stack_id == cfg.CONF.periodic_interval:
395
396
LOG.info(_LI("All threads were gone, terminating engine"))
396
397
super(EngineService, self).stop()
400
super(EngineService, self).reset()
401
logging.setup(cfg.CONF, 'heat')
398
403
@context.request_context
399
404
def identify_stack(self, cnxt, stack_name):
466
471
@context.request_context
467
472
def list_stacks(self, cnxt, limit=None, marker=None, sort_keys=None,
468
473
sort_dir=None, filters=None, tenant_safe=True,
469
show_deleted=False, show_nested=False):
474
show_deleted=False, show_nested=False, show_hidden=False,
475
tags=None, tags_any=None, not_tags=None,
471
478
The list_stacks method returns attributes of all stacks. It supports
472
479
pagination (``limit`` and ``marker``), sorting (``sort_keys`` and
481
488
:param tenant_safe: if true, scope the request by the current tenant
482
489
:param show_deleted: if true, show soft-deleted stacks
483
490
:param show_nested: if true, show nested stacks
491
:param show_hidden: if true, show hidden stacks
492
:param tags: show stacks containing these tags, combine multiple
493
tags using the boolean AND expression
494
:param tags_any: show stacks containing these tags, combine multiple
495
tags using the boolean OR expression
496
:param not_tags: show stacks not containing these tags, combine
497
multiple tags using the boolean AND expression
498
:param not_tags_any: show stacks not containing these tags, combine
499
multiple tags using the boolean OR expression
484
500
:returns: a list of formatted stacks
486
502
stacks = parser.Stack.load_all(cnxt, limit, marker, sort_keys,
487
503
sort_dir, filters, tenant_safe,
488
504
show_deleted, resolve_data=False,
489
show_nested=show_nested)
505
show_nested=show_nested,
506
show_hidden=show_hidden,
507
tags=tags, tags_any=tags_any,
509
not_tags_any=not_tags_any)
490
510
return [api.format_stack(stack) for stack in stacks]
492
512
@context.request_context
493
513
def count_stacks(self, cnxt, filters=None, tenant_safe=True,
494
show_deleted=False, show_nested=False):
514
show_deleted=False, show_nested=False, show_hidden=False,
515
tags=None, tags_any=None, not_tags=None,
496
518
Return the number of stacks that match the given filters
497
519
:param cnxt: RPC context.
499
521
:param tenant_safe: if true, scope the request by the current tenant
500
522
:param show_deleted: if true, count will include the deleted stacks
501
523
:param show_nested: if true, count will include nested stacks
524
:param show_hidden: if true, count will include hidden stacks
525
:param tags: count stacks containing these tags, combine multiple tags
526
using the boolean AND expression
527
:param tags_any: count stacks containing these tags, combine multiple
528
tags using the boolean OR expression
529
:param not_tags: count stacks not containing these tags, combine
530
multiple tags using the boolean AND expression
531
:param not_tags_any: count stacks not containing these tags, combine
532
multiple tags using the boolean OR expression
502
533
:returns: an integer representing the number of matched stacks
504
535
return stack_object.Stack.count_all(
507
538
tenant_safe=tenant_safe,
508
539
show_deleted=show_deleted,
509
show_nested=show_nested)
540
show_nested=show_nested,
541
show_hidden=show_hidden,
545
not_tags_any=not_tags_any)
511
547
def _validate_deferred_auth_context(self, cnxt, stack):
512
548
if cfg.CONF.deferred_auth_method != 'password':
523
559
def _validate_new_stack(self, cnxt, stack_name, parsed_template):
525
561
parsed_template.validate()
562
except AssertionError:
526
564
except Exception as ex:
527
565
raise exception.StackValidationFailed(message=six.text_type(ex))
535
573
" Please delete some stacks.") % tenant_limit
536
574
raise exception.RequestLimitExceeded(message=message)
576
max_resources = cfg.CONF.max_resources_per_stack
577
if max_resources == -1:
538
579
num_resources = len(parsed_template[parsed_template.RESOURCES])
539
if num_resources > cfg.CONF.max_resources_per_stack:
580
if num_resources > max_resources:
540
581
message = exception.StackResourceLimitExceeded.msg_fmt
541
582
raise exception.RequestLimitExceeded(message=message)
546
587
stack_user_project_id=None,
547
588
convergence=False,
548
589
parent_resource_name=None):
590
common_params = api.extract_args(args)
549
592
# If it is stack-adopt, use parameters from adopt_stack_data
550
common_params = api.extract_args(args)
551
if (rpc_api.PARAM_ADOPT_STACK_DATA in common_params and
552
not cfg.CONF.enable_stack_adopt):
553
raise exception.NotSupported(feature='Stack Adopt')
555
593
if rpc_api.PARAM_ADOPT_STACK_DATA in common_params:
594
if not cfg.CONF.enable_stack_adopt:
595
raise exception.NotSupported(feature='Stack Adopt')
556
597
# Override the params with values given with -P option
557
598
new_params = common_params[rpc_api.PARAM_ADOPT_STACK_DATA][
558
599
'environment'][rpc_api.STACK_PARAMETERS].copy()
637
678
LOG.info(_LI('Creating stack %s'), stack_name)
639
def _stack_create(stack):
680
def _create_stack_user(stack):
641
681
if not stack.stack_user_project_id:
643
683
stack.create_stack_user_project_id()
645
685
stack.state_set(stack.action, stack.FAILED,
646
686
six.text_type(ex))
688
def _stack_create(stack):
689
_create_stack_user(stack)
648
690
# Create/Adopt a stack, and create the periodic task if successful
649
691
if stack.adopt_stack_data:
660
702
LOG.info(_LI("Stack create failed, status %s"), stack.status)
662
704
convergence = cfg.CONF.convergence_engine
664
raise exception.NotSupported(feature=_('Convergence engine'))
666
706
stack = self._parse_template_and_validate_stack(
667
707
cnxt, stack_name, template, params, files, args, owner_id,
668
708
nested_depth, user_creds_id, stack_user_project_id, convergence,
669
709
parent_resource_name)
673
self.thread_group_mgr.start_with_lock(cnxt, stack, self.engine_id,
674
_stack_create, stack)
711
# once validations are done
712
# if convergence is enabled, take convergence path
714
# TODO(later): call _create_stack_user(stack)
715
# call stack.converge_stack(template=stack.t, action=stack.CREATE)
716
raise exception.NotSupported(feature=_('Convergence engine'))
719
self.thread_group_mgr.start_with_lock(cnxt, stack, self.engine_id,
720
_stack_create, stack)
676
722
return dict(stack.identifier())
713
759
current_stack.env,
714
760
args.get(rpc_api.PARAM_CLEAR_PARAMETERS, []))
715
761
tmpl = templatem.Template(template, files=files, env=env)
716
if len(tmpl[tmpl.RESOURCES]) > cfg.CONF.max_resources_per_stack:
762
max_resources = cfg.CONF.max_resources_per_stack
763
if max_resources != -1 and len(tmpl[tmpl.RESOURCES]) > max_resources:
717
764
raise exception.RequestLimitExceeded(
718
765
message=exception.StackResourceLimitExceeded.msg_fmt)
719
766
stack_name = current_stack.name
733
780
self._validate_deferred_auth_context(cnxt, updated_stack)
734
781
updated_stack.validate()
736
event = eventlet.event.Event()
737
th = self.thread_group_mgr.start_with_lock(cnxt, current_stack,
739
current_stack.update,
742
th.link(self.thread_group_mgr.remove_event, current_stack.id, event)
743
self.thread_group_mgr.add_event(current_stack.id, event)
783
# Once all the validations are done
784
# if convergence is enabled, take the convergence path
785
if current_kwargs['convergence']:
786
current_stack.converge_stack(template=tmpl)
788
event = eventlet.event.Event()
789
th = self.thread_group_mgr.start_with_lock(cnxt, current_stack,
791
current_stack.update,
794
th.link(self.thread_group_mgr.remove_event,
795
current_stack.id, event)
796
self.thread_group_mgr.add_event(current_stack.id, event)
744
797
return dict(current_stack.identifier())
746
799
@context.request_context
763
816
# stop the running update and take the lock
764
817
# as we cancel only running update, the acquire_result is
765
818
# always some engine_id, not None
766
lock = stack_lock.StackLock(cnxt, current_stack,
819
lock = stack_lock.StackLock(cnxt, current_stack.id,
768
821
engine_id = lock.try_acquire()
769
822
# Current engine has the lock
774
827
elif stack_lock.StackLock.engine_alive(cnxt, engine_id):
775
828
cancel_result = self._remote_call(
776
829
cnxt, engine_id, self.listener.SEND,
777
stack_identity, rpc_api.THREAD_CANCEL)
830
stack_identity=stack_identity, message=rpc_api.THREAD_CANCEL)
778
831
if cancel_result is None:
779
832
LOG.debug("Successfully sent %(msg)s message "
780
833
"to remote task on engine %(eng)s" % {
812
865
env = environment.Environment(params)
814
for res in tmpl_resources.values():
867
for name, res in six.iteritems(tmpl_resources):
815
868
ResourceClass = env.get_class(res['Type'])
816
869
if ResourceClass == resources.template_resource.TemplateResource:
817
870
# we can't validate a TemplateResource unless we instantiate
819
872
# parameters into properties_schema.
822
props = properties.Properties(ResourceClass.properties_schema,
823
res.get('Properties', {}),
875
props = properties.Properties(
876
ResourceClass.properties_schema,
877
res.get('Properties', {}),
878
parent_name=six.text_type(name),
880
section='Properties')
825
881
deletion_policy = res.get('DeletionPolicy', 'Delete')
827
883
ResourceClass.validate_deletion_policy(deletion_policy)
867
923
return s.raw_template.template
870
def _remote_call(self, cnxt, lock_engine_id, call, *args, **kwargs):
926
def _remote_call(self, cnxt, lock_engine_id, call, **kwargs):
871
927
timeout = cfg.CONF.engine_life_check_timeout
872
928
self.cctxt = self._client.prepare(
875
931
topic=rpc_api.LISTENER_TOPIC,
876
932
server=lock_engine_id)
878
self.cctxt.call(cnxt, call, *args, **kwargs)
934
self.cctxt.call(cnxt, call, **kwargs)
879
935
except messaging.MessagingTimeout:
892
948
LOG.info(_LI('Deleting stack %s'), st.name)
893
949
stack = parser.Stack.load(cnxt, stack=st)
895
lock = stack_lock.StackLock(cnxt, stack, self.engine_id)
896
with lock.try_thread_lock(stack.id) as acquire_result:
951
if stack.convergence:
952
template = templatem.Template.create_empty_template()
953
stack.converge_stack(template=template, action=stack.DELETE)
956
lock = stack_lock.StackLock(cnxt, stack.id, self.engine_id)
957
with lock.try_thread_lock() as acquire_result:
898
959
# Successfully acquired lock
899
960
if acquire_result is None:
944
1005
st = self._get_stack(cnxt, stack_identity)
945
1006
LOG.info(_LI('abandoning stack %s'), st.name)
946
1007
stack = parser.Stack.load(cnxt, stack=st)
947
lock = stack_lock.StackLock(cnxt, stack, self.engine_id)
948
with lock.thread_lock(stack.id):
1008
lock = stack_lock.StackLock(cnxt, stack.id, self.engine_id)
1009
with lock.thread_lock():
949
1010
# Get stack details before deleting it.
950
1011
stack_info = stack.prepare_abandon()
951
1012
self.thread_group_mgr.start_with_acquired_lock(stack,
973
1034
resource_class = resources.global_env().get_class(type_name)
974
except exception.StackValidationFailed:
975
raise exception.ResourceTypeNotFound(type_name=type_name)
976
except exception.NotFound as ex:
977
raise exception.StackValidationFailed(message=ex.message)
1035
except (exception.InvalidResourceType,
1036
exception.ResourceTypeNotFound,
1037
exception.TemplateNotFound) as ex:
979
1040
def properties_schema():
980
1041
for name, schema_dict in resource_class.properties_schema.items():
985
1046
def attributes_schema():
986
1047
for name, schema_data in resource_class.attributes_schema.items():
987
1048
schema = attributes.Schema.from_attribute(schema_data)
988
yield name, {schema.DESCRIPTION: schema.description}
1049
yield name, dict(schema)
991
1052
rpc_api.RES_SCHEMA_RES_TYPE: type_name,
992
1053
rpc_api.RES_SCHEMA_PROPERTIES: dict(properties_schema()),
993
1054
rpc_api.RES_SCHEMA_ATTRIBUTES: dict(attributes_schema()),
1055
rpc_api.RES_SCHEMA_SUPPORT_STATUS:
1056
resource_class.support_status.to_dict(),
996
def generate_template(self, cnxt, type_name):
1059
def generate_template(self, cnxt, type_name, template_type='cfn'):
998
1061
Generate a template based on the specified type.
1000
1063
:param cnxt: RPC context.
1001
1064
:param type_name: Name of the resource type to generate a template for.
1065
:param template_type: the template type to generate, cfn or hot.
1004
1068
return resources.global_env().get_class(
1005
type_name).resource_to_template(type_name)
1006
except exception.StackValidationFailed:
1007
raise exception.ResourceTypeNotFound(type_name=type_name)
1008
except exception.NotFound as ex:
1009
raise exception.StackValidationFailed(message=ex.message)
1069
type_name).resource_to_template(type_name, template_type)
1070
except (exception.InvalidResourceType,
1071
exception.ResourceTypeNotFound,
1072
exception.TemplateNotFound) as ex:
1011
1075
@context.request_context
1012
1076
def list_events(self, cnxt, stack_identity, filters=None, limit=None,
1115
1179
implementation.
1118
def _resource_signal(rsrc, details):
1182
def _resource_signal(stack, rsrc, details):
1120
1183
LOG.debug("signaling resource %s:%s" % (stack.name, rsrc.name))
1121
1184
rsrc.signal(details)
1141
1204
rsrc = stack[resource_name]
1142
1205
if callable(rsrc.signal):
1144
_resource_signal(rsrc, details)
1207
_resource_signal(stack, rsrc, details)
1145
1208
return rsrc.metadata_get()
1147
1210
self.thread_group_mgr.start(stack.id, _resource_signal,
1211
stack, rsrc, details)
1150
1213
@context.request_context
1151
1214
def find_physical_resource(self, cnxt, physical_resource_id):
1220
1283
@context.request_context
1221
1284
def stack_snapshot(self, cnxt, stack_identity, name):
1222
1285
def _stack_snapshot(stack, snapshot):
1223
LOG.debug("snapshotting stack %s" % stack.name)
1225
data = stack.prepare_abandon()
1226
snapshot_object.Snapshot.update(
1228
{'data': data, 'status': stack.status,
1229
'status_reason': stack.status_reason})
1287
def save_snapshot(stack, action, status, reason):
1288
"""Function that saves snapshot before snapshot complete."""
1289
data = stack.prepare_abandon()
1290
data["status"] = status
1291
snapshot_object.Snapshot.update(
1293
{'data': data, 'status': status,
1294
'status_reason': reason})
1296
LOG.debug("Snapshotting stack %s" % stack.name)
1297
stack.snapshot(save_snapshot_func=save_snapshot)
1231
1299
s = self._get_stack(cnxt, stack_identity)
1239
1307
raise exception.ActionInProgress(stack_name=stack.name,
1240
1308
action=stack.action)
1242
lock = stack_lock.StackLock(cnxt, stack, self.engine_id)
1310
lock = stack_lock.StackLock(cnxt, stack.id, self.engine_id)
1244
with lock.thread_lock(stack.id):
1312
with lock.thread_lock():
1245
1313
snapshot = snapshot_object.Snapshot.create(cnxt, {
1246
1314
'tenant': cnxt.tenant_id,
1254
1322
@context.request_context
1255
1323
def show_snapshot(self, cnxt, stack_identity, snapshot_id):
1256
snapshot = snapshot_object.Snapshot.get_by_id(cnxt, snapshot_id)
1324
s = self._get_stack(cnxt, stack_identity)
1325
snapshot = snapshot_object.Snapshot.get_snapshot_by_stack(
1326
cnxt, snapshot_id, s)
1257
1327
return api.format_snapshot(snapshot)
1259
1329
@context.request_context
1265
1335
s = self._get_stack(cnxt, stack_identity)
1266
1336
stack = parser.Stack.load(cnxt, stack=s)
1267
snapshot = snapshot_object.Snapshot.get_by_id(cnxt, snapshot_id)
1337
snapshot = snapshot_object.Snapshot.get_snapshot_by_stack(
1338
cnxt, snapshot_id, s)
1339
if snapshot.status == stack.IN_PROGRESS:
1340
msg = _('Deleting in-progress snapshot')
1341
raise exception.NotSupported(feature=msg)
1268
1343
self.thread_group_mgr.start(
1269
1344
stack.id, _delete_snapshot, stack, snapshot)
1287
1362
stack.restore(snapshot)
1289
1364
s = self._get_stack(cnxt, stack_identity)
1290
snapshot = snapshot_object.Snapshot.get_by_id(cnxt, snapshot_id)
1292
1365
stack = parser.Stack.load(cnxt, stack=s)
1366
snapshot = snapshot_object.Snapshot.get_snapshot_by_stack(
1367
cnxt, snapshot_id, s)
1294
1369
self.thread_group_mgr.start_with_lock(cnxt, stack, self.engine_id,
1295
1370
_stack_restore, stack, snapshot)
1301
1376
return [api.format_snapshot(snapshot) for snapshot in data]
1303
1378
@context.request_context
1304
def metadata_update(self, cnxt, stack_identity,
1305
resource_name, metadata):
1307
Update the metadata for the given resource.
1308
DEPRECATED: Use resource_signal instead
1310
warnings.warn('metadata_update is deprecated, '
1311
'use resource_signal instead',
1314
s = self._get_stack(cnxt, stack_identity)
1316
stack = parser.Stack.load(cnxt, stack=s)
1317
if resource_name not in stack:
1318
raise exception.ResourceNotFound(resource_name=resource_name,
1319
stack_name=stack.name)
1321
resource = stack[resource_name]
1322
resource.metadata_update(new_metadata=metadata)
1324
# This is not "nice" converting to the stored context here,
1325
# but this happens because the keystone user associated with the
1326
# WaitCondition doesn't have permission to read the secret key of
1327
# the user associated with the cfn-credentials file
1328
refresh_stack = parser.Stack.load(cnxt, stack=s,
1329
use_stored_context=True)
1331
# Refresh the metadata for all other resources, since we expect
1332
# resource_name to be a WaitCondition resource, and other
1333
# resources may refer to WaitCondition Fn::GetAtt Data, which
1335
for res in refresh_stack.dependencies:
1336
if res.name != resource_name and res.id is not None:
1337
res.metadata_update()
1339
return resource.metadata_get()
1341
@context.request_context
1342
1379
def create_watch_data(self, cnxt, watch_name, stats_data):
1344
1381
This could be used by CloudWatch and WaitConditions
1522
1559
def service_manage_report(self):
1523
1560
cnxt = context.get_admin_context()
1525
if self.service_id is not None:
1526
# Service is already running
1527
service_objects.Service.update_by_id(
1530
dict(deleted_at=None))
1531
LOG.info(_LI('Service %s is updated'), self.service_id)
1562
if self.service_id is None:
1533
1563
service_ref = service_objects.Service.create(
1535
1565
dict(host=self.host,
1542
1572
self.service_id = service_ref['id']
1543
1573
LOG.info(_LI('Service %s is started'), self.service_id)
1576
service_objects.Service.update_by_id(
1579
dict(deleted_at=None))
1580
LOG.info(_LI('Service %s is updated'), self.service_id)
1581
except Exception as ex:
1582
LOG.error(_LE('Service %(service_id)s update '
1583
'failed: %(error)s'),
1584
{'service_id': self.service_id, 'error': ex})
1545
1586
def service_manage_cleanup(self):
1546
1587
cnxt = context.get_admin_context()
1547
1588
last_updated_window = (3 * cfg.CONF.periodic_interval)
1567
1608
filters=filters,
1568
1609
tenant_safe=False) or []
1569
1610
for s in stacks:
1570
stk = parser.Stack.load(cnxt, stack=s,
1571
use_stored_context=True)
1572
lock = stack_lock.StackLock(cnxt, stk, self.engine_id)
1611
lock = stack_lock.StackLock(cnxt, s.id, self.engine_id)
1573
1612
# If the stack lock has been released, the stack status may have changed.
1574
1613
engine_id = lock.get_engine_id()
1575
1614
if not engine_id:
1579
1618
lock.acquire(retry=False)
1580
1619
except exception.ActionInProgress:
1621
stk = parser.Stack.load(cnxt, stack=s,
1622
use_stored_context=True)
1582
1623
LOG.info(_LI('Engine %(engine)s went down when stack %(stack_id)s'
1583
1624
' was in action %(action)s'),
1584
1625
{'engine': engine_id, 'action': stk.action,