~nskaggs/+junk/xenial-test

« back to all changes in this revision

Viewing changes to src/github.com/juju/utils/tailer/tailer.go

  • Committer: Nicholas Skaggs
  • Date: 2016-10-24 20:56:05 UTC
  • Revision ID: nicholas.skaggs@canonical.com-20161024205605-z8lta0uvuhtxwzwl
Initi with beta15

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
// Copyright 2013 Canonical Ltd.
 
2
// Licensed under the LGPLv3, see LICENCE file for details.
 
3
 
 
4
package tailer
 
5
 
 
6
import (
 
7
        "bufio"
 
8
        "bytes"
 
9
        "io"
 
10
        "os"
 
11
        "time"
 
12
 
 
13
        "launchpad.net/tomb"
 
14
)
 
15
 
 
16
const (
 
17
        defaultBufferSize = 4096
 
18
        polltime          = time.Second
 
19
        delimiter         = '\n'
 
20
)
 
21
 
 
22
var (
 
23
        bufferSize = defaultBufferSize
 
24
        delimiters = []byte{delimiter}
 
25
)
 
26
 
 
27
// TailerFilterFunc decides if a line shall be tailed (func is nil or
 
28
// returns true) of shall be omitted (func returns false).
 
29
type TailerFilterFunc func(line []byte) bool
 
30
 
 
31
// Tailer reads an input line by line an tails them into the passed Writer.
 
32
// The lines have to be terminated with a newline.
 
33
type Tailer struct {
 
34
        tomb        tomb.Tomb
 
35
        readSeeker  io.ReadSeeker
 
36
        reader      *bufio.Reader
 
37
        writeCloser io.WriteCloser
 
38
        writer      *bufio.Writer
 
39
        filter      TailerFilterFunc
 
40
        polltime    time.Duration
 
41
}
 
42
 
 
43
// NewTailer starts a Tailer which reads strings from the passed
 
44
// ReadSeeker line by line. If a filter function is specified the read
 
45
// lines are filtered. The matching lines are written to the passed
 
46
// Writer.
 
47
func NewTailer(readSeeker io.ReadSeeker, writer io.Writer, filter TailerFilterFunc) *Tailer {
 
48
        return newTailer(readSeeker, writer, filter, polltime)
 
49
}
 
50
 
 
51
// newTailer starts a Tailer like NewTailer but allows the setting of
 
52
// the read buffer size and the time between pollings for testing.
 
53
func newTailer(readSeeker io.ReadSeeker, writer io.Writer,
 
54
        filter TailerFilterFunc, polltime time.Duration) *Tailer {
 
55
        t := &Tailer{
 
56
                readSeeker: readSeeker,
 
57
                reader:     bufio.NewReaderSize(readSeeker, bufferSize),
 
58
                writer:     bufio.NewWriter(writer),
 
59
                filter:     filter,
 
60
                polltime:   polltime,
 
61
        }
 
62
        go func() {
 
63
                defer t.tomb.Done()
 
64
                t.tomb.Kill(t.loop())
 
65
        }()
 
66
        return t
 
67
}
 
68
 
 
69
// Stop tells the tailer to stop working.
 
70
func (t *Tailer) Stop() error {
 
71
        t.tomb.Kill(nil)
 
72
        return t.tomb.Wait()
 
73
}
 
74
 
 
75
// Wait waits until the tailer is stopped due to command
 
76
// or an error. In case of an error it returns the reason.
 
77
func (t *Tailer) Wait() error {
 
78
        return t.tomb.Wait()
 
79
}
 
80
 
 
81
// Dead returns the channel that can be used to wait until
 
82
// the tailer is stopped.
 
83
func (t *Tailer) Dead() <-chan struct{} {
 
84
        return t.tomb.Dead()
 
85
}
 
86
 
 
87
// Err returns a possible error.
 
88
func (t *Tailer) Err() error {
 
89
        return t.tomb.Err()
 
90
}
 
91
 
 
92
// loop writes the last lines based on the buffer size to the
 
93
// writer and then polls for more data to write it to the
 
94
// writer too.
 
95
func (t *Tailer) loop() error {
 
96
        // Start polling.
 
97
        // TODO(mue) 2013-12-06
 
98
        // Handling of read-seeker/files being truncated during
 
99
        // tailing is currently missing!
 
100
        timer := time.NewTimer(0)
 
101
        for {
 
102
                select {
 
103
                case <-t.tomb.Dying():
 
104
                        return nil
 
105
                case <-timer.C:
 
106
                        for {
 
107
                                line, readErr := t.readLine()
 
108
                                _, writeErr := t.writer.Write(line)
 
109
                                if writeErr != nil {
 
110
                                        return writeErr
 
111
                                }
 
112
                                if readErr != nil {
 
113
                                        if readErr != io.EOF {
 
114
                                                return readErr
 
115
                                        }
 
116
                                        break
 
117
                                }
 
118
                        }
 
119
                        if writeErr := t.writer.Flush(); writeErr != nil {
 
120
                                return writeErr
 
121
                        }
 
122
                        timer.Reset(t.polltime)
 
123
                }
 
124
        }
 
125
}
 
126
 
 
127
// SeekLastLines sets the read position of the ReadSeeker to the
 
128
// wanted number of filtered lines before the end.
 
129
func SeekLastLines(readSeeker io.ReadSeeker, lines uint, filter TailerFilterFunc) error {
 
130
        offset, err := readSeeker.Seek(0, os.SEEK_END)
 
131
        if err != nil {
 
132
                return err
 
133
        }
 
134
        if lines == 0 {
 
135
                // We are done, just seeking to the end is sufficient.
 
136
                return nil
 
137
        }
 
138
        seekPos := int64(0)
 
139
        found := uint(0)
 
140
        buffer := make([]byte, bufferSize)
 
141
SeekLoop:
 
142
        for offset > 0 {
 
143
                // buffer contains the data left over from the
 
144
                // previous iteration.
 
145
                space := cap(buffer) - len(buffer)
 
146
                if space < bufferSize {
 
147
                        // Grow buffer.
 
148
                        newBuffer := make([]byte, len(buffer), cap(buffer)*2)
 
149
                        copy(newBuffer, buffer)
 
150
                        buffer = newBuffer
 
151
                        space = cap(buffer) - len(buffer)
 
152
                }
 
153
                if int64(space) > offset {
 
154
                        // Use exactly the right amount of space if there's
 
155
                        // only a small amount remaining.
 
156
                        space = int(offset)
 
157
                }
 
158
                // Copy data remaining from last time to the end of the buffer,
 
159
                // so we can read into the right place.
 
160
                copy(buffer[space:cap(buffer)], buffer)
 
161
                buffer = buffer[0 : len(buffer)+space]
 
162
                offset -= int64(space)
 
163
                _, err := readSeeker.Seek(offset, os.SEEK_SET)
 
164
                if err != nil {
 
165
                        return err
 
166
                }
 
167
                _, err = io.ReadFull(readSeeker, buffer[0:space])
 
168
                if err != nil {
 
169
                        return err
 
170
                }
 
171
                // Find the end of the last line in the buffer.
 
172
                // This will discard any unterminated line at the end
 
173
                // of the file.
 
174
                end := bytes.LastIndex(buffer, delimiters)
 
175
                if end == -1 {
 
176
                        // No end of line found - discard incomplete
 
177
                        // line and continue looking. If this happens
 
178
                        // at the beginning of the file, we don't care
 
179
                        // because we're going to stop anyway.
 
180
                        buffer = buffer[:0]
 
181
                        continue
 
182
                }
 
183
                end++
 
184
                for {
 
185
                        start := bytes.LastIndex(buffer[0:end-1], delimiters)
 
186
                        if start == -1 && offset >= 0 {
 
187
                                break
 
188
                        }
 
189
                        start++
 
190
                        if filter == nil || filter(buffer[start:end]) {
 
191
                                found++
 
192
                                if found >= lines {
 
193
                                        seekPos = offset + int64(start)
 
194
                                        break SeekLoop
 
195
                                }
 
196
                        }
 
197
                        end = start
 
198
                }
 
199
                // Leave the last line in buffer, as we don't know whether
 
200
                // it's complete or not.
 
201
                buffer = buffer[0:end]
 
202
        }
 
203
        // Final positioning.
 
204
        readSeeker.Seek(seekPos, os.SEEK_SET)
 
205
        return nil
 
206
}
 
207
 
 
208
// readLine reads the next valid line from the reader, even if it is
 
209
// larger than the reader buffer.
 
210
func (t *Tailer) readLine() ([]byte, error) {
 
211
        for {
 
212
                slice, err := t.reader.ReadSlice(delimiter)
 
213
                if err == nil {
 
214
                        if t.isValid(slice) {
 
215
                                return slice, nil
 
216
                        }
 
217
                        continue
 
218
                }
 
219
                line := append([]byte(nil), slice...)
 
220
                for err == bufio.ErrBufferFull {
 
221
                        slice, err = t.reader.ReadSlice(delimiter)
 
222
                        line = append(line, slice...)
 
223
                }
 
224
                switch err {
 
225
                case nil:
 
226
                        if t.isValid(line) {
 
227
                                return line, nil
 
228
                        }
 
229
                case io.EOF:
 
230
                        // EOF without delimiter, step back.
 
231
                        t.readSeeker.Seek(-int64(len(line)), os.SEEK_CUR)
 
232
                        return nil, err
 
233
                default:
 
234
                        return nil, err
 
235
                }
 
236
        }
 
237
}
 
238
 
 
239
// isValid checks if the passed line is valid by checking if the
 
240
// line has content, the filter function is nil or it returns true.
 
241
func (t *Tailer) isValid(line []byte) bool {
 
242
        if t.filter == nil {
 
243
                return true
 
244
        }
 
245
        return t.filter(line)
 
246
}