1
// Copyright 2015 Canonical Ltd.
2
// Licensed under the AGPLv3, see LICENCE file for details.
4
package storageprovisioner
7
"github.com/juju/errors"
8
"github.com/juju/names"
11
"github.com/juju/juju/apiserver/params"
12
"github.com/juju/juju/instance"
13
"github.com/juju/juju/state/watcher"
16
// watchMachine starts a machine watcher if there is not already one for the
17
// specified tag. The watcher will notify the worker when the machine changes,
18
// for example when it is provisioned.
//
// The existing-watcher check is a lookup in ctx.machines keyed by tag. A new
// watcher is built from ctx.machineAccessor, and ctx.machineChanges is handed
// to it as its output channel.
19
func watchMachine(ctx *context, tag names.MachineTag) {
20
// Only the presence of an entry matters here; the watcher value itself
// is discarded.
_, ok := ctx.machines[tag]
24
// NOTE(review): presumably only reached when no watcher exists for tag;
// the guard on ok is not visible in this view - confirm.
w := newMachineWatcher(ctx.machineAccessor, tag, ctx.machineChanges)
28
// refreshMachine refreshes the specified machine's instance ID. If it is set,
29
// then the machine watcher is stopped and pending entities' parameters are
30
// updated. If the machine is not provisioned yet, this method is a no-op.
//
// The instance ID is fetched through ctx.machineAccessor.InstanceIds for the
// single tag. Errors from stopping the watcher or from fetching the instance
// ID are returned annotated with what was being attempted. A "not found"
// result for the machine also stops and removes the watcher.
31
func refreshMachine(ctx *context, tag names.MachineTag) error {
32
// The watcher is needed later so that it can be stopped.
w, ok := ctx.machines[tag]
34
// NOTE(review): presumably returned when ok is false; the condition is
// not visible in this view - confirm.
return errors.Errorf("machine %s is not being watched", tag.Id())
36
// stopAndRemove stops the watcher w and then drops the entry for tag
// from ctx.machines. A failure to stop is returned annotated.
stopAndRemove := func() error {
37
if err := w.stop(); err != nil {
38
return errors.Annotate(err, "stopping machine watcher")
40
delete(ctx.machines, tag)
43
// A single tag is passed, and only results[0] is inspected below.
results, err := ctx.machineAccessor.InstanceIds([]names.MachineTag{tag})
45
return errors.Annotate(err, "getting machine instance ID")
47
// The per-machine result carries its own error, which is classified by
// its params error code.
if err := results[0].Error; err != nil {
48
if params.IsCodeNotProvisioned(err) {
50
} else if params.IsCodeNotFound(err) {
51
// Machine is gone, so stop watching.
52
return stopAndRemove()
54
// Any other error code is reported to the caller.
return errors.Annotate(err, "getting machine instance ID")
56
// The result is converted to an instance.Id and applied to the pending
// entities for this machine.
machineProvisioned(ctx, tag, instance.Id(results[0].Result))
57
// machine provisioning is the only thing we care about;
59
return stopAndRemove()
62
// machineProvisioned is called when a watched machine is provisioned.
//
// It walks ctx.pendingVolumes, ctx.pendingVolumeAttachments and
// ctx.pendingFilesystemAttachments and assigns instanceId to the entries
// that belong to the machine identified by tag. Each loop tests whether an
// entry is for a different machine or already has a non-empty instance ID.
// NOTE(review): such entries are presumably skipped; the statement following
// each test is not visible in this view - confirm.
63
func machineProvisioned(ctx *context, tag names.MachineTag, instanceId instance.Id) {
64
// Pending volumes: the instance ID lives on the volume's Attachment.
for _, params := range ctx.pendingVolumes {
65
if params.Attachment.Machine != tag || params.Attachment.InstanceId != "" {
68
// NOTE(review): nothing is written back to ctx.pendingVolumes here, so
// this presumably relies on Attachment being a pointer - confirm.
params.Attachment.InstanceId = instanceId
70
// Pending volume attachments: the updated value is stored back under the
// same key.
for id, params := range ctx.pendingVolumeAttachments {
71
if params.Machine != tag || params.InstanceId != "" {
74
params.InstanceId = instanceId
75
ctx.pendingVolumeAttachments[id] = params
77
// Pending filesystem attachments: handled the same way as volume
// attachments.
for id, params := range ctx.pendingFilesystemAttachments {
78
if params.Machine != tag || params.InstanceId != "" {
81
params.InstanceId = instanceId
82
ctx.pendingFilesystemAttachments[id] = params
86
// machineWatcher holds the state used to watch a single machine and report
// on an output channel. Its loop method obtains the underlying watcher from
// accessor.WatchMachine.
type machineWatcher struct {
88
// accessor provides WatchMachine, which loop calls to obtain the
// underlying watcher.
accessor MachineAccessor
90
// instanceId is an instance ID kept on the watcher.
// NOTE(review): no use of this field is visible in this view - confirm
// how it is populated and read.
instanceId instance.Id
91
// out is a send-only channel of machine tags.
out chan<- names.MachineTag
94
// newMachineWatcher builds a machineWatcher from the given accessor and
// output channel. The watcher's loop is run and its result is passed to the
// watcher's tomb via Kill.
// NOTE(review): the remaining parameters, the return type and how loop is
// scheduled are not visible in this view - confirm.
func newMachineWatcher(
95
accessor MachineAccessor,
97
out chan<- names.MachineTag,
106
// Whatever loop returns becomes the reason recorded on the tomb.
w.tomb.Kill(w.loop())
111
// stop waits on the watcher's tomb and returns the result of that wait.
// NOTE(review): presumably the tomb is killed first so the wait can
// complete; that statement is not visible in this view - confirm.
func (mw *machineWatcher) stop() error {
113
return mw.tomb.Wait()
116
func (mw *machineWatcher) loop() error {
117
w, err := mw.accessor.WatchMachine(mw.tag)
119
return errors.Annotate(err, "watching machine")
121
logger.Debugf("watching machine %s", mw.tag.Id())
122
defer logger.Debugf("finished watching machine %s", mw.tag.Id())
123
var out chan<- names.MachineTag
126
case <-mw.tomb.Dying():
128
case _, ok := <-w.Changes():
130
return watcher.EnsureErr(w)