17
17
# along with Fail2Ban; if not, write to the Free Software
18
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
20
# Author: Cyril Jaquier
24
__author__ = "Cyril Jaquier"
25
__version__ = "$Revision$"
27
__copyright__ = "Copyright (c) 2004 Cyril Jaquier"
20
__author__ = "Cyril Jaquier, Yaroslav Halchenko"
21
__copyright__ = "Copyright (c) 2004 Cyril Jaquier, 2011-2013 Yaroslav Halchenko"
28
22
__license__ = "GPL"
24
import os, shutil, tempfile, unittest
25
from client.configreader import ConfigReader
31
26
from client.jailreader import JailReader
27
from client.jailsreader import JailsReader
28
from client.configurator import Configurator
33
class JailReaderTest(unittest.TestCase):
30
class ConfigReaderTest(unittest.TestCase):
36
33
"""Call before every test case."""
34
self.d = tempfile.mkdtemp(prefix="f2b-temp")
35
self.c = ConfigReader(basedir=self.d)
38
37
def tearDown(self):
39
38
"""Call after every test case."""
41
def _write(self, fname, value):
42
# verify if we don't need to create .d directory
43
if os.path.sep in fname:
44
d = os.path.dirname(fname)
45
d_ = os.path.join(self.d, d)
46
if not os.path.exists(d_):
48
open("%s/%s" % (self.d, fname), "w").write("""
53
def _remove(self, fname):
54
os.unlink("%s/%s" % (self.d, fname))
55
self.assertTrue(self.c.read('c')) # we still should have some
58
def _getoption(self, f='c'):
59
self.assertTrue(self.c.read(f)) # we got some now
60
return self.c.getOptions('section', [("int", 'option')])['option']
63
def testInaccessibleFile(self):
64
f = os.path.join(self.d, "d.conf") # inaccessible file
65
self._write('d.conf', 0)
66
self.assertEqual(self._getoption('d'), 0)
68
self.assertFalse(self.c.read('d')) # should not be readable BUT present
71
def testOptionalDotDDir(self):
72
self.assertFalse(self.c.read('c')) # nothing is there yet
73
self._write("c.conf", "1")
74
self.assertEqual(self._getoption(), 1)
75
self._write("c.conf", "2") # overwrite
76
self.assertEqual(self._getoption(), 2)
77
self._write("c.local", "3") # add override in .local
78
self.assertEqual(self._getoption(), 3)
79
self._write("c.d/98.conf", "998") # add 1st override in .d/
80
self.assertEqual(self._getoption(), 998)
81
self._write("c.d/90.conf", "990") # add previously sorted override in .d/
82
self.assertEqual(self._getoption(), 998) # should stay the same
83
self._write("c.d/99.conf", "999") # now override in a way without sorting we possibly get a failure
84
self.assertEqual(self._getoption(), 999)
85
self._remove("c.d/99.conf")
86
self.assertEqual(self._getoption(), 998)
87
self._remove("c.d/98.conf")
88
self.assertEqual(self._getoption(), 990)
89
self._remove("c.d/90.conf")
90
self.assertEqual(self._getoption(), 3)
91
self._remove("c.conf") # we allow to stay without .conf
92
self.assertEqual(self._getoption(), 3)
93
self._write("c.conf", "1")
94
self._remove("c.local")
95
self.assertEqual(self._getoption(), 1)
98
class JailReaderTest(unittest.TestCase):
100
def testStockSSHJail(self):
101
jail = JailReader('ssh-iptables', basedir='config') # we are running tests from root project dir atm
102
self.assertTrue(jail.read())
103
self.assertTrue(jail.getOptions())
104
self.assertFalse(jail.isEnabled())
105
self.assertEqual(jail.getName(), 'ssh-iptables')
41
107
def testSplitAction(self):
42
108
action = "mail-whois[name=SSH]"
43
109
expected = ['mail-whois', {'name': 'SSH'}]
44
110
result = JailReader.splitAction(action)
45
111
self.assertEquals(expected, result)
113
class JailsReaderTest(unittest.TestCase):
115
def testProvidingBadBasedir(self):
116
if not os.path.exists('/XXX'):
117
reader = JailsReader(basedir='/XXX')
118
self.assertRaises(ValueError, reader.read)
120
def testReadStockJailConf(self):
121
jails = JailsReader(basedir='config') # we are running tests from root project dir atm
122
self.assertTrue(jails.read()) # opens fine
123
self.assertTrue(jails.getOptions()) # reads fine
124
comm_commands = jails.convert()
125
# by default None of the jails is enabled and we get no
126
# commands to communicate to the server
127
self.assertEqual(comm_commands, [])
129
def testReadStockJailConfForceEnabled(self):
130
# more of a smoke test to make sure that no obvious surprises
131
# on users' systems when enabling shipped jails
132
jails = JailsReader(basedir='config', force_enable=True) # we are running tests from root project dir atm
133
self.assertTrue(jails.read()) # opens fine
134
self.assertTrue(jails.getOptions()) # reads fine
135
comm_commands = jails.convert()
137
# by default we have lots of jails ;)
138
self.assertTrue(len(comm_commands))
140
# and we know even some of them by heart
141
for j in ['ssh-iptables', 'recidive']:
142
# by default we have 'auto' backend ATM
143
self.assertTrue(['add', j, 'auto'] in comm_commands)
145
self.assertTrue(['set', j, 'usedns', 'warn'] in comm_commands)
146
self.assertTrue(['start', j] in comm_commands)
147
# last commands should be the 'start' commands
148
self.assertEqual(comm_commands[-1][0], 'start')
149
# TODO: make sure that all of the jails have actions assigned,
150
# otherwise it makes little to no sense
152
def testConfigurator(self):
153
configurator = Configurator()
154
configurator.setBaseDir('config')
155
self.assertEqual(configurator.getBaseDir(), 'config')
157
configurator.readEarly()
158
opts = configurator.getEarlyOptions()
159
# our current default settings
160
self.assertEqual(opts['socket'], '/var/run/fail2ban/fail2ban.sock')
161
self.assertEqual(opts['pidfile'], '/var/run/fail2ban/fail2ban.pid')
163
configurator.getOptions()
164
configurator.convertToProtocol()
165
commands = configurator.getConfigStream()
166
# and there is logging information left to be passed into the
168
self.assertEqual(commands,
169
[['set', 'loglevel', 3],
170
['set', 'logtarget', '/var/log/fail2ban.log']])
172
# and if we force change configurator's fail2ban's baseDir
173
# there should be an error message (test visually ;) --
174
# otherwise just a code smoke test)
175
configurator._Configurator__jails.setBaseDir('/tmp')
176
self.assertEqual(configurator._Configurator__jails.getBaseDir(), '/tmp')
177
self.assertEqual(configurator.getBaseDir(), 'config')