~ubuntu-branches/ubuntu/vivid/fail2ban/vivid

« back to all changes in this revision

Viewing changes to testcases/filtertestcase.py

  • Committer: Package Import Robot
  • Author(s): Yaroslav Halchenko
  • Date: 2013-05-13 11:58:56 UTC
  • mfrom: (1.2.6) (11.1.5 experimental)
  • Revision ID: package-import@ubuntu.com-20130513115856-r1wwsd58ajx2ub5o
Tags: 0.8.9-1
* New upstream release
  - significant improvements in documentation (Closes: #400416)
  - roundcube auth filter (Closes: #699442)
  - enforces C locale for dates (Closes: #686341)
  - provides bash_completion.d/fail2ban
* debian/jail.conf:
  - added findtime and documentation on those basic options from jail.conf
    (Closes: #704568)
  - added new sample jails definitions for ssh-route, ssh-iptables-ipset{4,6},
    roundcube-auth, sogo-auth, mysqld-auth
* debian/control:
  - suggest system-log-daemon (Closes: #691001)
  - boost policy compliance to 3.9.4
* debian/rules:
  - run fail2ban's unittests at build time but ignore the failures
    (there are still some known issues to fix up to guarantee robust testing
    in clean chroots etc).
    Only pyinotify was added to build-depends since gamin might still be
    buggy on older releases and get stuck, which would complicate
    backporting

Show diffs side-by-side

added added

removed removed

Lines of Context:
15
15
#
16
16
# You should have received a copy of the GNU General Public License
17
17
# along with Fail2Ban; if not, write to the Free Software
18
 
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
19
 
 
20
 
# Author: Cyril Jaquier
21
 
22
 
# $Revision$
23
 
 
24
 
__author__ = "Cyril Jaquier"
25
 
__version__ = "$Revision$"
26
 
__date__ = "$Date$"
27
 
__copyright__ = "Copyright (c) 2004 Cyril Jaquier"
 
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
 
19
 
 
20
# Fail2Ban developers
 
21
 
 
22
__copyright__ = "Copyright (c) 2004 Cyril Jaquier; 2012 Yaroslav Halchenko"
28
23
__license__ = "GPL"
29
24
 
 
25
from __builtin__ import open as fopen
30
26
import unittest
 
27
import os
 
28
import sys
31
29
import time
 
30
import tempfile
32
31
 
 
32
from server.jail import Jail
33
33
from server.filterpoll import FilterPoll
34
34
from server.filter import FileFilter, DNSUtils
35
35
from server.failmanager import FailManager
36
36
from server.failmanager import FailManagerEmpty
37
37
 
 
38
#
 
39
# Useful helpers
 
40
#
 
41
 
 
42
# yoh: per Steven Hiscocks's insight while troubleshooting
 
43
# https://github.com/fail2ban/fail2ban/issues/103#issuecomment-15542836
 
44
# adding a sufficiently large buffer might help to guarantee that
 
45
# writes happen atomically.
 
46
def open(*args):
        """Replacement for the built-in open that requests a large buffer.

        When called with exactly two positional arguments (file name and
        mode) a ~50kB buffer size is appended, so that writes are held back
        until the caller flushes explicitly.  Any other call signature is
        forwarded to the built-in open untouched.

        Explicit .flush is needed to assure that changes leave the buffer.
        """
        if len(args) != 2:
                # buffer size given explicitly (or mode omitted): pass through
                return fopen(*args)
        filename, filemode = args
        # ~50kB buffer should be sufficient for all tests here.
        return fopen(filename, filemode, 50000)
 
55
 
 
56
def _killfile(f, name):
        """Best-effort cleanup: close `f`, remove `name` and any `name`.bak.

        f    -- open file object to close; may be None (nothing to close).
                Objects without a usable .close() are tolerated as well.
        name -- path of the file to remove; it is fine if it does not exist.

        Never raises for the expected cleanup failures (already closed,
        already removed), but -- unlike a bare ``except:`` -- does not
        swallow KeyboardInterrupt/SystemExit or unrelated programming errors.
        """
        if f is not None:
                try:
                        f.close()
                except (AttributeError, IOError, OSError, ValueError):
                        # not a file object, or closing failed: nothing to do
                        pass
        try:
                os.unlink(name)
        except OSError:
                # file is already gone (or was never created)
                pass

        # there might as well be the .bak file
        if os.path.exists(name + '.bak'):
                _killfile(None, name + '.bak')
 
69
 
 
70
 
 
71
def _sleep_4_poll():
        """Pause long enough for file timestamps to differ.

        PollFilter relies on file timestamps, so consecutive modifications
        made by the tests must be separated in time.
        """
        # on old Python (<= 2.4) st_mtime is int, so at least 1 sec is
        # needed for the polling filter to detect the change
        coarse_mtime = sys.version_info[:2] <= (2,4)
        pause = 0.1
        if coarse_mtime:
                pause = 1.
        time.sleep(pause)
 
82
 
 
83
def _assert_equal_entries(utest, found, output, count=None):
        """Compare a found entry against the expected one, field by field.

        Entries are sequences of (ip, count, time[, matches]).  Times are
        compared as local-time structs so that failures are reported in a
        readable form instead of millions of seconds ;)

        utest  -- test case providing assertEqual
        found  -- entry obtained from the code under test
        output -- expected entry
        count  -- if given (and non-zero), expected count overriding
                  output[1]; matches are then not compared
        """
        utest.assertEqual(found[0], output[0])            # IP
        expected_count = count or output[1]
        utest.assertEqual(found[1], expected_count)       # count
        found_time = time.localtime(found[2])
        output_time = time.localtime(output[2])
        utest.assertEqual(found_time, output_time)
        # do not check matches if custom count (e.g. going through them twice)
        if count is not None or len(output) <= 3:
                return
        utest.assertEqual(repr(found[3]), repr(output[3]))
 
97
 
 
98
def _assert_correct_last_attempt(utest, filter_, output, count=None):
        """Fetch the next ticket and check it against the expected entry.

        utest   -- test case providing assertEqual
        filter_ -- either a DummyJail (ticket taken via getFailTicket) or a
                   filter tested without a jail (ticket taken from its
                   failManager via toBan)
        output  -- expected (ip, count, time[, matches]) entry
        count   -- optional expected count, passed on to
                   _assert_equal_entries
        """
        if isinstance(filter_, DummyJail):
                fetch_ticket = filter_.getFailTicket
        else:
                # when we are testing without jails
                fetch_ticket = filter_.failManager.toBan
        ticket = fetch_ticket()

        n_attempts = ticket.getAttempt()
        when = ticket.getTime()
        address = ticket.getIP()
        matched_lines = ticket.getMatches()

        _assert_equal_entries(
                utest, (address, n_attempts, when, matched_lines), output, count)
 
116
 
 
117
def _copy_lines_between_files(fin, fout, n=None, skip=0, mode='a', terminal_line=""):
        """Copy lines from one file to another (which might be already open)

        fin           -- path of the input file, or an already open file
        fout          -- path of the output file, or an already open file
        n             -- maximal number of lines to copy (None: no limit)
        skip          -- number of leading input lines to skip
        mode          -- mode used to open fout when it is given as a path
        terminal_line -- stop copying when a line equal to this is read
                         (None disables the check)

        Copying always stops at the end of the input.  All lines are written
        with a single write() followed by flush().  Lines keep their own
        line terminators and are additionally joined with a newline, so a
        blank line ends up between consecutive copied lines.

        Returns open fout
        """
        if sys.version_info[:2] <= (2,4): # pragma: no cover
                # on old Python st_mtime is int, so we should give at least 1 sec so
                # polling filter could detect the change
                time.sleep(1)
        opened_fin = isinstance(fin, str) # pragma: no branch - only used with str in test cases
        if opened_fin:
                fin = open(fin, 'r')
        try:
                # Skip
                for _ in range(skip):
                        fin.readline()
                # Read
                lines = []
                while n is None or len(lines) < n:
                        line = fin.readline()
                        if not line:
                                # end of input -- without this check the loop would
                                # never finish for n=None and terminal_line=None
                                break
                        if terminal_line is not None and line == terminal_line:
                                break
                        lines.append(line)
        finally:
                # close only what was opened here; a handle passed in by the
                # caller stays under the caller's control
                if opened_fin:
                        fin.close()
        # Write: all at once and flush
        if isinstance(fout, str):
                fout = open(fout, mode)
        fout.write('\n'.join(lines))
        fout.flush()
        # to give other threads possibly some time to crunch
        time.sleep(0.1)
        return fout
 
148
 
 
149
#
 
150
#  Actual tests
 
151
#
 
152
 
38
153
class IgnoreIP(unittest.TestCase):
39
154
 
40
155
        def setUp(self):
41
156
                """Call before every test case."""
42
 
                self.__filter = FileFilter(None)
 
157
                self.filter = FileFilter(None)
43
158
 
44
159
        def tearDown(self):
45
160
                """Call after every test case."""
47
162
        def testIgnoreIPOK(self):
48
163
                ipList = "127.0.0.1", "192.168.0.1", "255.255.255.255", "99.99.99.99"
49
164
                for ip in ipList:
50
 
                        self.__filter.addIgnoreIP(ip)
51
 
                        self.assertTrue(self.__filter.inIgnoreIPList(ip))
 
165
                        self.filter.addIgnoreIP(ip)
 
166
 
 
167
                        self.assertTrue(self.filter.inIgnoreIPList(ip))
52
168
                # Test DNS
53
 
                self.__filter.addIgnoreIP("www.epfl.ch")
54
 
                self.assertTrue(self.__filter.inIgnoreIPList("128.178.50.12"))
55
 
        
 
169
                self.filter.addIgnoreIP("www.epfl.ch")
 
170
 
 
171
                self.assertTrue(self.filter.inIgnoreIPList("128.178.50.12"))
 
172
 
56
173
        def testIgnoreIPNOK(self):
57
174
                ipList = "", "999.999.999.999", "abcdef", "192.168.0."
58
175
                for ip in ipList:
59
 
                        self.__filter.addIgnoreIP(ip)
60
 
                        self.assertFalse(self.__filter.inIgnoreIPList(ip))
 
176
                        self.filter.addIgnoreIP(ip)
 
177
                        self.assertFalse(self.filter.inIgnoreIPList(ip))
61
178
                # Test DNS
62
 
                self.__filter.addIgnoreIP("www.epfl.ch")
63
 
                self.assertFalse(self.__filter.inIgnoreIPList("127.177.50.10"))
 
179
                self.filter.addIgnoreIP("www.epfl.ch")
 
180
                self.assertFalse(self.filter.inIgnoreIPList("127.177.50.10"))
64
181
 
65
182
 
66
183
class LogFile(unittest.TestCase):
69
186
 
70
187
        def setUp(self):
71
188
                """Call before every test case."""
72
 
                self.__filter = FilterPoll(None)
73
 
                self.__filter.addLogPath(LogFile.FILENAME)
 
189
                self.filter = FilterPoll(None)
 
190
                self.filter.addLogPath(LogFile.FILENAME)
74
191
 
75
192
        def tearDown(self):
76
193
                """Call after every test case."""
77
 
                
 
194
                pass
 
195
 
78
196
        #def testOpen(self):
79
 
        #       self.__filter.openLogFile(LogFile.FILENAME)
80
 
        
 
197
        #       self.filter.openLogFile(LogFile.FILENAME)
 
198
 
81
199
        def testIsModified(self):
82
 
                self.assertTrue(self.__filter.isModified(LogFile.FILENAME))
 
200
                self.assertTrue(self.filter.isModified(LogFile.FILENAME))
 
201
 
 
202
 
 
203
class LogFileMonitor(unittest.TestCase):
        """Few more tests for FilterPoll API

        Every test works on a fresh temporary log file which is monitored by
        a FilterPoll instance created without a jail.
        """
        def setUp(self):
                """Call before every test case."""
                self.filter = self.name = 'NA'
                # mkstemp returns an already open OS-level descriptor; close it
                # right away since the file is reopened below via open()
                fd, self.name = tempfile.mkstemp('fail2ban', 'monitorfailures')
                os.close(fd)
                self.file = open(self.name, 'a')
                self.filter = FilterPoll(None)
                self.filter.addLogPath(self.name)
                self.filter.setActive(True)
                self.filter.addFailRegex(r"(?:(?:Authentication failure|Failed [-/\w+]+) for(?: [iI](?:llegal|nvalid) user)?|[Ii](?:llegal|nvalid) user|ROOT LOGIN REFUSED) .*(?: from|FROM) <HOST>")

        def tearDown(self):
                """Call after every test case: close and remove the log file."""
                _killfile(self.file, self.name)

        def isModified(self, delay=2.):
                """Wait up to `delay` sec to assure that it was modified or not
                """
                time0 = time.time()
                while time.time() < time0 + delay:
                        if self.filter.isModified(self.name):
                                return True
                        time.sleep(0.1)
                return False

        def notModified(self):
                """Return True if no modification gets reported within 0.4 sec."""
                # shorter wait time for not modified status
                return not self.isModified(0.4)

        def testNewChangeViaIsModified(self):
                # it is a brand new one -- so first we think it is modified
                self.assertTrue(self.isModified())
                # but not any longer
                self.assertTrue(self.notModified())
                self.assertTrue(self.notModified())
                _sleep_4_poll()                         # to guarantee fresher mtime
                for i in range(4):                        # few changes
                        # unless we write into it
                        self.file.write("line%d\n" % i)
                        self.file.flush()
                        self.assertTrue(self.isModified())
                        self.assertTrue(self.notModified())
                        _sleep_4_poll()                         # to guarantee fresher mtime
                os.rename(self.name, self.name + '.old')
                # we are not signaling as modified whenever
                # it gets away
                self.assertTrue(self.notModified())
                f = open(self.name, 'a')
                self.assertTrue(self.isModified())
                self.assertTrue(self.notModified())
                _sleep_4_poll()
                # `i` still holds the last value of the loop above
                f.write("line%d\n" % i)
                f.flush()
                self.assertTrue(self.isModified())
                self.assertTrue(self.notModified())
                _killfile(f, self.name)
                # no file object to close for the renamed file (self.file is
                # closed in tearDown), only the path has to be removed
                _killfile(None, self.name + '.old')

        def testNewChangeViaGetFailures_simple(self):
                # suck in lines from this sample log file
                self.filter.getFailures(self.name)
                self.assertRaises(FailManagerEmpty, self.filter.failManager.toBan)

                # Now let's feed it with entries from the file
                _copy_lines_between_files(GetFailures.FILENAME_01, self.file, n=5)
                self.filter.getFailures(self.name)
                self.assertRaises(FailManagerEmpty, self.filter.failManager.toBan)
                # and it should have not been enough

                _copy_lines_between_files(GetFailures.FILENAME_01, self.file, skip=5)
                self.filter.getFailures(self.name)
                _assert_correct_last_attempt(self, self.filter, GetFailures.FAILURES_01)

        def testNewChangeViaGetFailures_rewrite(self):
                #
                # if we rewrite the file at once
                self.file.close()
                _copy_lines_between_files(GetFailures.FILENAME_01, self.name)
                self.filter.getFailures(self.name)
                _assert_correct_last_attempt(self, self.filter, GetFailures.FAILURES_01)

                # What if file gets overridden
                # yoh: skip so we skip those 2 identical lines which our
                # filter "marked" as the known beginning, otherwise it
                # would not detect "rotation"
                self.file = _copy_lines_between_files(GetFailures.FILENAME_01, self.name,
                                                      skip=3, mode='w')
                self.filter.getFailures(self.name)
                #self.assertRaises(FailManagerEmpty, self.filter.failManager.toBan)
                _assert_correct_last_attempt(self, self.filter, GetFailures.FAILURES_01)

        def testNewChangeViaGetFailures_move(self):
                #
                # if we move file into a new location while it has been open already
                self.file = _copy_lines_between_files(GetFailures.FILENAME_01, self.name,
                                                      n=14, mode='w')
                self.filter.getFailures(self.name)
                self.assertRaises(FailManagerEmpty, self.filter.failManager.toBan)
                self.assertEqual(self.filter.failManager.getFailTotal(), 2)

                # move aside, but leaving the handle still open...
                os.rename(self.name, self.name + '.bak')
                _copy_lines_between_files(GetFailures.FILENAME_01, self.name, skip=14)
                self.filter.getFailures(self.name)
                _assert_correct_last_attempt(self, self.filter, GetFailures.FAILURES_01)
                self.assertEqual(self.filter.failManager.getFailTotal(), 3)
 
312
 
 
313
 
 
314
from threading import Lock
 
315
class DummyJail(object):
        """A simple 'jail' to suck in all the tickets generated by Filter's

        Tickets are kept in a plain list guarded by a lock, so that a filter
        thread can add tickets while the test thread inspects them.
        """
        def __init__(self):
                self.lock = Lock()
                self.queue = []

        # NOTE: the lock is always acquired *before* entering ``try``, so
        # that ``finally`` never attempts to release a lock which was not
        # obtained (that would mask the original error with another one).

        def __len__(self):
                """Return the number of tickets currently held."""
                self.lock.acquire()
                try:
                        return len(self.queue)
                finally:
                        self.lock.release()

        def putFailTicket(self, ticket):
                """Append `ticket` to the queue."""
                self.lock.acquire()
                try:
                        self.queue.append(ticket)
                finally:
                        self.lock.release()

        def getFailTicket(self):
                """Remove and return the most recently added ticket.

                Raises IndexError if there are no tickets.
                """
                self.lock.acquire()
                try:
                        return self.queue.pop()
                finally:
                        self.lock.release()

        def getName(self):
                """Return a descriptive name including id and ticket count."""
                return "DummyJail #%s with %d tickets" % (id(self), len(self))
 
345
 
 
346
def get_monitor_failures_testcase(Filter_):
        """Generator of TestCase's for different filters/backends

        Filter_ -- filter class to test; it is instantiated with a DummyJail
                   and started in setUp of every test.

        Returns a new unittest.TestCase subclass whose __name__ includes the
        name of Filter_ and the base name of the temporary log files.
        """

        # add Filter_'s name so we could easily identify bad cows
        # NOTE(review): tempfile.mktemp is used only to obtain a unique base
        # name; the files themselves get created in setUp as '<base>-<count>'
        testclass_name = tempfile.mktemp(
                'fail2ban', 'monitorfailures_%s' % (Filter_.__name__,))

        class MonitorFailures(unittest.TestCase):
                count = 0
                def setUp(self):
                        """Call before every test case."""
                        self.filter = self.name = 'NA'
                        self.name = '%s-%d' % (testclass_name, self.count)
                        MonitorFailures.count += 1 # so we have unique filenames across tests
                        self.file = open(self.name, 'a')
                        self.jail = DummyJail()
                        self.filter = Filter_(self.jail)
                        self.filter.addLogPath(self.name)
                        self.filter.setActive(True)
                        self.filter.addFailRegex(r"(?:(?:Authentication failure|Failed [-/\w+]+) for(?: [iI](?:llegal|nvalid) user)?|[Ii](?:llegal|nvalid) user|ROOT LOGIN REFUSED) .*(?: from|FROM) <HOST>")
                        self.filter.start()
                        # If filter is polling it would sleep a bit to guarantee that
                        # we have initial time-stamp difference to trigger "actions"
                        self._sleep_4_poll()

                def tearDown(self):
                        """Call after every test case: stop filter, remove log file."""
                        self.filter.stop()
                        self.filter.join()                # wait for the thread to terminate
                        _killfile(self.file, self.name)

                def isFilled(self, delay=2.):
                        """Wait up to `delay` sec for the jail to receive a ticket

                        Returns True as soon as the jail is not empty, False if
                        it stayed empty for the whole `delay`.
                        """
                        time0 = time.time()
                        while time.time() < time0 + delay:
                                if len(self.jail):
                                        return True
                                time.sleep(0.1)
                        return False

                def _sleep_4_poll(self):
                        # Since FilterPoll relies on time stamps and some
                        # actions might be happening too fast in the tests,
                        # sleep a bit to guarantee reliable time stamps
                        if isinstance(self.filter, FilterPoll):
                                _sleep_4_poll()

                def isEmpty(self, delay=0.4):
                        """Return True if the jail stays empty for `delay` sec."""
                        # shorter wait time for not modified status
                        return not self.isFilled(delay)

                def assert_correct_last_attempt(self, failures, count=None):
                        """Wait for a ticket in the jail and compare it to `failures`."""
                        self.assertTrue(self.isFilled(20)) # give Filter a chance to react
                        _assert_correct_last_attempt(self, self.jail, failures, count=count)

                def test_grow_file(self):
                        # suck in lines from this sample log file
                        self.assertRaises(FailManagerEmpty, self.filter.failManager.toBan)

                        # Now let's feed it with entries from the file
                        _copy_lines_between_files(GetFailures.FILENAME_01, self.file, n=5)
                        self.assertRaises(FailManagerEmpty, self.filter.failManager.toBan)
                        # and our dummy jail is empty as well
                        self.assertFalse(len(self.jail))
                        # since it should have not been enough

                        _copy_lines_between_files(GetFailures.FILENAME_01, self.file, skip=5)
                        self.assertTrue(self.isFilled(6))
                        # so we sleep for up to 2 sec for it not to become empty,
                        # and meanwhile pass to other thread(s) and filter should
                        # have gathered new failures and passed them into the
                        # DummyJail
                        self.assertEqual(len(self.jail), 1)
                        # and there should be no "stuck" ticket in failManager
                        self.assertRaises(FailManagerEmpty, self.filter.failManager.toBan)
                        self.assert_correct_last_attempt(GetFailures.FAILURES_01)
                        self.assertEqual(len(self.jail), 0)

                        # just for fun let's copy all of them again and see if that results
                        # in a new ban
                        _copy_lines_between_files(GetFailures.FILENAME_01, self.file, n=100)
                        self.assert_correct_last_attempt(GetFailures.FAILURES_01)

                def test_rewrite_file(self):
                        # if we rewrite the file at once
                        self.file.close()
                        _copy_lines_between_files(GetFailures.FILENAME_01, self.name)
                        self.assert_correct_last_attempt(GetFailures.FAILURES_01)

                        # What if file gets overridden
                        # yoh: skip so we skip those 2 identical lines which our
                        # filter "marked" as the known beginning, otherwise it
                        # would not detect "rotation"
                        self.file = _copy_lines_between_files(GetFailures.FILENAME_01, self.name,
                                                              skip=3, mode='w')
                        self.assert_correct_last_attempt(GetFailures.FAILURES_01)

                def test_move_file(self):
                        # if we move file into a new location while it has been open already
                        self.file = _copy_lines_between_files(GetFailures.FILENAME_01, self.name,
                                                              n=14, mode='w')
                        # Poll might need more time
                        self.assertTrue(self.isEmpty(4 + int(isinstance(self.filter, FilterPoll))*2))
                        self.assertRaises(FailManagerEmpty, self.filter.failManager.toBan)
                        self.assertEqual(self.filter.failManager.getFailTotal(), 2)

                        # move aside, but leaving the handle still open...
                        os.rename(self.name, self.name + '.bak')
                        _copy_lines_between_files(GetFailures.FILENAME_01, self.name, skip=14)
                        self.assert_correct_last_attempt(GetFailures.FAILURES_01)
                        self.assertEqual(self.filter.failManager.getFailTotal(), 3)

                        # now remove the moved file
                        _killfile(None, self.name + '.bak')
                        _copy_lines_between_files(GetFailures.FILENAME_01, self.name, n=100)
                        self.assert_correct_last_attempt(GetFailures.FAILURES_01)
                        self.assertEqual(self.filter.failManager.getFailTotal(), 6)

                def _test_move_into_file(self, interim_kill=False):
                        # if we move a new file into the location of an old (monitored) file
                        _copy_lines_between_files(GetFailures.FILENAME_01, self.name,
                                                  n=100).close()
                        # make sure that it is monitored first
                        self.assert_correct_last_attempt(GetFailures.FAILURES_01)
                        self.assertEqual(self.filter.failManager.getFailTotal(), 3)

                        if interim_kill:
                                _killfile(None, self.name)
                                time.sleep(0.2)                           # let them know

                        # now create a new one to override old one
                        _copy_lines_between_files(GetFailures.FILENAME_01, self.name + '.new',
                                                  n=100).close()
                        os.rename(self.name + '.new', self.name)
                        self.assert_correct_last_attempt(GetFailures.FAILURES_01)
                        self.assertEqual(self.filter.failManager.getFailTotal(), 6)

                        # and to make sure that it now monitored for changes
                        _copy_lines_between_files(GetFailures.FILENAME_01, self.name,
                                                  n=100).close()
                        self.assert_correct_last_attempt(GetFailures.FAILURES_01)
                        self.assertEqual(self.filter.failManager.getFailTotal(), 9)

                def test_move_into_file(self):
                        self._test_move_into_file(interim_kill=False)

                def test_move_into_file_after_removed(self):
                        # exactly as above test + remove file explicitly
                        # to test against possible drop-out of the file from monitoring
                        self._test_move_into_file(interim_kill=True)

                def test_new_bogus_file(self):
                        # to make sure that watching whole directory does not affect
                        _copy_lines_between_files(GetFailures.FILENAME_01, self.name, n=100)
                        self.assert_correct_last_attempt(GetFailures.FAILURES_01)

                        # create a bogus file in the same directory and see if that doesn't affect
                        bogus = open(self.name + '.bak2', 'w')
                        try:
                                bogus.write('')
                        finally:
                                # close explicitly instead of relying on garbage collection
                                bogus.close()
                        _copy_lines_between_files(GetFailures.FILENAME_01, self.name, n=100)
                        self.assert_correct_last_attempt(GetFailures.FAILURES_01)
                        self.assertEqual(self.filter.failManager.getFailTotal(), 6)
                        _killfile(None, self.name + '.bak2')

                def test_delLogPath(self):
                        # Smoke test for removing of the path from being watched

                        # basic full test
                        _copy_lines_between_files(GetFailures.FILENAME_01, self.file, n=100)
                        self.assert_correct_last_attempt(GetFailures.FAILURES_01)

                        # and now remove the LogPath
                        self.filter.delLogPath(self.name)

                        _copy_lines_between_files(GetFailures.FILENAME_01, self.file, n=100)
                        # so we should get no more failures detected
                        self.assertTrue(self.isEmpty(2))

                        # but then if we add it back again
                        self.filter.addLogPath(self.name)
                        # Tricky catch here is that it should get them from the
                        # tail written before, so let's not copy anything yet.
                        # we should detect the failures
                        self.assert_correct_last_attempt(GetFailures.FAILURES_01, count=6) # was needed if we write twice above

                        # now copy and get even more
                        _copy_lines_between_files(GetFailures.FILENAME_01, self.file, n=100)
                        # yoh: not sure why count here is not 9... TODO
                        self.assert_correct_last_attempt(GetFailures.FAILURES_01)#, count=9)

        MonitorFailures.__name__ = "MonitorFailures<%s>(%s)" \
                          % (Filter_.__name__, testclass_name) # 'tempfile')
        return MonitorFailures
83
556
 
84
557
 
85
558
class GetFailures(unittest.TestCase):
88
561
        FILENAME_02 = "testcases/files/testcase02.log"
89
562
        FILENAME_03 = "testcases/files/testcase03.log"
90
563
        FILENAME_04 = "testcases/files/testcase04.log"
 
564
        FILENAME_USEDNS = "testcases/files/testcase-usedns.log"
 
565
 
 
566
        # so that they could be reused by other tests
 
567
        FAILURES_01 = ('193.168.0.128', 3, 1124013599.0,
 
568
                                  ['Aug 14 11:59:59 [sshd] error: PAM: Authentication failure for kevin from 193.168.0.128\n']*3)
91
569
 
92
570
        def setUp(self):
93
571
                """Call before every test case."""
94
 
                self.__filter = FileFilter(None)
95
 
                self.__filter.setActive(True)
 
572
                self.filter = FileFilter(None)
 
573
                self.filter.setActive(True)
96
574
                # TODO Test this
97
 
                #self.__filter.setTimeRegex("\S{3}\s{1,2}\d{1,2} \d{2}:\d{2}:\d{2}")
98
 
                #self.__filter.setTimePattern("%b %d %H:%M:%S")
 
575
                #self.filter.setTimeRegex("\S{3}\s{1,2}\d{1,2} \d{2}:\d{2}:\d{2}")
 
576
                #self.filter.setTimePattern("%b %d %H:%M:%S")
99
577
 
100
578
        def tearDown(self):
101
579
                """Call after every test case."""
102
580
 
103
 
        def _assertEqualEntries(self, found, output):
104
 
                """Little helper to unify comparisons with the target entries
105
 
 
106
 
                and report helpful failure reports instead of millions of seconds ;)
107
 
                """
108
 
                self.assertEqual(found[:2], output[:2])
109
 
                found_time, output_time = \
110
 
                                        time.localtime(found[2]),\
111
 
                                        time.localtime(output[2])
112
 
                self.assertEqual(found_time, output_time)
113
 
                if len(found) > 3:                              # match matches
114
 
                        self.assertEqual(found[3], output[3])
115
 
 
116
 
 
117
 
        def testGetFailures01(self):
118
 
                output = ('193.168.0.128', 3, 1124013599.0,
119
 
                                  ['Aug 14 11:59:59 [sshd] error: PAM: Authentication failure for kevin from 193.168.0.128\n']*3)
120
 
 
121
 
                self.__filter.addLogPath(GetFailures.FILENAME_01)
122
 
                self.__filter.addFailRegex("(?:(?:Authentication failure|Failed [-/\w+]+) for(?: [iI](?:llegal|nvalid) user)?|[Ii](?:llegal|nvalid) user|ROOT LOGIN REFUSED) .*(?: from|FROM) <HOST>")
123
 
 
124
 
                self.__filter.getFailures(GetFailures.FILENAME_01)
125
 
 
126
 
                ticket = self.__filter.failManager.toBan()
127
 
 
128
 
                attempts = ticket.getAttempt()
129
 
                date = ticket.getTime()
130
 
                ip = ticket.getIP()
131
 
                matches = ticket.getMatches()
132
 
                found = (ip, attempts, date, matches)
133
 
 
134
 
                self._assertEqualEntries(found, output)
135
 
        
 
581
 
 
582
 
 
583
        def testGetFailures01(self, filename=None, failures=None):
 
584
                filename = filename or GetFailures.FILENAME_01
 
585
                failures = failures or GetFailures.FAILURES_01
 
586
 
 
587
                self.filter.addLogPath(filename)
 
588
                self.filter.addFailRegex("(?:(?:Authentication failure|Failed [-/\w+]+) for(?: [iI](?:llegal|nvalid) user)?|[Ii](?:llegal|nvalid) user|ROOT LOGIN REFUSED) .*(?: from|FROM) <HOST>$")
 
589
                self.filter.getFailures(filename)
 
590
                _assert_correct_last_attempt(self, self.filter,  failures)
 
591
 
 
592
        def testCRLFFailures01(self):
 
593
                # We first adjust logfile/failures to end with CR+LF
 
594
                fname = tempfile.mktemp(prefix='tmp_fail2ban', suffix='crlf')
 
595
                # poor man unix2dos:
 
596
                fin, fout = open(GetFailures.FILENAME_01), open(fname, 'w')
 
597
                for l in fin.readlines():
 
598
                        fout.write('%s\r\n' % l.rstrip('\n'))
 
599
                fin.close()
 
600
                fout.close()
 
601
 
 
602
                # now see if we should be getting the "same" failures
 
603
                self.testGetFailures01(filename=fname,
 
604
                                                           failures=GetFailures.FAILURES_01[:3] +
 
605
                                                           ([x.rstrip('\n') + '\r\n' for x in
 
606
                                                                 GetFailures.FAILURES_01[-1]],))
 
607
                _killfile(fout, fname)
 
608
 
 
609
 
136
610
        def testGetFailures02(self):
137
611
                output = ('141.3.81.106', 4, 1124013539.0,
138
612
                                  ['Aug 14 11:%d:59 i60p295 sshd[12365]: Failed publickey for roehl from ::ffff:141.3.81.106 port 51332 ssh2\n'
139
613
                                   % m for m in 53, 54, 57, 58])
140
614
 
141
 
                self.__filter.addLogPath(GetFailures.FILENAME_02)
142
 
                self.__filter.addFailRegex("Failed .* from <HOST>")
143
 
                
144
 
                self.__filter.getFailures(GetFailures.FILENAME_02)
145
 
                
146
 
                ticket = self.__filter.failManager.toBan()
147
 
 
148
 
                attempts = ticket.getAttempt()
149
 
                date = ticket.getTime()
150
 
                ip = ticket.getIP()
151
 
                matches = ticket.getMatches()
152
 
                found = (ip, attempts, date, matches)
153
 
                
154
 
                self._assertEqualEntries(found, output)
 
615
                self.filter.addLogPath(GetFailures.FILENAME_02)
 
616
                self.filter.addFailRegex("Failed .* from <HOST>")
 
617
                self.filter.getFailures(GetFailures.FILENAME_02)
 
618
                _assert_correct_last_attempt(self, self.filter, output)
155
619
 
156
620
        def testGetFailures03(self):
157
621
                output = ('203.162.223.135', 6, 1124013544.0)
158
622
 
159
 
                self.__filter.addLogPath(GetFailures.FILENAME_03)
160
 
                self.__filter.addFailRegex("error,relay=<HOST>,.*550 User unknown")
161
 
                
162
 
                self.__filter.getFailures(GetFailures.FILENAME_03)
163
 
                
164
 
                ticket = self.__filter.failManager.toBan()
165
 
                
166
 
                attempts = ticket.getAttempt()
167
 
                date = ticket.getTime()
168
 
                ip = ticket.getIP()
169
 
                found = (ip, attempts, date)
170
 
                
171
 
                self._assertEqualEntries(found, output) 
 
623
                self.filter.addLogPath(GetFailures.FILENAME_03)
 
624
                self.filter.addFailRegex("error,relay=<HOST>,.*550 User unknown")
 
625
                self.filter.getFailures(GetFailures.FILENAME_03)
 
626
                _assert_correct_last_attempt(self, self.filter, output)
172
627
 
173
628
        def testGetFailures04(self):
174
629
                output = [('212.41.96.186', 4, 1124013600.0),
175
630
                                  ('212.41.96.185', 4, 1124013598.0)]
176
631
 
177
 
                self.__filter.addLogPath(GetFailures.FILENAME_04)
178
 
                self.__filter.addFailRegex("Invalid user .* <HOST>")
179
 
                
180
 
                self.__filter.getFailures(GetFailures.FILENAME_04)
 
632
                self.filter.addLogPath(GetFailures.FILENAME_04)
 
633
                self.filter.addFailRegex("Invalid user .* <HOST>")
 
634
                self.filter.getFailures(GetFailures.FILENAME_04)
181
635
 
182
636
                try:
183
 
                        for i in range(2):
184
 
                                ticket = self.__filter.failManager.toBan()              
185
 
                                attempts = ticket.getAttempt()
186
 
                                date = ticket.getTime()
187
 
                                ip = ticket.getIP()
188
 
                                found = (ip, attempts, date)
189
 
                                self.assertEqual(found, output[i])
 
637
                        for i, out in enumerate(output):
 
638
                                _assert_correct_last_attempt(self, self.filter, out)
190
639
                except FailManagerEmpty:
191
640
                        pass
192
 
                
 
641
 
 
642
        def testGetFailuresUseDNS(self):
 
643
                # We should still catch failures with usedns = no ;-)
 
644
                output_yes = ('192.0.43.10', 2, 1124013539.0,
 
645
                                          ['Aug 14 11:54:59 i60p295 sshd[12365]: Failed publickey for roehl from example.com port 51332 ssh2\n',
 
646
                                           'Aug 14 11:58:59 i60p295 sshd[12365]: Failed publickey for roehl from ::ffff:192.0.43.10 port 51332 ssh2\n'])
 
647
 
 
648
                output_no = ('192.0.43.10', 1, 1124013539.0,
 
649
                                          ['Aug 14 11:58:59 i60p295 sshd[12365]: Failed publickey for roehl from ::ffff:192.0.43.10 port 51332 ssh2\n'])
 
650
 
 
651
                # Actually no exception would be raised -- it will be just set to 'no'
 
652
                #self.assertRaises(ValueError,
 
653
                #                                 FileFilter, None, useDns='wrong_value_for_useDns')
 
654
 
 
655
                for useDns, output in (('yes',  output_yes),
 
656
                                                           ('no',   output_no),
 
657
                                                           ('warn', output_yes)):
 
658
                        filter_ = FileFilter(None, useDns=useDns)
 
659
                        filter_.setActive(True)
 
660
                        filter_.failManager.setMaxRetry(1)      # we might have just few failures
 
661
 
 
662
                        filter_.addLogPath(GetFailures.FILENAME_USEDNS)
 
663
                        filter_.addFailRegex("Failed .* from <HOST>")
 
664
                        filter_.getFailures(GetFailures.FILENAME_USEDNS)
 
665
                        _assert_correct_last_attempt(self, filter_, output)
 
666
 
 
667
 
 
668
 
193
669
        def testGetFailuresMultiRegex(self):
194
670
                output = ('141.3.81.106', 8, 1124013541.0)
195
671
 
196
 
                self.__filter.addLogPath(GetFailures.FILENAME_02)
197
 
                self.__filter.addFailRegex("Failed .* from <HOST>")
198
 
                self.__filter.addFailRegex("Accepted .* from <HOST>")
199
 
                
200
 
                self.__filter.getFailures(GetFailures.FILENAME_02)
201
 
                
202
 
                ticket = self.__filter.failManager.toBan()
 
672
                self.filter.addLogPath(GetFailures.FILENAME_02)
 
673
                self.filter.addFailRegex("Failed .* from <HOST>")
 
674
                self.filter.addFailRegex("Accepted .* from <HOST>")
 
675
                self.filter.getFailures(GetFailures.FILENAME_02)
 
676
                _assert_correct_last_attempt(self, self.filter, output)
203
677
 
204
 
                attempts = ticket.getAttempt()
205
 
                date = ticket.getTime()
206
 
                ip = ticket.getIP()
207
 
                found = (ip, attempts, date)
208
 
                
209
 
                self._assertEqualEntries(found, output)
210
 
        
211
678
        def testGetFailuresIgnoreRegex(self):
212
679
                output = ('141.3.81.106', 8, 1124013541.0)
213
680
 
214
 
                self.__filter.addLogPath(GetFailures.FILENAME_02)
215
 
                self.__filter.addFailRegex("Failed .* from <HOST>")
216
 
                self.__filter.addFailRegex("Accepted .* from <HOST>")
217
 
                self.__filter.addIgnoreRegex("for roehl")
218
 
                
219
 
                self.__filter.getFailures(GetFailures.FILENAME_02)
220
 
                
221
 
                self.assertRaises(FailManagerEmpty, self.__filter.failManager.toBan)
 
681
                self.filter.addLogPath(GetFailures.FILENAME_02)
 
682
                self.filter.addFailRegex("Failed .* from <HOST>")
 
683
                self.filter.addFailRegex("Accepted .* from <HOST>")
 
684
                self.filter.addIgnoreRegex("for roehl")
 
685
 
 
686
                self.filter.getFailures(GetFailures.FILENAME_02)
 
687
 
 
688
                self.assertRaises(FailManagerEmpty, self.filter.failManager.toBan)
222
689
 
223
690
class DNSUtilsTests(unittest.TestCase):
224
691
 
 
692
        def testUseDns(self):
 
693
                res = DNSUtils.textToIp('www.example.com', 'no')
 
694
                self.assertEqual(res, [])
 
695
                res = DNSUtils.textToIp('www.example.com', 'warn')
 
696
                self.assertEqual(res, ['192.0.43.10'])
 
697
                res = DNSUtils.textToIp('www.example.com', 'yes')
 
698
                self.assertEqual(res, ['192.0.43.10'])
 
699
 
225
700
        def testTextToIp(self):
226
 
                bogus = [
227
 
                        'doh1.2.3.4.buga.xxxxx.yyy',
228
 
                        '1.2.3.4.buga.xxxxx.yyy',
 
701
                # Test hostnames
 
702
                hostnames = [
 
703
                        'www.example.com',
 
704
                        'doh1.2.3.4.buga.xxxxx.yyy.invalid',
 
705
                        '1.2.3.4.buga.xxxxx.yyy.invalid',
229
706
                        ]
230
 
                """Really bogus addresses which should have no matches"""
231
 
                for s in bogus:
232
 
                        res = DNSUtils.textToIp(s)
233
 
                        self.assertEqual(res, [])
 
707
                for s in hostnames:
 
708
                        res = DNSUtils.textToIp(s, 'yes')
 
709
                        if s == 'www.example.com':
 
710
                                self.assertEqual(res, ['192.0.43.10'])
 
711
                        else:
 
712
                                self.assertEqual(res, [])
 
713
 
 
714
class JailTests(unittest.TestCase):
 
715
 
 
716
        def testSetBackend_gh83(self):
 
717
                # smoke test
 
718
                jail = Jail('test', backend='polling') # Must not fail to initiate
 
719