1
// Copyright 2012, 2013 Canonical Ltd.
2
// Licensed under the AGPLv3, see LICENCE file for details.
8
"launchpad.net/juju-core/log"
13
// RestartDelay holds the length of time that a worker
14
// will wait between exiting and restarting.
15
// It is a package-level variable: the runner copies its current value into
// a worker's restartDelay when the worker is first registered and again
// after each restart is scheduled.
var RestartDelay = 3 * time.Second
17
// Worker is implemented by a running worker.
18
type Worker interface {
19
// Kill asks the worker to stop without necessarily
20
// waiting for it to do so.
22
// Wait waits for the worker to exit and returns any
23
// error encountered when it was running.
27
// Runner runs a set of workers, restarting them as necessary
34
startedc chan startInfo
35
isFatal func(error) bool
36
moreImportant func(err0, err1 error) bool
39
type startReq struct {
41
start func() (Worker, error)
44
type startInfo struct {
49
type doneInfo struct {
54
// NewRunner creates a new Runner. When a worker finishes, if its error
55
// is deemed fatal (determined by calling isFatal), all the other workers
56
// will be stopped and the runner itself will finish. Of all the fatal errors
57
// returned by the stopped workers, only the most important one,
58
// determined by calling moreImportant, will be returned from
59
// Runner.Wait. Non-fatal errors will not be returned.
61
// The function isFatal(err) returns whether err is a fatal error. The
62
// function moreImportant(err0, err1) returns whether err0 is considered
63
// more important than err1.
64
func NewRunner(isFatal func(error) bool, moreImportant func(err0, err1 error) bool) *Runner {
66
startc: make(chan startReq),
67
stopc: make(chan string),
68
donec: make(chan doneInfo),
69
startedc: make(chan startInfo),
71
moreImportant: moreImportant,
74
defer runner.tomb.Done()
75
runner.tomb.Kill(runner.run())
80
// ErrDead is the error returned by StartWorker and StopWorker
// when the runner is not running.
var ErrDead = errors.New("worker runner is not running")
82
// StartWorker starts a worker running associated with the given id.
83
// The startFunc function will be called to create the worker;
84
// when the worker exits, it will be restarted as long as it
85
// does not return a fatal error.
87
// If there is already a worker with the given id, nothing will be done.
89
// StartWorker returns ErrDead if the runner is not running.
90
func (runner *Runner) StartWorker(id string, startFunc func() (Worker, error)) error {
92
case runner.startc <- startReq{id, startFunc}:
94
case <-runner.tomb.Dead():
99
// StopWorker stops the worker associated with the given id.
100
// It does nothing if there is no such worker.
102
// StopWorker returns ErrDead if the runner is not running.
103
func (runner *Runner) StopWorker(id string) error {
105
case runner.stopc <- id:
107
case <-runner.tomb.Dead():
112
func (runner *Runner) Wait() error {
113
return runner.tomb.Wait()
116
func (runner *Runner) Kill() {
117
log.Debugf("worker: killing runner %p", runner)
118
runner.tomb.Kill(nil)
121
// Stop kills the given worker and waits for it to exit.
122
func Stop(worker Worker) error {
127
type workerInfo struct {
128
start func() (Worker, error)
130
restartDelay time.Duration
134
func (runner *Runner) run() error {
135
// workers holds the current set of workers. All workers with a
136
// running goroutine have an entry here.
137
workers := make(map[string]*workerInfo)
140
// isDying holds whether the runner is currently dying. When it
141
// is dying (whether as a result of being killed or due to a
142
// fatal error), all existing workers are killed, no new workers
143
// will be started, and the loop will exit when all existing
144
// workers have stopped.
146
tombDying := runner.tomb.Dying()
148
if isDying && len(workers) == 0 {
153
log.Infof("worker: runner is dying")
157
case req := <-runner.startc:
159
log.Infof("worker: ignoring start request for %q when dying", req.id)
162
info := workers[req.id]
164
workers[req.id] = &workerInfo{
166
restartDelay: RestartDelay,
168
go runner.runWorker(0, req.id, req.start)
172
// The worker is already running, so leave it alone
175
// The worker previously existed and is
176
// currently being stopped. When it eventually
177
// does stop, we'll restart it immediately with
178
// the new start function.
179
info.start = req.start
180
info.restartDelay = 0
181
case id := <-runner.stopc:
182
if info := workers[id]; info != nil {
185
case info := <-runner.startedc:
186
workerInfo := workers[info.id]
187
workerInfo.worker = info.worker
189
killWorker(info.id, workerInfo)
191
case info := <-runner.donec:
192
workerInfo := workers[info.id]
193
if !workerInfo.stopping && info.err == nil {
194
info.err = errors.New("unexpected quit")
197
if runner.isFatal(info.err) {
198
log.Errorf("worker: fatal %q: %v", info.id, info.err)
199
if finalError == nil || runner.moreImportant(info.err, finalError) {
200
finalError = info.err
202
delete(workers, info.id)
209
log.Errorf("worker: exited %q: %v", info.id, info.err)
212
if workerInfo.start == nil {
213
// The worker has been deliberately stopped;
214
// we can now remove it from the list of workers.
215
delete(workers, info.id)
218
go runner.runWorker(workerInfo.restartDelay, info.id, workerInfo.start)
219
workerInfo.restartDelay = RestartDelay
225
func killAll(workers map[string]*workerInfo) {
226
for id, info := range workers {
231
func killWorker(id string, info *workerInfo) {
232
if info.worker != nil {
233
log.Debugf("worker: killing %q", id)
241
// runWorker starts the given worker after waiting for the given delay.
242
func (runner *Runner) runWorker(delay time.Duration, id string, start func() (Worker, error)) {
244
log.Infof("worker: restarting %q in %v", id, delay)
246
case <-runner.tomb.Dying():
247
runner.donec <- doneInfo{id, nil}
249
case <-time.After(delay):
252
log.Infof("worker: start %q", id)
253
worker, err := start()
255
runner.startedc <- startInfo{id, worker}
258
runner.donec <- doneInfo{id, err}