1
// Copyright 2013 Canonical Ltd.
2
// Licensed under the AGPLv3, see LICENCE file for details.
13
"launchpad.net/juju-core/names"
14
"launchpad.net/juju-core/utils/tailer"
17
// logTailer reads the combined all machines log writes the
18
// filtered data into a channel of string slices.
19
type logTailer struct {
25
// startLogTailer opens the passed log file and tails the lines
26
// with the passed tag.
27
func startLogTailer(filename string, lines int, tag string) (*logTailer, error) {
29
var filter tailer.TailerFilterFunc
31
_, _, err := names.ParseTag(tag, "")
35
prefix := []byte(tag + ":")
36
filter = func(line []byte) bool {
37
return bytes.HasPrefix(line, prefix)
40
// Prepare in- and output.
41
logFile, err := os.Open(filename)
47
writer: startChannelWriter(),
49
lt.tailer = tailer.NewTailer(lt.logFile, lt.writer, lines, filter)
53
// Changes returns the changes delivered by the writer.
54
func (lt *logTailer) Changes() <-chan []string {
55
return lt.writer.Changes()
58
// Stop terminates the log tailer by closing in- and output
59
// after stopping the tailer.
60
func (lt *logTailer) Stop() error {
61
defer lt.writer.Close()
62
defer lt.logFile.Close()
63
return lt.tailer.Stop()
66
// Err implements the Errer interface.
67
func (lt *logTailer) Err() error {
68
return lt.tailer.Err()
71
// channelWriter implements an io.Writer which passes the written
72
// data into a channel of string slices.
73
type channelWriter struct {
79
// startChannelWriter creates a Writer converting its data into slices
80
// of strings, each terminated by newlines.
81
func startChannelWriter() *channelWriter {
83
in: make(chan []byte),
84
out: make(chan []string),
89
cw.tomb.Kill(cw.loop())
94
func (cw *channelWriter) Write(p []byte) (int, error) {
99
func (cw *channelWriter) Changes() <-chan []string {
103
func (cw *channelWriter) Close() error {
105
return cw.tomb.Wait()
108
func (cw *channelWriter) loop() error {
109
lines := []string(nil)
110
buffer := []byte(nil)
111
timer := time.NewTimer(10 * time.Millisecond)
115
case cw.out <- lines:
122
case <-cw.tomb.Dying():
124
case block := <-cw.in:
125
// Extract lines and send them.
126
lines, buffer = cw.split(lines, buffer, block)
131
// Send collected lines and reset timer.
135
timer.Reset(10 * time.Millisecond)
140
func (cw *channelWriter) split(lines []string, buffer, block []byte) ([]string, []byte) {
144
work := append(buffer, block...)
147
i := bytes.IndexByte(work[start:], '\n')
149
// Block terminates without a newline.
150
return lines, work[start:]
152
lines = append(lines, string(work[start:start+i+1]))
154
if start >= len(work) {
155
// Block terminates with a newline.