~nskaggs/+junk/xenial-test

« back to all changes in this revision

Viewing changes to src/github.com/juju/juju/cmd/jujud/agent/machine.go

  • Committer: Nicholas Skaggs
  • Date: 2016-10-24 20:56:05 UTC
  • Revision ID: nicholas.skaggs@canonical.com-20161024205605-z8lta0uvuhtxwzwl
Initi with beta15

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
// Copyright 2012, 2013 Canonical Ltd.
 
2
// Licensed under the AGPLv3, see LICENCE file for details.
 
3
 
 
4
package agent
 
5
 
 
6
import (
 
7
        "fmt"
 
8
        "net"
 
9
        "os"
 
10
        "path/filepath"
 
11
        "runtime"
 
12
        "strconv"
 
13
        "strings"
 
14
        "sync"
 
15
        "time"
 
16
 
 
17
        "github.com/juju/cmd"
 
18
        "github.com/juju/errors"
 
19
        apiagent "github.com/juju/juju/api/agent"
 
20
        apimachiner "github.com/juju/juju/api/machiner"
 
21
        "github.com/juju/juju/controller"
 
22
        "github.com/juju/loggo"
 
23
        "github.com/juju/replicaset"
 
24
        "github.com/juju/utils"
 
25
        "github.com/juju/utils/clock"
 
26
        "github.com/juju/utils/featureflag"
 
27
        "github.com/juju/utils/series"
 
28
        "github.com/juju/utils/set"
 
29
        "github.com/juju/utils/symlink"
 
30
        "github.com/juju/utils/voyeur"
 
31
        "github.com/juju/version"
 
32
        "gopkg.in/juju/charmrepo.v2-unstable"
 
33
        "gopkg.in/juju/names.v2"
 
34
        "gopkg.in/mgo.v2"
 
35
        "gopkg.in/natefinch/lumberjack.v2"
 
36
        "launchpad.net/gnuflag"
 
37
        "launchpad.net/tomb"
 
38
 
 
39
        "github.com/juju/juju/agent"
 
40
        "github.com/juju/juju/agent/tools"
 
41
        "github.com/juju/juju/api"
 
42
        apideployer "github.com/juju/juju/api/deployer"
 
43
        "github.com/juju/juju/api/metricsmanager"
 
44
        apiprovisioner "github.com/juju/juju/api/provisioner"
 
45
        "github.com/juju/juju/apiserver"
 
46
        "github.com/juju/juju/apiserver/observer"
 
47
        "github.com/juju/juju/apiserver/params"
 
48
        "github.com/juju/juju/audit"
 
49
        "github.com/juju/juju/cert"
 
50
        "github.com/juju/juju/cmd/jujud/agent/machine"
 
51
        "github.com/juju/juju/cmd/jujud/agent/model"
 
52
        "github.com/juju/juju/cmd/jujud/reboot"
 
53
        cmdutil "github.com/juju/juju/cmd/jujud/util"
 
54
        "github.com/juju/juju/container"
 
55
        "github.com/juju/juju/container/kvm"
 
56
        "github.com/juju/juju/environs"
 
57
        "github.com/juju/juju/environs/simplestreams"
 
58
        "github.com/juju/juju/instance"
 
59
        jujunames "github.com/juju/juju/juju/names"
 
60
        "github.com/juju/juju/juju/paths"
 
61
        "github.com/juju/juju/mongo"
 
62
        "github.com/juju/juju/service"
 
63
        "github.com/juju/juju/service/common"
 
64
        "github.com/juju/juju/state"
 
65
        "github.com/juju/juju/state/multiwatcher"
 
66
        "github.com/juju/juju/state/stateenvirons"
 
67
        "github.com/juju/juju/storage/looputil"
 
68
        "github.com/juju/juju/upgrades"
 
69
        jujuversion "github.com/juju/juju/version"
 
70
        "github.com/juju/juju/watcher"
 
71
        "github.com/juju/juju/worker"
 
72
        "github.com/juju/juju/worker/apicaller"
 
73
        "github.com/juju/juju/worker/certupdater"
 
74
        "github.com/juju/juju/worker/conv2state"
 
75
        "github.com/juju/juju/worker/dblogpruner"
 
76
        "github.com/juju/juju/worker/dependency"
 
77
        "github.com/juju/juju/worker/deployer"
 
78
        "github.com/juju/juju/worker/gate"
 
79
        "github.com/juju/juju/worker/imagemetadataworker"
 
80
        "github.com/juju/juju/worker/logsender"
 
81
        "github.com/juju/juju/worker/modelworkermanager"
 
82
        "github.com/juju/juju/worker/mongoupgrader"
 
83
        "github.com/juju/juju/worker/peergrouper"
 
84
        "github.com/juju/juju/worker/provisioner"
 
85
        "github.com/juju/juju/worker/singular"
 
86
        "github.com/juju/juju/worker/txnpruner"
 
87
        "github.com/juju/juju/worker/upgradesteps"
 
88
)
 
89
 
 
90
var (
 
91
        logger       = loggo.GetLogger("juju.cmd.jujud")
 
92
        jujuRun      = paths.MustSucceed(paths.JujuRun(series.HostSeries()))
 
93
        jujuDumpLogs = paths.MustSucceed(paths.JujuDumpLogs(series.HostSeries()))
 
94
 
 
95
        // The following are defined as variables to allow the tests to
 
96
        // intercept calls to the functions. In every case, they should
 
97
        // be expressed as explicit dependencies, but nobody has yet had
 
98
        // the intestinal fortitude to untangle this package. Be that
 
99
        // person! Juju Needs You.
 
100
        useMultipleCPUs       = utils.UseMultipleCPUs
 
101
        newSingularRunner     = singular.New
 
102
        peergrouperNew        = peergrouper.New
 
103
        newCertificateUpdater = certupdater.NewCertificateUpdater
 
104
        newMetadataUpdater    = imagemetadataworker.NewWorker
 
105
        newUpgradeMongoWorker = mongoupgrader.New
 
106
        reportOpenedState     = func(*state.State) {}
 
107
 
 
108
        modelManifolds   = model.Manifolds
 
109
        machineManifolds = machine.Manifolds
 
110
)
 
111
 
 
112
// Variable to override in tests, default is true
 
113
var ProductionMongoWriteConcern = true
 
114
 
 
115
func init() {
 
116
        stateWorkerDialOpts = mongo.DefaultDialOpts()
 
117
        stateWorkerDialOpts.PostDial = func(session *mgo.Session) error {
 
118
                safe := mgo.Safe{}
 
119
                if ProductionMongoWriteConcern {
 
120
                        safe.J = true
 
121
                        _, err := replicaset.CurrentConfig(session)
 
122
                        if err == nil {
 
123
                                // set mongo to write-majority (writes only returned after
 
124
                                // replicated to a majority of replica-set members).
 
125
                                safe.WMode = "majority"
 
126
                        }
 
127
                }
 
128
                session.SetSafe(&safe)
 
129
                return nil
 
130
        }
 
131
}
 
132
 
 
133
// AgentInitializer handles initializing a type for use as a Jujud
 
134
// agent.
 
135
type AgentInitializer interface {
 
136
        AddFlags(*gnuflag.FlagSet)
 
137
        CheckArgs([]string) error
 
138
}
 
139
 
 
140
// AgentConfigWriter encapsulates disk I/O operations with the agent
 
141
// config.
 
142
type AgentConfigWriter interface {
 
143
        // ReadConfig reads the config for the given tag from disk.
 
144
        ReadConfig(tag string) error
 
145
        // ChangeConfig executes the given agent.ConfigMutator in a
 
146
        // thread-safe context.
 
147
        ChangeConfig(agent.ConfigMutator) error
 
148
        // CurrentConfig returns a copy of the in-memory agent config.
 
149
        CurrentConfig() agent.Config
 
150
}
 
151
 
 
152
// NewMachineAgentCmd creates a Command which handles parsing
 
153
// command-line arguments and instantiating and running a
 
154
// MachineAgent.
 
155
func NewMachineAgentCmd(
 
156
        ctx *cmd.Context,
 
157
        machineAgentFactory func(string) *MachineAgent,
 
158
        agentInitializer AgentInitializer,
 
159
        configFetcher AgentConfigWriter,
 
160
) cmd.Command {
 
161
        return &machineAgentCmd{
 
162
                ctx:                 ctx,
 
163
                machineAgentFactory: machineAgentFactory,
 
164
                agentInitializer:    agentInitializer,
 
165
                currentConfig:       configFetcher,
 
166
        }
 
167
}
 
168
 
 
169
type machineAgentCmd struct {
 
170
        cmd.CommandBase
 
171
 
 
172
        // This group of arguments is required.
 
173
        agentInitializer    AgentInitializer
 
174
        currentConfig       AgentConfigWriter
 
175
        machineAgentFactory func(string) *MachineAgent
 
176
        ctx                 *cmd.Context
 
177
 
 
178
        // This group is for debugging purposes.
 
179
        logToStdErr bool
 
180
 
 
181
        // The following are set via command-line flags.
 
182
        machineId string
 
183
}
 
184
 
 
185
// Init is called by the cmd system to initialize the structure for
 
186
// running.
 
187
func (a *machineAgentCmd) Init(args []string) error {
 
188
 
 
189
        if !names.IsValidMachine(a.machineId) {
 
190
                return fmt.Errorf("--machine-id option must be set, and expects a non-negative integer")
 
191
        }
 
192
        if err := a.agentInitializer.CheckArgs(args); err != nil {
 
193
                return err
 
194
        }
 
195
 
 
196
        // Due to changes in the logging, and needing to care about old
 
197
        // models that have been upgraded, we need to explicitly remove the
 
198
        // file writer if one has been added, otherwise we will get duplicate
 
199
        // lines of all logging in the log file.
 
200
        loggo.RemoveWriter("logfile")
 
201
 
 
202
        if a.logToStdErr {
 
203
                return nil
 
204
        }
 
205
 
 
206
        err := a.currentConfig.ReadConfig(names.NewMachineTag(a.machineId).String())
 
207
        if err != nil {
 
208
                return errors.Annotate(err, "cannot read agent configuration")
 
209
        }
 
210
 
 
211
        // the context's stderr is set as the loggo writer in github.com/juju/cmd/logging.go
 
212
        a.ctx.Stderr = &lumberjack.Logger{
 
213
                Filename:   agent.LogFilename(a.currentConfig.CurrentConfig()),
 
214
                MaxSize:    300, // megabytes
 
215
                MaxBackups: 2,
 
216
        }
 
217
 
 
218
        return nil
 
219
}
 
220
 
 
221
// Run instantiates a MachineAgent and runs it.
 
222
func (a *machineAgentCmd) Run(c *cmd.Context) error {
 
223
        machineAgent := a.machineAgentFactory(a.machineId)
 
224
        return machineAgent.Run(c)
 
225
}
 
226
 
 
227
// SetFlags adds the requisite flags to run this command.
 
228
func (a *machineAgentCmd) SetFlags(f *gnuflag.FlagSet) {
 
229
        a.agentInitializer.AddFlags(f)
 
230
        f.StringVar(&a.machineId, "machine-id", "", "id of the machine to run")
 
231
}
 
232
 
 
233
// Info returns usage information for the command.
 
234
func (a *machineAgentCmd) Info() *cmd.Info {
 
235
        return &cmd.Info{
 
236
                Name:    "machine",
 
237
                Purpose: "run a juju machine agent",
 
238
        }
 
239
}
 
240
 
 
241
// MachineAgentFactoryFn returns a function which instantiates a
 
242
// MachineAgent given a machineId.
 
243
func MachineAgentFactoryFn(
 
244
        agentConfWriter AgentConfigWriter,
 
245
        bufferedLogs logsender.LogRecordCh,
 
246
        rootDir string,
 
247
) func(string) *MachineAgent {
 
248
        return func(machineId string) *MachineAgent {
 
249
                return NewMachineAgent(
 
250
                        machineId,
 
251
                        agentConfWriter,
 
252
                        bufferedLogs,
 
253
                        worker.NewRunner(cmdutil.IsFatal, cmdutil.MoreImportant, worker.RestartDelay),
 
254
                        looputil.NewLoopDeviceManager(),
 
255
                        rootDir,
 
256
                )
 
257
        }
 
258
}
 
259
 
 
260
// NewMachineAgent instantiates a new MachineAgent.
 
261
func NewMachineAgent(
 
262
        machineId string,
 
263
        agentConfWriter AgentConfigWriter,
 
264
        bufferedLogs logsender.LogRecordCh,
 
265
        runner worker.Runner,
 
266
        loopDeviceManager looputil.LoopDeviceManager,
 
267
        rootDir string,
 
268
) *MachineAgent {
 
269
        return &MachineAgent{
 
270
                machineId:                   machineId,
 
271
                AgentConfigWriter:           agentConfWriter,
 
272
                configChangedVal:            voyeur.NewValue(true),
 
273
                bufferedLogs:                bufferedLogs,
 
274
                workersStarted:              make(chan struct{}),
 
275
                runner:                      runner,
 
276
                rootDir:                     rootDir,
 
277
                initialUpgradeCheckComplete: gate.NewLock(),
 
278
                loopDeviceManager:           loopDeviceManager,
 
279
        }
 
280
}
 
281
 
 
282
// MachineAgent is responsible for tying together all functionality
 
283
// needed to orchestrate a Jujud instance which controls a machine.
 
284
type MachineAgent struct {
 
285
        AgentConfigWriter
 
286
 
 
287
        tomb             tomb.Tomb
 
288
        machineId        string
 
289
        runner           worker.Runner
 
290
        rootDir          string
 
291
        bufferedLogs     logsender.LogRecordCh
 
292
        configChangedVal *voyeur.Value
 
293
        upgradeComplete  gate.Lock
 
294
        workersStarted   chan struct{}
 
295
 
 
296
        // XXX(fwereade): these smell strongly of goroutine-unsafeness.
 
297
        restoreMode bool
 
298
        restoring   bool
 
299
 
 
300
        // Used to signal that the upgrade worker will not
 
301
        // reboot the agent on startup because there are no
 
302
        // longer any immediately pending agent upgrades.
 
303
        initialUpgradeCheckComplete gate.Lock
 
304
 
 
305
        discoverSpacesComplete gate.Lock
 
306
 
 
307
        mongoInitMutex   sync.Mutex
 
308
        mongoInitialized bool
 
309
 
 
310
        loopDeviceManager looputil.LoopDeviceManager
 
311
}
 
312
 
 
313
// IsRestorePreparing returns bool representing if we are in restore mode
 
314
// but not running restore.
 
315
func (a *MachineAgent) IsRestorePreparing() bool {
 
316
        return a.restoreMode && !a.restoring
 
317
}
 
318
 
 
319
// IsRestoreRunning returns bool representing if we are in restore mode
 
320
// and running the actual restore process.
 
321
func (a *MachineAgent) IsRestoreRunning() bool {
 
322
        return a.restoring
 
323
}
 
324
 
 
325
func (a *MachineAgent) isUpgradeRunning() bool {
 
326
        return !a.upgradeComplete.IsUnlocked()
 
327
}
 
328
 
 
329
func (a *MachineAgent) isInitialUpgradeCheckPending() bool {
 
330
        return !a.initialUpgradeCheckComplete.IsUnlocked()
 
331
}
 
332
 
 
333
// Wait waits for the machine agent to finish.
 
334
func (a *MachineAgent) Wait() error {
 
335
        return a.tomb.Wait()
 
336
}
 
337
 
 
338
// Stop stops the machine agent.
 
339
func (a *MachineAgent) Stop() error {
 
340
        a.runner.Kill()
 
341
        return a.tomb.Wait()
 
342
}
 
343
 
 
344
// upgradeCertificateDNSNames ensure that the controller certificate
 
345
// recorded in the agent config and also mongo server.pem contains the
 
346
// DNSNames entires required by Juju/
 
347
func (a *MachineAgent) upgradeCertificateDNSNames() error {
 
348
        agentConfig := a.CurrentConfig()
 
349
        si, ok := agentConfig.StateServingInfo()
 
350
        if !ok || si.CAPrivateKey == "" {
 
351
                // No certificate information exists yet, nothing to do.
 
352
                return nil
 
353
        }
 
354
        // Parse the current certificate to get the current dns names.
 
355
        serverCert, err := cert.ParseCert(si.Cert)
 
356
        if err != nil {
 
357
                return err
 
358
        }
 
359
        update := false
 
360
        dnsNames := set.NewStrings(serverCert.DNSNames...)
 
361
        requiredDNSNames := []string{"local", "juju-apiserver", "juju-mongodb"}
 
362
        for _, dnsName := range requiredDNSNames {
 
363
                if dnsNames.Contains(dnsName) {
 
364
                        continue
 
365
                }
 
366
                dnsNames.Add(dnsName)
 
367
                update = true
 
368
        }
 
369
        if !update {
 
370
                return nil
 
371
        }
 
372
        // Write a new certificate to the mongo pem and agent config files.
 
373
        si.Cert, si.PrivateKey, err = cert.NewDefaultServer(agentConfig.CACert(), si.CAPrivateKey, dnsNames.Values())
 
374
        if err != nil {
 
375
                return err
 
376
        }
 
377
        if err := mongo.UpdateSSLKey(agentConfig.DataDir(), si.Cert, si.PrivateKey); err != nil {
 
378
                return err
 
379
        }
 
380
        return a.AgentConfigWriter.ChangeConfig(func(config agent.ConfigSetter) error {
 
381
                config.SetStateServingInfo(si)
 
382
                return nil
 
383
        })
 
384
}
 
385
 
 
386
// Run runs a machine agent.
 
387
func (a *MachineAgent) Run(*cmd.Context) error {
 
388
 
 
389
        defer a.tomb.Done()
 
390
        if err := a.ReadConfig(a.Tag().String()); err != nil {
 
391
                return fmt.Errorf("cannot read agent configuration: %v", err)
 
392
        }
 
393
 
 
394
        logger.Infof("machine agent %v start (%s [%s])", a.Tag(), jujuversion.Current, runtime.Compiler)
 
395
        if flags := featureflag.String(); flags != "" {
 
396
                logger.Warningf("developer feature flags enabled: %s", flags)
 
397
        }
 
398
 
 
399
        // Before doing anything else, we need to make sure the certificate generated for
 
400
        // use by mongo to validate controller connections is correct. This needs to be done
 
401
        // before any possible restart of the mongo service.
 
402
        // See bug http://pad.lv/1434680
 
403
        if err := a.upgradeCertificateDNSNames(); err != nil {
 
404
                return errors.Annotate(err, "error upgrading server certificate")
 
405
        }
 
406
 
 
407
        if upgradeComplete, err := upgradesteps.NewLock(a); err != nil {
 
408
                return errors.Annotate(err, "error during creating upgrade completion channel")
 
409
        } else {
 
410
                a.upgradeComplete = upgradeComplete
 
411
        }
 
412
 
 
413
        agentConfig := a.CurrentConfig()
 
414
        createEngine := a.makeEngineCreator(agentConfig.UpgradedToVersion())
 
415
        charmrepo.CacheDir = filepath.Join(agentConfig.DataDir(), "charmcache")
 
416
        if err := a.createJujudSymlinks(agentConfig.DataDir()); err != nil {
 
417
                return err
 
418
        }
 
419
        a.runner.StartWorker("engine", createEngine)
 
420
 
 
421
        // At this point, all workers will have been configured to start
 
422
        close(a.workersStarted)
 
423
        err := a.runner.Wait()
 
424
        switch errors.Cause(err) {
 
425
        case worker.ErrTerminateAgent:
 
426
                err = a.uninstallAgent()
 
427
        case worker.ErrRebootMachine:
 
428
                logger.Infof("Caught reboot error")
 
429
                err = a.executeRebootOrShutdown(params.ShouldReboot)
 
430
        case worker.ErrShutdownMachine:
 
431
                logger.Infof("Caught shutdown error")
 
432
                err = a.executeRebootOrShutdown(params.ShouldShutdown)
 
433
        }
 
434
        err = cmdutil.AgentDone(logger, err)
 
435
        a.tomb.Kill(err)
 
436
        return err
 
437
}
 
438
 
 
439
func (a *MachineAgent) makeEngineCreator(previousAgentVersion version.Number) func() (worker.Worker, error) {
 
440
        return func() (worker.Worker, error) {
 
441
                config := dependency.EngineConfig{
 
442
                        IsFatal:     cmdutil.IsFatal,
 
443
                        WorstError:  cmdutil.MoreImportantError,
 
444
                        ErrorDelay:  3 * time.Second,
 
445
                        BounceDelay: 10 * time.Millisecond,
 
446
                }
 
447
                engine, err := dependency.NewEngine(config)
 
448
                if err != nil {
 
449
                        return nil, err
 
450
                }
 
451
                manifolds := machineManifolds(machine.ManifoldsConfig{
 
452
                        PreviousAgentVersion: previousAgentVersion,
 
453
                        Agent:                agent.APIHostPortsSetter{Agent: a},
 
454
                        RootDir:              a.rootDir,
 
455
                        AgentConfigChanged:   a.configChangedVal,
 
456
                        UpgradeStepsLock:     a.upgradeComplete,
 
457
                        UpgradeCheckLock:     a.initialUpgradeCheckComplete,
 
458
                        OpenState:            a.initState,
 
459
                        OpenStateForUpgrade:  a.openStateForUpgrade,
 
460
                        StartStateWorkers:    a.startStateWorkers,
 
461
                        StartAPIWorkers:      a.startAPIWorkers,
 
462
                        PreUpgradeSteps:      upgrades.PreUpgradeSteps,
 
463
                        LogSource:            a.bufferedLogs,
 
464
                        NewDeployContext:     newDeployContext,
 
465
                        Clock:                clock.WallClock,
 
466
                })
 
467
                if err := dependency.Install(engine, manifolds); err != nil {
 
468
                        if err := worker.Stop(engine); err != nil {
 
469
                                logger.Errorf("while stopping engine with bad manifolds: %v", err)
 
470
                        }
 
471
                        return nil, err
 
472
                }
 
473
                return engine, nil
 
474
        }
 
475
}
 
476
 
 
477
func (a *MachineAgent) executeRebootOrShutdown(action params.RebootAction) error {
 
478
        // At this stage, all API connections would have been closed
 
479
        // We need to reopen the API to clear the reboot flag after
 
480
        // scheduling the reboot. It may be cleaner to do this in the reboot
 
481
        // worker, before returning the ErrRebootMachine.
 
482
        conn, err := apicaller.OnlyConnect(a, apicaller.APIOpen)
 
483
        if err != nil {
 
484
                logger.Infof("Reboot: Error connecting to state")
 
485
                return errors.Trace(err)
 
486
        }
 
487
 
 
488
        // block until all units/containers are ready, and reboot/shutdown
 
489
        finalize, err := reboot.NewRebootWaiter(conn, a.CurrentConfig())
 
490
        if err != nil {
 
491
                return errors.Trace(err)
 
492
        }
 
493
 
 
494
        logger.Infof("Reboot: Executing reboot")
 
495
        err = finalize.ExecuteReboot(action)
 
496
        if err != nil {
 
497
                logger.Infof("Reboot: Error executing reboot: %v", err)
 
498
                return errors.Trace(err)
 
499
        }
 
500
        // On windows, the shutdown command is asynchronous. We return ErrRebootMachine
 
501
        // so the agent will simply exit without error pending reboot/shutdown.
 
502
        return worker.ErrRebootMachine
 
503
}
 
504
 
 
505
func (a *MachineAgent) ChangeConfig(mutate agent.ConfigMutator) error {
 
506
        err := a.AgentConfigWriter.ChangeConfig(mutate)
 
507
        a.configChangedVal.Set(true)
 
508
        return errors.Trace(err)
 
509
}
 
510
 
 
511
func (a *MachineAgent) maybeStopMongo(ver mongo.Version, isMaster bool) error {
 
512
        if !a.mongoInitialized {
 
513
                return nil
 
514
        }
 
515
 
 
516
        conf := a.AgentConfigWriter.CurrentConfig()
 
517
        v := conf.MongoVersion()
 
518
 
 
519
        logger.Errorf("Got version change %v", ver)
 
520
        // TODO(perrito666) replace with "read-only" mode for environment when
 
521
        // it is available.
 
522
        if ver.NewerThan(v) > 0 {
 
523
                err := a.AgentConfigWriter.ChangeConfig(func(config agent.ConfigSetter) error {
 
524
                        config.SetMongoVersion(mongo.MongoUpgrade)
 
525
                        return nil
 
526
                })
 
527
                if err != nil {
 
528
                        return err
 
529
                }
 
530
 
 
531
        }
 
532
        return nil
 
533
 
 
534
}
 
535
 
 
536
// PrepareRestore will flag the agent to allow only a limited set
 
537
// of commands defined in
 
538
// "github.com/juju/juju/apiserver".allowedMethodsAboutToRestore
 
539
// the most noteworthy is:
 
540
// Backups.Restore: this will ensure that we can do all the file movements
 
541
// required for restore and no one will do changes while we do that.
 
542
// it will return error if the machine is already in this state.
 
543
func (a *MachineAgent) PrepareRestore() error {
 
544
        if a.restoreMode {
 
545
                return errors.Errorf("already in restore mode")
 
546
        }
 
547
        a.restoreMode = true
 
548
        return nil
 
549
}
 
550
 
 
551
// BeginRestore will flag the agent to disallow all commands since
 
552
// restore should be running and therefore making changes that
 
553
// would override anything done.
 
554
func (a *MachineAgent) BeginRestore() error {
 
555
        switch {
 
556
        case !a.restoreMode:
 
557
                return errors.Errorf("not in restore mode, cannot begin restoration")
 
558
        case a.restoring:
 
559
                return errors.Errorf("already restoring")
 
560
        }
 
561
        a.restoring = true
 
562
        return nil
 
563
}
 
564
 
 
565
// EndRestore will flag the agent to allow all commands
 
566
// This being invoked means that restore process failed
 
567
// since success restarts the agent.
 
568
func (a *MachineAgent) EndRestore() {
 
569
        a.restoreMode = false
 
570
        a.restoring = false
 
571
}
 
572
 
 
573
// newRestoreStateWatcherWorker will return a worker or err if there
 
574
// is a failure, the worker takes care of watching the state of
 
575
// restoreInfo doc and put the agent in the different restore modes.
 
576
func (a *MachineAgent) newRestoreStateWatcherWorker(st *state.State) (worker.Worker, error) {
 
577
        rWorker := func(stopch <-chan struct{}) error {
 
578
                return a.restoreStateWatcher(st, stopch)
 
579
        }
 
580
        return worker.NewSimpleWorker(rWorker), nil
 
581
}
 
582
 
 
583
// restoreChanged will be called whenever restoreInfo doc changes signaling a new
 
584
// step in the restore process.
 
585
func (a *MachineAgent) restoreChanged(st *state.State) error {
 
586
        status, err := st.RestoreInfo().Status()
 
587
        if err != nil {
 
588
                return errors.Annotate(err, "cannot read restore state")
 
589
        }
 
590
        switch status {
 
591
        case state.RestorePending:
 
592
                a.PrepareRestore()
 
593
        case state.RestoreInProgress:
 
594
                a.BeginRestore()
 
595
        case state.RestoreFailed:
 
596
                a.EndRestore()
 
597
        }
 
598
        return nil
 
599
}
 
600
 
 
601
// restoreStateWatcher watches for restoreInfo looking for changes in the restore process.
 
602
func (a *MachineAgent) restoreStateWatcher(st *state.State, stopch <-chan struct{}) error {
 
603
        restoreWatch := st.WatchRestoreInfoChanges()
 
604
        defer func() {
 
605
                restoreWatch.Kill()
 
606
                restoreWatch.Wait()
 
607
        }()
 
608
 
 
609
        for {
 
610
                select {
 
611
                case <-restoreWatch.Changes():
 
612
                        if err := a.restoreChanged(st); err != nil {
 
613
                                return err
 
614
                        }
 
615
                case <-stopch:
 
616
                        return nil
 
617
                }
 
618
        }
 
619
}
 
620
 
 
621
var newEnvirons = environs.New
 
622
 
 
623
// startAPIWorkers is called to start workers which rely on the
 
624
// machine agent's API connection (via the apiworkers manifold). It
 
625
// returns a Runner with a number of workers attached to it.
 
626
//
 
627
// The workers started here need to be converted to run under the
 
628
// dependency engine. Once they have all been converted, this method -
 
629
// and the apiworkers manifold - can be removed.
 
630
func (a *MachineAgent) startAPIWorkers(apiConn api.Connection) (_ worker.Worker, outErr error) {
 
631
        agentConfig := a.CurrentConfig()
 
632
 
 
633
        entity, err := apiagent.NewState(apiConn).Entity(a.Tag())
 
634
        if err != nil {
 
635
                return nil, errors.Trace(err)
 
636
        }
 
637
 
 
638
        var isModelManager bool
 
639
        for _, job := range entity.Jobs() {
 
640
                switch job {
 
641
                case multiwatcher.JobManageModel:
 
642
                        isModelManager = true
 
643
                default:
 
644
                        // TODO(dimitern): Once all workers moved over to using
 
645
                        // the API, report "unknown job type" here.
 
646
                }
 
647
        }
 
648
 
 
649
        runner := newConnRunner(apiConn)
 
650
        defer func() {
 
651
                // If startAPIWorkers exits early with an error, stop the
 
652
                // runner so that any already started runners aren't leaked.
 
653
                if outErr != nil {
 
654
                        worker.Stop(runner)
 
655
                }
 
656
        }()
 
657
 
 
658
        // Perform the operations needed to set up hosting for containers.
 
659
        if err := a.setupContainerSupport(runner, apiConn, agentConfig); err != nil {
 
660
                cause := errors.Cause(err)
 
661
                if params.IsCodeDead(cause) || cause == worker.ErrTerminateAgent {
 
662
                        return nil, worker.ErrTerminateAgent
 
663
                }
 
664
                return nil, fmt.Errorf("setting up container support: %v", err)
 
665
        }
 
666
 
 
667
        if isModelManager {
 
668
 
 
669
                // Published image metadata for some providers are in simple streams.
 
670
                // Providers that do not depend on simple streams do not need this worker.
 
671
                env, err := environs.GetEnviron(apiagent.NewState(apiConn), newEnvirons)
 
672
                if err != nil {
 
673
                        return nil, errors.Annotate(err, "getting environ")
 
674
                }
 
675
                if _, ok := env.(simplestreams.HasRegion); ok {
 
676
                        // Start worker that stores published image metadata in state.
 
677
                        runner.StartWorker("imagemetadata", func() (worker.Worker, error) {
 
678
                                return newMetadataUpdater(apiConn.MetadataUpdater()), nil
 
679
                        })
 
680
                }
 
681
 
 
682
                // We don't have instance info set and the network config for the
 
683
                // bootstrap machine only, so update it now. All the other machines will
 
684
                // have instance info including network config set at provisioning time.
 
685
                if err := a.setControllerNetworkConfig(apiConn); err != nil {
 
686
                        return nil, errors.Annotate(err, "setting controller network config")
 
687
                }
 
688
        } else {
 
689
                runner.StartWorker("stateconverter", func() (worker.Worker, error) {
 
690
                        // TODO(fwereade): this worker needs its own facade.
 
691
                        facade := apimachiner.NewState(apiConn)
 
692
                        handler := conv2state.New(facade, a)
 
693
                        w, err := watcher.NewNotifyWorker(watcher.NotifyConfig{
 
694
                                Handler: handler,
 
695
                        })
 
696
                        if err != nil {
 
697
                                return nil, errors.Annotate(err, "cannot start controller promoter worker")
 
698
                        }
 
699
                        return w, nil
 
700
                })
 
701
        }
 
702
        return runner, nil
 
703
}
 
704
 
 
705
func (a *MachineAgent) setControllerNetworkConfig(apiConn api.Connection) error {
 
706
        machinerAPI := apimachiner.NewState(apiConn)
 
707
        agentConfig := a.CurrentConfig()
 
708
 
 
709
        tag := agentConfig.Tag().(names.MachineTag)
 
710
        machine, err := machinerAPI.Machine(tag)
 
711
        if errors.IsNotFound(err) || err == nil && machine.Life() == params.Dead {
 
712
                return worker.ErrTerminateAgent
 
713
        }
 
714
        if err != nil {
 
715
                return errors.Annotatef(err, "cannot load machine %s from state", tag)
 
716
        }
 
717
 
 
718
        if err := machine.SetProviderNetworkConfig(); err != nil {
 
719
                return errors.Annotate(err, "cannot set controller provider network config")
 
720
        }
 
721
        return nil
 
722
}
 
723
 
 
724
// Restart restarts the agent's service.
 
725
func (a *MachineAgent) Restart() error {
 
726
        name := a.CurrentConfig().Value(agent.AgentServiceName)
 
727
        return service.Restart(name)
 
728
}
 
729
 
 
730
// openStateForUpgrade exists to be passed into the upgradesteps
 
731
// worker. The upgradesteps worker opens state independently of the
 
732
// state worker so that it isn't affected by the state worker's
 
733
// lifetime. It ensures the MongoDB server is configured and started,
 
734
// and then opens a state connection.
 
735
//
 
736
// TODO(mjs)- review the need for this once the dependency engine is
 
737
// in use. Why can't upgradesteps depend on the main state connection?
 
738
func (a *MachineAgent) openStateForUpgrade() (*state.State, error) {
 
739
        agentConfig := a.CurrentConfig()
 
740
        if err := a.ensureMongoServer(agentConfig); err != nil {
 
741
                return nil, errors.Trace(err)
 
742
        }
 
743
        info, ok := agentConfig.MongoInfo()
 
744
        if !ok {
 
745
                return nil, errors.New("no state info available")
 
746
        }
 
747
        st, err := state.Open(
 
748
                agentConfig.Model(), info, mongo.DefaultDialOpts(),
 
749
                stateenvirons.GetNewPolicyFunc(
 
750
                        stateenvirons.GetNewEnvironFunc(environs.New),
 
751
                ),
 
752
        )
 
753
        if err != nil {
 
754
                return nil, errors.Trace(err)
 
755
        }
 
756
        return st, nil
 
757
}
 
758
 
 
759
// setupContainerSupport determines what containers can be run on this machine and
 
760
// initialises suitable infrastructure to support such containers.
 
761
func (a *MachineAgent) setupContainerSupport(runner worker.Runner, st api.Connection, agentConfig agent.Config) error {
 
762
        var supportedContainers []instance.ContainerType
 
763
        supportsContainers := container.ContainersSupported()
 
764
        if supportsContainers {
 
765
                supportedContainers = append(supportedContainers, instance.LXD)
 
766
        }
 
767
 
 
768
        supportsKvm, err := kvm.IsKVMSupported()
 
769
        if err != nil {
 
770
                logger.Warningf("determining kvm support: %v\nno kvm containers possible", err)
 
771
        }
 
772
        if err == nil && supportsKvm {
 
773
                supportedContainers = append(supportedContainers, instance.KVM)
 
774
        }
 
775
 
 
776
        return a.updateSupportedContainers(runner, st, supportedContainers, agentConfig)
 
777
}
 
778
 
 
779
// updateSupportedContainers records in state that a machine can run the specified containers.
 
780
// It starts a watcher and when a container of a given type is first added to the machine,
 
781
// the watcher is killed, the machine is set up to be able to start containers of the given type,
 
782
// and a suitable provisioner is started.
 
783
func (a *MachineAgent) updateSupportedContainers(
 
784
        runner worker.Runner,
 
785
        st api.Connection,
 
786
        containers []instance.ContainerType,
 
787
        agentConfig agent.Config,
 
788
) error {
 
789
        pr := apiprovisioner.NewState(st)
 
790
        tag := agentConfig.Tag().(names.MachineTag)
 
791
        machine, err := pr.Machine(tag)
 
792
        if errors.IsNotFound(err) || err == nil && machine.Life() == params.Dead {
 
793
                return worker.ErrTerminateAgent
 
794
        }
 
795
        if err != nil {
 
796
                return errors.Annotatef(err, "cannot load machine %s from state", tag)
 
797
        }
 
798
        if len(containers) == 0 {
 
799
                if err := machine.SupportsNoContainers(); err != nil {
 
800
                        return errors.Annotatef(err, "clearing supported containers for %s", tag)
 
801
                }
 
802
                return nil
 
803
        }
 
804
        if err := machine.SetSupportedContainers(containers...); err != nil {
 
805
                return errors.Annotatef(err, "setting supported containers for %s", tag)
 
806
        }
 
807
        // Start the watcher to fire when a container is first requested on the machine.
 
808
        watcherName := fmt.Sprintf("%s-container-watcher", machine.Id())
 
809
        params := provisioner.ContainerSetupParams{
 
810
                Runner:              runner,
 
811
                WorkerName:          watcherName,
 
812
                SupportedContainers: containers,
 
813
                Machine:             machine,
 
814
                Provisioner:         pr,
 
815
                Config:              agentConfig,
 
816
                InitLockName:        agent.MachineLockName,
 
817
        }
 
818
        handler := provisioner.NewContainerSetupHandler(params)
 
819
        a.startWorkerAfterUpgrade(runner, watcherName, func() (worker.Worker, error) {
 
820
                w, err := watcher.NewStringsWorker(watcher.StringsConfig{
 
821
                        Handler: handler,
 
822
                })
 
823
                if err != nil {
 
824
                        return nil, errors.Annotatef(err, "cannot start %s worker", watcherName)
 
825
                }
 
826
                return w, nil
 
827
        })
 
828
        return nil
 
829
}
 
830
 
 
831
func (a *MachineAgent) initState(agentConfig agent.Config) (*state.State, error) {
 
832
        // Start MongoDB server and dial.
 
833
        if err := a.ensureMongoServer(agentConfig); err != nil {
 
834
                return nil, err
 
835
        }
 
836
 
 
837
        st, _, err := openState(agentConfig, stateWorkerDialOpts)
 
838
        if err != nil {
 
839
                return nil, err
 
840
        }
 
841
 
 
842
        reportOpenedState(st)
 
843
 
 
844
        return st, nil
 
845
}
 
846
 
 
847
// startStateWorkers returns a worker running all the workers that
 
848
// require a *state.State connection.
 
849
func (a *MachineAgent) startStateWorkers(st *state.State) (worker.Worker, error) {
 
850
        agentConfig := a.CurrentConfig()
 
851
 
 
852
        m, err := getMachine(st, agentConfig.Tag())
 
853
        if err != nil {
 
854
                return nil, errors.Annotate(err, "machine lookup")
 
855
        }
 
856
 
 
857
        runner := newConnRunner(st)
 
858
        singularRunner, err := newSingularStateRunner(runner, st, m)
 
859
        if err != nil {
 
860
                return nil, errors.Trace(err)
 
861
        }
 
862
 
 
863
        for _, job := range m.Jobs() {
 
864
                switch job {
 
865
                case state.JobHostUnits:
 
866
                        // Implemented elsewhere with workers that use the API.
 
867
                case state.JobManageModel:
 
868
                        useMultipleCPUs()
 
869
                        a.startWorkerAfterUpgrade(runner, "model worker manager", func() (worker.Worker, error) {
 
870
                                w, err := modelworkermanager.New(modelworkermanager.Config{
 
871
                                        Backend:    st,
 
872
                                        NewWorker:  a.startModelWorkers,
 
873
                                        ErrorDelay: worker.RestartDelay,
 
874
                                })
 
875
                                if err != nil {
 
876
                                        return nil, errors.Annotate(err, "cannot start model worker manager")
 
877
                                }
 
878
                                return w, nil
 
879
                        })
 
880
                        a.startWorkerAfterUpgrade(runner, "peergrouper", func() (worker.Worker, error) {
 
881
                                env, err := stateenvirons.GetNewEnvironFunc(environs.New)(st)
 
882
                                if err != nil {
 
883
                                        return nil, errors.Annotate(err, "getting environ from state")
 
884
                                }
 
885
                                supportsSpaces := environs.SupportsSpaces(env)
 
886
                                w, err := peergrouperNew(st, supportsSpaces)
 
887
                                if err != nil {
 
888
                                        return nil, errors.Annotate(err, "cannot start peergrouper worker")
 
889
                                }
 
890
                                return w, nil
 
891
                        })
 
892
                        a.startWorkerAfterUpgrade(runner, "restore", func() (worker.Worker, error) {
 
893
                                w, err := a.newRestoreStateWatcherWorker(st)
 
894
                                if err != nil {
 
895
                                        return nil, errors.Annotate(err, "cannot start backup-restorer worker")
 
896
                                }
 
897
                                return w, nil
 
898
                        })
 
899
                        a.startWorkerAfterUpgrade(runner, "mongoupgrade", func() (worker.Worker, error) {
 
900
                                return newUpgradeMongoWorker(st, a.machineId, a.maybeStopMongo)
 
901
                        })
 
902
 
 
903
                        // certChangedChan is shared by multiple workers it's up
 
904
                        // to the agent to close it rather than any one of the
 
905
                        // workers.  It is possible that multiple cert changes
 
906
                        // come in before the apiserver is up to receive them.
 
907
                        // Specify a bigger buffer to prevent deadlock when
 
908
                        // the apiserver isn't up yet.  Use a size of 10 since we
 
909
                        // allow up to 7 controllers, and might also update the
 
910
                        // addresses of the local machine (127.0.0.1, ::1, etc).
 
911
                        //
 
912
                        // TODO(cherylj/waigani) Remove this workaround when
 
913
                        // certupdater and apiserver can properly manage dependencies
 
914
                        // through the dependency engine.
 
915
                        //
 
916
                        // TODO(ericsnow) For now we simply do not close the channel.
 
917
                        certChangedChan := make(chan params.StateServingInfo, 10)
 
918
                        // Each time apiserver worker is restarted, we need a fresh copy of state due
 
919
                        // to the fact that state holds lease managers which are killed and need to be reset.
 
920
                        stateOpener := func() (*state.State, error) {
 
921
                                logger.Debugf("opening state for apiserver worker")
 
922
                                st, _, err := openState(agentConfig, stateWorkerDialOpts)
 
923
                                return st, err
 
924
                        }
 
925
                        runner.StartWorker("apiserver", a.apiserverWorkerStarter(stateOpener, certChangedChan))
 
926
                        var stateServingSetter certupdater.StateServingInfoSetter = func(info params.StateServingInfo, done <-chan struct{}) error {
 
927
                                return a.ChangeConfig(func(config agent.ConfigSetter) error {
 
928
                                        config.SetStateServingInfo(info)
 
929
                                        logger.Infof("update apiserver worker with new certificate")
 
930
                                        select {
 
931
                                        case certChangedChan <- info:
 
932
                                                return nil
 
933
                                        case <-done:
 
934
                                                return nil
 
935
                                        }
 
936
                                })
 
937
                        }
 
938
                        a.startWorkerAfterUpgrade(runner, "certupdater", func() (worker.Worker, error) {
 
939
                                return newCertificateUpdater(m, agentConfig, st, st, stateServingSetter), nil
 
940
                        })
 
941
 
 
942
                        a.startWorkerAfterUpgrade(singularRunner, "dblogpruner", func() (worker.Worker, error) {
 
943
                                return dblogpruner.New(st, dblogpruner.NewLogPruneParams()), nil
 
944
                        })
 
945
 
 
946
                        a.startWorkerAfterUpgrade(singularRunner, "txnpruner", func() (worker.Worker, error) {
 
947
                                return txnpruner.New(st, time.Hour*2), nil
 
948
                        })
 
949
                default:
 
950
                        return nil, errors.Errorf("unknown job type %q", job)
 
951
                }
 
952
        }
 
953
        return runner, nil
 
954
}
 
955
 
 
956
// startModelWorkers starts the set of workers that run for every model
 
957
// in each controller.
 
958
func (a *MachineAgent) startModelWorkers(uuid string) (worker.Worker, error) {
 
959
        modelAgent, err := model.WrapAgent(a, uuid)
 
960
        if err != nil {
 
961
                return nil, errors.Trace(err)
 
962
        }
 
963
 
 
964
        engine, err := dependency.NewEngine(dependency.EngineConfig{
 
965
                IsFatal:     model.IsFatal,
 
966
                WorstError:  model.WorstError,
 
967
                Filter:      model.IgnoreErrRemoved,
 
968
                ErrorDelay:  3 * time.Second,
 
969
                BounceDelay: 10 * time.Millisecond,
 
970
        })
 
971
        if err != nil {
 
972
                return nil, errors.Trace(err)
 
973
        }
 
974
 
 
975
        manifolds := modelManifolds(model.ManifoldsConfig{
 
976
                Agent:                       modelAgent,
 
977
                AgentConfigChanged:          a.configChangedVal,
 
978
                Clock:                       clock.WallClock,
 
979
                RunFlagDuration:             time.Minute,
 
980
                CharmRevisionUpdateInterval: 24 * time.Hour,
 
981
                InstPollerAggregationDelay:  3 * time.Second,
 
982
                // TODO(perrito666) the status history pruning numbers need
 
983
                // to be adjusting, after collecting user data from large install
 
984
                // bases, to numbers allowing a rich and useful back history.
 
985
                StatusHistoryPrunerMaxHistoryTime: 336 * time.Hour, // 2 weeks
 
986
                StatusHistoryPrunerMaxHistoryMB:   5120,            // 5G
 
987
                StatusHistoryPrunerInterval:       5 * time.Minute,
 
988
                SpacesImportedGate:                a.discoverSpacesComplete,
 
989
                NewEnvironFunc:                    newEnvirons,
 
990
        })
 
991
        if err := dependency.Install(engine, manifolds); err != nil {
 
992
                if err := worker.Stop(engine); err != nil {
 
993
                        logger.Errorf("while stopping engine with bad manifolds: %v", err)
 
994
                }
 
995
                return nil, errors.Trace(err)
 
996
        }
 
997
        return engine, nil
 
998
}
 
999
 
 
1000
// stateWorkerDialOpts is a mongo.DialOpts suitable
 
1001
// for use by StateWorker to dial mongo.
 
1002
//
 
1003
// This must be overridden in tests, as it assumes
 
1004
// journaling is enabled.
 
1005
var stateWorkerDialOpts mongo.DialOpts
 
1006
 
 
1007
func (a *MachineAgent) apiserverWorkerStarter(
 
1008
        stateOpener func() (*state.State, error), certChanged chan params.StateServingInfo,
 
1009
) func() (worker.Worker, error) {
 
1010
        return func() (worker.Worker, error) {
 
1011
                st, err := stateOpener()
 
1012
                if err != nil {
 
1013
                        return nil, errors.Trace(err)
 
1014
                }
 
1015
                return a.newApiserverWorker(st, certChanged)
 
1016
        }
 
1017
}
 
1018
 
 
1019
func (a *MachineAgent) newApiserverWorker(st *state.State, certChanged chan params.StateServingInfo) (worker.Worker, error) {
 
1020
        agentConfig := a.CurrentConfig()
 
1021
        // If the configuration does not have the required information,
 
1022
        // it is currently not a recoverable error, so we kill the whole
 
1023
        // agent, potentially enabling human intervention to fix
 
1024
        // the agent's configuration file.
 
1025
        info, ok := agentConfig.StateServingInfo()
 
1026
        if !ok {
 
1027
                return nil, &cmdutil.FatalError{"StateServingInfo not available and we need it"}
 
1028
        }
 
1029
        cert := []byte(info.Cert)
 
1030
        key := []byte(info.PrivateKey)
 
1031
 
 
1032
        if len(cert) == 0 || len(key) == 0 {
 
1033
                return nil, &cmdutil.FatalError{"configuration does not have controller cert/key"}
 
1034
        }
 
1035
        tag := agentConfig.Tag()
 
1036
        dataDir := agentConfig.DataDir()
 
1037
        logDir := agentConfig.LogDir()
 
1038
 
 
1039
        endpoint := net.JoinHostPort("", strconv.Itoa(info.APIPort))
 
1040
        listener, err := net.Listen("tcp", endpoint)
 
1041
        if err != nil {
 
1042
                return nil, err
 
1043
        }
 
1044
 
 
1045
        // TODO(katco): We should be doing something more serious than
 
1046
        // logging audit errors. Failures in the auditing systems should
 
1047
        // stop the api server until the problem can be corrected.
 
1048
        auditErrorHandler := func(err error) {
 
1049
                logger.Criticalf("%v", err)
 
1050
        }
 
1051
 
 
1052
        controllerConfig, err := st.ControllerConfig()
 
1053
        if err != nil {
 
1054
                return nil, errors.Annotate(err, "cannot fetch the controller config")
 
1055
        }
 
1056
 
 
1057
        server, err := apiserver.NewServer(st, listener, apiserver.ServerConfig{
 
1058
                Cert:        cert,
 
1059
                Key:         key,
 
1060
                Tag:         tag,
 
1061
                DataDir:     dataDir,
 
1062
                LogDir:      logDir,
 
1063
                Validator:   a.limitLogins,
 
1064
                CertChanged: certChanged,
 
1065
                NewObserver: newObserverFn(
 
1066
                        controllerConfig,
 
1067
                        clock.WallClock,
 
1068
                        jujuversion.Current,
 
1069
                        agentConfig.Model().Id(),
 
1070
                        newAuditEntrySink(st, logDir),
 
1071
                        auditErrorHandler,
 
1072
                ),
 
1073
        })
 
1074
        if err != nil {
 
1075
                return nil, errors.Annotate(err, "cannot start api server worker")
 
1076
        }
 
1077
 
 
1078
        return server, nil
 
1079
}
 
1080
 
 
1081
func newAuditEntrySink(st *state.State, logDir string) audit.AuditEntrySinkFn {
 
1082
        persistFn := st.PutAuditEntryFn()
 
1083
        fileSinkFn := audit.NewLogFileSink(logDir)
 
1084
        return func(entry audit.AuditEntry) error {
 
1085
                // We don't care about auditing anything but user actions.
 
1086
                if _, err := names.ParseUserTag(entry.OriginName); err != nil {
 
1087
                        return nil
 
1088
                }
 
1089
                // TODO(wallyworld) - Pinger requests should not originate as a user action.
 
1090
                if strings.HasPrefix(entry.Operation, "Pinger:") {
 
1091
                        return nil
 
1092
                }
 
1093
                persistErr := persistFn(entry)
 
1094
                sinkErr := fileSinkFn(entry)
 
1095
                if persistErr == nil {
 
1096
                        return errors.Annotate(sinkErr, "cannot save audit record to file")
 
1097
                }
 
1098
                if sinkErr == nil {
 
1099
                        return errors.Annotate(persistErr, "cannot save audit record to database")
 
1100
                }
 
1101
                return errors.Annotate(persistErr, "cannot save audit record to file or database")
 
1102
        }
 
1103
}
 
1104
 
 
1105
func newObserverFn(
 
1106
        controllerConfig controller.Config,
 
1107
        clock clock.Clock,
 
1108
        jujuServerVersion version.Number,
 
1109
        modelUUID string,
 
1110
        persistAuditEntry audit.AuditEntrySinkFn,
 
1111
        auditErrorHandler observer.ErrorHandler,
 
1112
) observer.ObserverFactory {
 
1113
 
 
1114
        var observerFactories []observer.ObserverFactory
 
1115
 
 
1116
        // Common logging of RPC requests
 
1117
        observerFactories = append(observerFactories, func() observer.Observer {
 
1118
                logger := loggo.GetLogger("juju.apiserver")
 
1119
                ctx := observer.RequestObserverContext{
 
1120
                        Clock:  clock,
 
1121
                        Logger: logger,
 
1122
                }
 
1123
                return observer.NewRequestObserver(ctx)
 
1124
        })
 
1125
 
 
1126
        // Auditing observer
 
1127
        // TODO(katco): Auditing needs feature tests (lp:1604551)
 
1128
        if controllerConfig.AuditingEnabled() {
 
1129
                observerFactories = append(observerFactories, func() observer.Observer {
 
1130
                        ctx := &observer.AuditContext{
 
1131
                                JujuServerVersion: jujuServerVersion,
 
1132
                                ModelUUID:         modelUUID,
 
1133
                        }
 
1134
                        return observer.NewAudit(ctx, persistAuditEntry, auditErrorHandler)
 
1135
                })
 
1136
        }
 
1137
 
 
1138
        return observer.ObserverFactoryMultiplexer(observerFactories...)
 
1139
 
 
1140
}
 
1141
 
 
1142
// limitLogins is called by the API server for each login attempt.
 
1143
// it returns an error if upgrades or restore are running.
 
1144
func (a *MachineAgent) limitLogins(req params.LoginRequest) error {
 
1145
        if err := a.limitLoginsDuringRestore(req); err != nil {
 
1146
                return err
 
1147
        }
 
1148
        if err := a.limitLoginsDuringUpgrade(req); err != nil {
 
1149
                return err
 
1150
        }
 
1151
        return a.limitLoginsDuringMongoUpgrade(req)
 
1152
}
 
1153
 
 
1154
func (a *MachineAgent) limitLoginsDuringMongoUpgrade(req params.LoginRequest) error {
 
1155
        // If upgrade is running we will not be able to lock AgentConfigWriter
 
1156
        // and it also means we are not upgrading mongo.
 
1157
        if a.isUpgradeRunning() {
 
1158
                return nil
 
1159
        }
 
1160
        cfg := a.AgentConfigWriter.CurrentConfig()
 
1161
        ver := cfg.MongoVersion()
 
1162
        if ver == mongo.MongoUpgrade {
 
1163
                return errors.New("Upgrading Mongo")
 
1164
        }
 
1165
        return nil
 
1166
}
 
1167
 
 
1168
// limitLoginsDuringRestore will only allow logins for restore related purposes
 
1169
// while the different steps of restore are running.
 
1170
func (a *MachineAgent) limitLoginsDuringRestore(req params.LoginRequest) error {
 
1171
        var err error
 
1172
        switch {
 
1173
        case a.IsRestoreRunning():
 
1174
                err = apiserver.RestoreInProgressError
 
1175
        case a.IsRestorePreparing():
 
1176
                err = apiserver.AboutToRestoreError
 
1177
        }
 
1178
        if err != nil {
 
1179
                authTag, parseErr := names.ParseTag(req.AuthTag)
 
1180
                if parseErr != nil {
 
1181
                        return errors.Annotate(err, "could not parse auth tag")
 
1182
                }
 
1183
                switch authTag := authTag.(type) {
 
1184
                case names.UserTag:
 
1185
                        // use a restricted API mode
 
1186
                        return err
 
1187
                case names.MachineTag:
 
1188
                        if authTag == a.Tag() {
 
1189
                                // allow logins from the local machine
 
1190
                                return nil
 
1191
                        }
 
1192
                }
 
1193
                return errors.Errorf("login for %q blocked because restore is in progress", authTag)
 
1194
        }
 
1195
        return nil
 
1196
}
 
1197
 
 
1198
// limitLoginsDuringUpgrade is called by the API server for each login
 
1199
// attempt. It returns an error if upgrades are in progress unless the
 
1200
// login is for a user (i.e. a client) or the local machine.
 
1201
func (a *MachineAgent) limitLoginsDuringUpgrade(req params.LoginRequest) error {
 
1202
        if a.isUpgradeRunning() || a.isInitialUpgradeCheckPending() {
 
1203
                authTag, err := names.ParseTag(req.AuthTag)
 
1204
                if err != nil {
 
1205
                        return errors.Annotate(err, "could not parse auth tag")
 
1206
                }
 
1207
                switch authTag := authTag.(type) {
 
1208
                case names.UserTag:
 
1209
                        // use a restricted API mode
 
1210
                        return params.UpgradeInProgressError
 
1211
                case names.MachineTag:
 
1212
                        if authTag == a.Tag() {
 
1213
                                // allow logins from the local machine
 
1214
                                return nil
 
1215
                        }
 
1216
                }
 
1217
                return errors.Errorf("login for %q blocked because %s", authTag, params.CodeUpgradeInProgress)
 
1218
        } else {
 
1219
                return nil // allow all logins
 
1220
        }
 
1221
}
 
1222
 
 
1223
var stateWorkerServingConfigErr = errors.New("state worker started with no state serving info")
 
1224
 
 
1225
// ensureMongoServer ensures that mongo is installed and running,
 
1226
// and ready for opening a state connection.
 
1227
func (a *MachineAgent) ensureMongoServer(agentConfig agent.Config) (err error) {
 
1228
        a.mongoInitMutex.Lock()
 
1229
        defer a.mongoInitMutex.Unlock()
 
1230
        if a.mongoInitialized {
 
1231
                logger.Debugf("mongo is already initialized")
 
1232
                return nil
 
1233
        }
 
1234
        defer func() {
 
1235
                if err == nil {
 
1236
                        a.mongoInitialized = true
 
1237
                }
 
1238
        }()
 
1239
 
 
1240
        mongoInstalled, err := mongo.IsServiceInstalled()
 
1241
        if err != nil {
 
1242
                return errors.Annotate(err, "error while checking if mongodb service is installed")
 
1243
        }
 
1244
 
 
1245
        if !mongoInstalled {
 
1246
                // EnsureMongoServer installs/upgrades the init config as necessary.
 
1247
                ensureServerParams, err := cmdutil.NewEnsureServerParams(agentConfig)
 
1248
                if err != nil {
 
1249
                        return err
 
1250
                }
 
1251
                if err := cmdutil.EnsureMongoServer(ensureServerParams); err != nil {
 
1252
                        return err
 
1253
                }
 
1254
        }
 
1255
        logger.Debugf("mongodb service is installed")
 
1256
 
 
1257
        // Mongo is installed, record the version.
 
1258
        err = a.ChangeConfig(func(config agent.ConfigSetter) error {
 
1259
                config.SetMongoVersion(mongo.InstalledVersion())
 
1260
                return nil
 
1261
        })
 
1262
        if err != nil {
 
1263
                return errors.Annotate(err, "cannot set mongo version")
 
1264
        }
 
1265
        return nil
 
1266
}
 
1267
 
 
1268
func openState(agentConfig agent.Config, dialOpts mongo.DialOpts) (_ *state.State, _ *state.Machine, err error) {
 
1269
        info, ok := agentConfig.MongoInfo()
 
1270
        if !ok {
 
1271
                return nil, nil, fmt.Errorf("no state info available")
 
1272
        }
 
1273
        st, err := state.Open(agentConfig.Model(), info, dialOpts,
 
1274
                stateenvirons.GetNewPolicyFunc(
 
1275
                        stateenvirons.GetNewEnvironFunc(environs.New),
 
1276
                ),
 
1277
        )
 
1278
        if err != nil {
 
1279
                return nil, nil, err
 
1280
        }
 
1281
        defer func() {
 
1282
                if err != nil {
 
1283
                        st.Close()
 
1284
                }
 
1285
        }()
 
1286
        m0, err := st.FindEntity(agentConfig.Tag())
 
1287
        if err != nil {
 
1288
                if errors.IsNotFound(err) {
 
1289
                        err = worker.ErrTerminateAgent
 
1290
                }
 
1291
                return nil, nil, err
 
1292
        }
 
1293
        m := m0.(*state.Machine)
 
1294
        if m.Life() == state.Dead {
 
1295
                return nil, nil, worker.ErrTerminateAgent
 
1296
        }
 
1297
        // Check the machine nonce as provisioned matches the agent.Conf value.
 
1298
        if !m.CheckProvisioned(agentConfig.Nonce()) {
 
1299
                // The agent is running on a different machine to the one it
 
1300
                // should be according to state. It must stop immediately.
 
1301
                logger.Errorf("running machine %v agent on inappropriate instance", m)
 
1302
                return nil, nil, worker.ErrTerminateAgent
 
1303
        }
 
1304
        return st, m, nil
 
1305
}
 
1306
 
 
1307
func getMachine(st *state.State, tag names.Tag) (*state.Machine, error) {
 
1308
        m0, err := st.FindEntity(tag)
 
1309
        if err != nil {
 
1310
                return nil, err
 
1311
        }
 
1312
        return m0.(*state.Machine), nil
 
1313
}
 
1314
 
 
1315
// startWorkerAfterUpgrade starts a worker to run the specified child worker
 
1316
// but only after waiting for upgrades to complete.
 
1317
func (a *MachineAgent) startWorkerAfterUpgrade(runner worker.Runner, name string, start func() (worker.Worker, error)) {
 
1318
        runner.StartWorker(name, func() (worker.Worker, error) {
 
1319
                return a.upgradeWaiterWorker(name, start), nil
 
1320
        })
 
1321
}
 
1322
 
 
1323
// upgradeWaiterWorker runs the specified worker after upgrades have completed.
 
1324
func (a *MachineAgent) upgradeWaiterWorker(name string, start func() (worker.Worker, error)) worker.Worker {
 
1325
        return worker.NewSimpleWorker(func(stop <-chan struct{}) error {
 
1326
                // Wait for the agent upgrade and upgrade steps to complete (or for us to be stopped).
 
1327
                for _, ch := range []<-chan struct{}{
 
1328
                        a.upgradeComplete.Unlocked(),
 
1329
                        a.initialUpgradeCheckComplete.Unlocked(),
 
1330
                } {
 
1331
                        select {
 
1332
                        case <-stop:
 
1333
                                return nil
 
1334
                        case <-ch:
 
1335
                        }
 
1336
                }
 
1337
                logger.Debugf("upgrades done, starting worker %q", name)
 
1338
 
 
1339
                // Upgrades are done, start the worker.
 
1340
                w, err := start()
 
1341
                if err != nil {
 
1342
                        return err
 
1343
                }
 
1344
                // Wait for worker to finish or for us to be stopped.
 
1345
                done := make(chan error, 1)
 
1346
                go func() {
 
1347
                        done <- w.Wait()
 
1348
                }()
 
1349
                select {
 
1350
                case err := <-done:
 
1351
                        return errors.Annotatef(err, "worker %q exited", name)
 
1352
                case <-stop:
 
1353
                        logger.Debugf("stopping so killing worker %q", name)
 
1354
                        return worker.Stop(w)
 
1355
                }
 
1356
        })
 
1357
}
 
1358
 
 
1359
// WorkersStarted returns a channel that's closed once all top level workers
 
1360
// have been started. This is provided for testing purposes.
 
1361
func (a *MachineAgent) WorkersStarted() <-chan struct{} {
 
1362
        return a.workersStarted
 
1363
}
 
1364
 
 
1365
func (a *MachineAgent) Tag() names.Tag {
 
1366
        return names.NewMachineTag(a.machineId)
 
1367
}
 
1368
 
 
1369
func (a *MachineAgent) createJujudSymlinks(dataDir string) error {
 
1370
        jujud := filepath.Join(tools.ToolsDir(dataDir, a.Tag().String()), jujunames.Jujud)
 
1371
        for _, link := range []string{jujuRun, jujuDumpLogs} {
 
1372
                err := a.createSymlink(jujud, link)
 
1373
                if err != nil {
 
1374
                        return errors.Annotatef(err, "failed to create %s symlink", link)
 
1375
                }
 
1376
        }
 
1377
        return nil
 
1378
}
 
1379
 
 
1380
func (a *MachineAgent) createSymlink(target, link string) error {
 
1381
        fullLink := utils.EnsureBaseDir(a.rootDir, link)
 
1382
 
 
1383
        currentTarget, err := symlink.Read(fullLink)
 
1384
        if err != nil && !os.IsNotExist(err) {
 
1385
                return err
 
1386
        } else if err == nil {
 
1387
                // Link already in place - check it.
 
1388
                if currentTarget == target {
 
1389
                        // Link already points to the right place - nothing to do.
 
1390
                        return nil
 
1391
                }
 
1392
                // Link points to the wrong place - delete it.
 
1393
                if err := os.Remove(fullLink); err != nil {
 
1394
                        return err
 
1395
                }
 
1396
        }
 
1397
 
 
1398
        if err := os.MkdirAll(filepath.Dir(fullLink), os.FileMode(0755)); err != nil {
 
1399
                return err
 
1400
        }
 
1401
        return symlink.New(target, fullLink)
 
1402
}
 
1403
 
 
1404
func (a *MachineAgent) removeJujudSymlinks() (errs []error) {
 
1405
        for _, link := range []string{jujuRun, jujuDumpLogs} {
 
1406
                err := os.Remove(utils.EnsureBaseDir(a.rootDir, link))
 
1407
                if err != nil && !os.IsNotExist(err) {
 
1408
                        errs = append(errs, errors.Annotatef(err, "failed to remove %s symlink", link))
 
1409
                }
 
1410
        }
 
1411
        return
 
1412
}
 
1413
 
 
1414
func (a *MachineAgent) uninstallAgent() error {
 
1415
        // We should only uninstall if the uninstall file is present.
 
1416
        if !agent.CanUninstall(a) {
 
1417
                logger.Infof("ignoring uninstall request")
 
1418
                return nil
 
1419
        }
 
1420
        logger.Infof("uninstalling agent")
 
1421
 
 
1422
        agentConfig := a.CurrentConfig()
 
1423
        var errs []error
 
1424
        agentServiceName := agentConfig.Value(agent.AgentServiceName)
 
1425
        if agentServiceName == "" {
 
1426
                // For backwards compatibility, handle lack of AgentServiceName.
 
1427
                agentServiceName = os.Getenv("UPSTART_JOB")
 
1428
        }
 
1429
 
 
1430
        if agentServiceName != "" {
 
1431
                svc, err := service.DiscoverService(agentServiceName, common.Conf{})
 
1432
                if err != nil {
 
1433
                        errs = append(errs, fmt.Errorf("cannot remove service %q: %v", agentServiceName, err))
 
1434
                } else if err := svc.Remove(); err != nil {
 
1435
                        errs = append(errs, fmt.Errorf("cannot remove service %q: %v", agentServiceName, err))
 
1436
                }
 
1437
        }
 
1438
 
 
1439
        errs = append(errs, a.removeJujudSymlinks()...)
 
1440
 
 
1441
        // TODO(fwereade): surely this shouldn't be happening here? Once we're
 
1442
        // at this point we should expect to be killed in short order; if this
 
1443
        // work is remotely important we should be blocking machine death on
 
1444
        // its completion.
 
1445
        insideContainer := container.RunningInContainer()
 
1446
        if insideContainer {
 
1447
                // We're running inside a container, so loop devices may leak. Detach
 
1448
                // any loop devices that are backed by files on this machine.
 
1449
                if err := a.loopDeviceManager.DetachLoopDevices("/", agentConfig.DataDir()); err != nil {
 
1450
                        errs = append(errs, err)
 
1451
                }
 
1452
        }
 
1453
 
 
1454
        if err := mongo.RemoveService(); err != nil {
 
1455
                errs = append(errs, errors.Annotate(err, "cannot stop/remove mongo service"))
 
1456
        }
 
1457
        if err := os.RemoveAll(agentConfig.DataDir()); err != nil {
 
1458
                errs = append(errs, err)
 
1459
        }
 
1460
        if len(errs) == 0 {
 
1461
                return nil
 
1462
        }
 
1463
        return fmt.Errorf("uninstall failed: %v", errs)
 
1464
}
 
1465
 
 
1466
func newConnRunner(conns ...cmdutil.Pinger) worker.Runner {
 
1467
        return worker.NewRunner(cmdutil.ConnectionIsFatal(logger, conns...), cmdutil.MoreImportant, worker.RestartDelay)
 
1468
}
 
1469
 
 
1470
type MongoSessioner interface {
 
1471
        MongoSession() *mgo.Session
 
1472
}
 
1473
 
 
1474
func newSingularStateRunner(runner worker.Runner, st MongoSessioner, m *state.Machine) (worker.Runner, error) {
 
1475
        singularStateConn := singularStateConn{st.MongoSession(), m}
 
1476
        singularRunner, err := newSingularRunner(runner, singularStateConn)
 
1477
        if err != nil {
 
1478
                return nil, errors.Annotate(err, "cannot make singular State Runner")
 
1479
        }
 
1480
        return singularRunner, err
 
1481
}
 
1482
 
 
1483
// singularStateConn implements singular.Conn on
 
1484
// top of a State connection.
 
1485
type singularStateConn struct {
 
1486
        session *mgo.Session
 
1487
        machine *state.Machine
 
1488
}
 
1489
 
 
1490
func (c singularStateConn) IsMaster() (bool, error) {
 
1491
        return mongo.IsMaster(c.session, c.machine)
 
1492
}
 
1493
 
 
1494
func (c singularStateConn) Ping() error {
 
1495
        return c.session.Ping()
 
1496
}
 
1497
 
 
1498
func metricAPI(st api.Connection) (metricsmanager.MetricsManagerClient, error) {
 
1499
        client, err := metricsmanager.NewClient(st)
 
1500
        if err != nil {
 
1501
                return nil, errors.Trace(err)
 
1502
        }
 
1503
        return client, nil
 
1504
}
 
1505
 
 
1506
// newDeployContext gives the tests the opportunity to create a deployer.Context
 
1507
// that can be used for testing so as to avoid (1) deploying units to the system
 
1508
// running the tests and (2) get access to the *State used internally, so that
 
1509
// tests can be run without waiting for the 5s watcher refresh time to which we would
 
1510
// otherwise be restricted.
 
1511
var newDeployContext = func(st *apideployer.State, agentConfig agent.Config) deployer.Context {
 
1512
        return deployer.NewSimpleContext(agentConfig, st)
 
1513
}