~nskaggs/+junk/xenial-test

« back to all changes in this revision

Viewing changes to src/github.com/juju/juju/state/logs.go

  • Committer: Nicholas Skaggs
  • Date: 2016-10-24 20:56:05 UTC
  • Revision ID: nicholas.skaggs@canonical.com-20161024205605-z8lta0uvuhtxwzwl
Initi with beta15

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
// Copyright 2015 Canonical Ltd.
 
2
// Licensed under the AGPLv3, see LICENCE file for details.
 
3
 
 
4
// Low-level functionality for interacting with the logs collection
 
5
// and tailing logs from the replication oplog.
 
6
 
 
7
package state
 
8
 
 
9
import (
 
10
        "fmt"
 
11
        "regexp"
 
12
        "strings"
 
13
        "time"
 
14
 
 
15
        "github.com/dustin/go-humanize"
 
16
        "github.com/juju/errors"
 
17
        "github.com/juju/loggo"
 
18
        "github.com/juju/utils/deque"
 
19
        "github.com/juju/utils/set"
 
20
        "github.com/juju/version"
 
21
        "gopkg.in/juju/names.v2"
 
22
        "gopkg.in/mgo.v2"
 
23
        "gopkg.in/mgo.v2/bson"
 
24
        "launchpad.net/tomb"
 
25
 
 
26
        "github.com/juju/juju/mongo"
 
27
)
 
28
 
 
29
// TODO(wallyworld) - lp:1602508 - collections need to be defined in collections.go
 
30
const (
 
31
        logsDB     = "logs"
 
32
        logsC      = "logs"
 
33
        forwardedC = "forwarded"
 
34
)
 
35
 
 
36
// ErrNeverForwarded signals to the caller that the ID of a
 
37
// previously forwarded log record could not be found.
 
38
var ErrNeverForwarded = errors.Errorf("cannot find ID of the last forwarded record")
 
39
 
 
40
// MongoSessioner supports creating new mongo sessions.
 
41
type MongoSessioner interface {
 
42
        // MongoSession creates a new Mongo session.
 
43
        MongoSession() *mgo.Session
 
44
}
 
45
 
 
46
// ModelSessioner supports creating new mongo sessions for the controller.
 
47
type ControllerSessioner interface {
 
48
        MongoSessioner
 
49
 
 
50
        // IsController indicates if current state is controller.
 
51
        IsController() bool
 
52
}
 
53
 
 
54
// ModelSessioner supports creating new mongo sessions for a model.
 
55
type ModelSessioner interface {
 
56
        MongoSessioner
 
57
 
 
58
        // ModelUUID returns the ID of the current model.
 
59
        ModelUUID() string
 
60
}
 
61
 
 
62
// InitDbLogs sets up the indexes for the logs collection. It should
 
63
// be called as state is opened. It is idempotent.
 
64
func InitDbLogs(session *mgo.Session) error {
 
65
        logsColl := session.DB(logsDB).C(logsC)
 
66
        for _, key := range [][]string{{"e", "t"}, {"e", "n"}} {
 
67
                err := logsColl.EnsureIndex(mgo.Index{Key: key})
 
68
                if err != nil {
 
69
                        return errors.Annotate(err, "cannot create index for logs collection")
 
70
                }
 
71
        }
 
72
        return nil
 
73
}
 
74
 
 
75
// lastSentDoc captures timestamp of the last log record forwarded
 
76
// to a log sink.
 
77
type lastSentDoc struct {
 
78
        // ID is the unique ID mongo will use for the doc.
 
79
        ID string `bson:"_id"`
 
80
 
 
81
        // ModelUUID identifies the model for which the identified record
 
82
        // was last sent.
 
83
        ModelUUID string `bson:"model-uuid"`
 
84
 
 
85
        // Sink identifies the log forwarding target to which the identified
 
86
        // log record was sent.
 
87
        Sink string `bson:"sink"`
 
88
 
 
89
        // TODO(ericsnow) Solve the problems with using the timestamp
 
90
        // as the record ID.
 
91
 
 
92
        // RecordTimestamp identifies the last record sent to the log sink
 
93
        // for the model. It is used to look up log records in the DB.
 
94
        //
 
95
        // Currently we use the record's timestamp (unix nano UTC), which
 
96
        // has a risk of collisions. Log record timestamps have nanosecond
 
97
        // precision. The likelihood of multiple log records having the
 
98
        // same timestamp is small, though it increases with the size and
 
99
        // activity of the model.
 
100
        //
 
101
        // Using the timestamp also has the problem that a faulty clock may
 
102
        // introduce records with a timestamp earlier that the "last sent"
 
103
        // value. Such records would never get forwarded.
 
104
        //
 
105
        // The solution to both these issues will likely involve using an
 
106
        // int sequence for the ID rather than the timestamp. That sequence
 
107
        // would be shared by all models.
 
108
        RecordTimestamp int64 `bson:"record-timestamp"`
 
109
 
 
110
        // RecordID is the ID of the last record sent to the log sink.
 
111
        // We record it but currently just use the timestamp when querying
 
112
        // the log collection.
 
113
        RecordID int64 `bson:"record-id"`
 
114
}
 
115
 
 
116
// LastSentLogTracker records and retrieves timestamps of the most recent
 
117
// log records forwarded to a log sink for a model.
 
118
type LastSentLogTracker struct {
 
119
        session *mgo.Session
 
120
        id      string
 
121
        model   string
 
122
        sink    string
 
123
}
 
124
 
 
125
// NewLastSentLogTracker returns a new tracker that records and retrieves
 
126
// the timestamps of the most recent log records forwarded to the
 
127
// identified log sink for the current model.
 
128
func NewLastSentLogTracker(st ModelSessioner, modelUUID, sink string) *LastSentLogTracker {
 
129
        return newLastSentLogTracker(st, modelUUID, sink)
 
130
}
 
131
 
 
132
// NewAllLastSentLogTracker returns a new tracker that records and retrieves
 
133
// the timestamps of the most recent log records forwarded to the
 
134
// identified log sink for *all* models.
 
135
func NewAllLastSentLogTracker(st ControllerSessioner, sink string) (*LastSentLogTracker, error) {
 
136
        if !st.IsController() {
 
137
                return nil, errors.New("only the admin model can track all log records")
 
138
        }
 
139
        return newLastSentLogTracker(st, "", sink), nil
 
140
}
 
141
 
 
142
func newLastSentLogTracker(st MongoSessioner, model, sink string) *LastSentLogTracker {
 
143
        session := st.MongoSession().Copy()
 
144
        return &LastSentLogTracker{
 
145
                id:      fmt.Sprintf("%s#%s", model, sink),
 
146
                model:   model,
 
147
                sink:    sink,
 
148
                session: session,
 
149
        }
 
150
}
 
151
 
 
152
// Close implements io.Closer
 
153
func (logger *LastSentLogTracker) Close() error {
 
154
        logger.session.Close()
 
155
        return nil
 
156
}
 
157
 
 
158
// Set records the timestamp.
 
159
func (logger *LastSentLogTracker) Set(recID, recTimestamp int64) error {
 
160
        collection := logger.session.DB(logsDB).C(forwardedC)
 
161
        _, err := collection.UpsertId(
 
162
                logger.id,
 
163
                lastSentDoc{
 
164
                        ID:              logger.id,
 
165
                        ModelUUID:       logger.model,
 
166
                        Sink:            logger.sink,
 
167
                        RecordID:        recID,
 
168
                        RecordTimestamp: recTimestamp,
 
169
                },
 
170
        )
 
171
        return errors.Trace(err)
 
172
}
 
173
 
 
174
// Get retrieves the id and timestamp.
 
175
func (logger *LastSentLogTracker) Get() (int64, int64, error) {
 
176
        collection := logger.session.DB(logsDB).C(forwardedC)
 
177
        var doc lastSentDoc
 
178
        err := collection.FindId(logger.id).One(&doc)
 
179
        if err != nil {
 
180
                if err == mgo.ErrNotFound {
 
181
                        return 0, 0, errors.Trace(ErrNeverForwarded)
 
182
                }
 
183
                return 0, 0, errors.Trace(err)
 
184
        }
 
185
        return doc.RecordID, doc.RecordTimestamp, nil
 
186
}
 
187
 
 
188
// logDoc describes log messages stored in MongoDB.
 
189
//
 
190
// Single character field names are used for serialisation to save
 
191
// space. These documents will be inserted 1000's of times and each
 
192
// document includes the field names.
 
193
// (alesstimec) It would be really nice if we could store Time as int64
 
194
// for increased precision.
 
195
type logDoc struct {
 
196
        Id        bson.ObjectId `bson:"_id"`
 
197
        Time      int64         `bson:"t"` // unix nano UTC
 
198
        ModelUUID string        `bson:"e"`
 
199
        Entity    string        `bson:"n"` // e.g. "machine-0"
 
200
        Version   string        `bson:"r"`
 
201
        Module    string        `bson:"m"` // e.g. "juju.worker.firewaller"
 
202
        Location  string        `bson:"l"` // "filename:lineno"
 
203
        Level     int           `bson:"v"`
 
204
        Message   string        `bson:"x"`
 
205
}
 
206
 
 
207
type DbLogger struct {
 
208
        logsColl  *mgo.Collection
 
209
        modelUUID string
 
210
        entity    string
 
211
        version   string
 
212
}
 
213
 
 
214
// NewDbLogger returns a DbLogger instance which is used to write logs
 
215
// to the database.
 
216
func NewDbLogger(st ModelSessioner, entity names.Tag, ver version.Number) *DbLogger {
 
217
        _, logsColl := initLogsSession(st)
 
218
        return &DbLogger{
 
219
                logsColl:  logsColl,
 
220
                modelUUID: st.ModelUUID(),
 
221
                entity:    entity.String(),
 
222
                version:   ver.String(),
 
223
        }
 
224
}
 
225
 
 
226
// Log writes a log message to the database.
 
227
func (logger *DbLogger) Log(t time.Time, module string, location string, level loggo.Level, msg string) error {
 
228
        // TODO(ericsnow) Use a controller-global int sequence for Id.
 
229
 
 
230
        // UnixNano() returns the "absolute" (UTC) number of nanoseconds
 
231
        // since the Unix "epoch".
 
232
        unixEpochNanoUTC := t.UnixNano()
 
233
        return logger.logsColl.Insert(&logDoc{
 
234
                Id:        bson.NewObjectId(),
 
235
                Time:      unixEpochNanoUTC,
 
236
                ModelUUID: logger.modelUUID,
 
237
                Entity:    logger.entity,
 
238
                Version:   logger.version,
 
239
                Module:    module,
 
240
                Location:  location,
 
241
                Level:     int(level),
 
242
                Message:   msg,
 
243
        })
 
244
}
 
245
 
 
246
// Close cleans up resources used by the DbLogger instance.
 
247
func (logger *DbLogger) Close() {
 
248
        if logger.logsColl != nil {
 
249
                logger.logsColl.Database.Session.Close()
 
250
        }
 
251
}
 
252
 
 
253
// LogTailer allows for retrieval of Juju's logs from MongoDB. It
 
254
// first returns any matching already recorded logs and then waits for
 
255
// additional matching logs as they appear.
 
256
type LogTailer interface {
 
257
        // Logs returns the channel through which the LogTailer returns
 
258
        // Juju logs. It will be closed when the tailer stops.
 
259
        Logs() <-chan *LogRecord
 
260
 
 
261
        // Dying returns a channel which will be closed as the LogTailer
 
262
        // stops.
 
263
        Dying() <-chan struct{}
 
264
 
 
265
        // Stop is used to request that the LogTailer stops. It blocks
 
266
        // unil the LogTailer has stopped.
 
267
        Stop() error
 
268
 
 
269
        // Err returns the error that caused the LogTailer to stopped. If
 
270
        // it hasn't stopped or stopped without error nil will be
 
271
        // returned.
 
272
        Err() error
 
273
}
 
274
 
 
275
// LogRecord defines a single Juju log message as returned by
 
276
// LogTailer.
 
277
type LogRecord struct {
 
278
        // universal fields
 
279
        ID   int64
 
280
        Time time.Time
 
281
 
 
282
        // origin fields
 
283
        ModelUUID string
 
284
        Entity    names.Tag
 
285
        Version   version.Number
 
286
 
 
287
        // logging-specific fields
 
288
        Level    loggo.Level
 
289
        Module   string
 
290
        Location string
 
291
        Message  string
 
292
}
 
293
 
 
294
// LogTailerParams specifies the filtering a LogTailer should apply to
 
295
// logs in order to decide which to return.
 
296
type LogTailerParams struct {
 
297
        StartID       int64
 
298
        StartTime     time.Time
 
299
        MinLevel      loggo.Level
 
300
        InitialLines  int
 
301
        NoTail        bool
 
302
        IncludeEntity []string
 
303
        ExcludeEntity []string
 
304
        IncludeModule []string
 
305
        ExcludeModule []string
 
306
        Oplog         *mgo.Collection // For testing only
 
307
        AllModels     bool
 
308
}
 
309
 
 
310
// oplogOverlap is used to decide on the initial oplog timestamp to
 
311
// use when the LogTailer transitions from querying the logs
 
312
// collection to tailing the oplog. Oplog records with a timestamp >=
 
313
// tolastTsFromLogsCollection - oplogOverlap will be considered. This
 
314
// is to allow for delayed log writes, clock skew between the Juju
 
315
// cluster hosts and log writes that occur during the transition
 
316
// period.
 
317
const oplogOverlap = time.Minute
 
318
 
 
319
// This is the maximum number of log document ids that will be tracked
 
320
// to avoid re-reporting logs when transitioning between querying the
 
321
// logs collection and tailing the oplog.
 
322
//
 
323
// The value was calculated by looking at the per-minute peak log
 
324
// output of large broken models with logging at DEBUG.
 
325
var maxRecentLogIds = int(oplogOverlap.Minutes() * 150000)
 
326
 
 
327
// LogTailerState describes the methods on State required for logging to
 
328
// the database.
 
329
type LogTailerState interface {
 
330
        ModelSessioner
 
331
 
 
332
        // IsController indicates whether or not the model is the admin model.
 
333
        IsController() bool
 
334
}
 
335
 
 
336
// NewLogTailer returns a LogTailer which filters according to the
 
337
// parameters given.
 
338
func NewLogTailer(st LogTailerState, params *LogTailerParams) (LogTailer, error) {
 
339
        if !st.IsController() && params.AllModels {
 
340
                return nil, errors.NewNotValid(nil, "not allowed to tail logs from all models: not a controller")
 
341
        }
 
342
 
 
343
        session := st.MongoSession().Copy()
 
344
        t := &logTailer{
 
345
                modelUUID: st.ModelUUID(),
 
346
                session:   session,
 
347
                logsColl:  session.DB(logsDB).C(logsC).With(session),
 
348
                params:    params,
 
349
                logCh:     make(chan *LogRecord),
 
350
                recentIds: newRecentIdTracker(maxRecentLogIds),
 
351
        }
 
352
        go func() {
 
353
                err := t.loop()
 
354
                t.tomb.Kill(errors.Cause(err))
 
355
                close(t.logCh)
 
356
                session.Close()
 
357
                t.tomb.Done()
 
358
        }()
 
359
        return t, nil
 
360
}
 
361
 
 
362
type logTailer struct {
 
363
        tomb      tomb.Tomb
 
364
        modelUUID string
 
365
        session   *mgo.Session
 
366
        logsColl  *mgo.Collection
 
367
        params    *LogTailerParams
 
368
        logCh     chan *LogRecord
 
369
        lastID    int64
 
370
        lastTime  time.Time
 
371
        recentIds *recentIdTracker
 
372
}
 
373
 
 
374
// Logs implements the LogTailer interface.
 
375
func (t *logTailer) Logs() <-chan *LogRecord {
 
376
        return t.logCh
 
377
}
 
378
 
 
379
// Dying implements the LogTailer interface.
 
380
func (t *logTailer) Dying() <-chan struct{} {
 
381
        return t.tomb.Dying()
 
382
}
 
383
 
 
384
// Stop implements the LogTailer interface.
 
385
func (t *logTailer) Stop() error {
 
386
        t.tomb.Kill(nil)
 
387
        return t.tomb.Wait()
 
388
}
 
389
 
 
390
// Err implements the LogTailer interface.
 
391
func (t *logTailer) Err() error {
 
392
        return t.tomb.Err()
 
393
}
 
394
 
 
395
func (t *logTailer) loop() error {
 
396
        err := t.processCollection()
 
397
        if err != nil {
 
398
                return errors.Trace(err)
 
399
        }
 
400
 
 
401
        if t.params.NoTail {
 
402
                return nil
 
403
        }
 
404
 
 
405
        err = t.tailOplog()
 
406
        return errors.Trace(err)
 
407
}
 
408
 
 
409
func (t *logTailer) processCollection() error {
 
410
        // Create a selector from the params.
 
411
        sel := t.paramsToSelector(t.params, "")
 
412
        query := t.logsColl.Find(sel)
 
413
 
 
414
        if t.params.InitialLines > 0 {
 
415
                // This is a little racy but it's good enough.
 
416
                count, err := query.Count()
 
417
                if err != nil {
 
418
                        return errors.Annotate(err, "query count failed")
 
419
                }
 
420
                if skipOver := count - t.params.InitialLines; skipOver > 0 {
 
421
                        query = query.Skip(skipOver)
 
422
                }
 
423
        }
 
424
 
 
425
        // In tests, sorting by time can leave the result ordering
 
426
        // underconstrained. Since object ids are (timestamp, machine id,
 
427
        // process id, counter)
 
428
        // https://docs.mongodb.com/manual/reference/bson-types/#objectid
 
429
        // and the tests only run one mongod process, including _id
 
430
        // guarantees getting log messages in a predictable order.
 
431
        //
 
432
        // Important: it is critical that the sort on _id is done
 
433
        // separately from the sort on {model, time}. Combining the sort
 
434
        // fields means that MongoDB won't use the indexes that are in
 
435
        // place, which risks hitting MongoDB's 32MB sort limit.  See
 
436
        // https://pad.lv/1590605.
 
437
        //
 
438
        // TODO(ericsnow) Sort only by _id once it is a sequential int.
 
439
        iter := query.Sort("e", "t").Sort("_id").Iter()
 
440
        doc := new(logDoc)
 
441
        for iter.Next(doc) {
 
442
                rec, err := logDocToRecord(doc)
 
443
                if err != nil {
 
444
                        return errors.Annotate(err, "deserialization failed (possible DB corruption)")
 
445
                }
 
446
                select {
 
447
                case <-t.tomb.Dying():
 
448
                        return errors.Trace(tomb.ErrDying)
 
449
                case t.logCh <- rec:
 
450
                        t.lastID = rec.ID
 
451
                        t.lastTime = rec.Time
 
452
                        t.recentIds.Add(doc.Id)
 
453
                }
 
454
        }
 
455
        return errors.Trace(iter.Close())
 
456
}
 
457
 
 
458
func (t *logTailer) tailOplog() error {
 
459
        recentIds := t.recentIds.AsSet()
 
460
 
 
461
        newParams := t.params
 
462
        newParams.StartID = t.lastID // (t.lastID + 1) once Id is a sequential int.
 
463
        oplogSel := append(t.paramsToSelector(newParams, "o."),
 
464
                bson.DocElem{"ns", logsDB + "." + logsC},
 
465
        )
 
466
 
 
467
        oplog := t.params.Oplog
 
468
        if oplog == nil {
 
469
                oplog = mongo.GetOplog(t.session)
 
470
        }
 
471
 
 
472
        minOplogTs := t.lastTime.Add(-oplogOverlap)
 
473
        oplogTailer := mongo.NewOplogTailer(mongo.NewOplogSession(oplog, oplogSel), minOplogTs)
 
474
        defer oplogTailer.Stop()
 
475
 
 
476
        logger.Tracef("LogTailer starting oplog tailing: recent id count=%d, lastTime=%s, minOplogTs=%s",
 
477
                recentIds.Length(), t.lastTime, minOplogTs)
 
478
 
 
479
        skipCount := 0
 
480
        for {
 
481
                select {
 
482
                case <-t.tomb.Dying():
 
483
                        return errors.Trace(tomb.ErrDying)
 
484
                case oplogDoc, ok := <-oplogTailer.Out():
 
485
                        if !ok {
 
486
                                return errors.Annotate(oplogTailer.Err(), "oplog tailer died")
 
487
                        }
 
488
 
 
489
                        doc := new(logDoc)
 
490
                        err := oplogDoc.UnmarshalObject(doc)
 
491
                        if err != nil {
 
492
                                return errors.Annotate(err, "oplog unmarshalling failed")
 
493
                        }
 
494
 
 
495
                        if recentIds.Contains(doc.Id) {
 
496
                                // This document has already been reported.
 
497
                                skipCount++
 
498
                                if skipCount%1000 == 0 {
 
499
                                        logger.Tracef("LogTailer duplicates skipped: %d", skipCount)
 
500
                                }
 
501
                                continue
 
502
                        }
 
503
                        rec, err := logDocToRecord(doc)
 
504
                        if err != nil {
 
505
                                return errors.Annotate(err, "deserialization failed (possible DB corruption)")
 
506
                        }
 
507
                        select {
 
508
                        case <-t.tomb.Dying():
 
509
                                return errors.Trace(tomb.ErrDying)
 
510
                        case t.logCh <- rec:
 
511
                        }
 
512
                }
 
513
        }
 
514
}
 
515
 
 
516
func (t *logTailer) paramsToSelector(params *LogTailerParams, prefix string) bson.D {
 
517
        sel := bson.D{}
 
518
        if !params.StartTime.IsZero() {
 
519
                sel = append(sel, bson.DocElem{"t", bson.M{"$gte": params.StartTime.UnixNano()}})
 
520
        }
 
521
        if !params.AllModels {
 
522
                sel = append(sel, bson.DocElem{"e", t.modelUUID})
 
523
        }
 
524
        if params.MinLevel > loggo.UNSPECIFIED {
 
525
                sel = append(sel, bson.DocElem{"v", bson.M{"$gte": int(params.MinLevel)}})
 
526
        }
 
527
        if len(params.IncludeEntity) > 0 {
 
528
                sel = append(sel,
 
529
                        bson.DocElem{"n", bson.RegEx{Pattern: makeEntityPattern(params.IncludeEntity)}})
 
530
        }
 
531
        if len(params.ExcludeEntity) > 0 {
 
532
                sel = append(sel,
 
533
                        bson.DocElem{"n", bson.M{"$not": bson.RegEx{Pattern: makeEntityPattern(params.ExcludeEntity)}}})
 
534
        }
 
535
        if len(params.IncludeModule) > 0 {
 
536
                sel = append(sel,
 
537
                        bson.DocElem{"m", bson.RegEx{Pattern: makeModulePattern(params.IncludeModule)}})
 
538
        }
 
539
        if len(params.ExcludeModule) > 0 {
 
540
                sel = append(sel,
 
541
                        bson.DocElem{"m", bson.M{"$not": bson.RegEx{Pattern: makeModulePattern(params.ExcludeModule)}}})
 
542
        }
 
543
        if prefix != "" {
 
544
                for i, elem := range sel {
 
545
                        sel[i].Name = prefix + elem.Name
 
546
                }
 
547
        }
 
548
        return sel
 
549
}
 
550
 
 
551
func makeEntityPattern(entities []string) string {
 
552
        var patterns []string
 
553
        for _, entity := range entities {
 
554
                // Convert * wildcard to the regex equivalent. This is safe
 
555
                // because * never appears in entity names.
 
556
                patterns = append(patterns, strings.Replace(entity, "*", ".*", -1))
 
557
        }
 
558
        return `^(` + strings.Join(patterns, "|") + `)$`
 
559
}
 
560
 
 
561
func makeModulePattern(modules []string) string {
 
562
        var patterns []string
 
563
        for _, module := range modules {
 
564
                patterns = append(patterns, regexp.QuoteMeta(module))
 
565
        }
 
566
        return `^(` + strings.Join(patterns, "|") + `)(\..+)?$`
 
567
}
 
568
 
 
569
func newRecentIdTracker(maxLen int) *recentIdTracker {
 
570
        return &recentIdTracker{
 
571
                ids: deque.NewWithMaxLen(maxLen),
 
572
        }
 
573
}
 
574
 
 
575
type recentIdTracker struct {
 
576
        ids *deque.Deque
 
577
}
 
578
 
 
579
func (t *recentIdTracker) Add(id bson.ObjectId) {
 
580
        t.ids.PushBack(id)
 
581
}
 
582
 
 
583
func (t *recentIdTracker) AsSet() *objectIdSet {
 
584
        out := newObjectIdSet()
 
585
        for {
 
586
                id, ok := t.ids.PopFront()
 
587
                if !ok {
 
588
                        break
 
589
                }
 
590
                out.Add(id.(bson.ObjectId))
 
591
        }
 
592
        return out
 
593
}
 
594
 
 
595
func newObjectIdSet() *objectIdSet {
 
596
        return &objectIdSet{
 
597
                ids: set.NewStrings(),
 
598
        }
 
599
}
 
600
 
 
601
type objectIdSet struct {
 
602
        ids set.Strings
 
603
}
 
604
 
 
605
func (s *objectIdSet) Add(id bson.ObjectId) {
 
606
        s.ids.Add(string(id))
 
607
}
 
608
 
 
609
func (s *objectIdSet) Contains(id bson.ObjectId) bool {
 
610
        return s.ids.Contains(string(id))
 
611
}
 
612
 
 
613
func (s *objectIdSet) Length() int {
 
614
        return len(s.ids)
 
615
}
 
616
 
 
617
func logDocToRecord(doc *logDoc) (*LogRecord, error) {
 
618
        var ver version.Number
 
619
        if doc.Version != "" {
 
620
                parsed, err := version.Parse(doc.Version)
 
621
                if err != nil {
 
622
                        return nil, errors.Annotatef(err, "invalid version %q", doc.Version)
 
623
                }
 
624
                ver = parsed
 
625
        }
 
626
 
 
627
        level := loggo.Level(doc.Level)
 
628
        if level > loggo.CRITICAL {
 
629
                return nil, errors.Errorf("unrecognized log level %q", doc.Level)
 
630
        }
 
631
 
 
632
        entity, err := names.ParseTag(doc.Entity)
 
633
        if err != nil {
 
634
                return nil, errors.Annotate(err, "while parsing entity tag")
 
635
        }
 
636
 
 
637
        rec := &LogRecord{
 
638
                ID:   doc.Time,
 
639
                Time: time.Unix(0, doc.Time).UTC(), // not worth preserving TZ
 
640
 
 
641
                ModelUUID: doc.ModelUUID,
 
642
                Entity:    entity,
 
643
                Version:   ver,
 
644
 
 
645
                Level:    level,
 
646
                Module:   doc.Module,
 
647
                Location: doc.Location,
 
648
                Message:  doc.Message,
 
649
        }
 
650
        return rec, nil
 
651
}
 
652
 
 
653
// PruneLogs removes old log documents in order to control the size of
 
654
// logs collection. All logs older than minLogTime are
 
655
// removed. Further removal is also performed if the logs collection
 
656
// size is greater than maxLogsMB.
 
657
func PruneLogs(st MongoSessioner, minLogTime time.Time, maxLogsMB int) error {
 
658
        session, logsColl := initLogsSession(st)
 
659
        defer session.Close()
 
660
 
 
661
        modelUUIDs, err := getEnvsInLogs(logsColl)
 
662
        if err != nil {
 
663
                return errors.Annotate(err, "failed to get log counts")
 
664
        }
 
665
 
 
666
        pruneCounts := make(map[string]int)
 
667
 
 
668
        // Remove old log entries (per model UUID to take advantage
 
669
        // of indexes on the logs collection).
 
670
        for _, modelUUID := range modelUUIDs {
 
671
                removeInfo, err := logsColl.RemoveAll(bson.M{
 
672
                        "e": modelUUID,
 
673
                        "t": bson.M{"$lt": minLogTime.UnixNano()},
 
674
                })
 
675
                if err != nil {
 
676
                        return errors.Annotate(err, "failed to prune logs by time")
 
677
                }
 
678
                pruneCounts[modelUUID] = removeInfo.Removed
 
679
        }
 
680
 
 
681
        // Do further pruning if the logs collection is over the maximum size.
 
682
        for {
 
683
                collMB, err := getCollectionMB(logsColl)
 
684
                if err != nil {
 
685
                        return errors.Annotate(err, "failed to retrieve log counts")
 
686
                }
 
687
                if collMB <= maxLogsMB {
 
688
                        break
 
689
                }
 
690
 
 
691
                modelUUID, count, err := findEnvWithMostLogs(logsColl, modelUUIDs)
 
692
                if err != nil {
 
693
                        return errors.Annotate(err, "log count query failed")
 
694
                }
 
695
                if count < 5000 {
 
696
                        break // Pruning is not worthwhile
 
697
                }
 
698
 
 
699
                // Remove the oldest 1% of log records for the model.
 
700
                toRemove := int(float64(count) * 0.01)
 
701
 
 
702
                // Find the threshold timestammp to start removing from.
 
703
                // NOTE: this assumes that there are no more logs being added
 
704
                // for the time range being pruned (which should be true for
 
705
                // any realistic minimum log collection size).
 
706
                tsQuery := logsColl.Find(bson.M{"e": modelUUID}).Sort("e", "t")
 
707
                tsQuery = tsQuery.Skip(toRemove)
 
708
                tsQuery = tsQuery.Select(bson.M{"t": 1})
 
709
                var doc bson.M
 
710
                err = tsQuery.One(&doc)
 
711
                if err != nil {
 
712
                        return errors.Annotate(err, "log pruning timestamp query failed")
 
713
                }
 
714
                thresholdTs := doc["t"]
 
715
 
 
716
                // Remove old records.
 
717
                removeInfo, err := logsColl.RemoveAll(bson.M{
 
718
                        "e": modelUUID,
 
719
                        "t": bson.M{"$lt": thresholdTs},
 
720
                })
 
721
                if err != nil {
 
722
                        return errors.Annotate(err, "log pruning failed")
 
723
                }
 
724
                pruneCounts[modelUUID] += removeInfo.Removed
 
725
        }
 
726
 
 
727
        for modelUUID, count := range pruneCounts {
 
728
                if count > 0 {
 
729
                        logger.Debugf("pruned %d logs for model %s", count, modelUUID)
 
730
                }
 
731
        }
 
732
        return nil
 
733
}
 
734
 
 
735
// initLogsSession creates a new session suitable for logging updates,
 
736
// returning the session and a logs mgo.Collection connected to that
 
737
// session.
 
738
func initLogsSession(st MongoSessioner) (*mgo.Session, *mgo.Collection) {
 
739
        // To improve throughput, only wait for the logs to be written to
 
740
        // the primary. For some reason, this makes a huge difference even
 
741
        // when the replicaset only has one member (i.e. a single primary).
 
742
        session := st.MongoSession().Copy()
 
743
        session.SetSafe(&mgo.Safe{
 
744
                W: 1,
 
745
        })
 
746
        db := session.DB(logsDB)
 
747
        return session, db.C(logsC).With(session)
 
748
}
 
749
 
 
750
// getCollectionMB returns the size of a MongoDB collection (in
 
751
// bytes), excluding space used by indexes.
 
752
func getCollectionMB(coll *mgo.Collection) (int, error) {
 
753
        var result bson.M
 
754
        err := coll.Database.Run(bson.D{
 
755
                {"collStats", coll.Name},
 
756
                {"scale", humanize.MiByte},
 
757
        }, &result)
 
758
        if err != nil {
 
759
                return 0, errors.Trace(err)
 
760
        }
 
761
        return result["size"].(int), nil
 
762
}
 
763
 
 
764
// getEnvsInLogs returns the unique model UUIDs that exist in
 
765
// the logs collection. This uses the one of the indexes on the
 
766
// collection and should be fast.
 
767
func getEnvsInLogs(coll *mgo.Collection) ([]string, error) {
 
768
        var modelUUIDs []string
 
769
        err := coll.Find(nil).Distinct("e", &modelUUIDs)
 
770
        if err != nil {
 
771
                return nil, errors.Trace(err)
 
772
        }
 
773
        return modelUUIDs, nil
 
774
}
 
775
 
 
776
// findEnvWithMostLogs returns the modelUUID and log count for the
 
777
// model with the most logs in the logs collection.
 
778
func findEnvWithMostLogs(logsColl *mgo.Collection, modelUUIDs []string) (string, int, error) {
 
779
        var maxModelUUID string
 
780
        var maxCount int
 
781
        for _, modelUUID := range modelUUIDs {
 
782
                count, err := getLogCountForEnv(logsColl, modelUUID)
 
783
                if err != nil {
 
784
                        return "", -1, errors.Trace(err)
 
785
                }
 
786
                if count > maxCount {
 
787
                        maxModelUUID = modelUUID
 
788
                        maxCount = count
 
789
                }
 
790
        }
 
791
        return maxModelUUID, maxCount, nil
 
792
}
 
793
 
 
794
// getLogCountForEnv returns the number of log records stored for a
 
795
// given model.
 
796
func getLogCountForEnv(coll *mgo.Collection, modelUUID string) (int, error) {
 
797
        count, err := coll.Find(bson.M{"e": modelUUID}).Count()
 
798
        if err != nil {
 
799
                return -1, errors.Annotate(err, "failed to get log count")
 
800
        }
 
801
        return count, nil
 
802
}