~rogpeppe/juju-core/azure

« back to all changes in this revision

Viewing changes to state/megawatcher.go

  • Committer: Gustavo Niemeyer
  • Date: 2011-12-20 18:59:17 UTC
  • mto: This revision was merged to the branch mainline in revision 34.
  • Revision ID: gustavo@niemeyer.net-20111220185917-erfh4gkuk5w2gsu1
store: use log package

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
// Copyright 2013 Canonical Ltd.
2
 
// Licensed under the AGPLv3, see LICENCE file for details.
3
 
 
4
 
package state
5
 
 
6
 
import (
7
 
        "fmt"
8
 
        "reflect"
9
 
        "strings"
10
 
 
11
 
        "labix.org/v2/mgo"
12
 
 
13
 
        "launchpad.net/juju-core/errors"
14
 
        "launchpad.net/juju-core/state/api/params"
15
 
        "launchpad.net/juju-core/state/multiwatcher"
16
 
        "launchpad.net/juju-core/state/watcher"
17
 
)
18
 
 
19
 
// allWatcherStateBacking implements allWatcherBacking by
20
 
// fetching entities from the State.
21
 
type allWatcherStateBacking struct {
22
 
        st *State
23
 
        // collections
24
 
        collectionByName map[string]allWatcherStateCollection
25
 
        collectionByType map[reflect.Type]allWatcherStateCollection
26
 
}
27
 
 
28
 
type backingMachine machineDoc
29
 
 
30
 
func (m *backingMachine) updated(st *State, store *multiwatcher.Store, id interface{}) error {
31
 
        info := &params.MachineInfo{
32
 
                Id: m.Id,
33
 
        }
34
 
        oldInfo := store.Get(info.EntityId())
35
 
        if oldInfo == nil {
36
 
                // We're adding the entry for the first time,
37
 
                // so fetch the associated machine status.
38
 
                sdoc, err := getStatus(st, machineGlobalKey(m.Id))
39
 
                if err != nil {
40
 
                        return err
41
 
                }
42
 
                info.Status = sdoc.Status
43
 
                info.StatusInfo = sdoc.StatusInfo
44
 
        } else {
45
 
                // The entry already exists, so preserve the current status and instance id.
46
 
                oldInfo := oldInfo.(*params.MachineInfo)
47
 
                info.Status = oldInfo.Status
48
 
                info.StatusInfo = oldInfo.StatusInfo
49
 
                info.InstanceId = oldInfo.InstanceId
50
 
        }
51
 
        // If the machine is been provisioned, fetch the instance id if required.
52
 
        if m.Nonce != "" && info.InstanceId == "" {
53
 
                instanceData, err := getInstanceData(st, m.Id)
54
 
                if err == nil {
55
 
                        info.InstanceId = string(instanceData.InstanceId)
56
 
                } else if !errors.IsNotFoundError(err) {
57
 
                        return err
58
 
                }
59
 
        }
60
 
        store.Update(info)
61
 
        return nil
62
 
}
63
 
 
64
 
func (svc *backingMachine) removed(st *State, store *multiwatcher.Store, id interface{}) error {
65
 
        store.Remove(params.EntityId{
66
 
                Kind: "machine",
67
 
                Id:   id,
68
 
        })
69
 
        return nil
70
 
}
71
 
 
72
 
func (m *backingMachine) mongoId() interface{} {
73
 
        return m.Id
74
 
}
75
 
 
76
 
type backingUnit unitDoc
77
 
 
78
 
func (u *backingUnit) updated(st *State, store *multiwatcher.Store, id interface{}) error {
79
 
        info := &params.UnitInfo{
80
 
                Name:           u.Name,
81
 
                Service:        u.Service,
82
 
                Series:         u.Series,
83
 
                PublicAddress:  u.PublicAddress,
84
 
                PrivateAddress: u.PrivateAddress,
85
 
                MachineId:      u.MachineId,
86
 
                Ports:          u.Ports,
87
 
        }
88
 
        if u.CharmURL != nil {
89
 
                info.CharmURL = u.CharmURL.String()
90
 
        }
91
 
        oldInfo := store.Get(info.EntityId())
92
 
        if oldInfo == nil {
93
 
                // We're adding the entry for the first time,
94
 
                // so fetch the associated unit status.
95
 
                sdoc, err := getStatus(st, unitGlobalKey(u.Name))
96
 
                if err != nil {
97
 
                        return err
98
 
                }
99
 
                info.Status = sdoc.Status
100
 
                info.StatusInfo = sdoc.StatusInfo
101
 
        } else {
102
 
                // The entry already exists, so preserve the current status.
103
 
                oldInfo := oldInfo.(*params.UnitInfo)
104
 
                info.Status = oldInfo.Status
105
 
                info.StatusInfo = oldInfo.StatusInfo
106
 
        }
107
 
        store.Update(info)
108
 
        return nil
109
 
}
110
 
 
111
 
func (svc *backingUnit) removed(st *State, store *multiwatcher.Store, id interface{}) error {
112
 
        store.Remove(params.EntityId{
113
 
                Kind: "unit",
114
 
                Id:   id,
115
 
        })
116
 
        return nil
117
 
}
118
 
 
119
 
func (m *backingUnit) mongoId() interface{} {
120
 
        return m.Name
121
 
}
122
 
 
123
 
type backingService serviceDoc
124
 
 
125
 
func (svc *backingService) updated(st *State, store *multiwatcher.Store, id interface{}) error {
126
 
        info := &params.ServiceInfo{
127
 
                Name:     svc.Name,
128
 
                Exposed:  svc.Exposed,
129
 
                CharmURL: svc.CharmURL.String(),
130
 
                Life:     params.Life(svc.Life.String()),
131
 
        }
132
 
        oldInfo := store.Get(info.EntityId())
133
 
        needConfig := false
134
 
        if oldInfo == nil {
135
 
                // We're adding the entry for the first time,
136
 
                // so fetch the associated child documents.
137
 
                c, err := readConstraints(st, serviceGlobalKey(svc.Name))
138
 
                if err != nil {
139
 
                        return err
140
 
                }
141
 
                info.Constraints = c
142
 
                needConfig = true
143
 
        } else {
144
 
                // The entry already exists, so preserve the current status.
145
 
                oldInfo := oldInfo.(*params.ServiceInfo)
146
 
                info.Constraints = oldInfo.Constraints
147
 
                if info.CharmURL == oldInfo.CharmURL {
148
 
                        // The charm URL remains the same - we can continue to
149
 
                        // use the same config settings.
150
 
                        info.Config = oldInfo.Config
151
 
                } else {
152
 
                        // The charm URL has changed - we need to fetch the
153
 
                        // settings from the new charm's settings doc.
154
 
                        needConfig = true
155
 
                }
156
 
        }
157
 
        if needConfig {
158
 
                var err error
159
 
                info.Config, _, err = readSettingsDoc(st, serviceSettingsKey(svc.Name, svc.CharmURL))
160
 
                if err != nil {
161
 
                        return err
162
 
                }
163
 
        }
164
 
        store.Update(info)
165
 
        return nil
166
 
}
167
 
 
168
 
func (svc *backingService) removed(st *State, store *multiwatcher.Store, id interface{}) error {
169
 
        store.Remove(params.EntityId{
170
 
                Kind: "service",
171
 
                Id:   id,
172
 
        })
173
 
        return nil
174
 
}
175
 
 
176
 
func (m *backingService) mongoId() interface{} {
177
 
        return m.Name
178
 
}
179
 
 
180
 
type backingRelation relationDoc
181
 
 
182
 
func (r *backingRelation) updated(st *State, store *multiwatcher.Store, id interface{}) error {
183
 
        eps := make([]params.Endpoint, len(r.Endpoints))
184
 
        for i, ep := range r.Endpoints {
185
 
                eps[i] = params.Endpoint{
186
 
                        ServiceName: ep.ServiceName,
187
 
                        Relation:    ep.Relation,
188
 
                }
189
 
        }
190
 
        info := &params.RelationInfo{
191
 
                Key:       r.Key,
192
 
                Endpoints: eps,
193
 
        }
194
 
        store.Update(info)
195
 
        return nil
196
 
}
197
 
 
198
 
func (svc *backingRelation) removed(st *State, store *multiwatcher.Store, id interface{}) error {
199
 
        store.Remove(params.EntityId{
200
 
                Kind: "relation",
201
 
                Id:   id,
202
 
        })
203
 
        return nil
204
 
}
205
 
 
206
 
func (m *backingRelation) mongoId() interface{} {
207
 
        return m.Key
208
 
}
209
 
 
210
 
type backingAnnotation annotatorDoc
211
 
 
212
 
func (a *backingAnnotation) updated(st *State, store *multiwatcher.Store, id interface{}) error {
213
 
        info := &params.AnnotationInfo{
214
 
                Tag:         a.Tag,
215
 
                Annotations: a.Annotations,
216
 
        }
217
 
        store.Update(info)
218
 
        return nil
219
 
}
220
 
 
221
 
func (svc *backingAnnotation) removed(st *State, store *multiwatcher.Store, id interface{}) error {
222
 
        tag, ok := tagForGlobalKey(id.(string))
223
 
        if !ok {
224
 
                panic(fmt.Errorf("unknown global key %q in state", id))
225
 
        }
226
 
        store.Remove(params.EntityId{
227
 
                Kind: "annotation",
228
 
                Id:   tag,
229
 
        })
230
 
        return nil
231
 
}
232
 
 
233
 
func (a *backingAnnotation) mongoId() interface{} {
234
 
        return a.GlobalKey
235
 
}
236
 
 
237
 
type backingStatus statusDoc
238
 
 
239
 
func (s *backingStatus) updated(st *State, store *multiwatcher.Store, id interface{}) error {
240
 
        parentId, ok := backingEntityIdForGlobalKey(id.(string))
241
 
        if !ok {
242
 
                return nil
243
 
        }
244
 
        info0 := store.Get(parentId)
245
 
        switch info := info0.(type) {
246
 
        case nil:
247
 
                // The parent info doesn't exist. Ignore the status until it does.
248
 
                return nil
249
 
        case *params.UnitInfo:
250
 
                newInfo := *info
251
 
                newInfo.Status = s.Status
252
 
                newInfo.StatusInfo = s.StatusInfo
253
 
                info0 = &newInfo
254
 
        case *params.MachineInfo:
255
 
                newInfo := *info
256
 
                newInfo.Status = s.Status
257
 
                newInfo.StatusInfo = s.StatusInfo
258
 
                info0 = &newInfo
259
 
        default:
260
 
                panic(fmt.Errorf("status for unexpected entity with id %q; type %T", id, info))
261
 
        }
262
 
        store.Update(info0)
263
 
        return nil
264
 
}
265
 
 
266
 
func (s *backingStatus) removed(st *State, store *multiwatcher.Store, id interface{}) error {
267
 
        // If the status is removed, the parent will follow not long after,
268
 
        // so do nothing.
269
 
        return nil
270
 
}
271
 
 
272
 
func (a *backingStatus) mongoId() interface{} {
273
 
        panic("cannot find mongo id from status document")
274
 
}
275
 
 
276
 
type backingConstraints constraintsDoc
277
 
 
278
 
func (s *backingConstraints) updated(st *State, store *multiwatcher.Store, id interface{}) error {
279
 
        parentId, ok := backingEntityIdForGlobalKey(id.(string))
280
 
        if !ok {
281
 
                return nil
282
 
        }
283
 
        info0 := store.Get(parentId)
284
 
        switch info := info0.(type) {
285
 
        case nil:
286
 
                // The parent info doesn't exist. Ignore the status until it does.
287
 
                return nil
288
 
        case *params.UnitInfo, *params.MachineInfo:
289
 
                // We don't (yet) publish unit or machine constraints.
290
 
                return nil
291
 
        case *params.ServiceInfo:
292
 
                newInfo := *info
293
 
                newInfo.Constraints = constraintsDoc(*s).value()
294
 
                info0 = &newInfo
295
 
        default:
296
 
                panic(fmt.Errorf("status for unexpected entity with id %q; type %T", id, info))
297
 
        }
298
 
        store.Update(info0)
299
 
        return nil
300
 
}
301
 
 
302
 
func (s *backingConstraints) removed(st *State, store *multiwatcher.Store, id interface{}) error {
303
 
        return nil
304
 
}
305
 
 
306
 
func (a *backingConstraints) mongoId() interface{} {
307
 
        panic("cannot find mongo id from constraints document")
308
 
}
309
 
 
310
 
type backingSettings map[string]interface{}
311
 
 
312
 
func (s *backingSettings) updated(st *State, store *multiwatcher.Store, id interface{}) error {
313
 
        parentId, url, ok := backingEntityIdForSettingsKey(id.(string))
314
 
        if !ok {
315
 
                return nil
316
 
        }
317
 
        info0 := store.Get(parentId)
318
 
        switch info := info0.(type) {
319
 
        case nil:
320
 
                // The parent info doesn't exist. Ignore the status until it does.
321
 
                return nil
322
 
        case *params.ServiceInfo:
323
 
                // If we're seeing settings for the service with a different
324
 
                // charm URL, we ignore them - we will fetch
325
 
                // them again when the service charm changes.
326
 
                // By doing this we make sure that the settings in the
327
 
                // ServiceInfo are always consistent with the charm URL.
328
 
                if info.CharmURL != url {
329
 
                        break
330
 
                }
331
 
                newInfo := *info
332
 
                cleanSettingsMap(*s)
333
 
                newInfo.Config = *s
334
 
                info0 = &newInfo
335
 
        default:
336
 
                return nil
337
 
        }
338
 
        store.Update(info0)
339
 
        return nil
340
 
}
341
 
 
342
 
func (s *backingSettings) removed(st *State, store *multiwatcher.Store, id interface{}) error {
343
 
        return nil
344
 
}
345
 
 
346
 
func (a *backingSettings) mongoId() interface{} {
347
 
        panic("cannot find mongo id from settings document")
348
 
}
349
 
 
350
 
// backingEntityIdForSettingsKey returns the entity id for the given
351
 
// settings key. Any extra information in the key is returned in
352
 
// extra.
353
 
func backingEntityIdForSettingsKey(key string) (eid params.EntityId, extra string, ok bool) {
354
 
        if !strings.HasPrefix(key, "s#") {
355
 
                eid, ok = backingEntityIdForGlobalKey(key)
356
 
                return
357
 
        }
358
 
        key = key[2:]
359
 
        i := strings.Index(key, "#")
360
 
        if i == -1 {
361
 
                return params.EntityId{}, "", false
362
 
        }
363
 
        eid = (&params.ServiceInfo{Name: key[0:i]}).EntityId()
364
 
        extra = key[i+1:]
365
 
        ok = true
366
 
        return
367
 
}
368
 
 
369
 
// backingEntityIdForGlobalKey returns the entity id for the given global key.
370
 
// It returns false if the key is not recognized.
371
 
func backingEntityIdForGlobalKey(key string) (params.EntityId, bool) {
372
 
        if len(key) < 3 || key[1] != '#' {
373
 
                return params.EntityId{}, false
374
 
        }
375
 
        id := key[2:]
376
 
        switch key[0] {
377
 
        case 'm':
378
 
                return (&params.MachineInfo{Id: id}).EntityId(), true
379
 
        case 'u':
380
 
                return (&params.UnitInfo{Name: id}).EntityId(), true
381
 
        case 's':
382
 
                return (&params.ServiceInfo{Name: id}).EntityId(), true
383
 
        }
384
 
        return params.EntityId{}, false
385
 
}
386
 
 
387
 
// backingEntityDoc is implemented by the documents in
388
 
// collections that the allWatcherStateBacking watches.
389
 
type backingEntityDoc interface {
390
 
        // updated is called when the document has changed.
391
 
        // The mongo _id value of the document is provided in id.
392
 
        updated(st *State, store *multiwatcher.Store, id interface{}) error
393
 
 
394
 
        // removed is called when the document has changed.
395
 
        // The receiving instance will not contain any data.
396
 
        // The mongo _id value of the document is provided in id.
397
 
        removed(st *State, store *multiwatcher.Store, id interface{}) error
398
 
 
399
 
        // mongoId returns the mongo _id field of the document.
400
 
        // It is currently never called for subsidiary documents.
401
 
        mongoId() interface{}
402
 
}
403
 
 
404
 
var (
405
 
        _ backingEntityDoc = (*backingMachine)(nil)
406
 
        _ backingEntityDoc = (*backingUnit)(nil)
407
 
        _ backingEntityDoc = (*backingService)(nil)
408
 
        _ backingEntityDoc = (*backingRelation)(nil)
409
 
        _ backingEntityDoc = (*backingAnnotation)(nil)
410
 
        _ backingEntityDoc = (*backingStatus)(nil)
411
 
        _ backingEntityDoc = (*backingConstraints)(nil)
412
 
        _ backingEntityDoc = (*backingSettings)(nil)
413
 
)
414
 
 
415
 
// allWatcherStateCollection holds information about a
416
 
// collection watched by an allWatcher and the
417
 
// type of value we use to store entity information
418
 
// for that collection.
419
 
type allWatcherStateCollection struct {
420
 
        *mgo.Collection
421
 
 
422
 
        // infoSliceType stores the type of a slice of the info type
423
 
        // that we use for this collection.  In Go 1.1 we can change
424
 
        // this to use the type itself, as we'll have reflect.SliceOf.
425
 
        infoSliceType reflect.Type
426
 
        // subsidiary is true if the collection is used only
427
 
        // to modify a primary entity.
428
 
        subsidiary bool
429
 
}
430
 
 
431
 
func newAllWatcherStateBacking(st *State) multiwatcher.Backing {
432
 
        b := &allWatcherStateBacking{
433
 
                st:               st,
434
 
                collectionByName: make(map[string]allWatcherStateCollection),
435
 
                collectionByType: make(map[reflect.Type]allWatcherStateCollection),
436
 
        }
437
 
        collections := []allWatcherStateCollection{{
438
 
                Collection:    st.machines,
439
 
                infoSliceType: reflect.TypeOf([]backingMachine(nil)),
440
 
        }, {
441
 
                Collection:    st.units,
442
 
                infoSliceType: reflect.TypeOf([]backingUnit(nil)),
443
 
        }, {
444
 
                Collection:    st.services,
445
 
                infoSliceType: reflect.TypeOf([]backingService(nil)),
446
 
        }, {
447
 
                Collection:    st.relations,
448
 
                infoSliceType: reflect.TypeOf([]backingRelation(nil)),
449
 
        }, {
450
 
                Collection:    st.annotations,
451
 
                infoSliceType: reflect.TypeOf([]backingAnnotation(nil)),
452
 
        }, {
453
 
                Collection:    st.statuses,
454
 
                infoSliceType: reflect.TypeOf([]backingStatus(nil)),
455
 
                subsidiary:    true,
456
 
        }, {
457
 
                Collection:    st.constraints,
458
 
                infoSliceType: reflect.TypeOf([]backingConstraints(nil)),
459
 
                subsidiary:    true,
460
 
        }, {
461
 
                Collection:    st.settings,
462
 
                infoSliceType: reflect.TypeOf([]backingSettings(nil)),
463
 
                subsidiary:    true,
464
 
        }}
465
 
        // Populate the collection maps from the above set of collections.
466
 
        for _, c := range collections {
467
 
                docType := c.infoSliceType.Elem()
468
 
                if _, ok := b.collectionByType[docType]; ok {
469
 
                        panic(fmt.Errorf("duplicate collection type %s", docType))
470
 
                }
471
 
                b.collectionByType[docType] = c
472
 
                if _, ok := b.collectionByName[c.Name]; ok {
473
 
                        panic(fmt.Errorf("duplicate collection name %q", c.Name))
474
 
                }
475
 
                b.collectionByName[c.Name] = c
476
 
        }
477
 
        return b
478
 
}
479
 
 
480
 
// Watch watches all the collections.
481
 
func (b *allWatcherStateBacking) Watch(in chan<- watcher.Change) {
482
 
        for _, c := range b.collectionByName {
483
 
                b.st.watcher.WatchCollection(c.Name, in)
484
 
        }
485
 
}
486
 
 
487
 
// Unwatch unwatches all the collections.
488
 
func (b *allWatcherStateBacking) Unwatch(in chan<- watcher.Change) {
489
 
        for _, c := range b.collectionByName {
490
 
                b.st.watcher.UnwatchCollection(c.Name, in)
491
 
        }
492
 
}
493
 
 
494
 
// GetAll fetches all items that we want to watch from the state.
495
 
func (b *allWatcherStateBacking) GetAll(all *multiwatcher.Store) error {
496
 
        // TODO(rog) fetch collections concurrently?
497
 
        for _, c := range b.collectionByName {
498
 
                if c.subsidiary {
499
 
                        continue
500
 
                }
501
 
                infoSlicePtr := reflect.New(c.infoSliceType)
502
 
                if err := c.Find(nil).All(infoSlicePtr.Interface()); err != nil {
503
 
                        return fmt.Errorf("cannot get all %s: %v", c.Name, err)
504
 
                }
505
 
                infos := infoSlicePtr.Elem()
506
 
                for i := 0; i < infos.Len(); i++ {
507
 
                        info := infos.Index(i).Addr().Interface().(backingEntityDoc)
508
 
                        info.updated(b.st, all, info.mongoId())
509
 
                }
510
 
        }
511
 
        return nil
512
 
}
513
 
 
514
 
// Changed updates the allWatcher's idea of the current state
515
 
// in response to the given change.
516
 
func (b *allWatcherStateBacking) Changed(all *multiwatcher.Store, change watcher.Change) error {
517
 
        c, ok := b.collectionByName[change.C]
518
 
        if !ok {
519
 
                panic(fmt.Errorf("unknown collection %q in fetch request", change.C))
520
 
        }
521
 
        doc := reflect.New(c.infoSliceType.Elem()).Interface().(backingEntityDoc)
522
 
        // TODO(rog) investigate ways that this can be made more efficient
523
 
        // than simply fetching each entity in turn.
524
 
        // TODO(rog) avoid fetching documents that we have no interest
525
 
        // in, such as settings changes to entities we don't care about.
526
 
        err := c.FindId(change.Id).One(doc)
527
 
        if err == mgo.ErrNotFound {
528
 
                return doc.removed(b.st, all, change.Id)
529
 
        }
530
 
        if err != nil {
531
 
                return err
532
 
        }
533
 
        return doc.updated(b.st, all, change.Id)
534
 
}