~ubuntu-branches/ubuntu/utopic/ufw/utopic

« back to all changes in this revision

Viewing changes to tests/unit/test_frontend.py

  • Committer: Package Import Robot
  • Author(s): Jamie Strandboge
  • Date: 2014-02-20 09:23:54 UTC
  • mfrom: (30.1.16)
  • Revision ID: package-import@ubuntu.com-20140220092354-bfbvpmrbgpxlxxha
Tags: 0.34~rc-0ubuntu1
* New upstream pre-release (LP: #1059060, #1065297, #1062521, #1101304,
  LP: #1075975, #1089262, #262421)
* Dropped the following patches now included upstream:
  - 0002-lp1044361.patch
  - 0003-fix-typeerror-on-error.patch
  - 0004-lp1039729.patch
  - 0005-lp1191197.patch
* Remaining changes:
  - 0001-optimize-boot.patch: only read in /etc/ufw/ufw.conf when disabled
* debian/before[6].rules.md5sum: adjusted for new release
* debian/control: update Standards-Version to 3.9.5
* debian/rules:
  - only ship /usr/share/ufw/iptables/*rules and not /usr/share/ufw/
  - *.init files should also be config files
* debian/ufw.links: added to makes symlinks from /usr/share/ufw/iptables/*
  to /usr/share/ufw/ (so ucf is happy on upgrades)
* debian/ufw.postinst:
  - use TEMPLATE_PATH/iptables/*rules instead of TEMPLATE_PATH/*rules (not
    strictly required since we are using dh_link, but makes the intent
    clearer)
  - copy /usr/share/ufw/*.init in to /etc/ufw

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
#
 
2
# Copyright 2012-2013 Canonical Ltd.
 
3
#
 
4
# This program is free software: you can redistribute it and/or modify
 
5
# it under the terms of the GNU General Public License version 3,
 
6
# as published by the Free Software Foundation.
 
7
#
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
#
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
15
#
 
16
 
 
17
import unittest
 
18
import os
 
19
 
 
20
try: # python 2
 
21
    from StringIO import StringIO
 
22
except ImportError:
 
23
    from io import StringIO
 
24
 
 
25
import tests.unit.support
 
26
import ufw.common
 
27
import ufw.frontend
 
28
import ufw.util
 
29
 
 
30
class FrontendTestCase(unittest.TestCase):
 
31
    def setUp(self):
 
32
        ufw.common.do_checks = False
 
33
        iptables_dir = ""
 
34
        for d in ["/sbin", "/bin",
 
35
                  "/usr/sbin", "/usr/bin",
 
36
                  "/usr/local/sbin", "/usr/local/bin"]:
 
37
            if os.path.exists(os.path.join(d, "iptables")):
 
38
                iptables_dir = d
 
39
                break
 
40
        self.assertTrue(iptables_dir != "")
 
41
        ufw.common.iptables_dir = iptables_dir
 
42
 
 
43
        # This needs to be before we set ufw.util.msg_output since
 
44
        # ufw.util.warn() is called in backend.py:init()
 
45
        self.ui = ufw.frontend.UFWFrontend(dryrun=True)
 
46
 
 
47
        # Capture stdout from msg() and write_to_file() so we can examine it
 
48
        self.saved_msg_output = ufw.util.msg_output
 
49
        self.msg_output = StringIO()
 
50
        ufw.util.msg_output = self.msg_output
 
51
 
 
52
    def tearDown(self):
 
53
        # Restore stdout
 
54
        if self.msg_output:
 
55
            ufw.util.msg_output = self.saved_msg_output
 
56
            self.msg_output.close()
 
57
            self.msg_output = None
 
58
 
 
59
        self.ui = None
 
60
 
 
61
    def test_parse_command(self):
 
62
        '''Test parse_command()'''
 
63
        # test_parser.py will handle command combinations exhaustively, let's
 
64
        # just use a representative set here
 
65
        cmds = [
 
66
                'enable',
 
67
                'disable',
 
68
                'reload',
 
69
                'default allow',
 
70
                'default deny',
 
71
                'default reject',
 
72
                'default allow incoming',
 
73
                'default deny outgoing',
 
74
                'logging on',
 
75
                'logging off',
 
76
                'logging medium',
 
77
                'reset',
 
78
                'status',
 
79
                'status numbered',
 
80
                'status verbose',
 
81
                'show raw',
 
82
                'show builtins',
 
83
                'show before-rules',
 
84
                'show user-rules',
 
85
                'show after-rules',
 
86
                'show logging-rules',
 
87
                'show listening',
 
88
                'show added',
 
89
                'delete 1',
 
90
                'delete reject 22',
 
91
                'insert 1 limit 22/tcp',
 
92
                'allow 53/udp',
 
93
                'deny http',
 
94
                'allow to any port 23 proto tcp',
 
95
                'deny from 192.168.0.1 to 192.168.0.2',
 
96
                'reject in on eth0',
 
97
                'allow to fe80::/16',
 
98
                'deny from any port 53 proto udp',
 
99
                'limit in on eth0 to 192.168.0.1 port 22 from 10.0.0.0/24 port 1024:65535 proto tcp',
 
100
                '--version',
 
101
                '--dry-run allow 22/tcp',
 
102
                '--dry-run app list',
 
103
                'app list',
 
104
                'app info Apache',
 
105
                'app default skip',
 
106
                'app update Apache',
 
107
               ]
 
108
        for c in cmds:
 
109
            #print(c)
 
110
            ufw.frontend.parse_command(['ufw'] + c.split())
 
111
 
 
112
    def test_parse_command_bad(self):
 
113
        '''Test parse_command_bad'''
 
114
        data = [
 
115
                 ('llow 12345', ValueError),
 
116
                 ('allo 12345', ValueError),
 
117
                 ('allow', ValueError),
 
118
               ]
 
119
        # for ufw.util.error() on python3
 
120
        ufw.util.msg_output = self.saved_msg_output
 
121
        for (c, expected) in data:
 
122
            tests.unit.support.check_for_exception(self, expected,
 
123
                    ufw.frontend.parse_command, ['ufw'] + c.split())
 
124
 
 
125
    def test___init__(self):
 
126
        '''Test __init__()'''
 
127
        tests.unit.support.check_for_exception(self, ufw.common.UFWError,
 
128
                ufw.frontend.UFWFrontend, True, 'nonexistent')
 
129
 
 
130
    def test_get_command_help(self):
 
131
        '''Test get_command_help()'''
 
132
        s = ufw.frontend.get_command_help()
 
133
        terms = ['enable',
 
134
                 'disable',
 
135
                 'default ARG',
 
136
                 'logging LEVEL',
 
137
                 'allow ARGS',
 
138
                 'deny ARGS',
 
139
                 'reject ARGS',
 
140
                 'limit ARGS',
 
141
                 'delete RULE|NUM',
 
142
                 'insert NUM RULE',
 
143
                 'reload',
 
144
                 'reset',
 
145
                 'status',
 
146
                 'status numbered',
 
147
                 'status verbose',
 
148
                 'show ARG',
 
149
                 'version',
 
150
                 'app list',
 
151
                 'app info PROFILE',
 
152
                 'app update PROFILE',
 
153
                 'app default ARG'
 
154
                ]
 
155
        for search in terms:
 
156
            self.assertTrue(search in s, "Could not find '%s' in:\n%s" % \
 
157
                            (search, s))
 
158
 
 
159
    def test_continue_under_ssh(self):
 
160
        '''Test continue_under_ssh()'''
 
161
        self.ui.continue_under_ssh()
 
162
 
 
163
    def test_do_action(self):
 
164
        '''Test do_action()'''
 
165
        cmds = [
 
166
                'enable',
 
167
                'disable',
 
168
                'enable',
 
169
                'reload',
 
170
                'default allow',
 
171
                'default deny',
 
172
                'default reject',
 
173
                'default allow incoming',
 
174
                'default deny outgoing',
 
175
                'logging on',
 
176
                'logging off',
 
177
                'logging medium',
 
178
                'reset',
 
179
                'status',
 
180
                'status numbered',
 
181
                'status verbose',
 
182
                'allow 43',
 
183
                'reject 22',
 
184
                'delete 1',
 
185
                'delete reject 22',
 
186
                'insert 1 limit 22/tcp',
 
187
                'allow 53/udp',
 
188
                'deny http',
 
189
                'allow to any port 23 proto tcp',
 
190
                'deny from 192.168.0.1 to 192.168.0.2',
 
191
                'reject in on eth0',
 
192
                'allow to fe80::/16',
 
193
                'deny from any port 53 proto udp',
 
194
                'limit in on eth0 to 192.168.0.1 port 22 from 10.0.0.0/24 port 1024:65535 proto tcp',
 
195
                'allow CIFS',
 
196
                'delete allow CIFS',
 
197
                'allow CIFS',
 
198
                'delete allow CifS',
 
199
                'allow to 192.168.0.1 app WWW',
 
200
                'delete allow to 192.168.0.1 app WWW',
 
201
                'allow to fe80::/16 app WWW',
 
202
                'delete allow to fe80::/16 app WWW',
 
203
                'allow from fe80::/16 app WWW',
 
204
                'delete allow from fe80::/16 app WWW',
 
205
                'allow from fe80::/16 app CIFS',
 
206
                'delete allow from fe80::/16 app CifS',
 
207
                'show listening',
 
208
                'show added',
 
209
                'show raw',
 
210
               ]
 
211
        for dryrun in [True, False]:
 
212
            ufw.util.msg_output = self.saved_msg_output
 
213
            ui = ufw.frontend.UFWFrontend(dryrun=dryrun)
 
214
            ufw.util.msg_output = self.msg_output
 
215
            for c in cmds:
 
216
                if not dryrun and c not in ['allow', 'deny', 'limit',
 
217
                                            'reject', 'delete', 'insert']:
 
218
                    continue
 
219
                try:
 
220
                    pr = ufw.frontend.parse_command(['ufw'] + c.split())
 
221
                    if 'rule' in pr.data:
 
222
                        res = ui.do_action(pr.action,
 
223
                                           pr.data['rule'],
 
224
                                           pr.data['iptype'],
 
225
                                           force=True)
 
226
                    else:
 
227
                        res = ui.do_action(pr.action, "", "", force=True)
 
228
                except Exception:
 
229
                    print("%s failed:" % c)
 
230
                    raise
 
231
                self.assertTrue(res != "", "Output is empty for '%s'" % c)
 
232
                cmd = c.split()[0]
 
233
                out = self.msg_output.getvalue()
 
234
                if cmd in ['allow', 'deny', 'limit', 'reject', 'delete',
 
235
                           'insert']:
 
236
                    for search in ['*filter', 'COMMIT']:
 
237
                        self.assertTrue(search in out, \
 
238
                                        "Could not find '%s' in:\n%s" % \
 
239
                                         (search, out))
 
240
                else:
 
241
                    search = "running ufw-init"
 
242
                    self.assertTrue(search in out, \
 
243
                                    "Could not find '%s' in:\n%s" % \
 
244
                                     (search, out))
 
245
 
 
246
        print ("TODO: verify output of rules in do_action()")
 
247
 
 
248
    def test_do_action_remove_bad_appname(self):
 
249
        '''Test do_action() remove bad appname'''
 
250
        c = 'delete allow to any app &^%$'
 
251
        pr = ufw.frontend.parse_command(['ufw'] + c.split())
 
252
        tests.unit.support.check_for_exception(self, ufw.common.UFWError,
 
253
                self.ui.do_action, pr.action, pr.data['rule'],
 
254
                                   pr.data['iptype'], True)
 
255
 
 
256
    def test_do_application_action(self):
 
257
        '''Test do_application_action()'''
 
258
        cmds = [
 
259
                'app list',
 
260
                'app info WWW',
 
261
                'app default skip',
 
262
                'app default deny',
 
263
                'app update WWW',
 
264
                'app update all',
 
265
                'app update --add-new CIFS',
 
266
               ]
 
267
        for c in cmds:
 
268
            try:
 
269
                pr = ufw.frontend.parse_command(['ufw'] + c.split())
 
270
                if 'type' in pr.data and pr.data['type'] == 'app':
 
271
                    res = self.ui.do_application_action(pr.action,
 
272
                                                        pr.data['name'])
 
273
                else:
 
274
                    res = self.ui.do_action(pr.action, "", "", force=True)
 
275
            except Exception:
 
276
                print("%s failed:" % c)
 
277
                raise
 
278
            # print(res)
 
279
            if c.startswith("app update"):
 
280
                self.assertTrue(res == "", "Output is not empty for '%s'" % c)
 
281
            elif c.startswith('app list'):
 
282
                for search in ['Available applications', 'AIM', 'WWW']:
 
283
                    self.assertTrue(search in res, \
 
284
                                    "Could not find '%s' in:\n%s" % \
 
285
                                     (search, res))
 
286
            elif c.startswith('app info'):
 
287
                for search in ['Title: Web Server', '80/tcp']:
 
288
                    self.assertTrue(search in res, \
 
289
                                    "Could not find '%s' in:\n%s" % \
 
290
                                     (search, res))
 
291
            elif c.startswith('app default'):
 
292
                p = c.split()[-1]
 
293
                search = "Default application policy changed to '%s'" % p
 
294
                self.assertTrue(search in res, \
 
295
                                "Could not find '%s' in:\n%s" % (search, res))
 
296
            else:
 
297
                self.assertTrue(res != "", "Output is empty for '%s'" % c)
 
298
 
 
299
        pr = ufw.frontend.parse_command(['ufw', 'app', 'update', '--add-new', 'all'])
 
300
        tests.unit.support.check_for_exception(self, ufw.common.UFWError,
 
301
                self.ui.do_application_action, pr.action, pr.data['name'])
 
302
 
 
303
    def test_get_show_raw(self):
 
304
        '''Test get_show_raw()'''
 
305
        res = self.ui.get_show_raw()
 
306
        search = "> Checking"
 
307
        self.assertTrue(search in res, \
 
308
                        "Could not find '%s' in:\n%s" % (search, res))
 
309
 
 
310
    def test_get_show_listening(self):
 
311
        '''Test get_show_listening()'''
 
312
        res = self.ui.get_show_listening()
 
313
        for search in ['tcp', 'udp']:
 
314
            # self.assertTrue(search in res, \
 
315
            #                 "Could not find '%s' in:\n%s" % (search, res))
 
316
            if search not in res:
 
317
                print("(TODO: fake-netstat) Could not find '%s' in:\n%s" % (search, res))
 
318
 
 
319
    def test_get_show_added(self):
 
320
        '''Test get_show_added()'''
 
321
        res = self.ui.get_show_added()
 
322
        search = "(None)"
 
323
        self.assertTrue(search in res, \
 
324
                        "Could not find '%s' in:\n%s" % (search, res))
 
325
 
 
326
        c = 'allow 12345'
 
327
        pr = ufw.frontend.parse_command(['ufw'] + c.split())
 
328
        self.ui.do_action(pr.action, pr.data['rule'], pr.data['iptype'],
 
329
                          force=True)
 
330
        res = self.ui.get_show_added()
 
331
        search = c
 
332
        self.assertTrue(search in res, \
 
333
                        "Could not find '%s' in:\n%s" % (search, res))
 
334
 
 
335
    def test_application_add(self):
 
336
        '''Test application_add()'''
 
337
        for i in ['accept', 'drop', 'reject']:
 
338
            self.ui.backend.defaults['default_application_policy'] = i
 
339
            res = self.ui.application_add('WWW')
 
340
            for search in ['Rules updated', 'Rules updated (v6)']:
 
341
                self.assertTrue(search in res, \
 
342
                                "Could not find '%s' in:\n%s" % (search, res))
 
343
        self.ui.backend.defaults['default_application_policy'] = 'bad'
 
344
        tests.unit.support.check_for_exception(self, ufw.common.UFWError,
 
345
                self.ui.application_add, 'WWW')
 
346
        self.ui.backend.defaults['default_application_policy'] = 'skip'
 
347
 
 
348
 
 
349
def test_main(): # used by runner.py
 
350
    tests.unit.support.run_unittest(
 
351
            FrontendTestCase
 
352
    )
 
353
 
 
354
if __name__ == "__main__": # used when standalone
 
355
    unittest.main()