1
// Copyright 2014 Canonical Ltd.
2
// Licensed under the AGPLv3, see LICENCE file for details.
4
// Package metricsender contains functions for sending
5
// metrics from a controller to a remote metric collector.
11
"github.com/juju/errors"
12
"github.com/juju/loggo"
13
wireformat "github.com/juju/romulus/wireformat/metrics"
15
"github.com/juju/juju/state"
18
// logger is the package-level logger used by the functions in this file;
// it is obtained from loggo under the name "juju.apiserver.metricsender".
var logger = loggo.GetLogger("juju.apiserver.metricsender")
20
// MetricSender defines the interface used to send metrics
21
// to a collection service. SendMetrics in this file performs its
// sends through this interface rather than a concrete type.
22
type MetricSender interface {
23
// Send takes a slice of wire-format metric batches and returns a
// *wireformat.Response together with an error.
Send([]*wireformat.MetricBatch) (*wireformat.Response, error)
27
// defaultMaxBatchesPerSend is the value handed out by
// DefaultMaxBatchesPerSend.
defaultMaxBatchesPerSend = 10
28
// defaultSender is a MetricSender holding a zero-value *HttpSender.
// NOTE(review): HttpSender is declared outside this view.
defaultSender MetricSender = &HttpSender{}
31
// handleResponse applies a collector response to local state. For every
// entry in response.EnvResponses it marks the acknowledged batches as sent
// through st.SetMetricBatchesSent and sets the meter status of each unit
// named in UnitStatuses. When response.NewGracePeriod is positive it also
// stores that value through mm.SetGracePeriod.
//
// The function returns nothing: the failures visible here are reported
// with logger.Errorf only.
//
// NOTE(review): the embedded line numbering skips a line between each call
// and the Errorf that follows it (e.g. 33 -> 35), so the conditions guarding
// the log calls are presumably on lines not shown here — confirm against the
// full file.
func handleResponse(mm *state.MetricsManager, st MetricsSenderBackend, response wireformat.Response) {
32
for _, envResp := range response.EnvResponses {
33
// Record the batches acknowledged in this entry as sent.
err := st.SetMetricBatchesSent(envResp.AcknowledgedBatches)
35
logger.Errorf("failed to set sent on metrics %v", err)
37
// UnitStatuses is ranged with the unit name as key and a value carrying
// Status and Info fields.
for unitName, status := range envResp.UnitStatuses {
38
unit, err := st.Unit(unitName)
40
logger.Errorf("failed to retrieve unit %q: %v", unitName, err)
43
// Reuses the err declared by the st.Unit call above.
err = unit.SetMeterStatus(status.Status, status.Info)
45
logger.Errorf("failed to set unit %q meter status to %v: %v", unitName, status, err)
49
// A grace period is only stored when the response carries a positive one.
if response.NewGracePeriod > 0 {
50
err := mm.SetGracePeriod(response.NewGracePeriod)
52
logger.Errorf("failed to set new grace period %v", err)
57
// SendMetrics will send any unsent metrics
58
// over the MetricSender interface in batches
59
// no larger than batchSize.
//
// It fetches metrics with st.MetricsToSend(batchSize), converts each one
// with ToWire, and passes the resulting slice to sender.Send. The response
// is handed to handleResponse and the time of the send is recorded on the
// metrics manager. Finally it logs a summary including the counts returned
// by st.CountOfUnsentMetrics and st.CountOfSentMetrics.
//
// Errors from the backend, the sender, and SetLastSuccessfulSend are
// returned wrapped with errors.Trace.
//
// NOTE(review): lenM and sent are used below but declared on lines not
// shown in this view, as are the conditions around several statements;
// the embedded numbering has gaps (e.g. 69 -> 74, 100 -> 106). Confirm the
// loop structure against the full file.
60
func SendMetrics(st MetricsSenderBackend, sender MetricSender, batchSize int) error {
61
metricsManager, err := st.MetricsManager()
63
return errors.Trace(err)
67
// At most batchSize batches are requested from the backend per fetch.
metrics, err := st.MetricsToSend(batchSize)
69
return errors.Trace(err)
74
logger.Infof("nothing to send")
76
logger.Infof("done sending")
80
// Build the wire-format slice, one entry per fetched batch.
wireData := make([]*wireformat.MetricBatch, lenM)
81
for i, m := range metrics {
82
wireData[i] = ToWire(m)
84
response, err := sender.Send(wireData)
86
logger.Errorf("%+v", err)
87
// Bump the consecutive-error counter on the metrics manager; if that
// itself fails, both errors are combined with errors.Wrap and returned.
if incErr := metricsManager.IncrementConsecutiveErrors(); incErr != nil {
88
logger.Errorf("failed to increment error count %v", incErr)
89
return errors.Trace(errors.Wrap(err, incErr))
91
return errors.Trace(err)
94
// TODO (mattyw) We are currently ignoring errors during response handling.
95
// response is dereferenced here, so it must be non-nil at this point.
handleResponse(metricsManager, st, *response)
96
// TODO(fwereade): 2016-03-17 lp:1558657
97
// Failing to record the send time is logged as a warning and also
// returned to the caller.
if err := metricsManager.SetLastSuccessfulSend(time.Now()); err != nil {
98
err = errors.Annotate(err, "failed to set successful send time")
99
logger.Warningf("%v", err)
100
return errors.Trace(err)
106
unsent, err := st.CountOfUnsentMetrics()
108
return errors.Trace(err)
110
sentStored, err := st.CountOfSentMetrics()
112
return errors.Trace(err)
114
logger.Infof("metrics collection summary: sent:%d unsent:%d (%d sent metrics stored)", sent, unsent, sentStored)
119
// DefaultMaxBatchesPerSend returns the default number of batches per send.
// The value comes from the package-level defaultMaxBatchesPerSend variable.
120
func DefaultMaxBatchesPerSend() int {
121
return defaultMaxBatchesPerSend
124
// DefaultMetricSender returns the default metric sender.
// NOTE(review): the function body is not visible in this view; presumably
// it returns the package-level defaultSender — confirm against the full file.
125
func DefaultMetricSender() MetricSender {
129
// ToWire converts the state.MetricBatch into a type
130
// that can be sent over the wire to the collector.
//
// It builds a wireformat.Metric for each entry of mb.Metrics() and returns
// a new *wireformat.MetricBatch populated from mb's accessors.
//
// NOTE(review): the fields of each wireformat.Metric and some fields of the
// returned batch sit on lines not shown in this view (the embedded numbering
// skips 135-139, 141, 143 and 146).
131
func ToWire(mb *state.MetricBatch) *wireformat.MetricBatch {
132
// Sized up front to hold one wire metric per source metric.
metrics := make([]wireformat.Metric, len(mb.Metrics()))
133
for i, m := range mb.Metrics() {
134
metrics[i] = wireformat.Metric{
140
return &wireformat.MetricBatch{
142
ModelUUID: mb.ModelUUID(),
144
CharmUrl: mb.CharmURL(),
145
// The creation time is converted to UTC before it is put on the wire.
Created: mb.Created().UTC(),
147
Credentials: mb.Credentials(),