~nskaggs/+junk/xenial-test

« back to all changes in this revision

Viewing changes to src/github.com/juju/juju/apiserver/metricsender/metricsender.go

  • Committer: Nicholas Skaggs
  • Date: 2016-10-24 20:56:05 UTC
  • Revision ID: nicholas.skaggs@canonical.com-20161024205605-z8lta0uvuhtxwzwl
Initi with beta15

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
// Copyright 2014 Canonical Ltd.
 
2
// Licensed under the AGPLv3, see LICENCE file for details.
 
3
 
 
4
// Package metricsender contains functions for sending
 
5
// metrics from a controller to a remote metric collector.
 
6
package metricsender
 
7
 
 
8
import (
 
9
        "time"
 
10
 
 
11
        "github.com/juju/errors"
 
12
        "github.com/juju/loggo"
 
13
        wireformat "github.com/juju/romulus/wireformat/metrics"
 
14
 
 
15
        "github.com/juju/juju/state"
 
16
)
 
17
 
 
18
var logger = loggo.GetLogger("juju.apiserver.metricsender")
 
19
 
 
20
// MetricSender defines the interface used to send metrics
 
21
// to a collection service.
 
22
type MetricSender interface {
 
23
        Send([]*wireformat.MetricBatch) (*wireformat.Response, error)
 
24
}
 
25
 
 
26
var (
 
27
        defaultMaxBatchesPerSend              = 10
 
28
        defaultSender            MetricSender = &HttpSender{}
 
29
)
 
30
 
 
31
func handleResponse(mm *state.MetricsManager, st MetricsSenderBackend, response wireformat.Response) {
 
32
        for _, envResp := range response.EnvResponses {
 
33
                err := st.SetMetricBatchesSent(envResp.AcknowledgedBatches)
 
34
                if err != nil {
 
35
                        logger.Errorf("failed to set sent on metrics %v", err)
 
36
                }
 
37
                for unitName, status := range envResp.UnitStatuses {
 
38
                        unit, err := st.Unit(unitName)
 
39
                        if err != nil {
 
40
                                logger.Errorf("failed to retrieve unit %q: %v", unitName, err)
 
41
                                continue
 
42
                        }
 
43
                        err = unit.SetMeterStatus(status.Status, status.Info)
 
44
                        if err != nil {
 
45
                                logger.Errorf("failed to set unit %q meter status to %v: %v", unitName, status, err)
 
46
                        }
 
47
                }
 
48
        }
 
49
        if response.NewGracePeriod > 0 {
 
50
                err := mm.SetGracePeriod(response.NewGracePeriod)
 
51
                if err != nil {
 
52
                        logger.Errorf("failed to set new grace period %v", err)
 
53
                }
 
54
        }
 
55
}
 
56
 
 
57
// SendMetrics will send any unsent metrics
 
58
// over the MetricSender interface in batches
 
59
// no larger than batchSize.
 
60
func SendMetrics(st MetricsSenderBackend, sender MetricSender, batchSize int) error {
 
61
        metricsManager, err := st.MetricsManager()
 
62
        if err != nil {
 
63
                return errors.Trace(err)
 
64
        }
 
65
        sent := 0
 
66
        for {
 
67
                metrics, err := st.MetricsToSend(batchSize)
 
68
                if err != nil {
 
69
                        return errors.Trace(err)
 
70
                }
 
71
                lenM := len(metrics)
 
72
                if lenM == 0 {
 
73
                        if sent == 0 {
 
74
                                logger.Infof("nothing to send")
 
75
                        } else {
 
76
                                logger.Infof("done sending")
 
77
                        }
 
78
                        break
 
79
                }
 
80
                wireData := make([]*wireformat.MetricBatch, lenM)
 
81
                for i, m := range metrics {
 
82
                        wireData[i] = ToWire(m)
 
83
                }
 
84
                response, err := sender.Send(wireData)
 
85
                if err != nil {
 
86
                        logger.Errorf("%+v", err)
 
87
                        if incErr := metricsManager.IncrementConsecutiveErrors(); incErr != nil {
 
88
                                logger.Errorf("failed to increment error count %v", incErr)
 
89
                                return errors.Trace(errors.Wrap(err, incErr))
 
90
                        }
 
91
                        return errors.Trace(err)
 
92
                }
 
93
                if response != nil {
 
94
                        // TODO (mattyw) We are currently ignoring errors during response handling.
 
95
                        handleResponse(metricsManager, st, *response)
 
96
                        // TODO(fwereade): 2016-03-17 lp:1558657
 
97
                        if err := metricsManager.SetLastSuccessfulSend(time.Now()); err != nil {
 
98
                                err = errors.Annotate(err, "failed to set successful send time")
 
99
                                logger.Warningf("%v", err)
 
100
                                return errors.Trace(err)
 
101
                        }
 
102
                }
 
103
                sent += lenM
 
104
        }
 
105
 
 
106
        unsent, err := st.CountOfUnsentMetrics()
 
107
        if err != nil {
 
108
                return errors.Trace(err)
 
109
        }
 
110
        sentStored, err := st.CountOfSentMetrics()
 
111
        if err != nil {
 
112
                return errors.Trace(err)
 
113
        }
 
114
        logger.Infof("metrics collection summary: sent:%d unsent:%d (%d sent metrics stored)", sent, unsent, sentStored)
 
115
 
 
116
        return nil
 
117
}
 
118
 
 
119
// DefaultMaxBatchesPerSend returns the default number of batches per send.
 
120
func DefaultMaxBatchesPerSend() int {
 
121
        return defaultMaxBatchesPerSend
 
122
}
 
123
 
 
124
// DefaultMetricSender returns the default metric sender.
 
125
func DefaultMetricSender() MetricSender {
 
126
        return defaultSender
 
127
}
 
128
 
 
129
// ToWire converts the state.MetricBatch into a type
 
130
// that can be sent over the wire to the collector.
 
131
func ToWire(mb *state.MetricBatch) *wireformat.MetricBatch {
 
132
        metrics := make([]wireformat.Metric, len(mb.Metrics()))
 
133
        for i, m := range mb.Metrics() {
 
134
                metrics[i] = wireformat.Metric{
 
135
                        Key:   m.Key,
 
136
                        Value: m.Value,
 
137
                        Time:  m.Time.UTC(),
 
138
                }
 
139
        }
 
140
        return &wireformat.MetricBatch{
 
141
                UUID:        mb.UUID(),
 
142
                ModelUUID:   mb.ModelUUID(),
 
143
                UnitName:    mb.Unit(),
 
144
                CharmUrl:    mb.CharmURL(),
 
145
                Created:     mb.Created().UTC(),
 
146
                Metrics:     metrics,
 
147
                Credentials: mb.Credentials(),
 
148
        }
 
149
}