~fwereade/juju-core/move-hook-context

« back to all changes in this revision

Viewing changes to state/watcher.go

state,worker: full lifecycle machines watcher

MachinesWatcher now returns a list of ids that have had their
lifecycle changed in any way. It's up to the consumer to work
out details.

R=TheMue, aram
CC=
https://codereview.appspot.com/6566066

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
1
package state
2
2
 
3
3
import (
4
 
        "fmt"
5
4
        "labix.org/v2/mgo"
6
5
        "launchpad.net/juju-core/environs/config"
7
6
        "launchpad.net/juju-core/state/watcher"
158
157
        return nil
159
158
}
160
159
 
161
 
// MachinesWatcher notifies about machines being added or removed
162
 
// from the environment.
 
160
// MachinesWatcher notifies about lifecycle changes for all machines
 
161
// in the environment.
 
162
// 
 
163
// The first event emitted will contain the ids of all machines found
 
164
// irrespective of their life state. From then on a new event is emitted
 
165
// whenever one or more machines are added or change their lifecycle.
 
166
//
 
167
// After a machine is found to be Dead, no further event will include it.
163
168
type MachinesWatcher struct {
164
169
        commonWatcher
165
 
        out   chan *MachinesChange
166
 
        alive map[int]bool
167
 
}
168
 
 
169
 
// MachinesChange holds the ids of machines that are observed to
170
 
// be alive or dead.
171
 
type MachinesChange struct {
172
 
        Alive []int
173
 
        Dead  []int
174
 
}
175
 
 
176
 
func (c *MachinesChange) empty() bool {
177
 
        return len(c.Alive)+len(c.Dead) == 0
178
 
}
179
 
 
180
 
// WatchMachines returns a watcher for observing machines being
181
 
// added or removed.
 
170
        out  chan []int
 
171
        life map[int]Life
 
172
}
 
173
 
 
174
var lifeFields = D{{"_id", 1}, {"life", 1}}
 
175
 
 
176
// WatchMachines returns a new MachinesWatcher.
182
177
func (s *State) WatchMachines() *MachinesWatcher {
183
178
        return newMachinesWatcher(s)
184
179
}
185
180
 
186
 
// newMachinesWatcher creates and starts a watcher to watch information
187
 
// about machines being added or deleted.
 
181
// WatchMachines returns a new MachinesWatcher.
188
182
func newMachinesWatcher(st *State) *MachinesWatcher {
189
183
        w := &MachinesWatcher{
190
184
                commonWatcher: commonWatcher{st: st},
191
 
                out:           make(chan *MachinesChange),
192
 
                alive:         make(map[int]bool),
 
185
                out:           make(chan []int),
 
186
                life:          make(map[int]Life),
193
187
        }
194
188
        go func() {
195
189
                defer w.tomb.Done()
199
193
        return w
200
194
}
201
195
 
202
 
// Changes returns a channel that will receive changes when machines are
203
 
// added or deleted. The Alive field in the first event on the channel
204
 
// holds the initial state as returned by State.AllMachines.
205
 
func (w *MachinesWatcher) Changes() <-chan *MachinesChange {
 
196
// Changes returns the event channel for the MachinesWatcher.
 
197
func (w *MachinesWatcher) Changes() <-chan []int {
206
198
        return w.out
207
199
}
208
200
 
209
 
func (w *MachinesWatcher) initial(changes *MachinesChange) (err error) {
210
 
        iter := w.st.machines.Find(notDead).Select(D{{"_id", 1}}).Iter()
211
 
        var doc struct {
212
 
                Id int `bson:"_id"`
213
 
        }
 
201
func (w *MachinesWatcher) initial() (ids []int, err error) {
 
202
        iter := w.st.machines.Find(nil).Select(lifeFields).Iter()
 
203
        var doc machineDoc
214
204
        for iter.Next(&doc) {
215
 
                changes.Alive = append(changes.Alive, doc.Id)
216
 
                w.alive[doc.Id] = true
 
205
                ids = append(ids, doc.Id)
 
206
                w.life[doc.Id] = doc.Life
217
207
        }
218
208
        if err := iter.Err(); err != nil {
219
 
                return err
 
209
                return nil, err
220
210
        }
221
 
        return nil
 
211
        return ids, nil
222
212
}
223
213
 
224
 
func (w *MachinesWatcher) merge(changes *MachinesChange, ch watcher.Change) error {
 
214
func (w *MachinesWatcher) merge(ids []int, ch watcher.Change) ([]int, error) {
225
215
        id := ch.Id.(int)
226
 
        if ch.Revno == -1 && w.alive[id] {
227
 
                panic("machine removed before being dead")
228
 
        }
229
 
        qdoc := D{{"_id", id}, {"life", D{{"$ne", Dead}}}}
230
 
        c, err := w.st.machines.Find(qdoc).Count()
231
 
        if err != nil {
232
 
                return err
233
 
        }
234
 
        if c > 0 {
235
 
                if !w.alive[id] {
236
 
                        w.alive[id] = true
237
 
                        changes.Alive = append(changes.Alive, id)
238
 
                }
239
 
        } else {
240
 
                if w.alive[id] {
241
 
                        delete(w.alive, id)
242
 
                        changes.Dead = append(changes.Dead, id)
243
 
                }
244
 
        }
245
 
        return nil
 
216
        for _, pending := range ids {
 
217
                if id == pending {
 
218
                        return ids, nil
 
219
                }
 
220
        }
 
221
        if ch.Revno == -1 {
 
222
                if life, ok := w.life[id]; ok && life != Dead {
 
223
                        ids = append(ids, id)
 
224
                }
 
225
                delete(w.life, id)
 
226
                return ids, nil
 
227
        }
 
228
        doc := machineDoc{Id: id, Life: Dead}
 
229
        err := w.st.machines.FindId(id).Select(lifeFields).One(&doc)
 
230
        if err != nil && err != mgo.ErrNotFound {
 
231
                return nil, err
 
232
        }
 
233
        if life, ok := w.life[id]; !ok || doc.Life != life {
 
234
                ids = append(ids, id)
 
235
                if err != mgo.ErrNotFound {
 
236
                        w.life[id] = doc.Life
 
237
                }
 
238
        }
 
239
        return ids, nil
246
240
}
247
241
 
248
242
func (w *MachinesWatcher) loop() (err error) {
249
243
        ch := make(chan watcher.Change)
250
244
        w.st.watcher.WatchCollection(w.st.machines.Name, ch)
251
245
        defer w.st.watcher.UnwatchCollection(w.st.machines.Name, ch)
252
 
        changes := &MachinesChange{}
253
 
        if err = w.initial(changes); err != nil {
 
246
        ids, err := w.initial()
 
247
        if err != nil {
254
248
                return err
255
249
        }
256
250
        out := w.out
261
255
                case <-w.tomb.Dying():
262
256
                        return tomb.ErrDying
263
257
                case c := <-ch:
264
 
                        if err := w.merge(changes, c); err != nil {
 
258
                        if ids, err = w.merge(ids, c); err != nil {
265
259
                                return err
266
260
                        }
267
 
                        if !changes.empty() {
 
261
                        if len(ids) > 0 {
268
262
                                out = w.out
269
263
                        }
270
 
                case out <- changes:
271
 
                        changes = &MachinesChange{}
 
264
                case out <- ids:
 
265
                        ids = nil
272
266
                        out = nil
273
267
                }
274
268
        }
666
660
}
667
661
 
668
662
func (w *MachineUnitsWatcher) mergeChange(changes *MachineUnitsChange, ch watcher.Change) (err error) {
669
 
        if ch.Revno == -1 {
670
 
                return fmt.Errorf("machine has been removed")
671
 
        }
672
663
        err = w.machine.Refresh()
673
664
        if err != nil {
674
665
                return err