// Copyright 2015 Canonical Ltd.
// Licensed under the AGPLv3, see LICENCE file for details.

// Low-level functionality for interacting with the logs collection
// and tailing logs from the replication oplog.

package state

import (
	"fmt"
	"regexp"
	"strings"
	"time"

	"github.com/dustin/go-humanize"
	"github.com/juju/errors"
	"github.com/juju/loggo"
	"github.com/juju/utils/deque"
	"github.com/juju/utils/set"
	"github.com/juju/version"
	"gopkg.in/juju/names.v2"
	"gopkg.in/mgo.v2"
	"gopkg.in/mgo.v2/bson"
	"gopkg.in/tomb.v1"

	"github.com/juju/juju/mongo"
)

// TODO(wallyworld) - lp:1602508 - collections need to be defined in collections.go
const (
	logsDB     = "logs"
	logsC      = "logs"
	forwardedC = "forwarded"
)

// ErrNeverForwarded signals to the caller that the ID of a
// previously forwarded log record could not be found.
var ErrNeverForwarded = errors.Errorf("cannot find ID of the last forwarded record")
// MongoSessioner supports creating new mongo sessions.
type MongoSessioner interface {
	// MongoSession creates a new Mongo session.
	MongoSession() *mgo.Session
}

// ControllerSessioner supports creating new mongo sessions for the controller.
type ControllerSessioner interface {
	MongoSessioner

	// IsController indicates if the current state is for the controller.
	IsController() bool
}

// ModelSessioner supports creating new mongo sessions for a model.
type ModelSessioner interface {
	MongoSessioner

	// ModelUUID returns the ID of the current model.
	ModelUUID() string
}
// InitDbLogs sets up the indexes for the logs collection. It should
// be called as state is opened. It is idempotent.
func InitDbLogs(session *mgo.Session) error {
	logsColl := session.DB(logsDB).C(logsC)
	for _, key := range [][]string{{"e", "t"}, {"e", "n"}} {
		err := logsColl.EnsureIndex(mgo.Index{Key: key})
		if err != nil {
			return errors.Annotate(err, "cannot create index for logs collection")
		}
	}
	return nil
}
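// Example (illustrative sketch, not part of the original file): InitDbLogs
// is expected to be called once a mongo session is available, for instance
// as state is opened:
//
//	session := st.MongoSession().Copy()
//	defer session.Close()
//	if err := InitDbLogs(session); err != nil {
//		return errors.Trace(err)
//	}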
// lastSentDoc captures the timestamp of the last log record forwarded
// to a log sink.
type lastSentDoc struct {
	// ID is the unique ID mongo will use for the doc.
	ID string `bson:"_id"`

	// ModelUUID identifies the model for which the identified record
	// was sent.
	ModelUUID string `bson:"model-uuid"`

	// Sink identifies the log forwarding target to which the identified
	// log record was sent.
	Sink string `bson:"sink"`

	// TODO(ericsnow) Solve the problems with using the timestamp
	// as the record identifier.
	//
	// RecordTimestamp identifies the last record sent to the log sink
	// for the model. It is used to look up log records in the DB.
	//
	// Currently we use the record's timestamp (unix nano UTC), which
	// has a risk of collisions. Log record timestamps have nanosecond
	// precision. The likelihood of multiple log records having the
	// same timestamp is small, though it increases with the size and
	// activity of the model.
	//
	// Using the timestamp also has the problem that a faulty clock may
	// introduce records with a timestamp earlier than the "last sent"
	// value. Such records would never get forwarded.
	//
	// The solution to both these issues will likely involve using an
	// int sequence for the ID rather than the timestamp. That sequence
	// would be shared by all models.
	RecordTimestamp int64 `bson:"record-timestamp"`

	// RecordID is the ID of the last record sent to the log sink.
	// We record it but currently just use the timestamp when querying
	// the log collection.
	RecordID int64 `bson:"record-id"`
}
// LastSentLogTracker records and retrieves timestamps of the most recent
// log records forwarded to a log sink for a model.
type LastSentLogTracker struct {
	session *mgo.Session
	id      string
	model   string
	sink    string
}

// NewLastSentLogTracker returns a new tracker that records and retrieves
// the timestamps of the most recent log records forwarded to the
// identified log sink for the current model.
func NewLastSentLogTracker(st ModelSessioner, modelUUID, sink string) *LastSentLogTracker {
	return newLastSentLogTracker(st, modelUUID, sink)
}

// NewAllLastSentLogTracker returns a new tracker that records and retrieves
// the timestamps of the most recent log records forwarded to the
// identified log sink for *all* models.
func NewAllLastSentLogTracker(st ControllerSessioner, sink string) (*LastSentLogTracker, error) {
	if !st.IsController() {
		return nil, errors.New("only the admin model can track all log records")
	}
	return newLastSentLogTracker(st, "", sink), nil
}

func newLastSentLogTracker(st MongoSessioner, model, sink string) *LastSentLogTracker {
	session := st.MongoSession().Copy()
	return &LastSentLogTracker{
		id:      fmt.Sprintf("%s#%s", model, sink),
		model:   model,
		sink:    sink,
		session: session,
	}
}
// Close implements io.Closer.
func (logger *LastSentLogTracker) Close() error {
	logger.session.Close()
	return nil
}

// Set records the ID and timestamp of the last forwarded log record.
func (logger *LastSentLogTracker) Set(recID, recTimestamp int64) error {
	collection := logger.session.DB(logsDB).C(forwardedC)
	_, err := collection.UpsertId(
		logger.id,
		lastSentDoc{
			ID:              logger.id,
			ModelUUID:       logger.model,
			Sink:            logger.sink,
			RecordID:        recID,
			RecordTimestamp: recTimestamp,
		},
	)
	return errors.Trace(err)
}

// Get retrieves the ID and timestamp of the last forwarded log record.
func (logger *LastSentLogTracker) Get() (int64, int64, error) {
	collection := logger.session.DB(logsDB).C(forwardedC)
	var doc lastSentDoc
	err := collection.FindId(logger.id).One(&doc)
	if err != nil {
		if err == mgo.ErrNotFound {
			return 0, 0, errors.Trace(ErrNeverForwarded)
		}
		return 0, 0, errors.Trace(err)
	}
	return doc.RecordID, doc.RecordTimestamp, nil
}
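// Example (illustrative sketch; the sink name and variables are made up):
// a log forwarder would typically read the last-sent marker, treat
// ErrNeverForwarded as "start from the beginning", and update the marker
// after forwarding a batch:
//
//	tracker := NewLastSentLogTracker(st, st.ModelUUID(), "syslog-sink")
//	defer tracker.Close()
//	lastID, lastTs, err := tracker.Get()
//	if errors.Cause(err) == ErrNeverForwarded {
//		lastID, lastTs = 0, 0
//	} else if err != nil {
//		return errors.Trace(err)
//	}
//	// ... forward records newer than lastTs, then:
//	if err := tracker.Set(newID, newTs); err != nil {
//		return errors.Trace(err)
//	}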
// logDoc describes log messages stored in MongoDB.
//
// Single character field names are used for serialisation to save
// space. These documents will be inserted 1000's of times and each
// document includes the field names.
// (alesstimec) It would be really nice if we could store Time as int64
// for increased precision.
type logDoc struct {
	Id        bson.ObjectId `bson:"_id"`
	Time      int64         `bson:"t"` // unix nano UTC
	ModelUUID string        `bson:"e"`
	Entity    string        `bson:"n"` // e.g. "machine-0"
	Version   string        `bson:"r"`
	Module    string        `bson:"m"` // e.g. "juju.worker.firewaller"
	Location  string        `bson:"l"` // "filename:lineno"
	Level     int           `bson:"v"`
	Message   string        `bson:"x"`
}
type DbLogger struct {
	logsColl  *mgo.Collection
	modelUUID string
	entity    string
	version   string
}

// NewDbLogger returns a DbLogger instance which is used to write logs
// to the database.
func NewDbLogger(st ModelSessioner, entity names.Tag, ver version.Number) *DbLogger {
	_, logsColl := initLogsSession(st)
	return &DbLogger{
		logsColl:  logsColl,
		modelUUID: st.ModelUUID(),
		entity:    entity.String(),
		version:   ver.String(),
	}
}

// Log writes a log message to the database.
func (logger *DbLogger) Log(t time.Time, module string, location string, level loggo.Level, msg string) error {
	// TODO(ericsnow) Use a controller-global int sequence for Id.

	// UnixNano() returns the "absolute" (UTC) number of nanoseconds
	// since the Unix "epoch".
	unixEpochNanoUTC := t.UnixNano()
	return logger.logsColl.Insert(&logDoc{
		Id:        bson.NewObjectId(),
		Time:      unixEpochNanoUTC,
		ModelUUID: logger.modelUUID,
		Entity:    logger.entity,
		Version:   logger.version,
		Module:    module,
		Location:  location,
		Level:     int(level),
		Message:   msg,
	})
}

// Close cleans up resources used by the DbLogger instance.
func (logger *DbLogger) Close() {
	if logger.logsColl != nil {
		logger.logsColl.Database.Session.Close()
	}
}
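// Example (illustrative sketch; the entity, version and message values are
// made up): a writer holding a ModelSessioner records a single log line
// like this:
//
//	dbLogger := NewDbLogger(st, names.NewMachineTag("0"), version.MustParse("2.0.1"))
//	defer dbLogger.Close()
//	err := dbLogger.Log(time.Now(), "juju.worker.firewaller", "fw.go:42", loggo.INFO, "firewall updated")
//	if err != nil {
//		return errors.Trace(err)
//	}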
// LogTailer allows for retrieval of Juju's logs from MongoDB. It
// first returns any matching already recorded logs and then waits for
// additional matching logs as they appear.
type LogTailer interface {
	// Logs returns the channel through which the LogTailer returns
	// Juju logs. It will be closed when the tailer stops.
	Logs() <-chan *LogRecord

	// Dying returns a channel which will be closed as the LogTailer
	// stops.
	Dying() <-chan struct{}

	// Stop is used to request that the LogTailer stops. It blocks
	// until the LogTailer has stopped.
	Stop() error

	// Err returns the error that caused the LogTailer to stop. If
	// it hasn't stopped, or stopped without error, nil will be
	// returned.
	Err() error
}
// LogRecord defines a single Juju log message as returned by
// LogTailer.
type LogRecord struct {
	// origin fields
	Time      time.Time
	Entity    names.Tag
	ModelUUID string
	Version   version.Number

	// logging-specific fields
	Level    loggo.Level
	Module   string
	Location string
	Message  string
}
// LogTailerParams specifies the filtering a LogTailer should apply to
// logs in order to decide which to return.
type LogTailerParams struct {
	StartID       int64
	StartTime     time.Time
	MinLevel      loggo.Level
	InitialLines  int
	NoTail        bool
	IncludeEntity []string
	ExcludeEntity []string
	IncludeModule []string
	ExcludeModule []string
	Oplog         *mgo.Collection // For testing only
	AllModels     bool
}
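// Example (illustrative sketch; the filter values are made up): a typical
// consumer builds a LogTailerParams, creates the tailer with NewLogTailer
// and drains the Logs channel until it closes:
//
//	tailer, err := NewLogTailer(st, &LogTailerParams{
//		MinLevel:      loggo.INFO,
//		IncludeEntity: []string{"machine-*"},
//	})
//	if err != nil {
//		return errors.Trace(err)
//	}
//	defer tailer.Stop()
//	for {
//		select {
//		case rec, ok := <-tailer.Logs():
//			if !ok {
//				return errors.Trace(tailer.Err())
//			}
//			fmt.Printf("%s %s\n", rec.Entity, rec.Message)
//		case <-tailer.Dying():
//			return errors.Trace(tailer.Err())
//		}
//	}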
// oplogOverlap is used to decide on the initial oplog timestamp to
// use when the LogTailer transitions from querying the logs
// collection to tailing the oplog. Oplog records with a timestamp >=
// lastTsFromLogsCollection - oplogOverlap will be considered. This
// is to allow for delayed log writes, clock skew between the Juju
// cluster hosts and log writes that occur during the transition
// period.
const oplogOverlap = time.Minute

// This is the maximum number of log document ids that will be tracked
// to avoid re-reporting logs when transitioning between querying the
// logs collection and tailing the oplog.
//
// The value was calculated by looking at the per-minute peak log
// output of large broken models with logging at DEBUG.
var maxRecentLogIds = int(oplogOverlap.Minutes() * 150000)
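// For scale (an estimate added here, not a figure from the original source):
// oplogOverlap.Minutes() * 150000 = 1 * 150000, so up to 150000 twelve-byte
// ObjectIds (roughly 1.8 MB plus bookkeeping) are held during the transition.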
// LogTailerState describes the methods on State required for logging to
// the database.
type LogTailerState interface {
	ModelSessioner

	// IsController indicates whether or not the model is the admin model.
	IsController() bool
}

// NewLogTailer returns a LogTailer which filters according to the
// given parameters.
func NewLogTailer(st LogTailerState, params *LogTailerParams) (LogTailer, error) {
	if !st.IsController() && params.AllModels {
		return nil, errors.NewNotValid(nil, "not allowed to tail logs from all models: not a controller")
	}

	session := st.MongoSession().Copy()
	t := &logTailer{
		modelUUID: st.ModelUUID(),
		session:   session,
		logsColl:  session.DB(logsDB).C(logsC).With(session),
		params:    params,
		logCh:     make(chan *LogRecord),
		recentIds: newRecentIdTracker(maxRecentLogIds),
	}
	go func() {
		defer t.tomb.Done()
		err := t.loop()
		t.tomb.Kill(errors.Cause(err))
	}()
	return t, nil
}
type logTailer struct {
	tomb      tomb.Tomb
	modelUUID string
	session   *mgo.Session
	logsColl  *mgo.Collection
	params    *LogTailerParams
	logCh     chan *LogRecord
	lastID    int64
	lastTime  time.Time
	recentIds *recentIdTracker
}

// Logs implements the LogTailer interface.
func (t *logTailer) Logs() <-chan *LogRecord {
	return t.logCh
}

// Dying implements the LogTailer interface.
func (t *logTailer) Dying() <-chan struct{} {
	return t.tomb.Dying()
}

// Stop implements the LogTailer interface.
func (t *logTailer) Stop() error {
	t.tomb.Kill(nil)
	return t.tomb.Wait()
}

// Err implements the LogTailer interface.
func (t *logTailer) Err() error {
	return t.tomb.Err()
}
func (t *logTailer) loop() error {
	err := t.processCollection()
	if err != nil {
		return errors.Trace(err)
	}

	if t.params.NoTail {
		return nil
	}

	err = t.tailOplog()
	return errors.Trace(err)
}
func (t *logTailer) processCollection() error {
	// Create a selector from the params.
	sel := t.paramsToSelector(t.params, "")
	query := t.logsColl.Find(sel)

	if t.params.InitialLines > 0 {
		// This is a little racy but it's good enough.
		count, err := query.Count()
		if err != nil {
			return errors.Annotate(err, "query count failed")
		}
		if skipOver := count - t.params.InitialLines; skipOver > 0 {
			query = query.Skip(skipOver)
		}
	}

	// In tests, sorting by time can leave the result ordering
	// underconstrained. Since object ids are (timestamp, machine id,
	// process id, counter)
	// https://docs.mongodb.com/manual/reference/bson-types/#objectid
	// and the tests only run one mongod process, including _id
	// guarantees getting log messages in a predictable order.
	//
	// Important: it is critical that the sort on _id is done
	// separately from the sort on {model, time}. Combining the sort
	// fields means that MongoDB won't use the indexes that are in
	// place, which risks hitting MongoDB's 32MB sort limit. See
	// https://pad.lv/1590605.
	//
	// TODO(ericsnow) Sort only by _id once it is a sequential int.
	iter := query.Sort("e", "t").Sort("_id").Iter()

	doc := new(logDoc)
	for iter.Next(doc) {
		rec, err := logDocToRecord(doc)
		if err != nil {
			return errors.Annotate(err, "deserialization failed (possible DB corruption)")
		}
		select {
		case <-t.tomb.Dying():
			return errors.Trace(tomb.ErrDying)
		case t.logCh <- rec:
			t.lastTime = rec.Time
			t.recentIds.Add(doc.Id)
		}
	}
	return errors.Trace(iter.Close())
}
func (t *logTailer) tailOplog() error {
	recentIds := t.recentIds.AsSet()

	newParams := t.params
	newParams.StartID = t.lastID // (t.lastID + 1) once Id is a sequential int.
	oplogSel := append(t.paramsToSelector(newParams, "o."),
		bson.DocElem{"ns", logsDB + "." + logsC},
	)

	oplog := t.params.Oplog
	if oplog == nil {
		oplog = mongo.GetOplog(t.session)
	}

	minOplogTs := t.lastTime.Add(-oplogOverlap)
	oplogTailer := mongo.NewOplogTailer(mongo.NewOplogSession(oplog, oplogSel), minOplogTs)
	defer oplogTailer.Stop()

	logger.Tracef("LogTailer starting oplog tailing: recent id count=%d, lastTime=%s, minOplogTs=%s",
		recentIds.Length(), t.lastTime, minOplogTs)

	skipCount := 0
	for {
		select {
		case <-t.tomb.Dying():
			return errors.Trace(tomb.ErrDying)
		case oplogDoc, ok := <-oplogTailer.Out():
			if !ok {
				return errors.Annotate(oplogTailer.Err(), "oplog tailer died")
			}

			doc := new(logDoc)
			err := oplogDoc.UnmarshalObject(doc)
			if err != nil {
				return errors.Annotate(err, "oplog unmarshalling failed")
			}

			if recentIds.Contains(doc.Id) {
				// This document has already been reported.
				skipCount++
				if skipCount%1000 == 0 {
					logger.Tracef("LogTailer duplicates skipped: %d", skipCount)
				}
				continue
			}
			rec, err := logDocToRecord(doc)
			if err != nil {
				return errors.Annotate(err, "deserialization failed (possible DB corruption)")
			}
			select {
			case <-t.tomb.Dying():
				return errors.Trace(tomb.ErrDying)
			case t.logCh <- rec:
			}
		}
	}
}
func (t *logTailer) paramsToSelector(params *LogTailerParams, prefix string) bson.D {
	sel := bson.D{}
	if !params.StartTime.IsZero() {
		sel = append(sel, bson.DocElem{"t", bson.M{"$gte": params.StartTime.UnixNano()}})
	}
	if !params.AllModels {
		sel = append(sel, bson.DocElem{"e", t.modelUUID})
	}
	if params.MinLevel > loggo.UNSPECIFIED {
		sel = append(sel, bson.DocElem{"v", bson.M{"$gte": int(params.MinLevel)}})
	}
	if len(params.IncludeEntity) > 0 {
		sel = append(sel,
			bson.DocElem{"n", bson.RegEx{Pattern: makeEntityPattern(params.IncludeEntity)}})
	}
	if len(params.ExcludeEntity) > 0 {
		sel = append(sel,
			bson.DocElem{"n", bson.M{"$not": bson.RegEx{Pattern: makeEntityPattern(params.ExcludeEntity)}}})
	}
	if len(params.IncludeModule) > 0 {
		sel = append(sel,
			bson.DocElem{"m", bson.RegEx{Pattern: makeModulePattern(params.IncludeModule)}})
	}
	if len(params.ExcludeModule) > 0 {
		sel = append(sel,
			bson.DocElem{"m", bson.M{"$not": bson.RegEx{Pattern: makeModulePattern(params.ExcludeModule)}}})
	}
	if prefix != "" {
		for i, elem := range sel {
			sel[i].Name = prefix + elem.Name
		}
	}
	return sel
}
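// As an illustration (worked out from the code above, not taken from the
// original file), a call with MinLevel=loggo.INFO and
// IncludeEntity=[]string{"machine-*"} on a non-controller tailer yields a
// selector roughly equivalent to:
//
//	bson.D{
//		{"e", t.modelUUID},
//		{"v", bson.M{"$gte": int(loggo.INFO)}},
//		{"n", bson.RegEx{Pattern: `^(machine-.*)$`}},
//	}
//
// With prefix "o." each name becomes "o.e", "o.v", "o.n", which is how the
// same filter is applied to log documents nested under the oplog's "o" field.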
func makeEntityPattern(entities []string) string {
	var patterns []string
	for _, entity := range entities {
		// Convert * wildcard to the regex equivalent. This is safe
		// because * never appears in entity names.
		patterns = append(patterns, strings.Replace(entity, "*", ".*", -1))
	}
	return `^(` + strings.Join(patterns, "|") + `)$`
}

func makeModulePattern(modules []string) string {
	var patterns []string
	for _, module := range modules {
		patterns = append(patterns, regexp.QuoteMeta(module))
	}
	return `^(` + strings.Join(patterns, "|") + `)(\..+)?$`
}
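// For example (worked out from the code above, not taken from the original
// file): makeEntityPattern([]string{"machine-*", "unit-mysql-0"}) returns
// `^(machine-.*|unit-mysql-0)$`, while makeModulePattern([]string{"juju.worker"})
// returns `^(juju\.worker)(\..+)?$`, so "juju.worker.firewaller" also matches.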
func newRecentIdTracker(maxLen int) *recentIdTracker {
	return &recentIdTracker{
		ids: deque.NewWithMaxLen(maxLen),
	}
}

type recentIdTracker struct {
	ids *deque.Deque
}

func (t *recentIdTracker) Add(id bson.ObjectId) {
	t.ids.PushBack(id)
}

func (t *recentIdTracker) AsSet() *objectIdSet {
	out := newObjectIdSet()
	for {
		id, ok := t.ids.PopFront()
		if !ok {
			break
		}
		out.Add(id.(bson.ObjectId))
	}
	return out
}

func newObjectIdSet() *objectIdSet {
	return &objectIdSet{
		ids: set.NewStrings(),
	}
}

type objectIdSet struct {
	ids set.Strings
}

func (s *objectIdSet) Add(id bson.ObjectId) {
	s.ids.Add(string(id))
}

func (s *objectIdSet) Contains(id bson.ObjectId) bool {
	return s.ids.Contains(string(id))
}

func (s *objectIdSet) Length() int {
	return s.ids.Size()
}
func logDocToRecord(doc *logDoc) (*LogRecord, error) {
	var ver version.Number
	if doc.Version != "" {
		parsed, err := version.Parse(doc.Version)
		if err != nil {
			return nil, errors.Annotatef(err, "invalid version %q", doc.Version)
		}
		ver = parsed
	}

	level := loggo.Level(doc.Level)
	if level > loggo.CRITICAL {
		return nil, errors.Errorf("unrecognized log level %q", doc.Level)
	}

	entity, err := names.ParseTag(doc.Entity)
	if err != nil {
		return nil, errors.Annotate(err, "while parsing entity tag")
	}

	rec := &LogRecord{
		Time:      time.Unix(0, doc.Time).UTC(), // not worth preserving TZ
		Entity:    entity,
		ModelUUID: doc.ModelUUID,
		Version:   ver,

		Level:    level,
		Module:   doc.Module,
		Location: doc.Location,
		Message:  doc.Message,
	}
	return rec, nil
}
// PruneLogs removes old log documents in order to control the size of
// the logs collection. All logs older than minLogTime are
// removed. Further removal is also performed if the logs collection
// size is greater than maxLogsMB.
func PruneLogs(st MongoSessioner, minLogTime time.Time, maxLogsMB int) error {
	session, logsColl := initLogsSession(st)
	defer session.Close()

	modelUUIDs, err := getEnvsInLogs(logsColl)
	if err != nil {
		return errors.Annotate(err, "failed to get log counts")
	}

	pruneCounts := make(map[string]int)

	// Remove old log entries (per model UUID to take advantage
	// of indexes on the logs collection).
	for _, modelUUID := range modelUUIDs {
		removeInfo, err := logsColl.RemoveAll(bson.M{
			"e": modelUUID,
			"t": bson.M{"$lt": minLogTime.UnixNano()},
		})
		if err != nil {
			return errors.Annotate(err, "failed to prune logs by time")
		}
		pruneCounts[modelUUID] = removeInfo.Removed
	}

	// Do further pruning if the logs collection is over the maximum size.
	for {
		collMB, err := getCollectionMB(logsColl)
		if err != nil {
			return errors.Annotate(err, "failed to retrieve log counts")
		}
		if collMB <= maxLogsMB {
			break
		}

		modelUUID, count, err := findEnvWithMostLogs(logsColl, modelUUIDs)
		if err != nil {
			return errors.Annotate(err, "log count query failed")
		}
		if count < 5000 {
			break // Pruning is not worthwhile
		}

		// Remove the oldest 1% of log records for the model.
		toRemove := int(float64(count) * 0.01)

		// Find the threshold timestamp to start removing from.
		// NOTE: this assumes that there are no more logs being added
		// for the time range being pruned (which should be true for
		// any realistic minimum log collection size).
		tsQuery := logsColl.Find(bson.M{"e": modelUUID}).Sort("e", "t")
		tsQuery = tsQuery.Skip(toRemove)
		tsQuery = tsQuery.Select(bson.M{"t": 1})
		var doc bson.M
		err = tsQuery.One(&doc)
		if err != nil {
			return errors.Annotate(err, "log pruning timestamp query failed")
		}
		thresholdTs := doc["t"]

		// Remove old records.
		removeInfo, err := logsColl.RemoveAll(bson.M{
			"e": modelUUID,
			"t": bson.M{"$lt": thresholdTs},
		})
		if err != nil {
			return errors.Annotate(err, "log pruning failed")
		}
		pruneCounts[modelUUID] += removeInfo.Removed
	}

	for modelUUID, count := range pruneCounts {
		if count > 0 {
			logger.Debugf("pruned %d logs for model %s", count, modelUUID)
		}
	}
	return nil
}
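// Example (illustrative sketch; the retention values are made up): a prune
// worker might keep three days of logs and cap the collection at 4096 MB:
//
//	minLogTime := time.Now().Add(-3 * 24 * time.Hour)
//	if err := PruneLogs(st, minLogTime, 4096); err != nil {
//		return errors.Trace(err)
//	}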
// initLogsSession creates a new session suitable for logging updates,
// returning the session and a logs mgo.Collection connected to that
// session.
func initLogsSession(st MongoSessioner) (*mgo.Session, *mgo.Collection) {
	// To improve throughput, only wait for the logs to be written to
	// the primary. For some reason, this makes a huge difference even
	// when the replicaset only has one member (i.e. a single primary).
	session := st.MongoSession().Copy()
	session.SetSafe(&mgo.Safe{
		W: 1,
	})
	db := session.DB(logsDB)
	return session, db.C(logsC).With(session)
}
// getCollectionMB returns the size of a MongoDB collection (in
// MiB), excluding space used by indexes.
func getCollectionMB(coll *mgo.Collection) (int, error) {
	var result bson.M
	err := coll.Database.Run(bson.D{
		{"collStats", coll.Name},
		{"scale", humanize.MiByte},
	}, &result)
	if err != nil {
		return 0, errors.Trace(err)
	}
	return result["size"].(int), nil
}
// getEnvsInLogs returns the unique model UUIDs that exist in
// the logs collection. This uses one of the indexes on the
// collection and should be fast.
func getEnvsInLogs(coll *mgo.Collection) ([]string, error) {
	var modelUUIDs []string
	err := coll.Find(nil).Distinct("e", &modelUUIDs)
	if err != nil {
		return nil, errors.Trace(err)
	}
	return modelUUIDs, nil
}
// findEnvWithMostLogs returns the modelUUID and log count for the
// model with the most logs in the logs collection.
func findEnvWithMostLogs(logsColl *mgo.Collection, modelUUIDs []string) (string, int, error) {
	var maxModelUUID string
	var maxCount int
	for _, modelUUID := range modelUUIDs {
		count, err := getLogCountForEnv(logsColl, modelUUID)
		if err != nil {
			return "", -1, errors.Trace(err)
		}
		if count > maxCount {
			maxModelUUID = modelUUID
			maxCount = count
		}
	}
	return maxModelUUID, maxCount, nil
}

// getLogCountForEnv returns the number of log records stored for a
// given model.
func getLogCountForEnv(coll *mgo.Collection, modelUUID string) (int, error) {
	count, err := coll.Find(bson.M{"e": modelUUID}).Count()
	if err != nil {
		return -1, errors.Annotate(err, "failed to get log count")
	}
	return count, nil
}