1
// Copyright 2016 Canonical Ltd.
2
// Licensed under the AGPLv3, see LICENCE file for details.
10
"github.com/juju/errors"
11
"github.com/juju/juju/worker"
12
"github.com/juju/juju/worker/catacomb"
13
"github.com/juju/loggo"
14
"github.com/juju/utils/clock"
15
"gopkg.in/juju/names.v2"
18
// Pinger exposes some methods implemented by state/presence.Pinger.
19
type Pinger interface {
20
// Stop kills the pinger, then waits for it to exit.
22
// Wait waits for the pinger to stop.
26
// Config contains the information necessary to drive a Worker.
29
// Identity records the entity whose connectedness is being
30
// affirmed by this worker. It's used to create a logger that
31
// can let us see which agent's pinger is actually failing.
34
// Start starts a new, running Pinger or returns an error.
35
Start func() (Pinger, error)
37
// Clock is used to throttle failed Start attempts.
40
// RetryDelay controls by how much we throttle failed Start
41
// attempts. Note that we only apply the delay when a Start
42
// fails; if a Pinger ran, however briefly, we'll try to restart
43
// it immediately, so as to minimise the chances of erroneously
44
// causing agent-lost to be reported.
45
RetryDelay time.Duration
48
// Validate returns an error if Config cannot be expected to drive a
50
func (config Config) Validate() error {
51
if config.Identity == nil {
52
return errors.NotValidf("nil Identity")
54
if config.Start == nil {
55
return errors.NotValidf("nil Start")
57
if config.Clock == nil {
58
return errors.NotValidf("nil Clock")
60
if config.RetryDelay <= 0 {
61
return errors.NotValidf("non-positive RetryDelay")
66
// New returns a Worker backed by Config. The caller is responsible for
67
// Kill()ing the Worker and handling any errors returned from Wait();
68
// but as it happens it's designed to be an apiserver/common.Resource,
69
// and never to exit unless Kill()ed, so in practice Stop(), which will
70
// call Kill() and Wait() internally, is Good Enough.
71
func New(config Config) (*Worker, error) {
72
if err := config.Validate(); err != nil {
73
return nil, errors.Trace(err)
75
name := fmt.Sprintf("juju.apiserver.presence.%s", config.Identity)
78
logger: loggo.GetLogger(name),
79
running: make(chan struct{}),
81
err := catacomb.Invoke(catacomb.Plan{
86
return nil, errors.Trace(err)
89
// To support unhappy assumptions in apiserver/server_test.go,
90
// we block New until at least one attempt to start a Pinger
91
// has been made. This preserves the apparent behaviour of an
92
// unwrapped Pinger under normal conditions.
94
case <-w.catacomb.Dying():
95
if err := w.Wait(); err != nil {
96
return nil, errors.Trace(err)
98
return nil, errors.New("worker stopped abnormally without reporting an error")
104
// Worker creates a Pinger as configured, and recreates it as it fails
105
// until the Worker is stopped; at which point it shuts down any extant
106
// Pinger before returning.
108
catacomb catacomb.Catacomb
111
running chan struct{}
114
// Kill is part of the worker.Worker interface.
115
func (w *Worker) Kill() {
119
// Wait is part of the worker.Worker interface.
120
func (w *Worker) Wait() error {
121
return w.catacomb.Wait()
124
// Stop is part of the apiserver/common.Resource interface.
126
// It's not a very good idea -- see comments on lp:1572237 -- but we're
127
// only addressing the proximate cause of the issue here.
128
func (w *Worker) Stop() error {
129
return worker.Stop(w)
132
// loop runs Pingers until w is stopped.
133
func (w *Worker) loop() error {
134
var delay time.Duration
137
case <-w.catacomb.Dying():
138
return w.catacomb.ErrDying()
139
case <-w.config.Clock.After(delay):
140
maybePinger := w.maybeStartPinger()
142
w.waitPinger(maybePinger)
144
delay = w.config.RetryDelay
148
// maybeStartPinger starts and returns a new Pinger; or, if it
149
// encounters an error, logs it and returns nil.
150
func (w *Worker) maybeStartPinger() Pinger {
151
w.logger.Tracef("starting pinger...")
152
pinger, err := w.config.Start()
154
w.logger.Errorf("cannot start pinger: %v", err)
157
w.logger.Tracef("pinger started")
161
// reportRunning is a foul hack designed to delay apparent worker start
162
// until at least one ping has been delivered (or attempted). It only
163
// exists to make various distant tests, which should ideally not be
164
// depending on these implementation details, reliable.
165
func (w *Worker) reportRunning() {
173
// waitPinger waits for the death of either the pinger or the worker;
174
// stops the pinger if necessary; and returns once the pinger is
175
// finished. If pinger is nil, it returns immediately.
176
func (w *Worker) waitPinger(pinger Pinger) {
181
// Set up a channel that will last as long as this method call.
182
done := make(chan struct{})
185
// Start a goroutine to stop the Pinger if the worker is killed.
186
// If the enclosing method completes, we know that the Pinger
187
// has already stopped, and we can return immediately.
189
// Note that we ignore errors out of Stop(), depending on the
190
// Pinger to manage errors properly and report them via Wait()
195
case <-w.catacomb.Dying():
196
w.logger.Tracef("stopping pinger")
201
// Now, just wait for the Pinger to stop. It might be caused by
202
// the Worker's death, or it might have failed on its own; in
203
// any case, errors are worth recording, but we don't need to
204
// respond in any way because that's loop()'s responsibility.
205
w.logger.Tracef("waiting for pinger...")
206
if err := pinger.Wait(); err != nil {
207
w.logger.Errorf("pinger failed: %v", err)