~rogpeppe/juju-core/themue-058-debug-log-api

« back to all changes in this revision

Viewing changes to utils/tailer/tailer_test.go

  • Committer: Tarmac
  • Author(s): Frank Mueller
  • Date: 2013-12-13 13:40:11 UTC
  • mfrom: (2116.3.8 juju-core)
  • Revision ID: tarmac-20131213134011-2mp5zu6n392ibvpw
[r=themue] utils: added Tailer for tailing of logs in API

The Tailer is the initial component of the debug logging command
of the API. It allows the filtered tailing of any ReaderSeeker.
If no filter is passed all lines will be written in the passed
Writer, otherwise only those where the filter function returns
true. The initial number of lines can also be specified, the
filter already works here. So if a File (which is a ReaderSeeker)
containes 100 lines, 10 lines are wanted and 5 match to the
filter only those 5 lines are returned.

https://codereview.appspot.com/36540043/

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
// Copyright 2013 Canonical Ltd.
 
2
// Licensed under the AGPLv3, see LICENCE file for details.
 
3
 
 
4
package tailer_test
 
5
 
 
6
import (
 
7
        "bufio"
 
8
        "bytes"
 
9
        "fmt"
 
10
        "io"
 
11
        "sync"
 
12
        stdtesting "testing"
 
13
        "time"
 
14
 
 
15
        gc "launchpad.net/gocheck"
 
16
 
 
17
        "launchpad.net/juju-core/testing"
 
18
        "launchpad.net/juju-core/utils/tailer"
 
19
)
 
20
 
 
21
func Test(t *stdtesting.T) {
 
22
        gc.TestingT(t)
 
23
}
 
24
 
 
25
type tailerSuite struct{}
 
26
 
 
27
var _ = gc.Suite(tailerSuite{})
 
28
 
 
29
var alphabetData = []string{
 
30
        "alpha alpha\n",
 
31
        "bravo bravo\n",
 
32
        "charlie charlie\n",
 
33
        "delta delta\n",
 
34
        "echo echo\n",
 
35
        "foxtrott foxtrott\n",
 
36
        "golf golf\n",
 
37
        "hotel hotel\n",
 
38
        "india india\n",
 
39
        "juliet juliet\n",
 
40
        "kilo kilo\n",
 
41
        "lima lima\n",
 
42
        "mike mike\n",
 
43
        "november november\n",
 
44
        "oscar oscar\n",
 
45
        "papa papa\n",
 
46
        "quebec quebec\n",
 
47
        "romeo romeo\n",
 
48
        "sierra sierra\n",
 
49
        "tango tango\n",
 
50
        "uniform uniform\n",
 
51
        "victor victor\n",
 
52
        "whiskey whiskey\n",
 
53
        "x-ray x-ray\n",
 
54
        "yankee yankee\n",
 
55
        "zulu zulu\n",
 
56
}
 
57
 
 
58
var tests = []struct {
 
59
        description           string
 
60
        data                  []string
 
61
        initialLinesWritten   int
 
62
        initialLinesRequested int
 
63
        bufferSize            int
 
64
        filter                tailer.TailerFilterFunc
 
65
        injector              func(*tailer.Tailer, *readSeeker) func([]string)
 
66
        initialCollectedData  []string
 
67
        appendedCollectedData []string
 
68
        err                   string
 
69
}{{
 
70
        description: "lines are longer than buffer size",
 
71
        data: []string{
 
72
                "abcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyz\n",
 
73
                "0123456789012345678901234567890123456789012345678901\n",
 
74
        },
 
75
        initialLinesWritten:   1,
 
76
        initialLinesRequested: 1,
 
77
        bufferSize:            5,
 
78
        initialCollectedData: []string{
 
79
                "abcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyz\n",
 
80
        },
 
81
        appendedCollectedData: []string{
 
82
                "0123456789012345678901234567890123456789012345678901\n",
 
83
        },
 
84
}, {
 
85
        description: "lines are longer than buffer size, missing termination of last line",
 
86
        data: []string{
 
87
                "abcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyz\n",
 
88
                "0123456789012345678901234567890123456789012345678901\n",
 
89
                "the quick brown fox ",
 
90
        },
 
91
        initialLinesWritten:   1,
 
92
        initialLinesRequested: 1,
 
93
        bufferSize:            5,
 
94
        initialCollectedData: []string{
 
95
                "abcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyz\n",
 
96
        },
 
97
        appendedCollectedData: []string{
 
98
                "0123456789012345678901234567890123456789012345678901\n",
 
99
        },
 
100
}, {
 
101
        description: "lines are longer than buffer size, last line is terminated later",
 
102
        data: []string{
 
103
                "abcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyz\n",
 
104
                "0123456789012345678901234567890123456789012345678901\n",
 
105
                "the quick brown fox ",
 
106
                "jumps over the lazy dog\n",
 
107
        },
 
108
        initialLinesWritten:   1,
 
109
        initialLinesRequested: 1,
 
110
        bufferSize:            5,
 
111
        initialCollectedData: []string{
 
112
                "abcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyz\n",
 
113
        },
 
114
        appendedCollectedData: []string{
 
115
                "0123456789012345678901234567890123456789012345678901\n",
 
116
                "the quick brown fox jumps over the lazy dog\n",
 
117
        },
 
118
}, {
 
119
        description: "missing termination of last line",
 
120
        data: []string{
 
121
                "abcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyz\n",
 
122
                "0123456789012345678901234567890123456789012345678901\n",
 
123
                "the quick brown fox ",
 
124
        },
 
125
        initialLinesWritten:   1,
 
126
        initialLinesRequested: 1,
 
127
        initialCollectedData: []string{
 
128
                "abcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyz\n",
 
129
        },
 
130
        appendedCollectedData: []string{
 
131
                "0123456789012345678901234567890123456789012345678901\n",
 
132
        },
 
133
}, {
 
134
        description: "last line is terminated later",
 
135
        data: []string{
 
136
                "abcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyz\n",
 
137
                "0123456789012345678901234567890123456789012345678901\n",
 
138
                "the quick brown fox ",
 
139
                "jumps over the lazy dog\n",
 
140
        },
 
141
        initialLinesWritten:   1,
 
142
        initialLinesRequested: 1,
 
143
        initialCollectedData: []string{
 
144
                "abcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyz\n",
 
145
        },
 
146
        appendedCollectedData: []string{
 
147
                "0123456789012345678901234567890123456789012345678901\n",
 
148
                "the quick brown fox jumps over the lazy dog\n",
 
149
        },
 
150
}, {
 
151
        description:           "more lines already written than initially requested",
 
152
        data:                  alphabetData,
 
153
        initialLinesWritten:   5,
 
154
        initialLinesRequested: 3,
 
155
        initialCollectedData: []string{
 
156
                "charlie charlie\n",
 
157
                "delta delta\n",
 
158
                "echo echo\n",
 
159
        },
 
160
        appendedCollectedData: alphabetData[5:],
 
161
}, {
 
162
        description:           "less lines already written than initially requested",
 
163
        data:                  alphabetData,
 
164
        initialLinesWritten:   3,
 
165
        initialLinesRequested: 5,
 
166
        initialCollectedData: []string{
 
167
                "alpha alpha\n",
 
168
                "bravo bravo\n",
 
169
                "charlie charlie\n",
 
170
        },
 
171
        appendedCollectedData: alphabetData[3:],
 
172
}, {
 
173
        description:           "lines are longer than buffer size, more lines already written than initially requested",
 
174
        data:                  alphabetData,
 
175
        initialLinesWritten:   5,
 
176
        initialLinesRequested: 3,
 
177
        bufferSize:            5,
 
178
        initialCollectedData: []string{
 
179
                "charlie charlie\n",
 
180
                "delta delta\n",
 
181
                "echo echo\n",
 
182
        },
 
183
        appendedCollectedData: alphabetData[5:],
 
184
}, {
 
185
        description:           "lines are longer than buffer size, less lines already written than initially requested",
 
186
        data:                  alphabetData,
 
187
        initialLinesWritten:   3,
 
188
        initialLinesRequested: 5,
 
189
        bufferSize:            5,
 
190
        initialCollectedData: []string{
 
191
                "alpha alpha\n",
 
192
                "bravo bravo\n",
 
193
                "charlie charlie\n",
 
194
        },
 
195
        appendedCollectedData: alphabetData[3:],
 
196
}, {
 
197
        description:           "filter lines which contain the char 'e'",
 
198
        data:                  alphabetData,
 
199
        initialLinesWritten:   10,
 
200
        initialLinesRequested: 3,
 
201
        filter: func(line []byte) bool {
 
202
                return bytes.Contains(line, []byte{'e'})
 
203
        },
 
204
        initialCollectedData: []string{
 
205
                "echo echo\n",
 
206
                "hotel hotel\n",
 
207
                "juliet juliet\n",
 
208
        },
 
209
        appendedCollectedData: []string{
 
210
                "mike mike\n",
 
211
                "november november\n",
 
212
                "quebec quebec\n",
 
213
                "romeo romeo\n",
 
214
                "sierra sierra\n",
 
215
                "whiskey whiskey\n",
 
216
                "yankee yankee\n",
 
217
        },
 
218
}, {
 
219
        description:           "stop tailing after 10 collected lines",
 
220
        data:                  alphabetData,
 
221
        initialLinesWritten:   5,
 
222
        initialLinesRequested: 3,
 
223
        injector: func(t *tailer.Tailer, rs *readSeeker) func([]string) {
 
224
                return func(lines []string) {
 
225
                        if len(lines) == 10 {
 
226
                                t.Stop()
 
227
                        }
 
228
                }
 
229
        },
 
230
        initialCollectedData: []string{
 
231
                "charlie charlie\n",
 
232
                "delta delta\n",
 
233
                "echo echo\n",
 
234
        },
 
235
        appendedCollectedData: alphabetData[5:],
 
236
}, {
 
237
        description:           "generate an error after 10 collected lines",
 
238
        data:                  alphabetData,
 
239
        initialLinesWritten:   5,
 
240
        initialLinesRequested: 3,
 
241
        injector: func(t *tailer.Tailer, rs *readSeeker) func([]string) {
 
242
                return func(lines []string) {
 
243
                        if len(lines) == 10 {
 
244
                                rs.setError(fmt.Errorf("ouch after 10 lines"))
 
245
                        }
 
246
                }
 
247
        },
 
248
        initialCollectedData: []string{
 
249
                "charlie charlie\n",
 
250
                "delta delta\n",
 
251
                "echo echo\n",
 
252
        },
 
253
        appendedCollectedData: alphabetData[5:],
 
254
        err: "ouch after 10 lines",
 
255
}, {
 
256
        description: "more lines already written than initially requested, some empty, unfiltered",
 
257
        data: []string{
 
258
                "one one\n",
 
259
                "two two\n",
 
260
                "\n",
 
261
                "\n",
 
262
                "three three\n",
 
263
                "four four\n",
 
264
                "\n",
 
265
                "\n",
 
266
                "five five\n",
 
267
                "six six\n",
 
268
        },
 
269
        initialLinesWritten:   3,
 
270
        initialLinesRequested: 2,
 
271
        initialCollectedData: []string{
 
272
                "two two\n",
 
273
                "\n",
 
274
        },
 
275
        appendedCollectedData: []string{
 
276
                "\n",
 
277
                "three three\n",
 
278
                "four four\n",
 
279
                "\n",
 
280
                "\n",
 
281
                "five five\n",
 
282
                "six six\n",
 
283
        },
 
284
}, {
 
285
        description: "more lines already written than initially requested, some empty, those filtered",
 
286
        data: []string{
 
287
                "one one\n",
 
288
                "two two\n",
 
289
                "\n",
 
290
                "\n",
 
291
                "three three\n",
 
292
                "four four\n",
 
293
                "\n",
 
294
                "\n",
 
295
                "five five\n",
 
296
                "six six\n",
 
297
        },
 
298
        initialLinesWritten:   3,
 
299
        initialLinesRequested: 2,
 
300
        filter: func(line []byte) bool {
 
301
                return len(bytes.TrimSpace(line)) > 0
 
302
        },
 
303
        initialCollectedData: []string{
 
304
                "one one\n",
 
305
                "two two\n",
 
306
        },
 
307
        appendedCollectedData: []string{
 
308
                "three three\n",
 
309
                "four four\n",
 
310
                "five five\n",
 
311
                "six six\n",
 
312
        },
 
313
}}
 
314
 
 
315
func (tailerSuite) TestTailer(c *gc.C) {
 
316
        for i, test := range tests {
 
317
                c.Logf("Test #%d) %s", i, test.description)
 
318
                bufferSize := test.bufferSize
 
319
                if bufferSize == 0 {
 
320
                        // Default value.
 
321
                        bufferSize = 4096
 
322
                }
 
323
                reader, writer := io.Pipe()
 
324
                sigc := make(chan struct{}, 1)
 
325
                rs := startReadSeeker(c, test.data, test.initialLinesWritten, sigc)
 
326
                tailer := tailer.NewTestTailer(rs, writer, test.initialLinesRequested, test.filter, bufferSize, 2*time.Millisecond)
 
327
                linec := startReading(c, tailer, reader, writer)
 
328
 
 
329
                // Collect initial data.
 
330
                assertCollected(c, linec, test.initialCollectedData, nil)
 
331
 
 
332
                sigc <- struct{}{}
 
333
 
 
334
                // Collect remaining data, possibly with injection to stop
 
335
                // earlier or generate an error.
 
336
                var injection func([]string)
 
337
                if test.injector != nil {
 
338
                        injection = test.injector(tailer, rs)
 
339
                }
 
340
 
 
341
                assertCollected(c, linec, test.appendedCollectedData, injection)
 
342
 
 
343
                if test.err == "" {
 
344
                        c.Assert(tailer.Stop(), gc.IsNil)
 
345
                } else {
 
346
                        c.Assert(tailer.Err(), gc.ErrorMatches, test.err)
 
347
                }
 
348
        }
 
349
}
 
350
 
 
351
// startReading starts a goroutine receiving the lines out of the reader
 
352
// in the background and passing them to a created string channel. This
 
353
// will used in the assertions.
 
354
func startReading(c *gc.C, tailer *tailer.Tailer, reader *io.PipeReader, writer *io.PipeWriter) chan string {
 
355
        linec := make(chan string)
 
356
        // Start goroutine for reading.
 
357
        go func() {
 
358
                defer close(linec)
 
359
                reader := bufio.NewReader(reader)
 
360
                for {
 
361
                        line, err := reader.ReadString('\n')
 
362
                        switch err {
 
363
                        case nil:
 
364
                                linec <- line
 
365
                        case io.EOF:
 
366
                                return
 
367
                        default:
 
368
                                c.Fail()
 
369
                        }
 
370
                }
 
371
        }()
 
372
        // Close writer when tailer is stopped or has an error. Tailer using
 
373
        // components can do it the same way.
 
374
        go func() {
 
375
                tailer.Wait()
 
376
                writer.Close()
 
377
        }()
 
378
        return linec
 
379
}
 
380
 
 
381
// assertCollected reads lines from the string channel linec. It compares if
 
382
// those are the one passed with compare until a timeout. If the timeout is
 
383
// reached earlier than all lines are collected the assertion fails. The
 
384
// injection function allows to interrupt the processing with a function
 
385
// generating an error or a regular stopping during the tailing. In case the
 
386
// linec is closed due to stopping or an error only the values so far care
 
387
// compared. Checking the reason for termination is done in the test.
 
388
func assertCollected(c *gc.C, linec chan string, compare []string, injection func([]string)) {
 
389
        timeout := time.After(testing.LongWait)
 
390
        lines := []string{}
 
391
        for {
 
392
                select {
 
393
                case line, ok := <-linec:
 
394
                        if ok {
 
395
                                lines = append(lines, line)
 
396
                                if injection != nil {
 
397
                                        injection(lines)
 
398
                                }
 
399
                                if len(lines) == len(compare) {
 
400
                                        // All data received.
 
401
                                        c.Assert(lines, gc.DeepEquals, compare)
 
402
                                        return
 
403
                                }
 
404
                        } else {
 
405
                                // linec closed after stopping or error.
 
406
                                c.Assert(lines, gc.DeepEquals, compare[:len(lines)])
 
407
                                return
 
408
                        }
 
409
                case <-timeout:
 
410
                        if injection == nil {
 
411
                                c.Fatalf("timeout during tailer collection")
 
412
                        }
 
413
                        return
 
414
                }
 
415
        }
 
416
}
 
417
 
 
418
// startReadSeeker returns a ReadSeeker for the Tailer. It simulates
 
419
// reading and seeking inside a file and also simulating an error.
 
420
// The goroutine waits for a signal that it can start writing the
 
421
// appended lines.
 
422
func startReadSeeker(c *gc.C, data []string, initialLeg int, sigc chan struct{}) *readSeeker {
 
423
        // Write initial lines into the buffer.
 
424
        var rs readSeeker
 
425
        var i int
 
426
        for i = 0; i < initialLeg; i++ {
 
427
                rs.write(data[i])
 
428
        }
 
429
 
 
430
        go func() {
 
431
                <-sigc
 
432
 
 
433
                for ; i < len(data); i++ {
 
434
                        time.Sleep(5 * time.Millisecond)
 
435
                        rs.write(data[i])
 
436
                }
 
437
        }()
 
438
        return &rs
 
439
}
 
440
 
 
441
type readSeeker struct {
 
442
        mux    sync.Mutex
 
443
        buffer []byte
 
444
        pos    int
 
445
        err    error
 
446
}
 
447
 
 
448
func (r *readSeeker) write(s string) {
 
449
        r.mux.Lock()
 
450
        defer r.mux.Unlock()
 
451
        r.buffer = append(r.buffer, []byte(s)...)
 
452
}
 
453
 
 
454
func (r *readSeeker) setError(err error) {
 
455
        r.mux.Lock()
 
456
        defer r.mux.Unlock()
 
457
        r.err = err
 
458
}
 
459
 
 
460
func (r *readSeeker) Read(p []byte) (n int, err error) {
 
461
        r.mux.Lock()
 
462
        defer r.mux.Unlock()
 
463
        if r.err != nil {
 
464
                return 0, r.err
 
465
        }
 
466
        if r.pos >= len(r.buffer) {
 
467
                return 0, io.EOF
 
468
        }
 
469
        n = copy(p, r.buffer[r.pos:])
 
470
        r.pos += n
 
471
        return n, nil
 
472
}
 
473
 
 
474
func (r *readSeeker) Seek(offset int64, whence int) (ret int64, err error) {
 
475
        r.mux.Lock()
 
476
        defer r.mux.Unlock()
 
477
        var newPos int64
 
478
        switch whence {
 
479
        case 0:
 
480
                newPos = offset
 
481
        case 1:
 
482
                newPos = int64(r.pos) + offset
 
483
        case 2:
 
484
                newPos = int64(len(r.buffer)) + offset
 
485
        default:
 
486
                return 0, fmt.Errorf("invalid whence: %d", whence)
 
487
        }
 
488
        if newPos < 0 {
 
489
                return 0, fmt.Errorf("negative position: %d", newPos)
 
490
        }
 
491
        if newPos >= 1<<31 {
 
492
                return 0, fmt.Errorf("position out of range: %d", newPos)
 
493
        }
 
494
        r.pos = int(newPos)
 
495
        return newPos, nil
 
496
}