~ubuntu-branches/ubuntu/saucy/fail2ban/saucy

« back to all changes in this revision

Viewing changes to testcases/clientreadertestcase.py

  • Committer: Package Import Robot
  • Author(s): Yaroslav Halchenko
  • Date: 2013-05-13 11:58:56 UTC
  • mfrom: (1.1.15) (10.2.13 sid)
  • Revision ID: package-import@ubuntu.com-20130513115856-x7six9p53qcie0vl
Tags: 0.8.9-1
* New upstream release
  - significant improvements in documentation (Closes: #400416)
  - roundcube auth filter (Closes: #699442)
  - enforces C locale for dates (Closes: #686341)
  - provides bash_completion.d/fail2ban
* debian/jail.conf:
  - added findtime and documentation on those basic options from jail.conf
    (Closes: #704568)
  - added new sample jails definitions for ssh-route, ssh-iptables-ipset{4,6},
    roundcube-auth, sogo-auth, mysqld-auth
* debian/control:
  - suggest system-log-daemon (Closes: #691001)
  - boost policy compliance to 3.9.4
* debian/rules:
  - run fail2ban's unittests at build time but ignore the failures
    (there are still some known issues to fix up to guarantee robust testing
    in clean chroots etc).
    Only pyinotify was added to build-depends since gamin might still be
    buggy on older releases and get stuck, which would complicate
    backporting

Show diffs side-by-side

added added

removed removed

Lines of Context:
17
17
# along with Fail2Ban; if not, write to the Free Software
18
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
19
19
 
20
 
# Author: Cyril Jaquier
21
 
22
 
# $Revision$
23
 
 
24
 
__author__ = "Cyril Jaquier"
25
 
__version__ = "$Revision$"
26
 
__date__ = "$Date$"
27
 
__copyright__ = "Copyright (c) 2004 Cyril Jaquier"
 
20
__author__ = "Cyril Jaquier, Yaroslav Halchenko"
 
21
__copyright__ = "Copyright (c) 2004 Cyril Jaquier, 2011-2013 Yaroslav Halchenko"
28
22
__license__ = "GPL"
29
23
 
30
 
import unittest
 
24
import os, shutil, tempfile, unittest
 
25
from client.configreader import ConfigReader
31
26
from client.jailreader import JailReader
 
27
from client.jailsreader import JailsReader
 
28
from client.configurator import Configurator
32
29
 
33
 
class JailReaderTest(unittest.TestCase):
 
30
class ConfigReaderTest(unittest.TestCase):
34
31
 
35
32
        def setUp(self):
36
33
                """Call before every test case."""
 
34
                self.d = tempfile.mkdtemp(prefix="f2b-temp")
 
35
                self.c = ConfigReader(basedir=self.d)
37
36
 
38
37
        def tearDown(self):
39
38
                """Call after every test case."""
 
39
                shutil.rmtree(self.d)
 
40
 
 
41
        def _write(self, fname, value):
 
42
                # verify if we don't need to create .d directory
 
43
                if os.path.sep in fname:
 
44
                        d = os.path.dirname(fname)
 
45
                        d_ = os.path.join(self.d, d)
 
46
                        if not os.path.exists(d_):
 
47
                                os.makedirs(d_)
 
48
                open("%s/%s" % (self.d, fname), "w").write("""
 
49
[section]
 
50
option = %s
 
51
""" % value)
 
52
 
 
53
        def _remove(self, fname):
 
54
                os.unlink("%s/%s" % (self.d, fname))
 
55
                self.assertTrue(self.c.read('c'))       # we still should have some
 
56
 
 
57
 
 
58
        def _getoption(self, f='c'):
 
59
                self.assertTrue(self.c.read(f)) # we got some now
 
60
                return self.c.getOptions('section', [("int", 'option')])['option']
 
61
 
 
62
 
 
63
        def testInaccessibleFile(self):
 
64
                f = os.path.join(self.d, "d.conf")  # inaccessible file
 
65
                self._write('d.conf', 0)
 
66
                self.assertEqual(self._getoption('d'), 0)
 
67
                os.chmod(f, 0)
 
68
                self.assertFalse(self.c.read('d'))      # should not be readable BUT present
 
69
 
 
70
 
 
71
        def testOptionalDotDDir(self):
 
72
                self.assertFalse(self.c.read('c'))      # nothing is there yet
 
73
                self._write("c.conf", "1")
 
74
                self.assertEqual(self._getoption(), 1)
 
75
                self._write("c.conf", "2")              # overwrite
 
76
                self.assertEqual(self._getoption(), 2)
 
77
                self._write("c.local", "3")             # add override in .local
 
78
                self.assertEqual(self._getoption(), 3)
 
79
                self._write("c.d/98.conf", "998") # add 1st override in .d/
 
80
                self.assertEqual(self._getoption(), 998)
 
81
                self._write("c.d/90.conf", "990") # add previously sorted override in .d/
 
82
                self.assertEqual(self._getoption(), 998) #  should stay the same
 
83
                self._write("c.d/99.conf", "999") # now override in a way without sorting we possibly get a failure
 
84
                self.assertEqual(self._getoption(), 999)
 
85
                self._remove("c.d/99.conf")
 
86
                self.assertEqual(self._getoption(), 998)
 
87
                self._remove("c.d/98.conf")
 
88
                self.assertEqual(self._getoption(), 990)
 
89
                self._remove("c.d/90.conf")
 
90
                self.assertEqual(self._getoption(), 3)
 
91
                self._remove("c.conf")                  #  we allow to stay without .conf
 
92
                self.assertEqual(self._getoption(), 3)
 
93
                self._write("c.conf", "1")
 
94
                self._remove("c.local")
 
95
                self.assertEqual(self._getoption(), 1)
 
96
 
 
97
 
 
98
class JailReaderTest(unittest.TestCase):
        """Tests for JailReader: the stock ssh-iptables jail and action parsing."""

        def testStockSSHJail(self):
                """The shipped 'ssh-iptables' jail reads fine and is not enabled."""
                jail = JailReader('ssh-iptables', basedir='config') # we are running tests from root project dir atm
                self.assertTrue(jail.read())
                self.assertTrue(jail.getOptions())
                self.assertFalse(jail.isEnabled())
                self.assertEqual(jail.getName(), 'ssh-iptables')

        def testSplitAction(self):
                """splitAction() separates the action name from its [key=value] options."""
                action = "mail-whois[name=SSH]"
                expected = ['mail-whois', {'name': 'SSH'}]
                result = JailReader.splitAction(action)
                # assertEqual rather than the deprecated assertEquals alias,
                # in line with the other assertions in this module
                self.assertEqual(expected, result)
46
 
                
 
112
 
 
113
class JailsReaderTest(unittest.TestCase):
        """Tests of JailsReader and Configurator against the stock 'config' directory."""

        def testProvidingBadBasedir(self):
                """read() is expected to raise ValueError for a missing basedir."""
                bogus = '/XXX'
                if os.path.exists(bogus):
                        # the path really exists on this system: nothing to check
                        return
                reader = JailsReader(basedir=bogus)
                self.assertRaises(ValueError, reader.read)

        def testReadStockJailConf(self):
                """The stock jail.conf loads and yields no commands for the server."""
                # we are running tests from root project dir atm
                stock = JailsReader(basedir='config')
                self.assertTrue(stock.read())             # opens fine
                self.assertTrue(stock.getOptions())       # reads fine
                stream = stock.convert()
                # by default none of the jails is enabled, hence there is
                # nothing to communicate to the server
                self.assertEqual(stream, [])

        def testReadStockJailConfForceEnabled(self):
                """Smoke test: force-enable all shipped jails and inspect the commands."""
                # guards against obvious surprises on users' systems when
                # they enable shipped jails
                # we are running tests from root project dir atm
                stock = JailsReader(basedir='config', force_enable=True)
                self.assertTrue(stock.read())             # opens fine
                self.assertTrue(stock.getOptions())       # reads fine
                stream = stock.convert()

                # by default we have lots of jails ;)
                self.assertTrue(len(stream))

                # and we know even some of them by heart
                for name in ['ssh-iptables', 'recidive']:
                        wanted = (
                                ['add', name, 'auto'],             # 'auto' backend by default ATM
                                ['set', name, 'usedns', 'warn'],   # and warn on useDNS
                                ['start', name],
                        )
                        for cmd in wanted:
                                self.assertTrue(cmd in stream)
                # last commands should be the 'start' commands
                self.assertEqual(stream[-1][0], 'start')
                # TODO: make sure that all of the jails have actions assigned,
                #       otherwise it makes little to no sense

        def testConfigurator(self):
                """Configurator keeps its basedir, early options and config stream."""
                cfg = Configurator()
                cfg.setBaseDir('config')
                self.assertEqual(cfg.getBaseDir(), 'config')

                cfg.readEarly()
                early = cfg.getEarlyOptions()
                # our current default settings
                defaults = (
                        ('socket', '/var/run/fail2ban/fail2ban.sock'),
                        ('pidfile', '/var/run/fail2ban/fail2ban.pid'),
                )
                for key, path in defaults:
                        self.assertEqual(early[key], path)

                cfg.getOptions()
                cfg.convertToProtocol()
                stream = cfg.getConfigStream()
                # only the logging information is left to be passed into the
                # server
                logging_only = [['set', 'loglevel', 3],
                                ['set', 'logtarget', '/var/log/fail2ban.log']]
                self.assertEqual(stream, logging_only)

                # forcing a different baseDir onto the configurator's private
                # jails reader should produce an error message (test visually ;)
                # -- otherwise this is just a code smoke test)
                jails = cfg._Configurator__jails
                jails.setBaseDir('/tmp')
                self.assertEqual(jails.getBaseDir(), '/tmp')
                self.assertEqual(cfg.getBaseDir(), 'config')