~ubuntu-branches/ubuntu/vivid/ceilometer/vivid

« back to all changes in this revision

Viewing changes to ceilometer/event/storage/impl_elasticsearch.py

  • Committer: Package Import Robot
  • Author(s): James Page, Corey Bryant, James Page
  • Date: 2015-02-19 14:59:07 UTC
  • mfrom: (1.2.3)
  • Revision ID: package-import@ubuntu.com-20150219145907-9jojybdsl64zcn14
Tags: 2015.1~b2-0ubuntu1
[ Corey Bryant ]
* New upstream release.
  - d/control: Align requirements with upstream.
  - d/p/skip-test.patch: Rebased.

[ James Page ]
* d/rules,d/p/skip-gabbi.patch: Skip tests that rely on python-gabbi until
  packaging and MIR is complete.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
#
 
2
# Licensed under the Apache License, Version 2.0 (the "License"); you may
 
3
# not use this file except in compliance with the License. You may obtain
 
4
# a copy of the License at
 
5
#
 
6
#      http://www.apache.org/licenses/LICENSE-2.0
 
7
#
 
8
# Unless required by applicable law or agreed to in writing, software
 
9
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 
10
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 
11
# License for the specific language governing permissions and limitations
 
12
# under the License.
 
13
 
 
14
import datetime
 
15
import operator
 
16
 
 
17
import elasticsearch as es
 
18
from elasticsearch import helpers
 
19
from oslo.utils import netutils
 
20
from oslo.utils import timeutils
 
21
import six
 
22
 
 
23
from ceilometer.event.storage import base
 
24
from ceilometer.event.storage import models
 
25
from ceilometer import utils
 
26
 
 
27
 
 
28
# Event API capabilities provided by this driver; the Connection class
# merges them into base.Connection.CAPABILITIES with utils.update_nested.
AVAILABLE_CAPABILITIES = {
    'events': {
        'query': {
            'simple': True,
        },
    },
}


# Storage capabilities provided by this driver; the Connection class merges
# them into base.Connection.STORAGE_CAPABILITIES with utils.update_nested.
AVAILABLE_STORAGE_CAPABILITIES = {
    'storage': {
        'production_ready': True,
    },
}
 
36
 
 
37
 
 
38
class Connection(base.Connection):
 
39
    """Put the event data into an ElasticSearch db.
 
40
 
 
41
    Events in ElasticSearch are indexed by day and stored by event_type.
 
42
    An example document::
 
43
 
 
44
      {"_index":"events_2014-10-21",
 
45
       "_type":"event_type0",
 
46
       "_id":"dc90e464-65ab-4a5d-bf66-ecb956b5d779",
 
47
       "_score":1.0,
 
48
       "_source":{"timestamp": "2014-10-21T20:02:09.274797",
 
49
                  "traits": {"id4_0": "2014-10-21T20:02:09.274797",
 
50
                             "id3_0": 0.7510790937279408,
 
51
                             "id2_0": 5,
 
52
                             "id1_0": "18c97ba1-3b74-441a-b948-a702a30cbce2"}
 
53
                 }
 
54
      }
 
55
    """
 
56
 
 
57
    CAPABILITIES = utils.update_nested(base.Connection.CAPABILITIES,
 
58
                                       AVAILABLE_CAPABILITIES)
 
59
    STORAGE_CAPABILITIES = utils.update_nested(
 
60
        base.Connection.STORAGE_CAPABILITIES,
 
61
        AVAILABLE_STORAGE_CAPABILITIES,
 
62
    )
 
63
    index_name = 'events'
 
64
    # NOTE(gordc): mainly for testing, data is not searchable after write,
 
65
    #              it is only searchable after periodic refreshes.
 
66
    _refresh_on_write = False
 
67
 
 
68
    def __init__(self, url):
 
69
        url_split = netutils.urlsplit(url)
 
70
        self.conn = es.Elasticsearch(url_split.netloc)
 
71
 
 
72
    def upgrade(self):
 
73
        iclient = es.client.IndicesClient(self.conn)
 
74
        ts_template = {
 
75
            'template': '*',
 
76
            'mappings': {'_default_':
 
77
                         {'_timestamp': {'enabled': True,
 
78
                                         'store': True},
 
79
                          'properties': {'traits': {'type': 'nested'}}}}}
 
80
        iclient.put_template(name='enable_timestamp', body=ts_template)
 
81
 
 
82
    def record_events(self, events):
 
83
 
 
84
        def _build_bulk_index(event_list):
 
85
            for ev in event_list:
 
86
                traits = {t.name: t.value for t in ev.traits}
 
87
                yield {'_op_type': 'create',
 
88
                       '_index': '%s_%s' % (self.index_name,
 
89
                                            ev.generated.date().isoformat()),
 
90
                       '_type': ev.event_type,
 
91
                       '_id': ev.message_id,
 
92
                       '_source': {'timestamp': ev.generated.isoformat(),
 
93
                                   'traits': traits}}
 
94
 
 
95
        problem_events = []
 
96
        for ok, result in helpers.streaming_bulk(
 
97
                self.conn, _build_bulk_index(events)):
 
98
            if not ok:
 
99
                __, result = result.popitem()
 
100
                if result['status'] == 409:
 
101
                    problem_events.append((models.Event.DUPLICATE,
 
102
                                           result['_id']))
 
103
                else:
 
104
                    problem_events.append((models.Event.UNKNOWN_PROBLEM,
 
105
                                           result['_id']))
 
106
 
 
107
        if self._refresh_on_write:
 
108
            self.conn.indices.refresh(index='%s_*' % self.index_name)
 
109
            while self.conn.cluster.pending_tasks(local=True)['tasks']:
 
110
                pass
 
111
        return problem_events
 
112
 
 
113
    def _make_dsl_from_filter(self, indices, ev_filter):
 
114
        q_args = {}
 
115
        filters = []
 
116
 
 
117
        if ev_filter.start_timestamp:
 
118
            filters.append({'range': {'timestamp':
 
119
                           {'ge': ev_filter.start_timestamp.isoformat()}}})
 
120
            while indices[0] < (
 
121
                '%s_%s' % (self.index_name,
 
122
                           ev_filter.start_timestamp.date().isoformat())):
 
123
                del indices[0]
 
124
        if ev_filter.end_timestamp:
 
125
            filters.append({'range': {'timestamp':
 
126
                           {'le': ev_filter.end_timestamp.isoformat()}}})
 
127
            while indices[-1] > (
 
128
                '%s_%s' % (self.index_name,
 
129
                           ev_filter.end_timestamp.date().isoformat())):
 
130
                del indices[-1]
 
131
        q_args['index'] = indices
 
132
 
 
133
        if ev_filter.event_type:
 
134
            q_args['doc_type'] = ev_filter.event_type
 
135
        if ev_filter.message_id:
 
136
            filters.append({'term': {'_id': ev_filter.message_id}})
 
137
        if ev_filter.traits_filter:
 
138
            trait_filters = []
 
139
            for t_filter in ev_filter.traits_filter:
 
140
                value = None
 
141
                for val_type in ['integer', 'string', 'float', 'datetime']:
 
142
                    if t_filter.get(val_type):
 
143
                        value = t_filter.get(val_type)
 
144
                        if isinstance(value, six.string_types):
 
145
                            value = value.lower()
 
146
                        elif isinstance(value, datetime.datetime):
 
147
                            value = value.isoformat()
 
148
                        break
 
149
                if t_filter.get('op') in ['gt', 'ge', 'lt', 'le']:
 
150
                    op = (t_filter.get('op').replace('ge', 'gte')
 
151
                          .replace('le', 'lte'))
 
152
                    trait_filters.append(
 
153
                        {'range': {t_filter['key']: {op: value}}})
 
154
                else:
 
155
                    tf = {"query": {"query_string": {
 
156
                        "query": "%s: \"%s\"" % (t_filter['key'], value)}}}
 
157
                    if t_filter.get('op') == 'ne':
 
158
                        tf = {"not": tf}
 
159
                    trait_filters.append(tf)
 
160
            filters.append(
 
161
                {'nested': {'path': 'traits', 'query': {'filtered': {
 
162
                    'filter': {'bool': {'must': trait_filters}}}}}})
 
163
 
 
164
        q_args['body'] = {'query': {'filtered':
 
165
                                    {'filter': {'bool': {'must': filters}}}}}
 
166
        return q_args
 
167
 
 
168
    def get_events(self, event_filter):
 
169
        iclient = es.client.IndicesClient(self.conn)
 
170
        indices = iclient.get_mapping('%s_*' % self.index_name).keys()
 
171
        if indices:
 
172
            filter_args = self._make_dsl_from_filter(indices, event_filter)
 
173
            results = self.conn.search(fields=['_id', 'timestamp',
 
174
                                               '_type', '_source'],
 
175
                                       sort='timestamp:asc',
 
176
                                       **filter_args)
 
177
            trait_mappings = {}
 
178
            for record in results['hits']['hits']:
 
179
                trait_list = []
 
180
                if not record['_type'] in trait_mappings:
 
181
                    trait_mappings[record['_type']] = list(
 
182
                        self.get_trait_types(record['_type']))
 
183
                for key in record['_source']['traits'].keys():
 
184
                    value = record['_source']['traits'][key]
 
185
                    for t_map in trait_mappings[record['_type']]:
 
186
                        if t_map['name'] == key:
 
187
                            dtype = t_map['data_type']
 
188
                            break
 
189
                    trait_list.append(models.Trait(
 
190
                        name=key, dtype=dtype,
 
191
                        value=models.Trait.convert_value(dtype, value)))
 
192
                gen_ts = timeutils.normalize_time(timeutils.parse_isotime(
 
193
                    record['_source']['timestamp']))
 
194
                yield models.Event(message_id=record['_id'],
 
195
                                   event_type=record['_type'],
 
196
                                   generated=gen_ts,
 
197
                                   traits=sorted(
 
198
                                       trait_list,
 
199
                                       key=operator.attrgetter('dtype')))
 
200
 
 
201
    def get_event_types(self):
 
202
        iclient = es.client.IndicesClient(self.conn)
 
203
        es_mappings = iclient.get_mapping('%s_*' % self.index_name)
 
204
        seen_types = set()
 
205
        for index in es_mappings.keys():
 
206
            for ev_type in es_mappings[index]['mappings'].keys():
 
207
                seen_types.add(ev_type)
 
208
        # TODO(gordc): tests assume sorted ordering but backends are not
 
209
        #              explicitly ordered.
 
210
        # NOTE: _default_ is a type that appears in all mappings but is not
 
211
        #       real 'type'
 
212
        seen_types.discard('_default_')
 
213
        return sorted(list(seen_types))
 
214
 
 
215
    @staticmethod
 
216
    def _remap_es_types(d_type):
 
217
        if d_type == 'string':
 
218
            d_type = 'text'
 
219
        elif d_type == 'long':
 
220
            d_type = 'int'
 
221
        elif d_type == 'double':
 
222
            d_type = 'float'
 
223
        elif d_type == 'date' or d_type == 'date_time':
 
224
            d_type = 'datetime'
 
225
        return d_type
 
226
 
 
227
    def get_trait_types(self, event_type):
 
228
        iclient = es.client.IndicesClient(self.conn)
 
229
        es_mappings = iclient.get_mapping('%s_*' % self.index_name)
 
230
        seen_types = []
 
231
        for index in es_mappings.keys():
 
232
            # if event_type exists in index and has traits
 
233
            if (es_mappings[index]['mappings'].get(event_type) and
 
234
                    es_mappings[index]['mappings'][event_type]['properties']
 
235
                    ['traits'].get('properties')):
 
236
                for t_type in (es_mappings[index]['mappings'][event_type]
 
237
                               ['properties']['traits']['properties'].keys()):
 
238
                    d_type = (es_mappings[index]['mappings'][event_type]
 
239
                              ['properties']['traits']['properties']
 
240
                              [t_type]['type'])
 
241
                    d_type = models.Trait.get_type_by_name(
 
242
                        self._remap_es_types(d_type))
 
243
                    if (t_type, d_type) not in seen_types:
 
244
                        yield {'name': t_type, 'data_type': d_type}
 
245
                        seen_types.append((t_type, d_type))
 
246
 
 
247
    def get_traits(self, event_type, trait_type=None):
 
248
        t_types = dict((res['name'], res['data_type'])
 
249
                       for res in self.get_trait_types(event_type))
 
250
        if not t_types or (trait_type and trait_type not in t_types.keys()):
 
251
            return
 
252
        result = self.conn.search('%s_*' % self.index_name, event_type)
 
253
        for ev in result['hits']['hits']:
 
254
            if trait_type and ev['_source']['traits'].get(trait_type):
 
255
                yield models.Trait(
 
256
                    name=trait_type,
 
257
                    dtype=t_types[trait_type],
 
258
                    value=models.Trait.convert_value(
 
259
                        t_types[trait_type],
 
260
                        ev['_source']['traits'][trait_type]))
 
261
            else:
 
262
                for trait in ev['_source']['traits'].keys():
 
263
                    yield models.Trait(
 
264
                        name=trait,
 
265
                        dtype=t_types[trait],
 
266
                        value=models.Trait.convert_value(
 
267
                            t_types[trait],
 
268
                            ev['_source']['traits'][trait]))