161
// MachinesWatcher notifies about machines being added or removed
162
// from the environment.
160
// MachinesWatcher notifies about lifecycle changes for all machines
161
// in the environment.
163
// The first event emitted will contain the ids of all machines found
164
// irrespective of their life state. From then on a new event is emitted
165
// whenever one or more machines are added or change their lifecycle.
167
// After a machine is found to be Dead, no further event will include it.
163
168
type MachinesWatcher struct {
165
out chan *MachinesChange
169
// MachinesChange holds the ids of machines that are observed to
171
type MachinesChange struct {
176
func (c *MachinesChange) empty() bool {
177
return len(c.Alive)+len(c.Dead) == 0
180
// WatchMachines returns a watcher for observing machines being
174
var lifeFields = D{{"_id", 1}, {"life", 1}}
176
// WatchMachines returns a new MachinesWatcher.
182
177
func (s *State) WatchMachines() *MachinesWatcher {
183
178
return newMachinesWatcher(s)
186
// newMachinesWatcher creates and starts a watcher to watch information
187
// about machines being added or deleted.
181
// WatchMachines returns a new MachinesWatcher.
188
182
func newMachinesWatcher(st *State) *MachinesWatcher {
189
183
w := &MachinesWatcher{
190
184
commonWatcher: commonWatcher{st: st},
191
out: make(chan *MachinesChange),
192
alive: make(map[int]bool),
185
out: make(chan []int),
186
life: make(map[int]Life),
195
189
defer w.tomb.Done()
202
// Changes returns a channel that will receive changes when machines are
203
// added or deleted. The Alive field in the first event on the channel
204
// holds the initial state as returned by State.AllMachines.
205
func (w *MachinesWatcher) Changes() <-chan *MachinesChange {
196
// Changes returns the event channel for the MachinesWatcher.
197
func (w *MachinesWatcher) Changes() <-chan []int {
209
func (w *MachinesWatcher) initial(changes *MachinesChange) (err error) {
210
iter := w.st.machines.Find(notDead).Select(D{{"_id", 1}}).Iter()
201
func (w *MachinesWatcher) initial() (ids []int, err error) {
202
iter := w.st.machines.Find(nil).Select(lifeFields).Iter()
214
204
for iter.Next(&doc) {
215
changes.Alive = append(changes.Alive, doc.Id)
216
w.alive[doc.Id] = true
205
ids = append(ids, doc.Id)
206
w.life[doc.Id] = doc.Life
218
208
if err := iter.Err(); err != nil {
224
func (w *MachinesWatcher) merge(changes *MachinesChange, ch watcher.Change) error {
214
func (w *MachinesWatcher) merge(ids []int, ch watcher.Change) ([]int, error) {
225
215
id := ch.Id.(int)
226
if ch.Revno == -1 && w.alive[id] {
227
panic("machine removed before being dead")
229
qdoc := D{{"_id", id}, {"life", D{{"$ne", Dead}}}}
230
c, err := w.st.machines.Find(qdoc).Count()
237
changes.Alive = append(changes.Alive, id)
242
changes.Dead = append(changes.Dead, id)
216
for _, pending := range ids {
222
if life, ok := w.life[id]; ok && life != Dead {
223
ids = append(ids, id)
228
doc := machineDoc{Id: id, Life: Dead}
229
err := w.st.machines.FindId(id).Select(lifeFields).One(&doc)
230
if err != nil && err != mgo.ErrNotFound {
233
if life, ok := w.life[id]; !ok || doc.Life != life {
234
ids = append(ids, id)
235
if err != mgo.ErrNotFound {
236
w.life[id] = doc.Life
248
242
func (w *MachinesWatcher) loop() (err error) {
249
243
ch := make(chan watcher.Change)
250
244
w.st.watcher.WatchCollection(w.st.machines.Name, ch)
251
245
defer w.st.watcher.UnwatchCollection(w.st.machines.Name, ch)
252
changes := &MachinesChange{}
253
if err = w.initial(changes); err != nil {
246
ids, err := w.initial()