Branch: ~ubuntu-branches/ubuntu/wily/juju-core/wily

Viewing changes to src/launchpad.net/juju-core/cmd/jujud/machine.go

  • Committer: Package Import Robot
  • Author(s): James Page
  • Date: 2014-02-28 16:53:15 UTC
  • mfrom: (1.1.19)
  • Revision ID: package-import@ubuntu.com-20140228165315-g8n1ds0jrtekhxq6
Tags: 1.17.4-0ubuntu1
* New upstream point release (LP: #1261628):
  - https://launchpad.net/juju-core/trunk/1.17.4
  - d/control: Prefer juju-mongodb over mongodb-server for juju-local
    package.

=== modified file 'src/launchpad.net/juju-core/cmd/jujud/machine.go'
@@ -18,7 +18,6 @@
         "launchpad.net/juju-core/cmd"
         "launchpad.net/juju-core/container/kvm"
         "launchpad.net/juju-core/instance"
-        "launchpad.net/juju-core/log/syslog"
         "launchpad.net/juju-core/names"
         "launchpad.net/juju-core/provider"
         "launchpad.net/juju-core/state"
@@ -27,7 +26,9 @@
         "launchpad.net/juju-core/state/api/params"
         apiprovisioner "launchpad.net/juju-core/state/api/provisioner"
         "launchpad.net/juju-core/state/apiserver"
+        "launchpad.net/juju-core/upgrades"
         "launchpad.net/juju-core/upstart"
+        "launchpad.net/juju-core/version"
         "launchpad.net/juju-core/worker"
         "launchpad.net/juju-core/worker/authenticationworker"
         "launchpad.net/juju-core/worker/charmrevisionworker"
@@ -42,6 +43,7 @@
         "launchpad.net/juju-core/worker/minunitsworker"
         "launchpad.net/juju-core/worker/provisioner"
         "launchpad.net/juju-core/worker/resumer"
+        "launchpad.net/juju-core/worker/rsyslog"
         "launchpad.net/juju-core/worker/terminationworker"
         "launchpad.net/juju-core/worker/upgrader"
 )
@@ -61,10 +63,11 @@
 // MachineAgent is a cmd.Command responsible for running a machine agent.
 type MachineAgent struct {
         cmd.CommandBase
-        tomb      tomb.Tomb
-        Conf      AgentConf
-        MachineId string
-        runner    worker.Runner
+        tomb            tomb.Tomb
+        Conf            AgentConf
+        MachineId       string
+        runner          worker.Runner
+        upgradeComplete chan struct{}
 }

 // Info returns usage information for the command.
@@ -89,6 +92,7 @@
                 return err
         }
         a.runner = newRunner(isFatal, moreImportant)
+        a.upgradeComplete = make(chan struct{})
         return nil
 }

@@ -111,7 +115,7 @@
         // lines of all logging in the log file.
         loggo.RemoveWriter("logfile")
         defer a.tomb.Done()
-        logger.Infof("machine agent %v start", a.Tag())
+        logger.Infof("machine agent %v start (%s)", a.Tag(), version.Current)
         if err := a.Conf.read(a.Tag()); err != nil {
                 return err
         }
@@ -174,24 +178,42 @@
                         break
                 }
         }
+        rsyslogMode := rsyslog.RsyslogModeForwarding
+        for _, job := range entity.Jobs() {
+                if job == params.JobManageEnviron {
+                        rsyslogMode = rsyslog.RsyslogModeAccumulate
+                        break
+                }
+        }
+
         runner := newRunner(connectionIsFatal(st), moreImportant)
-        runner.StartWorker("machiner", func() (worker.Worker, error) {
-                return machiner.NewMachiner(st.Machiner(), agentConfig), nil
-        })
+
+        // Run the upgrader and the upgrade-steps worker without waiting for the upgrade steps to complete.
         runner.StartWorker("upgrader", func() (worker.Worker, error) {
                 return upgrader.NewUpgrader(st.Upgrader(), agentConfig), nil
         })
-        runner.StartWorker("logger", func() (worker.Worker, error) {
+        runner.StartWorker("upgrade-steps", func() (worker.Worker, error) {
+                return a.upgradeWorker(st, entity.Jobs()), nil
+        })
+
+        // All other workers must wait for the upgrade steps to complete before starting.
+        a.startWorkerAfterUpgrade(runner, "machiner", func() (worker.Worker, error) {
+                return machiner.NewMachiner(st.Machiner(), agentConfig), nil
+        })
+        a.startWorkerAfterUpgrade(runner, "logger", func() (worker.Worker, error) {
                 return workerlogger.NewLogger(st.Logger(), agentConfig), nil
         })
-        runner.StartWorker("machineenvironmentworker", func() (worker.Worker, error) {
+        a.startWorkerAfterUpgrade(runner, "machineenvironmentworker", func() (worker.Worker, error) {
                 return machineenvironmentworker.NewMachineEnvironmentWorker(st.Environment(), agentConfig), nil
         })
+        a.startWorkerAfterUpgrade(runner, "rsyslog", func() (worker.Worker, error) {
+                return newRsyslogConfigWorker(st.Rsyslog(), agentConfig, rsyslogMode)
+        })

         // If not a local provider bootstrap machine, start the worker to manage SSH keys.
         providerType := agentConfig.Value(agent.ProviderType)
         if providerType != provider.Local || a.MachineId != bootstrapMachineId {
-                runner.StartWorker("authenticationworker", func() (worker.Worker, error) {
+                a.startWorkerAfterUpgrade(runner, "authenticationworker", func() (worker.Worker, error) {
                         return authenticationworker.NewWorker(st.KeyUpdater(), agentConfig), nil
                 })
         }
@@ -203,22 +225,22 @@
         for _, job := range entity.Jobs() {
                 switch job {
                 case params.JobHostUnits:
-                        runner.StartWorker("deployer", func() (worker.Worker, error) {
+                        a.startWorkerAfterUpgrade(runner, "deployer", func() (worker.Worker, error) {
                                 apiDeployer := st.Deployer()
                                 context := newDeployContext(apiDeployer, agentConfig)
                                 return deployer.NewDeployer(apiDeployer, context), nil
                         })
                 case params.JobManageEnviron:
-                        runner.StartWorker("environ-provisioner", func() (worker.Worker, error) {
+                        a.startWorkerAfterUpgrade(runner, "environ-provisioner", func() (worker.Worker, error) {
                                 return provisioner.NewEnvironProvisioner(st.Provisioner(), agentConfig), nil
                         })
                         // TODO(axw) 2013-09-24 bug #1229506
                         // Make another job to enable the firewaller. Not all environments
                         // are capable of managing ports centrally.
-                        runner.StartWorker("firewaller", func() (worker.Worker, error) {
+                        a.startWorkerAfterUpgrade(runner, "firewaller", func() (worker.Worker, error) {
                                 return firewaller.NewFirewaller(st.Firewaller())
                         })
-                        runner.StartWorker("charm-revision-updater", func() (worker.Worker, error) {
+                        a.startWorkerAfterUpgrade(runner, "charm-revision-updater", func() (worker.Worker, error) {
                                 return charmrevisionworker.NewRevisionUpdateWorker(st.CharmRevisionUpdater()), nil
                         })
                 case params.JobManageStateDeprecated:
@@ -274,7 +296,7 @@
         // Start the watcher to fire when a container is first requested on the machine.
         watcherName := fmt.Sprintf("%s-container-watcher", machine.Id())
         handler := provisioner.NewContainerSetupHandler(runner, watcherName, containers, machine, pr, a.Conf.config)
-        runner.StartWorker(watcherName, func() (worker.Worker, error) {
+        a.startWorkerAfterUpgrade(runner, watcherName, func() (worker.Worker, error) {
                 return worker.NewStringsWorker(handler), nil
         })
         return nil
@@ -296,7 +318,7 @@
         // the storage provider on one machine, and that is the "bootstrap" node.
         providerType := agentConfig.Value(agent.ProviderType)
         if (providerType == provider.Local || provider.IsManual(providerType)) && m.Id() == bootstrapMachineId {
-                runner.StartWorker("local-storage", func() (worker.Worker, error) {
+                a.startWorkerAfterUpgrade(runner, "local-storage", func() (worker.Worker, error) {
                         // TODO(axw) 2013-09-24 bug #1229507
                         // Make another job to enable storage.
                         // There's nothing special about this.
@@ -308,7 +330,7 @@
                 case state.JobHostUnits:
                         // Implemented in APIWorker.
                 case state.JobManageEnviron:
-                        runner.StartWorker("instancepoller", func() (worker.Worker, error) {
+                        a.startWorkerAfterUpgrade(runner, "instancepoller", func() (worker.Worker, error) {
                                 return instancepoller.NewWorker(st), nil
                         })
                         runner.StartWorker("apiserver", func() (worker.Worker, error) {
@@ -325,16 +347,16 @@
                                 dataDir := a.Conf.config.DataDir()
                                 return apiserver.NewServer(st, fmt.Sprintf(":%d", port), cert, key, dataDir)
                         })
-                        runner.StartWorker("cleaner", func() (worker.Worker, error) {
+                        a.startWorkerAfterUpgrade(runner, "cleaner", func() (worker.Worker, error) {
                                 return cleaner.NewCleaner(st), nil
                         })
-                        runner.StartWorker("resumer", func() (worker.Worker, error) {
+                        a.startWorkerAfterUpgrade(runner, "resumer", func() (worker.Worker, error) {
                                 // The action of resumer is so subtle that it is not tested,
                                 // because we can't figure out how to do so without brutalising
                                 // the transaction log.
                                 return resumer.NewResumer(st), nil
                         })
-                        runner.StartWorker("minunitsworker", func() (worker.Worker, error) {
+                        a.startWorkerAfterUpgrade(runner, "minunitsworker", func() (worker.Worker, error) {
                                 return minunitsworker.NewMinUnitsWorker(st), nil
                         })
                 case state.JobManageStateDeprecated:
@@ -346,6 +368,90 @@
         return newCloseWorker(runner, st), nil
 }

+// startWorker starts a worker to run the specified child worker but only after waiting for upgrades to complete.
+func (a *MachineAgent) startWorkerAfterUpgrade(runner worker.Runner, name string, start func() (worker.Worker, error)) {
+        runner.StartWorker(name, func() (worker.Worker, error) {
+                return a.upgradeWaiterWorker(start), nil
+        })
+}
+
+// upgradeWaiterWorker runs the specified worker after upgrades have completed.
+func (a *MachineAgent) upgradeWaiterWorker(start func() (worker.Worker, error)) worker.Worker {
+        return worker.NewSimpleWorker(func(stop <-chan struct{}) error {
+                // wait for the upgrade to complete (or for us to be stopped)
+                select {
+                case <-stop:
+                        return nil
+                case <-a.upgradeComplete:
+                }
+                w, err := start()
+                if err != nil {
+                        return err
+                }
+                waitCh := make(chan error)
+                go func() {
+                        waitCh <- w.Wait()
+                }()
+                select {
+                case err := <-waitCh:
+                        return err
+                case <-stop:
+                        w.Kill()
+                }
+                return <-waitCh
+        })
+}
+
+// upgradeWorker runs the required upgrade operations to upgrade to the current Juju version.
+func (a *MachineAgent) upgradeWorker(apiState *api.State, jobs []params.MachineJob) worker.Worker {
+        return worker.NewSimpleWorker(func(stop <-chan struct{}) error {
+                select {
+                case <-a.upgradeComplete:
+                        // Our work is already done (we're probably being restarted
+                        // because the API connection has gone down), so do nothing.
+                        <-stop
+                        return nil
+                default:
+                }
+                err := a.runUpgrades(apiState, jobs)
+                if err != nil {
+                        return err
+                }
+                logger.Infof("Upgrade to %v completed.", version.Current)
+                close(a.upgradeComplete)
+                <-stop
+                return nil
+        })
+}
+
+// runUpgrades runs the upgrade operations for each job type and updates the updatedToVersion on success.
+func (a *MachineAgent) runUpgrades(st *api.State, jobs []params.MachineJob) error {
+        agentConfig := a.Conf.config
+        from := version.Current
+        from.Number = agentConfig.UpgradedToVersion()
+        if from == version.Current {
+                logger.Infof("Upgrade to %v already completed.", version.Current)
+                return nil
+        }
+        context := upgrades.NewContext(agentConfig, st)
+        for _, job := range jobs {
+                var target upgrades.Target
+                switch job {
+                case params.JobManageEnviron:
+                        target = upgrades.StateServer
+                case params.JobHostUnits:
+                        target = upgrades.HostMachine
+                default:
+                        continue
+                }
+                logger.Infof("Starting upgrade from %v to %v for %v", from, version.Current, target)
+                if err := upgrades.PerformUpgrade(from.Number, target, context); err != nil {
+                        return fmt.Errorf("cannot perform upgrade from %v to %v for %v: %v", from, version.Current, target, err)
+                }
+        }
+        return a.Conf.config.WriteUpgradedToVersion(version.Current.Number)
+}
+
 func (a *MachineAgent) Entity(st *state.State) (AgentState, error) {
         m, err := st.Machine(a.MachineId)
         if err != nil {
@@ -385,15 +491,6 @@
                         errors = append(errors, fmt.Errorf("cannot remove service %q: %v", agentServiceName, err))
                 }
         }
-        // Remove the rsyslog conf file and restart rsyslogd.
-        if rsyslogConfPath := a.Conf.config.Value(agent.RsyslogConfPath); rsyslogConfPath != "" {
-                if err := os.Remove(rsyslogConfPath); err != nil {
-                        errors = append(errors, err)
-                }
-                if err := syslog.Restart(); err != nil {
-                        errors = append(errors, err)
-                }
-        }
         // Remove the juju-run symlink.
         if err := os.Remove(jujuRun); err != nil && !os.IsNotExist(err) {
                 errors = append(errors, err)
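
The structural change running through this diff is that most of the agent's workers no longer start immediately: they are registered through startWorkerAfterUpgrade, which parks each worker on the agent's new upgradeComplete channel until the upgrade-steps worker closes it. Below is a minimal, self-contained sketch of that gating idiom; the agent type, startAfterUpgrade, and the worker names are illustrative stand-ins, not the juju-core APIs.

package main

import (
        "fmt"
        "sync"
        "time"
)

// agent mirrors the shape of MachineAgent's new field: upgradeComplete is
// never sent on, only closed, so closing it acts as a broadcast to every
// goroutine blocked receiving from it.
type agent struct {
        upgradeComplete chan struct{}
}

// startAfterUpgrade plays the role of startWorkerAfterUpgrade: the worker
// body fn runs only once upgrades finish, unless stop is closed first.
func (a *agent) startAfterUpgrade(wg *sync.WaitGroup, stop <-chan struct{}, fn func()) {
        wg.Add(1)
        go func() {
                defer wg.Done()
                select {
                case <-stop: // agent shutting down before upgrades finished
                        return
                case <-a.upgradeComplete: // gate opened by the upgrade-steps worker
                }
                fn()
        }()
}

func main() {
        a := &agent{upgradeComplete: make(chan struct{})}
        stop := make(chan struct{})
        var wg sync.WaitGroup

        for _, name := range []string{"machiner", "logger", "cleaner"} {
                name := name // capture loop variable for the closure
                a.startAfterUpgrade(&wg, stop, func() {
                        fmt.Printf("%s started after upgrade\n", name)
                })
        }

        // Stand-in for the upgrade-steps worker: run the upgrade, then close
        // the channel exactly once so all gated workers unblock together.
        time.Sleep(10 * time.Millisecond)
        close(a.upgradeComplete)

        wg.Wait()
        close(stop)
}

Closing a channel is the natural broadcast primitive here: close(a.upgradeComplete) wakes every parked worker at once, and because a channel may be closed only once, upgradeWorker in the diff first checks the channel with a non-blocking select so that a restart after an API reconnection does not re-run the steps or close the channel a second time.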