1
// Copyright 2015 RedHat, Inc.
2
// Copyright 2015 CoreOS, Inc.
4
// Licensed under the Apache License, Version 2.0 (the "License");
5
// you may not use this file except in compliance with the License.
6
// You may obtain a copy of the License at
8
// http://www.apache.org/licenses/LICENSE-2.0
10
// Unless required by applicable law or agreed to in writing, software
11
// distributed under the License is distributed on an "AS IS" BASIS,
12
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
// See the License for the specific language governing permissions and
14
// limitations under the License.
27
ErrExpired = errors.New("Timeout expired")
30
// JournalReaderConfig represents options to drive the behavior of a JournalReader.
31
type JournalReaderConfig struct {
32
// The Since and NumFromTail options are mutually exclusive and determine
33
// where the reading begins within the journal. NewJournalReader checks Since
// first, so Since takes precedence if both are set.
34
Since time.Duration // start relative to a Duration from now
35
NumFromTail uint64 // start relative to the tail
37
// Show only journal entries whose fields match the supplied values. If
38
// the array is empty, entries will not be filtered.
41
// If not empty, the journal instance will point to a journal residing
42
// in this directory. The supplied path may be relative or absolute.
46
// JournalReader is an io.ReadCloser which provides a simple interface for iterating through the
// systemd journal.
48
type JournalReader struct {
52
// NewJournalReader creates a new JournalReader with configuration options that are similar to the
53
// systemd journalctl tool's iteration and filtering features.
54
func NewJournalReader(config JournalReaderConfig) (*JournalReader, error) {
59
if config.Path != "" {
60
r.journal, err = NewJournalFromDir(config.Path)
62
r.journal, err = NewJournal()
68
// Add any supplied matches
69
for _, m := range config.Matches {
70
r.journal.AddMatch(m.String())
73
// Set the start position based on options
74
if config.Since != 0 {
75
// Start based on a relative time
76
start := time.Now().Add(config.Since)
77
if err := r.journal.SeekRealtimeUsec(uint64(start.UnixNano() / 1000)); err != nil {
80
} else if config.NumFromTail != 0 {
81
// Start based on a number of lines before the tail
82
if err := r.journal.SeekTail(); err != nil {
86
// Move the read pointer into position near the tail. Go one further than
87
// the option so that the initial cursor advancement positions us at the
88
// correct starting point.
89
if _, err := r.journal.PreviousSkip(config.NumFromTail + 1); err != nil {
97
func (r *JournalReader) Read(b []byte) (int, error) {
101
// Advance the journal cursor
102
c, err = r.journal.Next()
104
// An unexpected error
116
msg, err = r.buildMessage()
122
// Copy and return the message
128
func (r *JournalReader) Close() error {
129
return r.journal.Close()
132
// Follow synchronously follows the JournalReader, writing each new journal entry to writer. The
133
// follow will continue until a single time.Time is received on the until channel.
134
func (r *JournalReader) Follow(until <-chan time.Time, writer io.Writer) (err error) {
136
// Process journal entries and events. Entries are flushed until the tail or
137
// timeout is reached, and then we wait for new events or the timeout.
138
var msg = make([]byte, 64*1<<(10))
141
c, err := r.Read(msg)
142
if err != nil && err != io.EOF {
151
writer.Write(msg[:c])
156
// We're at the tail, so wait for new events or time out.
157
// Holds journal events to process. Tightly bounded for now unless there's a
158
// reason to unblock the journal watch routine more quickly.
159
events := make(chan int, 1)
160
pollDone := make(chan bool, 1)
167
events <- r.journal.Wait(time.Duration(1) * time.Second)
179
case SD_JOURNAL_NOP, SD_JOURNAL_APPEND, SD_JOURNAL_INVALIDATE:
180
// TODO: need to account for any of these?
182
log.Printf("Received unknown event: %d\n", e)
191
// buildMessage returns a string representing the current journal entry in a simple format which
192
// includes the entry timestamp and MESSAGE field.
193
func (r *JournalReader) buildMessage() (string, error) {
198
if msg, err = r.journal.GetData("MESSAGE"); err != nil {
202
if usec, err = r.journal.GetRealtimeUsec(); err != nil {
206
timestamp := time.Unix(0, int64(usec)*int64(time.Microsecond))
208
return fmt.Sprintf("%s %s\n", timestamp, msg), nil