~juju-qa/ubuntu/xenial/juju/xenial-2.0-beta3

« back to all changes in this revision

Viewing changes to src/github.com/juju/juju/worker/peergrouper/worker_test.go

  • Committer: Martin Packman
  • Date: 2016-03-30 19:31:08 UTC
  • mfrom: (1.1.41)
  • Revision ID: martin.packman@canonical.com-20160330193108-h9iz3ak334uk0z5r
Merge new upstream source 2.0~beta3

Show diffs side-by-side

added added

removed removed

Lines of Context:
8
8
        "fmt"
9
9
        "time"
10
10
 
 
11
        "github.com/juju/replicaset"
11
12
        jc "github.com/juju/testing/checkers"
12
13
        "github.com/juju/utils/voyeur"
13
14
        gc "gopkg.in/check.v1"
14
15
 
15
16
        "github.com/juju/juju/instance"
16
17
        "github.com/juju/juju/network"
 
18
        "github.com/juju/juju/state"
17
19
        coretesting "github.com/juju/juju/testing"
18
20
        "github.com/juju/juju/worker"
19
21
)
117
119
                assertMembers(c, memberWatcher.Value(), mkMembers("0v", ipVersion))
118
120
 
119
121
                logger.Infof("starting worker")
120
 
                w := newWorker(st, noPublisher{})
 
122
                w := newWorker(st, noPublisher{}, false)
121
123
                defer func() {
122
124
                        c.Check(worker.Stop(w), gc.IsNil)
123
125
                }()
208
210
                mustNext(c, memberWatcher)
209
211
                assertMembers(c, memberWatcher.Value(), mkMembers("0v 1v 2v 3", ipVersion))
210
212
 
211
 
                w := newWorker(st, noPublisher{})
 
213
                w := newWorker(st, noPublisher{}, false)
212
214
                done := make(chan error)
213
215
                go func() {
214
216
                        done <- w.Wait()
230
232
                // Start the worker again - although the membership should
231
233
                // not change, the HasVote status should be updated correctly.
232
234
                st.errors.resetErrors()
233
 
                w = newWorker(st, noPublisher{})
 
235
                w = newWorker(st, noPublisher{}, false)
234
236
 
235
237
                // Watch all the machines for changes, so we can check
236
238
                // their has-vote status without polling.
280
282
                assertMembers(c, memberWatcher.Value(), mkMembers("0v", ipVersion))
281
283
 
282
284
                logger.Infof("starting worker")
283
 
                w := newWorker(st, noPublisher{})
 
285
                w := newWorker(st, noPublisher{}, false)
284
286
                defer func() {
285
287
                        c.Check(worker.Stop(w), gc.IsNil)
286
288
                }()
333
335
                        st.session.InstantlyReady = true
334
336
                        InitState(c, st, 3, ipVersion)
335
337
                        st.errors.setErrorFor(testCase.errPattern, errors.New("sample"))
336
 
                        w := newWorker(st, noPublisher{})
 
338
                        w := newWorker(st, noPublisher{}, false)
337
339
                        done := make(chan error)
338
340
                        go func() {
339
341
                                done <- w.Wait()
363
365
                s.PatchValue(&initialRetryInterval, 10*time.Microsecond)
364
366
                s.PatchValue(&maxRetryInterval, coretesting.ShortWait/4)
365
367
 
366
 
                w := newWorker(st, noPublisher{})
 
368
                w := newWorker(st, noPublisher{}, false)
367
369
                defer func() {
368
370
                        c.Check(worker.Stop(w), gc.IsNil)
369
371
                }()
392
394
 
393
395
                st := NewFakeState()
394
396
                InitState(c, st, 3, ipVersion)
395
 
                w := newWorker(st, PublisherFunc(publish))
 
397
                w := newWorker(st, PublisherFunc(publish), false)
396
398
                defer func() {
397
399
                        c.Check(worker.Stop(w), gc.IsNil)
398
400
                }()
418
420
        })
419
421
}
420
422
 
 
423
func hostPortInSpace(address, spaceName string) network.HostPort {
 
424
        netAddress := network.Address{
 
425
                Value:       address,
 
426
                Type:        network.IPv4Address,
 
427
                NetworkName: "net",
 
428
                Scope:       network.ScopeUnknown,
 
429
                SpaceName:   network.SpaceName(spaceName),
 
430
        }
 
431
        return network.HostPort{netAddress, 4711}
 
432
}
 
433
 
 
434
func mongoSpaceTestCommonSetup(c *gc.C, ipVersion TestIPVersion, noSpaces bool) (*fakeState, []string, []network.HostPort) {
 
435
        st := NewFakeState()
 
436
        InitState(c, st, 3, ipVersion)
 
437
        var hostPorts []network.HostPort
 
438
 
 
439
        if noSpaces {
 
440
                hostPorts = []network.HostPort{
 
441
                        hostPortInSpace(fmt.Sprintf(ipVersion.machineFormatHost, 1), ""),
 
442
                        hostPortInSpace(fmt.Sprintf(ipVersion.machineFormatHost, 2), ""),
 
443
                        hostPortInSpace(fmt.Sprintf(ipVersion.machineFormatHost, 3), ""),
 
444
                }
 
445
        } else {
 
446
                hostPorts = []network.HostPort{
 
447
                        hostPortInSpace(fmt.Sprintf(ipVersion.machineFormatHost, 1), "one"),
 
448
                        hostPortInSpace(fmt.Sprintf(ipVersion.machineFormatHost, 2), "two"),
 
449
                        hostPortInSpace(fmt.Sprintf(ipVersion.machineFormatHost, 3), "three"),
 
450
                }
 
451
        }
 
452
 
 
453
        machines := []string{"10", "11", "12"}
 
454
        for _, machine := range machines {
 
455
                st.machine(machine).SetHasVote(true)
 
456
                st.machine(machine).setWantsVote(true)
 
457
        }
 
458
 
 
459
        st.session.Set(mkMembers("0v 1v 2v", ipVersion))
 
460
 
 
461
        return st, machines, hostPorts
 
462
}
 
463
 
 
464
func startWorkerSupportingSpaces(st *fakeState, ipVersion TestIPVersion) *pgWorker {
 
465
        return newWorker(st, noPublisher{}, true).(*pgWorker)
 
466
}
 
467
 
 
468
func runWorkerUntilMongoStateIs(c *gc.C, st *fakeState, w *pgWorker, mss state.MongoSpaceStates) {
 
469
        changes := st.controllers.Watch()
 
470
        changes.Next()
 
471
        for st.getMongoSpaceState() != mss {
 
472
                changes.Next()
 
473
        }
 
474
        c.Check(worker.Stop(w), gc.IsNil)
 
475
}
 
476
 
 
477
func (s *workerSuite) TestMongoFindAndUseSpace(c *gc.C) {
 
478
        DoTestForIPv4AndIPv6(func(ipVersion TestIPVersion) {
 
479
                st, machines, hostPorts := mongoSpaceTestCommonSetup(c, ipVersion, false)
 
480
 
 
481
                for i, machine := range machines {
 
482
                        // machine 10 gets a host port in space one
 
483
                        // machine 11 gets host ports in spaces one and two
 
484
                        // machine 12 gets host ports in spaces one, two and three
 
485
                        st.machine(machine).setMongoHostPorts(hostPorts[0 : i+1])
 
486
                }
 
487
 
 
488
                w := startWorkerSupportingSpaces(st, ipVersion)
 
489
                runWorkerUntilMongoStateIs(c, st, w, state.MongoSpaceValid)
 
490
 
 
491
                // Only space one has all three servers in it
 
492
                c.Assert(st.getMongoSpaceName(), gc.Equals, "one")
 
493
 
 
494
                // All machines have the same address in this test for simplicity. The
 
495
                // space three address is 0.0.0.3 giving us the host port of 0.0.0.3:4711
 
496
                members := st.session.members.Get().([]replicaset.Member)
 
497
                c.Assert(members, gc.HasLen, 3)
 
498
                for i := 0; i < 3; i++ {
 
499
                        c.Assert(members[i].Address, gc.Equals, fmt.Sprintf(ipVersion.formatHostPort, 1, 4711))
 
500
                }
 
501
        })
 
502
}
 
503
 
 
504
func (s *workerSuite) TestMongoErrorNoCommonSpace(c *gc.C) {
 
505
        DoTestForIPv4AndIPv6(func(ipVersion TestIPVersion) {
 
506
                st, machines, hostPorts := mongoSpaceTestCommonSetup(c, ipVersion, false)
 
507
 
 
508
                for i, machine := range machines {
 
509
                        // machine 10 gets a host port in space one
 
510
                        // machine 11 gets a host port in space two
 
511
                        // machine 12 gets a host port in space three
 
512
                        st.machine(machine).setMongoHostPorts(hostPorts[i : i+1])
 
513
                }
 
514
 
 
515
                w := startWorkerSupportingSpaces(st, ipVersion)
 
516
                done := make(chan error)
 
517
                go func() {
 
518
                        done <- w.Wait()
 
519
                }()
 
520
                select {
 
521
                case err := <-done:
 
522
                        c.Assert(err, gc.ErrorMatches, ".*couldn't find a space containing all peer group machines")
 
523
                case <-time.After(coretesting.LongWait):
 
524
                        c.Fatalf("timed out waiting for worker to exit")
 
525
                }
 
526
 
 
527
                // Each machine is in a unique space, so the Mongo space should be empty
 
528
                c.Assert(st.getMongoSpaceName(), gc.Equals, "")
 
529
                c.Assert(st.getMongoSpaceState(), gc.Equals, state.MongoSpaceInvalid)
 
530
        })
 
531
}
 
532
 
 
533
func (s *workerSuite) TestMongoNoSpaces(c *gc.C) {
 
534
        DoTestForIPv4AndIPv6(func(ipVersion TestIPVersion) {
 
535
                st, machines, hostPorts := mongoSpaceTestCommonSetup(c, ipVersion, true)
 
536
 
 
537
                for i, machine := range machines {
 
538
                        st.machine(machine).setMongoHostPorts(hostPorts[i : i+1])
 
539
                }
 
540
 
 
541
                w := startWorkerSupportingSpaces(st, ipVersion)
 
542
                runWorkerUntilMongoStateIs(c, st, w, state.MongoSpaceValid)
 
543
 
 
544
                // Only space one has all three servers in it
 
545
                c.Assert(st.getMongoSpaceName(), gc.Equals, "")
 
546
        })
 
547
}
 
548
 
 
549
func (s *workerSuite) TestMongoSpaceNotOverwritten(c *gc.C) {
 
550
        DoTestForIPv4AndIPv6(func(ipVersion TestIPVersion) {
 
551
                st, machines, hostPorts := mongoSpaceTestCommonSetup(c, ipVersion, false)
 
552
 
 
553
                for i, machine := range machines {
 
554
                        // machine 10 gets a host port in space one
 
555
                        // machine 11 gets host ports in spaces one and two
 
556
                        // machine 12 gets host ports in spaces one, two and three
 
557
                        st.machine(machine).setMongoHostPorts(hostPorts[0 : i+1])
 
558
                }
 
559
 
 
560
                w := startWorkerSupportingSpaces(st, ipVersion)
 
561
                runWorkerUntilMongoStateIs(c, st, w, state.MongoSpaceValid)
 
562
 
 
563
                // Only space one has all three servers in it
 
564
                c.Assert(st.getMongoSpaceName(), gc.Equals, "one")
 
565
 
 
566
                // Set st.mongoSpaceName to something different
 
567
 
 
568
                st.SetMongoSpaceState(state.MongoSpaceUnknown)
 
569
                st.SetOrGetMongoSpaceName("testing")
 
570
 
 
571
                // Manually run getMongoSpace - it should do nothing because we already have
 
572
                // a space. If it did re-calculate the space name it will change back to "one".
 
573
                w.getMongoSpace(&peerGroupInfo{})
 
574
 
 
575
                // Only space one has all three servers in it
 
576
                c.Assert(st.getMongoSpaceName(), gc.Equals, "testing")
 
577
                c.Assert(st.getMongoSpaceState(), gc.Equals, state.MongoSpaceValid)
 
578
        })
 
579
}
 
580
 
 
581
func (s *workerSuite) TestMongoSpaceNotCalculatedWhenSpacesNotSupported(c *gc.C) {
 
582
        DoTestForIPv4AndIPv6(func(ipVersion TestIPVersion) {
 
583
                st, machines, hostPorts := mongoSpaceTestCommonSetup(c, ipVersion, false)
 
584
 
 
585
                for i, machine := range machines {
 
586
                        // machine 10 gets a host port in space one
 
587
                        // machine 11 gets host ports in spaces one and two
 
588
                        // machine 12 gets host ports in spaces one, two and three
 
589
                        st.machine(machine).setMongoHostPorts(hostPorts[0 : i+1])
 
590
                }
 
591
 
 
592
                // Set some garbage up to check that it isn't overwritten
 
593
                st.SetOrGetMongoSpaceName("garbage")
 
594
                st.SetMongoSpaceState(state.MongoSpaceUnknown)
 
595
 
 
596
                // Start a worker that doesn't support spaces
 
597
                w := newWorker(st, noPublisher{}, false).(*pgWorker)
 
598
                runWorkerUntilMongoStateIs(c, st, w, state.MongoSpaceUnsupported)
 
599
 
 
600
                // Only space one has all three servers in it
 
601
                c.Assert(st.getMongoSpaceName(), gc.Equals, "garbage")
 
602
                c.Assert(st.getMongoSpaceState(), gc.Equals, state.MongoSpaceUnsupported)
 
603
        })
 
604
}
 
605
 
421
606
func (s *workerSuite) TestWorkerRetriesOnPublishError(c *gc.C) {
422
607
        DoTestForIPv4AndIPv6(func(ipVersion TestIPVersion) {
423
608
                s.PatchValue(&pollInterval, coretesting.LongWait+time.Second)
438
623
                st := NewFakeState()
439
624
                InitState(c, st, 3, ipVersion)
440
625
 
441
 
                w := newWorker(st, PublisherFunc(publish))
 
626
                w := newWorker(st, PublisherFunc(publish), false)
442
627
                defer func() {
443
628
                        c.Check(worker.Stop(w), gc.IsNil)
444
629
                }()
474
659
                st := NewFakeState()
475
660
                InitState(c, st, 3, ipVersion)
476
661
 
477
 
                w := newWorker(st, PublisherFunc(publish))
 
662
                w := newWorker(st, PublisherFunc(publish), false)
478
663
                defer func() {
479
664
                        c.Check(worker.Stop(w), gc.IsNil)
480
665
                }()