~nskaggs/+junk/xenial-test

« back to all changes in this revision

Viewing changes to src/github.com/juju/juju/worker/logforwarder/tracker.go

  • Committer: Nicholas Skaggs
  • Date: 2016-10-24 20:56:05 UTC
  • Revision ID: nicholas.skaggs@canonical.com-20161024205605-z8lta0uvuhtxwzwl
Initi with beta15

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
// Copyright 2016 Canonical Ltd.
 
2
// Licensed under the AGPLv3, see LICENCE file for details.
 
3
 
 
4
package logforwarder
 
5
 
 
6
import (
 
7
        "github.com/juju/errors"
 
8
        "gopkg.in/juju/names.v2"
 
9
 
 
10
        "github.com/juju/juju/api/base"
 
11
        logfwdapi "github.com/juju/juju/api/logfwd"
 
12
        "github.com/juju/juju/logfwd"
 
13
        "github.com/juju/juju/logfwd/syslog"
 
14
)
 
15
 
 
16
// TrackingSinkArgs holds the args to OpenTrackingSender.
 
17
type TrackingSinkArgs struct {
 
18
        // AllModels indicates that the tracker is handling all models.
 
19
        AllModels bool
 
20
 
 
21
        // Config is the logging config that will be used.
 
22
        Config *syslog.RawConfig
 
23
 
 
24
        // Caller is the API caller that will be used.
 
25
        Caller base.APICaller
 
26
 
 
27
        // Name is the name given to the log sink.
 
28
        Name string
 
29
 
 
30
        // OpenSink is the function that opens the underlying log sink that
 
31
        // will be wrapped.
 
32
        OpenSink LogSinkFn
 
33
}
 
34
 
 
35
// OpenTrackingSink opens a log record sender to use with a worker.
 
36
// The sender also tracks records that were successfully sent.
 
37
func OpenTrackingSink(args TrackingSinkArgs) (*LogSink, error) {
 
38
        sink, err := args.OpenSink(args.Config)
 
39
        if err != nil {
 
40
                return nil, errors.Trace(err)
 
41
        }
 
42
 
 
43
        return &LogSink{
 
44
                &trackingSender{
 
45
                        SendCloser: sink,
 
46
                        tracker:    newLastSentTracker(args.Name, args.Caller),
 
47
                },
 
48
        }, nil
 
49
}
 
50
 
 
51
type trackingSender struct {
 
52
        SendCloser
 
53
        tracker   *lastSentTracker
 
54
        allModels bool
 
55
}
 
56
 
 
57
// Send implements Sender.
 
58
func (s *trackingSender) Send(records []logfwd.Record) error {
 
59
        if err := s.SendCloser.Send(records); err != nil {
 
60
                return errors.Trace(err)
 
61
        }
 
62
        if err := s.tracker.setLastSent(s.allModels, records); err != nil {
 
63
                return errors.Trace(err)
 
64
        }
 
65
        return nil
 
66
}
 
67
 
 
68
type lastSentTracker struct {
 
69
        sink   string
 
70
        client *logfwdapi.LastSentClient
 
71
}
 
72
 
 
73
func newLastSentTracker(sink string, caller base.APICaller) *lastSentTracker {
 
74
        client := logfwdapi.NewLastSentClient(func(name string) logfwdapi.FacadeCaller {
 
75
                return base.NewFacadeCaller(caller, name)
 
76
        })
 
77
        return &lastSentTracker{
 
78
                sink:   sink,
 
79
                client: client,
 
80
        }
 
81
}
 
82
 
 
83
func (lst lastSentTracker) setLastSent(allModels bool, records []logfwd.Record) error {
 
84
        // The records are received and sent in order, so we only need to
 
85
        // call SetLastSent for the last record.
 
86
        if len(records) == 0 {
 
87
                return nil
 
88
        }
 
89
        rec := records[len(records)-1]
 
90
        model := rec.Origin.ModelUUID
 
91
        if allModels {
 
92
                model = ""
 
93
        }
 
94
        var modelTag names.ModelTag
 
95
        if model != "" {
 
96
                if !names.IsValidModel(model) {
 
97
                        return errors.Errorf("bad model UUID %q", model)
 
98
                }
 
99
                modelTag = names.NewModelTag(model)
 
100
        }
 
101
        results, err := lst.client.SetLastSent([]logfwdapi.LastSentInfo{{
 
102
                LastSentID: logfwdapi.LastSentID{
 
103
                        Model: modelTag,
 
104
                        Sink:  lst.sink,
 
105
                },
 
106
                RecordID:        rec.ID,
 
107
                RecordTimestamp: rec.Timestamp,
 
108
        }})
 
109
        if err != nil {
 
110
                return errors.Trace(err)
 
111
        }
 
112
        if err := results[0].Error; err != nil {
 
113
                return errors.Trace(err)
 
114
        }
 
115
        return nil
 
116
}