1
# This file is part of the Juju GUI, which lets users view and manage Juju
2
# environments within a graphical interface (https://launchpad.net/juju-gui).
3
# Copyright (C) 2013 Canonical Ltd.
5
# This program is free software: you can redistribute it and/or modify it under
6
# the terms of the GNU Affero General Public License version 3, as published by
7
# the Free Software Foundation.
9
# This program is distributed in the hope that it will be useful, but WITHOUT
10
# ANY WARRANTY; without even the implied warranties of MERCHANTABILITY,
11
# SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12
# Affero General Public License for more details.
14
# You should have received a copy of the GNU Affero General Public License
15
# along with this program. If not, see <http://www.gnu.org/licenses/>.
17
"""Juju GUI server watchers."""
19
from concurrent.futures import Future
22
class WatcherError(Exception):
23
"""Errors in the execution of the watcher methods."""
26
class AsyncWatcher(object):
27
"""An asynchronous watcher implementation returning Futures.
29
Creating the watcher and putting changes in it is straightforward:
31
watcher = AsyncWatcher()
32
watcher.put('a change')
34
Listeners can ask to be notified of the next changes using a watcher
35
identifier (any hashable object, usually an integer number):
37
changes_future = watcher.next(42)
39
A request for changes returns a Future whose result is a list of changes
40
not yet seen by the listener identified by the watcher id (42).
41
If the watcher already includes changes that are new for a specific
42
listener, the future is suddenly fired; otherwise, a call to
43
changes_future.result() blocks until a new change is made available.
44
Use this watcher in combination with Tornado's gen.coroutine decorator in
45
order to suspend the function execution (and release the IO loop) until a
46
change is available, e.g.:
49
def my function(watcher):
50
changes = yield watcher.next(42)
51
print('New changes:', changes)
53
A watcher can be closed with a final change by invoking its close() method.
54
When a watcher is closed, it is no longer possible to put new changes in
55
it, and subsequent listeners will receive only the closing change.
62
# The _futures attribute maps watcher identifiers to pending Futures.
64
# The _positions attribute maps watcher identifiers to the
65
# corresponding position in the changes list.
68
def _fire_futures(self, changes):
69
"""Set a result to all pending Futures.
71
Update the position for all involved listeners.
73
position = len(self._changes)
74
for watcher_id, future in self._futures.items():
75
self._positions[watcher_id] = position
76
future.set_result(changes)
81
"""Return True if the watcher is empty, False otherwise."""
82
return not self._changes
84
def next(self, watcher_id):
85
"""Subscribe the given watcher id to the watcher, requesting changes.
87
Return a Future whose result is a list of unseen changes.
89
if watcher_id in self._futures:
91
'watcher {} is already waiting for changes'.format(watcher_id))
94
future.set_result(self._changes)
96
position = len(self._changes)
97
watcher_position = self._positions.get(watcher_id, 0)
98
if watcher_position < position:
99
# There are already unseen changes to send.
100
missing_changes = self._changes[watcher_position:]
101
future.set_result(missing_changes)
102
self._positions[watcher_id] = position
104
# There are not unseen changes, the returned future will be
105
# probably fired later.
106
self._futures[watcher_id] = future
110
"""Return the last notified change.
112
Raise an error if the watcher is empty.
115
return self._changes[-1]
116
raise WatcherError('the watcher is empty')
118
def put(self, change):
119
"""Put a change into the watcher."""
121
raise WatcherError('unable to put changes in a closed watcher')
122
self._changes.append(change)
123
self._fire_futures([change])
125
def close(self, change):
126
"""Close the watcher with the given closing message."""
128
raise WatcherError('the watcher is already closed')
130
self._changes = [change]
131
self._fire_futures([change])