~nskaggs/+junk/xenial-test

« back to all changes in this revision

Viewing changes to src/github.com/juju/juju/state/watcher/watcher_test.go

  • Committer: Nicholas Skaggs
  • Date: 2016-10-24 20:56:05 UTC
  • Revision ID: nicholas.skaggs@canonical.com-20161024205605-z8lta0uvuhtxwzwl
Initi with beta15

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
// Copyright 2012, 2013 Canonical Ltd.
 
2
// Licensed under the AGPLv3, see LICENCE file for details.
 
3
 
 
4
package watcher_test
 
5
 
 
6
import (
 
7
        stdtesting "testing"
 
8
        "time"
 
9
 
 
10
        gitjujutesting "github.com/juju/testing"
 
11
        jc "github.com/juju/testing/checkers"
 
12
        gc "gopkg.in/check.v1"
 
13
        "gopkg.in/mgo.v2"
 
14
        "gopkg.in/mgo.v2/txn"
 
15
        "launchpad.net/tomb"
 
16
 
 
17
        "github.com/juju/juju/state/watcher"
 
18
        "github.com/juju/juju/testing"
 
19
)
 
20
 
 
21
// Test tuning parameters.
 
22
const (
 
23
        // worstCase is used for timeouts when timing out
 
24
        // will fail the test. Raising this value should
 
25
        // not affect the overall running time of the tests
 
26
        // unless they fail.
 
27
        worstCase = testing.LongWait
 
28
 
 
29
        // justLongEnough is used for timeouts that
 
30
        // are expected to happen for a test to complete
 
31
        // successfully. Reducing this value will make
 
32
        // the tests run faster at the expense of making them
 
33
        // fail more often on heavily loaded or slow hardware.
 
34
        justLongEnough = testing.ShortWait
 
35
 
 
36
        // fastPeriod specifies the period of the watcher for
 
37
        // tests where the timing is not critical.
 
38
        fastPeriod = 10 * time.Millisecond
 
39
 
 
40
        // slowPeriod specifies the period of the watcher
 
41
        // for tests where the timing is important.
 
42
        slowPeriod = 1 * time.Second
 
43
)
 
44
 
 
45
func TestPackage(t *stdtesting.T) {
 
46
        testing.MgoTestPackage(t)
 
47
}
 
48
 
 
49
type watcherSuite struct {
 
50
        gitjujutesting.MgoSuite
 
51
        testing.BaseSuite
 
52
 
 
53
        log       *mgo.Collection
 
54
        stash     *mgo.Collection
 
55
        runner    *txn.Runner
 
56
        w         *watcher.Watcher
 
57
        ch        chan watcher.Change
 
58
        oldPeriod time.Duration
 
59
}
 
60
 
 
61
// FastPeriodSuite implements tests that should
 
62
// work regardless of the watcher refresh period.
 
63
type FastPeriodSuite struct {
 
64
        watcherSuite
 
65
}
 
66
 
 
67
func (s *FastPeriodSuite) SetUpSuite(c *gc.C) {
 
68
        s.watcherSuite.SetUpSuite(c)
 
69
        watcher.Period = fastPeriod
 
70
}
 
71
 
 
72
var _ = gc.Suite(&FastPeriodSuite{})
 
73
 
 
74
func (s *watcherSuite) SetUpSuite(c *gc.C) {
 
75
        s.BaseSuite.SetUpSuite(c)
 
76
        s.MgoSuite.SetUpSuite(c)
 
77
        s.oldPeriod = watcher.Period
 
78
}
 
79
 
 
80
func (s *watcherSuite) TearDownSuite(c *gc.C) {
 
81
        s.MgoSuite.TearDownSuite(c)
 
82
        s.BaseSuite.TearDownSuite(c)
 
83
        watcher.Period = s.oldPeriod
 
84
}
 
85
 
 
86
func (s *watcherSuite) SetUpTest(c *gc.C) {
 
87
        s.BaseSuite.SetUpTest(c)
 
88
        s.MgoSuite.SetUpTest(c)
 
89
 
 
90
        db := s.MgoSuite.Session.DB("juju")
 
91
        s.log = db.C("txnlog")
 
92
        s.log.Create(&mgo.CollectionInfo{
 
93
                Capped:   true,
 
94
                MaxBytes: 1000000,
 
95
        })
 
96
        s.stash = db.C("txn.stash")
 
97
        s.runner = txn.NewRunner(db.C("txn"))
 
98
        s.runner.ChangeLog(s.log)
 
99
        s.w = watcher.New(s.log)
 
100
        s.ch = make(chan watcher.Change)
 
101
}
 
102
 
 
103
func (s *watcherSuite) TearDownTest(c *gc.C) {
 
104
        c.Assert(s.w.Stop(), gc.IsNil)
 
105
 
 
106
        s.MgoSuite.TearDownTest(c)
 
107
        s.BaseSuite.TearDownTest(c)
 
108
}
 
109
 
 
110
type M map[string]interface{}
 
111
 
 
112
func assertChange(c *gc.C, watch <-chan watcher.Change, want watcher.Change) {
 
113
        select {
 
114
        case got := <-watch:
 
115
                if got != want {
 
116
                        c.Fatalf("watch reported %v, want %v", got, want)
 
117
                }
 
118
        case <-time.After(worstCase):
 
119
                c.Fatalf("watch reported nothing, want %v", want)
 
120
        }
 
121
}
 
122
 
 
123
func assertNoChange(c *gc.C, watch <-chan watcher.Change) {
 
124
        select {
 
125
        case got := <-watch:
 
126
                c.Fatalf("watch reported %v, want nothing", got)
 
127
        case <-time.After(justLongEnough):
 
128
        }
 
129
}
 
130
 
 
131
func assertOrder(c *gc.C, revnos ...int64) {
 
132
        last := int64(-2)
 
133
        for _, revno := range revnos {
 
134
                if revno <= last {
 
135
                        c.Fatalf("got bad revno sequence: %v", revnos)
 
136
                }
 
137
                last = revno
 
138
        }
 
139
}
 
140
 
 
141
func (s *watcherSuite) revno(c string, id interface{}) (revno int64) {
 
142
        var doc struct {
 
143
                Revno int64 `bson:"txn-revno"`
 
144
        }
 
145
        err := s.log.Database.C(c).FindId(id).One(&doc)
 
146
        if err != nil {
 
147
                panic(err)
 
148
        }
 
149
        return doc.Revno
 
150
}
 
151
 
 
152
func (s *watcherSuite) insert(c *gc.C, coll string, id interface{}) (revno int64) {
 
153
        ops := []txn.Op{{C: coll, Id: id, Insert: M{"n": 1}}}
 
154
        err := s.runner.Run(ops, "", nil)
 
155
        if err != nil {
 
156
                panic(err)
 
157
        }
 
158
        revno = s.revno(coll, id)
 
159
        c.Logf("insert(%#v, %#v) => revno %d", coll, id, revno)
 
160
        return revno
 
161
}
 
162
 
 
163
func (s *watcherSuite) insertAll(c *gc.C, coll string, ids ...interface{}) (revnos []int64) {
 
164
        var ops []txn.Op
 
165
        for _, id := range ids {
 
166
                ops = append(ops, txn.Op{C: coll, Id: id, Insert: M{"n": 1}})
 
167
        }
 
168
        err := s.runner.Run(ops, "", nil)
 
169
        if err != nil {
 
170
                panic(err)
 
171
        }
 
172
        for _, id := range ids {
 
173
                revnos = append(revnos, s.revno(coll, id))
 
174
        }
 
175
        c.Logf("insertAll(%#v, %v) => revnos %v", coll, ids, revnos)
 
176
        return revnos
 
177
}
 
178
 
 
179
func (s *watcherSuite) update(c *gc.C, coll string, id interface{}) (revno int64) {
 
180
        ops := []txn.Op{{C: coll, Id: id, Update: M{"$inc": M{"n": 1}}}}
 
181
        err := s.runner.Run(ops, "", nil)
 
182
        if err != nil {
 
183
                panic(err)
 
184
        }
 
185
        revno = s.revno(coll, id)
 
186
        c.Logf("update(%#v, %#v) => revno %d", coll, id, revno)
 
187
        return revno
 
188
}
 
189
 
 
190
func (s *watcherSuite) remove(c *gc.C, coll string, id interface{}) (revno int64) {
 
191
        ops := []txn.Op{{C: coll, Id: id, Remove: true}}
 
192
        err := s.runner.Run(ops, "", nil)
 
193
        if err != nil {
 
194
                panic(err)
 
195
        }
 
196
        c.Logf("remove(%#v, %#v) => revno -1", coll, id)
 
197
        return -1
 
198
}
 
199
 
 
200
func (s *FastPeriodSuite) TestErrAndDead(c *gc.C) {
 
201
        c.Assert(s.w.Err(), gc.Equals, tomb.ErrStillAlive)
 
202
        select {
 
203
        case <-s.w.Dead():
 
204
                c.Fatalf("Dead channel fired unexpectedly")
 
205
        default:
 
206
        }
 
207
        c.Assert(s.w.Stop(), gc.IsNil)
 
208
        c.Assert(s.w.Err(), gc.IsNil)
 
209
        select {
 
210
        case <-s.w.Dead():
 
211
        default:
 
212
                c.Fatalf("Dead channel should have fired")
 
213
        }
 
214
}
 
215
 
 
216
func (s *FastPeriodSuite) TestWatchBeforeKnown(c *gc.C) {
 
217
        s.w.Watch("test", "a", -1, s.ch)
 
218
        assertNoChange(c, s.ch)
 
219
 
 
220
        revno := s.insert(c, "test", "a")
 
221
 
 
222
        s.w.StartSync()
 
223
        assertChange(c, s.ch, watcher.Change{"test", "a", revno})
 
224
        assertNoChange(c, s.ch)
 
225
 
 
226
        assertOrder(c, -1, revno)
 
227
}
 
228
 
 
229
func (s *FastPeriodSuite) TestWatchAfterKnown(c *gc.C) {
 
230
        revno := s.insert(c, "test", "a")
 
231
 
 
232
        s.w.StartSync()
 
233
 
 
234
        s.w.Watch("test", "a", -1, s.ch)
 
235
        assertChange(c, s.ch, watcher.Change{"test", "a", revno})
 
236
        assertNoChange(c, s.ch)
 
237
 
 
238
        assertOrder(c, -1, revno)
 
239
}
 
240
 
 
241
func (s *FastPeriodSuite) TestWatchIgnoreUnwatched(c *gc.C) {
 
242
        s.w.Watch("test", "a", -1, s.ch)
 
243
        assertNoChange(c, s.ch)
 
244
 
 
245
        s.insert(c, "test", "b")
 
246
 
 
247
        s.w.StartSync()
 
248
        assertNoChange(c, s.ch)
 
249
}
 
250
 
 
251
func (s *FastPeriodSuite) TestWatchOrder(c *gc.C) {
 
252
        s.w.StartSync()
 
253
        for _, id := range []string{"a", "b", "c", "d"} {
 
254
                s.w.Watch("test", id, -1, s.ch)
 
255
        }
 
256
        revno1 := s.insert(c, "test", "a")
 
257
        revno2 := s.insert(c, "test", "b")
 
258
        revno3 := s.insert(c, "test", "c")
 
259
 
 
260
        s.w.StartSync()
 
261
        assertChange(c, s.ch, watcher.Change{"test", "a", revno1})
 
262
        assertChange(c, s.ch, watcher.Change{"test", "b", revno2})
 
263
        assertChange(c, s.ch, watcher.Change{"test", "c", revno3})
 
264
        assertNoChange(c, s.ch)
 
265
}
 
266
 
 
267
func (s *FastPeriodSuite) TestTransactionWithMultiple(c *gc.C) {
 
268
        s.w.StartSync()
 
269
        for _, id := range []string{"a", "b", "c"} {
 
270
                s.w.Watch("test", id, -1, s.ch)
 
271
        }
 
272
        revnos := s.insertAll(c, "test", "a", "b", "c")
 
273
        s.w.StartSync()
 
274
        assertChange(c, s.ch, watcher.Change{"test", "a", revnos[0]})
 
275
        assertChange(c, s.ch, watcher.Change{"test", "b", revnos[1]})
 
276
        assertChange(c, s.ch, watcher.Change{"test", "c", revnos[2]})
 
277
        assertNoChange(c, s.ch)
 
278
}
 
279
 
 
280
func (s *FastPeriodSuite) TestWatchMultipleChannels(c *gc.C) {
 
281
        ch1 := make(chan watcher.Change)
 
282
        ch2 := make(chan watcher.Change)
 
283
        ch3 := make(chan watcher.Change)
 
284
        s.w.Watch("test1", 1, -1, ch1)
 
285
        s.w.Watch("test2", 2, -1, ch2)
 
286
        s.w.Watch("test3", 3, -1, ch3)
 
287
        revno1 := s.insert(c, "test1", 1)
 
288
        revno2 := s.insert(c, "test2", 2)
 
289
        revno3 := s.insert(c, "test3", 3)
 
290
        s.w.StartSync()
 
291
        s.w.Unwatch("test2", 2, ch2)
 
292
        assertChange(c, ch1, watcher.Change{"test1", 1, revno1})
 
293
        _ = revno2
 
294
        assertChange(c, ch3, watcher.Change{"test3", 3, revno3})
 
295
        assertNoChange(c, ch1)
 
296
        assertNoChange(c, ch2)
 
297
        assertNoChange(c, ch3)
 
298
}
 
299
 
 
300
func (s *FastPeriodSuite) TestIgnoreAncientHistory(c *gc.C) {
 
301
        s.insert(c, "test", "a")
 
302
 
 
303
        w := watcher.New(s.log)
 
304
        defer w.Stop()
 
305
        w.StartSync()
 
306
 
 
307
        w.Watch("test", "a", -1, s.ch)
 
308
        assertNoChange(c, s.ch)
 
309
}
 
310
 
 
311
func (s *FastPeriodSuite) TestUpdate(c *gc.C) {
 
312
        s.w.Watch("test", "a", -1, s.ch)
 
313
        assertNoChange(c, s.ch)
 
314
 
 
315
        revno1 := s.insert(c, "test", "a")
 
316
        s.w.StartSync()
 
317
        assertChange(c, s.ch, watcher.Change{"test", "a", revno1})
 
318
        assertNoChange(c, s.ch)
 
319
 
 
320
        revno2 := s.update(c, "test", "a")
 
321
        s.w.StartSync()
 
322
        assertChange(c, s.ch, watcher.Change{"test", "a", revno2})
 
323
 
 
324
        assertOrder(c, -1, revno1, revno2)
 
325
}
 
326
 
 
327
func (s *FastPeriodSuite) TestRemove(c *gc.C) {
 
328
        s.w.Watch("test", "a", -1, s.ch)
 
329
        assertNoChange(c, s.ch)
 
330
 
 
331
        revno1 := s.insert(c, "test", "a")
 
332
        s.w.StartSync()
 
333
        assertChange(c, s.ch, watcher.Change{"test", "a", revno1})
 
334
        assertNoChange(c, s.ch)
 
335
 
 
336
        revno2 := s.remove(c, "test", "a")
 
337
        s.w.StartSync()
 
338
        assertChange(c, s.ch, watcher.Change{"test", "a", -1})
 
339
        assertNoChange(c, s.ch)
 
340
 
 
341
        revno3 := s.insert(c, "test", "a")
 
342
        s.w.StartSync()
 
343
        assertChange(c, s.ch, watcher.Change{"test", "a", revno3})
 
344
        assertNoChange(c, s.ch)
 
345
 
 
346
        assertOrder(c, revno2, revno1)
 
347
        assertOrder(c, revno2, revno3)
 
348
}
 
349
 
 
350
func (s *FastPeriodSuite) TestWatchKnownRemove(c *gc.C) {
 
351
        revno1 := s.insert(c, "test", "a")
 
352
        revno2 := s.remove(c, "test", "a")
 
353
        s.w.StartSync()
 
354
 
 
355
        s.w.Watch("test", "a", revno1, s.ch)
 
356
        assertChange(c, s.ch, watcher.Change{"test", "a", revno2})
 
357
 
 
358
        assertOrder(c, revno2, revno1)
 
359
}
 
360
 
 
361
func (s *FastPeriodSuite) TestScale(c *gc.C) {
 
362
        const N = 500
 
363
        const T = 10
 
364
 
 
365
        c.Logf("Creating %d documents, %d per transaction...", N, T)
 
366
        ops := make([]txn.Op, T)
 
367
        for i := 0; i < (N / T); i++ {
 
368
                ops = ops[:0]
 
369
                for j := 0; j < T && i*T+j < N; j++ {
 
370
                        ops = append(ops, txn.Op{C: "test", Id: i*T + j, Insert: M{"n": 1}})
 
371
                }
 
372
                err := s.runner.Run(ops, "", nil)
 
373
                c.Assert(err, jc.ErrorIsNil)
 
374
        }
 
375
 
 
376
        c.Logf("Watching all documents...")
 
377
        for i := 0; i < N; i++ {
 
378
                s.w.Watch("test", i, -1, s.ch)
 
379
        }
 
380
 
 
381
        c.Logf("Forcing a refresh...")
 
382
        s.w.StartSync()
 
383
 
 
384
        count, err := s.Session.DB("juju").C("test").Count()
 
385
        c.Assert(err, jc.ErrorIsNil)
 
386
        c.Logf("Got %d documents in the collection...", count)
 
387
        c.Assert(count, gc.Equals, N)
 
388
 
 
389
        c.Logf("Reading all changes...")
 
390
        seen := make(map[interface{}]bool)
 
391
        for i := 0; i < N; i++ {
 
392
                select {
 
393
                case change := <-s.ch:
 
394
                        seen[change.Id] = true
 
395
                case <-time.After(worstCase):
 
396
                        c.Fatalf("not enough changes: got %d, want %d", len(seen), N)
 
397
                }
 
398
        }
 
399
        c.Assert(len(seen), gc.Equals, N)
 
400
}
 
401
 
 
402
func (s *FastPeriodSuite) TestWatchUnwatchOnQueue(c *gc.C) {
 
403
        const N = 10
 
404
        for i := 0; i < N; i++ {
 
405
                s.insert(c, "test", i)
 
406
        }
 
407
        s.w.StartSync()
 
408
        for i := 0; i < N; i++ {
 
409
                s.w.Watch("test", i, -1, s.ch)
 
410
        }
 
411
        for i := 1; i < N; i += 2 {
 
412
                s.w.Unwatch("test", i, s.ch)
 
413
        }
 
414
        s.w.StartSync()
 
415
        seen := make(map[interface{}]bool)
 
416
        for i := 0; i < N/2; i++ {
 
417
                select {
 
418
                case change := <-s.ch:
 
419
                        seen[change.Id] = true
 
420
                case <-time.After(worstCase):
 
421
                        c.Fatalf("not enough changes: got %d, want %d", len(seen), N/2)
 
422
                }
 
423
        }
 
424
        c.Assert(len(seen), gc.Equals, N/2)
 
425
        assertNoChange(c, s.ch)
 
426
}
 
427
 
 
428
func (s *FastPeriodSuite) TestStartSync(c *gc.C) {
 
429
        s.w.Watch("test", "a", -1, s.ch)
 
430
 
 
431
        revno := s.insert(c, "test", "a")
 
432
 
 
433
        done := make(chan bool)
 
434
        go func() {
 
435
                s.w.StartSync()
 
436
                s.w.StartSync()
 
437
                s.w.StartSync()
 
438
                done <- true
 
439
        }()
 
440
 
 
441
        select {
 
442
        case <-done:
 
443
        case <-time.After(worstCase):
 
444
                c.Fatalf("StartSync failed to return")
 
445
        }
 
446
 
 
447
        assertChange(c, s.ch, watcher.Change{"test", "a", revno})
 
448
}
 
449
 
 
450
func (s *FastPeriodSuite) TestWatchCollection(c *gc.C) {
 
451
        chA1 := make(chan watcher.Change)
 
452
        chB1 := make(chan watcher.Change)
 
453
        chA := make(chan watcher.Change)
 
454
        chB := make(chan watcher.Change)
 
455
 
 
456
        s.w.Watch("testA", 1, -1, chA1)
 
457
        s.w.Watch("testB", 1, -1, chB1)
 
458
        s.w.WatchCollection("testA", chA)
 
459
        s.w.WatchCollection("testB", chB)
 
460
 
 
461
        revno1 := s.insert(c, "testA", 1)
 
462
        revno2 := s.insert(c, "testA", 2)
 
463
        revno3 := s.insert(c, "testB", 1)
 
464
        revno4 := s.insert(c, "testB", 2)
 
465
 
 
466
        s.w.StartSync()
 
467
 
 
468
        seen := map[chan<- watcher.Change][]watcher.Change{}
 
469
        timeout := time.After(testing.LongWait)
 
470
        n := 0
 
471
Loop1:
 
472
        for n < 6 {
 
473
                select {
 
474
                case chg := <-chA1:
 
475
                        seen[chA1] = append(seen[chA1], chg)
 
476
                case chg := <-chB1:
 
477
                        seen[chB1] = append(seen[chB1], chg)
 
478
                case chg := <-chA:
 
479
                        seen[chA] = append(seen[chA], chg)
 
480
                case chg := <-chB:
 
481
                        seen[chB] = append(seen[chB], chg)
 
482
                case <-timeout:
 
483
                        break Loop1
 
484
                }
 
485
                n++
 
486
        }
 
487
 
 
488
        c.Check(seen[chA1], gc.DeepEquals, []watcher.Change{{"testA", 1, revno1}})
 
489
        c.Check(seen[chB1], gc.DeepEquals, []watcher.Change{{"testB", 1, revno3}})
 
490
        c.Check(seen[chA], gc.DeepEquals, []watcher.Change{{"testA", 1, revno1}, {"testA", 2, revno2}})
 
491
        c.Check(seen[chB], gc.DeepEquals, []watcher.Change{{"testB", 1, revno3}, {"testB", 2, revno4}})
 
492
        if c.Failed() {
 
493
                return
 
494
        }
 
495
 
 
496
        s.w.UnwatchCollection("testB", chB)
 
497
        s.w.Unwatch("testB", 1, chB1)
 
498
 
 
499
        revno1 = s.update(c, "testA", 1)
 
500
 
 
501
        s.w.StartSync()
 
502
 
 
503
        timeout = time.After(testing.LongWait)
 
504
        seen = map[chan<- watcher.Change][]watcher.Change{}
 
505
        n = 0
 
506
Loop2:
 
507
        for n < 2 {
 
508
                select {
 
509
                case chg := <-chA1:
 
510
                        seen[chA1] = append(seen[chA1], chg)
 
511
                case chg := <-chB1:
 
512
                        seen[chB1] = append(seen[chB1], chg)
 
513
                case chg := <-chA:
 
514
                        seen[chA] = append(seen[chA], chg)
 
515
                case chg := <-chB:
 
516
                        seen[chB] = append(seen[chB], chg)
 
517
                case <-timeout:
 
518
                        break Loop2
 
519
                }
 
520
                n++
 
521
        }
 
522
        c.Check(seen[chA1], gc.DeepEquals, []watcher.Change{{"testA", 1, revno1}})
 
523
        c.Check(seen[chB1], gc.IsNil)
 
524
        c.Check(seen[chA], gc.DeepEquals, []watcher.Change{{"testA", 1, revno1}})
 
525
        c.Check(seen[chB], gc.IsNil)
 
526
 
 
527
        // Check that no extra events arrive.
 
528
        seen = map[chan<- watcher.Change][]watcher.Change{}
 
529
        timeout = time.After(testing.ShortWait)
 
530
Loop3:
 
531
        for {
 
532
                select {
 
533
                case chg := <-chA1:
 
534
                        seen[chA1] = append(seen[chA1], chg)
 
535
                case chg := <-chB1:
 
536
                        seen[chB1] = append(seen[chB1], chg)
 
537
                case chg := <-chA:
 
538
                        seen[chA] = append(seen[chA], chg)
 
539
                case chg := <-chB:
 
540
                        seen[chB] = append(seen[chB], chg)
 
541
                case <-timeout:
 
542
                        break Loop3
 
543
                }
 
544
        }
 
545
        c.Check(seen[chA1], gc.IsNil)
 
546
        c.Check(seen[chB1], gc.IsNil)
 
547
        c.Check(seen[chA], gc.IsNil)
 
548
        c.Check(seen[chB], gc.IsNil)
 
549
}
 
550
 
 
551
func (s *FastPeriodSuite) TestUnwatchCollectionWithFilter(c *gc.C) {
 
552
        filter := func(key interface{}) bool {
 
553
                id := key.(int)
 
554
                return id != 2
 
555
        }
 
556
        chA := make(chan watcher.Change)
 
557
        s.w.WatchCollectionWithFilter("testA", chA, filter)
 
558
        revnoA := s.insert(c, "testA", 1)
 
559
        assertChange(c, chA, watcher.Change{"testA", 1, revnoA})
 
560
        s.insert(c, "testA", 2)
 
561
        assertNoChange(c, chA)
 
562
        s.insert(c, "testA", 3)
 
563
        s.w.StartSync()
 
564
        assertChange(c, chA, watcher.Change{"testA", 3, revnoA})
 
565
}
 
566
 
 
567
func (s *FastPeriodSuite) TestUnwatchCollectionWithOutstandingRequest(c *gc.C) {
 
568
        chA := make(chan watcher.Change)
 
569
        s.w.WatchCollection("testA", chA)
 
570
        chB := make(chan watcher.Change)
 
571
        s.w.Watch("testB", 1, -1, chB)
 
572
        revnoA := s.insert(c, "testA", 1)
 
573
        s.insert(c, "testA", 2)
 
574
        // By inserting this *after* the testA document, we ensure that
 
575
        // the watcher will try to send on chB after sending on chA.
 
576
        // The original bug that this test guards against meant that the
 
577
        // UnwatchCollection did not correctly cancel the outstanding
 
578
        // request, so the loop would never get around to sending on
 
579
        // chB.
 
580
        revnoB := s.insert(c, "testB", 1)
 
581
        s.w.StartSync()
 
582
        // When we receive the first change on chA, we know that
 
583
        // the watcher is trying to send changes on all the
 
584
        // watcher channels (2 changes on chA and 1 change on chB).
 
585
        assertChange(c, chA, watcher.Change{"testA", 1, revnoA})
 
586
        s.w.UnwatchCollection("testA", chA)
 
587
        assertChange(c, chB, watcher.Change{"testB", 1, revnoB})
 
588
}
 
589
 
 
590
func (s *FastPeriodSuite) TestNonMutatingTxn(c *gc.C) {
 
591
        chA1 := make(chan watcher.Change)
 
592
        chA := make(chan watcher.Change)
 
593
 
 
594
        revno1 := s.insert(c, "test", "a")
 
595
 
 
596
        s.w.StartSync()
 
597
 
 
598
        s.w.Watch("test", 1, revno1, chA1)
 
599
        s.w.WatchCollection("test", chA)
 
600
 
 
601
        revno2 := s.insert(c, "test", "a")
 
602
 
 
603
        c.Assert(revno1, gc.Equals, revno2)
 
604
 
 
605
        s.w.StartSync()
 
606
 
 
607
        assertNoChange(c, chA1)
 
608
        assertNoChange(c, chA)
 
609
}
 
610
 
 
611
// SlowPeriodSuite implements tests
 
612
// that are flaky when the watcher refresh period
 
613
// is small.
 
614
type SlowPeriodSuite struct {
 
615
        watcherSuite
 
616
}
 
617
 
 
618
func (s *SlowPeriodSuite) SetUpSuite(c *gc.C) {
 
619
        s.watcherSuite.SetUpSuite(c)
 
620
        watcher.Period = slowPeriod
 
621
}
 
622
 
 
623
var _ = gc.Suite(&SlowPeriodSuite{})
 
624
 
 
625
func (s *SlowPeriodSuite) TestWatchBeforeRemoveKnown(c *gc.C) {
 
626
        revno1 := s.insert(c, "test", "a")
 
627
        s.w.StartSync()
 
628
        revno2 := s.remove(c, "test", "a")
 
629
 
 
630
        s.w.Watch("test", "a", -1, s.ch)
 
631
        assertChange(c, s.ch, watcher.Change{"test", "a", revno1})
 
632
        s.w.StartSync()
 
633
        assertChange(c, s.ch, watcher.Change{"test", "a", revno2})
 
634
 
 
635
        assertOrder(c, revno2, revno1)
 
636
}
 
637
 
 
638
func (s *SlowPeriodSuite) TestDoubleUpdate(c *gc.C) {
 
639
        assertNoChange(c, s.ch)
 
640
 
 
641
        revno1 := s.insert(c, "test", "a")
 
642
        s.w.StartSync()
 
643
 
 
644
        revno2 := s.update(c, "test", "a")
 
645
        revno3 := s.update(c, "test", "a")
 
646
 
 
647
        s.w.Watch("test", "a", revno2, s.ch)
 
648
        assertNoChange(c, s.ch)
 
649
 
 
650
        s.w.StartSync()
 
651
        assertChange(c, s.ch, watcher.Change{"test", "a", revno3})
 
652
        assertNoChange(c, s.ch)
 
653
 
 
654
        assertOrder(c, -1, revno1, revno2, revno3)
 
655
}
 
656
 
 
657
func (s *SlowPeriodSuite) TestWatchPeriod(c *gc.C) {
 
658
        revno1 := s.insert(c, "test", "a")
 
659
        s.w.StartSync()
 
660
        t0 := time.Now()
 
661
        s.w.Watch("test", "a", revno1, s.ch)
 
662
        revno2 := s.update(c, "test", "a")
 
663
 
 
664
        leeway := watcher.Period / 4
 
665
        select {
 
666
        case got := <-s.ch:
 
667
                gotPeriod := time.Since(t0)
 
668
                c.Assert(got, gc.Equals, watcher.Change{"test", "a", revno2})
 
669
                if gotPeriod < watcher.Period-leeway {
 
670
                        c.Fatalf("watcher not waiting long enough; got %v want %v", gotPeriod, watcher.Period)
 
671
                }
 
672
        case <-time.After(watcher.Period + leeway):
 
673
                gotPeriod := time.Since(t0)
 
674
                c.Fatalf("watcher waited too long; got %v want %v", gotPeriod, watcher.Period)
 
675
        }
 
676
 
 
677
        assertOrder(c, -1, revno1, revno2)
 
678
}
 
679
 
 
680
func (s *SlowPeriodSuite) TestStartSyncStartsImmediately(c *gc.C) {
 
681
        // Ensure we're at the start of a sync cycle.
 
682
        s.w.StartSync()
 
683
        time.Sleep(justLongEnough)
 
684
 
 
685
        // Watching after StartSync should see the current state of affairs.
 
686
        revno := s.insert(c, "test", "a")
 
687
        s.w.StartSync()
 
688
        s.w.Watch("test", "a", -1, s.ch)
 
689
        select {
 
690
        case got := <-s.ch:
 
691
                c.Assert(got.Revno, gc.Equals, revno)
 
692
        case <-time.After(watcher.Period / 2):
 
693
                c.Fatalf("watch after StartSync is still using old info")
 
694
        }
 
695
 
 
696
        s.remove(c, "test", "a")
 
697
        s.w.StartSync()
 
698
        ch := make(chan watcher.Change)
 
699
        s.w.Watch("test", "a", -1, ch)
 
700
        select {
 
701
        case got := <-ch:
 
702
                c.Fatalf("got event %#v when starting watcher after doc was removed", got)
 
703
        case <-time.After(justLongEnough):
 
704
        }
 
705
}