~nskaggs/+junk/xenial-test

« back to all changes in this revision

Viewing changes to src/github.com/juju/juju/mongo/oplog_test.go

  • Committer: Nicholas Skaggs
  • Date: 2016-10-24 20:56:05 UTC
  • Revision ID: nicholas.skaggs@canonical.com-20161024205605-z8lta0uvuhtxwzwl
Initi with beta15

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
// Copyright 2015 Canonical Ltd.
 
2
// Licensed under the AGPLv3, see LICENCE file for details.
 
3
 
 
4
package mongo_test
 
5
 
 
6
import (
 
7
        "errors"
 
8
        "reflect"
 
9
        "time"
 
10
 
 
11
        jujutesting "github.com/juju/testing"
 
12
        jc "github.com/juju/testing/checkers"
 
13
        gc "gopkg.in/check.v1"
 
14
        "gopkg.in/mgo.v2"
 
15
        "gopkg.in/mgo.v2/bson"
 
16
 
 
17
        "github.com/juju/juju/mongo"
 
18
        coretesting "github.com/juju/juju/testing"
 
19
        "github.com/juju/juju/worker/peergrouper"
 
20
)
 
21
 
 
22
type oplogSuite struct {
 
23
        coretesting.BaseSuite
 
24
}
 
25
 
 
26
var _ = gc.Suite(&oplogSuite{})
 
27
 
 
28
func (s *oplogSuite) TestWithRealOplog(c *gc.C) {
 
29
        _, session := s.startMongoWithReplicaset(c)
 
30
 
 
31
        // Watch for oplog entries for the "bar" collection in the "foo"
 
32
        // DB.
 
33
        oplog := mongo.GetOplog(session)
 
34
        tailer := mongo.NewOplogTailer(
 
35
                mongo.NewOplogSession(
 
36
                        oplog,
 
37
                        bson.D{{"ns", "foo.bar"}},
 
38
                ),
 
39
                time.Now().Add(-time.Minute),
 
40
        )
 
41
        defer tailer.Stop()
 
42
 
 
43
        assertOplog := func(expectedOp string, expectedObj, expectedUpdate bson.D) {
 
44
                doc := s.getNextOplog(c, tailer)
 
45
                c.Assert(doc.Operation, gc.Equals, expectedOp)
 
46
 
 
47
                var actualObj bson.D
 
48
                err := doc.UnmarshalObject(&actualObj)
 
49
                c.Assert(err, jc.ErrorIsNil)
 
50
                c.Assert(actualObj, jc.DeepEquals, expectedObj)
 
51
 
 
52
                var actualUpdate bson.D
 
53
                err = doc.UnmarshalUpdate(&actualUpdate)
 
54
                c.Assert(err, jc.ErrorIsNil)
 
55
                c.Assert(actualUpdate, jc.DeepEquals, expectedUpdate)
 
56
        }
 
57
 
 
58
        // Insert into foo.bar and see that the oplog entry is reported.
 
59
        db := session.DB("foo")
 
60
        coll := db.C("bar")
 
61
        s.insertDoc(c, session, coll, bson.M{"_id": "thing"})
 
62
        assertOplog("i", bson.D{{"_id", "thing"}}, nil)
 
63
 
 
64
        // Update foo.bar and see the update reported.
 
65
        err := coll.UpdateId("thing", bson.M{"$set": bson.M{"blah": 42}})
 
66
        c.Assert(err, jc.ErrorIsNil)
 
67
        assertOplog("u", bson.D{{"$set", bson.D{{"blah", 42}}}}, bson.D{{"_id", "thing"}})
 
68
 
 
69
        // Insert into another collection (shouldn't be reported due to filter).
 
70
        s.insertDoc(c, session, db.C("elsewhere"), bson.M{"_id": "boo"})
 
71
        s.assertNoOplog(c, tailer)
 
72
}
 
73
 
 
74
func (s *oplogSuite) TestHonoursInitialTs(c *gc.C) {
 
75
        _, session := s.startMongo(c)
 
76
 
 
77
        t := time.Now()
 
78
 
 
79
        oplog := s.makeFakeOplog(c, session)
 
80
        for offset := -1; offset <= 1; offset++ {
 
81
                tDoc := t.Add(time.Duration(offset) * time.Second)
 
82
                s.insertDoc(c, session, oplog,
 
83
                        &mongo.OplogDoc{Timestamp: mongo.NewMongoTimestamp(tDoc)},
 
84
                )
 
85
        }
 
86
 
 
87
        tailer := mongo.NewOplogTailer(mongo.NewOplogSession(oplog, nil), t)
 
88
        defer tailer.Stop()
 
89
 
 
90
        for offset := 0; offset <= 1; offset++ {
 
91
                doc := s.getNextOplog(c, tailer)
 
92
                tExpected := t.Add(time.Duration(offset) * time.Second)
 
93
                c.Assert(doc.Timestamp, gc.Equals, mongo.NewMongoTimestamp(tExpected))
 
94
        }
 
95
}
 
96
 
 
97
func (s *oplogSuite) TestStops(c *gc.C) {
 
98
        _, session := s.startMongo(c)
 
99
 
 
100
        oplog := s.makeFakeOplog(c, session)
 
101
        tailer := mongo.NewOplogTailer(mongo.NewOplogSession(oplog, nil), time.Time{})
 
102
        defer tailer.Stop()
 
103
 
 
104
        s.insertDoc(c, session, oplog, &mongo.OplogDoc{Timestamp: 1})
 
105
        s.getNextOplog(c, tailer)
 
106
 
 
107
        err := tailer.Stop()
 
108
        c.Assert(err, jc.ErrorIsNil)
 
109
 
 
110
        s.assertStopped(c, tailer)
 
111
        c.Assert(tailer.Err(), jc.ErrorIsNil)
 
112
}
 
113
 
 
114
func (s *oplogSuite) TestRestartsOnErrCursor(c *gc.C) {
 
115
        session := newFakeSession(
 
116
                // First iterator terminates with an ErrCursor
 
117
                newFakeIterator(mgo.ErrCursor, &mongo.OplogDoc{Timestamp: 1, OperationId: 99}),
 
118
                newFakeIterator(nil, &mongo.OplogDoc{Timestamp: 2, OperationId: 42}),
 
119
        )
 
120
        tailer := mongo.NewOplogTailer(session, time.Time{})
 
121
        defer tailer.Stop()
 
122
 
 
123
        // First, ensure that the tailer is seeing oplog rows and handles
 
124
        // the ErrCursor that occurs at the end.
 
125
        doc := s.getNextOplog(c, tailer)
 
126
        c.Check(doc.Timestamp, gc.Equals, bson.MongoTimestamp(1))
 
127
        session.checkLastArgs(c, mongo.NewMongoTimestamp(time.Time{}), nil)
 
128
 
 
129
        // Ensure that the tailer continues after getting a new iterator.
 
130
        doc = s.getNextOplog(c, tailer)
 
131
        c.Check(doc.Timestamp, gc.Equals, bson.MongoTimestamp(2))
 
132
        session.checkLastArgs(c, bson.MongoTimestamp(1), []int64{99})
 
133
}
 
134
 
 
135
func (s *oplogSuite) TestNoRepeatsAfterIterRestart(c *gc.C) {
 
136
        // A bunch of documents with the same timestamp but different ids.
 
137
        // These will be split across 2 iterators.
 
138
        docs := make([]*mongo.OplogDoc, 11)
 
139
        for i := 0; i < 10; i++ {
 
140
                id := int64(i + 10)
 
141
                docs[i] = &mongo.OplogDoc{
 
142
                        Timestamp:   1,
 
143
                        OperationId: id,
 
144
                }
 
145
        }
 
146
        // Add one more with a different timestamp.
 
147
        docs[10] = &mongo.OplogDoc{
 
148
                Timestamp:   2,
 
149
                OperationId: 42,
 
150
        }
 
151
        session := newFakeSession(
 
152
                // First block of documents, all time 1
 
153
                newFakeIterator(nil, docs[:5]...),
 
154
                // Second block, some time 1, one time 2
 
155
                newFakeIterator(nil, docs[5:]...),
 
156
        )
 
157
        tailer := mongo.NewOplogTailer(session, time.Time{})
 
158
        defer tailer.Stop()
 
159
 
 
160
        for id := int64(10); id < 15; id++ {
 
161
                doc := s.getNextOplog(c, tailer)
 
162
                c.Assert(doc.Timestamp, gc.Equals, bson.MongoTimestamp(1))
 
163
                c.Assert(doc.OperationId, gc.Equals, id)
 
164
        }
 
165
 
 
166
        // Check the query doesn't exclude any in the first request.
 
167
        session.checkLastArgs(c, mongo.NewMongoTimestamp(time.Time{}), nil)
 
168
 
 
169
        // The OplogTailer will fall off the end of the iterator and get a new one.
 
170
 
 
171
        // Ensure that only previously unreported entries are now reported.
 
172
        for id := int64(15); id < 20; id++ {
 
173
                doc := s.getNextOplog(c, tailer)
 
174
                c.Assert(doc.Timestamp, gc.Equals, bson.MongoTimestamp(1))
 
175
                c.Assert(doc.OperationId, gc.Equals, id)
 
176
        }
 
177
 
 
178
        // Check we got the next block correctly
 
179
        session.checkLastArgs(c, bson.MongoTimestamp(1), []int64{10, 11, 12, 13, 14})
 
180
 
 
181
        doc := s.getNextOplog(c, tailer)
 
182
        c.Assert(doc.Timestamp, gc.Equals, bson.MongoTimestamp(2))
 
183
        c.Assert(doc.OperationId, gc.Equals, int64(42))
 
184
}
 
185
 
 
186
func (s *oplogSuite) TestDiesOnFatalError(c *gc.C) {
 
187
        expectedErr := errors.New("oh no, the collection went away!")
 
188
        session := newFakeSession(
 
189
                newFakeIterator(expectedErr, &mongo.OplogDoc{Timestamp: 1}),
 
190
        )
 
191
 
 
192
        tailer := mongo.NewOplogTailer(session, time.Time{})
 
193
        defer tailer.Stop()
 
194
 
 
195
        doc := s.getNextOplog(c, tailer)
 
196
        c.Assert(doc.Timestamp, gc.Equals, bson.MongoTimestamp(1))
 
197
        s.assertStopped(c, tailer)
 
198
        c.Assert(tailer.Err(), gc.Equals, expectedErr)
 
199
}
 
200
 
 
201
func (s *oplogSuite) TestNewMongoTimestamp(c *gc.C) {
 
202
        t := time.Date(2015, 6, 24, 12, 47, 0, 0, time.FixedZone("somewhere", 5*3600))
 
203
 
 
204
        expected := bson.MongoTimestamp(6163845091342417920)
 
205
        c.Assert(mongo.NewMongoTimestamp(t), gc.Equals, expected)
 
206
        c.Assert(mongo.NewMongoTimestamp(t.In(time.UTC)), gc.Equals, expected)
 
207
}
 
208
 
 
209
func (s *oplogSuite) TestNewMongoTimestampBeforeUnixEpoch(c *gc.C) {
 
210
        c.Assert(mongo.NewMongoTimestamp(time.Time{}), gc.Equals, bson.MongoTimestamp(0))
 
211
}
 
212
 
 
213
func (s *oplogSuite) startMongoWithReplicaset(c *gc.C) (*jujutesting.MgoInstance, *mgo.Session) {
 
214
        inst := &jujutesting.MgoInstance{
 
215
                Params: []string{
 
216
                        "--replSet", "juju",
 
217
                },
 
218
        }
 
219
        err := inst.Start(nil)
 
220
        c.Assert(err, jc.ErrorIsNil)
 
221
        s.AddCleanup(func(*gc.C) { inst.Destroy() })
 
222
 
 
223
        // Initiate replicaset.
 
224
        info := inst.DialInfo()
 
225
        args := peergrouper.InitiateMongoParams{
 
226
                DialInfo:       info,
 
227
                MemberHostPort: inst.Addr(),
 
228
        }
 
229
        err = peergrouper.InitiateMongoServer(args)
 
230
        c.Assert(err, jc.ErrorIsNil)
 
231
 
 
232
        return inst, s.dialMongo(c, inst)
 
233
}
 
234
 
 
235
func (s *oplogSuite) startMongo(c *gc.C) (*jujutesting.MgoInstance, *mgo.Session) {
 
236
        var inst jujutesting.MgoInstance
 
237
        err := inst.Start(nil)
 
238
        c.Assert(err, jc.ErrorIsNil)
 
239
        s.AddCleanup(func(*gc.C) { inst.Destroy() })
 
240
        return &inst, s.dialMongo(c, &inst)
 
241
}
 
242
 
 
243
func (s *oplogSuite) emptyCapped(c *gc.C, coll *mgo.Collection) {
 
244
        // Call the emptycapped (test) command on a capped
 
245
        // collection. This invalidates any cursors on the collection.
 
246
        err := coll.Database.Run(bson.D{{"emptycapped", coll.Name}}, nil)
 
247
        c.Assert(err, jc.ErrorIsNil)
 
248
}
 
249
 
 
250
func (s *oplogSuite) dialMongo(c *gc.C, inst *jujutesting.MgoInstance) *mgo.Session {
 
251
        session, err := inst.Dial()
 
252
        c.Assert(err, jc.ErrorIsNil)
 
253
        s.AddCleanup(func(*gc.C) { session.Close() })
 
254
        return session
 
255
}
 
256
 
 
257
func (s *oplogSuite) makeFakeOplog(c *gc.C, session *mgo.Session) *mgo.Collection {
 
258
        db := session.DB("foo")
 
259
        oplog := db.C("oplog.fake")
 
260
        err := oplog.Create(&mgo.CollectionInfo{
 
261
                Capped:   true,
 
262
                MaxBytes: 1024 * 1024,
 
263
        })
 
264
        c.Assert(err, jc.ErrorIsNil)
 
265
        return oplog
 
266
}
 
267
 
 
268
func (s *oplogSuite) insertDoc(c *gc.C, srcSession *mgo.Session, coll *mgo.Collection, doc interface{}) {
 
269
        session := srcSession.Copy()
 
270
        defer session.Close()
 
271
        err := coll.With(session).Insert(doc)
 
272
        c.Assert(err, jc.ErrorIsNil)
 
273
}
 
274
 
 
275
func (s *oplogSuite) getNextOplog(c *gc.C, tailer *mongo.OplogTailer) *mongo.OplogDoc {
 
276
        select {
 
277
        case doc, ok := <-tailer.Out():
 
278
                if !ok {
 
279
                        c.Fatalf("tailer unexpectedly died: %v", tailer.Err())
 
280
                }
 
281
                return doc
 
282
        case <-time.After(coretesting.LongWait):
 
283
                c.Fatal("timed out waiting for oplog doc")
 
284
        }
 
285
        return nil
 
286
}
 
287
 
 
288
func (s *oplogSuite) assertNoOplog(c *gc.C, tailer *mongo.OplogTailer) {
 
289
        select {
 
290
        case _, ok := <-tailer.Out():
 
291
                if !ok {
 
292
                        c.Fatalf("tailer unexpectedly died: %v", tailer.Err())
 
293
                }
 
294
                c.Fatal("unexpected oplog activity reported")
 
295
        case <-time.After(coretesting.ShortWait):
 
296
                // Success
 
297
        }
 
298
}
 
299
 
 
300
func (s *oplogSuite) assertStopped(c *gc.C, tailer *mongo.OplogTailer) {
 
301
        // Output should close.
 
302
        select {
 
303
        case _, ok := <-tailer.Out():
 
304
                c.Assert(ok, jc.IsFalse)
 
305
        case <-time.After(coretesting.LongWait):
 
306
                c.Fatal("tailer output should have closed")
 
307
        }
 
308
 
 
309
        // OplogTailer should die.
 
310
        select {
 
311
        case <-tailer.Dying():
 
312
                // Success.
 
313
        case <-time.After(coretesting.LongWait):
 
314
                c.Fatal("tailer should have died")
 
315
        }
 
316
}
 
317
 
 
318
type fakeIterator struct {
 
319
        docs    []*mongo.OplogDoc
 
320
        pos     int
 
321
        err     error
 
322
        timeout bool
 
323
}
 
324
 
 
325
func (i *fakeIterator) Next(result interface{}) bool {
 
326
        if i.pos >= len(i.docs) {
 
327
                return false
 
328
        }
 
329
        target := reflect.ValueOf(result).Elem()
 
330
        target.Set(reflect.ValueOf(*i.docs[i.pos]))
 
331
        i.pos++
 
332
        return true
 
333
}
 
334
 
 
335
func (i *fakeIterator) Err() error {
 
336
        if i.pos < len(i.docs) {
 
337
                return nil
 
338
        }
 
339
        return i.err
 
340
}
 
341
 
 
342
func (i *fakeIterator) Timeout() bool {
 
343
        if i.pos < len(i.docs) {
 
344
                return false
 
345
        }
 
346
        return i.timeout
 
347
}
 
348
 
 
349
func newFakeIterator(err error, docs ...*mongo.OplogDoc) *fakeIterator {
 
350
        return &fakeIterator{docs: docs, err: err}
 
351
}
 
352
 
 
353
type iterArgs struct {
 
354
        timestamp  bson.MongoTimestamp
 
355
        excludeIds []int64
 
356
}
 
357
 
 
358
type fakeSession struct {
 
359
        iterators []*fakeIterator
 
360
        pos       int
 
361
        args      chan iterArgs
 
362
}
 
363
 
 
364
var timeoutIterator = fakeIterator{timeout: true}
 
365
 
 
366
func (s *fakeSession) NewIter(ts bson.MongoTimestamp, ids []int64) mongo.OplogIterator {
 
367
        if s.pos >= len(s.iterators) {
 
368
                // We've run out of results - at this point the calls to get
 
369
                // more data would just keep timing out.
 
370
                return &timeoutIterator
 
371
        }
 
372
        select {
 
373
        case <-time.After(coretesting.LongWait):
 
374
                panic("took too long to save args")
 
375
        case s.args <- iterArgs{ts, ids}:
 
376
        }
 
377
        result := s.iterators[s.pos]
 
378
        s.pos++
 
379
        return result
 
380
}
 
381
 
 
382
func (s *fakeSession) Close() {}
 
383
 
 
384
func (s *fakeSession) checkLastArgs(c *gc.C, ts bson.MongoTimestamp, ids []int64) {
 
385
        select {
 
386
        case <-time.After(coretesting.LongWait):
 
387
                c.Logf("timeout getting iter args - test problem")
 
388
                c.FailNow()
 
389
        case res := <-s.args:
 
390
                c.Check(res.timestamp, gc.Equals, ts)
 
391
                c.Check(res.excludeIds, gc.DeepEquals, ids)
 
392
        }
 
393
}
 
394
 
 
395
func newFakeSession(iterators ...*fakeIterator) *fakeSession {
 
396
        return &fakeSession{
 
397
                iterators: iterators,
 
398
                args:      make(chan iterArgs, 5),
 
399
        }
 
400
}