~ubuntu-branches/ubuntu/vivid/juju-core/vivid-proposed

« back to all changes in this revision

Viewing changes to src/github.com/juju/juju/worker/storageprovisioner/machines.go

  • Committer: Package Import Robot
  • Author(s): Curtis C. Hovey
  • Date: 2015-09-29 19:43:29 UTC
  • mfrom: (47.1.4 wily-proposed)
  • Revision ID: package-import@ubuntu.com-20150929194329-9y496tbic30hc7vp
Tags: 1.24.6-0ubuntu1~15.04.1
Backport of 1.24.6 from wily. (LP: #1500916, #1497087)

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
// Copyright 2015 Canonical Ltd.
 
2
// Licensed under the AGPLv3, see LICENCE file for details.
 
3
 
 
4
package storageprovisioner
 
5
 
 
6
import (
 
7
        "github.com/juju/errors"
 
8
        "github.com/juju/names"
 
9
        "launchpad.net/tomb"
 
10
 
 
11
        "github.com/juju/juju/apiserver/params"
 
12
        "github.com/juju/juju/instance"
 
13
        "github.com/juju/juju/state/watcher"
 
14
)
 
15
 
 
16
// watchMachine starts a machine watcher if there is not already one for the
 
17
// specified tag. The watcher will notify the worker when the machine changes,
 
18
// for example when it is provisioned.
 
19
func watchMachine(ctx *context, tag names.MachineTag) {
 
20
        _, ok := ctx.machines[tag]
 
21
        if ok {
 
22
                return
 
23
        }
 
24
        w := newMachineWatcher(ctx.machineAccessor, tag, ctx.machineChanges)
 
25
        ctx.machines[tag] = w
 
26
}
 
27
 
 
28
// refreshMachine refreshes the specified machine's instance ID. If it is set,
 
29
// then the machine watcher is stopped and pending entities' parameters are
 
30
// updated. If the machine is not provisioned yet, this method is a no-op.
 
31
func refreshMachine(ctx *context, tag names.MachineTag) error {
 
32
        w, ok := ctx.machines[tag]
 
33
        if !ok {
 
34
                return errors.Errorf("machine %s is not being watched", tag.Id())
 
35
        }
 
36
        stopAndRemove := func() error {
 
37
                if err := w.stop(); err != nil {
 
38
                        return errors.Annotate(err, "stopping machine watcher")
 
39
                }
 
40
                delete(ctx.machines, tag)
 
41
                return nil
 
42
        }
 
43
        results, err := ctx.machineAccessor.InstanceIds([]names.MachineTag{tag})
 
44
        if err != nil {
 
45
                return errors.Annotate(err, "getting machine instance ID")
 
46
        }
 
47
        if err := results[0].Error; err != nil {
 
48
                if params.IsCodeNotProvisioned(err) {
 
49
                        return nil
 
50
                } else if params.IsCodeNotFound(err) {
 
51
                        // Machine is gone, so stop watching.
 
52
                        return stopAndRemove()
 
53
                }
 
54
                return errors.Annotate(err, "getting machine instance ID")
 
55
        }
 
56
        machineProvisioned(ctx, tag, instance.Id(results[0].Result))
 
57
        // machine provisioning is the only thing we care about;
 
58
        // stop the watcher.
 
59
        return stopAndRemove()
 
60
}
 
61
 
 
62
// machineProvisioned is called when a watched machine is provisioned.
 
63
func machineProvisioned(ctx *context, tag names.MachineTag, instanceId instance.Id) {
 
64
        for _, params := range ctx.pendingVolumes {
 
65
                if params.Attachment.Machine != tag || params.Attachment.InstanceId != "" {
 
66
                        continue
 
67
                }
 
68
                params.Attachment.InstanceId = instanceId
 
69
        }
 
70
        for id, params := range ctx.pendingVolumeAttachments {
 
71
                if params.Machine != tag || params.InstanceId != "" {
 
72
                        continue
 
73
                }
 
74
                params.InstanceId = instanceId
 
75
                ctx.pendingVolumeAttachments[id] = params
 
76
        }
 
77
        for id, params := range ctx.pendingFilesystemAttachments {
 
78
                if params.Machine != tag || params.InstanceId != "" {
 
79
                        continue
 
80
                }
 
81
                params.InstanceId = instanceId
 
82
                ctx.pendingFilesystemAttachments[id] = params
 
83
        }
 
84
}
 
85
 
 
86
type machineWatcher struct {
 
87
        tomb       tomb.Tomb
 
88
        accessor   MachineAccessor
 
89
        tag        names.MachineTag
 
90
        instanceId instance.Id
 
91
        out        chan<- names.MachineTag
 
92
}
 
93
 
 
94
func newMachineWatcher(
 
95
        accessor MachineAccessor,
 
96
        tag names.MachineTag,
 
97
        out chan<- names.MachineTag,
 
98
) *machineWatcher {
 
99
        w := &machineWatcher{
 
100
                accessor: accessor,
 
101
                tag:      tag,
 
102
                out:      out,
 
103
        }
 
104
        go func() {
 
105
                defer w.tomb.Done()
 
106
                w.tomb.Kill(w.loop())
 
107
        }()
 
108
        return w
 
109
}
 
110
 
 
111
func (mw *machineWatcher) stop() error {
 
112
        mw.tomb.Kill(nil)
 
113
        return mw.tomb.Wait()
 
114
}
 
115
 
 
116
func (mw *machineWatcher) loop() error {
 
117
        w, err := mw.accessor.WatchMachine(mw.tag)
 
118
        if err != nil {
 
119
                return errors.Annotate(err, "watching machine")
 
120
        }
 
121
        logger.Debugf("watching machine %s", mw.tag.Id())
 
122
        defer logger.Debugf("finished watching machine %s", mw.tag.Id())
 
123
        var out chan<- names.MachineTag
 
124
        for {
 
125
                select {
 
126
                case <-mw.tomb.Dying():
 
127
                        return tomb.ErrDying
 
128
                case _, ok := <-w.Changes():
 
129
                        if !ok {
 
130
                                return watcher.EnsureErr(w)
 
131
                        }
 
132
                        out = mw.out
 
133
                case out <- mw.tag:
 
134
                        out = nil
 
135
                }
 
136
        }
 
137
}