20
20
from oslo.config import cfg
21
21
from oslo import messaging
22
from oslo.serialization import jsonutils
22
23
from oslo.utils import timeutils
24
from osprofiler import profiler
28
30
from heat.common import context
29
31
from heat.common import exception
30
32
from heat.common.i18n import _
33
from heat.common.i18n import _LE
34
from heat.common.i18n import _LI
35
from heat.common.i18n import _LW
31
36
from heat.common import identifier
32
37
from heat.common import messaging as rpc_messaging
33
38
from heat.db import api as db_api
35
40
from heat.engine import attributes
36
41
from heat.engine import clients
37
42
from heat.engine import environment
38
from heat.engine.event import Event
43
from heat.engine import event as evt
39
44
from heat.engine import parameter_groups
40
from heat.engine import parser
41
45
from heat.engine import properties
42
from heat.engine import resource
43
46
from heat.engine import resources
47
from heat.engine import stack as parser
44
48
from heat.engine import stack_lock
45
49
from heat.engine import template as templatem
46
50
from heat.engine import watchrule
47
from heat.openstack.common import jsonutils
48
51
from heat.openstack.common import log as logging
49
52
from heat.openstack.common import service
50
53
from heat.openstack.common import threadgroup
99
def _serialize_profile_info(self):
100
prof = profiler.get()
104
"hmac_key": prof.hmac_key,
105
"base_id": prof.get_base_id(),
106
"parent_id": prof.get_id()
110
def _start_with_trace(self, trace, func, *args, **kwargs):
112
profiler.init(**trace)
113
return func(*args, **kwargs)
96
115
def start(self, stack_id, func, *args, **kwargs):
98
117
Run the given method in a sub-thread.
100
119
if stack_id not in self.groups:
101
120
self.groups[stack_id] = threadgroup.ThreadGroup()
102
return self.groups[stack_id].add_thread(func, *args, **kwargs)
121
return self.groups[stack_id].add_thread(self._start_with_trace,
122
self._serialize_profile_info(),
123
func, *args, **kwargs)
104
125
def start_with_lock(self, cnxt, stack, engine_id, func, *args, **kwargs):
236
257
db_stack = db_api.stack_get(admin_context, sid, tenant_safe=False,
239
LOG.error(_("Unable to retrieve stack %s for periodic task") % sid)
260
LOG.error(_LE("Unable to retrieve stack %s for periodic task"),
241
263
stack = parser.Stack.load(admin_context, stack=db_stack,
242
264
use_stored_context=True)
251
273
wrs = db_api.watch_rule_get_all_by_stack(admin_context, sid)
252
274
except Exception as ex:
253
LOG.warn(_('periodic_task db error watch rule removed? %(ex)s')
275
LOG.warn(_LW('periodic_task db error watch rule removed? %(ex)s'),
257
279
def run_alarm_action(stack, actions, details):
288
311
super(EngineListener, self).__init__()
289
312
self.thread_group_mgr = thread_group_mgr
290
313
self.engine_id = engine_id
293
317
super(EngineListener, self).start()
294
318
self.target = messaging.Target(
295
server=cfg.CONF.host, topic=self.engine_id)
319
server=self.host, topic=self.engine_id)
296
320
server = rpc_messaging.get_rpc_server(self.target, self)
313
337
self.thread_group_mgr.send(stack_id, message)
340
@profiler.trace_cls("rpc")
316
341
class EngineService(service.Service):
318
343
Manages the running instances from creation to destruction.
345
370
'deprecated and will be removed in the Juno '
346
371
'release.', DeprecationWarning)
373
if cfg.CONF.trusts_delegated_roles:
374
warnings.warn('The default value of "trusts_delegated_roles" '
375
'option in heat.conf is changed to [] in Kilo '
376
'and heat will delegate all roles of trustor. '
377
'Please keep the same if you do not want to '
378
'delegate subset roles when upgrading.',
348
381
def create_periodic_tasks(self):
349
382
LOG.debug("Starting periodic watch tasks pid=%s" % os.getpid())
350
383
# Note with multiple workers, the parent process hasn't called start()
367
400
LOG.debug("Starting listener for engine %s" % self.engine_id)
368
401
self.listener.start()
369
402
target = messaging.Target(
370
version=self.RPC_API_VERSION, server=cfg.CONF.host,
403
version=self.RPC_API_VERSION, server=self.host,
371
404
topic=self.topic)
372
405
self.target = target
373
406
server = rpc_messaging.get_rpc_server(target, self)
390
423
# Ignore dummy service task
391
424
if stack_id == cfg.CONF.periodic_interval:
393
LOG.info(_("Waiting stack %s processing to be finished")
426
LOG.info(_LI("Waiting stack %s processing to be finished"),
395
428
# Stop threads gracefully
396
429
self.thread_group_mgr.stop(stack_id, True)
397
LOG.info(_("Stack %s processing was finished") % stack_id)
430
LOG.info(_LI("Stack %s processing was finished"), stack_id)
399
432
# Terminate the engine process
400
LOG.info(_("All threads were gone, terminating engine"))
433
LOG.info(_LI("All threads were gone, terminating engine"))
401
434
super(EngineService, self).stop()
539
572
raise exception.RequestLimitExceeded(message=message)
541
574
def _parse_template_and_validate_stack(self, cnxt, stack_name, template,
542
params, files, args, owner_id=None):
575
params, files, args, owner_id=None,
576
nested_depth=0, user_creds_id=None,
577
stack_user_project_id=None):
578
# If it is stack-adopt, use parameters from adopt_stack_data
579
common_params = api.extract_args(args)
580
if (rpc_api.PARAM_ADOPT_STACK_DATA in common_params and
581
not cfg.CONF.enable_stack_adopt):
582
raise exception.NotSupported(feature='Stack Adopt')
543
584
tmpl = templatem.Template(template, files=files)
544
585
self._validate_new_stack(cnxt, stack_name, tmpl)
546
# If it is stack-adopt, use parameters from adopt_stack_data
547
common_params = api.extract_args(args)
549
587
if rpc_api.PARAM_ADOPT_STACK_DATA in common_params:
550
588
params[rpc_api.STACK_PARAMETERS] = common_params[
551
589
rpc_api.PARAM_ADOPT_STACK_DATA]['environment'][
554
592
env = environment.Environment(params)
555
593
stack = parser.Stack(cnxt, stack_name, tmpl, env,
556
594
owner_id=owner_id,
595
nested_depth=nested_depth,
596
user_creds_id=user_creds_id,
597
stack_user_project_id=stack_user_project_id,
559
600
self._validate_deferred_auth_context(cnxt, stack)
576
617
:param args: Request parameters/args passed from API
579
LOG.info(_('previewing stack %s') % stack_name)
620
LOG.info(_LI('previewing stack %s'), stack_name)
580
621
stack = self._parse_template_and_validate_stack(cnxt,
590
631
def create_stack(self, cnxt, stack_name, template, params, files, args,
632
owner_id=None, nested_depth=0, user_creds_id=None,
633
stack_user_project_id=None):
593
635
The create_stack method creates a new stack using the template
603
645
:param args: Request parameters/args passed from API
604
646
:param owner_id: parent stack ID for nested stacks, only expected when
605
647
called from another heat-engine (not a user option)
648
:param nested_depth: the nested depth for nested stacks, only expected
649
when called from another heat-engine
650
:param user_creds_id: the parent user_creds record for nested stacks
651
:param stack_user_project_id: the parent stack_user_project_id for
607
LOG.info(_('Creating stack %s') % stack_name)
654
LOG.info(_LI('Creating stack %s'), stack_name)
609
656
def _stack_create(stack):
611
658
if not stack.stack_user_project_id:
612
stack.create_stack_user_project_id()
660
stack.create_stack_user_project_id()
661
except exception.AuthorizationFailure as ex:
662
stack.state_set(stack.action, stack.FAILED,
614
665
# Create/Adopt a stack, and create the periodic task if successful
615
666
if stack.adopt_stack_data:
616
if not cfg.CONF.enable_stack_adopt:
617
raise exception.NotSupported(feature='Stack Adopt')
668
elif stack.status != stack.FAILED:
623
671
if (stack.action in (stack.CREATE, stack.ADOPT)
626
674
# Schedule a periodic watcher task for this stack
627
675
self.stack_watch.start_watch_task(stack.id, cnxt)
629
LOG.info(_("Stack create failed, status %s") % stack.status)
677
LOG.info(_LI("Stack create failed, status %s"), stack.status)
631
679
stack = self._parse_template_and_validate_stack(cnxt,
662
713
# Get the database representation of the existing stack
663
714
db_stack = self._get_stack(cnxt, stack_identity)
664
LOG.info(_('Updating stack %s') % db_stack.name)
715
LOG.info(_LI('Updating stack %s'), db_stack.name)
666
717
current_stack = parser.Stack.load(cnxt, stack=db_stack)
723
774
msg = _("Cancelling update when stack is %s"
724
775
) % str(current_stack.state)
725
776
raise exception.NotSupported(feature=msg)
726
LOG.info(_('Starting cancel of updating stack %s') % db_stack.name)
777
LOG.info(_LI('Starting cancel of updating stack %s') % db_stack.name)
727
778
# stop the running update and take the lock
728
779
# as we cancel only running update, the acquire_result is
729
780
# always some engine_id, not None
757
808
:param template: Template of stack you want to create.
758
809
:param params: Stack Input Params
760
LOG.info(_('validate_template'))
811
LOG.info(_LI('validate_template'))
761
812
if template is None:
762
813
msg = _("No Template provided.")
763
814
return webob.exc.HTTPBadRequest(explanation=msg)
794
845
return {'Error': six.text_type(ex)}
796
847
# validate parameters
797
tmpl_params = tmpl.parameters(None, {})
848
tmpl_params = tmpl.parameters(None, user_params=env.params)
798
849
tmpl_params.validate(validate_value=False, context=cnxt)
799
850
is_real_param = lambda p: p.name not in tmpl_params.PSEUDO_PARAMETERS
800
851
params = tmpl_params.map(api.format_validate_parameter, is_real_param)
854
905
st = self._get_stack(cnxt, stack_identity)
855
LOG.info(_('Deleting stack %s') % st.name)
906
LOG.info(_LI('Deleting stack %s'), st.name)
856
907
stack = parser.Stack.load(cnxt, stack=st)
858
909
lock = stack_lock.StackLock(cnxt, stack, self.engine_id)
905
956
raise exception.NotSupported(feature='Stack Abandon')
907
958
st = self._get_stack(cnxt, stack_identity)
908
LOG.info(_('abandoning stack %s') % st.name)
959
LOG.info(_LI('abandoning stack %s'), st.name)
909
960
stack = parser.Stack.load(cnxt, stack=st)
910
961
lock = stack_lock.StackLock(cnxt, stack, self.engine_id)
911
962
with lock.thread_lock(stack.id):
924
975
:param cnxt: RPC context.
926
return resource.get_types(support_status)
977
return resources.global_env().get_types(support_status)
928
979
def resource_schema(self, cnxt, type_name):
933
984
:param type_name: Name of the resource type to obtain the schema of.
936
resource_class = resource.get_class(type_name)
987
resource_class = resources.global_env().get_class(type_name)
937
988
except exception.StackValidationFailed:
938
989
raise exception.ResourceTypeNotFound(type_name=type_name)
990
except exception.NotFound as ex:
991
raise exception.StackValidationFailed(message=ex.message)
940
993
def properties_schema():
941
994
for name, schema_dict in resource_class.properties_schema.items():
962
1015
:param type_name: Name of the resource type to generate a template for.
966
resource.get_class(type_name).resource_to_template(type_name)
1018
return resources.global_env().get_class(
1019
type_name).resource_to_template(type_name)
967
1020
except exception.StackValidationFailed:
968
1021
raise exception.ResourceTypeNotFound(type_name=type_name)
1022
except exception.NotFound as ex:
1023
raise exception.StackValidationFailed(message=ex.message)
970
1025
@request_context
971
1026
def list_events(self, cnxt, stack_identity, filters=None, limit=None,
1007
1062
stacks[stack_id] = parser.Stack.load(cnxt, stack_id)
1008
1063
return stacks[stack_id]
1010
return [api.format_event(Event.load(cnxt,
1012
get_stack(e.stack_id)))
1065
return [api.format_event(evt.Event.load(cnxt,
1067
get_stack(e.stack_id)))
1013
1068
for e in events]
1015
1070
def _authorize_stack_user(self, cnxt, stack, resource_name):
1034
1089
access_key = ec2_creds.get('access')
1035
1090
return stack.access_allowed(access_key, resource_name)
1092
def _verify_stack_resource(self, stack, resource_name):
1093
if resource_name not in stack:
1094
raise exception.ResourceNotFound(resource_name=resource_name,
1095
stack_name=stack.name)
1097
resource = stack[resource_name]
1098
if resource.id is None:
1099
raise exception.ResourceNotAvailable(resource_name=resource_name)
1037
1101
@request_context
1038
def describe_stack_resource(self, cnxt, stack_identity, resource_name):
1102
def describe_stack_resource(self, cnxt, stack_identity, resource_name,
1039
1104
s = self._get_stack(cnxt, stack_identity)
1040
1105
stack = parser.Stack.load(cnxt, stack=s)
1042
1107
if cfg.CONF.heat_stack_user_role in cnxt.roles:
1043
1108
if not self._authorize_stack_user(cnxt, stack, resource_name):
1044
LOG.warning(_("Access denied to resource %s") % resource_name)
1109
LOG.warn(_LW("Access denied to resource %s"), resource_name)
1045
1110
raise exception.Forbidden()
1047
if resource_name not in stack:
1048
raise exception.ResourceNotFound(resource_name=resource_name,
1049
stack_name=stack.name)
1051
resource = stack[resource_name]
1052
if resource.id is None:
1053
raise exception.ResourceNotAvailable(resource_name=resource_name)
1055
return api.format_stack_resource(stack[resource_name])
1112
self._verify_stack_resource(stack, resource_name)
1114
return api.format_stack_resource(stack[resource_name],
1115
with_attr=with_attr)
1057
1117
@request_context
1058
1118
def resource_signal(self, cnxt, stack_identity, resource_name, details):
1063
1123
# signal doesn't have permission to read the secret key of
1064
1124
# the user associated with the cfn-credentials file
1065
1125
stack = parser.Stack.load(cnxt, stack=s, use_stored_context=True)
1067
if resource_name not in stack:
1068
raise exception.ResourceNotFound(resource_name=resource_name,
1069
stack_name=stack.name)
1071
resource = stack[resource_name]
1072
if resource.id is None:
1073
raise exception.ResourceNotAvailable(resource_name=resource_name)
1126
self._verify_stack_resource(stack, resource_name)
1075
1128
if callable(stack[resource_name].signal):
1076
1129
stack[resource_name].signal(details)
1083
1136
if res.name != resource_name and res.id is not None:
1084
1137
res.metadata_update()
1086
return resource.metadata_get()
1139
return stack[resource_name].metadata_get()
1088
1141
@request_context
1089
1142
def find_physical_resource(self, cnxt, physical_resource_id):
1207
1260
s = self._get_stack(cnxt, stack_identity)
1208
1261
stack = parser.Stack.load(cnxt, stack=s)
1209
LOG.info(_("Checking stack %s") % stack.name)
1262
LOG.info(_LI("Checking stack %s"), stack.name)
1211
1264
self.thread_group_mgr.start_with_lock(cnxt, stack, self.engine_id,
1214
1267
@request_context
1268
def stack_restore(self, cnxt, stack_identity, snapshot_id):
1269
def _stack_restore(stack, snapshot):
1270
LOG.debug("restoring stack %s" % stack.name)
1271
stack.restore(snapshot)
1273
s = self._get_stack(cnxt, stack_identity)
1274
snapshot = db_api.snapshot_get(cnxt, snapshot_id)
1276
stack = parser.Stack.load(cnxt, stack=s)
1278
self.thread_group_mgr.start_with_lock(cnxt, stack, self.engine_id,
1279
_stack_restore, stack, snapshot)
1215
1282
def stack_list_snapshots(self, cnxt, stack_identity):
1216
1283
s = self._get_stack(cnxt, stack_identity)
1217
1284
data = db_api.snapshot_get_all(cnxt, s.id)
1297
1364
wrn = [w.name for w in db_api.watch_rule_get_all(cnxt)]
1298
1365
except Exception as ex:
1299
LOG.warn(_('show_watch (all) db error %s') % ex)
1366
LOG.warn(_LW('show_watch (all) db error %s'), ex)
1302
1369
wrs = [watchrule.WatchRule.load(cnxt, w) for w in wrn]
1319
1386
# namespace/metric, but we will want this at some point
1320
1387
# for now, the API can query all metric data and filter locally
1321
1388
if metric_namespace is not None or metric_name is not None:
1322
LOG.error(_("Filtering by namespace/metric not yet supported"))
1389
LOG.error(_LE("Filtering by namespace/metric not yet supported"))
1326
1393
wds = db_api.watch_data_get_all(cnxt)
1327
1394
except Exception as ex:
1328
LOG.warn(_('show_metric (all) db error %s') % ex)
1395
LOG.warn(_LW('show_metric (all) db error %s'), ex)
1331
1398
result = [api.format_watch_data(w) for w in wds]