~nskaggs/+junk/xenial-test

« back to all changes in this revision

Viewing changes to src/github.com/juju/juju/cmd/jujud/agent/util_test.go

  • Committer: Nicholas Skaggs
  • Date: 2016-10-24 20:56:05 UTC
  • Revision ID: nicholas.skaggs@canonical.com-20161024205605-z8lta0uvuhtxwzwl
Initi with beta15

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
// Copyright 2012-2016 Canonical Ltd.
 
2
// Licensed under the AGPLv3, see LICENCE file for details.
 
3
 
 
4
package agent
 
5
 
 
6
import (
 
7
        "fmt"
 
8
        "io/ioutil"
 
9
        "path/filepath"
 
10
        "reflect"
 
11
        "sync"
 
12
        "time"
 
13
 
 
14
        "github.com/juju/cmd"
 
15
        jc "github.com/juju/testing/checkers"
 
16
        "github.com/juju/utils/arch"
 
17
        "github.com/juju/utils/series"
 
18
        "github.com/juju/utils/set"
 
19
        "github.com/juju/version"
 
20
        gc "gopkg.in/check.v1"
 
21
        "gopkg.in/juju/charmrepo.v2-unstable"
 
22
        "gopkg.in/juju/names.v2"
 
23
 
 
24
        "github.com/juju/juju/agent"
 
25
        "github.com/juju/juju/api"
 
26
        apideployer "github.com/juju/juju/api/deployer"
 
27
        "github.com/juju/juju/cmd/jujud/agent/agenttest"
 
28
        cmdutil "github.com/juju/juju/cmd/jujud/util"
 
29
        "github.com/juju/juju/environs"
 
30
        "github.com/juju/juju/environs/config"
 
31
        "github.com/juju/juju/instance"
 
32
        jujutesting "github.com/juju/juju/juju/testing"
 
33
        "github.com/juju/juju/mongo/mongotest"
 
34
        "github.com/juju/juju/network"
 
35
        "github.com/juju/juju/provider/dummy"
 
36
        "github.com/juju/juju/service/upstart"
 
37
        "github.com/juju/juju/state"
 
38
        coretesting "github.com/juju/juju/testing"
 
39
        "github.com/juju/juju/tools"
 
40
        jujuversion "github.com/juju/juju/version"
 
41
        "github.com/juju/juju/worker"
 
42
        "github.com/juju/juju/worker/authenticationworker"
 
43
        "github.com/juju/juju/worker/deployer"
 
44
        "github.com/juju/juju/worker/logsender"
 
45
        "github.com/juju/juju/worker/singular"
 
46
)
 
47
 
 
48
const (
 
49
        initialMachinePassword = "machine-password-1234567890"
 
50
        initialUnitPassword    = "unit-password-1234567890"
 
51
        startWorkerWait        = 250 * time.Millisecond
 
52
)
 
53
 
 
54
var fastDialOpts = api.DialOpts{
 
55
        Timeout:    coretesting.LongWait,
 
56
        RetryDelay: coretesting.ShortWait,
 
57
}
 
58
 
 
59
type commonMachineSuite struct {
 
60
        singularRecord  *singularRunnerRecord
 
61
        fakeEnsureMongo *agenttest.FakeEnsureMongo
 
62
        AgentSuite
 
63
}
 
64
 
 
65
func (s *commonMachineSuite) SetUpSuite(c *gc.C) {
 
66
        s.AgentSuite.SetUpSuite(c)
 
67
        s.AgentSuite.PatchValue(&jujuversion.Current, coretesting.FakeVersionNumber)
 
68
        s.AgentSuite.PatchValue(&stateWorkerDialOpts, mongotest.DialOpts())
 
69
}
 
70
 
 
71
func (s *commonMachineSuite) TearDownSuite(c *gc.C) {
 
72
        s.AgentSuite.TearDownSuite(c)
 
73
}
 
74
 
 
75
func (s *commonMachineSuite) SetUpTest(c *gc.C) {
 
76
        s.AgentSuite.SetUpTest(c)
 
77
        s.AgentSuite.PatchValue(&charmrepo.CacheDir, c.MkDir())
 
78
 
 
79
        // Patch ssh user to avoid touching ~ubuntu/.ssh/authorized_keys.
 
80
        s.AgentSuite.PatchValue(&authenticationworker.SSHUser, "")
 
81
 
 
82
        testpath := c.MkDir()
 
83
        s.AgentSuite.PatchEnvPathPrepend(testpath)
 
84
        // mock out the start method so we can fake install services without sudo
 
85
        fakeCmd(filepath.Join(testpath, "start"))
 
86
        fakeCmd(filepath.Join(testpath, "stop"))
 
87
 
 
88
        s.AgentSuite.PatchValue(&upstart.InitDir, c.MkDir())
 
89
 
 
90
        s.singularRecord = newSingularRunnerRecord()
 
91
        s.AgentSuite.PatchValue(&newSingularRunner, s.singularRecord.newSingularRunner)
 
92
        s.AgentSuite.PatchValue(&peergrouperNew, func(*state.State, bool) (worker.Worker, error) {
 
93
                return newDummyWorker(), nil
 
94
        })
 
95
 
 
96
        s.fakeEnsureMongo = agenttest.InstallFakeEnsureMongo(s)
 
97
}
 
98
 
 
99
func (s *commonMachineSuite) assertChannelActive(c *gc.C, aChannel chan struct{}, intent string) {
 
100
        // Wait for channel to be active.
 
101
        select {
 
102
        case <-aChannel:
 
103
        case <-time.After(coretesting.LongWait):
 
104
                c.Fatalf("timeout while waiting for %v", intent)
 
105
        }
 
106
}
 
107
 
 
108
func (s *commonMachineSuite) assertChannelInactive(c *gc.C, aChannel chan struct{}, intent string) {
 
109
        // Now make sure the channel is not active.
 
110
        select {
 
111
        case <-aChannel:
 
112
                c.Fatalf("%v unexpectedly", intent)
 
113
        case <-time.After(startWorkerWait):
 
114
        }
 
115
}
 
116
 
 
117
func fakeCmd(path string) {
 
118
        err := ioutil.WriteFile(path, []byte("#!/bin/bash --norc\nexit 0"), 0755)
 
119
        if err != nil {
 
120
                panic(err)
 
121
        }
 
122
}
 
123
 
 
124
func (s *commonMachineSuite) TearDownTest(c *gc.C) {
 
125
        s.AgentSuite.TearDownTest(c)
 
126
}
 
127
 
 
128
// primeAgent adds a new Machine to run the given jobs, and sets up the
 
129
// machine agent's directory.  It returns the new machine, the
 
130
// agent's configuration and the tools currently running.
 
131
func (s *commonMachineSuite) primeAgent(c *gc.C, jobs ...state.MachineJob) (m *state.Machine, agentConfig agent.ConfigSetterWriter, tools *tools.Tools) {
 
132
        vers := version.Binary{
 
133
                Number: jujuversion.Current,
 
134
                Arch:   arch.HostArch(),
 
135
                Series: series.HostSeries(),
 
136
        }
 
137
        return s.primeAgentVersion(c, vers, jobs...)
 
138
}
 
139
 
 
140
// primeAgentVersion is similar to primeAgent, but permits the
 
141
// caller to specify the version.Binary to prime with.
 
142
func (s *commonMachineSuite) primeAgentVersion(c *gc.C, vers version.Binary, jobs ...state.MachineJob) (m *state.Machine, agentConfig agent.ConfigSetterWriter, tools *tools.Tools) {
 
143
        m, err := s.State.AddMachine("quantal", jobs...)
 
144
        c.Assert(err, jc.ErrorIsNil)
 
145
        return s.primeAgentWithMachine(c, m, vers)
 
146
}
 
147
 
 
148
func (s *commonMachineSuite) primeAgentWithMachine(c *gc.C, m *state.Machine, vers version.Binary) (*state.Machine, agent.ConfigSetterWriter, *tools.Tools) {
 
149
        pinger, err := m.SetAgentPresence()
 
150
        c.Assert(err, jc.ErrorIsNil)
 
151
        s.AddCleanup(func(c *gc.C) {
 
152
                c.Assert(worker.Stop(pinger), jc.ErrorIsNil)
 
153
        })
 
154
        return s.configureMachine(c, m.Id(), vers)
 
155
}
 
156
 
 
157
func (s *commonMachineSuite) configureMachine(c *gc.C, machineId string, vers version.Binary) (
 
158
        machine *state.Machine, agentConfig agent.ConfigSetterWriter, tools *tools.Tools,
 
159
) {
 
160
        m, err := s.State.Machine(machineId)
 
161
        c.Assert(err, jc.ErrorIsNil)
 
162
 
 
163
        // Add a machine and ensure it is provisioned.
 
164
        inst, md := jujutesting.AssertStartInstance(c, s.Environ, s.ControllerConfig.ControllerUUID(), machineId)
 
165
        c.Assert(m.SetProvisioned(inst.Id(), agent.BootstrapNonce, md), jc.ErrorIsNil)
 
166
 
 
167
        // Add an address for the tests in case the initiateMongoServer
 
168
        // codepath is exercised.
 
169
        s.setFakeMachineAddresses(c, m)
 
170
 
 
171
        // Set up the new machine.
 
172
        err = m.SetAgentVersion(vers)
 
173
        c.Assert(err, jc.ErrorIsNil)
 
174
        err = m.SetPassword(initialMachinePassword)
 
175
        c.Assert(err, jc.ErrorIsNil)
 
176
        tag := m.Tag()
 
177
        if m.IsManager() {
 
178
                err = m.SetMongoPassword(initialMachinePassword)
 
179
                c.Assert(err, jc.ErrorIsNil)
 
180
                agentConfig, tools = s.PrimeStateAgentVersion(c, tag, initialMachinePassword, vers)
 
181
                info, ok := agentConfig.StateServingInfo()
 
182
                c.Assert(ok, jc.IsTrue)
 
183
                ssi := cmdutil.ParamsStateServingInfoToStateStateServingInfo(info)
 
184
                err = s.State.SetStateServingInfo(ssi)
 
185
                c.Assert(err, jc.ErrorIsNil)
 
186
        } else {
 
187
                agentConfig, tools = s.PrimeAgentVersion(c, tag, initialMachinePassword, vers)
 
188
        }
 
189
        err = agentConfig.Write()
 
190
        c.Assert(err, jc.ErrorIsNil)
 
191
        return m, agentConfig, tools
 
192
}
 
193
 
 
194
func NewTestMachineAgentFactory(
 
195
        agentConfWriter AgentConfigWriter,
 
196
        bufferedLogs logsender.LogRecordCh,
 
197
        rootDir string,
 
198
) func(string) *MachineAgent {
 
199
        return func(machineId string) *MachineAgent {
 
200
                return NewMachineAgent(
 
201
                        machineId,
 
202
                        agentConfWriter,
 
203
                        bufferedLogs,
 
204
                        worker.NewRunner(cmdutil.IsFatal, cmdutil.MoreImportant, worker.RestartDelay),
 
205
                        &mockLoopDeviceManager{},
 
206
                        rootDir,
 
207
                )
 
208
        }
 
209
}
 
210
 
 
211
// newAgent returns a new MachineAgent instance
 
212
func (s *commonMachineSuite) newAgent(c *gc.C, m *state.Machine) *MachineAgent {
 
213
        agentConf := agentConf{dataDir: s.DataDir()}
 
214
        agentConf.ReadConfig(names.NewMachineTag(m.Id()).String())
 
215
        machineAgentFactory := NewTestMachineAgentFactory(&agentConf, nil, c.MkDir())
 
216
        return machineAgentFactory(m.Id())
 
217
}
 
218
 
 
219
func patchDeployContext(c *gc.C, st *state.State) (*fakeContext, func()) {
 
220
        ctx := &fakeContext{
 
221
                inited:   newSignal(),
 
222
                deployed: make(set.Strings),
 
223
        }
 
224
        orig := newDeployContext
 
225
        newDeployContext = func(dst *apideployer.State, agentConfig agent.Config) deployer.Context {
 
226
                ctx.st = st
 
227
                ctx.agentConfig = agentConfig
 
228
                ctx.inited.trigger()
 
229
                return ctx
 
230
        }
 
231
        return ctx, func() { newDeployContext = orig }
 
232
}
 
233
 
 
234
func (s *commonMachineSuite) setFakeMachineAddresses(c *gc.C, machine *state.Machine) {
 
235
        addrs := network.NewAddresses("0.1.2.3")
 
236
        err := machine.SetProviderAddresses(addrs...)
 
237
        c.Assert(err, jc.ErrorIsNil)
 
238
        // Set the addresses in the environ instance as well so that if the instance poller
 
239
        // runs it won't overwrite them.
 
240
        instId, err := machine.InstanceId()
 
241
        c.Assert(err, jc.ErrorIsNil)
 
242
        insts, err := s.Environ.Instances([]instance.Id{instId})
 
243
        c.Assert(err, jc.ErrorIsNil)
 
244
        dummy.SetInstanceAddresses(insts[0], addrs)
 
245
}
 
246
 
 
247
// opRecvTimeout waits for any of the given kinds of operation to
 
248
// be received from ops, and times out if not.
 
249
func opRecvTimeout(c *gc.C, st *state.State, opc <-chan dummy.Operation, kinds ...dummy.Operation) dummy.Operation {
 
250
        st.StartSync()
 
251
        timeout := time.After(coretesting.LongWait)
 
252
        for {
 
253
                select {
 
254
                case op := <-opc:
 
255
                        for _, k := range kinds {
 
256
                                if reflect.TypeOf(op) == reflect.TypeOf(k) {
 
257
                                        return op
 
258
                                }
 
259
                        }
 
260
                        c.Logf("discarding unknown event %#v", op)
 
261
                case <-time.After(coretesting.ShortWait):
 
262
                        st.StartSync()
 
263
                case <-timeout:
 
264
                        c.Fatalf("time out wating for operation")
 
265
                }
 
266
        }
 
267
}
 
268
 
 
269
type mockAgentConfig struct {
 
270
        agent.Config
 
271
        providerType string
 
272
        tag          names.Tag
 
273
}
 
274
 
 
275
func (m *mockAgentConfig) Tag() names.Tag {
 
276
        return m.tag
 
277
}
 
278
 
 
279
func (m *mockAgentConfig) Value(key string) string {
 
280
        if key == agent.ProviderType {
 
281
                return m.providerType
 
282
        }
 
283
        return ""
 
284
}
 
285
 
 
286
type singularRunnerRecord struct {
 
287
        runnerC chan *fakeSingularRunner
 
288
}
 
289
 
 
290
func newSingularRunnerRecord() *singularRunnerRecord {
 
291
        return &singularRunnerRecord{
 
292
                runnerC: make(chan *fakeSingularRunner, 64),
 
293
        }
 
294
}
 
295
 
 
296
func (r *singularRunnerRecord) newSingularRunner(runner worker.Runner, conn singular.Conn) (worker.Runner, error) {
 
297
        sr, err := singular.New(runner, conn)
 
298
        if err != nil {
 
299
                return nil, err
 
300
        }
 
301
        fakeRunner := &fakeSingularRunner{
 
302
                Runner: sr,
 
303
                startC: make(chan string, 64),
 
304
        }
 
305
        r.runnerC <- fakeRunner
 
306
        return fakeRunner, nil
 
307
}
 
308
 
 
309
// nextRunner blocks until a new singular runner is created.
 
310
func (r *singularRunnerRecord) nextRunner(c *gc.C) *fakeSingularRunner {
 
311
        timeout := time.After(coretesting.LongWait)
 
312
        for {
 
313
                select {
 
314
                case r := <-r.runnerC:
 
315
                        return r
 
316
                case <-timeout:
 
317
                        c.Fatal("timed out waiting for singular runner to be created")
 
318
                }
 
319
        }
 
320
}
 
321
 
 
322
type fakeSingularRunner struct {
 
323
        worker.Runner
 
324
        startC chan string
 
325
}
 
326
 
 
327
func (r *fakeSingularRunner) StartWorker(name string, start func() (worker.Worker, error)) error {
 
328
        logger.Infof("starting fake worker %q", name)
 
329
        r.startC <- name
 
330
        return r.Runner.StartWorker(name, start)
 
331
}
 
332
 
 
333
// waitForWorker waits for a given worker to be started, returning all
 
334
// workers started while waiting.
 
335
func (r *fakeSingularRunner) waitForWorker(c *gc.C, target string) []string {
 
336
        var seen []string
 
337
        timeout := time.After(coretesting.LongWait)
 
338
        for {
 
339
                select {
 
340
                case <-time.After(coretesting.ShortWait):
 
341
                        c.Logf("still waiting for %q; workers seen so far: %+v", target, seen)
 
342
                case workerName := <-r.startC:
 
343
                        seen = append(seen, workerName)
 
344
                        if workerName == target {
 
345
                                c.Logf("target worker %q started; workers seen so far: %+v", workerName, seen)
 
346
                                return seen
 
347
                        }
 
348
                        c.Logf("worker %q started; still waiting for %q; workers seen so far: %+v", workerName, target, seen)
 
349
                case <-timeout:
 
350
                        c.Fatal("timed out waiting for " + target)
 
351
                }
 
352
        }
 
353
}
 
354
 
 
355
// waitForWorkers waits for a given worker to be started, returning all
 
356
// workers started while waiting.
 
357
func (r *fakeSingularRunner) waitForWorkers(c *gc.C, targets []string) []string {
 
358
        var seen []string
 
359
        seenTargets := make(map[string]bool)
 
360
        numSeenTargets := 0
 
361
        timeout := time.After(coretesting.LongWait)
 
362
        for {
 
363
                select {
 
364
                case workerName := <-r.startC:
 
365
                        c.Logf("worker %q started; workers seen so far: %+v (len: %d, len(targets): %d)", workerName, seen, len(seen), len(targets))
 
366
                        if seenTargets[workerName] == true {
 
367
                                c.Fatal("worker started twice: " + workerName)
 
368
                        }
 
369
                        seenTargets[workerName] = true
 
370
                        numSeenTargets++
 
371
                        seen = append(seen, workerName)
 
372
                        if numSeenTargets == len(targets) {
 
373
                                c.Logf("all expected target workers started: %+v", seen)
 
374
                                return seen
 
375
                        }
 
376
                        c.Logf("still waiting for workers %+v to start; numSeenTargets=%d", targets, numSeenTargets)
 
377
                case <-timeout:
 
378
                        c.Fatalf("timed out waiting for %v", targets)
 
379
                }
 
380
        }
 
381
}
 
382
 
 
383
type mockMetricAPI struct {
 
384
        stop          chan struct{}
 
385
        cleanUpCalled chan struct{}
 
386
        sendCalled    chan struct{}
 
387
}
 
388
 
 
389
func newMockMetricAPI() *mockMetricAPI {
 
390
        return &mockMetricAPI{
 
391
                stop:          make(chan struct{}),
 
392
                cleanUpCalled: make(chan struct{}),
 
393
                sendCalled:    make(chan struct{}),
 
394
        }
 
395
}
 
396
 
 
397
func (m *mockMetricAPI) CleanupOldMetrics() error {
 
398
        go func() {
 
399
                select {
 
400
                case m.cleanUpCalled <- struct{}{}:
 
401
                case <-m.stop:
 
402
                        break
 
403
                }
 
404
        }()
 
405
        return nil
 
406
}
 
407
 
 
408
func (m *mockMetricAPI) SendMetrics() error {
 
409
        go func() {
 
410
                select {
 
411
                case m.sendCalled <- struct{}{}:
 
412
                case <-m.stop:
 
413
                        break
 
414
                }
 
415
        }()
 
416
        return nil
 
417
}
 
418
 
 
419
func (m *mockMetricAPI) SendCalled() <-chan struct{} {
 
420
        return m.sendCalled
 
421
}
 
422
 
 
423
func (m *mockMetricAPI) CleanupCalled() <-chan struct{} {
 
424
        return m.cleanUpCalled
 
425
}
 
426
 
 
427
func (m *mockMetricAPI) Stop() {
 
428
        close(m.stop)
 
429
}
 
430
 
 
431
type mockLoopDeviceManager struct {
 
432
        detachLoopDevicesArgRootfs string
 
433
        detachLoopDevicesArgPrefix string
 
434
}
 
435
 
 
436
func (m *mockLoopDeviceManager) DetachLoopDevices(rootfs, prefix string) error {
 
437
        m.detachLoopDevicesArgRootfs = rootfs
 
438
        m.detachLoopDevicesArgPrefix = prefix
 
439
        return nil
 
440
}
 
441
 
 
442
func newSignal() *signal {
 
443
        return &signal{ch: make(chan struct{})}
 
444
}
 
445
 
 
446
type signal struct {
 
447
        mu sync.Mutex
 
448
        ch chan struct{}
 
449
}
 
450
 
 
451
func (s *signal) triggered() <-chan struct{} {
 
452
        return s.ch
 
453
}
 
454
 
 
455
func (s *signal) assertTriggered(c *gc.C, thing string) {
 
456
        select {
 
457
        case <-s.triggered():
 
458
        case <-time.After(coretesting.LongWait):
 
459
                c.Fatalf("timed out waiting for " + thing)
 
460
        }
 
461
}
 
462
 
 
463
func (s *signal) assertNotTriggered(c *gc.C, wait time.Duration, thing string) {
 
464
        select {
 
465
        case <-s.triggered():
 
466
                c.Fatalf("%v unexpectedly", thing)
 
467
        case <-time.After(wait):
 
468
        }
 
469
}
 
470
 
 
471
func (s *signal) trigger() {
 
472
        s.mu.Lock()
 
473
        defer s.mu.Unlock()
 
474
 
 
475
        select {
 
476
        case <-s.ch:
 
477
                // Already closed.
 
478
        default:
 
479
                close(s.ch)
 
480
        }
 
481
}
 
482
 
 
483
type runner interface {
 
484
        Run(*cmd.Context) error
 
485
        Stop() error
 
486
}
 
487
 
 
488
// runWithTimeout runs an agent and waits
 
489
// for it to complete within a reasonable time.
 
490
func runWithTimeout(r runner) error {
 
491
        done := make(chan error)
 
492
        go func() {
 
493
                done <- r.Run(nil)
 
494
        }()
 
495
        select {
 
496
        case err := <-done:
 
497
                return err
 
498
        case <-time.After(coretesting.LongWait):
 
499
        }
 
500
        err := r.Stop()
 
501
        return fmt.Errorf("timed out waiting for agent to finish; stop error: %v", err)
 
502
}
 
503
 
 
504
func newDummyWorker() worker.Worker {
 
505
        return worker.NewSimpleWorker(func(stop <-chan struct{}) error {
 
506
                <-stop
 
507
                return nil
 
508
        })
 
509
}
 
510
 
 
511
type FakeConfig struct {
 
512
        agent.ConfigSetter
 
513
}
 
514
 
 
515
func (FakeConfig) LogDir() string {
 
516
        return filepath.FromSlash("/var/log/juju/")
 
517
}
 
518
 
 
519
func (FakeConfig) Tag() names.Tag {
 
520
        return names.NewMachineTag("42")
 
521
}
 
522
 
 
523
type FakeAgentConfig struct {
 
524
        AgentConf
 
525
}
 
526
 
 
527
func (FakeAgentConfig) ReadConfig(string) error { return nil }
 
528
 
 
529
func (FakeAgentConfig) CurrentConfig() agent.Config {
 
530
        return FakeConfig{}
 
531
}
 
532
 
 
533
func (FakeAgentConfig) ChangeConfig(mutate agent.ConfigMutator) error {
 
534
        return mutate(FakeConfig{})
 
535
}
 
536
 
 
537
func (FakeAgentConfig) CheckArgs([]string) error { return nil }
 
538
 
 
539
// minModelWorkersEnviron implements just enough of environs.Environ
 
540
// to allow model workers to run.
 
541
type minModelWorkersEnviron struct {
 
542
        environs.Environ
 
543
}
 
544
 
 
545
func (e *minModelWorkersEnviron) Config() *config.Config {
 
546
        attrs := coretesting.FakeConfig()
 
547
        cfg, err := config.New(config.NoDefaults, attrs)
 
548
        if err != nil {
 
549
                panic(err)
 
550
        }
 
551
        return cfg
 
552
}
 
553
 
 
554
func (e *minModelWorkersEnviron) SetConfig(*config.Config) error {
 
555
        return nil
 
556
}
 
557
 
 
558
func (e *minModelWorkersEnviron) AllInstances() ([]instance.Instance, error) {
 
559
        return nil, nil
 
560
}