~juju-qa/ubuntu/yakkety/juju/juju-1.25.8

« back to all changes in this revision

Viewing changes to src/github.com/coreos/go-systemd/sdjournal/read.go

  • Committer: Nicholas Skaggs
  • Date: 2016-12-02 17:28:37 UTC
  • Revision ID: nicholas.skaggs@canonical.com-20161202172837-jkrbdlyjcxtrii2n
Initial commit of 1.25.6

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
// Copyright 2015 RedHat, Inc.
 
2
// Copyright 2015 CoreOS, Inc.
 
3
//
 
4
// Licensed under the Apache License, Version 2.0 (the "License");
 
5
// you may not use this file except in compliance with the License.
 
6
// You may obtain a copy of the License at
 
7
//
 
8
//     http://www.apache.org/licenses/LICENSE-2.0
 
9
//
 
10
// Unless required by applicable law or agreed to in writing, software
 
11
// distributed under the License is distributed on an "AS IS" BASIS,
 
12
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 
13
// See the License for the specific language governing permissions and
 
14
// limitations under the License.
 
15
 
 
16
package sdjournal
 
17
 
 
18
import (
 
19
        "errors"
 
20
        "fmt"
 
21
        "io"
 
22
        "log"
 
23
        "time"
 
24
)
 
25
 
 
26
var (
 
27
        ErrExpired = errors.New("Timeout expired")
 
28
)
 
29
 
 
30
// JournalReaderConfig represents options to drive the behavior of a JournalReader.
 
31
type JournalReaderConfig struct {
 
32
        // The Since and NumFromTail options are mutually exclusive and determine
 
33
        // where the reading begins within the journal.
 
34
        Since       time.Duration // start relative to a Duration from now
 
35
        NumFromTail uint64        // start relative to the tail
 
36
 
 
37
        // Show only journal entries whose fields match the supplied values. If
 
38
        // the array is empty, entries will not be filtered.
 
39
        Matches []Match
 
40
 
 
41
        // If not empty, the journal instance will point to a journal residing
 
42
        // in this directory. The supplied path may be relative or absolute.
 
43
        Path string
 
44
}
 
45
 
 
46
// JournalReader is an io.ReadCloser which provides a simple interface for iterating through the
 
47
// systemd journal.
 
48
type JournalReader struct {
 
49
        journal *Journal
 
50
}
 
51
 
 
52
// NewJournalReader creates a new JournalReader with configuration options that are similar to the
 
53
// systemd journalctl tool's iteration and filtering features.
 
54
func NewJournalReader(config JournalReaderConfig) (*JournalReader, error) {
 
55
        r := &JournalReader{}
 
56
 
 
57
        // Open the journal
 
58
        var err error
 
59
        if config.Path != "" {
 
60
                r.journal, err = NewJournalFromDir(config.Path)
 
61
        } else {
 
62
                r.journal, err = NewJournal()
 
63
        }
 
64
        if err != nil {
 
65
                return nil, err
 
66
        }
 
67
 
 
68
        // Add any supplied matches
 
69
        for _, m := range config.Matches {
 
70
                r.journal.AddMatch(m.String())
 
71
        }
 
72
 
 
73
        // Set the start position based on options
 
74
        if config.Since != 0 {
 
75
                // Start based on a relative time
 
76
                start := time.Now().Add(config.Since)
 
77
                if err := r.journal.SeekRealtimeUsec(uint64(start.UnixNano() / 1000)); err != nil {
 
78
                        return nil, err
 
79
                }
 
80
        } else if config.NumFromTail != 0 {
 
81
                // Start based on a number of lines before the tail
 
82
                if err := r.journal.SeekTail(); err != nil {
 
83
                        return nil, err
 
84
                }
 
85
 
 
86
                // Move the read pointer into position near the tail. Go one further than
 
87
                // the option so that the initial cursor advancement positions us at the
 
88
                // correct starting point.
 
89
                if _, err := r.journal.PreviousSkip(config.NumFromTail + 1); err != nil {
 
90
                        return nil, err
 
91
                }
 
92
        }
 
93
 
 
94
        return r, nil
 
95
}
 
96
 
 
97
func (r *JournalReader) Read(b []byte) (int, error) {
 
98
        var err error
 
99
        var c int
 
100
 
 
101
        // Advance the journal cursor
 
102
        c, err = r.journal.Next()
 
103
 
 
104
        // An unexpected error
 
105
        if err != nil {
 
106
                return 0, err
 
107
        }
 
108
 
 
109
        // EOF detection
 
110
        if c == 0 {
 
111
                return 0, io.EOF
 
112
        }
 
113
 
 
114
        // Build a message
 
115
        var msg string
 
116
        msg, err = r.buildMessage()
 
117
 
 
118
        if err != nil {
 
119
                return 0, err
 
120
        }
 
121
 
 
122
        // Copy and return the message
 
123
        copy(b, []byte(msg))
 
124
 
 
125
        return len(msg), nil
 
126
}
 
127
 
 
128
func (r *JournalReader) Close() error {
 
129
        return r.journal.Close()
 
130
}
 
131
 
 
132
// Follow synchronously follows the JournalReader, writing each new journal entry to writer. The
 
133
// follow will continue until a single time.Time is received on the until channel.
 
134
func (r *JournalReader) Follow(until <-chan time.Time, writer io.Writer) (err error) {
 
135
 
 
136
        // Process journal entries and events. Entries are flushed until the tail or
 
137
        // timeout is reached, and then we wait for new events or the timeout.
 
138
        var msg = make([]byte, 64*1<<(10))
 
139
process:
 
140
        for {
 
141
                c, err := r.Read(msg)
 
142
                if err != nil && err != io.EOF {
 
143
                        break process
 
144
                }
 
145
 
 
146
                select {
 
147
                case <-until:
 
148
                        return ErrExpired
 
149
                default:
 
150
                        if c > 0 {
 
151
                                writer.Write(msg[:c])
 
152
                                continue process
 
153
                        }
 
154
                }
 
155
 
 
156
                // We're at the tail, so wait for new events or time out.
 
157
                // Holds journal events to process. Tightly bounded for now unless there's a
 
158
                // reason to unblock the journal watch routine more quickly.
 
159
                events := make(chan int, 1)
 
160
                pollDone := make(chan bool, 1)
 
161
                go func() {
 
162
                        for {
 
163
                                select {
 
164
                                case <-pollDone:
 
165
                                        return
 
166
                                default:
 
167
                                        events <- r.journal.Wait(time.Duration(1) * time.Second)
 
168
                                }
 
169
                        }
 
170
                }()
 
171
 
 
172
                select {
 
173
                case <-until:
 
174
                        pollDone <- true
 
175
                        return ErrExpired
 
176
                case e := <-events:
 
177
                        pollDone <- true
 
178
                        switch e {
 
179
                        case SD_JOURNAL_NOP, SD_JOURNAL_APPEND, SD_JOURNAL_INVALIDATE:
 
180
                                // TODO: need to account for any of these?
 
181
                        default:
 
182
                                log.Printf("Received unknown event: %d\n", e)
 
183
                        }
 
184
                        continue process
 
185
                }
 
186
        }
 
187
 
 
188
        return
 
189
}
 
190
 
 
191
// buildMessage returns a string representing the current journal entry in a simple format which
 
192
// includes the entry timestamp and MESSAGE field.
 
193
func (r *JournalReader) buildMessage() (string, error) {
 
194
        var msg string
 
195
        var usec uint64
 
196
        var err error
 
197
 
 
198
        if msg, err = r.journal.GetData("MESSAGE"); err != nil {
 
199
                return "", err
 
200
        }
 
201
 
 
202
        if usec, err = r.journal.GetRealtimeUsec(); err != nil {
 
203
                return "", err
 
204
        }
 
205
 
 
206
        timestamp := time.Unix(0, int64(usec)*int64(time.Microsecond))
 
207
 
 
208
        return fmt.Sprintf("%s %s\n", timestamp, msg), nil
 
209
}