~ubuntu-branches/ubuntu/vivid/ceilometer/vivid

« back to all changes in this revision

Viewing changes to ceilometer/storage/impl_db2.py

  • Committer: Package Import Robot
  • Author(s): Chuck Short
  • Date: 2014-03-06 14:44:28 UTC
  • mto: (28.1.1 utopic-proposed) (1.2.1)
  • mto: This revision was merged to the branch mainline in revision 19.
  • Revision ID: package-import@ubuntu.com-20140306144428-rvphsh4igwyulzf0
Tags: upstream-2014.1~b3
ImportĀ upstreamĀ versionĀ 2014.1~b3

Show diffs side-by-side

added added

removed removed

Lines of Context:
26
26
import datetime
27
27
import itertools
28
28
import sys
29
 
import weakref
30
29
 
31
30
import bson.code
32
31
import bson.objectid
38
37
from ceilometer import storage
39
38
from ceilometer.storage import base
40
39
from ceilometer.storage import models
 
40
from ceilometer.storage import pymongo_base
 
41
from ceilometer import utils
41
42
 
42
43
LOG = log.getLogger(__name__)
43
44
 
74
75
        return Connection(conf)
75
76
 
76
77
 
77
 
def make_timestamp_range(start, end,
78
 
                         start_timestamp_op=None, end_timestamp_op=None):
79
 
    """Given two possible datetimes and their operations, create the query
80
 
    document to find timestamps within that range.
81
 
    By default, using $gte for the lower bound and $lt for the
82
 
    upper bound.
83
 
    """
84
 
    ts_range = {}
85
 
 
86
 
    if start:
87
 
        if start_timestamp_op == 'gt':
88
 
            start_timestamp_op = '$gt'
89
 
        else:
90
 
            start_timestamp_op = '$gte'
91
 
        ts_range[start_timestamp_op] = start
92
 
 
93
 
    if end:
94
 
        if end_timestamp_op == 'le':
95
 
            end_timestamp_op = '$lte'
96
 
        else:
97
 
            end_timestamp_op = '$lt'
98
 
        ts_range[end_timestamp_op] = end
99
 
    return ts_range
100
 
 
101
 
 
102
 
def make_query_from_filter(sample_filter, require_meter=True):
103
 
    """Return a query dictionary based on the settings in the filter.
104
 
 
105
 
    :param filter: SampleFilter instance
106
 
    :param require_meter: If true and the filter does not have a meter,
107
 
                          raise an error.
108
 
    """
109
 
    q = {}
110
 
 
111
 
    if sample_filter.user:
112
 
        q['user_id'] = sample_filter.user
113
 
    if sample_filter.project:
114
 
        q['project_id'] = sample_filter.project
115
 
 
116
 
    if sample_filter.meter:
117
 
        q['counter_name'] = sample_filter.meter
118
 
    elif require_meter:
119
 
        raise RuntimeError('Missing required meter specifier')
120
 
 
121
 
    ts_range = make_timestamp_range(sample_filter.start, sample_filter.end,
122
 
                                    sample_filter.start_timestamp_op,
123
 
                                    sample_filter.end_timestamp_op)
124
 
    if ts_range:
125
 
        q['timestamp'] = ts_range
126
 
 
127
 
    if sample_filter.resource:
128
 
        q['resource_id'] = sample_filter.resource
129
 
    if sample_filter.source:
130
 
        q['source'] = sample_filter.source
131
 
    if sample_filter.message_id:
132
 
        q['message_id'] = sample_filter.message_id
133
 
 
134
 
    # so the samples call metadata resource_metadata, so we convert
135
 
    # to that.
136
 
    q.update(dict(('resource_%s' % k, v)
137
 
                  for (k, v) in sample_filter.metaquery.iteritems()))
138
 
    return q
139
 
 
140
 
 
141
 
class ConnectionPool(object):
142
 
 
143
 
    def __init__(self):
144
 
        self._pool = {}
145
 
 
146
 
    def connect(self, url):
147
 
        connection_options = pymongo.uri_parser.parse_uri(url)
148
 
        del connection_options['database']
149
 
        del connection_options['username']
150
 
        del connection_options['password']
151
 
        del connection_options['collection']
152
 
        pool_key = tuple(connection_options)
153
 
 
154
 
        if pool_key in self._pool:
155
 
            client = self._pool.get(pool_key)()
156
 
            if client:
157
 
                return client
158
 
        LOG.info(_('Connecting to DB2 on %s'),
159
 
                 connection_options['nodelist'])
160
 
        client = pymongo.MongoClient(
161
 
            url,
162
 
            safe=True)
163
 
        self._pool[pool_key] = weakref.ref(client)
164
 
        return client
165
 
 
166
 
 
167
 
class Connection(base.Connection):
 
78
class Connection(pymongo_base.Connection):
168
79
    """DB2 connection.
169
80
    """
170
81
 
171
 
    CONNECTION_POOL = ConnectionPool()
 
82
    CONNECTION_POOL = pymongo_base.ConnectionPool()
172
83
 
173
84
    GROUP = {'_id': '$counter_name',
174
85
             'unit': {'$min': '$counter_unit'},
345
256
        # modify a data structure owned by our caller (the driver adds
346
257
        # a new key '_id').
347
258
        record = copy.copy(data)
 
259
        record['recorded_at'] = timeutils.utcnow()
348
260
        # Make sure that the data does have field _id which db2 wont add
349
261
        # automatically.
350
262
        if record.get('_id') is None:
351
263
            record['_id'] = str(bson.objectid.ObjectId())
352
264
        self.db.meter.insert(record)
353
265
 
354
 
    def get_users(self, source=None):
355
 
        """Return an iterable of user id strings.
356
 
 
357
 
        :param source: Optional source filter.
358
 
        """
359
 
        q = {}
360
 
        if source is not None:
361
 
            q['source'] = source
362
 
 
363
 
        return (doc['_id'] for doc in
364
 
                self.db.user.find(q, fields=['_id'],
365
 
                                  sort=[('_id', pymongo.ASCENDING)]))
366
 
 
367
 
    def get_projects(self, source=None):
368
 
        """Return an iterable of project id strings.
369
 
 
370
 
        :param source: Optional source filter.
371
 
        """
372
 
        q = {}
373
 
        if source is not None:
374
 
            q['source'] = source
375
 
 
376
 
        return (doc['_id'] for doc in
377
 
                self.db.project.find(q, fields=['_id'],
378
 
                                     sort=[('_id', pymongo.ASCENDING)]))
379
 
 
380
266
    def get_resources(self, user=None, project=None, source=None,
381
267
                      start_timestamp=None, start_timestamp_op=None,
382
268
                      end_timestamp=None, end_timestamp_op=None,
414
300
            # Look for resources matching the above criteria and with
415
301
            # samples in the time range we care about, then change the
416
302
            # resource query to return just those resources by id.
417
 
            ts_range = make_timestamp_range(start_timestamp, end_timestamp,
418
 
                                            start_timestamp_op,
419
 
                                            end_timestamp_op)
 
303
            ts_range = pymongo_base.make_timestamp_range(start_timestamp,
 
304
                                                         end_timestamp,
 
305
                                                         start_timestamp_op,
 
306
                                                         end_timestamp_op)
420
307
            if ts_range:
421
308
                q['timestamp'] = ts_range
422
309
 
443
330
                                  user_id=latest_meter['user_id'],
444
331
                                  metadata=latest_meter['resource_metadata'])
445
332
 
446
 
    def get_meters(self, user=None, project=None, resource=None, source=None,
447
 
                   metaquery={}, pagination=None):
448
 
        """Return an iterable of models.Meter instances
449
 
 
450
 
        :param user: Optional ID for user that owns the resource.
451
 
        :param project: Optional ID for project that owns the resource.
452
 
        :param resource: Optional resource filter.
453
 
        :param source: Optional source filter.
454
 
        :param metaquery: Optional dict with metadata to match on.
455
 
        :param pagination: Optional pagination query.
456
 
        """
457
 
 
458
 
        if pagination:
459
 
            raise NotImplementedError(_('Pagination not implemented'))
460
 
 
461
 
        q = {}
462
 
        if user is not None:
463
 
            q['user_id'] = user
464
 
        if project is not None:
465
 
            q['project_id'] = project
466
 
        if resource is not None:
467
 
            q['_id'] = resource
468
 
        if source is not None:
469
 
            q['source'] = source
470
 
        q.update(metaquery)
471
 
 
472
 
        for r in self.db.resource.find(q):
473
 
            for r_meter in r['meter']:
474
 
                yield models.Meter(
475
 
                    name=r_meter['counter_name'],
476
 
                    type=r_meter['counter_type'],
477
 
                    # Return empty string if 'counter_unit' is not valid for
478
 
                    # backward compatibility.
479
 
                    unit=r_meter.get('counter_unit', ''),
480
 
                    resource_id=r['_id'],
481
 
                    project_id=r['project_id'],
482
 
                    source=r['source'],
483
 
                    user_id=r['user_id'],
484
 
                )
485
 
 
486
 
    def get_samples(self, sample_filter, limit=None):
487
 
        """Return an iterable of model.Sample instances.
488
 
 
489
 
        :param sample_filter: Filter.
490
 
        :param limit: Maximum number of results to return.
491
 
        """
492
 
        if limit == 0:
493
 
            return
494
 
        q = make_query_from_filter(sample_filter, require_meter=False)
495
 
 
496
 
        if limit:
497
 
            samples = self.db.meter.find(
498
 
                q, limit=limit, sort=[("timestamp", pymongo.DESCENDING)])
499
 
        else:
500
 
            samples = self.db.meter.find(
501
 
                q, sort=[("timestamp", pymongo.DESCENDING)])
502
 
 
503
 
        for s in samples:
504
 
            # Remove the ObjectId generated by the database when
505
 
            # the sample was inserted. It is an implementation
506
 
            # detail that should not leak outside of the driver.
507
 
            del s['_id']
508
 
            # Backward compatibility for samples without units
509
 
            s['counter_unit'] = s.get('counter_unit', '')
510
 
            yield models.Sample(**s)
511
 
 
512
 
    def get_meter_statistics(self, sample_filter, period=None, groupby=None):
 
333
    def get_meter_statistics(self, sample_filter, period=None, groupby=None,
 
334
                             aggregate=None):
513
335
        """Return an iterable of models.Statistics instance containing meter
514
336
        statistics described by the query parameters.
515
337
 
520
342
                                    'resource_id', 'source'])):
521
343
            raise NotImplementedError("Unable to group by these fields")
522
344
 
523
 
        q = make_query_from_filter(sample_filter)
 
345
        if aggregate:
 
346
            msg = _('Selectable aggregates not implemented')
 
347
            raise NotImplementedError(msg)
 
348
 
 
349
        q = pymongo_base.make_query_from_filter(sample_filter)
524
350
 
525
351
        if period:
526
352
            if sample_filter.start:
555
381
                    'seconds': (periods * period) % self.SECONDS_IN_A_DAY}
556
382
 
557
383
        for key, grouped_meters in itertools.groupby(meters, key=_group_key):
558
 
            stat = models.Statistics(None, sys.maxint, -sys.maxint, 0, 0, 0,
559
 
                                     0, 0, 0, 0, 0, 0, None)
 
384
            stat = models.Statistics(unit=None,
 
385
                                     min=sys.maxint, max=-sys.maxint,
 
386
                                     avg=0, sum=0, count=0,
 
387
                                     period=0, period_start=0, period_end=0,
 
388
                                     duration=0, duration_start=0,
 
389
                                     duration_end=0, groupby=None)
560
390
 
561
391
            for meter in grouped_meters:
562
392
                stat.unit = meter.get('counter_unit', '')
590
420
                stat.period_end = stat.duration_end
591
421
            yield stat
592
422
 
593
 
    @staticmethod
594
 
    def _decode_matching_metadata(matching_metadata):
595
 
        if isinstance(matching_metadata, dict):
596
 
            #note(sileht): keep compatibility with old db format
597
 
            return matching_metadata
598
 
        else:
599
 
            new_matching_metadata = {}
600
 
            for elem in matching_metadata:
601
 
                new_matching_metadata[elem['key']] = elem['value']
602
 
            return new_matching_metadata
603
 
 
604
 
    @classmethod
605
 
    def _ensure_encapsulated_rule_format(cls, alarm):
606
 
        """This ensure the alarm returned by the storage have the correct
607
 
        format. The previous format looks like:
608
 
        {
609
 
            'alarm_id': '0ld-4l3rt',
610
 
            'enabled': True,
611
 
            'name': 'old-alert',
612
 
            'description': 'old-alert',
613
 
            'timestamp': None,
614
 
            'meter_name': 'cpu',
615
 
            'user_id': 'me',
616
 
            'project_id': 'and-da-boys',
617
 
            'comparison_operator': 'lt',
618
 
            'threshold': 36,
619
 
            'statistic': 'count',
620
 
            'evaluation_periods': 1,
621
 
            'period': 60,
622
 
            'state': "insufficient data",
623
 
            'state_timestamp': None,
624
 
            'ok_actions': [],
625
 
            'alarm_actions': ['http://nowhere/alarms'],
626
 
            'insufficient_data_actions': [],
627
 
            'repeat_actions': False,
628
 
            'matching_metadata': {'key': 'value'}
629
 
            # or 'matching_metadata': [{'key': 'key', 'value': 'value'}]
 
423
    def get_capabilities(self):
 
424
        """Return an dictionary representing the capabilities of this driver.
 
425
        """
 
426
        available = {
 
427
            'meters': {'query': {'simple': True,
 
428
                                 'metadata': True}},
 
429
            'resources': {'query': {'simple': True,
 
430
                                    'metadata': True}},
 
431
            'samples': {'query': {'simple': True,
 
432
                                  'metadata': True,
 
433
                                  'complex': True}},
 
434
            'statistics': {'groupby': True,
 
435
                           'query': {'simple': True,
 
436
                                     'metadata': True},
 
437
                           'aggregation': {'standard': True}},
 
438
            'alarms': {'query': {'simple': True,
 
439
                                 'complex': True},
 
440
                       'history': {'query': {'simple': True}}},
630
441
        }
631
 
        """
632
 
 
633
 
        if isinstance(alarm.get('rule'), dict):
634
 
            return
635
 
 
636
 
        alarm['type'] = 'threshold'
637
 
        alarm['rule'] = {}
638
 
        alarm['matching_metadata'] = cls._decode_matching_metadata(
639
 
            alarm['matching_metadata'])
640
 
        for field in ['period', 'evaluation_period', 'threshold',
641
 
                      'statistic', 'comparison_operator', 'meter_name']:
642
 
            if field in alarm:
643
 
                alarm['rule'][field] = alarm[field]
644
 
                del alarm[field]
645
 
 
646
 
        query = []
647
 
        for key in alarm['matching_metadata']:
648
 
            query.append({'field': key,
649
 
                          'op': 'eq',
650
 
                          'value': alarm['matching_metadata'][key]})
651
 
        del alarm['matching_metadata']
652
 
        alarm['rule']['query'] = query
653
 
 
654
 
    def get_alarms(self, name=None, user=None,
655
 
                   project=None, enabled=None, alarm_id=None, pagination=None):
656
 
        """Yields a lists of alarms that match filters
657
 
        :param user: Optional ID for user that owns the resource.
658
 
        :param project: Optional ID for project that owns the resource.
659
 
        :param enabled: Optional boolean to list disable alarm.
660
 
        :param alarm_id: Optional alarm_id to return one alarm.
661
 
        :param metaquery: Optional dict with metadata to match on.
662
 
        :param resource: Optional resource filter.
663
 
        :param pagination: Optional pagination query.
664
 
        """
665
 
 
666
 
        if pagination:
667
 
            raise NotImplementedError(_('Pagination not implemented'))
668
 
 
669
 
        q = {}
670
 
        if user is not None:
671
 
            q['user_id'] = user
672
 
        if project is not None:
673
 
            q['project_id'] = project
674
 
        if name is not None:
675
 
            q['name'] = name
676
 
        if enabled is not None:
677
 
            q['enabled'] = enabled
678
 
        if alarm_id is not None:
679
 
            q['alarm_id'] = alarm_id
680
 
 
681
 
        for alarm in self.db.alarm.find(q):
682
 
            a = {}
683
 
            a.update(alarm)
684
 
            del a['_id']
685
 
            self._ensure_encapsulated_rule_format(a)
686
 
            yield models.Alarm(**a)
687
 
 
688
 
    def update_alarm(self, alarm):
689
 
        """update alarm
690
 
        """
691
 
        data = alarm.as_dict()
692
 
        self.db.alarm.update(
693
 
            {'alarm_id': alarm.alarm_id},
694
 
            {'$set': data},
695
 
            upsert=True)
696
 
 
697
 
        stored_alarm = self.db.alarm.find({'alarm_id': alarm.alarm_id})[0]
698
 
        del stored_alarm['_id']
699
 
        self._ensure_encapsulated_rule_format(stored_alarm)
700
 
        return models.Alarm(**stored_alarm)
701
 
 
702
 
    create_alarm = update_alarm
703
 
 
704
 
    def delete_alarm(self, alarm_id):
705
 
        """Delete an alarm
706
 
        """
707
 
        self.db.alarm.remove({'alarm_id': alarm_id})
 
442
        return utils.update_nested(self.DEFAULT_CAPABILITIES, available)