2
# -*- coding: utf-8 -*-
4
""" Network interface control tools for wicd.
6
This module implements functions to control and obtain information from
9
class BaseInterface() -- Control a network interface.
10
class BaseWiredInterface() -- Control a wired network interface.
11
class BaseWirelessInterface() -- Control a wireless network interface.
16
# Copyright (C) 2007 - 2009 Adam Blackburn
17
# Copyright (C) 2007 - 2009 Dan O'Reilly
18
# Copyright (C) 2007 - 2009 Byron Hillis
19
# Copyright (C) 2009 Andrew Psaltis
21
# This program is free software; you can redistribute it and/or modify
22
# it under the terms of the GNU General Public License Version 2 as
23
# published by the Free Software Foundation.
25
# This program is distributed in the hope that it will be useful,
26
# but WITHOUT ANY WARRANTY; without even the implied warranty of
27
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
28
# GNU General Public License for more details.
30
# You should have received a copy of the GNU General Public License
31
# along with this program. If not, see <http://www.gnu.org/licenses/>.
38
from string import maketrans, translate
42
from misc import find_path
44
# Regular expressions.
# Every pattern is written as a raw string so that regex escapes such as
# \s, \d and \S reach the re module untouched instead of being parsed as
# (invalid) string escape sequences.  The compiled patterns match exactly
# the same text as before.
_re_mode = (re.I | re.M | re.S)
essid_pattern = re.compile(r'.*ESSID:"?(.*?)"?\s*\n', _re_mode)
ap_mac_pattern = re.compile(r'.*Address: (.*?)\n', _re_mode)
channel_pattern = re.compile(r'.*Channel:?=? ?(\d\d?)', _re_mode)
strength_pattern = re.compile(r'.*Quality:?=? ?(\d+)\s*/?\s*(\d*)', _re_mode)
altstrength_pattern = re.compile(r'.*Signal level:?=? ?(\d+)\s*/?\s*(\d*)', _re_mode)
signaldbm_pattern = re.compile(r'.*Signal level:?=? ?(-\d\d*)', _re_mode)
bitrates_pattern = re.compile(r'(\d+\s+\S+/s)', _re_mode)
mode_pattern = re.compile(r'.*Mode:(.*?)\n', _re_mode)
freq_pattern = re.compile(r'.*Frequency:(.*?)\n', _re_mode)
wep_pattern = re.compile(r'.*Encryption key:(.*?)\n', _re_mode)
altwpa_pattern = re.compile(r'(wpa_ie)', _re_mode)
wpa1_pattern = re.compile(r'(WPA Version 1)', _re_mode)
wpa2_pattern = re.compile(r'(WPA2)', _re_mode)

# iwconfig-only regular expressions.
ip_pattern = re.compile(r'inet [Aa]d?dr[^.]*:([^.]*\.[^.]*\.[^.]*\.[0-9]*)', re.S)
bssid_pattern = re.compile(r'.*Access Point: (([0-9A-Z]{2}:){5}[0-9A-Z]{2})', _re_mode)
bitrate_pattern = re.compile(r'.*Bit Rate[=:](.*?/s)', _re_mode)
opmode_pattern = re.compile(r'.*Mode:(.*?) ', _re_mode)
authmethods_pattern = re.compile(r'.*Authentication capabilities :\n(.*?)Current', _re_mode)

# Regular expressions for wpa_cli output
auth_pattern = re.compile(r'.*wpa_state=(.*?)\n', _re_mode)
70
RALINK_DRIVER = 'ralink legacy'
72
blacklist_strict = '!"#$%&\'()*+,./:;<=>?@[\\]^`{|}~ '
73
blacklist_norm = ";`$!*|><&\\"
74
blank_trans = maketrans("", "")
76
def _sanitize_string(string):
78
return translate(str(string), blank_trans, blacklist_norm)
82
def _sanitize_string_strict(string):
84
return translate(str(string), blank_trans, blacklist_strict)
89
def timedcache(duration=5):
90
""" A caching decorator for use with wnettools methods.
92
Caches the results of a function for a given number of
93
seconds (defaults to 5).
97
def __timedcache(self, *args, **kwargs):
98
key = str(args) + str(kwargs) + str(f)
99
if hasattr(self, 'iface'):
101
if (key in _cache and
102
(time.time() - _cache[key]['time']) < duration):
103
return _cache[key]['value']
105
value = f(self, *args, **kwargs)
106
_cache[key] = { 'time' : time.time(), 'value' : value }
113
def GetDefaultGateway():
114
""" Attempts to determine the default gateway by parsing route -n. """
115
route_info = misc.Run("route -n")
116
lines = route_info.split('\n')
123
if words[0] == '0.0.0.0':
128
print 'couldn\'t retrieve default gateway from route -n'
131
def GetWirelessInterfaces():
132
""" Get available wireless interfaces.
134
Attempts to get an interface first by parsing /proc/net/wireless,
135
and should that fail, by parsing iwconfig.
137
The first interface available.
140
dev_dir = '/sys/class/net/'
141
ifnames = [iface for iface in os.listdir(dev_dir)
142
if os.path.isdir(dev_dir + iface) and
143
'wireless' in os.listdir(dev_dir + iface)]
147
def GetWiredInterfaces(basedir='/sys/class/net/'):
    """ Returns a list of wired interfaces on the system.

    An entry of basedir is reported when it is a directory, does not
    contain an entry named 'wireless', and the first line of its 'type'
    file is "1".

    Keyword arguments:
    basedir -- directory to scan, one subdirectory per interface
               (defaults to /sys/class/net/)

    Returns:
    A list of interface names.

    """
    wired = []
    for iface in os.listdir(basedir):
        iface_dir = os.path.join(basedir, iface)
        if not os.path.isdir(iface_dir):
            continue
        if 'wireless' in os.listdir(iface_dir):
            continue
        # Close the file explicitly so the handle is not leaked until
        # garbage collection; try/finally keeps this valid on old Pythons.
        type_file = open(os.path.join(iface_dir, "type"))
        try:
            # readline() yields '' for an empty file, which simply fails
            # the comparison below instead of raising IndexError.
            first_line = type_file.readline()
        finally:
            type_file.close()
        if first_line.strip() == "1":
            wired.append(iface)
    return wired
155
def NeedsExternalCalls():
    """ Report whether the backend needs to use an external program.

    This implementation is a placeholder: it always raises
    NotImplementedError.

    """
    raise NotImplementedError()
159
def GetWpaSupplicantDrivers():
160
""" Returns a list of all valid wpa_supplicant drivers. """
161
output = misc.Run(["wpa_supplicant", "-h"])
163
output = output.split("drivers:")[1].split("options:")[0].strip()
165
print "Warning: Couldn't get list of valid wpa_supplicant drivers"
167
patt = re.compile("(\S+)\s+=.*")
168
drivers = patt.findall(output) or [""]
169
# We cannot use the "wired" driver for wireless interfaces.
170
if 'wired' in drivers:
171
drivers.remove('wired')
173
def IsValidWpaSuppDriver(driver):
    """ Returns True if given string is a valid wpa_supplicant driver.

    wpa_supplicant is run with the driver under test, the interface
    name "olan19" and a randomly numbered config path.  The driver is
    treated as valid unless the output contains "Unsupported driver".

    """
    driver_flag = "-D%s" % driver
    # NOTE(review): the interface and config path look deliberately
    # bogus so that only the driver check matters -- confirm.
    config_flag = "-c/etc/abcd%sdefzz.zconfz" % random.randint(1, 1000)
    output = misc.Run(["wpa_supplicant", driver_flag, "-iolan19",
                       config_flag])
    return "Unsupported driver" not in output
179
def neediface(default_response):
180
""" A decorator for only running a method if self.iface is defined.
182
This decorator is wrapped around Interface methods, and will
183
return a provided default_response value if self.iface is not
188
def newfunc(self, *args, **kwargs):
189
if not self.iface or \
190
not os.path.exists('/sys/class/net/%s' % self.iface):
191
return default_response
192
return func(self, *args, **kwargs)
193
newfunc.__dict__ = func.__dict__
194
newfunc.__doc__ = func.__doc__
195
newfunc.__module__ = func.__module__
200
class BaseInterface(object):
201
""" Control a network interface. """
202
def __init__(self, iface, verbose=False):
    """ Initialise the object.

    Keyword arguments:
    iface -- the name of the interface (stored after being passed
             through _sanitize_string_strict)
    verbose -- whether to print every command run

    """
    self.iface = _sanitize_string_strict(iface)
    self.verbose = verbose
    # Tool selections and the DHCP process handle all start out unset.
    self.DHCP_CLIENT = self.flush_tool = self.link_detect = None
    self.dhcp_object = None
217
def SetDebugMode(self, value):
218
""" If True, verbose output is enabled. """
221
def SetInterface(self, iface):
    """ Sets the interface.

    Keyword arguments:
    iface -- the name of the interface; it is converted to a string
             and passed through _sanitize_string_strict before being
             stored in self.iface

    """
    iface_name = str(iface)
    self.iface = _sanitize_string_strict(iface_name)
230
def _find_program_path(self, program):
231
""" Determines the full path for the given program.
233
Searches for a given program name on the PATH.
236
program -- The name of the program to search for
239
The full path of the program or None
242
path = find_path(program)
243
if not path and self.verbose:
244
print "WARNING: No path found for %s" % program
248
def _get_dhcp_command(self, flavor=None, hostname=None):
249
""" Returns the correct DHCP client command.
251
Given a type of DHCP request (create or release a lease),
252
this method will build a command to complete the request
253
using the correct dhcp client, and cli options.
256
def get_client_name(cl):
257
""" Converts the integer value for a dhcp client to a string. """
258
if self.dhcpcd_cmd and cl in [misc.DHCPCD, misc.AUTO]:
260
cmd = self.dhcpcd_cmd
261
elif self.pump_cmd and cl in [misc.PUMP, misc.AUTO]:
264
elif self.dhclient_cmd and cl in [misc.DHCLIENT, misc.AUTO]:
266
cmd = self.dhclient_cmd
267
if self.dhclient_needs_verbose:
269
elif self.udhcpc_cmd and cl in [misc.UDHCPC, misc.AUTO]:
271
cmd = self.udhcpc_cmd
277
# probably /var/lib/wicd/dhclient.conf with defaults
278
dhclient_conf_path = os.path.join(
285
{'connect' : r"%(cmd)s -cf %(dhclientconf)s %(iface)s",
286
'release' : r"%(cmd)s -r %(iface)s",
287
'id' : misc.DHCLIENT,
290
{ 'connect' : r"%(cmd)s -i %(iface)s -h %(hostname)s",
291
'release' : r"%(cmd)s -r -i %(iface)s",
295
{'connect' : r"%(cmd)s %(iface)s -h %(hostname)s ",
296
'release' : r"%(cmd)s -k %(iface)s",
300
{'connect' : r"%(cmd)s -n -i %(iface)s -H %(hostname)s ",
301
'release' : r"killall -SIGUSR2 %(cmd)s",
305
(client_name, cmd) = get_client_name(self.DHCP_CLIENT)
307
# cause dhclient doesn't have a handy dandy argument
308
# for specifying the hostname to be sent
309
if client_name == "dhclient" and flavor:
311
# <hostname> will use the system hostname
312
# we'll use that if there is hostname passed
313
# that shouldn't happen, though
314
hostname = '<hostname>'
315
print 'attempting to set hostname with dhclient'
316
print 'using dhcpcd or another supported client may work better'
317
dhclient_template = \
318
open(os.path.join(wpath.etc, 'dhclient.conf.template'), 'r')
320
output_conf = open(dhclient_conf_path, 'w')
322
for line in dhclient_template.readlines():
323
line = line.replace('$_HOSTNAME', hostname)
324
output_conf.write(line)
327
dhclient_template.close()
329
if not client_name or not cmd:
330
print "WARNING: Failed to find a valid dhcp client!"
333
if flavor == "connect":
335
hostname = os.uname()[1]
336
return client_dict[client_name]['connect'] % \
338
"iface" : self.iface,
339
"hostname" : hostname,
340
'dhclientconf' : dhclient_conf_path }
341
elif flavor == "release":
342
return client_dict[client_name]['release'] % {"cmd":cmd, "iface":self.iface}
344
return client_dict[client_name]['id']
346
def AppAvailable(self, app):
    """ Return whether a given app is available.

    The app name, with any dashes removed, is suffixed with "_cmd" and
    looked up among the attributes stored directly on this instance.

    Keyword arguments:
    app -- name of the executable to check for

    Returns:
    True if the matching attribute exists and is truthy, False otherwise.

    """
    attr_name = "%s_cmd" % app.replace("-", "")
    stored_path = vars(self).get(attr_name)
    return bool(stored_path)
357
""" Check that all required tools are available. """
358
# THINGS TO CHECK FOR: ethtool, pptp-linux, dhclient, host
360
self.CheckWiredTools()
361
self.CheckWirelessTools()
362
self.CheckSudoApplications()
363
self.CheckRouteFlushTool()
364
self.CheckResolvConf()
366
def CheckResolvConf(self):
    """ Look up resolvconf and store the result in self.resolvconf_cmd. """
    resolvconf_path = self._find_program_path("resolvconf")
    self.resolvconf_cmd = resolvconf_path
371
""" Check for the existence of valid DHCP clients.
373
Checks for the existence of a supported DHCP client. If one is
374
found, the appropriate values for DHCP_CMD, DHCP_RELEASE, and
375
DHCP_CLIENT are set. If a supported client is not found, a
379
self.dhclient_cmd = self._find_program_path("dhclient")
380
if self.dhclient_cmd != None:
381
output = misc.Run(self.dhclient_cmd + " --version",
384
self.dhclient_needs_verbose = True
386
self.dhclient_needs_verbose = False
387
debian_dhcpcd_cmd = self._find_program_path('dhcpcd-bin')
388
if debian_dhcpcd_cmd:
389
self.dhcpcd_cmd = debian_dhcpcd_cmd
391
self.dhcpcd_cmd = self._find_program_path("dhcpcd")
392
self.pump_cmd = self._find_program_path("pump")
393
self.udhcpc_cmd = self._find_program_path("udhcpc")
395
def CheckWiredTools(self):
    """ Look up mii-tool and ethtool and store what was found.

    The results go into self.miitool_cmd and self.ethtool_cmd.

    """
    for attr_name, program in (("miitool_cmd", "mii-tool"),
                               ("ethtool_cmd", "ethtool")):
        setattr(self, attr_name, self._find_program_path(program))
400
def CheckWirelessTools(self):
    """ Look up wpa_cli and store the result in self.wpa_cli_cmd.

    A notice is printed when nothing was found.

    """
    self.wpa_cli_cmd = self._find_program_path("wpa_cli")
    if self.wpa_cli_cmd:
        return
    # Single-argument print(...) produces the same output whether print
    # is a statement or a function.
    print("wpa_cli not found. Authentication will not be validated.")
406
def CheckRouteFlushTool(self):
    """ Look up the ip and route programs and store what was found.

    The results go into self.ip_cmd and self.route_cmd.

    """
    locate = self._find_program_path
    ip_path = locate("ip")
    self.ip_cmd = ip_path
    route_path = locate("route")
    self.route_cmd = route_path
411
def CheckSudoApplications(self):
    """ Look up gksudo, kdesu and ktsuss and store what was found.

    The results go into self.gksudo_cmd, self.kdesu_cmd and
    self.ktsuss_cmd.

    """
    for program in ("gksudo", "kdesu", "ktsuss"):
        setattr(self, program + "_cmd", self._find_program_path(program))
418
""" Bring the network interface up.
424
cmd = 'ifconfig ' + self.iface + ' up'
425
if self.verbose: print cmd
431
""" Take down the network interface.
437
cmd = 'ifconfig ' + self.iface + ' down'
438
if self.verbose: print cmd
444
def GetIfconfig(self):
445
""" Runs ifconfig and returns the output. """
446
cmd = "ifconfig %s" % self.iface
447
if self.verbose: print cmd
451
def SetAddress(self, ip=None, netmask=None, broadcast=None):
452
""" Set the IP addresses of an interface.
455
ip -- interface IP address in dotted quad form
456
netmask -- netmask address in dotted quad form
457
broadcast -- broadcast address in dotted quad form
460
for val in [ip, netmask, broadcast]:
463
if not misc.IsValidIP(val):
464
print 'WARNING: Invalid IP address found, aborting!'
467
cmd = ''.join(['ifconfig ', self.iface, ' '])
469
cmd = ''.join([cmd, ip, ' '])
471
cmd = ''.join([cmd, 'netmask ', netmask, ' '])
473
cmd = ''.join([cmd, 'broadcast ', broadcast, ' '])
474
if self.verbose: print cmd
477
def _parse_dhclient(self, pipe):
478
""" Parse the output of dhclient.
480
Parses the output of dhclient and returns the status of
481
the connection attempt.
484
pipe -- stdout pipe to the dhclient process.
487
'success' if successful, an error code string otherwise.
490
dhclient_complete = False
491
dhclient_success = False
493
while not dhclient_complete:
494
line = pipe.readline()
495
if line == '': # Empty string means dhclient is done.
496
dhclient_complete = True
498
print misc.to_unicode(line.strip('\n'))
499
if line.startswith('bound'):
500
dhclient_success = True
501
dhclient_complete = True
503
return self._check_dhcp_result(dhclient_success)
505
def _parse_pump(self, pipe):
506
""" Determines if obtaining an IP using pump succeeded.
509
pipe -- stdout pipe to the pump process.
512
'success' if succesful, an error code string otherwise.
515
pump_complete = False
518
while not pump_complete:
519
line = pipe.readline()
522
elif line.strip().lower().startswith('Operation failed.'):
525
print misc.to_unicode(line)
527
return self._check_dhcp_result(pump_success)
529
def _parse_dhcpcd(self, pipe):
530
""" Determines if obtaining an IP using dhcpcd succeeded.
533
pipe -- stdout pipe to the dhcpcd process.
536
'success' if succesful, an error code string otherwise.
539
dhcpcd_complete = False
540
dhcpcd_success = True
542
while not dhcpcd_complete:
543
line = pipe.readline()
544
if "Error" in line or "timed out" in line:
545
dhcpcd_success = False
546
dhcpcd_complete = True
548
dhcpcd_complete = True
549
print misc.to_unicode(line)
551
return self._check_dhcp_result(dhcpcd_success)
553
def _parse_udhcpc(self, pipe):
554
""" Determines if obtaining an IP using udhcpc succeeded.
557
pipe -- stdout pipe to the dhcpcd process.
560
'success' if successful, an error code string otherwise.
563
udhcpc_complete = False
564
udhcpc_success = True
566
while not udhcpc_complete:
567
line = pipe.readline()
568
if line.endswith("failing."):
569
udhcpc_success = False
570
udhcpc_complete = True
572
udhcpc_complete = True
575
return self._check_dhcp_result(udhcpc_success)
577
def _check_dhcp_result(self, success):
578
""" Print and return the correct DHCP connection result.
581
success -- boolean specifying if DHCP was successful.
584
'success' if success == True, 'dhcp_failed' otherwise.
588
print 'DHCP connection successful'
591
print 'DHCP connection failed'
595
def StartDHCP(self, hostname):
596
""" Start the DHCP client to obtain an IP address.
599
hostname -- the hostname to send to the DHCP server
602
A string representing the result of the DHCP command. See
603
_check_dhcp_result for the possible values.
606
cmd = self._get_dhcp_command('connect', hostname)
607
if self.verbose: print cmd
608
self.dhcp_object = misc.Run(cmd, include_stderr=True, return_obj=True)
609
pipe = self.dhcp_object.stdout
610
client_dict = { misc.DHCLIENT : self._parse_dhclient,
611
misc.DHCPCD : self._parse_dhcpcd,
612
misc.PUMP : self._parse_pump,
613
misc.UDHCPC : self._parse_udhcpc,
616
DHCP_CLIENT = self._get_dhcp_command()
617
if DHCP_CLIENT in client_dict:
618
ret = client_dict[DHCP_CLIENT](pipe)
620
print "ERROR: no dhcp client found"
625
def ReleaseDHCP(self):
626
""" Release the DHCP lease for this interface. """
627
cmd = self._get_dhcp_command("release")
628
if self.verbose: print cmd
632
def DelDefaultRoute(self):
633
""" Delete only the default route for a device. """
634
if self.ip_cmd and self.flush_tool in [misc.AUTO, misc.IP]:
635
cmd = '%s route del default dev %s' % (self.ip_cmd, self.iface)
636
elif self.route_cmd and self.flush_tool in [misc.AUTO, misc.ROUTE]:
637
cmd = '%s del default dev %s' % (self.route_cmd, self.iface)
639
print "No route manipulation command available!"
641
if self.verbose: print cmd
645
def SetDNS(self, dns1=None, dns2=None, dns3=None,
646
dns_dom=None, search_dom=None):
647
""" Set the DNS of the system to the specified DNS servers.
649
Opens up resolv.conf and writes in the nameservers.
652
dns1 -- IP address of DNS server 1
653
dns2 -- IP address of DNS server 2
654
dns3 -- IP address of DNS server 3
655
dns_dom -- DNS domain
656
search_dom -- DNS search domain
661
resolv_params += 'domain %s\n' % dns_dom
663
resolv_params += 'search %s\n' % search_dom
666
for dns in (dns1, dns2, dns3):
668
if misc.IsValidIP(dns):
670
print 'Setting DNS : ' + dns
671
valid_dns_list.append("nameserver %s\n" % dns)
673
print 'DNS IP %s is not a valid IP address, skipping' % dns
676
resolv_params += ''.join(valid_dns_list)
678
if self.resolvconf_cmd:
679
cmd = [self.resolvconf_cmd, '-a', self.iface]
680
if self.verbose: print cmd
681
p = misc.Run(cmd, include_stderr=True, return_obj=True)
682
p.communicate(input=resolv_params)
684
resolv = open("/etc/resolv.conf", "w")
685
resolv.write(resolv_params + "\n")
689
def FlushRoutes(self):
690
""" Flush network routes for this device. """
691
if self.ip_cmd and self.flush_tool in [misc.AUTO, misc.IP]:
692
cmds = ['%s route flush dev %s' % (self.ip_cmd, self.iface)]
693
elif self.route_cmd and self.flush_tool in [misc.AUTO, misc.ROUTE]:
694
cmds = ['%s del dev %s' % (self.route_cmd, self.iface)]
696
print "No flush command available!"
699
if self.verbose: print cmd
703
def SetDefaultRoute(self, gw):
704
""" Add a default route with the specified gateway.
707
gw -- gateway of the default route in dotted quad form
710
if not misc.IsValidIP(gw):
711
print 'WARNING: Invalid gateway found. Aborting!'
713
cmd = 'route add default gw %s dev %s' % (gw, self.iface)
714
if self.verbose: print cmd
718
def GetIP(self, ifconfig=""):
719
""" Get the IP address of the interface.
722
The IP address of the interface in dotted quad form.
726
output = self.GetIfconfig()
729
return misc.RunRegex(ip_pattern, output)
732
def VerifyAPAssociation(self, gateway):
    """ Verify association with an access point.

    Runs "ping -q -w 3 -c 1 <gateway>" to check that the given
    address can be contacted.

    Keyword arguments:
    gateway -- address to ping

    Returns:
    Whatever misc.LaunchAndWait returns for the ping command.

    """
    ping_cmd = "ping -q -w 3 -c 1 %s" % gateway
    if self.verbose:
        print(ping_cmd)
    return misc.LaunchAndWait(ping_cmd)
744
def IsUp(self, ifconfig=None):
745
""" Determines if the interface is up.
748
True if the interface is up, False otherwise.
751
flags_file = '/sys/class/net/%s/flags' % self.iface
753
flags = open(flags_file, "r").read().strip()
755
print "Could not open %s, using ifconfig to determine status" % flags_file
756
return self._slow_is_up(ifconfig)
757
return bool(int(flags, 16) & 1)
760
def _slow_is_up(self, ifconfig=None):
761
""" Determine if an interface is up using ifconfig. """
763
output = self.GetIfconfig()
766
lines = output.split('\n')
769
for line in lines[1:4]:
770
if line.strip().startswith('UP'):
775
class BaseWiredInterface(BaseInterface):
776
""" Control a wired network interface. """
777
def __init__(self, iface, verbose=False):
    """ Initialise the wired network interface class.

    All of the work is delegated to BaseInterface.__init__.

    Keyword arguments:
    iface -- name of the interface
    verbose -- print all commands

    """
    BaseInterface.__init__(self, iface, verbose=verbose)
788
def GetPluggedIn(self):
789
""" Get the current physical connection state.
791
The method will first attempt to use ethtool to determine
792
physical connection state. Should ethtool fail to run properly,
793
mii-tool will be used instead.
796
True if a link is detected, False otherwise.
799
# check for link using /sys/class/net/iface/carrier
800
# is usually more accurate
801
sys_device = '/sys/class/net/%s/' % self.iface
802
carrier_path = sys_device + 'carrier'
810
if self.IsUp() or tries > MAX_TRIES: break
812
if os.path.exists(carrier_path):
813
carrier = open(carrier_path, 'r')
815
link = carrier.read().strip()
821
except (IOError, ValueError, TypeError):
822
print 'Error checking link using /sys/class/net/%s/carrier' % self.iface
824
if self.ethtool_cmd and self.link_detect in [misc.ETHTOOL, misc.AUTO]:
825
return self._eth_get_plugged_in()
826
elif self.miitool_cmd and self.link_detect in [misc.MIITOOL, misc.AUTO]:
827
return self._mii_get_plugged_in()
829
print ('Error: No way of checking for a wired connection. Make ' +
830
'sure that either mii-tool or ethtool is installed.')
833
def _eth_get_plugged_in(self):
834
""" Use ethtool to determine the physical connection state.
837
True if a link is detected, False otherwise.
840
cmd = "%s %s" % (self.ethtool_cmd, self.iface)
842
print 'Wired Interface is down, putting it up'
845
if self.verbose: print cmd
846
tool_data = misc.Run(cmd, include_stderr=True)
847
if misc.RunRegex(re.compile('(Link detected: yes)', re.I | re.M | re.S),
853
def _mii_get_plugged_in(self):
854
""" Use mii-tool to determine the physical connection state.
857
True if a link is detected, False otherwise.
860
cmd = "%s %s" % (self.miitool_cmd, self.iface)
861
if self.verbose: print cmd
862
tool_data = misc.Run(cmd, include_stderr=True)
863
if misc.RunRegex(re.compile('(Invalid argument)', re.I | re.M | re.S),
864
tool_data) is not None:
865
print 'Wired Interface is down, putting it up'
868
if self.verbose: print cmd
869
tool_data = misc.Run(cmd, include_stderr=True)
871
if misc.RunRegex(re.compile('(link ok)', re.I | re.M | re.S),
872
tool_data) is not None:
878
class BaseWirelessInterface(BaseInterface):
879
""" Control a wireless network interface. """
880
def __init__(self, iface, verbose=False, wpa_driver='wext'):
    """ Initialise the wireless network interface class.

    Keyword arguments:
    iface -- name of the interface
    verbose -- print all commands
    wpa_driver -- stored unchanged as self.wpa_driver

    """
    BaseInterface.__init__(self, iface, verbose=verbose)
    self.wpa_driver = wpa_driver
    # No scan interface is set at construction time.
    self.scan_iface = None
892
def SetWpaDriver(self, driver):
    """ Sets the wpa_driver.

    The value is passed through _sanitize_string before being stored
    in self.wpa_driver.

    """
    sanitized_driver = _sanitize_string(driver)
    self.wpa_driver = sanitized_driver
897
def SetEssid(self, essid):
898
""" Set the essid of the wireless interface.
901
essid -- essid to set the interface to
904
cmd = ['iwconfig', self.iface, 'essid', '--', str(essid)]
905
if self.verbose: print str(cmd)
909
def GetKillSwitchStatus(self):
910
""" Determines if the wireless killswitch is enabled.
913
True if the killswitch is enabled, False otherwise.
916
output = self.GetIwconfig()
918
killswitch_pattern = re.compile('.*radio off', re.I | re.M | re.S)
919
if killswitch_pattern.search(output):
928
def GetIwconfig(self):
929
""" Returns the output of iwconfig for this interface. """
930
cmd = "iwconfig " + self.iface
931
if self.verbose: print cmd
934
def _FreqToChannel(self, freq):
935
""" Translate the specified frequency to a channel.
937
Note: This function is simply a lookup dict and therefore the
938
freq argument must be in the dict to provide a valid channel.
941
freq -- string containing the specified frequency
944
The channel number, or None if not found.
948
freq_dict = {'2.412 GHz': 1, '2.417 GHz': 2, '2.422 GHz': 3,
949
'2.427 GHz': 4, '2.432 GHz': 5, '2.437 GHz': 6,
950
'2.442 GHz': 7, '2.447 GHz': 8, '2.452 GHz': 9,
951
'2.457 GHz': 10, '2.462 GHz': 11, '2.467 GHz': 12,
952
'2.472 GHz': 13, '2.484 GHz': 14 }
954
ret = freq_dict[freq]
956
print "Couldn't determine channel number for frequency: " + str(freq)
960
def _GetRalinkInfo(self):
961
""" Get a network info list used for ralink drivers
963
Calls iwpriv <wireless interface> get_site_survey, which
964
on some ralink cards will return encryption and signal
965
strength info for wireless networks in the area.
968
iwpriv = misc.Run('iwpriv ' + self.iface + ' get_site_survey')
971
lines = iwpriv.splitlines()[2:]
973
patt = re.compile("((?:[0-9A-Z]{2}:){5}[0-9A-Z]{2})")
977
info = filter(None, [x.strip() for x in info])
980
if re.match(patt, info[2].upper()):
981
bssid = info[2].upper()
983
elif re.match(patt, info[3].upper()):
984
bssid = info[3].upper()
987
print 'Invalid iwpriv line. Skipping it.'
989
ap['nettype'] = info[-1]
990
ap['strength'] = info[1]
991
if info[4 + offset] == 'WEP':
992
ap['encryption_method'] = 'WEP'
993
ap['enctype'] = 'WEP'
994
ap['keyname'] = 'Key1'
995
ap['authmode'] = info[5 + offset]
996
elif info[5 + offset] in ['WPA-PSK', 'WPA']:
997
ap['encryption_method'] = 'WPA'
998
ap['authmode'] = "WPAPSK"
999
ap['keyname'] = "WPAPSK"
1000
ap['enctype'] = info[4 + offset]
1001
elif info[5 + offset] == 'WPA2-PSK':
1002
ap['encryption_method'] = 'WPA2'
1003
ap['authmode'] ="WPA2PSK"
1004
ap['keyname'] = "WPA2PSK"
1005
ap['enctype'] = info[4 + offset]
1006
elif info[4 + offset] == "NONE":
1007
ap['encryption_method'] = None
1009
print "Unknown AuthMode, can't assign encryption_method!"
1010
ap['encryption_method'] = 'Unknown'
1012
if self.verbose: print str(aps)
1015
def _ParseRalinkAccessPoint(self, ap, ralink_info, cell):
1016
""" Parse encryption and signal strength info for ralink cards
1019
ap -- array containing info about the current access point
1020
ralink_info -- dict containing available network info
1021
cell -- string containing cell information
1024
Updated array containing info about the current access point
1027
wep_pattern = re.compile('.*Encryption key:(.*?)\n', re.I | re.M | re.S)
1028
if ralink_info.has_key(ap['bssid']):
1029
info = ralink_info[ap['bssid']]
1030
for key in info.keys():
1032
if misc.RunRegex(wep_pattern, cell) == 'on':
1033
ap['encryption'] = True
1035
ap['encryption'] = False
1039
def SetMode(self, mode):
1040
""" Set the mode of the wireless interface.
1043
mode -- mode to set the interface to
1046
mode = _sanitize_string_strict(mode)
1047
if mode.lower() == 'master':
1049
cmd = 'iwconfig %s mode %s' % (self.iface, mode)
1050
if self.verbose: print cmd
1054
def SetChannel(self, channel):
1055
""" Set the channel of the wireless interface.
1058
channel -- channel to set the interface to
1061
if not channel.isdigit():
1062
print 'WARNING: Invalid channel found. Aborting!'
1065
cmd = 'iwconfig %s channel %s' % (self.iface, str(channel))
1066
if self.verbose: print cmd
1070
def SetKey(self, key):
1071
""" Set the encryption key of the wireless interface.
1074
key -- encryption key to set
1077
cmd = 'iwconfig %s key %s' % (self.iface, key)
1078
if self.verbose: print cmd
1082
def Associate(self, essid, channel=None, bssid=None):
1083
""" Associate with the specified wireless network.
1086
essid -- essid of the network
1087
channel -- channel of the network
1088
bssid -- bssid of the network
1091
cmd = ['iwconfig', self.iface, 'essid', essid]
1092
if self.verbose: print str(cmd)
1094
base = "iwconfig %s" % self.iface
1095
if channel and str(channel).isdigit():
1096
cmd = "%s channel %s" % (base, str(channel))
1097
if self.verbose: print cmd
1100
cmd = "%s ap %s" % (base, bssid)
1101
if self.verbose: print cmd
1104
def GeneratePSK(self, network):
1105
""" Generate a PSK using wpa_passphrase.
1108
network -- dictionary containing network info
1111
wpa_pass_path = misc.find_path('wpa_passphrase')
1112
if not wpa_pass_path: return None
1113
key_pattern = re.compile('network={.*?\spsk=(.*?)\n}.*',
1115
cmd = [wpa_pass_path, str(network['essid']), str(network['key'])]
1116
if self.verbose: print cmd
1117
return misc.RunRegex(key_pattern, misc.Run(cmd))
1120
def Authenticate(self, network):
1121
""" Authenticate with the specified wireless network.
1124
network -- dictionary containing network info
1127
misc.ParseEncryption(network)
1128
if self.wpa_driver == RALINK_DRIVER:
1129
self._AuthenticateRalinkLegacy(network)
1131
cmd = ['wpa_supplicant', '-B', '-i', self.iface, '-c',
1132
os.path.join(wpath.networks,
1133
network['bssid'].replace(':', '').lower()),
1134
'-D', self.wpa_driver]
1135
if self.verbose: print cmd
1138
def _AuthenticateRalinkLegacy(self, network):
1139
""" Authenticate with the specified wireless network.
1141
This function handles Ralink legacy cards that cannot use
1145
network -- dictionary containing network info
1148
if network.get('key') != None:
1150
info = self._GetRalinkInfo()[network.get('bssid')]
1152
print "Could not find current network in iwpriv " + \
1153
"get_site_survey results. Cannot authenticate."
1156
if info['enctype'] == "WEP" and info['authtype'] == 'OPEN':
1157
print 'Setting up WEP'
1158
cmd = ''.join(['iwconfig ', self.iface, ' key ',
1159
network.get('key')])
1160
if self.verbose: print cmd
1164
cmd_list.append('NetworkType=' + info['nettype'])
1165
cmd_list.append('AuthMode=' + info['authmode'])
1166
cmd_list.append('EncrypType=' + info['enctype'])
1167
cmd_list.append('SSID="%s"' % network['essid'])
1168
cmd_list.append('%s="%s"' % (network['keyname'], network['key']))
1169
if info['nettype'] == 'SHARED' and info['enctype'] == 'WEP':
1170
cmd_list.append('DefaultKeyID=1')
1172
for cmd in cmd_list:
1173
cmd = ['iwpriv', self.iface, 'set', cmd]
1174
if self.verbose: print ' '.join(cmd)
1178
def GetNetworks(self):
1179
""" Get a list of available wireless networks.
1182
A list containing available wireless networks.
1185
cmd = 'iwlist ' + self.iface + ' scan'
1186
if self.verbose: print cmd
1187
results = misc.Run(cmd)
1188
# Split the networks apart, using Cell as our split point
1189
# this way we can look at only one network at a time.
1190
# The spaces around ' Cell ' are to minimize the chance that someone
1191
# has an essid named Cell...
1192
networks = results.split( ' Cell ' )
1194
# Get available network info from iwpriv get_site_survey
1195
# if we're using a ralink card (needed to get encryption info)
1196
if self.wpa_driver == RALINK_DRIVER:
1197
ralink_info = self._GetRalinkInfo()
1201
# An array for the access points
1204
for cell in networks:
1205
# Only use sections where there is an ESSID.
1206
if 'ESSID:' in cell:
1207
# Add this network to the list of networks
1208
entry = self._ParseAccessPoint(cell, ralink_info)
1209
if entry is not None:
1210
# Normally we only get duplicate bssids with hidden
1211
# networks. If we hit this, we only want the entry
1212
# with the real essid to be in the network list.
1213
if (entry['bssid'] not in access_points
1214
or not entry['hidden']):
1215
access_points[entry['bssid']] = entry
1217
return access_points.values()
1219
def _ParseAccessPoint(self, cell, ralink_info):
1220
""" Parse a single cell from the output of iwlist.
1223
cell -- string containing the cell information
1224
ralink_info -- string containing network information needed
1228
A dictionary containing the cell networks properties.
1232
ap['essid'] = misc.RunRegex(essid_pattern, cell)
1234
ap['essid'] = misc.to_unicode(ap['essid'])
1235
except (UnicodeDecodeError, UnicodeEncodeError):
1236
print 'Unicode problem with current network essid, ignoring!!'
1238
if ap['essid'] in ['Hidden', '<hidden>', "", None]:
1241
ap['essid'] = "<hidden>"
1243
ap['hidden'] = False
1245
# Channel - For cards that don't have a channel number,
1246
# convert the frequency.
1247
ap['channel'] = misc.RunRegex(channel_pattern, cell)
1248
if ap['channel'] == None:
1249
freq = misc.RunRegex(freq_pattern, cell)
1250
ap['channel'] = self._FreqToChannel(freq)
1253
ap['bitrates'] = misc.RunRegex(bitrates_pattern,
1254
cell.split("Bit Rates")[-1])
1257
ap['bssid'] = misc.RunRegex(ap_mac_pattern, cell)
1260
ap['mode'] = misc.RunRegex(mode_pattern, cell)
1262
# Break off here if we're using a ralink card
1263
if self.wpa_driver == RALINK_DRIVER:
1264
ap = self._ParseRalinkAccessPoint(ap, ralink_info, cell)
1265
elif misc.RunRegex(wep_pattern, cell) == 'on':
1266
# Encryption - Default to WEP
1267
ap['encryption'] = True
1268
ap['encryption_method'] = 'WEP'
1270
if misc.RunRegex(wpa1_pattern, cell) == 'WPA Version 1':
1271
ap['encryption_method'] = 'WPA'
1273
if misc.RunRegex(altwpa_pattern, cell) == 'wpa_ie':
1274
ap['encryption_method'] = 'WPA'
1276
if misc.RunRegex(wpa2_pattern, cell) == 'WPA2':
1277
ap['encryption_method'] = 'WPA2'
1279
ap['encryption'] = False
1282
# Set strength to -1 if the quality is not found
1283
ap['quality'] = self._get_link_quality(cell)
1284
if ap['quality'] is None:
1287
# Signal Strength (only used if user doesn't want link
1288
# quality displayed or it isn't found)
1289
if misc.RunRegex(signaldbm_pattern, cell):
1290
ap['strength'] = misc.RunRegex(signaldbm_pattern, cell)
1291
elif self.wpa_driver != RALINK_DRIVER: # This is already set for ralink
1296
def ValidateAuthentication(self, auth_time):
    """ Validate WPA authentication.

    Validate that the wpa_supplicant authentication
    process was successful, by polling the status reported
    through self.wpa_cli_cmd until it settles or time runs out.

    NOTE: It's possible this could return False,
    though in reality wpa_supplicant just isn't
    finished yet.

    Keyword arguments:
    auth_time -- The time at which authentication began.

    Returns:
    True if wpa_supplicant authenticated successfully,
    False otherwise.

    """
    # Right now there's no way to do this for these drivers
    if self.wpa_driver == RALINK_DRIVER or not self.wpa_cli_cmd:
        return True

    # NOTE(review): the MAX_TIME value, the early returns, the extra
    # five seconds after a forced rescan and the one-second sleep were
    # restored from a damaged copy of this method -- confirm them
    # against the project history.
    MAX_TIME = 35
    MAX_DISCONNECTED_TIME = 3
    disconnected_time = 0
    forced_rescan = False
    while (time.time() - auth_time) < MAX_TIME:
        cmd = '%s -i %s status' % (self.wpa_cli_cmd, self.iface)
        output = misc.Run(cmd)
        result = misc.RunRegex(auth_pattern, output)
        if self.verbose:
            print('WPA_CLI RESULT IS %s' % (result,))

        if not result:
            # No state could be parsed at all; give up right away.
            return False
        if result == "COMPLETED":
            return True
        elif result == "DISCONNECTED" and not forced_rescan:
            disconnected_time += 1
            if disconnected_time > MAX_DISCONNECTED_TIME:
                disconnected_time = 0
                # Force a rescan to get wpa_supplicant moving again.
                # This is done at most once per validation attempt.
                forced_rescan = True
                self._ForceSupplicantScan()
                MAX_TIME += 5
        else:
            disconnected_time = 0
        time.sleep(1)

    print('wpa_supplicant authentication may have failed.')
    return False
1349
def _ForceSupplicantScan(self):
    """ Force wpa_supplicant to rescan available networks.

    This works around authentication validation sometimes failing for
    wpa_supplicant because it remains in a DISCONNECTED state for
    quite a while, after which a rescan is required before it
    attempts to authenticate again.  This whole process takes a long
    time, so we manually speed it up if we see it happening.

    """
    print('wpa_supplicant rescan forced...')
    # Keep the interface name as its own argument, the same way the
    # other wpa_cli commands in this file are built.
    cmd = 'wpa_cli -i %s scan' % self.iface
    misc.Run(cmd)
1366
""" Terminates wpa using wpa_cli"""
1367
cmd = 'wpa_cli -i %s terminate' % self.iface
1368
if self.verbose: print cmd
1372
def GetBSSID(self, iwconfig=None):
    """ Get the MAC address for the interface.

    Keyword arguments:
    iwconfig -- iwconfig output to parse; when empty or omitted it
                is fetched with self.GetIwconfig().

    Returns:
    Whatever misc.RunRegex extracts from that output with
    bssid_pattern.

    """
    output = iwconfig or self.GetIwconfig()
    return misc.RunRegex(bssid_pattern, output)
1383
def GetCurrentBitrate(self, iwconfig=None):
    """ Get the current bitrate for the interface.

    Keyword arguments:
    iwconfig -- iwconfig output to parse; when empty or omitted it
                is fetched with self.GetIwconfig().

    Returns:
    Whatever misc.RunRegex extracts from that output with
    bitrate_pattern.

    """
    output = iwconfig or self.GetIwconfig()
    return misc.RunRegex(bitrate_pattern, output)
1394
def GetOperationalMode(self, iwconfig=None):
    """ Get the operational mode for the interface.

    Keyword arguments:
    iwconfig -- iwconfig output to parse; when empty or omitted it
                is fetched with self.GetIwconfig().

    Returns:
    The text captured by opmode_pattern with surrounding whitespace
    removed, or the empty result of misc.RunRegex unchanged.

    """
    output = iwconfig or self.GetIwconfig()
    opmode = misc.RunRegex(opmode_pattern, output)
    # Other callers in this file test RunRegex results for None, so
    # only strip when something was actually captured.
    if opmode:
        opmode = opmode.strip()
    return opmode
1407
def GetAvailableAuthMethods(self, iwlistauth=None):
    """ Get the available authentication methods for the interface.

    Keyword arguments:
    iwlistauth -- output of 'iwlist <iface> auth' to parse; when empty
                  or omitted the command is run here.

    Returns:
    The method names joined with ';', or an empty string when
    none could be parsed from the output.

    """
    if iwlistauth:
        output = iwlistauth
    else:
        cmd = 'iwlist ' + self.iface + ' auth'
        if self.verbose:
            print(cmd)
        output = misc.Run(cmd)

    authm = misc.RunRegex(authmethods_pattern, output)
    if not authm:
        # Nothing matched: return the same value an empty list of
        # methods would produce instead of failing on .split().
        return ''
    return ';'.join(m.strip() for m in authm.split('\n') if m.strip())
1420
def _get_link_quality(self, output):
    """ Parse out the link quality from iwlist scan or iwconfig output.

    Keyword arguments:
    output -- text to search with strength_pattern and, failing that,
              altstrength_pattern.

    Returns:
    The quality as an integer: a percentage when a non-zero maximum
    accompanies the value, otherwise the raw value.  101 if neither
    pattern matches, None if a match carries no usable number.

    """
    def _single_match(pattern):
        """ Return the (value, maximum) pair if exactly one was found. """
        found = pattern.findall(output)
        if len(found) == 1:
            return found[0]
        return None

    pair = _single_match(strength_pattern)
    if pair is None:
        strength, max_strength = None, None
    else:
        strength, max_strength = pair

    if strength in ['', None]:
        pair = _single_match(altstrength_pattern)
        if pair is None:
            # if the pattern was unable to match anything
            # we'll return 101, which will allow us to stay
            # connected even though we don't know the strength
            return 101
        strength, max_strength = pair

    if strength in ['', None]:
        return None

    # A reported maximum of 0 (e.g. "Quality=0/0") cannot be scaled to
    # a percentage; fall back to the raw value instead of dividing.
    if max_strength and int(max_strength) != 0:
        return 100 * int(strength) // int(max_strength)
    return int(strength)
1444
def GetSignalStrength(self, iwconfig=None):
    """ Get the signal strength of the current network.

    Keyword arguments:
    iwconfig -- iwconfig output to parse; when empty or omitted it
                is fetched with self.GetIwconfig().

    Returns:
    The signal strength, as computed by self._get_link_quality().

    """
    output = iwconfig or self.GetIwconfig()
    return self._get_link_quality(output)
1458
def GetDBMStrength(self, iwconfig=None):
    """ Get the dBm signal strength of the current network.

    Keyword arguments:
    iwconfig -- iwconfig output to parse; when empty or omitted it
                is fetched with self.GetIwconfig().

    Returns:
    The dBm signal strength, as extracted by misc.RunRegex.

    """
    output = iwconfig or self.GetIwconfig()
    # Use the module-level signaldbm_pattern (same expression) rather
    # than compiling a shadowing local copy on every call.
    # NOTE(review): assumes the local copy used the same flags -- confirm.
    return misc.RunRegex(signaldbm_pattern, output)
1475
def GetCurrentNetwork(self, iwconfig=None):
1476
""" Get the essid of the current network.
1479
The current network essid.
1483
output = self.GetIwconfig()
1486
network = misc.RunRegex(re.compile('.*ESSID:"(.*?)"',
1487
re.I | re.M | re.S), output)
1489
network = misc.to_unicode(network)