1
# -*- coding: utf-8 -*-
3
# vim: tabstop=4 shiftwidth=4 softtabstop=4
5
# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved.
7
# Licensed under the Apache License, Version 2.0 (the "License"); you may
8
# not use this file except in compliance with the License. You may obtain
9
# a copy of the License at
11
# http://www.apache.org/licenses/LICENSE-2.0
13
# Unless required by applicable law or agreed to in writing, software
14
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
15
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
16
# License for the specific language governing permissions and limitations
22
from cinder.openstack.common import uuidutils
24
from cinder.taskflow import decorators
25
from cinder.taskflow import exceptions as exc
26
from cinder.taskflow import states
27
from cinder.taskflow import utils
31
"""The base abstract class of all flow implementations.
33
It provides a set of parents to flows that have a concept of parent flows
34
as well as a state and state utility functions to the deriving classes. It
35
also provides a name and an identifier (uuid or other) to the flow so that
36
it can be uniquely identifed among many flows.
38
Flows are expected to provide (if desired) the following methods:
48
__metaclass__ = abc.ABCMeta
50
# Common states that certain actions can be performed in. If the flow
51
# is not in these sets of states then it is likely that the flow operation
53
RESETTABLE_STATES = set([
59
SOFT_RESETTABLE_STATES = set([
62
UNINTERRUPTIBLE_STATES = set([
67
RUNNABLE_STATES = set([
71
def __init__(self, name, parents=None, uuid=None):
72
self._name = str(name)
73
# The state of this flow.
74
self._state = states.PENDING
75
# If this flow has a parent flow/s which need to be reverted if
76
# this flow fails then please include them here to allow this child
77
# to call the parents...
79
self.parents = tuple(parents)
82
# Any objects that want to listen when a wf/task starts/stops/completes
83
# or errors should be registered here. This can be used to monitor
84
# progress and record tasks finishing (so that it becomes possible to
85
# store the result of a task in some persistent or semi-persistent
87
self.notifier = utils.TransitionNotifier()
88
self.task_notifier = utils.TransitionNotifier()
89
# Ensure that modifications and/or multiple runs aren't happening
90
# at the same time in the same flow at the same time.
91
self._lock = threading.RLock()
92
# Assign this flow a unique identifer.
96
self._id = uuidutils.generate_uuid()
100
"""A non-unique name for this flow (human readable)"""
105
"""Uniquely identifies this flow"""
106
return "f-%s" % (self._id)
110
"""Provides a read-only view of the flow state."""
113
def _change_state(self, context, new_state):
115
old_state = self.state
117
if self.state != new_state:
118
old_state = self.state
119
self._state = new_state
122
# Don't notify while holding the lock.
123
self.notifier.notify(self.state, details={
126
'old_state': old_state,
130
lines = ["Flow: %s" % (self.name)]
131
lines.append("%s" % (self.uuid))
132
lines.append("%s" % (len(self.parents)))
133
lines.append("%s" % (self.state))
134
return "; ".join(lines)
138
"""Adds a given task to this flow.
140
Returns the uuid that is associated with the task for later operations
141
before and after it is ran.
143
raise NotImplementedError()
146
def add_many(self, tasks):
147
"""Adds many tasks to this flow.
149
Returns a list of uuids (one for each task added).
153
uuids.append(self.add(t))
157
"""Attempts to interrupt the current flow and any tasks that are
158
currently not running in the flow.
160
Returns how many tasks were interrupted (if any).
162
if self.state in self.UNINTERRUPTIBLE_STATES:
163
raise exc.InvalidStateException(("Can not interrupt when"
164
" in state %s") % (self.state))
165
# Note(harlowja): Do *not* acquire the lock here so that the flow may
166
# be interrupted while running. This does mean the the above check may
167
# not be valid but we can worry about that if it becomes an issue.
168
old_state = self.state
169
if old_state != states.INTERRUPTED:
170
self._state = states.INTERRUPTED
171
self.notifier.notify(self.state, details={
174
'old_state': old_state,
180
"""Fully resets the internal state of this flow, allowing for the flow
183
Note: Listeners are also reset.
185
if self.state not in self.RESETTABLE_STATES:
186
raise exc.InvalidStateException(("Can not reset when"
187
" in state %s") % (self.state))
188
self.notifier.reset()
189
self.task_notifier.reset()
190
self._change_state(None, states.PENDING)
193
def soft_reset(self):
194
"""Partially resets the internal state of this flow, allowing for the
195
flow to be ran again from an interrupted state only.
197
if self.state not in self.SOFT_RESETTABLE_STATES:
198
raise exc.InvalidStateException(("Can not soft reset when"
199
" in state %s") % (self.state))
200
self._change_state(None, states.PENDING)
203
def run(self, context, *args, **kwargs):
204
"""Executes the workflow."""
205
if self.state not in self.RUNNABLE_STATES:
206
raise exc.InvalidStateException("Unable to run flow when "
207
"in state %s" % (self.state))
210
def rollback(self, context, cause):
211
"""Performs rollback of this workflow and any attached parent workflows