# -*- coding: utf-8 -*-

# Copyright (C) 2009-2012:
#    Gabes Jean, naparuba@gmail.com
#    Gerhard Lausser, Gerhard.Lausser@consol.de
#    Gregory Starck, g.starck@gmail.com
#    Hartmut Goebel, h.goebel@goebel-consult.de
#
# This file is part of Shinken.
#
# Shinken is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Shinken is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with Shinken. If not, see <http://www.gnu.org/licenses/>.

# Imported from modules/livestatus_logstore

"""
This module provides a class for attaching a MongoDB database to a
livestatus broker module.
It is one of the exchangeable storage backends for log broks.
"""
import ast
import datetime
import re
import time

import pymongo
from pymongo import Connection
try:
    from pymongo import ReplicaSetConnection, ReadPreference
except ImportError:
    # pymongo older than 2.x has no replica-set support
    ReplicaSetConnection = None
    ReadPreference = None
from pymongo.errors import AutoReconnect

from shinken.basemodule import BaseModule
from shinken.log import logger
from shinken.modulesctx import modulesctx
from shinken.objects.module import Module
from shinken.objects.service import Service
from shinken.util import to_bool

# Import a class from the livestatus module, should be already loaded!
livestatus = modulesctx.get_module('livestatus')

LiveStatusStack = livestatus.LiveStatusStack
LOGCLASS_INVALID = livestatus.LOGCLASS_INVALID
Logline = livestatus.Logline
# Module description read by the Shinken module manager.
properties = {
    'daemons': ['livestatus'],
    'type': 'logstore_mongodb',
    'phases': ['running'],
}
# called by the plugin manager
def get_instance(plugin):
    """Create the MongoDB log store for the module configuration *plugin*.

    Returns the new LiveStatusLogStoreMongoDB instance (the original body
    built it but never returned it, so the caller received None).
    """
    logger.info("[LogstoreMongoDB] Get an LogStore MongoDB module for plugin %s" % plugin.get_name())
    instance = LiveStatusLogStoreMongoDB(plugin)
    return instance
def row_factory(cursor, row):
    """Wrap one fetched database row in a Logline.

    Kept from the sqlite log store: *cursor* supplies the column
    description, *row* the values of one record.
    """
    column_description = cursor.description
    return Logline(column_description, row)
class LiveStatusLogStoreError(Exception):
    """Raised when the MongoDB log store cannot be opened."""
# Connection states stored in LiveStatusLogStoreMongoDB.is_connected.
CONNECTED = 1
DISCONNECTED = 2
SWITCHING = 3

# Day multipliers for the optional unit suffix of max_logs_age
# (no suffix or 'd' = days, 'w' = weeks, 'm' = 31-day months, 'y' = 365-day years).
MAX_LOGS_AGE_UNITS = {'': 1, 'd': 1, 'w': 7, 'm': 31, 'y': 365}
# Retention (in days) used when max_logs_age is malformed.
DEFAULT_MAX_LOGS_AGE = 365

# Query fragment matching every stored log line; used wherever a Filter:
# clause cannot be translated into an equivalent MongoDB condition.
ALWAYS_TRUE_FILTER = '\'time\' : { \'$exists\' : True }'


class LiveStatusLogStoreMongoDB(BaseModule):
    """Livestatus log store keeping log broks in a MongoDB collection.

    Log lines are written by manage_log_brok(). Livestatus Filter: clauses
    are translated into MongoDB query text on two LiveStatusMongoStack
    instances (one with every clause, one with the time clauses only) and
    executed by get_live_data_log().
    """

    def __init__(self, modconf):
        BaseModule.__init__(self, modconf)
        # mongodb://host1,host2,host3/?safe=true;w=2;wtimeoutMS=2000
        self.mongodb_uri = getattr(modconf, 'mongodb_uri', None)
        self.replica_set = getattr(modconf, 'replica_set', None)
        if self.replica_set and not ReplicaSetConnection:
            # Keep initializing so the object stays consistent; open()
            # refuses to connect and raises LiveStatusLogStoreError.
            logger.error('[LogStoreMongoDB] Can not initialize LogStoreMongoDB module with '
                         'replica_set because your pymongo lib is too old. '
                         'Please install it with a 2.x+ version from '
                         'https://github.com/mongodb/mongo-python-driver/downloads')
        self.database = getattr(modconf, 'database', 'logs')
        self.collection = getattr(modconf, 'collection', 'logs')
        self.mongodb_fsync = to_bool(getattr(modconf, 'mongodb_fsync', "True"))
        self.max_logs_age = self._parse_max_logs_age(getattr(modconf, 'max_logs_age', '365'))
        self.use_aggressive_sql = (getattr(modconf, 'use_aggressive_sql', '1') == '1')
        # This stack is used to create a full-blown select-statement
        self.mongo_filter_stack = LiveStatusMongoStack()
        # This stack is used to create a minimal select-statement which
        # selects only by time >= and time <=
        self.mongo_time_filter_stack = LiveStatusMongoStack()
        self.conn = None
        self.db = None
        self.is_connected = DISCONNECTED
        # Log lines that could not be written while the primary switched;
        # manage_log_brok() re-inserts them after the next successful write.
        self.backlog = []
        # Now sleep one second, so that won't get lineno collisions with the last second
        time.sleep(1)

    @staticmethod
    def _parse_max_logs_age(max_logs_age):
        """Convert '<number>[d|w|m|y]' (or a bare number) into a number of days.

        A malformed value is logged and replaced by DEFAULT_MAX_LOGS_AGE
        instead of leaving the attribute unset.
        """
        # '?' (not '*') so that combined suffixes such as "3wm" are rejected.
        maxmatch = re.match(r'^(\d+)([dwmy]?)$', str(max_logs_age).strip())
        if maxmatch is None:
            logger.warning('[LogStoreMongoDB] Wrong format for max_logs_age. Must be <number>[d|w|m|y] or <number> and not %s' % max_logs_age)
            return DEFAULT_MAX_LOGS_AGE
        return int(maxmatch.group(1)) * MAX_LOGS_AGE_UNITS[maxmatch.group(2)]

    def open(self):
        """Connect to MongoDB, ensure the log index and mark the store connected.

        Raises LiveStatusLogStoreError when no connection can be established.
        """
        if self.replica_set and not ReplicaSetConnection:
            logger.error("[LogStoreMongoDB] Could not open the database: replica_set requires pymongo 2.x+")
            raise LiveStatusLogStoreError('replica_set requires pymongo 2.x+')
        try:
            if self.replica_set:
                self.conn = pymongo.ReplicaSetConnection(self.mongodb_uri, replicaSet=self.replica_set, fsync=self.mongodb_fsync)
            elif ReplicaSetConnection:
                # ReplicaSetConnection doubles as a pymongo 2.x+ check:
                # old versions of pymongo do not know about fsync
                self.conn = pymongo.Connection(self.mongodb_uri, fsync=self.mongodb_fsync)
            else:
                self.conn = pymongo.Connection(self.mongodb_uri)
            self.db = self.conn[self.database]
            self.db[self.collection].ensure_index([('host_name', pymongo.ASCENDING), ('time', pymongo.ASCENDING), ('lineno', pymongo.ASCENDING)], name='logs_idx')
            # This might be a future option prefer_secondary
            #self.db.read_preference = ReadPreference.SECONDARY
            self.is_connected = CONNECTED
            self.next_log_db_rotate = time.time()
        except AutoReconnect as exp:
            logger.error("[LogStoreMongoDB] LiveStatusLogStoreMongoDB.AutoReconnect %s" % (exp))
            # The mongodb is hopefully available until this module is restarted
            raise LiveStatusLogStoreError(str(exp))
        except Exception as exp:
            # If there is a replica_set, but the host is a simple standalone one
            # we get a "No suitable hosts found" here.
            # But other reasons are possible too.
            logger.error("[LogStoreMongoDB] Could not open the database: %s" % exp)
            raise LiveStatusLogStoreError(str(exp))

    def close(self):
        """Disconnect from MongoDB; a no-op if open() never created a connection."""
        if getattr(self, 'conn', None) is not None:
            self.conn.disconnect()

    def commit_and_rotate_log_db(self):
        """For a MongoDB there is no rotate, but we will delete old contents.

        Once the scheduled time has passed, every document whose 'time' is
        older than max_logs_age days before today's midnight is removed and
        the next run is scheduled for 00:05 (today's if still ahead, else
        tomorrow's).
        """
        now = time.time()
        if self.next_log_db_rotate > now:
            return
        today = datetime.date.today()
        today0000 = datetime.datetime(today.year, today.month, today.day, 0, 0, 0)
        today0005 = datetime.datetime(today.year, today.month, today.day, 0, 5, 0)
        oldest = today0000 - datetime.timedelta(days=self.max_logs_age)
        self.db[self.collection].remove({u'time': {'$lt': time.mktime(oldest.timetuple())}})

        if now < time.mktime(today0005.timetuple()):
            nextrotation = today0005
        else:
            nextrotation = today0005 + datetime.timedelta(days=1)

        self.next_log_db_rotate = time.mktime(nextrotation.timetuple())
        logger.info("[LogStoreMongoDB] Next log rotation at %s " % time.asctime(time.localtime(self.next_log_db_rotate)))

    def manage_log_brok(self, b):
        """Store the log line carried by brok *b* in the collection.

        Lines matching the "[ts] Word..:" pattern are not stored, and lines
        Logline classifies as LOGCLASS_INVALID are logged and dropped.
        During a primary switch (AutoReconnect) the line is kept in
        self.backlog and flushed after the next successful insert.
        """
        # NOTE(review): assumes log broks carry the raw line in b.data['log'] — confirm
        line = b.data['log']
        if re.match(r"^\[[0-9]*\] [A-Z][a-z]*.:", line):
            # Match log which NOT have to be stored
            return
        logline = Logline(line=line)
        values = logline.as_dict()
        if logline.logclass == LOGCLASS_INVALID:
            logger.info("[LogStoreMongoDB] This line is invalid: %s" % line)
            return
        try:
            self.db[self.collection].insert(values)
            self.is_connected = CONNECTED
            # If we have a backlog from an outage, we flush these lines
            self._flush_backlog()
        except AutoReconnect:
            if self.is_connected != SWITCHING:
                self.is_connected = SWITCHING
                # Under normal circumstances after these 5 seconds
                # we should have a new primary node
                time.sleep(5)
            else:
                # Not yet? Wait, but try harder.
                time.sleep(0.1)
            # At this point we must save the logline for a later attempt
            # After 5 seconds we either have a successful write
            # or another exception which means, we are disconnected
            self.backlog.append(values)
        except Exception as exp:
            self.is_connected = DISCONNECTED
            logger.error("[LogStoreMongoDB] Databased error occurred: %s" % exp)
        # FIXME need access to this #self.livestatus.count_event('log_message')

    def _flush_backlog(self):
        """Re-insert buffered log lines, removing each one once it is written.

        Stops at the first AutoReconnect so the remaining lines stay queued
        for the next attempt instead of failing one by one.
        """
        # Iterate over a copy, so we can delete elements from self.backlog
        for backlogline in list(self.backlog):
            try:
                self.db[self.collection].insert(backlogline)
                self.backlog.remove(backlogline)
            except AutoReconnect:
                self.is_connected = SWITCHING
                break
            except Exception as exp:
                logger.error("[LogStoreMongoDB] Got an exception inserting the backlog: %s" % exp)

    def add_filter(self, operator, attribute, reference):
        """Push one Filter: clause; clauses on 'time' also feed the time-only stack."""
        mongo_filter = self.make_mongo_filter(operator, attribute, reference)
        if attribute == 'time':
            self.mongo_time_filter_stack.put_stack(mongo_filter)
        self.mongo_filter_stack.put_stack(mongo_filter)

    def add_filter_and(self, andnum):
        """Combine the top *andnum* filters of the full stack with $and."""
        self.mongo_filter_stack.and_elements(andnum)

    def add_filter_or(self, ornum):
        """Combine the top *ornum* filters of the full stack with $or."""
        self.mongo_filter_stack.or_elements(ornum)

    def add_filter_not(self):
        """Negate the top filter of the full stack (see LiveStatusMongoStack.not_elements)."""
        self.mongo_filter_stack.not_elements()

    def get_live_data_log(self):
        """Like get_live_data, but for log objects

        Finalizes both filter stacks, queries the collection and returns the
        matching lines as Logline objects sorted by time and lineno, or an
        empty list when the store is not connected.
        """
        # finalize the filter stacks
        self.mongo_time_filter_stack.and_elements(self.mongo_time_filter_stack.qsize())
        self.mongo_filter_stack.and_elements(self.mongo_filter_stack.qsize())
        # Pop the finalized filter from BOTH stacks, although only one is
        # used, so that no leftover filter leaks into the next query.
        time_filter_func = self.mongo_time_filter_stack.get_stack()
        full_filter_func = self.mongo_filter_stack.get_stack()
        if self.use_aggressive_sql:
            # Be aggressive, get preselected data from the database and do less
            # filtering in python. But: only a subset of Filter:-attributes
            # can be mapped to fields in the logs collection, for the others
            # we must use "always-true"-clauses. This can result in
            # funny and potentially ineffective queries
            mongo_filter_func = full_filter_func
        else:
            # Be conservative, get everything from the database between
            # two dates and apply the Filter:-clauses in python
            mongo_filter_func = time_filter_func
        mongo_filter = mongo_filter_func()
        logger.debug("[Logstore MongoDB] Mongo filter is %s" % str(mongo_filter))
        # The filter text embeds references sent by livestatus clients, so it
        # is parsed with ast.literal_eval (literals only) instead of eval().
        try:
            filter_element = ast.literal_eval('{ ' + mongo_filter + ' }')
        except (ValueError, SyntaxError) as exp:
            # An unparsable preselection falls back to "match everything";
            # the Filter:-clauses are presumably still applied in python afterwards.
            logger.warning("[LogStoreMongoDB] Could not parse mongo filter %s: %s" % (mongo_filter, exp))
            filter_element = {}
        logger.debug("[LogstoreMongoDB] Mongo filter is %s" % str(filter_element))
        columns = ['logobject', 'attempt', 'logclass', 'command_name', 'comment', 'contact_name', 'host_name', 'lineno', 'message', 'plugin_output', 'service_description', 'state', 'state_type', 'time', 'type']
        if not self.is_connected == CONNECTED:
            logger.warning("[LogStoreMongoDB] sorry, not connected")
            return []
        description = [(c,) for c in columns]
        cursor = self.db[self.collection].find(filter_element).sort([(u'time', pymongo.ASCENDING), (u'lineno', pymongo.ASCENDING)])
        return [Logline(description, [x[col] for col in columns]) for x in cursor]

    @staticmethod
    def _mongo_number(reference):
        """Return *reference* as numeric literal text, or None if it is not a finite number."""
        try:
            number = float(reference)
        except (TypeError, ValueError):
            return None
        # Reject nan / inf: their repr is not a Python literal.
        if number != number or number in (float('inf'), float('-inf')):
            return None
        if number == int(number):
            return '%d' % number
        return repr(number)

    def make_mongo_filter(self, operator, attribute, reference):
        """Translate one livestatus Filter: clause into a MongoDB query fragment.

        Returns a zero-argument callable (the element type LiveStatusMongoStack
        stores) yielding the text of a single ``'key' : value`` pair. The text
        is later wrapped in braces and parsed with ast.literal_eval, so every
        client-supplied reference is embedded with repr() or as a validated
        number. Clauses that cannot be expressed faithfully become an
        always-true fragment: the preselection may return too many lines,
        never too few.
        """
        # which attributes are suitable for a mongo query
        good_attributes = ['time', 'attempt', 'logclass', 'command_name', 'comment', 'contact_name', 'host_name', 'plugin_output', 'service_description', 'state', 'state_type', 'type']
        # string fields; the remaining good attributes are numeric
        string_attributes = ['command_name', 'comment', 'contact_name', 'host_name', 'plugin_output', 'service_description', 'state_type', 'type']
        # livestatus comparison -> MongoDB operator (None means plain equality)
        comparison_operators = {'=': None, '!=': '$ne', '<': '$lt', '>': '$gt', '<=': '$lte', '>=': '$gte'}
        # livestatus regex operator -> (pattern builder, $options)
        # Negative matches use a lookahead, see
        # http://myadventuresincoding.wordpress.com/2011/05/19/mongodb-negative-regex-query-in-mongo-shell/
        regex_operators = {
            '~': (lambda ref: ref, ''),
            '=~': (lambda ref: '^' + re.escape(ref) + '$', 'i'),
            '~~': (lambda ref: ref, 'i'),
            '!~': (lambda ref: '^((?!' + ref + ').)*$', 's'),
            '!=~': (lambda ref: '^(?!' + re.escape(ref) + '$)', 'i'),
            '!~~': (lambda ref: '^((?!' + ref + ').)*$', 'is'),
        }

        # We should change the "class" query into the internal "logclass" attribute
        if attribute == 'class':
            attribute = 'logclass'

        fragment = ALWAYS_TRUE_FILTER
        if attribute in good_attributes:
            if operator in comparison_operators:
                if attribute in string_attributes:
                    value = repr(reference)
                else:
                    value = self._mongo_number(reference)
                if value is not None:
                    mongo_operator = comparison_operators[operator]
                    if mongo_operator is None:
                        fragment = '\'%s\' : %s' % (attribute, value)
                    else:
                        fragment = '\'%s\' : { \'%s\' : %s }' % (attribute, mongo_operator, value)
            elif operator in regex_operators and attribute in string_attributes:
                # $regex only matches strings, so numeric fields keep the
                # always-true fragment and are filtered in python.
                make_pattern, options = regex_operators[operator]
                fragment = '\'%s\' : { \'$regex\' : %s' % (attribute, repr(make_pattern(reference)))
                if options:
                    fragment += ', \'$options\' : \'%s\'' % options
                fragment += ' }'
        return lambda: fragment
class LiveStatusMongoStack(LiveStatusStack):
    """A Lifo queue for filter functions.

    This class inherits either from MyLifoQueue or Queue.LifoQueue
    whatever is available with the current python version.

    Each element is a zero-argument callable returning the text of a
    MongoDB query fragment such as "'host_name' : 'srv1'".

    Public functions:
    and_elements -- takes a certain number (given as argument)
    of filters from the stack, creates a new filter and puts
    this filter on the stack. The new filter renders the
    underlying fragments inside an '$and' list.

    or_elements --- the same, only that the single filters are
    combined with '$or'.

    not_elements -- replaces the top filter by an always-true filter,
    because a MongoDB $not cannot negate a whole expression.
    """

    def __init__(self, *args, **kw):
        # Name the base class explicitly: self.__class__.__bases__[0] would
        # resolve to this class inside any subclass and recurse forever.
        LiveStatusStack.__init__(self, *args, **kw)

    def not_elements(self):
        """Replace the top filter by a filter which lets every record pass."""
        # Discard the filter that was supposed to be negated.
        self.get_stack()
        #negate_filter = lambda: '\'$not\': { %s }' % top_filter()
        # mongodb doesn't have the not-operator like sql, which can negate
        # a complete expression. Mongodb $not can only reverse one operator
        # at a time. This would require rewriting of the whole expression.
        # So instead of deciding whether a record can pass the filter or not,
        # we let it pass in any case. That's no problem, because the result
        # of the database query will have to go through the in-memory-objects
        # filter anyway.
        negate_filter = lambda: '\'time\' : { \'$exists\' : True }'
        self.put_stack(negate_filter)

    def and_elements(self, num):
        """Take num filters from the stack, and them and put the result back

        With num <= 1 the stack is left untouched: a single filter needs no
        wrapping, and MongoDB rejects an empty '$and' list.
        """
        if num > 1:
            # Take from the stack:
            filters = [self.get_stack() for _ in range(num)]
            # Make a combined anded function
            and_clause = lambda: '\'$and\' : [%s]' % ', '.join('{ ' + x() + ' }' for x in filters)
            logger.debug("[Logstore MongoDB] and_elements %s" % and_clause())
            # Put it on the stack
            self.put_stack(and_clause)

    def or_elements(self, num):
        """Take num filters from the stack, or them and put the result back

        With num <= 1 the stack is left untouched: a single filter needs no
        wrapping, and MongoDB rejects an empty '$or' list.
        """
        if num > 1:
            filters = [self.get_stack() for _ in range(num)]
            or_clause = lambda: '\'$or\' : [%s]' % ', '.join('{ ' + x() + ' }' for x in filters)
            logger.debug("[Logstore MongoDB] or_elements %s" % or_clause())
            self.put_stack(or_clause)
452
"""Return the top element from the stack or a filter which is always true"""
453
if self.qsize() == 0: