~ubuntu-branches/ubuntu/utopic/cinder/utopic

« back to all changes in this revision

Viewing changes to cinder/taskflow/patterns/base.py

  • Committer: Package Import Robot
  • Author(s): Chuck Short, James Page, Adam Gandelman, Chuck Short
  • Date: 2013-09-08 21:09:46 UTC
  • mfrom: (1.1.18)
  • Revision ID: package-import@ubuntu.com-20130908210946-3dbzq1jy5uji4wad
Tags: 1:2013.2~b3-0ubuntu1
[ James Page ]
* d/control: Switch ceph-common -> python-ceph inline with upstream
  refactoring of Ceph RBD driver, move to Suggests of python-cinder.
  (LP: #1190791). 

[ Adam Gandelman ]
* debian/patches/avoid_paramiko_vers_depends.patch: Dropped, no longer
  required.
* Add minimum requirement python-greenlet (>= 0.3.2).
* Add minimum requirement python-eventlet (>= 0.12.0).
* Add minimum requirement python-paramiko (>= 1.8).

[ Chuck Short ]
* New upstream release.
* debian/patches/skip-sqlachemy-failures.patch: Skip testfailures
  with sqlalchemy 0.8 until they are fixed upstream.
* debian/control: Add python-babel to build-depends.
* debian/control: Add python-novaclient to build-depends.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# -*- coding: utf-8 -*-
 
2
 
 
3
# vim: tabstop=4 shiftwidth=4 softtabstop=4
 
4
 
 
5
#    Copyright (C) 2012 Yahoo! Inc. All Rights Reserved.
 
6
#
 
7
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
 
8
#    not use this file except in compliance with the License. You may obtain
 
9
#    a copy of the License at
 
10
#
 
11
#         http://www.apache.org/licenses/LICENSE-2.0
 
12
#
 
13
#    Unless required by applicable law or agreed to in writing, software
 
14
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 
15
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 
16
#    License for the specific language governing permissions and limitations
 
17
#    under the License.
 
18
 
 
19
import abc
 
20
import threading
 
21
 
 
22
from cinder.openstack.common import uuidutils
 
23
 
 
24
from cinder.taskflow import decorators
 
25
from cinder.taskflow import exceptions as exc
 
26
from cinder.taskflow import states
 
27
from cinder.taskflow import utils
 
28
 
 
29
 
 
30
class Flow(object):
 
31
    """The base abstract class of all flow implementations.
 
32
 
 
33
    It provides a set of parents to flows that have a concept of parent flows
 
34
    as well as a state and state utility functions to the deriving classes. It
 
35
    also provides a name and an identifier (uuid or other) to the flow so that
 
36
    it can be uniquely identifed among many flows.
 
37
 
 
38
    Flows are expected to provide (if desired) the following methods:
 
39
    - add
 
40
    - add_many
 
41
    - interrupt
 
42
    - reset
 
43
    - rollback
 
44
    - run
 
45
    - soft_reset
 
46
    """
 
47
 
 
48
    __metaclass__ = abc.ABCMeta
 
49
 
 
50
    # Common states that certain actions can be performed in. If the flow
 
51
    # is not in these sets of states then it is likely that the flow operation
 
52
    # can not succeed.
 
53
    RESETTABLE_STATES = set([
 
54
        states.INTERRUPTED,
 
55
        states.SUCCESS,
 
56
        states.PENDING,
 
57
        states.FAILURE,
 
58
    ])
 
59
    SOFT_RESETTABLE_STATES = set([
 
60
        states.INTERRUPTED,
 
61
    ])
 
62
    UNINTERRUPTIBLE_STATES = set([
 
63
        states.FAILURE,
 
64
        states.SUCCESS,
 
65
        states.PENDING,
 
66
    ])
 
67
    RUNNABLE_STATES = set([
 
68
        states.PENDING,
 
69
    ])
 
70
 
 
71
    def __init__(self, name, parents=None, uuid=None):
 
72
        self._name = str(name)
 
73
        # The state of this flow.
 
74
        self._state = states.PENDING
 
75
        # If this flow has a parent flow/s which need to be reverted if
 
76
        # this flow fails then please include them here to allow this child
 
77
        # to call the parents...
 
78
        if parents:
 
79
            self.parents = tuple(parents)
 
80
        else:
 
81
            self.parents = ()
 
82
        # Any objects that want to listen when a wf/task starts/stops/completes
 
83
        # or errors should be registered here. This can be used to monitor
 
84
        # progress and record tasks finishing (so that it becomes possible to
 
85
        # store the result of a task in some persistent or semi-persistent
 
86
        # storage backend).
 
87
        self.notifier = utils.TransitionNotifier()
 
88
        self.task_notifier = utils.TransitionNotifier()
 
89
        # Ensure that modifications and/or multiple runs aren't happening
 
90
        # at the same time in the same flow at the same time.
 
91
        self._lock = threading.RLock()
 
92
        # Assign this flow a unique identifer.
 
93
        if uuid:
 
94
            self._id = str(uuid)
 
95
        else:
 
96
            self._id = uuidutils.generate_uuid()
 
97
 
 
98
    @property
 
99
    def name(self):
 
100
        """A non-unique name for this flow (human readable)"""
 
101
        return self._name
 
102
 
 
103
    @property
 
104
    def uuid(self):
 
105
        """Uniquely identifies this flow"""
 
106
        return "f-%s" % (self._id)
 
107
 
 
108
    @property
 
109
    def state(self):
 
110
        """Provides a read-only view of the flow state."""
 
111
        return self._state
 
112
 
 
113
    def _change_state(self, context, new_state):
 
114
        was_changed = False
 
115
        old_state = self.state
 
116
        with self._lock:
 
117
            if self.state != new_state:
 
118
                old_state = self.state
 
119
                self._state = new_state
 
120
                was_changed = True
 
121
        if was_changed:
 
122
            # Don't notify while holding the lock.
 
123
            self.notifier.notify(self.state, details={
 
124
                'context': context,
 
125
                'flow': self,
 
126
                'old_state': old_state,
 
127
            })
 
128
 
 
129
    def __str__(self):
 
130
        lines = ["Flow: %s" % (self.name)]
 
131
        lines.append("%s" % (self.uuid))
 
132
        lines.append("%s" % (len(self.parents)))
 
133
        lines.append("%s" % (self.state))
 
134
        return "; ".join(lines)
 
135
 
 
136
    @abc.abstractmethod
 
137
    def add(self, task):
 
138
        """Adds a given task to this flow.
 
139
 
 
140
        Returns the uuid that is associated with the task for later operations
 
141
        before and after it is ran.
 
142
        """
 
143
        raise NotImplementedError()
 
144
 
 
145
    @decorators.locked
 
146
    def add_many(self, tasks):
 
147
        """Adds many tasks to this flow.
 
148
 
 
149
        Returns a list of uuids (one for each task added).
 
150
        """
 
151
        uuids = []
 
152
        for t in tasks:
 
153
            uuids.append(self.add(t))
 
154
        return uuids
 
155
 
 
156
    def interrupt(self):
 
157
        """Attempts to interrupt the current flow and any tasks that are
 
158
        currently not running in the flow.
 
159
 
 
160
        Returns how many tasks were interrupted (if any).
 
161
        """
 
162
        if self.state in self.UNINTERRUPTIBLE_STATES:
 
163
            raise exc.InvalidStateException(("Can not interrupt when"
 
164
                                             " in state %s") % (self.state))
 
165
        # Note(harlowja): Do *not* acquire the lock here so that the flow may
 
166
        # be interrupted while running. This does mean the the above check may
 
167
        # not be valid but we can worry about that if it becomes an issue.
 
168
        old_state = self.state
 
169
        if old_state != states.INTERRUPTED:
 
170
            self._state = states.INTERRUPTED
 
171
            self.notifier.notify(self.state, details={
 
172
                'context': None,
 
173
                'flow': self,
 
174
                'old_state': old_state,
 
175
            })
 
176
        return 0
 
177
 
 
178
    @decorators.locked
 
179
    def reset(self):
 
180
        """Fully resets the internal state of this flow, allowing for the flow
 
181
        to be ran again.
 
182
 
 
183
        Note: Listeners are also reset.
 
184
        """
 
185
        if self.state not in self.RESETTABLE_STATES:
 
186
            raise exc.InvalidStateException(("Can not reset when"
 
187
                                             " in state %s") % (self.state))
 
188
        self.notifier.reset()
 
189
        self.task_notifier.reset()
 
190
        self._change_state(None, states.PENDING)
 
191
 
 
192
    @decorators.locked
 
193
    def soft_reset(self):
 
194
        """Partially resets the internal state of this flow, allowing for the
 
195
        flow to be ran again from an interrupted state only.
 
196
        """
 
197
        if self.state not in self.SOFT_RESETTABLE_STATES:
 
198
            raise exc.InvalidStateException(("Can not soft reset when"
 
199
                                             " in state %s") % (self.state))
 
200
        self._change_state(None, states.PENDING)
 
201
 
 
202
    @decorators.locked
 
203
    def run(self, context, *args, **kwargs):
 
204
        """Executes the workflow."""
 
205
        if self.state not in self.RUNNABLE_STATES:
 
206
            raise exc.InvalidStateException("Unable to run flow when "
 
207
                                            "in state %s" % (self.state))
 
208
 
 
209
    @decorators.locked
 
210
    def rollback(self, context, cause):
 
211
        """Performs rollback of this workflow and any attached parent workflows
 
212
        if present.
 
213
        """
 
214
        pass