~juju-deployers/python-jujuclient/trunk

« back to all changes in this revision

Viewing changes to jujuclient/watch.py

  • Committer: Tim Van Steenburgh
  • Date: 2016-07-12 02:28:46 UTC
  • Revision ID: tvansteenburgh@gmail.com-20160712022846-g14fe37eild7gk2e
juju-2.0beta11 compatibility

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
from contextlib import contextmanager
 
2
import signal
 
3
import socket
 
4
import time
 
5
 
 
6
from .rpc import BaseRPC
 
7
from .exc import (
 
8
    EnvError,
 
9
    UnitErrors,
 
10
)
 
11
 
 
12
 
 
13
class TimeoutWatchInProgress(Exception):
 
14
    pass
 
15
 
 
16
 
 
17
class BaseWatcher(BaseRPC):
 
18
 
 
19
    _auth = True
 
20
    version = 0
 
21
 
 
22
    def __init__(self, conn, auto_reconnect=True):
 
23
        self.conn = conn
 
24
        self.watcher_id = None
 
25
        self.running = False
 
26
        self.auto_reconnect = auto_reconnect
 
27
        # For debugging, attach the wrapper
 
28
        self.context = None
 
29
 
 
30
    def start_args(self):
 
31
        raise NotImplementedError()
 
32
 
 
33
    def next_args(self):
 
34
        raise NotImplementedError()
 
35
 
 
36
    def stop_args(self):
 
37
        raise NotImplementedError()
 
38
 
 
39
    def get_watcher_id(self, result):
 
40
        raise NotImplementedError()
 
41
 
 
42
    def get_deltas(self, result):
 
43
        raise NotImplementedError()
 
44
 
 
45
    def start(self):
 
46
        result = self._rpc(self.start_args())
 
47
        self.watcher_id = self.get_watcher_id(result)
 
48
        self.running = True
 
49
        return result
 
50
 
 
51
    def next(self):
 
52
        if self.watcher_id is None:
 
53
            self.start()
 
54
        if not self.running:
 
55
            raise StopIteration("Stopped")
 
56
        try:
 
57
            result = self._rpc(self.next_args())
 
58
        except EnvError as e:
 
59
            if "state watcher was stopped" in e.message:
 
60
                if not self.auto_reconnect:
 
61
                    raise
 
62
                if not self.reconnect():
 
63
                    raise
 
64
                return next(self)
 
65
            raise
 
66
        return self.get_deltas(result)
 
67
 
 
68
    # py3 compat
 
69
    __next__ = next
 
70
 
 
71
    def reconnect(self):
 
72
        self.watcher_id = None
 
73
        self.running = False
 
74
        return super(BaseWatcher, self).reconnect()
 
75
 
 
76
    def stop(self):
 
77
        if not self.conn.connected:
 
78
            return
 
79
        try:
 
80
            result = self._rpc(self.stop_args())
 
81
        except (EnvError, socket.error):
 
82
            # We're about to close the connection.
 
83
            result = None
 
84
        self.conn.close()
 
85
        self.watcher_id = None
 
86
        self.running = False
 
87
        return result
 
88
 
 
89
    def set_context(self, context):
 
90
        self.context = context
 
91
        return self
 
92
 
 
93
    def __iter__(self):
 
94
        return self
 
95
 
 
96
    def __enter__(self):
 
97
        return self
 
98
 
 
99
    def __exit__(self, exc, v, t):
 
100
        self.stop()
 
101
 
 
102
 
 
103
class BaseTimeoutWatcher(object):
 
104
    # A simple non concurrent watch using signals..
 
105
 
 
106
    def __init__(self, *args, **kw):
 
107
        super(BaseTimeoutWatcher, self).__init__(*args, **kw)
 
108
        self.start_time = time.time()
 
109
        self._timeout = 0
 
110
 
 
111
    def time_remaining(self):
 
112
        """Return number of seconds until this watch times out.
 
113
 
 
114
        """
 
115
        return int(self._timeout - (time.time() - self.start_time))
 
116
 
 
117
    def set_timeout(self, timeout):
 
118
        self.start_time = time.time()
 
119
        self._timeout = timeout
 
120
 
 
121
    def next(self):
 
122
        with self._set_alarm(self.time_remaining()):
 
123
            return super(BaseTimeoutWatcher, self).next()
 
124
 
 
125
    # py3 compat
 
126
    __next__ = next
 
127
 
 
128
    @classmethod
 
129
    @contextmanager
 
130
    def _set_alarm(cls, timeout):
 
131
        if timeout < 0:
 
132
            raise TimeoutError()
 
133
 
 
134
        try:
 
135
            handler = signal.getsignal(signal.SIGALRM)
 
136
            if callable(handler):
 
137
                if handler.__name__ == '_set_alarm':
 
138
                    raise TimeoutWatchInProgress()
 
139
                raise RuntimeError(
 
140
                    "Existing signal handler found %r" % handler)
 
141
            signal.signal(signal.SIGALRM, cls._on_alarm)
 
142
            signal.alarm(timeout)
 
143
            yield None
 
144
        finally:
 
145
            signal.alarm(0)
 
146
            signal.signal(signal.SIGALRM, signal.SIG_DFL)
 
147
 
 
148
    @classmethod
 
149
    def _on_alarm(cls, x, frame):
 
150
        raise TimeoutError()
 
151
 
 
152
 
 
153
class WatchWrapper(object):
 
154
 
 
155
    def __init__(self, watch):
 
156
        self.watch = watch
 
157
 
 
158
    def run(self, callback=None):
 
159
        seen_initial = False
 
160
        with self.watch.set_context(self):
 
161
            for change_set in self.watch:
 
162
                for change in change_set:
 
163
                    self.process(*change)
 
164
                    if seen_initial and callable(callback):
 
165
                        callback(*change)
 
166
                if self.complete() is True:
 
167
                    self.watch.stop()
 
168
                    break
 
169
                seen_initial = True
 
170
 
 
171
    def process(self):
 
172
        """process watch events."""
 
173
 
 
174
    def complete(self):
 
175
        """watch wrapper complete """
 
176
 
 
177
 
 
178
class BaseWaitForUnits(WatchWrapper):
 
179
    """
 
180
    Wait for units of the environment to reach a particular goal state.
 
181
    """
 
182
    def __init__(self, watch, state='started', service=None):
 
183
        super(BaseWaitForUnits, self).__init__(watch)
 
184
        self.units = {}
 
185
        self.goal_state = state
 
186
        self.service = service
 
187
 
 
188
    def get_unit_name(self, data):
 
189
        raise NotImplementedError()
 
190
 
 
191
    def get_unit_status(self, data):
 
192
        raise NotImplementedError()
 
193
 
 
194
    def process(self, entity_type, change, data):
 
195
        if entity_type != "unit":
 
196
            return
 
197
 
 
198
        unit_name = self.get_unit_name(data)
 
199
        if change == "remove" and unit_name in self.units:
 
200
            del self.units[unit_name]
 
201
        else:
 
202
            self.units[unit_name] = data
 
203
 
 
204
    def complete(self):
 
205
        state = {'pending': [], 'errors': []}
 
206
 
 
207
        for k, v in list(self.units.items()):
 
208
            status = self.get_unit_status(v)
 
209
            if status == "error":
 
210
                state['errors'] = [v]
 
211
            elif status != self.goal_state:
 
212
                state['pending'] = [v]
 
213
 
 
214
        if not state['pending'] and not state['errors']:
 
215
            return True
 
216
 
 
217
        if state['errors'] and not self.goal_state == "removed":
 
218
            raise UnitErrors(state['errors'])
 
219
 
 
220
        return state['pending']
 
221
 
 
222
 
 
223
class BaseWaitForNoMachines(WatchWrapper):
 
224
    """
 
225
    Wait for all non state servers to be terminated.
 
226
    """
 
227
 
 
228
    def __init__(self, watch, initial_machines=None):
 
229
        super(BaseWaitForNoMachines, self).__init__(watch)
 
230
        self.machines = initial_machines or {}
 
231
 
 
232
    def get_machine_id(self, data):
 
233
        raise NotImplementedError()
 
234
 
 
235
    def complete(self):
 
236
        raise NotImplementedError()
 
237
 
 
238
    def process(self, entity_type, change, data):
 
239
        if entity_type != 'machine':
 
240
            return
 
241
 
 
242
        machine_id = self.get_machine_id(data)
 
243
        if change == 'remove' and machine_id in self.machines:
 
244
            del self.machines[machine_id]
 
245
        else:
 
246
            self.machines[machine_id] = data