~rogpeppe/juju-core/themue-058-debug-log-api

« back to all changes in this revision

Viewing changes to state/apiserver/logger/logtailer.go

  • Committer: Frank Mueller
  • Date: 2013-12-20 11:36:11 UTC
  • Revision ID: frank.mueller@canonical.com-20131220113611-0abqsnf63yx8bt0s
apiserver/logger: added logtailer and fixed tailer

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
// Copyright 2013 Canonical Ltd.
 
2
// Licensed under the AGPLv3, see LICENCE file for details.
 
3
 
 
4
package logger
 
5
 
 
6
import (
 
7
        "bytes"
 
8
        "os"
 
9
        "time"
 
10
 
 
11
        "launchpad.net/tomb"
 
12
 
 
13
        "launchpad.net/juju-core/names"
 
14
        "launchpad.net/juju-core/utils/tailer"
 
15
)
 
16
 
 
17
// logTailer reads the combined all machines log writes the
 
18
// filtered data into a channel of string slices.
 
19
type logTailer struct {
 
20
        logFile *os.File
 
21
        writer  *channelWriter
 
22
        tailer  *tailer.Tailer
 
23
}
 
24
 
 
25
// startLogTailer opens the passed log file and tails the lines
 
26
// with the passed tag.
 
27
func startLogTailer(filename string, lines int, tag string) (*logTailer, error) {
 
28
        // Prepare filter.
 
29
        var filter tailer.TailerFilterFunc
 
30
        if tag != "" {
 
31
                _, _, err := names.ParseTag(tag, "")
 
32
                if err != nil {
 
33
                        return nil, err
 
34
                }
 
35
                prefix := []byte(tag + ":")
 
36
                filter = func(line []byte) bool {
 
37
                        return bytes.HasPrefix(line, prefix)
 
38
                }
 
39
        }
 
40
        // Prepare in- and output.
 
41
        logFile, err := os.Open(filename)
 
42
        if err != nil {
 
43
                return nil, err
 
44
        }
 
45
        lt := &logTailer{
 
46
                logFile: logFile,
 
47
                writer:  startChannelWriter(),
 
48
        }
 
49
        lt.tailer = tailer.NewTailer(lt.logFile, lt.writer, lines, filter)
 
50
        return lt, nil
 
51
}
 
52
 
 
53
// Changes returns the changes delivered by the writer.
 
54
func (lt *logTailer) Changes() <-chan []string {
 
55
        return lt.writer.Changes()
 
56
}
 
57
 
 
58
// Stop terminates the log tailer by closing in- and output
 
59
// after stopping the tailer.
 
60
func (lt *logTailer) Stop() error {
 
61
        defer lt.writer.Close()
 
62
        defer lt.logFile.Close()
 
63
        return lt.tailer.Stop()
 
64
}
 
65
 
 
66
// Err implements the Errer interface.
 
67
func (lt *logTailer) Err() error {
 
68
        return lt.tailer.Err()
 
69
}
 
70
 
 
71
// channelWriter implements an io.Writer which passes the written
 
72
// data into a channel of string slices.
 
73
type channelWriter struct {
 
74
        tomb tomb.Tomb
 
75
        in   chan []byte
 
76
        out  chan []string
 
77
}
 
78
 
 
79
// startChannelWriter creates a Writer converting its data into slices
 
80
// of strings, each terminated by newlines.
 
81
func startChannelWriter() *channelWriter {
 
82
        cw := &channelWriter{
 
83
                in:  make(chan []byte),
 
84
                out: make(chan []string),
 
85
        }
 
86
        go func() {
 
87
                defer cw.tomb.Done()
 
88
                defer close(cw.out)
 
89
                cw.tomb.Kill(cw.loop())
 
90
        }()
 
91
        return cw
 
92
}
 
93
 
 
94
func (cw *channelWriter) Write(p []byte) (int, error) {
 
95
        cw.in <- p
 
96
        return len(p), nil
 
97
}
 
98
 
 
99
func (cw *channelWriter) Changes() <-chan []string {
 
100
        return cw.out
 
101
}
 
102
 
 
103
func (cw *channelWriter) Close() error {
 
104
        cw.tomb.Kill(nil)
 
105
        return cw.tomb.Wait()
 
106
}
 
107
 
 
108
func (cw *channelWriter) loop() error {
 
109
        lines := []string(nil)
 
110
        buffer := []byte(nil)
 
111
        timer := time.NewTimer(10 * time.Millisecond)
 
112
        defer timer.Stop()
 
113
        send := func() {
 
114
                select {
 
115
                case cw.out <- lines:
 
116
                        lines = nil
 
117
                default:
 
118
                }
 
119
        }
 
120
        for {
 
121
                select {
 
122
                case <-cw.tomb.Dying():
 
123
                        return nil
 
124
                case block := <-cw.in:
 
125
                        // Extract lines and send them.
 
126
                        lines, buffer = cw.split(lines, buffer, block)
 
127
                        if lines != nil {
 
128
                                send()
 
129
                        }
 
130
                case <-timer.C:
 
131
                        // Send collected lines and reset timer.
 
132
                        if lines != nil {
 
133
                                send()
 
134
                        }
 
135
                        timer.Reset(10 * time.Millisecond)
 
136
                }
 
137
        }
 
138
}
 
139
 
 
140
func (cw *channelWriter) split(lines []string, buffer, block []byte) ([]string, []byte) {
 
141
        if len(block) == 0 {
 
142
                return lines, buffer
 
143
        }
 
144
        work := append(buffer, block...)
 
145
        start := 0
 
146
        for {
 
147
                i := bytes.IndexByte(work[start:], '\n')
 
148
                if i < 0 {
 
149
                        // Block terminates without a newline.
 
150
                        return lines, work[start:]
 
151
                }
 
152
                lines = append(lines, string(work[start:start+i+1]))
 
153
                start += i + 1
 
154
                if start >= len(work) {
 
155
                        // Block terminates with a newline.
 
156
                        return lines, nil
 
157
                }
 
158
        }
 
159
}