3
# Written by John Hoffman
4
# see LICENSE.txt for license information
6
from BitTornado import PSYCO
10
assert psyco.__version__ >= 0x010100f0
15
from download_bt1 import BT1Download
16
from RawServer import RawServer, UPnP_ERROR
17
from RateLimiter import RateLimiter
18
from ServerPortHandler import MultiHandler
19
from parsedir import parsedir
20
from natpunch import UPnP_test
21
from random import seed
22
from socket import error as socketerror
23
from threading import Event
24
from sys import argv, exit
26
from clock import clock
27
from __init__ import createPeerID, mapbase64, version
28
from cStringIO import StringIO
29
from traceback import print_exc
40
n = int(n) # n may be None or too large
41
assert n < 5184000 # 60 days
46
return '%d:%02d:%02d' % (h, m, s)
49
def __init__(self, controller, hash, response, config, myid):
50
self.controller = controller
52
self.response = response
55
self.doneflag = Event()
63
self.status_err = ['']
64
self.status_errtime = 0
65
self.status_done = 0.0
67
self.rawserver = controller.handler.newRawServer(hash, self.doneflag)
69
d = BT1Download(self.display, self.finished, self.error,
70
controller.exchandler, self.doneflag, config, response,
71
hash, myid, self.rawserver, controller.listen_port)
75
if not self.d.saveAs(self.saveAs):
78
self._hashcheckfunc = self.d.initFiles()
79
if not self._hashcheckfunc:
82
self.controller.hashchecksched(self.hash)
85
def saveAs(self, name, length, saveas, isdir):
86
return self.controller.saveAs(self.hash, name, saveas, isdir)
88
def hashcheck_start(self, donefunc):
94
self._hashcheckfunc(donefunc)
96
def hashcheck_callback(self):
101
if not self.d.startEngine(ratelimiter = self.controller.ratelimiter):
104
self.d.startRerequester()
105
self.statsfunc = self.d.startStats()
106
self.rawserver.start_listening(self.d.getPortHandler())
110
return self.doneflag.isSet()
115
def shutdown(self, quiet=True):
119
self.rawserver.shutdown()
120
if self.checking or self.working:
123
self.checking = False
126
self.controller.was_stopped(self.hash)
128
self.controller.died(self.hash)
131
def display(self, activity = None, fractionDone = None):
132
# really only used by StorageWrapper now
134
self.status_msg = activity
135
if fractionDone is not None:
136
self.status_done = float(fractionDone)
141
def error(self, msg):
142
if self.doneflag.isSet():
144
self.status_err.append(msg)
145
self.status_errtime = clock()
149
def __init__(self, config, Output):
154
self.torrent_dir = config['torrent_dir']
155
self.torrent_cache = {}
157
self.blocked_files = {}
158
self.scan_period = config['parse_dir_interval']
159
self.stats_period = config['display_interval']
161
self.torrent_list = []
164
self.doneflag = Event()
166
self.hashcheck_queue = []
167
self.hashcheck_current = None
169
self.rawserver = RawServer(self.doneflag, config['timeout_check_interval'],
170
config['timeout'], ipv6_enable = config['ipv6_enabled'],
171
failfunc = self.failed, errorfunc = self.exchandler)
172
upnp_type = UPnP_test(config['upnp_nat_access'])
175
self.listen_port = self.rawserver.find_and_bind(
176
config['minport'], config['maxport'], config['bind'],
177
ipv6_socket_style = config['ipv6_binds_v4'],
178
upnp = upnp_type, randomizer = config['random_port'])
180
except socketerror, e:
181
if upnp_type and e == UPnP_ERROR:
182
self.Output.message('WARNING: COULD NOT FORWARD VIA UPnP')
185
self.failed("Couldn't listen - " + str(e))
188
self.ratelimiter = RateLimiter(self.rawserver.add_task,
189
config['upload_unit_size'])
190
self.ratelimiter.set_upload_rate(config['max_upload_rate'])
192
self.handler = MultiHandler(self.rawserver, self.doneflag, config)
194
self.rawserver.add_task(self.scan, 0)
195
self.rawserver.add_task(self.stats, 0)
200
print_exc(file = data)
201
Output.exception(data.getvalue())
205
self.handler.listen_forever()
207
self.hashcheck_queue = []
208
for hash in self.torrent_list:
209
self.Output.message('dropped "'+self.torrent_cache[hash]['path']+'"')
210
self.downloads[hash].shutdown()
215
self.Output.exception(data.getvalue())
217
self.rawserver.shutdown()
220
self.rawserver.add_task(self.scan, self.scan_period)
222
r = parsedir(self.torrent_dir, self.torrent_cache,
223
self.file_cache, self.blocked_files,
224
return_metainfo = True, errfunc = self.Output.message)
226
( self.torrent_cache, self.file_cache, self.blocked_files,
229
for hash, data in removed.items():
230
self.Output.message('dropped "'+data['path']+'"')
232
for hash, data in added.items():
233
self.Output.message('added "'+data['path']+'"')
237
self.rawserver.add_task(self.stats, self.stats_period)
239
for hash in self.torrent_list:
240
cache = self.torrent_cache[hash]
241
if self.config['display_path']:
245
size = cache['length']
246
d = self.downloads[hash]
260
status = 'waiting for hash check'
262
status = d.status_msg
263
progress = '%.1f%%' % (d.status_done*100)
265
stats = d.statsfunc()
270
seeds = s.numOldSeeds
274
if s.numSeeds + s.numPeers:
276
if t == 0: # unlikely
281
status = 'connecting to peers'
282
progress = '%.1f%%' % (int(stats['frac']*1000)/10.0)
285
dnrate = stats['down']
291
if d.is_dead() or d.status_errtime+300 > clock():
292
msg = d.status_err[-1]
296
data.append(( name, status, progress, peers, seeds, seedsmsg, dist,
297
uprate, dnrate, upamt, dnamt, size, t, msg ))
298
stop = self.Output.display(data)
302
def remove(self, hash):
303
self.torrent_list.remove(hash)
304
self.downloads[hash].shutdown()
305
del self.downloads[hash]
307
def add(self, hash, data):
312
x = mapbase64[c & 0x3F]+x
314
peer_id = createPeerID(x)
315
d = SingleDownload(self, hash, data['metainfo'], self.config, peer_id)
316
self.torrent_list.append(hash)
317
self.downloads[hash] = d
322
def saveAs(self, hash, name, saveas, isdir):
323
x = self.torrent_cache[hash]
324
style = self.config['saveas_style']
325
if style == 1 or style == 3:
327
saveas = os.path.join(saveas,x['file'][:-1-len(x['type'])])
329
saveas = x['path'][:-1-len(x['type'])]
331
if not os.path.isdir(saveas):
335
raise OSError("couldn't create directory for "+x['path']
338
saveas = os.path.join(saveas, name)
341
saveas = os.path.join(saveas, name)
343
saveas = os.path.join(os.path.split(x['path'])[0], name)
345
if isdir and not os.path.isdir(saveas):
349
raise OSError("couldn't create directory for "+x['path']
354
def hashchecksched(self, hash = None):
356
self.hashcheck_queue.append(hash)
357
self.hashcheck_queue.sort(lambda x, y: cmp(self.downloads[x].d.datalength, self.downloads[y].d.datalength))
358
if not self.hashcheck_current:
359
self._hashcheck_start()
361
def _hashcheck_start(self):
362
self.hashcheck_current = self.hashcheck_queue.pop(0)
363
self.downloads[self.hashcheck_current].hashcheck_start(self.hashcheck_callback)
365
def hashcheck_callback(self):
366
self.downloads[self.hashcheck_current].hashcheck_callback()
367
if self.hashcheck_queue:
368
self._hashcheck_start()
370
self.hashcheck_current = None
372
def died(self, hash):
373
if self.torrent_cache.has_key(hash):
374
self.Output.message('DIED: "'+self.torrent_cache[hash]['path']+'"')
376
def was_stopped(self, hash):
378
self.hashcheck_queue.remove(hash)
381
if self.hashcheck_current == hash:
382
self.hashcheck_current = None
383
if self.hashcheck_queue:
384
self._hashcheck_start()
387
self.Output.message('FAILURE: '+s)
389
def exchandler(self, s):
390
self.Output.exception(s)