// Copyright 2015 Canonical Ltd.
// Licensed under the AGPLv3, see LICENCE file for details.
9
"github.com/juju/errors"
10
"github.com/juju/loggo"
11
"gopkg.in/juju/names.v2"
14
"github.com/juju/juju/core/leadership"
17
var logger = loggo.GetLogger("juju.worker.leadership")
21
claimer leadership.Claimer
23
applicationName string
24
duration time.Duration
27
claimLease chan struct{}
28
renewLease <-chan time.Time
29
claimTickets chan chan bool
30
waitLeaderTickets chan chan bool
31
waitMinionTickets chan chan bool
32
waitingLeader []chan bool
33
waitingMinion []chan bool
36
// NewTracker returns a *Tracker that attempts to claim and retain service
37
// leadership for the supplied unit. It will claim leadership for twice the
38
// supplied duration, and once it's leader it will renew leadership every
39
// time the duration elapses.
40
// Thus, successful leadership claims on the resulting Tracker will guarantee
41
// leadership for the duration supplied here without generating additional
42
// calls to the supplied manager (which may very well be on the other side of
43
// a network connection).
44
func NewTracker(tag names.UnitTag, claimer leadership.Claimer, duration time.Duration) *Tracker {
46
serviceName, _ := names.UnitApplication(unitName)
49
applicationName: serviceName,
52
claimTickets: make(chan chan bool),
53
waitLeaderTickets: make(chan chan bool),
54
waitMinionTickets: make(chan chan bool),
59
for _, ticketCh := range t.waitingLeader {
62
for _, ticketCh := range t.waitingMinion {
67
// TODO: jam 2015-04-02 is this the most elegant way to make
68
// sure we shutdown cleanly? Essentially the lowest level sees
69
// that we are dying, and propagates an ErrDying up to us so
70
// that we shut down, which we then are passing back into
72
// Tomb.Kill() special cases the exact object ErrDying, and has
73
// no idea about errors.Cause and the general errors.Trace
74
// mechanisms that we use.
75
// So we explicitly unwrap before calling tomb.Kill() else
76
// tomb.Stop() thinks that we have a genuine error.
77
switch cause := errors.Cause(err); cause {
86
// Kill is part of the worker.Worker interface.
87
func (t *Tracker) Kill() {
91
// Wait is part of the worker.Worker interface.
92
func (t *Tracker) Wait() error {
96
// ApplicationName is part of the leadership.Tracker interface.
97
func (t *Tracker) ApplicationName() string {
98
return t.applicationName
101
// ClaimDuration is part of the leadership.Tracker interface.
102
func (t *Tracker) ClaimDuration() time.Duration {
106
// ClaimLeader is part of the leadership.Tracker interface.
107
func (t *Tracker) ClaimLeader() leadership.Ticket {
108
return t.submit(t.claimTickets)
111
// WaitLeader is part of the leadership.Tracker interface.
112
func (t *Tracker) WaitLeader() leadership.Ticket {
113
return t.submit(t.waitLeaderTickets)
116
// WaitMinion is part of the leadership.Tracker interface.
117
func (t *Tracker) WaitMinion() leadership.Ticket {
118
return t.submit(t.waitMinionTickets)
121
func (t *Tracker) loop() error {
122
logger.Debugf("%s making initial claim for %s leadership", t.unitName, t.applicationName)
123
if err := t.refresh(); err != nil {
124
return errors.Trace(err)
128
case <-t.tomb.Dying():
131
logger.Debugf("%s claiming lease for %s leadership", t.unitName, t.applicationName)
133
if err := t.refresh(); err != nil {
134
return errors.Trace(err)
137
logger.Debugf("%s renewing lease for %s leadership", t.unitName, t.applicationName)
139
if err := t.refresh(); err != nil {
140
return errors.Trace(err)
142
case ticketCh := <-t.claimTickets:
143
logger.Debugf("%s got claim request for %s leadership", t.unitName, t.applicationName)
144
if err := t.resolveClaim(ticketCh); err != nil {
145
return errors.Trace(err)
147
case ticketCh := <-t.waitLeaderTickets:
148
logger.Debugf("%s got wait request for %s leadership", t.unitName, t.applicationName)
149
if err := t.resolveWaitLeader(ticketCh); err != nil {
150
return errors.Trace(err)
152
case ticketCh := <-t.waitMinionTickets:
153
logger.Debugf("%s got wait request for %s leadership loss", t.unitName, t.applicationName)
154
if err := t.resolveWaitMinion(ticketCh); err != nil {
155
return errors.Trace(err)
161
// refresh makes a leadership request, and updates Tracker state to conform to
162
// latest known reality.
163
func (t *Tracker) refresh() error {
164
logger.Debugf("checking %s for %s leadership", t.unitName, t.applicationName)
165
leaseDuration := 2 * t.duration
166
// TODO(fwereade): 2016-03-17 lp:1558657
167
untilTime := time.Now().Add(leaseDuration)
168
err := t.claimer.ClaimLeadership(t.applicationName, t.unitName, leaseDuration)
171
return t.setLeader(untilTime)
172
case errors.Cause(err) == leadership.ErrClaimDenied:
175
return errors.Annotatef(err, "leadership failure")
178
// setLeader arranges for lease renewal.
179
func (t *Tracker) setLeader(untilTime time.Time) error {
180
logger.Debugf("%s confirmed for %s leadership until %s", t.unitName, t.applicationName, untilTime)
181
renewTime := untilTime.Add(-t.duration)
182
logger.Infof("%s will renew %s leadership at %s", t.unitName, t.applicationName, renewTime)
185
// TODO(fwereade): 2016-03-17 lp:1558657
186
t.renewLease = time.After(renewTime.Sub(time.Now()))
188
for len(t.waitingLeader) > 0 {
189
logger.Debugf("notifying %s ticket of impending %s leadership", t.unitName, t.applicationName)
190
var ticketCh chan bool
191
ticketCh, t.waitingLeader = t.waitingLeader[0], t.waitingLeader[1:]
192
defer close(ticketCh)
193
if err := t.sendTrue(ticketCh); err != nil {
194
return errors.Trace(err)
200
// setMinion arranges for lease acquisition when there's an opportunity.
201
func (t *Tracker) setMinion() error {
202
logger.Infof("%s leadership for %s denied", t.applicationName, t.unitName)
205
if t.claimLease == nil {
206
t.claimLease = make(chan struct{})
208
defer close(t.claimLease)
209
logger.Debugf("%s waiting for %s leadership release", t.unitName, t.applicationName)
210
err := t.claimer.BlockUntilLeadershipReleased(t.applicationName)
212
logger.Debugf("error while %s waiting for %s leadership release: %v", t.unitName, t.applicationName, err)
214
// We don't need to do anything else with the error, because we just
215
// close the claimLease channel and trigger a leadership claim on the
216
// main loop; if anything's gone seriously wrong we'll find out right
217
// away and shut down anyway. (And if this goroutine outlives the
218
// Tracker, it keeps it around as a zombie, but I don't see a way
223
for len(t.waitingMinion) > 0 {
224
logger.Debugf("notifying %s ticket of impending loss of %s leadership", t.unitName, t.applicationName)
225
var ticketCh chan bool
226
ticketCh, t.waitingMinion = t.waitingMinion[0], t.waitingMinion[1:]
227
defer close(ticketCh)
228
if err := t.sendTrue(ticketCh); err != nil {
229
return errors.Trace(err)
235
// isLeader returns true if leadership is guaranteed for the Tracker's duration.
236
func (t *Tracker) isLeader() (bool, error) {
238
// Last time we looked, we were leader.
240
case <-t.tomb.Dying():
241
return false, errors.Trace(tomb.ErrDying)
243
logger.Debugf("%s renewing lease for %s leadership", t.unitName, t.applicationName)
245
if err := t.refresh(); err != nil {
246
return false, errors.Trace(err)
249
logger.Debugf("%s still has %s leadership", t.unitName, t.applicationName)
252
return !t.isMinion, nil
255
// resolveClaim will send true on the supplied channel if leadership can be
256
// successfully verified, and will always close it whether or not it sent.
257
func (t *Tracker) resolveClaim(ticketCh chan bool) error {
258
logger.Debugf("resolving %s leadership ticket for %s...", t.applicationName, t.unitName)
259
defer close(ticketCh)
260
if leader, err := t.isLeader(); err != nil {
261
return errors.Trace(err)
263
logger.Debugf("%s is not %s leader", t.unitName, t.applicationName)
266
logger.Debugf("confirming %s leadership for %s", t.applicationName, t.unitName)
267
return t.sendTrue(ticketCh)
270
// resolveWaitLeader will send true on the supplied channel if leadership can be
271
// guaranteed for the Tracker's duration. It will then close the channel. If
272
// leadership cannot be guaranteed, the channel is left untouched until either
273
// the termination of the Tracker or the next invocation of setLeader; at which
274
// point true is sent if applicable, and the channel is closed.
275
func (t *Tracker) resolveWaitLeader(ticketCh chan bool) error {
283
if leader, err := t.isLeader(); err != nil {
284
return errors.Trace(err)
286
logger.Debugf("reporting %s leadership for %s", t.applicationName, t.unitName)
287
return t.sendTrue(ticketCh)
290
logger.Debugf("waiting for %s to attain %s leadership", t.unitName, t.applicationName)
291
t.waitingLeader = append(t.waitingLeader, ticketCh)
296
// resolveWaitMinion will close the supplied channel as soon as leadership cannot
297
// be guaranteed beyond the Tracker's duration.
298
func (t *Tracker) resolveWaitMinion(ticketCh chan bool) error {
306
if leader, err := t.isLeader(); err != nil {
307
return errors.Trace(err)
309
logger.Debugf("waiting for %s to lose %s leadership", t.unitName, t.applicationName)
310
t.waitingMinion = append(t.waitingMinion, ticketCh)
313
logger.Debugf("reporting %s leadership loss for %s", t.applicationName, t.unitName)
319
func (t *Tracker) sendTrue(ticketCh chan bool) error {
321
case <-t.tomb.Dying():
323
case ticketCh <- true:
328
func (t *Tracker) submit(tickets chan chan bool) leadership.Ticket {
329
ticketCh := make(chan bool, 1)
331
case <-t.tomb.Dying():
333
case tickets <- ticketCh:
337
ready: make(chan struct{}),
343
// ticket is used by Tracker to communicate leadership status back to a client.
350
func (t *ticket) run() {
352
// This is only safe/sane because the Tracker promises to close all pending
353
// ticket channels when it shuts down.
359
// Ready is part of the leadership.Ticket interface.
360
func (t *ticket) Ready() <-chan struct{} {
364
// Wait is part of the leadership.Ticket interface.
365
func (t *ticket) Wait() bool {