1
# Written by Bram Cohen
2
# see LICENSE.txt for license information
4
from zurllib import urlopen
5
from urlparse import urlparse
6
from BT1.btformats import check_message
7
from BT1.Choker import Choker
8
from BT1.Storage import Storage
9
from BT1.StorageWrapper import StorageWrapper
10
from BT1.FileSelector import FileSelector
11
from BT1.Uploader import Upload
12
from BT1.Downloader import Downloader
13
from BT1.HTTPDownloader import HTTPDownloader
14
from BT1.Connecter import Connecter
15
from RateLimiter import RateLimiter
16
from BT1.Encrypter import Encoder
17
from RawServer import RawServer, autodetect_ipv6, autodetect_socket_style
18
from BT1.Rerequester import Rerequester
19
from BT1.DownloaderFeedback import DownloaderFeedback
20
from RateMeasure import RateMeasure
21
from CurrentRateMeasure import Measure
22
from BT1.PiecePicker import PiecePicker
23
from BT1.Statistics import Statistics
24
from ConfigDir import ConfigDir
25
from bencode import bencode, bdecode
26
from natpunch import UPnP_test
28
from os import path, makedirs, listdir
29
from parseargs import parseargs, formatDefinitions, defaultargs
30
from socket import error as socketerror
31
from random import seed
32
from threading import Thread, Event
33
from clock import clock
34
from BTcrypto import CRYPTO_OK
35
from __init__ import createPeerID
45
"the maximum number of uploads to allow at once."),
46
('keepalive_interval', 120.0,
47
'number of seconds to pause between sending keepalives'),
48
('download_slice_size', 2 ** 14,
49
"How many bytes to query for per request."),
50
('upload_unit_size', 1460,
51
"when limiting upload rate, how many bytes to send at a time"),
52
('request_backlog', 10,
53
"maximum number of requests to keep in a single pipe at once."),
54
('max_message_length', 2 ** 23,
55
"maximum length prefix encoding you'll accept over the wire - larger values get the connection dropped."),
57
"ip to report you have to the tracker."),
58
('minport', 10000, 'minimum port to listen on, counts up if unavailable'),
59
('maxport', 60000, 'maximum port to listen on'),
60
('random_port', 1, 'whether to choose randomly inside the port range ' +
61
'instead of counting up linearly'),
63
'file the server response was stored in, alternative to url'),
65
'url to get file from, alternative to responsefile'),
66
('crypto_allowed', int(CRYPTO_OK),
67
'whether to allow the client to accept encrypted connections'),
69
'whether to only create or allow encrypted connections'),
71
'whether to prevent all non-encrypted connection attempts; ' +
72
'will result in an effectively firewalled state on older trackers'),
73
('selector_enabled', 1,
74
'whether to enable the file selector and fast resume function'),
75
('expire_cache_data', 10,
76
'the number of days after which you wish to expire old cache data ' +
79
'a list of file priorities separated by commas, must be one per file, ' +
80
'0 = highest, 1 = normal, 2 = lowest, -1 = download disabled'),
82
'local file name to save the file as, null indicates query user'),
84
'time to wait between closing sockets which nothing has been received on'),
85
('timeout_check_interval', 60.0,
86
'time to wait between checking if any connections have timed out'),
87
('max_slice_length', 2 ** 17,
88
"maximum length slice to send to peers, larger requests are ignored"),
89
('max_rate_period', 20.0,
90
"maximum amount of time to guess the current rate estimate represents"),
92
'comma-separated list of ips/hostnames to bind to locally'),
93
# ('ipv6_enabled', autodetect_ipv6(),
95
'allow the client to connect to peers via IPv6'),
96
('ipv6_binds_v4', autodetect_socket_style(),
97
"set if an IPv6 server socket won't also field IPv4 connections"),
98
('upload_rate_fudge', 5.0,
99
'time equivalent of writing to kernel-level TCP buffer, for rate adjustment'),
100
('tcp_ack_fudge', 0.03,
101
'how much TCP ACK download overhead to add to upload rate calculations ' +
103
('display_interval', .5,
104
'time between updates of displayed information'),
105
('rerequest_interval', 5 * 60,
106
'time to wait between requesting more peers'),
108
'minimum number of peers to not do rerequesting'),
110
'number of seconds to wait before assuming that an http connection has timed out'),
112
'number of peers at which to stop initiating new connections'),
114
'whether to check hashes on disk'),
115
('max_upload_rate', 0,
116
'maximum kB/s to upload at (0 = no limit, -1 = automatic)'),
117
('max_download_rate', 0,
118
'maximum kB/s to download at (0 = no limit)'),
119
('alloc_type', 'normal',
120
'allocation type (may be normal, background, pre-allocate or sparse)'),
122
'rate (in MiB/s) to allocate space at using background allocation'),
124
'whether to buffer disk reads'),
125
('write_buffer_size', 4,
126
'the maximum amount of space to use for buffering disk writes ' +
127
'(in megabytes, 0 = disabled)'),
128
('breakup_seed_bitfield', 1,
129
'sends an incomplete bitfield and then fills with have messages, '
130
'in order to get around stupid ISP manipulation'),
132
"seconds to wait for data to come in over a connection before assuming it's semi-permanently choked"),
134
"whether to display diagnostic info to stdout"),
135
('rarest_first_cutoff', 2,
136
"number of downloads at which to switch from random to rarest first"),
137
('rarest_first_priority_cutoff', 5,
138
'the number of peers which need to have a piece before other partials take priority over rarest first'),
140
"the number of uploads to fill out to with extra optimistic unchokes"),
141
('max_files_open', 50,
142
'the maximum number of files to keep open at a time, 0 means no limit'),
143
('round_robin_period', 30,
144
"the number of seconds between the client's switching upload targets"),
146
"whether to use special upload-efficiency-maximizing routines (only for dedicated seeds)"),
148
"whether to enable extra security features intended to prevent abuse"),
149
('max_connections', 0,
150
"the absolute maximum number of peers to connect with (0 = no limit)"),
152
"whether to allow the client to automatically kick/ban peers that send bad data"),
154
"whether to double-check data being written to the disk for errors (may increase CPU load)"),
156
"whether to thoroughly check data being written to the disk (may slow disk access)"),
158
"whether to lock files the client is working with"),
159
('lock_while_reading', 0,
160
"whether to lock access to files being read"),
162
"minutes between automatic flushes to disk (0 = disabled)"),
163
('dedicated_seed_id', '',
164
"code to send to tracker identifying as a dedicated seed"),
167
argslistheader = 'Arguments are:\n\n'
173
# old-style downloader
174
def download(params, filefunc, statusfunc, finfunc, errorfunc, doneflag, cols,
175
pathFunc = None, presets = {}, exchandler = None,
176
failed = _failfunc, paramfunc = None):
179
config = parse_params(params, presets)
180
except ValueError, e:
181
failed('error: ' + str(e) + '\nrun with no args for parameter explanations')
184
errorfunc(get_usage())
187
myid = createPeerID()
190
rawserver = RawServer(doneflag, config['timeout_check_interval'],
191
config['timeout'], ipv6_enable = config['ipv6_enabled'],
192
failfunc = failed, errorfunc = exchandler)
196
listen_port = rawserver.find_and_bind(config['minport'], config['maxport'],
197
config['bind'], ipv6_socket_style = config['ipv6_binds_v4'],
198
upnp = upnp_type, randomizer = config['random_port'])
199
except socketerror, e:
200
failed("Couldn't listen - " + str(e))
203
response = get_response(config['responsefile'], config['url'], failed)
207
infohash = sha(bencode(response['info'])).digest()
209
d = BT1Download(statusfunc, finfunc, errorfunc, exchandler, doneflag,
210
config, response, infohash, myid, rawserver, listen_port)
212
if not d.saveAs(filefunc):
216
pathFunc(d.getFilename())
218
hashcheck = d.initFiles(old_style = True)
223
if not d.startEngine():
228
statusfunc(activity = 'connecting to peers')
231
paramfunc({ 'max_upload_rate' : d.setUploadRate, # change_max_upload_rate(<int KiB/sec>)
232
'max_uploads': d.setConns, # change_max_uploads(<int max uploads>)
233
'listen_port' : listen_port, # int
234
'peer_id' : myid, # string
235
'info_hash' : infohash, # string
236
'start_connection' : d._startConnection, # start_connection((<string ip>, <int port>), <peer id>)
239
rawserver.listen_forever(d.getPortHandler())
244
def parse_params(params, presets = {}):
247
config, args = parseargs(params, defaults, 0, 1, presets = presets)
249
if config['responsefile'] or config['url']:
250
raise ValueError,'must have responsefile or url as arg or parameter, not both'
251
if path.isfile(args[0]):
252
config['responsefile'] = args[0]
257
raise ValueError, 'bad filename or url'
258
config['url'] = args[0]
259
elif (config['responsefile'] == '') == (config['url'] == ''):
260
raise ValueError, 'need responsefile or url, must have one, cannot have both'
264
def get_usage(defaults = defaults, cols = 100, presets = {}):
    """Build the usage text: the arguments header followed by the formatted option table.

    defaults -- option definitions handed to formatDefinitions (the module table by default)
    cols     -- passed straight through to formatDefinitions
    presets  -- passed straight through to formatDefinitions; not modified here
    """
    option_table = formatDefinitions(defaults, cols, presets)
    return argslistheader + option_table
268
def get_response(file, url, errorfunc):
273
line = h.read(10) # quick test to see if responsefile contains a dict
274
front,garbage = line.split(':',1)
275
assert front[0] == 'd'
278
errorfunc(file+' is not a valid responsefile')
292
errorfunc(url+' bad url')
297
errorfunc('problem getting response info - ' + str(e))
305
response = bdecode(response)
307
errorfunc("warning: bad data in responsefile")
308
response = bdecode(response, sloppy=1)
309
check_message(response)
310
except ValueError, e:
311
errorfunc("got bad file info - " + str(e))
318
def __init__(self, statusfunc, finfunc, errorfunc, excfunc, doneflag,
319
config, response, infohash, id, rawserver, port,
321
self.statusfunc = statusfunc
322
self.finfunc = finfunc
323
self.errorfunc = errorfunc
324
self.excfunc = excfunc
325
self.doneflag = doneflag
327
self.response = response
328
self.infohash = infohash
330
self.rawserver = rawserver
333
self.info = self.response['info']
334
self.pieces = [self.info['pieces'][x:x+20]
335
for x in xrange(0, len(self.info['pieces']), 20)]
336
self.len_pieces = len(self.pieces)
337
self.argslistheader = argslistheader
338
self.unpauseflag = Event()
339
self.unpauseflag.set()
340
self.downloader = None
341
self.storagewrapper = None
342
self.fileselector = None
343
self.super_seeding_active = False
344
self.filedatflag = Event()
345
self.spewflag = Event()
346
self.superseedflag = Event()
347
self.whenpaused = None
348
self.finflag = Event()
349
self.rerequest = None
350
self.tcp_ack_fudge = config['tcp_ack_fudge']
352
self.selector_enabled = config['selector_enabled']
354
self.appdataobj = appdataobj
355
elif self.selector_enabled:
356
self.appdataobj = ConfigDir()
357
self.appdataobj.deleteOldCacheData( config['expire_cache_data'],
360
self.excflag = self.rawserver.get_exception_flag()
362
self.checking = False
365
self.picker = PiecePicker(self.len_pieces, config['rarest_first_cutoff'],
366
config['rarest_first_priority_cutoff'])
367
self.choker = Choker(config, rawserver.add_task,
368
self.picker, self.finflag.isSet)
371
def checkSaveLocation(self, loc):
372
if self.info.has_key('length'):
373
return path.exists(loc)
374
for x in self.info['files']:
375
if path.exists(path.join(loc, x['path'][0])):
380
def saveAs(self, filefunc, pathfunc = None):
382
def make(f, forcedir = False):
385
if f != '' and not path.exists(f):
388
if self.info.has_key('length'):
389
file_length = self.info['length']
390
file = filefunc(self.info['name'], file_length,
391
self.config['saveas'], False)
395
files = [(file, file_length)]
398
for x in self.info['files']:
399
file_length += x['length']
400
file = filefunc(self.info['name'], file_length,
401
self.config['saveas'], True)
405
# if this path exists, and no files from the info dict exist, we assume it's a new download and
406
# the user wants to create a new directory with the default name
408
if path.exists(file):
409
if not path.isdir(file):
410
self.errorfunc(file + 'is not a dir')
412
if len(listdir(file)) > 0: # if it's not empty
413
for x in self.info['files']:
414
if path.exists(path.join(file, x['path'][0])):
417
file = path.join(file, self.info['name'])
418
if path.exists(file) and not path.isdir(file):
419
if file[-8:] == '.torrent':
421
if path.exists(file) and not path.isdir(file):
422
self.errorfunc("Can't create dir - " + self.info['name'])
426
# alert the UI to any possible change in path
431
for x in self.info['files']:
435
files.append((n, x['length']))
438
self.errorfunc("Couldn't allocate dir - " + str(e))
443
self.datalength = file_length
448
def getFilename(self):
455
self.storage.set_readonly()
456
except (IOError, OSError), e:
457
self.errorfunc('trouble setting readonly at end - ' + str(e))
458
if self.superseedflag.isSet():
459
self._set_super_seed()
460
self.choker.set_round_robin_period(
461
max( self.config['round_robin_period'],
462
self.config['round_robin_period'] *
463
self.info['piece length'] / 200000 ) )
464
self.rerequest_complete()
467
def _data_flunked(self, amount, index):
    """Record rejected data and report the piece that failed its hash check.

    amount -- value handed to self.ratemeasure_datarejected
    index  -- piece number interpolated into the error message
    """
    self.ratemeasure_datarejected(amount)
    if self.doneflag.isSet():
        # doneflag already set: stay silent instead of reporting the failure
        return
    message = 'piece %d failed hash check, re-downloading it' % index
    self.errorfunc(message)
472
def _failed(self, reason):
475
if reason is not None:
476
self.errorfunc(reason)
479
def initFiles(self, old_style = False, statusfunc = None):
480
if self.doneflag.isSet():
483
statusfunc = self.statusfunc
485
disabled_files = None
486
if self.selector_enabled:
487
self.priority = self.config['priority']
490
self.priority = self.priority.split(',')
491
assert len(self.priority) == len(self.files)
492
self.priority = [int(p) for p in self.priority]
493
for p in self.priority:
497
self.errorfunc('bad priority list given, ignored')
500
data = self.appdataobj.getTorrentData(self.infohash)
502
d = data['resume data']['priority']
503
assert len(d) == len(self.files)
504
disabled_files = [x == -1 for x in d]
507
disabled_files = [x == -1 for x in self.priority]
513
self.storage = Storage(self.files, self.info['piece length'],
514
self.doneflag, self.config, disabled_files)
516
self.errorfunc('trouble accessing files - ' + str(e))
518
if self.doneflag.isSet():
521
self.storagewrapper = StorageWrapper(self.storage, self.config['download_slice_size'],
522
self.pieces, self.info['piece length'], self._finished, self._failed,
523
statusfunc, self.doneflag, self.config['check_hashes'],
524
self._data_flunked, self.rawserver.add_task,
525
self.config, self.unpauseflag)
527
except ValueError, e:
528
self._failed('bad data - ' + str(e))
530
self._failed('IOError - ' + str(e))
531
if self.doneflag.isSet():
534
if self.selector_enabled:
535
self.fileselector = FileSelector(self.files, self.info['piece length'],
536
self.appdataobj.getPieceDir(self.infohash),
537
self.storage, self.storagewrapper,
538
self.rawserver.add_task,
541
data = data.get('resume data')
543
self.fileselector.unpickle(data)
547
return self.storagewrapper.old_style_init()
548
return self.storagewrapper.initialize
551
def getCachedTorrentData(self):
    """Return what the app-data object has stored under this torrent's infohash."""
    store = self.appdataobj
    return store.getTorrentData(self.infohash)
555
def _make_upload(self, connection, ratelimiter, totalup):
556
return Upload(connection, ratelimiter, totalup,
557
self.choker, self.storagewrapper, self.picker,
560
def _kick_peer(self, connection):
561
def k(connection = connection):
563
self.rawserver.add_task(k,0)
565
def _ban_peer(self, ip):
568
def _received_raw_data(self, x):
    """Credit the rate limiter with x scaled by self.tcp_ack_fudge.

    Does nothing when self.tcp_ack_fudge is zero/false.
    """
    # NOTE(review): adjust_sent is taken to belong under the fudge test
    # (calling it with the unscaled amount makes no sense) -- confirm.
    fudge = self.tcp_ack_fudge
    if not fudge:
        return
    self.ratelimiter.adjust_sent(int(x * fudge))
573
def _received_data(self, x):
    """Feed the amount x to the download measure, then to the rate measure."""
    received = x
    self.downmeasure.update_rate(received)
    self.ratemeasure.data_came_in(received)
577
def _received_http_data(self, x):
    """Account for x arriving over HTTP.

    Updates the download measure and the rate measure, then tells the peer
    downloader about the externally received data.
    """
    received = x
    self.downmeasure.update_rate(received)
    self.ratemeasure.data_came_in(received)
    self.downloader.external_data_received(received)
582
def _cancelfunc(self, pieces):
    """Cancel download of pieces on the peer downloader, then on the HTTP downloader."""
    for owner in ('downloader', 'httpdownloader'):
        getattr(self, owner).cancel_piece_download(pieces)
585
def _reqmorefunc(self, pieces):
    """Hand pieces back to the peer downloader to be requeued."""
    requeue = self.downloader.requeue_piece_download
    requeue(pieces)
588
def startEngine(self, ratelimiter = None, statusfunc = None):
589
if self.doneflag.isSet():
592
statusfunc = self.statusfunc
594
self.checking = False
597
if self.config['crypto_allowed']:
598
self.errorfunc('warning - crypto library not installed')
599
self.config['crypto_allowed'] = 0
600
self.config['crypto_only'] = 0
601
self.config['crypto_stealth'] = 0
603
for i in xrange(self.len_pieces):
604
if self.storagewrapper.do_I_have(i):
605
self.picker.complete(i)
606
self.upmeasure = Measure(self.config['max_rate_period'],
607
self.config['upload_rate_fudge'])
608
self.downmeasure = Measure(self.config['max_rate_period'])
611
self.ratelimiter = ratelimiter
613
self.ratelimiter = RateLimiter(self.rawserver.add_task,
614
self.config['upload_unit_size'],
616
self.ratelimiter.set_upload_rate(self.config['max_upload_rate'])
618
self.ratemeasure = RateMeasure()
619
self.ratemeasure_datarejected = self.ratemeasure.data_rejected
621
self.downloader = Downloader(self.storagewrapper, self.picker,
622
self.config['request_backlog'], self.config['max_rate_period'],
623
self.len_pieces, self.config['download_slice_size'],
624
self._received_data, self.config['snub_time'], self.config['auto_kick'],
625
self._kick_peer, self._ban_peer)
626
self.downloader.set_download_rate(self.config['max_download_rate'])
627
self.connecter = Connecter(self._make_upload, self.downloader, self.choker,
628
self.len_pieces, self.upmeasure, self.config,
629
self.ratelimiter, self.rawserver.add_task)
630
self.encoder = Encoder(self.connecter, self.rawserver,
631
self.myid, self.config['max_message_length'], self.rawserver.add_task,
632
self.config['keepalive_interval'], self.infohash,
633
self._received_raw_data, self.config)
634
self.encoder_ban = self.encoder.ban
636
self.httpdownloader = HTTPDownloader(self.storagewrapper, self.picker,
637
self.rawserver, self.finflag, self.errorfunc, self.downloader,
638
self.config['max_rate_period'], self.infohash, self._received_http_data,
639
self.connecter.got_piece)
640
if self.response.has_key('httpseeds') and not self.finflag.isSet():
641
for u in self.response['httpseeds']:
642
self.httpdownloader.make_download(u)
644
if self.selector_enabled:
645
self.fileselector.tie_in(self.picker, self._cancelfunc,
646
self._reqmorefunc, self.rerequest_ondownloadmore)
648
self.fileselector.set_priorities_now(self.priority)
649
self.appdataobj.deleteTorrentData(self.infohash)
650
# erase old data once you've started modifying it
652
if self.config['super_seeder']:
653
self.set_super_seed()
659
def rerequest_complete(self):
661
self.rerequest.announce(1)
663
def rerequest_stopped(self):
665
self.rerequest.announce(2)
667
def rerequest_lastfailed(self):
669
return self.rerequest.last_failed
672
def rerequest_ondownloadmore(self):
676
def startRerequester(self, seededfunc = None, force_rapid_update = False):
677
if self.response.has_key('announce-list'):
678
trackerlist = self.response['announce-list']
680
trackerlist = [[self.response['announce']]]
682
self.rerequest = Rerequester(self.port, self.myid, self.infohash,
683
trackerlist, self.config,
684
self.rawserver.add_task, self.rawserver.add_task,
685
self.errorfunc, self.excfunc,
686
self.encoder.start_connections,
687
self.connecter.how_many_connections,
688
self.storagewrapper.get_amount_left,
689
self.upmeasure.get_total, self.downmeasure.get_total,
690
self.upmeasure.get_rate, self.downmeasure.get_rate,
691
self.doneflag, self.unpauseflag, seededfunc, force_rapid_update )
693
self.rerequest.start()
696
def _init_stats(self):
697
self.statistics = Statistics(self.upmeasure, self.downmeasure,
698
self.connecter, self.httpdownloader, self.ratelimiter,
699
self.rerequest_lastfailed, self.filedatflag)
700
if self.info.has_key('files'):
701
self.statistics.set_dirstats(self.files, self.info['piece length'])
702
if self.config['spew']:
705
def autoStats(self, displayfunc = None):
707
displayfunc = self.statusfunc
710
DownloaderFeedback(self.choker, self.httpdownloader, self.rawserver.add_task,
711
self.upmeasure.get_rate, self.downmeasure.get_rate,
712
self.ratemeasure, self.storagewrapper.get_stats,
713
self.datalength, self.finflag, self.spewflag, self.statistics,
714
displayfunc, self.config['display_interval'])
716
def startStats(self):
718
d = DownloaderFeedback(self.choker, self.httpdownloader, self.rawserver.add_task,
719
self.upmeasure.get_rate, self.downmeasure.get_rate,
720
self.ratemeasure, self.storagewrapper.get_stats,
721
self.datalength, self.finflag, self.spewflag, self.statistics)
725
def getPortHandler(self):
729
def shutdown(self, torrentdata = {}):
730
if self.checking or self.started:
731
self.storagewrapper.sync()
733
self.rerequest_stopped()
734
if self.fileselector and self.started:
736
self.fileselector.finish()
737
torrentdata['resume data'] = self.fileselector.pickle()
739
self.appdataobj.writeTorrentData(self.infohash,torrentdata)
741
self.appdataobj.deleteTorrentData(self.infohash) # clear it
742
return not self.failed and not self.excflag.isSet()
743
# if this returns false, you may wish to auto-restart the torrent
746
def setUploadRate(self, rate):
748
def s(self = self, rate = rate):
749
self.config['max_upload_rate'] = rate
750
self.ratelimiter.set_upload_rate(rate)
751
self.rawserver.add_task(s)
752
except AttributeError:
755
def setConns(self, conns, conns2 = None):
759
def s(self = self, conns = conns, conns2 = conns2):
760
self.config['min_uploads'] = conns
761
self.config['max_uploads'] = conns2
763
self.config['max_initiate'] = conns + 10
764
self.rawserver.add_task(s)
765
except AttributeError:
768
def setDownloadRate(self, rate):
770
def s(self = self, rate = rate):
771
self.config['max_download_rate'] = rate
772
self.downloader.set_download_rate(rate)
773
self.rawserver.add_task(s)
774
except AttributeError:
777
def startConnection(self, ip, port, id):
    """Ask the encoder to start a connection to (ip, port) for the given peer id."""
    address = (ip, port)
    self.encoder._start_connection(address, id)
780
def _startConnection(self, ipandport, id):
    """Ask the encoder to start a connection; ipandport is passed through unchanged."""
    connect = self.encoder._start_connection
    connect(ipandport, id)
783
def setInitiate(self, initiate):
785
def s(self = self, initiate = initiate):
786
self.config['max_initiate'] = initiate
787
self.rawserver.add_task(s)
788
except AttributeError:
794
def getDefaults(self):
    """Return defaultargs() applied to the module-level option table."""
    option_table = defaults
    return defaultargs(option_table)
797
def getUsageText(self):
    """Return the arguments-list header stored on this instance."""
    header = self.argslistheader
    return header
800
def reannounce(self, special = None):
802
def r(self = self, special = special):
804
self.rerequest.announce()
806
self.rerequest.announce(specialurl = special)
807
self.rawserver.add_task(r)
808
except AttributeError:
811
def getResponse(self):
818
if not self.storagewrapper:
820
self.unpauseflag.clear()
821
self.rawserver.add_task(self.onPause)
825
self.whenpaused = clock()
826
if not self.downloader:
828
self.downloader.pause(True)
829
self.encoder.pause(True)
830
self.choker.pause(True)
833
self.unpauseflag.set()
834
self.rawserver.add_task(self.onUnpause)
837
if not self.downloader:
839
self.downloader.pause(False)
840
self.encoder.pause(False)
841
self.choker.pause(False)
842
if self.rerequest and self.whenpaused and clock()-self.whenpaused > 60:
843
self.rerequest.announce(3) # rerequest automatically if paused for >60 seconds
845
def set_super_seed(self):
    """Set the super-seed flag and queue _set_super_seed on the raw server."""
    flag = self.superseedflag
    flag.set()
    task_queue = self.rawserver
    task_queue.add_task(self._set_super_seed)
849
def _set_super_seed(self):
850
if not self.super_seeding_active and self.finflag.isSet():
851
self.super_seeding_active = True
852
self.errorfunc(' ** SUPER-SEED OPERATION ACTIVE **\n' +
853
' please set Max uploads so each peer gets 6-8 kB/s')
855
self.downloader.set_super_seed()
856
self.choker.set_super_seed()
857
self.rawserver.add_task(s)
858
if self.finflag.isSet(): # mode started when already finished
860
self.rerequest.announce(3) # so after kicking everyone off, reannounce
861
self.rawserver.add_task(r)
863
def am_I_finished(self):
    """Report whether the finished flag is currently set."""
    finished = self.finflag.isSet()
    return finished
866
def get_transfer_stats(self):
    """Return the (upload total, download total) pair read from the two measures."""
    uploaded = self.upmeasure.get_total()
    downloaded = self.downmeasure.get_total()
    return uploaded, downloaded