~jdstrand/ufw/trunk

156 by Jamie Strandboge
first pass at code refactoring and splitting out ufw into different modules
1
#
2
# util.py: utility functions for ufw
3
#
4
# Copyright (C) 2008 Canonical Ltd.
5
#
6
#    This program is free software: you can redistribute it and/or modify
7
#    it under the terms of the GNU General Public License version 3,
8
#    as published by the Free Software Foundation.
9
#
10
#    This program is distributed in the hope that it will be useful,
11
#    but WITHOUT ANY WARRANTY; without even the implied warranty of
12
#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13
#    GNU General Public License for more details.
14
#
15
#    You should have received a copy of the GNU General Public License
16
#    along with this program.  If not, see <http://www.gnu.org/licenses/>.
17
#
18
19
import os
20
import re
21
import shutil
22
import socket
23
import subprocess
24
import sys
25
from tempfile import mkstemp
26
27
# only needed for UFWError
28
import ufw.common
29
30
debugging = False
31
32
def get_services_proto(port):
33
    '''Get the protocol for a specified port from /etc/services'''
34
    proto = ""
35
    try:
36
        socket.getservbyname(port)
37
    except:
38
        raise
39
40
    try:
41
        socket.getservbyname(port, "tcp")
42
        proto = "tcp"
43
    except:
44
        pass
45
46
    try:
47
        socket.getservbyname(port, "udp")
48
        if proto == "tcp":
49
            proto = "any"
50
        else:
51
            proto = "udp"
52
    except:
53
        pass
54
55
    return proto
56
57
def parse_port_proto(str):
58
    '''Parse port or port and protocol'''
59
    port = ""
60
    proto = ""
61
    tmp = str.split('/')
62
    if len(tmp) == 1:
63
        port = tmp[0]
64
        proto = "any"
65
    elif len(tmp) == 2:
66
        port = tmp[0]
67
        proto = tmp[1]
68
    else:
69
        err_msg = _("Bad port/protocol")
70
        raise common.UFWError(err_msg)
71
    return (port, proto)
72
73
74
def valid_address(addr, v6=False):
75
    '''Validate IP addresses'''
76
    if v6 and not socket.has_ipv6:
77
        warn_msg = _("python does not have IPv6 support.")
78
        warn(warn_msg)
79
        return False
80
81
    # quick and dirty test
82
    if len(addr) > 43 or not re.match(r'^[a-fA-F0-9:\./]+$', addr):
83
        return False
84
85
    net = addr.split('/')
86
87
    if len(net) > 2:
88
        return False
89
    elif len(net) == 2:
90
        # Check netmask specified via '/'
91
92
        if not re.match(r'^[0-9]+$', net[1]):
93
            # Only allow integer netmasks
94
            return False
95
96
        if v6:
97
            if int(net[1]) < 0 or int(net[1]) > 128:
98
                return False
99
        else:
100
            if int(net[1]) < 0 or int(net[1]) > 32:
101
                return False
102
103
    try:
104
        if v6:
105
            socket.inet_pton(socket.AF_INET6, net[0])
106
        else:
107
            socket.inet_pton(socket.AF_INET, net[0])
108
    except:
109
        return False
110
    
111
    return True
112
113
def open_file_read(f):
114
    '''Opens the specified file read-only'''
115
    try:
116
        orig = open(f, 'r')
117
    except OSError, e:
118
        err_msg = _("Couldn't open '%s' for reading") % (f)
119
        raise common.UFWError(err_msg)
120
    except Exception:
121
        raise
122
123
    return orig
124
125
126
def open_files(f):
127
    '''Opens the specified file read-only and a tempfile read-write.'''
128
    orig = open_file_read(f)
129
130
    try:
131
        (tmp, tmpname) = mkstemp()
132
    except Exception:
133
        orig.close()
134
        raise
135
136
    return { "orig": orig, "origname": f, "tmp": tmp, "tmpname": tmpname }
137
138
139
def close_files(fns, update = True):
140
    '''Closes the specified files (as returned by open_files), and update
141
       original file with the temporary file.
142
    '''
143
    fns['orig'].close()
144
    os.close(fns['tmp'])
145
146
    if update:
147
        try:
148
            shutil.copystat(fns['origname'], fns['tmpname'])
149
            shutil.copy(fns['tmpname'], fns['origname'])
150
        except Exception:
151
            raise
152
153
    try:
154
        os.unlink(fns['tmpname'])
155
    except OSError, e:
156
        raise
157
158
159
def cmd(command):
160
    '''Try to execute the given command.'''
161
    try:
162
        sp = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
163
    except OSError, e:
164
        return [127, str(e)]
165
166
    out = sp.communicate()[0]
167
    return [sp.returncode,out]
168
169
170
def cmd_pipe(command1, command2):
171
    '''Try to pipe command1 into command2.'''
172
    try:
173
        sp1 = subprocess.Popen(command1, stdout=subprocess.PIPE)
174
        sp2 = subprocess.Popen(command2, stdin=sp1.stdout)
175
    except OSError, e:
176
        return [127, str(e)]
177
178
    out = sp2.communicate()[0]
179
    return [sp2.returncode,out]
180
181
182
def error(msg):
183
    '''Print error message and exit'''
184
    print >> sys.stderr, _("ERROR: %s") % (msg)
185
    sys.exit(1)
186
187
188
def warn(msg):
189
    '''Print warning message'''
190
    print >> sys.stderr, _("WARN: %s") % (msg)
191
192
193
def debug(msg):
194
    '''Print debug message'''
195
    if debugging:
196
        print >> sys.stderr, _("DEBUG: %s") % (msg)
197