~nskaggs/+junk/xenial-test

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
// Copyright 2014 Canonical Ltd.
// Licensed under the AGPLv3, see LICENCE file for details.

// Package metricsender contains functions for sending
// metrics from a controller to a remote metric collector.
package metricsender

import (
	"time"

	"github.com/juju/errors"
	"github.com/juju/loggo"
	wireformat "github.com/juju/romulus/wireformat/metrics"

	"github.com/juju/juju/state"
)

// logger is the package-level logger, obtained from loggo under the
// name "juju.apiserver.metricsender".
var logger = loggo.GetLogger("juju.apiserver.metricsender")

// MetricSender is the abstraction over the transport that delivers
// metric batches to a collection service.
type MetricSender interface {
	// Send delivers the given batches and returns the collector's
	// response, or an error if the batches could not be sent.
	Send(batches []*wireformat.MetricBatch) (*wireformat.Response, error)
}

// Package defaults exposed through DefaultMaxBatchesPerSend and
// DefaultMetricSender. defaultMaxBatchesPerSend is the value returned by
// the former; defaultSender is the MetricSender returned by the latter and
// is initialised to an empty HttpSender (HttpSender is not defined in this
// part of the file).
var (
	defaultMaxBatchesPerSend              = 10
	defaultSender            MetricSender = &HttpSender{}
)

// handleResponse applies a collector response to local state.
//
// For every entry in response.EnvResponses it marks the acknowledged
// batches as sent via st and then sets the meter status of each unit
// listed in the entry's UnitStatuses. If response.NewGracePeriod is
// positive it is stored through mm.
//
// Every failure is logged at error level and otherwise ignored; nothing
// is returned to the caller. A unit that cannot be retrieved is skipped.
func handleResponse(mm *state.MetricsManager, st MetricsSenderBackend, response wireformat.Response) {
	for _, modelResp := range response.EnvResponses {
		if ackErr := st.SetMetricBatchesSent(modelResp.AcknowledgedBatches); ackErr != nil {
			logger.Errorf("failed to set sent on metrics %v", ackErr)
		}
		for name, meter := range modelResp.UnitStatuses {
			target, lookupErr := st.Unit(name)
			if lookupErr != nil {
				// Without the unit there is nothing to update; move on.
				logger.Errorf("failed to retrieve unit %q: %v", name, lookupErr)
				continue
			}
			if setErr := target.SetMeterStatus(meter.Status, meter.Info); setErr != nil {
				logger.Errorf("failed to set unit %q meter status to %v: %v", name, meter, setErr)
			}
		}
	}

	// Only a strictly positive grace period is written back.
	if response.NewGracePeriod <= 0 {
		return
	}
	if graceErr := mm.SetGracePeriod(response.NewGracePeriod); graceErr != nil {
		logger.Errorf("failed to set new grace period %v", graceErr)
	}
}

// SendMetrics pushes unsent metrics to the collector through sender.
//
// It loops, asking st for up to batchSize unsent batches at a time,
// converting them with ToWire and handing them to sender.Send, until st
// reports that there is nothing left to send. After the loop it logs a
// summary containing the number of batches sent by this call together
// with the unsent and sent counts reported by st.
//
// Error handling:
//   - a failure to obtain the metrics manager, to fetch batches, or to
//     read the final counts is returned (traced);
//   - a failed Send is logged, the metrics manager's consecutive error
//     count is incremented, and the send error is returned; if the
//     increment fails too, both errors are combined with errors.Wrap;
//   - a failure to record the last successful send time is logged as a
//     warning and returned.
//
// A nil response from Send is not an error: response handling and the
// last-successful-send update are skipped, but the batches still count
// towards the sent total.
func SendMetrics(st MetricsSenderBackend, sender MetricSender, batchSize int) error {
	metricsManager, err := st.MetricsManager()
	if err != nil {
		return errors.Trace(err)
	}

	total := 0
	for {
		batches, err := st.MetricsToSend(batchSize)
		if err != nil {
			return errors.Trace(err)
		}
		count := len(batches)
		if count == 0 {
			// Nothing (more) to do; say whether anything was sent at all.
			if total > 0 {
				logger.Infof("done sending")
			} else {
				logger.Infof("nothing to send")
			}
			break
		}

		payload := make([]*wireformat.MetricBatch, 0, count)
		for _, batch := range batches {
			payload = append(payload, ToWire(batch))
		}

		reply, sendErr := sender.Send(payload)
		if sendErr != nil {
			logger.Errorf("%+v", sendErr)
			incErr := metricsManager.IncrementConsecutiveErrors()
			if incErr == nil {
				return errors.Trace(sendErr)
			}
			logger.Errorf("failed to increment error count %v", incErr)
			return errors.Trace(errors.Wrap(sendErr, incErr))
		}

		if reply != nil {
			// TODO (mattyw) We are currently ignoring errors during response handling.
			handleResponse(metricsManager, st, *reply)
			// TODO(fwereade): 2016-03-17 lp:1558657
			if stampErr := metricsManager.SetLastSuccessfulSend(time.Now()); stampErr != nil {
				annotated := errors.Annotate(stampErr, "failed to set successful send time")
				logger.Warningf("%v", annotated)
				return errors.Trace(annotated)
			}
		}
		total += count
	}

	pending, err := st.CountOfUnsentMetrics()
	if err != nil {
		return errors.Trace(err)
	}
	stored, err := st.CountOfSentMetrics()
	if err != nil {
		return errors.Trace(err)
	}
	logger.Infof("metrics collection summary: sent:%d unsent:%d (%d sent metrics stored)", total, pending, stored)

	return nil
}

// DefaultMaxBatchesPerSend reports the package's default limit on the
// number of batches per send, as held in defaultMaxBatchesPerSend.
func DefaultMaxBatchesPerSend() int {
	limit := defaultMaxBatchesPerSend
	return limit
}

// DefaultMetricSender hands back the package-wide default MetricSender
// held in defaultSender.
func DefaultMetricSender() MetricSender {
	sender := defaultSender
	return sender
}

// ToWire converts the state.MetricBatch into a type
// that can be sent over the wire to the collector.
//
// Each metric's key and value are copied as-is. The metric times and the
// batch creation time are passed through UTC() before being stored. The
// UUID, model UUID, unit name, charm URL and credentials are taken
// directly from the batch's accessors.
func ToWire(mb *state.MetricBatch) *wireformat.MetricBatch {
	// Read the metrics once so that the length used to size the
	// destination and the slice being iterated are guaranteed to be the
	// same value, and the accessor is not invoked a second time.
	batchMetrics := mb.Metrics()
	metrics := make([]wireformat.Metric, len(batchMetrics))
	for i, m := range batchMetrics {
		metrics[i] = wireformat.Metric{
			Key:   m.Key,
			Value: m.Value,
			Time:  m.Time.UTC(),
		}
	}
	return &wireformat.MetricBatch{
		UUID:        mb.UUID(),
		ModelUUID:   mb.ModelUUID(),
		UnitName:    mb.Unit(),
		CharmUrl:    mb.CharmURL(),
		Created:     mb.Created().UTC(),
		Metrics:     metrics,
		Credentials: mb.Credentials(),
	}
}