1
# Written by Bram Cohen
2
# see LICENSE.txt for license information
4
from bisect import insort
5
from SocketHandler import SocketHandler, UPnP_ERROR
7
from cStringIO import StringIO
8
from traceback import print_exc
9
from select import error
10
from threading import Thread, Event
11
from time import sleep
12
from clock import clock
21
# Check the preconditions for using IPv6 sockets on this interpreter/platform.
# NOTE(review): the embedded numbering skips lines (21 -> 23, and nothing after
# 25), so the statements that wrap these checks and produce the return value
# are not visible here — confirm against the full file.
def autodetect_ipv6():
23
# Requires Python 2.3 or newer.
assert sys.version_info >= (2,3)
24
# Requires a socket module that reports IPv6 support.
assert socket.has_ipv6
25
# Try to create an IPv6 stream socket; the socket object itself is discarded.
socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
30
# Derive a socket-style flag from the kernel's bindv6only setting.
# NOTE(review): the embedded numbering skips lines (31 -> 35, 36 -> 38), so the
# body of the platform check, any error handling and any f.close() are not
# visible here — confirm against the full file.
def autodetect_socket_style():
31
# True when the platform string does not contain 'linux'.
if sys.platform.find('linux') < 0:
35
f = open('/proc/sys/net/ipv6/bindv6only','r')
36
# The file content is parsed as an integer.
dual_socket_style = int(f.read())
38
# Returns 1 when the value read is 0, and 0 for any non-zero value.
return int(not dual_socket_style)
46
# Set up the task-scheduling state and the socket handler.
#
# Arguments, as used by the visible code:
#   doneflag               -- stored on self; listen_forever() polls its isSet().
#   timeout_check_interval -- stored on self; delay used to schedule scan_for_timeouts.
#   timeout                -- stored on self and passed to SocketHandler().
#   ipv6_enable            -- passed to SocketHandler().
#   failfunc, errorfunc    -- callbacks stored on self.
#   sockethandler          -- handler to use; a SocketHandler is built when None.
#   excflag                -- stored on self.
# NOTE(review): `noisy` is not used on any visible line; the embedded numbering
# skips 51, 55, 58-59 and 64, so some assignments appear to be missing from
# this view.
# NOTE(review): the default `excflag = Event()` is evaluated once, when the def
# statement runs, so every instance created without an explicit excflag shares
# the same Event object — confirm that this sharing is intended.
def __init__(self, doneflag, timeout_check_interval, timeout, noisy = True,
47
ipv6_enable = True, failfunc = lambda x: None, errorfunc = None,
48
sockethandler = None, excflag = Event()):
49
self.timeout_check_interval = timeout_check_interval
50
self.timeout = timeout
52
self.single_sockets = {}
53
self.dead_from_write = []
54
self.doneflag = doneflag
56
self.failfunc = failfunc
57
self.errorfunc = errorfunc
60
# Tasks queued through add_task(); pop_external() moves them into the schedule.
self.externally_added = []
61
self.finished = Event()
62
# Ids queued by kill_tasks(); _kill_tasks() drops the matching scheduled tasks.
self.tasks_to_kill = []
63
self.excflag = excflag
65
# Build a default handler only when the caller did not supply one.
# READSIZE is not defined in this view — presumably a module-level constant.
if sockethandler is None:
66
sockethandler = SocketHandler(timeout, ipv6_enable, READSIZE)
67
self.sockethandler = sockethandler
68
# Queue the first timeout scan; scan_for_timeouts() re-queues itself each run.
self.add_task(self.scan_for_timeouts, timeout_check_interval)
70
# Accessor for the exception flag.
# NOTE(review): the body of this method is not visible in this view; presumably
# it returns self.excflag — confirm against the full file.
def get_exception_flag(self):
73
def _add_task(self, func, delay, id = None):
    """Insert a task directly into the time-ordered schedule.

    func  -- object stored as the task to run.
    delay -- non-negative number of seconds from now, measured with clock().
    id    -- optional tag stored with the entry (None by default).

    Raises AssertionError when delay is negative.
    """
    assert float(delay) >= 0
    due = clock() + delay
    # insort keeps self.funcs sorted by due time, so the entry at index 0 is
    # always the next task to run.
    insort(self.funcs, (due, func, id))
77
def add_task(self, func, delay = 0, id = None):
    """Queue a task for later insertion into the schedule.

    The request is only appended to self.externally_added; pop_external()
    later hands each queued entry to _add_task().

    func  -- object stored as the task to run.
    delay -- non-negative number of seconds (default 0).
    id    -- optional tag stored with the entry (None by default).

    Raises AssertionError when delay is negative.
    """
    assert float(delay) >= 0
    request = (func, delay, id)
    self.externally_added.append(request)
81
def scan_for_timeouts(self):
    """Run one timeout scan and schedule the next one.

    The method re-queues itself first, using self.timeout_check_interval as
    the delay, and then delegates the scan to the socket handler.
    """
    # Re-queue before scanning, so the next scan is already scheduled by the
    # time the handler's scan runs.
    self.add_task(self.scan_for_timeouts, self.timeout_check_interval)
    self.sockethandler.scan_for_timeouts()
85
def bind(self, port, bind = '', reuse = False,
         ipv6_socket_style = 1, upnp = False):
    """Forward a bind request to the socket handler.

    All five arguments are passed through positionally and unchanged to
    self.sockethandler.bind(); nothing is returned.
    """
    self.sockethandler.bind(port, bind, reuse, ipv6_socket_style, upnp)
89
def find_and_bind(self, minport, maxport, bind = '', reuse = False,
                  ipv6_socket_style = 1, upnp = 0, randomizer = False):
    """Forward a port-range bind request to the socket handler.

    All seven arguments are passed through positionally and unchanged to
    self.sockethandler.find_and_bind(), and its result is returned.
    """
    return self.sockethandler.find_and_bind(
        minport, maxport, bind, reuse, ipv6_socket_style, upnp, randomizer)
94
def start_connection_raw(self, dns, socktype, handler = None):
    """Forward a raw connection request to the socket handler.

    dns, socktype and handler are passed unchanged to
    self.sockethandler.start_connection_raw(), and its result is returned.
    """
    return self.sockethandler.start_connection_raw(dns, socktype, handler)
97
def start_connection(self, dns, handler = None, randomize = False):
    """Forward a connection request to the socket handler.

    dns, handler and randomize are passed unchanged to
    self.sockethandler.start_connection(), and its result is returned.
    """
    return self.sockethandler.start_connection(dns, handler, randomize)
101
# NOTE(review): the `def` line that owns this statement is not visible in this
# view (presumably a get_stats method — confirm). The statement returns
# whatever the socket handler's get_stats() returns.
return self.sockethandler.get_stats()
103
def pop_external(self):
    """Move every queued request from self.externally_added into the schedule.

    Entries are removed from the front one at a time, oldest first, and each
    is handed to _add_task(). The list is re-checked on every pass, so entries
    appended while the loop runs are drained as well.
    """
    while self.externally_added:
        func, delay, task_id = self.externally_added.pop(0)
        self._add_task(func, delay, task_id)
109
# Install `handler` on the socket handler, then loop until doneflag is set,
# polling the sockets and processing tasks whose due time has passed.
# NOTE(review): indentation is lost and the embedded numbering skips many
# lines — including the `try:` statements that the `except` clauses below
# belong to — so the comments here describe only the visible statements.
# The `except X, e` form is Python 2 syntax.
def listen_forever(self, handler):
110
self.sockethandler.set_handler(handler)
112
while not self.doneflag.isSet():
117
# Time until the earliest scheduled task, padded by 1 ms. self.funcs[0] is the
# earliest entry because _add_task() inserts with insort.
period = self.funcs[0][0] + 0.001 - clock()
122
# `period` is handed to the handler's poll — presumably as its timeout; confirm.
events = self.sockethandler.do_poll(period)
123
if self.doneflag.isSet():
125
# Pop due tasks one at a time, earliest first.
while self.funcs and self.funcs[0][0] <= clock():
126
garbage1, func, id = self.funcs.pop(0)
127
# Checks whether the popped task carries an id queued by kill_tasks(); the
# body of this branch is not visible here.
if id in self.tasks_to_kill:
130
# print func.func_name
132
# SystemError / MemoryError are reported through failfunc as a string.
except (SystemError, MemoryError), e:
133
self.failfunc(str(e))
135
except KeyboardInterrupt:
136
# self.exception(True)
141
self.sockethandler.close_dead()
142
# Hand the events returned by do_poll() to the socket handler.
self.sockethandler.handle_events(events)
143
if self.doneflag.isSet():
145
self.sockethandler.close_dead()
146
except (SystemError, MemoryError), e:
147
self.failfunc(str(e))
150
if self.doneflag.isSet():
152
except KeyboardInterrupt:
153
# self.exception(True)
157
# self.exccount is not assigned on any visible line; the body of this branch
# is not visible either.
if self.exccount > 10:
160
# self.sockethandler.shutdown()
163
def is_finished(self):
    """Return whether the self.finished event has been set."""
    return self.finished.isSet()
166
# NOTE(review): the body of this method is not visible in this view; presumably
# it blocks on the self.finished event — confirm against the full file.
def wait_until_finished(self):
169
def _kill_tasks(self):
    """Drop every scheduled task whose id has been queued by kill_tasks().

    self.funcs is rebuilt without the entries whose id (third tuple element)
    appears in self.tasks_to_kill; the order of the remaining entries is kept.
    self.tasks_to_kill is then reset to an empty list. When nothing is queued
    for killing, neither attribute is touched.
    """
    if not self.tasks_to_kill:
        return
    # Build the surviving schedule in one pass, so the result list always
    # exists before it is assigned.
    self.funcs = [(t, func, id) for (t, func, id) in self.funcs
                  if id not in self.tasks_to_kill]
    self.tasks_to_kill = []
178
def kill_tasks(self, id):
    """Queue `id` for removal from the schedule.

    The id is only appended to self.tasks_to_kill; _kill_tasks() later drops
    the scheduled entries that carry it.
    """
    self.tasks_to_kill.append(id)
181
# Report the exception currently being handled and shut the socket handler down.
# kbint -- when true, the errorfunc report below is skipped.
# NOTE(review): indentation is lost and the embedded numbering skips lines
# (181 -> 185 -> 189, 192 -> 195), so the creation of `data` and the body of
# the `errorfunc is None` branch are not visible here — confirm against the
# full file.
def exception(self, kbint = False):
185
if self.errorfunc is None:
189
# Writes the current traceback into `data` (presumably a StringIO — confirm).
print_exc(file = data)
190
# print data.getvalue() # report exception here too
191
if not kbint: # don't report here if it's a keyboard interrupt
192
self.errorfunc(data.getvalue())
195
self.sockethandler.shutdown()