~rogpeppe/juju-core/438-local-instance-Addresses

« back to all changes in this revision

Viewing changes to worker/addressupdater/observer.go

[r=rogpeppe] worker/addressupdater: wire up

It could do with some more comprehensive
testing, but it might be OK to land anyway.

https://codereview.appspot.com/14306043/

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
// Copyright 2013 Canonical Ltd.
 
2
// Licensed under the AGPLv3, see LICENCE file for details.
 
3
 
 
4
package addressupdater
 
5
 
 
6
import (
 
7
        "sync"
 
8
 
 
9
        "launchpad.net/tomb"
 
10
 
 
11
        "launchpad.net/juju-core/environs"
 
12
        "launchpad.net/juju-core/state"
 
13
        "launchpad.net/juju-core/state/watcher"
 
14
        "launchpad.net/juju-core/worker"
 
15
)
 
16
 
 
17
// TODO(rog) 2013-10-02
 
18
// Put this somewhere generally available and
 
19
// refactor other workers to use it.
 
20
 
 
21
// environObserver watches the current environment configuration
 
22
// and makes it available. It discards invalid environment
 
23
// configurations.
 
24
type environObserver struct {
 
25
        tomb           tomb.Tomb
 
26
        environWatcher state.NotifyWatcher
 
27
        st             *state.State
 
28
        mu             sync.Mutex
 
29
        environ        environs.Environ
 
30
}
 
31
 
 
32
// newEnvironObserver waits for the state to have a valid environment
 
33
// configuration and returns a new environment observer. While waiting
 
34
// for the first environment configuration, it will return with
 
35
// tomb.ErrDying if it receives a value on dying.
 
36
func newEnvironObserver(st *state.State, dying <-chan struct{}) (*environObserver, error) {
 
37
        environWatcher := st.WatchForEnvironConfigChanges()
 
38
        environ, err := worker.WaitForEnviron(environWatcher, st, dying)
 
39
        if err != nil {
 
40
                return nil, err
 
41
        }
 
42
        obs := &environObserver{
 
43
                st:             st,
 
44
                environ:        environ,
 
45
                environWatcher: environWatcher,
 
46
        }
 
47
        go func() {
 
48
                defer obs.tomb.Done()
 
49
                defer watcher.Stop(environWatcher, &obs.tomb)
 
50
                obs.tomb.Kill(obs.loop())
 
51
        }()
 
52
        return obs, nil
 
53
}
 
54
 
 
55
func (obs *environObserver) loop() error {
 
56
        for {
 
57
                select {
 
58
                case <-obs.tomb.Dying():
 
59
                        return nil
 
60
                case _, ok := <-obs.environWatcher.Changes():
 
61
                        if !ok {
 
62
                                return watcher.MustErr(obs.environWatcher)
 
63
                        }
 
64
                }
 
65
                config, err := obs.st.EnvironConfig()
 
66
                if err != nil {
 
67
                        logger.Warningf("error reading environment config: %v", err)
 
68
                        continue
 
69
                }
 
70
                environ, err := environs.New(config)
 
71
                if err != nil {
 
72
                        logger.Warningf("error creating Environ: %v", err)
 
73
                        continue
 
74
                }
 
75
                obs.mu.Lock()
 
76
                obs.environ = environ
 
77
                obs.mu.Unlock()
 
78
        }
 
79
}
 
80
 
 
81
// Environ returns the most recent valid Environ.
 
82
func (obs *environObserver) Environ() environs.Environ {
 
83
        obs.mu.Lock()
 
84
        defer obs.mu.Unlock()
 
85
        return obs.environ
 
86
}
 
87
 
 
88
func (obs *environObserver) Kill() {
 
89
        obs.tomb.Kill(nil)
 
90
}
 
91
 
 
92
func (obs *environObserver) Wait() error {
 
93
        return obs.tomb.Wait()
 
94
}