77
81
return Connection(conf)
80
def make_timestamp_range(start, end,
81
start_timestamp_op=None, end_timestamp_op=None):
82
"""Given two possible datetimes and their operations, create the query
83
document to find timestamps within that range.
84
By default, using $gte for the lower bound and $lt for the
90
if start_timestamp_op == 'gt':
91
start_timestamp_op = '$gt'
93
start_timestamp_op = '$gte'
94
ts_range[start_timestamp_op] = start
97
if end_timestamp_op == 'le':
98
end_timestamp_op = '$lte'
100
end_timestamp_op = '$lt'
101
ts_range[end_timestamp_op] = end
105
def make_query_from_filter(sample_filter, require_meter=True):
106
"""Return a query dictionary based on the settings in the filter.
108
:param sample_filter: SampleFilter instance
109
:param require_meter: If true and the filter does not have a meter,
114
if sample_filter.user:
115
q['user_id'] = sample_filter.user
116
if sample_filter.project:
117
q['project_id'] = sample_filter.project
119
if sample_filter.meter:
120
q['counter_name'] = sample_filter.meter
122
raise RuntimeError('Missing required meter specifier')
124
ts_range = make_timestamp_range(sample_filter.start, sample_filter.end,
125
sample_filter.start_timestamp_op,
126
sample_filter.end_timestamp_op)
128
q['timestamp'] = ts_range
130
if sample_filter.resource:
131
q['resource_id'] = sample_filter.resource
132
if sample_filter.source:
133
q['source'] = sample_filter.source
134
if sample_filter.message_id:
135
q['message_id'] = sample_filter.message_id
137
# so the samples call metadata resource_metadata, so we convert
139
q.update(dict(('resource_%s' % k, v)
140
for (k, v) in sample_filter.metaquery.iteritems()))
144
class ConnectionPool(object):
149
def connect(self, url):
150
connection_options = pymongo.uri_parser.parse_uri(url)
151
del connection_options['database']
152
del connection_options['username']
153
del connection_options['password']
154
del connection_options['collection']
155
pool_key = tuple(connection_options)
157
if pool_key in self._pool:
158
client = self._pool.get(pool_key)()
161
LOG.info(_('Connecting to MongoDB on %s'),
162
connection_options['nodelist'])
163
client = pymongo.MongoClient(
166
self._pool[pool_key] = weakref.ref(client)
170
class Connection(base.Connection):
84
class Connection(pymongo_base.Connection):
171
85
"""MongoDB connection.
174
CONNECTION_POOL = ConnectionPool()
88
CONNECTION_POOL = pymongo_base.ConnectionPool()
176
90
REDUCE_GROUP_CLEAN = bson.code.Code("""
177
91
function ( curr, result ) {
101
STANDARD_AGGREGATES = dict(
110
sum='sum: this.counter_volume,',
111
count='count: NumberInt(1),',
112
avg='acount: NumberInt(1), asum: this.counter_volume,',
113
min='min: this.counter_volume,',
114
max='max: this.counter_volume,'
124
sum='sum: values[0].sum,',
125
count='count: values[0].count,',
126
avg='acount: values[0].acount, asum: values[0].asum,',
127
min='min: values[0].min,',
128
max='max: values[0].max,'
130
reduce_computation=dict(
131
sum='res.sum += values[i].sum;',
132
count='res.count = NumberInt(res.count + values[i].count);',
133
avg=('res.acount = NumberInt(res.acount + values[i].acount);'
134
'res.asum += values[i].asum;'),
135
min='if ( values[i].min < res.min ) {res.min = values[i].min;}',
136
max='if ( values[i].max > res.max ) {res.max = values[i].max;}'
141
avg='value.avg = value.asum / value.acount;',
147
UNPARAMETERIZED_AGGREGATES = dict(
154
stddev='sdsum: this.counter_volume,'
156
'weighted_distances: 0,'
163
stddev='sdsum: values[0].sdsum,'
164
'sdcount: values[0].sdcount,'
165
'weighted_distances: values[0].weighted_distances,'
166
'stddev: values[0].stddev,'
168
reduce_computation=dict(
170
'var deviance = (res.sdsum / res.sdcount) - values[i].sdsum;'
171
'var weight = res.sdcount / ++res.sdcount;'
172
'res.weighted_distances += (Math.pow(deviance, 2) * weight);'
173
'res.sdsum += values[i].sdsum;'
178
'value.stddev = Math.sqrt(value.weighted_distances /'
184
PARAMETERIZED_AGGREGATES = dict(
186
cardinality=lambda p: p in ['resource_id', 'user_id', 'project_id',
191
'var aggregate = {};'
192
'aggregate["cardinality/%(aggregate_param)s"] ='
193
' this["%(aggregate_param)s"];'
197
cardinality='aggregate : aggregate,'
201
'var distincts = {};'
202
'distincts[values[0].aggregate['
203
' "cardinality/%(aggregate_param)s"]] = true;'
204
'var aggregate = {};'
205
'aggregate["cardinality/%(aggregate_param)s"] = NumberInt(1);'
209
cardinality='aggregate : aggregate,'
211
reduce_computation=dict(
213
'if (!(values[i].aggregate["cardinality/%(aggregate_param)s"]'
215
' distincts[values[i].aggregate['
216
' "cardinality/%(aggregate_param)s"]] = true;'
217
' res.aggregate["cardinality/%(aggregate_param)s"] ='
218
' NumberInt(Object.keys(distincts).length);}'
223
'if (typeof value.aggregate['
224
' "cardinality/%(aggregate_param)s"] !== "number") {'
225
' value.aggregate["cardinality/%(aggregate_param)s"] ='
187
231
EMIT_STATS_COMMON = """
232
%(aggregate_initial_placeholder)s
188
233
emit(%(key_val)s, { unit: this.counter_unit,
189
min : this.counter_volume,
190
max : this.counter_volume,
191
sum : this.counter_volume,
192
count : NumberInt(1),
234
%(aggregate_body_placeholder)s
193
235
groupby : %(groupby_val)s,
194
236
duration_start : this.timestamp,
195
237
duration_end : this.timestamp,
696
727
self.db[out].drop()
698
def get_meters(self, user=None, project=None, resource=None, source=None,
699
metaquery={}, pagination=None):
700
"""Return an iterable of models.Meter instances
702
:param user: Optional ID for user that owns the resource.
703
:param project: Optional ID for project that owns the resource.
704
:param resource: Optional resource filter.
705
:param source: Optional source filter.
706
:param metaquery: Optional dict with metadata to match on.
707
:param pagination: Optional pagination query.
710
raise NotImplementedError(_('Pagination not implemented'))
715
if project is not None:
716
q['project_id'] = project
717
if resource is not None:
719
if source is not None:
723
for r in self.db.resource.find(q):
724
for r_meter in r['meter']:
726
name=r_meter['counter_name'],
727
type=r_meter['counter_type'],
728
# Return empty string if 'counter_unit' is not valid for
729
# backward compatibility.
730
unit=r_meter.get('counter_unit', ''),
731
resource_id=r['_id'],
732
project_id=r['project_id'],
734
user_id=r['user_id'],
737
def get_samples(self, sample_filter, limit=None):
738
"""Return an iterable of model.Sample instances.
740
:param sample_filter: Filter.
741
:param limit: Maximum number of results to return.
745
q = make_query_from_filter(sample_filter, require_meter=False)
747
samples = self.db.meter.find(
748
q, limit=limit, sort=[("timestamp", pymongo.DESCENDING)])
750
samples = self.db.meter.find(
751
q, sort=[("timestamp", pymongo.DESCENDING)])
754
# Remove the ObjectId generated by the database when
755
# the sample was inserted. It is an implementation
756
# detail that should not leak outside of the driver.
758
# Backward compatibility for samples without units
759
s['counter_unit'] = s.get('counter_unit', '')
760
yield models.Sample(**s)
762
def get_meter_statistics(self, sample_filter, period=None, groupby=None):
729
def _aggregate_param(self, fragment_key, aggregate):
730
fragment_map = self.STANDARD_AGGREGATES[fragment_key]
733
return ''.join([f for f in fragment_map.values()])
738
if a.func in self.STANDARD_AGGREGATES[fragment_key]:
739
fragment_map = self.STANDARD_AGGREGATES[fragment_key]
740
fragments += fragment_map[a.func]
741
elif a.func in self.UNPARAMETERIZED_AGGREGATES[fragment_key]:
742
fragment_map = self.UNPARAMETERIZED_AGGREGATES[fragment_key]
743
fragments += fragment_map[a.func]
744
elif a.func in self.PARAMETERIZED_AGGREGATES[fragment_key]:
745
fragment_map = self.PARAMETERIZED_AGGREGATES[fragment_key]
746
v = self.PARAMETERIZED_AGGREGATES['validate'].get(a.func)
747
if not (v and v(a.param)):
748
raise storage.StorageBadAggregate('Bad aggregate: %s.%s'
750
params = dict(aggregate_param=a.param)
751
fragments += (fragment_map[a.func] % params)
753
raise NotImplementedError(_('Selectable aggregate function %s'
754
' is not supported') % a.func)
758
def get_meter_statistics(self, sample_filter, period=None, groupby=None,
763
760
"""Return an iterable of models.Statistics instance containing meter
764
761
statistics described by the query parameters.
781
778
limit=1, sort=[('timestamp',
782
779
pymongo.ASCENDING)])[0]['timestamp']
783
780
period_start = int(calendar.timegm(period_start.utctimetuple()))
784
params_period = {'period': period,
785
'period_first': period_start,
786
'groupby_fields': json.dumps(groupby)}
781
map_params = {'period': period,
782
'period_first': period_start,
783
'groupby_fields': json.dumps(groupby)}
788
map_stats = self.MAP_STATS_PERIOD_GROUPBY % params_period
785
map_fragment = self.MAP_STATS_PERIOD_GROUPBY
790
map_stats = self.MAP_STATS_PERIOD % params_period
787
map_fragment = self.MAP_STATS_PERIOD
793
params_groupby = {'groupby_fields': json.dumps(groupby)}
794
map_stats = self.MAP_STATS_GROUPBY % params_groupby
790
map_params = {'groupby_fields': json.dumps(groupby)}
791
map_fragment = self.MAP_STATS_GROUPBY
796
map_stats = self.MAP_STATS
794
map_fragment = self.MAP_STATS
796
sub = self._aggregate_param
798
map_params['aggregate_initial_val'] = sub('emit_initial', aggregate)
799
map_params['aggregate_body_val'] = sub('emit_body', aggregate)
801
map_stats = map_fragment % map_params
803
reduce_params = dict(
804
aggregate_initial_val=sub('reduce_initial', aggregate),
805
aggregate_body_val=sub('reduce_body', aggregate),
806
aggregate_computation_val=sub('reduce_computation', aggregate)
808
reduce_stats = self.REDUCE_STATS % reduce_params
810
finalize_params = dict(aggregate_val=sub('finalize', aggregate))
811
finalize_stats = self.FINALIZE_STATS % finalize_params
798
813
results = self.db.meter.map_reduce(
802
finalize=self.FINALIZE_STATS,
817
finalize=finalize_stats,
806
821
# FIXME(terriyu) Fix get_meter_statistics() so we don't use sorted()
807
822
# to return the results
809
(models.Statistics(**(r['value'])) for r in results['results']),
824
(self._stats_result_to_model(r['value'], groupby, aggregate)
825
for r in results['results']),
810
826
key=operator.attrgetter('period_start'))
813
def _decode_matching_metadata(matching_metadata):
814
if isinstance(matching_metadata, dict):
815
#note(sileht): keep compatibility with alarm
816
#with matching_metadata as a dict
817
return matching_metadata
819
new_matching_metadata = {}
820
for elem in matching_metadata:
821
new_matching_metadata[elem['key']] = elem['value']
822
return new_matching_metadata
825
def _ensure_encapsulated_rule_format(cls, alarm):
826
"""This ensures the alarm returned by the storage has the correct
827
format. The previous format looks like:
829
'alarm_id': '0ld-4l3rt',
832
'description': 'old-alert',
836
'project_id': 'and-da-boys',
837
'comparison_operator': 'lt',
839
'statistic': 'count',
840
'evaluation_periods': 1,
842
'state': "insufficient data",
843
'state_timestamp': None,
845
'alarm_actions': ['http://nowhere/alarms'],
846
'insufficient_data_actions': [],
847
'repeat_actions': False,
848
'matching_metadata': {'key': 'value'}
849
# or 'matching_metadata': [{'key': 'key', 'value': 'value'}]
829
def _stats_result_aggregates(result, aggregate):
831
for attr in ['count', 'min', 'max', 'sum', 'avg']:
833
stats_args[attr] = result[attr]
836
stats_args['aggregate'] = {}
838
ak = '%s%s' % (a.func, '/%s' % a.param if a.param else '')
840
stats_args['aggregate'][ak] = result[ak]
841
elif 'aggregate' in result:
842
stats_args['aggregate'][ak] = result['aggregate'].get(ak)
846
def _stats_result_to_model(result, groupby, aggregate):
    """Convert one statistics result value into a models.Statistics.

    :param result: Mapping holding the 'unit', 'duration',
                   'duration_start', 'duration_end', 'period',
                   'period_start' and 'period_end' items, plus a
                   'groupby' mapping when groupby is given.
    :param groupby: Optional iterable of field names to copy out of
                    result['groupby']; when empty or None the model's
                    groupby is set to None.
    :param aggregate: Forwarded to Connection._stats_result_aggregates,
                      which supplies the initial keyword arguments.
    :returns: A models.Statistics built from the collected arguments.
    """
    stats_args = Connection._stats_result_aggregates(result, aggregate)

    # Copied in this fixed order so a missing item raises KeyError for
    # the same field it always did.
    for field in ('unit', 'duration', 'duration_start', 'duration_end',
                  'period', 'period_start', 'period_end'):
        stats_args[field] = result[field]

    if groupby:
        stats_args['groupby'] = dict(
            (g, result['groupby'][g]) for g in groupby)
    else:
        stats_args['groupby'] = None

    return models.Statistics(**stats_args)
859
def get_capabilities(self):
860
"""Return a dictionary representing the capabilities of this driver.
863
'meters': {'query': {'simple': True,
865
'resources': {'query': {'simple': True,
867
'samples': {'query': {'simple': True,
870
'statistics': {'groupby': True,
871
'query': {'simple': True,
873
'aggregation': {'standard': True,
881
'cardinality': True}}
883
'alarms': {'query': {'simple': True,
885
'history': {'query': {'simple': True,
853
if isinstance(alarm.get('rule'), dict):
856
alarm['type'] = 'threshold'
858
alarm['matching_metadata'] = cls._decode_matching_metadata(
859
alarm['matching_metadata'])
860
for field in ['period', 'evaluation_periods', 'threshold',
861
'statistic', 'comparison_operator', 'meter_name']:
863
alarm['rule'][field] = alarm[field]
867
for key in alarm['matching_metadata']:
868
query.append({'field': key,
870
'value': alarm['matching_metadata'][key],
872
del alarm['matching_metadata']
873
alarm['rule']['query'] = query
875
def get_alarms(self, name=None, user=None,
876
project=None, enabled=None, alarm_id=None, pagination=None):
877
"""Yields a list of alarms that match filters
878
:param name: The Alarm name.
879
:param user: Optional ID for user that owns the resource.
880
:param project: Optional ID for project that owns the resource.
881
:param enabled: Optional boolean to list disable alarm.
882
:param alarm_id: Optional alarm_id to return one alarm.
883
:param pagination: Optional pagination query.
886
raise NotImplementedError(_('Pagination not implemented'))
891
if project is not None:
892
q['project_id'] = project
895
if enabled is not None:
896
q['enabled'] = enabled
897
if alarm_id is not None:
898
q['alarm_id'] = alarm_id
900
for alarm in self.db.alarm.find(q):
904
self._ensure_encapsulated_rule_format(a)
905
yield models.Alarm(**a)
907
def update_alarm(self, alarm):
910
data = alarm.as_dict()
912
self.db.alarm.update(
913
{'alarm_id': alarm.alarm_id},
917
stored_alarm = self.db.alarm.find({'alarm_id': alarm.alarm_id})[0]
918
del stored_alarm['_id']
919
self._ensure_encapsulated_rule_format(stored_alarm)
920
return models.Alarm(**stored_alarm)
922
create_alarm = update_alarm
924
def delete_alarm(self, alarm_id):
927
self.db.alarm.remove({'alarm_id': alarm_id})
929
def get_alarm_changes(self, alarm_id, on_behalf_of,
930
user=None, project=None, type=None,
931
start_timestamp=None, start_timestamp_op=None,
932
end_timestamp=None, end_timestamp_op=None):
933
"""Yields list of AlarmChanges describing alarm history
935
Changes are always sorted in reverse order of occurrence, given
936
the importance of currency.
938
Segregation for non-administrative users is done on the basis
939
of the on_behalf_of parameter. This allows such users to have
940
visibility on both the changes initiated by themselves directly
941
(generally creation, rule changes, or deletion) and also on those
942
changes initiated on their behalf by the alarming service (state
943
transitions after alarm thresholds are crossed).
945
:param alarm_id: ID of alarm to return changes for
946
:param on_behalf_of: ID of tenant to scope changes query (None for
947
administrative user, indicating all projects)
948
:param user: Optional ID of user to return changes for
949
:param project: Optional ID of project to return changes for
950
:param type: Optional change type
951
:param start_timestamp: Optional modified timestamp start range
952
:param start_timestamp_op: Optional timestamp start range operation
953
:param end_timestamp: Optional modified timestamp end range
954
:param end_timestamp_op: Optional timestamp end range operation
956
q = dict(alarm_id=alarm_id)
957
if on_behalf_of is not None:
958
q['on_behalf_of'] = on_behalf_of
961
if project is not None:
962
q['project_id'] = project
965
if start_timestamp or end_timestamp:
966
ts_range = make_timestamp_range(start_timestamp, end_timestamp,
970
q['timestamp'] = ts_range
972
sort = [("timestamp", pymongo.DESCENDING)]
973
for alarm_change in self.db.alarm_history.find(q, sort=sort):
975
ac.update(alarm_change)
977
yield models.AlarmChange(**ac)
979
def record_alarm_change(self, alarm_change):
980
"""Record alarm change event.
982
self.db.alarm_history.insert(alarm_change)
888
return utils.update_nested(self.DEFAULT_CAPABILITIES, available)