~nskaggs/+junk/xenial-test

« back to all changes in this revision

Viewing changes to src/github.com/juju/juju/worker/apiconfigwatcher/manifold.go

  • Committer: Nicholas Skaggs
  • Date: 2016-10-24 20:56:05 UTC
  • Revision ID: nicholas.skaggs@canonical.com-20161024205605-z8lta0uvuhtxwzwl
Initi with beta15

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
// Copyright 2016 Canonical Ltd.
 
2
// Licensed under the AGPLv3, see LICENCE file for details.
 
3
 
 
4
package apiconfigwatcher
 
5
 
 
6
import (
 
7
        "sort"
 
8
 
 
9
        "github.com/juju/errors"
 
10
        "github.com/juju/loggo"
 
11
        "github.com/juju/utils/voyeur"
 
12
        "launchpad.net/tomb"
 
13
 
 
14
        "github.com/juju/juju/agent"
 
15
        "github.com/juju/juju/worker"
 
16
        "github.com/juju/juju/worker/dependency"
 
17
)
 
18
 
 
19
var logger = loggo.GetLogger("juju.worker.apiconfigwatcher")
 
20
 
 
21
type ManifoldConfig struct {
 
22
        AgentName          string
 
23
        AgentConfigChanged *voyeur.Value
 
24
}
 
25
 
 
26
// Manifold returns a dependency.Manifold which wraps an agent's
 
27
// voyeur.Value which is set whenever the agent config is
 
28
// changed. When the API server addresses in the config change the
 
29
// manifold will bounce itself.
 
30
//
 
31
// The manifold is intended to be a dependency for the api-caller
 
32
// manifold and is required to support model migrations.
 
33
func Manifold(config ManifoldConfig) dependency.Manifold {
 
34
        return dependency.Manifold{
 
35
                Inputs: []string{config.AgentName},
 
36
                Start: func(context dependency.Context) (worker.Worker, error) {
 
37
                        if config.AgentConfigChanged == nil {
 
38
                                return nil, errors.NotValidf("nil AgentConfigChanged")
 
39
                        }
 
40
 
 
41
                        var a agent.Agent
 
42
                        if err := context.Get(config.AgentName, &a); err != nil {
 
43
                                return nil, err
 
44
                        }
 
45
 
 
46
                        w := &apiconfigwatcher{
 
47
                                agent:              a,
 
48
                                agentConfigChanged: config.AgentConfigChanged,
 
49
                                addrs:              getAPIAddresses(a),
 
50
                        }
 
51
                        go func() {
 
52
                                defer w.tomb.Done()
 
53
                                w.tomb.Kill(w.loop())
 
54
                        }()
 
55
                        return w, nil
 
56
                },
 
57
        }
 
58
}
 
59
 
 
60
type apiconfigwatcher struct {
 
61
        tomb               tomb.Tomb
 
62
        agent              agent.Agent
 
63
        agentConfigChanged *voyeur.Value
 
64
        addrs              []string
 
65
}
 
66
 
 
67
func (w *apiconfigwatcher) loop() error {
 
68
        watch := w.agentConfigChanged.Watch()
 
69
        defer watch.Close()
 
70
 
 
71
        // TODO(mjs) - this is pretty awful. There should be a
 
72
        // NotifyWatcher for voyeur.Value. Note also that this code is
 
73
        // repeated elsewhere.
 
74
        watchCh := make(chan bool)
 
75
        go func() {
 
76
                for {
 
77
                        if watch.Next() {
 
78
                                select {
 
79
                                case <-w.tomb.Dying():
 
80
                                        return
 
81
                                case watchCh <- true:
 
82
                                }
 
83
                        } else {
 
84
                                // watcher or voyeur.Value closed.
 
85
                                close(watchCh)
 
86
                                return
 
87
                        }
 
88
                }
 
89
        }()
 
90
 
 
91
        for {
 
92
                // Always unconditionally check for a change in API addresses
 
93
                // first, in case there was a change between the start func
 
94
                // and the call to Watch.
 
95
                if !stringSliceEq(w.addrs, getAPIAddresses(w.agent)) {
 
96
                        logger.Debugf("API addresses changed in agent config")
 
97
                        return dependency.ErrBounce
 
98
                }
 
99
 
 
100
                select {
 
101
                case <-w.tomb.Dying():
 
102
                        return tomb.ErrDying
 
103
                case _, ok := <-watchCh:
 
104
                        if !ok {
 
105
                                return errors.New("config changed value closed")
 
106
                        }
 
107
                }
 
108
        }
 
109
}
 
110
 
 
111
// Kill implements worker.Worker.
 
112
func (w *apiconfigwatcher) Kill() {
 
113
        w.tomb.Kill(nil)
 
114
}
 
115
 
 
116
// Wait implements worker.Worker.
 
117
func (w *apiconfigwatcher) Wait() error {
 
118
        return w.tomb.Wait()
 
119
}
 
120
 
 
121
func getAPIAddresses(a agent.Agent) []string {
 
122
        config := a.CurrentConfig()
 
123
        addrs, err := config.APIAddresses()
 
124
        if err != nil {
 
125
                logger.Errorf("retrieving API addresses: %s", err)
 
126
                addrs = nil
 
127
        }
 
128
        sort.Strings(addrs)
 
129
        return addrs
 
130
}
 
131
 
 
132
func stringSliceEq(a, b []string) bool {
 
133
        if a == nil && b == nil {
 
134
                return true
 
135
        }
 
136
 
 
137
        if a == nil || b == nil {
 
138
                return false
 
139
        }
 
140
 
 
141
        if len(a) != len(b) {
 
142
                return false
 
143
        }
 
144
 
 
145
        for i := range a {
 
146
                if a[i] != b[i] {
 
147
                        return false
 
148
                }
 
149
        }
 
150
        return true
 
151
}