1
// Copyright 2013 Canonical Ltd.
2
// Licensed under the AGPLv3, see LICENCE file for details.
12
"launchpad.net/juju-core/errors"
13
"launchpad.net/juju-core/instance"
14
"launchpad.net/juju-core/state"
15
"launchpad.net/juju-core/state/watcher"
18
// logger is the logger used by the code in this file; it is obtained
// from loggo under the name "juju.worker.addressupdater".
var logger = loggo.GetLogger("juju.worker.addressupdater")
20
// ShortPoll and LongPoll hold the polling intervals for the address
21
// updater. When a machine has no address, it will be polled at
22
// ShortPoll intervals until it does, exponentially backing off with an
23
// exponent of ShortPollBackoff until a maximum(ish) of LongPoll.
25
// When a machine has an address LongPoll will be used to check that the
26
// instance address has not changed.
28
// ShortPoll is the interval machineLoop starts polling with.
ShortPoll = 1 * time.Second
29
// ShortPollBackoff is the factor the poll interval is multiplied by
// each time a machine is found to still have no addresses.
ShortPollBackoff = 2.0
30
// LongPoll is the interval used once a machine has at least one
// address. The backoff stops growing the interval once it is no longer
// below LongPoll.
LongPoll = 15 * time.Minute
33
// machine is the set of machine operations this file relies on.
// NOTE(review): the listing visible here is partial — Id, Refresh and
// Life are also called on machine values elsewhere in this file.
type machine interface {
35
// Addresses returns the addresses currently recorded for the machine.
Addresses() []instance.Address
36
// InstanceId returns the id of the machine's instance, or an error.
InstanceId() (instance.Id, error)
37
// SetAddresses records the given addresses on the machine.
SetAddresses([]instance.Address) error
43
// machineContext is what the per-machine code (runMachine, machineLoop,
// checkMachineAddresses) needs from its environment.
// NOTE(review): the listing visible here may be partial — confirm
// against the full file.
type machineContext interface {
45
// addresses returns the provider addresses for the instance with the
// given id.
addresses(id instance.Id) ([]instance.Address, error)
46
// dying returns a channel that machineLoop selects on alongside its
// poll timer.
// NOTE(review): presumably signals that the worker is shutting down — confirm.
dying() <-chan struct{}
49
// machineAddress holds a list of instance addresses.
// NOTE(review): no use of this type is visible in this listing, and the
// struct may have further fields that are not shown — confirm.
type machineAddress struct {
51
addresses []instance.Address
54
// Compile-time check that *state.Machine satisfies the machine interface.
var _ machine = (*state.Machine)(nil)
56
// machinesWatcher is the watcher consumed by watchMachinesLoop.
// NOTE(review): only Changes is visible here; watchMachinesLoop also
// calls w.Stop() and passes w to watcher.MustErr.
type machinesWatcher interface {
57
// Changes returns the channel on which batches of machine ids are
// delivered.
Changes() <-chan []string
62
// updaterContext supplies watchMachinesLoop and startMachines with
// machine lookup, per-machine contexts and a dying channel.
type updaterContext interface {
63
// newMachineContext returns the machineContext handed to a newly
// started machine goroutine.
newMachineContext() machineContext
64
// getMachine returns the machine with the given id. startMachines
// tests its error with errors.IsNotFoundError.
getMachine(id string) (machine, error)
65
// dying returns a channel that watchMachinesLoop selects on.
dying() <-chan struct{}
69
// context provides machine lookup, per-machine contexts and the dying
// channel.
context updaterContext
70
// machines maps a machine id to the chan struct{} created for it in
// startMachines.
machines map[string]chan struct{}
71
// machineDead is handed to runMachine as its died channel;
// watchMachinesLoop receives from it and removes the received
// machine's id from machines.
machineDead chan machine
74
// watchMachinesLoop watches for changes provided by the given
75
// machinesWatcher and starts machine goroutines to deal
76
// with them, using the context's newMachineContext
77
// method to create the appropriate context for each new machine id.
78
// A failure to stop the watcher is either reported through the named
// err result or only logged as a warning.
func watchMachinesLoop(context updaterContext, w machinesWatcher) (err error) {
81
// Nothing is tracked initially; machineDead is unbuffered.
machines: make(map[string]chan struct{}),
82
machineDead: make(chan machine),
85
// Stop the watcher and deal with any stop error.
// NOTE(review): the condition that picks between reporting the stop
// error and merely logging it is not visible in this listing — confirm
// against the full file.
if stopErr := w.Stop(); stopErr != nil {
87
err = fmt.Errorf("error stopping watcher: %v", stopErr)
89
logger.Warningf("ignoring error when stopping watcher: %v", stopErr)
92
// Receive from machineDead until no machine is tracked any more.
for len(p.machines) > 0 {
93
delete(p.machines, (<-p.machineDead).Id())
98
// The watcher delivered a batch of machine ids.
case ids, ok := <-w.Changes():
100
// NOTE(review): presumably taken when ok is false, i.e. the Changes
// channel was closed — the guard is not visible here.
return watcher.MustErr(w)
102
if err := p.startMachines(ids); err != nil {
105
// A machine arrived on machineDead (the channel handed to runMachine
// as died): stop tracking it.
case m := <-p.machineDead:
106
delete(p.machines, m.Id())
107
// NOTE(review): the body of this case is not visible in this listing.
case <-p.context.dying():
113
// startMachines handles a batch of machine ids. For each id that has no
// entry in p.machines it looks the machine up through the context and
// launches a runMachine goroutine for it.
// NOTE(review): the handling of ids that are already tracked, and of
// getMachine errors other than not-found, is not visible in this
// listing — confirm against the full file.
func (p *updater) startMachines(ids []string) error {
114
for _, id := range ids {
115
if c := p.machines[id]; c == nil {
116
// We don't know about the machine - start
117
// a goroutine to deal with it.
118
m, err := p.context.getMachine(id)
119
// The watcher reported an id for which no machine was found; this is
// logged as a warning.
if errors.IsNotFoundError(err) {
120
logger.Warningf("watcher gave notification of non-existent machine %q", id)
126
// c is the channel passed to the machine's goroutine as its changed
// channel.
c = make(chan struct{})
128
// Each machine goroutine gets its own machineContext and shares the
// updater's machineDead channel.
go runMachine(p.context.newMachineContext(), m, c, p.machineDead)
136
// runMachine processes the address publishing for a given machine.
137
// We assume that the machine is alive when this is first called.
138
// It runs machineLoop with the given context, machine and changed
// channel; died is a send-only channel of machines.
// NOTE(review): how died is used and how a machineLoop error is handled
// are not visible in this listing.
func runMachine(context machineContext, m machine, changed <-chan struct{}, died chan<- machine) {
140
// We can't just send on the died channel because the
141
// central loop might be trying to write to us on the
151
if err := machineLoop(context, m, changed); err != nil {
156
// machineLoop checks the machine's addresses with checkMachineAddresses
// and chooses how long to wait before the next check: it starts at
// ShortPoll, backs off by ShortPollBackoff while the machine has no
// addresses, and uses LongPoll once it has at least one.
// NOTE(review): the loop/select skeleton and the use of the changed
// channel are not visible in this listing — confirm against the full
// file.
func machineLoop(context machineContext, m machine, changed <-chan struct{}) error {
157
// Use a short poll interval when initially waiting for
158
// a machine's address, and a long one when it already
160
pollInterval := ShortPoll
164
if err := checkMachineAddresses(context, m); err != nil {
165
// If the provider doesn't implement addresses now,
166
// it never will until we're upgraded, so don't bother
167
// asking any more. We could use less resources
168
// by taking down the entire address updater worker,
169
// but this is easier for now (and hopefully the local
170
// provider will implement Addresses in the not-too-distant
171
// future), so we won't need to worry about this case at all.
172
if errors.IsNotImplementedError(err) {
173
// Effectively stop polling: wait a year between attempts.
pollInterval = 365 * 24 * time.Hour
178
if len(m.Addresses()) > 0 {
179
// We've got at least one address, so poll infrequently.
180
pollInterval = LongPoll
181
} else if pollInterval < LongPoll {
182
// We have no addresses - poll increasingly rarely
184
// The comparison with LongPoll happens before the multiplication, so
// the resulting interval can end up above LongPoll.
pollInterval = time.Duration(float64(pollInterval) * ShortPollBackoff)
189
// The poll interval elapsed.
case <-time.After(pollInterval):
191
// NOTE(review): the body of this case is not visible in this listing.
case <-context.dying():
194
if err := m.Refresh(); err != nil {
197
// NOTE(review): what happens for a dead machine is not visible in this
// listing.
if m.Life() == state.Dead {
204
// checkMachineAddresses checks the current provider addresses
205
// for the given machine's instance, and sets them
206
// on the machine if they've changed.
207
// It returns an error if the instance id cannot be obtained (other than
// because the machine is not provisioned) or if the new addresses
// cannot be set on the machine.
func checkMachineAddresses(context machineContext, m machine) error {
208
instId, err := m.InstanceId()
209
// A not-provisioned error is tolerated here; any other InstanceId
// error aborts the check.
if err != nil && !state.IsNotProvisionedError(err) {
210
return fmt.Errorf("cannot get machine's instance id: %v", err)
212
var newAddrs []instance.Address
214
newAddrs, err = context.addresses(instId)
216
// NOTE(review): the body of this branch is not visible; machineLoop
// tests this function's error with errors.IsNotImplementedError, so
// presumably the error is returned here — confirm.
if errors.IsNotImplementedError(err) {
219
logger.Warningf("cannot get addresses for instance %q: %v", instId, err)
223
// NOTE(review): the body of this branch is not visible; presumably
// nothing is written when the addresses are unchanged — confirm.
if addressesEqual(m.Addresses(), newAddrs) {
226
logger.Infof("machine %q has new addresses: %v", m.Id(), newAddrs)
227
if err := m.SetAddresses(newAddrs); err != nil {
228
return fmt.Errorf("cannot set addresses on %q: %v", m, err)
233
func addressesEqual(a0, a1 []instance.Address) bool {
234
if len(a0) != len(a1) {