1
# Written by Bram Cohen
2
# see LICENSE.txt for license information
4
from cStringIO import StringIO
5
from binascii import b2a_hex
6
from socket import error as socketerror
7
from urllib import quote
8
from traceback import print_exc
9
from BitTornado.BTcrypto import Crypto
16
bool = lambda x: not not x
22
protocol_name = 'BitTorrent protocol'
23
option_pattern = chr(0)*8
26
return long(b2a_hex(s), 16)
29
return chr((i >> 8) & 0xFF) + chr(i & 0xFF)
31
hexchars = '0123456789ABCDEF'
34
hexmap.append(hexchars[(i&0xF0)/16]+hexchars[i&0x0F])
39
r.append(hexmap[ord(c)])
45
if quote(s).find('%') >= 0:
50
class IncompleteCounter:
58
return self.c >= MAX_INCOMPLETE
60
incompletecounter = IncompleteCounter()
63
# header, options, download id, my id, [length, message]
66
def __init__(self, Encoder, connection, id,
67
ext_handshake=False, encrypted = None, options = None):
68
self.Encoder = Encoder
69
self.connection = connection
70
self.connecter = Encoder.connecter
72
self.locally_initiated = (id != None)
73
self.readable_id = make_readable(id)
75
self.keepalive = lambda: None
80
self.read = self._read
81
self.write = self._write
84
if self.locally_initiated:
85
incompletecounter.increment()
88
self.encrypter = Crypto(True)
89
self.write(self.encrypter.pubkey+self.encrypter.padding())
91
self.encrypted = False
92
self.write(chr(len(protocol_name)) + protocol_name +
93
option_pattern + self.Encoder.download_id )
94
self.next_len, self.next_func = 1+len(protocol_name), self.read_header
96
self.Encoder.connecter.external_connection_made += 1
97
if encrypted: # passed an already running encrypter
98
self.encrypter = encrypted
101
self.next_len, self.next_func = 14, self.read_crypto_block3c
103
self.encrypted = False
104
self.options = options
105
self.write(self.Encoder.my_id)
106
self.next_len, self.next_func = 20, self.read_peer_id
108
self.encrypted = None # don't know yet
109
self.next_len, self.next_func = 1+len(protocol_name), self.read_header
110
self.Encoder.raw_server.add_task(self._auto_close, 30)
113
def _log_start(self): # only called with DEBUG = True
114
self.log = open('peerlog.'+self.get_ip()+'.txt','a')
115
self.log.write('connected - ')
116
if self.locally_initiated:
117
self.log.write('outgoing\n')
119
self.log.write('incoming\n')
120
self._logwritefunc = self.write
121
self.write = self._log_write
123
def _log_write(self, s):
124
self.log.write('w:'+b2a_hex(s)+'\n')
125
self._logwritefunc(s)
128
def get_ip(self, real=False):
129
return self.connection.get_ip(real)
134
def get_readable_id(self):
135
return self.readable_id
137
def is_locally_initiated(self):
138
return self.locally_initiated
140
def is_encrypted(self):
141
return bool(self.encrypted)
143
def is_flushed(self):
144
return self.connection.is_flushed()
146
def _read_header(self, s):
147
if s == chr(len(protocol_name))+protocol_name:
148
return 8, self.read_options
151
def read_header(self, s):
152
if self._read_header(s):
153
if self.encrypted or self.Encoder.config['crypto_stealth']:
155
return 8, self.read_options
156
if self.locally_initiated and not self.encrypted:
158
elif not self.Encoder.config['crypto_allowed']:
160
if not self.encrypted:
161
self.encrypted = True
162
self.encrypter = Crypto(self.locally_initiated)
163
self._write_buffer(s)
164
return self.encrypter.keylength, self.read_crypto_header
166
################## ENCRYPTION SUPPORT ######################
168
def _start_crypto(self):
169
self.encrypter.setrawaccess(self._read,self._write)
170
self.write = self.encrypter.write
171
self.read = self.encrypter.read
173
self.buffer = self.encrypter.decrypt(self.buffer)
175
def _end_crypto(self):
176
self.read = self._read
177
self.write = self._write
178
self.encrypter = None
180
def read_crypto_header(self, s):
181
self.encrypter.received_key(s)
182
self.encrypter.set_skey(self.Encoder.download_id)
183
if self.locally_initiated:
184
if self.Encoder.config['crypto_only']:
185
cryptmode = '\x00\x00\x00\x02' # full stream encryption
187
cryptmode = '\x00\x00\x00\x03' # header or full stream
188
padc = self.encrypter.padding()
189
self.write( self.encrypter.block3a
190
+ self.encrypter.block3b
191
+ self.encrypter.encrypt(
193
+ cryptmode # acceptable crypto modes
194
+ tobinary16(len(padc))
196
+ '\x00\x00' ) ) # no initial payload data
197
self._max_search = 520
198
return 1, self.read_crypto_block4a
199
self.write(self.encrypter.pubkey+self.encrypter.padding())
200
self._max_search = 520
201
return 0, self.read_crypto_block3a
203
def _search_for_pattern(self, s, pat):
206
if len(s) >= len(pat):
207
self._max_search -= len(s)+1-len(pat)
208
if self._max_search < 0:
211
self._write_buffer(s[1-len(pat):])
213
self._write_buffer(s[p+len(pat):])
216
### INCOMING CONNECTION ###
218
def read_crypto_block3a(self, s):
219
if not self._search_for_pattern(s,self.encrypter.block3a):
220
return -1, self.read_crypto_block3a # wait for more data
221
return len(self.encrypter.block3b), self.read_crypto_block3b
223
def read_crypto_block3b(self, s):
224
if s != self.encrypter.block3b:
226
self.Encoder.connecter.external_connection_made += 1
228
return 14, self.read_crypto_block3c
230
def read_crypto_block3c(self, s):
231
if s[:8] != ('\x00'*8): # check VC
233
self.cryptmode = toint(s[8:12]) % 4
234
if self.cryptmode == 0:
235
return None # no encryption selected
236
if ( self.cryptmode == 1 # only header encryption
237
and self.Encoder.config['crypto_only'] ):
239
padlen = (ord(s[12])<<8)+ord(s[13])
242
return padlen+2, self.read_crypto_pad3
244
def read_crypto_pad3(self, s):
246
ialen = (ord(s[0])<<8)+ord(s[1])
249
if self.cryptmode == 1:
250
cryptmode = '\x00\x00\x00\x01' # header only encryption
252
cryptmode = '\x00\x00\x00\x02' # full stream encryption
253
padd = self.encrypter.padding()
254
self.write( ('\x00'*8) # VC
255
+ cryptmode # encryption mode
256
+ tobinary16(len(padd))
259
return ialen, self.read_crypto_ia
260
return self.read_crypto_block3done()
262
def read_crypto_ia(self, s):
265
self.log.write('r:'+b2a_hex(s)+'(ia)\n')
267
self.log.write('r:'+b2a_hex(self.buffer)+'(buffer)\n')
268
return self.read_crypto_block3done(s)
270
def read_crypto_block3done(self, ia=''):
274
if self.cryptmode == 1: # only handshake encryption
275
assert not self.buffer # oops; check for exceptions to this
278
self._write_buffer(ia)
279
return 1+len(protocol_name), self.read_encrypted_header
281
### OUTGOING CONNECTION ###
283
def read_crypto_block4a(self, s):
284
if not self._search_for_pattern(s,self.encrypter.VC_pattern()):
285
return -1, self.read_crypto_block4a # wait for more data
287
return 6, self.read_crypto_block4b
289
def read_crypto_block4b(self, s):
290
self.cryptmode = toint(s[:4]) % 4
291
if self.cryptmode == 1: # only header encryption
292
if self.Encoder.config['crypto_only']:
294
elif self.cryptmode != 2:
295
return None # unknown encryption
296
padlen = (ord(s[4])<<8)+ord(s[5])
300
return padlen, self.read_crypto_pad4
301
return self.read_crypto_block4done()
303
def read_crypto_pad4(self, s):
305
return self.read_crypto_block4done()
307
def read_crypto_block4done(self):
310
if self.cryptmode == 1: # only handshake encryption
311
if not self.buffer: # oops; check for exceptions to this
314
self.write(chr(len(protocol_name)) + protocol_name +
315
option_pattern + self.Encoder.download_id)
316
return 1+len(protocol_name), self.read_encrypted_header
318
### START PROTOCOL OVER ENCRYPTED CONNECTION ###
320
def read_encrypted_header(self, s):
321
return self._read_header(s)
323
################################################
325
def read_options(self, s):
327
return 20, self.read_download_id
329
def read_download_id(self, s):
330
if ( s != self.Encoder.download_id
331
or not self.Encoder.check_ip(ip=self.get_ip()) ):
333
if not self.locally_initiated:
334
if not self.encrypted:
335
self.Encoder.connecter.external_connection_made += 1
336
self.write(chr(len(protocol_name)) + protocol_name +
337
option_pattern + self.Encoder.download_id + self.Encoder.my_id)
338
return 20, self.read_peer_id
340
def read_peer_id(self, s):
341
if not self.encrypted and self.Encoder.config['crypto_only']:
342
return None # allows older trackers to ping,
343
# but won't proceed w/ connections
346
self.readable_id = make_readable(s)
350
self.complete = self.Encoder.got_id(self)
351
if not self.complete:
353
if self.locally_initiated:
354
self.write(self.Encoder.my_id)
355
incompletecounter.decrement()
356
self._switch_to_read2()
357
c = self.Encoder.connecter.connection_made(self)
358
self.keepalive = c.send_keepalive
359
return 4, self.read_len
361
def read_len(self, s):
363
if l > self.Encoder.max_len:
365
return l, self.read_message
367
def read_message(self, s):
369
self.connecter.got_message(self, s)
370
return 4, self.read_len
372
def read_dead(self, s):
375
def _auto_close(self):
376
if not self.complete:
381
self.connection.close()
386
self.log.write('closed\n')
389
del self.Encoder.connections[self.connection]
391
self.connecter.connection_lost(self)
392
elif self.locally_initiated:
393
incompletecounter.decrement()
395
def send_message_raw(self, message):
398
def _write(self, message):
400
self.connection.write(message)
402
def data_came_in(self, connection, s):
405
def _write_buffer(self, s):
406
self.buffer = s+self.buffer
410
self.log.write('r:'+b2a_hex(s)+'\n')
411
self.Encoder.measurefunc(len(s))
416
# self.next_len = # of characters function expects
417
# or 0 = all characters in the buffer
418
# or -1 = wait for next read, then all characters in the buffer
419
# not compatible w/ keepalives, switch out after all negotiation complete
420
if self.next_len <= 0:
423
elif len(self.buffer) >= self.next_len:
424
m = self.buffer[:self.next_len]
425
self.buffer = self.buffer[self.next_len:]
429
x = self.next_func(m)
431
self.next_len, self.next_func = 1, self.read_dead
436
self.next_len, self.next_func = x
437
if self.next_len < 0: # already checked buffer
438
return # wait for additional data
439
if self.bufferlen is not None:
443
def _switch_to_read2(self):
444
self._write_buffer = None
446
self.encrypter.setrawaccess(self._read2,self._write)
448
self.read = self._read2
449
self.bufferlen = len(self.buffer)
450
self.buffer = [self.buffer]
452
def _read2(self, s): # more efficient, requires buffer['',''] & bufferlen
454
self.log.write('r:'+b2a_hex(s)+'\n')
455
self.Encoder.measurefunc(len(s))
459
p = self.next_len-self.bufferlen
460
if self.next_len == 0:
464
self.buffer.append(s)
465
self.bufferlen += len(s)
467
self.bufferlen = len(s)-p
468
self.buffer.append(s[:p])
469
m = ''.join(self.buffer)
476
# assert len(self.buffer) == 1
478
self.bufferlen = len(s)-self.next_len
479
m = s[:self.next_len]
483
self.buffer = [s[self.next_len:]]
488
x = self.next_func(m)
490
self.next_len, self.next_func = 1, self.read_dead
495
self.next_len, self.next_func = x
496
if self.next_len < 0: # already checked buffer
497
return # wait for additional data
500
def connection_flushed(self, connection):
502
self.connecter.connection_flushed(self)
504
def connection_lost(self, connection):
505
if self.Encoder.connections.has_key(connection):
509
class _dummy_banlist:
510
def includes(self, x):
514
def __init__(self, connecter, raw_server, my_id, max_len,
515
schedulefunc, keepalive_delay, download_id,
516
measurefunc, config, bans=_dummy_banlist() ):
517
self.raw_server = raw_server
518
self.connecter = connecter
520
self.max_len = max_len
521
self.schedulefunc = schedulefunc
522
self.keepalive_delay = keepalive_delay
523
self.download_id = download_id
524
self.measurefunc = measurefunc
526
self.connections = {}
528
self.external_bans = bans
531
if self.config['max_connections'] == 0:
532
self.max_connections = 2 ** 30
534
self.max_connections = self.config['max_connections']
535
schedulefunc(self.send_keepalives, keepalive_delay)
537
def send_keepalives(self):
538
self.schedulefunc(self.send_keepalives, self.keepalive_delay)
541
for c in self.connections.values():
544
def start_connections(self, list):
545
if not self.to_connect:
546
self.raw_server.add_task(self._start_connection_from_queue)
547
self.to_connect = list
549
def _start_connection_from_queue(self):
550
if self.connecter.external_connection_made:
551
max_initiate = self.config['max_initiate']
553
max_initiate = int(self.config['max_initiate']*1.5)
554
cons = len(self.connections)
555
if cons >= self.max_connections or cons >= max_initiate:
557
elif self.paused or incompletecounter.toomany():
561
dns, id, encrypted = self.to_connect.pop(0)
562
self.start_connection(dns, id, encrypted)
564
self.raw_server.add_task(self._start_connection_from_queue, delay)
566
def start_connection(self, dns, id, encrypted = None):
568
or len(self.connections) >= self.max_connections
570
or not self.check_ip(ip=dns[0]) ):
572
if self.config['crypto_only']:
573
if encrypted is None or encrypted: # fails on encrypted = 0
577
for v in self.connections.values():
580
if id and v.id == id:
583
if self.config['security'] and ip != 'unknown' and ip == dns[0]:
586
c = self.raw_server.start_connection(dns)
587
con = Connection(self, c, id, encrypted = encrypted)
588
self.connections[c] = con
594
def _start_connection(self, dns, id, encrypted = None):
595
def foo(self=self, dns=dns, id=id, encrypted=encrypted):
596
self.start_connection(dns, id, encrypted)
597
self.schedulefunc(foo, 0)
599
def check_ip(self, connection=None, ip=None):
601
ip = connection.get_ip(True)
602
if self.config['security'] and self.banned.has_key(ip):
604
if self.external_bans.includes(ip):
608
def got_id(self, connection):
609
if connection.id == self.my_id:
610
self.connecter.external_connection_made -= 1
612
ip = connection.get_ip(True)
613
for v in self.connections.values():
614
if connection is not v:
615
if connection.id == v.id:
616
if ip == v.get_ip(True):
620
if self.config['security'] and ip != 'unknown' and ip == v.get_ip(True):
624
def external_connection_made(self, connection):
625
if self.paused or len(self.connections) >= self.max_connections:
628
con = Connection(self, connection, None)
629
self.connections[connection] = con
630
connection.set_handler(con)
633
def externally_handshaked_connection_made(self, connection, options,
634
already_read, encrypted = None):
636
or len(self.connections) >= self.max_connections
637
or not self.check_ip(connection=connection) ):
640
con = Connection(self, connection, None,
641
ext_handshake = True, encrypted = encrypted, options = options)
642
self.connections[connection] = con
643
connection.set_handler(con)
645
con.data_came_in(con, already_read)
649
for c in self.connections.values():
651
self.connections = {}
656
def pause(self, flag):