1
// Copyright 2013 Canonical Ltd.
2
// Licensed under the LGPLv3, see LICENCE file for details.
17
defaultBufferSize = 4096
18
polltime = time.Second
23
bufferSize = defaultBufferSize
24
delimiters = []byte{delimiter}
27
// TailerFilterFunc decides if a line shall be tailed (func is nil or
28
// returns true) or shall be omitted (func returns false).
29
type TailerFilterFunc func(line []byte) bool
31
// Tailer reads an input line by line and tails the lines into the passed Writer.
32
// The lines have to be terminated with a newline.
35
readSeeker io.ReadSeeker
37
writeCloser io.WriteCloser
39
filter TailerFilterFunc
40
polltime time.Duration
43
// NewTailer starts a Tailer which reads strings from the passed
44
// ReadSeeker line by line. If a filter function is specified the read
45
// lines are filtered. The matching lines are written to the passed
47
func NewTailer(readSeeker io.ReadSeeker, writer io.Writer, filter TailerFilterFunc) *Tailer {
48
return newTailer(readSeeker, writer, filter, polltime)
51
// newTailer starts a Tailer like NewTailer but allows the setting of
52
// the time between pollings for testing; the read buffer size is taken from bufferSize.
53
func newTailer(readSeeker io.ReadSeeker, writer io.Writer,
54
filter TailerFilterFunc, polltime time.Duration) *Tailer {
56
readSeeker: readSeeker,
57
reader: bufio.NewReaderSize(readSeeker, bufferSize),
58
writer: bufio.NewWriter(writer),
69
// Stop tells the tailer to stop working.
70
func (t *Tailer) Stop() error {
75
// Wait waits until the tailer is stopped due to command
76
// or an error. In case of an error it returns the reason.
77
func (t *Tailer) Wait() error {
81
// Dead returns the channel that can be used to wait until
82
// the tailer is stopped.
83
func (t *Tailer) Dead() <-chan struct{} {
87
// Err returns a possible error.
88
func (t *Tailer) Err() error {
92
// loop writes the last lines based on the buffer size to the
93
// writer and then polls for more data to write it to the
95
func (t *Tailer) loop() error {
97
// TODO(mue) 2013-12-06
98
// Handling of read-seeker/files being truncated during
99
// tailing is currently missing!
100
timer := time.NewTimer(0)
103
case <-t.tomb.Dying():
107
line, readErr := t.readLine()
108
_, writeErr := t.writer.Write(line)
113
if readErr != io.EOF {
119
if writeErr := t.writer.Flush(); writeErr != nil {
122
timer.Reset(t.polltime)
127
// SeekLastLines sets the read position of the ReadSeeker to the
128
// wanted number of filtered lines before the end.
129
func SeekLastLines(readSeeker io.ReadSeeker, lines uint, filter TailerFilterFunc) error {
130
offset, err := readSeeker.Seek(0, os.SEEK_END)
135
// We are done, just seeking to the end is sufficient.
140
buffer := make([]byte, bufferSize)
143
// buffer contains the data left over from the
144
// previous iteration.
145
space := cap(buffer) - len(buffer)
146
if space < bufferSize {
148
newBuffer := make([]byte, len(buffer), cap(buffer)*2)
149
copy(newBuffer, buffer)
151
space = cap(buffer) - len(buffer)
153
if int64(space) > offset {
154
// Use exactly the right amount of space if there's
155
// only a small amount remaining.
158
// Copy data remaining from last time to the end of the buffer,
159
// so we can read into the right place.
160
copy(buffer[space:cap(buffer)], buffer)
161
buffer = buffer[0 : len(buffer)+space]
162
offset -= int64(space)
163
_, err := readSeeker.Seek(offset, os.SEEK_SET)
167
_, err = io.ReadFull(readSeeker, buffer[0:space])
171
// Find the end of the last line in the buffer.
172
// This will discard any unterminated line at the end
174
end := bytes.LastIndex(buffer, delimiters)
176
// No end of line found - discard incomplete
177
// line and continue looking. If this happens
178
// at the beginning of the file, we don't care
179
// because we're going to stop anyway.
185
start := bytes.LastIndex(buffer[0:end-1], delimiters)
186
if start == -1 && offset >= 0 {
190
if filter == nil || filter(buffer[start:end]) {
193
seekPos = offset + int64(start)
199
// Leave the last line in buffer, as we don't know whether
200
// it's complete or not.
201
buffer = buffer[0:end]
203
// Final positioning.
204
readSeeker.Seek(seekPos, os.SEEK_SET)
208
// readLine reads the next valid line from the reader, even if it is
209
// larger than the reader buffer.
210
func (t *Tailer) readLine() ([]byte, error) {
212
slice, err := t.reader.ReadSlice(delimiter)
214
if t.isValid(slice) {
219
line := append([]byte(nil), slice...)
220
for err == bufio.ErrBufferFull {
221
slice, err = t.reader.ReadSlice(delimiter)
222
line = append(line, slice...)
230
// EOF without delimiter, step back.
231
t.readSeeker.Seek(-int64(len(line)), os.SEEK_CUR)
239
// isValid checks if the passed line is valid by checking if the
240
// line has content, the filter function is nil or it returns true.
241
func (t *Tailer) isValid(line []byte) bool {
245
return t.filter(line)