~shinken-dev/shinken/trunk

« back to all changes in this revision

Viewing changes to modules/logstore_mongodb/module.py

  • Committer: naparuba
  • Date: 2013-07-17 15:03:35 UTC
  • Revision ID: git-v1:8bffb4fe67305a0be4ce50d1feab2416f950f95d
Enh: remove the Logstore modules from the core repo.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
#!/usr/bin/python
2
 
 
3
 
# -*- coding: utf-8 -*-
4
 
 
5
 
# Copyright (C) 2009-2012:
6
 
#    Gabes Jean, naparuba@gmail.com
7
 
#    Gerhard Lausser, Gerhard.Lausser@consol.de
8
 
#    Gregory Starck, g.starck@gmail.com
9
 
#    Hartmut Goebel, h.goebel@goebel-consult.de
10
 
#
11
 
# This file is part of Shinken.
12
 
#
13
 
# Shinken is free software: you can redistribute it and/or modify
14
 
# it under the terms of the GNU Affero General Public License as published by
15
 
# the Free Software Foundation, either version 3 of the License, or
16
 
# (at your option) any later version.
17
 
#
18
 
# Shinken is distributed in the hope that it will be useful,
19
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
20
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
21
 
# GNU Affero General Public License for more details.
22
 
#
23
 
# You should have received a copy of the GNU Affero General Public License
24
 
# along with Shinken.  If not, see <http://www.gnu.org/licenses/>.
25
 
 
26
 
# import von modules/livestatus_logstore
27
 
 
28
 
"""
29
 
This class is for attaching a mongodb database to a livestatus broker module.
30
 
It is one possibility for an exchangeable storage for log broks
31
 
"""
32
 
 
33
 
import os
34
 
import time
35
 
import datetime
36
 
import re
37
 
import sys
38
 
import pymongo
39
 
 
40
 
from shinken.objects.service import Service
41
 
from shinken.modulesctx import modulesctx
42
 
 
43
 
# Import a class from the livestatus module, should be already loaded!
44
 
livestatus = modulesctx.get_module('livestatus')
45
 
 
46
 
LiveStatusStack = livestatus.LiveStatusStack
47
 
LOGCLASS_INVALID = livestatus.LOGCLASS_INVALID
48
 
Logline = livestatus.Logline
49
 
 
50
 
 
51
 
from pymongo import Connection
52
 
try:
53
 
    from pymongo import ReplicaSetConnection, ReadPreference
54
 
except ImportError:
55
 
    ReplicaSetConnection = None
56
 
    ReadPreference = None
57
 
from pymongo.errors import AutoReconnect
58
 
 
59
 
from shinken.basemodule import BaseModule
60
 
from shinken.objects.module import Module
61
 
from shinken.log import logger
62
 
from shinken.util import to_bool
63
 
 
64
 
properties = {
65
 
    'daemons': ['livestatus'],
66
 
    'type': 'logstore_mongodb',
67
 
    'external': False,
68
 
    'phases': ['running'],
69
 
    }
70
 
 
71
 
 
72
 
# called by the plugin manager
73
 
def get_instance(plugin):
74
 
    logger.info("[LogstoreMongoDB] Get an LogStore MongoDB module for plugin %s" % plugin.get_name())
75
 
    instance = LiveStatusLogStoreMongoDB(plugin)
76
 
    return instance
77
 
 
78
 
 
79
 
def row_factory(cursor, row):
80
 
    """Handler for the sqlite fetch method."""
81
 
    return Logline(cursor.description, row)
82
 
 
83
 
CONNECTED = 1
84
 
DISCONNECTED = 2
85
 
SWITCHING = 3
86
 
 
87
 
 
88
 
class LiveStatusLogStoreError(Exception):
89
 
    pass
90
 
 
91
 
 
92
 
class LiveStatusLogStoreMongoDB(BaseModule):
93
 
 
94
 
    def __init__(self, modconf):
95
 
        BaseModule.__init__(self, modconf)
96
 
        self.plugins = []
97
 
        # mongodb://host1,host2,host3/?safe=true;w=2;wtimeoutMS=2000
98
 
        self.mongodb_uri = getattr(modconf, 'mongodb_uri', None)
99
 
        self.replica_set = getattr(modconf, 'replica_set', None)
100
 
        if self.replica_set and not ReplicaSetConnection:
101
 
            logger.error('[LogStoreMongoDB] Can not initialize LogStoreMongoDB module with '
102
 
                         'replica_set because your pymongo lib is too old. '
103
 
                         'Please install it with a 2.x+ version from '
104
 
                         'https://github.com/mongodb/mongo-python-driver/downloads')
105
 
            return None
106
 
        self.database = getattr(modconf, 'database', 'logs')
107
 
        self.collection = getattr(modconf, 'collection', 'logs')
108
 
        self.use_aggressive_sql = True
109
 
        self.mongodb_fsync = to_bool(getattr(modconf, 'mongodb_fsync', "True"))
110
 
        max_logs_age = getattr(modconf, 'max_logs_age', '365')
111
 
        maxmatch = re.match(r'^(\d+)([dwmy]*)$', max_logs_age)
112
 
        if maxmatch is None:
113
 
            logger.warning('[LogStoreMongoDB] Wrong format for max_logs_age. Must be <number>[d|w|m|y] or <number> and not %s' % max_logs_age)
114
 
            return None
115
 
        else:
116
 
            if not maxmatch.group(2):
117
 
                self.max_logs_age = int(maxmatch.group(1))
118
 
            elif maxmatch.group(2) == 'd':
119
 
                self.max_logs_age = int(maxmatch.group(1))
120
 
            elif maxmatch.group(2) == 'w':
121
 
                self.max_logs_age = int(maxmatch.group(1)) * 7
122
 
            elif maxmatch.group(2) == 'm':
123
 
                self.max_logs_age = int(maxmatch.group(1)) * 31
124
 
            elif maxmatch.group(2) == 'y':
125
 
                self.max_logs_age = int(maxmatch.group(1)) * 365
126
 
        self.use_aggressive_sql = (getattr(modconf, 'use_aggressive_sql', '1') == '1')
127
 
        # This stack is used to create a full-blown select-statement
128
 
        self.mongo_filter_stack = LiveStatusMongoStack()
129
 
        # This stack is used to create a minimal select-statement which
130
 
        # selects only by time >= and time <=
131
 
        self.mongo_time_filter_stack = LiveStatusMongoStack()
132
 
        self.is_connected = DISCONNECTED
133
 
        self.backlog = []
134
 
        # Now sleep one second, so that won't get lineno collisions with the last second
135
 
        time.sleep(1)
136
 
        self.lineno = 0
137
 
 
138
 
    def load(self, app):
139
 
        self.app = app
140
 
 
141
 
    def init(self):
142
 
        pass
143
 
 
144
 
    def open(self):
145
 
        try:
146
 
            if self.replica_set:
147
 
                self.conn = pymongo.ReplicaSetConnection(self.mongodb_uri, replicaSet=self.replica_set, fsync=self.mongodb_fsync)
148
 
            else:
149
 
                # Old versions of pymongo do not known about fsync
150
 
                if ReplicaSetConnection:
151
 
                    self.conn = pymongo.Connection(self.mongodb_uri, fsync=self.mongodb_fsync)
152
 
                else:
153
 
                    self.conn = pymongo.Connection(self.mongodb_uri)
154
 
            self.db = self.conn[self.database]
155
 
            self.db[self.collection].ensure_index([('host_name', pymongo.ASCENDING), ('time', pymongo.ASCENDING), ('lineno', pymongo.ASCENDING)], name='logs_idx')
156
 
            if self.replica_set:
157
 
                pass
158
 
                # This might be a future option prefer_secondary
159
 
                #self.db.read_preference = ReadPreference.SECONDARY
160
 
            self.is_connected = CONNECTED
161
 
            self.next_log_db_rotate = time.time()
162
 
        except AutoReconnect, exp:
163
 
            # now what, ha?
164
 
            logger.error("[LogStoreMongoDB] LiveStatusLogStoreMongoDB.AutoReconnect %s" % (exp))
165
 
            # The mongodb is hopefully available until this module is restarted
166
 
            raise LiveStatusLogStoreError
167
 
        except Exception, exp:
168
 
            # If there is a replica_set, but the host is a simple standalone one
169
 
            # we get a "No suitable hosts found" here.
170
 
            # But other reasons are possible too.
171
 
            logger.error("[LogStoreMongoDB] Could not open the database" % exp)
172
 
            raise LiveStatusLogStoreError
173
 
 
174
 
    def close(self):
175
 
        self.conn.disconnect()
176
 
 
177
 
    def commit(self):
178
 
        pass
179
 
 
180
 
    def commit_and_rotate_log_db(self):
181
 
        """For a MongoDB there is no rotate, but we will delete old contents."""
182
 
        now = time.time()
183
 
        if self.next_log_db_rotate <= now:
184
 
            today = datetime.date.today()
185
 
            today0000 = datetime.datetime(today.year, today.month, today.day, 0, 0, 0)
186
 
            today0005 = datetime.datetime(today.year, today.month, today.day, 0, 5, 0)
187
 
            oldest = today0000 - datetime.timedelta(days=self.max_logs_age)
188
 
            self.db[self.collection].remove({u'time': {'$lt': time.mktime(oldest.timetuple())}})
189
 
 
190
 
            if now < time.mktime(today0005.timetuple()):
191
 
                nextrotation = today0005
192
 
            else:
193
 
                nextrotation = today0005 + datetime.timedelta(days=1)
194
 
 
195
 
            # See you tomorrow
196
 
            self.next_log_db_rotate = time.mktime(nextrotation.timetuple())
197
 
            logger.info("[LogStoreMongoDB] Next log rotation at %s " % time.asctime(time.localtime(self.next_log_db_rotate)))
198
 
 
199
 
 
200
 
    def manage_log_brok(self, b):
201
 
        data = b.data
202
 
        line = data['log']
203
 
        if re.match("^\[[0-9]*\] [A-Z][a-z]*.:", line):
204
 
            # Match log which NOT have to be stored
205
 
            # print "Unexpected in manage_log_brok", line
206
 
            return
207
 
        logline = Logline(line=line)
208
 
        values = logline.as_dict()
209
 
        if logline.logclass != LOGCLASS_INVALID:
210
 
            try:
211
 
                self.db[self.collection].insert(values)
212
 
                self.is_connected = CONNECTED
213
 
                # If we have a backlog from an outage, we flush these lines
214
 
                # First we make a copy, so we can delete elements from
215
 
                # the original self.backlog
216
 
                backloglines = [bl for bl in self.backlog]
217
 
                for backlogline in backloglines:
218
 
                    try:
219
 
                        self.db[self.collection].insert(backlogline)
220
 
                        self.backlog.remove(backlogline)
221
 
                    except AutoReconnect, exp:
222
 
                        self.is_connected = SWITCHING
223
 
                    except Exception, exp:
224
 
                        logger.error("[LogStoreMongoDB] Got an exception inserting the backlog" % str(exp))
225
 
            except AutoReconnect, exp:
226
 
                if self.is_connected != SWITCHING:
227
 
                    self.is_connected = SWITCHING
228
 
                    time.sleep(5)
229
 
                    # Under normal circumstances after these 5 seconds
230
 
                    # we should have a new primary node
231
 
                else:
232
 
                    # Not yet? Wait, but try harder.
233
 
                    time.sleep(0.1)
234
 
                # At this point we must save the logline for a later attempt
235
 
                # After 5 seconds we either have a successful write
236
 
                # or another exception which means, we are disconnected
237
 
                self.backlog.append(values)
238
 
            except Exception, exp:
239
 
                self.is_connected = DISCONNECTED
240
 
                logger.error("[LogStoreMongoDB] Databased error occurred:" % exp)
241
 
            # FIXME need access to this #self.livestatus.count_event('log_message')
242
 
        else:
243
 
            logger.info("[LogStoreMongoDB] This line is invalid: %s" % line)
244
 
 
245
 
 
246
 
    def add_filter(self, operator, attribute, reference):
247
 
        if attribute == 'time':
248
 
            self.mongo_time_filter_stack.put_stack(self.make_mongo_filter(operator, attribute, reference))
249
 
        self.mongo_filter_stack.put_stack(self.make_mongo_filter(operator, attribute, reference))
250
 
 
251
 
 
252
 
    def add_filter_and(self, andnum):
253
 
        self.mongo_filter_stack.and_elements(andnum)
254
 
 
255
 
 
256
 
    def add_filter_or(self, ornum):
257
 
        self.mongo_filter_stack.or_elements(ornum)
258
 
 
259
 
 
260
 
    def add_filter_not(self):
261
 
        self.mongo_filter_stack.not_elements()
262
 
 
263
 
 
264
 
    def get_live_data_log(self):
265
 
        """Like get_live_data, but for log objects"""
266
 
        # finalize the filter stacks
267
 
        self.mongo_time_filter_stack.and_elements(self.mongo_time_filter_stack.qsize())
268
 
        self.mongo_filter_stack.and_elements(self.mongo_filter_stack.qsize())
269
 
        if self.use_aggressive_sql:
270
 
            # Be aggressive, get preselected data from sqlite and do less
271
 
            # filtering in python. But: only a subset of Filter:-attributes
272
 
            # can be mapped to columns in the logs-table, for the others
273
 
            # we must use "always-true"-clauses. This can result in
274
 
            # funny and potentially ineffective sql-statements
275
 
            mongo_filter_func = self.mongo_filter_stack.get_stack()
276
 
        else:
277
 
            # Be conservative, get everything from the database between
278
 
            # two dates and apply the Filter:-clauses in python
279
 
            mongo_filter_func = self.mongo_time_filter_stack.get_stack()
280
 
        dbresult = []
281
 
        mongo_filter = mongo_filter_func()
282
 
        logger.debug("[Logstore MongoDB] Mongo filter is %s" % str(mongo_filter))
283
 
        # We can apply the filterstack here as well. we have columns and filtercolumns.
284
 
        # the only additional step is to enrich log lines with host/service-attributes
285
 
        # A timerange can be useful for a faster preselection of lines
286
 
 
287
 
        filter_element = eval('{ ' + mongo_filter + ' }')
288
 
        logger.debug("[LogstoreMongoDB] Mongo filter is %s" % str(filter_element))
289
 
        columns = ['logobject', 'attempt', 'logclass', 'command_name', 'comment', 'contact_name', 'host_name', 'lineno', 'message', 'plugin_output', 'service_description', 'state', 'state_type', 'time', 'type']
290
 
        if not self.is_connected == CONNECTED:
291
 
            logger.warning("[LogStoreMongoDB] sorry, not connected")
292
 
        else:
293
 
            dbresult = [Logline([(c,) for c in columns], [x[col] for col in columns]) for x in self.db[self.collection].find(filter_element).sort([(u'time', pymongo.ASCENDING), (u'lineno', pymongo.ASCENDING)])]
294
 
        return dbresult
295
 
 
296
 
 
297
 
    def make_mongo_filter(self, operator, attribute, reference):
298
 
        # The filters are text fragments which are put together to form a sql where-condition finally.
299
 
        # Add parameter Class (Host, Service), lookup datatype (default string), convert reference
300
 
        # which attributes are suitable for a sql statement
301
 
        good_attributes = ['time', 'attempt', 'logclass', 'command_name', 'comment', 'contact_name', 'host_name', 'plugin_output', 'service_description', 'state', 'state_type', 'type']
302
 
        good_operators = ['=', '!=']
303
 
        #  put strings in '' for the query
304
 
        string_attributes = ['command_name', 'comment', 'contact_name', 'host_name', 'plugin_output', 'service_description', 'state_type', 'type']
305
 
        if attribute in string_attributes:
306
 
            reference = "'%s'" % reference
307
 
 
308
 
        # We should change the "class" query into the internal "logclass" attribute
309
 
        if attribute == 'class':
310
 
            attribute = 'logclass'
311
 
 
312
 
        def eq_filter():
313
 
            if reference == '':
314
 
                return '\'%s\' : \'\'' % (attribute,)
315
 
            else:
316
 
                return '\'%s\' : %s' % (attribute, reference)
317
 
 
318
 
        def match_filter():
319
 
            return '\'%s\' : { \'$regex\' : %s }' % (attribute, reference)
320
 
 
321
 
        def eq_nocase_filter():
322
 
            if reference == '':
323
 
                return '\'%s\' : \'\'' % (attribute,)
324
 
            else:
325
 
                return '\'%s\' : { \'$regex\' : %s, \'$options\' : \'i\' }' % (attribute, '^' + reference + '$')
326
 
 
327
 
        def match_nocase_filter():
328
 
            return '\'%s\' : { \'$regex\' : %s, \'$options\' : \'i\' }' % (attribute, reference)
329
 
 
330
 
        def lt_filter():
331
 
            return '\'%s\' : { \'$lt\' : %s }' % (attribute, reference)
332
 
 
333
 
        def gt_filter():
334
 
            return '\'%s\' : { \'$gt\' : %s }' % (attribute, reference)
335
 
 
336
 
        def le_filter():
337
 
            return '\'%s\' : { \'$lte\' : %s }' % (attribute, reference)
338
 
 
339
 
        def ge_filter():
340
 
            return '\'%s\' : { \'$gte\' : %s }' % (attribute, reference)
341
 
 
342
 
        def ne_filter():
343
 
            if reference == '':
344
 
                return '\'%s\' : { \'$ne\' : '' }' % (attribute,)
345
 
            else:
346
 
                return '\'%s\' : { \'$ne\' : %s }' % (attribute, reference)
347
 
 
348
 
        def not_match_filter():
349
 
            # http://myadventuresincoding.wordpress.com/2011/05/19/mongodb-negative-regex-query-in-mongo-shell/
350
 
            return '\'%s\' : { \'$regex\' : %s }' % (attribute, '^((?!' + reference + ').)')
351
 
 
352
 
        def ne_nocase_filter():
353
 
            if reference == '':
354
 
                return '\'%s\' : \'\'' % (attribute,)
355
 
            else:
356
 
                return '\'%s\' : { \'$regex\' : %s, \'$options\' : \'i\' }' % (attribute, '^((?!' + reference + ').)')
357
 
 
358
 
        def not_match_nocase_filter():
359
 
            return '\'%s\' : { \'$regex\' : %s, \'$options\' : \'i\' }' % (attribute, '^((?!' + reference + ').)')
360
 
 
361
 
        def no_filter():
362
 
            return '\'time\' : { \'$exists\' : True }'
363
 
 
364
 
        if attribute not in good_attributes:
365
 
            return no_filter
366
 
        if operator == '=':
367
 
            return eq_filter
368
 
        elif operator == '~':
369
 
            return match_filter
370
 
        elif operator == '=~':
371
 
            return eq_nocase_filter
372
 
        elif operator == '~~':
373
 
            return match_nocase_filter
374
 
        elif operator == '<':
375
 
            return lt_filter
376
 
        elif operator == '>':
377
 
            return gt_filter
378
 
        elif operator == '<=':
379
 
            return le_filter
380
 
        elif operator == '>=':
381
 
            return ge_filter
382
 
        elif operator == '!=':
383
 
            return ne_filter
384
 
        elif operator == '!~':
385
 
            return not_match_filter
386
 
        elif operator == '!=~':
387
 
            return ne_nocase_filter
388
 
        elif operator == '!~~':
389
 
            return not_match_nocase_filter
390
 
 
391
 
 
392
 
class LiveStatusMongoStack(LiveStatusStack):
393
 
    """A Lifo queue for filter functions.
394
 
 
395
 
    This class inherits either from MyLifoQueue or Queue.LifoQueue
396
 
    whatever is available with the current python version.
397
 
 
398
 
    Public functions:
399
 
    and_elements -- takes a certain number (given as argument)
400
 
    of filters from the stack, creates a new filter and puts
401
 
    this filter on the stack. If these filters are lambda functions,
402
 
    the new filter is a boolean and of the underlying filters.
403
 
    If the filters are sql where-conditions, they are also concatenated
404
 
    with and to form a new string containing a more complex where-condition.
405
 
 
406
 
    or_elements --- the same, only that the single filters are
407
 
    combined with a logical or.
408
 
 
409
 
    """
410
 
 
411
 
    def __init__(self, *args, **kw):
412
 
        self.type = 'mongo'
413
 
        self.__class__.__bases__[0].__init__(self, *args, **kw)
414
 
 
415
 
    def not_elements(self):
416
 
        top_filter = self.get_stack()
417
 
        #negate_filter = lambda: '\'$not\': { %s }' % top_filter()
418
 
        # mongodb doesn't have the not-operator like sql, which can negate
419
 
        # a complete expression. Mongodb $not can only reverse one operator
420
 
        # at a time. This would require rewriting of the whole expression.
421
 
        # So instead of deciding whether a record can pass the filter or not,
422
 
        # we let it pass in any case. That's no problem, because the result
423
 
        # of the database query will have to go through the in-memory-objects
424
 
        # filter too.
425
 
        negate_filter = lambda: '\'time\' : { \'$exists\' : True }'
426
 
        self.put_stack(negate_filter)
427
 
 
428
 
    def and_elements(self, num):
429
 
        """Take num filters from the stack, and them and put the result back"""
430
 
        if num > 1:
431
 
            filters = []
432
 
            for _ in range(num):
433
 
                filters.append(self.get_stack())
434
 
            # Take from the stack:
435
 
            # Make a combined anded function
436
 
            # Put it on the stack
437
 
            logger.debug("[Logstore MongoDB] Filter is %s" % str(filters))
438
 
            and_clause = lambda: '\'$and\' : [%s]' % ', '.join('{ ' + x() + ' }' for x in filters)
439
 
            logger.debug("[Logstore MongoDB] and_elements %s" % str(and_clause))
440
 
            self.put_stack(and_clause)
441
 
 
442
 
    def or_elements(self, num):
443
 
        """Take num filters from the stack, or them and put the result back"""
444
 
        if num > 1:
445
 
            filters = []
446
 
            for _ in range(num):
447
 
                filters.append(self.get_stack())
448
 
            or_clause = lambda: '\'$or\' : [%s]' % ', '.join('{ ' + x() + ' }' for x in filters)
449
 
            self.put_stack(or_clause)
450
 
 
451
 
    def get_stack(self):
452
 
        """Return the top element from the stack or a filter which is always true"""
453
 
        if self.qsize() == 0:
454
 
            return lambda: ''
455
 
        else:
456
 
            return self.get()