1
// Copyright 2015 Canonical Ltd.
2
// Licensed under the AGPLv3, see LICENCE file for details.
4
package storageprovisioner
7
"github.com/juju/errors"
8
"gopkg.in/juju/names.v2"
10
"github.com/juju/juju/apiserver/params"
11
"github.com/juju/juju/instance"
12
"github.com/juju/juju/worker"
13
"github.com/juju/juju/worker/catacomb"
16
// watchMachine starts a machine watcher if there is not already one for the
17
// specified tag. The watcher will notify the worker when the machine changes,
18
// for example when it is provisioned.
19
func watchMachine(ctx *context, tag names.MachineTag) {
20
_, ok := ctx.machines[tag]
24
w, err := newMachineWatcher(ctx.config.Machines, tag, ctx.machineChanges)
26
ctx.kill(errors.Trace(err))
27
} else if err := ctx.addWorker(w); err != nil {
28
ctx.kill(errors.Trace(err))
34
// refreshMachine refreshes the specified machine's instance ID. If it is set,
35
// then the machine watcher is stopped and pending entities' parameters are
36
// updated. If the machine is not provisioned yet, this method is a no-op.
37
func refreshMachine(ctx *context, tag names.MachineTag) error {
38
w, ok := ctx.machines[tag]
40
return errors.Errorf("machine %s is not being watched", tag.Id())
42
stopAndRemove := func() error {
44
delete(ctx.machines, tag)
47
results, err := ctx.config.Machines.InstanceIds([]names.MachineTag{tag})
49
return errors.Annotate(err, "getting machine instance ID")
51
if err := results[0].Error; err != nil {
52
if params.IsCodeNotProvisioned(err) {
54
} else if params.IsCodeNotFound(err) {
55
// Machine is gone, so stop watching.
56
return stopAndRemove()
58
return errors.Annotate(err, "getting machine instance ID")
60
machineProvisioned(ctx, tag, instance.Id(results[0].Result))
61
// machine provisioning is the only thing we care about;
63
return stopAndRemove()
66
// machineProvisioned is called when a watched machine is provisioned.
67
func machineProvisioned(ctx *context, tag names.MachineTag, instanceId instance.Id) {
68
for _, params := range ctx.incompleteVolumeParams {
69
if params.Attachment.Machine != tag || params.Attachment.InstanceId != "" {
72
params.Attachment.InstanceId = instanceId
73
updatePendingVolume(ctx, params)
75
for id, params := range ctx.incompleteVolumeAttachmentParams {
76
if params.Machine != tag || params.InstanceId != "" {
79
params.InstanceId = instanceId
80
updatePendingVolumeAttachment(ctx, id, params)
82
for id, params := range ctx.incompleteFilesystemAttachmentParams {
83
if params.Machine != tag || params.InstanceId != "" {
86
params.InstanceId = instanceId
87
updatePendingFilesystemAttachment(ctx, id, params)
91
type machineWatcher struct {
92
catacomb catacomb.Catacomb
93
accessor MachineAccessor
95
instanceId instance.Id
96
out chan<- names.MachineTag
99
func newMachineWatcher(
100
accessor MachineAccessor,
101
tag names.MachineTag,
102
out chan<- names.MachineTag,
103
) (*machineWatcher, error) {
104
w := &machineWatcher{
109
err := catacomb.Invoke(catacomb.Plan{
114
return nil, errors.Trace(err)
119
func (mw *machineWatcher) loop() error {
120
w, err := mw.accessor.WatchMachine(mw.tag)
122
return errors.Annotate(err, "watching machine")
124
if err := mw.catacomb.Add(w); err != nil {
125
return errors.Trace(err)
127
logger.Debugf("watching machine %s", mw.tag.Id())
128
defer logger.Debugf("finished watching machine %s", mw.tag.Id())
129
var out chan<- names.MachineTag
132
case <-mw.catacomb.Dying():
133
return mw.catacomb.ErrDying()
134
case _, ok := <-w.Changes():
136
return errors.New("machine watcher closed")
145
// Kill is part of the worker.Worker interface.
146
func (mw *machineWatcher) Kill() {
147
mw.catacomb.Kill(nil)
150
// Wait is part of the worker.Worker interface.
151
func (mw *machineWatcher) Wait() error {
152
return mw.catacomb.Wait()