74
75
return Connection(conf)
77
def make_timestamp_range(start, end,
78
start_timestamp_op=None, end_timestamp_op=None):
79
"""Given two possible datetimes and their operations, create the query
80
document to find timestamps within that range.
81
By default, using $gte for the lower bound and $lt for the
87
if start_timestamp_op == 'gt':
88
start_timestamp_op = '$gt'
90
start_timestamp_op = '$gte'
91
ts_range[start_timestamp_op] = start
94
if end_timestamp_op == 'le':
95
end_timestamp_op = '$lte'
97
end_timestamp_op = '$lt'
98
ts_range[end_timestamp_op] = end
102
def make_query_from_filter(sample_filter, require_meter=True):
103
"""Return a query dictionary based on the settings in the filter.
105
:param filter: SampleFilter instance
106
:param require_meter: If true and the filter does not have a meter,
111
if sample_filter.user:
112
q['user_id'] = sample_filter.user
113
if sample_filter.project:
114
q['project_id'] = sample_filter.project
116
if sample_filter.meter:
117
q['counter_name'] = sample_filter.meter
119
raise RuntimeError('Missing required meter specifier')
121
ts_range = make_timestamp_range(sample_filter.start, sample_filter.end,
122
sample_filter.start_timestamp_op,
123
sample_filter.end_timestamp_op)
125
q['timestamp'] = ts_range
127
if sample_filter.resource:
128
q['resource_id'] = sample_filter.resource
129
if sample_filter.source:
130
q['source'] = sample_filter.source
131
if sample_filter.message_id:
132
q['message_id'] = sample_filter.message_id
134
# so the samples call metadata resource_metadata, so we convert
136
q.update(dict(('resource_%s' % k, v)
137
for (k, v) in sample_filter.metaquery.iteritems()))
141
class ConnectionPool(object):
146
def connect(self, url):
147
connection_options = pymongo.uri_parser.parse_uri(url)
148
del connection_options['database']
149
del connection_options['username']
150
del connection_options['password']
151
del connection_options['collection']
152
pool_key = tuple(connection_options)
154
if pool_key in self._pool:
155
client = self._pool.get(pool_key)()
158
LOG.info(_('Connecting to DB2 on %s'),
159
connection_options['nodelist'])
160
client = pymongo.MongoClient(
163
self._pool[pool_key] = weakref.ref(client)
167
class Connection(base.Connection):
78
class Connection(pymongo_base.Connection):
168
79
"""DB2 connection.
171
CONNECTION_POOL = ConnectionPool()
82
CONNECTION_POOL = pymongo_base.ConnectionPool()
173
84
GROUP = {'_id': '$counter_name',
174
85
'unit': {'$min': '$counter_unit'},
345
256
# modify a data structure owned by our caller (the driver adds
346
257
# a new key '_id').
347
258
record = copy.copy(data)
259
record['recorded_at'] = timeutils.utcnow()
348
260
# Make sure that the data has an _id field, which DB2 won't add
350
262
if record.get('_id') is None:
351
263
record['_id'] = str(bson.objectid.ObjectId())
352
264
self.db.meter.insert(record)
354
def get_users(self, source=None):
355
"""Return an iterable of user id strings.
357
:param source: Optional source filter.
360
if source is not None:
363
return (doc['_id'] for doc in
364
self.db.user.find(q, fields=['_id'],
365
sort=[('_id', pymongo.ASCENDING)]))
367
def get_projects(self, source=None):
368
"""Return an iterable of project id strings.
370
:param source: Optional source filter.
373
if source is not None:
376
return (doc['_id'] for doc in
377
self.db.project.find(q, fields=['_id'],
378
sort=[('_id', pymongo.ASCENDING)]))
380
266
def get_resources(self, user=None, project=None, source=None,
381
267
start_timestamp=None, start_timestamp_op=None,
382
268
end_timestamp=None, end_timestamp_op=None,
443
330
user_id=latest_meter['user_id'],
444
331
metadata=latest_meter['resource_metadata'])
446
def get_meters(self, user=None, project=None, resource=None, source=None,
447
metaquery={}, pagination=None):
448
"""Return an iterable of models.Meter instances
450
:param user: Optional ID for user that owns the resource.
451
:param project: Optional ID for project that owns the resource.
452
:param resource: Optional resource filter.
453
:param source: Optional source filter.
454
:param metaquery: Optional dict with metadata to match on.
455
:param pagination: Optional pagination query.
459
raise NotImplementedError(_('Pagination not implemented'))
464
if project is not None:
465
q['project_id'] = project
466
if resource is not None:
468
if source is not None:
472
for r in self.db.resource.find(q):
473
for r_meter in r['meter']:
475
name=r_meter['counter_name'],
476
type=r_meter['counter_type'],
477
# Return empty string if 'counter_unit' is not valid for
478
# backward compatibility.
479
unit=r_meter.get('counter_unit', ''),
480
resource_id=r['_id'],
481
project_id=r['project_id'],
483
user_id=r['user_id'],
486
def get_samples(self, sample_filter, limit=None):
487
"""Return an iterable of model.Sample instances.
489
:param sample_filter: Filter.
490
:param limit: Maximum number of results to return.
494
q = make_query_from_filter(sample_filter, require_meter=False)
497
samples = self.db.meter.find(
498
q, limit=limit, sort=[("timestamp", pymongo.DESCENDING)])
500
samples = self.db.meter.find(
501
q, sort=[("timestamp", pymongo.DESCENDING)])
504
# Remove the ObjectId generated by the database when
505
# the sample was inserted. It is an implementation
506
# detail that should not leak outside of the driver.
508
# Backward compatibility for samples without units
509
s['counter_unit'] = s.get('counter_unit', '')
510
yield models.Sample(**s)
512
def get_meter_statistics(self, sample_filter, period=None, groupby=None):
333
def get_meter_statistics(self, sample_filter, period=None, groupby=None,
513
335
"""Return an iterable of models.Statistics instance containing meter
514
336
statistics described by the query parameters.
555
381
'seconds': (periods * period) % self.SECONDS_IN_A_DAY}
557
383
for key, grouped_meters in itertools.groupby(meters, key=_group_key):
558
stat = models.Statistics(None, sys.maxint, -sys.maxint, 0, 0, 0,
559
0, 0, 0, 0, 0, 0, None)
384
stat = models.Statistics(unit=None,
385
min=sys.maxint, max=-sys.maxint,
386
avg=0, sum=0, count=0,
387
period=0, period_start=0, period_end=0,
388
duration=0, duration_start=0,
389
duration_end=0, groupby=None)
561
391
for meter in grouped_meters:
562
392
stat.unit = meter.get('counter_unit', '')
590
420
stat.period_end = stat.duration_end
594
def _decode_matching_metadata(matching_metadata):
595
if isinstance(matching_metadata, dict):
596
#note(sileht): keep compatibility with old db format
597
return matching_metadata
599
new_matching_metadata = {}
600
for elem in matching_metadata:
601
new_matching_metadata[elem['key']] = elem['value']
602
return new_matching_metadata
605
def _ensure_encapsulated_rule_format(cls, alarm):
606
"""Ensure the alarm returned by the storage has the correct
607
format. The previous format looks like:
609
'alarm_id': '0ld-4l3rt',
612
'description': 'old-alert',
616
'project_id': 'and-da-boys',
617
'comparison_operator': 'lt',
619
'statistic': 'count',
620
'evaluation_periods': 1,
622
'state': "insufficient data",
623
'state_timestamp': None,
625
'alarm_actions': ['http://nowhere/alarms'],
626
'insufficient_data_actions': [],
627
'repeat_actions': False,
628
'matching_metadata': {'key': 'value'}
629
# or 'matching_metadata': [{'key': 'key', 'value': 'value'}]
423
def get_capabilities(self):
424
"""Return a dictionary representing the capabilities of this driver.
427
'meters': {'query': {'simple': True,
429
'resources': {'query': {'simple': True,
431
'samples': {'query': {'simple': True,
434
'statistics': {'groupby': True,
435
'query': {'simple': True,
437
'aggregation': {'standard': True}},
438
'alarms': {'query': {'simple': True,
440
'history': {'query': {'simple': True}}},
633
if isinstance(alarm.get('rule'), dict):
636
alarm['type'] = 'threshold'
638
alarm['matching_metadata'] = cls._decode_matching_metadata(
639
alarm['matching_metadata'])
640
for field in ['period', 'evaluation_period', 'threshold',
641
'statistic', 'comparison_operator', 'meter_name']:
643
alarm['rule'][field] = alarm[field]
647
for key in alarm['matching_metadata']:
648
query.append({'field': key,
650
'value': alarm['matching_metadata'][key]})
651
del alarm['matching_metadata']
652
alarm['rule']['query'] = query
654
def get_alarms(self, name=None, user=None,
655
project=None, enabled=None, alarm_id=None, pagination=None):
656
"""Yields a list of alarms that match filters
657
:param user: Optional ID for user that owns the resource.
658
:param project: Optional ID for project that owns the resource.
659
:param enabled: Optional boolean to filter alarms by their enabled flag.
660
:param alarm_id: Optional alarm_id to return one alarm.
661
:param metaquery: Optional dict with metadata to match on.
662
:param resource: Optional resource filter.
663
:param pagination: Optional pagination query.
667
raise NotImplementedError(_('Pagination not implemented'))
672
if project is not None:
673
q['project_id'] = project
676
if enabled is not None:
677
q['enabled'] = enabled
678
if alarm_id is not None:
679
q['alarm_id'] = alarm_id
681
for alarm in self.db.alarm.find(q):
685
self._ensure_encapsulated_rule_format(a)
686
yield models.Alarm(**a)
688
def update_alarm(self, alarm):
691
data = alarm.as_dict()
692
self.db.alarm.update(
693
{'alarm_id': alarm.alarm_id},
697
stored_alarm = self.db.alarm.find({'alarm_id': alarm.alarm_id})[0]
698
del stored_alarm['_id']
699
self._ensure_encapsulated_rule_format(stored_alarm)
700
return models.Alarm(**stored_alarm)
702
create_alarm = update_alarm
704
def delete_alarm(self, alarm_id):
707
self.db.alarm.remove({'alarm_id': alarm_id})
442
return utils.update_nested(self.DEFAULT_CAPABILITIES, available)