#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

import datetime
import operator

import elasticsearch as es
from elasticsearch import helpers
from oslo.utils import netutils
from oslo.utils import timeutils
import six

from ceilometer.event.storage import base
from ceilometer.event.storage import models
from ceilometer import utils


# Event query features this driver supports; merged over the base driver's
# capability map by ``Connection.CAPABILITIES``.
AVAILABLE_CAPABILITIES = {
    'events': {'query': {'simple': True}},
}


# Storage-level properties of this driver; merged over the base driver's
# map by ``Connection.STORAGE_CAPABILITIES``.
AVAILABLE_STORAGE_CAPABILITIES = {
    'storage': {'production_ready': True},
}


class Connection(base.Connection):
    """Put the event data into an ElasticSearch db.

    Events in ElasticSearch are indexed by day and stored by event_type.
    An example document::

      {"_index":"events_2014-10-21",
       "_type":"event_type0",
       "_id":"dc90e464-65ab-4a5d-bf66-ecb956b5d779",
       "_source":{"timestamp": "2014-10-21T20:02:09.274797",
                  "traits": {"id4_0": "2014-10-21T20:02:09.274797",
                             "id3_0": 0.7510790937279408,
                             "id1_0": "18c97ba1-3b74-441a-b948-a702a30cbce2"}
                 }
      }
    """

    CAPABILITIES = utils.update_nested(base.Connection.CAPABILITIES,
                                       AVAILABLE_CAPABILITIES)
    STORAGE_CAPABILITIES = utils.update_nested(
        base.Connection.STORAGE_CAPABILITIES,
        AVAILABLE_STORAGE_CAPABILITIES,
    )
    # Prefix of the per-day indices, e.g. "events_2014-10-21".
    index_name = 'events'
    # NOTE(gordc): mainly for testing, data is not searchable after write,
    #              it is only searchable after periodic refreshes.
    _refresh_on_write = False

    # Comparison operators accepted in trait filters, mapped to the keywords
    # of the ElasticSearch range filter.
    _RANGE_OPS = {'gt': 'gt', 'ge': 'gte', 'lt': 'lt', 'le': 'lte'}

    # ElasticSearch mapping type -> type name handed to
    # models.Trait.get_type_by_name.
    # NOTE(review): the target names were reconstructed; confirm them against
    # the names get_type_by_name accepts.
    _ES_TYPE_NAMES = {
        'string': 'text',
        'long': 'int',
        'double': 'float',
        'date': 'datetime',
        'date_time': 'datetime',
    }

    def __init__(self, url):
        """Create the ElasticSearch client for the host named in ``url``.

        :param url: connection URL; only its network location is handed to
                    the ElasticSearch client.
        """
        url_split = netutils.urlsplit(url)
        self.conn = es.Elasticsearch(url_split.netloc)

    def upgrade(self):
        """Install the index template used by the event indices.

        The template enables ``_timestamp`` and maps ``traits`` as a nested
        object, which the nested trait filter built in
        ``_make_dsl_from_filter`` relies on.
        """
        # NOTE(review): the method header and the 'template' / 'store'
        # entries below were reconstructed; confirm them before merging.
        iclient = es.client.IndicesClient(self.conn)
        ts_template = {
            'template': '*',
            'mappings': {'_default_':
                         {'_timestamp': {'enabled': True,
                                         'store': True},
                          'properties': {'traits': {'type': 'nested'}}}}}
        iclient.put_template(name='enable_timestamp', body=ts_template)

    def _index_for_day(self, day):
        """Return the name of the daily index holding events of ``day``."""
        return '%s_%s' % (self.index_name, day.isoformat())

    def record_events(self, events):
        """Write the events to ElasticSearch through the bulk helper.

        :param events: iterable of event models; ``message_id``,
                       ``event_type``, ``generated`` and ``traits`` are read
                       from each of them.
        :return: list of ``(problem, id)`` tuples, one per event that was
                 not stored. ``problem`` is ``models.Event.DUPLICATE`` when
                 ElasticSearch answered 409 and
                 ``models.Event.UNKNOWN_PROBLEM`` otherwise.
        """

        def _build_bulk_index(event_list):
            for ev in event_list:
                traits = {t.name: t.value for t in ev.traits}
                # 'create' makes a repeated message_id fail with a 409
                # instead of silently overwriting the stored event.
                yield {'_op_type': 'create',
                       '_index': self._index_for_day(ev.generated.date()),
                       '_type': ev.event_type,
                       '_id': ev.message_id,
                       '_source': {'timestamp': ev.generated.isoformat(),
                                   'traits': traits}}

        problem_events = []
        # raise_on_error=False: failures must be reported per item so they
        # can be collected below rather than aborting the whole batch.
        for ok, result in helpers.streaming_bulk(
                self.conn, _build_bulk_index(events), raise_on_error=False):
            if ok:
                continue
            # The item is keyed by its operation type; only the details
            # stored under that key are of interest.
            __, result = result.popitem()
            # NOTE(review): the second tuple element was reconstructed as
            # the document id; confirm against the callers' expectations.
            if result['status'] == 409:
                problem_events.append((models.Event.DUPLICATE,
                                       result['_id']))
            else:
                problem_events.append((models.Event.UNKNOWN_PROBLEM,
                                       result['_id']))

        if self._refresh_on_write:
            self.conn.indices.refresh(index='%s_*' % self.index_name)
            # Busy-wait until the node reports no pending cluster tasks.
            while self.conn.cluster.pending_tasks(local=True)['tasks']:
                pass
        return problem_events

    def _make_dsl_from_filter(self, indices, ev_filter):
        """Translate an event filter into keyword arguments for search().

        :param indices: names of the existing daily event indices, in any
                        order; the argument itself is not modified.
        :param ev_filter: filter object; ``start_timestamp``,
                          ``end_timestamp``, ``event_type``, ``message_id``
                          and ``traits_filter`` are read from it.
        :return: dict with ``index`` (the indices overlapping the requested
                 time window, possibly empty), ``body`` (the query DSL) and,
                 when an event type is given, ``doc_type``.
        """
        q_args = {}
        filters = []
        # Daily index names share a prefix and end in an ISO date, so
        # ordering them as strings orders them chronologically.
        indices = sorted(indices)

        if ev_filter.start_timestamp:
            filters.append({'range': {'timestamp': {
                'gte': ev_filter.start_timestamp.isoformat()}}})
            oldest = self._index_for_day(ev_filter.start_timestamp.date())
            indices = [name for name in indices if name >= oldest]
        if ev_filter.end_timestamp:
            filters.append({'range': {'timestamp': {
                'lte': ev_filter.end_timestamp.isoformat()}}})
            newest = self._index_for_day(ev_filter.end_timestamp.date())
            indices = [name for name in indices if name <= newest]
        q_args['index'] = indices

        if ev_filter.event_type:
            q_args['doc_type'] = ev_filter.event_type
        if ev_filter.message_id:
            filters.append({'term': {'_id': ev_filter.message_id}})
        if ev_filter.traits_filter:
            trait_filters = []
            for t_filter in ev_filter.traits_filter:
                value = None
                for val_type in ['integer', 'string', 'float', 'datetime']:
                    candidate = t_filter.get(val_type)
                    # Compare with None so that falsy values such as 0 or
                    # 0.0 are still usable as filter values.
                    if candidate is None:
                        continue
                    value = candidate
                    if isinstance(value, six.string_types):
                        value = value.lower()
                    elif isinstance(value, datetime.datetime):
                        value = value.isoformat()
                    break
                op = t_filter.get('op')
                if op in self._RANGE_OPS:
                    trait_filters.append(
                        {'range': {t_filter['key']:
                                   {self._RANGE_OPS[op]: value}}})
                else:
                    tf = {"query": {"query_string": {
                        "query": "%s: \"%s\"" % (t_filter['key'], value)}}}
                    if op == 'ne':
                        tf = {"not": tf}
                    trait_filters.append(tf)
            filters.append(
                {'nested': {'path': 'traits', 'query': {'filtered': {
                    'filter': {'bool': {'must': trait_filters}}}}}})

        q_args['body'] = {'query': {'filtered':
                                    {'filter': {'bool': {'must': filters}}}}}
        return q_args

    def get_events(self, event_filter):
        """Yield the stored events matching ``event_filter``.

        :param event_filter: filter object understood by
                             ``_make_dsl_from_filter``.
        :return: generator of ``models.Event``, requested from ElasticSearch
                 sorted by ascending timestamp; the traits of each event are
                 sorted by their ``dtype``.
        """
        iclient = es.client.IndicesClient(self.conn)
        indices = list(iclient.get_mapping('%s_*' % self.index_name))
        if not indices:
            return
        filter_args = self._make_dsl_from_filter(indices, event_filter)
        if not filter_args['index']:
            # No daily index overlaps the requested time window.
            return
        # NOTE(review): no ``size`` is passed, so the server-side default
        # page size bounds the number of hits - confirm this is intended.
        results = self.conn.search(fields=['_id', 'timestamp',
                                           '_type', '_source'],
                                   sort='timestamp:asc',
                                   **filter_args)
        # event type -> {trait name: data type}, looked up once per type.
        trait_mappings = {}
        for record in results['hits']['hits']:
            ev_type = record['_type']
            if ev_type not in trait_mappings:
                known = {}
                for t_map in self.get_trait_types(ev_type):
                    # Keep the first type reported for a trait name.
                    known.setdefault(t_map['name'], t_map['data_type'])
                trait_mappings[ev_type] = known
            known = trait_mappings[ev_type]

            trait_list = []
            for key, value in record['_source']['traits'].items():
                if key not in known:
                    # Without a mapped type the value cannot be converted.
                    continue
                dtype = known[key]
                trait_list.append(models.Trait(
                    name=key, dtype=dtype,
                    value=models.Trait.convert_value(dtype, value)))
            gen_ts = timeutils.normalize_time(timeutils.parse_isotime(
                record['_source']['timestamp']))
            yield models.Event(message_id=record['_id'],
                               event_type=ev_type,
                               generated=gen_ts,
                               traits=sorted(
                                   trait_list,
                                   key=operator.attrgetter('dtype')))

    def get_event_types(self):
        """Return the sorted names of all event types found in the indices."""
        iclient = es.client.IndicesClient(self.conn)
        es_mappings = iclient.get_mapping('%s_*' % self.index_name)
        seen_types = set()
        for index_mapping in es_mappings.values():
            seen_types.update(index_mapping['mappings'])
        # TODO(gordc): tests assume sorted ordering but backends are not
        #              explicitly ordered.
        # NOTE: _default_ is a type that appears in all mappings but is not
        #       a real event type.
        seen_types.discard('_default_')
        return sorted(seen_types)

    @staticmethod
    def _remap_es_types(d_type):
        """Map an ElasticSearch mapping type to a trait type name.

        :param d_type: type string taken from an ElasticSearch mapping.
        :return: the corresponding entry of ``_ES_TYPE_NAMES``, or
                 ``d_type`` unchanged when there is none.
        """
        return Connection._ES_TYPE_NAMES.get(d_type, d_type)

    def get_trait_types(self, event_type):
        """Yield the distinct trait definitions mapped for ``event_type``.

        :param event_type: name of the event type to inspect.
        :return: generator of ``{'name': ..., 'data_type': ...}`` dicts, one
                 per distinct (name, data type) pair across all indices.
        """
        iclient = es.client.IndicesClient(self.conn)
        es_mappings = iclient.get_mapping('%s_*' % self.index_name)
        seen_types = set()
        for index_mapping in es_mappings.values():
            type_mapping = index_mapping['mappings'].get(event_type)
            if not type_mapping:
                continue
            # Only event types that actually carry traits have a nested
            # 'properties' entry under 'traits'.
            trait_props = (type_mapping.get('properties', {})
                           .get('traits', {}).get('properties'))
            if not trait_props:
                continue
            for t_type, t_mapping in trait_props.items():
                d_type = models.Trait.get_type_by_name(
                    self._remap_es_types(t_mapping['type']))
                if (t_type, d_type) not in seen_types:
                    yield {'name': t_type, 'data_type': d_type}
                    seen_types.add((t_type, d_type))

    def get_traits(self, event_type, trait_type=None):
        """Yield the trait instances stored for ``event_type``.

        :param event_type: name of the event type to search.
        :param trait_type: when given, only traits with this name are
                           yielded; otherwise every trait of every matching
                           event is yielded.
        :return: generator of ``models.Trait``; empty when the event type
                 has no mapped traits or ``trait_type`` is not among them.
        """
        t_types = dict((res['name'], res['data_type'])
                       for res in self.get_trait_types(event_type))
        if not t_types or (trait_type and trait_type not in t_types):
            return
        result = self.conn.search('%s_*' % self.index_name, event_type)
        for ev in result['hits']['hits']:
            ev_traits = ev['_source']['traits']
            if trait_type:
                # An event lacking the requested trait contributes nothing.
                names = [trait_type] if trait_type in ev_traits else []
            else:
                names = list(ev_traits)
            for name in names:
                if name not in t_types:
                    # Without a mapped type the value cannot be converted.
                    continue
                yield models.Trait(
                    name=name,
                    dtype=t_types[name],
                    value=models.Trait.convert_value(
                        t_types[name],
                        ev_traits[name]))