~ubuntu-branches/ubuntu/vivid/heat/vivid

« back to all changes in this revision

Viewing changes to heat/engine/service.py

  • Committer: Package Import Robot
  • Author(s): Chuck Short, Chuck Short, Corey Bryant
  • Date: 2015-01-06 08:55:22 UTC
  • mfrom: (1.1.21)
  • Revision ID: package-import@ubuntu.com-20150106085522-4o3hnaff5lacvtrf
Tags: 2015.1~b1-0ubuntu1
[ Chuck Short ]
* Open up for vivid.
* debian/control: Update bzr branch. 
* debian/control: Add python-saharaclient,
  python-osprofiler, python-oslo.middleware, python-oslo.serialization.
* debian/patches/fix-reqirements.patch: Refreshed.
* debian/patches/skip-tests.patch: Updated to skip more tests.
* debian/rules: Skip integration tests.

[ Corey Bryant ]
* New upstream release.
  - d/control: Align requirements with upstream.
  - d/watch: Update uversionmangle for kilo beta naming.
  - d/rules: Generate heat.conf.sample and apply patch before copy.
  - d/rules: Run base tests instead of integration tests.
  - d/p/fix-requirements.patch: Refreshed.
  - d/p/remove-gettextutils-import.patch: Cherry picked from master.
* d/control: Bumped Standards-Version to 3.9.6.

Show diffs side-by-side

added added

removed removed

Lines of Context:
19
19
import eventlet
20
20
from oslo.config import cfg
21
21
from oslo import messaging
 
22
from oslo.serialization import jsonutils
22
23
from oslo.utils import timeutils
 
24
from osprofiler import profiler
23
25
import requests
24
26
import six
25
27
import warnings
28
30
from heat.common import context
29
31
from heat.common import exception
30
32
from heat.common.i18n import _
 
33
from heat.common.i18n import _LE
 
34
from heat.common.i18n import _LI
 
35
from heat.common.i18n import _LW
31
36
from heat.common import identifier
32
37
from heat.common import messaging as rpc_messaging
33
38
from heat.db import api as db_api
35
40
from heat.engine import attributes
36
41
from heat.engine import clients
37
42
from heat.engine import environment
38
 
from heat.engine.event import Event
 
43
from heat.engine import event as evt
39
44
from heat.engine import parameter_groups
40
 
from heat.engine import parser
41
45
from heat.engine import properties
42
 
from heat.engine import resource
43
46
from heat.engine import resources
 
47
from heat.engine import stack as parser
44
48
from heat.engine import stack_lock
45
49
from heat.engine import template as templatem
46
50
from heat.engine import watchrule
47
 
from heat.openstack.common import jsonutils
48
51
from heat.openstack.common import log as logging
49
52
from heat.openstack.common import service
50
53
from heat.openstack.common import threadgroup
93
96
        """
94
97
        pass
95
98
 
 
99
    def _serialize_profile_info(self):
 
100
        prof = profiler.get()
 
101
        trace_info = None
 
102
        if prof:
 
103
            trace_info = {
 
104
                "hmac_key": prof.hmac_key,
 
105
                "base_id": prof.get_base_id(),
 
106
                "parent_id": prof.get_id()
 
107
            }
 
108
        return trace_info
 
109
 
 
110
    def _start_with_trace(self, trace, func, *args, **kwargs):
 
111
        if trace:
 
112
            profiler.init(**trace)
 
113
        return func(*args, **kwargs)
 
114
 
96
115
    def start(self, stack_id, func, *args, **kwargs):
97
116
        """
98
117
        Run the given method in a sub-thread.
99
118
        """
100
119
        if stack_id not in self.groups:
101
120
            self.groups[stack_id] = threadgroup.ThreadGroup()
102
 
        return self.groups[stack_id].add_thread(func, *args, **kwargs)
 
121
        return self.groups[stack_id].add_thread(self._start_with_trace,
 
122
                                                self._serialize_profile_info(),
 
123
                                                func, *args, **kwargs)
103
124
 
104
125
    def start_with_lock(self, cnxt, stack, engine_id, func, *args, **kwargs):
105
126
        """
236
257
        db_stack = db_api.stack_get(admin_context, sid, tenant_safe=False,
237
258
                                    eager_load=True)
238
259
        if not db_stack:
239
 
            LOG.error(_("Unable to retrieve stack %s for periodic task") % sid)
 
260
            LOG.error(_LE("Unable to retrieve stack %s for periodic task"),
 
261
                      sid)
240
262
            return
241
263
        stack = parser.Stack.load(admin_context, stack=db_stack,
242
264
                                  use_stored_context=True)
250
272
        try:
251
273
            wrs = db_api.watch_rule_get_all_by_stack(admin_context, sid)
252
274
        except Exception as ex:
253
 
            LOG.warn(_('periodic_task db error watch rule removed? %(ex)s')
254
 
                     % ex)
 
275
            LOG.warn(_LW('periodic_task db error watch rule removed? %(ex)s'),
 
276
                     ex)
255
277
            return
256
278
 
257
279
        def run_alarm_action(stack, actions, details):
276
298
        self.check_stack_watches(sid)
277
299
 
278
300
 
 
301
@profiler.trace_cls("rpc")
279
302
class EngineListener(service.Service):
280
303
    '''
281
304
    Listen on an AMQP queue named for the engine.  Allows individual
288
311
        super(EngineListener, self).__init__()
289
312
        self.thread_group_mgr = thread_group_mgr
290
313
        self.engine_id = engine_id
 
314
        self.host = host
291
315
 
292
316
    def start(self):
293
317
        super(EngineListener, self).start()
294
318
        self.target = messaging.Target(
295
 
            server=cfg.CONF.host, topic=self.engine_id)
 
319
            server=self.host, topic=self.engine_id)
296
320
        server = rpc_messaging.get_rpc_server(self.target, self)
297
321
        server.start()
298
322
 
313
337
        self.thread_group_mgr.send(stack_id, message)
314
338
 
315
339
 
 
340
@profiler.trace_cls("rpc")
316
341
class EngineService(service.Service):
317
342
    """
318
343
    Manages the running instances from creation to destruction.
324
349
    by the RPC caller.
325
350
    """
326
351
 
327
 
    RPC_API_VERSION = '1.1'
 
352
    RPC_API_VERSION = '1.2'
328
353
 
329
354
    def __init__(self, host, topic, manager=None):
330
355
        super(EngineService, self).__init__()
345
370
                          'deprecated and will be removed in the Juno '
346
371
                          'release.', DeprecationWarning)
347
372
 
 
373
        if cfg.CONF.trusts_delegated_roles:
 
374
            warnings.warn('The default value of "trusts_delegated_roles" '
 
375
                          'option in heat.conf is changed to [] in Kilo '
 
376
                          'and heat will delegate all roles of trustor. '
 
377
                          'Please keep the same if you do not want to '
 
378
                          'delegate subset roles when upgrading.',
 
379
                          Warning)
 
380
 
348
381
    def create_periodic_tasks(self):
349
382
        LOG.debug("Starting periodic watch tasks pid=%s" % os.getpid())
350
383
        # Note with multiple workers, the parent process hasn't called start()
367
400
        LOG.debug("Starting listener for engine %s" % self.engine_id)
368
401
        self.listener.start()
369
402
        target = messaging.Target(
370
 
            version=self.RPC_API_VERSION, server=cfg.CONF.host,
 
403
            version=self.RPC_API_VERSION, server=self.host,
371
404
            topic=self.topic)
372
405
        self.target = target
373
406
        server = rpc_messaging.get_rpc_server(target, self)
379
412
 
380
413
    def stop(self):
381
414
        # Stop rpc connection at first for preventing new requests
382
 
        LOG.info(_("Attempting to stop engine service..."))
 
415
        LOG.info(_LI("Attempting to stop engine service..."))
383
416
        try:
384
417
            self.conn.close()
385
418
        except Exception:
390
423
            # Ignore dummy service task
391
424
            if stack_id == cfg.CONF.periodic_interval:
392
425
                continue
393
 
            LOG.info(_("Waiting stack %s processing to be finished")
394
 
                     % stack_id)
 
426
            LOG.info(_LI("Waiting stack %s processing to be finished"),
 
427
                     stack_id)
395
428
            # Stop threads gracefully
396
429
            self.thread_group_mgr.stop(stack_id, True)
397
 
            LOG.info(_("Stack %s processing was finished") % stack_id)
 
430
            LOG.info(_LI("Stack %s processing was finished"), stack_id)
398
431
 
399
432
        # Terminate the engine process
400
 
        LOG.info(_("All threads were gone, terminating engine"))
 
433
        LOG.info(_LI("All threads were gone, terminating engine"))
401
434
        super(EngineService, self).stop()
402
435
 
403
436
    @request_context
539
572
            raise exception.RequestLimitExceeded(message=message)
540
573
 
541
574
    def _parse_template_and_validate_stack(self, cnxt, stack_name, template,
542
 
                                           params, files, args, owner_id=None):
 
575
                                           params, files, args, owner_id=None,
 
576
                                           nested_depth=0, user_creds_id=None,
 
577
                                           stack_user_project_id=None):
 
578
        # If it is stack-adopt, use parameters from adopt_stack_data
 
579
        common_params = api.extract_args(args)
 
580
        if (rpc_api.PARAM_ADOPT_STACK_DATA in common_params and
 
581
                not cfg.CONF.enable_stack_adopt):
 
582
            raise exception.NotSupported(feature='Stack Adopt')
 
583
 
543
584
        tmpl = templatem.Template(template, files=files)
544
585
        self._validate_new_stack(cnxt, stack_name, tmpl)
545
586
 
546
 
        # If it is stack-adopt, use parameters from adopt_stack_data
547
 
        common_params = api.extract_args(args)
548
 
 
549
587
        if rpc_api.PARAM_ADOPT_STACK_DATA in common_params:
550
588
            params[rpc_api.STACK_PARAMETERS] = common_params[
551
589
                rpc_api.PARAM_ADOPT_STACK_DATA]['environment'][
554
592
        env = environment.Environment(params)
555
593
        stack = parser.Stack(cnxt, stack_name, tmpl, env,
556
594
                             owner_id=owner_id,
 
595
                             nested_depth=nested_depth,
 
596
                             user_creds_id=user_creds_id,
 
597
                             stack_user_project_id=stack_user_project_id,
557
598
                             **common_params)
558
599
 
559
600
        self._validate_deferred_auth_context(cnxt, stack)
576
617
        :param args: Request parameters/args passed from API
577
618
        """
578
619
 
579
 
        LOG.info(_('previewing stack %s') % stack_name)
 
620
        LOG.info(_LI('previewing stack %s'), stack_name)
580
621
        stack = self._parse_template_and_validate_stack(cnxt,
581
622
                                                        stack_name,
582
623
                                                        template,
588
629
 
589
630
    @request_context
590
631
    def create_stack(self, cnxt, stack_name, template, params, files, args,
591
 
                     owner_id=None):
 
632
                     owner_id=None, nested_depth=0, user_creds_id=None,
 
633
                     stack_user_project_id=None):
592
634
        """
593
635
        The create_stack method creates a new stack using the template
594
636
        provided.
603
645
        :param args: Request parameters/args passed from API
604
646
        :param owner_id: parent stack ID for nested stacks, only expected when
605
647
                         called from another heat-engine (not a user option)
 
648
        :param nested_depth: the nested depth for nested stacks, only expected
 
649
                         when called from another heat-engine
 
650
        :param user_creds_id: the parent user_creds record for nested stacks
 
651
        :param stack_user_project_id: the parent stack_user_project_id for
 
652
                         nested stacks
606
653
        """
607
 
        LOG.info(_('Creating stack %s') % stack_name)
 
654
        LOG.info(_LI('Creating stack %s'), stack_name)
608
655
 
609
656
        def _stack_create(stack):
610
657
 
611
658
            if not stack.stack_user_project_id:
612
 
                stack.create_stack_user_project_id()
 
659
                try:
 
660
                    stack.create_stack_user_project_id()
 
661
                except exception.AuthorizationFailure as ex:
 
662
                    stack.state_set(stack.action, stack.FAILED,
 
663
                                    six.text_type(ex))
613
664
 
614
665
            # Create/Adopt a stack, and create the periodic task if successful
615
666
            if stack.adopt_stack_data:
616
 
                if not cfg.CONF.enable_stack_adopt:
617
 
                    raise exception.NotSupported(feature='Stack Adopt')
618
 
 
619
667
                stack.adopt()
620
 
            else:
 
668
            elif stack.status != stack.FAILED:
621
669
                stack.create()
622
670
 
623
671
            if (stack.action in (stack.CREATE, stack.ADOPT)
626
674
                    # Schedule a periodic watcher task for this stack
627
675
                    self.stack_watch.start_watch_task(stack.id, cnxt)
628
676
            else:
629
 
                LOG.info(_("Stack create failed, status %s") % stack.status)
 
677
                LOG.info(_LI("Stack create failed, status %s"), stack.status)
630
678
 
631
679
        stack = self._parse_template_and_validate_stack(cnxt,
632
680
                                                        stack_name,
634
682
                                                        params,
635
683
                                                        files,
636
684
                                                        args,
637
 
                                                        owner_id)
 
685
                                                        owner_id,
 
686
                                                        nested_depth,
 
687
                                                        user_creds_id,
 
688
                                                        stack_user_project_id)
638
689
 
639
690
        stack.store()
640
691
 
661
712
        """
662
713
        # Get the database representation of the existing stack
663
714
        db_stack = self._get_stack(cnxt, stack_identity)
664
 
        LOG.info(_('Updating stack %s') % db_stack.name)
 
715
        LOG.info(_LI('Updating stack %s'), db_stack.name)
665
716
 
666
717
        current_stack = parser.Stack.load(cnxt, stack=db_stack)
667
718
 
723
774
            msg = _("Cancelling update when stack is %s"
724
775
                    ) % str(current_stack.state)
725
776
            raise exception.NotSupported(feature=msg)
726
 
        LOG.info(_('Starting cancel of updating stack %s') % db_stack.name)
 
777
        LOG.info(_LI('Starting cancel of updating stack %s') % db_stack.name)
727
778
        # stop the running update and take the lock
728
779
        # as we cancel only running update, the acquire_result is
729
780
        # always some engine_id, not None
757
808
        :param template: Template of stack you want to create.
758
809
        :param params: Stack Input Params
759
810
        """
760
 
        LOG.info(_('validate_template'))
 
811
        LOG.info(_LI('validate_template'))
761
812
        if template is None:
762
813
            msg = _("No Template provided.")
763
814
            return webob.exc.HTTPBadRequest(explanation=msg)
794
845
                return {'Error': six.text_type(ex)}
795
846
 
796
847
        # validate parameters
797
 
        tmpl_params = tmpl.parameters(None, {})
 
848
        tmpl_params = tmpl.parameters(None, user_params=env.params)
798
849
        tmpl_params.validate(validate_value=False, context=cnxt)
799
850
        is_real_param = lambda p: p.name not in tmpl_params.PSEUDO_PARAMETERS
800
851
        params = tmpl_params.map(api.format_validate_parameter, is_real_param)
852
903
        """
853
904
 
854
905
        st = self._get_stack(cnxt, stack_identity)
855
 
        LOG.info(_('Deleting stack %s') % st.name)
 
906
        LOG.info(_LI('Deleting stack %s'), st.name)
856
907
        stack = parser.Stack.load(cnxt, stack=st)
857
908
 
858
909
        lock = stack_lock.StackLock(cnxt, stack, self.engine_id)
905
956
            raise exception.NotSupported(feature='Stack Abandon')
906
957
 
907
958
        st = self._get_stack(cnxt, stack_identity)
908
 
        LOG.info(_('abandoning stack %s') % st.name)
 
959
        LOG.info(_LI('abandoning stack %s'), st.name)
909
960
        stack = parser.Stack.load(cnxt, stack=st)
910
961
        lock = stack_lock.StackLock(cnxt, stack, self.engine_id)
911
962
        with lock.thread_lock(stack.id):
923
974
 
924
975
        :param cnxt: RPC context.
925
976
        """
926
 
        return resource.get_types(support_status)
 
977
        return resources.global_env().get_types(support_status)
927
978
 
928
979
    def resource_schema(self, cnxt, type_name):
929
980
        """
933
984
        :param type_name: Name of the resource type to obtain the schema of.
934
985
        """
935
986
        try:
936
 
            resource_class = resource.get_class(type_name)
 
987
            resource_class = resources.global_env().get_class(type_name)
937
988
        except exception.StackValidationFailed:
938
989
            raise exception.ResourceTypeNotFound(type_name=type_name)
 
990
        except exception.NotFound as ex:
 
991
            raise exception.StackValidationFailed(message=ex.message)
939
992
 
940
993
        def properties_schema():
941
994
            for name, schema_dict in resource_class.properties_schema.items():
962
1015
        :param type_name: Name of the resource type to generate a template for.
963
1016
        """
964
1017
        try:
965
 
            return \
966
 
                resource.get_class(type_name).resource_to_template(type_name)
 
1018
            return resources.global_env().get_class(
 
1019
                type_name).resource_to_template(type_name)
967
1020
        except exception.StackValidationFailed:
968
1021
            raise exception.ResourceTypeNotFound(type_name=type_name)
 
1022
        except exception.NotFound as ex:
 
1023
            raise exception.StackValidationFailed(message=ex.message)
969
1024
 
970
1025
    @request_context
971
1026
    def list_events(self, cnxt, stack_identity, filters=None, limit=None,
1007
1062
                stacks[stack_id] = parser.Stack.load(cnxt, stack_id)
1008
1063
            return stacks[stack_id]
1009
1064
 
1010
 
        return [api.format_event(Event.load(cnxt,
1011
 
                                            e.id, e,
1012
 
                                            get_stack(e.stack_id)))
 
1065
        return [api.format_event(evt.Event.load(cnxt,
 
1066
                                                e.id, e,
 
1067
                                                get_stack(e.stack_id)))
1013
1068
                for e in events]
1014
1069
 
1015
1070
    def _authorize_stack_user(self, cnxt, stack, resource_name):
1034
1089
        access_key = ec2_creds.get('access')
1035
1090
        return stack.access_allowed(access_key, resource_name)
1036
1091
 
 
1092
    def _verify_stack_resource(self, stack, resource_name):
 
1093
        if resource_name not in stack:
 
1094
            raise exception.ResourceNotFound(resource_name=resource_name,
 
1095
                                             stack_name=stack.name)
 
1096
 
 
1097
        resource = stack[resource_name]
 
1098
        if resource.id is None:
 
1099
            raise exception.ResourceNotAvailable(resource_name=resource_name)
 
1100
 
1037
1101
    @request_context
1038
 
    def describe_stack_resource(self, cnxt, stack_identity, resource_name):
 
1102
    def describe_stack_resource(self, cnxt, stack_identity, resource_name,
 
1103
                                with_attr=None):
1039
1104
        s = self._get_stack(cnxt, stack_identity)
1040
1105
        stack = parser.Stack.load(cnxt, stack=s)
1041
1106
 
1042
1107
        if cfg.CONF.heat_stack_user_role in cnxt.roles:
1043
1108
            if not self._authorize_stack_user(cnxt, stack, resource_name):
1044
 
                LOG.warning(_("Access denied to resource %s") % resource_name)
 
1109
                LOG.warn(_LW("Access denied to resource %s"), resource_name)
1045
1110
                raise exception.Forbidden()
1046
1111
 
1047
 
        if resource_name not in stack:
1048
 
            raise exception.ResourceNotFound(resource_name=resource_name,
1049
 
                                             stack_name=stack.name)
1050
 
 
1051
 
        resource = stack[resource_name]
1052
 
        if resource.id is None:
1053
 
            raise exception.ResourceNotAvailable(resource_name=resource_name)
1054
 
 
1055
 
        return api.format_stack_resource(stack[resource_name])
 
1112
        self._verify_stack_resource(stack, resource_name)
 
1113
 
 
1114
        return api.format_stack_resource(stack[resource_name],
 
1115
                                         with_attr=with_attr)
1056
1116
 
1057
1117
    @request_context
1058
1118
    def resource_signal(self, cnxt, stack_identity, resource_name, details):
1063
1123
        # signal doesn't have permission to read the secret key of
1064
1124
        # the user associated with the cfn-credentials file
1065
1125
        stack = parser.Stack.load(cnxt, stack=s, use_stored_context=True)
1066
 
 
1067
 
        if resource_name not in stack:
1068
 
            raise exception.ResourceNotFound(resource_name=resource_name,
1069
 
                                             stack_name=stack.name)
1070
 
 
1071
 
        resource = stack[resource_name]
1072
 
        if resource.id is None:
1073
 
            raise exception.ResourceNotAvailable(resource_name=resource_name)
 
1126
        self._verify_stack_resource(stack, resource_name)
1074
1127
 
1075
1128
        if callable(stack[resource_name].signal):
1076
1129
            stack[resource_name].signal(details)
1083
1136
            if res.name != resource_name and res.id is not None:
1084
1137
                res.metadata_update()
1085
1138
 
1086
 
        return resource.metadata_get()
 
1139
        return stack[resource_name].metadata_get()
1087
1140
 
1088
1141
    @request_context
1089
1142
    def find_physical_resource(self, cnxt, physical_resource_id):
1206
1259
        '''
1207
1260
        s = self._get_stack(cnxt, stack_identity)
1208
1261
        stack = parser.Stack.load(cnxt, stack=s)
1209
 
        LOG.info(_("Checking stack %s") % stack.name)
 
1262
        LOG.info(_LI("Checking stack %s"), stack.name)
1210
1263
 
1211
1264
        self.thread_group_mgr.start_with_lock(cnxt, stack, self.engine_id,
1212
1265
                                              stack.check)
1213
1266
 
1214
1267
    @request_context
 
1268
    def stack_restore(self, cnxt, stack_identity, snapshot_id):
 
1269
        def _stack_restore(stack, snapshot):
 
1270
            LOG.debug("restoring stack %s" % stack.name)
 
1271
            stack.restore(snapshot)
 
1272
 
 
1273
        s = self._get_stack(cnxt, stack_identity)
 
1274
        snapshot = db_api.snapshot_get(cnxt, snapshot_id)
 
1275
 
 
1276
        stack = parser.Stack.load(cnxt, stack=s)
 
1277
 
 
1278
        self.thread_group_mgr.start_with_lock(cnxt, stack, self.engine_id,
 
1279
                                              _stack_restore, stack, snapshot)
 
1280
 
 
1281
    @request_context
1215
1282
    def stack_list_snapshots(self, cnxt, stack_identity):
1216
1283
        s = self._get_stack(cnxt, stack_identity)
1217
1284
        data = db_api.snapshot_get_all(cnxt, s.id)
1296
1363
            try:
1297
1364
                wrn = [w.name for w in db_api.watch_rule_get_all(cnxt)]
1298
1365
            except Exception as ex:
1299
 
                LOG.warn(_('show_watch (all) db error %s') % ex)
 
1366
                LOG.warn(_LW('show_watch (all) db error %s'), ex)
1300
1367
                return
1301
1368
 
1302
1369
        wrs = [watchrule.WatchRule.load(cnxt, w) for w in wrn]
1319
1386
        # namespace/metric, but we will want this at some point
1320
1387
        # for now, the API can query all metric data and filter locally
1321
1388
        if metric_namespace is not None or metric_name is not None:
1322
 
            LOG.error(_("Filtering by namespace/metric not yet supported"))
 
1389
            LOG.error(_LE("Filtering by namespace/metric not yet supported"))
1323
1390
            return
1324
1391
 
1325
1392
        try:
1326
1393
            wds = db_api.watch_data_get_all(cnxt)
1327
1394
        except Exception as ex:
1328
 
            LOG.warn(_('show_metric (all) db error %s') % ex)
 
1395
            LOG.warn(_LW('show_metric (all) db error %s'), ex)
1329
1396
            return
1330
1397
 
1331
1398
        result = [api.format_watch_data(w) for w in wds]