~ubuntu-branches/ubuntu/saucy/juju-core/saucy-proposed

« back to all changes in this revision

Viewing changes to src/launchpad.net/juju-core/worker/runner.go

  • Committer: Package Import Robot
  • Author(s): James Page
  • Date: 2013-07-11 17:18:27 UTC
  • mfrom: (1.1.1)
  • Revision ID: package-import@ubuntu.com-20130711171827-vjqkg40r0dlf7ys2
Tags: 1.11.2-0ubuntu1
* New upstream release.
* Make juju-core the default juju (LP: #1190634):
  - d/control: Add virtual package juju -> juju-core.
  - d/juju-core.postinst.in: Bump priority of alternatives over that of
    python juju packages.
* Enable for all architectures (LP: #1172505):
  - d/control: Version BD on golang-go to >= 2:1.1.1 to ensure CGO
    support for non-x86 archs, make juju-core Arch: any.
  - d/README.source: Dropped - no longer required.
* d/watch: Updated for new upstream tarball naming.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
// Copyright 2012, 2013 Canonical Ltd.
 
2
// Licensed under the AGPLv3, see LICENCE file for details.
 
3
 
 
4
package worker
 
5
 
 
6
import (
 
7
        "errors"
 
8
        "launchpad.net/juju-core/log"
 
9
        "launchpad.net/tomb"
 
10
        "time"
 
11
)
 
12
 
 
13
// RestartDelay holds the length of time that a worker
 
14
// will wait between exiting and restarting.
 
15
var RestartDelay = 3 * time.Second
 
16
 
 
17
// Worker is implemented by a running worker.
 
18
type Worker interface {
 
19
        // Kill asks the worker to stop without necessarily
 
20
        // waiting for it to do so.
 
21
        Kill()
 
22
        // Wait waits for the worker to exit and returns any
 
23
        // error encountered when it was running.
 
24
        Wait() error
 
25
}
 
26
 
 
27
// Runner runs a set of workers, restarting them as necessary
 
28
// when they fail.
 
29
type Runner struct {
 
30
        tomb          tomb.Tomb
 
31
        startc        chan startReq
 
32
        stopc         chan string
 
33
        donec         chan doneInfo
 
34
        startedc      chan startInfo
 
35
        isFatal       func(error) bool
 
36
        moreImportant func(err0, err1 error) bool
 
37
}
 
38
 
 
39
type startReq struct {
 
40
        id    string
 
41
        start func() (Worker, error)
 
42
}
 
43
 
 
44
type startInfo struct {
 
45
        id     string
 
46
        worker Worker
 
47
}
 
48
 
 
49
type doneInfo struct {
 
50
        id  string
 
51
        err error
 
52
}
 
53
 
 
54
// NewRunner creates a new Runner.  When a worker finishes, if its error
 
55
// is deemed fatal (determined by calling isFatal), all the other workers
 
56
// will be stopped and the runner itself will finish.  Of all the fatal errors
 
57
// returned by the stopped workers, only the most important one,
 
58
// determined by calling moreImportant, will be returned from
 
59
// Runner.Wait. Non-fatal errors will not be returned.
 
60
//
 
61
// The function isFatal(err) returns whether err is a fatal error.  The
 
62
// function moreImportant(err0, err1) returns whether err0 is considered
 
63
// more important than err1.
 
64
func NewRunner(isFatal func(error) bool, moreImportant func(err0, err1 error) bool) *Runner {
 
65
        runner := &Runner{
 
66
                startc:        make(chan startReq),
 
67
                stopc:         make(chan string),
 
68
                donec:         make(chan doneInfo),
 
69
                startedc:      make(chan startInfo),
 
70
                isFatal:       isFatal,
 
71
                moreImportant: moreImportant,
 
72
        }
 
73
        go func() {
 
74
                defer runner.tomb.Done()
 
75
                runner.tomb.Kill(runner.run())
 
76
        }()
 
77
        return runner
 
78
}
 
79
 
 
80
var ErrDead = errors.New("worker runner is not running")
 
81
 
 
82
// StartWorker starts a worker running associated with the given id.
 
83
// The startFunc function will be called to create the worker;
 
84
// when the worker exits, it will be restarted as long as it
 
85
// does not return a fatal error.
 
86
//
 
87
// If there is already a worker with the given id, nothing will be done.
 
88
//
 
89
// StartWorker returns ErrDead if the runner is not running.
 
90
func (runner *Runner) StartWorker(id string, startFunc func() (Worker, error)) error {
 
91
        select {
 
92
        case runner.startc <- startReq{id, startFunc}:
 
93
                return nil
 
94
        case <-runner.tomb.Dead():
 
95
        }
 
96
        return ErrDead
 
97
}
 
98
 
 
99
// StopWorker stops the worker associated with the given id.
 
100
// It does nothing if there is no such worker.
 
101
//
 
102
// StopWorker returns ErrDead if the runner is not running.
 
103
func (runner *Runner) StopWorker(id string) error {
 
104
        select {
 
105
        case runner.stopc <- id:
 
106
                return nil
 
107
        case <-runner.tomb.Dead():
 
108
        }
 
109
        return ErrDead
 
110
}
 
111
 
 
112
func (runner *Runner) Wait() error {
 
113
        return runner.tomb.Wait()
 
114
}
 
115
 
 
116
func (runner *Runner) Kill() {
 
117
        log.Debugf("worker: killing runner %p", runner)
 
118
        runner.tomb.Kill(nil)
 
119
}
 
120
 
 
121
// Stop kills the given worker and waits for it to exit.
 
122
func Stop(worker Worker) error {
 
123
        worker.Kill()
 
124
        return worker.Wait()
 
125
}
 
126
 
 
127
type workerInfo struct {
 
128
        start        func() (Worker, error)
 
129
        worker       Worker
 
130
        restartDelay time.Duration
 
131
        stopping     bool
 
132
}
 
133
 
 
134
func (runner *Runner) run() error {
 
135
        // workers holds the current set of workers.  All workers with a
 
136
        // running goroutine have an entry here.
 
137
        workers := make(map[string]*workerInfo)
 
138
        var finalError error
 
139
 
 
140
        // isDying holds whether the runner is currently dying.  When it
 
141
        // is dying (whether as a result of being killed or due to a
 
142
        // fatal error), all existing workers are killed, no new workers
 
143
        // will be started, and the loop will exit when all existing
 
144
        // workers have stopped.
 
145
        isDying := false
 
146
        tombDying := runner.tomb.Dying()
 
147
        for {
 
148
                if isDying && len(workers) == 0 {
 
149
                        return finalError
 
150
                }
 
151
                select {
 
152
                case <-tombDying:
 
153
                        log.Infof("worker: runner is dying")
 
154
                        isDying = true
 
155
                        killAll(workers)
 
156
                        tombDying = nil
 
157
                case req := <-runner.startc:
 
158
                        if isDying {
 
159
                                log.Infof("worker: ignoring start request for %q when dying", req.id)
 
160
                                break
 
161
                        }
 
162
                        info := workers[req.id]
 
163
                        if info == nil {
 
164
                                workers[req.id] = &workerInfo{
 
165
                                        start:        req.start,
 
166
                                        restartDelay: RestartDelay,
 
167
                                }
 
168
                                go runner.runWorker(0, req.id, req.start)
 
169
                                break
 
170
                        }
 
171
                        if !info.stopping {
 
172
                                // The worker is already running, so leave it alone
 
173
                                break
 
174
                        }
 
175
                        // The worker previously existed and is
 
176
                        // currently being stopped.  When it eventually
 
177
                        // does stop, we'll restart it immediately with
 
178
                        // the new start function.
 
179
                        info.start = req.start
 
180
                        info.restartDelay = 0
 
181
                case id := <-runner.stopc:
 
182
                        if info := workers[id]; info != nil {
 
183
                                killWorker(id, info)
 
184
                        }
 
185
                case info := <-runner.startedc:
 
186
                        workerInfo := workers[info.id]
 
187
                        workerInfo.worker = info.worker
 
188
                        if isDying {
 
189
                                killWorker(info.id, workerInfo)
 
190
                        }
 
191
                case info := <-runner.donec:
 
192
                        workerInfo := workers[info.id]
 
193
                        if !workerInfo.stopping && info.err == nil {
 
194
                                info.err = errors.New("unexpected quit")
 
195
                        }
 
196
                        if info.err != nil {
 
197
                                if runner.isFatal(info.err) {
 
198
                                        log.Errorf("worker: fatal %q: %v", info.id, info.err)
 
199
                                        if finalError == nil || runner.moreImportant(info.err, finalError) {
 
200
                                                finalError = info.err
 
201
                                        }
 
202
                                        delete(workers, info.id)
 
203
                                        if !isDying {
 
204
                                                isDying = true
 
205
                                                killAll(workers)
 
206
                                        }
 
207
                                        break
 
208
                                } else {
 
209
                                        log.Errorf("worker: exited %q: %v", info.id, info.err)
 
210
                                }
 
211
                        }
 
212
                        if workerInfo.start == nil {
 
213
                                // The worker has been deliberately stopped;
 
214
                                // we can now remove it from the list of workers.
 
215
                                delete(workers, info.id)
 
216
                                break
 
217
                        }
 
218
                        go runner.runWorker(workerInfo.restartDelay, info.id, workerInfo.start)
 
219
                        workerInfo.restartDelay = RestartDelay
 
220
                }
 
221
        }
 
222
        panic("unreachable")
 
223
}
 
224
 
 
225
func killAll(workers map[string]*workerInfo) {
 
226
        for id, info := range workers {
 
227
                killWorker(id, info)
 
228
        }
 
229
}
 
230
 
 
231
func killWorker(id string, info *workerInfo) {
 
232
        if info.worker != nil {
 
233
                log.Debugf("worker: killing %q", id)
 
234
                info.worker.Kill()
 
235
                info.worker = nil
 
236
        }
 
237
        info.stopping = true
 
238
        info.start = nil
 
239
}
 
240
 
 
241
// runWorker starts the given worker after waiting for the given delay.
 
242
func (runner *Runner) runWorker(delay time.Duration, id string, start func() (Worker, error)) {
 
243
        if delay > 0 {
 
244
                log.Infof("worker: restarting %q in %v", id, delay)
 
245
                select {
 
246
                case <-runner.tomb.Dying():
 
247
                        runner.donec <- doneInfo{id, nil}
 
248
                        return
 
249
                case <-time.After(delay):
 
250
                }
 
251
        }
 
252
        log.Infof("worker: start %q", id)
 
253
        worker, err := start()
 
254
        if err == nil {
 
255
                runner.startedc <- startInfo{id, worker}
 
256
                err = worker.Wait()
 
257
        }
 
258
        runner.donec <- doneInfo{id, err}
 
259
}