5
from functools import partial
7
from charmhelpers.core.hookenv import unit_get
8
from charmhelpers.fetch import apt_install
9
from charmhelpers.core.hookenv import (
16
apt_install('python-netifaces')
22
apt_install('python-netaddr')
26
def _validate_cidr(network):
28
netaddr.IPNetwork(network)
29
except (netaddr.core.AddrFormatError, ValueError):
30
raise ValueError("Network (%s) is not in CIDR presentation format" %
34
def no_ip_found_error_out(network):
    """Signal that no address could be found in ``network``.

    :param network: The network that was searched; it is interpolated into
                    the error message.
    :raises ValueError: Always.
    """
    raise ValueError("No IP address found in network: %s" % network)
39
def get_address_in_network(network, fallback=None, fatal=False):
40
"""Get an IPv4 or IPv6 address within the network from the host.
42
:param network (str): CIDR presentation format. For example,
44
:param fallback (str): If no address is found, return fallback.
45
:param fatal (boolean): If no address is found, fallback is not
46
set and fatal is True then exit(1).
49
if fallback is not None:
53
no_ip_found_error_out(network)
57
_validate_cidr(network)
58
network = netaddr.IPNetwork(network)
59
for iface in netifaces.interfaces():
60
addresses = netifaces.ifaddresses(iface)
61
if network.version == 4 and netifaces.AF_INET in addresses:
62
addr = addresses[netifaces.AF_INET][0]['addr']
63
netmask = addresses[netifaces.AF_INET][0]['netmask']
64
cidr = netaddr.IPNetwork("%s/%s" % (addr, netmask))
68
if network.version == 6 and netifaces.AF_INET6 in addresses:
69
for addr in addresses[netifaces.AF_INET6]:
70
if not addr['addr'].startswith('fe80'):
71
cidr = netaddr.IPNetwork("%s/%s" % (addr['addr'],
76
if fallback is not None:
80
no_ip_found_error_out(network)
86
"""Determine whether provided address is IPv6 or not."""
88
address = netaddr.IPAddress(address)
89
except netaddr.AddrFormatError:
90
# probably a hostname - so not an address at all!
93
return address.version == 6
96
def is_address_in_network(network, address):
98
Determine whether the provided address is within a network range.
100
:param network (str): CIDR presentation format. For example,
102
:param address: An individual IPv4 or IPv6 address without a net
103
mask or subnet prefix. For example, '192.168.1.1'.
104
:returns boolean: Flag indicating whether address is in network.
107
network = netaddr.IPNetwork(network)
108
except (netaddr.core.AddrFormatError, ValueError):
109
raise ValueError("Network (%s) is not in CIDR presentation format" %
113
address = netaddr.IPAddress(address)
114
except (netaddr.core.AddrFormatError, ValueError):
115
raise ValueError("Address (%s) is not in correct presentation format" %
118
if address in network:
124
def _get_for_address(address, key):
125
"""Retrieve an attribute of or the physical interface that
126
the IP address provided could be bound to.
128
:param address (str): An individual IPv4 or IPv6 address without a net
129
mask or subnet prefix. For example, '192.168.1.1'.
130
:param key: 'iface' for the physical interface name or an attribute
131
of the configured interface, for example 'netmask'.
132
:returns str: Requested attribute or None if address is not bindable.
134
address = netaddr.IPAddress(address)
135
for iface in netifaces.interfaces():
136
addresses = netifaces.ifaddresses(iface)
137
if address.version == 4 and netifaces.AF_INET in addresses:
138
addr = addresses[netifaces.AF_INET][0]['addr']
139
netmask = addresses[netifaces.AF_INET][0]['netmask']
140
network = netaddr.IPNetwork("%s/%s" % (addr, netmask))
146
return addresses[netifaces.AF_INET][0][key]
148
if address.version == 6 and netifaces.AF_INET6 in addresses:
149
for addr in addresses[netifaces.AF_INET6]:
150
if not addr['addr'].startswith('fe80'):
151
network = netaddr.IPNetwork("%s/%s" % (addr['addr'],
157
elif key == 'netmask' and cidr:
158
return str(cidr).split('/')[1]
165
# Look up the physical interface name for an address: _get_for_address with
# key preset to 'iface'.
get_iface_for_address = partial(_get_for_address, key='iface')
168
# Look up the netmask for an address: _get_for_address with key preset to
# 'netmask'.
get_netmask_for_address = partial(_get_for_address, key='netmask')
171
def format_ipv6_addr(address):
172
"""If address is IPv6, wrap it in '[]' otherwise return None.
174
This is required by most configuration files when specifying IPv6
178
return "[%s]" % address
183
def get_iface_addr(iface='eth0', inet_type='AF_INET', inc_aliases=False,
184
fatal=True, exc_list=None):
185
"""Return the assigned IP address for a given interface, if any."""
186
# Extract nic if passed /dev/ethX
188
iface = iface.split('/')[-1]
194
inet_num = getattr(netifaces, inet_type)
195
except AttributeError:
196
raise Exception("Unknown inet type '%s'" % str(inet_type))
198
interfaces = netifaces.interfaces()
201
for _iface in interfaces:
202
if iface == _iface or _iface.split(':')[0] == iface:
203
ifaces.append(_iface)
205
if fatal and not ifaces:
206
raise Exception("Invalid interface '%s'" % iface)
210
if iface not in interfaces:
212
raise Exception("Interface '%s' not found " % (iface))
220
for netiface in ifaces:
221
net_info = netifaces.ifaddresses(netiface)
222
if inet_num in net_info:
223
for entry in net_info[inet_num]:
224
if 'addr' in entry and entry['addr'] not in exc_list:
225
addresses.append(entry['addr'])
227
if fatal and not addresses:
228
raise Exception("Interface '%s' doesn't have any %s addresses." %
231
return sorted(addresses)
234
# IPv4 variant of get_iface_addr: inet_type is preset to 'AF_INET'; all other
# arguments are passed through to get_iface_addr unchanged.
get_ipv4_addr = partial(get_iface_addr, inet_type='AF_INET')
237
def get_iface_from_addr(addr):
238
"""Work out on which interface the provided address is configured."""
239
for iface in netifaces.interfaces():
240
addresses = netifaces.ifaddresses(iface)
241
for inet_type in addresses:
242
for _addr in addresses[inet_type]:
243
_addr = _addr['addr']
245
ll_key = re.compile("(.+)%.*")
246
raw = re.match(ll_key, _addr)
251
log("Address '%s' is configured on iface '%s'" %
255
msg = "Unable to infer net iface on which '%s' is configured" % (addr)
260
"""Ensure decorated function is called with a value for iface.
262
If no iface provided, inject net iface inferred from unit private address.
264
def iface_sniffer(*args, **kwargs):
265
if not kwargs.get('iface', None):
266
kwargs['iface'] = get_iface_from_addr(unit_get('private-address'))
268
return f(*args, **kwargs)
274
def get_ipv6_addr(iface=None, inc_aliases=False, fatal=True, exc_list=None,
276
"""Get assigned IPv6 address for a given interface.
278
Returns list of addresses found. If no address found, returns empty list.
280
If iface is None, we infer the current primary interface by doing a reverse
281
lookup on the unit private-address.
283
We currently only support scope global IPv6 addresses i.e. non-temporary
284
addresses. If no global IPv6 address is found, return the first one found
285
in the ipv6 address list.
287
addresses = get_iface_addr(iface=iface, inet_type='AF_INET6',
288
inc_aliases=inc_aliases, fatal=fatal,
293
for addr in addresses:
294
key_scope_link_local = re.compile("^fe80::..(.+)%(.+)")
295
m = re.match(key_scope_link_local, addr)
297
eui_64_mac = m.group(1)
300
global_addrs.append(addr)
303
# Make sure any found global addresses are not temporary
304
cmd = ['ip', 'addr', 'show', iface]
305
out = subprocess.check_output(cmd).decode('UTF-8')
307
key = re.compile("inet6 (.+)/[0-9]+ scope global dynamic.*")
309
key = re.compile("inet6 (.+)/[0-9]+ scope global.*")
312
for line in out.split('\n'):
314
m = re.match(key, line)
315
if m and 'temporary' not in line:
316
# Return the first valid address we find
317
for addr in global_addrs:
318
if m.group(1) == addr:
319
if not dynamic_only or \
320
m.group(1).endswith(eui_64_mac):
327
raise Exception("Interface '%s' does not have a scope global "
328
"non-temporary ipv6 address." % iface)
333
def get_bridges(vnic_dir='/sys/devices/virtual/net'):
    """Return a list of bridges on the system.

    A device is reported when ``<vnic_dir>/<device>/bridge`` exists.

    :param vnic_dir (str): Directory holding the virtual network devices.
                           A trailing '/' is tolerated.
    :returns list: Names of the devices that have a ``bridge`` entry, in the
                   order glob reports them.
    """
    import os

    b_regex = "%s/*/bridge" % vnic_dir
    # Take the name from the path component directly above 'bridge' instead
    # of stripping vnic_dir textually. glob collapses a doubled separator, so
    # with a trailing '/' on vnic_dir the textual strip left 'br0/bridge' and
    # index 1 of the split was 'bridge' for every match.
    return [os.path.basename(os.path.dirname(x)) for x in glob.glob(b_regex)]
339
def get_bridge_nics(bridge, vnic_dir='/sys/devices/virtual/net'):
    """List the nics that are members of ``bridge``.

    Every entry matching ``<vnic_dir>/<bridge>/brif/*`` is taken as one
    member; only the final path component of each match is returned.
    """
    pattern = "%s/%s/brif/*" % (vnic_dir, bridge)
    nics = []
    for member_path in glob.glob(pattern):
        # Text after the last '/' is the nic name.
        nics.append(member_path.rpartition('/')[2])
    return nics
345
def is_bridge_member(nic):
346
"""Check if a given nic is a member of a bridge."""
347
for bridge in get_bridges():
348
if nic in get_bridge_nics(bridge):