~ubuntu-branches/ubuntu/vivid/ceilometer/vivid

« back to all changes in this revision

Viewing changes to ceilometer/storage/impl_mongodb.py

  • Committer: Package Import Robot
  • Author(s): Chuck Short
  • Date: 2014-03-06 14:44:28 UTC
  • mto: (28.1.1 utopic-proposed) (1.2.1)
  • mto: This revision was merged to the branch mainline in revision 19.
  • Revision ID: package-import@ubuntu.com-20140306144428-rvphsh4igwyulzf0
Tags: upstream-2014.1~b3
ImportĀ upstreamĀ versionĀ 2014.1~b3

Show diffs side-by-side

added added

removed removed

Lines of Context:
2
2
#
3
3
# Copyright Ā© 2012 New Dream Network, LLC (DreamHost)
4
4
# Copyright Ā© 2013 eNovance
 
5
# Copyright Ā© 2014 Red Hat, Inc
5
6
#
6
 
# Author: Doug Hellmann <doug.hellmann@dreamhost.com>
7
 
#         Julien Danjou <julien@danjou.info>
 
7
# Authors: Doug Hellmann <doug.hellmann@dreamhost.com>
 
8
#          Julien Danjou <julien@danjou.info>
 
9
#          Eoghan Glynn <eglynn@redhat.com>
8
10
#
9
11
# Licensed under the Apache License, Version 2.0 (the "License"); you may
10
12
# not use this file except in compliance with the License. You may obtain
25
27
import json
26
28
import operator
27
29
import uuid
28
 
import weakref
29
30
 
30
31
import bson.code
31
32
import bson.objectid
35
36
 
36
37
from ceilometer.openstack.common.gettextutils import _  # noqa
37
38
from ceilometer.openstack.common import log
 
39
from ceilometer.openstack.common import timeutils
38
40
from ceilometer import storage
39
41
from ceilometer.storage import base
40
42
from ceilometer.storage import models
 
43
from ceilometer.storage import pymongo_base
 
44
from ceilometer import utils
41
45
 
42
46
cfg.CONF.import_opt('time_to_live', 'ceilometer.storage',
43
47
                    group="database")
77
81
        return Connection(conf)
78
82
 
79
83
 
80
 
def make_timestamp_range(start, end,
81
 
                         start_timestamp_op=None, end_timestamp_op=None):
82
 
    """Given two possible datetimes and their operations, create the query
83
 
    document to find timestamps within that range.
84
 
    By default, using $gte for the lower bound and $lt for the
85
 
    upper bound.
86
 
    """
87
 
    ts_range = {}
88
 
 
89
 
    if start:
90
 
        if start_timestamp_op == 'gt':
91
 
            start_timestamp_op = '$gt'
92
 
        else:
93
 
            start_timestamp_op = '$gte'
94
 
        ts_range[start_timestamp_op] = start
95
 
 
96
 
    if end:
97
 
        if end_timestamp_op == 'le':
98
 
            end_timestamp_op = '$lte'
99
 
        else:
100
 
            end_timestamp_op = '$lt'
101
 
        ts_range[end_timestamp_op] = end
102
 
    return ts_range
103
 
 
104
 
 
105
 
def make_query_from_filter(sample_filter, require_meter=True):
106
 
    """Return a query dictionary based on the settings in the filter.
107
 
 
108
 
    :param filter: SampleFilter instance
109
 
    :param require_meter: If true and the filter does not have a meter,
110
 
                          raise an error.
111
 
    """
112
 
    q = {}
113
 
 
114
 
    if sample_filter.user:
115
 
        q['user_id'] = sample_filter.user
116
 
    if sample_filter.project:
117
 
        q['project_id'] = sample_filter.project
118
 
 
119
 
    if sample_filter.meter:
120
 
        q['counter_name'] = sample_filter.meter
121
 
    elif require_meter:
122
 
        raise RuntimeError('Missing required meter specifier')
123
 
 
124
 
    ts_range = make_timestamp_range(sample_filter.start, sample_filter.end,
125
 
                                    sample_filter.start_timestamp_op,
126
 
                                    sample_filter.end_timestamp_op)
127
 
    if ts_range:
128
 
        q['timestamp'] = ts_range
129
 
 
130
 
    if sample_filter.resource:
131
 
        q['resource_id'] = sample_filter.resource
132
 
    if sample_filter.source:
133
 
        q['source'] = sample_filter.source
134
 
    if sample_filter.message_id:
135
 
        q['message_id'] = sample_filter.message_id
136
 
 
137
 
    # so the samples call metadata resource_metadata, so we convert
138
 
    # to that.
139
 
    q.update(dict(('resource_%s' % k, v)
140
 
                  for (k, v) in sample_filter.metaquery.iteritems()))
141
 
    return q
142
 
 
143
 
 
144
 
class ConnectionPool(object):
145
 
 
146
 
    def __init__(self):
147
 
        self._pool = {}
148
 
 
149
 
    def connect(self, url):
150
 
        connection_options = pymongo.uri_parser.parse_uri(url)
151
 
        del connection_options['database']
152
 
        del connection_options['username']
153
 
        del connection_options['password']
154
 
        del connection_options['collection']
155
 
        pool_key = tuple(connection_options)
156
 
 
157
 
        if pool_key in self._pool:
158
 
            client = self._pool.get(pool_key)()
159
 
            if client:
160
 
                return client
161
 
        LOG.info(_('Connecting to MongoDB on %s'),
162
 
                 connection_options['nodelist'])
163
 
        client = pymongo.MongoClient(
164
 
            url,
165
 
            safe=True)
166
 
        self._pool[pool_key] = weakref.ref(client)
167
 
        return client
168
 
 
169
 
 
170
 
class Connection(base.Connection):
 
84
class Connection(pymongo_base.Connection):
171
85
    """MongoDB connection.
172
86
    """
173
87
 
174
 
    CONNECTION_POOL = ConnectionPool()
 
88
    CONNECTION_POOL = pymongo_base.ConnectionPool()
175
89
 
176
90
    REDUCE_GROUP_CLEAN = bson.code.Code("""
177
91
    function ( curr, result ) {
184
98
    }
185
99
    """)
186
100
 
 
101
    STANDARD_AGGREGATES = dict(
 
102
        emit_initial=dict(
 
103
            sum='',
 
104
            count='',
 
105
            avg='',
 
106
            min='',
 
107
            max=''
 
108
        ),
 
109
        emit_body=dict(
 
110
            sum='sum: this.counter_volume,',
 
111
            count='count: NumberInt(1),',
 
112
            avg='acount: NumberInt(1), asum: this.counter_volume,',
 
113
            min='min: this.counter_volume,',
 
114
            max='max: this.counter_volume,'
 
115
        ),
 
116
        reduce_initial=dict(
 
117
            sum='',
 
118
            count='',
 
119
            avg='',
 
120
            min='',
 
121
            max=''
 
122
        ),
 
123
        reduce_body=dict(
 
124
            sum='sum: values[0].sum,',
 
125
            count='count: values[0].count,',
 
126
            avg='acount: values[0].acount, asum: values[0].asum,',
 
127
            min='min: values[0].min,',
 
128
            max='max: values[0].max,'
 
129
        ),
 
130
        reduce_computation=dict(
 
131
            sum='res.sum += values[i].sum;',
 
132
            count='res.count = NumberInt(res.count + values[i].count);',
 
133
            avg=('res.acount = NumberInt(res.acount + values[i].acount);'
 
134
                 'res.asum += values[i].asum;'),
 
135
            min='if ( values[i].min < res.min ) {res.min = values[i].min;}',
 
136
            max='if ( values[i].max > res.max ) {res.max = values[i].max;}'
 
137
        ),
 
138
        finalize=dict(
 
139
            sum='',
 
140
            count='',
 
141
            avg='value.avg = value.asum / value.acount;',
 
142
            min='',
 
143
            max=''
 
144
        ),
 
145
    )
 
146
 
 
147
    UNPARAMETERIZED_AGGREGATES = dict(
 
148
        emit_initial=dict(
 
149
            stddev=(
 
150
                ''
 
151
            )
 
152
        ),
 
153
        emit_body=dict(
 
154
            stddev='sdsum: this.counter_volume,'
 
155
                   'sdcount: 1,'
 
156
                   'weighted_distances: 0,'
 
157
                   'stddev: 0,'
 
158
        ),
 
159
        reduce_initial=dict(
 
160
            stddev=''
 
161
        ),
 
162
        reduce_body=dict(
 
163
            stddev='sdsum: values[0].sdsum,'
 
164
                   'sdcount: values[0].sdcount,'
 
165
                   'weighted_distances: values[0].weighted_distances,'
 
166
                   'stddev: values[0].stddev,'
 
167
        ),
 
168
        reduce_computation=dict(
 
169
            stddev=(
 
170
                'var deviance = (res.sdsum / res.sdcount) - values[i].sdsum;'
 
171
                'var weight = res.sdcount / ++res.sdcount;'
 
172
                'res.weighted_distances += (Math.pow(deviance, 2) * weight);'
 
173
                'res.sdsum += values[i].sdsum;'
 
174
            )
 
175
        ),
 
176
        finalize=dict(
 
177
            stddev=(
 
178
                'value.stddev = Math.sqrt(value.weighted_distances /'
 
179
                '  value.sdcount);'
 
180
            )
 
181
        ),
 
182
    )
 
183
 
 
184
    PARAMETERIZED_AGGREGATES = dict(
 
185
        validate=dict(
 
186
            cardinality=lambda p: p in ['resource_id', 'user_id', 'project_id',
 
187
                                        'source']
 
188
        ),
 
189
        emit_initial=dict(
 
190
            cardinality=(
 
191
                'var aggregate = {};'
 
192
                'aggregate["cardinality/%(aggregate_param)s"] ='
 
193
                '  this["%(aggregate_param)s"];'
 
194
            )
 
195
        ),
 
196
        emit_body=dict(
 
197
            cardinality='aggregate : aggregate,'
 
198
        ),
 
199
        reduce_initial=dict(
 
200
            cardinality=(
 
201
                'var distincts = {};'
 
202
                'distincts[values[0].aggregate['
 
203
                '  "cardinality/%(aggregate_param)s"]] = true;'
 
204
                'var aggregate = {};'
 
205
                'aggregate["cardinality/%(aggregate_param)s"] = NumberInt(1);'
 
206
            )
 
207
        ),
 
208
        reduce_body=dict(
 
209
            cardinality='aggregate : aggregate,'
 
210
        ),
 
211
        reduce_computation=dict(
 
212
            cardinality=(
 
213
                'if (!(values[i].aggregate["cardinality/%(aggregate_param)s"]'
 
214
                '    in distincts)) {'
 
215
                '  distincts[values[i].aggregate['
 
216
                '    "cardinality/%(aggregate_param)s"]] = true;'
 
217
                '  res.aggregate["cardinality/%(aggregate_param)s"] ='
 
218
                '    NumberInt(Object.keys(distincts).length);}'
 
219
            )
 
220
        ),
 
221
        finalize=dict(
 
222
            cardinality=(
 
223
                'if (typeof value.aggregate['
 
224
                '    "cardinality/%(aggregate_param)s"] !== "number") {'
 
225
                '  value.aggregate["cardinality/%(aggregate_param)s"] ='
 
226
                '     NumberInt(1);}'
 
227
            )
 
228
        ),
 
229
    )
 
230
 
187
231
    EMIT_STATS_COMMON = """
 
232
        %(aggregate_initial_placeholder)s
188
233
        emit(%(key_val)s, { unit: this.counter_unit,
189
 
                            min : this.counter_volume,
190
 
                            max : this.counter_volume,
191
 
                            sum : this.counter_volume,
192
 
                            count : NumberInt(1),
 
234
                            %(aggregate_body_placeholder)s
193
235
                            groupby : %(groupby_val)s,
194
236
                            duration_start : this.timestamp,
195
237
                            duration_end : this.timestamp,
217
259
        }
218
260
    """
219
261
 
220
 
    PARAMS_MAP_STATS = {'key_val': '\'statistics\'',
221
 
                        'groupby_val': 'null',
222
 
                        'period_start_val': 'this.timestamp',
223
 
                        'period_end_val': 'this.timestamp'}
 
262
    PARAMS_MAP_STATS = {
 
263
        'key_val': '\'statistics\'',
 
264
        'groupby_val': 'null',
 
265
        'period_start_val': 'this.timestamp',
 
266
        'period_end_val': 'this.timestamp',
 
267
        'aggregate_initial_placeholder': '%(aggregate_initial_val)s',
 
268
        'aggregate_body_placeholder': '%(aggregate_body_val)s'
 
269
    }
224
270
 
225
271
    MAP_STATS = bson.code.Code("function () {" +
226
272
                               EMIT_STATS_COMMON % PARAMS_MAP_STATS +
230
276
        'key_val': 'period_start',
231
277
        'groupby_val': 'null',
232
278
        'period_start_val': 'new Date(period_start)',
233
 
        'period_end_val': 'new Date(period_start + period)'
 
279
        'period_end_val': 'new Date(period_start + period)',
 
280
        'aggregate_initial_placeholder': '%(aggregate_initial_val)s',
 
281
        'aggregate_body_placeholder': '%(aggregate_body_val)s'
234
282
    }
235
283
 
236
284
    MAP_STATS_PERIOD = bson.code.Code(
239
287
        EMIT_STATS_COMMON % PARAMS_MAP_STATS_PERIOD +
240
288
        "}")
241
289
 
242
 
    PARAMS_MAP_STATS_GROUPBY = {'key_val': 'groupby_key',
243
 
                                'groupby_val': 'groupby',
244
 
                                'period_start_val': 'this.timestamp',
245
 
                                'period_end_val': 'this.timestamp'}
 
290
    PARAMS_MAP_STATS_GROUPBY = {
 
291
        'key_val': 'groupby_key',
 
292
        'groupby_val': 'groupby',
 
293
        'period_start_val': 'this.timestamp',
 
294
        'period_end_val': 'this.timestamp',
 
295
        'aggregate_initial_placeholder': '%(aggregate_initial_val)s',
 
296
        'aggregate_body_placeholder': '%(aggregate_body_val)s'
 
297
    }
246
298
 
247
299
    MAP_STATS_GROUPBY = bson.code.Code(
248
300
        "function () {" +
254
306
        'key_val': 'groupby_key',
255
307
        'groupby_val': 'groupby',
256
308
        'period_start_val': 'new Date(period_start)',
257
 
        'period_end_val': 'new Date(period_start + period)'
 
309
        'period_end_val': 'new Date(period_start + period)',
 
310
        'aggregate_initial_placeholder': '%(aggregate_initial_val)s',
 
311
        'aggregate_body_placeholder': '%(aggregate_body_val)s'
258
312
    }
259
313
 
260
314
    MAP_STATS_PERIOD_GROUPBY = bson.code.Code(
267
321
 
268
322
    REDUCE_STATS = bson.code.Code("""
269
323
    function (key, values) {
 
324
        %(aggregate_initial_val)s
270
325
        var res = { unit: values[0].unit,
271
 
                    min: values[0].min,
272
 
                    max: values[0].max,
273
 
                    count: values[0].count,
274
 
                    sum: values[0].sum,
 
326
                    %(aggregate_body_val)s
275
327
                    groupby: values[0].groupby,
276
328
                    period_start: values[0].period_start,
277
329
                    period_end: values[0].period_end,
278
330
                    duration_start: values[0].duration_start,
279
331
                    duration_end: values[0].duration_end };
280
332
        for ( var i=1; i<values.length; i++ ) {
281
 
            if ( values[i].min < res.min )
282
 
               res.min = values[i].min;
283
 
            if ( values[i].max > res.max )
284
 
               res.max = values[i].max;
285
 
            res.count = NumberInt(res.count + values[i].count);
286
 
            res.sum += values[i].sum;
 
333
            %(aggregate_computation_val)s
287
334
            if ( values[i].duration_start < res.duration_start )
288
335
               res.duration_start = values[i].duration_start;
289
336
            if ( values[i].duration_end > res.duration_end )
295
342
 
296
343
    FINALIZE_STATS = bson.code.Code("""
297
344
    function (key, value) {
298
 
        value.avg = value.sum / value.count;
 
345
        %(aggregate_val)s
299
346
        value.duration = (value.duration_end - value.duration_start) / 1000;
300
347
        value.period = NumberInt((value.period_end - value.period_start)
301
348
                                  / 1000);
457
504
        # modify a data structure owned by our caller (the driver adds
458
505
        # a new key '_id').
459
506
        record = copy.copy(data)
 
507
        record['recorded_at'] = timeutils.utcnow()
460
508
        self.db.meter.insert(record)
461
509
 
462
510
    def clear_expired_metering_data(self, ttl):
533
581
        if marker is not None:
534
582
            sort_criteria_list = []
535
583
 
536
 
            for i in range(0, len(sort_keys)):
 
584
            for i in range(len(sort_keys)):
 
585
                #NOTE(fengqian): Generate the query criteria recursively.
 
586
                #sort_keys=[k1, k2, k3], maker_value=[v1, v2, v3]
 
587
                #sort_flags = ['$lt', '$gt', 'lt'].
 
588
                #The query criteria should be
 
589
                #{'k3': {'$lt': 'v3'}, 'k2': {'eq': 'v2'}, 'k1': {'eq': 'v1'}},
 
590
                #{'k2': {'$gt': 'v2'}, 'k1': {'eq': 'v1'}},
 
591
                #{'k1': {'$lt': 'v1'}} with 'OR' operation.
 
592
                #Each recurse will generate one items of three.
537
593
                sort_criteria_list.append(cls._recurse_sort_keys(
538
594
                                          sort_keys[:(len(sort_keys) - i)],
539
595
                                          marker, _op))
595
651
            limit = 0
596
652
        return db_collection.find(q, limit=limit, sort=all_sort)
597
653
 
598
 
    def get_users(self, source=None):
599
 
        """Return an iterable of user id strings.
600
 
 
601
 
        :param source: Optional source filter.
602
 
        """
603
 
        q = {}
604
 
        if source is not None:
605
 
            q['source'] = source
606
 
 
607
 
        return (doc['_id'] for doc in
608
 
                self.db.user.find(q, fields=['_id'],
609
 
                                  sort=[('_id', pymongo.ASCENDING)]))
610
 
 
611
 
    def get_projects(self, source=None):
612
 
        """Return an iterable of project id strings.
613
 
 
614
 
        :param source: Optional source filter.
615
 
        """
616
 
        q = {}
617
 
        if source is not None:
618
 
            q['source'] = source
619
 
 
620
 
        return (doc['_id'] for doc in
621
 
                self.db.project.find(q, fields=['_id'],
622
 
                                     sort=[('_id', pymongo.ASCENDING)]))
623
 
 
624
654
    def get_resources(self, user=None, project=None, source=None,
625
655
                      start_timestamp=None, start_timestamp_op=None,
626
656
                      end_timestamp=None, end_timestamp_op=None,
662
692
            # Look for resources matching the above criteria and with
663
693
            # samples in the time range we care about, then change the
664
694
            # resource query to return just those resources by id.
665
 
            ts_range = make_timestamp_range(start_timestamp, end_timestamp,
666
 
                                            start_timestamp_op,
667
 
                                            end_timestamp_op)
 
695
            ts_range = pymongo_base.make_timestamp_range(start_timestamp,
 
696
                                                         end_timestamp,
 
697
                                                         start_timestamp_op,
 
698
                                                         end_timestamp_op)
668
699
            if ts_range:
669
700
                q['timestamp'] = ts_range
670
701
 
695
726
        finally:
696
727
            self.db[out].drop()
697
728
 
698
 
    def get_meters(self, user=None, project=None, resource=None, source=None,
699
 
                   metaquery={}, pagination=None):
700
 
        """Return an iterable of models.Meter instances
701
 
 
702
 
        :param user: Optional ID for user that owns the resource.
703
 
        :param project: Optional ID for project that owns the resource.
704
 
        :param resource: Optional resource filter.
705
 
        :param source: Optional source filter.
706
 
        :param metaquery: Optional dict with metadata to match on.
707
 
        :param pagination: Optional pagination query.
708
 
        """
709
 
        if pagination:
710
 
            raise NotImplementedError(_('Pagination not implemented'))
711
 
 
712
 
        q = {}
713
 
        if user is not None:
714
 
            q['user_id'] = user
715
 
        if project is not None:
716
 
            q['project_id'] = project
717
 
        if resource is not None:
718
 
            q['_id'] = resource
719
 
        if source is not None:
720
 
            q['source'] = source
721
 
        q.update(metaquery)
722
 
 
723
 
        for r in self.db.resource.find(q):
724
 
            for r_meter in r['meter']:
725
 
                yield models.Meter(
726
 
                    name=r_meter['counter_name'],
727
 
                    type=r_meter['counter_type'],
728
 
                    # Return empty string if 'counter_unit' is not valid for
729
 
                    # backward compatibility.
730
 
                    unit=r_meter.get('counter_unit', ''),
731
 
                    resource_id=r['_id'],
732
 
                    project_id=r['project_id'],
733
 
                    source=r['source'],
734
 
                    user_id=r['user_id'],
735
 
                )
736
 
 
737
 
    def get_samples(self, sample_filter, limit=None):
738
 
        """Return an iterable of model.Sample instances.
739
 
 
740
 
        :param sample_filter: Filter.
741
 
        :param limit: Maximum number of results to return.
742
 
        """
743
 
        if limit == 0:
744
 
            return
745
 
        q = make_query_from_filter(sample_filter, require_meter=False)
746
 
        if limit:
747
 
            samples = self.db.meter.find(
748
 
                q, limit=limit, sort=[("timestamp", pymongo.DESCENDING)])
749
 
        else:
750
 
            samples = self.db.meter.find(
751
 
                q, sort=[("timestamp", pymongo.DESCENDING)])
752
 
 
753
 
        for s in samples:
754
 
            # Remove the ObjectId generated by the database when
755
 
            # the sample was inserted. It is an implementation
756
 
            # detail that should not leak outside of the driver.
757
 
            del s['_id']
758
 
            # Backward compatibility for samples without units
759
 
            s['counter_unit'] = s.get('counter_unit', '')
760
 
            yield models.Sample(**s)
761
 
 
762
 
    def get_meter_statistics(self, sample_filter, period=None, groupby=None):
 
729
    def _aggregate_param(self, fragment_key, aggregate):
 
730
        fragment_map = self.STANDARD_AGGREGATES[fragment_key]
 
731
 
 
732
        if not aggregate:
 
733
            return ''.join([f for f in fragment_map.values()])
 
734
 
 
735
        fragments = ''
 
736
 
 
737
        for a in aggregate:
 
738
            if a.func in self.STANDARD_AGGREGATES[fragment_key]:
 
739
                fragment_map = self.STANDARD_AGGREGATES[fragment_key]
 
740
                fragments += fragment_map[a.func]
 
741
            elif a.func in self.UNPARAMETERIZED_AGGREGATES[fragment_key]:
 
742
                fragment_map = self.UNPARAMETERIZED_AGGREGATES[fragment_key]
 
743
                fragments += fragment_map[a.func]
 
744
            elif a.func in self.PARAMETERIZED_AGGREGATES[fragment_key]:
 
745
                fragment_map = self.PARAMETERIZED_AGGREGATES[fragment_key]
 
746
                v = self.PARAMETERIZED_AGGREGATES['validate'].get(a.func)
 
747
                if not (v and v(a.param)):
 
748
                    raise storage.StorageBadAggregate('Bad aggregate: %s.%s'
 
749
                                                      % (a.func, a.param))
 
750
                params = dict(aggregate_param=a.param)
 
751
                fragments += (fragment_map[a.func] % params)
 
752
            else:
 
753
                raise NotImplementedError(_('Selectable aggregate function %s'
 
754
                                            ' is not supported') % a.func)
 
755
 
 
756
        return fragments
 
757
 
 
758
    def get_meter_statistics(self, sample_filter, period=None, groupby=None,
 
759
                             aggregate=None):
763
760
        """Return an iterable of models.Statistics instance containing meter
764
761
        statistics described by the query parameters.
765
762
 
771
768
                                    'resource_id', 'source'])):
772
769
            raise NotImplementedError("Unable to group by these fields")
773
770
 
774
 
        q = make_query_from_filter(sample_filter)
 
771
        q = pymongo_base.make_query_from_filter(sample_filter)
775
772
 
776
773
        if period:
777
774
            if sample_filter.start:
781
778
                    limit=1, sort=[('timestamp',
782
779
                                    pymongo.ASCENDING)])[0]['timestamp']
783
780
            period_start = int(calendar.timegm(period_start.utctimetuple()))
784
 
            params_period = {'period': period,
785
 
                             'period_first': period_start,
786
 
                             'groupby_fields': json.dumps(groupby)}
 
781
            map_params = {'period': period,
 
782
                          'period_first': period_start,
 
783
                          'groupby_fields': json.dumps(groupby)}
787
784
            if groupby:
788
 
                map_stats = self.MAP_STATS_PERIOD_GROUPBY % params_period
 
785
                map_fragment = self.MAP_STATS_PERIOD_GROUPBY
789
786
            else:
790
 
                map_stats = self.MAP_STATS_PERIOD % params_period
 
787
                map_fragment = self.MAP_STATS_PERIOD
791
788
        else:
792
789
            if groupby:
793
 
                params_groupby = {'groupby_fields': json.dumps(groupby)}
794
 
                map_stats = self.MAP_STATS_GROUPBY % params_groupby
 
790
                map_params = {'groupby_fields': json.dumps(groupby)}
 
791
                map_fragment = self.MAP_STATS_GROUPBY
795
792
            else:
796
 
                map_stats = self.MAP_STATS
 
793
                map_params = dict()
 
794
                map_fragment = self.MAP_STATS
 
795
 
 
796
        sub = self._aggregate_param
 
797
 
 
798
        map_params['aggregate_initial_val'] = sub('emit_initial', aggregate)
 
799
        map_params['aggregate_body_val'] = sub('emit_body', aggregate)
 
800
 
 
801
        map_stats = map_fragment % map_params
 
802
 
 
803
        reduce_params = dict(
 
804
            aggregate_initial_val=sub('reduce_initial', aggregate),
 
805
            aggregate_body_val=sub('reduce_body', aggregate),
 
806
            aggregate_computation_val=sub('reduce_computation', aggregate)
 
807
        )
 
808
        reduce_stats = self.REDUCE_STATS % reduce_params
 
809
 
 
810
        finalize_params = dict(aggregate_val=sub('finalize', aggregate))
 
811
        finalize_stats = self.FINALIZE_STATS % finalize_params
797
812
 
798
813
        results = self.db.meter.map_reduce(
799
814
            map_stats,
800
 
            self.REDUCE_STATS,
 
815
            reduce_stats,
801
816
            {'inline': 1},
802
 
            finalize=self.FINALIZE_STATS,
 
817
            finalize=finalize_stats,
803
818
            query=q,
804
819
        )
805
820
 
806
821
        # FIXME(terriyu) Fix get_meter_statistics() so we don't use sorted()
807
822
        # to return the results
808
823
        return sorted(
809
 
            (models.Statistics(**(r['value'])) for r in results['results']),
 
824
            (self._stats_result_to_model(r['value'], groupby, aggregate)
 
825
             for r in results['results']),
810
826
            key=operator.attrgetter('period_start'))
811
827
 
812
828
    @staticmethod
813
 
    def _decode_matching_metadata(matching_metadata):
814
 
        if isinstance(matching_metadata, dict):
815
 
            #note(sileht): keep compatibility with alarm
816
 
            #with matching_metadata as a dict
817
 
            return matching_metadata
818
 
        else:
819
 
            new_matching_metadata = {}
820
 
            for elem in matching_metadata:
821
 
                new_matching_metadata[elem['key']] = elem['value']
822
 
            return new_matching_metadata
823
 
 
824
 
    @classmethod
825
 
    def _ensure_encapsulated_rule_format(cls, alarm):
826
 
        """This ensure the alarm returned by the storage have the correct
827
 
        format. The previous format looks like:
828
 
        {
829
 
            'alarm_id': '0ld-4l3rt',
830
 
            'enabled': True,
831
 
            'name': 'old-alert',
832
 
            'description': 'old-alert',
833
 
            'timestamp': None,
834
 
            'meter_name': 'cpu',
835
 
            'user_id': 'me',
836
 
            'project_id': 'and-da-boys',
837
 
            'comparison_operator': 'lt',
838
 
            'threshold': 36,
839
 
            'statistic': 'count',
840
 
            'evaluation_periods': 1,
841
 
            'period': 60,
842
 
            'state': "insufficient data",
843
 
            'state_timestamp': None,
844
 
            'ok_actions': [],
845
 
            'alarm_actions': ['http://nowhere/alarms'],
846
 
            'insufficient_data_actions': [],
847
 
            'repeat_actions': False,
848
 
            'matching_metadata': {'key': 'value'}
849
 
            # or 'matching_metadata': [{'key': 'key', 'value': 'value'}]
 
829
    def _stats_result_aggregates(result, aggregate):
 
830
        stats_args = {}
 
831
        for attr in ['count', 'min', 'max', 'sum', 'avg']:
 
832
            if attr in result:
 
833
                stats_args[attr] = result[attr]
 
834
 
 
835
        if aggregate:
 
836
            stats_args['aggregate'] = {}
 
837
            for a in aggregate:
 
838
                ak = '%s%s' % (a.func, '/%s' % a.param if a.param else '')
 
839
                if ak in result:
 
840
                    stats_args['aggregate'][ak] = result[ak]
 
841
                elif 'aggregate' in result:
 
842
                    stats_args['aggregate'][ak] = result['aggregate'].get(ak)
 
843
        return stats_args
 
844
 
 
845
    @staticmethod
 
846
    def _stats_result_to_model(result, groupby, aggregate):
 
847
        stats_args = Connection._stats_result_aggregates(result, aggregate)
 
848
        stats_args['unit'] = result['unit']
 
849
        stats_args['duration'] = result['duration']
 
850
        stats_args['duration_start'] = result['duration_start']
 
851
        stats_args['duration_end'] = result['duration_end']
 
852
        stats_args['period'] = result['period']
 
853
        stats_args['period_start'] = result['period_start']
 
854
        stats_args['period_end'] = result['period_end']
 
855
        stats_args['groupby'] = (dict(
 
856
            (g, result['groupby'][g]) for g in groupby) if groupby else None)
 
857
        return models.Statistics(**stats_args)
 
858
 
 
859
    def get_capabilities(self):
 
860
        """Return an dictionary representing the capabilities of this driver.
 
861
        """
 
862
        available = {
 
863
            'meters': {'query': {'simple': True,
 
864
                                 'metadata': True}},
 
865
            'resources': {'query': {'simple': True,
 
866
                                    'metadata': True}},
 
867
            'samples': {'query': {'simple': True,
 
868
                                  'metadata': True,
 
869
                                  'complex': True}},
 
870
            'statistics': {'groupby': True,
 
871
                           'query': {'simple': True,
 
872
                                     'metadata': True},
 
873
                           'aggregation': {'standard': True,
 
874
                                           'selectable': {
 
875
                                               'max': True,
 
876
                                               'min': True,
 
877
                                               'sum': True,
 
878
                                               'avg': True,
 
879
                                               'count': True,
 
880
                                               'stddev': True,
 
881
                                               'cardinality': True}}
 
882
                           },
 
883
            'alarms': {'query': {'simple': True,
 
884
                                 'complex': True},
 
885
                       'history': {'query': {'simple': True,
 
886
                                             'complex': True}}},
850
887
        }
851
 
        """
852
 
 
853
 
        if isinstance(alarm.get('rule'), dict):
854
 
            return
855
 
 
856
 
        alarm['type'] = 'threshold'
857
 
        alarm['rule'] = {}
858
 
        alarm['matching_metadata'] = cls._decode_matching_metadata(
859
 
            alarm['matching_metadata'])
860
 
        for field in ['period', 'evaluation_periods', 'threshold',
861
 
                      'statistic', 'comparison_operator', 'meter_name']:
862
 
            if field in alarm:
863
 
                alarm['rule'][field] = alarm[field]
864
 
                del alarm[field]
865
 
 
866
 
        query = []
867
 
        for key in alarm['matching_metadata']:
868
 
            query.append({'field': key,
869
 
                          'op': 'eq',
870
 
                          'value': alarm['matching_metadata'][key],
871
 
                          'type': 'string'})
872
 
        del alarm['matching_metadata']
873
 
        alarm['rule']['query'] = query
874
 
 
875
 
    def get_alarms(self, name=None, user=None,
876
 
                   project=None, enabled=None, alarm_id=None, pagination=None):
877
 
        """Yields a lists of alarms that match filters
878
 
        :param name: The Alarm name.
879
 
        :param user: Optional ID for user that owns the resource.
880
 
        :param project: Optional ID for project that owns the resource.
881
 
        :param enabled: Optional boolean to list disable alarm.
882
 
        :param alarm_id: Optional alarm_id to return one alarm.
883
 
        :param pagination: Optional pagination query.
884
 
        """
885
 
        if pagination:
886
 
            raise NotImplementedError(_('Pagination not implemented'))
887
 
 
888
 
        q = {}
889
 
        if user is not None:
890
 
            q['user_id'] = user
891
 
        if project is not None:
892
 
            q['project_id'] = project
893
 
        if name is not None:
894
 
            q['name'] = name
895
 
        if enabled is not None:
896
 
            q['enabled'] = enabled
897
 
        if alarm_id is not None:
898
 
            q['alarm_id'] = alarm_id
899
 
 
900
 
        for alarm in self.db.alarm.find(q):
901
 
            a = {}
902
 
            a.update(alarm)
903
 
            del a['_id']
904
 
            self._ensure_encapsulated_rule_format(a)
905
 
            yield models.Alarm(**a)
906
 
 
907
 
    def update_alarm(self, alarm):
908
 
        """update alarm
909
 
        """
910
 
        data = alarm.as_dict()
911
 
 
912
 
        self.db.alarm.update(
913
 
            {'alarm_id': alarm.alarm_id},
914
 
            {'$set': data},
915
 
            upsert=True)
916
 
 
917
 
        stored_alarm = self.db.alarm.find({'alarm_id': alarm.alarm_id})[0]
918
 
        del stored_alarm['_id']
919
 
        self._ensure_encapsulated_rule_format(stored_alarm)
920
 
        return models.Alarm(**stored_alarm)
921
 
 
922
 
    create_alarm = update_alarm
923
 
 
924
 
    def delete_alarm(self, alarm_id):
925
 
        """Delete a alarm
926
 
        """
927
 
        self.db.alarm.remove({'alarm_id': alarm_id})
928
 
 
929
 
    def get_alarm_changes(self, alarm_id, on_behalf_of,
930
 
                          user=None, project=None, type=None,
931
 
                          start_timestamp=None, start_timestamp_op=None,
932
 
                          end_timestamp=None, end_timestamp_op=None):
933
 
        """Yields list of AlarmChanges describing alarm history
934
 
 
935
 
        Changes are always sorted in reverse order of occurrence, given
936
 
        the importance of currency.
937
 
 
938
 
        Segregation for non-administrative users is done on the basis
939
 
        of the on_behalf_of parameter. This allows such users to have
940
 
        visibility on both the changes initiated by themselves directly
941
 
        (generally creation, rule changes, or deletion) and also on those
942
 
        changes initiated on their behalf by the alarming service (state
943
 
        transitions after alarm thresholds are crossed).
944
 
 
945
 
        :param alarm_id: ID of alarm to return changes for
946
 
        :param on_behalf_of: ID of tenant to scope changes query (None for
947
 
                             administrative user, indicating all projects)
948
 
        :param user: Optional ID of user to return changes for
949
 
        :param project: Optional ID of project to return changes for
950
 
        :project type: Optional change type
951
 
        :param start_timestamp: Optional modified timestamp start range
952
 
        :param start_timestamp_op: Optional timestamp start range operation
953
 
        :param end_timestamp: Optional modified timestamp end range
954
 
        :param end_timestamp_op: Optional timestamp end range operation
955
 
        """
956
 
        q = dict(alarm_id=alarm_id)
957
 
        if on_behalf_of is not None:
958
 
            q['on_behalf_of'] = on_behalf_of
959
 
        if user is not None:
960
 
            q['user_id'] = user
961
 
        if project is not None:
962
 
            q['project_id'] = project
963
 
        if type is not None:
964
 
            q['type'] = type
965
 
        if start_timestamp or end_timestamp:
966
 
            ts_range = make_timestamp_range(start_timestamp, end_timestamp,
967
 
                                            start_timestamp_op,
968
 
                                            end_timestamp_op)
969
 
            if ts_range:
970
 
                q['timestamp'] = ts_range
971
 
 
972
 
        sort = [("timestamp", pymongo.DESCENDING)]
973
 
        for alarm_change in self.db.alarm_history.find(q, sort=sort):
974
 
            ac = {}
975
 
            ac.update(alarm_change)
976
 
            del ac['_id']
977
 
            yield models.AlarmChange(**ac)
978
 
 
979
 
    def record_alarm_change(self, alarm_change):
980
 
        """Record alarm change event.
981
 
        """
982
 
        self.db.alarm_history.insert(alarm_change)
 
888
        return utils.update_nested(self.DEFAULT_CAPABILITIES, available)