3
# Written by John Hoffman
4
# see LICENSE.txt for license information
6
from BitTornado import PSYCO
10
assert psyco.__version__ >= 0x010100f0
15
from download_bt1 import BT1Download
16
from RawServer import RawServer, UPnP_ERROR
17
from RateLimiter import RateLimiter
18
from ServerPortHandler import MultiHandler
19
from parsedir import parsedir
20
from natpunch import UPnP_test
21
from random import seed
22
from socket import error as socketerror
23
from threading import Event
24
from sys import argv, exit
26
from clock import clock
27
from __init__ import createPeerID, mapbase64, version
28
from cStringIO import StringIO
29
from traceback import print_exc
40
n = int(n) # n may be None or too large
41
assert n < 5184000 # 60 days
46
return '%d:%02d:%02d' % (h, m, s)
49
def __init__(self, controller, hash, response, config, myid):
50
self.controller = controller
52
self.response = response
55
self.doneflag = Event()
63
self.status_err = ['']
64
self.status_errtime = 0
65
self.status_done = 0.0
67
self.rawserver = controller.handler.newRawServer(hash, self.doneflag)
69
d = BT1Download(self.display, self.finished, self.error,
70
controller.exchandler, self.doneflag, config, response,
71
hash, myid, self.rawserver, controller.listen_port)
75
if not self.d.saveAs(self.saveAs):
78
self._hashcheckfunc = self.d.initFiles()
79
if not self._hashcheckfunc:
82
self.controller.hashchecksched(self.hash)
85
def saveAs(self, name, length, saveas, isdir):
86
return self.controller.saveAs(self.hash, name, saveas, isdir)
88
def hashcheck_start(self, donefunc):
94
self._hashcheckfunc(donefunc)
96
def hashcheck_callback(self):
101
if not self.d.startEngine(ratelimiter = self.controller.ratelimiter):
104
self.d.startRerequester()
105
self.statsfunc = self.d.startStats()
106
self.rawserver.start_listening(self.d.getPortHandler())
110
return self.doneflag.isSet()
115
def shutdown(self, quiet=True):
119
self.rawserver.shutdown()
120
if self.checking or self.working:
123
self.checking = False
126
self.controller.was_stopped(self.hash)
128
self.controller.died(self.hash)
131
def display(self, activity = None, fractionDone = None):
132
# really only used by StorageWrapper now
134
self.status_msg = activity
135
if fractionDone is not None:
136
self.status_done = float(fractionDone)
141
def error(self, msg):
142
if self.doneflag.isSet():
144
self.status_err.append(msg)
145
self.status_errtime = clock()
149
def __init__(self, config, Output):
154
self.torrent_dir = config['torrent_dir']
155
self.torrent_cache = {}
157
self.blocked_files = {}
158
self.scan_period = config['parse_dir_interval']
159
self.stats_period = config['display_interval']
161
self.torrent_list = []
164
self.doneflag = Event()
166
self.hashcheck_queue = []
167
self.hashcheck_current = None
169
self.rawserver = RawServer(self.doneflag, config['timeout_check_interval'],
170
config['timeout'], ipv6_enable = config['ipv6_enabled'],
171
failfunc = self.failed, errorfunc = self.exchandler)
172
upnp_type = UPnP_test(config['upnp_nat_access'])
175
self.listen_port = self.rawserver.find_and_bind(
176
config['minport'], config['maxport'], config['bind'],
177
ipv6_socket_style = config['ipv6_binds_v4'],
178
upnp = upnp_type, randomizer = config['random_port'])
180
except socketerror, e:
181
if upnp_type and e == UPnP_ERROR:
182
self.Output.message('WARNING: COULD NOT FORWARD VIA UPnP')
185
self.failed("Couldn't listen - " + str(e))
188
self.ratelimiter = RateLimiter(self.rawserver.add_task,
189
config['upload_unit_size'])
190
self.ratelimiter.set_upload_rate(config['max_upload_rate'])
192
self.handler = MultiHandler(self.rawserver, self.doneflag)
194
self.rawserver.add_task(self.scan, 0)
195
self.rawserver.add_task(self.stats, 0)
197
self.handler.listen_forever()
199
self.Output.message('shutting down')
200
self.hashcheck_queue = []
201
for hash in self.torrent_list:
202
self.Output.message('dropped "'+self.torrent_cache[hash]['path']+'"')
203
self.downloads[hash].shutdown()
204
self.rawserver.shutdown()
208
print_exc(file = data)
209
Output.exception(data.getvalue())
213
self.rawserver.add_task(self.scan, self.scan_period)
215
r = parsedir(self.torrent_dir, self.torrent_cache,
216
self.file_cache, self.blocked_files,
217
return_metainfo = True, errfunc = self.Output.message)
219
( self.torrent_cache, self.file_cache, self.blocked_files,
222
for hash, data in removed.items():
223
self.Output.message('dropped "'+data['path']+'"')
225
for hash, data in added.items():
226
self.Output.message('added "'+data['path']+'"')
230
self.rawserver.add_task(self.stats, self.stats_period)
232
for hash in self.torrent_list:
233
cache = self.torrent_cache[hash]
234
if self.config['display_path']:
238
size = cache['length']
239
d = self.downloads[hash]
253
status = 'waiting for hash check'
255
status = d.status_msg
256
progress = '%.1f%%' % (d.status_done*100)
258
stats = d.statsfunc()
263
seeds = s.numOldSeeds
267
if s.numSeeds + s.numPeers:
269
if t == 0: # unlikely
274
status = 'connecting to peers'
275
progress = '%.1f%%' % (int(stats['frac']*1000)/10.0)
278
dnrate = stats['down']
284
if d.is_dead() or d.status_errtime+300 > clock():
285
msg = d.status_err[-1]
289
data.append(( name, status, progress, peers, seeds, seedsmsg, dist,
290
uprate, dnrate, upamt, dnamt, size, t, msg ))
291
stop = self.Output.display(data)
295
def remove(self, hash):
296
self.torrent_list.remove(hash)
297
self.downloads[hash].shutdown()
298
del self.downloads[hash]
300
def add(self, hash, data):
305
x = mapbase64[c & 0x3F]+x
307
peer_id = createPeerID(x)
308
d = SingleDownload(self, hash, data['metainfo'], self.config, peer_id)
309
self.torrent_list.append(hash)
310
self.downloads[hash] = d
314
def saveAs(self, hash, name, saveas, isdir):
315
x = self.torrent_cache[hash]
316
style = self.config['saveas_style']
317
if style == 1 or style == 3:
319
saveas = os.path.join(saveas,x['file'][:-1-len(x['type'])])
321
saveas = x['path'][:-1-len(x['type'])]
323
if not os.path.isdir(saveas):
327
raise OSError("couldn't create directory for "+x['path']
330
saveas = os.path.join(saveas, name)
333
saveas = os.path.join(saveas, name)
335
saveas = os.path.join(os.path.split(x['path'])[0], name)
337
if isdir and not os.path.isdir(saveas):
341
raise OSError("couldn't create directory for "+x['path']
346
def hashchecksched(self, hash = None):
348
self.hashcheck_queue.append(hash)
349
if not self.hashcheck_current:
350
self._hashcheck_start()
352
def _hashcheck_start(self):
353
self.hashcheck_current = self.hashcheck_queue.pop(0)
354
self.downloads[self.hashcheck_current].hashcheck_start(self.hashcheck_callback)
356
def hashcheck_callback(self):
357
self.downloads[self.hashcheck_current].hashcheck_callback()
358
if self.hashcheck_queue:
359
self._hashcheck_start()
361
self.hashcheck_current = None
363
def died(self, hash):
364
if self.torrent_cache.has_key(hash):
365
self.Output.message('DIED: "'+self.torrent_cache[hash]['path']+'"')
367
def was_stopped(self, hash):
369
self.hashcheck_queue.remove(hash)
372
if self.hashcheck_current == hash:
373
self.hashcheck_current = None
374
if self.hashcheck_queue:
375
self._hashcheck_start()
378
self.Output.message('FAILURE: '+s)
380
def exchandler(self, s):
381
self.Output.exception(s)