~nskaggs/+junk/xenial-test

« back to all changes in this revision

Viewing changes to src/github.com/juju/juju/state/metrics.go

  • Committer: Nicholas Skaggs
  • Date: 2016-10-24 20:56:05 UTC
  • Revision ID: nicholas.skaggs@canonical.com-20161024205605-z8lta0uvuhtxwzwl
Initi with beta15

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
// Copyright 2014 Canonical Ltd.
 
2
// Licensed under the AGPLv3, see LICENCE file for details.
 
3
 
 
4
package state
 
5
 
 
6
import (
 
7
        "encoding/json"
 
8
        "time"
 
9
 
 
10
        "github.com/juju/errors"
 
11
        "github.com/juju/loggo"
 
12
        "gopkg.in/juju/charm.v6-unstable"
 
13
        "gopkg.in/juju/names.v2"
 
14
        "gopkg.in/mgo.v2"
 
15
        "gopkg.in/mgo.v2/bson"
 
16
        "gopkg.in/mgo.v2/txn"
 
17
)
 
18
 
 
19
var metricsLogger = loggo.GetLogger("juju.state.metrics")
 
20
 
 
21
const (
 
22
        CleanupAge = time.Hour * 24
 
23
)
 
24
 
 
25
// MetricBatch represents a batch of metrics reported from a unit.
 
26
// These will be received from the unit in batches.
 
27
// The main contents of the metric (key, value) is defined
 
28
// by the charm author and sent from the unit via a call to
 
29
// add-metric
 
30
type MetricBatch struct {
 
31
        st  *State
 
32
        doc metricBatchDoc
 
33
}
 
34
 
 
35
type metricBatchDoc struct {
 
36
        UUID        string    `bson:"_id"`
 
37
        ModelUUID   string    `bson:"model-uuid"`
 
38
        Unit        string    `bson:"unit"`
 
39
        CharmUrl    string    `bson:"charmurl"`
 
40
        Sent        bool      `bson:"sent"`
 
41
        DeleteTime  time.Time `bson:"delete-time"`
 
42
        Created     time.Time `bson:"created"`
 
43
        Metrics     []Metric  `bson:"metrics"`
 
44
        Credentials []byte    `bson:"credentials"`
 
45
}
 
46
 
 
47
// Metric represents a single Metric.
 
48
type Metric struct {
 
49
        Key   string    `bson:"key"`
 
50
        Value string    `bson:"value"`
 
51
        Time  time.Time `bson:"time"`
 
52
}
 
53
 
 
54
// validate checks that the MetricBatch contains valid metrics.
 
55
func (m *MetricBatch) validate() error {
 
56
        charmUrl, err := charm.ParseURL(m.doc.CharmUrl)
 
57
        if err != nil {
 
58
                return errors.Trace(err)
 
59
        }
 
60
        chrm, err := m.st.Charm(charmUrl)
 
61
        if err != nil {
 
62
                return errors.Trace(err)
 
63
        }
 
64
        chrmMetrics := chrm.Metrics()
 
65
        if chrmMetrics == nil {
 
66
                return errors.Errorf("charm doesn't implement metrics")
 
67
        }
 
68
        for _, m := range m.doc.Metrics {
 
69
                if err := chrmMetrics.ValidateMetric(m.Key, m.Value); err != nil {
 
70
                        return errors.Trace(err)
 
71
                }
 
72
        }
 
73
        return nil
 
74
}
 
75
 
 
76
// BatchParam contains the properties of the metrics batch used when creating a metrics
 
77
// batch.
 
78
type BatchParam struct {
 
79
        UUID     string
 
80
        CharmURL string
 
81
        Created  time.Time
 
82
        Metrics  []Metric
 
83
        Unit     names.UnitTag
 
84
}
 
85
 
 
86
// AddMetrics adds a new batch of metrics to the database.
 
87
func (st *State) AddMetrics(batch BatchParam) (*MetricBatch, error) {
 
88
        if len(batch.Metrics) == 0 {
 
89
                return nil, errors.New("cannot add a batch of 0 metrics")
 
90
        }
 
91
        charmURL, err := charm.ParseURL(batch.CharmURL)
 
92
        if err != nil {
 
93
                return nil, errors.NewNotValid(err, "could not parse charm URL")
 
94
        }
 
95
 
 
96
        unit, err := st.Unit(batch.Unit.Id())
 
97
        if err != nil {
 
98
                return nil, errors.Trace(err)
 
99
        }
 
100
        application, err := unit.Application()
 
101
        if err != nil {
 
102
                return nil, errors.Trace(err)
 
103
        }
 
104
 
 
105
        metric := &MetricBatch{
 
106
                st: st,
 
107
                doc: metricBatchDoc{
 
108
                        UUID:        batch.UUID,
 
109
                        ModelUUID:   st.ModelUUID(),
 
110
                        Unit:        batch.Unit.Id(),
 
111
                        CharmUrl:    charmURL.String(),
 
112
                        Sent:        false,
 
113
                        Created:     batch.Created,
 
114
                        Metrics:     batch.Metrics,
 
115
                        Credentials: application.MetricCredentials(),
 
116
                },
 
117
        }
 
118
        if err := metric.validate(); err != nil {
 
119
                return nil, err
 
120
        }
 
121
        buildTxn := func(attempt int) ([]txn.Op, error) {
 
122
                if attempt > 0 {
 
123
                        notDead, err := isNotDead(st, unitsC, batch.Unit.Id())
 
124
                        if err != nil || !notDead {
 
125
                                return nil, errors.NotFoundf(batch.Unit.Id())
 
126
                        }
 
127
                        exists, err := st.MetricBatch(batch.UUID)
 
128
                        if exists != nil && err == nil {
 
129
                                return nil, errors.AlreadyExistsf("metrics batch UUID %q", batch.UUID)
 
130
                        }
 
131
                        if !errors.IsNotFound(err) {
 
132
                                return nil, errors.Trace(err)
 
133
                        }
 
134
                }
 
135
                ops := []txn.Op{{
 
136
                        C:      unitsC,
 
137
                        Id:     st.docID(batch.Unit.Id()),
 
138
                        Assert: notDeadDoc,
 
139
                }, {
 
140
                        C:      metricsC,
 
141
                        Id:     metric.UUID(),
 
142
                        Assert: txn.DocMissing,
 
143
                        Insert: &metric.doc,
 
144
                }}
 
145
                return ops, nil
 
146
        }
 
147
        err = st.run(buildTxn)
 
148
        if err != nil {
 
149
                return nil, errors.Trace(err)
 
150
        }
 
151
 
 
152
        return metric, nil
 
153
}
 
154
 
 
155
// AllMetricBatches returns all metric batches currently stored in state.
 
156
// TODO (tasdomas): this method is currently only used in the uniter worker test -
 
157
//                  it needs to be modified to restrict the scope of the values it
 
158
//                  returns if it is to be used outside of tests.
 
159
func (st *State) AllMetricBatches() ([]MetricBatch, error) {
 
160
        c, closer := st.getCollection(metricsC)
 
161
        defer closer()
 
162
        docs := []metricBatchDoc{}
 
163
        err := c.Find(nil).All(&docs)
 
164
        if err != nil {
 
165
                return nil, errors.Trace(err)
 
166
        }
 
167
        results := make([]MetricBatch, len(docs))
 
168
        for i, doc := range docs {
 
169
                results[i] = MetricBatch{st: st, doc: doc}
 
170
        }
 
171
        return results, nil
 
172
}
 
173
 
 
174
func (st *State) queryLocalMetricBatches(query bson.M) ([]MetricBatch, error) {
 
175
        c, closer := st.getCollection(metricsC)
 
176
        defer closer()
 
177
        docs := []metricBatchDoc{}
 
178
        if query == nil {
 
179
                query = bson.M{}
 
180
        }
 
181
        query["charmurl"] = bson.M{"$regex": "^local:"}
 
182
        err := c.Find(query).All(&docs)
 
183
        if err != nil {
 
184
                return nil, errors.Trace(err)
 
185
        }
 
186
        results := make([]MetricBatch, len(docs))
 
187
        for i, doc := range docs {
 
188
                results[i] = MetricBatch{st: st, doc: doc}
 
189
        }
 
190
        return results, nil
 
191
}
 
192
 
 
193
// MetricBatchesUnit returns metric batches for the given unit.
 
194
func (st *State) MetricBatchesForUnit(unit string) ([]MetricBatch, error) {
 
195
        return st.queryLocalMetricBatches(bson.M{"unit": unit})
 
196
}
 
197
 
 
198
// MetricBatchesUnit returns metric batches for the given application.
 
199
func (st *State) MetricBatchesForService(application string) ([]MetricBatch, error) {
 
200
        svc, err := st.Application(application)
 
201
        if err != nil {
 
202
                return nil, errors.Trace(err)
 
203
        }
 
204
        units, err := svc.AllUnits()
 
205
        if err != nil {
 
206
                return nil, errors.Trace(err)
 
207
        }
 
208
        unitNames := make([]bson.M, len(units))
 
209
        for i, u := range units {
 
210
                unitNames[i] = bson.M{"unit": u.Name()}
 
211
        }
 
212
        return st.queryLocalMetricBatches(bson.M{"$or": unitNames})
 
213
}
 
214
 
 
215
// MetricBatch returns the metric batch with the given id.
 
216
func (st *State) MetricBatch(id string) (*MetricBatch, error) {
 
217
        c, closer := st.getCollection(metricsC)
 
218
        defer closer()
 
219
        doc := metricBatchDoc{}
 
220
        err := c.Find(bson.M{"_id": id}).One(&doc)
 
221
        if err == mgo.ErrNotFound {
 
222
                return nil, errors.NotFoundf("metric %v", id)
 
223
        }
 
224
        if err != nil {
 
225
                return nil, err
 
226
        }
 
227
        return &MetricBatch{st: st, doc: doc}, nil
 
228
}
 
229
 
 
230
// CleanupOldMetrics looks for metrics that are 24 hours old (or older)
 
231
// and have been sent. Any metrics it finds are deleted.
 
232
func (st *State) CleanupOldMetrics() error {
 
233
        // TODO(fwereade): 2016-03-17 lp:1558657
 
234
        now := time.Now()
 
235
        metrics, closer := st.getCollection(metricsC)
 
236
        defer closer()
 
237
        // Nothing else in the system will interact with sent metrics, and nothing needs
 
238
        // to watch them either; so in this instance it's safe to do an end run around the
 
239
        // mgo/txn package. See State.cleanupRelationSettings for a similar situation.
 
240
        metricsW := metrics.Writeable()
 
241
        // TODO (mattyw) iter over this.
 
242
        info, err := metricsW.RemoveAll(bson.M{
 
243
                "sent":        true,
 
244
                "delete-time": bson.M{"$lte": now},
 
245
        })
 
246
        if err == nil {
 
247
                metricsLogger.Tracef("cleanup removed %d metrics", info.Removed)
 
248
        }
 
249
        return errors.Trace(err)
 
250
}
 
251
 
 
252
// MetricsToSend returns batchSize metrics that need to be sent
 
253
// to the collector
 
254
func (st *State) MetricsToSend(batchSize int) ([]*MetricBatch, error) {
 
255
        var docs []metricBatchDoc
 
256
        c, closer := st.getCollection(metricsC)
 
257
        defer closer()
 
258
        err := c.Find(bson.M{
 
259
                "sent": false,
 
260
        }).Limit(batchSize).All(&docs)
 
261
        if err != nil {
 
262
                return nil, errors.Trace(err)
 
263
        }
 
264
 
 
265
        batch := make([]*MetricBatch, len(docs))
 
266
        for i, doc := range docs {
 
267
                batch[i] = &MetricBatch{st: st, doc: doc}
 
268
 
 
269
        }
 
270
 
 
271
        return batch, nil
 
272
}
 
273
 
 
274
// CountOfUnsentMetrics returns the number of metrics that
 
275
// haven't been sent to the collection service.
 
276
func (st *State) CountOfUnsentMetrics() (int, error) {
 
277
        c, closer := st.getCollection(metricsC)
 
278
        defer closer()
 
279
        return c.Find(bson.M{
 
280
                "sent": false,
 
281
        }).Count()
 
282
}
 
283
 
 
284
// CountOfSentMetrics returns the number of metrics that
 
285
// have been sent to the collection service and have not
 
286
// been removed by the cleanup worker.
 
287
func (st *State) CountOfSentMetrics() (int, error) {
 
288
        c, closer := st.getCollection(metricsC)
 
289
        defer closer()
 
290
        return c.Find(bson.M{
 
291
                "sent": true,
 
292
        }).Count()
 
293
}
 
294
 
 
295
// MarshalJSON defines how the MetricBatch type should be
 
296
// converted to json.
 
297
func (m *MetricBatch) MarshalJSON() ([]byte, error) {
 
298
        return json.Marshal(m.doc)
 
299
}
 
300
 
 
301
// UUID returns to uuid of the metric.
 
302
func (m *MetricBatch) UUID() string {
 
303
        return m.doc.UUID
 
304
}
 
305
 
 
306
// ModelUUID returns the model UUID this metric applies to.
 
307
func (m *MetricBatch) ModelUUID() string {
 
308
        return m.doc.ModelUUID
 
309
}
 
310
 
 
311
// Unit returns the name of the unit this metric was generated in.
 
312
func (m *MetricBatch) Unit() string {
 
313
        return m.doc.Unit
 
314
}
 
315
 
 
316
// CharmURL returns the charm url for the charm this metric was generated in.
 
317
func (m *MetricBatch) CharmURL() string {
 
318
        return m.doc.CharmUrl
 
319
}
 
320
 
 
321
// Created returns the time this metric batch was created.
 
322
func (m *MetricBatch) Created() time.Time {
 
323
        return m.doc.Created
 
324
}
 
325
 
 
326
// Sent returns a flag to tell us if this metric has been sent to the metric
 
327
// collection service
 
328
func (m *MetricBatch) Sent() bool {
 
329
        return m.doc.Sent
 
330
}
 
331
 
 
332
// Metrics returns the metrics in this batch.
 
333
func (m *MetricBatch) Metrics() []Metric {
 
334
        result := make([]Metric, len(m.doc.Metrics))
 
335
        copy(result, m.doc.Metrics)
 
336
        return result
 
337
}
 
338
 
 
339
// SetSent marks the metric has having been sent at
 
340
// the specified time.
 
341
func (m *MetricBatch) SetSent(t time.Time) error {
 
342
        deleteTime := t.UTC().Add(CleanupAge)
 
343
        ops := setSentOps([]string{m.UUID()}, deleteTime)
 
344
        if err := m.st.runTransaction(ops); err != nil {
 
345
                return errors.Annotatef(err, "cannot set metric sent for metric %q", m.UUID())
 
346
        }
 
347
 
 
348
        m.doc.Sent = true
 
349
        m.doc.DeleteTime = deleteTime
 
350
        return nil
 
351
}
 
352
 
 
353
// Credentials returns any credentials associated with the metric batch.
 
354
func (m *MetricBatch) Credentials() []byte {
 
355
        return m.doc.Credentials
 
356
}
 
357
 
 
358
func setSentOps(batchUUIDs []string, deleteTime time.Time) []txn.Op {
 
359
        ops := make([]txn.Op, len(batchUUIDs))
 
360
        for i, u := range batchUUIDs {
 
361
                ops[i] = txn.Op{
 
362
                        C:      metricsC,
 
363
                        Id:     u,
 
364
                        Assert: txn.DocExists,
 
365
                        Update: bson.M{"$set": bson.M{"sent": true, "delete-time": deleteTime}},
 
366
                }
 
367
        }
 
368
        return ops
 
369
}
 
370
 
 
371
// SetMetricBatchesSent sets sent on each MetricBatch corresponding to the uuids provided.
 
372
func (st *State) SetMetricBatchesSent(batchUUIDs []string) error {
 
373
        // TODO(fwereade): 2016-03-17 lp:1558657
 
374
        deleteTime := time.Now().UTC().Add(CleanupAge)
 
375
        ops := setSentOps(batchUUIDs, deleteTime)
 
376
        if err := st.runTransaction(ops); err != nil {
 
377
                return errors.Annotatef(err, "cannot set metric sent in bulk call")
 
378
        }
 
379
        return nil
 
380
}