~juju-qa/ubuntu/yakkety/juju/2.0-rc3-again

« back to all changes in this revision

Viewing changes to src/launchpad.net/juju-core/worker/provisioner/provisioner.go

  • Committer: Package Import Robot
  • Author(s): James Page
  • Date: 2013-04-24 22:34:47 UTC
  • Revision ID: package-import@ubuntu.com-20130424223447-f0qdji7ubnyo0s71
Tags: upstream-1.10.0.1
ImportĀ upstreamĀ versionĀ 1.10.0.1

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
package provisioner
 
2
 
 
3
import (
 
4
        "errors"
 
5
        "fmt"
 
6
        "launchpad.net/juju-core/environs"
 
7
        "launchpad.net/juju-core/environs/config"
 
8
        "launchpad.net/juju-core/log"
 
9
        "launchpad.net/juju-core/state"
 
10
        "launchpad.net/juju-core/state/api"
 
11
        "launchpad.net/juju-core/state/api/params"
 
12
        "launchpad.net/juju-core/state/watcher"
 
13
        "launchpad.net/juju-core/utils"
 
14
        "launchpad.net/juju-core/worker"
 
15
        "launchpad.net/tomb"
 
16
        "sync"
 
17
)
 
18
 
 
19
// Provisioner represents a running provisioning worker.
 
20
type Provisioner struct {
 
21
        st        *state.State
 
22
        machineId string // Which machine runs the provisioner.
 
23
        stateInfo *state.Info
 
24
        apiInfo   *api.Info
 
25
        environ   environs.Environ
 
26
        tomb      tomb.Tomb
 
27
 
 
28
        // machine.Id => environs.Instance
 
29
        instances map[string]environs.Instance
 
30
        // instance.Id => machine id
 
31
        machines map[state.InstanceId]string
 
32
 
 
33
        configObserver
 
34
}
 
35
 
 
36
type configObserver struct {
 
37
        sync.Mutex
 
38
        observer chan<- *config.Config
 
39
}
 
40
 
 
41
// nofity notifies the observer of a configuration change.
 
42
func (o *configObserver) notify(cfg *config.Config) {
 
43
        o.Lock()
 
44
        if o.observer != nil {
 
45
                o.observer <- cfg
 
46
        }
 
47
        o.Unlock()
 
48
}
 
49
 
 
50
// NewProvisioner returns a new Provisioner. When new machines
 
51
// are added to the state, it allocates instances from the environment
 
52
// and allocates them to the new machines.
 
53
func NewProvisioner(st *state.State, machineId string) *Provisioner {
 
54
        p := &Provisioner{
 
55
                st:        st,
 
56
                machineId: machineId,
 
57
                instances: make(map[string]environs.Instance),
 
58
                machines:  make(map[state.InstanceId]string),
 
59
        }
 
60
        go func() {
 
61
                defer p.tomb.Done()
 
62
                p.tomb.Kill(p.loop())
 
63
        }()
 
64
        return p
 
65
}
 
66
 
 
67
func (p *Provisioner) loop() error {
 
68
        environWatcher := p.st.WatchEnvironConfig()
 
69
        defer watcher.Stop(environWatcher, &p.tomb)
 
70
 
 
71
        var err error
 
72
        p.environ, err = worker.WaitForEnviron(environWatcher, p.tomb.Dying())
 
73
        if err != nil {
 
74
                return err
 
75
        }
 
76
 
 
77
        // Get a new StateInfo from the environment: the one used to
 
78
        // launch the agent may refer to localhost, which will be
 
79
        // unhelpful when attempting to run an agent on a new machine.
 
80
        if p.stateInfo, p.apiInfo, err = p.environ.StateInfo(); err != nil {
 
81
                return err
 
82
        }
 
83
 
 
84
        // Call processMachines to stop any unknown instances before watching machines.
 
85
        if err := p.processMachines(nil); err != nil {
 
86
                return err
 
87
        }
 
88
 
 
89
        // Start responding to changes in machines, and to any further updates
 
90
        // to the environment config.
 
91
        machinesWatcher := p.st.WatchMachines()
 
92
        defer watcher.Stop(machinesWatcher, &p.tomb)
 
93
        for {
 
94
                select {
 
95
                case <-p.tomb.Dying():
 
96
                        return tomb.ErrDying
 
97
                case cfg, ok := <-environWatcher.Changes():
 
98
                        if !ok {
 
99
                                return watcher.MustErr(environWatcher)
 
100
                        }
 
101
                        if err := p.setConfig(cfg); err != nil {
 
102
                                log.Errorf("worker/provisioner: loaded invalid environment configuration: %v", err)
 
103
                        }
 
104
                case ids, ok := <-machinesWatcher.Changes():
 
105
                        if !ok {
 
106
                                return watcher.MustErr(machinesWatcher)
 
107
                        }
 
108
                        // TODO(dfc; lp:1042717) fire process machines periodically to shut down unknown
 
109
                        // instances.
 
110
                        if err := p.processMachines(ids); err != nil {
 
111
                                return err
 
112
                        }
 
113
                }
 
114
        }
 
115
        panic("not reached")
 
116
}
 
117
 
 
118
// setConfig updates the environment configuration and notifies
 
119
// the config observer.
 
120
func (p *Provisioner) setConfig(config *config.Config) error {
 
121
        if err := p.environ.SetConfig(config); err != nil {
 
122
                return err
 
123
        }
 
124
        p.configObserver.notify(config)
 
125
        return nil
 
126
}
 
127
 
 
128
// Err returns the reason why the Provisioner has stopped or tomb.ErrStillAlive
 
129
// when it is still alive.
 
130
func (p *Provisioner) Err() (reason error) {
 
131
        return p.tomb.Err()
 
132
}
 
133
 
 
134
// Wait waits for the Provisioner to exit.
 
135
func (p *Provisioner) Wait() error {
 
136
        return p.tomb.Wait()
 
137
}
 
138
 
 
139
func (p *Provisioner) String() string {
 
140
        return "provisioning worker"
 
141
}
 
142
 
 
143
// Stop stops the Provisioner and returns any error encountered while
 
144
// provisioning.
 
145
func (p *Provisioner) Stop() error {
 
146
        p.tomb.Kill(nil)
 
147
        return p.tomb.Wait()
 
148
}
 
149
 
 
150
func (p *Provisioner) processMachines(ids []string) error {
 
151
        // Find machines without an instance id or that are dead
 
152
        pending, dead, err := p.pendingOrDead(ids)
 
153
        if err != nil {
 
154
                return err
 
155
        }
 
156
 
 
157
        // Find running instances that have no machines associated
 
158
        unknown, err := p.findUnknownInstances()
 
159
        if err != nil {
 
160
                return err
 
161
        }
 
162
 
 
163
        // Stop all machines that are dead
 
164
        stopping, err := p.instancesForMachines(dead)
 
165
        if err != nil {
 
166
                return err
 
167
        }
 
168
 
 
169
        // It's important that we stop unknown instances before starting
 
170
        // pending ones, because if we start an instance and then fail to
 
171
        // set its InstanceId on the machine we don't want to start a new
 
172
        // instance for the same machine ID.
 
173
        if err := p.stopInstances(append(stopping, unknown...)); err != nil {
 
174
                return err
 
175
        }
 
176
 
 
177
        // Start an instance for the pending ones
 
178
        return p.startMachines(pending)
 
179
}
 
180
 
 
181
// findUnknownInstances finds instances which are not associated with a machine.
 
182
func (p *Provisioner) findUnknownInstances() ([]environs.Instance, error) {
 
183
        all, err := p.environ.AllInstances()
 
184
        if err != nil {
 
185
                return nil, err
 
186
        }
 
187
        instances := make(map[state.InstanceId]environs.Instance)
 
188
        for _, i := range all {
 
189
                instances[i.Id()] = i
 
190
        }
 
191
        // TODO(dfc) this is very inefficient.
 
192
        machines, err := p.st.AllMachines()
 
193
        if err != nil {
 
194
                return nil, err
 
195
        }
 
196
        for _, m := range machines {
 
197
                if instId, ok := m.InstanceId(); ok {
 
198
                        delete(instances, instId)
 
199
                }
 
200
        }
 
201
        var unknown []environs.Instance
 
202
        for _, i := range instances {
 
203
                unknown = append(unknown, i)
 
204
        }
 
205
        return unknown, nil
 
206
}
 
207
 
 
208
// pendingOrDead looks up machines with ids and retuns those that do not
 
209
// have an instance id assigned yet, and also those that are dead.
 
210
func (p *Provisioner) pendingOrDead(ids []string) (pending, dead []*state.Machine, err error) {
 
211
        // TODO(niemeyer): ms, err := st.Machines(alive)
 
212
        for _, id := range ids {
 
213
                m, err := p.st.Machine(id)
 
214
                if state.IsNotFound(err) {
 
215
                        log.Infof("worker/provisioner: machine %q not found in state", m)
 
216
                        continue
 
217
                }
 
218
                if err != nil {
 
219
                        return nil, nil, err
 
220
                }
 
221
                switch m.Life() {
 
222
                case state.Dying:
 
223
                        if _, ok := m.InstanceId(); ok {
 
224
                                continue
 
225
                        }
 
226
                        log.Infof("worker/provisioner: killing dying, unprovisioned machine %q", m)
 
227
                        if err := m.EnsureDead(); err != nil {
 
228
                                return nil, nil, err
 
229
                        }
 
230
                        fallthrough
 
231
                case state.Dead:
 
232
                        dead = append(dead, m)
 
233
                        log.Infof("worker/provisioner: removing dead machine %q", m)
 
234
                        if err := m.Remove(); err != nil {
 
235
                                return nil, nil, err
 
236
                        }
 
237
                        continue
 
238
                }
 
239
                if instId, hasInstId := m.InstanceId(); !hasInstId {
 
240
                        status, _, err := m.Status()
 
241
                        if err != nil {
 
242
                                log.Infof("worker/provisioner: cannot get machine %q status: %v", m, err)
 
243
                                continue
 
244
                        }
 
245
                        if status == params.StatusPending {
 
246
                                pending = append(pending, m)
 
247
                                log.Infof("worker/provisioner: found machine %q pending provisioning", m)
 
248
                                continue
 
249
                        }
 
250
                } else {
 
251
                        log.Infof("worker/provisioner: machine %v already started as instance %q", m, instId)
 
252
                }
 
253
        }
 
254
        return
 
255
}
 
256
 
 
257
func (p *Provisioner) startMachines(machines []*state.Machine) error {
 
258
        for _, m := range machines {
 
259
                if err := p.startMachine(m); err != nil {
 
260
                        return fmt.Errorf("cannot start machine %v: %v", m, err)
 
261
                }
 
262
        }
 
263
        return nil
 
264
}
 
265
 
 
266
func (p *Provisioner) startMachine(m *state.Machine) error {
 
267
        // TODO(dfc) the state.Info passed to environ.StartInstance remains contentious
 
268
        // however as the PA only knows one state.Info, and that info is used by MAs and
 
269
        // UAs to locate the state for this environment, it is logical to use the same
 
270
        // state.Info as the PA.
 
271
        stateInfo, apiInfo, err := p.setupAuthentication(m)
 
272
        if err != nil {
 
273
                return err
 
274
        }
 
275
        cons, err := m.Constraints()
 
276
        if err != nil {
 
277
                return err
 
278
        }
 
279
        // Generate a unique nonce for the new instance.
 
280
        uuid, err := utils.NewUUID()
 
281
        if err != nil {
 
282
                return err
 
283
        }
 
284
        // Generated nonce has the format: "machine-#:UUID". The first
 
285
        // part is a badge, specifying the tag of the machine the provisioner
 
286
        // is running on, while the second part is a random UUID.
 
287
        nonce := fmt.Sprintf("%s:%s", state.MachineTag(p.machineId), uuid.String())
 
288
        inst, err := p.environ.StartInstance(m.Id(), nonce, m.Series(), cons, stateInfo, apiInfo)
 
289
        if err != nil {
 
290
                // Set the state to error, so the machine will be skipped next
 
291
                // time until the error is resolved, but don't return an
 
292
                // error; just keep going with the other machines.
 
293
                log.Errorf("worker/provisioner: cannot start instance for machine %q: %v", m, err)
 
294
                if err1 := m.SetStatus(params.StatusError, err.Error()); err1 != nil {
 
295
                        // Something is wrong with this machine, better report it back.
 
296
                        log.Errorf("worker/provisioner: cannot set error status for machine %q: %v", m, err1)
 
297
                        return err1
 
298
                }
 
299
                return nil
 
300
        }
 
301
        if err := m.SetProvisioned(inst.Id(), nonce); err != nil {
 
302
                // The machine is started, but we can't record the mapping in
 
303
                // state. It'll keep running while we fail out and restart,
 
304
                // but will then be detected by findUnknownInstances and
 
305
                // killed again.
 
306
                //
 
307
                // TODO(dimitern) Stop the instance right away here.
 
308
                //
 
309
                // Multiple instantiations of a given machine (with the same
 
310
                // machine ID) cannot coexist, because findUnknownInstances is
 
311
                // called before startMachines. However, if the first machine
 
312
                // had started to do work before being replaced, we may
 
313
                // encounter surprising problems.
 
314
                return err
 
315
        }
 
316
        // populate the local cache
 
317
        p.instances[m.Id()] = inst
 
318
        p.machines[inst.Id()] = m.Id()
 
319
        log.Noticef("worker/provisioner: started machine %s as instance %s", m, inst.Id())
 
320
        return nil
 
321
}
 
322
 
 
323
func (p *Provisioner) setupAuthentication(m *state.Machine) (*state.Info, *api.Info, error) {
 
324
        password, err := utils.RandomPassword()
 
325
        if err != nil {
 
326
                return nil, nil, fmt.Errorf("cannot make password for machine %v: %v", m, err)
 
327
        }
 
328
        if err := m.SetMongoPassword(password); err != nil {
 
329
                return nil, nil, fmt.Errorf("cannot set password for machine %v: %v", m, err)
 
330
        }
 
331
        stateInfo := *p.stateInfo
 
332
        stateInfo.Tag = m.Tag()
 
333
        stateInfo.Password = password
 
334
        apiInfo := *p.apiInfo
 
335
        apiInfo.Tag = m.Tag()
 
336
        apiInfo.Password = password
 
337
        return &stateInfo, &apiInfo, nil
 
338
}
 
339
 
 
340
func (p *Provisioner) stopInstances(instances []environs.Instance) error {
 
341
        // Although calling StopInstance with an empty slice should produce no change in the
 
342
        // provider, environs like dummy do not consider this a noop.
 
343
        if len(instances) == 0 {
 
344
                return nil
 
345
        }
 
346
        if err := p.environ.StopInstances(instances); err != nil {
 
347
                return err
 
348
        }
 
349
 
 
350
        // cleanup cache
 
351
        for _, i := range instances {
 
352
                if id, ok := p.machines[i.Id()]; ok {
 
353
                        delete(p.machines, i.Id())
 
354
                        delete(p.instances, id)
 
355
                }
 
356
        }
 
357
        return nil
 
358
}
 
359
 
 
360
var errNotProvisioned = errors.New("machine has no instance id set")
 
361
 
 
362
// instanceForMachine returns the environs.Instance that represents this machine's instance.
 
363
func (p *Provisioner) instanceForMachine(m *state.Machine) (environs.Instance, error) {
 
364
        inst, ok := p.instances[m.Id()]
 
365
        if ok {
 
366
                return inst, nil
 
367
        }
 
368
        instId, ok := m.InstanceId()
 
369
        if !ok {
 
370
                return nil, errNotProvisioned
 
371
        }
 
372
        // TODO(dfc): Ask for all instances at once.
 
373
        insts, err := p.environ.Instances([]state.InstanceId{instId})
 
374
        if err != nil {
 
375
                return nil, err
 
376
        }
 
377
        inst = insts[0]
 
378
        return inst, nil
 
379
}
 
380
 
 
381
// instancesForMachines returns a list of environs.Instance that represent
 
382
// the list of machines running in the provider. Missing machines are
 
383
// omitted from the list.
 
384
func (p *Provisioner) instancesForMachines(ms []*state.Machine) ([]environs.Instance, error) {
 
385
        var insts []environs.Instance
 
386
        for _, m := range ms {
 
387
                switch inst, err := p.instanceForMachine(m); err {
 
388
                case nil:
 
389
                        insts = append(insts, inst)
 
390
                case errNotProvisioned, environs.ErrNoInstances:
 
391
                default:
 
392
                        return nil, err
 
393
                }
 
394
        }
 
395
        return insts, nil
 
396
}