~nskaggs/+junk/xenial-test

« back to all changes in this revision

Viewing changes to src/github.com/juju/juju/worker/leadership/tracker.go

  • Committer: Nicholas Skaggs
  • Date: 2016-10-24 20:56:05 UTC
  • Revision ID: nicholas.skaggs@canonical.com-20161024205605-z8lta0uvuhtxwzwl
Initi with beta15

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
// Copyright 2015 Canonical Ltd.
 
2
// Licensed under the AGPLv3, see LICENCE file for details.
 
3
 
 
4
package leadership
 
5
 
 
6
import (
 
7
        "time"
 
8
 
 
9
        "github.com/juju/errors"
 
10
        "github.com/juju/loggo"
 
11
        "gopkg.in/juju/names.v2"
 
12
        "launchpad.net/tomb"
 
13
 
 
14
        "github.com/juju/juju/core/leadership"
 
15
)
 
16
 
 
17
var logger = loggo.GetLogger("juju.worker.leadership")
 
18
 
 
19
type Tracker struct {
 
20
        tomb            tomb.Tomb
 
21
        claimer         leadership.Claimer
 
22
        unitName        string
 
23
        applicationName string
 
24
        duration        time.Duration
 
25
        isMinion        bool
 
26
 
 
27
        claimLease        chan struct{}
 
28
        renewLease        <-chan time.Time
 
29
        claimTickets      chan chan bool
 
30
        waitLeaderTickets chan chan bool
 
31
        waitMinionTickets chan chan bool
 
32
        waitingLeader     []chan bool
 
33
        waitingMinion     []chan bool
 
34
}
 
35
 
 
36
// NewTracker returns a *Tracker that attempts to claim and retain service
 
37
// leadership for the supplied unit. It will claim leadership for twice the
 
38
// supplied duration, and once it's leader it will renew leadership every
 
39
// time the duration elapses.
 
40
// Thus, successful leadership claims on the resulting Tracker will guarantee
 
41
// leadership for the duration supplied here without generating additional
 
42
// calls to the supplied manager (which may very well be on the other side of
 
43
// a network connection).
 
44
func NewTracker(tag names.UnitTag, claimer leadership.Claimer, duration time.Duration) *Tracker {
 
45
        unitName := tag.Id()
 
46
        serviceName, _ := names.UnitApplication(unitName)
 
47
        t := &Tracker{
 
48
                unitName:          unitName,
 
49
                applicationName:   serviceName,
 
50
                claimer:           claimer,
 
51
                duration:          duration,
 
52
                claimTickets:      make(chan chan bool),
 
53
                waitLeaderTickets: make(chan chan bool),
 
54
                waitMinionTickets: make(chan chan bool),
 
55
        }
 
56
        go func() {
 
57
                defer t.tomb.Done()
 
58
                defer func() {
 
59
                        for _, ticketCh := range t.waitingLeader {
 
60
                                close(ticketCh)
 
61
                        }
 
62
                        for _, ticketCh := range t.waitingMinion {
 
63
                                close(ticketCh)
 
64
                        }
 
65
                }()
 
66
                err := t.loop()
 
67
                // TODO: jam 2015-04-02 is this the most elegant way to make
 
68
                // sure we shutdown cleanly? Essentially the lowest level sees
 
69
                // that we are dying, and propagates an ErrDying up to us so
 
70
                // that we shut down, which we then are passing back into
 
71
                // Tomb.Kill().
 
72
                // Tomb.Kill() special cases the exact object ErrDying, and has
 
73
                // no idea about errors.Cause and the general errors.Trace
 
74
                // mechanisms that we use.
 
75
                // So we explicitly unwrap before calling tomb.Kill() else
 
76
                // tomb.Stop() thinks that we have a genuine error.
 
77
                switch cause := errors.Cause(err); cause {
 
78
                case tomb.ErrDying:
 
79
                        err = cause
 
80
                }
 
81
                t.tomb.Kill(err)
 
82
        }()
 
83
        return t
 
84
}
 
85
 
 
86
// Kill is part of the worker.Worker interface.
 
87
func (t *Tracker) Kill() {
 
88
        t.tomb.Kill(nil)
 
89
}
 
90
 
 
91
// Wait is part of the worker.Worker interface.
 
92
func (t *Tracker) Wait() error {
 
93
        return t.tomb.Wait()
 
94
}
 
95
 
 
96
// ApplicationName is part of the leadership.Tracker interface.
 
97
func (t *Tracker) ApplicationName() string {
 
98
        return t.applicationName
 
99
}
 
100
 
 
101
// ClaimDuration is part of the leadership.Tracker interface.
 
102
func (t *Tracker) ClaimDuration() time.Duration {
 
103
        return t.duration
 
104
}
 
105
 
 
106
// ClaimLeader is part of the leadership.Tracker interface.
 
107
func (t *Tracker) ClaimLeader() leadership.Ticket {
 
108
        return t.submit(t.claimTickets)
 
109
}
 
110
 
 
111
// WaitLeader is part of the leadership.Tracker interface.
 
112
func (t *Tracker) WaitLeader() leadership.Ticket {
 
113
        return t.submit(t.waitLeaderTickets)
 
114
}
 
115
 
 
116
// WaitMinion is part of the leadership.Tracker interface.
 
117
func (t *Tracker) WaitMinion() leadership.Ticket {
 
118
        return t.submit(t.waitMinionTickets)
 
119
}
 
120
 
 
121
func (t *Tracker) loop() error {
 
122
        logger.Debugf("%s making initial claim for %s leadership", t.unitName, t.applicationName)
 
123
        if err := t.refresh(); err != nil {
 
124
                return errors.Trace(err)
 
125
        }
 
126
        for {
 
127
                select {
 
128
                case <-t.tomb.Dying():
 
129
                        return tomb.ErrDying
 
130
                case <-t.claimLease:
 
131
                        logger.Debugf("%s claiming lease for %s leadership", t.unitName, t.applicationName)
 
132
                        t.claimLease = nil
 
133
                        if err := t.refresh(); err != nil {
 
134
                                return errors.Trace(err)
 
135
                        }
 
136
                case <-t.renewLease:
 
137
                        logger.Debugf("%s renewing lease for %s leadership", t.unitName, t.applicationName)
 
138
                        t.renewLease = nil
 
139
                        if err := t.refresh(); err != nil {
 
140
                                return errors.Trace(err)
 
141
                        }
 
142
                case ticketCh := <-t.claimTickets:
 
143
                        logger.Debugf("%s got claim request for %s leadership", t.unitName, t.applicationName)
 
144
                        if err := t.resolveClaim(ticketCh); err != nil {
 
145
                                return errors.Trace(err)
 
146
                        }
 
147
                case ticketCh := <-t.waitLeaderTickets:
 
148
                        logger.Debugf("%s got wait request for %s leadership", t.unitName, t.applicationName)
 
149
                        if err := t.resolveWaitLeader(ticketCh); err != nil {
 
150
                                return errors.Trace(err)
 
151
                        }
 
152
                case ticketCh := <-t.waitMinionTickets:
 
153
                        logger.Debugf("%s got wait request for %s leadership loss", t.unitName, t.applicationName)
 
154
                        if err := t.resolveWaitMinion(ticketCh); err != nil {
 
155
                                return errors.Trace(err)
 
156
                        }
 
157
                }
 
158
        }
 
159
}
 
160
 
 
161
// refresh makes a leadership request, and updates Tracker state to conform to
 
162
// latest known reality.
 
163
func (t *Tracker) refresh() error {
 
164
        logger.Debugf("checking %s for %s leadership", t.unitName, t.applicationName)
 
165
        leaseDuration := 2 * t.duration
 
166
        // TODO(fwereade): 2016-03-17 lp:1558657
 
167
        untilTime := time.Now().Add(leaseDuration)
 
168
        err := t.claimer.ClaimLeadership(t.applicationName, t.unitName, leaseDuration)
 
169
        switch {
 
170
        case err == nil:
 
171
                return t.setLeader(untilTime)
 
172
        case errors.Cause(err) == leadership.ErrClaimDenied:
 
173
                return t.setMinion()
 
174
        }
 
175
        return errors.Annotatef(err, "leadership failure")
 
176
}
 
177
 
 
178
// setLeader arranges for lease renewal.
 
179
func (t *Tracker) setLeader(untilTime time.Time) error {
 
180
        logger.Debugf("%s confirmed for %s leadership until %s", t.unitName, t.applicationName, untilTime)
 
181
        renewTime := untilTime.Add(-t.duration)
 
182
        logger.Infof("%s will renew %s leadership at %s", t.unitName, t.applicationName, renewTime)
 
183
        t.isMinion = false
 
184
        t.claimLease = nil
 
185
        // TODO(fwereade): 2016-03-17 lp:1558657
 
186
        t.renewLease = time.After(renewTime.Sub(time.Now()))
 
187
 
 
188
        for len(t.waitingLeader) > 0 {
 
189
                logger.Debugf("notifying %s ticket of impending %s leadership", t.unitName, t.applicationName)
 
190
                var ticketCh chan bool
 
191
                ticketCh, t.waitingLeader = t.waitingLeader[0], t.waitingLeader[1:]
 
192
                defer close(ticketCh)
 
193
                if err := t.sendTrue(ticketCh); err != nil {
 
194
                        return errors.Trace(err)
 
195
                }
 
196
        }
 
197
        return nil
 
198
}
 
199
 
 
200
// setMinion arranges for lease acquisition when there's an opportunity.
 
201
func (t *Tracker) setMinion() error {
 
202
        logger.Infof("%s leadership for %s denied", t.applicationName, t.unitName)
 
203
        t.isMinion = true
 
204
        t.renewLease = nil
 
205
        if t.claimLease == nil {
 
206
                t.claimLease = make(chan struct{})
 
207
                go func() {
 
208
                        defer close(t.claimLease)
 
209
                        logger.Debugf("%s waiting for %s leadership release", t.unitName, t.applicationName)
 
210
                        err := t.claimer.BlockUntilLeadershipReleased(t.applicationName)
 
211
                        if err != nil {
 
212
                                logger.Debugf("error while %s waiting for %s leadership release: %v", t.unitName, t.applicationName, err)
 
213
                        }
 
214
                        // We don't need to do anything else with the error, because we just
 
215
                        // close the claimLease channel and trigger a leadership claim on the
 
216
                        // main loop; if anything's gone seriously wrong we'll find out right
 
217
                        // away and shut down anyway. (And if this goroutine outlives the
 
218
                        // Tracker, it keeps it around as a zombie, but I don't see a way
 
219
                        // around that...)
 
220
                }()
 
221
        }
 
222
 
 
223
        for len(t.waitingMinion) > 0 {
 
224
                logger.Debugf("notifying %s ticket of impending loss of %s leadership", t.unitName, t.applicationName)
 
225
                var ticketCh chan bool
 
226
                ticketCh, t.waitingMinion = t.waitingMinion[0], t.waitingMinion[1:]
 
227
                defer close(ticketCh)
 
228
                if err := t.sendTrue(ticketCh); err != nil {
 
229
                        return errors.Trace(err)
 
230
                }
 
231
        }
 
232
        return nil
 
233
}
 
234
 
 
235
// isLeader returns true if leadership is guaranteed for the Tracker's duration.
 
236
func (t *Tracker) isLeader() (bool, error) {
 
237
        if !t.isMinion {
 
238
                // Last time we looked, we were leader.
 
239
                select {
 
240
                case <-t.tomb.Dying():
 
241
                        return false, errors.Trace(tomb.ErrDying)
 
242
                case <-t.renewLease:
 
243
                        logger.Debugf("%s renewing lease for %s leadership", t.unitName, t.applicationName)
 
244
                        t.renewLease = nil
 
245
                        if err := t.refresh(); err != nil {
 
246
                                return false, errors.Trace(err)
 
247
                        }
 
248
                default:
 
249
                        logger.Debugf("%s still has %s leadership", t.unitName, t.applicationName)
 
250
                }
 
251
        }
 
252
        return !t.isMinion, nil
 
253
}
 
254
 
 
255
// resolveClaim will send true on the supplied channel if leadership can be
 
256
// successfully verified, and will always close it whether or not it sent.
 
257
func (t *Tracker) resolveClaim(ticketCh chan bool) error {
 
258
        logger.Debugf("resolving %s leadership ticket for %s...", t.applicationName, t.unitName)
 
259
        defer close(ticketCh)
 
260
        if leader, err := t.isLeader(); err != nil {
 
261
                return errors.Trace(err)
 
262
        } else if !leader {
 
263
                logger.Debugf("%s is not %s leader", t.unitName, t.applicationName)
 
264
                return nil
 
265
        }
 
266
        logger.Debugf("confirming %s leadership for %s", t.applicationName, t.unitName)
 
267
        return t.sendTrue(ticketCh)
 
268
}
 
269
 
 
270
// resolveWaitLeader will send true on the supplied channel if leadership can be
 
271
// guaranteed for the Tracker's duration. It will then close the channel. If
 
272
// leadership cannot be guaranteed, the channel is left untouched until either
 
273
// the termination of the Tracker or the next invocation of setLeader; at which
 
274
// point true is sent if applicable, and the channel is closed.
 
275
func (t *Tracker) resolveWaitLeader(ticketCh chan bool) error {
 
276
        var dontClose bool
 
277
        defer func() {
 
278
                if !dontClose {
 
279
                        close(ticketCh)
 
280
                }
 
281
        }()
 
282
 
 
283
        if leader, err := t.isLeader(); err != nil {
 
284
                return errors.Trace(err)
 
285
        } else if leader {
 
286
                logger.Debugf("reporting %s leadership for %s", t.applicationName, t.unitName)
 
287
                return t.sendTrue(ticketCh)
 
288
        }
 
289
 
 
290
        logger.Debugf("waiting for %s to attain %s leadership", t.unitName, t.applicationName)
 
291
        t.waitingLeader = append(t.waitingLeader, ticketCh)
 
292
        dontClose = true
 
293
        return nil
 
294
}
 
295
 
 
296
// resolveWaitMinion will close the supplied channel as soon as leadership cannot
 
297
// be guaranteed beyond the Tracker's duration.
 
298
func (t *Tracker) resolveWaitMinion(ticketCh chan bool) error {
 
299
        var dontClose bool
 
300
        defer func() {
 
301
                if !dontClose {
 
302
                        close(ticketCh)
 
303
                }
 
304
        }()
 
305
 
 
306
        if leader, err := t.isLeader(); err != nil {
 
307
                return errors.Trace(err)
 
308
        } else if leader {
 
309
                logger.Debugf("waiting for %s to lose %s leadership", t.unitName, t.applicationName)
 
310
                t.waitingMinion = append(t.waitingMinion, ticketCh)
 
311
                dontClose = true
 
312
        } else {
 
313
                logger.Debugf("reporting %s leadership loss for %s", t.applicationName, t.unitName)
 
314
        }
 
315
        return nil
 
316
 
 
317
}
 
318
 
 
319
func (t *Tracker) sendTrue(ticketCh chan bool) error {
 
320
        select {
 
321
        case <-t.tomb.Dying():
 
322
                return tomb.ErrDying
 
323
        case ticketCh <- true:
 
324
                return nil
 
325
        }
 
326
}
 
327
 
 
328
func (t *Tracker) submit(tickets chan chan bool) leadership.Ticket {
 
329
        ticketCh := make(chan bool, 1)
 
330
        select {
 
331
        case <-t.tomb.Dying():
 
332
                close(ticketCh)
 
333
        case tickets <- ticketCh:
 
334
        }
 
335
        ticket := &ticket{
 
336
                ch:    ticketCh,
 
337
                ready: make(chan struct{}),
 
338
        }
 
339
        go ticket.run()
 
340
        return ticket
 
341
}
 
342
 
 
343
// ticket is used by Tracker to communicate leadership status back to a client.
 
344
type ticket struct {
 
345
        ch      chan bool
 
346
        ready   chan struct{}
 
347
        success bool
 
348
}
 
349
 
 
350
func (t *ticket) run() {
 
351
        defer close(t.ready)
 
352
        // This is only safe/sane because the Tracker promises to close all pending
 
353
        // ticket channels when it shuts down.
 
354
        if <-t.ch {
 
355
                t.success = true
 
356
        }
 
357
}
 
358
 
 
359
// Ready is part of the leadership.Ticket interface.
 
360
func (t *ticket) Ready() <-chan struct{} {
 
361
        return t.ready
 
362
}
 
363
 
 
364
// Wait is part of the leadership.Ticket interface.
 
365
func (t *ticket) Wait() bool {
 
366
        <-t.ready
 
367
        return t.success
 
368
}