1
// Copyright 2013 Canonical Ltd.
2
// Licensed under the AGPLv3, see LICENCE file for details.
15
ErrStopped = errors.New("try was stopped")
16
ErrClosed = errors.New("try was closed")
19
// Try represents an attempt made concurrently
20
// by a number of goroutines.
28
combineErrors func(err0, err1 error) error
33
// NewTry returns an object that runs functions concurrently until one
34
// succeeds. The result of the first function that returns without an
35
// error is available from the Result method. If maxParallel is
36
// positive, it limits the number of concurrently running functions.
38
// The function combineErrors(oldErr, newErr) is called to determine
39
// the error return (see the Result method). The first time it is called,
40
// oldErr will be nil; subsequently oldErr will be the error previously
41
// returned by combineErrors. If combineErrors is nil, the last
42
// encountered error is chosen.
43
func NewTry(maxParallel int, combineErrors func(err0, err1 error) error) *Try {
44
if combineErrors == nil {
45
combineErrors = chooseLastError
48
combineErrors: combineErrors,
49
maxParallel: maxParallel,
50
close: make(chan struct{}, 1),
51
result: make(chan result),
52
start: make(chan func()),
54
if t.maxParallel > 0 {
55
t.limiter = make(chan struct{}, t.maxParallel)
56
for i := 0; i < t.maxParallel; i++ {
57
t.limiter <- struct{}{}
69
func chooseLastError(err0, err1 error) error {
78
func (t *Try) loop() (io.Closer, error) {
91
err = t.combineErrors(err, r.err)
93
if close == nil && nrunning == 0 {
96
case <-t.tomb.Dying():
98
return nil, ErrStopped
110
// Start requests the given function to be started, waiting until there
111
// are fewer than maxParallel functions running if necessary. It returns
112
// an error if the function has not been started (ErrClosed if the Try
113
// has been closed, and ErrStopped if the Try is finishing).
115
// The function should listen on the stop channel and return if it
116
// receives a value, though this is advisory only - the Try does not
117
// wait for all started functions to return before completing.
119
// If the function returns a nil error but some earlier try was
120
// successful (that is, the returned value is being discarded),
121
// its returned value will be closed by calling its Close method.
122
func (t *Try) Start(try func(stop <-chan struct{}) (io.Closer, error)) error {
123
if t.limiter != nil {
124
// Wait for availability slot.
127
case <-t.tomb.Dying():
133
dying := t.tomb.Dying()
135
val, err := try(dying)
136
if t.limiter != nil {
137
// Signal availability slot is now free.
138
t.limiter <- struct{}{}
142
case t.result <- result{val, err}:
159
// Close closes the Try. No more functions will be started
160
// if Start is called, and the Try will terminate when all
161
// outstanding functions have completed (or earlier
163
func (t *Try) Close() {
165
defer t.closeMutex.Unlock()
173
// Dead returns a channel that is closed when the
175
func (t *Try) Dead() <-chan struct{} {
179
// Wait waits for the Try to complete and returns the same
180
// error returned by Result.
181
func (t *Try) Wait() error {
185
// Result waits for the Try to complete and returns the result of the
186
// first successful function started by Start.
188
// If no function succeeded, the last error returned by
189
// combineErrors is returned. If there were no errors or
190
// combineErrors returned nil, ErrStopped is returned.
191
func (t *Try) Result() (io.Closer, error) {
193
return t.endResult, err
196
// Kill stops the try and all its currently executing functions.
197
func (t *Try) Kill() {