1
// Copyright 2012, 2013 Canonical Ltd.
2
// Licensed under the AGPLv3, see LICENCE file for details.
10
gitjujutesting "github.com/juju/testing"
11
jc "github.com/juju/testing/checkers"
12
gc "gopkg.in/check.v1"
17
"github.com/juju/juju/state/watcher"
18
"github.com/juju/juju/testing"
21
// Test tuning parameters.
23
// worstCase is used for timeouts when timing out
24
// will fail the test. Raising this value should
25
// not affect the overall running time of the tests
27
worstCase = testing.LongWait
29
// justLongEnough is used for timeouts that
30
// are expected to happen for a test to complete
31
// successfully. Reducing this value will make
32
// the tests run faster at the expense of making them
33
// fail more often on heavily loaded or slow hardware.
34
justLongEnough = testing.ShortWait
36
// fastPeriod specifies the period of the watcher for
37
// tests where the timing is not critical.
38
fastPeriod = 10 * time.Millisecond
40
// slowPeriod specifies the period of the watcher
41
// for tests where the timing is important.
42
slowPeriod = 1 * time.Second
45
// TestPackage is the "go test" entry point for this package. It hands t to
// testing.MgoTestPackage from github.com/juju/juju/testing.
// NOTE(review): presumably that helper runs the registered gocheck suites
// against a test MongoDB instance — confirm against juju/testing.
func TestPackage(t *stdtesting.T) {
46
testing.MgoTestPackage(t)
49
// watcherSuite holds the fixture state shared by the watcher test suites.
// NOTE(review): SetUpTest also assigns s.log, s.stash, s.runner and s.w, and
// the suite methods call s.BaseSuite; those fields are declared on lines not
// visible here.
type watcherSuite struct {
50
gitjujutesting.MgoSuite
57
// ch is the change channel most tests hand to Watch; SetUpTest recreates it
// for every test.
ch chan watcher.Change
58
// oldPeriod keeps the value watcher.Period had before the suite ran, so
// that TearDownSuite can restore it.
oldPeriod time.Duration
61
// FastPeriodSuite implements tests that should
62
// work regardless of the watcher refresh period.
63
type FastPeriodSuite struct {
67
// SetUpSuite runs the shared watcherSuite suite setup and then switches the
// package-level watcher.Period to fastPeriod.
func (s *FastPeriodSuite) SetUpSuite(c *gc.C) {
68
// The shared setup records the previous watcher.Period in oldPeriod, so it
// has to run before the period is overridden below.
s.watcherSuite.SetUpSuite(c)
69
watcher.Period = fastPeriod
72
// Register FastPeriodSuite with gocheck.
var _ = gc.Suite(&FastPeriodSuite{})
74
// SetUpSuite runs the BaseSuite and MgoSuite suite-level setup, in that
// order, and remembers the current watcher.Period so that TearDownSuite can
// put it back.
func (s *watcherSuite) SetUpSuite(c *gc.C) {
75
s.BaseSuite.SetUpSuite(c)
76
s.MgoSuite.SetUpSuite(c)
77
s.oldPeriod = watcher.Period
80
// TearDownSuite undoes SetUpSuite: it tears down MgoSuite and BaseSuite in
// the reverse of their setup order and restores the watcher.Period value
// saved in oldPeriod.
func (s *watcherSuite) TearDownSuite(c *gc.C) {
81
s.MgoSuite.TearDownSuite(c)
82
s.BaseSuite.TearDownSuite(c)
83
watcher.Period = s.oldPeriod
86
// SetUpTest prepares a fresh fixture for each test: it runs the BaseSuite and
// MgoSuite per-test setup, creates the "txnlog" collection in the "juju"
// database, builds a txn runner over the "txn" collection that records into
// that log, starts a watcher on the log and allocates a new change channel.
func (s *watcherSuite) SetUpTest(c *gc.C) {
87
s.BaseSuite.SetUpTest(c)
88
s.MgoSuite.SetUpTest(c)
90
db := s.MgoSuite.Session.DB("juju")
91
s.log = db.C("txnlog")
92
// The CollectionInfo options are on lines not visible here.
// NOTE(review): the statement starts with the call itself, so the value
// returned by Create looks to be discarded — confirm this is intentional.
s.log.Create(&mgo.CollectionInfo{
96
s.stash = db.C("txn.stash")
97
s.runner = txn.NewRunner(db.C("txn"))
98
// Have the runner log every transaction into s.log, the collection the
// watcher below is created on.
s.runner.ChangeLog(s.log)
99
s.w = watcher.New(s.log)
100
s.ch = make(chan watcher.Change)
103
// TearDownTest stops the watcher started by SetUpTest, asserting that Stop
// returns nil, and then runs the MgoSuite and BaseSuite per-test teardown.
func (s *watcherSuite) TearDownTest(c *gc.C) {
104
c.Assert(s.w.Stop(), gc.IsNil)
106
s.MgoSuite.TearDownTest(c)
107
s.BaseSuite.TearDownTest(c)
110
// M is a shorthand map type; this file uses it to spell out inserted
// documents (M{"n": 1}) and update operators (M{"$inc": ...}) in txn ops.
type M map[string]interface{}
112
// assertChange waits for a change on watch and aborts the test if, going by
// its failure messages, the change reported is not want or nothing is
// reported within worstCase.
// NOTE(review): the receive from watch and the comparison with want are on
// lines not visible here.
func assertChange(c *gc.C, watch <-chan watcher.Change, want watcher.Change) {
116
c.Fatalf("watch reported %v, want %v", got, want)
118
case <-time.After(worstCase):
119
c.Fatalf("watch reported nothing, want %v", want)
123
// assertNoChange aborts the test if, going by its failure message, a change
// is reported on watch. It stops waiting after justLongEnough.
// NOTE(review): the receive from watch and the body of the timeout case are
// on lines not visible here.
func assertNoChange(c *gc.C, watch <-chan watcher.Change) {
126
c.Fatalf("watch reported %v, want nothing", got)
127
case <-time.After(justLongEnough):
131
// assertOrder walks revnos and aborts the test, printing the whole sequence,
// when the sequence is judged bad.
// NOTE(review): the condition is on lines not visible here. Callers pass the
// values they expect to be lowest first (e.g. -1, revno1, revno2), which
// suggests a strictly-increasing check — confirm.
func assertOrder(c *gc.C, revnos ...int64) {
133
for _, revno := range revnos {
135
c.Fatalf("got bad revno sequence: %v", revnos)
141
// revno looks up the document with the given id in collection c of the
// database that s.log belongs to, decoding its "txn-revno" field.
// Note that c is a collection name here, not a *gc.C.
// NOTE(review): the handling of err and the return statement are on lines not
// visible here.
func (s *watcherSuite) revno(c string, id interface{}) (revno int64) {
143
Revno int64 `bson:"txn-revno"`
145
err := s.log.Database.C(c).FindId(id).One(&doc)
152
// insert runs a single-op transaction that inserts the document {"n": 1} with
// the given id into coll, then reads the document's revno back through
// s.revno and logs it. revno is the named result.
// NOTE(review): the check on err is on lines not visible here.
func (s *watcherSuite) insert(c *gc.C, coll string, id interface{}) (revno int64) {
153
ops := []txn.Op{{C: coll, Id: id, Insert: M{"n": 1}}}
154
err := s.runner.Run(ops, "", nil)
158
revno = s.revno(coll, id)
159
c.Logf("insert(%#v, %#v) => revno %d", coll, id, revno)
163
// insertAll inserts one {"n": 1} document per id into coll using a single
// transaction, then collects each document's revno (in the order of ids) into
// the named result revnos and logs them.
// NOTE(review): the declaration of ops and the check on err are on lines not
// visible here.
func (s *watcherSuite) insertAll(c *gc.C, coll string, ids ...interface{}) (revnos []int64) {
165
for _, id := range ids {
166
ops = append(ops, txn.Op{C: coll, Id: id, Insert: M{"n": 1}})
168
// All ops go through one Run call, i.e. one transaction.
err := s.runner.Run(ops, "", nil)
172
for _, id := range ids {
173
revnos = append(revnos, s.revno(coll, id))
175
c.Logf("insertAll(%#v, %v) => revnos %v", coll, ids, revnos)
179
// update runs a single-op transaction that increments field "n" of the
// document with the given id in coll, then reads the document's revno back
// through s.revno and logs it. revno is the named result.
// NOTE(review): the check on err is on lines not visible here.
func (s *watcherSuite) update(c *gc.C, coll string, id interface{}) (revno int64) {
180
ops := []txn.Op{{C: coll, Id: id, Update: M{"$inc": M{"n": 1}}}}
181
err := s.runner.Run(ops, "", nil)
185
revno = s.revno(coll, id)
186
c.Logf("update(%#v, %#v) => revno %d", coll, id, revno)
190
// remove runs a single-op transaction that removes the document with the
// given id from coll and logs the removal as revno -1.
// NOTE(review): the check on err and the return statement are on lines not
// visible here; going by the log message the result is presumably -1.
func (s *watcherSuite) remove(c *gc.C, coll string, id interface{}) (revno int64) {
191
ops := []txn.Op{{C: coll, Id: id, Remove: true}}
192
err := s.runner.Run(ops, "", nil)
196
c.Logf("remove(%#v, %#v) => revno -1", coll, id)
200
// TestErrAndDead checks the watcher's lifecycle reporting: Err returns
// tomb.ErrStillAlive while the watcher runs and nil once Stop has returned
// nil. Going by the failure messages, the Dead channel must not fire before
// Stop and must fire after it (the selects are on lines not visible here).
func (s *FastPeriodSuite) TestErrAndDead(c *gc.C) {
201
c.Assert(s.w.Err(), gc.Equals, tomb.ErrStillAlive)
204
c.Fatalf("Dead channel fired unexpectedly")
207
c.Assert(s.w.Stop(), gc.IsNil)
208
c.Assert(s.w.Err(), gc.IsNil)
212
c.Fatalf("Dead channel should have fired")
216
// TestWatchBeforeKnown watches document "a" before it exists (passing revno
// -1), expects silence, then inserts it and expects exactly one change
// carrying the revno of the insert.
func (s *FastPeriodSuite) TestWatchBeforeKnown(c *gc.C) {
217
s.w.Watch("test", "a", -1, s.ch)
218
assertNoChange(c, s.ch)
220
revno := s.insert(c, "test", "a")
223
assertChange(c, s.ch, watcher.Change{"test", "a", revno})
224
assertNoChange(c, s.ch)
226
assertOrder(c, -1, revno)
229
// TestWatchAfterKnown inserts document "a" first and only then watches it
// with revno -1; it expects exactly one change carrying the revno of the
// insert.
func (s *FastPeriodSuite) TestWatchAfterKnown(c *gc.C) {
230
revno := s.insert(c, "test", "a")
234
s.w.Watch("test", "a", -1, s.ch)
235
assertChange(c, s.ch, watcher.Change{"test", "a", revno})
236
assertNoChange(c, s.ch)
238
assertOrder(c, -1, revno)
241
// TestWatchIgnoreUnwatched watches document "a" and then inserts document
// "b"; no change may be delivered on the channel.
func (s *FastPeriodSuite) TestWatchIgnoreUnwatched(c *gc.C) {
242
s.w.Watch("test", "a", -1, s.ch)
243
assertNoChange(c, s.ch)
245
s.insert(c, "test", "b")
248
assertNoChange(c, s.ch)
251
// TestWatchOrder watches documents "a" to "d" on one channel, inserts "a",
// "b" and "c" in separate transactions and expects their changes to arrive
// in insertion order, with nothing reported for "d".
func (s *FastPeriodSuite) TestWatchOrder(c *gc.C) {
253
for _, id := range []string{"a", "b", "c", "d"} {
254
s.w.Watch("test", id, -1, s.ch)
256
revno1 := s.insert(c, "test", "a")
257
revno2 := s.insert(c, "test", "b")
258
revno3 := s.insert(c, "test", "c")
261
assertChange(c, s.ch, watcher.Change{"test", "a", revno1})
262
assertChange(c, s.ch, watcher.Change{"test", "b", revno2})
263
assertChange(c, s.ch, watcher.Change{"test", "c", revno3})
264
assertNoChange(c, s.ch)
267
// TestTransactionWithMultiple watches documents "a", "b" and "c", inserts all
// three through insertAll (one transaction) and expects one change per
// document, in that order, and nothing afterwards.
func (s *FastPeriodSuite) TestTransactionWithMultiple(c *gc.C) {
269
for _, id := range []string{"a", "b", "c"} {
270
s.w.Watch("test", id, -1, s.ch)
272
revnos := s.insertAll(c, "test", "a", "b", "c")
274
assertChange(c, s.ch, watcher.Change{"test", "a", revnos[0]})
275
assertChange(c, s.ch, watcher.Change{"test", "b", revnos[1]})
276
assertChange(c, s.ch, watcher.Change{"test", "c", revnos[2]})
277
assertNoChange(c, s.ch)
280
// TestWatchMultipleChannels registers three watches, each on its own
// collection, document and channel, inserts all three documents and then
// unwatches the second. Changes are expected on ch1 and ch3, and in the end
// nothing more on any of the three channels.
func (s *FastPeriodSuite) TestWatchMultipleChannels(c *gc.C) {
281
ch1 := make(chan watcher.Change)
282
ch2 := make(chan watcher.Change)
283
ch3 := make(chan watcher.Change)
284
s.w.Watch("test1", 1, -1, ch1)
285
s.w.Watch("test2", 2, -1, ch2)
286
s.w.Watch("test3", 3, -1, ch3)
287
revno1 := s.insert(c, "test1", 1)
288
revno2 := s.insert(c, "test2", 2)
289
revno3 := s.insert(c, "test3", 3)
291
// ch2 is unwatched after its document was inserted but before any change
// is read.
s.w.Unwatch("test2", 2, ch2)
292
assertChange(c, ch1, watcher.Change{"test1", 1, revno1})
294
assertChange(c, ch3, watcher.Change{"test3", 3, revno3})
295
assertNoChange(c, ch1)
296
assertNoChange(c, ch2)
297
assertNoChange(c, ch3)
300
// TestIgnoreAncientHistory inserts document "a" before creating a second
// watcher on the same log, then watches "a" on that new watcher with revno -1
// and expects no change to be reported.
// NOTE(review): how w is started and stopped is on lines not visible here.
func (s *FastPeriodSuite) TestIgnoreAncientHistory(c *gc.C) {
301
s.insert(c, "test", "a")
303
w := watcher.New(s.log)
307
w.Watch("test", "a", -1, s.ch)
308
assertNoChange(c, s.ch)
311
// TestUpdate watches document "a", inserts it and then updates it, expecting
// one change for the insert and one for the update, each carrying the revno
// returned by the corresponding helper.
func (s *FastPeriodSuite) TestUpdate(c *gc.C) {
312
s.w.Watch("test", "a", -1, s.ch)
313
assertNoChange(c, s.ch)
315
revno1 := s.insert(c, "test", "a")
317
assertChange(c, s.ch, watcher.Change{"test", "a", revno1})
318
assertNoChange(c, s.ch)
320
revno2 := s.update(c, "test", "a")
322
assertChange(c, s.ch, watcher.Change{"test", "a", revno2})
324
assertOrder(c, -1, revno1, revno2)
327
// TestRemove watches document "a" through an insert, a remove and a second
// insert. The insert changes must carry the revnos returned by insert and the
// remove must be reported with revno -1.
func (s *FastPeriodSuite) TestRemove(c *gc.C) {
328
s.w.Watch("test", "a", -1, s.ch)
329
assertNoChange(c, s.ch)
331
revno1 := s.insert(c, "test", "a")
333
assertChange(c, s.ch, watcher.Change{"test", "a", revno1})
334
assertNoChange(c, s.ch)
336
revno2 := s.remove(c, "test", "a")
338
assertChange(c, s.ch, watcher.Change{"test", "a", -1})
339
assertNoChange(c, s.ch)
341
revno3 := s.insert(c, "test", "a")
343
assertChange(c, s.ch, watcher.Change{"test", "a", revno3})
344
assertNoChange(c, s.ch)
346
// The remove's revno is passed first in both calls, i.e. it is expected to
// sort before either insert's revno.
assertOrder(c, revno2, revno1)
347
assertOrder(c, revno2, revno3)
350
// TestWatchKnownRemove inserts and removes document "a" before watching it,
// passing the insert's revno as the known revno; it expects a change carrying
// the revno returned by remove.
func (s *FastPeriodSuite) TestWatchKnownRemove(c *gc.C) {
351
revno1 := s.insert(c, "test", "a")
352
revno2 := s.remove(c, "test", "a")
355
s.w.Watch("test", "a", revno1, s.ch)
356
assertChange(c, s.ch, watcher.Change{"test", "a", revno2})
358
assertOrder(c, revno2, revno1)
361
// TestScale creates N documents, T per transaction, watches every one of them
// on s.ch, checks that the collection holds N documents and then reads N
// changes, requiring N distinct ids to have been seen.
// NOTE(review): N and T are defined on lines not visible here.
func (s *FastPeriodSuite) TestScale(c *gc.C) {
365
c.Logf("Creating %d documents, %d per transaction...", N, T)
366
// NOTE(review): ops is created with length T (not just capacity) and is
// appended to below; unless it is truncated on a line not visible here,
// each batch would start with T zero-valued ops — confirm.
ops := make([]txn.Op, T)
367
for i := 0; i < (N / T); i++ {
369
for j := 0; j < T && i*T+j < N; j++ {
370
ops = append(ops, txn.Op{C: "test", Id: i*T + j, Insert: M{"n": 1}})
372
err := s.runner.Run(ops, "", nil)
373
c.Assert(err, jc.ErrorIsNil)
376
c.Logf("Watching all documents...")
377
for i := 0; i < N; i++ {
378
s.w.Watch("test", i, -1, s.ch)
381
c.Logf("Forcing a refresh...")
384
count, err := s.Session.DB("juju").C("test").Count()
385
c.Assert(err, jc.ErrorIsNil)
386
c.Logf("Got %d documents in the collection...", count)
387
c.Assert(count, gc.Equals, N)
389
c.Logf("Reading all changes...")
390
// seen is keyed by document id, so a repeated change for the same document
// does not increase its size.
seen := make(map[interface{}]bool)
391
for i := 0; i < N; i++ {
393
case change := <-s.ch:
394
seen[change.Id] = true
395
case <-time.After(worstCase):
396
c.Fatalf("not enough changes: got %d, want %d", len(seen), N)
399
c.Assert(len(seen), gc.Equals, N)
402
// TestWatchUnwatchOnQueue inserts N documents, watches all of them on s.ch
// and immediately unwatches every odd id. It then expects changes for N/2
// distinct ids and nothing afterwards.
// NOTE(review): N is defined on a line not visible here.
func (s *FastPeriodSuite) TestWatchUnwatchOnQueue(c *gc.C) {
404
for i := 0; i < N; i++ {
405
s.insert(c, "test", i)
408
for i := 0; i < N; i++ {
409
s.w.Watch("test", i, -1, s.ch)
411
// Drop the watches on the odd ids before any change has been read.
for i := 1; i < N; i += 2 {
412
s.w.Unwatch("test", i, s.ch)
415
seen := make(map[interface{}]bool)
416
for i := 0; i < N/2; i++ {
418
case change := <-s.ch:
419
seen[change.Id] = true
420
case <-time.After(worstCase):
421
c.Fatalf("not enough changes: got %d, want %d", len(seen), N/2)
424
c.Assert(len(seen), gc.Equals, N/2)
425
assertNoChange(c, s.ch)
428
// TestStartSync watches document "a", inserts it, and — going by the failure
// message — requires a StartSync call to return within worstCase before the
// change for the insert is read from s.ch.
// NOTE(review): the code that uses the done channel is on lines not visible
// here.
func (s *FastPeriodSuite) TestStartSync(c *gc.C) {
429
s.w.Watch("test", "a", -1, s.ch)
431
revno := s.insert(c, "test", "a")
433
done := make(chan bool)
443
case <-time.After(worstCase):
444
c.Fatalf("StartSync failed to return")
447
assertChange(c, s.ch, watcher.Change{"test", "a", revno})
450
// TestWatchCollection mixes per-document watches (chA1, chB1) with
// whole-collection watches (chA, chB) on collections "testA" and "testB". It
// runs in three phases, each gathering received changes per channel into
// seen: after four inserts, after unwatching everything on "testB" and
// updating a "testA" document, and a final short wait that must stay silent.
// NOTE(review): the receive loops that fill seen are on lines not visible
// here.
func (s *FastPeriodSuite) TestWatchCollection(c *gc.C) {
451
chA1 := make(chan watcher.Change)
452
chB1 := make(chan watcher.Change)
453
chA := make(chan watcher.Change)
454
chB := make(chan watcher.Change)
456
s.w.Watch("testA", 1, -1, chA1)
457
s.w.Watch("testB", 1, -1, chB1)
458
s.w.WatchCollection("testA", chA)
459
s.w.WatchCollection("testB", chB)
461
revno1 := s.insert(c, "testA", 1)
462
revno2 := s.insert(c, "testA", 2)
463
revno3 := s.insert(c, "testB", 1)
464
revno4 := s.insert(c, "testB", 2)
468
// Phase 1: collect the changes caused by the four inserts.
seen := map[chan<- watcher.Change][]watcher.Change{}
469
timeout := time.After(testing.LongWait)
475
seen[chA1] = append(seen[chA1], chg)
477
seen[chB1] = append(seen[chB1], chg)
479
seen[chA] = append(seen[chA], chg)
481
seen[chB] = append(seen[chB], chg)
488
// Document watches see only id 1; collection watches see both documents of
// their collection.
c.Check(seen[chA1], gc.DeepEquals, []watcher.Change{{"testA", 1, revno1}})
489
c.Check(seen[chB1], gc.DeepEquals, []watcher.Change{{"testB", 1, revno3}})
490
c.Check(seen[chA], gc.DeepEquals, []watcher.Change{{"testA", 1, revno1}, {"testA", 2, revno2}})
491
c.Check(seen[chB], gc.DeepEquals, []watcher.Change{{"testB", 1, revno3}, {"testB", 2, revno4}})
496
// Phase 2: drop both "testB" watches, then update a "testA" document.
s.w.UnwatchCollection("testB", chB)
497
s.w.Unwatch("testB", 1, chB1)
499
revno1 = s.update(c, "testA", 1)
503
timeout = time.After(testing.LongWait)
504
seen = map[chan<- watcher.Change][]watcher.Change{}
510
seen[chA1] = append(seen[chA1], chg)
512
seen[chB1] = append(seen[chB1], chg)
514
seen[chA] = append(seen[chA], chg)
516
seen[chB] = append(seen[chB], chg)
522
// Only the "testA" channels may have received the update.
c.Check(seen[chA1], gc.DeepEquals, []watcher.Change{{"testA", 1, revno1}})
523
c.Check(seen[chB1], gc.IsNil)
524
c.Check(seen[chA], gc.DeepEquals, []watcher.Change{{"testA", 1, revno1}})
525
c.Check(seen[chB], gc.IsNil)
527
// Check that no extra events arrive.
528
seen = map[chan<- watcher.Change][]watcher.Change{}
529
timeout = time.After(testing.ShortWait)
534
seen[chA1] = append(seen[chA1], chg)
536
seen[chB1] = append(seen[chB1], chg)
538
seen[chA] = append(seen[chA], chg)
540
seen[chB] = append(seen[chB], chg)
545
c.Check(seen[chA1], gc.IsNil)
546
c.Check(seen[chB1], gc.IsNil)
547
c.Check(seen[chA], gc.IsNil)
548
c.Check(seen[chB], gc.IsNil)
551
// TestUnwatchCollectionWithFilter watches collection "testA" through
// WatchCollectionWithFilter and inserts ids 1, 2 and 3, expecting changes for
// 1 and 3 and none for 2.
// NOTE(review): the body of filter is on lines not visible here; from the
// expectations it presumably rejects id 2. Despite the test's name, no
// Unwatch call is visible — confirm.
func (s *FastPeriodSuite) TestUnwatchCollectionWithFilter(c *gc.C) {
552
filter := func(key interface{}) bool {
556
chA := make(chan watcher.Change)
557
s.w.WatchCollectionWithFilter("testA", chA, filter)
558
revnoA := s.insert(c, "testA", 1)
559
assertChange(c, chA, watcher.Change{"testA", 1, revnoA})
560
s.insert(c, "testA", 2)
561
assertNoChange(c, chA)
562
s.insert(c, "testA", 3)
564
// NOTE(review): revnoA is the revno of document 1; the revno returned by
// the insert of document 3 above is discarded, so this check relies on both
// inserts yielding the same revno — confirm.
assertChange(c, chA, watcher.Change{"testA", 3, revnoA})
567
// TestUnwatchCollectionWithOutstandingRequest watches collection "testA" on
// chA and document 1 of "testB" on chB, inserts two "testA" documents and
// then the "testB" one, reads only the first change from chA, unwatches the
// collection, and then requires the "testB" change to still arrive on chB.
func (s *FastPeriodSuite) TestUnwatchCollectionWithOutstandingRequest(c *gc.C) {
568
chA := make(chan watcher.Change)
569
s.w.WatchCollection("testA", chA)
570
chB := make(chan watcher.Change)
571
s.w.Watch("testB", 1, -1, chB)
572
revnoA := s.insert(c, "testA", 1)
573
s.insert(c, "testA", 2)
574
// By inserting this *after* the testA document, we ensure that
575
// the watcher will try to send on chB after sending on chA.
576
// The original bug that this test guards against meant that the
577
// UnwatchCollection did not correctly cancel the outstanding
578
// request, so the loop would never get around to sending on
580
revnoB := s.insert(c, "testB", 1)
582
// When we receive the first change on chA, we know that
583
// the watcher is trying to send changes on all the
584
// watcher channels (2 changes on chA and 1 change on chB).
585
assertChange(c, chA, watcher.Change{"testA", 1, revnoA})
586
// The second testA change is never read from chA; it is left outstanding
// when the collection is unwatched.
s.w.UnwatchCollection("testA", chA)
587
assertChange(c, chB, watcher.Change{"testB", 1, revnoB})
590
// TestNonMutatingTxn inserts document "a", sets up a document watch and a
// collection watch, and then runs the same insert again. It asserts that the
// revno did not change and that neither channel receives a change.
func (s *FastPeriodSuite) TestNonMutatingTxn(c *gc.C) {
591
chA1 := make(chan watcher.Change)
592
chA := make(chan watcher.Change)
594
revno1 := s.insert(c, "test", "a")
598
// NOTE(review): this watches id 1, while the document inserted in this test
// has id "a", so chA1 would stay silent whatever the second insert does —
// confirm the id is intended.
s.w.Watch("test", 1, revno1, chA1)
599
s.w.WatchCollection("test", chA)
601
revno2 := s.insert(c, "test", "a")
603
// Equal revnos show that the repeated insert left the document untouched.
c.Assert(revno1, gc.Equals, revno2)
607
assertNoChange(c, chA1)
608
assertNoChange(c, chA)
611
// SlowPeriodSuite implements tests
612
// that are flaky when the watcher refresh period
614
type SlowPeriodSuite struct {
618
// SetUpSuite runs the shared watcherSuite suite setup and then switches the
// package-level watcher.Period to slowPeriod.
func (s *SlowPeriodSuite) SetUpSuite(c *gc.C) {
619
// The shared setup records the previous watcher.Period in oldPeriod, so it
// has to run before the period is overridden below.
s.watcherSuite.SetUpSuite(c)
620
watcher.Period = slowPeriod
623
// Register SlowPeriodSuite with gocheck.
var _ = gc.Suite(&SlowPeriodSuite{})
625
// TestWatchBeforeRemoveKnown inserts and removes document "a", then watches
// it with revno -1 and expects two changes in order: one with the insert's
// revno and one with the revno returned by remove.
// NOTE(review): statements between these steps are on lines not visible here.
func (s *SlowPeriodSuite) TestWatchBeforeRemoveKnown(c *gc.C) {
626
revno1 := s.insert(c, "test", "a")
628
revno2 := s.remove(c, "test", "a")
630
s.w.Watch("test", "a", -1, s.ch)
631
assertChange(c, s.ch, watcher.Change{"test", "a", revno1})
633
assertChange(c, s.ch, watcher.Change{"test", "a", revno2})
635
assertOrder(c, revno2, revno1)
638
// TestDoubleUpdate inserts document "a" and updates it twice, then watches it
// passing the first update's revno as the known revno. Nothing may be
// reported at first; afterwards exactly one change carrying the second
// update's revno is expected.
// NOTE(review): whatever happens between the two assertNoChange/assertChange
// steps (line 649-650 of the original) is not visible here.
func (s *SlowPeriodSuite) TestDoubleUpdate(c *gc.C) {
639
assertNoChange(c, s.ch)
641
revno1 := s.insert(c, "test", "a")
644
revno2 := s.update(c, "test", "a")
645
revno3 := s.update(c, "test", "a")
647
s.w.Watch("test", "a", revno2, s.ch)
648
assertNoChange(c, s.ch)
651
assertChange(c, s.ch, watcher.Change{"test", "a", revno3})
652
assertNoChange(c, s.ch)
654
assertOrder(c, -1, revno1, revno2, revno3)
657
// TestWatchPeriod checks the watcher's polling cadence: after an update to a
// watched document, the change must arrive no sooner than watcher.Period
// minus a leeway and no later than watcher.Period plus that leeway, measured
// from t0.
// NOTE(review): where t0 is taken and the receive from s.ch are on lines not
// visible here.
func (s *SlowPeriodSuite) TestWatchPeriod(c *gc.C) {
658
revno1 := s.insert(c, "test", "a")
661
s.w.Watch("test", "a", revno1, s.ch)
662
revno2 := s.update(c, "test", "a")
664
// Tolerate a quarter of the period on either side.
leeway := watcher.Period / 4
667
gotPeriod := time.Since(t0)
668
c.Assert(got, gc.Equals, watcher.Change{"test", "a", revno2})
669
if gotPeriod < watcher.Period-leeway {
670
c.Fatalf("watcher not waiting long enough; got %v want %v", gotPeriod, watcher.Period)
672
case <-time.After(watcher.Period + leeway):
673
gotPeriod := time.Since(t0)
674
c.Fatalf("watcher waited too long; got %v want %v", gotPeriod, watcher.Period)
677
assertOrder(c, -1, revno1, revno2)
680
func (s *SlowPeriodSuite) TestStartSyncStartsImmediately(c *gc.C) {
681
// Ensure we're at the start of a sync cycle.
683
time.Sleep(justLongEnough)
685
// Watching after StartSync should see the current state of affairs.
686
revno := s.insert(c, "test", "a")
688
s.w.Watch("test", "a", -1, s.ch)
691
c.Assert(got.Revno, gc.Equals, revno)
692
case <-time.After(watcher.Period / 2):
693
c.Fatalf("watch after StartSync is still using old info")
696
s.remove(c, "test", "a")
698
ch := make(chan watcher.Change)
699
s.w.Watch("test", "a", -1, ch)
702
c.Fatalf("got event %#v when starting watcher after doc was removed", got)
703
case <-time.After(justLongEnough):