~ubuntu-branches/ubuntu/wily/heat/wily

« back to all changes in this revision

Viewing changes to heat/engine/service.py

  • Committer: Package Import Robot
  • Author(s): Corey Bryant, Corey Bryant, James Page
  • Date: 2015-07-07 17:06:19 UTC
  • mfrom: (1.1.26) (45.1.1 vivid-proposed)
  • Revision ID: package-import@ubuntu.com-20150707170619-hra2dbjpfofpou4s
Tags: 1:5.0.0~b1-0ubuntu1
[ Corey Bryant ]
* New upstream milestone for OpenStack Liberty:
  - d/control: Align (build-)depends with upstream.
  - d/p/fix-requirements.patch: Rebased.
  - d/p/sudoers_patch.patch: Rebased.

[ James Page ]
* d/s/options: Ignore any removal of egg-info data during package clean.
* d/control: Drop MySQL and PostgreSQL related BD's, not required for unit
  testing.

Show diffs side-by-side

added added

removed removed

Lines of Context:
22
22
from oslo_log import log as logging
23
23
import oslo_messaging as messaging
24
24
from oslo_serialization import jsonutils
 
25
from oslo_service import service
 
26
from oslo_service import threadgroup
25
27
from oslo_utils import uuidutils
26
28
from osprofiler import profiler
27
29
import six
58
60
from heat.objects import stack as stack_object
59
61
from heat.objects import watch_data
60
62
from heat.objects import watch_rule
61
 
from heat.openstack.common import service
62
 
from heat.openstack.common import threadgroup
63
63
from heat.rpc import api as rpc_api
64
64
from heat.rpc import worker_api as rpc_worker_api
65
65
 
135
135
        :param args: Args to be passed to func
136
136
        :param kwargs: Keyword-args to be passed to func.
137
137
        """
138
 
        lock = stack_lock.StackLock(cnxt, stack, engine_id)
139
 
        with lock.thread_lock(stack.id):
 
138
        lock = stack_lock.StackLock(cnxt, stack.id, engine_id)
 
139
        with lock.thread_lock():
140
140
            th = self.start_with_acquired_lock(stack, lock,
141
141
                                               func, *args, **kwargs)
142
142
            return th
156
156
        :param kwargs: Keyword-args to be passed to func
157
157
 
158
158
        """
159
 
        def release(gt, *args):
 
159
        def release(gt):
160
160
            """
161
161
            Callback function that will be passed to GreenThread.link().
162
162
            """
163
 
            lock.release(*args)
 
163
            lock.release()
164
164
 
165
165
        th = self.start(stack.id, func, *args, **kwargs)
166
 
        th.link(release, stack.id)
 
166
        th.link(release)
167
167
        return th
168
168
 
169
169
    def add_timer(self, stack_id, func, *args, **kwargs):
206
206
 
207
207
            for th in threads:
208
208
                th.link(mark_done, th)
209
 
            while not all(links_done.values()):
 
209
            while not all(six.itervalues(links_done)):
210
210
                eventlet.sleep()
211
211
 
212
212
    def send(self, stack_id, message):
266
266
    by the RPC caller.
267
267
    """
268
268
 
269
 
    RPC_API_VERSION = '1.7'
 
269
    RPC_API_VERSION = '1.9'
270
270
 
271
271
    def __init__(self, host, topic, manager=None):
272
272
        super(EngineService, self).__init__()
315
315
        admin_context = context.get_admin_context()
316
316
        stacks = stack_object.Stack.get_all(
317
317
            admin_context,
318
 
            tenant_safe=False)
 
318
            tenant_safe=False,
 
319
            show_hidden=True)
319
320
        for s in stacks:
320
321
            self.stack_watch.start_watch_task(s.id, admin_context)
321
322
 
376
377
                     self.engine_id)
377
378
 
378
379
        # Wait for all active threads to be finished
379
 
        for stack_id in self.thread_group_mgr.groups.keys():
 
380
        for stack_id in list(self.thread_group_mgr.groups.keys()):
380
381
            # Ignore dummy service task
381
382
            if stack_id == cfg.CONF.periodic_interval:
382
383
                continue
395
396
        LOG.info(_LI("All threads were gone, terminating engine"))
396
397
        super(EngineService, self).stop()
397
398
 
 
399
    def reset(self):
 
400
        super(EngineService, self).reset()
 
401
        logging.setup(cfg.CONF, 'heat')
 
402
 
398
403
    @context.request_context
399
404
    def identify_stack(self, cnxt, stack_name):
400
405
        """
466
471
    @context.request_context
467
472
    def list_stacks(self, cnxt, limit=None, marker=None, sort_keys=None,
468
473
                    sort_dir=None, filters=None, tenant_safe=True,
469
 
                    show_deleted=False, show_nested=False):
 
474
                    show_deleted=False, show_nested=False, show_hidden=False,
 
475
                    tags=None, tags_any=None, not_tags=None,
 
476
                    not_tags_any=None):
470
477
        """
471
478
        The list_stacks method returns attributes of all stacks.  It supports
472
479
        pagination (``limit`` and ``marker``), sorting (``sort_keys`` and
481
488
        :param tenant_safe: if true, scope the request by the current tenant
482
489
        :param show_deleted: if true, show soft-deleted stacks
483
490
        :param show_nested: if true, show nested stacks
 
491
        :param show_hidden: if true, show hidden stacks
 
492
        :param tags: show stacks containing these tags, combine multiple
 
493
            tags using the boolean AND expression
 
494
        :param tags_any: show stacks containing these tags, combine multiple
 
495
            tags using the boolean OR expression
 
496
        :param not_tags: show stacks not containing these tags, combine
 
497
            multiple tags using the boolean AND expression
 
498
        :param not_tags_any: show stacks not containing these tags, combine
 
499
            multiple tags using the boolean OR expression
484
500
        :returns: a list of formatted stacks
485
501
        """
486
502
        stacks = parser.Stack.load_all(cnxt, limit, marker, sort_keys,
487
503
                                       sort_dir, filters, tenant_safe,
488
504
                                       show_deleted, resolve_data=False,
489
 
                                       show_nested=show_nested)
 
505
                                       show_nested=show_nested,
 
506
                                       show_hidden=show_hidden,
 
507
                                       tags=tags, tags_any=tags_any,
 
508
                                       not_tags=not_tags,
 
509
                                       not_tags_any=not_tags_any)
490
510
        return [api.format_stack(stack) for stack in stacks]
491
511
 
492
512
    @context.request_context
493
513
    def count_stacks(self, cnxt, filters=None, tenant_safe=True,
494
 
                     show_deleted=False, show_nested=False):
 
514
                     show_deleted=False, show_nested=False, show_hidden=False,
 
515
                     tags=None, tags_any=None, not_tags=None,
 
516
                     not_tags_any=None):
495
517
        """
496
518
        Return the number of stacks that match the given filters
497
519
        :param cnxt: RPC context.
499
521
        :param tenant_safe: if true, scope the request by the current tenant
500
522
        :param show_deleted: if true, count will include the deleted stacks
501
523
        :param show_nested: if true, count will include nested stacks
 
524
        :param show_hidden: if true, count will include hidden stacks
 
525
        :param tags: count stacks containing these tags, combine multiple tags
 
526
            using the boolean AND expression
 
527
        :param tags_any: count stacks containing these tags, combine multiple
 
528
            tags using the boolean OR expression
 
529
        :param not_tags: count stacks not containing these tags, combine
 
530
            multiple tags using the boolean AND expression
 
531
        :param not_tags_any: count stacks not containing these tags, combine
 
532
            multiple tags using the boolean OR expression
502
533
        :returns: an integer representing the number of matched stacks
503
534
        """
504
535
        return stack_object.Stack.count_all(
506
537
            filters=filters,
507
538
            tenant_safe=tenant_safe,
508
539
            show_deleted=show_deleted,
509
 
            show_nested=show_nested)
 
540
            show_nested=show_nested,
 
541
            show_hidden=show_hidden,
 
542
            tags=tags,
 
543
            tags_any=tags_any,
 
544
            not_tags=not_tags,
 
545
            not_tags_any=not_tags_any)
510
546
 
511
547
    def _validate_deferred_auth_context(self, cnxt, stack):
512
548
        if cfg.CONF.deferred_auth_method != 'password':
523
559
    def _validate_new_stack(self, cnxt, stack_name, parsed_template):
524
560
        try:
525
561
            parsed_template.validate()
 
562
        except AssertionError:
 
563
            raise
526
564
        except Exception as ex:
527
565
            raise exception.StackValidationFailed(message=six.text_type(ex))
528
566
 
535
573
                        " Please delete some stacks.") % tenant_limit
536
574
            raise exception.RequestLimitExceeded(message=message)
537
575
 
 
576
        max_resources = cfg.CONF.max_resources_per_stack
 
577
        if max_resources == -1:
 
578
            return
538
579
        num_resources = len(parsed_template[parsed_template.RESOURCES])
539
 
        if num_resources > cfg.CONF.max_resources_per_stack:
 
580
        if num_resources > max_resources:
540
581
            message = exception.StackResourceLimitExceeded.msg_fmt
541
582
            raise exception.RequestLimitExceeded(message=message)
542
583
 
546
587
                                           stack_user_project_id=None,
547
588
                                           convergence=False,
548
589
                                           parent_resource_name=None):
 
590
        common_params = api.extract_args(args)
 
591
 
549
592
        # If it is stack-adopt, use parameters from adopt_stack_data
550
 
        common_params = api.extract_args(args)
551
 
        if (rpc_api.PARAM_ADOPT_STACK_DATA in common_params and
552
 
                not cfg.CONF.enable_stack_adopt):
553
 
            raise exception.NotSupported(feature='Stack Adopt')
554
 
 
555
593
        if rpc_api.PARAM_ADOPT_STACK_DATA in common_params:
 
594
            if not cfg.CONF.enable_stack_adopt:
 
595
                raise exception.NotSupported(feature='Stack Adopt')
 
596
 
556
597
            # Override the params with values given with -P option
557
598
            new_params = common_params[rpc_api.PARAM_ADOPT_STACK_DATA][
558
599
                'environment'][rpc_api.STACK_PARAMETERS].copy()
636
677
        """
637
678
        LOG.info(_LI('Creating stack %s'), stack_name)
638
679
 
639
 
        def _stack_create(stack):
640
 
 
 
680
        def _create_stack_user(stack):
641
681
            if not stack.stack_user_project_id:
642
682
                try:
643
683
                    stack.create_stack_user_project_id()
645
685
                    stack.state_set(stack.action, stack.FAILED,
646
686
                                    six.text_type(ex))
647
687
 
 
688
        def _stack_create(stack):
 
689
            _create_stack_user(stack)
648
690
            # Create/Adopt a stack, and create the periodic task if successful
649
691
            if stack.adopt_stack_data:
650
692
                stack.adopt()
660
702
                LOG.info(_LI("Stack create failed, status %s"), stack.status)
661
703
 
662
704
        convergence = cfg.CONF.convergence_engine
663
 
        if convergence:
664
 
            raise exception.NotSupported(feature=_('Convergence engine'))
665
705
 
666
706
        stack = self._parse_template_and_validate_stack(
667
707
            cnxt, stack_name, template, params, files, args, owner_id,
668
708
            nested_depth, user_creds_id, stack_user_project_id, convergence,
669
709
            parent_resource_name)
670
710
 
671
 
        stack.store()
672
 
 
673
 
        self.thread_group_mgr.start_with_lock(cnxt, stack, self.engine_id,
674
 
                                              _stack_create, stack)
 
711
        # once validations are done
 
712
        # if convergence is enabled, take convergence path
 
713
        if convergence:
 
714
            # TODO(later): call _create_stack_user(stack)
 
715
            # call stack.converge_stack(template=stack.t, action=stack.CREATE)
 
716
            raise exception.NotSupported(feature=_('Convergence engine'))
 
717
        else:
 
718
            stack.store()
 
719
            self.thread_group_mgr.start_with_lock(cnxt, stack, self.engine_id,
 
720
                                                  _stack_create, stack)
675
721
 
676
722
        return dict(stack.identifier())
677
723
 
713
759
                current_stack.env,
714
760
                args.get(rpc_api.PARAM_CLEAR_PARAMETERS, []))
715
761
        tmpl = templatem.Template(template, files=files, env=env)
716
 
        if len(tmpl[tmpl.RESOURCES]) > cfg.CONF.max_resources_per_stack:
 
762
        max_resources = cfg.CONF.max_resources_per_stack
 
763
        if max_resources != -1 and len(tmpl[tmpl.RESOURCES]) > max_resources:
717
764
            raise exception.RequestLimitExceeded(
718
765
                message=exception.StackResourceLimitExceeded.msg_fmt)
719
766
        stack_name = current_stack.name
733
780
        self._validate_deferred_auth_context(cnxt, updated_stack)
734
781
        updated_stack.validate()
735
782
 
736
 
        event = eventlet.event.Event()
737
 
        th = self.thread_group_mgr.start_with_lock(cnxt, current_stack,
738
 
                                                   self.engine_id,
739
 
                                                   current_stack.update,
740
 
                                                   updated_stack,
741
 
                                                   event=event)
742
 
        th.link(self.thread_group_mgr.remove_event, current_stack.id, event)
743
 
        self.thread_group_mgr.add_event(current_stack.id, event)
 
783
        # Once all the validations are done
 
784
        # if convergence is enabled, take the convergence path
 
785
        if current_kwargs['convergence']:
 
786
            current_stack.converge_stack(template=tmpl)
 
787
        else:
 
788
            event = eventlet.event.Event()
 
789
            th = self.thread_group_mgr.start_with_lock(cnxt, current_stack,
 
790
                                                       self.engine_id,
 
791
                                                       current_stack.update,
 
792
                                                       updated_stack,
 
793
                                                       event=event)
 
794
            th.link(self.thread_group_mgr.remove_event,
 
795
                    current_stack.id, event)
 
796
            self.thread_group_mgr.add_event(current_stack.id, event)
744
797
        return dict(current_stack.identifier())
745
798
 
746
799
    @context.request_context
763
816
        # stop the running update and take the lock
764
817
        # as we cancel only running update, the acquire_result is
765
818
        # always some engine_id, not None
766
 
        lock = stack_lock.StackLock(cnxt, current_stack,
 
819
        lock = stack_lock.StackLock(cnxt, current_stack.id,
767
820
                                    self.engine_id)
768
821
        engine_id = lock.try_acquire()
769
822
        # Current engine has the lock
774
827
        elif stack_lock.StackLock.engine_alive(cnxt, engine_id):
775
828
            cancel_result = self._remote_call(
776
829
                cnxt, engine_id, self.listener.SEND,
777
 
                stack_identity, rpc_api.THREAD_CANCEL)
 
830
                stack_identity=stack_identity, message=rpc_api.THREAD_CANCEL)
778
831
            if cancel_result is None:
779
832
                LOG.debug("Successfully sent %(msg)s message "
780
833
                          "to remote task on engine %(eng)s" % {
811
864
 
812
865
        env = environment.Environment(params)
813
866
 
814
 
        for res in tmpl_resources.values():
 
867
        for name, res in six.iteritems(tmpl_resources):
815
868
            ResourceClass = env.get_class(res['Type'])
816
869
            if ResourceClass == resources.template_resource.TemplateResource:
817
870
                # we can't validate a TemplateResource unless we instantiate
819
872
                # parameters into properties_schema.
820
873
                continue
821
874
 
822
 
            props = properties.Properties(ResourceClass.properties_schema,
823
 
                                          res.get('Properties', {}),
824
 
                                          context=cnxt)
 
875
            props = properties.Properties(
 
876
                ResourceClass.properties_schema,
 
877
                res.get('Properties', {}),
 
878
                parent_name=six.text_type(name),
 
879
                context=cnxt,
 
880
                section='Properties')
825
881
            deletion_policy = res.get('DeletionPolicy', 'Delete')
826
882
            try:
827
883
                ResourceClass.validate_deletion_policy(deletion_policy)
867
923
            return s.raw_template.template
868
924
        return None
869
925
 
870
 
    def _remote_call(self, cnxt, lock_engine_id, call, *args, **kwargs):
 
926
    def _remote_call(self, cnxt, lock_engine_id, call, **kwargs):
871
927
        timeout = cfg.CONF.engine_life_check_timeout
872
928
        self.cctxt = self._client.prepare(
873
929
            version='1.0',
875
931
            topic=rpc_api.LISTENER_TOPIC,
876
932
            server=lock_engine_id)
877
933
        try:
878
 
            self.cctxt.call(cnxt, call, *args, **kwargs)
 
934
            self.cctxt.call(cnxt, call, **kwargs)
879
935
        except messaging.MessagingTimeout:
880
936
            return False
881
937
 
892
948
        LOG.info(_LI('Deleting stack %s'), st.name)
893
949
        stack = parser.Stack.load(cnxt, stack=st)
894
950
 
895
 
        lock = stack_lock.StackLock(cnxt, stack, self.engine_id)
896
 
        with lock.try_thread_lock(stack.id) as acquire_result:
 
951
        if stack.convergence:
 
952
            template = templatem.Template.create_empty_template()
 
953
            stack.converge_stack(template=template, action=stack.DELETE)
 
954
            return
 
955
 
 
956
        lock = stack_lock.StackLock(cnxt, stack.id, self.engine_id)
 
957
        with lock.try_thread_lock() as acquire_result:
897
958
 
898
959
            # Successfully acquired lock
899
960
            if acquire_result is None:
944
1005
        st = self._get_stack(cnxt, stack_identity)
945
1006
        LOG.info(_LI('abandoning stack %s'), st.name)
946
1007
        stack = parser.Stack.load(cnxt, stack=st)
947
 
        lock = stack_lock.StackLock(cnxt, stack, self.engine_id)
948
 
        with lock.thread_lock(stack.id):
 
1008
        lock = stack_lock.StackLock(cnxt, stack.id, self.engine_id)
 
1009
        with lock.thread_lock():
949
1010
            # Get stack details before deleting it.
950
1011
            stack_info = stack.prepare_abandon()
951
1012
            self.thread_group_mgr.start_with_acquired_lock(stack,
971
1032
        """
972
1033
        try:
973
1034
            resource_class = resources.global_env().get_class(type_name)
974
 
        except exception.StackValidationFailed:
975
 
            raise exception.ResourceTypeNotFound(type_name=type_name)
976
 
        except exception.NotFound as ex:
977
 
            raise exception.StackValidationFailed(message=ex.message)
 
1035
        except (exception.InvalidResourceType,
 
1036
                exception.ResourceTypeNotFound,
 
1037
                exception.TemplateNotFound) as ex:
 
1038
            raise ex
978
1039
 
979
1040
        def properties_schema():
980
1041
            for name, schema_dict in resource_class.properties_schema.items():
985
1046
        def attributes_schema():
986
1047
            for name, schema_data in resource_class.attributes_schema.items():
987
1048
                schema = attributes.Schema.from_attribute(schema_data)
988
 
                yield name, {schema.DESCRIPTION: schema.description}
 
1049
                yield name, dict(schema)
989
1050
 
990
1051
        return {
991
1052
            rpc_api.RES_SCHEMA_RES_TYPE: type_name,
992
1053
            rpc_api.RES_SCHEMA_PROPERTIES: dict(properties_schema()),
993
1054
            rpc_api.RES_SCHEMA_ATTRIBUTES: dict(attributes_schema()),
 
1055
            rpc_api.RES_SCHEMA_SUPPORT_STATUS:
 
1056
                resource_class.support_status.to_dict(),
994
1057
        }
995
1058
 
996
 
    def generate_template(self, cnxt, type_name):
 
1059
    def generate_template(self, cnxt, type_name, template_type='cfn'):
997
1060
        """
998
1061
        Generate a template based on the specified type.
999
1062
 
1000
1063
        :param cnxt: RPC context.
1001
1064
        :param type_name: Name of the resource type to generate a template for.
 
1065
        :param template_type: the template type to generate, cfn or hot.
1002
1066
        """
1003
1067
        try:
1004
1068
            return resources.global_env().get_class(
1005
 
                type_name).resource_to_template(type_name)
1006
 
        except exception.StackValidationFailed:
1007
 
            raise exception.ResourceTypeNotFound(type_name=type_name)
1008
 
        except exception.NotFound as ex:
1009
 
            raise exception.StackValidationFailed(message=ex.message)
 
1069
                type_name).resource_to_template(type_name, template_type)
 
1070
        except (exception.InvalidResourceType,
 
1071
                exception.ResourceTypeNotFound,
 
1072
                exception.TemplateNotFound) as ex:
 
1073
            raise ex
1010
1074
 
1011
1075
    @context.request_context
1012
1076
    def list_events(self, cnxt, stack_identity, filters=None, limit=None,
1115
1179
                          implementation.
1116
1180
        '''
1117
1181
 
1118
 
        def _resource_signal(rsrc, details):
1119
 
            stack = rsrc.stack
 
1182
        def _resource_signal(stack, rsrc, details):
1120
1183
            LOG.debug("signaling resource %s:%s" % (stack.name, rsrc.name))
1121
1184
            rsrc.signal(details)
1122
1185
 
1141
1204
        rsrc = stack[resource_name]
1142
1205
        if callable(rsrc.signal):
1143
1206
            if sync_call:
1144
 
                _resource_signal(rsrc, details)
 
1207
                _resource_signal(stack, rsrc, details)
1145
1208
                return rsrc.metadata_get()
1146
1209
            else:
1147
1210
                self.thread_group_mgr.start(stack.id, _resource_signal,
1148
 
                                            rsrc, details)
 
1211
                                            stack, rsrc, details)
1149
1212
 
1150
1213
    @context.request_context
1151
1214
    def find_physical_resource(self, cnxt, physical_resource_id):
1220
1283
    @context.request_context
1221
1284
    def stack_snapshot(self, cnxt, stack_identity, name):
1222
1285
        def _stack_snapshot(stack, snapshot):
1223
 
            LOG.debug("snapshotting stack %s" % stack.name)
1224
 
            stack.snapshot()
1225
 
            data = stack.prepare_abandon()
1226
 
            snapshot_object.Snapshot.update(
1227
 
                cnxt, snapshot.id,
1228
 
                {'data': data, 'status': stack.status,
1229
 
                 'status_reason': stack.status_reason})
 
1286
 
 
1287
            def save_snapshot(stack, action, status, reason):
 
1288
                """Function that saves snapshot before snapshot complete."""
 
1289
                data = stack.prepare_abandon()
 
1290
                data["status"] = status
 
1291
                snapshot_object.Snapshot.update(
 
1292
                    cnxt, snapshot.id,
 
1293
                    {'data': data, 'status': status,
 
1294
                     'status_reason': reason})
 
1295
 
 
1296
            LOG.debug("Snapshotting stack %s" % stack.name)
 
1297
            stack.snapshot(save_snapshot_func=save_snapshot)
1230
1298
 
1231
1299
        s = self._get_stack(cnxt, stack_identity)
1232
1300
 
1239
1307
            raise exception.ActionInProgress(stack_name=stack.name,
1240
1308
                                             action=stack.action)
1241
1309
 
1242
 
        lock = stack_lock.StackLock(cnxt, stack, self.engine_id)
 
1310
        lock = stack_lock.StackLock(cnxt, stack.id, self.engine_id)
1243
1311
 
1244
 
        with lock.thread_lock(stack.id):
 
1312
        with lock.thread_lock():
1245
1313
            snapshot = snapshot_object.Snapshot.create(cnxt, {
1246
1314
                'tenant': cnxt.tenant_id,
1247
1315
                'name': name,
1253
1321
 
1254
1322
    @context.request_context
1255
1323
    def show_snapshot(self, cnxt, stack_identity, snapshot_id):
1256
 
        snapshot = snapshot_object.Snapshot.get_by_id(cnxt, snapshot_id)
 
1324
        s = self._get_stack(cnxt, stack_identity)
 
1325
        snapshot = snapshot_object.Snapshot.get_snapshot_by_stack(
 
1326
            cnxt, snapshot_id, s)
1257
1327
        return api.format_snapshot(snapshot)
1258
1328
 
1259
1329
    @context.request_context
1264
1334
 
1265
1335
        s = self._get_stack(cnxt, stack_identity)
1266
1336
        stack = parser.Stack.load(cnxt, stack=s)
1267
 
        snapshot = snapshot_object.Snapshot.get_by_id(cnxt, snapshot_id)
 
1337
        snapshot = snapshot_object.Snapshot.get_snapshot_by_stack(
 
1338
            cnxt, snapshot_id, s)
 
1339
        if snapshot.status == stack.IN_PROGRESS:
 
1340
            msg = _('Deleting in-progress snapshot')
 
1341
            raise exception.NotSupported(feature=msg)
 
1342
 
1268
1343
        self.thread_group_mgr.start(
1269
1344
            stack.id, _delete_snapshot, stack, snapshot)
1270
1345
 
1287
1362
            stack.restore(snapshot)
1288
1363
 
1289
1364
        s = self._get_stack(cnxt, stack_identity)
1290
 
        snapshot = snapshot_object.Snapshot.get_by_id(cnxt, snapshot_id)
1291
 
 
1292
1365
        stack = parser.Stack.load(cnxt, stack=s)
 
1366
        snapshot = snapshot_object.Snapshot.get_snapshot_by_stack(
 
1367
            cnxt, snapshot_id, s)
1293
1368
 
1294
1369
        self.thread_group_mgr.start_with_lock(cnxt, stack, self.engine_id,
1295
1370
                                              _stack_restore, stack, snapshot)
1301
1376
        return [api.format_snapshot(snapshot) for snapshot in data]
1302
1377
 
1303
1378
    @context.request_context
1304
 
    def metadata_update(self, cnxt, stack_identity,
1305
 
                        resource_name, metadata):
1306
 
        """
1307
 
        Update the metadata for the given resource.
1308
 
        DEPRECATED: Use resource_signal instead
1309
 
        """
1310
 
        warnings.warn('metadata_update is deprecated, '
1311
 
                      'use resource_signal instead',
1312
 
                      DeprecationWarning)
1313
 
 
1314
 
        s = self._get_stack(cnxt, stack_identity)
1315
 
 
1316
 
        stack = parser.Stack.load(cnxt, stack=s)
1317
 
        if resource_name not in stack:
1318
 
            raise exception.ResourceNotFound(resource_name=resource_name,
1319
 
                                             stack_name=stack.name)
1320
 
 
1321
 
        resource = stack[resource_name]
1322
 
        resource.metadata_update(new_metadata=metadata)
1323
 
 
1324
 
        # This is not "nice" converting to the stored context here,
1325
 
        # but this happens because the keystone user associated with the
1326
 
        # WaitCondition doesn't have permission to read the secret key of
1327
 
        # the user associated with the cfn-credentials file
1328
 
        refresh_stack = parser.Stack.load(cnxt, stack=s,
1329
 
                                          use_stored_context=True)
1330
 
 
1331
 
        # Refresh the metadata for all other resources, since we expect
1332
 
        # resource_name to be a WaitCondition resource, and other
1333
 
        # resources may refer to WaitCondition Fn::GetAtt Data, which
1334
 
        # is updated here.
1335
 
        for res in refresh_stack.dependencies:
1336
 
            if res.name != resource_name and res.id is not None:
1337
 
                res.metadata_update()
1338
 
 
1339
 
        return resource.metadata_get()
1340
 
 
1341
 
    @context.request_context
1342
1379
    def create_watch_data(self, cnxt, watch_name, stats_data):
1343
1380
        '''
1344
1381
        This could be used by CloudWatch and WaitConditions
1522
1559
    def service_manage_report(self):
1523
1560
        cnxt = context.get_admin_context()
1524
1561
 
1525
 
        if self.service_id is not None:
1526
 
            # Service is already running
1527
 
            service_objects.Service.update_by_id(
1528
 
                cnxt,
1529
 
                self.service_id,
1530
 
                dict(deleted_at=None))
1531
 
            LOG.info(_LI('Service %s is updated'), self.service_id)
1532
 
        else:
 
1562
        if self.service_id is None:
1533
1563
            service_ref = service_objects.Service.create(
1534
1564
                cnxt,
1535
1565
                dict(host=self.host,
1542
1572
            self.service_id = service_ref['id']
1543
1573
            LOG.info(_LI('Service %s is started'), self.service_id)
1544
1574
 
 
1575
        try:
 
1576
            service_objects.Service.update_by_id(
 
1577
                cnxt,
 
1578
                self.service_id,
 
1579
                dict(deleted_at=None))
 
1580
            LOG.info(_LI('Service %s is updated'), self.service_id)
 
1581
        except Exception as ex:
 
1582
            LOG.error(_LE('Service %(service_id)s update '
 
1583
                          'failed: %(error)s'),
 
1584
                      {'service_id': self.service_id, 'error': ex})
 
1585
 
1545
1586
    def service_manage_cleanup(self):
1546
1587
        cnxt = context.get_admin_context()
1547
1588
        last_updated_window = (3 * cfg.CONF.periodic_interval)
1567
1608
                                            filters=filters,
1568
1609
                                            tenant_safe=False) or []
1569
1610
        for s in stacks:
1570
 
            stk = parser.Stack.load(cnxt, stack=s,
1571
 
                                    use_stored_context=True)
1572
 
            lock = stack_lock.StackLock(cnxt, stk, self.engine_id)
 
1611
            lock = stack_lock.StackLock(cnxt, s.id, self.engine_id)
1573
1612
            # If the stack lock has been released, the stack status may have changed.
1574
1613
            engine_id = lock.get_engine_id()
1575
1614
            if not engine_id:
1579
1618
                lock.acquire(retry=False)
1580
1619
            except exception.ActionInProgress:
1581
1620
                continue
 
1621
            stk = parser.Stack.load(cnxt, stack=s,
 
1622
                                    use_stored_context=True)
1582
1623
            LOG.info(_LI('Engine %(engine)s went down when stack %(stack_id)s'
1583
1624
                         ' was in action %(action)s'),
1584
1625
                     {'engine': engine_id, 'action': stk.action,