~rogpeppe/juju-core/232-megawatcher-allinfo

« back to all changes in this revision

Viewing changes to state/megawatcher.go

  • Committer: Roger Peppe
  • Date: 2013-03-08 23:56:33 UTC
  • Revision ID: roger.peppe@canonical.com-20130308235633-vo41r1v0samqqm9h
state: refactor allWatcher

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
1
package state
2
2
 
3
3
import (
 
4
        "container/list"
4
5
        "fmt"
5
 
        "launchpad.net/juju-core/state/watcher"
6
 
        "launchpad.net/juju-core/log"
7
6
        "labix.org/v2/mgo"
8
 
        "container/list"
9
7
        "launchpad.net/juju-core/state/api/params"
 
8
        "launchpad.net/juju-core/state/watcher"
 
9
        "launchpad.net/tomb"
10
10
        "reflect"
11
 
        "launchpad.net/tomb"
12
11
)
13
12
 
14
13
// StateWatcher watches any changes to the state.
85
84
// allWatcher holds a shared record of all current state and replies to
86
85
// requests from StateWatches to tell them when it changes.
87
86
type allWatcher struct {
88
 
        tomb    tomb.Tomb
89
 
        st      *State
 
87
        tomb tomb.Tomb
 
88
        st   *State
 
89
 
 
90
        // request receives requests from StateWatcher clients.
90
91
        request chan *allRequest
 
92
 
 
93
        // all holds information on everything the allWatcher cares about.
 
94
        all *allInfo
 
95
 
91
96
        // newInfo describes how to create a new entity info value given
92
97
        // the name of the collection it's stored in.
93
 
        newInfo     map[string]func() params.EntityInfo
 
98
        newInfo map[string]func() params.EntityInfo
 
99
 
 
100
        // Each entry in the waiting map holds a linked list of Next requests
 
101
        // outstanding for the associated StateWatcher.
 
102
        waiting map[*StateWatcher]*allRequest
94
103
}
95
104
 
96
105
func newAllWatcher(st *State) *allWatcher {
103
112
                        st.relations.Name: func() params.EntityInfo { return new(params.RelationInfo) },
104
113
                },
105
114
                request: make(chan *allRequest),
 
115
                waiting: make(map[*StateWatcher]*allRequest),
 
116
                all:     newAllInfo(),
106
117
        }
107
118
        go func() {
108
119
                defer aw.tomb.Done()
144
155
}
145
156
 
146
157
func (aw *allWatcher) loop() error {
147
 
        // Each entry in the map holds a linked list of Next requests
148
 
        // outstanding for the associated StateWatcher.
149
 
        reqs := make(map[*StateWatcher]*allRequest)
150
 
 
 
158
        if err := aw.getAll(); err != nil {
 
159
                return err
 
160
        }
151
161
        in := make(chan watcher.Change)
152
 
        all := newAllInfo()
153
 
        if err := all.getAll(aw.st); err != nil {
154
 
                return err
155
 
        }
156
162
        aw.st.watcher.WatchCollection(aw.st.machines.Name, in)
157
163
        aw.st.watcher.WatchCollection(aw.st.services.Name, in)
158
164
        aw.st.watcher.WatchCollection(aw.st.units.Name, in)
164
170
                aw.st.watcher.UnwatchCollection(aw.st.relations.Name, in)
165
171
        }()
166
172
        for {
167
 
                log.Printf("allWatcher: waiting for something to happen")
168
173
                select {
169
174
                case <-aw.tomb.Dying():
170
 
                        log.Printf("allWatcher: dying")
171
175
                        return tomb.ErrDying
172
176
                case change := <-in:
173
 
                        log.Printf("allWatcher: got change on %s", change.C)
174
 
                        // TODO(rog) fetch concurrently/batched?
175
 
                        id := entityId{change.C, change.Id}
176
 
                        log.Printf("allWatcher: fetching %v", id)
177
 
                        info, err := aw.fetch(id)
178
 
                        if err == mgo.ErrNotFound {
179
 
                                log.Printf("allWatcher: marking %v as removed", id)
180
 
                                all.markRemoved(id)
181
 
                                break
182
 
                        }
183
 
                        if err != nil {
184
 
                                log.Printf("allWatcher: error fetching: %v", err)
 
177
                        if err := aw.changed(change); err != nil {
185
178
                                return err
186
179
                        }
187
 
                        log.Printf("allWatcher: updating %v, %#v", id, info)
188
 
                        all.update(id, info)
189
 
                        log.Printf("allWatcher: updated")
190
180
                case req := <-aw.request:
191
 
                        log.Printf("allWatcher: got request")
192
 
                        if req.w.stopped {
193
 
                                // The watcher has previously been stopped.
194
 
                                req.reply <- false
195
 
                                break
196
 
                        }
197
 
                        if req.reply == nil {
198
 
                                // This is a request to stop the watcher.
199
 
                                for req := reqs[req.w]; req != nil; req = req.next {
200
 
                                        req.reply <- false
201
 
                                }
202
 
                                delete(reqs, req.w)
203
 
                                aw.leave(all, req.w)
204
 
                                break
205
 
                        }
206
 
                        // Add request to head of list.
207
 
                        req.next = reqs[req.w]
208
 
                        reqs[req.w] = req
209
 
                }
210
 
                log.Printf("allWatcher: satisfying requests")
211
 
                // Something has changed - go through all watchers that
212
 
                // have outstanding requests and satisfy them if
213
 
                // possible.
214
 
                for w, req := range reqs {
215
 
                        changes := all.changesSince(w.revno)
216
 
                        req.changes = changes
217
 
                        w.revno = all.latestRevno
218
 
                        req.reply <- true
219
 
                        if req := req.next; req == nil {
220
 
                                // Last request for this watcher.
221
 
                                delete(reqs, w)
222
 
                        } else {
223
 
                                reqs[w] = req
224
 
                        }
225
 
                        aw.adjustRefCounts(all, w.revno)
226
 
                }
 
181
                        aw.handle(req)
 
182
                }
 
183
                aw.respond()
227
184
        }
228
185
        panic("unreachable")
229
186
}
230
187
 
 
188
func (aw *allWatcher) handle(req *allRequest) {
 
189
        if req.w.stopped {
 
190
                // The watcher has previously been stopped.
 
191
                req.reply <- false
 
192
                return
 
193
        }
 
194
        if req.reply == nil {
 
195
                // This is a request to stop the watcher.
 
196
                for req := aw.waiting[req.w]; req != nil; req = req.next {
 
197
                        req.reply <- false
 
198
                }
 
199
                delete(aw.waiting, req.w)
 
200
                aw.leave(req.w)
 
201
                return
 
202
        }
 
203
        // Add request to head of list.
 
204
        req.next = aw.waiting[req.w]
 
205
        aw.waiting[req.w] = req
 
206
}
 
207
 
 
208
// respond responds to all outstanding requests that can be satisfied.
 
209
func (aw *allWatcher) respond() {
 
210
        for w, req := range aw.waiting {
 
211
                changes := aw.all.changesSince(w.revno)
 
212
                if len(changes) == 0 {
 
213
                        continue
 
214
                }
 
215
                req.changes = changes
 
216
                w.revno = aw.all.latestRevno
 
217
                req.reply <- true
 
218
                if req := req.next; req == nil {
 
219
                        // Last request for this watcher.
 
220
                        delete(aw.waiting, w)
 
221
                } else {
 
222
                        aw.waiting[w] = req
 
223
                }
 
224
                aw.adjustRefCounts(w.revno)
 
225
        }
 
226
}
 
227
 
 
228
func (aw *allWatcher) changed(change watcher.Change) error {
 
229
        id := entityId{change.C, change.Id}
 
230
        info, err := aw.fetch(id)
 
231
        if err == mgo.ErrNotFound {
 
232
                aw.all.markRemoved(id)
 
233
                return nil
 
234
        }
 
235
        if err != nil {
 
236
                return err
 
237
        }
 
238
        aw.all.update(id, info)
 
239
        return nil
 
240
}
 
241
 
231
242
func (aw *allWatcher) fetch(id entityId) (params.EntityInfo, error) {
232
243
        info := aw.newInfo[id.collection]()
233
244
        collection := collectionForInfo(aw.st, info)
242
253
// created since the given revno and decrements the reference counts of
243
254
// all entities created before the given revno that have now been
244
255
// removed.
245
 
func (aw *allWatcher) adjustRefCounts(all *allInfo, revno int64) {
246
 
        for e := all.list.Front(); e != nil; {
 
256
func (aw *allWatcher) adjustRefCounts(revno int64) {
 
257
        for e := aw.all.list.Front(); e != nil; {
247
258
                prev := e.Prev()
248
259
                entry := e.Value.(*entityEntry)
249
260
                if entry.creationRevno > revno {
256
267
                        // This an entity that has already been seen,
257
268
                        // so decrement its refCount, removing the entry if
258
269
                        // necessary.
259
 
                        all.decRef(entry, entityIdForInfo(aw.st, entry.info))
 
270
                        aw.all.decRef(entry, entityIdForInfo(aw.st, entry.info))
260
271
                }
261
272
                e = prev
262
273
        }
264
275
 
265
276
// leave is called when the given watcher leaves.  It decrements the reference
266
277
// counts of any entities that have been seen by the watcher.
267
 
func (aw *allWatcher) leave(all *allInfo, w *StateWatcher) {
268
 
        for e := all.list.Front(); e != nil; {
 
278
func (aw *allWatcher) leave(w *StateWatcher) {
 
279
        for e := aw.all.list.Front(); e != nil; {
269
280
                prev := e.Prev()
270
281
                entry := e.Value.(*entityEntry)
271
282
                if entry.creationRevno <= w.revno {
276
287
                                // so its refcount has already been decremented.
277
288
                                continue
278
289
                        }
279
 
                        all.decRef(entry, entityIdForInfo(aw.st, entry.info))
 
290
                        aw.all.decRef(entry, entityIdForInfo(aw.st, entry.info))
280
291
                }
281
292
                e = prev
282
293
        }
283
294
}
284
295
 
285
 
 
286
296
// getAllCollection fetches all the items in the given collection
287
297
// into the given slice.
288
 
func (all *allInfo) getAllCollection(st *State, c *mgo.Collection, into interface{}) error {
 
298
func (aw *allWatcher) getAllCollection(c *mgo.Collection, into interface{}) error {
289
299
        err := c.Find(nil).All(into)
290
300
        if err != nil {
291
301
                return fmt.Errorf("cannot get all %s: %v", c.Name, err)
293
303
        infos := reflect.ValueOf(into).Elem()
294
304
        for i := 0; i < infos.Len(); i++ {
295
305
                info := infos.Index(i).Addr().Interface().(params.EntityInfo)
296
 
                all.add(entityIdForInfo(st, info), info)
 
306
                aw.all.add(entityIdForInfo(aw.st, info), info)
297
307
        }
298
308
        return nil
299
309
}
300
310
 
301
311
// getAll retrieves information about all known entities in the state
302
312
// into the receiving allInfo.
303
 
func (all *allInfo) getAll(st *State) error {
 
313
func (aw *allWatcher) getAll() error {
304
314
        // TODO(rog) fetch collections concurrently?
305
 
        if err := all.getAllCollection(st, st.machines, new([]params.MachineInfo)); err != nil {
306
 
                return err
307
 
        }
308
 
        if err := all.getAllCollection(st, st.relations, new([]params.RelationInfo)); err != nil {
309
 
                return err
310
 
        }
311
 
        if err := all.getAllCollection(st, st.units, new([]params.UnitInfo)); err != nil {
312
 
                return err
313
 
        }
314
 
        if err := all.getAllCollection(st, st.services, new([]params.ServiceInfo)); err != nil {
 
315
        if err := aw.getAllCollection(aw.st.machines, new([]params.MachineInfo)); err != nil {
 
316
                return err
 
317
        }
 
318
        if err := aw.getAllCollection(aw.st.relations, new([]params.RelationInfo)); err != nil {
 
319
                return err
 
320
        }
 
321
        if err := aw.getAllCollection(aw.st.units, new([]params.UnitInfo)); err != nil {
 
322
                return err
 
323
        }
 
324
        if err := aw.getAllCollection(aw.st.services, new([]params.ServiceInfo)); err != nil {
315
325
                return err
316
326
        }
317
327
        return nil
318
328
}
319
329
 
320
 
 
321
330
// entityId holds the mongo identifier of an entity.
322
331
type entityId struct {
323
332
        collection string
470
479
                entry := e.Value.(*entityEntry)
471
480
                changes = append(changes, params.Delta{
472
481
                        Removed: entry.removed,
473
 
                        Entity: entry.info,
 
482
                        Entity:  entry.info,
474
483
                })
475
484
        }
476
485
        return changes
501
510
        }
502
511
        panic(fmt.Errorf("unknown entity type %T", i))
503
512
}
504