~nskaggs/+junk/xenial-test

« back to all changes in this revision

Viewing changes to src/github.com/juju/juju/worker/instancepoller/updater.go

  • Committer: Nicholas Skaggs
  • Date: 2016-10-24 20:56:05 UTC
  • Revision ID: nicholas.skaggs@canonical.com-20161024205605-z8lta0uvuhtxwzwl
Initi with beta15

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
// Copyright 2013 Canonical Ltd.
 
2
// Licensed under the AGPLv3, see LICENCE file for details.
 
3
 
 
4
package instancepoller
 
5
 
 
6
import (
 
7
        "fmt"
 
8
        "time"
 
9
 
 
10
        "github.com/juju/errors"
 
11
        "github.com/juju/loggo"
 
12
        "gopkg.in/juju/names.v2"
 
13
 
 
14
        "github.com/juju/juju/apiserver/params"
 
15
        "github.com/juju/juju/instance"
 
16
        "github.com/juju/juju/network"
 
17
        "github.com/juju/juju/status"
 
18
        "github.com/juju/juju/watcher"
 
19
)
 
20
 
 
21
var logger = loggo.GetLogger("juju.worker.instancepoller")
 
22
 
 
23
// ShortPoll and LongPoll hold the polling intervals for the instance
 
24
// updater. When a machine has no address or is not started, it will be
 
25
// polled at ShortPoll intervals until it does, exponentially backing off
 
26
// with an exponent of ShortPollBackoff until a maximum(ish) of LongPoll.
 
27
//
 
28
// When a machine has an address and is started LongPoll will be used to
 
29
// check that the instance address or status has not changed.
 
30
var (
 
31
        ShortPoll        = 1 * time.Second
 
32
        ShortPollBackoff = 2.0
 
33
        LongPoll         = 15 * time.Minute
 
34
)
 
35
 
 
36
type machine interface {
 
37
        Id() string
 
38
        Tag() names.MachineTag
 
39
        InstanceId() (instance.Id, error)
 
40
        ProviderAddresses() ([]network.Address, error)
 
41
        SetProviderAddresses(...network.Address) error
 
42
        InstanceStatus() (params.StatusResult, error)
 
43
        SetInstanceStatus(status.Status, string, map[string]interface{}) error
 
44
        String() string
 
45
        Refresh() error
 
46
        Life() params.Life
 
47
        Status() (params.StatusResult, error)
 
48
        IsManual() (bool, error)
 
49
}
 
50
 
 
51
type instanceInfo struct {
 
52
        addresses []network.Address
 
53
        status    instance.InstanceStatus
 
54
}
 
55
 
 
56
// lifetimeContext was extracted to allow the various context clients to get
 
57
// the benefits of the catacomb encapsulating everything that should happen
 
58
// here. A clean implementation would almost certainly not need this.
 
59
type lifetimeContext interface {
 
60
        kill(error)
 
61
        dying() <-chan struct{}
 
62
        errDying() error
 
63
}
 
64
 
 
65
type machineContext interface {
 
66
        lifetimeContext
 
67
        instanceInfo(id instance.Id) (instanceInfo, error)
 
68
}
 
69
 
 
70
type updaterContext interface {
 
71
        lifetimeContext
 
72
        newMachineContext() machineContext
 
73
        getMachine(tag names.MachineTag) (machine, error)
 
74
}
 
75
 
 
76
type updater struct {
 
77
        context     updaterContext
 
78
        machines    map[names.MachineTag]chan struct{}
 
79
        machineDead chan machine
 
80
}
 
81
 
 
82
// watchMachinesLoop watches for changes provided by the given
 
83
// machinesWatcher and starts machine goroutines to deal with them,
 
84
// using the provided newMachineContext function to create the
 
85
// appropriate context for each new machine tag.
 
86
func watchMachinesLoop(context updaterContext, machinesWatcher watcher.StringsWatcher) (err error) {
 
87
        p := &updater{
 
88
                context:     context,
 
89
                machines:    make(map[names.MachineTag]chan struct{}),
 
90
                machineDead: make(chan machine),
 
91
        }
 
92
        defer func() {
 
93
                // TODO(fwereade): is this a home-grown sync.WaitGroup or something?
 
94
                // strongly suspect these machine goroutines could be managed rather
 
95
                // less opaquely if we made them all workers.
 
96
                for len(p.machines) > 0 {
 
97
                        delete(p.machines, (<-p.machineDead).Tag())
 
98
                }
 
99
        }()
 
100
        for {
 
101
                select {
 
102
                case <-p.context.dying():
 
103
                        return p.context.errDying()
 
104
                case ids, ok := <-machinesWatcher.Changes():
 
105
                        if !ok {
 
106
                                return errors.New("machines watcher closed")
 
107
                        }
 
108
                        tags := make([]names.MachineTag, len(ids))
 
109
                        for i := range ids {
 
110
                                tags[i] = names.NewMachineTag(ids[i])
 
111
                        }
 
112
                        if err := p.startMachines(tags); err != nil {
 
113
                                return err
 
114
                        }
 
115
                case m := <-p.machineDead:
 
116
                        delete(p.machines, m.Tag())
 
117
                }
 
118
        }
 
119
}
 
120
 
 
121
func (p *updater) startMachines(tags []names.MachineTag) error {
 
122
        for _, tag := range tags {
 
123
                if c := p.machines[tag]; c == nil {
 
124
                        // We don't know about the machine - start
 
125
                        // a goroutine to deal with it.
 
126
                        m, err := p.context.getMachine(tag)
 
127
                        if err != nil {
 
128
                                return errors.Trace(err)
 
129
                        }
 
130
                        // We don't poll manual machines.
 
131
                        isManual, err := m.IsManual()
 
132
                        if err != nil {
 
133
                                return errors.Trace(err)
 
134
                        }
 
135
                        if isManual {
 
136
                                continue
 
137
                        }
 
138
                        c = make(chan struct{})
 
139
                        p.machines[tag] = c
 
140
                        go runMachine(p.context.newMachineContext(), m, c, p.machineDead)
 
141
                } else {
 
142
                        select {
 
143
                        case <-p.context.dying():
 
144
                                return p.context.errDying()
 
145
                        case c <- struct{}{}:
 
146
                        }
 
147
                }
 
148
        }
 
149
        return nil
 
150
}
 
151
 
 
152
// runMachine processes the address and status publishing for a given machine.
 
153
// We assume that the machine is alive when this is first called.
 
154
func runMachine(context machineContext, m machine, changed <-chan struct{}, died chan<- machine) {
 
155
        defer func() {
 
156
                // We can't just send on the died channel because the
 
157
                // central loop might be trying to write to us on the
 
158
                // changed channel.
 
159
                for {
 
160
                        select {
 
161
                        case died <- m:
 
162
                                return
 
163
                        case <-changed:
 
164
                        }
 
165
                }
 
166
        }()
 
167
        if err := machineLoop(context, m, changed); err != nil {
 
168
                context.kill(err)
 
169
        }
 
170
}
 
171
 
 
172
func machineLoop(context machineContext, m machine, changed <-chan struct{}) error {
 
173
        // Use a short poll interval when initially waiting for
 
174
        // a machine's address and machine agent to start, and a long one when it already
 
175
        // has an address and the machine agent is started.
 
176
        pollInterval := ShortPoll
 
177
        pollInstance := true
 
178
        for {
 
179
                if pollInstance {
 
180
                        instInfo, err := pollInstanceInfo(context, m)
 
181
                        if err != nil && !params.IsCodeNotProvisioned(err) {
 
182
                                return err
 
183
                        }
 
184
                        machineStatus := status.StatusPending
 
185
                        if err == nil {
 
186
                                if statusInfo, err := m.Status(); err != nil {
 
187
                                        logger.Warningf("cannot get current machine status for machine %v: %v", m.Id(), err)
 
188
                                } else {
 
189
                                        // TODO(perrito666) add status validation.
 
190
                                        machineStatus = status.Status(statusInfo.Status)
 
191
                                }
 
192
                        }
 
193
                        // the extra condition below (checking allocating/pending) is here to improve user experience
 
194
                        // without it the instance status will say "pending" for +10 minutes after the agent comes up to "started"
 
195
                        if instInfo.status.Status != status.StatusAllocating && instInfo.status.Status != status.StatusPending {
 
196
                                if len(instInfo.addresses) > 0 && machineStatus == status.StatusStarted {
 
197
                                        // We've got at least one address and a status and instance is started, so poll infrequently.
 
198
                                        pollInterval = LongPoll
 
199
                                } else if pollInterval < LongPoll {
 
200
                                        // We have no addresses or not started - poll increasingly rarely
 
201
                                        // until we do.
 
202
                                        pollInterval = time.Duration(float64(pollInterval) * ShortPollBackoff)
 
203
                                }
 
204
                        }
 
205
                        pollInstance = false
 
206
                }
 
207
                select {
 
208
                case <-context.dying():
 
209
                        return context.errDying()
 
210
                case <-time.After(pollInterval):
 
211
                        // TODO(fwereade): 2016-03-17 lp:1558657
 
212
                        pollInstance = true
 
213
                case <-changed:
 
214
                        if err := m.Refresh(); err != nil {
 
215
                                return err
 
216
                        }
 
217
                        if m.Life() == params.Dead {
 
218
                                return nil
 
219
                        }
 
220
                }
 
221
        }
 
222
}
 
223
 
 
224
// pollInstanceInfo checks the current provider addresses and status
 
225
// for the given machine's instance, and sets them on the machine if they've changed.
 
226
func pollInstanceInfo(context machineContext, m machine) (instInfo instanceInfo, err error) {
 
227
        instInfo = instanceInfo{}
 
228
        instId, err := m.InstanceId()
 
229
        // We can't ask the machine for its addresses if it isn't provisioned yet.
 
230
        if params.IsCodeNotProvisioned(err) {
 
231
                return instInfo, err
 
232
        }
 
233
        if err != nil {
 
234
                return instInfo, fmt.Errorf("cannot get machine's instance id: %v", err)
 
235
        }
 
236
        instInfo, err = context.instanceInfo(instId)
 
237
        if err != nil {
 
238
                // TODO (anastasiamac 2016-02-01) This does not look like it needs to be removed now.
 
239
                if params.IsCodeNotImplemented(err) {
 
240
                        return instInfo, err
 
241
                }
 
242
                logger.Warningf("cannot get instance info for instance %q: %v", instId, err)
 
243
                return instInfo, nil
 
244
        }
 
245
        instStat, err := m.InstanceStatus()
 
246
        if err != nil {
 
247
                // This should never occur since the machine is provisioned.
 
248
                // But just in case, we reset polled status so we try again next time.
 
249
                logger.Warningf("cannot get current instance status for machine %v: %v", m.Id(), err)
 
250
                instInfo.status = instance.InstanceStatus{status.StatusUnknown, ""}
 
251
        } else {
 
252
                // TODO(perrito666) add status validation.
 
253
                currentInstStatus := instance.InstanceStatus{
 
254
                        Status:  status.Status(instStat.Status),
 
255
                        Message: instStat.Info,
 
256
                }
 
257
                if instInfo.status != currentInstStatus {
 
258
                        logger.Infof("machine %q instance status changed from %q to %q", m.Id(), currentInstStatus, instInfo.status)
 
259
                        if err = m.SetInstanceStatus(instInfo.status.Status, instInfo.status.Message, nil); err != nil {
 
260
                                logger.Errorf("cannot set instance status on %q: %v", m, err)
 
261
                        }
 
262
                }
 
263
        }
 
264
        providerAddresses, err := m.ProviderAddresses()
 
265
        if err != nil {
 
266
                return instInfo, err
 
267
        }
 
268
        if !addressesEqual(providerAddresses, instInfo.addresses) {
 
269
                logger.Infof("machine %q has new addresses: %v", m.Id(), instInfo.addresses)
 
270
                if err = m.SetProviderAddresses(instInfo.addresses...); err != nil {
 
271
                        logger.Errorf("cannot set addresses on %q: %v", m, err)
 
272
                }
 
273
        }
 
274
        return instInfo, err
 
275
}
 
276
 
 
277
// addressesEqual compares the addresses of the machine and the instance information.
 
278
func addressesEqual(a0, a1 []network.Address) bool {
 
279
        if len(a0) != len(a1) {
 
280
                logger.Tracef("address lists have different lengths %d != %d for %v != %v",
 
281
                        len(a0), len(a1), a0, a1)
 
282
                return false
 
283
        }
 
284
 
 
285
        ca0 := make([]network.Address, len(a0))
 
286
        copy(ca0, a0)
 
287
        network.SortAddresses(ca0)
 
288
        ca1 := make([]network.Address, len(a1))
 
289
        copy(ca1, a1)
 
290
        network.SortAddresses(ca1)
 
291
 
 
292
        for i := range ca0 {
 
293
                if ca0[i] != ca1[i] {
 
294
                        logger.Tracef("address entry at offset %d has a different value for %v != %v",
 
295
                                i, ca0, ca1)
 
296
                        return false
 
297
                }
 
298
        }
 
299
        return true
 
300
}