# Copyright 2014-2015 Canonical Limited.
#
# This file is part of charm-helpers.
#
# charm-helpers is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License version 3 as
# published by the Free Software Foundation.
#
# charm-helpers is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with charm-helpers.  If not, see <http://www.gnu.org/licenses/>.
'''
The coordinator module allows you to use Juju's leadership feature to
coordinate operations between units of a service.

Behavior is defined in subclasses of coordinator.BaseCoordinator.
One implementation is provided (coordinator.Serial), which allows an
operation to be run on a single unit at a time, on a first come, first
served basis. You can trivially define more complex behavior by
subclassing BaseCoordinator or Serial.

:author: Stuart Bishop <stuart.bishop@canonical.com>


Services Framework Usage
========================

Ensure a peers relation is defined in metadata.yaml. Instantiate a
BaseCoordinator subclass before invoking ServiceManager.manage().
Ensure that ServiceManager.manage() is wired up to the leader-elected,
leader-settings-changed, peers relation-changed and peers
relation-departed hooks in addition to any other hooks you need, or your
service will deadlock.

Ensure calls to acquire() are guarded, so that locks are only requested
when they are really needed (and thus hooks only triggered when necessary).
Failing to do this and calling acquire() unconditionally will put your unit
into a hook loop. Calls to granted() do not need to be guarded.

For example::

    from charmhelpers.core import hookenv, services
    from charmhelpers import coordinator

    def maybe_restart(servicename):
        serial = coordinator.Serial()
        if needs_restart():
            serial.acquire('restart')
        if serial.granted('restart'):
            hookenv.service_restart(servicename)

    services = [dict(service='servicename',
                     data_ready=[maybe_restart])]

    if __name__ == '__main__':
        _ = coordinator.Serial()  # Must instantiate before manager.manage()
        manager = services.ServiceManager(services)
        manager.manage()


You can implement a similar pattern using a decorator. If the lock has
not been granted, an attempt to acquire() it will be made if the guard
function returns True. If the lock has been granted, the decorated function
is run as normal::

    from charmhelpers.core import hookenv, services
    from charmhelpers import coordinator

    serial = coordinator.Serial()  # Global, instantiated on module import.

    def needs_restart():
        [ ... Introspect state. Return True if restart is needed ... ]

    @serial.require('restart', needs_restart)
    def maybe_restart(servicename):
        hookenv.service_restart(servicename)

    services = [dict(service='servicename',
                     data_ready=[maybe_restart])]

    if __name__ == '__main__':
        manager = services.ServiceManager(services)
        manager.manage()


Traditional Usage
=================

Ensure a peers relation is defined in metadata.yaml.

If you are using charmhelpers.core.hookenv.Hooks, ensure that a
BaseCoordinator subclass is instantiated before calling Hooks.execute.

If you are not using charmhelpers.core.hookenv.Hooks, ensure
that a BaseCoordinator subclass is instantiated and its handle()
method called at the start of all your hooks.

For example::

    import sys
    from charmhelpers.core import hookenv
    from charmhelpers import coordinator

    hooks = hookenv.Hooks()

    def maybe_restart():
        serial = coordinator.Serial()
        if serial.granted('restart'):
            hookenv.service_restart('myservice')

    @hooks.hook
    def config_changed():
        update_config()
        serial = coordinator.Serial()
        if needs_restart():
            serial.acquire('restart')
            maybe_restart()

    # Cluster hooks must be wired up.
    @hooks.hook('cluster-relation-changed', 'cluster-relation-departed')
    def cluster_relation_changed():
        maybe_restart()

    # Leader hooks must be wired up.
    @hooks.hook('leader-elected', 'leader-settings-changed')
    def leader_settings_changed():
        maybe_restart()

    [ ... repeat for *all* other hooks you are using ... ]

    if __name__ == '__main__':
        _ = coordinator.Serial()  # Must instantiate before execute()
        hooks.execute(sys.argv)


You can also use the require decorator. If the lock has not been granted,
an attempt to acquire() it will be made if the guard function returns True.
If the lock has been granted, the decorated function is run as normal::

    import sys
    from charmhelpers.core import hookenv
    from charmhelpers import coordinator

    hooks = hookenv.Hooks()
    serial = coordinator.Serial()  # Must instantiate before execute()

    @serial.require('restart', needs_restart)
    def maybe_restart():
        hookenv.service_restart('myservice')

    @hooks.hook('install', 'config-changed', 'upgrade-charm',
                # Peers and leader hooks must be wired up.
                'cluster-relation-changed', 'cluster-relation-departed',
                'leader-elected', 'leader-settings-changed')
    def default_hook():
        [ ... do stuff, maybe_restart when required ... ]

    if __name__ == '__main__':
        hooks.execute(sys.argv)


Details
=======

A simple API is provided similar to traditional locking APIs. A lock
may be requested using the acquire() method, and the granted() method
may be used to check if a lock previously requested by acquire() has
been granted. It doesn't matter how many times acquire() is called in a
hook.

Locks are released at the end of the hook they are acquired in. This may
be the current hook if the unit is leader and the lock is free. It is
more likely a future hook (probably leader-settings-changed, possibly
the peers relation-changed or departed hook, potentially any hook).

Whenever a charm needs to perform a coordinated action it will acquire()
the lock and perform the action immediately if acquisition is
successful. It will also need to perform the same action in every other
hook if the lock has been granted.
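
For illustration, the pattern looks like this sketch, where
needs_reindex() and reindex() stand in for whatever guard and action
your charm provides (they are not part of this module)::

    serial = coordinator.Serial()
    if needs_reindex():
        serial.acquire('reindex')     # Request; may be granted later.
    if serial.granted('reindex'):
        reindex()                     # May run in this or a later hook.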


Grubby Details
--------------

Why do you need to be able to perform the same action in every hook?
If the unit is the leader, then it may be able to grant its own lock
and perform the action immediately in the source hook. If the unit is
the leader and cannot immediately grant the lock, then its only
guaranteed chance of acquiring the lock is in the peers relation-joined,
relation-changed or peers relation-departed hooks when another unit has
released it (the only channel to communicate to the leader is the peers
relation). If the unit is not the leader, then it is unlikely the lock
is granted in the source hook (a previous hook must have also made the
request for this to happen). A non-leader is notified about the lock via
leader settings. These changes may be visible in any hook, even before
the leader-settings-changed hook has been invoked. Or the requesting
unit may be promoted to leader after making a request, in which case the
lock may be granted in leader-elected or in a future peers
relation-changed or relation-departed hook.

This could be simpler if leader-settings-changed was invoked on the
leader. We could then never grant locks except in
leader-settings-changed hooks, giving one place for the operation to be
performed. Unfortunately this is not the case with Juju 1.23 leadership.

But of course, this doesn't really matter to most people, as most people
seem to prefer the Services Framework or similar reset-the-world
approaches, rather than the twisty maze of attempting to deduce what
should be done based on what hook happens to be running (which always
seems to evolve into reset-the-world anyway once the charm grows beyond
the trivial).

I chose not to implement a callback model, where a callback was passed
to acquire to be executed when the lock is granted, because the callback
may become invalid between making the request and the lock being granted
due to an upgrade-charm being run in the interim. It would also create
restrictions, such as no lambdas, callbacks only defined at the top
level of a module, etc. Still, we could implement it on top of what is
here, e.g. by adding a defer decorator that stores a pickle of itself to
disk and have BaseCoordinator unpickle and execute them when the locks
are granted.
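
A very rough sketch of that unimplemented idea, just to make the shape
concrete. The defer decorator, the _DEFER_FILE path and the replay step
are all hypothetical and are not part of this module::

    import pickle

    def defer(lock):
        def decorator(f):
            def wrapper(*args, **kw):
                serial = Serial()
                if serial.granted(lock):
                    return f(*args, **kw)
                # Store enough state to replay the call from a later
                # hook once the lock has been granted.
                with open(_DEFER_FILE, 'ab') as out:
                    pickle.dump((f.__module__, f.__name__, args, kw), out)
            return wrapper
        return decorator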
'''
import json
import os.path

from datetime import datetime
from functools import wraps

from six import with_metaclass

from charmhelpers.core import hookenv


# We make BaseCoordinator and subclasses singletons, so that if we
# need to spill to local storage then only a single instance does so,
# rather than having multiple instances stomp over each other.
class Singleton(type):
    _instances = {}

    def __call__(cls, *args, **kwargs):
        if cls not in cls._instances:
            cls._instances[cls] = super(Singleton, cls).__call__(*args,
                                                                 **kwargs)
        return cls._instances[cls]
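
# For example, because of this metaclass, repeated calls such as
# Serial() from different handlers within a single hook all return the
# identical instance, so state loaded by the first call is shared.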


class BaseCoordinator(with_metaclass(Singleton, object)):
    relid = None  # Peer relation-id, set by __init__
    relname = None

    grants = None  # self.grants[unit][lock] == timestamp
    requests = None  # self.requests[unit][lock] == timestamp

    def __init__(self, relation_key='coordinator', peer_relation_name=None):
        '''Instantiate a Coordinator.

        Data is stored on the peers relation and in leadership storage
        under the provided relation_key.

        The peers relation is identified by peer_relation_name, and defaults
        to the first one found in metadata.yaml.
        '''
        # Most initialization is deferred, since invoking hook tools from
        # the constructor makes testing hard.
        self.key = relation_key
        self.relname = peer_relation_name
        hookenv.atstart(self.initialize)

        # Ensure that handle() is called, without placing that burden on
        # the charm author. They still need to do this manually if they
        # are not using a hook framework.
        hookenv.atstart(self.handle)

    def initialize(self):
        if self.requests is not None:
            return  # Already initialized.

        assert hookenv.has_juju_version('1.23'), 'Needs Juju 1.23+'

        if self.relname is None:
            self.relname = _implicit_peer_relation_name()

        relids = hookenv.relation_ids(self.relname)
        if relids:
            self.relid = sorted(relids)[0]

        # Load our state, from leadership, the peer relationship, and maybe
        # local state as a fallback. Populates self.requests and self.grants.
        self._load_state()
        self._emit_state()

        # Save our state if the hook completes successfully.
        hookenv.atexit(self._save_state)

        # Schedule release of granted locks for the end of the hook.
        # This needs to be the last of our atexit callbacks to ensure
        # it will be run first when the hook is complete, because there
        # is no point mutating our state after it has been saved.
        hookenv.atexit(self._release_granted)

    def acquire(self, lock):
        '''Acquire the named lock, non-blocking.

        The lock may be granted immediately, or in a future hook.

        Returns True if the lock has been granted. The lock will be
        automatically released at the end of the hook in which it is
        granted.

        Do not mindlessly call this method, as it triggers a cascade of
        hooks. For example, if you call acquire() every time in your
        peers relation-changed hook you will end up with an infinite loop
        of hooks. It should almost always be guarded by some condition.
        '''
        unit = hookenv.local_unit()
        ts = self.requests[unit].get(lock)
        if not ts:
            # If there is no outstanding request on the peers relation,
            # create one.
            self.requests.setdefault(unit, {})
            self.requests[unit][lock] = _timestamp()
            self.msg('Requested {}'.format(lock))

        # If the leader has granted the lock, yay.
        if self.granted(lock):
            self.msg('Acquired {}'.format(lock))
            return True

        # If the unit making the request also happens to be the
        # leader, it must handle the request now. Even though the
        # request has been stored on the peers relation, the peers
        # relation-changed hook will not be triggered.
        if hookenv.is_leader():
            return self.grant(lock, unit)

        return False  # Can't acquire lock, yet. Maybe next hook.

    def granted(self, lock):
        '''Return True if a previously requested lock has been granted'''
        unit = hookenv.local_unit()
        ts = self.requests[unit].get(lock)
        if ts and self.grants.get(unit, {}).get(lock) == ts:
            return True
        return False

    def requested(self, lock):
        '''Return True if we are in the queue for the lock'''
        return lock in self.requests[hookenv.local_unit()]

    def request_timestamp(self, lock):
        '''Return the timestamp of our outstanding request for lock, or None.

        Returns a datetime.datetime() UTC timestamp, with no tzinfo attribute.
        '''
        ts = self.requests[hookenv.local_unit()].get(lock, None)
        if ts is not None:
            return datetime.strptime(ts, _timestamp_format)

    def handle(self):
        if not hookenv.is_leader():
            return  # Only the leader can grant requests.

        self.msg('Leader handling coordinator requests')

        # Clear our grants that have been released.
        for unit in self.grants.keys():
            for lock, grant_ts in list(self.grants[unit].items()):
                req_ts = self.requests.get(unit, {}).get(lock)
                if req_ts != grant_ts:
                    # The request timestamp does not match the granted
                    # timestamp. Several hooks on 'unit' may have run
                    # before the leader got a chance to make a decision,
                    # and 'unit' may have released its lock and attempted
                    # to reacquire it. This will change the timestamp,
                    # and we correctly revoke the old grant putting it
                    # to the end of the queue.
                    ts = datetime.strptime(self.grants[unit][lock],
                                           _timestamp_format)
                    del self.grants[unit][lock]
                    self.released(unit, lock, ts)

        # Grant locks.
        for unit in self.requests.keys():
            for lock in self.requests[unit]:
                self.grant(lock, unit)

    def grant(self, lock, unit):
        '''Maybe grant the lock to a unit.

        The decision to grant the lock or not is made for $lock
        by a corresponding method grant_$lock, which you may define
        in a subclass. If no such method is defined, the default_grant
        method is used. See Serial.default_grant() for details.
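
        For example, a subclass could allow up to two units to hold a
        hypothetical 'upgrade' lock at once. A sketch only; this class
        is not part of the module::

            class PairedUpgrades(Serial):
                def grant_upgrade(self, lock, unit, granted, queue):
                    # Grant the head of the queue while fewer than two
                    # units currently hold the lock.
                    return unit == queue[0] and len(granted) < 2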
        '''
        if not hookenv.is_leader():
            return False  # Not the leader, so we cannot grant.

        # Set of units already granted the lock.
        granted = set()
        for u in self.grants:
            if lock in self.grants[u]:
                granted.add(u)
        if unit in granted:
            return True  # Already granted.

        # Ordered list of units waiting for the lock.
        reqs = set()
        for u in self.requests:
            if u in granted:
                continue  # In the granted set. Not wanted in the req list.
            for l, ts in self.requests[u].items():
                if l == lock:
                    reqs.add((ts, u))
        queue = [t[1] for t in sorted(reqs)]
        if unit not in queue:
            return False  # Unit has not requested the lock.

        # Locate custom logic, or fallback to the default.
        grant_func = getattr(self, 'grant_{}'.format(lock), self.default_grant)

        if grant_func(lock, unit, granted, queue):
            # Grant the lock.
            self.msg('Leader grants {} to {}'.format(lock, unit))
            self.grants.setdefault(unit, {})[lock] = self.requests[unit][lock]
            return True

        return False

    def released(self, unit, lock, timestamp):
        '''Called on the leader when it has released a lock.

        By default, does nothing but log messages. Override if you
        need to perform additional housekeeping when a lock is released,
        for example recording timestamps.
        '''
        interval = _utcnow() - timestamp
        self.msg('Leader released {} from {}, held {}'.format(lock, unit,
                                                              interval))

    def require(self, lock, guard_func, *guard_args, **guard_kw):
        """Decorate a function to be run only when a lock is acquired.

        The lock is requested if the guard function returns True.

        The decorated function is called if the lock has been granted.
        """
        def decorator(f):
            @wraps(f)
            def wrapper(*args, **kw):
                if self.granted(lock):
                    self.msg('Granted {}'.format(lock))
                    return f(*args, **kw)
                if guard_func(*guard_args, **guard_kw) and self.acquire(lock):
                    return f(*args, **kw)
            return wrapper
        return decorator

    def msg(self, msg):
        '''Emit a message. Override to customize log spam.'''
        hookenv.log('coordinator.{} {}'.format(self._name(), msg),
                    level=hookenv.INFO)

    def _name(self):
        return self.__class__.__name__

    def _load_state(self):
        self.msg('Loading state')

        # All responses must be stored in the leadership settings.
        # The leader cannot use local state, as a different unit may
        # be leader next time. Which is fine, as the leadership
        # settings are always available.
        self.grants = json.loads(hookenv.leader_get(self.key) or '{}')

        local_unit = hookenv.local_unit()

        # All requests must be stored on the peers relation. This is
        # the only channel units have to communicate with the leader.
        # Even the leader needs to store its requests here, as a
        # different unit may be leader by the time the request can be
        # granted.
        if self.relid is None:
            # The peers relation is not available. Maybe we are early in
            # the unit's lifecycle. Maybe this unit is standalone.
            # Fallback to using local state.
            self.msg('No peer relation. Loading local state')
            self.requests = {local_unit: self._load_local_state()}
        else:
            self.requests = self._load_peer_state()

            if local_unit not in self.requests:
                # The peers relation has just been joined. Update any state
                # loaded from our peers with our local state.
                self.msg('New peer relation. Merging local state')
                self.requests[local_unit] = self._load_local_state()

    def _emit_state(self):
        # Emit this unit's lock status.
        for lock in sorted(self.requests[hookenv.local_unit()].keys()):
            if self.granted(lock):
                self.msg('Granted {}'.format(lock))
            else:
                self.msg('Waiting on {}'.format(lock))

    def _save_state(self):
        self.msg('Publishing state')
        if hookenv.is_leader():
            # sort_keys to ensure stability.
            raw = json.dumps(self.grants, sort_keys=True)
            hookenv.leader_set({self.key: raw})

        local_unit = hookenv.local_unit()

        if self.relid is None:
            # No peers relation yet. Fallback to local state.
            self.msg('No peer relation. Saving local state')
            self._save_local_state(self.requests[local_unit])
        else:
            # sort_keys to ensure stability.
            raw = json.dumps(self.requests[local_unit], sort_keys=True)
            hookenv.relation_set(self.relid,
                                 relation_settings={self.key: raw})

    def _load_peer_state(self):
        requests = {}
        units = set(hookenv.related_units(self.relid))
        units.add(hookenv.local_unit())
        for unit in units:
            raw = hookenv.relation_get(self.key, unit, self.relid)
            if raw:
                requests[unit] = json.loads(raw)
        return requests

    def _local_state_filename(self):
        # Include the class name. We allow multiple BaseCoordinator
        # subclasses to be instantiated, and they are singletons, so
        # this avoids conflicts (unless someone creates and uses two
        # BaseCoordinator subclasses with the same class name, so don't
        # do that).
        return '.charmhelpers.coordinator.{}'.format(self._name())

    def _load_local_state(self):
        fn = self._local_state_filename()
        if os.path.exists(fn):
            with open(fn, 'r') as f:
                return json.load(f)
        return {}

    def _save_local_state(self, state):
        fn = self._local_state_filename()
        with open(fn, 'w') as f:
            json.dump(state, f)

    def _release_granted(self):
        # At the end of every hook, release all locks granted to
        # this unit. If a hook neglects to make use of what it
        # requested, it will just have to make the request again.
        # Implicit release is the only way this will work, as
        # if the unit is standalone there may be no future triggers
        # called to do a manual release.
        unit = hookenv.local_unit()
        for lock in list(self.requests[unit].keys()):
            if self.granted(lock):
                self.msg('Released local {} lock'.format(lock))
                del self.requests[unit][lock]


class Serial(BaseCoordinator):
    def default_grant(self, lock, unit, granted, queue):
        '''Default logic to grant a lock to a unit. Unless overridden,
        only one unit may hold the lock and it will be granted to the
        earliest queued request.

        To define custom logic for $lock, create a subclass and
        define a grant_$lock method.

        `unit` is the unit name making the request.

        `granted` is the set of units already granted the lock. It will
        never include `unit`. It may be empty.

        `queue` is the list of units waiting for the lock, ordered by time
        of request. It will always include `unit`, but `unit` is not
        necessarily the first entry.

        Returns True if the lock should be granted to `unit`.
        '''
        return unit == queue[0] and not granted
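
# For example, with Serial the first unit to request 'restart' is
# granted it; units further back in the queue are granted it one at a
# time as each earlier holder releases it at the end of its hook.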


def _implicit_peer_relation_name():
    md = hookenv.metadata()
    assert 'peers' in md, 'No peer relations in metadata.yaml'
    return sorted(md['peers'].keys())[0]


# A human readable, sortable UTC timestamp format.
_timestamp_format = '%Y-%m-%d %H:%M:%S.%fZ'
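# For example: '2015-01-23 10:11:12.345678Z'.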


def _utcnow():  # pragma: no cover
    # This wrapper exists as mocking datetime methods is problematic.
    return datetime.utcnow()


def _timestamp():
    return _utcnow().strftime(_timestamp_format)