~nskaggs/+junk/xenial-test

« back to all changes in this revision

Viewing changes to src/github.com/juju/juju/apiserver/presence/pinger.go

  • Committer: Nicholas Skaggs
  • Date: 2016-10-24 20:56:05 UTC
  • Revision ID: nicholas.skaggs@canonical.com-20161024205605-z8lta0uvuhtxwzwl
Initi with beta15

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
// Copyright 2016 Canonical Ltd.
 
2
// Licensed under the AGPLv3, see LICENCE file for details.
 
3
 
 
4
package presence
 
5
 
 
6
import (
 
7
        "fmt"
 
8
        "time"
 
9
 
 
10
        "github.com/juju/errors"
 
11
        "github.com/juju/juju/worker"
 
12
        "github.com/juju/juju/worker/catacomb"
 
13
        "github.com/juju/loggo"
 
14
        "github.com/juju/utils/clock"
 
15
        "gopkg.in/juju/names.v2"
 
16
)
 
17
 
 
18
// Pinger exposes some methods implemented by state/presence.Pinger.
 
19
type Pinger interface {
 
20
        // Stop kills the pinger, then waits for it to exit.
 
21
        Stop() error
 
22
        // Wait waits for the pinger to stop.
 
23
        Wait() error
 
24
}
 
25
 
 
26
// Config contains the information necessary to drive a Worker.
 
27
type Config struct {
 
28
 
 
29
        // Identity records the entity whose connectedness is being
 
30
        // affirmed by this worker. It's used to create a logger that
 
31
        // can let us see which agent's pinger is actually failing.
 
32
        Identity names.Tag
 
33
 
 
34
        // Start starts a new, running Pinger or returns an error.
 
35
        Start func() (Pinger, error)
 
36
 
 
37
        // Clock is used to throttle failed Start attempts.
 
38
        Clock clock.Clock
 
39
 
 
40
        // RetryDelay controls by how much we throttle failed Start
 
41
        // attempts. Note that we only apply the delay when a Start
 
42
        // fails; if a Pinger ran, however briefly, we'll try to restart
 
43
        // it immediately, so as to minimise the changes of erroneously
 
44
        // causing agent-lost to be reported.
 
45
        RetryDelay time.Duration
 
46
}
 
47
 
 
48
// Validate returns an error if Config cannot be expected to drive a
 
49
// Worker.
 
50
func (config Config) Validate() error {
 
51
        if config.Identity == nil {
 
52
                return errors.NotValidf("nil Identity")
 
53
        }
 
54
        if config.Start == nil {
 
55
                return errors.NotValidf("nil Start")
 
56
        }
 
57
        if config.Clock == nil {
 
58
                return errors.NotValidf("nil Clock")
 
59
        }
 
60
        if config.RetryDelay <= 0 {
 
61
                return errors.NotValidf("non-positive RetryDelay")
 
62
        }
 
63
        return nil
 
64
}
 
65
 
 
66
// New returns a Worker backed by Config. The caller is responsible for
 
67
// Kill()ing the Worker and handling any errors returned from Wait();
 
68
// but as it happens it's designed to be an apiserver/common.Resource,
 
69
// and never to exit unless Kill()ed, so in practice Stop(), which will
 
70
// call Kill() and Wait() internally, is Good Enough.
 
71
func New(config Config) (*Worker, error) {
 
72
        if err := config.Validate(); err != nil {
 
73
                return nil, errors.Trace(err)
 
74
        }
 
75
        name := fmt.Sprintf("juju.apiserver.presence.%s", config.Identity)
 
76
        w := &Worker{
 
77
                config:  config,
 
78
                logger:  loggo.GetLogger(name),
 
79
                running: make(chan struct{}),
 
80
        }
 
81
        err := catacomb.Invoke(catacomb.Plan{
 
82
                Site: &w.catacomb,
 
83
                Work: w.loop,
 
84
        })
 
85
        if err != nil {
 
86
                return nil, errors.Trace(err)
 
87
        }
 
88
 
 
89
        // To support unhappy assumptions in apiserver/server_test.go,
 
90
        // we block New until at least one attempt to start a Pinger
 
91
        // has been made. This preserves the apparent behaviour of an
 
92
        // unwrapped Pinger under normal conditions.
 
93
        select {
 
94
        case <-w.catacomb.Dying():
 
95
                if err := w.Wait(); err != nil {
 
96
                        return nil, errors.Trace(err)
 
97
                }
 
98
                return nil, errors.New("worker stopped abnormally without reporting an error")
 
99
        case <-w.running:
 
100
                return w, nil
 
101
        }
 
102
}
 
103
 
 
104
// Worker creates a Pinger as configured, and recreates it as it fails
 
105
// until the Worker is stopped; at which point it shuts down any extant
 
106
// Pinger before returning.
 
107
type Worker struct {
 
108
        catacomb catacomb.Catacomb
 
109
        config   Config
 
110
        logger   loggo.Logger
 
111
        running  chan struct{}
 
112
}
 
113
 
 
114
// Kill is part of the worker.Worker interface.
 
115
func (w *Worker) Kill() {
 
116
        w.catacomb.Kill(nil)
 
117
}
 
118
 
 
119
// Wait is part of the worker.Worker interface.
 
120
func (w *Worker) Wait() error {
 
121
        return w.catacomb.Wait()
 
122
}
 
123
 
 
124
// Stop is part of the apiserver/common.Resource interface.
 
125
//
 
126
// It's not a very good idea -- see comments on lp:1572237 -- but we're
 
127
// only addressing the proximate cause of the issue here.
 
128
func (w *Worker) Stop() error {
 
129
        return worker.Stop(w)
 
130
}
 
131
 
 
132
// loop runs Pingers until w is stopped.
 
133
func (w *Worker) loop() error {
 
134
        var delay time.Duration
 
135
        for {
 
136
                select {
 
137
                case <-w.catacomb.Dying():
 
138
                        return w.catacomb.ErrDying()
 
139
                case <-w.config.Clock.After(delay):
 
140
                        maybePinger := w.maybeStartPinger()
 
141
                        w.reportRunning()
 
142
                        w.waitPinger(maybePinger)
 
143
                }
 
144
                delay = w.config.RetryDelay
 
145
        }
 
146
}
 
147
 
 
148
// maybeStartPinger starts and returns a new Pinger; or, if it
 
149
// encounters an error, logs it and returns nil.
 
150
func (w *Worker) maybeStartPinger() Pinger {
 
151
        w.logger.Tracef("starting pinger...")
 
152
        pinger, err := w.config.Start()
 
153
        if err != nil {
 
154
                w.logger.Errorf("cannot start pinger: %v", err)
 
155
                return nil
 
156
        }
 
157
        w.logger.Tracef("pinger started")
 
158
        return pinger
 
159
}
 
160
 
 
161
// reportRunning is a foul hack designed to delay apparent worker start
 
162
// until at least one ping has been delivered (or attempted). It only
 
163
// exists to make various distant tests, which should ideally not be
 
164
// depending on these implementation details, reliable.
 
165
func (w *Worker) reportRunning() {
 
166
        select {
 
167
        case <-w.running:
 
168
        default:
 
169
                close(w.running)
 
170
        }
 
171
}
 
172
 
 
173
// waitPinger waits for the death of either the pinger or the worker;
 
174
// stops the pinger if necessary; and returns once the pinger is
 
175
// finished. If pinger is nil, it returns immediately.
 
176
func (w *Worker) waitPinger(pinger Pinger) {
 
177
        if pinger == nil {
 
178
                return
 
179
        }
 
180
 
 
181
        // Set up a channel that will last as long as this method call.
 
182
        done := make(chan struct{})
 
183
        defer close(done)
 
184
 
 
185
        // Start a goroutine to stop the Pinger if the worker is killed.
 
186
        // If the enclosing method completes, we know that the Pinger
 
187
        // has already stopped, and we can return immediately.
 
188
        //
 
189
        // Note that we ignore errors out of Stop(), depending on the
 
190
        // Pinger to manage errors properly and report them via Wait()
 
191
        // below.
 
192
        go func() {
 
193
                select {
 
194
                case <-done:
 
195
                case <-w.catacomb.Dying():
 
196
                        w.logger.Tracef("stopping pinger")
 
197
                        pinger.Stop()
 
198
                }
 
199
        }()
 
200
 
 
201
        // Now, just wait for the Pinger to stop. It might be caused by
 
202
        // the Worker's death, or it might have failed on its own; in
 
203
        // any case, errors are worth recording, but we don't need to
 
204
        // respond in any way because that's loop()'s responsibility.
 
205
        w.logger.Tracef("waiting for pinger...")
 
206
        if err := pinger.Wait(); err != nil {
 
207
                w.logger.Errorf("pinger failed: %v", err)
 
208
        }
 
209
}