~dstroppa/juju-core/joyent-provider-storage

« back to all changes in this revision

Viewing changes to utils/parallel/try.go

  • Committer: Daniele Stroppa
  • Date: 2014-01-08 15:58:10 UTC
  • mfrom: (1953.1.231 juju-core)
  • Revision ID: daniele.stroppa@joyent.com-20140108155810-xecbwrqkb5i0fyoe
Merging trunk

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
// Copyright 2013 Canonical Ltd.
 
2
// Licensed under the AGPLv3, see LICENCE file for details.
 
3
 
 
4
package parallel
 
5
 
 
6
import (
 
7
        "errors"
 
8
        "io"
 
9
        "sync"
 
10
 
 
11
        "launchpad.net/tomb"
 
12
)
 
13
 
 
14
var (
 
15
        ErrStopped = errors.New("try was stopped")
 
16
        ErrClosed  = errors.New("try was closed")
 
17
)
 
18
 
 
19
// Try represents an attempt made concurrently
 
20
// by a number of goroutines.
 
21
type Try struct {
 
22
        tomb          tomb.Tomb
 
23
        closeMutex    sync.Mutex
 
24
        close         chan struct{}
 
25
        limiter       chan struct{}
 
26
        start         chan func()
 
27
        result        chan result
 
28
        combineErrors func(err0, err1 error) error
 
29
        maxParallel   int
 
30
        endResult     io.Closer
 
31
}
 
32
 
 
33
// NewTry returns an object that runs functions concurrently until one
 
34
// succeeds. The result of the first function that returns without an
 
35
// error is available from the Result method. If maxParallel is
 
36
// positive, it limits the number of concurrently running functions.
 
37
//
 
38
// The function combineErrors(oldErr, newErr) is called to determine
 
39
// the error return (see the Result method). The first time it is called,
 
40
// oldErr will be nil; subsequently oldErr will be the error previously
 
41
// returned by combineErrors. If combineErrors is nil, the last
 
42
// encountered error is chosen.
 
43
func NewTry(maxParallel int, combineErrors func(err0, err1 error) error) *Try {
 
44
        if combineErrors == nil {
 
45
                combineErrors = chooseLastError
 
46
        }
 
47
        t := &Try{
 
48
                combineErrors: combineErrors,
 
49
                maxParallel:   maxParallel,
 
50
                close:         make(chan struct{}, 1),
 
51
                result:        make(chan result),
 
52
                start:         make(chan func()),
 
53
        }
 
54
        if t.maxParallel > 0 {
 
55
                t.limiter = make(chan struct{}, t.maxParallel)
 
56
                for i := 0; i < t.maxParallel; i++ {
 
57
                        t.limiter <- struct{}{}
 
58
                }
 
59
        }
 
60
        go func() {
 
61
                defer t.tomb.Done()
 
62
                val, err := t.loop()
 
63
                t.endResult = val
 
64
                t.tomb.Kill(err)
 
65
        }()
 
66
        return t
 
67
}
 
68
 
 
69
func chooseLastError(err0, err1 error) error {
 
70
        return err1
 
71
}
 
72
 
 
73
type result struct {
 
74
        val io.Closer
 
75
        err error
 
76
}
 
77
 
 
78
func (t *Try) loop() (io.Closer, error) {
 
79
        var err error
 
80
        close := t.close
 
81
        nrunning := 0
 
82
        for {
 
83
                select {
 
84
                case f := <-t.start:
 
85
                        nrunning++
 
86
                        go f()
 
87
                case r := <-t.result:
 
88
                        if r.err == nil {
 
89
                                return r.val, r.err
 
90
                        }
 
91
                        err = t.combineErrors(err, r.err)
 
92
                        nrunning--
 
93
                        if close == nil && nrunning == 0 {
 
94
                                return nil, err
 
95
                        }
 
96
                case <-t.tomb.Dying():
 
97
                        if err == nil {
 
98
                                return nil, ErrStopped
 
99
                        }
 
100
                        return nil, err
 
101
                case <-close:
 
102
                        close = nil
 
103
                        if nrunning == 0 {
 
104
                                return nil, err
 
105
                        }
 
106
                }
 
107
        }
 
108
}
 
109
 
 
110
// Start requests the given function to be started, waiting until there
 
111
// are less than maxParallel functions running if necessary. It returns
 
112
// an error if the function has not been started (ErrClosed if the Try
 
113
// has been closed, and ErrStopped if the try is finishing).
 
114
//
 
115
// The function should listen on the stop channel and return if it
 
116
// receives a value, though this is advisory only - the Try does not
 
117
// wait for all started functions to return before completing.
 
118
//
 
119
// If the function returns a nil error but some earlier try was
 
120
// successful (that is, the returned value is being discarded),
 
121
// its returned value will be closed by calling its Close method.
 
122
func (t *Try) Start(try func(stop <-chan struct{}) (io.Closer, error)) error {
 
123
        if t.limiter != nil {
 
124
                // Wait for availability slot.
 
125
                select {
 
126
                case <-t.limiter:
 
127
                case <-t.tomb.Dying():
 
128
                        return ErrStopped
 
129
                case <-t.close:
 
130
                        return ErrClosed
 
131
                }
 
132
        }
 
133
        dying := t.tomb.Dying()
 
134
        f := func() {
 
135
                val, err := try(dying)
 
136
                if t.limiter != nil {
 
137
                        // Signal availability slot is now free.
 
138
                        t.limiter <- struct{}{}
 
139
                }
 
140
                // Deliver result.
 
141
                select {
 
142
                case t.result <- result{val, err}:
 
143
                case <-dying:
 
144
                        if err == nil {
 
145
                                val.Close()
 
146
                        }
 
147
                }
 
148
        }
 
149
        select {
 
150
        case t.start <- f:
 
151
                return nil
 
152
        case <-dying:
 
153
                return ErrStopped
 
154
        case <-t.close:
 
155
                return ErrClosed
 
156
        }
 
157
}
 
158
 
 
159
// Close closes the Try. No more functions will be started
 
160
// if Start is called, and the Try will terminate when all
 
161
// outstanding functions have completed (or earlier
 
162
// if one succeeds)
 
163
func (t *Try) Close() {
 
164
        t.closeMutex.Lock()
 
165
        defer t.closeMutex.Unlock()
 
166
        select {
 
167
        case <-t.close:
 
168
        default:
 
169
                close(t.close)
 
170
        }
 
171
}
 
172
 
 
173
// Dead returns a channel that is closed when the
 
174
// Try completes.
 
175
func (t *Try) Dead() <-chan struct{} {
 
176
        return t.tomb.Dead()
 
177
}
 
178
 
 
179
// Wait waits for the Try to complete and returns the same
 
180
// error returned by Result.
 
181
func (t *Try) Wait() error {
 
182
        return t.tomb.Wait()
 
183
}
 
184
 
 
185
// Result waits for the Try to complete and returns the result of the
 
186
// first successful function started by Start.
 
187
//
 
188
// If no function succeeded, the last error returned by
 
189
// combineErrors is returned. If there were no errors or
 
190
// combineErrors returned nil, ErrStopped is returned.
 
191
func (t *Try) Result() (io.Closer, error) {
 
192
        err := t.tomb.Wait()
 
193
        return t.endResult, err
 
194
}
 
195
 
 
196
// Kill stops the try and all its currently executing functions.
 
197
func (t *Try) Kill() {
 
198
        t.tomb.Kill(nil)
 
199
}