~nskaggs/+junk/xenial-test

« back to all changes in this revision

Viewing changes to src/github.com/juju/juju/worker/peergrouper/worker_test.go

  • Committer: Nicholas Skaggs
  • Date: 2016-10-24 20:56:05 UTC
  • Revision ID: nicholas.skaggs@canonical.com-20161024205605-z8lta0uvuhtxwzwl
Initi with beta15

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
// Copyright 2014 Canonical Ltd.
 
2
// Licensed under the AGPLv3, see LICENCE file for details.
 
3
 
 
4
package peergrouper
 
5
 
 
6
import (
 
7
        "errors"
 
8
        "fmt"
 
9
        "time"
 
10
 
 
11
        "github.com/juju/replicaset"
 
12
        jc "github.com/juju/testing/checkers"
 
13
        "github.com/juju/utils/voyeur"
 
14
        gc "gopkg.in/check.v1"
 
15
 
 
16
        "github.com/juju/juju/instance"
 
17
        "github.com/juju/juju/network"
 
18
        "github.com/juju/juju/state"
 
19
        coretesting "github.com/juju/juju/testing"
 
20
        "github.com/juju/juju/worker/workertest"
 
21
)
 
22
 
 
23
type TestIPVersion struct {
 
24
        version           string
 
25
        formatHostPort    string
 
26
        formatHost        string
 
27
        machineFormatHost string
 
28
        extraHostPort     string
 
29
        extraHost         string
 
30
        extraAddress      string
 
31
        addressType       network.AddressType
 
32
}
 
33
 
 
34
var (
 
35
        testIPv4 = TestIPVersion{
 
36
                version:           "IPv4",
 
37
                formatHostPort:    "0.1.2.%d:%d",
 
38
                formatHost:        "0.1.2.%d",
 
39
                machineFormatHost: "0.1.2.%d",
 
40
                extraHostPort:     "0.1.99.99:9876",
 
41
                extraHost:         "0.1.99.13",
 
42
                extraAddress:      "0.1.99.13:1234",
 
43
                addressType:       network.IPv4Address,
 
44
        }
 
45
        testIPv6 = TestIPVersion{
 
46
                version:           "IPv6",
 
47
                formatHostPort:    "[2001:DB8::%d]:%d",
 
48
                formatHost:        "[2001:DB8::%d]",
 
49
                machineFormatHost: "2001:DB8::%d",
 
50
                extraHostPort:     "[2001:DB8::99:99]:9876",
 
51
                extraHost:         "2001:DB8::99:13",
 
52
                extraAddress:      "[2001:DB8::99:13]:1234",
 
53
                addressType:       network.IPv6Address,
 
54
        }
 
55
)
 
56
 
 
57
// DoTestForIPv4AndIPv6 runs the passed test for IPv4 and IPv6.
 
58
func DoTestForIPv4AndIPv6(t func(ipVersion TestIPVersion)) {
 
59
        t(testIPv4)
 
60
        t(testIPv6)
 
61
}
 
62
 
 
63
type workerSuite struct {
 
64
        coretesting.BaseSuite
 
65
}
 
66
 
 
67
var _ = gc.Suite(&workerSuite{})
 
68
 
 
69
func (s *workerSuite) SetUpTest(c *gc.C) {
 
70
        s.BaseSuite.SetUpTest(c)
 
71
}
 
72
 
 
73
// InitState initializes the fake state with a single
 
74
// replicaset member and numMachines machines
 
75
// primed to vote.
 
76
func InitState(c *gc.C, st *fakeState, numMachines int, ipVersion TestIPVersion) {
 
77
        var ids []string
 
78
        for i := 10; i < 10+numMachines; i++ {
 
79
                id := fmt.Sprint(i)
 
80
                m := st.addMachine(id, true)
 
81
                m.setInstanceId(instance.Id("id-" + id))
 
82
                m.setStateHostPort(fmt.Sprintf(ipVersion.formatHostPort, i, mongoPort))
 
83
                ids = append(ids, id)
 
84
                c.Assert(m.MongoHostPorts(), gc.HasLen, 1)
 
85
 
 
86
                m.setAPIHostPorts(network.NewHostPorts(
 
87
                        apiPort, fmt.Sprintf(ipVersion.formatHost, i),
 
88
                ))
 
89
        }
 
90
        st.machine("10").SetHasVote(true)
 
91
        st.setControllers(ids...)
 
92
        st.session.Set(mkMembers("0v", ipVersion))
 
93
        st.session.setStatus(mkStatuses("0p", ipVersion))
 
94
        st.check = checkInvariants
 
95
}
 
96
 
 
97
// ExpectedAPIHostPorts returns the expected addresses
 
98
// of the machines as created by InitState.
 
99
func ExpectedAPIHostPorts(n int, ipVersion TestIPVersion) [][]network.HostPort {
 
100
        servers := make([][]network.HostPort, n)
 
101
        for i := range servers {
 
102
                servers[i] = network.NewHostPorts(
 
103
                        apiPort,
 
104
                        fmt.Sprintf(ipVersion.formatHost, i+10),
 
105
                )
 
106
        }
 
107
        return servers
 
108
}
 
109
 
 
110
func (s *workerSuite) TestSetsAndUpdatesMembers(c *gc.C) {
 
111
        DoTestForIPv4AndIPv6(func(ipVersion TestIPVersion) {
 
112
                s.PatchValue(&pollInterval, 5*time.Millisecond)
 
113
 
 
114
                st := NewFakeState()
 
115
                InitState(c, st, 3, ipVersion)
 
116
 
 
117
                memberWatcher := st.session.members.Watch()
 
118
                mustNext(c, memberWatcher)
 
119
                assertMembers(c, memberWatcher.Value(), mkMembers("0v", ipVersion))
 
120
 
 
121
                logger.Infof("starting worker")
 
122
                w, err := newWorker(st, noPublisher{}, false)
 
123
                c.Assert(err, jc.ErrorIsNil)
 
124
                defer workertest.CleanKill(c, w)
 
125
 
 
126
                // Wait for the worker to set the initial members.
 
127
                mustNext(c, memberWatcher)
 
128
                assertMembers(c, memberWatcher.Value(), mkMembers("0v 1 2", ipVersion))
 
129
 
 
130
                // Update the status of the new members
 
131
                // and check that they become voting.
 
132
                c.Logf("updating new member status")
 
133
                st.session.setStatus(mkStatuses("0p 1s 2s", ipVersion))
 
134
                mustNext(c, memberWatcher)
 
135
                assertMembers(c, memberWatcher.Value(), mkMembers("0v 1v 2v", ipVersion))
 
136
 
 
137
                c.Logf("adding another machine")
 
138
                // Add another machine.
 
139
                m13 := st.addMachine("13", false)
 
140
                m13.setStateHostPort(fmt.Sprintf(ipVersion.formatHostPort, 13, mongoPort))
 
141
                st.setControllers("10", "11", "12", "13")
 
142
 
 
143
                c.Logf("waiting for new member to be added")
 
144
                mustNext(c, memberWatcher)
 
145
                assertMembers(c, memberWatcher.Value(), mkMembers("0v 1v 2v 3", ipVersion))
 
146
 
 
147
                // Remove vote from an existing member;
 
148
                // and give it to the new machine.
 
149
                // Also set the status of the new machine to
 
150
                // healthy.
 
151
                c.Logf("removing vote from machine 10 and adding it to machine 13")
 
152
                st.machine("10").setWantsVote(false)
 
153
                st.machine("13").setWantsVote(true)
 
154
 
 
155
                st.session.setStatus(mkStatuses("0p 1s 2s 3s", ipVersion))
 
156
 
 
157
                // Check that the new machine gets the vote and the
 
158
                // old machine loses it.
 
159
                c.Logf("waiting for vote switch")
 
160
                mustNext(c, memberWatcher)
 
161
                assertMembers(c, memberWatcher.Value(), mkMembers("0 1v 2v 3v", ipVersion))
 
162
 
 
163
                c.Logf("removing old machine")
 
164
                // Remove the old machine.
 
165
                st.removeMachine("10")
 
166
                st.setControllers("11", "12", "13")
 
167
 
 
168
                // Check that it's removed from the members.
 
169
                c.Logf("waiting for removal")
 
170
                mustNext(c, memberWatcher)
 
171
                assertMembers(c, memberWatcher.Value(), mkMembers("1v 2v 3v", ipVersion))
 
172
        })
 
173
}
 
174
 
 
175
func (s *workerSuite) TestHasVoteMaintainedEvenWhenReplicaSetFails(c *gc.C) {
 
176
        DoTestForIPv4AndIPv6(func(ipVersion TestIPVersion) {
 
177
                st := NewFakeState()
 
178
 
 
179
                // Simulate a state where we have four controllers,
 
180
                // one has gone down, and we're replacing it:
 
181
                // 0 - hasvote true, wantsvote false, down
 
182
                // 1 - hasvote true, wantsvote true
 
183
                // 2 - hasvote true, wantsvote true
 
184
                // 3 - hasvote false, wantsvote true
 
185
                //
 
186
                // When it starts, the worker should move the vote from
 
187
                // 0 to 3. We'll arrange things so that it will succeed in
 
188
                // setting the membership but fail setting the HasVote
 
189
                // to false.
 
190
                InitState(c, st, 4, ipVersion)
 
191
                st.machine("10").SetHasVote(true)
 
192
                st.machine("11").SetHasVote(true)
 
193
                st.machine("12").SetHasVote(true)
 
194
                st.machine("13").SetHasVote(false)
 
195
 
 
196
                st.machine("10").setWantsVote(false)
 
197
                st.machine("11").setWantsVote(true)
 
198
                st.machine("12").setWantsVote(true)
 
199
                st.machine("13").setWantsVote(true)
 
200
 
 
201
                st.session.Set(mkMembers("0v 1v 2v 3", ipVersion))
 
202
                st.session.setStatus(mkStatuses("0H 1p 2s 3s", ipVersion))
 
203
 
 
204
                // Make the worker fail to set HasVote to false
 
205
                // after changing the replica set membership.
 
206
                st.errors.setErrorFor("Machine.SetHasVote * false", errors.New("frood"))
 
207
 
 
208
                memberWatcher := st.session.members.Watch()
 
209
                mustNext(c, memberWatcher)
 
210
                assertMembers(c, memberWatcher.Value(), mkMembers("0v 1v 2v 3", ipVersion))
 
211
 
 
212
                w, err := newWorker(st, noPublisher{}, false)
 
213
                c.Assert(err, jc.ErrorIsNil)
 
214
                done := make(chan error)
 
215
                go func() {
 
216
                        done <- w.Wait()
 
217
                }()
 
218
 
 
219
                // Wait for the worker to set the initial members.
 
220
                mustNext(c, memberWatcher)
 
221
                assertMembers(c, memberWatcher.Value(), mkMembers("0 1v 2v 3v", ipVersion))
 
222
 
 
223
                // The worker should encounter an error setting the
 
224
                // has-vote status to false and exit.
 
225
                select {
 
226
                case err := <-done:
 
227
                        c.Assert(err, gc.ErrorMatches, `cannot set HasVote removed: cannot set voting status of "[0-9]+" to false: frood`)
 
228
                case <-time.After(coretesting.LongWait):
 
229
                        c.Fatalf("timed out waiting for worker to exit")
 
230
                }
 
231
 
 
232
                // Start the worker again - although the membership should
 
233
                // not change, the HasVote status should be updated correctly.
 
234
                st.errors.resetErrors()
 
235
                w, err = newWorker(st, noPublisher{}, false)
 
236
                c.Assert(err, jc.ErrorIsNil)
 
237
                defer workertest.CleanKill(c, w)
 
238
 
 
239
                // Watch all the machines for changes, so we can check
 
240
                // their has-vote status without polling.
 
241
                changed := make(chan struct{}, 1)
 
242
                for i := 10; i < 14; i++ {
 
243
                        watcher := st.machine(fmt.Sprint(i)).val.Watch()
 
244
                        defer watcher.Close()
 
245
                        go func() {
 
246
                                for watcher.Next() {
 
247
                                        select {
 
248
                                        case changed <- struct{}{}:
 
249
                                        default:
 
250
                                        }
 
251
                                }
 
252
                        }()
 
253
                }
 
254
                timeout := time.After(coretesting.LongWait)
 
255
        loop:
 
256
                for {
 
257
                        select {
 
258
                        case <-changed:
 
259
                                correct := true
 
260
                                for i := 10; i < 14; i++ {
 
261
                                        hasVote := st.machine(fmt.Sprint(i)).HasVote()
 
262
                                        expectHasVote := i != 10
 
263
                                        if hasVote != expectHasVote {
 
264
                                                correct = false
 
265
                                        }
 
266
                                }
 
267
                                if correct {
 
268
                                        break loop
 
269
                                }
 
270
                        case <-timeout:
 
271
                                c.Fatalf("timed out waiting for vote to be set")
 
272
                        }
 
273
                }
 
274
        })
 
275
}
 
276
 
 
277
func (s *workerSuite) TestAddressChange(c *gc.C) {
 
278
        DoTestForIPv4AndIPv6(func(ipVersion TestIPVersion) {
 
279
                st := NewFakeState()
 
280
                InitState(c, st, 3, ipVersion)
 
281
 
 
282
                memberWatcher := st.session.members.Watch()
 
283
                mustNext(c, memberWatcher)
 
284
                assertMembers(c, memberWatcher.Value(), mkMembers("0v", ipVersion))
 
285
 
 
286
                logger.Infof("starting worker")
 
287
                w, err := newWorker(st, noPublisher{}, false)
 
288
                c.Assert(err, jc.ErrorIsNil)
 
289
                defer workertest.CleanKill(c, w)
 
290
 
 
291
                // Wait for the worker to set the initial members.
 
292
                mustNext(c, memberWatcher)
 
293
                assertMembers(c, memberWatcher.Value(), mkMembers("0v 1 2", ipVersion))
 
294
 
 
295
                // Change an address and wait for it to be changed in the
 
296
                // members.
 
297
                st.machine("11").setStateHostPort(ipVersion.extraHostPort)
 
298
 
 
299
                mustNext(c, memberWatcher)
 
300
                expectMembers := mkMembers("0v 1 2", ipVersion)
 
301
                expectMembers[1].Address = ipVersion.extraHostPort
 
302
                assertMembers(c, memberWatcher.Value(), expectMembers)
 
303
        })
 
304
}
 
305
 
 
306
var fatalErrorsTests = []struct {
 
307
        errPattern string
 
308
        err        error
 
309
        expectErr  string
 
310
}{{
 
311
        errPattern: "State.ControllerInfo",
 
312
        expectErr:  "cannot get controller info: sample",
 
313
}, {
 
314
        errPattern: "Machine.SetHasVote 11 true",
 
315
        expectErr:  `cannot set HasVote added: cannot set voting status of "11" to true: sample`,
 
316
}, {
 
317
        errPattern: "Session.CurrentStatus",
 
318
        expectErr:  "cannot get peergrouper info: cannot get replica set status: sample",
 
319
}, {
 
320
        errPattern: "Session.CurrentMembers",
 
321
        expectErr:  "cannot get peergrouper info: cannot get replica set members: sample",
 
322
}, {
 
323
        errPattern: "State.Machine *",
 
324
        expectErr:  `cannot get machine "10": sample`,
 
325
}, {
 
326
        errPattern: "Machine.InstanceId *",
 
327
        expectErr:  `cannot get API server info: sample`,
 
328
}}
 
329
 
 
330
func (s *workerSuite) TestFatalErrors(c *gc.C) {
 
331
        DoTestForIPv4AndIPv6(func(ipVersion TestIPVersion) {
 
332
                s.PatchValue(&pollInterval, 5*time.Millisecond)
 
333
                for i, testCase := range fatalErrorsTests {
 
334
                        c.Logf("test %d: %s -> %s", i, testCase.errPattern, testCase.expectErr)
 
335
                        st := NewFakeState()
 
336
                        st.session.InstantlyReady = true
 
337
                        InitState(c, st, 3, ipVersion)
 
338
                        st.errors.setErrorFor(testCase.errPattern, errors.New("sample"))
 
339
                        w, err := newWorker(st, noPublisher{}, false)
 
340
                        c.Assert(err, jc.ErrorIsNil)
 
341
                        done := make(chan error)
 
342
                        go func() {
 
343
                                done <- w.Wait()
 
344
                        }()
 
345
                        select {
 
346
                        case err := <-done:
 
347
                                c.Assert(err, gc.ErrorMatches, testCase.expectErr)
 
348
                        case <-time.After(coretesting.LongWait):
 
349
                                c.Fatalf("timed out waiting for error")
 
350
                        }
 
351
                }
 
352
        })
 
353
}
 
354
 
 
355
func (s *workerSuite) TestSetMembersErrorIsNotFatal(c *gc.C) {
 
356
        coretesting.SkipIfI386(c, "lp:1425569")
 
357
 
 
358
        DoTestForIPv4AndIPv6(func(ipVersion TestIPVersion) {
 
359
                st := NewFakeState()
 
360
                InitState(c, st, 3, ipVersion)
 
361
                st.session.setStatus(mkStatuses("0p 1s 2s", ipVersion))
 
362
                var setCount voyeur.Value
 
363
                st.errors.setErrorFuncFor("Session.Set", func() error {
 
364
                        setCount.Set(true)
 
365
                        return errors.New("sample")
 
366
                })
 
367
                s.PatchValue(&initialRetryInterval, 10*time.Microsecond)
 
368
                s.PatchValue(&maxRetryInterval, coretesting.ShortWait/4)
 
369
 
 
370
                w, err := newWorker(st, noPublisher{}, false)
 
371
                c.Assert(err, jc.ErrorIsNil)
 
372
                defer workertest.CleanKill(c, w)
 
373
 
 
374
                // See that the worker is retrying.
 
375
                setCountW := setCount.Watch()
 
376
                mustNext(c, setCountW)
 
377
                mustNext(c, setCountW)
 
378
                mustNext(c, setCountW)
 
379
        })
 
380
}
 
381
 
 
382
type PublisherFunc func(apiServers [][]network.HostPort, instanceIds []instance.Id) error
 
383
 
 
384
func (f PublisherFunc) publishAPIServers(apiServers [][]network.HostPort, instanceIds []instance.Id) error {
 
385
        return f(apiServers, instanceIds)
 
386
}
 
387
 
 
388
func (s *workerSuite) TestControllersArePublished(c *gc.C) {
 
389
        DoTestForIPv4AndIPv6(func(ipVersion TestIPVersion) {
 
390
                publishCh := make(chan [][]network.HostPort)
 
391
                publish := func(apiServers [][]network.HostPort, instanceIds []instance.Id) error {
 
392
                        publishCh <- apiServers
 
393
                        return nil
 
394
                }
 
395
 
 
396
                st := NewFakeState()
 
397
                InitState(c, st, 3, ipVersion)
 
398
                w, err := newWorker(st, PublisherFunc(publish), false)
 
399
                c.Assert(err, jc.ErrorIsNil)
 
400
                defer workertest.CleanKill(c, w)
 
401
 
 
402
                select {
 
403
                case servers := <-publishCh:
 
404
                        AssertAPIHostPorts(c, servers, ExpectedAPIHostPorts(3, ipVersion))
 
405
                case <-time.After(coretesting.LongWait):
 
406
                        c.Fatalf("timed out waiting for publish")
 
407
                }
 
408
 
 
409
                // Change one of the servers' API addresses and check that it's published.
 
410
                var newMachine10APIHostPorts []network.HostPort
 
411
                newMachine10APIHostPorts = network.NewHostPorts(apiPort, ipVersion.extraHost)
 
412
                st.machine("10").setAPIHostPorts(newMachine10APIHostPorts)
 
413
                select {
 
414
                case servers := <-publishCh:
 
415
                        expected := ExpectedAPIHostPorts(3, ipVersion)
 
416
                        expected[0] = newMachine10APIHostPorts
 
417
                        AssertAPIHostPorts(c, servers, expected)
 
418
                case <-time.After(coretesting.LongWait):
 
419
                        c.Fatalf("timed out waiting for publish")
 
420
                }
 
421
        })
 
422
}
 
423
 
 
424
func hostPortInSpace(address, spaceName string) network.HostPort {
 
425
        netAddress := network.Address{
 
426
                Value:     address,
 
427
                Type:      network.IPv4Address,
 
428
                Scope:     network.ScopeUnknown,
 
429
                SpaceName: network.SpaceName(spaceName),
 
430
        }
 
431
        return network.HostPort{
 
432
                Address: netAddress,
 
433
                Port:    4711,
 
434
        }
 
435
}
 
436
 
 
437
func mongoSpaceTestCommonSetup(c *gc.C, ipVersion TestIPVersion, noSpaces bool) (*fakeState, []string, []network.HostPort) {
 
438
        st := NewFakeState()
 
439
        InitState(c, st, 3, ipVersion)
 
440
        var hostPorts []network.HostPort
 
441
 
 
442
        if noSpaces {
 
443
                hostPorts = []network.HostPort{
 
444
                        hostPortInSpace(fmt.Sprintf(ipVersion.machineFormatHost, 1), ""),
 
445
                        hostPortInSpace(fmt.Sprintf(ipVersion.machineFormatHost, 2), ""),
 
446
                        hostPortInSpace(fmt.Sprintf(ipVersion.machineFormatHost, 3), ""),
 
447
                }
 
448
        } else {
 
449
                hostPorts = []network.HostPort{
 
450
                        hostPortInSpace(fmt.Sprintf(ipVersion.machineFormatHost, 1), "one"),
 
451
                        hostPortInSpace(fmt.Sprintf(ipVersion.machineFormatHost, 2), "two"),
 
452
                        hostPortInSpace(fmt.Sprintf(ipVersion.machineFormatHost, 3), "three"),
 
453
                }
 
454
        }
 
455
 
 
456
        machines := []string{"10", "11", "12"}
 
457
        for _, machine := range machines {
 
458
                st.machine(machine).SetHasVote(true)
 
459
                st.machine(machine).setWantsVote(true)
 
460
        }
 
461
 
 
462
        st.session.Set(mkMembers("0v 1v 2v", ipVersion))
 
463
 
 
464
        return st, machines, hostPorts
 
465
}
 
466
 
 
467
func startWorkerSupportingSpaces(c *gc.C, st *fakeState, ipVersion TestIPVersion) *pgWorker {
 
468
        w, err := newWorker(st, noPublisher{}, true)
 
469
        c.Assert(err, jc.ErrorIsNil)
 
470
        return w.(*pgWorker)
 
471
}
 
472
 
 
473
func runWorkerUntilMongoStateIs(c *gc.C, st *fakeState, w *pgWorker, mss state.MongoSpaceStates) {
 
474
        changes := st.controllers.Watch()
 
475
        changes.Next()
 
476
        for st.getMongoSpaceState() != mss {
 
477
                changes.Next()
 
478
        }
 
479
        workertest.CleanKill(c, w)
 
480
}
 
481
 
 
482
func (s *workerSuite) TestMongoFindAndUseSpace(c *gc.C) {
 
483
        DoTestForIPv4AndIPv6(func(ipVersion TestIPVersion) {
 
484
                st, machines, hostPorts := mongoSpaceTestCommonSetup(c, ipVersion, false)
 
485
 
 
486
                for i, machine := range machines {
 
487
                        // machine 10 gets a host port in space one
 
488
                        // machine 11 gets host ports in spaces one and two
 
489
                        // machine 12 gets host ports in spaces one, two and three
 
490
                        st.machine(machine).setMongoHostPorts(hostPorts[0 : i+1])
 
491
                }
 
492
 
 
493
                w := startWorkerSupportingSpaces(c, st, ipVersion)
 
494
                runWorkerUntilMongoStateIs(c, st, w, state.MongoSpaceValid)
 
495
 
 
496
                // Only space one has all three servers in it
 
497
                c.Assert(st.getMongoSpaceName(), gc.Equals, "one")
 
498
 
 
499
                // All machines have the same address in this test for simplicity. The
 
500
                // space three address is 0.0.0.3 giving us the host port of 0.0.0.3:4711
 
501
                members := st.session.members.Get().([]replicaset.Member)
 
502
                c.Assert(members, gc.HasLen, 3)
 
503
                for i := 0; i < 3; i++ {
 
504
                        c.Assert(members[i].Address, gc.Equals, fmt.Sprintf(ipVersion.formatHostPort, 1, 4711))
 
505
                }
 
506
        })
 
507
}
 
508
 
 
509
func (s *workerSuite) TestMongoErrorNoCommonSpace(c *gc.C) {
 
510
        c.Skip("dimitern: test disabled as it needs refactoring")
 
511
        DoTestForIPv4AndIPv6(func(ipVersion TestIPVersion) {
 
512
                st, machines, hostPorts := mongoSpaceTestCommonSetup(c, ipVersion, false)
 
513
 
 
514
                for i, machine := range machines {
 
515
                        // machine 10 gets a host port in space one
 
516
                        // machine 11 gets a host port in space two
 
517
                        // machine 12 gets a host port in space three
 
518
                        st.machine(machine).setMongoHostPorts(hostPorts[i : i+1])
 
519
                }
 
520
 
 
521
                w := startWorkerSupportingSpaces(c, st, ipVersion)
 
522
                done := make(chan error)
 
523
                go func() {
 
524
                        done <- w.Wait()
 
525
                }()
 
526
                select {
 
527
                case err := <-done:
 
528
                        c.Assert(err, gc.ErrorMatches, ".*couldn't find a space containing all peer group machines")
 
529
                case <-time.After(coretesting.LongWait):
 
530
                        c.Fatalf("timed out waiting for worker to exit")
 
531
                }
 
532
 
 
533
                // Each machine is in a unique space, so the Mongo space should be empty
 
534
                c.Assert(st.getMongoSpaceName(), gc.Equals, "")
 
535
                c.Assert(st.getMongoSpaceState(), gc.Equals, state.MongoSpaceInvalid)
 
536
        })
 
537
}
 
538
 
 
539
func (s *workerSuite) TestMongoNoSpaces(c *gc.C) {
 
540
        DoTestForIPv4AndIPv6(func(ipVersion TestIPVersion) {
 
541
                st, machines, hostPorts := mongoSpaceTestCommonSetup(c, ipVersion, true)
 
542
 
 
543
                for i, machine := range machines {
 
544
                        st.machine(machine).setMongoHostPorts(hostPorts[i : i+1])
 
545
                }
 
546
 
 
547
                w := startWorkerSupportingSpaces(c, st, ipVersion)
 
548
                runWorkerUntilMongoStateIs(c, st, w, state.MongoSpaceValid)
 
549
 
 
550
                // Only space one has all three servers in it
 
551
                c.Assert(st.getMongoSpaceName(), gc.Equals, "")
 
552
        })
 
553
}
 
554
 
 
555
func (s *workerSuite) TestMongoSpaceNotOverwritten(c *gc.C) {
 
556
        DoTestForIPv4AndIPv6(func(ipVersion TestIPVersion) {
 
557
                st, machines, hostPorts := mongoSpaceTestCommonSetup(c, ipVersion, false)
 
558
 
 
559
                for i, machine := range machines {
 
560
                        // machine 10 gets a host port in space one
 
561
                        // machine 11 gets host ports in spaces one and two
 
562
                        // machine 12 gets host ports in spaces one, two and three
 
563
                        st.machine(machine).setMongoHostPorts(hostPorts[0 : i+1])
 
564
                }
 
565
 
 
566
                w := startWorkerSupportingSpaces(c, st, ipVersion)
 
567
                runWorkerUntilMongoStateIs(c, st, w, state.MongoSpaceValid)
 
568
 
 
569
                // Only space one has all three servers in it
 
570
                c.Assert(st.getMongoSpaceName(), gc.Equals, "one")
 
571
 
 
572
                // Set st.mongoSpaceName to something different
 
573
 
 
574
                st.SetMongoSpaceState(state.MongoSpaceUnknown)
 
575
                st.SetOrGetMongoSpaceName("testing")
 
576
 
 
577
                // Only space one has all three servers in it
 
578
                c.Assert(st.getMongoSpaceName(), gc.Equals, "testing")
 
579
                c.Assert(st.getMongoSpaceState(), gc.Equals, state.MongoSpaceValid)
 
580
        })
 
581
}
 
582
 
 
583
func (s *workerSuite) TestMongoSpaceNotCalculatedWhenSpacesNotSupported(c *gc.C) {
 
584
        DoTestForIPv4AndIPv6(func(ipVersion TestIPVersion) {
 
585
                st, machines, hostPorts := mongoSpaceTestCommonSetup(c, ipVersion, false)
 
586
 
 
587
                for i, machine := range machines {
 
588
                        // machine 10 gets a host port in space one
 
589
                        // machine 11 gets host ports in spaces one and two
 
590
                        // machine 12 gets host ports in spaces one, two and three
 
591
                        st.machine(machine).setMongoHostPorts(hostPorts[0 : i+1])
 
592
                }
 
593
 
 
594
                // Set some garbage up to check that it isn't overwritten
 
595
                st.SetOrGetMongoSpaceName("garbage")
 
596
                st.SetMongoSpaceState(state.MongoSpaceUnknown)
 
597
 
 
598
                // Start a worker that doesn't support spaces
 
599
                w, err := newWorker(st, noPublisher{}, false)
 
600
                c.Assert(err, jc.ErrorIsNil)
 
601
                runWorkerUntilMongoStateIs(c, st, w.(*pgWorker), state.MongoSpaceUnsupported)
 
602
 
 
603
                // Only space one has all three servers in it
 
604
                c.Assert(st.getMongoSpaceName(), gc.Equals, "garbage")
 
605
                c.Assert(st.getMongoSpaceState(), gc.Equals, state.MongoSpaceUnsupported)
 
606
        })
 
607
}
 
608
 
 
609
func (s *workerSuite) TestWorkerRetriesOnPublishError(c *gc.C) {
 
610
        DoTestForIPv4AndIPv6(func(ipVersion TestIPVersion) {
 
611
                s.PatchValue(&pollInterval, coretesting.LongWait+time.Second)
 
612
                s.PatchValue(&initialRetryInterval, 5*time.Millisecond)
 
613
                s.PatchValue(&maxRetryInterval, initialRetryInterval)
 
614
 
 
615
                publishCh := make(chan [][]network.HostPort, 100)
 
616
 
 
617
                count := 0
 
618
                publish := func(apiServers [][]network.HostPort, instanceIds []instance.Id) error {
 
619
                        publishCh <- apiServers
 
620
                        count++
 
621
                        if count <= 3 {
 
622
                                return fmt.Errorf("publish error")
 
623
                        }
 
624
                        return nil
 
625
                }
 
626
                st := NewFakeState()
 
627
                InitState(c, st, 3, ipVersion)
 
628
 
 
629
                w, err := newWorker(st, PublisherFunc(publish), false)
 
630
                c.Assert(err, jc.ErrorIsNil)
 
631
                defer workertest.CleanKill(c, w)
 
632
 
 
633
                for i := 0; i < 4; i++ {
 
634
                        select {
 
635
                        case servers := <-publishCh:
 
636
                                AssertAPIHostPorts(c, servers, ExpectedAPIHostPorts(3, ipVersion))
 
637
                        case <-time.After(coretesting.LongWait):
 
638
                                c.Fatalf("timed out waiting for publish #%d", i)
 
639
                        }
 
640
                }
 
641
                select {
 
642
                case <-publishCh:
 
643
                        c.Errorf("unexpected publish event")
 
644
                case <-time.After(coretesting.ShortWait):
 
645
                }
 
646
        })
 
647
}
 
648
 
 
649
func (s *workerSuite) TestWorkerPublishesInstanceIds(c *gc.C) {
 
650
        DoTestForIPv4AndIPv6(func(ipVersion TestIPVersion) {
 
651
                s.PatchValue(&pollInterval, coretesting.LongWait+time.Second)
 
652
                s.PatchValue(&initialRetryInterval, 5*time.Millisecond)
 
653
                s.PatchValue(&maxRetryInterval, initialRetryInterval)
 
654
 
 
655
                publishCh := make(chan []instance.Id, 100)
 
656
 
 
657
                publish := func(apiServers [][]network.HostPort, instanceIds []instance.Id) error {
 
658
                        publishCh <- instanceIds
 
659
                        return nil
 
660
                }
 
661
                st := NewFakeState()
 
662
                InitState(c, st, 3, ipVersion)
 
663
 
 
664
                w, err := newWorker(st, PublisherFunc(publish), false)
 
665
                c.Assert(err, jc.ErrorIsNil)
 
666
                defer workertest.CleanKill(c, w)
 
667
 
 
668
                select {
 
669
                case instanceIds := <-publishCh:
 
670
                        c.Assert(instanceIds, jc.SameContents, []instance.Id{"id-10", "id-11", "id-12"})
 
671
                case <-time.After(coretesting.LongWait):
 
672
                        c.Errorf("timed out waiting for publish")
 
673
                }
 
674
        })
 
675
}
 
676
 
 
677
// mustNext waits for w's value to be set and returns it.
 
678
func mustNext(c *gc.C, w *voyeur.Watcher) (val interface{}) {
 
679
        type voyeurResult struct {
 
680
                ok  bool
 
681
                val interface{}
 
682
        }
 
683
        done := make(chan voyeurResult)
 
684
        go func() {
 
685
                c.Logf("mustNext %p", w)
 
686
                ok := w.Next()
 
687
                val = w.Value()
 
688
                c.Logf("mustNext done %p, ok: %v, val: %#v", w, ok, val)
 
689
                done <- voyeurResult{ok, val}
 
690
        }()
 
691
        select {
 
692
        case result := <-done:
 
693
                c.Assert(result.ok, jc.IsTrue)
 
694
                return result.val
 
695
        case <-time.After(coretesting.LongWait):
 
696
                c.Fatalf("timed out waiting for value to be set")
 
697
        }
 
698
        panic("unreachable")
 
699
}
 
700
 
 
701
type noPublisher struct{}
 
702
 
 
703
func (noPublisher) publishAPIServers(apiServers [][]network.HostPort, instanceIds []instance.Id) error {
 
704
        return nil
 
705
}