1
from contextlib import contextmanager
6
from .rpc import BaseRPC
13
class TimeoutWatchInProgress(Exception):
17
class BaseWatcher(BaseRPC):
22
def __init__(self, conn, auto_reconnect=True):
24
self.watcher_id = None
26
self.auto_reconnect = auto_reconnect
27
# For debugging, attach the wrapper
31
raise NotImplementedError()
34
raise NotImplementedError()
37
raise NotImplementedError()
39
def get_watcher_id(self, result):
40
raise NotImplementedError()
42
def get_deltas(self, result):
43
raise NotImplementedError()
46
result = self._rpc(self.start_args())
47
self.watcher_id = self.get_watcher_id(result)
52
if self.watcher_id is None:
55
raise StopIteration("Stopped")
57
result = self._rpc(self.next_args())
59
if "state watcher was stopped" in e.message:
60
if not self.auto_reconnect:
62
if not self.reconnect():
66
return self.get_deltas(result)
72
self.watcher_id = None
74
return super(BaseWatcher, self).reconnect()
77
if not self.conn.connected:
80
result = self._rpc(self.stop_args())
81
except (EnvError, socket.error):
82
# We're about to close the connection.
85
self.watcher_id = None
89
def set_context(self, context):
90
self.context = context
99
def __exit__(self, exc, v, t):
103
class BaseTimeoutWatcher(object):
104
# A simple non concurrent watch using signals..
106
def __init__(self, *args, **kw):
107
super(BaseTimeoutWatcher, self).__init__(*args, **kw)
108
self.start_time = time.time()
111
def time_remaining(self):
112
"""Return number of seconds until this watch times out.
115
return int(self._timeout - (time.time() - self.start_time))
117
def set_timeout(self, timeout):
118
self.start_time = time.time()
119
self._timeout = timeout
122
with self._set_alarm(self.time_remaining()):
123
return super(BaseTimeoutWatcher, self).next()
130
def _set_alarm(cls, timeout):
135
handler = signal.getsignal(signal.SIGALRM)
136
if callable(handler):
137
if handler.__name__ == '_set_alarm':
138
raise TimeoutWatchInProgress()
140
"Existing signal handler found %r" % handler)
141
signal.signal(signal.SIGALRM, cls._on_alarm)
142
signal.alarm(timeout)
146
signal.signal(signal.SIGALRM, signal.SIG_DFL)
149
def _on_alarm(cls, x, frame):
153
class WatchWrapper(object):
155
def __init__(self, watch):
158
def run(self, callback=None):
160
with self.watch.set_context(self):
161
for change_set in self.watch:
162
for change in change_set:
163
self.process(*change)
164
if seen_initial and callable(callback):
166
if self.complete() is True:
172
"""process watch events."""
175
"""watch wrapper complete """
178
class BaseWaitForUnits(WatchWrapper):
180
Wait for units of the environment to reach a particular goal state.
182
def __init__(self, watch, state='started', service=None):
183
super(BaseWaitForUnits, self).__init__(watch)
185
self.goal_state = state
186
self.service = service
188
def get_unit_name(self, data):
189
raise NotImplementedError()
191
def get_unit_status(self, data):
192
raise NotImplementedError()
194
def process(self, entity_type, change, data):
195
if entity_type != "unit":
198
unit_name = self.get_unit_name(data)
199
if change == "remove" and unit_name in self.units:
200
del self.units[unit_name]
202
self.units[unit_name] = data
205
state = {'pending': [], 'errors': []}
207
for k, v in list(self.units.items()):
208
status = self.get_unit_status(v)
209
if status == "error":
210
state['errors'] = [v]
211
elif status != self.goal_state:
212
state['pending'] = [v]
214
if not state['pending'] and not state['errors']:
217
if state['errors'] and not self.goal_state == "removed":
218
raise UnitErrors(state['errors'])
220
return state['pending']
223
class BaseWaitForNoMachines(WatchWrapper):
225
Wait for all non state servers to be terminated.
228
def __init__(self, watch, initial_machines=None):
229
super(BaseWaitForNoMachines, self).__init__(watch)
230
self.machines = initial_machines or {}
232
def get_machine_id(self, data):
233
raise NotImplementedError()
236
raise NotImplementedError()
238
def process(self, entity_type, change, data):
239
if entity_type != 'machine':
242
machine_id = self.get_machine_id(data)
243
if change == 'remove' and machine_id in self.machines:
244
del self.machines[machine_id]
246
self.machines[machine_id] = data