~nskaggs/+junk/xenial-test

« back to all changes in this revision

Viewing changes to src/github.com/juju/juju/worker/storageprovisioner/machines.go

  • Committer: Nicholas Skaggs
  • Date: 2016-10-24 20:56:05 UTC
  • Revision ID: nicholas.skaggs@canonical.com-20161024205605-z8lta0uvuhtxwzwl
Initi with beta15

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
// Copyright 2015 Canonical Ltd.
 
2
// Licensed under the AGPLv3, see LICENCE file for details.
 
3
 
 
4
package storageprovisioner
 
5
 
 
6
import (
 
7
        "github.com/juju/errors"
 
8
        "gopkg.in/juju/names.v2"
 
9
 
 
10
        "github.com/juju/juju/apiserver/params"
 
11
        "github.com/juju/juju/instance"
 
12
        "github.com/juju/juju/worker"
 
13
        "github.com/juju/juju/worker/catacomb"
 
14
)
 
15
 
 
16
// watchMachine starts a machine watcher if there is not already one for the
 
17
// specified tag. The watcher will notify the worker when the machine changes,
 
18
// for example when it is provisioned.
 
19
func watchMachine(ctx *context, tag names.MachineTag) {
 
20
        _, ok := ctx.machines[tag]
 
21
        if ok {
 
22
                return
 
23
        }
 
24
        w, err := newMachineWatcher(ctx.config.Machines, tag, ctx.machineChanges)
 
25
        if err != nil {
 
26
                ctx.kill(errors.Trace(err))
 
27
        } else if err := ctx.addWorker(w); err != nil {
 
28
                ctx.kill(errors.Trace(err))
 
29
        } else {
 
30
                ctx.machines[tag] = w
 
31
        }
 
32
}
 
33
 
 
34
// refreshMachine refreshes the specified machine's instance ID. If it is set,
 
35
// then the machine watcher is stopped and pending entities' parameters are
 
36
// updated. If the machine is not provisioned yet, this method is a no-op.
 
37
func refreshMachine(ctx *context, tag names.MachineTag) error {
 
38
        w, ok := ctx.machines[tag]
 
39
        if !ok {
 
40
                return errors.Errorf("machine %s is not being watched", tag.Id())
 
41
        }
 
42
        stopAndRemove := func() error {
 
43
                worker.Stop(w)
 
44
                delete(ctx.machines, tag)
 
45
                return nil
 
46
        }
 
47
        results, err := ctx.config.Machines.InstanceIds([]names.MachineTag{tag})
 
48
        if err != nil {
 
49
                return errors.Annotate(err, "getting machine instance ID")
 
50
        }
 
51
        if err := results[0].Error; err != nil {
 
52
                if params.IsCodeNotProvisioned(err) {
 
53
                        return nil
 
54
                } else if params.IsCodeNotFound(err) {
 
55
                        // Machine is gone, so stop watching.
 
56
                        return stopAndRemove()
 
57
                }
 
58
                return errors.Annotate(err, "getting machine instance ID")
 
59
        }
 
60
        machineProvisioned(ctx, tag, instance.Id(results[0].Result))
 
61
        // machine provisioning is the only thing we care about;
 
62
        // stop the watcher.
 
63
        return stopAndRemove()
 
64
}
 
65
 
 
66
// machineProvisioned is called when a watched machine is provisioned.
 
67
func machineProvisioned(ctx *context, tag names.MachineTag, instanceId instance.Id) {
 
68
        for _, params := range ctx.incompleteVolumeParams {
 
69
                if params.Attachment.Machine != tag || params.Attachment.InstanceId != "" {
 
70
                        continue
 
71
                }
 
72
                params.Attachment.InstanceId = instanceId
 
73
                updatePendingVolume(ctx, params)
 
74
        }
 
75
        for id, params := range ctx.incompleteVolumeAttachmentParams {
 
76
                if params.Machine != tag || params.InstanceId != "" {
 
77
                        continue
 
78
                }
 
79
                params.InstanceId = instanceId
 
80
                updatePendingVolumeAttachment(ctx, id, params)
 
81
        }
 
82
        for id, params := range ctx.incompleteFilesystemAttachmentParams {
 
83
                if params.Machine != tag || params.InstanceId != "" {
 
84
                        continue
 
85
                }
 
86
                params.InstanceId = instanceId
 
87
                updatePendingFilesystemAttachment(ctx, id, params)
 
88
        }
 
89
}
 
90
 
 
91
type machineWatcher struct {
 
92
        catacomb   catacomb.Catacomb
 
93
        accessor   MachineAccessor
 
94
        tag        names.MachineTag
 
95
        instanceId instance.Id
 
96
        out        chan<- names.MachineTag
 
97
}
 
98
 
 
99
func newMachineWatcher(
 
100
        accessor MachineAccessor,
 
101
        tag names.MachineTag,
 
102
        out chan<- names.MachineTag,
 
103
) (*machineWatcher, error) {
 
104
        w := &machineWatcher{
 
105
                accessor: accessor,
 
106
                tag:      tag,
 
107
                out:      out,
 
108
        }
 
109
        err := catacomb.Invoke(catacomb.Plan{
 
110
                Site: &w.catacomb,
 
111
                Work: w.loop,
 
112
        })
 
113
        if err != nil {
 
114
                return nil, errors.Trace(err)
 
115
        }
 
116
        return w, nil
 
117
}
 
118
 
 
119
func (mw *machineWatcher) loop() error {
 
120
        w, err := mw.accessor.WatchMachine(mw.tag)
 
121
        if err != nil {
 
122
                return errors.Annotate(err, "watching machine")
 
123
        }
 
124
        if err := mw.catacomb.Add(w); err != nil {
 
125
                return errors.Trace(err)
 
126
        }
 
127
        logger.Debugf("watching machine %s", mw.tag.Id())
 
128
        defer logger.Debugf("finished watching machine %s", mw.tag.Id())
 
129
        var out chan<- names.MachineTag
 
130
        for {
 
131
                select {
 
132
                case <-mw.catacomb.Dying():
 
133
                        return mw.catacomb.ErrDying()
 
134
                case _, ok := <-w.Changes():
 
135
                        if !ok {
 
136
                                return errors.New("machine watcher closed")
 
137
                        }
 
138
                        out = mw.out
 
139
                case out <- mw.tag:
 
140
                        out = nil
 
141
                }
 
142
        }
 
143
}
 
144
 
 
145
// Kill is part of the worker.Worker interface.
 
146
func (mw *machineWatcher) Kill() {
 
147
        mw.catacomb.Kill(nil)
 
148
}
 
149
 
 
150
// Wait is part of the worker.Worker interface.
 
151
func (mw *machineWatcher) Wait() error {
 
152
        return mw.catacomb.Wait()
 
153
}