1
// Copyright 2015 Canonical Ltd.
2
// Licensed under the AGPLv3, see LICENCE file for details.
11
jujutesting "github.com/juju/testing"
12
jc "github.com/juju/testing/checkers"
13
gc "gopkg.in/check.v1"
15
"gopkg.in/mgo.v2/bson"
17
"github.com/juju/juju/mongo"
18
coretesting "github.com/juju/juju/testing"
19
"github.com/juju/juju/worker/peergrouper"
22
// oplogSuite is the gocheck suite holding the tests for the oplog tailer
// exposed by the mongo package (mongo.NewOplogTailer and friends).
type oplogSuite struct {
26
// Register oplogSuite with gocheck so that its Test* methods are run.
var _ = gc.Suite(&oplogSuite{})
28
// TestWithRealOplog tails the oplog of a MongoDB instance started with a
// replica set, filtered to the "foo.bar" namespace. It checks that an
// insert and an update on that collection are reported with the expected
// operation, object and update payloads, and that an insert into a
// different collection produces no oplog activity on the tailer.
func (s *oplogSuite) TestWithRealOplog(c *gc.C) {
29
_, session := s.startMongoWithReplicaset(c)
31
// Watch for oplog entries for the "bar" collection in the "foo"
33
oplog := mongo.GetOplog(session)
34
tailer := mongo.NewOplogTailer(
35
mongo.NewOplogSession(
37
// Only entries whose "ns" field equals "foo.bar" are selected.
bson.D{{"ns", "foo.bar"}},
39
// Start tailing from one minute in the past.
time.Now().Add(-time.Minute),
43
// assertOplog fetches the next document from the tailer and compares its
// operation code, decoded object and decoded update with the expected
// values.
assertOplog := func(expectedOp string, expectedObj, expectedUpdate bson.D) {
44
doc := s.getNextOplog(c, tailer)
45
c.Assert(doc.Operation, gc.Equals, expectedOp)
48
err := doc.UnmarshalObject(&actualObj)
49
c.Assert(err, jc.ErrorIsNil)
50
c.Assert(actualObj, jc.DeepEquals, expectedObj)
52
var actualUpdate bson.D
53
err = doc.UnmarshalUpdate(&actualUpdate)
54
c.Assert(err, jc.ErrorIsNil)
55
c.Assert(actualUpdate, jc.DeepEquals, expectedUpdate)
58
// Insert into foo.bar and see that the oplog entry is reported.
59
db := session.DB("foo")
61
// NOTE(review): coll is presumably the "bar" collection of db - confirm.
s.insertDoc(c, session, coll, bson.M{"_id": "thing"})
62
assertOplog("i", bson.D{{"_id", "thing"}}, nil)
64
// Update foo.bar and see the update reported.
65
err := coll.UpdateId("thing", bson.M{"$set": bson.M{"blah": 42}})
66
c.Assert(err, jc.ErrorIsNil)
// For an update the object carries the modifier document and the update
// field carries the selector (the _id of the changed document).
67
assertOplog("u", bson.D{{"$set", bson.D{{"blah", 42}}}}, bson.D{{"_id", "thing"}})
69
// Insert into another collection (shouldn't be reported due to filter).
70
s.insertDoc(c, session, db.C("elsewhere"), bson.M{"_id": "boo"})
71
s.assertNoOplog(c, tailer)
74
// TestHonoursInitialTs inserts documents stamped at t-1s, t and t+1s into
// a fake oplog collection, starts a tailer with t as its initial
// timestamp, and checks that the first two documents reported carry the
// timestamps t and t+1s, in that order.
func (s *oplogSuite) TestHonoursInitialTs(c *gc.C) {
75
_, session := s.startMongo(c)
79
oplog := s.makeFakeOplog(c, session)
// Populate the fake oplog with one entry per second offset in [-1, 1].
80
for offset := -1; offset <= 1; offset++ {
81
tDoc := t.Add(time.Duration(offset) * time.Second)
82
s.insertDoc(c, session, oplog,
83
&mongo.OplogDoc{Timestamp: mongo.NewMongoTimestamp(tDoc)},
87
tailer := mongo.NewOplogTailer(mongo.NewOplogSession(oplog, nil), t)
// Only the entries at offsets 0 and +1 are expected from the tailer.
90
for offset := 0; offset <= 1; offset++ {
91
doc := s.getNextOplog(c, tailer)
92
tExpected := t.Add(time.Duration(offset) * time.Second)
93
c.Assert(doc.Timestamp, gc.Equals, mongo.NewMongoTimestamp(tExpected))
97
// TestStops tails a fake oplog, reads one document from it, and then
// checks that the tailer ends up stopped (output closed, dying signalled)
// with a nil Err().
func (s *oplogSuite) TestStops(c *gc.C) {
98
_, session := s.startMongo(c)
100
oplog := s.makeFakeOplog(c, session)
101
tailer := mongo.NewOplogTailer(mongo.NewOplogSession(oplog, nil), time.Time{})
104
s.insertDoc(c, session, oplog, &mongo.OplogDoc{Timestamp: 1})
105
s.getNextOplog(c, tailer)
// NOTE(review): err presumably comes from an explicit stop request on the
// tailer - confirm.
108
c.Assert(err, jc.ErrorIsNil)
110
s.assertStopped(c, tailer)
111
c.Assert(tailer.Err(), jc.ErrorIsNil)
114
// TestRestartsOnErrCursor checks that when an iterator finishes with
// mgo.ErrCursor the tailer requests a new iterator from the session and
// keeps reporting documents. The second request must carry the timestamp
// of the last document seen and the operation id already reported.
func (s *oplogSuite) TestRestartsOnErrCursor(c *gc.C) {
115
session := newFakeSession(
116
// First iterator terminates with an ErrCursor
117
newFakeIterator(mgo.ErrCursor, &mongo.OplogDoc{Timestamp: 1, OperationId: 99}),
// Second iterator ends without an error.
118
newFakeIterator(nil, &mongo.OplogDoc{Timestamp: 2, OperationId: 42}),
120
tailer := mongo.NewOplogTailer(session, time.Time{})
123
// First, ensure that the tailer is seeing oplog rows and handles
124
// the ErrCursor that occurs at the end.
125
doc := s.getNextOplog(c, tailer)
126
c.Check(doc.Timestamp, gc.Equals, bson.MongoTimestamp(1))
// The first iterator request uses the initial timestamp and excludes nothing.
127
session.checkLastArgs(c, mongo.NewMongoTimestamp(time.Time{}), nil)
129
// Ensure that the tailer continues after getting a new iterator.
130
doc = s.getNextOplog(c, tailer)
131
c.Check(doc.Timestamp, gc.Equals, bson.MongoTimestamp(2))
// The second request resumes at timestamp 1 and excludes operation id 99.
132
session.checkLastArgs(c, bson.MongoTimestamp(1), []int64{99})
135
// TestNoRepeatsAfterIterRestart feeds the tailer eleven documents split
// across two iterators and checks that each one is reported exactly once.
// When the tailer requests the second iterator, it must pass the ids
// already reported for the current timestamp.
func (s *oplogSuite) TestNoRepeatsAfterIterRestart(c *gc.C) {
136
// A bunch of documents with the same timestamp but different ids.
137
// These will be split across 2 iterators.
138
docs := make([]*mongo.OplogDoc, 11)
139
for i := 0; i < 10; i++ {
141
docs[i] = &mongo.OplogDoc{
146
// Add one more with a different timestamp.
147
docs[10] = &mongo.OplogDoc{
151
session := newFakeSession(
152
// First block of documents, all time 1
153
newFakeIterator(nil, docs[:5]...),
154
// Second block, some time 1, one time 2
155
newFakeIterator(nil, docs[5:]...),
157
tailer := mongo.NewOplogTailer(session, time.Time{})
// The first iterator is expected to yield ids 10..14, all at timestamp 1.
160
for id := int64(10); id < 15; id++ {
161
doc := s.getNextOplog(c, tailer)
162
c.Assert(doc.Timestamp, gc.Equals, bson.MongoTimestamp(1))
163
c.Assert(doc.OperationId, gc.Equals, id)
166
// Check the query doesn't exclude any in the first request.
167
session.checkLastArgs(c, mongo.NewMongoTimestamp(time.Time{}), nil)
169
// The OplogTailer will fall off the end of the iterator and get a new one.
171
// Ensure that only previously unreported entries are now reported.
172
for id := int64(15); id < 20; id++ {
173
doc := s.getNextOplog(c, tailer)
174
c.Assert(doc.Timestamp, gc.Equals, bson.MongoTimestamp(1))
175
c.Assert(doc.OperationId, gc.Equals, id)
178
// Check we got the next block correctly
179
session.checkLastArgs(c, bson.MongoTimestamp(1), []int64{10, 11, 12, 13, 14})
// The final document is the one with the later timestamp.
181
doc := s.getNextOplog(c, tailer)
182
c.Assert(doc.Timestamp, gc.Equals, bson.MongoTimestamp(2))
183
c.Assert(doc.OperationId, gc.Equals, int64(42))
186
// TestDiesOnFatalError checks that when an iterator ends with an
// arbitrary error, the tailer still reports the document that iterator
// held, then stops and exposes that same error through Err().
func (s *oplogSuite) TestDiesOnFatalError(c *gc.C) {
187
expectedErr := errors.New("oh no, the collection went away!")
188
session := newFakeSession(
189
newFakeIterator(expectedErr, &mongo.OplogDoc{Timestamp: 1}),
192
tailer := mongo.NewOplogTailer(session, time.Time{})
195
doc := s.getNextOplog(c, tailer)
196
c.Assert(doc.Timestamp, gc.Equals, bson.MongoTimestamp(1))
197
s.assertStopped(c, tailer)
// gc.Equals: the very same error value must be surfaced.
198
c.Assert(tailer.Err(), gc.Equals, expectedErr)
201
// TestNewMongoTimestamp checks that mongo.NewMongoTimestamp produces the
// expected value for a fixed instant, and that the result is the same
// whether the time is expressed in a +5h fixed zone or in UTC.
func (s *oplogSuite) TestNewMongoTimestamp(c *gc.C) {
202
t := time.Date(2015, 6, 24, 12, 47, 0, 0, time.FixedZone("somewhere", 5*3600))
204
expected := bson.MongoTimestamp(6163845091342417920)
205
c.Assert(mongo.NewMongoTimestamp(t), gc.Equals, expected)
206
c.Assert(mongo.NewMongoTimestamp(t.In(time.UTC)), gc.Equals, expected)
209
// TestNewMongoTimestampBeforeUnixEpoch checks that the zero time.Time,
// which lies before the Unix epoch, converts to MongoTimestamp 0.
func (s *oplogSuite) TestNewMongoTimestampBeforeUnixEpoch(c *gc.C) {
210
c.Assert(mongo.NewMongoTimestamp(time.Time{}), gc.Equals, bson.MongoTimestamp(0))
213
// startMongoWithReplicaset starts a test MongoDB instance, initiates a
// replica set on it through peergrouper.InitiateMongoServer, and returns
// the instance together with a session dialled by dialMongo. The instance
// is destroyed at suite cleanup. Any failure aborts the test.
func (s *oplogSuite) startMongoWithReplicaset(c *gc.C) (*jujutesting.MgoInstance, *mgo.Session) {
214
inst := &jujutesting.MgoInstance{
219
err := inst.Start(nil)
220
c.Assert(err, jc.ErrorIsNil)
221
s.AddCleanup(func(*gc.C) { inst.Destroy() })
223
// Initiate replicaset.
224
info := inst.DialInfo()
225
args := peergrouper.InitiateMongoParams{
227
MemberHostPort: inst.Addr(),
229
err = peergrouper.InitiateMongoServer(args)
230
c.Assert(err, jc.ErrorIsNil)
232
return inst, s.dialMongo(c, inst)
235
// startMongo starts a test MongoDB instance with default settings and
// returns it together with a session dialled by dialMongo. The instance
// is destroyed at suite cleanup. A start failure aborts the test.
func (s *oplogSuite) startMongo(c *gc.C) (*jujutesting.MgoInstance, *mgo.Session) {
236
var inst jujutesting.MgoInstance
237
err := inst.Start(nil)
238
c.Assert(err, jc.ErrorIsNil)
239
s.AddCleanup(func(*gc.C) { inst.Destroy() })
240
return &inst, s.dialMongo(c, &inst)
243
// emptyCapped runs the "emptycapped" command for coll against its
// database and aborts the test if the command fails.
func (s *oplogSuite) emptyCapped(c *gc.C, coll *mgo.Collection) {
244
// Call the emptycapped (test) command on a capped
245
// collection. This invalidates any cursors on the collection.
246
err := coll.Database.Run(bson.D{{"emptycapped", coll.Name}}, nil)
247
c.Assert(err, jc.ErrorIsNil)
250
// dialMongo dials inst, aborting the test on error, and registers a
// cleanup that closes the resulting session.
func (s *oplogSuite) dialMongo(c *gc.C, inst *jujutesting.MgoInstance) *mgo.Session {
251
session, err := inst.Dial()
252
c.Assert(err, jc.ErrorIsNil)
253
s.AddCleanup(func(*gc.C) { session.Close() })
257
// makeFakeOplog creates the collection "oplog.fake" in database "foo"
// with a 1 MiB MaxBytes limit, for tests that need an oplog-like
// collection they can write to directly. Creation failure aborts the test.
// NOTE(review): the collection is presumably created as capped - confirm.
func (s *oplogSuite) makeFakeOplog(c *gc.C, session *mgo.Session) *mgo.Collection {
258
db := session.DB("foo")
259
oplog := db.C("oplog.fake")
260
err := oplog.Create(&mgo.CollectionInfo{
262
MaxBytes: 1024 * 1024,
264
c.Assert(err, jc.ErrorIsNil)
268
// insertDoc inserts doc into coll using a short-lived copy of srcSession,
// so the insert does not run on the caller's session. The copy is closed
// before returning. An insert failure aborts the test.
func (s *oplogSuite) insertDoc(c *gc.C, srcSession *mgo.Session, coll *mgo.Collection, doc interface{}) {
269
session := srcSession.Copy()
270
defer session.Close()
// Rebind the collection to the copied session for this one insert.
271
err := coll.With(session).Insert(doc)
272
c.Assert(err, jc.ErrorIsNil)
275
// getNextOplog waits for the next document on tailer.Out(). The test is
// failed if the tailer has died instead of delivering a document, or if
// nothing arrives within coretesting.LongWait.
func (s *oplogSuite) getNextOplog(c *gc.C, tailer *mongo.OplogTailer) *mongo.OplogDoc {
277
case doc, ok := <-tailer.Out():
279
c.Fatalf("tailer unexpectedly died: %v", tailer.Err())
282
case <-time.After(coretesting.LongWait):
283
c.Fatal("timed out waiting for oplog doc")
288
// assertNoOplog checks that nothing is received on tailer.Out() within
// coretesting.ShortWait. Receiving a document, or finding that the tailer
// has died, fails the test.
func (s *oplogSuite) assertNoOplog(c *gc.C, tailer *mongo.OplogTailer) {
290
case _, ok := <-tailer.Out():
292
c.Fatalf("tailer unexpectedly died: %v", tailer.Err())
294
c.Fatal("unexpected oplog activity reported")
// Reaching the short timeout with no activity is the success path.
295
case <-time.After(coretesting.ShortWait):
300
// assertStopped checks that the tailer has shut down: its output channel
// must be closed and its Dying channel must fire, each within
// coretesting.LongWait.
func (s *oplogSuite) assertStopped(c *gc.C, tailer *mongo.OplogTailer) {
301
// Output should close.
303
case _, ok := <-tailer.Out():
// A receive with ok == false means the channel was closed.
304
c.Assert(ok, jc.IsFalse)
305
case <-time.After(coretesting.LongWait):
306
c.Fatal("tailer output should have closed")
309
// OplogTailer should die.
311
case <-tailer.Dying():
313
case <-time.After(coretesting.LongWait):
314
c.Fatal("tailer should have died")
318
// fakeIterator is an in-memory stand-in for an oplog iterator: it serves
// a fixed list of documents to the tailer under test.
type fakeIterator struct {
319
// docs are the documents handed out, in order, by Next.
docs []*mongo.OplogDoc
325
// Next copies the document at the current position into the value that
// result points to, using reflection. result must therefore be a non-nil
// pointer to a value assignable from mongo.OplogDoc.
// NOTE(review): presumably returns false once all docs are consumed and
// advances the position otherwise - confirm.
func (i *fakeIterator) Next(result interface{}) bool {
// No documents left to hand out.
326
if i.pos >= len(i.docs) {
329
target := reflect.ValueOf(result).Elem()
330
target.Set(reflect.ValueOf(*i.docs[i.pos]))
335
// Err reports the iterator's error, treating the case where documents
// remain to be read separately from the exhausted case.
// NOTE(review): presumably nil while docs remain and the configured err
// afterwards - confirm.
func (i *fakeIterator) Err() error {
336
if i.pos < len(i.docs) {
342
// Timeout reports whether the iterator timed out, treating the case where
// documents remain to be read separately from the exhausted case.
// NOTE(review): presumably false while docs remain and the configured
// timeout flag afterwards - confirm.
func (i *fakeIterator) Timeout() bool {
343
if i.pos < len(i.docs) {
349
// newFakeIterator returns a fakeIterator that serves docs and is
// configured with err as its error.
func newFakeIterator(err error, docs ...*mongo.OplogDoc) *fakeIterator {
350
return &fakeIterator{docs: docs, err: err}
353
// iterArgs records the arguments of a single fakeSession.NewIter call so
// that tests can inspect them afterwards.
type iterArgs struct {
354
// timestamp is the timestamp passed to NewIter.
timestamp bson.MongoTimestamp
358
// fakeSession is a test double for the oplog session given to
// mongo.NewOplogTailer. It hands out a fixed sequence of fakeIterators
// and records the arguments of each NewIter call.
type fakeSession struct {
359
// iterators are returned one per NewIter call, in order.
iterators []*fakeIterator
364
// timeoutIterator is what fakeSession.NewIter returns once its configured
// iterators are used up: it holds no documents and has its timeout flag
// set.
var timeoutIterator = fakeIterator{timeout: true}
366
// NewIter returns the next configured iterator. Before doing so it queues
// (ts, ids) on s.args so tests can verify them with checkLastArgs, and it
// panics if they cannot be queued within coretesting.LongWait. Once every
// configured iterator has been handed out it returns timeoutIterator
// without recording the arguments.
func (s *fakeSession) NewIter(ts bson.MongoTimestamp, ids []int64) mongo.OplogIterator {
367
if s.pos >= len(s.iterators) {
368
// We've run out of results - at this point the calls to get
369
// more data would just keep timing out.
370
return &timeoutIterator
373
case <-time.After(coretesting.LongWait):
// The args channel stayed full: nobody is draining it.
374
panic("took too long to save args")
375
case s.args <- iterArgs{ts, ids}:
377
result := s.iterators[s.pos]
382
// Close is a no-op; the fake session holds nothing that needs releasing.
func (s *fakeSession) Close() {}
384
// checkLastArgs receives the next recorded NewIter arguments from s.args
// and checks that the timestamp and excluded ids match ts and ids. If
// nothing is recorded within coretesting.LongWait, a message pointing at a
// problem in the test itself is logged.
func (s *fakeSession) checkLastArgs(c *gc.C, ts bson.MongoTimestamp, ids []int64) {
386
case <-time.After(coretesting.LongWait):
387
c.Logf("timeout getting iter args - test problem")
389
case res := <-s.args:
390
c.Check(res.timestamp, gc.Equals, ts)
391
c.Check(res.excludeIds, gc.DeepEquals, ids)
395
// newFakeSession builds a fakeSession that will hand out the given
// iterators in order.
func newFakeSession(iterators ...*fakeIterator) *fakeSession {
397
iterators: iterators,
// Buffered so NewIter can record up to 5 calls before a test reads them.
398
args: make(chan iterArgs, 5),