1
// Copyright 2014 Canonical Ltd.
2
// Licensed under the AGPLv3, see LICENCE file for details.
10
"github.com/juju/errors"
11
"github.com/juju/loggo"
12
"gopkg.in/juju/charm.v6-unstable"
13
"gopkg.in/juju/names.v2"
15
"gopkg.in/mgo.v2/bson"
19
var metricsLogger = loggo.GetLogger("juju.state.metrics")
22
CleanupAge = time.Hour * 24
25
// MetricBatch represents a batch of metrics reported from a unit.
26
// These will be received from the unit in batches.
27
// The main contents of the metric (key, value) are defined
28
// by the charm author and sent from the unit via a call to
30
type MetricBatch struct {
35
type metricBatchDoc struct {
36
UUID string `bson:"_id"`
37
ModelUUID string `bson:"model-uuid"`
38
Unit string `bson:"unit"`
39
CharmUrl string `bson:"charmurl"`
40
Sent bool `bson:"sent"`
41
DeleteTime time.Time `bson:"delete-time"`
42
Created time.Time `bson:"created"`
43
Metrics []Metric `bson:"metrics"`
44
Credentials []byte `bson:"credentials"`
47
// Metric represents a single Metric.
49
Key string `bson:"key"`
50
Value string `bson:"value"`
51
Time time.Time `bson:"time"`
54
// validate checks that the MetricBatch contains valid metrics.
55
func (m *MetricBatch) validate() error {
56
charmUrl, err := charm.ParseURL(m.doc.CharmUrl)
58
return errors.Trace(err)
60
chrm, err := m.st.Charm(charmUrl)
62
return errors.Trace(err)
64
chrmMetrics := chrm.Metrics()
65
if chrmMetrics == nil {
66
return errors.Errorf("charm doesn't implement metrics")
68
for _, m := range m.doc.Metrics {
69
if err := chrmMetrics.ValidateMetric(m.Key, m.Value); err != nil {
70
return errors.Trace(err)
76
// BatchParam contains the properties of the metrics batch used when creating a metrics
78
type BatchParam struct {
86
// AddMetrics adds a new batch of metrics to the database.
87
func (st *State) AddMetrics(batch BatchParam) (*MetricBatch, error) {
88
if len(batch.Metrics) == 0 {
89
return nil, errors.New("cannot add a batch of 0 metrics")
91
charmURL, err := charm.ParseURL(batch.CharmURL)
93
return nil, errors.NewNotValid(err, "could not parse charm URL")
96
unit, err := st.Unit(batch.Unit.Id())
98
return nil, errors.Trace(err)
100
application, err := unit.Application()
102
return nil, errors.Trace(err)
105
metric := &MetricBatch{
109
ModelUUID: st.ModelUUID(),
110
Unit: batch.Unit.Id(),
111
CharmUrl: charmURL.String(),
113
Created: batch.Created,
114
Metrics: batch.Metrics,
115
Credentials: application.MetricCredentials(),
118
if err := metric.validate(); err != nil {
121
buildTxn := func(attempt int) ([]txn.Op, error) {
123
notDead, err := isNotDead(st, unitsC, batch.Unit.Id())
124
if err != nil || !notDead {
125
return nil, errors.NotFoundf(batch.Unit.Id())
127
exists, err := st.MetricBatch(batch.UUID)
128
if exists != nil && err == nil {
129
return nil, errors.AlreadyExistsf("metrics batch UUID %q", batch.UUID)
131
if !errors.IsNotFound(err) {
132
return nil, errors.Trace(err)
137
Id: st.docID(batch.Unit.Id()),
142
Assert: txn.DocMissing,
147
err = st.run(buildTxn)
149
return nil, errors.Trace(err)
155
// AllMetricBatches returns all metric batches currently stored in state.
156
// TODO (tasdomas): this method is currently only used in the uniter worker test -
157
// it needs to be modified to restrict the scope of the values it
158
// returns if it is to be used outside of tests.
159
func (st *State) AllMetricBatches() ([]MetricBatch, error) {
160
c, closer := st.getCollection(metricsC)
162
docs := []metricBatchDoc{}
163
err := c.Find(nil).All(&docs)
165
return nil, errors.Trace(err)
167
results := make([]MetricBatch, len(docs))
168
for i, doc := range docs {
169
results[i] = MetricBatch{st: st, doc: doc}
174
func (st *State) queryLocalMetricBatches(query bson.M) ([]MetricBatch, error) {
175
c, closer := st.getCollection(metricsC)
177
docs := []metricBatchDoc{}
181
query["charmurl"] = bson.M{"$regex": "^local:"}
182
err := c.Find(query).All(&docs)
184
return nil, errors.Trace(err)
186
results := make([]MetricBatch, len(docs))
187
for i, doc := range docs {
188
results[i] = MetricBatch{st: st, doc: doc}
193
// MetricBatchesForUnit returns metric batches for the given unit.
194
func (st *State) MetricBatchesForUnit(unit string) ([]MetricBatch, error) {
195
return st.queryLocalMetricBatches(bson.M{"unit": unit})
198
// MetricBatchesForService returns metric batches for the given application.
199
func (st *State) MetricBatchesForService(application string) ([]MetricBatch, error) {
200
svc, err := st.Application(application)
202
return nil, errors.Trace(err)
204
units, err := svc.AllUnits()
206
return nil, errors.Trace(err)
208
unitNames := make([]bson.M, len(units))
209
for i, u := range units {
210
unitNames[i] = bson.M{"unit": u.Name()}
212
return st.queryLocalMetricBatches(bson.M{"$or": unitNames})
215
// MetricBatch returns the metric batch with the given id.
216
func (st *State) MetricBatch(id string) (*MetricBatch, error) {
217
c, closer := st.getCollection(metricsC)
219
doc := metricBatchDoc{}
220
err := c.Find(bson.M{"_id": id}).One(&doc)
221
if err == mgo.ErrNotFound {
222
return nil, errors.NotFoundf("metric %v", id)
227
return &MetricBatch{st: st, doc: doc}, nil
230
// CleanupOldMetrics looks for metrics that are 24 hours old (or older)
231
// and have been sent. Any metrics it finds are deleted.
232
func (st *State) CleanupOldMetrics() error {
233
// TODO(fwereade): 2016-03-17 lp:1558657
235
metrics, closer := st.getCollection(metricsC)
237
// Nothing else in the system will interact with sent metrics, and nothing needs
238
// to watch them either; so in this instance it's safe to do an end run around the
239
// mgo/txn package. See State.cleanupRelationSettings for a similar situation.
240
metricsW := metrics.Writeable()
241
// TODO (mattyw) iter over this.
242
info, err := metricsW.RemoveAll(bson.M{
244
"delete-time": bson.M{"$lte": now},
247
metricsLogger.Tracef("cleanup removed %d metrics", info.Removed)
249
return errors.Trace(err)
252
// MetricsToSend returns batchSize metrics that need to be sent
254
func (st *State) MetricsToSend(batchSize int) ([]*MetricBatch, error) {
255
var docs []metricBatchDoc
256
c, closer := st.getCollection(metricsC)
258
err := c.Find(bson.M{
260
}).Limit(batchSize).All(&docs)
262
return nil, errors.Trace(err)
265
batch := make([]*MetricBatch, len(docs))
266
for i, doc := range docs {
267
batch[i] = &MetricBatch{st: st, doc: doc}
274
// CountOfUnsentMetrics returns the number of metrics that
275
// haven't been sent to the collection service.
276
func (st *State) CountOfUnsentMetrics() (int, error) {
277
c, closer := st.getCollection(metricsC)
279
return c.Find(bson.M{
284
// CountOfSentMetrics returns the number of metrics that
285
// have been sent to the collection service and have not
286
// been removed by the cleanup worker.
287
func (st *State) CountOfSentMetrics() (int, error) {
288
c, closer := st.getCollection(metricsC)
290
return c.Find(bson.M{
295
// MarshalJSON defines how the MetricBatch type should be
296
// converted to json.
297
func (m *MetricBatch) MarshalJSON() ([]byte, error) {
298
return json.Marshal(m.doc)
301
// UUID returns the UUID of the metric batch.
302
func (m *MetricBatch) UUID() string {
306
// ModelUUID returns the model UUID this metric applies to.
307
func (m *MetricBatch) ModelUUID() string {
308
return m.doc.ModelUUID
311
// Unit returns the name of the unit this metric was generated in.
312
func (m *MetricBatch) Unit() string {
316
// CharmURL returns the charm url for the charm this metric was generated in.
317
func (m *MetricBatch) CharmURL() string {
318
return m.doc.CharmUrl
321
// Created returns the time this metric batch was created.
322
func (m *MetricBatch) Created() time.Time {
326
// Sent returns a flag to tell us if this metric has been sent to the metric
327
// collection service
328
func (m *MetricBatch) Sent() bool {
332
// Metrics returns the metrics in this batch.
333
func (m *MetricBatch) Metrics() []Metric {
334
result := make([]Metric, len(m.doc.Metrics))
335
copy(result, m.doc.Metrics)
339
// SetSent marks the metric batch as having been sent at
340
// the specified time.
341
func (m *MetricBatch) SetSent(t time.Time) error {
342
deleteTime := t.UTC().Add(CleanupAge)
343
ops := setSentOps([]string{m.UUID()}, deleteTime)
344
if err := m.st.runTransaction(ops); err != nil {
345
return errors.Annotatef(err, "cannot set metric sent for metric %q", m.UUID())
349
m.doc.DeleteTime = deleteTime
353
// Credentials returns any credentials associated with the metric batch.
354
func (m *MetricBatch) Credentials() []byte {
355
return m.doc.Credentials
358
func setSentOps(batchUUIDs []string, deleteTime time.Time) []txn.Op {
359
ops := make([]txn.Op, len(batchUUIDs))
360
for i, u := range batchUUIDs {
364
Assert: txn.DocExists,
365
Update: bson.M{"$set": bson.M{"sent": true, "delete-time": deleteTime}},
371
// SetMetricBatchesSent sets sent on each MetricBatch corresponding to the uuids provided.
372
func (st *State) SetMetricBatchesSent(batchUUIDs []string) error {
373
// TODO(fwereade): 2016-03-17 lp:1558657
374
deleteTime := time.Now().UTC().Add(CleanupAge)
375
ops := setSentOps(batchUUIDs, deleteTime)
376
if err := st.runTransaction(ops); err != nil {
377
return errors.Annotatef(err, "cannot set metric sent in bulk call")