~james-page/ubuntu/saucy/juju-core/1.16.5

« back to all changes in this revision

Viewing changes to src/launchpad.net/juju-core/worker/addressupdater/updater.go

  • Committer: Package Import Robot
  • Author(s): James Page
  • Date: 2013-10-10 18:07:45 UTC
  • mfrom: (1.1.10)
  • Revision ID: package-import@ubuntu.com-20131010180745-wuo0vv7hq7faavdk
Tags: 1.16.0-0ubuntu1
New upstream stable release (LP: #1219879).

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
// Copyright 2013 Canonical Ltd.
 
2
// Licensed under the AGPLv3, see LICENCE file for details.
 
3
 
 
4
package addressupdater
 
5
 
 
6
import (
 
7
        "fmt"
 
8
        "time"
 
9
 
 
10
        "launchpad.net/loggo"
 
11
 
 
12
        "launchpad.net/juju-core/errors"
 
13
        "launchpad.net/juju-core/instance"
 
14
        "launchpad.net/juju-core/state"
 
15
        "launchpad.net/juju-core/state/watcher"
 
16
)
 
17
 
 
18
var logger = loggo.GetLogger("juju.worker.addressupdater")
 
19
 
 
20
// ShortPoll and LongPoll hold the polling intervals for the address
 
21
// updater. When a machine has no address, it will be polled at
 
22
// ShortPoll intervals until it does, exponentially backing off with an
 
23
// exponent of ShortPollBackoff until a maximum(ish) of LongPoll.
 
24
//
 
25
// When a machine has an address LongPoll will be used to check that the
 
26
// instance address has not changed.
 
27
var (
 
28
        ShortPoll        = 1 * time.Second
 
29
        ShortPollBackoff = 2.0
 
30
        LongPoll         = 15 * time.Minute
 
31
)
 
32
 
 
33
type machine interface {
 
34
        Id() string
 
35
        Addresses() []instance.Address
 
36
        InstanceId() (instance.Id, error)
 
37
        SetAddresses([]instance.Address) error
 
38
        String() string
 
39
        Refresh() error
 
40
        Life() state.Life
 
41
}
 
42
 
 
43
type machineContext interface {
 
44
        killAll(err error)
 
45
        addresses(id instance.Id) ([]instance.Address, error)
 
46
        dying() <-chan struct{}
 
47
}
 
48
 
 
49
type machineAddress struct {
 
50
        machine   machine
 
51
        addresses []instance.Address
 
52
}
 
53
 
 
54
var _ machine = (*state.Machine)(nil)
 
55
 
 
56
type machinesWatcher interface {
 
57
        Changes() <-chan []string
 
58
        Err() error
 
59
        Stop() error
 
60
}
 
61
 
 
62
type updaterContext interface {
 
63
        newMachineContext() machineContext
 
64
        getMachine(id string) (machine, error)
 
65
        dying() <-chan struct{}
 
66
}
 
67
 
 
68
type updater struct {
 
69
        context     updaterContext
 
70
        machines    map[string]chan struct{}
 
71
        machineDead chan machine
 
72
}
 
73
 
 
74
// watchMachinesLoop watches for changes provided by the given
 
75
// machinesWatcher and starts machine goroutines to deal
 
76
// with them, using the provided newMachineContext
 
77
// function to create the appropriate context for each new machine id.
 
78
func watchMachinesLoop(context updaterContext, w machinesWatcher) (err error) {
 
79
        p := &updater{
 
80
                context:     context,
 
81
                machines:    make(map[string]chan struct{}),
 
82
                machineDead: make(chan machine),
 
83
        }
 
84
        defer func() {
 
85
                if stopErr := w.Stop(); stopErr != nil {
 
86
                        if err == nil {
 
87
                                err = fmt.Errorf("error stopping watcher: %v", stopErr)
 
88
                        } else {
 
89
                                logger.Warningf("ignoring error when stopping watcher: %v", stopErr)
 
90
                        }
 
91
                }
 
92
                for len(p.machines) > 0 {
 
93
                        delete(p.machines, (<-p.machineDead).Id())
 
94
                }
 
95
        }()
 
96
        for {
 
97
                select {
 
98
                case ids, ok := <-w.Changes():
 
99
                        if !ok {
 
100
                                return watcher.MustErr(w)
 
101
                        }
 
102
                        if err := p.startMachines(ids); err != nil {
 
103
                                return err
 
104
                        }
 
105
                case m := <-p.machineDead:
 
106
                        delete(p.machines, m.Id())
 
107
                case <-p.context.dying():
 
108
                        return nil
 
109
                }
 
110
        }
 
111
}
 
112
 
 
113
func (p *updater) startMachines(ids []string) error {
 
114
        for _, id := range ids {
 
115
                if c := p.machines[id]; c == nil {
 
116
                        // We don't know about the machine - start
 
117
                        // a goroutine to deal with it.
 
118
                        m, err := p.context.getMachine(id)
 
119
                        if errors.IsNotFoundError(err) {
 
120
                                logger.Warningf("watcher gave notification of non-existent machine %q", id)
 
121
                                continue
 
122
                        }
 
123
                        if err != nil {
 
124
                                return err
 
125
                        }
 
126
                        c = make(chan struct{})
 
127
                        p.machines[id] = c
 
128
                        go runMachine(p.context.newMachineContext(), m, c, p.machineDead)
 
129
                } else {
 
130
                        c <- struct{}{}
 
131
                }
 
132
        }
 
133
        return nil
 
134
}
 
135
 
 
136
// runMachine processes the address publishing for a given machine.
 
137
// We assume that the machine is alive when this is first called.
 
138
func runMachine(context machineContext, m machine, changed <-chan struct{}, died chan<- machine) {
 
139
        defer func() {
 
140
                // We can't just send on the died channel because the
 
141
                // central loop might be trying to write to us on the
 
142
                // changed channel.
 
143
                for {
 
144
                        select {
 
145
                        case died <- m:
 
146
                                return
 
147
                        case <-changed:
 
148
                        }
 
149
                }
 
150
        }()
 
151
        if err := machineLoop(context, m, changed); err != nil {
 
152
                context.killAll(err)
 
153
        }
 
154
}
 
155
 
 
156
func machineLoop(context machineContext, m machine, changed <-chan struct{}) error {
 
157
        // Use a short poll interval when initially waiting for
 
158
        // a machine's address, and a long one when it already
 
159
        // has an address.
 
160
        pollInterval := ShortPoll
 
161
        checkAddress := true
 
162
        for {
 
163
                if checkAddress {
 
164
                        if err := checkMachineAddresses(context, m); err != nil {
 
165
                                // If the provider doesn't implement addresses now,
 
166
                                // it never will until we're upgraded, so don't bother
 
167
                                // asking any more. We could use less resources
 
168
                                // by taking down the entire address updater worker,
 
169
                                // but this is easier for now (and hopefully the local
 
170
                                // provider will implement Addresses in the not-too-distant
 
171
                                // future), so we won't need to worry about this case at all.
 
172
                                if errors.IsNotImplementedError(err) {
 
173
                                        pollInterval = 365 * 24 * time.Hour
 
174
                                } else {
 
175
                                        return err
 
176
                                }
 
177
                        }
 
178
                        if len(m.Addresses()) > 0 {
 
179
                                // We've got at least one address, so poll infrequently.
 
180
                                pollInterval = LongPoll
 
181
                        } else if pollInterval < LongPoll {
 
182
                                // We have no addresses - poll increasingly rarely
 
183
                                // until we do.
 
184
                                pollInterval = time.Duration(float64(pollInterval) * ShortPollBackoff)
 
185
                        }
 
186
                        checkAddress = false
 
187
                }
 
188
                select {
 
189
                case <-time.After(pollInterval):
 
190
                        checkAddress = true
 
191
                case <-context.dying():
 
192
                        return nil
 
193
                case <-changed:
 
194
                        if err := m.Refresh(); err != nil {
 
195
                                return err
 
196
                        }
 
197
                        if m.Life() == state.Dead {
 
198
                                return nil
 
199
                        }
 
200
                }
 
201
        }
 
202
}
 
203
 
 
204
// checkMachineAddresses checks the current provider addresses
 
205
// for the given machine's instance, and sets them
 
206
// on the machine if they've changed.
 
207
func checkMachineAddresses(context machineContext, m machine) error {
 
208
        instId, err := m.InstanceId()
 
209
        if err != nil && !state.IsNotProvisionedError(err) {
 
210
                return fmt.Errorf("cannot get machine's instance id: %v", err)
 
211
        }
 
212
        var newAddrs []instance.Address
 
213
        if err == nil {
 
214
                newAddrs, err = context.addresses(instId)
 
215
                if err != nil {
 
216
                        if errors.IsNotImplementedError(err) {
 
217
                                return err
 
218
                        }
 
219
                        logger.Warningf("cannot get addresses for instance %q: %v", instId, err)
 
220
                        return nil
 
221
                }
 
222
        }
 
223
        if addressesEqual(m.Addresses(), newAddrs) {
 
224
                return nil
 
225
        }
 
226
        logger.Infof("machine %q has new addresses: %v", m.Id(), newAddrs)
 
227
        if err := m.SetAddresses(newAddrs); err != nil {
 
228
                return fmt.Errorf("cannot set addresses on %q: %v", m, err)
 
229
        }
 
230
        return nil
 
231
}
 
232
 
 
233
func addressesEqual(a0, a1 []instance.Address) bool {
 
234
        if len(a0) != len(a1) {
 
235
                return false
 
236
        }
 
237
        for i := range a0 {
 
238
                if a0[i] != a1[i] {
 
239
                        return false
 
240
                }
 
241
        }
 
242
        return true
 
243
}