1
// Copyright 2012, 2013 Canonical Ltd.
2
// Licensed under the AGPLv3, see LICENCE file for details.
9
"github.com/juju/errors"
13
// RestartDelay holds the length of time that a worker
14
// will wait between exiting and restarting.
15
const RestartDelay = 3 * time.Second
17
// Runner is implemented by instances capable of starting and stopping workers.
18
type Runner interface {
20
StartWorker(id string, startFunc func() (Worker, error)) error
21
StopWorker(id string) error
24
// runner runs a set of workers, restarting them as necessary
31
startedc chan startInfo
32
isFatal func(error) bool
33
moreImportant func(err0, err1 error) bool
35
// restartDelay holds the length of time that a worker
36
// will wait between exiting and restarting.
37
restartDelay time.Duration
40
type startReq struct {
42
start func() (Worker, error)
45
type startInfo struct {
50
type doneInfo struct {
55
// NewRunner creates a new Runner. When a worker finishes, if its error
56
// is deemed fatal (determined by calling isFatal), all the other workers
57
// will be stopped and the runner itself will finish. Of all the fatal errors
58
// returned by the stopped workers, only the most important one,
59
// determined by calling moreImportant, will be returned from
60
// Runner.Wait. Non-fatal errors will not be returned.
62
// The function isFatal(err) returns whether err is a fatal error. The
63
// function moreImportant(err0, err1) returns whether err0 is considered
64
// more important than err1.
65
func NewRunner(isFatal func(error) bool, moreImportant func(err0, err1 error) bool, restartDelay time.Duration) Runner {
67
startc: make(chan startReq),
68
stopc: make(chan string),
69
donec: make(chan doneInfo),
70
startedc: make(chan startInfo),
72
moreImportant: moreImportant,
73
restartDelay: restartDelay,
76
defer runner.tomb.Done()
77
runner.tomb.Kill(runner.run())
82
var ErrDead = errors.New("worker runner is not running")
84
// StartWorker starts a worker running associated with the given id.
85
// The startFunc function will be called to create the worker;
86
// when the worker exits, it will be restarted as long as it
87
// does not return a fatal error.
89
// If there is already a worker with the given id, nothing will be done.
91
// StartWorker returns ErrDead if the runner is not running.
92
func (runner *runner) StartWorker(id string, startFunc func() (Worker, error)) error {
94
case runner.startc <- startReq{id, startFunc}:
96
case <-runner.tomb.Dead():
101
// StopWorker stops the worker associated with the given id.
102
// It does nothing if there is no such worker.
104
// StopWorker returns ErrDead if the runner is not running.
105
func (runner *runner) StopWorker(id string) error {
107
case runner.stopc <- id:
109
case <-runner.tomb.Dead():
114
func (runner *runner) Wait() error {
115
return runner.tomb.Wait()
118
func (runner *runner) Kill() {
119
logger.Debugf("killing runner %p", runner)
120
runner.tomb.Kill(nil)
123
type workerInfo struct {
124
start func() (Worker, error)
126
restartDelay time.Duration
130
func (runner *runner) run() error {
131
// workers holds the current set of workers. All workers with a
132
// running goroutine have an entry here.
133
workers := make(map[string]*workerInfo)
136
// isDying holds whether the runner is currently dying. When it
137
// is dying (whether as a result of being killed or due to a
138
// fatal error), all existing workers are killed, no new workers
139
// will be started, and the loop will exit when all existing
140
// workers have stopped.
142
tombDying := runner.tomb.Dying()
144
if isDying && len(workers) == 0 {
149
logger.Infof("runner is dying")
153
case req := <-runner.startc:
155
logger.Infof("ignoring start request for %q when dying", req.id)
158
info := workers[req.id]
160
workers[req.id] = &workerInfo{
162
restartDelay: runner.restartDelay,
164
go runner.runWorker(0, req.id, req.start)
168
// The worker is already running, so leave it alone
171
// The worker previously existed and is
172
// currently being stopped. When it eventually
173
// does stop, we'll restart it immediately with
174
// the new start function.
175
info.start = req.start
176
info.restartDelay = 0
177
case id := <-runner.stopc:
178
logger.Debugf("stop %q", id)
179
if info := workers[id]; info != nil {
182
case info := <-runner.startedc:
183
logger.Debugf("%q started", info.id)
184
workerInfo := workers[info.id]
185
workerInfo.worker = info.worker
186
if isDying || workerInfo.stopping {
187
killWorker(info.id, workerInfo)
189
case info := <-runner.donec:
190
logger.Debugf("%q done: %v", info.id, info.err)
191
workerInfo := workers[info.id]
192
if !workerInfo.stopping && info.err == nil {
193
logger.Debugf("removing %q from known workers", info.id)
194
delete(workers, info.id)
198
if runner.isFatal(info.err) {
199
logger.Errorf("fatal %q: %v", info.id, info.err)
200
if finalError == nil || runner.moreImportant(info.err, finalError) {
201
finalError = info.err
203
delete(workers, info.id)
210
logger.Errorf("exited %q: %v", info.id, info.err)
213
if workerInfo.start == nil {
214
logger.Debugf("no restart, removing %q from known workers", info.id)
216
// The worker has been deliberately stopped;
217
// we can now remove it from the list of workers.
218
delete(workers, info.id)
221
go runner.runWorker(workerInfo.restartDelay, info.id, workerInfo.start)
222
workerInfo.restartDelay = runner.restartDelay
227
func killAll(workers map[string]*workerInfo) {
228
for id, info := range workers {
233
func killWorker(id string, info *workerInfo) {
234
if info.worker != nil {
235
logger.Debugf("killing %q", id)
239
logger.Debugf("couldn't kill %q, not yet started", id)
245
// runWorker starts the given worker after waiting for the given delay.
246
func (runner *runner) runWorker(delay time.Duration, id string, start func() (Worker, error)) {
248
logger.Infof("restarting %q in %v", id, delay)
250
case <-runner.tomb.Dying():
251
runner.donec <- doneInfo{id, nil}
253
case <-time.After(delay):
256
logger.Infof("start %q", id)
257
worker, err := start()
259
runner.startedc <- startInfo{id, worker}
262
logger.Infof("stopped %q, err: %v", id, err)
263
runner.donec <- doneInfo{id, err}