~nskaggs/+junk/xenial-test

« back to all changes in this revision

Viewing changes to src/github.com/juju/juju/worker/runner.go

  • Committer: Nicholas Skaggs
  • Date: 2016-10-24 20:56:05 UTC
  • Revision ID: nicholas.skaggs@canonical.com-20161024205605-z8lta0uvuhtxwzwl
Initi with beta15

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
// Copyright 2012, 2013 Canonical Ltd.
 
2
// Licensed under the AGPLv3, see LICENCE file for details.
 
3
 
 
4
package worker
 
5
 
 
6
import (
 
7
        "time"
 
8
 
 
9
        "github.com/juju/errors"
 
10
        "launchpad.net/tomb"
 
11
)
 
12
 
 
13
// RestartDelay holds the length of time that a worker
 
14
// will wait between exiting and restarting.
 
15
const RestartDelay = 3 * time.Second
 
16
 
 
17
// Runner is implemented by instances capable of starting and stopping workers.
 
18
type Runner interface {
 
19
        Worker
 
20
        StartWorker(id string, startFunc func() (Worker, error)) error
 
21
        StopWorker(id string) error
 
22
}
 
23
 
 
24
// runner runs a set of workers, restarting them as necessary
 
25
// when they fail.
 
26
type runner struct {
 
27
        tomb          tomb.Tomb
 
28
        startc        chan startReq
 
29
        stopc         chan string
 
30
        donec         chan doneInfo
 
31
        startedc      chan startInfo
 
32
        isFatal       func(error) bool
 
33
        moreImportant func(err0, err1 error) bool
 
34
 
 
35
        // restartDelay holds the length of time that a worker
 
36
        // will wait between exiting and restarting.
 
37
        restartDelay time.Duration
 
38
}
 
39
 
 
40
type startReq struct {
 
41
        id    string
 
42
        start func() (Worker, error)
 
43
}
 
44
 
 
45
type startInfo struct {
 
46
        id     string
 
47
        worker Worker
 
48
}
 
49
 
 
50
type doneInfo struct {
 
51
        id  string
 
52
        err error
 
53
}
 
54
 
 
55
// NewRunner creates a new Runner.  When a worker finishes, if its error
 
56
// is deemed fatal (determined by calling isFatal), all the other workers
 
57
// will be stopped and the runner itself will finish.  Of all the fatal errors
 
58
// returned by the stopped workers, only the most important one,
 
59
// determined by calling moreImportant, will be returned from
 
60
// Runner.Wait. Non-fatal errors will not be returned.
 
61
//
 
62
// The function isFatal(err) returns whether err is a fatal error.  The
 
63
// function moreImportant(err0, err1) returns whether err0 is considered
 
64
// more important than err1.
 
65
func NewRunner(isFatal func(error) bool, moreImportant func(err0, err1 error) bool, restartDelay time.Duration) Runner {
 
66
        runner := &runner{
 
67
                startc:        make(chan startReq),
 
68
                stopc:         make(chan string),
 
69
                donec:         make(chan doneInfo),
 
70
                startedc:      make(chan startInfo),
 
71
                isFatal:       isFatal,
 
72
                moreImportant: moreImportant,
 
73
                restartDelay:  restartDelay,
 
74
        }
 
75
        go func() {
 
76
                defer runner.tomb.Done()
 
77
                runner.tomb.Kill(runner.run())
 
78
        }()
 
79
        return runner
 
80
}
 
81
 
 
82
var ErrDead = errors.New("worker runner is not running")
 
83
 
 
84
// StartWorker starts a worker running associated with the given id.
 
85
// The startFunc function will be called to create the worker;
 
86
// when the worker exits, it will be restarted as long as it
 
87
// does not return a fatal error.
 
88
//
 
89
// If there is already a worker with the given id, nothing will be done.
 
90
//
 
91
// StartWorker returns ErrDead if the runner is not running.
 
92
func (runner *runner) StartWorker(id string, startFunc func() (Worker, error)) error {
 
93
        select {
 
94
        case runner.startc <- startReq{id, startFunc}:
 
95
                return nil
 
96
        case <-runner.tomb.Dead():
 
97
        }
 
98
        return ErrDead
 
99
}
 
100
 
 
101
// StopWorker stops the worker associated with the given id.
 
102
// It does nothing if there is no such worker.
 
103
//
 
104
// StopWorker returns ErrDead if the runner is not running.
 
105
func (runner *runner) StopWorker(id string) error {
 
106
        select {
 
107
        case runner.stopc <- id:
 
108
                return nil
 
109
        case <-runner.tomb.Dead():
 
110
        }
 
111
        return ErrDead
 
112
}
 
113
 
 
114
func (runner *runner) Wait() error {
 
115
        return runner.tomb.Wait()
 
116
}
 
117
 
 
118
func (runner *runner) Kill() {
 
119
        logger.Debugf("killing runner %p", runner)
 
120
        runner.tomb.Kill(nil)
 
121
}
 
122
 
 
123
type workerInfo struct {
 
124
        start        func() (Worker, error)
 
125
        worker       Worker
 
126
        restartDelay time.Duration
 
127
        stopping     bool
 
128
}
 
129
 
 
130
func (runner *runner) run() error {
 
131
        // workers holds the current set of workers.  All workers with a
 
132
        // running goroutine have an entry here.
 
133
        workers := make(map[string]*workerInfo)
 
134
        var finalError error
 
135
 
 
136
        // isDying holds whether the runner is currently dying.  When it
 
137
        // is dying (whether as a result of being killed or due to a
 
138
        // fatal error), all existing workers are killed, no new workers
 
139
        // will be started, and the loop will exit when all existing
 
140
        // workers have stopped.
 
141
        isDying := false
 
142
        tombDying := runner.tomb.Dying()
 
143
        for {
 
144
                if isDying && len(workers) == 0 {
 
145
                        return finalError
 
146
                }
 
147
                select {
 
148
                case <-tombDying:
 
149
                        logger.Infof("runner is dying")
 
150
                        isDying = true
 
151
                        killAll(workers)
 
152
                        tombDying = nil
 
153
                case req := <-runner.startc:
 
154
                        if isDying {
 
155
                                logger.Infof("ignoring start request for %q when dying", req.id)
 
156
                                break
 
157
                        }
 
158
                        info := workers[req.id]
 
159
                        if info == nil {
 
160
                                workers[req.id] = &workerInfo{
 
161
                                        start:        req.start,
 
162
                                        restartDelay: runner.restartDelay,
 
163
                                }
 
164
                                go runner.runWorker(0, req.id, req.start)
 
165
                                break
 
166
                        }
 
167
                        if !info.stopping {
 
168
                                // The worker is already running, so leave it alone
 
169
                                break
 
170
                        }
 
171
                        // The worker previously existed and is
 
172
                        // currently being stopped.  When it eventually
 
173
                        // does stop, we'll restart it immediately with
 
174
                        // the new start function.
 
175
                        info.start = req.start
 
176
                        info.restartDelay = 0
 
177
                case id := <-runner.stopc:
 
178
                        logger.Debugf("stop %q", id)
 
179
                        if info := workers[id]; info != nil {
 
180
                                killWorker(id, info)
 
181
                        }
 
182
                case info := <-runner.startedc:
 
183
                        logger.Debugf("%q started", info.id)
 
184
                        workerInfo := workers[info.id]
 
185
                        workerInfo.worker = info.worker
 
186
                        if isDying || workerInfo.stopping {
 
187
                                killWorker(info.id, workerInfo)
 
188
                        }
 
189
                case info := <-runner.donec:
 
190
                        logger.Debugf("%q done: %v", info.id, info.err)
 
191
                        workerInfo := workers[info.id]
 
192
                        if !workerInfo.stopping && info.err == nil {
 
193
                                logger.Debugf("removing %q from known workers", info.id)
 
194
                                delete(workers, info.id)
 
195
                                break
 
196
                        }
 
197
                        if info.err != nil {
 
198
                                if runner.isFatal(info.err) {
 
199
                                        logger.Errorf("fatal %q: %v", info.id, info.err)
 
200
                                        if finalError == nil || runner.moreImportant(info.err, finalError) {
 
201
                                                finalError = info.err
 
202
                                        }
 
203
                                        delete(workers, info.id)
 
204
                                        if !isDying {
 
205
                                                isDying = true
 
206
                                                killAll(workers)
 
207
                                        }
 
208
                                        break
 
209
                                } else {
 
210
                                        logger.Errorf("exited %q: %v", info.id, info.err)
 
211
                                }
 
212
                        }
 
213
                        if workerInfo.start == nil {
 
214
                                logger.Debugf("no restart, removing %q from known workers", info.id)
 
215
 
 
216
                                // The worker has been deliberately stopped;
 
217
                                // we can now remove it from the list of workers.
 
218
                                delete(workers, info.id)
 
219
                                break
 
220
                        }
 
221
                        go runner.runWorker(workerInfo.restartDelay, info.id, workerInfo.start)
 
222
                        workerInfo.restartDelay = runner.restartDelay
 
223
                }
 
224
        }
 
225
}
 
226
 
 
227
func killAll(workers map[string]*workerInfo) {
 
228
        for id, info := range workers {
 
229
                killWorker(id, info)
 
230
        }
 
231
}
 
232
 
 
233
func killWorker(id string, info *workerInfo) {
 
234
        if info.worker != nil {
 
235
                logger.Debugf("killing %q", id)
 
236
                info.worker.Kill()
 
237
                info.worker = nil
 
238
        } else {
 
239
                logger.Debugf("couldn't kill %q, not yet started", id)
 
240
        }
 
241
        info.stopping = true
 
242
        info.start = nil
 
243
}
 
244
 
 
245
// runWorker starts the given worker after waiting for the given delay.
 
246
func (runner *runner) runWorker(delay time.Duration, id string, start func() (Worker, error)) {
 
247
        if delay > 0 {
 
248
                logger.Infof("restarting %q in %v", id, delay)
 
249
                select {
 
250
                case <-runner.tomb.Dying():
 
251
                        runner.donec <- doneInfo{id, nil}
 
252
                        return
 
253
                case <-time.After(delay):
 
254
                }
 
255
        }
 
256
        logger.Infof("start %q", id)
 
257
        worker, err := start()
 
258
        if err == nil {
 
259
                runner.startedc <- startInfo{id, worker}
 
260
                err = worker.Wait()
 
261
        }
 
262
        logger.Infof("stopped %q, err: %v", id, err)
 
263
        runner.donec <- doneInfo{id, err}
 
264
}