2
# -*- coding: utf-8 -*-
4
""" Network interface control tools for wicd.
6
This module implements functions to control and obtain information from
9
class BaseInterface() -- Control a network interface.
10
class BaseWiredInterface() -- Control a wired network interface.
11
class BaseWirelessInterface() -- Control a wireless network interface.
16
# Copyright (C) 2007 - 2009 Adam Blackburn
17
# Copyright (C) 2007 - 2009 Dan O'Reilly
18
# Copyright (C) 2007 - 2009 Byron Hillis
19
# Copyright (C) 2009 Andrew Psaltis
21
# This program is free software; you can redistribute it and/or modify
22
# it under the terms of the GNU General Public License Version 2 as
23
# published by the Free Software Foundation.
25
# This program is distributed in the hope that it will be useful,
26
# but WITHOUT ANY WARRANTY; without even the implied warranty of
27
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
28
# GNU General Public License for more details.
30
# You should have received a copy of the GNU General Public License
31
# along with this program. If not, see <http://www.gnu.org/licenses/>.
38
from string import maketrans, translate
42
from misc import find_path
44
# Regular expressions.
45
_re_mode = (re.I | re.M | re.S)
46
essid_pattern = re.compile('.*ESSID:"?(.*?)"?\s*\n', _re_mode)
47
ap_mac_pattern = re.compile('.*Address: (.*?)\n', _re_mode)
48
channel_pattern = re.compile('.*Channel:?=? ?(\d\d?)', _re_mode)
49
strength_pattern = re.compile('.*Quality:?=? ?(\d+)\s*/?\s*(\d*)', _re_mode)
50
altstrength_pattern = re.compile('.*Signal level:?=? ?(\d+)\s*/?\s*(\d*)', _re_mode)
51
signaldbm_pattern = re.compile('.*Signal level:?=? ?(-\d\d*)', _re_mode)
52
bitrates_pattern = re.compile('(\d+\s+\S+/s)', _re_mode)
53
mode_pattern = re.compile('.*Mode:(.*?)\n', _re_mode)
54
freq_pattern = re.compile('.*Frequency:(.*?)\n', _re_mode)
55
wep_pattern = re.compile('.*Encryption key:(.*?)\n', _re_mode)
56
altwpa_pattern = re.compile('(wpa_ie)', _re_mode)
57
wpa1_pattern = re.compile('(WPA Version 1)', _re_mode)
58
wpa2_pattern = re.compile('(WPA2)', _re_mode)
60
#iwconfig-only regular expressions.
61
ip_pattern = re.compile(r'inet [Aa]d?dr[^.]*:([^.]*\.[^.]*\.[^.]*\.[0-9]*)', re.S)
62
bssid_pattern = re.compile('.*Access Point: (([0-9A-Z]{2}:){5}[0-9A-Z]{2})', _re_mode)
63
bitrate_pattern = re.compile('.*Bit Rate[=:](.*?/s)', _re_mode)
64
opmode_pattern = re.compile('.*Mode:(.*?) ', _re_mode)
65
authmethods_pattern = re.compile('.*Authentication capabilities :\n(.*?)Current', _re_mode)
67
# Regular expressions for wpa_cli output
68
auth_pattern = re.compile('.*wpa_state=(.*?)\n', _re_mode)
70
RALINK_DRIVER = 'ralink legacy'
72
blacklist_strict = '!"#$%&\'()*+,./:;<=>?@[\\]^`{|}~ '
73
blacklist_norm = ";`$!*|><&\\"
74
blank_trans = maketrans("", "")
76
def _sanitize_string(string):
78
return translate(str(string), blank_trans, blacklist_norm)
82
def _sanitize_string_strict(string):
84
return translate(str(string), blank_trans, blacklist_strict)
89
def timedcache(duration=5):
90
""" A caching decorator for use with wnettools methods.
92
Caches the results of a function for a given number of
93
seconds (defaults to 5).
97
def __timedcache(self, *args, **kwargs):
98
key = str(args) + str(kwargs) + str(f)
99
if hasattr(self, 'iface'):
101
if (key in _cache and
102
(time.time() - _cache[key]['time']) < duration):
103
return _cache[key]['value']
105
value = f(self, *args, **kwargs)
106
_cache[key] = { 'time' : time.time(), 'value' : value }
113
def GetDefaultGateway():
114
""" Attempts to determine the default gateway by parsing route -n. """
115
route_info = misc.Run("route -n")
116
lines = route_info.split('\n')
123
if words[0] == '0.0.0.0':
128
print 'couldn\'t retrieve default gateway from route -n'
131
def GetWirelessInterfaces():
132
""" Get available wireless interfaces.
134
Attempts to get an interface first by parsing /proc/net/wireless,
135
and should that fail, by parsing iwconfig.
137
The first interface available.
140
dev_dir = '/sys/class/net/'
141
ifnames = [iface for iface in os.listdir(dev_dir)
142
if os.path.isdir(dev_dir + iface) and
143
'wireless' in os.listdir(dev_dir + iface)]
147
def GetWiredInterfaces():
148
""" Returns a list of wired interfaces on the system. """
149
basedir = '/sys/class/net/'
150
return [iface for iface in os.listdir(basedir)
151
if os.path.isdir(basedir + iface) and not 'wireless'
152
in os.listdir(basedir + iface) and
153
open(basedir + iface + "/type").readlines()[0].strip() == "1"]
155
def NeedsExternalCalls():
156
""" Returns True if the backend needs to use an external program. """
157
raise NotImplementedError
159
def GetWpaSupplicantDrivers():
160
""" Returns a list of all valid wpa_supplicant drivers. """
161
output = misc.Run(["wpa_supplicant", "-h"])
163
output = output.split("drivers:")[1].split("options:")[0].strip()
165
print "Warning: Couldn't get list of valid wpa_supplicant drivers"
167
patt = re.compile("(\S+)\s+=.*")
168
drivers = patt.findall(output) or [""]
169
# We cannot use the "wired" driver for wireless interfaces.
170
if 'wired' in drivers:
171
drivers.remove('wired')
173
def IsValidWpaSuppDriver(driver):
174
""" Returns True if given string is a valid wpa_supplicant driver. """
175
output = misc.Run(["wpa_supplicant", "-D%s" % driver, "-iolan19",
176
"-c/etc/abcd%sdefzz.zconfz" % random.randint(1, 1000)])
177
return not "Unsupported driver" in output
179
def neediface(default_response):
180
""" A decorator for only running a method if self.iface is defined.
182
This decorator is wrapped around Interface methods, and will
183
return a provided default_response value if self.iface is not
188
def newfunc(self, *args, **kwargs):
189
if not self.iface or \
190
not os.path.exists('/sys/class/net/%s' % self.iface):
191
return default_response
192
return func(self, *args, **kwargs)
193
newfunc.__dict__ = func.__dict__
194
newfunc.__doc__ = func.__doc__
195
newfunc.__module__ = func.__module__
200
class BaseInterface(object):
201
""" Control a network interface. """
202
def __init__(self, iface, verbose=False):
203
""" Initialise the object.
206
iface -- the name of the interface
207
verbose -- whether to print every command run
210
self.iface = _sanitize_string_strict(iface)
211
self.verbose = verbose
212
self.DHCP_CLIENT = None
213
self.flush_tool = None
214
self.link_detect = None
215
self.dhcp_object = None
217
def SetDebugMode(self, value):
218
""" If True, verbose output is enabled. """
221
def SetInterface(self, iface):
222
""" Sets the interface.
225
iface -- the name of the interface.
228
self.iface = _sanitize_string_strict(str(iface))
230
def _find_program_path(self, program):
231
""" Determines the full path for the given program.
233
Searches for a given program name on the PATH.
236
program -- The name of the program to search for
239
The full path of the program or None
242
path = find_path(program)
243
if not path and self.verbose:
244
print "WARNING: No path found for %s" % program
248
def _get_dhcp_command(self, flavor=None, hostname=None):
249
""" Returns the correct DHCP client command.
251
Given a type of DHCP request (create or release a lease),
252
this method will build a command to complete the request
253
using the correct dhcp client, and cli options.
256
def get_client_name(cl):
257
""" Converts the integer value for a dhcp client to a string. """
258
if self.dhcpcd_cmd and cl in [misc.DHCPCD, misc.AUTO]:
260
cmd = self.dhcpcd_cmd
261
elif self.pump_cmd and cl in [misc.PUMP, misc.AUTO]:
264
elif self.dhclient_cmd and cl in [misc.DHCLIENT, misc.AUTO]:
266
cmd = self.dhclient_cmd
267
if self.dhclient_needs_verbose:
269
elif self.udhcpc_cmd and cl in [misc.UDHCPC, misc.AUTO]:
271
cmd = self.udhcpc_cmd
277
# probably /var/lib/wicd/dhclient.conf with defaults
278
dhclient_conf_path = os.path.join(
285
{'connect' : r"%(cmd)s -cf %(dhclientconf)s %(iface)s",
286
'release' : r"%(cmd)s -r %(iface)s",
287
'id' : misc.DHCLIENT,
290
{ 'connect' : r"%(cmd)s -i %(iface)s -h %(hostname)s",
291
'release' : r"%(cmd)s -r -i %(iface)s",
295
{'connect' : r"%(cmd)s %(iface)s -h %(hostname)s ",
296
'release' : r"%(cmd)s -k %(iface)s",
300
{'connect' : r"%(cmd)s -n -i %(iface)s -H %(hostname)s ",
301
'release' : r"killall -SIGUSR2 %(cmd)s",
305
(client_name, cmd) = get_client_name(self.DHCP_CLIENT)
307
# cause dhclient doesn't have a handy dandy argument
308
# for specifing the hostname to be sent
309
if client_name == "dhclient" and flavor:
311
# <hostname> will use the system hostname
312
# we'll use that if there is hostname passed
313
# that shouldn't happen, though
314
hostname = '<hostname>'
315
print 'attempting to set hostname with dhclient'
316
print 'using dhcpcd or another supported client may work better'
317
dhclient_template = \
318
open(os.path.join(wpath.etc, 'dhclient.conf.template'), 'r')
320
output_conf = open(dhclient_conf_path, 'w')
322
for line in dhclient_template.readlines():
323
line = line.replace('$_HOSTNAME', hostname)
324
output_conf.write(line)
327
dhclient_template.close()
329
if not client_name or not cmd:
330
print "WARNING: Failed to find a valid dhcp client!"
333
if flavor == "connect":
335
hostname = os.uname()[1]
336
return client_dict[client_name]['connect'] % \
338
"iface" : self.iface,
339
"hostname" : hostname,
340
'dhclientconf' : dhclient_conf_path }
341
elif flavor == "release":
342
return client_dict[client_name]['release'] % {"cmd":cmd, "iface":self.iface}
344
return client_dict[client_name]['id']
346
def AppAvailable(self, app):
347
""" Return whether a given app is available.
349
Given the name of an executable, determines if it is
350
available for use by checking for a defined 'app'_cmd
354
return bool(self.__dict__.get("%s_cmd" % app.replace("-", "")))
357
""" Check that all required tools are available. """
358
# THINGS TO CHECK FOR: ethtool, pptp-linux, dhclient, host
360
self.CheckWiredTools()
361
self.CheckWirelessTools()
362
self.CheckSudoApplications()
363
self.CheckRouteFlushTool()
364
self.CheckResolvConf()
366
def CheckResolvConf(self):
367
""" Checks for the existence of resolvconf."""
368
self.resolvconf_cmd = self._find_program_path("resolvconf")
371
""" Check for the existence of valid DHCP clients.
373
Checks for the existence of a supported DHCP client. If one is
374
found, the appropriate values for DHCP_CMD, DHCP_RELEASE, and
375
DHCP_CLIENT are set. If a supported client is not found, a
379
self.dhclient_cmd = self._find_program_path("dhclient")
380
if self.dhclient_cmd != None:
381
output = misc.Run(self.dhclient_cmd + " --version",
384
self.dhclient_needs_verbose = True
386
self.dhclient_needs_verbose = False
387
debian_dhcpcd_cmd = self._find_program_path('dhcpcd-bin')
388
if debian_dhcpcd_cmd:
389
self.dhcpcd_cmd = debian_dhcpcd_cmd
391
self.dhcpcd_cmd = self._find_program_path("dhcpcd")
392
self.pump_cmd = self._find_program_path("pump")
393
self.udhcpc_cmd = self._find_program_path("udhcpc")
395
def CheckWiredTools(self):
396
""" Check for the existence of ethtool and mii-tool. """
397
self.miitool_cmd = self._find_program_path("mii-tool")
398
self.ethtool_cmd = self._find_program_path("ethtool")
400
def CheckWirelessTools(self):
401
""" Check for the existence of wpa_cli """
402
self.wpa_cli_cmd = self._find_program_path("wpa_cli")
403
if not self.wpa_cli_cmd:
404
print "wpa_cli not found. Authentication will not be validated."
406
def CheckRouteFlushTool(self):
407
""" Check for a route flush tool. """
408
self.ip_cmd = self._find_program_path("ip")
409
self.route_cmd = self._find_program_path("route")
411
def CheckSudoApplications(self):
412
self.gksudo_cmd = self._find_program_path("gksudo")
413
self.kdesu_cmd = self._find_program_path("kdesu")
414
self.ktsuss_cmd = self._find_program_path("ktsuss")
418
""" Bring the network interface up.
424
cmd = 'ifconfig ' + self.iface + ' up'
425
if self.verbose: print cmd
431
""" Take down the network interface.
437
cmd = 'ifconfig ' + self.iface + ' down'
438
if self.verbose: print cmd
444
def GetIfconfig(self):
445
""" Runs ifconfig and returns the output. """
446
cmd = "ifconfig %s" % self.iface
447
if self.verbose: print cmd
451
def SetAddress(self, ip=None, netmask=None, broadcast=None):
452
""" Set the IP addresses of an interface.
455
ip -- interface IP address in dotted quad form
456
netmask -- netmask address in dotted quad form
457
broadcast -- broadcast address in dotted quad form
460
for val in [ip, netmask, broadcast]:
463
if not misc.IsValidIP(val):
464
print 'WARNING: Invalid IP address found, aborting!'
467
cmd = ''.join(['ifconfig ', self.iface, ' '])
469
cmd = ''.join([cmd, ip, ' '])
471
cmd = ''.join([cmd, 'netmask ', netmask, ' '])
473
cmd = ''.join([cmd, 'broadcast ', broadcast, ' '])
474
if self.verbose: print cmd
477
def _parse_dhclient(self, pipe):
478
""" Parse the output of dhclient.
480
Parses the output of dhclient and returns the status of
481
the connection attempt.
484
pipe -- stdout pipe to the dhclient process.
487
'success' if succesful', an error code string otherwise.
490
dhclient_complete = False
491
dhclient_success = False
493
while not dhclient_complete:
494
line = pipe.readline()
495
if line == '': # Empty string means dhclient is done.
496
dhclient_complete = True
498
print misc.to_unicode(line.strip('\n'))
499
if line.startswith('bound'):
500
dhclient_success = True
501
dhclient_complete = True
503
return self._check_dhcp_result(dhclient_success)
505
def _parse_pump(self, pipe):
506
""" Determines if obtaining an IP using pump succeeded.
509
pipe -- stdout pipe to the pump process.
512
'success' if succesful, an error code string otherwise.
515
pump_complete = False
518
while not pump_complete:
519
line = pipe.readline()
522
elif line.strip().lower().startswith('Operation failed.'):
525
print misc.to_unicode(line)
527
return self._check_dhcp_result(pump_success)
529
def _parse_dhcpcd(self, pipe):
530
""" Determines if obtaining an IP using dhcpcd succeeded.
533
pipe -- stdout pipe to the dhcpcd process.
536
'success' if succesful, an error code string otherwise.
539
dhcpcd_complete = False
540
dhcpcd_success = True
542
while not dhcpcd_complete:
543
line = pipe.readline()
544
if "Error" in line or "timed out" in line:
545
dhcpcd_success = False
546
dhcpcd_complete = True
548
dhcpcd_complete = True
549
print misc.to_unicode(line)
551
return self._check_dhcp_result(dhcpcd_success)
553
def _parse_udhcpc(self, pipe):
554
""" Determines if obtaining an IP using udhcpc succeeded.
557
pipe -- stdout pipe to the dhcpcd process.
560
'success' if successful, an error code string otherwise.
563
udhcpc_complete = False
564
udhcpc_success = True
566
while not udhcpc_complete:
567
line = pipe.readline()
568
if line.endswith("failing."):
569
udhcpc_success = False
570
udhcpc_complete = True
572
udhcpc_complete = True
575
return self._check_dhcp_result(udhcpc_success)
577
def _check_dhcp_result(self, success):
578
""" Print and return the correct DHCP connection result.
581
success -- boolean specifying if DHCP was succesful.
584
'success' if success == True, 'dhcp_failed' otherwise.
588
print 'DHCP connection successful'
591
print 'DHCP connection failed'
595
def StartDHCP(self, hostname):
596
""" Start the DHCP client to obtain an IP address.
599
hostname -- the hostname to send to the DHCP server
602
A string representing the result of the DHCP command. See
603
_check_dhcp_result for the possible values.
606
cmd = self._get_dhcp_command('connect', hostname)
607
if self.verbose: print cmd
608
self.dhcp_object = misc.Run(cmd, include_stderr=True, return_obj=True)
609
pipe = self.dhcp_object.stdout
610
client_dict = { misc.DHCLIENT : self._parse_dhclient,
611
misc.DHCPCD : self._parse_dhcpcd,
612
misc.PUMP : self._parse_pump,
613
misc.UDHCPC : self._parse_udhcpc,
616
DHCP_CLIENT = self._get_dhcp_command()
617
if DHCP_CLIENT in client_dict:
618
ret = client_dict[DHCP_CLIENT](pipe)
620
print "ERROR: no dhcp client found"
625
def ReleaseDHCP(self):
626
""" Release the DHCP lease for this interface. """
627
cmd = self._get_dhcp_command("release")
628
if self.verbose: print cmd
632
def DelDefaultRoute(self):
633
""" Delete only the default route for a device. """
634
if self.ip_cmd and self.flush_tool in [misc.AUTO, misc.IP]:
635
cmd = '%s route del default dev %s' % (self.ip_cmd, self.iface)
636
elif self.route_cmd and self.flush_tool in [misc.AUTO, misc.ROUTE]:
637
cmd = '%s del default dev %s' % (self.route_cmd, self.iface)
639
print "No route manipulation command available!"
641
if self.verbose: print cmd
645
def SetDNS(self, dns1=None, dns2=None, dns3=None,
646
dns_dom=None, search_dom=None):
647
""" Set the DNS of the system to the specified DNS servers.
649
Opens up resolv.conf and writes in the nameservers.
652
dns1 -- IP address of DNS server 1
653
dns2 -- IP address of DNS server 2
654
dns3 -- IP address of DNS server 3
655
dns_dom -- DNS domain
656
search_dom -- DNS search domain
661
resolv_params += 'domain %s\n' % dns_dom
663
resolv_params += 'search %s\n' % search_dom
666
for dns in (dns1, dns2, dns3):
668
if misc.IsValidIP(dns):
670
print 'Setting DNS : ' + dns
671
valid_dns_list.append("nameserver %s\n" % dns)
673
print 'DNS IP %s is not a valid IP address, skipping' % dns
676
resolv_params += ''.join(valid_dns_list)
678
if self.resolvconf_cmd:
679
cmd = [self.resolvconf_cmd, '-a', self.iface]
680
if self.verbose: print cmd
681
p = misc.Run(cmd, include_stderr=True, return_obj=True)
682
p.communicate(input=resolv_params)
684
resolv = open("/etc/resolv.conf", "w")
685
resolv.write(resolv_params + "\n")
689
def FlushRoutes(self):
690
""" Flush network routes for this device. """
691
if self.ip_cmd and self.flush_tool in [misc.AUTO, misc.IP]:
692
cmds = ['%s route flush dev %s' % (self.ip_cmd, self.iface)]
693
elif self.route_cmd and self.flush_tool in [misc.AUTO, misc.ROUTE]:
694
cmds = ['%s del dev %s' % (self.route_cmd, self.iface)]
696
print "No flush command available!"
699
if self.verbose: print cmd
703
def SetDefaultRoute(self, gw):
704
""" Add a default route with the specified gateway.
707
gw -- gateway of the default route in dotted quad form
710
if not misc.IsValidIP(gw):
711
print 'WARNING: Invalid gateway found. Aborting!'
713
cmd = 'route add default gw %s dev %s' % (gw, self.iface)
714
if self.verbose: print cmd
718
def GetIP(self, ifconfig=""):
719
""" Get the IP address of the interface.
722
The IP address of the interface in dotted quad form.
726
output = self.GetIfconfig()
729
return misc.RunRegex(ip_pattern, output)
732
def VerifyAPAssociation(self, gateway):
733
""" Verify assocation with an access point.
735
Verifies that an access point can be contacted by
739
if "iputils" in misc.Run(["ping", "-V"]):
740
cmd = "ping -q -w 3 -c 1 %s" % gateway
742
# ping is from inetutils-ping (which doesn't support -w)
743
# or from some other package
745
# If there's no AP association, this will wait for
746
# timeout, while the above will wait (-w) 3 seconds at
748
cmd = "ping -q -c 1 %s" % gateway
749
if self.verbose: print cmd
750
return misc.LaunchAndWait(cmd)
753
def IsUp(self, ifconfig=None):
754
""" Determines if the interface is up.
757
True if the interface is up, False otherwise.
760
flags_file = '/sys/class/net/%s/flags' % self.iface
762
flags = open(flags_file, "r").read().strip()
764
print "Could not open %s, using ifconfig to determine status" % flags_file
765
return self._slow_is_up(ifconfig)
766
return bool(int(flags, 16) & 1)
769
def _slow_is_up(self, ifconfig=None):
770
""" Determine if an interface is up using ifconfig. """
772
output = self.GetIfconfig()
775
lines = output.split('\n')
778
for line in lines[1:4]:
779
if line.strip().startswith('UP'):
784
class BaseWiredInterface(BaseInterface):
785
""" Control a wired network interface. """
786
def __init__(self, iface, verbose=False):
787
""" Initialise the wired network interface class.
790
iface -- name of the interface
791
verbose -- print all commands
794
BaseInterface.__init__(self, iface, verbose)
797
def GetPluggedIn(self):
798
""" Get the current physical connection state.
800
The method will first attempt to use ethtool do determine
801
physical connection state. Should ethtool fail to run properly,
802
mii-tool will be used instead.
805
True if a link is detected, False otherwise.
808
# check for link using /sys/class/net/iface/carrier
809
# is usually more accurate
810
sys_device = '/sys/class/net/%s/' % self.iface
811
carrier_path = sys_device + 'carrier'
819
if self.IsUp() or tries > MAX_TRIES: break
821
if os.path.exists(carrier_path):
822
carrier = open(carrier_path, 'r')
824
link = carrier.read().strip()
830
except (IOError, ValueError, TypeError):
831
print 'Error checking link using /sys/class/net/%s/carrier' % self.iface
833
if self.ethtool_cmd and self.link_detect in [misc.ETHTOOL, misc.AUTO]:
834
return self._eth_get_plugged_in()
835
elif self.miitool_cmd and self.link_detect in [misc.MIITOOL, misc.AUTO]:
836
return self._mii_get_plugged_in()
838
print ('Error: No way of checking for a wired connection. Make ' +
839
'sure that either mii-tool or ethtool is installed.')
842
def _eth_get_plugged_in(self):
843
""" Use ethtool to determine the physical connection state.
846
True if a link is detected, False otherwise.
849
cmd = "%s %s" % (self.ethtool_cmd, self.iface)
851
print 'Wired Interface is down, putting it up'
854
if self.verbose: print cmd
855
tool_data = misc.Run(cmd, include_stderr=True)
856
if misc.RunRegex(re.compile('(Link detected: yes)', re.I | re.M | re.S),
862
def _mii_get_plugged_in(self):
863
""" Use mii-tool to determine the physical connection state.
866
True if a link is detected, False otherwise.
869
cmd = "%s %s" % (self.miitool_cmd, self.iface)
870
if self.verbose: print cmd
871
tool_data = misc.Run(cmd, include_stderr=True)
872
if misc.RunRegex(re.compile('(Invalid argument)', re.I | re.M | re.S),
873
tool_data) is not None:
874
print 'Wired Interface is down, putting it up'
877
if self.verbose: print cmd
878
tool_data = misc.Run(cmd, include_stderr=True)
880
if misc.RunRegex(re.compile('(link ok)', re.I | re.M | re.S),
881
tool_data) is not None:
887
class BaseWirelessInterface(BaseInterface):
888
""" Control a wireless network interface. """
889
def __init__(self, iface, verbose=False, wpa_driver='wext'):
890
""" Initialise the wireless network interface class.
893
iface -- name of the interface
894
verbose -- print all commands
897
BaseInterface.__init__(self, iface, verbose)
898
self.wpa_driver = wpa_driver
899
self.scan_iface = None
901
def SetWpaDriver(self, driver):
902
""" Sets the wpa_driver. """
903
self.wpa_driver = _sanitize_string(driver)
906
def SetEssid(self, essid):
907
""" Set the essid of the wireless interface.
910
essid -- essid to set the interface to
913
cmd = ['iwconfig', self.iface, 'essid', '--', str(essid)]
914
if self.verbose: print str(cmd)
918
def GetKillSwitchStatus(self):
919
""" Determines if the wireless killswitch is enabled.
922
True if the killswitch is enabled, False otherwise.
925
output = self.GetIwconfig()
927
killswitch_pattern = re.compile('.*radio off', re.I | re.M | re.S)
928
if killswitch_pattern.search(output):
937
def GetIwconfig(self):
938
""" Returns the output of iwconfig for this interface. """
939
cmd = "iwconfig " + self.iface
940
if self.verbose: print cmd
943
def _FreqToChannel(self, freq):
944
""" Translate the specified frequency to a channel.
946
Note: This function is simply a lookup dict and therefore the
947
freq argument must be in the dict to provide a valid channel.
950
freq -- string containing the specified frequency
953
The channel number, or None if not found.
957
freq_dict = {'2.412 GHz': 1, '2.417 GHz': 2, '2.422 GHz': 3,
958
'2.427 GHz': 4, '2.432 GHz': 5, '2.437 GHz': 6,
959
'2.442 GHz': 7, '2.447 GHz': 8, '2.452 GHz': 9,
960
'2.457 GHz': 10, '2.462 GHz': 11, '2.467 GHz': 12,
961
'2.472 GHz': 13, '2.484 GHz': 14 }
963
ret = freq_dict[freq]
965
print "Couldn't determine channel number for frequency: " + str(freq)
969
def _GetRalinkInfo(self):
970
""" Get a network info list used for ralink drivers
972
Calls iwpriv <wireless interface> get_site_survey, which
973
on some ralink cards will return encryption and signal
974
strength info for wireless networks in the area.
977
iwpriv = misc.Run('iwpriv ' + self.iface + ' get_site_survey')
980
lines = iwpriv.splitlines()[2:]
982
patt = re.compile("((?:[0-9A-Z]{2}:){5}[0-9A-Z]{2})")
986
info = filter(None, [x.strip() for x in info])
989
if re.match(patt, info[2].upper()):
990
bssid = info[2].upper()
992
elif re.match(patt, info[3].upper()):
993
bssid = info[3].upper()
996
print 'Invalid iwpriv line. Skipping it.'
998
ap['nettype'] = info[-1]
999
ap['strength'] = info[1]
1000
if info[4 + offset] == 'WEP':
1001
ap['encryption_method'] = 'WEP'
1002
ap['enctype'] = 'WEP'
1003
ap['keyname'] = 'Key1'
1004
ap['authmode'] = info[5 + offset]
1005
elif info[5 + offset] in ['WPA-PSK', 'WPA']:
1006
ap['encryption_method'] = 'WPA'
1007
ap['authmode'] = "WPAPSK"
1008
ap['keyname'] = "WPAPSK"
1009
ap['enctype'] = info[4 + offset]
1010
elif info[5 + offset] == 'WPA2-PSK':
1011
ap['encryption_method'] = 'WPA2'
1012
ap['authmode'] ="WPA2PSK"
1013
ap['keyname'] = "WPA2PSK"
1014
ap['enctype'] = info[4 + offset]
1015
elif info[4 + offset] == "NONE":
1016
ap['encryption_method'] = None
1018
print "Unknown AuthMode, can't assign encryption_method!"
1019
ap['encryption_method'] = 'Unknown'
1021
if self.verbose: print str(aps)
1024
def _ParseRalinkAccessPoint(self, ap, ralink_info, cell):
1025
""" Parse encryption and signal strength info for ralink cards
1028
ap -- array containing info about the current access point
1029
ralink_info -- dict containing available network info
1030
cell -- string containing cell information
1033
Updated array containing info about the current access point
1036
wep_pattern = re.compile('.*Encryption key:(.*?)\n', re.I | re.M | re.S)
1037
if ralink_info.has_key(ap['bssid']):
1038
info = ralink_info[ap['bssid']]
1039
for key in info.keys():
1041
if misc.RunRegex(wep_pattern, cell) == 'on':
1042
ap['encryption'] = True
1044
ap['encryption'] = False
1048
def SetMode(self, mode):
1049
""" Set the mode of the wireless interface.
1052
mode -- mode to set the interface to
1055
mode = _sanitize_string_strict(mode)
1056
if mode.lower() == 'master':
1058
cmd = 'iwconfig %s mode %s' % (self.iface, mode)
1059
if self.verbose: print cmd
1063
def SetChannel(self, channel):
1064
""" Set the channel of the wireless interface.
1067
channel -- channel to set the interface to
1070
if not channel.isdigit():
1071
print 'WARNING: Invalid channel found. Aborting!'
1074
cmd = 'iwconfig %s channel %s' % (self.iface, str(channel))
1075
if self.verbose: print cmd
1079
def SetKey(self, key):
1080
""" Set the encryption key of the wireless interface.
1083
key -- encryption key to set
1086
cmd = 'iwconfig %s key %s' % (self.iface, key)
1087
if self.verbose: print cmd
1091
def Associate(self, essid, channel=None, bssid=None):
1092
""" Associate with the specified wireless network.
1095
essid -- essid of the network
1096
channel -- channel of the network
1097
bssid -- bssid of the network
1100
cmd = ['iwconfig', self.iface, 'essid', essid]
1101
if self.verbose: print str(cmd)
1103
base = "iwconfig %s" % self.iface
1104
if channel and str(channel).isdigit():
1105
cmd = "%s channel %s" % (base, str(channel))
1106
if self.verbose: print cmd
1109
cmd = "%s ap %s" % (base, bssid)
1110
if self.verbose: print cmd
1113
def GeneratePSK(self, network):
1114
""" Generate a PSK using wpa_passphrase.
1117
network -- dictionary containing network info
1120
wpa_pass_path = misc.find_path('wpa_passphrase')
1121
if not wpa_pass_path: return None
1122
key_pattern = re.compile('network={.*?\spsk=(.*?)\n}.*',
1124
cmd = [wpa_pass_path, str(network['essid']), str(network['key'])]
1125
if self.verbose: print cmd
1126
return misc.RunRegex(key_pattern, misc.Run(cmd))
1129
def Authenticate(self, network):
1130
""" Authenticate with the specified wireless network.
1133
network -- dictionary containing network info
1136
misc.ParseEncryption(network)
1137
if self.wpa_driver == RALINK_DRIVER:
1138
self._AuthenticateRalinkLegacy(network)
1140
cmd = ['wpa_supplicant', '-B', '-i', self.iface, '-c',
1141
os.path.join(wpath.networks,
1142
network['bssid'].replace(':', '').lower()),
1143
'-D', self.wpa_driver]
1144
if self.verbose: print cmd
1147
def _AuthenticateRalinkLegacy(self, network):
1148
""" Authenticate with the specified wireless network.
1150
This function handles Ralink legacy cards that cannot use
1154
network -- dictionary containing network info
1157
if network.get('key') != None:
1159
info = self._GetRalinkInfo()[network.get('bssid')]
1161
print "Could not find current network in iwpriv " + \
1162
"get_site_survey results. Cannot authenticate."
1165
if info['enctype'] == "WEP" and info['authtype'] == 'OPEN':
1166
print 'Setting up WEP'
1167
cmd = ''.join(['iwconfig ', self.iface, ' key ',
1168
network.get('key')])
1169
if self.verbose: print cmd
1173
cmd_list.append('NetworkType=' + info['nettype'])
1174
cmd_list.append('AuthMode=' + info['authmode'])
1175
cmd_list.append('EncrypType=' + info['enctype'])
1176
cmd_list.append('SSID="%s"' % network['essid'])
1177
cmd_list.append('%s="%s"' % (network['keyname'], network['key']))
1178
if info['nettype'] == 'SHARED' and info['enctype'] == 'WEP':
1179
cmd_list.append('DefaultKeyID=1')
1181
for cmd in cmd_list:
1182
cmd = ['iwpriv', self.iface, 'set', cmd]
1183
if self.verbose: print ' '.join(cmd)
1187
def GetNetworks(self):
1188
""" Get a list of available wireless networks.
1191
A list containing available wireless networks.
1194
cmd = 'iwlist ' + self.iface + ' scan'
1195
if self.verbose: print cmd
1196
results = misc.Run(cmd)
1197
# Split the networks apart, using Cell as our split point
1198
# this way we can look at only one network at a time.
1199
# The spaces around ' Cell ' are to minimize the chance that someone
1200
# has an essid named Cell...
1201
networks = results.split( ' Cell ' )
1203
# Get available network info from iwpriv get_site_survey
1204
# if we're using a ralink card (needed to get encryption info)
1205
if self.wpa_driver == RALINK_DRIVER:
1206
ralink_info = self._GetRalinkInfo()
1210
# An array for the access points
1213
for cell in networks:
1214
# Only use sections where there is an ESSID.
1215
if 'ESSID:' in cell:
1216
# Add this network to the list of networks
1217
entry = self._ParseAccessPoint(cell, ralink_info)
1218
if entry is not None:
1219
# Normally we only get duplicate bssids with hidden
1220
# networks. If we hit this, we only want the entry
1221
# with the real essid to be in the network list.
1222
if (entry['bssid'] not in access_points
1223
or not entry['hidden']):
1224
access_points[entry['bssid']] = entry
1226
return access_points.values()
1228
def _ParseAccessPoint(self, cell, ralink_info):
1229
""" Parse a single cell from the output of iwlist.
1232
cell -- string containing the cell information
1233
ralink_info -- string contating network information needed
1237
A dictionary containing the cell networks properties.
1241
ap['essid'] = misc.RunRegex(essid_pattern, cell)
1243
ap['essid'] = misc.to_unicode(ap['essid'])
1244
except (UnicodeDecodeError, UnicodeEncodeError):
1245
print 'Unicode problem with current network essid, ignoring!!'
1247
if ap['essid'] in ['Hidden', '<hidden>', "", None]:
1250
ap['essid'] = "<hidden>"
1252
ap['hidden'] = False
1254
# Channel - For cards that don't have a channel number,
1255
# convert the frequency.
1256
ap['channel'] = misc.RunRegex(channel_pattern, cell)
1257
if ap['channel'] == None:
1258
freq = misc.RunRegex(freq_pattern, cell)
1259
ap['channel'] = self._FreqToChannel(freq)
1262
ap['bitrates'] = misc.RunRegex(bitrates_pattern,
1263
cell.split("Bit Rates")[-1])
1266
ap['bssid'] = misc.RunRegex(ap_mac_pattern, cell)
1269
ap['mode'] = misc.RunRegex(mode_pattern, cell)
1271
# Break off here if we're using a ralink card
1272
if self.wpa_driver == RALINK_DRIVER:
1273
ap = self._ParseRalinkAccessPoint(ap, ralink_info, cell)
1274
elif misc.RunRegex(wep_pattern, cell) == 'on':
1275
# Encryption - Default to WEP
1276
ap['encryption'] = True
1277
ap['encryption_method'] = 'WEP'
1279
if misc.RunRegex(wpa1_pattern, cell) == 'WPA Version 1':
1280
ap['encryption_method'] = 'WPA'
1282
if misc.RunRegex(altwpa_pattern, cell) == 'wpa_ie':
1283
ap['encryption_method'] = 'WPA'
1285
if misc.RunRegex(wpa2_pattern, cell) == 'WPA2':
1286
ap['encryption_method'] = 'WPA2'
1288
ap['encryption'] = False
1291
# Set strength to -1 if the quality is not found
1292
ap['quality'] = self._get_link_quality(cell)
1293
if ap['quality'] is None:
1296
# Signal Strength (only used if user doesn't want link
1297
# quality displayed or it isn't found)
1298
if misc.RunRegex(signaldbm_pattern, cell):
1299
ap['strength'] = misc.RunRegex(signaldbm_pattern, cell)
1300
elif self.wpa_driver != RALINK_DRIVER: # This is already set for ralink
1305
def ValidateAuthentication(self, auth_time):
1306
""" Validate WPA authentication.
1308
Validate that the wpa_supplicant authentication
1309
process was successful.
1311
NOTE: It's possible this could return False,
1312
though in reality wpa_supplicant just isn't
1316
auth_time -- The time at which authentication began.
1319
True if wpa_supplicant authenticated succesfully,
1323
# Right now there's no way to do this for these drivers
1324
if self.wpa_driver == RALINK_DRIVER or not self.wpa_cli_cmd:
1328
MAX_DISCONNECTED_TIME = 3
1329
disconnected_time = 0
1330
forced_rescan = False
1331
while (time.time() - auth_time) < MAX_TIME:
1332
cmd = '%s -i %s status' % (self.wpa_cli_cmd, self.iface)
1333
output = misc.Run(cmd)
1334
result = misc.RunRegex(auth_pattern, output)
1336
print 'WPA_CLI RESULT IS', result
1340
if result == "COMPLETED":
1342
elif result == "DISCONNECTED" and not forced_rescan:
1343
disconnected_time += 1
1344
if disconnected_time > MAX_DISCONNECTED_TIME:
1345
disconnected_time = 0
1346
# Force a rescan to get wpa_supplicant moving again.
1347
forced_rescan = True
1348
self._ForceSupplicantScan()
1351
disconnected_time = 0
1354
print 'wpa_supplicant authentication may have failed.'
1358
def _ForceSupplicantScan(self):
1359
""" Force wpa_supplicant to rescan available networks.
1361
This function forces wpa_supplicant to rescan.
1362
This works around authentication validation sometimes failing for
1363
wpa_supplicant because it remains in a DISCONNECTED state for
1364
quite a while, after which a rescan is required, and then
1365
attempting to authenticate. This whole process takes a long
1366
time, so we manually speed it up if we see it happening.
1369
print 'wpa_supplicant rescan forced...'
1370
cmd = 'wpa_cli -i' + self.iface + ' scan'
1375
""" Terminates wpa using wpa_cli"""
1376
cmd = 'wpa_cli -i %s terminate' % self.iface
1377
if self.verbose: print cmd
1381
def GetBSSID(self, iwconfig=None):
1382
""" Get the MAC address for the interface. """
1384
output = self.GetIwconfig()
1388
bssid = misc.RunRegex(bssid_pattern, output)
1392
def GetCurrentBitrate(self, iwconfig=None):
1393
""" Get the current bitrate for the interface. """
1395
output = self.GetIwconfig()
1399
bitrate = misc.RunRegex(bitrate_pattern, output)
1403
def GetOperationalMode(self, iwconfig=None):
1404
""" Get the operational mode for the interface. """
1406
output = self.GetIwconfig()
1410
opmode = misc.RunRegex(opmode_pattern, output)
1412
opmode = opmode.strip()
1416
def GetAvailableAuthMethods(self, iwlistauth=None):
1417
""" Get the available authentication methods for the interface. """
1419
cmd = 'iwlist ' + self.iface + ' auth'
1420
if self.verbose: print cmd
1421
output = misc.Run(cmd)
1425
authm = misc.RunRegex(authmethods_pattern, output)
1426
authm_list = [m.strip() for m in authm.split('\n') if m.strip()]
1427
return ';'.join(authm_list)
1429
def _get_link_quality(self, output):
1430
""" Parse out the link quality from iwlist scan or iwconfig output. """
1432
[(strength, max_strength)] = strength_pattern.findall(output)
1434
(strength, max_strength) = (None, None)
1436
if strength in ['', None]:
1438
[(strength, max_strength)] = altstrength_pattern.findall(output)
1440
# if the pattern was unable to match anything
1441
# we'll return 101, which will allow us to stay
1442
# connected even though we don't know the strength
1443
# it also allows us to tell if
1445
if strength not in ['', None] and max_strength:
1446
return (100 * int(strength) // int(max_strength))
1447
elif strength not in ["", None]:
1448
return int(strength)
1453
def GetSignalStrength(self, iwconfig=None):
1454
""" Get the signal strength of the current network.
1457
The signal strength.
1461
output = self.GetIwconfig()
1464
return self._get_link_quality(output)
1467
def GetDBMStrength(self, iwconfig=None):
1468
""" Get the dBm signal strength of the current network.
1471
The dBm signal strength.
1475
output = self.GetIwconfig()
1478
signaldbm_pattern = re.compile('.*Signal level:?=? ?(-\d\d*)',
1480
dbm_strength = misc.RunRegex(signaldbm_pattern, output)
1484
def GetCurrentNetwork(self, iwconfig=None):
1485
""" Get the essid of the current network.
1488
The current network essid.
1492
output = self.GetIwconfig()
1495
network = misc.RunRegex(re.compile('.*ESSID:"(.*?)"',
1496
re.I | re.M | re.S), output)
1498
network = misc.to_unicode(network)