// Copyright 2012, 2013 Canonical Ltd.
// Licensed under the AGPLv3, see LICENCE file for details.

"github.com/juju/errors"
apiagent "github.com/juju/juju/api/agent"
apimachiner "github.com/juju/juju/api/machiner"
"github.com/juju/juju/controller"
"github.com/juju/loggo"
"github.com/juju/replicaset"
"github.com/juju/utils"
"github.com/juju/utils/clock"
"github.com/juju/utils/featureflag"
"github.com/juju/utils/series"
"github.com/juju/utils/set"
"github.com/juju/utils/symlink"
"github.com/juju/utils/voyeur"
"github.com/juju/version"
"gopkg.in/juju/charmrepo.v2-unstable"
"gopkg.in/juju/names.v2"
"gopkg.in/natefinch/lumberjack.v2"
"launchpad.net/gnuflag"
"github.com/juju/juju/agent"
"github.com/juju/juju/agent/tools"
"github.com/juju/juju/api"
apideployer "github.com/juju/juju/api/deployer"
"github.com/juju/juju/api/metricsmanager"
apiprovisioner "github.com/juju/juju/api/provisioner"
"github.com/juju/juju/apiserver"
"github.com/juju/juju/apiserver/observer"
"github.com/juju/juju/apiserver/params"
"github.com/juju/juju/audit"
"github.com/juju/juju/cert"
"github.com/juju/juju/cmd/jujud/agent/machine"
"github.com/juju/juju/cmd/jujud/agent/model"
"github.com/juju/juju/cmd/jujud/reboot"
cmdutil "github.com/juju/juju/cmd/jujud/util"
"github.com/juju/juju/container"
"github.com/juju/juju/container/kvm"
"github.com/juju/juju/environs"
"github.com/juju/juju/environs/simplestreams"
"github.com/juju/juju/instance"
jujunames "github.com/juju/juju/juju/names"
"github.com/juju/juju/juju/paths"
"github.com/juju/juju/mongo"
"github.com/juju/juju/service"
"github.com/juju/juju/service/common"
"github.com/juju/juju/state"
"github.com/juju/juju/state/multiwatcher"
"github.com/juju/juju/state/stateenvirons"
"github.com/juju/juju/storage/looputil"
"github.com/juju/juju/upgrades"
jujuversion "github.com/juju/juju/version"
"github.com/juju/juju/watcher"
"github.com/juju/juju/worker"
"github.com/juju/juju/worker/apicaller"
"github.com/juju/juju/worker/certupdater"
"github.com/juju/juju/worker/conv2state"
"github.com/juju/juju/worker/dblogpruner"
"github.com/juju/juju/worker/dependency"
"github.com/juju/juju/worker/deployer"
"github.com/juju/juju/worker/gate"
"github.com/juju/juju/worker/imagemetadataworker"
"github.com/juju/juju/worker/logsender"
"github.com/juju/juju/worker/modelworkermanager"
"github.com/juju/juju/worker/mongoupgrader"
"github.com/juju/juju/worker/peergrouper"
"github.com/juju/juju/worker/provisioner"
"github.com/juju/juju/worker/singular"
"github.com/juju/juju/worker/txnpruner"
"github.com/juju/juju/worker/upgradesteps"
logger = loggo.GetLogger("juju.cmd.jujud")
jujuRun = paths.MustSucceed(paths.JujuRun(series.HostSeries()))
jujuDumpLogs = paths.MustSucceed(paths.JujuDumpLogs(series.HostSeries()))

// The following are defined as variables to allow the tests to
// intercept calls to the functions. In every case, they should
// be expressed as explicit dependencies, but nobody has yet had
// the intestinal fortitude to untangle this package. Be that
// person! Juju Needs You.
useMultipleCPUs = utils.UseMultipleCPUs
newSingularRunner = singular.New
peergrouperNew = peergrouper.New
newCertificateUpdater = certupdater.NewCertificateUpdater
newMetadataUpdater = imagemetadataworker.NewWorker
newUpgradeMongoWorker = mongoupgrader.New
reportOpenedState = func(*state.State) {}
modelManifolds = model.Manifolds
machineManifolds = machine.Manifolds

// Variable to override in tests, default is true
var ProductionMongoWriteConcern = true
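
// For illustration only (a hedged sketch, not part of the original file): a test
// can intercept one of the variables above by swapping in a stub and restoring
// the original afterwards. The function name below is hypothetical; only the
// save/defer-restore pattern is the point.
//
//	func exampleInterceptReportOpenedState() {
//		saved := reportOpenedState
//		defer func() { reportOpenedState = saved }()
//
//		var opened int
//		reportOpenedState = func(*state.State) { opened++ }
//		// ... exercise code that opens state, then assert on opened ...
//	}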
stateWorkerDialOpts = mongo.DefaultDialOpts()
stateWorkerDialOpts.PostDial = func(session *mgo.Session) error {
if ProductionMongoWriteConcern {
_, err := replicaset.CurrentConfig(session)
// set mongo to write-majority (writes only returned after
// replicated to a majority of replica-set members).
safe.WMode = "majority"
session.SetSafe(&safe)

// AgentInitializer handles initializing a type for use as a Jujud agent.
type AgentInitializer interface {
AddFlags(*gnuflag.FlagSet)
CheckArgs([]string) error

// AgentConfigWriter encapsulates disk I/O operations with the agent config.
type AgentConfigWriter interface {
// ReadConfig reads the config for the given tag from disk.
ReadConfig(tag string) error
// ChangeConfig executes the given agent.ConfigMutator in a
// thread-safe context.
ChangeConfig(agent.ConfigMutator) error
// CurrentConfig returns a copy of the in-memory agent config.
CurrentConfig() agent.Config
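
// For illustration only (a hedged sketch, not part of the original file): callers
// typically pass ChangeConfig a mutator closure that updates the agent config
// under the writer's lock, mirroring the SetMongoVersion and SetStateServingInfo
// calls made elsewhere in this file. The variable name "writer" is hypothetical.
//
//	err := writer.ChangeConfig(func(config agent.ConfigSetter) error {
//		config.SetMongoVersion(mongo.InstalledVersion())
//		return nil
//	})
//	if err != nil {
//		return errors.Annotate(err, "cannot update agent config")
//	}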
// NewMachineAgentCmd creates a Command which handles parsing
// command-line arguments and instantiating and running a MachineAgent.
func NewMachineAgentCmd(
machineAgentFactory func(string) *MachineAgent,
agentInitializer AgentInitializer,
configFetcher AgentConfigWriter,
return &machineAgentCmd{
machineAgentFactory: machineAgentFactory,
agentInitializer: agentInitializer,
currentConfig: configFetcher,

type machineAgentCmd struct {
// This group of arguments is required.
agentInitializer AgentInitializer
currentConfig AgentConfigWriter
machineAgentFactory func(string) *MachineAgent
// This group is for debugging purposes.
// The following are set via command-line flags.

// Init is called by the cmd system to initialize the structure for running.
func (a *machineAgentCmd) Init(args []string) error {
if !names.IsValidMachine(a.machineId) {
return fmt.Errorf("--machine-id option must be set, and expects a non-negative integer")
if err := a.agentInitializer.CheckArgs(args); err != nil {
// Due to changes in the logging, and needing to care about old
// models that have been upgraded, we need to explicitly remove the
// file writer if one has been added, otherwise we will get duplicate
// log lines in the log file.
loggo.RemoveWriter("logfile")
err := a.currentConfig.ReadConfig(names.NewMachineTag(a.machineId).String())
return errors.Annotate(err, "cannot read agent configuration")
// The context's stderr is set as the loggo writer in github.com/juju/cmd/logging.go.
a.ctx.Stderr = &lumberjack.Logger{
Filename: agent.LogFilename(a.currentConfig.CurrentConfig()),
MaxSize: 300, // megabytes

// Run instantiates a MachineAgent and runs it.
func (a *machineAgentCmd) Run(c *cmd.Context) error {
machineAgent := a.machineAgentFactory(a.machineId)
return machineAgent.Run(c)

// SetFlags adds the requisite flags to run this command.
func (a *machineAgentCmd) SetFlags(f *gnuflag.FlagSet) {
a.agentInitializer.AddFlags(f)
f.StringVar(&a.machineId, "machine-id", "", "id of the machine to run")

// Info returns usage information for the command.
func (a *machineAgentCmd) Info() *cmd.Info {
Purpose: "run a juju machine agent",
// MachineAgentFactoryFn returns a function which instantiates a
// MachineAgent given a machineId.
func MachineAgentFactoryFn(
agentConfWriter AgentConfigWriter,
bufferedLogs logsender.LogRecordCh,
) func(string) *MachineAgent {
return func(machineId string) *MachineAgent {
return NewMachineAgent(
worker.NewRunner(cmdutil.IsFatal, cmdutil.MoreImportant, worker.RestartDelay),
looputil.NewLoopDeviceManager(),

// NewMachineAgent instantiates a new MachineAgent.
func NewMachineAgent(
agentConfWriter AgentConfigWriter,
bufferedLogs logsender.LogRecordCh,
runner worker.Runner,
loopDeviceManager looputil.LoopDeviceManager,
return &MachineAgent{
machineId: machineId,
AgentConfigWriter: agentConfWriter,
configChangedVal: voyeur.NewValue(true),
bufferedLogs: bufferedLogs,
workersStarted: make(chan struct{}),
initialUpgradeCheckComplete: gate.NewLock(),
loopDeviceManager: loopDeviceManager,

// MachineAgent is responsible for tying together all functionality
// needed to orchestrate a Jujud instance which controls a machine.
type MachineAgent struct {
bufferedLogs logsender.LogRecordCh
configChangedVal *voyeur.Value
upgradeComplete gate.Lock
workersStarted chan struct{}

// XXX(fwereade): these smell strongly of goroutine-unsafeness.

// Used to signal that the upgrade worker will not
// reboot the agent on startup because there are no
// longer any immediately pending agent upgrades.
initialUpgradeCheckComplete gate.Lock
discoverSpacesComplete gate.Lock
mongoInitMutex sync.Mutex
mongoInitialized bool
loopDeviceManager looputil.LoopDeviceManager
// IsRestorePreparing returns bool representing if we are in restore mode
// but not running restore.
func (a *MachineAgent) IsRestorePreparing() bool {
return a.restoreMode && !a.restoring

// IsRestoreRunning returns bool representing if we are in restore mode
// and running the actual restore process.
func (a *MachineAgent) IsRestoreRunning() bool {

func (a *MachineAgent) isUpgradeRunning() bool {
return !a.upgradeComplete.IsUnlocked()

func (a *MachineAgent) isInitialUpgradeCheckPending() bool {
return !a.initialUpgradeCheckComplete.IsUnlocked()

// Wait waits for the machine agent to finish.
func (a *MachineAgent) Wait() error {

// Stop stops the machine agent.
func (a *MachineAgent) Stop() error {

// upgradeCertificateDNSNames ensures that the controller certificate
// recorded in the agent config and also mongo server.pem contains the
// DNSNames entries required by Juju.
func (a *MachineAgent) upgradeCertificateDNSNames() error {
agentConfig := a.CurrentConfig()
si, ok := agentConfig.StateServingInfo()
if !ok || si.CAPrivateKey == "" {
// No certificate information exists yet, nothing to do.
// Parse the current certificate to get the current dns names.
serverCert, err := cert.ParseCert(si.Cert)
dnsNames := set.NewStrings(serverCert.DNSNames...)
requiredDNSNames := []string{"local", "juju-apiserver", "juju-mongodb"}
for _, dnsName := range requiredDNSNames {
if dnsNames.Contains(dnsName) {
dnsNames.Add(dnsName)
// Write a new certificate to the mongo pem and agent config files.
si.Cert, si.PrivateKey, err = cert.NewDefaultServer(agentConfig.CACert(), si.CAPrivateKey, dnsNames.Values())
if err := mongo.UpdateSSLKey(agentConfig.DataDir(), si.Cert, si.PrivateKey); err != nil {
return a.AgentConfigWriter.ChangeConfig(func(config agent.ConfigSetter) error {
config.SetStateServingInfo(si)
// Run runs a machine agent.
func (a *MachineAgent) Run(*cmd.Context) error {
if err := a.ReadConfig(a.Tag().String()); err != nil {
return fmt.Errorf("cannot read agent configuration: %v", err)
logger.Infof("machine agent %v start (%s [%s])", a.Tag(), jujuversion.Current, runtime.Compiler)
if flags := featureflag.String(); flags != "" {
logger.Warningf("developer feature flags enabled: %s", flags)
// Before doing anything else, we need to make sure the certificate generated for
// use by mongo to validate controller connections is correct. This needs to be done
// before any possible restart of the mongo service.
// See bug http://pad.lv/1434680
if err := a.upgradeCertificateDNSNames(); err != nil {
return errors.Annotate(err, "error upgrading server certificate")
if upgradeComplete, err := upgradesteps.NewLock(a); err != nil {
return errors.Annotate(err, "error during creating upgrade completion channel")
a.upgradeComplete = upgradeComplete
agentConfig := a.CurrentConfig()
createEngine := a.makeEngineCreator(agentConfig.UpgradedToVersion())
charmrepo.CacheDir = filepath.Join(agentConfig.DataDir(), "charmcache")
if err := a.createJujudSymlinks(agentConfig.DataDir()); err != nil {
a.runner.StartWorker("engine", createEngine)
// At this point, all workers will have been configured to start
close(a.workersStarted)
err := a.runner.Wait()
switch errors.Cause(err) {
case worker.ErrTerminateAgent:
err = a.uninstallAgent()
case worker.ErrRebootMachine:
logger.Infof("Caught reboot error")
err = a.executeRebootOrShutdown(params.ShouldReboot)
case worker.ErrShutdownMachine:
logger.Infof("Caught shutdown error")
err = a.executeRebootOrShutdown(params.ShouldShutdown)
err = cmdutil.AgentDone(logger, err)
func (a *MachineAgent) makeEngineCreator(previousAgentVersion version.Number) func() (worker.Worker, error) {
return func() (worker.Worker, error) {
config := dependency.EngineConfig{
IsFatal: cmdutil.IsFatal,
WorstError: cmdutil.MoreImportantError,
ErrorDelay: 3 * time.Second,
BounceDelay: 10 * time.Millisecond,
engine, err := dependency.NewEngine(config)
manifolds := machineManifolds(machine.ManifoldsConfig{
PreviousAgentVersion: previousAgentVersion,
Agent: agent.APIHostPortsSetter{Agent: a},
AgentConfigChanged: a.configChangedVal,
UpgradeStepsLock: a.upgradeComplete,
UpgradeCheckLock: a.initialUpgradeCheckComplete,
OpenState: a.initState,
OpenStateForUpgrade: a.openStateForUpgrade,
StartStateWorkers: a.startStateWorkers,
StartAPIWorkers: a.startAPIWorkers,
PreUpgradeSteps: upgrades.PreUpgradeSteps,
LogSource: a.bufferedLogs,
NewDeployContext: newDeployContext,
Clock: clock.WallClock,
if err := dependency.Install(engine, manifolds); err != nil {
if err := worker.Stop(engine); err != nil {
logger.Errorf("while stopping engine with bad manifolds: %v", err)

func (a *MachineAgent) executeRebootOrShutdown(action params.RebootAction) error {
// At this stage, all API connections would have been closed
// We need to reopen the API to clear the reboot flag after
// scheduling the reboot. It may be cleaner to do this in the reboot
// worker, before returning the ErrRebootMachine.
conn, err := apicaller.OnlyConnect(a, apicaller.APIOpen)
logger.Infof("Reboot: Error connecting to state")
return errors.Trace(err)
// block until all units/containers are ready, and reboot/shutdown
finalize, err := reboot.NewRebootWaiter(conn, a.CurrentConfig())
return errors.Trace(err)
logger.Infof("Reboot: Executing reboot")
err = finalize.ExecuteReboot(action)
logger.Infof("Reboot: Error executing reboot: %v", err)
return errors.Trace(err)
// On windows, the shutdown command is asynchronous. We return ErrRebootMachine
// so the agent will simply exit without error pending reboot/shutdown.
return worker.ErrRebootMachine
func (a *MachineAgent) ChangeConfig(mutate agent.ConfigMutator) error {
err := a.AgentConfigWriter.ChangeConfig(mutate)
a.configChangedVal.Set(true)
return errors.Trace(err)

func (a *MachineAgent) maybeStopMongo(ver mongo.Version, isMaster bool) error {
if !a.mongoInitialized {
conf := a.AgentConfigWriter.CurrentConfig()
v := conf.MongoVersion()
logger.Errorf("Got version change %v", ver)
// TODO(perrito666) replace with "read-only" mode for environment when
if ver.NewerThan(v) > 0 {
err := a.AgentConfigWriter.ChangeConfig(func(config agent.ConfigSetter) error {
config.SetMongoVersion(mongo.MongoUpgrade)
// PrepareRestore will flag the agent to allow only a limited set
// of commands defined in
// "github.com/juju/juju/apiserver".allowedMethodsAboutToRestore.
// The most noteworthy is:
// Backups.Restore: this will ensure that we can do all the file movements
// required for restore and that no one else makes changes while we do that.
// It will return an error if the machine is already in this state.
func (a *MachineAgent) PrepareRestore() error {
return errors.Errorf("already in restore mode")

// BeginRestore will flag the agent to disallow all commands, since
// restore should be running and therefore making changes that
// would override anything done.
func (a *MachineAgent) BeginRestore() error {
return errors.Errorf("not in restore mode, cannot begin restoration")
return errors.Errorf("already restoring")

// EndRestore will flag the agent to allow all commands.
// This being invoked means that the restore process failed,
// since success restarts the agent.
func (a *MachineAgent) EndRestore() {
a.restoreMode = false

// newRestoreStateWatcherWorker will return a worker, or an error if there
// is a failure. The worker takes care of watching the state of the
// restoreInfo doc and puts the agent into the different restore modes.
func (a *MachineAgent) newRestoreStateWatcherWorker(st *state.State) (worker.Worker, error) {
rWorker := func(stopch <-chan struct{}) error {
return a.restoreStateWatcher(st, stopch)
return worker.NewSimpleWorker(rWorker), nil

// restoreChanged will be called whenever the restoreInfo doc changes, signaling a new
// step in the restore process.
func (a *MachineAgent) restoreChanged(st *state.State) error {
status, err := st.RestoreInfo().Status()
return errors.Annotate(err, "cannot read restore state")
case state.RestorePending:
case state.RestoreInProgress:
case state.RestoreFailed:

// restoreStateWatcher watches restoreInfo, looking for changes in the restore process.
func (a *MachineAgent) restoreStateWatcher(st *state.State, stopch <-chan struct{}) error {
restoreWatch := st.WatchRestoreInfoChanges()
case <-restoreWatch.Changes():
if err := a.restoreChanged(st); err != nil {

var newEnvirons = environs.New
// startAPIWorkers is called to start workers which rely on the
// machine agent's API connection (via the apiworkers manifold). It
// returns a Runner with a number of workers attached to it.
//
// The workers started here need to be converted to run under the
// dependency engine. Once they have all been converted, this method -
// and the apiworkers manifold - can be removed.
func (a *MachineAgent) startAPIWorkers(apiConn api.Connection) (_ worker.Worker, outErr error) {
agentConfig := a.CurrentConfig()
entity, err := apiagent.NewState(apiConn).Entity(a.Tag())
return nil, errors.Trace(err)
var isModelManager bool
for _, job := range entity.Jobs() {
case multiwatcher.JobManageModel:
isModelManager = true
// TODO(dimitern): Once all workers moved over to using
// the API, report "unknown job type" here.
runner := newConnRunner(apiConn)
// If startAPIWorkers exits early with an error, stop the
// runner so that any already started runners aren't leaked.
// Perform the operations needed to set up hosting for containers.
if err := a.setupContainerSupport(runner, apiConn, agentConfig); err != nil {
cause := errors.Cause(err)
if params.IsCodeDead(cause) || cause == worker.ErrTerminateAgent {
return nil, worker.ErrTerminateAgent
return nil, fmt.Errorf("setting up container support: %v", err)
// Published image metadata for some providers are in simple streams.
// Providers that do not depend on simple streams do not need this worker.
env, err := environs.GetEnviron(apiagent.NewState(apiConn), newEnvirons)
return nil, errors.Annotate(err, "getting environ")
if _, ok := env.(simplestreams.HasRegion); ok {
// Start worker that stores published image metadata in state.
runner.StartWorker("imagemetadata", func() (worker.Worker, error) {
return newMetadataUpdater(apiConn.MetadataUpdater()), nil
// We don't have instance info set and the network config for the
// bootstrap machine only, so update it now. All the other machines will
// have instance info including network config set at provisioning time.
if err := a.setControllerNetworkConfig(apiConn); err != nil {
return nil, errors.Annotate(err, "setting controller network config")
runner.StartWorker("stateconverter", func() (worker.Worker, error) {
// TODO(fwereade): this worker needs its own facade.
facade := apimachiner.NewState(apiConn)
handler := conv2state.New(facade, a)
w, err := watcher.NewNotifyWorker(watcher.NotifyConfig{
return nil, errors.Annotate(err, "cannot start controller promoter worker")
func (a *MachineAgent) setControllerNetworkConfig(apiConn api.Connection) error {
machinerAPI := apimachiner.NewState(apiConn)
agentConfig := a.CurrentConfig()
tag := agentConfig.Tag().(names.MachineTag)
machine, err := machinerAPI.Machine(tag)
if errors.IsNotFound(err) || err == nil && machine.Life() == params.Dead {
return worker.ErrTerminateAgent
return errors.Annotatef(err, "cannot load machine %s from state", tag)
if err := machine.SetProviderNetworkConfig(); err != nil {
return errors.Annotate(err, "cannot set controller provider network config")

// Restart restarts the agent's service.
func (a *MachineAgent) Restart() error {
name := a.CurrentConfig().Value(agent.AgentServiceName)
return service.Restart(name)

// openStateForUpgrade exists to be passed into the upgradesteps
// worker. The upgradesteps worker opens state independently of the
// state worker so that it isn't affected by the state worker's
// lifetime. It ensures the MongoDB server is configured and started,
// and then opens a state connection.
//
// TODO(mjs)- review the need for this once the dependency engine is
// in use. Why can't upgradesteps depend on the main state connection?
func (a *MachineAgent) openStateForUpgrade() (*state.State, error) {
agentConfig := a.CurrentConfig()
if err := a.ensureMongoServer(agentConfig); err != nil {
return nil, errors.Trace(err)
info, ok := agentConfig.MongoInfo()
return nil, errors.New("no state info available")
st, err := state.Open(
agentConfig.Model(), info, mongo.DefaultDialOpts(),
stateenvirons.GetNewPolicyFunc(
stateenvirons.GetNewEnvironFunc(environs.New),
return nil, errors.Trace(err)
// setupContainerSupport determines what containers can be run on this machine and
// initialises suitable infrastructure to support such containers.
func (a *MachineAgent) setupContainerSupport(runner worker.Runner, st api.Connection, agentConfig agent.Config) error {
var supportedContainers []instance.ContainerType
supportsContainers := container.ContainersSupported()
if supportsContainers {
supportedContainers = append(supportedContainers, instance.LXD)
supportsKvm, err := kvm.IsKVMSupported()
logger.Warningf("determining kvm support: %v\nno kvm containers possible", err)
if err == nil && supportsKvm {
supportedContainers = append(supportedContainers, instance.KVM)
return a.updateSupportedContainers(runner, st, supportedContainers, agentConfig)

// updateSupportedContainers records in state that a machine can run the specified containers.
// It starts a watcher and when a container of a given type is first added to the machine,
// the watcher is killed, the machine is set up to be able to start containers of the given type,
// and a suitable provisioner is started.
func (a *MachineAgent) updateSupportedContainers(
runner worker.Runner,
containers []instance.ContainerType,
agentConfig agent.Config,
pr := apiprovisioner.NewState(st)
tag := agentConfig.Tag().(names.MachineTag)
machine, err := pr.Machine(tag)
if errors.IsNotFound(err) || err == nil && machine.Life() == params.Dead {
return worker.ErrTerminateAgent
return errors.Annotatef(err, "cannot load machine %s from state", tag)
if len(containers) == 0 {
if err := machine.SupportsNoContainers(); err != nil {
return errors.Annotatef(err, "clearing supported containers for %s", tag)
if err := machine.SetSupportedContainers(containers...); err != nil {
return errors.Annotatef(err, "setting supported containers for %s", tag)
// Start the watcher to fire when a container is first requested on the machine.
watcherName := fmt.Sprintf("%s-container-watcher", machine.Id())
params := provisioner.ContainerSetupParams{
WorkerName: watcherName,
SupportedContainers: containers,
InitLockName: agent.MachineLockName,
handler := provisioner.NewContainerSetupHandler(params)
a.startWorkerAfterUpgrade(runner, watcherName, func() (worker.Worker, error) {
w, err := watcher.NewStringsWorker(watcher.StringsConfig{
return nil, errors.Annotatef(err, "cannot start %s worker", watcherName)

func (a *MachineAgent) initState(agentConfig agent.Config) (*state.State, error) {
// Start MongoDB server and dial.
if err := a.ensureMongoServer(agentConfig); err != nil {
st, _, err := openState(agentConfig, stateWorkerDialOpts)
reportOpenedState(st)
// startStateWorkers returns a worker running all the workers that
// require a *state.State connection.
func (a *MachineAgent) startStateWorkers(st *state.State) (worker.Worker, error) {
agentConfig := a.CurrentConfig()
m, err := getMachine(st, agentConfig.Tag())
return nil, errors.Annotate(err, "machine lookup")
runner := newConnRunner(st)
singularRunner, err := newSingularStateRunner(runner, st, m)
return nil, errors.Trace(err)
for _, job := range m.Jobs() {
case state.JobHostUnits:
// Implemented elsewhere with workers that use the API.
case state.JobManageModel:
a.startWorkerAfterUpgrade(runner, "model worker manager", func() (worker.Worker, error) {
w, err := modelworkermanager.New(modelworkermanager.Config{
NewWorker: a.startModelWorkers,
ErrorDelay: worker.RestartDelay,
return nil, errors.Annotate(err, "cannot start model worker manager")
a.startWorkerAfterUpgrade(runner, "peergrouper", func() (worker.Worker, error) {
env, err := stateenvirons.GetNewEnvironFunc(environs.New)(st)
return nil, errors.Annotate(err, "getting environ from state")
supportsSpaces := environs.SupportsSpaces(env)
w, err := peergrouperNew(st, supportsSpaces)
return nil, errors.Annotate(err, "cannot start peergrouper worker")
a.startWorkerAfterUpgrade(runner, "restore", func() (worker.Worker, error) {
w, err := a.newRestoreStateWatcherWorker(st)
return nil, errors.Annotate(err, "cannot start backup-restorer worker")
a.startWorkerAfterUpgrade(runner, "mongoupgrade", func() (worker.Worker, error) {
return newUpgradeMongoWorker(st, a.machineId, a.maybeStopMongo)
// certChangedChan is shared by multiple workers; it's up
// to the agent to close it rather than any one of the
// workers. It is possible that multiple cert changes
// come in before the apiserver is up to receive them.
// Specify a bigger buffer to prevent deadlock when
// the apiserver isn't up yet. Use a size of 10 since we
// allow up to 7 controllers, and might also update the
// addresses of the local machine (127.0.0.1, ::1, etc).
//
// TODO(cherylj/waigani) Remove this workaround when
// certupdater and apiserver can properly manage dependencies
// through the dependency engine.
//
// TODO(ericsnow) For now we simply do not close the channel.
certChangedChan := make(chan params.StateServingInfo, 10)
// Each time the apiserver worker is restarted, we need a fresh copy of state due
// to the fact that state holds lease managers which are killed and need to be reset.
stateOpener := func() (*state.State, error) {
logger.Debugf("opening state for apiserver worker")
st, _, err := openState(agentConfig, stateWorkerDialOpts)
runner.StartWorker("apiserver", a.apiserverWorkerStarter(stateOpener, certChangedChan))
var stateServingSetter certupdater.StateServingInfoSetter = func(info params.StateServingInfo, done <-chan struct{}) error {
return a.ChangeConfig(func(config agent.ConfigSetter) error {
config.SetStateServingInfo(info)
logger.Infof("update apiserver worker with new certificate")
case certChangedChan <- info:
a.startWorkerAfterUpgrade(runner, "certupdater", func() (worker.Worker, error) {
return newCertificateUpdater(m, agentConfig, st, st, stateServingSetter), nil
a.startWorkerAfterUpgrade(singularRunner, "dblogpruner", func() (worker.Worker, error) {
return dblogpruner.New(st, dblogpruner.NewLogPruneParams()), nil
a.startWorkerAfterUpgrade(singularRunner, "txnpruner", func() (worker.Worker, error) {
return txnpruner.New(st, time.Hour*2), nil
return nil, errors.Errorf("unknown job type %q", job)
// startModelWorkers starts the set of workers that run for every model
// in each controller.
func (a *MachineAgent) startModelWorkers(uuid string) (worker.Worker, error) {
modelAgent, err := model.WrapAgent(a, uuid)
return nil, errors.Trace(err)
engine, err := dependency.NewEngine(dependency.EngineConfig{
IsFatal: model.IsFatal,
WorstError: model.WorstError,
Filter: model.IgnoreErrRemoved,
ErrorDelay: 3 * time.Second,
BounceDelay: 10 * time.Millisecond,
return nil, errors.Trace(err)
manifolds := modelManifolds(model.ManifoldsConfig{
AgentConfigChanged: a.configChangedVal,
Clock: clock.WallClock,
RunFlagDuration: time.Minute,
CharmRevisionUpdateInterval: 24 * time.Hour,
InstPollerAggregationDelay: 3 * time.Second,
// TODO(perrito666) the status history pruning numbers need
// to be adjusted, after collecting user data from large install
// bases, to numbers allowing a rich and useful back history.
StatusHistoryPrunerMaxHistoryTime: 336 * time.Hour, // 2 weeks
StatusHistoryPrunerMaxHistoryMB: 5120, // 5G
StatusHistoryPrunerInterval: 5 * time.Minute,
SpacesImportedGate: a.discoverSpacesComplete,
NewEnvironFunc: newEnvirons,
if err := dependency.Install(engine, manifolds); err != nil {
if err := worker.Stop(engine); err != nil {
logger.Errorf("while stopping engine with bad manifolds: %v", err)
return nil, errors.Trace(err)
// stateWorkerDialOpts is a mongo.DialOpts suitable
// for use by StateWorker to dial mongo.
//
// This must be overridden in tests, as it assumes
// journaling is enabled.
var stateWorkerDialOpts mongo.DialOpts

func (a *MachineAgent) apiserverWorkerStarter(
stateOpener func() (*state.State, error), certChanged chan params.StateServingInfo,
) func() (worker.Worker, error) {
return func() (worker.Worker, error) {
st, err := stateOpener()
return nil, errors.Trace(err)
return a.newApiserverWorker(st, certChanged)
func (a *MachineAgent) newApiserverWorker(st *state.State, certChanged chan params.StateServingInfo) (worker.Worker, error) {
agentConfig := a.CurrentConfig()
// If the configuration does not have the required information,
// it is currently not a recoverable error, so we kill the whole
// agent, potentially enabling human intervention to fix
// the agent's configuration file.
info, ok := agentConfig.StateServingInfo()
return nil, &cmdutil.FatalError{"StateServingInfo not available and we need it"}
cert := []byte(info.Cert)
key := []byte(info.PrivateKey)
if len(cert) == 0 || len(key) == 0 {
return nil, &cmdutil.FatalError{"configuration does not have controller cert/key"}
tag := agentConfig.Tag()
dataDir := agentConfig.DataDir()
logDir := agentConfig.LogDir()
endpoint := net.JoinHostPort("", strconv.Itoa(info.APIPort))
listener, err := net.Listen("tcp", endpoint)
// TODO(katco): We should be doing something more serious than
// logging audit errors. Failures in the auditing systems should
// stop the api server until the problem can be corrected.
auditErrorHandler := func(err error) {
logger.Criticalf("%v", err)
controllerConfig, err := st.ControllerConfig()
return nil, errors.Annotate(err, "cannot fetch the controller config")
server, err := apiserver.NewServer(st, listener, apiserver.ServerConfig{
Validator: a.limitLogins,
CertChanged: certChanged,
NewObserver: newObserverFn(
jujuversion.Current,
agentConfig.Model().Id(),
newAuditEntrySink(st, logDir),
return nil, errors.Annotate(err, "cannot start api server worker")
func newAuditEntrySink(st *state.State, logDir string) audit.AuditEntrySinkFn {
persistFn := st.PutAuditEntryFn()
fileSinkFn := audit.NewLogFileSink(logDir)
return func(entry audit.AuditEntry) error {
// We don't care about auditing anything but user actions.
if _, err := names.ParseUserTag(entry.OriginName); err != nil {
// TODO(wallyworld) - Pinger requests should not originate as a user action.
if strings.HasPrefix(entry.Operation, "Pinger:") {
persistErr := persistFn(entry)
sinkErr := fileSinkFn(entry)
if persistErr == nil {
return errors.Annotate(sinkErr, "cannot save audit record to file")
return errors.Annotate(persistErr, "cannot save audit record to database")
return errors.Annotate(persistErr, "cannot save audit record to file or database")

controllerConfig controller.Config,
jujuServerVersion version.Number,
persistAuditEntry audit.AuditEntrySinkFn,
auditErrorHandler observer.ErrorHandler,
) observer.ObserverFactory {
var observerFactories []observer.ObserverFactory
// Common logging of RPC requests
observerFactories = append(observerFactories, func() observer.Observer {
logger := loggo.GetLogger("juju.apiserver")
ctx := observer.RequestObserverContext{
return observer.NewRequestObserver(ctx)
// Auditing observer
// TODO(katco): Auditing needs feature tests (lp:1604551)
if controllerConfig.AuditingEnabled() {
observerFactories = append(observerFactories, func() observer.Observer {
ctx := &observer.AuditContext{
JujuServerVersion: jujuServerVersion,
ModelUUID: modelUUID,
return observer.NewAudit(ctx, persistAuditEntry, auditErrorHandler)
return observer.ObserverFactoryMultiplexer(observerFactories...)
// limitLogins is called by the API server for each login attempt.
// It returns an error if upgrades or restore are running.
func (a *MachineAgent) limitLogins(req params.LoginRequest) error {
if err := a.limitLoginsDuringRestore(req); err != nil {
if err := a.limitLoginsDuringUpgrade(req); err != nil {
return a.limitLoginsDuringMongoUpgrade(req)

func (a *MachineAgent) limitLoginsDuringMongoUpgrade(req params.LoginRequest) error {
// If an upgrade is running we will not be able to lock AgentConfigWriter,
// and it also means we are not upgrading mongo.
if a.isUpgradeRunning() {
cfg := a.AgentConfigWriter.CurrentConfig()
ver := cfg.MongoVersion()
if ver == mongo.MongoUpgrade {
return errors.New("Upgrading Mongo")

// limitLoginsDuringRestore will only allow logins for restore-related purposes
// while the different steps of restore are running.
func (a *MachineAgent) limitLoginsDuringRestore(req params.LoginRequest) error {
case a.IsRestoreRunning():
err = apiserver.RestoreInProgressError
case a.IsRestorePreparing():
err = apiserver.AboutToRestoreError
authTag, parseErr := names.ParseTag(req.AuthTag)
if parseErr != nil {
return errors.Annotate(err, "could not parse auth tag")
switch authTag := authTag.(type) {
// use a restricted API mode
case names.MachineTag:
if authTag == a.Tag() {
// allow logins from the local machine
return errors.Errorf("login for %q blocked because restore is in progress", authTag)

// limitLoginsDuringUpgrade is called by the API server for each login
// attempt. It returns an error if upgrades are in progress unless the
// login is for a user (i.e. a client) or the local machine.
func (a *MachineAgent) limitLoginsDuringUpgrade(req params.LoginRequest) error {
if a.isUpgradeRunning() || a.isInitialUpgradeCheckPending() {
authTag, err := names.ParseTag(req.AuthTag)
return errors.Annotate(err, "could not parse auth tag")
switch authTag := authTag.(type) {
// use a restricted API mode
return params.UpgradeInProgressError
case names.MachineTag:
if authTag == a.Tag() {
// allow logins from the local machine
return errors.Errorf("login for %q blocked because %s", authTag, params.CodeUpgradeInProgress)
return nil // allow all logins
var stateWorkerServingConfigErr = errors.New("state worker started with no state serving info")

// ensureMongoServer ensures that mongo is installed and running,
// and ready for opening a state connection.
func (a *MachineAgent) ensureMongoServer(agentConfig agent.Config) (err error) {
a.mongoInitMutex.Lock()
defer a.mongoInitMutex.Unlock()
if a.mongoInitialized {
logger.Debugf("mongo is already initialized")
a.mongoInitialized = true
mongoInstalled, err := mongo.IsServiceInstalled()
return errors.Annotate(err, "error while checking if mongodb service is installed")
if !mongoInstalled {
// EnsureMongoServer installs/upgrades the init config as necessary.
ensureServerParams, err := cmdutil.NewEnsureServerParams(agentConfig)
if err := cmdutil.EnsureMongoServer(ensureServerParams); err != nil {
logger.Debugf("mongodb service is installed")
// Mongo is installed, record the version.
err = a.ChangeConfig(func(config agent.ConfigSetter) error {
config.SetMongoVersion(mongo.InstalledVersion())
return errors.Annotate(err, "cannot set mongo version")
func openState(agentConfig agent.Config, dialOpts mongo.DialOpts) (_ *state.State, _ *state.Machine, err error) {
info, ok := agentConfig.MongoInfo()
return nil, nil, fmt.Errorf("no state info available")
st, err := state.Open(agentConfig.Model(), info, dialOpts,
stateenvirons.GetNewPolicyFunc(
stateenvirons.GetNewEnvironFunc(environs.New),
return nil, nil, err
m0, err := st.FindEntity(agentConfig.Tag())
if errors.IsNotFound(err) {
err = worker.ErrTerminateAgent
return nil, nil, err
m := m0.(*state.Machine)
if m.Life() == state.Dead {
return nil, nil, worker.ErrTerminateAgent
// Check that the machine nonce as provisioned matches the agent.Conf value.
if !m.CheckProvisioned(agentConfig.Nonce()) {
// The agent is running on a different machine to the one it
// should be according to state. It must stop immediately.
logger.Errorf("running machine %v agent on inappropriate instance", m)
return nil, nil, worker.ErrTerminateAgent

func getMachine(st *state.State, tag names.Tag) (*state.Machine, error) {
m0, err := st.FindEntity(tag)
return m0.(*state.Machine), nil
// startWorkerAfterUpgrade starts a worker to run the specified child worker
// but only after waiting for upgrades to complete.
func (a *MachineAgent) startWorkerAfterUpgrade(runner worker.Runner, name string, start func() (worker.Worker, error)) {
runner.StartWorker(name, func() (worker.Worker, error) {
return a.upgradeWaiterWorker(name, start), nil

// upgradeWaiterWorker runs the specified worker after upgrades have completed.
func (a *MachineAgent) upgradeWaiterWorker(name string, start func() (worker.Worker, error)) worker.Worker {
return worker.NewSimpleWorker(func(stop <-chan struct{}) error {
// Wait for the agent upgrade and upgrade steps to complete (or for us to be stopped).
for _, ch := range []<-chan struct{}{
a.upgradeComplete.Unlocked(),
a.initialUpgradeCheckComplete.Unlocked(),
logger.Debugf("upgrades done, starting worker %q", name)
// Upgrades are done, start the worker.
// Wait for worker to finish or for us to be stopped.
done := make(chan error, 1)
return errors.Annotatef(err, "worker %q exited", name)
logger.Debugf("stopping so killing worker %q", name)
return worker.Stop(w)

// WorkersStarted returns a channel that's closed once all top level workers
// have been started. This is provided for testing purposes.
func (a *MachineAgent) WorkersStarted() <-chan struct{} {
return a.workersStarted
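
// For illustration only (a hedged sketch, not part of the original file): a test
// would typically block on this channel before poking at the agent's workers.
// The timeout below is an arbitrary placeholder.
//
//	select {
//	case <-a.WorkersStarted():
//	case <-time.After(10 * time.Second):
//		// fail the test: workers never started
//	}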
func (a *MachineAgent) Tag() names.Tag {
return names.NewMachineTag(a.machineId)

func (a *MachineAgent) createJujudSymlinks(dataDir string) error {
jujud := filepath.Join(tools.ToolsDir(dataDir, a.Tag().String()), jujunames.Jujud)
for _, link := range []string{jujuRun, jujuDumpLogs} {
err := a.createSymlink(jujud, link)
return errors.Annotatef(err, "failed to create %s symlink", link)

func (a *MachineAgent) createSymlink(target, link string) error {
fullLink := utils.EnsureBaseDir(a.rootDir, link)
currentTarget, err := symlink.Read(fullLink)
if err != nil && !os.IsNotExist(err) {
} else if err == nil {
// Link already in place - check it.
if currentTarget == target {
// Link already points to the right place - nothing to do.
// Link points to the wrong place - delete it.
if err := os.Remove(fullLink); err != nil {
if err := os.MkdirAll(filepath.Dir(fullLink), os.FileMode(0755)); err != nil {
return symlink.New(target, fullLink)

func (a *MachineAgent) removeJujudSymlinks() (errs []error) {
for _, link := range []string{jujuRun, jujuDumpLogs} {
err := os.Remove(utils.EnsureBaseDir(a.rootDir, link))
if err != nil && !os.IsNotExist(err) {
errs = append(errs, errors.Annotatef(err, "failed to remove %s symlink", link))
func (a *MachineAgent) uninstallAgent() error {
// We should only uninstall if the uninstall file is present.
if !agent.CanUninstall(a) {
logger.Infof("ignoring uninstall request")
logger.Infof("uninstalling agent")
agentConfig := a.CurrentConfig()
agentServiceName := agentConfig.Value(agent.AgentServiceName)
if agentServiceName == "" {
// For backwards compatibility, handle lack of AgentServiceName.
agentServiceName = os.Getenv("UPSTART_JOB")
if agentServiceName != "" {
svc, err := service.DiscoverService(agentServiceName, common.Conf{})
errs = append(errs, fmt.Errorf("cannot remove service %q: %v", agentServiceName, err))
} else if err := svc.Remove(); err != nil {
errs = append(errs, fmt.Errorf("cannot remove service %q: %v", agentServiceName, err))
errs = append(errs, a.removeJujudSymlinks()...)
// TODO(fwereade): surely this shouldn't be happening here? Once we're
// at this point we should expect to be killed in short order; if this
// work is remotely important we should be blocking machine death on
insideContainer := container.RunningInContainer()
if insideContainer {
// We're running inside a container, so loop devices may leak. Detach
// any loop devices that are backed by files on this machine.
if err := a.loopDeviceManager.DetachLoopDevices("/", agentConfig.DataDir()); err != nil {
errs = append(errs, err)
if err := mongo.RemoveService(); err != nil {
errs = append(errs, errors.Annotate(err, "cannot stop/remove mongo service"))
if err := os.RemoveAll(agentConfig.DataDir()); err != nil {
errs = append(errs, err)
return fmt.Errorf("uninstall failed: %v", errs)

func newConnRunner(conns ...cmdutil.Pinger) worker.Runner {
return worker.NewRunner(cmdutil.ConnectionIsFatal(logger, conns...), cmdutil.MoreImportant, worker.RestartDelay)
type MongoSessioner interface {
MongoSession() *mgo.Session

func newSingularStateRunner(runner worker.Runner, st MongoSessioner, m *state.Machine) (worker.Runner, error) {
singularStateConn := singularStateConn{st.MongoSession(), m}
singularRunner, err := newSingularRunner(runner, singularStateConn)
return nil, errors.Annotate(err, "cannot make singular State Runner")
return singularRunner, err

// singularStateConn implements singular.Conn on
// top of a State connection.
type singularStateConn struct {
session *mgo.Session
machine *state.Machine

func (c singularStateConn) IsMaster() (bool, error) {
return mongo.IsMaster(c.session, c.machine)

func (c singularStateConn) Ping() error {
return c.session.Ping()

func metricAPI(st api.Connection) (metricsmanager.MetricsManagerClient, error) {
client, err := metricsmanager.NewClient(st)
return nil, errors.Trace(err)

// newDeployContext gives the tests the opportunity to create a deployer.Context
// that can be used for testing so as to (1) avoid deploying units to the system
// running the tests and (2) get access to the *State used internally, so that
// tests can be run without waiting for the 5s watcher refresh time to which we would
// otherwise be restricted.
var newDeployContext = func(st *apideployer.State, agentConfig agent.Config) deployer.Context {
return deployer.NewSimpleContext(agentConfig, st)