1
# Copyright 2014-2015 Canonical Limited.
3
# This file is part of charm-helpers.
5
# charm-helpers is free software: you can redistribute it and/or modify
6
# it under the terms of the GNU Lesser General Public License version 3 as
7
# published by the Free Software Foundation.
9
# charm-helpers is distributed in the hope that it will be useful,
10
# but WITHOUT ANY WARRANTY; without even the implied warranty of
11
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12
# GNU Lesser General Public License for more details.
14
# You should have received a copy of the GNU Lesser General Public License
15
# along with charm-helpers. If not, see <http://www.gnu.org/licenses/>.
23
from functools import partial
25
from charmhelpers.core.hookenv import unit_get
26
from charmhelpers.fetch import apt_install, apt_update
27
from charmhelpers.core.hookenv import (
35
apt_update(fatal=True)
36
apt_install('python-netifaces', fatal=True)
42
apt_update(fatal=True)
43
apt_install('python-netaddr', fatal=True)
47
def _validate_cidr(network):
49
netaddr.IPNetwork(network)
50
except (netaddr.core.AddrFormatError, ValueError):
51
raise ValueError("Network (%s) is not in CIDR presentation format" %
55
def no_ip_found_error_out(network):
56
errmsg = ("No IP address found in network(s): %s" % network)
57
raise ValueError(errmsg)
60
def get_address_in_network(network, fallback=None, fatal=False):
61
"""Get an IPv4 or IPv6 address within the network from the host.
63
:param network (str): CIDR presentation format. For example,
64
'192.168.1.0/24'. Supports multiple networks as a space-delimited list.
65
:param fallback (str): If no address is found, return fallback.
66
:param fatal (boolean): If no address is found, fallback is not
67
set and fatal is True then exit(1).
70
if fallback is not None:
74
no_ip_found_error_out(network)
78
networks = network.split() or [network]
79
for network in networks:
80
_validate_cidr(network)
81
network = netaddr.IPNetwork(network)
82
for iface in netifaces.interfaces():
83
addresses = netifaces.ifaddresses(iface)
84
if network.version == 4 and netifaces.AF_INET in addresses:
85
addr = addresses[netifaces.AF_INET][0]['addr']
86
netmask = addresses[netifaces.AF_INET][0]['netmask']
87
cidr = netaddr.IPNetwork("%s/%s" % (addr, netmask))
91
if network.version == 6 and netifaces.AF_INET6 in addresses:
92
for addr in addresses[netifaces.AF_INET6]:
93
if not addr['addr'].startswith('fe80'):
94
cidr = netaddr.IPNetwork("%s/%s" % (addr['addr'],
99
if fallback is not None:
103
no_ip_found_error_out(network)
108
def is_ipv6(address):
109
"""Determine whether provided address is IPv6 or not."""
111
address = netaddr.IPAddress(address)
112
except netaddr.AddrFormatError:
113
# probably a hostname - so not an address at all!
116
return address.version == 6
119
def is_address_in_network(network, address):
121
Determine whether the provided address is within a network range.
123
:param network (str): CIDR presentation format. For example,
125
:param address: An individual IPv4 or IPv6 address without a net
126
mask or subnet prefix. For example, '192.168.1.1'.
127
:returns boolean: Flag indicating whether address is in network.
130
network = netaddr.IPNetwork(network)
131
except (netaddr.core.AddrFormatError, ValueError):
132
raise ValueError("Network (%s) is not in CIDR presentation format" %
136
address = netaddr.IPAddress(address)
137
except (netaddr.core.AddrFormatError, ValueError):
138
raise ValueError("Address (%s) is not in correct presentation format" %
141
if address in network:
147
def _get_for_address(address, key):
148
"""Retrieve an attribute of or the physical interface that
149
the IP address provided could be bound to.
151
:param address (str): An individual IPv4 or IPv6 address without a net
152
mask or subnet prefix. For example, '192.168.1.1'.
153
:param key: 'iface' for the physical interface name or an attribute
154
of the configured interface, for example 'netmask'.
155
:returns str: Requested attribute or None if address is not bindable.
157
address = netaddr.IPAddress(address)
158
for iface in netifaces.interfaces():
159
addresses = netifaces.ifaddresses(iface)
160
if address.version == 4 and netifaces.AF_INET in addresses:
161
addr = addresses[netifaces.AF_INET][0]['addr']
162
netmask = addresses[netifaces.AF_INET][0]['netmask']
163
network = netaddr.IPNetwork("%s/%s" % (addr, netmask))
169
return addresses[netifaces.AF_INET][0][key]
171
if address.version == 6 and netifaces.AF_INET6 in addresses:
172
for addr in addresses[netifaces.AF_INET6]:
173
if not addr['addr'].startswith('fe80'):
174
network = netaddr.IPNetwork("%s/%s" % (addr['addr'],
180
elif key == 'netmask' and cidr:
181
return str(cidr).split('/')[1]
188
get_iface_for_address = partial(_get_for_address, key='iface')
191
get_netmask_for_address = partial(_get_for_address, key='netmask')
194
def format_ipv6_addr(address):
195
"""If address is IPv6, wrap it in '[]' otherwise return None.
197
This is required by most configuration files when specifying IPv6
201
return "[%s]" % address
206
def get_iface_addr(iface='eth0', inet_type='AF_INET', inc_aliases=False,
207
fatal=True, exc_list=None):
208
"""Return the assigned IP address for a given interface, if any."""
209
# Extract nic if passed /dev/ethX
211
iface = iface.split('/')[-1]
217
inet_num = getattr(netifaces, inet_type)
218
except AttributeError:
219
raise Exception("Unknown inet type '%s'" % str(inet_type))
221
interfaces = netifaces.interfaces()
224
for _iface in interfaces:
225
if iface == _iface or _iface.split(':')[0] == iface:
226
ifaces.append(_iface)
228
if fatal and not ifaces:
229
raise Exception("Invalid interface '%s'" % iface)
233
if iface not in interfaces:
235
raise Exception("Interface '%s' not found " % (iface))
243
for netiface in ifaces:
244
net_info = netifaces.ifaddresses(netiface)
245
if inet_num in net_info:
246
for entry in net_info[inet_num]:
247
if 'addr' in entry and entry['addr'] not in exc_list:
248
addresses.append(entry['addr'])
250
if fatal and not addresses:
251
raise Exception("Interface '%s' doesn't have any %s addresses." %
254
return sorted(addresses)
257
get_ipv4_addr = partial(get_iface_addr, inet_type='AF_INET')
260
def get_iface_from_addr(addr):
261
"""Work out on which interface the provided address is configured."""
262
for iface in netifaces.interfaces():
263
addresses = netifaces.ifaddresses(iface)
264
for inet_type in addresses:
265
for _addr in addresses[inet_type]:
266
_addr = _addr['addr']
268
ll_key = re.compile("(.+)%.*")
269
raw = re.match(ll_key, _addr)
274
log("Address '%s' is configured on iface '%s'" %
278
msg = "Unable to infer net iface on which '%s' is configured" % (addr)
283
"""Ensure decorated function is called with a value for iface.
285
If no iface provided, inject net iface inferred from unit private address.
287
def iface_sniffer(*args, **kwargs):
288
if not kwargs.get('iface', None):
289
kwargs['iface'] = get_iface_from_addr(unit_get('private-address'))
291
return f(*args, **kwargs)
297
def get_ipv6_addr(iface=None, inc_aliases=False, fatal=True, exc_list=None,
299
"""Get assigned IPv6 address for a given interface.
301
Returns list of addresses found. If no address found, returns empty list.
303
If iface is None, we infer the current primary interface by doing a reverse
304
lookup on the unit private-address.
306
We currently only support scope global IPv6 addresses i.e. non-temporary
307
addresses. If no global IPv6 address is found, return the first one found
308
in the ipv6 address list.
310
addresses = get_iface_addr(iface=iface, inet_type='AF_INET6',
311
inc_aliases=inc_aliases, fatal=fatal,
316
for addr in addresses:
317
key_scope_link_local = re.compile("^fe80::..(.+)%(.+)")
318
m = re.match(key_scope_link_local, addr)
320
eui_64_mac = m.group(1)
323
global_addrs.append(addr)
326
# Make sure any found global addresses are not temporary
327
cmd = ['ip', 'addr', 'show', iface]
328
out = subprocess.check_output(cmd).decode('UTF-8')
330
key = re.compile("inet6 (.+)/[0-9]+ scope global dynamic.*")
332
key = re.compile("inet6 (.+)/[0-9]+ scope global.*")
335
for line in out.split('\n'):
337
m = re.match(key, line)
338
if m and 'temporary' not in line:
339
# Return the first valid address we find
340
for addr in global_addrs:
341
if m.group(1) == addr:
342
if not dynamic_only or \
343
m.group(1).endswith(eui_64_mac):
350
raise Exception("Interface '%s' does not have a scope global "
351
"non-temporary ipv6 address." % iface)
356
def get_bridges(vnic_dir='/sys/devices/virtual/net'):
357
"""Return a list of bridges on the system."""
358
b_regex = "%s/*/bridge" % vnic_dir
359
return [x.replace(vnic_dir, '').split('/')[1] for x in glob.glob(b_regex)]
362
def get_bridge_nics(bridge, vnic_dir='/sys/devices/virtual/net'):
363
"""Return a list of nics comprising a given bridge on the system."""
364
brif_regex = "%s/%s/brif/*" % (vnic_dir, bridge)
365
return [x.split('/')[-1] for x in glob.glob(brif_regex)]
368
def is_bridge_member(nic):
369
"""Check if a given nic is a member of a bridge."""
370
for bridge in get_bridges():
371
if nic in get_bridge_nics(bridge):
379
Returns True if address is a valid IP address.
382
# Test to see if already an IPv4 address
383
socket.inet_aton(address)
389
def ns_query(address):
393
apt_install('python-dnspython')
396
if isinstance(address, dns.name.Name):
398
elif isinstance(address, six.string_types):
403
answers = dns.resolver.query(address, rtype)
405
return str(answers[0])
409
def get_host_ip(hostname, fallback=None):
411
Resolves the IP for a given hostname, or returns
412
the input if it is already an IP.
417
ip_addr = ns_query(hostname)
420
ip_addr = socket.gethostbyname(hostname)
422
log("Failed to resolve hostname '%s'" % (hostname),
428
def get_hostname(address, fqdn=True):
430
Resolves hostname for given IP, or returns the input
431
if it is already a hostname.
435
import dns.reversename
437
apt_install("python-dnspython")
438
import dns.reversename
440
rev = dns.reversename.from_address(address)
441
result = ns_query(rev)
445
result = socket.gethostbyaddr(address)[0]
453
if result.endswith('.'):
458
return result.split('.')[0]