~juju-qa/ubuntu/xenial/juju/xenial-2.0-beta3

« back to all changes in this revision

Viewing changes to src/github.com/juju/juju/cmd/jujud/agent/machine.go

  • Committer: Martin Packman
  • Date: 2016-03-30 19:31:08 UTC
  • mfrom: (1.1.41)
  • Revision ID: martin.packman@canonical.com-20160330193108-h9iz3ak334uk0z5r
Merge new upstream source 2.0~beta3

Show diffs side-by-side

added added

removed removed

Lines of Context:
28
28
        "github.com/juju/utils/set"
29
29
        "github.com/juju/utils/symlink"
30
30
        "github.com/juju/utils/voyeur"
 
31
        "github.com/juju/version"
31
32
        "gopkg.in/juju/charmrepo.v2-unstable"
32
33
        "gopkg.in/mgo.v2"
33
34
        "gopkg.in/natefinch/lumberjack.v2"
37
38
        "github.com/juju/juju/agent"
38
39
        "github.com/juju/juju/agent/tools"
39
40
        "github.com/juju/juju/api"
40
 
        "github.com/juju/juju/api/agenttools"
41
41
        apideployer "github.com/juju/juju/api/deployer"
42
42
        "github.com/juju/juju/api/metricsmanager"
43
43
        "github.com/juju/juju/api/statushistory"
62
62
        "github.com/juju/juju/service/common"
63
63
        "github.com/juju/juju/state"
64
64
        "github.com/juju/juju/state/multiwatcher"
65
 
        statestorage "github.com/juju/juju/state/storage"
66
65
        "github.com/juju/juju/storage/looputil"
67
66
        "github.com/juju/juju/upgrades"
68
 
        "github.com/juju/juju/version"
 
67
        jujuversion "github.com/juju/juju/version"
69
68
        "github.com/juju/juju/watcher"
70
69
        "github.com/juju/juju/worker"
71
70
        "github.com/juju/juju/worker/addresser"
83
82
        "github.com/juju/juju/worker/imagemetadataworker"
84
83
        "github.com/juju/juju/worker/instancepoller"
85
84
        "github.com/juju/juju/worker/logsender"
86
 
        "github.com/juju/juju/worker/machiner"
87
85
        "github.com/juju/juju/worker/metricworker"
88
86
        "github.com/juju/juju/worker/minunitsworker"
89
87
        "github.com/juju/juju/worker/modelworkermanager"
93
91
        "github.com/juju/juju/worker/singular"
94
92
        "github.com/juju/juju/worker/statushistorypruner"
95
93
        "github.com/juju/juju/worker/storageprovisioner"
96
 
        "github.com/juju/juju/worker/toolsversionchecker"
97
94
        "github.com/juju/juju/worker/txnpruner"
98
95
        "github.com/juju/juju/worker/undertaker"
99
96
        "github.com/juju/juju/worker/unitassigner"
100
97
        "github.com/juju/juju/worker/upgradesteps"
101
98
)
102
99
 
103
 
const bootstrapMachineId = "0"
104
 
 
105
100
var (
106
101
        logger       = loggo.GetLogger("juju.cmd.jujud")
107
 
        retryDelay   = 3 * time.Second
108
102
        jujuRun      = paths.MustSucceed(paths.JujuRun(series.HostSeries()))
109
103
        jujuDumpLogs = paths.MustSucceed(paths.JujuDumpLogs(series.HostSeries()))
110
104
 
115
109
        ensureMongoAdminUser     = mongo.EnsureAdminUser
116
110
        newSingularRunner        = singular.New
117
111
        peergrouperNew           = peergrouper.New
118
 
        newMachiner              = machiner.NewMachiner
119
112
        newDiscoverSpaces        = discoverspaces.NewWorker
120
113
        newFirewaller            = firewaller.NewFirewaller
121
114
        newCertificateUpdater    = certupdater.NewCertificateUpdater
227
220
        if err != nil {
228
221
                return errors.Annotate(err, "cannot read agent configuration")
229
222
        }
230
 
        agentConfig := a.currentConfig.CurrentConfig()
231
223
 
232
224
        // the context's stderr is set as the loggo writer in github.com/juju/cmd/logging.go
233
225
        a.ctx.Stderr = &lumberjack.Logger{
234
 
                Filename:   agent.LogFilename(agentConfig),
 
226
                Filename:   agent.LogFilename(a.currentConfig.CurrentConfig()),
235
227
                MaxSize:    300, // megabytes
236
228
                MaxBackups: 2,
237
229
        }
264
256
func MachineAgentFactoryFn(
265
257
        agentConfWriter AgentConfigWriter,
266
258
        bufferedLogs logsender.LogRecordCh,
267
 
        loopDeviceManager looputil.LoopDeviceManager,
268
259
        rootDir string,
269
260
) func(string) *MachineAgent {
270
261
        return func(machineId string) *MachineAgent {
273
264
                        agentConfWriter,
274
265
                        bufferedLogs,
275
266
                        worker.NewRunner(cmdutil.IsFatal, cmdutil.MoreImportant, worker.RestartDelay),
276
 
                        loopDeviceManager,
 
267
                        looputil.NewLoopDeviceManager(),
277
268
                        rootDir,
278
269
                )
279
270
        }
291
282
        return &MachineAgent{
292
283
                machineId:                   machineId,
293
284
                AgentConfigWriter:           agentConfWriter,
 
285
                configChangedVal:            voyeur.NewValue(true),
294
286
                bufferedLogs:                bufferedLogs,
295
287
                workersStarted:              make(chan struct{}),
296
288
                runner:                      runner,
310
302
        runner           worker.Runner
311
303
        rootDir          string
312
304
        bufferedLogs     logsender.LogRecordCh
313
 
        configChangedVal voyeur.Value
 
305
        configChangedVal *voyeur.Value
314
306
        upgradeComplete  gate.Lock
315
307
        workersStarted   chan struct{}
316
308
 
414
406
                return fmt.Errorf("cannot read agent configuration: %v", err)
415
407
        }
416
408
 
417
 
        logger.Infof("machine agent %v start (%s [%s])", a.Tag(), version.Current, runtime.Compiler)
 
409
        logger.Infof("machine agent %v start (%s [%s])", a.Tag(), jujuversion.Current, runtime.Compiler)
418
410
        if flags := featureflag.String(); flags != "" {
419
411
                logger.Warningf("developer feature flags enabled: %s", flags)
420
412
        }
441
433
                return err
442
434
        }
443
435
        a.runner.StartWorker("engine", createEngine)
444
 
        a.runner.StartWorker("statestarter", a.newStateStarterWorker)
445
436
 
446
437
        // At this point, all workers will have been configured to start
447
438
        close(a.workersStarted)
476
467
                manifolds := machine.Manifolds(machine.ManifoldsConfig{
477
468
                        PreviousAgentVersion: previousAgentVersion,
478
469
                        Agent:                agent.APIHostPortsSetter{a},
 
470
                        AgentConfigChanged:   a.configChangedVal,
479
471
                        UpgradeStepsLock:     a.upgradeComplete,
480
472
                        UpgradeCheckLock:     a.initialUpgradeCheckComplete,
 
473
                        OpenState:            a.initState,
481
474
                        OpenStateForUpgrade:  a.openStateForUpgrade,
 
475
                        StartStateWorkers:    a.startStateWorkers,
482
476
                        WriteUninstallFile:   a.writeUninstallAgentFile,
483
477
                        StartAPIWorkers:      a.startAPIWorkers,
484
478
                        PreUpgradeSteps:      upgrades.PreUpgradeSteps,
527
521
 
528
522
func (a *MachineAgent) ChangeConfig(mutate agent.ConfigMutator) error {
529
523
        err := a.AgentConfigWriter.ChangeConfig(mutate)
530
 
        a.configChangedVal.Set(struct{}{})
531
 
        if err != nil {
532
 
                return errors.Trace(err)
533
 
        }
534
 
        return nil
 
524
        a.configChangedVal.Set(true)
 
525
        return errors.Trace(err)
535
526
}
536
527
 
537
528
func (a *MachineAgent) maybeStopMongo(ver mongo.Version, isMaster bool) error {
644
635
        }
645
636
}
646
637
 
647
 
// newStateStarterWorker wraps stateStarter in a simple worker for use in
648
 
// a.runner.StartWorker.
649
 
func (a *MachineAgent) newStateStarterWorker() (worker.Worker, error) {
650
 
        return worker.NewSimpleWorker(a.stateStarter), nil
651
 
}
652
 
 
653
 
// stateStarter watches for changes to the agent configuration, and
654
 
// starts or stops the state worker as appropriate. We watch the agent
655
 
// configuration because the agent configuration has all the details
656
 
// that we need to start a controller, whether they have been cached
657
 
// or read from the state.
658
 
//
659
 
// It will stop working as soon as stopch is closed.
660
 
func (a *MachineAgent) stateStarter(stopch <-chan struct{}) error {
661
 
        confWatch := a.configChangedVal.Watch()
662
 
        defer confWatch.Close()
663
 
        watchCh := make(chan struct{})
664
 
        go func() {
665
 
                for confWatch.Next() {
666
 
                        watchCh <- struct{}{}
667
 
                }
668
 
        }()
669
 
        for {
670
 
                select {
671
 
                case <-watchCh:
672
 
                        agentConfig := a.CurrentConfig()
673
 
 
674
 
                        // N.B. StartWorker and StopWorker are idempotent.
675
 
                        _, ok := agentConfig.StateServingInfo()
676
 
                        if ok {
677
 
                                a.runner.StartWorker("state", func() (worker.Worker, error) {
678
 
                                        return a.StateWorker()
679
 
                                })
680
 
                        } else {
681
 
                                a.runner.StopWorker("state")
682
 
                        }
683
 
                case <-stopch:
684
 
                        return nil
685
 
                }
686
 
        }
687
 
}
688
 
 
689
638
var newEnvirons = environs.New
690
639
 
691
640
// startAPIWorkers is called to start workers which rely on the
738
687
        }
739
688
 
740
689
        if isModelManager {
741
 
                runner.StartWorker("toolsversionchecker", func() (worker.Worker, error) {
742
 
                        // 4 times a day seems a decent enough amount of checks.
743
 
                        checkerParams := toolsversionchecker.VersionCheckerParams{
744
 
                                CheckInterval: time.Hour * 6,
745
 
                        }
746
 
                        return toolsversionchecker.New(agenttools.NewFacade(apiConn), &checkerParams), nil
747
 
                })
748
690
 
749
691
                // Published image metadata for some providers are in simple streams.
750
692
                // Providers that do not depend on simple streams do not need this worker.
788
730
//
789
731
// TODO(mjs)- review the need for this once the dependency engine is
790
732
// in use. Why can't upgradesteps depend on the main state connection?
791
 
func (a *MachineAgent) openStateForUpgrade() (*state.State, func(), error) {
 
733
func (a *MachineAgent) openStateForUpgrade() (*state.State, error) {
792
734
        agentConfig := a.CurrentConfig()
793
735
        if err := a.ensureMongoServer(agentConfig); err != nil {
794
 
                return nil, nil, errors.Trace(err)
 
736
                return nil, errors.Trace(err)
795
737
        }
796
738
        info, ok := agentConfig.MongoInfo()
797
739
        if !ok {
798
 
                return nil, nil, errors.New("no state info available")
 
740
                return nil, errors.New("no state info available")
799
741
        }
800
742
        st, err := state.Open(agentConfig.Model(), info, mongo.DefaultDialOpts(), environs.NewStatePolicy())
801
743
        if err != nil {
802
 
                return nil, nil, errors.Trace(err)
803
 
        }
804
 
 
805
 
        // Ensure storage is available during upgrades.
806
 
        stor := statestorage.NewStorage(st.ModelUUID(), st.MongoSession())
807
 
        registerSimplestreamsDataSource(stor, false)
808
 
 
809
 
        closer := func() {
810
 
                unregisterSimplestreamsDataSource()
811
 
                st.Close()
812
 
        }
813
 
        return st, closer, nil
 
744
                return nil, errors.Trace(err)
 
745
        }
 
746
        return st, nil
814
747
}
815
748
 
816
749
// setupContainerSupport determines what containers can be run on this machine and
915
848
        return nil
916
849
}
917
850
 
918
 
// StateWorker returns a worker running all the workers that require
919
 
// a *state.State connection.
920
 
func (a *MachineAgent) StateWorker() (worker.Worker, error) {
921
 
        agentConfig := a.CurrentConfig()
922
 
 
 
851
func (a *MachineAgent) initState(agentConfig agent.Config) (*state.State, error) {
923
852
        // Start MongoDB server and dial.
924
853
        if err := a.ensureMongoServer(agentConfig); err != nil {
925
854
                return nil, err
926
855
        }
927
 
        st, m, err := openState(agentConfig, stateWorkerDialOpts)
 
856
 
 
857
        st, _, err := openState(agentConfig, stateWorkerDialOpts)
928
858
        if err != nil {
929
859
                return nil, err
930
860
        }
 
861
 
931
862
        reportOpenedState(st)
932
863
 
 
864
        return st, nil
 
865
}
 
866
 
 
867
// startStateWorkers returns a worker running all the workers that
 
868
// require a *state.State connection.
 
869
func (a *MachineAgent) startStateWorkers(st *state.State) (worker.Worker, error) {
 
870
        agentConfig := a.CurrentConfig()
 
871
 
 
872
        m, err := getMachine(st, agentConfig.Tag())
 
873
        if err != nil {
 
874
                return nil, errors.Annotate(err, "machine lookup")
 
875
        }
 
876
 
933
877
        runner := newConnRunner(st)
934
878
        singularRunner, err := newSingularStateRunner(runner, st, m)
935
879
        if err != nil {
980
924
                        //
981
925
                        // TODO(ericsnow) For now we simply do not close the channel.
982
926
                        certChangedChan := make(chan params.StateServingInfo, 10)
983
 
                        // Each time aipserver worker is restarted, we need a fresh copy of state due
 
927
                        // Each time apiserver worker is restarted, we need a fresh copy of state due
984
928
                        // to the fact that state holds lease managers which are killed and need to be reset.
985
929
                        stateOpener := func() (*state.State, error) {
986
 
                                logger.Debugf("opening state for apistate worker")
 
930
                                logger.Debugf("opening state for apiserver worker")
987
931
                                st, _, err := openState(agentConfig, stateWorkerDialOpts)
988
932
                                return st, err
989
933
                        }
1015
959
                        return nil, errors.Errorf("unknown job type %q", job)
1016
960
                }
1017
961
        }
1018
 
        return cmdutil.NewCloseWorker(logger, runner, stateWorkerCloser{st}), nil
1019
 
}
1020
 
 
1021
 
type stateWorkerCloser struct {
1022
 
        stateCloser io.Closer
1023
 
}
1024
 
 
1025
 
func (s stateWorkerCloser) Close() error {
1026
 
        // This state-dependent data source will be useless once state is closed -
1027
 
        // un-register it before closing state.
1028
 
        unregisterSimplestreamsDataSource()
1029
 
        return s.stateCloser.Close()
 
962
        return runner, nil
1030
963
}
1031
964
 
1032
965
// startEnvWorkers starts controller workers that need to run per
1220
1153
                return w, nil
1221
1154
        })
1222
1155
 
1223
 
        for name, factory := range registeredModelWorkers {
1224
 
                newWorker := factory(st)
1225
 
                singularRunner.StartWorker(name, newWorker)
1226
 
        }
1227
 
 
1228
1156
        return runner, nil
1229
1157
}
1230
1158
 
1481
1409
                switch authTag := authTag.(type) {
1482
1410
                case names.UserTag:
1483
1411
                        // use a restricted API mode
1484
 
                        return apiserver.UpgradeInProgressError
 
1412
                        return params.UpgradeInProgressError
1485
1413
                case names.MachineTag:
1486
1414
                        if authTag == a.Tag() {
1487
1415
                                // allow logins from the local machine
1488
1416
                                return nil
1489
1417
                        }
1490
1418
                }
1491
 
                return errors.Errorf("login for %q blocked because %s", authTag, apiserver.UpgradeInProgressError.Error())
 
1419
                return errors.Errorf("login for %q blocked because %s", authTag, params.CodeUpgradeInProgress)
1492
1420
        } else {
1493
1421
                return nil // allow all logins
1494
1422
        }
1761
1689
        return st, m, nil
1762
1690
}
1763
1691
 
 
1692
func getMachine(st *state.State, tag names.Tag) (*state.Machine, error) {
 
1693
        m0, err := st.FindEntity(tag)
 
1694
        if err != nil {
 
1695
                return nil, err
 
1696
        }
 
1697
        return m0.(*state.Machine), nil
 
1698
}
 
1699
 
1764
1700
// startWorkerAfterUpgrade starts a worker to run the specified child worker
1765
1701
// but only after waiting for upgrades to complete.
1766
1702
func (a *MachineAgent) startWorkerAfterUpgrade(runner worker.Runner, name string, start func() (worker.Worker, error)) {