1
// Copyright 2014 Canonical Ltd.
2
// Licensed under the AGPLv3, see LICENCE file for details.
11
"github.com/juju/replicaset"
12
jc "github.com/juju/testing/checkers"
13
"github.com/juju/utils/voyeur"
14
gc "gopkg.in/check.v1"
16
"github.com/juju/juju/instance"
17
"github.com/juju/juju/network"
18
"github.com/juju/juju/state"
19
coretesting "github.com/juju/juju/testing"
20
"github.com/juju/juju/worker/workertest"
23
type TestIPVersion struct {
27
machineFormatHost string
31
addressType network.AddressType
35
testIPv4 = TestIPVersion{
37
formatHostPort: "0.1.2.%d:%d",
38
formatHost: "0.1.2.%d",
39
machineFormatHost: "0.1.2.%d",
40
extraHostPort: "0.1.99.99:9876",
41
extraHost: "0.1.99.13",
42
extraAddress: "0.1.99.13:1234",
43
addressType: network.IPv4Address,
45
testIPv6 = TestIPVersion{
47
formatHostPort: "[2001:DB8::%d]:%d",
48
formatHost: "[2001:DB8::%d]",
49
machineFormatHost: "2001:DB8::%d",
50
extraHostPort: "[2001:DB8::99:99]:9876",
51
extraHost: "2001:DB8::99:13",
52
extraAddress: "[2001:DB8::99:13]:1234",
53
addressType: network.IPv6Address,
57
// DoTestForIPv4AndIPv6 runs the passed test for IPv4 and IPv6.
58
func DoTestForIPv4AndIPv6(t func(ipVersion TestIPVersion)) {
63
type workerSuite struct {
67
var _ = gc.Suite(&workerSuite{})
69
func (s *workerSuite) SetUpTest(c *gc.C) {
70
s.BaseSuite.SetUpTest(c)
73
// InitState initializes the fake state with a single
74
// replicaset member and numMachines machines
76
func InitState(c *gc.C, st *fakeState, numMachines int, ipVersion TestIPVersion) {
78
for i := 10; i < 10+numMachines; i++ {
80
m := st.addMachine(id, true)
81
m.setInstanceId(instance.Id("id-" + id))
82
m.setStateHostPort(fmt.Sprintf(ipVersion.formatHostPort, i, mongoPort))
84
c.Assert(m.MongoHostPorts(), gc.HasLen, 1)
86
m.setAPIHostPorts(network.NewHostPorts(
87
apiPort, fmt.Sprintf(ipVersion.formatHost, i),
90
st.machine("10").SetHasVote(true)
91
st.setControllers(ids...)
92
st.session.Set(mkMembers("0v", ipVersion))
93
st.session.setStatus(mkStatuses("0p", ipVersion))
94
st.check = checkInvariants
97
// ExpectedAPIHostPorts returns the expected addresses
98
// of the machines as created by InitState.
99
func ExpectedAPIHostPorts(n int, ipVersion TestIPVersion) [][]network.HostPort {
100
servers := make([][]network.HostPort, n)
101
for i := range servers {
102
servers[i] = network.NewHostPorts(
104
fmt.Sprintf(ipVersion.formatHost, i+10),
110
func (s *workerSuite) TestSetsAndUpdatesMembers(c *gc.C) {
111
DoTestForIPv4AndIPv6(func(ipVersion TestIPVersion) {
112
s.PatchValue(&pollInterval, 5*time.Millisecond)
115
InitState(c, st, 3, ipVersion)
117
memberWatcher := st.session.members.Watch()
118
mustNext(c, memberWatcher)
119
assertMembers(c, memberWatcher.Value(), mkMembers("0v", ipVersion))
121
logger.Infof("starting worker")
122
w, err := newWorker(st, noPublisher{}, false)
123
c.Assert(err, jc.ErrorIsNil)
124
defer workertest.CleanKill(c, w)
126
// Wait for the worker to set the initial members.
127
mustNext(c, memberWatcher)
128
assertMembers(c, memberWatcher.Value(), mkMembers("0v 1 2", ipVersion))
130
// Update the status of the new members
131
// and check that they become voting.
132
c.Logf("updating new member status")
133
st.session.setStatus(mkStatuses("0p 1s 2s", ipVersion))
134
mustNext(c, memberWatcher)
135
assertMembers(c, memberWatcher.Value(), mkMembers("0v 1v 2v", ipVersion))
137
c.Logf("adding another machine")
138
// Add another machine.
139
m13 := st.addMachine("13", false)
140
m13.setStateHostPort(fmt.Sprintf(ipVersion.formatHostPort, 13, mongoPort))
141
st.setControllers("10", "11", "12", "13")
143
c.Logf("waiting for new member to be added")
144
mustNext(c, memberWatcher)
145
assertMembers(c, memberWatcher.Value(), mkMembers("0v 1v 2v 3", ipVersion))
147
// Remove vote from an existing member;
148
// and give it to the new machine.
149
// Also set the status of the new machine to
151
c.Logf("removing vote from machine 10 and adding it to machine 13")
152
st.machine("10").setWantsVote(false)
153
st.machine("13").setWantsVote(true)
155
st.session.setStatus(mkStatuses("0p 1s 2s 3s", ipVersion))
157
// Check that the new machine gets the vote and the
158
// old machine loses it.
159
c.Logf("waiting for vote switch")
160
mustNext(c, memberWatcher)
161
assertMembers(c, memberWatcher.Value(), mkMembers("0 1v 2v 3v", ipVersion))
163
c.Logf("removing old machine")
164
// Remove the old machine.
165
st.removeMachine("10")
166
st.setControllers("11", "12", "13")
168
// Check that it's removed from the members.
169
c.Logf("waiting for removal")
170
mustNext(c, memberWatcher)
171
assertMembers(c, memberWatcher.Value(), mkMembers("1v 2v 3v", ipVersion))
175
func (s *workerSuite) TestHasVoteMaintainedEvenWhenReplicaSetFails(c *gc.C) {
176
DoTestForIPv4AndIPv6(func(ipVersion TestIPVersion) {
179
// Simulate a state where we have four controllers,
180
// one has gone down, and we're replacing it:
181
// 0 - hasvote true, wantsvote false, down
182
// 1 - hasvote true, wantsvote true
183
// 2 - hasvote true, wantsvote true
184
// 3 - hasvote false, wantsvote true
186
// When it starts, the worker should move the vote from
187
// 0 to 3. We'll arrange things so that it will succeed in
188
// setting the membership but fail setting the HasVote
190
InitState(c, st, 4, ipVersion)
191
st.machine("10").SetHasVote(true)
192
st.machine("11").SetHasVote(true)
193
st.machine("12").SetHasVote(true)
194
st.machine("13").SetHasVote(false)
196
st.machine("10").setWantsVote(false)
197
st.machine("11").setWantsVote(true)
198
st.machine("12").setWantsVote(true)
199
st.machine("13").setWantsVote(true)
201
st.session.Set(mkMembers("0v 1v 2v 3", ipVersion))
202
st.session.setStatus(mkStatuses("0H 1p 2s 3s", ipVersion))
204
// Make the worker fail to set HasVote to false
205
// after changing the replica set membership.
206
st.errors.setErrorFor("Machine.SetHasVote * false", errors.New("frood"))
208
memberWatcher := st.session.members.Watch()
209
mustNext(c, memberWatcher)
210
assertMembers(c, memberWatcher.Value(), mkMembers("0v 1v 2v 3", ipVersion))
212
w, err := newWorker(st, noPublisher{}, false)
213
c.Assert(err, jc.ErrorIsNil)
214
done := make(chan error)
219
// Wait for the worker to set the initial members.
220
mustNext(c, memberWatcher)
221
assertMembers(c, memberWatcher.Value(), mkMembers("0 1v 2v 3v", ipVersion))
223
// The worker should encounter an error setting the
224
// has-vote status to false and exit.
227
c.Assert(err, gc.ErrorMatches, `cannot set HasVote removed: cannot set voting status of "[0-9]+" to false: frood`)
228
case <-time.After(coretesting.LongWait):
229
c.Fatalf("timed out waiting for worker to exit")
232
// Start the worker again - although the membership should
233
// not change, the HasVote status should be updated correctly.
234
st.errors.resetErrors()
235
w, err = newWorker(st, noPublisher{}, false)
236
c.Assert(err, jc.ErrorIsNil)
237
defer workertest.CleanKill(c, w)
239
// Watch all the machines for changes, so we can check
240
// their has-vote status without polling.
241
changed := make(chan struct{}, 1)
242
for i := 10; i < 14; i++ {
243
watcher := st.machine(fmt.Sprint(i)).val.Watch()
244
defer watcher.Close()
248
case changed <- struct{}{}:
254
timeout := time.After(coretesting.LongWait)
260
for i := 10; i < 14; i++ {
261
hasVote := st.machine(fmt.Sprint(i)).HasVote()
262
expectHasVote := i != 10
263
if hasVote != expectHasVote {
271
c.Fatalf("timed out waiting for vote to be set")
277
func (s *workerSuite) TestAddressChange(c *gc.C) {
278
DoTestForIPv4AndIPv6(func(ipVersion TestIPVersion) {
280
InitState(c, st, 3, ipVersion)
282
memberWatcher := st.session.members.Watch()
283
mustNext(c, memberWatcher)
284
assertMembers(c, memberWatcher.Value(), mkMembers("0v", ipVersion))
286
logger.Infof("starting worker")
287
w, err := newWorker(st, noPublisher{}, false)
288
c.Assert(err, jc.ErrorIsNil)
289
defer workertest.CleanKill(c, w)
291
// Wait for the worker to set the initial members.
292
mustNext(c, memberWatcher)
293
assertMembers(c, memberWatcher.Value(), mkMembers("0v 1 2", ipVersion))
295
// Change an address and wait for it to be changed in the
297
st.machine("11").setStateHostPort(ipVersion.extraHostPort)
299
mustNext(c, memberWatcher)
300
expectMembers := mkMembers("0v 1 2", ipVersion)
301
expectMembers[1].Address = ipVersion.extraHostPort
302
assertMembers(c, memberWatcher.Value(), expectMembers)
306
var fatalErrorsTests = []struct {
311
errPattern: "State.ControllerInfo",
312
expectErr: "cannot get controller info: sample",
314
errPattern: "Machine.SetHasVote 11 true",
315
expectErr: `cannot set HasVote added: cannot set voting status of "11" to true: sample`,
317
errPattern: "Session.CurrentStatus",
318
expectErr: "cannot get peergrouper info: cannot get replica set status: sample",
320
errPattern: "Session.CurrentMembers",
321
expectErr: "cannot get peergrouper info: cannot get replica set members: sample",
323
errPattern: "State.Machine *",
324
expectErr: `cannot get machine "10": sample`,
326
errPattern: "Machine.InstanceId *",
327
expectErr: `cannot get API server info: sample`,
330
func (s *workerSuite) TestFatalErrors(c *gc.C) {
331
DoTestForIPv4AndIPv6(func(ipVersion TestIPVersion) {
332
s.PatchValue(&pollInterval, 5*time.Millisecond)
333
for i, testCase := range fatalErrorsTests {
334
c.Logf("test %d: %s -> %s", i, testCase.errPattern, testCase.expectErr)
336
st.session.InstantlyReady = true
337
InitState(c, st, 3, ipVersion)
338
st.errors.setErrorFor(testCase.errPattern, errors.New("sample"))
339
w, err := newWorker(st, noPublisher{}, false)
340
c.Assert(err, jc.ErrorIsNil)
341
done := make(chan error)
347
c.Assert(err, gc.ErrorMatches, testCase.expectErr)
348
case <-time.After(coretesting.LongWait):
349
c.Fatalf("timed out waiting for error")
355
func (s *workerSuite) TestSetMembersErrorIsNotFatal(c *gc.C) {
356
coretesting.SkipIfI386(c, "lp:1425569")
358
DoTestForIPv4AndIPv6(func(ipVersion TestIPVersion) {
360
InitState(c, st, 3, ipVersion)
361
st.session.setStatus(mkStatuses("0p 1s 2s", ipVersion))
362
var setCount voyeur.Value
363
st.errors.setErrorFuncFor("Session.Set", func() error {
365
return errors.New("sample")
367
s.PatchValue(&initialRetryInterval, 10*time.Microsecond)
368
s.PatchValue(&maxRetryInterval, coretesting.ShortWait/4)
370
w, err := newWorker(st, noPublisher{}, false)
371
c.Assert(err, jc.ErrorIsNil)
372
defer workertest.CleanKill(c, w)
374
// See that the worker is retrying.
375
setCountW := setCount.Watch()
376
mustNext(c, setCountW)
377
mustNext(c, setCountW)
378
mustNext(c, setCountW)
382
// PublisherFunc is an adapter that allows an ordinary function to be
// used as an API-server publisher in tests (see publishAPIServers).
type PublisherFunc func(apiServers [][]network.HostPort, instanceIds []instance.Id) error
384
func (f PublisherFunc) publishAPIServers(apiServers [][]network.HostPort, instanceIds []instance.Id) error {
385
return f(apiServers, instanceIds)
388
func (s *workerSuite) TestControllersArePublished(c *gc.C) {
389
DoTestForIPv4AndIPv6(func(ipVersion TestIPVersion) {
390
publishCh := make(chan [][]network.HostPort)
391
publish := func(apiServers [][]network.HostPort, instanceIds []instance.Id) error {
392
publishCh <- apiServers
397
InitState(c, st, 3, ipVersion)
398
w, err := newWorker(st, PublisherFunc(publish), false)
399
c.Assert(err, jc.ErrorIsNil)
400
defer workertest.CleanKill(c, w)
403
case servers := <-publishCh:
404
AssertAPIHostPorts(c, servers, ExpectedAPIHostPorts(3, ipVersion))
405
case <-time.After(coretesting.LongWait):
406
c.Fatalf("timed out waiting for publish")
409
// Change one of the servers' API addresses and check that it's published.
410
var newMachine10APIHostPorts []network.HostPort
411
newMachine10APIHostPorts = network.NewHostPorts(apiPort, ipVersion.extraHost)
412
st.machine("10").setAPIHostPorts(newMachine10APIHostPorts)
414
case servers := <-publishCh:
415
expected := ExpectedAPIHostPorts(3, ipVersion)
416
expected[0] = newMachine10APIHostPorts
417
AssertAPIHostPorts(c, servers, expected)
418
case <-time.After(coretesting.LongWait):
419
c.Fatalf("timed out waiting for publish")
424
func hostPortInSpace(address, spaceName string) network.HostPort {
425
netAddress := network.Address{
427
Type: network.IPv4Address,
428
Scope: network.ScopeUnknown,
429
SpaceName: network.SpaceName(spaceName),
431
return network.HostPort{
437
func mongoSpaceTestCommonSetup(c *gc.C, ipVersion TestIPVersion, noSpaces bool) (*fakeState, []string, []network.HostPort) {
439
InitState(c, st, 3, ipVersion)
440
var hostPorts []network.HostPort
443
hostPorts = []network.HostPort{
444
hostPortInSpace(fmt.Sprintf(ipVersion.machineFormatHost, 1), ""),
445
hostPortInSpace(fmt.Sprintf(ipVersion.machineFormatHost, 2), ""),
446
hostPortInSpace(fmt.Sprintf(ipVersion.machineFormatHost, 3), ""),
449
hostPorts = []network.HostPort{
450
hostPortInSpace(fmt.Sprintf(ipVersion.machineFormatHost, 1), "one"),
451
hostPortInSpace(fmt.Sprintf(ipVersion.machineFormatHost, 2), "two"),
452
hostPortInSpace(fmt.Sprintf(ipVersion.machineFormatHost, 3), "three"),
456
machines := []string{"10", "11", "12"}
457
for _, machine := range machines {
458
st.machine(machine).SetHasVote(true)
459
st.machine(machine).setWantsVote(true)
462
st.session.Set(mkMembers("0v 1v 2v", ipVersion))
464
return st, machines, hostPorts
467
func startWorkerSupportingSpaces(c *gc.C, st *fakeState, ipVersion TestIPVersion) *pgWorker {
468
w, err := newWorker(st, noPublisher{}, true)
469
c.Assert(err, jc.ErrorIsNil)
473
func runWorkerUntilMongoStateIs(c *gc.C, st *fakeState, w *pgWorker, mss state.MongoSpaceStates) {
474
changes := st.controllers.Watch()
476
for st.getMongoSpaceState() != mss {
479
workertest.CleanKill(c, w)
482
func (s *workerSuite) TestMongoFindAndUseSpace(c *gc.C) {
483
DoTestForIPv4AndIPv6(func(ipVersion TestIPVersion) {
484
st, machines, hostPorts := mongoSpaceTestCommonSetup(c, ipVersion, false)
486
for i, machine := range machines {
487
// machine 10 gets a host port in space one
488
// machine 11 gets host ports in spaces one and two
489
// machine 12 gets host ports in spaces one, two and three
490
st.machine(machine).setMongoHostPorts(hostPorts[0 : i+1])
493
w := startWorkerSupportingSpaces(c, st, ipVersion)
494
runWorkerUntilMongoStateIs(c, st, w, state.MongoSpaceValid)
496
// Only space one has all three servers in it
497
c.Assert(st.getMongoSpaceName(), gc.Equals, "one")
499
// All machines have the same address in this test for simplicity. The
500
// space three address is 0.0.0.3 giving us the host port of 0.0.0.3:4711
501
members := st.session.members.Get().([]replicaset.Member)
502
c.Assert(members, gc.HasLen, 3)
503
for i := 0; i < 3; i++ {
504
c.Assert(members[i].Address, gc.Equals, fmt.Sprintf(ipVersion.formatHostPort, 1, 4711))
509
func (s *workerSuite) TestMongoErrorNoCommonSpace(c *gc.C) {
510
c.Skip("dimitern: test disabled as it needs refactoring")
511
DoTestForIPv4AndIPv6(func(ipVersion TestIPVersion) {
512
st, machines, hostPorts := mongoSpaceTestCommonSetup(c, ipVersion, false)
514
for i, machine := range machines {
515
// machine 10 gets a host port in space one
516
// machine 11 gets a host port in space two
517
// machine 12 gets a host port in space three
518
st.machine(machine).setMongoHostPorts(hostPorts[i : i+1])
521
w := startWorkerSupportingSpaces(c, st, ipVersion)
522
done := make(chan error)
528
c.Assert(err, gc.ErrorMatches, ".*couldn't find a space containing all peer group machines")
529
case <-time.After(coretesting.LongWait):
530
c.Fatalf("timed out waiting for worker to exit")
533
// Each machine is in a unique space, so the Mongo space should be empty
534
c.Assert(st.getMongoSpaceName(), gc.Equals, "")
535
c.Assert(st.getMongoSpaceState(), gc.Equals, state.MongoSpaceInvalid)
539
func (s *workerSuite) TestMongoNoSpaces(c *gc.C) {
540
DoTestForIPv4AndIPv6(func(ipVersion TestIPVersion) {
541
st, machines, hostPorts := mongoSpaceTestCommonSetup(c, ipVersion, true)
543
for i, machine := range machines {
544
st.machine(machine).setMongoHostPorts(hostPorts[i : i+1])
547
w := startWorkerSupportingSpaces(c, st, ipVersion)
548
runWorkerUntilMongoStateIs(c, st, w, state.MongoSpaceValid)
550
// Only space one has all three servers in it
551
c.Assert(st.getMongoSpaceName(), gc.Equals, "")
555
func (s *workerSuite) TestMongoSpaceNotOverwritten(c *gc.C) {
556
DoTestForIPv4AndIPv6(func(ipVersion TestIPVersion) {
557
st, machines, hostPorts := mongoSpaceTestCommonSetup(c, ipVersion, false)
559
for i, machine := range machines {
560
// machine 10 gets a host port in space one
561
// machine 11 gets host ports in spaces one and two
562
// machine 12 gets host ports in spaces one, two and three
563
st.machine(machine).setMongoHostPorts(hostPorts[0 : i+1])
566
w := startWorkerSupportingSpaces(c, st, ipVersion)
567
runWorkerUntilMongoStateIs(c, st, w, state.MongoSpaceValid)
569
// Only space one has all three servers in it
570
c.Assert(st.getMongoSpaceName(), gc.Equals, "one")
572
// Set st.mongoSpaceName to something different
574
st.SetMongoSpaceState(state.MongoSpaceUnknown)
575
st.SetOrGetMongoSpaceName("testing")
577
// Only space one has all three servers in it
578
c.Assert(st.getMongoSpaceName(), gc.Equals, "testing")
579
c.Assert(st.getMongoSpaceState(), gc.Equals, state.MongoSpaceValid)
583
func (s *workerSuite) TestMongoSpaceNotCalculatedWhenSpacesNotSupported(c *gc.C) {
584
DoTestForIPv4AndIPv6(func(ipVersion TestIPVersion) {
585
st, machines, hostPorts := mongoSpaceTestCommonSetup(c, ipVersion, false)
587
for i, machine := range machines {
588
// machine 10 gets a host port in space one
589
// machine 11 gets host ports in spaces one and two
590
// machine 12 gets host ports in spaces one, two and three
591
st.machine(machine).setMongoHostPorts(hostPorts[0 : i+1])
594
// Set some garbage up to check that it isn't overwritten
595
st.SetOrGetMongoSpaceName("garbage")
596
st.SetMongoSpaceState(state.MongoSpaceUnknown)
598
// Start a worker that doesn't support spaces
599
w, err := newWorker(st, noPublisher{}, false)
600
c.Assert(err, jc.ErrorIsNil)
601
runWorkerUntilMongoStateIs(c, st, w.(*pgWorker), state.MongoSpaceUnsupported)
603
// Only space one has all three servers in it
604
c.Assert(st.getMongoSpaceName(), gc.Equals, "garbage")
605
c.Assert(st.getMongoSpaceState(), gc.Equals, state.MongoSpaceUnsupported)
609
func (s *workerSuite) TestWorkerRetriesOnPublishError(c *gc.C) {
610
DoTestForIPv4AndIPv6(func(ipVersion TestIPVersion) {
611
s.PatchValue(&pollInterval, coretesting.LongWait+time.Second)
612
s.PatchValue(&initialRetryInterval, 5*time.Millisecond)
613
s.PatchValue(&maxRetryInterval, initialRetryInterval)
615
publishCh := make(chan [][]network.HostPort, 100)
618
publish := func(apiServers [][]network.HostPort, instanceIds []instance.Id) error {
619
publishCh <- apiServers
622
return fmt.Errorf("publish error")
627
InitState(c, st, 3, ipVersion)
629
w, err := newWorker(st, PublisherFunc(publish), false)
630
c.Assert(err, jc.ErrorIsNil)
631
defer workertest.CleanKill(c, w)
633
for i := 0; i < 4; i++ {
635
case servers := <-publishCh:
636
AssertAPIHostPorts(c, servers, ExpectedAPIHostPorts(3, ipVersion))
637
case <-time.After(coretesting.LongWait):
638
c.Fatalf("timed out waiting for publish #%d", i)
643
c.Errorf("unexpected publish event")
644
case <-time.After(coretesting.ShortWait):
649
func (s *workerSuite) TestWorkerPublishesInstanceIds(c *gc.C) {
650
DoTestForIPv4AndIPv6(func(ipVersion TestIPVersion) {
651
s.PatchValue(&pollInterval, coretesting.LongWait+time.Second)
652
s.PatchValue(&initialRetryInterval, 5*time.Millisecond)
653
s.PatchValue(&maxRetryInterval, initialRetryInterval)
655
publishCh := make(chan []instance.Id, 100)
657
publish := func(apiServers [][]network.HostPort, instanceIds []instance.Id) error {
658
publishCh <- instanceIds
662
InitState(c, st, 3, ipVersion)
664
w, err := newWorker(st, PublisherFunc(publish), false)
665
c.Assert(err, jc.ErrorIsNil)
666
defer workertest.CleanKill(c, w)
669
case instanceIds := <-publishCh:
670
c.Assert(instanceIds, jc.SameContents, []instance.Id{"id-10", "id-11", "id-12"})
671
case <-time.After(coretesting.LongWait):
672
c.Errorf("timed out waiting for publish")
677
// mustNext waits for w's value to be set and returns it.
678
func mustNext(c *gc.C, w *voyeur.Watcher) (val interface{}) {
679
type voyeurResult struct {
683
done := make(chan voyeurResult)
685
c.Logf("mustNext %p", w)
688
c.Logf("mustNext done %p, ok: %v, val: %#v", w, ok, val)
689
done <- voyeurResult{ok, val}
692
case result := <-done:
693
c.Assert(result.ok, jc.IsTrue)
695
case <-time.After(coretesting.LongWait):
696
c.Fatalf("timed out waiting for value to be set")
701
// noPublisher is a stub publisher passed to newWorker by tests that do
// not need to observe published API server addresses.
// NOTE(review): its publishAPIServers method body is not visible in this
// fragment — presumably a no-op; confirm against the full file.
type noPublisher struct{}
703
func (noPublisher) publishAPIServers(apiServers [][]network.HostPort, instanceIds []instance.Id) error {