16
16
# You should have received a copy of the GNU General Public License
17
17
# along with Fail2Ban; if not, write to the Free Software
18
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
20
# Author: Cyril Jaquier
24
__author__ = "Cyril Jaquier"
25
__version__ = "$Revision$"
27
__copyright__ = "Copyright (c) 2004 Cyril Jaquier"
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
22
__copyright__ = "Copyright (c) 2004 Cyril Jaquier; 2012 Yaroslav Halchenko"
28
23
__license__ = "GPL"
25
from __builtin__ import open as fopen
32
from server.jail import Jail
33
33
from server.filterpoll import FilterPoll
34
34
from server.filter import FileFilter, DNSUtils
35
35
from server.failmanager import FailManager
36
36
from server.failmanager import FailManagerEmpty
42
# yoh: per Steven Hiscocks's insight while troubleshooting
43
# https://github.com/fail2ban/fail2ban/issues/103#issuecomment-15542836
44
# adding a sufficiently large buffer might help to guarantee that
45
# writes happen atomically.
47
"""Overload built in open so we could assure sufficiently large buffer
49
Explicit .flush would be needed to assure that changes leave the buffer
52
# ~50kB buffer should be sufficient for all tests here.
53
args = args + (50000,)
56
def _killfile(f, name):
66
# there might as well be the .bak file
67
if os.path.exists(name + '.bak'):
68
_killfile(None, name + '.bak')
72
"""PollFilter relies on file timestamps - so we might need to
73
sleep to guarantee that they differ
75
if sys.version_info[:2] <= (2,4):
76
# on old Python st_mtime is int, so we should give
77
# at least 1 sec so polling filter could detect
83
def _assert_equal_entries(utest, found, output, count=None):
84
"""Little helper to unify comparisons with the target entries
86
and report helpful failure reports instead of millions of seconds ;)
88
utest.assertEqual(found[0], output[0]) # IP
89
utest.assertEqual(found[1], count or output[1]) # count
90
found_time, output_time = \
91
time.localtime(found[2]),\
92
time.localtime(output[2])
93
utest.assertEqual(found_time, output_time)
94
if len(output) > 3 and count is None: # match matches
95
# do not check if custom count (e.g. going through them twice)
96
utest.assertEqual(repr(found[3]), repr(output[3]))
98
def _assert_correct_last_attempt(utest, filter_, output, count=None):
99
"""Additional helper to wrap most common test case
101
Test filter to contain target ticket
103
if isinstance(filter_, DummyJail):
104
ticket = filter_.getFailTicket()
106
# when we are testing without jails
107
ticket = filter_.failManager.toBan()
109
attempts = ticket.getAttempt()
110
date = ticket.getTime()
112
matches = ticket.getMatches()
113
found = (ip, attempts, date, matches)
115
_assert_equal_entries(utest, found, output, count)
117
def _copy_lines_between_files(fin, fout, n=None, skip=0, mode='a', terminal_line=""):
118
"""Copy lines from one file to another (which might be already open)
122
if sys.version_info[:2] <= (2,4): # pragma: no cover
123
# on old Python st_mtime is int, so we should give at least 1 sec so
124
# polling filter could detect the change
126
if isinstance(fin, str): # pragma: no branch - only used with str in test cases
129
for i in xrange(skip):
134
while n is None or i < n:
136
if terminal_line is not None and l == terminal_line:
140
# Write: all at once and flush
141
if isinstance(fout, str):
142
fout = open(fout, mode)
143
fout.write('\n'.join(lines))
145
# to give other threads possibly some time to crunch
38
153
class IgnoreIP(unittest.TestCase):
41
156
"""Call before every test case."""
42
self.__filter = FileFilter(None)
157
self.filter = FileFilter(None)
44
159
def tearDown(self):
45
160
"""Call after every test case."""
71
188
"""Call before every test case."""
72
self.__filter = FilterPoll(None)
73
self.__filter.addLogPath(LogFile.FILENAME)
189
self.filter = FilterPoll(None)
190
self.filter.addLogPath(LogFile.FILENAME)
75
192
def tearDown(self):
76
193
"""Call after every test case."""
78
196
#def testOpen(self):
79
# self.__filter.openLogFile(LogFile.FILENAME)
197
# self.filter.openLogFile(LogFile.FILENAME)
81
199
def testIsModified(self):
82
self.assertTrue(self.__filter.isModified(LogFile.FILENAME))
200
self.assertTrue(self.filter.isModified(LogFile.FILENAME))
203
class LogFileMonitor(unittest.TestCase):
204
"""Few more tests for FilterPoll API
207
"""Call before every test case."""
208
self.filter = self.name = 'NA'
209
_, self.name = tempfile.mkstemp('fail2ban', 'monitorfailures')
210
self.file = open(self.name, 'a')
211
self.filter = FilterPoll(None)
212
self.filter.addLogPath(self.name)
213
self.filter.setActive(True)
214
self.filter.addFailRegex("(?:(?:Authentication failure|Failed [-/\w+]+) for(?: [iI](?:llegal|nvalid) user)?|[Ii](?:llegal|nvalid) user|ROOT LOGIN REFUSED) .*(?: from|FROM) <HOST>")
217
_killfile(self.file, self.name)
220
def isModified(self, delay=2.):
221
"""Wait up to `delay` sec to assure that it was modified or not
224
while time.time() < time0 + delay:
225
if self.filter.isModified(self.name):
230
def notModified(self):
231
# shorter wait time for not modified status
232
return not self.isModified(0.4)
234
def testNewChangeViaIsModified(self):
235
# it is a brand new one -- so first we think it is modified
236
self.assertTrue(self.isModified())
238
self.assertTrue(self.notModified())
239
self.assertTrue(self.notModified())
240
_sleep_4_poll() # to guarantee freshier mtime
241
for i in range(4): # few changes
242
# unless we write into it
243
self.file.write("line%d\n" % i)
245
self.assertTrue(self.isModified())
246
self.assertTrue(self.notModified())
247
_sleep_4_poll() # to guarantee freshier mtime
248
os.rename(self.name, self.name + '.old')
249
# we are not signaling as modified whenever
251
self.assertTrue(self.notModified())
252
f = open(self.name, 'a')
253
self.assertTrue(self.isModified())
254
self.assertTrue(self.notModified())
256
f.write("line%d\n" % i)
258
self.assertTrue(self.isModified())
259
self.assertTrue(self.notModified())
260
_killfile(f, self.name)
261
_killfile(self.name, self.name + '.old')
264
def testNewChangeViaGetFailures_simple(self):
265
# suck in lines from this sample log file
266
self.filter.getFailures(self.name)
267
self.assertRaises(FailManagerEmpty, self.filter.failManager.toBan)
269
# Now let's feed it with entries from the file
270
_copy_lines_between_files(GetFailures.FILENAME_01, self.file, n=5)
271
self.filter.getFailures(self.name)
272
self.assertRaises(FailManagerEmpty, self.filter.failManager.toBan)
273
# and it should not have been enough
275
_copy_lines_between_files(GetFailures.FILENAME_01, self.file, skip=5)
276
self.filter.getFailures(self.name)
277
_assert_correct_last_attempt(self, self.filter, GetFailures.FAILURES_01)
279
def testNewChangeViaGetFailures_rewrite(self):
281
# if we rewrite the file at once
283
_copy_lines_between_files(GetFailures.FILENAME_01, self.name)
284
self.filter.getFailures(self.name)
285
_assert_correct_last_attempt(self, self.filter, GetFailures.FAILURES_01)
287
# What if the file gets overwritten
288
# yoh: skip so we skip those 2 identical lines which our
289
# filter "marked" as the known beginning, otherwise it
290
# would not detect "rotation"
291
self.file = _copy_lines_between_files(GetFailures.FILENAME_01, self.name,
293
self.filter.getFailures(self.name)
294
#self.assertRaises(FailManagerEmpty, self.filter.failManager.toBan)
295
_assert_correct_last_attempt(self, self.filter, GetFailures.FAILURES_01)
297
def testNewChangeViaGetFailures_move(self):
299
# if we move file into a new location while it has been open already
300
self.file = _copy_lines_between_files(GetFailures.FILENAME_01, self.name,
302
self.filter.getFailures(self.name)
303
self.assertRaises(FailManagerEmpty, self.filter.failManager.toBan)
304
self.assertEqual(self.filter.failManager.getFailTotal(), 2)
306
# move aside, but leaving the handle still open...
307
os.rename(self.name, self.name + '.bak')
308
_copy_lines_between_files(GetFailures.FILENAME_01, self.name, skip=14)
309
self.filter.getFailures(self.name)
310
_assert_correct_last_attempt(self, self.filter, GetFailures.FAILURES_01)
311
self.assertEqual(self.filter.failManager.getFailTotal(), 3)
314
from threading import Lock
315
class DummyJail(object):
316
"""A simple 'jail' to suck in all the tickets generated by Filter's
325
return len(self.queue)
329
def putFailTicket(self, ticket):
332
self.queue.append(ticket)
336
def getFailTicket(self):
339
return self.queue.pop()
344
return "DummyJail #%s with %d tickets" % (id(self), len(self))
346
def get_monitor_failures_testcase(Filter_):
347
"""Generator of TestCase's for different filters/backends
350
# add Filter_'s name so we could easily identify bad cows
351
testclass_name = tempfile.mktemp(
352
'fail2ban', 'monitorfailures_%s' % (Filter_.__name__,))
354
class MonitorFailures(unittest.TestCase):
357
"""Call before every test case."""
358
self.filter = self.name = 'NA'
359
self.name = '%s-%d' % (testclass_name, self.count)
360
MonitorFailures.count += 1 # so we have unique filenames across tests
361
self.file = open(self.name, 'a')
362
self.jail = DummyJail()
363
self.filter = Filter_(self.jail)
364
self.filter.addLogPath(self.name)
365
self.filter.setActive(True)
366
self.filter.addFailRegex("(?:(?:Authentication failure|Failed [-/\w+]+) for(?: [iI](?:llegal|nvalid) user)?|[Ii](?:llegal|nvalid) user|ROOT LOGIN REFUSED) .*(?: from|FROM) <HOST>")
368
# If filter is polling it would sleep a bit to guarantee that
369
# we have initial time-stamp difference to trigger "actions"
371
#print "D: started filter %s" % self.filter
375
#print "D: SLEEPING A BIT"
376
#import time; time.sleep(5)
377
#print "D: TEARING DOWN"
379
#print "D: WAITING FOR FILTER TO STOP"
380
self.filter.join() # wait for the thread to terminate
381
#print "D: KILLING THE FILE"
382
_killfile(self.file, self.name)
383
#time.sleep(0.2) # Give FS time to ack the removal
386
def isFilled(self, delay=2.):
387
"""Wait up to `delay` sec to assure that it was modified or not
390
while time.time() < time0 + delay:
396
def _sleep_4_poll(self):
397
# Since FilterPoll relies on time stamps and some
398
# actions might be happening too fast in the tests,
399
# sleep a bit to guarantee reliable time stamps
400
if isinstance(self.filter, FilterPoll):
403
def isEmpty(self, delay=0.4):
404
# shorter wait time for not modified status
405
return not self.isFilled(delay)
407
def assert_correct_last_attempt(self, failures, count=None):
408
self.assertTrue(self.isFilled(20)) # give Filter a chance to react
409
_assert_correct_last_attempt(self, self.jail, failures, count=count)
412
def test_grow_file(self):
413
# suck in lines from this sample log file
414
self.assertRaises(FailManagerEmpty, self.filter.failManager.toBan)
416
# Now let's feed it with entries from the file
417
_copy_lines_between_files(GetFailures.FILENAME_01, self.file, n=5)
418
self.assertRaises(FailManagerEmpty, self.filter.failManager.toBan)
419
# and our dummy jail is empty as well
420
self.assertFalse(len(self.jail))
421
# since it should not have been enough
423
_copy_lines_between_files(GetFailures.FILENAME_01, self.file, skip=5)
424
self.assertTrue(self.isFilled(6))
425
# so we sleep for up to 2 sec for it not to become empty,
426
# and meanwhile pass to other thread(s) and filter should
427
# have gathered new failures and passed them into the
429
self.assertEqual(len(self.jail), 1)
430
# and there should be no "stuck" ticket in failManager
431
self.assertRaises(FailManagerEmpty, self.filter.failManager.toBan)
432
self.assert_correct_last_attempt(GetFailures.FAILURES_01)
433
self.assertEqual(len(self.jail), 0)
436
# just for fun let's copy all of them again and see if that results
438
_copy_lines_between_files(GetFailures.FILENAME_01, self.file, n=100)
439
self.assert_correct_last_attempt(GetFailures.FAILURES_01)
441
def test_rewrite_file(self):
442
# if we rewrite the file at once
444
_copy_lines_between_files(GetFailures.FILENAME_01, self.name)
445
self.assert_correct_last_attempt(GetFailures.FAILURES_01)
447
# What if the file gets overwritten
448
# yoh: skip so we skip those 2 identical lines which our
449
# filter "marked" as the known beginning, otherwise it
450
# would not detect "rotation"
451
self.file = _copy_lines_between_files(GetFailures.FILENAME_01, self.name,
453
self.assert_correct_last_attempt(GetFailures.FAILURES_01)
456
def test_move_file(self):
457
# if we move file into a new location while it has been open already
458
self.file = _copy_lines_between_files(GetFailures.FILENAME_01, self.name,
460
# Poll might need more time
461
self.assertTrue(self.isEmpty(4 + int(isinstance(self.filter, FilterPoll))*2))
462
self.assertRaises(FailManagerEmpty, self.filter.failManager.toBan)
463
self.assertEqual(self.filter.failManager.getFailTotal(), 2)
465
# move aside, but leaving the handle still open...
466
os.rename(self.name, self.name + '.bak')
467
_copy_lines_between_files(GetFailures.FILENAME_01, self.name, skip=14)
468
self.assert_correct_last_attempt(GetFailures.FAILURES_01)
469
self.assertEqual(self.filter.failManager.getFailTotal(), 3)
471
# now remove the moved file
472
_killfile(None, self.name + '.bak')
473
_copy_lines_between_files(GetFailures.FILENAME_01, self.name, n=100)
474
self.assert_correct_last_attempt(GetFailures.FAILURES_01)
475
self.assertEqual(self.filter.failManager.getFailTotal(), 6)
478
def _test_move_into_file(self, interim_kill=False):
479
# if we move a new file into the location of an old (monitored) file
480
_copy_lines_between_files(GetFailures.FILENAME_01, self.name,
482
# make sure that it is monitored first
483
self.assert_correct_last_attempt(GetFailures.FAILURES_01)
484
self.assertEqual(self.filter.failManager.getFailTotal(), 3)
487
_killfile(None, self.name)
488
time.sleep(0.2) # let them know
490
# now create a new one to override old one
491
_copy_lines_between_files(GetFailures.FILENAME_01, self.name + '.new',
493
os.rename(self.name + '.new', self.name)
494
self.assert_correct_last_attempt(GetFailures.FAILURES_01)
495
self.assertEqual(self.filter.failManager.getFailTotal(), 6)
497
# and to make sure that it is now monitored for changes
498
_copy_lines_between_files(GetFailures.FILENAME_01, self.name,
500
self.assert_correct_last_attempt(GetFailures.FAILURES_01)
501
self.assertEqual(self.filter.failManager.getFailTotal(), 9)
504
def test_move_into_file(self):
505
self._test_move_into_file(interim_kill=False)
507
def test_move_into_file_after_removed(self):
508
# exactly as above test + remove file explicitly
509
# to test against possible drop-out of the file from monitoring
510
self._test_move_into_file(interim_kill=True)
513
def test_new_bogus_file(self):
514
# to make sure that watching the whole directory does not affect
515
_copy_lines_between_files(GetFailures.FILENAME_01, self.name, n=100)
516
self.assert_correct_last_attempt(GetFailures.FAILURES_01)
518
# create a bogus file in the same directory and see if that doesn't affect
519
open(self.name + '.bak2', 'w').write('')
520
_copy_lines_between_files(GetFailures.FILENAME_01, self.name, n=100)
521
self.assert_correct_last_attempt(GetFailures.FAILURES_01)
522
self.assertEqual(self.filter.failManager.getFailTotal(), 6)
523
_killfile(None, self.name + '.bak2')
526
def test_delLogPath(self):
527
# Smoke test for removing the path from being watched
530
_copy_lines_between_files(GetFailures.FILENAME_01, self.file, n=100)
531
self.assert_correct_last_attempt(GetFailures.FAILURES_01)
533
# and now remove the LogPath
534
self.filter.delLogPath(self.name)
536
_copy_lines_between_files(GetFailures.FILENAME_01, self.file, n=100)
537
# so we should get no more failures detected
538
self.assertTrue(self.isEmpty(2))
540
# but then if we add it back again
541
self.filter.addLogPath(self.name)
542
# Tricky catch here is that it should get them from the
543
# tail written before, so let's not copy anything yet
544
#_copy_lines_between_files(GetFailures.FILENAME_01, self.name, n=100)
545
# we should detect the failures
546
self.assert_correct_last_attempt(GetFailures.FAILURES_01, count=6) # was needed if we write twice above
548
# now copy and get even more
549
_copy_lines_between_files(GetFailures.FILENAME_01, self.file, n=100)
550
# yoh: not sure why count here is not 9... TODO
551
self.assert_correct_last_attempt(GetFailures.FAILURES_01)#, count=9)
553
MonitorFailures.__name__ = "MonitorFailures<%s>(%s)" \
554
% (Filter_.__name__, testclass_name) # 'tempfile')
555
return MonitorFailures
85
558
class GetFailures(unittest.TestCase):
88
561
FILENAME_02 = "testcases/files/testcase02.log"
89
562
FILENAME_03 = "testcases/files/testcase03.log"
90
563
FILENAME_04 = "testcases/files/testcase04.log"
564
FILENAME_USEDNS = "testcases/files/testcase-usedns.log"
566
# so that they could be reused by other tests
567
FAILURES_01 = ('193.168.0.128', 3, 1124013599.0,
568
['Aug 14 11:59:59 [sshd] error: PAM: Authentication failure for kevin from 193.168.0.128\n']*3)
93
571
"""Call before every test case."""
94
self.__filter = FileFilter(None)
95
self.__filter.setActive(True)
572
self.filter = FileFilter(None)
573
self.filter.setActive(True)
97
#self.__filter.setTimeRegex("\S{3}\s{1,2}\d{1,2} \d{2}:\d{2}:\d{2}")
98
#self.__filter.setTimePattern("%b %d %H:%M:%S")
575
#self.filter.setTimeRegex("\S{3}\s{1,2}\d{1,2} \d{2}:\d{2}:\d{2}")
576
#self.filter.setTimePattern("%b %d %H:%M:%S")
100
578
def tearDown(self):
101
579
"""Call after every test case."""
103
def _assertEqualEntries(self, found, output):
104
"""Little helper to unify comparisons with the target entries
106
and report helpful failure reports instead of millions of seconds ;)
108
self.assertEqual(found[:2], output[:2])
109
found_time, output_time = \
110
time.localtime(found[2]),\
111
time.localtime(output[2])
112
self.assertEqual(found_time, output_time)
113
if len(found) > 3: # match matches
114
self.assertEqual(found[3], output[3])
117
def testGetFailures01(self):
118
output = ('193.168.0.128', 3, 1124013599.0,
119
['Aug 14 11:59:59 [sshd] error: PAM: Authentication failure for kevin from 193.168.0.128\n']*3)
121
self.__filter.addLogPath(GetFailures.FILENAME_01)
122
self.__filter.addFailRegex("(?:(?:Authentication failure|Failed [-/\w+]+) for(?: [iI](?:llegal|nvalid) user)?|[Ii](?:llegal|nvalid) user|ROOT LOGIN REFUSED) .*(?: from|FROM) <HOST>")
124
self.__filter.getFailures(GetFailures.FILENAME_01)
126
ticket = self.__filter.failManager.toBan()
128
attempts = ticket.getAttempt()
129
date = ticket.getTime()
131
matches = ticket.getMatches()
132
found = (ip, attempts, date, matches)
134
self._assertEqualEntries(found, output)
583
def testGetFailures01(self, filename=None, failures=None):
584
filename = filename or GetFailures.FILENAME_01
585
failures = failures or GetFailures.FAILURES_01
587
self.filter.addLogPath(filename)
588
self.filter.addFailRegex("(?:(?:Authentication failure|Failed [-/\w+]+) for(?: [iI](?:llegal|nvalid) user)?|[Ii](?:llegal|nvalid) user|ROOT LOGIN REFUSED) .*(?: from|FROM) <HOST>$")
589
self.filter.getFailures(filename)
590
_assert_correct_last_attempt(self, self.filter, failures)
592
def testCRLFFailures01(self):
593
# We first adjust logfile/failures to end with CR+LF
594
fname = tempfile.mktemp(prefix='tmp_fail2ban', suffix='crlf')
596
fin, fout = open(GetFailures.FILENAME_01), open(fname, 'w')
597
for l in fin.readlines():
598
fout.write('%s\r\n' % l.rstrip('\n'))
602
# now see if we should be getting the "same" failures
603
self.testGetFailures01(filename=fname,
604
failures=GetFailures.FAILURES_01[:3] +
605
([x.rstrip('\n') + '\r\n' for x in
606
GetFailures.FAILURES_01[-1]],))
607
_killfile(fout, fname)
136
610
def testGetFailures02(self):
137
611
output = ('141.3.81.106', 4, 1124013539.0,
138
612
['Aug 14 11:%d:59 i60p295 sshd[12365]: Failed publickey for roehl from ::ffff:141.3.81.106 port 51332 ssh2\n'
139
613
% m for m in 53, 54, 57, 58])
141
self.__filter.addLogPath(GetFailures.FILENAME_02)
142
self.__filter.addFailRegex("Failed .* from <HOST>")
144
self.__filter.getFailures(GetFailures.FILENAME_02)
146
ticket = self.__filter.failManager.toBan()
148
attempts = ticket.getAttempt()
149
date = ticket.getTime()
151
matches = ticket.getMatches()
152
found = (ip, attempts, date, matches)
154
self._assertEqualEntries(found, output)
615
self.filter.addLogPath(GetFailures.FILENAME_02)
616
self.filter.addFailRegex("Failed .* from <HOST>")
617
self.filter.getFailures(GetFailures.FILENAME_02)
618
_assert_correct_last_attempt(self, self.filter, output)
156
620
def testGetFailures03(self):
157
621
output = ('203.162.223.135', 6, 1124013544.0)
159
self.__filter.addLogPath(GetFailures.FILENAME_03)
160
self.__filter.addFailRegex("error,relay=<HOST>,.*550 User unknown")
162
self.__filter.getFailures(GetFailures.FILENAME_03)
164
ticket = self.__filter.failManager.toBan()
166
attempts = ticket.getAttempt()
167
date = ticket.getTime()
169
found = (ip, attempts, date)
171
self._assertEqualEntries(found, output)
623
self.filter.addLogPath(GetFailures.FILENAME_03)
624
self.filter.addFailRegex("error,relay=<HOST>,.*550 User unknown")
625
self.filter.getFailures(GetFailures.FILENAME_03)
626
_assert_correct_last_attempt(self, self.filter, output)
173
628
def testGetFailures04(self):
174
629
output = [('212.41.96.186', 4, 1124013600.0),
175
630
('212.41.96.185', 4, 1124013598.0)]
177
self.__filter.addLogPath(GetFailures.FILENAME_04)
178
self.__filter.addFailRegex("Invalid user .* <HOST>")
180
self.__filter.getFailures(GetFailures.FILENAME_04)
632
self.filter.addLogPath(GetFailures.FILENAME_04)
633
self.filter.addFailRegex("Invalid user .* <HOST>")
634
self.filter.getFailures(GetFailures.FILENAME_04)
184
ticket = self.__filter.failManager.toBan()
185
attempts = ticket.getAttempt()
186
date = ticket.getTime()
188
found = (ip, attempts, date)
189
self.assertEqual(found, output[i])
637
for i, out in enumerate(output):
638
_assert_correct_last_attempt(self, self.filter, out)
190
639
except FailManagerEmpty:
642
def testGetFailuresUseDNS(self):
643
# We should still catch failures with usedns = no ;-)
644
output_yes = ('192.0.43.10', 2, 1124013539.0,
645
['Aug 14 11:54:59 i60p295 sshd[12365]: Failed publickey for roehl from example.com port 51332 ssh2\n',
646
'Aug 14 11:58:59 i60p295 sshd[12365]: Failed publickey for roehl from ::ffff:192.0.43.10 port 51332 ssh2\n'])
648
output_no = ('192.0.43.10', 1, 1124013539.0,
649
['Aug 14 11:58:59 i60p295 sshd[12365]: Failed publickey for roehl from ::ffff:192.0.43.10 port 51332 ssh2\n'])
651
# Actually no exception would be raised -- it will be just set to 'no'
652
#self.assertRaises(ValueError,
653
# FileFilter, None, useDns='wrong_value_for_useDns')
655
for useDns, output in (('yes', output_yes),
657
('warn', output_yes)):
658
filter_ = FileFilter(None, useDns=useDns)
659
filter_.setActive(True)
660
filter_.failManager.setMaxRetry(1) # we might have just few failures
662
filter_.addLogPath(GetFailures.FILENAME_USEDNS)
663
filter_.addFailRegex("Failed .* from <HOST>")
664
filter_.getFailures(GetFailures.FILENAME_USEDNS)
665
_assert_correct_last_attempt(self, filter_, output)
193
669
def testGetFailuresMultiRegex(self):
194
670
output = ('141.3.81.106', 8, 1124013541.0)
196
self.__filter.addLogPath(GetFailures.FILENAME_02)
197
self.__filter.addFailRegex("Failed .* from <HOST>")
198
self.__filter.addFailRegex("Accepted .* from <HOST>")
200
self.__filter.getFailures(GetFailures.FILENAME_02)
202
ticket = self.__filter.failManager.toBan()
672
self.filter.addLogPath(GetFailures.FILENAME_02)
673
self.filter.addFailRegex("Failed .* from <HOST>")
674
self.filter.addFailRegex("Accepted .* from <HOST>")
675
self.filter.getFailures(GetFailures.FILENAME_02)
676
_assert_correct_last_attempt(self, self.filter, output)
204
attempts = ticket.getAttempt()
205
date = ticket.getTime()
207
found = (ip, attempts, date)
209
self._assertEqualEntries(found, output)
211
678
def testGetFailuresIgnoreRegex(self):
212
679
output = ('141.3.81.106', 8, 1124013541.0)
214
self.__filter.addLogPath(GetFailures.FILENAME_02)
215
self.__filter.addFailRegex("Failed .* from <HOST>")
216
self.__filter.addFailRegex("Accepted .* from <HOST>")
217
self.__filter.addIgnoreRegex("for roehl")
219
self.__filter.getFailures(GetFailures.FILENAME_02)
221
self.assertRaises(FailManagerEmpty, self.__filter.failManager.toBan)
681
self.filter.addLogPath(GetFailures.FILENAME_02)
682
self.filter.addFailRegex("Failed .* from <HOST>")
683
self.filter.addFailRegex("Accepted .* from <HOST>")
684
self.filter.addIgnoreRegex("for roehl")
686
self.filter.getFailures(GetFailures.FILENAME_02)
688
self.assertRaises(FailManagerEmpty, self.filter.failManager.toBan)
223
690
class DNSUtilsTests(unittest.TestCase):
692
def testUseDns(self):
693
res = DNSUtils.textToIp('www.example.com', 'no')
694
self.assertEqual(res, [])
695
res = DNSUtils.textToIp('www.example.com', 'warn')
696
self.assertEqual(res, ['192.0.43.10'])
697
res = DNSUtils.textToIp('www.example.com', 'yes')
698
self.assertEqual(res, ['192.0.43.10'])
225
700
def testTextToIp(self):
227
'doh1.2.3.4.buga.xxxxx.yyy',
228
'1.2.3.4.buga.xxxxx.yyy',
704
'doh1.2.3.4.buga.xxxxx.yyy.invalid',
705
'1.2.3.4.buga.xxxxx.yyy.invalid',
230
"""Really bogus addresses which should have no matches"""
232
res = DNSUtils.textToIp(s)
233
self.assertEqual(res, [])
708
res = DNSUtils.textToIp(s, 'yes')
709
if s == 'www.example.com':
710
self.assertEqual(res, ['192.0.43.10'])
712
self.assertEqual(res, [])
714
class JailTests(unittest.TestCase):
716
def testSetBackend_gh83(self):
718
jail = Jail('test', backend='polling') # Must not fail to initiate