    "launchpad.net/juju-core/state/api/params"
    apiprovisioner "launchpad.net/juju-core/state/api/provisioner"
    "launchpad.net/juju-core/state/apiserver"
    "launchpad.net/juju-core/upgrades"
    "launchpad.net/juju-core/upstart"
    "launchpad.net/juju-core/version"
    "launchpad.net/juju-core/worker"
    "launchpad.net/juju-core/worker/authenticationworker"
    "launchpad.net/juju-core/worker/charmrevisionworker"
// ...

    rsyslogMode := rsyslog.RsyslogModeForwarding
    for _, job := range entity.Jobs() {
        if job == params.JobManageEnviron {
            rsyslogMode = rsyslog.RsyslogModeAccumulate
        }
    }
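    // (The machine that manages the environment accumulates the logs of
    // the whole environment; every other machine forwards its logs to it.)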
    runner := newRunner(connectionIsFatal(st), moreImportant)
    // Run the upgrader and the upgrade-steps worker without waiting for
    // the upgrade steps to complete.
    runner.StartWorker("upgrader", func() (worker.Worker, error) {
        return upgrader.NewUpgrader(st.Upgrader(), agentConfig), nil
    })
    runner.StartWorker("upgrade-steps", func() (worker.Worker, error) {
        return a.upgradeWorker(st, entity.Jobs()), nil
    })
    // All other workers must wait for the upgrade steps to complete
    // before starting.
    a.startWorkerAfterUpgrade(runner, "machiner", func() (worker.Worker, error) {
        return machiner.NewMachiner(st.Machiner(), agentConfig), nil
    })
    a.startWorkerAfterUpgrade(runner, "logger", func() (worker.Worker, error) {
        return workerlogger.NewLogger(st.Logger(), agentConfig), nil
    })
    a.startWorkerAfterUpgrade(runner, "machineenvironmentworker", func() (worker.Worker, error) {
        return machineenvironmentworker.NewMachineEnvironmentWorker(st.Environment(), agentConfig), nil
    })
    a.startWorkerAfterUpgrade(runner, "rsyslog", func() (worker.Worker, error) {
        return newRsyslogConfigWorker(st.Rsyslog(), agentConfig, rsyslogMode)
    })
    // If not a local provider bootstrap machine, start the worker to
    // manage SSH keys.
    providerType := agentConfig.Value(agent.ProviderType)
    if providerType != provider.Local || a.MachineId != bootstrapMachineId {
        a.startWorkerAfterUpgrade(runner, "authenticationworker", func() (worker.Worker, error) {
            return authenticationworker.NewWorker(st.KeyUpdater(), agentConfig), nil
        })
    }
    for _, job := range entity.Jobs() {
        switch job {
        case params.JobHostUnits:
            a.startWorkerAfterUpgrade(runner, "deployer", func() (worker.Worker, error) {
                apiDeployer := st.Deployer()
                context := newDeployContext(apiDeployer, agentConfig)
                return deployer.NewDeployer(apiDeployer, context), nil
            })
        case params.JobManageEnviron:
            a.startWorkerAfterUpgrade(runner, "environ-provisioner", func() (worker.Worker, error) {
                return provisioner.NewEnvironProvisioner(st.Provisioner(), agentConfig), nil
            })
            // TODO(axw) 2013-09-24 bug #1229506
            // Make another job to enable the firewaller. Not all
            // environments are capable of managing ports centrally.
            a.startWorkerAfterUpgrade(runner, "firewaller", func() (worker.Worker, error) {
                return firewaller.NewFirewaller(st.Firewaller())
            })
            a.startWorkerAfterUpgrade(runner, "charm-revision-updater", func() (worker.Worker, error) {
                return charmrevisionworker.NewRevisionUpdateWorker(st.CharmRevisionUpdater()), nil
            })
        case params.JobManageStateDeprecated:
// ...

    // Start the watcher to fire when a container is first requested on
    // the machine.
    watcherName := fmt.Sprintf("%s-container-watcher", machine.Id())
    handler := provisioner.NewContainerSetupHandler(runner, watcherName, containers, machine, pr, a.Conf.config)
    a.startWorkerAfterUpgrade(runner, watcherName, func() (worker.Worker, error) {
        return worker.NewStringsWorker(handler), nil
    })
// ...

    // Take advantage of special knowledge here in that we will only ever
    // want the storage provider on one machine, and that is the
    // "bootstrap" node.
    providerType := agentConfig.Value(agent.ProviderType)
    if (providerType == provider.Local || provider.IsManual(providerType)) && m.Id() == bootstrapMachineId {
        a.startWorkerAfterUpgrade(runner, "local-storage", func() (worker.Worker, error) {
            // TODO(axw) 2013-09-24 bug #1229507
            // Make another job to enable storage.
            // There's nothing special about this.
            // ...
        })
    }
    for _, job := range m.Jobs() {
        switch job {
        case state.JobHostUnits:
            // Implemented in APIWorker.
        case state.JobManageEnviron:
            a.startWorkerAfterUpgrade(runner, "instancepoller", func() (worker.Worker, error) {
                return instancepoller.NewWorker(st), nil
            })
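            // Note that the apiserver worker below is not gated on
            // upgrade completion: agents on other machines need a
            // running API server to perform their own upgrade steps.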
            runner.StartWorker("apiserver", func() (worker.Worker, error) {
                // ...
                dataDir := a.Conf.config.DataDir()
                return apiserver.NewServer(st, fmt.Sprintf(":%d", port), cert, key, dataDir)
            })
            a.startWorkerAfterUpgrade(runner, "cleaner", func() (worker.Worker, error) {
                return cleaner.NewCleaner(st), nil
            })
            a.startWorkerAfterUpgrade(runner, "resumer", func() (worker.Worker, error) {
                // The action of resumer is so subtle that it is not
                // tested, because we can't figure out how to do so
                // without brutalising the transaction log.
                return resumer.NewResumer(st), nil
            })
            a.startWorkerAfterUpgrade(runner, "minunitsworker", func() (worker.Worker, error) {
                return minunitsworker.NewMinUnitsWorker(st), nil
            })
        case state.JobManageStateDeprecated:
            // ...
        }
    }
    return newCloseWorker(runner, st), nil
}

// startWorkerAfterUpgrade starts a worker to run the specified child worker,
// but only after waiting for upgrades to complete.
func (a *MachineAgent) startWorkerAfterUpgrade(runner worker.Runner, name string, start func() (worker.Worker, error)) {
    runner.StartWorker(name, func() (worker.Worker, error) {
        return a.upgradeWaiterWorker(start), nil
    })
}
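// The gating pattern used here can be sketched in isolation. This is an
// illustrative example only; gate and runGated are hypothetical names, not
// part of juju-core. A receive from a closed channel never blocks, which is
// why a single close(a.upgradeComplete) can release any number of waiting
// workers at once:
//
//     gate := make(chan struct{})
//     runGated := func(task func()) {
//         go func() {
//             <-gate // blocks until gate is closed
//             task()
//         }()
//     }
//     runGated(func() { fmt.Println("runs only after close(gate)") })
//     close(gate) // releases every gated task at the same time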
// upgradeWaiterWorker runs the specified worker after upgrades have completed.
func (a *MachineAgent) upgradeWaiterWorker(start func() (worker.Worker, error)) worker.Worker {
    return worker.NewSimpleWorker(func(stop <-chan struct{}) error {
        // wait for the upgrade to complete (or for us to be stopped)
        select {
        case <-stop:
            return nil
        case <-a.upgradeComplete:
        }
        // Start the child worker, then wait for it to finish or for us
        // to be stopped, whichever happens first.
        w, err := start()
        if err != nil {
            return err
        }
        waitCh := make(chan error)
        go func() {
            waitCh <- w.Wait()
        }()
        select {
        case err := <-waitCh:
            return err
        case <-stop:
            w.Kill()
        }
        return <-waitCh
    })
}
// upgradeWorker runs the required upgrade operations to upgrade to the
// current Juju version.
func (a *MachineAgent) upgradeWorker(apiState *api.State, jobs []params.MachineJob) worker.Worker {
    return worker.NewSimpleWorker(func(stop <-chan struct{}) error {
        select {
        case <-a.upgradeComplete:
            // Our work is already done (we're probably being restarted
            // because the API connection has gone down), so do nothing.
            return nil
        default:
        }
        err := a.runUpgrades(apiState, jobs)
        if err != nil {
            return err
        }
        logger.Infof("Upgrade to %v completed.", version.Current)
        close(a.upgradeComplete)
        return nil
    })
}
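// (Because a receive from the closed a.upgradeComplete channel succeeds
// immediately, the non-blocking select above turns a restarted
// upgrade-steps worker into a no-op instead of re-running the upgrade.)
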
// runUpgrades runs the upgrade operations for each job type and updates
// the agent's upgradedToVersion on success.
func (a *MachineAgent) runUpgrades(st *api.State, jobs []params.MachineJob) error {
    agentConfig := a.Conf.config
    from := version.Current
    from.Number = agentConfig.UpgradedToVersion()
    if from == version.Current {
        logger.Infof("Upgrade to %v already completed.", version.Current)
        return nil
    }
    context := upgrades.NewContext(agentConfig, st)
    for _, job := range jobs {
        var target upgrades.Target
        switch job {
        case params.JobManageEnviron:
            target = upgrades.StateServer
        case params.JobHostUnits:
            target = upgrades.HostMachine
        default:
            continue
        }
        logger.Infof("Starting upgrade from %v to %v for %v", from, version.Current, target)
        if err := upgrades.PerformUpgrade(from.Number, target, context); err != nil {
            return fmt.Errorf("cannot perform upgrade from %v to %v for %v: %v", from, version.Current, target, err)
        }
    }
    return a.Conf.config.WriteUpgradedToVersion(version.Current.Number)
}
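// As a concrete illustration (the version numbers are invented, not taken
// from the source): if the agent binary is at 1.17.0 while
// agentConfig.UpgradedToVersion() still reports 1.16.3, runUpgrades performs
// the 1.16.3 -> 1.17.0 steps for each of the machine's jobs and then records
// 1.17.0 via WriteUpgradedToVersion. On a later restart from equals
// version.Current, so the function logs "already completed" and does nothing.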

func (a *MachineAgent) Entity(st *state.State) (AgentState, error) {
    m, err := st.Machine(a.MachineId)
// ...

    errors = append(errors, fmt.Errorf("cannot remove service %q: %v", agentServiceName, err))

    // Remove the juju-run symlink.
    if err := os.Remove(jujuRun); err != nil && !os.IsNotExist(err) {
        errors = append(errors, err)
    }