~rogpeppe/juju-core/azure

« back to all changes in this revision

Viewing changes to state/megawatcher.go

[r=gz] Check correct info attr in cloudinit.apiHostAddrs

Fixes a copy/paste error in cloudinit.MachineConfig to agent.Conf
conversion helper function. No test as the only codepath that uses
this goes through verifyConfig which already asserts both StateInfo
and APIInfo are not nil, so not possible to write a failing test
using exposed functions.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
// Copyright 2013 Canonical Ltd.
 
2
// Licensed under the AGPLv3, see LICENCE file for details.
 
3
 
 
4
package state
 
5
 
 
6
import (
 
7
        "fmt"
 
8
        "reflect"
 
9
        "strings"
 
10
 
 
11
        "labix.org/v2/mgo"
 
12
 
 
13
        "launchpad.net/juju-core/errors"
 
14
        "launchpad.net/juju-core/state/api/params"
 
15
        "launchpad.net/juju-core/state/multiwatcher"
 
16
        "launchpad.net/juju-core/state/watcher"
 
17
)
 
18
 
 
19
// allWatcherStateBacking implements allWatcherBacking by
 
20
// fetching entities from the State.
 
21
type allWatcherStateBacking struct {
 
22
        st *State
 
23
        // collections
 
24
        collectionByName map[string]allWatcherStateCollection
 
25
        collectionByType map[reflect.Type]allWatcherStateCollection
 
26
}
 
27
 
 
28
type backingMachine machineDoc
 
29
 
 
30
func (m *backingMachine) updated(st *State, store *multiwatcher.Store, id interface{}) error {
 
31
        info := &params.MachineInfo{
 
32
                Id: m.Id,
 
33
        }
 
34
        oldInfo := store.Get(info.EntityId())
 
35
        if oldInfo == nil {
 
36
                // We're adding the entry for the first time,
 
37
                // so fetch the associated machine status.
 
38
                sdoc, err := getStatus(st, machineGlobalKey(m.Id))
 
39
                if err != nil {
 
40
                        return err
 
41
                }
 
42
                info.Status = sdoc.Status
 
43
                info.StatusInfo = sdoc.StatusInfo
 
44
        } else {
 
45
                // The entry already exists, so preserve the current status and instance id.
 
46
                oldInfo := oldInfo.(*params.MachineInfo)
 
47
                info.Status = oldInfo.Status
 
48
                info.StatusInfo = oldInfo.StatusInfo
 
49
                info.InstanceId = oldInfo.InstanceId
 
50
        }
 
51
        // If the machine is been provisioned, fetch the instance id if required.
 
52
        if m.Nonce != "" && info.InstanceId == "" {
 
53
                instanceData, err := getInstanceData(st, m.Id)
 
54
                if err == nil {
 
55
                        info.InstanceId = string(instanceData.InstanceId)
 
56
                } else if !errors.IsNotFoundError(err) {
 
57
                        return err
 
58
                }
 
59
        }
 
60
        store.Update(info)
 
61
        return nil
 
62
}
 
63
 
 
64
func (svc *backingMachine) removed(st *State, store *multiwatcher.Store, id interface{}) error {
 
65
        store.Remove(params.EntityId{
 
66
                Kind: "machine",
 
67
                Id:   id,
 
68
        })
 
69
        return nil
 
70
}
 
71
 
 
72
func (m *backingMachine) mongoId() interface{} {
 
73
        return m.Id
 
74
}
 
75
 
 
76
type backingUnit unitDoc
 
77
 
 
78
func (u *backingUnit) updated(st *State, store *multiwatcher.Store, id interface{}) error {
 
79
        info := &params.UnitInfo{
 
80
                Name:           u.Name,
 
81
                Service:        u.Service,
 
82
                Series:         u.Series,
 
83
                PublicAddress:  u.PublicAddress,
 
84
                PrivateAddress: u.PrivateAddress,
 
85
                MachineId:      u.MachineId,
 
86
                Ports:          u.Ports,
 
87
        }
 
88
        if u.CharmURL != nil {
 
89
                info.CharmURL = u.CharmURL.String()
 
90
        }
 
91
        oldInfo := store.Get(info.EntityId())
 
92
        if oldInfo == nil {
 
93
                // We're adding the entry for the first time,
 
94
                // so fetch the associated unit status.
 
95
                sdoc, err := getStatus(st, unitGlobalKey(u.Name))
 
96
                if err != nil {
 
97
                        return err
 
98
                }
 
99
                info.Status = sdoc.Status
 
100
                info.StatusInfo = sdoc.StatusInfo
 
101
        } else {
 
102
                // The entry already exists, so preserve the current status.
 
103
                oldInfo := oldInfo.(*params.UnitInfo)
 
104
                info.Status = oldInfo.Status
 
105
                info.StatusInfo = oldInfo.StatusInfo
 
106
        }
 
107
        store.Update(info)
 
108
        return nil
 
109
}
 
110
 
 
111
func (svc *backingUnit) removed(st *State, store *multiwatcher.Store, id interface{}) error {
 
112
        store.Remove(params.EntityId{
 
113
                Kind: "unit",
 
114
                Id:   id,
 
115
        })
 
116
        return nil
 
117
}
 
118
 
 
119
func (m *backingUnit) mongoId() interface{} {
 
120
        return m.Name
 
121
}
 
122
 
 
123
type backingService serviceDoc
 
124
 
 
125
func (svc *backingService) updated(st *State, store *multiwatcher.Store, id interface{}) error {
 
126
        info := &params.ServiceInfo{
 
127
                Name:     svc.Name,
 
128
                Exposed:  svc.Exposed,
 
129
                CharmURL: svc.CharmURL.String(),
 
130
                Life:     params.Life(svc.Life.String()),
 
131
        }
 
132
        oldInfo := store.Get(info.EntityId())
 
133
        needConfig := false
 
134
        if oldInfo == nil {
 
135
                // We're adding the entry for the first time,
 
136
                // so fetch the associated child documents.
 
137
                c, err := readConstraints(st, serviceGlobalKey(svc.Name))
 
138
                if err != nil {
 
139
                        return err
 
140
                }
 
141
                info.Constraints = c
 
142
                needConfig = true
 
143
        } else {
 
144
                // The entry already exists, so preserve the current status.
 
145
                oldInfo := oldInfo.(*params.ServiceInfo)
 
146
                info.Constraints = oldInfo.Constraints
 
147
                if info.CharmURL == oldInfo.CharmURL {
 
148
                        // The charm URL remains the same - we can continue to
 
149
                        // use the same config settings.
 
150
                        info.Config = oldInfo.Config
 
151
                } else {
 
152
                        // The charm URL has changed - we need to fetch the
 
153
                        // settings from the new charm's settings doc.
 
154
                        needConfig = true
 
155
                }
 
156
        }
 
157
        if needConfig {
 
158
                var err error
 
159
                info.Config, _, err = readSettingsDoc(st, serviceSettingsKey(svc.Name, svc.CharmURL))
 
160
                if err != nil {
 
161
                        return err
 
162
                }
 
163
        }
 
164
        store.Update(info)
 
165
        return nil
 
166
}
 
167
 
 
168
func (svc *backingService) removed(st *State, store *multiwatcher.Store, id interface{}) error {
 
169
        store.Remove(params.EntityId{
 
170
                Kind: "service",
 
171
                Id:   id,
 
172
        })
 
173
        return nil
 
174
}
 
175
 
 
176
func (m *backingService) mongoId() interface{} {
 
177
        return m.Name
 
178
}
 
179
 
 
180
type backingRelation relationDoc
 
181
 
 
182
func (r *backingRelation) updated(st *State, store *multiwatcher.Store, id interface{}) error {
 
183
        eps := make([]params.Endpoint, len(r.Endpoints))
 
184
        for i, ep := range r.Endpoints {
 
185
                eps[i] = params.Endpoint{
 
186
                        ServiceName: ep.ServiceName,
 
187
                        Relation:    ep.Relation,
 
188
                }
 
189
        }
 
190
        info := &params.RelationInfo{
 
191
                Key:       r.Key,
 
192
                Endpoints: eps,
 
193
        }
 
194
        store.Update(info)
 
195
        return nil
 
196
}
 
197
 
 
198
func (svc *backingRelation) removed(st *State, store *multiwatcher.Store, id interface{}) error {
 
199
        store.Remove(params.EntityId{
 
200
                Kind: "relation",
 
201
                Id:   id,
 
202
        })
 
203
        return nil
 
204
}
 
205
 
 
206
func (m *backingRelation) mongoId() interface{} {
 
207
        return m.Key
 
208
}
 
209
 
 
210
type backingAnnotation annotatorDoc
 
211
 
 
212
func (a *backingAnnotation) updated(st *State, store *multiwatcher.Store, id interface{}) error {
 
213
        info := &params.AnnotationInfo{
 
214
                Tag:         a.Tag,
 
215
                Annotations: a.Annotations,
 
216
        }
 
217
        store.Update(info)
 
218
        return nil
 
219
}
 
220
 
 
221
func (svc *backingAnnotation) removed(st *State, store *multiwatcher.Store, id interface{}) error {
 
222
        tag, ok := tagForGlobalKey(id.(string))
 
223
        if !ok {
 
224
                panic(fmt.Errorf("unknown global key %q in state", id))
 
225
        }
 
226
        store.Remove(params.EntityId{
 
227
                Kind: "annotation",
 
228
                Id:   tag,
 
229
        })
 
230
        return nil
 
231
}
 
232
 
 
233
func (a *backingAnnotation) mongoId() interface{} {
 
234
        return a.GlobalKey
 
235
}
 
236
 
 
237
type backingStatus statusDoc
 
238
 
 
239
func (s *backingStatus) updated(st *State, store *multiwatcher.Store, id interface{}) error {
 
240
        parentId, ok := backingEntityIdForGlobalKey(id.(string))
 
241
        if !ok {
 
242
                return nil
 
243
        }
 
244
        info0 := store.Get(parentId)
 
245
        switch info := info0.(type) {
 
246
        case nil:
 
247
                // The parent info doesn't exist. Ignore the status until it does.
 
248
                return nil
 
249
        case *params.UnitInfo:
 
250
                newInfo := *info
 
251
                newInfo.Status = s.Status
 
252
                newInfo.StatusInfo = s.StatusInfo
 
253
                info0 = &newInfo
 
254
        case *params.MachineInfo:
 
255
                newInfo := *info
 
256
                newInfo.Status = s.Status
 
257
                newInfo.StatusInfo = s.StatusInfo
 
258
                info0 = &newInfo
 
259
        default:
 
260
                panic(fmt.Errorf("status for unexpected entity with id %q; type %T", id, info))
 
261
        }
 
262
        store.Update(info0)
 
263
        return nil
 
264
}
 
265
 
 
266
func (s *backingStatus) removed(st *State, store *multiwatcher.Store, id interface{}) error {
 
267
        // If the status is removed, the parent will follow not long after,
 
268
        // so do nothing.
 
269
        return nil
 
270
}
 
271
 
 
272
func (a *backingStatus) mongoId() interface{} {
 
273
        panic("cannot find mongo id from status document")
 
274
}
 
275
 
 
276
type backingConstraints constraintsDoc
 
277
 
 
278
func (s *backingConstraints) updated(st *State, store *multiwatcher.Store, id interface{}) error {
 
279
        parentId, ok := backingEntityIdForGlobalKey(id.(string))
 
280
        if !ok {
 
281
                return nil
 
282
        }
 
283
        info0 := store.Get(parentId)
 
284
        switch info := info0.(type) {
 
285
        case nil:
 
286
                // The parent info doesn't exist. Ignore the status until it does.
 
287
                return nil
 
288
        case *params.UnitInfo, *params.MachineInfo:
 
289
                // We don't (yet) publish unit or machine constraints.
 
290
                return nil
 
291
        case *params.ServiceInfo:
 
292
                newInfo := *info
 
293
                newInfo.Constraints = constraintsDoc(*s).value()
 
294
                info0 = &newInfo
 
295
        default:
 
296
                panic(fmt.Errorf("status for unexpected entity with id %q; type %T", id, info))
 
297
        }
 
298
        store.Update(info0)
 
299
        return nil
 
300
}
 
301
 
 
302
func (s *backingConstraints) removed(st *State, store *multiwatcher.Store, id interface{}) error {
 
303
        return nil
 
304
}
 
305
 
 
306
func (a *backingConstraints) mongoId() interface{} {
 
307
        panic("cannot find mongo id from constraints document")
 
308
}
 
309
 
 
310
type backingSettings map[string]interface{}
 
311
 
 
312
func (s *backingSettings) updated(st *State, store *multiwatcher.Store, id interface{}) error {
 
313
        parentId, url, ok := backingEntityIdForSettingsKey(id.(string))
 
314
        if !ok {
 
315
                return nil
 
316
        }
 
317
        info0 := store.Get(parentId)
 
318
        switch info := info0.(type) {
 
319
        case nil:
 
320
                // The parent info doesn't exist. Ignore the status until it does.
 
321
                return nil
 
322
        case *params.ServiceInfo:
 
323
                // If we're seeing settings for the service with a different
 
324
                // charm URL, we ignore them - we will fetch
 
325
                // them again when the service charm changes.
 
326
                // By doing this we make sure that the settings in the
 
327
                // ServiceInfo are always consistent with the charm URL.
 
328
                if info.CharmURL != url {
 
329
                        break
 
330
                }
 
331
                newInfo := *info
 
332
                cleanSettingsMap(*s)
 
333
                newInfo.Config = *s
 
334
                info0 = &newInfo
 
335
        default:
 
336
                return nil
 
337
        }
 
338
        store.Update(info0)
 
339
        return nil
 
340
}
 
341
 
 
342
func (s *backingSettings) removed(st *State, store *multiwatcher.Store, id interface{}) error {
 
343
        return nil
 
344
}
 
345
 
 
346
func (a *backingSettings) mongoId() interface{} {
 
347
        panic("cannot find mongo id from settings document")
 
348
}
 
349
 
 
350
// backingEntityIdForSettingsKey returns the entity id for the given
 
351
// settings key. Any extra information in the key is returned in
 
352
// extra.
 
353
func backingEntityIdForSettingsKey(key string) (eid params.EntityId, extra string, ok bool) {
 
354
        if !strings.HasPrefix(key, "s#") {
 
355
                eid, ok = backingEntityIdForGlobalKey(key)
 
356
                return
 
357
        }
 
358
        key = key[2:]
 
359
        i := strings.Index(key, "#")
 
360
        if i == -1 {
 
361
                return params.EntityId{}, "", false
 
362
        }
 
363
        eid = (&params.ServiceInfo{Name: key[0:i]}).EntityId()
 
364
        extra = key[i+1:]
 
365
        ok = true
 
366
        return
 
367
}
 
368
 
 
369
// backingEntityIdForGlobalKey returns the entity id for the given global key.
 
370
// It returns false if the key is not recognized.
 
371
func backingEntityIdForGlobalKey(key string) (params.EntityId, bool) {
 
372
        if len(key) < 3 || key[1] != '#' {
 
373
                return params.EntityId{}, false
 
374
        }
 
375
        id := key[2:]
 
376
        switch key[0] {
 
377
        case 'm':
 
378
                return (&params.MachineInfo{Id: id}).EntityId(), true
 
379
        case 'u':
 
380
                return (&params.UnitInfo{Name: id}).EntityId(), true
 
381
        case 's':
 
382
                return (&params.ServiceInfo{Name: id}).EntityId(), true
 
383
        }
 
384
        return params.EntityId{}, false
 
385
}
 
386
 
 
387
// backingEntityDoc is implemented by the documents in
 
388
// collections that the allWatcherStateBacking watches.
 
389
type backingEntityDoc interface {
 
390
        // updated is called when the document has changed.
 
391
        // The mongo _id value of the document is provided in id.
 
392
        updated(st *State, store *multiwatcher.Store, id interface{}) error
 
393
 
 
394
        // removed is called when the document has changed.
 
395
        // The receiving instance will not contain any data.
 
396
        // The mongo _id value of the document is provided in id.
 
397
        removed(st *State, store *multiwatcher.Store, id interface{}) error
 
398
 
 
399
        // mongoId returns the mongo _id field of the document.
 
400
        // It is currently never called for subsidiary documents.
 
401
        mongoId() interface{}
 
402
}
 
403
 
 
404
var (
 
405
        _ backingEntityDoc = (*backingMachine)(nil)
 
406
        _ backingEntityDoc = (*backingUnit)(nil)
 
407
        _ backingEntityDoc = (*backingService)(nil)
 
408
        _ backingEntityDoc = (*backingRelation)(nil)
 
409
        _ backingEntityDoc = (*backingAnnotation)(nil)
 
410
        _ backingEntityDoc = (*backingStatus)(nil)
 
411
        _ backingEntityDoc = (*backingConstraints)(nil)
 
412
        _ backingEntityDoc = (*backingSettings)(nil)
 
413
)
 
414
 
 
415
// allWatcherStateCollection holds information about a
 
416
// collection watched by an allWatcher and the
 
417
// type of value we use to store entity information
 
418
// for that collection.
 
419
type allWatcherStateCollection struct {
 
420
        *mgo.Collection
 
421
 
 
422
        // infoSliceType stores the type of a slice of the info type
 
423
        // that we use for this collection.  In Go 1.1 we can change
 
424
        // this to use the type itself, as we'll have reflect.SliceOf.
 
425
        infoSliceType reflect.Type
 
426
        // subsidiary is true if the collection is used only
 
427
        // to modify a primary entity.
 
428
        subsidiary bool
 
429
}
 
430
 
 
431
func newAllWatcherStateBacking(st *State) multiwatcher.Backing {
 
432
        b := &allWatcherStateBacking{
 
433
                st:               st,
 
434
                collectionByName: make(map[string]allWatcherStateCollection),
 
435
                collectionByType: make(map[reflect.Type]allWatcherStateCollection),
 
436
        }
 
437
        collections := []allWatcherStateCollection{{
 
438
                Collection:    st.machines,
 
439
                infoSliceType: reflect.TypeOf([]backingMachine(nil)),
 
440
        }, {
 
441
                Collection:    st.units,
 
442
                infoSliceType: reflect.TypeOf([]backingUnit(nil)),
 
443
        }, {
 
444
                Collection:    st.services,
 
445
                infoSliceType: reflect.TypeOf([]backingService(nil)),
 
446
        }, {
 
447
                Collection:    st.relations,
 
448
                infoSliceType: reflect.TypeOf([]backingRelation(nil)),
 
449
        }, {
 
450
                Collection:    st.annotations,
 
451
                infoSliceType: reflect.TypeOf([]backingAnnotation(nil)),
 
452
        }, {
 
453
                Collection:    st.statuses,
 
454
                infoSliceType: reflect.TypeOf([]backingStatus(nil)),
 
455
                subsidiary:    true,
 
456
        }, {
 
457
                Collection:    st.constraints,
 
458
                infoSliceType: reflect.TypeOf([]backingConstraints(nil)),
 
459
                subsidiary:    true,
 
460
        }, {
 
461
                Collection:    st.settings,
 
462
                infoSliceType: reflect.TypeOf([]backingSettings(nil)),
 
463
                subsidiary:    true,
 
464
        }}
 
465
        // Populate the collection maps from the above set of collections.
 
466
        for _, c := range collections {
 
467
                docType := c.infoSliceType.Elem()
 
468
                if _, ok := b.collectionByType[docType]; ok {
 
469
                        panic(fmt.Errorf("duplicate collection type %s", docType))
 
470
                }
 
471
                b.collectionByType[docType] = c
 
472
                if _, ok := b.collectionByName[c.Name]; ok {
 
473
                        panic(fmt.Errorf("duplicate collection name %q", c.Name))
 
474
                }
 
475
                b.collectionByName[c.Name] = c
 
476
        }
 
477
        return b
 
478
}
 
479
 
 
480
// Watch watches all the collections.
 
481
func (b *allWatcherStateBacking) Watch(in chan<- watcher.Change) {
 
482
        for _, c := range b.collectionByName {
 
483
                b.st.watcher.WatchCollection(c.Name, in)
 
484
        }
 
485
}
 
486
 
 
487
// Unwatch unwatches all the collections.
 
488
func (b *allWatcherStateBacking) Unwatch(in chan<- watcher.Change) {
 
489
        for _, c := range b.collectionByName {
 
490
                b.st.watcher.UnwatchCollection(c.Name, in)
 
491
        }
 
492
}
 
493
 
 
494
// GetAll fetches all items that we want to watch from the state.
 
495
func (b *allWatcherStateBacking) GetAll(all *multiwatcher.Store) error {
 
496
        // TODO(rog) fetch collections concurrently?
 
497
        for _, c := range b.collectionByName {
 
498
                if c.subsidiary {
 
499
                        continue
 
500
                }
 
501
                infoSlicePtr := reflect.New(c.infoSliceType)
 
502
                if err := c.Find(nil).All(infoSlicePtr.Interface()); err != nil {
 
503
                        return fmt.Errorf("cannot get all %s: %v", c.Name, err)
 
504
                }
 
505
                infos := infoSlicePtr.Elem()
 
506
                for i := 0; i < infos.Len(); i++ {
 
507
                        info := infos.Index(i).Addr().Interface().(backingEntityDoc)
 
508
                        info.updated(b.st, all, info.mongoId())
 
509
                }
 
510
        }
 
511
        return nil
 
512
}
 
513
 
 
514
// Changed updates the allWatcher's idea of the current state
 
515
// in response to the given change.
 
516
func (b *allWatcherStateBacking) Changed(all *multiwatcher.Store, change watcher.Change) error {
 
517
        c, ok := b.collectionByName[change.C]
 
518
        if !ok {
 
519
                panic(fmt.Errorf("unknown collection %q in fetch request", change.C))
 
520
        }
 
521
        doc := reflect.New(c.infoSliceType.Elem()).Interface().(backingEntityDoc)
 
522
        // TODO(rog) investigate ways that this can be made more efficient
 
523
        // than simply fetching each entity in turn.
 
524
        // TODO(rog) avoid fetching documents that we have no interest
 
525
        // in, such as settings changes to entities we don't care about.
 
526
        err := c.FindId(change.Id).One(doc)
 
527
        if err == mgo.ErrNotFound {
 
528
                return doc.removed(b.st, all, change.Id)
 
529
        }
 
530
        if err != nil {
 
531
                return err
 
532
        }
 
533
        return doc.updated(b.st, all, change.Id)
 
534
}