1
// Copyright 2013 Canonical Ltd.
2
// Licensed under the AGPLv3, see LICENCE file for details.
10
"github.com/juju/errors"
11
"github.com/juju/loggo"
12
"gopkg.in/juju/names.v2"
14
"github.com/juju/juju/apiserver/params"
15
"github.com/juju/juju/instance"
16
"github.com/juju/juju/network"
17
"github.com/juju/juju/status"
18
"github.com/juju/juju/watcher"
21
// logger is the logger used by the code in this file; it is obtained from
// loggo under the name "juju.worker.instancepoller".
var logger = loggo.GetLogger("juju.worker.instancepoller")
23
// ShortPoll and LongPoll hold the polling intervals for the instance
24
// updater. When a machine has no address or is not started, it will be
25
// polled at ShortPoll intervals until it does, exponentially backing off
26
// by a factor of ShortPollBackoff each time until a maximum(ish) of LongPoll.
28
// When a machine has an address and is started LongPoll will be used to
29
// check that the instance address or status has not changed.
31
// ShortPoll is the interval machineLoop starts polling at.
ShortPoll = 1 * time.Second
32
// ShortPollBackoff is the factor machineLoop multiplies the current
// interval by each time it backs off.
ShortPollBackoff = 2.0
33
// LongPoll is the interval machineLoop switches to once the machine has an
// address and is started; backing off also stops growing the interval once
// it is no longer below this value.
LongPoll = 15 * time.Minute
36
// machine is the set of machine operations the polling code in this file
// relies on.
// NOTE(review): Id, Refresh and Life are also called on machine values
// elsewhere in this file; their declarations are not visible in this
// excerpt.
type machine interface {
38
// Tag returns the machine's tag; the updater keys its machines map by it.
Tag() names.MachineTag
39
// InstanceId returns the id of the machine's instance. pollInstanceInfo
// checks the returned error with params.IsCodeNotProvisioned.
InstanceId() (instance.Id, error)
40
// ProviderAddresses returns the addresses currently recorded for the
// machine; pollInstanceInfo compares them with the polled addresses.
ProviderAddresses() ([]network.Address, error)
41
// SetProviderAddresses records the given addresses on the machine.
SetProviderAddresses(...network.Address) error
42
// InstanceStatus returns the instance status currently recorded for the
// machine.
InstanceStatus() (params.StatusResult, error)
43
// SetInstanceStatus records a new instance status. pollInstanceInfo passes
// the polled status, its message, and nil for the map argument.
SetInstanceStatus(status.Status, string, map[string]interface{}) error
47
// Status returns the machine's own status; machineLoop uses it to decide
// whether the machine counts as started.
Status() (params.StatusResult, error)
48
// IsManual reports whether the machine is manual; startMachines checks it
// because manual machines are not polled.
IsManual() (bool, error)
51
// instanceInfo is what machineContext.instanceInfo returns for an instance
// and what pollInstanceInfo hands back to its caller.
type instanceInfo struct {
52
// addresses are the instance's addresses; pollInstanceInfo compares them
// with the machine's provider addresses.
addresses []network.Address
53
// status is the instance's status; pollInstanceInfo compares it with the
// instance status recorded on the machine.
status instance.InstanceStatus
56
// lifetimeContext was extracted to allow the various context clients to get
57
// the benefits of the catacomb encapsulating everything that should happen
58
// here. A clean implementation would almost certainly not need this.
59
type lifetimeContext interface {
61
// dying returns a channel that the loops in this file select on; when it
// yields they return the context's errDying() result.
// NOTE(review): errDying is called on both context types in this file but
// its declaration is not visible in this excerpt.
dying() <-chan struct{}
65
// machineContext is the context handed to each machine goroutine
// (runMachine, machineLoop, pollInstanceInfo).
// NOTE(review): dying and errDying are also called on it, presumably via an
// embedded lifetimeContext on a line not visible here - confirm.
type machineContext interface {
67
// instanceInfo returns the addresses and status for the instance with the
// given id.
instanceInfo(id instance.Id) (instanceInfo, error)
70
// updaterContext is the context used by the central updater loop
// (watchMachinesLoop and startMachines).
// NOTE(review): dying and errDying are also called on it, presumably via an
// embedded lifetimeContext on a line not visible here - confirm.
type updaterContext interface {
72
// newMachineContext returns the machineContext passed to a newly started
// machine goroutine.
newMachineContext() machineContext
73
// getMachine returns the machine for the given tag.
getMachine(tag names.MachineTag) (machine, error)
77
// context supplies machines, per-machine contexts and the dying signal.
context updaterContext
78
// machines holds one channel per machine tag. startMachines looks tags up
// here to tell whether a machine is already known, and the channel is what
// the machine's goroutine receives as its "changed" channel.
machines map[names.MachineTag]chan struct{}
79
// machineDead is passed to every runMachine goroutine as its "died"
// channel; machines received from it are deleted from the machines map.
machineDead chan machine
82
// watchMachinesLoop watches for changes provided by the given
83
// machinesWatcher and starts machine goroutines to deal with them,
84
// using the provided newMachineContext function to create the
85
// appropriate context for each new machine tag.
86
func watchMachinesLoop(context updaterContext, machinesWatcher watcher.StringsWatcher) (err error) {
89
// Start with no known machines.
machines: make(map[names.MachineTag]chan struct{}),
90
// Unbuffered: a machine goroutine reporting its death blocks until the
// report is received.
machineDead: make(chan machine),
93
// TODO(fwereade): is this a home-grown sync.WaitGroup or something?
94
// strongly suspect these machine goroutines could be managed rather
95
// less opaquely if we made them all workers.
96
// Keep receiving from machineDead, removing each reported machine, until
// the machines map is empty.
// NOTE(review): presumably this runs on the way out of the function; the
// enclosing statement is not visible in this excerpt.
for len(p.machines) > 0 {
97
delete(p.machines, (<-p.machineDead).Tag())
102
// The context is shutting down: stop with its error.
case <-p.context.dying():
103
return p.context.errDying()
104
// The watcher delivers machine ids; ok reports whether the receive
// succeeded.
case ids, ok := <-machinesWatcher.Changes():
106
// NOTE(review): presumably returned when ok is false; the guarding
// condition is not visible in this excerpt.
return errors.New("machines watcher closed")
108
// Convert each id into a machine tag, one tag per id.
tags := make([]names.MachineTag, len(ids))
110
tags[i] = names.NewMachineTag(ids[i])
112
if err := p.startMachines(tags); err != nil {
115
// A machine goroutine has reported in on machineDead: forget the machine.
case m := <-p.machineDead:
116
delete(p.machines, m.Tag())
121
// startMachines processes each of the given tags. For a tag with no channel
// in p.machines it fetches the machine, checks whether it is manual, and
// starts a runMachine goroutine with a fresh channel. It also contains a
// select that either sends on a machine's channel or gives up with
// p.context.errDying() when the context is dying.
// NOTE(review): presumably that select is the branch for already-known
// machines; the connecting lines are not visible in this excerpt.
func (p *updater) startMachines(tags []names.MachineTag) error {
122
for _, tag := range tags {
123
// A missing map entry yields a nil channel, i.e. an unknown machine.
if c := p.machines[tag]; c == nil {
124
// We don't know about the machine - start
125
// a goroutine to deal with it.
126
m, err := p.context.getMachine(tag)
128
return errors.Trace(err)
130
// We don't poll manual machines.
131
isManual, err := m.IsManual()
133
return errors.Trace(err)
138
// Unbuffered channel handed to the goroutine as its "changed" channel.
c = make(chan struct{})
140
// The goroutine gets its own machine context and reports on
// p.machineDead.
go runMachine(p.context.newMachineContext(), m, c, p.machineDead)
143
// Don't block on the send below if the context is shutting down.
case <-p.context.dying():
144
return p.context.errDying()
145
// Signal the machine's goroutine through its channel.
case c <- struct{}{}:
152
// runMachine processes the address and status publishing for a given machine.
153
// We assume that the machine is alive when this is first called.
154
func runMachine(context machineContext, m machine, changed <-chan struct{}, died chan<- machine) {
156
// We can't just send on the died channel because the
157
// central loop might be trying to write to us on the
167
// Run the polling loop for this machine.
// NOTE(review): the handling of a non-nil error is not visible in this
// excerpt.
if err := machineLoop(context, m, changed); err != nil {
172
// machineLoop polls instance information for m via pollInstanceInfo and
// adapts the wait between polls. The interval starts at ShortPoll, becomes
// LongPoll once the instance has at least one address and the machine status
// is started, and is otherwise multiplied by ShortPollBackoff while it is
// still below LongPoll. It returns context.errDying() when context.dying()
// fires.
// NOTE(review): the enclosing loop/select headers and the use of the changed
// channel are not visible in this excerpt.
func machineLoop(context machineContext, m machine, changed <-chan struct{}) error {
173
// Use a short poll interval when initially waiting for
174
// a machine's address and machine agent to start, and a long one when it already
175
// has an address and the machine agent is started.
176
pollInterval := ShortPoll
180
instInfo, err := pollInstanceInfo(context, m)
181
// A "not provisioned" error is tolerated; any other error takes this
// branch.
if err != nil && !params.IsCodeNotProvisioned(err) {
184
// Treat the machine as pending unless its status is read successfully
// below.
machineStatus := status.StatusPending
186
if statusInfo, err := m.Status(); err != nil {
187
logger.Warningf("cannot get current machine status for machine %v: %v", m.Id(), err)
189
// TODO(perrito666) add status validation.
190
machineStatus = status.Status(statusInfo.Status)
193
// the extra condition below (checking allocating/pending) is here to improve user experience
194
// without it the instance status will say "pending" for +10 minutes after the agent comes up to "started"
195
if instInfo.status.Status != status.StatusAllocating && instInfo.status.Status != status.StatusPending {
196
if len(instInfo.addresses) > 0 && machineStatus == status.StatusStarted {
197
// We've got at least one address and a status and instance is started, so poll infrequently.
198
pollInterval = LongPoll
199
} else if pollInterval < LongPoll {
200
// We have no addresses or not started - poll increasingly rarely
202
// The interval is multiplied, not clamped, so the last step may overshoot
// LongPoll before the check above stops further growth.
pollInterval = time.Duration(float64(pollInterval) * ShortPollBackoff)
208
case <-context.dying():
209
return context.errDying()
210
// Wait out the current interval before the next poll.
case <-time.After(pollInterval):
211
// TODO(fwereade): 2016-03-17 lp:1558657
214
// Refresh the machine before checking its life below.
if err := m.Refresh(); err != nil {
217
if m.Life() == params.Dead {
224
// pollInstanceInfo checks the current provider addresses and status
225
// for the given machine's instance, and sets them on the machine if they've changed.
226
func pollInstanceInfo(context machineContext, m machine) (instInfo instanceInfo, err error) {
227
// Start from an empty result.
instInfo = instanceInfo{}
228
instId, err := m.InstanceId()
229
// We can't ask the machine for its addresses if it isn't provisioned yet.
230
if params.IsCodeNotProvisioned(err) {
234
// NOTE(review): %v flattens the underlying error into the message, so
// callers cannot recover the original error value from the result.
return instInfo, fmt.Errorf("cannot get machine's instance id: %v", err)
236
// Ask the context for the instance's current addresses and status.
instInfo, err = context.instanceInfo(instId)
238
// TODO (anastasiamac 2016-02-01) This does not look like it needs to be removed now.
239
if params.IsCodeNotImplemented(err) {
242
logger.Warningf("cannot get instance info for instance %q: %v", instId, err)
245
// Read the instance status currently recorded on the machine so it can be
// compared with the polled one.
instStat, err := m.InstanceStatus()
247
// This should never occur since the machine is provisioned.
248
// But just in case, we reset polled status so we try again next time.
249
logger.Warningf("cannot get current instance status for machine %v: %v", m.Id(), err)
250
// NOTE(review): unkeyed composite literal; the literal just below names its
// fields (Status, Message).
instInfo.status = instance.InstanceStatus{status.StatusUnknown, ""}
252
// TODO(perrito666) add status validation.
253
currentInstStatus := instance.InstanceStatus{
254
Status: status.Status(instStat.Status),
255
Message: instStat.Info,
257
// Only write the instance status back when the polled value differs from
// the recorded one.
if instInfo.status != currentInstStatus {
258
logger.Infof("machine %q instance status changed from %q to %q", m.Id(), currentInstStatus, instInfo.status)
259
if err = m.SetInstanceStatus(instInfo.status.Status, instInfo.status.Message, nil); err != nil {
260
// NOTE(review): this formats m itself with %q, whereas the other log calls
// in this function use m.Id() - confirm which is intended.
logger.Errorf("cannot set instance status on %q: %v", m, err)
264
// Read the addresses currently recorded on the machine so they can be
// compared with the polled ones.
providerAddresses, err := m.ProviderAddresses()
268
// Only write the addresses back when they differ from the recorded ones.
if !addressesEqual(providerAddresses, instInfo.addresses) {
269
logger.Infof("machine %q has new addresses: %v", m.Id(), instInfo.addresses)
270
if err = m.SetProviderAddresses(instInfo.addresses...); err != nil {
271
// NOTE(review): as above, m is formatted directly rather than m.Id().
logger.Errorf("cannot set addresses on %q: %v", m, err)
277
// addressesEqual compares the addresses of the machine and the instance information.
278
func addressesEqual(a0, a1 []network.Address) bool {
279
if len(a0) != len(a1) {
280
logger.Tracef("address lists have different lengths %d != %d for %v != %v",
281
len(a0), len(a1), a0, a1)
285
ca0 := make([]network.Address, len(a0))
287
network.SortAddresses(ca0)
288
ca1 := make([]network.Address, len(a1))
290
network.SortAddresses(ca1)
293
if ca0[i] != ca1[i] {
294
logger.Tracef("address entry at offset %d has a different value for %v != %v",