1
// Copyright 2013 Canonical Ltd.
2
// Licensed under the AGPLv3, see LICENCE file for details.
13
"launchpad.net/juju-core/errors"
14
"launchpad.net/juju-core/state/api/params"
15
"launchpad.net/juju-core/state/multiwatcher"
16
"launchpad.net/juju-core/state/watcher"
19
// allWatcherStateBacking implements allWatcherBacking by
20
// fetching entities from the State.
21
type allWatcherStateBacking struct {
24
collectionByName map[string]allWatcherStateCollection
25
collectionByType map[reflect.Type]allWatcherStateCollection
28
type backingMachine machineDoc
30
func (m *backingMachine) updated(st *State, store *multiwatcher.Store, id interface{}) error {
31
info := ¶ms.MachineInfo{
34
oldInfo := store.Get(info.EntityId())
36
// We're adding the entry for the first time,
37
// so fetch the associated machine status.
38
sdoc, err := getStatus(st, machineGlobalKey(m.Id))
42
info.Status = sdoc.Status
43
info.StatusInfo = sdoc.StatusInfo
45
// The entry already exists, so preserve the current status and instance id.
46
oldInfo := oldInfo.(*params.MachineInfo)
47
info.Status = oldInfo.Status
48
info.StatusInfo = oldInfo.StatusInfo
49
info.InstanceId = oldInfo.InstanceId
51
// If the machine is been provisioned, fetch the instance id if required.
52
if m.Nonce != "" && info.InstanceId == "" {
53
instanceData, err := getInstanceData(st, m.Id)
55
info.InstanceId = string(instanceData.InstanceId)
56
} else if !errors.IsNotFoundError(err) {
64
func (svc *backingMachine) removed(st *State, store *multiwatcher.Store, id interface{}) error {
65
store.Remove(params.EntityId{
72
func (m *backingMachine) mongoId() interface{} {
76
type backingUnit unitDoc
78
func (u *backingUnit) updated(st *State, store *multiwatcher.Store, id interface{}) error {
79
info := ¶ms.UnitInfo{
83
PublicAddress: u.PublicAddress,
84
PrivateAddress: u.PrivateAddress,
85
MachineId: u.MachineId,
88
if u.CharmURL != nil {
89
info.CharmURL = u.CharmURL.String()
91
oldInfo := store.Get(info.EntityId())
93
// We're adding the entry for the first time,
94
// so fetch the associated unit status.
95
sdoc, err := getStatus(st, unitGlobalKey(u.Name))
99
info.Status = sdoc.Status
100
info.StatusInfo = sdoc.StatusInfo
102
// The entry already exists, so preserve the current status.
103
oldInfo := oldInfo.(*params.UnitInfo)
104
info.Status = oldInfo.Status
105
info.StatusInfo = oldInfo.StatusInfo
111
func (svc *backingUnit) removed(st *State, store *multiwatcher.Store, id interface{}) error {
112
store.Remove(params.EntityId{
119
func (m *backingUnit) mongoId() interface{} {
123
type backingService serviceDoc
125
func (svc *backingService) updated(st *State, store *multiwatcher.Store, id interface{}) error {
126
info := ¶ms.ServiceInfo{
128
Exposed: svc.Exposed,
129
CharmURL: svc.CharmURL.String(),
130
Life: params.Life(svc.Life.String()),
132
oldInfo := store.Get(info.EntityId())
135
// We're adding the entry for the first time,
136
// so fetch the associated child documents.
137
c, err := readConstraints(st, serviceGlobalKey(svc.Name))
144
// The entry already exists, so preserve the current status.
145
oldInfo := oldInfo.(*params.ServiceInfo)
146
info.Constraints = oldInfo.Constraints
147
if info.CharmURL == oldInfo.CharmURL {
148
// The charm URL remains the same - we can continue to
149
// use the same config settings.
150
info.Config = oldInfo.Config
152
// The charm URL has changed - we need to fetch the
153
// settings from the new charm's settings doc.
159
info.Config, _, err = readSettingsDoc(st, serviceSettingsKey(svc.Name, svc.CharmURL))
168
func (svc *backingService) removed(st *State, store *multiwatcher.Store, id interface{}) error {
169
store.Remove(params.EntityId{
176
func (m *backingService) mongoId() interface{} {
180
type backingRelation relationDoc
182
func (r *backingRelation) updated(st *State, store *multiwatcher.Store, id interface{}) error {
183
eps := make([]params.Endpoint, len(r.Endpoints))
184
for i, ep := range r.Endpoints {
185
eps[i] = params.Endpoint{
186
ServiceName: ep.ServiceName,
187
Relation: ep.Relation,
190
info := ¶ms.RelationInfo{
198
func (svc *backingRelation) removed(st *State, store *multiwatcher.Store, id interface{}) error {
199
store.Remove(params.EntityId{
206
func (m *backingRelation) mongoId() interface{} {
210
type backingAnnotation annotatorDoc
212
func (a *backingAnnotation) updated(st *State, store *multiwatcher.Store, id interface{}) error {
213
info := ¶ms.AnnotationInfo{
215
Annotations: a.Annotations,
221
func (svc *backingAnnotation) removed(st *State, store *multiwatcher.Store, id interface{}) error {
222
tag, ok := tagForGlobalKey(id.(string))
224
panic(fmt.Errorf("unknown global key %q in state", id))
226
store.Remove(params.EntityId{
233
func (a *backingAnnotation) mongoId() interface{} {
237
type backingStatus statusDoc
239
func (s *backingStatus) updated(st *State, store *multiwatcher.Store, id interface{}) error {
240
parentId, ok := backingEntityIdForGlobalKey(id.(string))
244
info0 := store.Get(parentId)
245
switch info := info0.(type) {
247
// The parent info doesn't exist. Ignore the status until it does.
249
case *params.UnitInfo:
251
newInfo.Status = s.Status
252
newInfo.StatusInfo = s.StatusInfo
254
case *params.MachineInfo:
256
newInfo.Status = s.Status
257
newInfo.StatusInfo = s.StatusInfo
260
panic(fmt.Errorf("status for unexpected entity with id %q; type %T", id, info))
266
func (s *backingStatus) removed(st *State, store *multiwatcher.Store, id interface{}) error {
267
// If the status is removed, the parent will follow not long after,
272
func (a *backingStatus) mongoId() interface{} {
273
panic("cannot find mongo id from status document")
276
type backingConstraints constraintsDoc
278
func (s *backingConstraints) updated(st *State, store *multiwatcher.Store, id interface{}) error {
279
parentId, ok := backingEntityIdForGlobalKey(id.(string))
283
info0 := store.Get(parentId)
284
switch info := info0.(type) {
286
// The parent info doesn't exist. Ignore the status until it does.
288
case *params.UnitInfo, *params.MachineInfo:
289
// We don't (yet) publish unit or machine constraints.
291
case *params.ServiceInfo:
293
newInfo.Constraints = constraintsDoc(*s).value()
296
panic(fmt.Errorf("status for unexpected entity with id %q; type %T", id, info))
302
func (s *backingConstraints) removed(st *State, store *multiwatcher.Store, id interface{}) error {
306
func (a *backingConstraints) mongoId() interface{} {
307
panic("cannot find mongo id from constraints document")
310
type backingSettings map[string]interface{}
312
func (s *backingSettings) updated(st *State, store *multiwatcher.Store, id interface{}) error {
313
parentId, url, ok := backingEntityIdForSettingsKey(id.(string))
317
info0 := store.Get(parentId)
318
switch info := info0.(type) {
320
// The parent info doesn't exist. Ignore the status until it does.
322
case *params.ServiceInfo:
323
// If we're seeing settings for the service with a different
324
// charm URL, we ignore them - we will fetch
325
// them again when the service charm changes.
326
// By doing this we make sure that the settings in the
327
// ServiceInfo are always consistent with the charm URL.
328
if info.CharmURL != url {
342
func (s *backingSettings) removed(st *State, store *multiwatcher.Store, id interface{}) error {
346
func (a *backingSettings) mongoId() interface{} {
347
panic("cannot find mongo id from settings document")
350
// backingEntityIdForSettingsKey returns the entity id for the given
351
// settings key. Any extra information in the key is returned in
353
func backingEntityIdForSettingsKey(key string) (eid params.EntityId, extra string, ok bool) {
354
if !strings.HasPrefix(key, "s#") {
355
eid, ok = backingEntityIdForGlobalKey(key)
359
i := strings.Index(key, "#")
361
return params.EntityId{}, "", false
363
eid = (¶ms.ServiceInfo{Name: key[0:i]}).EntityId()
369
// backingEntityIdForGlobalKey returns the entity id for the given global key.
370
// It returns false if the key is not recognized.
371
func backingEntityIdForGlobalKey(key string) (params.EntityId, bool) {
372
if len(key) < 3 || key[1] != '#' {
373
return params.EntityId{}, false
378
return (¶ms.MachineInfo{Id: id}).EntityId(), true
380
return (¶ms.UnitInfo{Name: id}).EntityId(), true
382
return (¶ms.ServiceInfo{Name: id}).EntityId(), true
384
return params.EntityId{}, false
387
// backingEntityDoc is implemented by the documents in
388
// collections that the allWatcherStateBacking watches.
389
type backingEntityDoc interface {
390
// updated is called when the document has changed.
391
// The mongo _id value of the document is provided in id.
392
updated(st *State, store *multiwatcher.Store, id interface{}) error
394
// removed is called when the document has changed.
395
// The receiving instance will not contain any data.
396
// The mongo _id value of the document is provided in id.
397
removed(st *State, store *multiwatcher.Store, id interface{}) error
399
// mongoId returns the mongo _id field of the document.
400
// It is currently never called for subsidiary documents.
401
mongoId() interface{}
405
_ backingEntityDoc = (*backingMachine)(nil)
406
_ backingEntityDoc = (*backingUnit)(nil)
407
_ backingEntityDoc = (*backingService)(nil)
408
_ backingEntityDoc = (*backingRelation)(nil)
409
_ backingEntityDoc = (*backingAnnotation)(nil)
410
_ backingEntityDoc = (*backingStatus)(nil)
411
_ backingEntityDoc = (*backingConstraints)(nil)
412
_ backingEntityDoc = (*backingSettings)(nil)
415
// allWatcherStateCollection holds information about a
416
// collection watched by an allWatcher and the
417
// type of value we use to store entity information
418
// for that collection.
419
type allWatcherStateCollection struct {
422
// infoSliceType stores the type of a slice of the info type
423
// that we use for this collection. In Go 1.1 we can change
424
// this to use the type itself, as we'll have reflect.SliceOf.
425
infoSliceType reflect.Type
426
// subsidiary is true if the collection is used only
427
// to modify a primary entity.
431
func newAllWatcherStateBacking(st *State) multiwatcher.Backing {
432
b := &allWatcherStateBacking{
434
collectionByName: make(map[string]allWatcherStateCollection),
435
collectionByType: make(map[reflect.Type]allWatcherStateCollection),
437
collections := []allWatcherStateCollection{{
438
Collection: st.machines,
439
infoSliceType: reflect.TypeOf([]backingMachine(nil)),
441
Collection: st.units,
442
infoSliceType: reflect.TypeOf([]backingUnit(nil)),
444
Collection: st.services,
445
infoSliceType: reflect.TypeOf([]backingService(nil)),
447
Collection: st.relations,
448
infoSliceType: reflect.TypeOf([]backingRelation(nil)),
450
Collection: st.annotations,
451
infoSliceType: reflect.TypeOf([]backingAnnotation(nil)),
453
Collection: st.statuses,
454
infoSliceType: reflect.TypeOf([]backingStatus(nil)),
457
Collection: st.constraints,
458
infoSliceType: reflect.TypeOf([]backingConstraints(nil)),
461
Collection: st.settings,
462
infoSliceType: reflect.TypeOf([]backingSettings(nil)),
465
// Populate the collection maps from the above set of collections.
466
for _, c := range collections {
467
docType := c.infoSliceType.Elem()
468
if _, ok := b.collectionByType[docType]; ok {
469
panic(fmt.Errorf("duplicate collection type %s", docType))
471
b.collectionByType[docType] = c
472
if _, ok := b.collectionByName[c.Name]; ok {
473
panic(fmt.Errorf("duplicate collection name %q", c.Name))
475
b.collectionByName[c.Name] = c
480
// Watch watches all the collections.
481
func (b *allWatcherStateBacking) Watch(in chan<- watcher.Change) {
482
for _, c := range b.collectionByName {
483
b.st.watcher.WatchCollection(c.Name, in)
487
// Unwatch unwatches all the collections.
488
func (b *allWatcherStateBacking) Unwatch(in chan<- watcher.Change) {
489
for _, c := range b.collectionByName {
490
b.st.watcher.UnwatchCollection(c.Name, in)
494
// GetAll fetches all items that we want to watch from the state.
495
func (b *allWatcherStateBacking) GetAll(all *multiwatcher.Store) error {
496
// TODO(rog) fetch collections concurrently?
497
for _, c := range b.collectionByName {
501
infoSlicePtr := reflect.New(c.infoSliceType)
502
if err := c.Find(nil).All(infoSlicePtr.Interface()); err != nil {
503
return fmt.Errorf("cannot get all %s: %v", c.Name, err)
505
infos := infoSlicePtr.Elem()
506
for i := 0; i < infos.Len(); i++ {
507
info := infos.Index(i).Addr().Interface().(backingEntityDoc)
508
info.updated(b.st, all, info.mongoId())
514
// Changed updates the allWatcher's idea of the current state
515
// in response to the given change.
516
func (b *allWatcherStateBacking) Changed(all *multiwatcher.Store, change watcher.Change) error {
517
c, ok := b.collectionByName[change.C]
519
panic(fmt.Errorf("unknown collection %q in fetch request", change.C))
521
doc := reflect.New(c.infoSliceType.Elem()).Interface().(backingEntityDoc)
522
// TODO(rog) investigate ways that this can be made more efficient
523
// than simply fetching each entity in turn.
524
// TODO(rog) avoid fetching documents that we have no interest
525
// in, such as settings changes to entities we don't care about.
526
err := c.FindId(change.Id).One(doc)
527
if err == mgo.ErrNotFound {
528
return doc.removed(b.st, all, change.Id)
533
return doc.updated(b.st, all, change.Id)