1
# Copyright 2014-2015 Canonical Limited.
3
# This file is part of charm-helpers.
5
# charm-helpers is free software: you can redistribute it and/or modify
6
# it under the terms of the GNU Lesser General Public License version 3 as
7
# published by the Free Software Foundation.
9
# charm-helpers is distributed in the hope that it will be useful,
10
# but WITHOUT ANY WARRANTY; without even the implied warranty of
11
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12
# GNU Lesser General Public License for more details.
14
# You should have received a copy of the GNU Lesser General Public License
15
# along with charm-helpers. If not, see <http://www.gnu.org/licenses/>.
18
This module contains helpers to add and remove ufw rules.
22
- open SSH port for subnet 10.0.3.0/24:
24
>>> from charmhelpers.contrib.network import ufw
26
>>> ufw.grant_access(src='10.0.3.0/24', dst='any', port='22', proto='tcp')
28
- open service by name as defined in /etc/services:
30
>>> from charmhelpers.contrib.network import ufw
32
>>> ufw.service('ssh', 'open')
34
- close service by port number:
36
>>> from charmhelpers.contrib.network import ufw
38
>>> ufw.service('4949', 'close') # munin
43
from charmhelpers.core import hookenv
45
__author__ = "Felipe Reyes <felipe.reyes@canonical.com>"
48
class UFWError(Exception):
    """Base class for the errors raised by the ufw helpers in this module."""
52
class UFWIPv6Error(UFWError):
    """Raised when IPv6 firewall support (ip6tables) is found to be broken."""
58
Check if `ufw` is enabled
60
:returns: True if ufw is enabled
62
output = subprocess.check_output(['ufw', 'status'],
63
universal_newlines=True,
65
'PATH': os.environ['PATH']})
67
m = re.findall(r'^Status: active\n', output, re.M)
72
def is_ipv6_ok(soft_fail=False):
    """
    Check if IPv6 support is present and ip6tables functional

    :param soft_fail: If set to True and IPv6 support is broken, then reports
                      that the host doesn't have IPv6 support, otherwise a
                      UFWIPv6Error exception is raised.
    :returns: True if IPv6 is working, False otherwise
    :raises UFWIPv6Error: if the ip6_tables module can't be loaded and
                          `soft_fail` is False.
    """
    # do we have IPv6 in the machine?
    if not os.path.isdir('/proc/sys/net/ipv6'):
        # the system doesn't have IPv6
        return False

    # is ip6tables kernel module loaded?
    lsmod = subprocess.check_output(['lsmod'], universal_newlines=True)
    if re.findall(r'^ip6_tables[ ]+', lsmod, re.M):
        # the module is present :)
        return True

    # ip6tables support isn't complete, let's try to load it
    try:
        subprocess.check_output(['modprobe', 'ip6_tables'],
                                universal_newlines=True)
    except subprocess.CalledProcessError as ex:
        # NOTE(review): WARN is assumed here to match the other failure logs
        # in this module -- confirm the intended level.
        hookenv.log("Couldn't load ip6_tables module: %s" % ex.output,
                    level="WARN")
        # we are in a world where ip6tables isn't working
        if soft_fail:
            # so we inform that the machine doesn't have IPv6
            return False
        raise UFWIPv6Error("IPv6 firewall support broken")

    # great, we could load the module
    return True
114
Disable ufw IPv6 support in /etc/default/ufw
116
exit_code = subprocess.call(['sed', '-i', 's/IPV6=.*/IPV6=no/g',
119
hookenv.log('IPv6 support in ufw disabled', level='INFO')
121
hookenv.log("Couldn't disable IPv6 support in ufw", level="ERROR")
122
raise UFWError("Couldn't disable IPv6 support in ufw")
125
def enable(soft_fail=False):
129
:param soft_fail: If set to True silently disables IPv6 support in ufw,
130
otherwise a UFWIPv6Error exception is raised when IPv6
132
:returns: True if ufw is successfully enabled
137
if not is_ipv6_ok(soft_fail):
140
output = subprocess.check_output(['ufw', 'enable'],
141
universal_newlines=True,
142
env={'LANG': 'en_US',
143
'PATH': os.environ['PATH']})
145
m = re.findall('^Firewall is active and enabled on system startup\n',
147
hookenv.log(output, level='DEBUG')
150
hookenv.log("ufw couldn't be enabled", level='WARN')
153
hookenv.log("ufw enabled", level='INFO')
161
:returns: True if ufw is successfully disabled
166
output = subprocess.check_output(['ufw', 'disable'],
167
universal_newlines=True,
168
env={'LANG': 'en_US',
169
'PATH': os.environ['PATH']})
171
m = re.findall(r'^Firewall stopped and disabled on system startup\n',
173
hookenv.log(output, level='DEBUG')
176
hookenv.log("ufw couldn't be disabled", level='WARN')
179
hookenv.log("ufw disabled", level='INFO')
183
def modify_access(src, dst='any', port=None, proto=None, action='allow'):
185
Grant or revoke access to an address or subnet
187
:param src: address (e.g. 192.168.1.234) or subnet
188
(e.g. 192.168.1.0/24).
189
:param dst: destination of the connection, if the machine has multiple IPs and
190
connections to only one of those have to be accepted this is the
192
:param port: destination port
193
:param proto: protocol (tcp or udp)
194
:param action: `allow` or `delete`
197
hookenv.log('ufw is disabled, skipping modify_access()', level='WARN')
200
if action == 'delete':
201
cmd = ['ufw', 'delete', 'allow']
203
cmd = ['ufw', action]
212
cmd += ['port', str(port)]
214
if proto is not None:
215
cmd += ['proto', proto]
217
hookenv.log('ufw {}: {}'.format(action, ' '.join(cmd)), level='DEBUG')
218
p = subprocess.Popen(cmd, stdout=subprocess.PIPE)
219
(stdout, stderr) = p.communicate()
221
hookenv.log(stdout, level='INFO')
223
if p.returncode != 0:
224
hookenv.log(stderr, level='ERROR')
225
hookenv.log('Error running: {}, exit code: {}'.format(' '.join(cmd),
230
def grant_access(src, dst='any', port=None, proto=None):
    """
    Grant access to an address or subnet

    :param src: address (e.g. 192.168.1.234) or subnet
                (e.g. 192.168.1.0/24).
    :param dst: destination of the connection; if the machine has multiple
                IPs and only connections to one of them have to be accepted,
                pass that IP here (defaults to 'any').
    :param port: destination port
    :param proto: protocol (tcp or udp)
    :returns: whatever `modify_access` returns for the `allow` action
    """
    return modify_access(src, dst=dst, port=port, proto=proto, action='allow')
245
def revoke_access(src, dst='any', port=None, proto=None):
    """
    Revoke access to an address or subnet

    :param src: address (e.g. 192.168.1.234) or subnet
                (e.g. 192.168.1.0/24).
    :param dst: destination of the connection; if the machine has multiple
                IPs and only connections to one of them were accepted,
                pass that IP here (defaults to 'any').
    :param port: destination port
    :param proto: protocol (tcp or udp)
    :returns: whatever `modify_access` returns for the `delete` action
    """
    return modify_access(src, dst=dst, port=port, proto=proto, action='delete')
260
def service(name, action):
    """
    Open/close access to a service

    :param name: could be a service name defined in `/etc/services` or a port
                 number.
    :param action: `open` or `close`
    :raises UFWError: if `action` is neither `open` nor `close`
    :raises subprocess.CalledProcessError: if the ufw command exits non-zero
    """
    if action == 'open':
        cmd = ['ufw', 'allow', str(name)]
    elif action == 'close':
        cmd = ['ufw', 'delete', 'allow', str(name)]
    else:
        # The message names the values this function accepts ('open' and
        # 'close'), not the underlying ufw verbs, so callers can act on it.
        raise UFWError(("'{}' not supported, use 'open' "
                        "or 'close'").format(action))

    subprocess.check_output(cmd, universal_newlines=True)