~ubuntu-branches/ubuntu/precise/bittornado/precise

« back to all changes in this revision

Viewing changes to .pc/32_use_hashlib_for_sha.patch/BitTornado/BT1/Rerequester.py

  • Committer: Barry Warsaw
  • Date: 2011-08-10 23:17:46 UTC
  • mfrom: (7.1.1 bittornado)
  • Revision ID: barry@python.org-20110810231746-5buiob6p54m266s8
Tags: 0.3.18-10ubuntu2
* switch to dh_python2 (LP: #788514)
  - install btmakemetafile.py and btcompletedir.py via pyinstall
  - add build depend on python-all
  - bump debhelper depend to 7 for dh_auto_install

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Written by Bram Cohen
 
2
# modified for multitracker operation by John Hoffman
 
3
# see LICENSE.txt for license information
 
4
 
 
5
from BitTornado.zurllib import urlopen, quote
 
6
from urlparse import urlparse, urlunparse
 
7
from socket import gethostbyname
 
8
from btformats import check_peers
 
9
from BitTornado.bencode import bdecode
 
10
from threading import Thread, Lock
 
11
from cStringIO import StringIO
 
12
from traceback import print_exc
 
13
from socket import error, gethostbyname
 
14
from random import shuffle
 
15
from sha import sha
 
16
from time import time
 
17
try:
 
18
    from os import getpid
 
19
except ImportError:
 
20
    def getpid():
 
21
        return 1
 
22
    
 
23
try:
 
24
    True
 
25
except:
 
26
    True = 1
 
27
    False = 0
 
28
 
 
29
# 64-symbol alphabet used to turn 6-bit values into key characters.
mapbase64 = '0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz.-'

# Cache of generated keys, indexed by tracker URL (filled in by add_key).
keys = {}

# Seed material mixed into every key: process id, import-time clock reading,
# and a fixed suffix.
basekeydata = ''.join([str(getpid()), repr(time()), 'tracker'])
 
32
 
 
33
def add_key(tracker):
    """Generate the announce key for ``tracker`` and store it in ``keys``.

    The key is six characters long: the last six bytes of the SHA-1 digest
    of ``basekeydata + tracker``, each reduced to its low six bits and
    mapped through ``mapbase64``.

    tracker -- tracker URL (string) the key is generated for.
    """
    # The stand-alone ``sha`` module is deprecated since Python 2.5 and is
    # gone in Python 3; prefer hashlib and fall back only on old interpreters.
    try:
        from hashlib import sha1 as _sha
    except ImportError:
        from sha import sha as _sha

    seed = basekeydata + tracker
    if isinstance(seed, type(u'')):
        # Hash functions want bytes; a text seed is encoded explicitly.
        seed = seed.encode('utf-8')

    symbols = []
    for byte in _sha(seed).digest()[-6:]:
        if not isinstance(byte, int):
            # Python 2 yields 1-character strings, Python 3 yields ints.
            byte = ord(byte)
        symbols.append(mapbase64[byte & 0x3F])
    keys[tracker] = ''.join(symbols)
 
38
 
 
39
def get_key(tracker):
    """Return the ``&key=...`` query fragment for ``tracker``.

    The key is created through add_key() the first time a tracker is seen
    and reused from ``keys`` afterwards.

    tracker -- tracker URL (string).
    """
    # Explicit membership test instead of a bare ``except:``, which would
    # also have swallowed unrelated errors before regenerating the key.
    if tracker not in keys:
        add_key(tracker)
    return "&key=" + keys[tracker]
 
45
 
 
46
class fakeflag:
    """Constant flag object offering only ``wait()`` and ``isSet()``.

    The state given at construction is what ``isSet()`` reports;
    ``wait()`` never blocks.
    """

    def __init__(self, state=False):
        # Value reported by isSet().
        self.state = state

    def isSet(self):
        """Return the stored state."""
        return self.state

    def wait(self):
        """Return immediately without waiting for anything."""
        return None
 
53
 
 
54
class Rerequester:
    """Announce to a tiered tracker list and pass returned peers to ``connect``.

    Requests are built as query strings, sent from worker threads, and the
    results are handed back through ``externalsched``.  A SuccessLock
    serialises announce cycles and arbitrates between a tracker reply and
    its timeout.

    Event codes used by announce()/d()/hit(): 0 = started, 1 = completed,
    2 = stopped, 3 = no event.
    """

    def __init__(self, port, myid, infohash, trackerlist, config,
                 sched, externalsched, errorfunc, excfunc, connect,
                 howmany, amount_left, up, down, upratefunc, downratefunc,
                 doneflag, unpauseflag=None,
                 seededfunc=None, force_rapid_update=False):
        """Store the callbacks and build the constant part of the announce URL.

        port, myid, infohash -- values reported to the tracker.
        trackerlist -- list of tiers, each a list of tracker URLs; tiers with
            more than one entry are shuffled in place.
        config -- mapping read for 'ip', 'min_peers', 'max_initiate',
            'rerequest_interval', 'http_timeout', 'crypto_allowed',
            'crypto_only', 'crypto_stealth' and 'dedicated_seed_id'.
        sched, externalsched -- schedulers called as f(func, delay) /
            f(func[, delay]).
        errorfunc, excfunc -- receive error messages / formatted tracebacks.
        connect -- receives the list of peers from a tracker reply.
        howmany, amount_left, up, down, upratefunc, downratefunc --
            zero-argument callables queried when building requests.
        doneflag, unpauseflag -- objects with isSet(); unpauseflag defaults
            to a fresh fakeflag(True).
        seededfunc -- optional callable invoked when a reply has 'seeded'.
        force_rapid_update -- see c(), _d() and hit().
        """
        # A None sentinel replaces the former def-time ``fakeflag(True)``
        # default, so instances no longer share one flag object.
        if unpauseflag is None:
            unpauseflag = fakeflag(True)

        self.sched = sched
        self.externalsched = externalsched
        self.errorfunc = errorfunc
        self.excfunc = excfunc
        self.connect = connect
        self.howmany = howmany
        self.amount_left = amount_left
        self.up = up
        self.down = down
        self.upratefunc = upratefunc
        self.downratefunc = downratefunc
        self.doneflag = doneflag
        self.unpauseflag = unpauseflag
        self.seededfunc = seededfunc
        self.force_rapid_update = force_rapid_update

        self.ip = config.get('ip', '')
        self.minpeers = config['min_peers']
        self.maxpeers = config['max_initiate']
        self.interval = config['rerequest_interval']
        self.timeout = config['http_timeout']

        # Randomise the order inside every multi-tracker tier.  The tier
        # lists are the caller's objects and are shuffled in place.
        newtrackerlist = []
        for tier in trackerlist:
            if len(tier) > 1:
                shuffle(tier)
            newtrackerlist.append(tier)
        self.trackerlist = newtrackerlist

        self.lastsuccessful = ''
        self.rejectedmessage = 'rejected by tracker - '

        url = 'info_hash=%s&peer_id=%s' % (quote(infohash), quote(myid))
        port_key = "&port="
        if config.get('crypto_allowed'):
            url += "&supportcrypto=1"
            if config.get('crypto_only'):
                url += "&requirecrypto=1"
                if config.get('crypto_stealth'):
                    # stealth mode: report port 0 and send the real port
                    # as cryptoport instead
                    port_key = "&port=0&cryptoport="
        self.url = url + port_key + str(port)

        seed_id = config.get('dedicated_seed_id')
        if seed_id:
            self.url += '&seed_id=' + quote(seed_id)
        if self.seededfunc:
            self.url += '&check_seeded=1'

        self.last = None
        self.trackerid = None
        self.announce_interval = 30 * 60
        self.last_failed = True
        self.never_succeeded = True
        self.errorcodes = {}
        self.lock = SuccessLock()
        self.special = None
        self.stopped = False

    def start(self):
        """Schedule the first c() check and send the 'started' announce."""
        self.sched(self.c, self.interval/2)
        self.d(0)

    def c(self):
        """Periodic check that may request more peers, then reschedules itself."""
        if self.stopped:
            return
        # NOTE(review): this announces only while unpauseflag is NOT set,
        # whereas d() skips announcing in that state -- the condition looks
        # inverted; behaviour kept as found, confirm the intent.
        if not self.unpauseflag.isSet() and (
                self.howmany() < self.minpeers or self.force_rapid_update):
            self.announce(3, self._c)
        else:
            self._c()

    def _c(self):
        """Schedule the next c() run after ``self.interval``."""
        self.sched(self.c, self.interval)

    def d(self, event=3):
        """Regular announce cycle; does not announce while unpauseflag is unset."""
        if self.stopped:
            return
        if not self.unpauseflag.isSet():
            self._d()
            return
        self.announce(event, self._d)

    def _d(self):
        """Schedule the next d() run, or none when force_rapid_update is on."""
        if self.never_succeeded:
            self.sched(self.d, 60)  # retry in 60 seconds
        elif self.force_rapid_update:
            return
        else:
            self.sched(self.d, self.announce_interval)

    def hit(self, event=3):
        """Announce at once when the same condition as in c() holds."""
        # NOTE(review): same ``not unpauseflag.isSet()`` test as c(); confirm.
        if not self.unpauseflag.isSet() and (
                self.howmany() < self.minpeers or self.force_rapid_update):
            self.announce(event)

    def _numwant_suffix(self):
        """Return '&numwant=0' at the peer limit, else the compact-reply flags."""
        if self.howmany() >= self.maxpeers:
            return '&numwant=0'
        return '&no_peer_id=1&compact=1'

    def announce(self, event=3, callback=lambda: None, specialurl=None):
        """Build the announce query string and pass it to rerequest().

        event -- event code (see class docstring); 2 also marks this object
            as stopped.
        callback -- called once the request cycle has finished.
        specialurl -- when given, only this tracker is contacted and zeroed
            transfer figures are sent.
        """
        if specialurl is not None:
            s = self.url + '&uploaded=0&downloaded=0&left=1'   # don't add to statistics
            s += self._numwant_suffix()
            self.last_failed = True         # force true, so will display an error
            self.special = specialurl
            self.rerequest(s, callback)
            return

        s = ('%s&uploaded=%s&downloaded=%s&left=%s' %
             (self.url, str(self.up()), str(self.down()),
              str(self.amount_left())))
        if self.last is not None:
            s += '&last=' + quote(str(self.last))
        if self.trackerid is not None:
            s += '&trackerid=' + quote(str(self.trackerid))
        s += self._numwant_suffix()
        if event != 3:
            s += '&event=' + ['started', 'completed', 'stopped'][event]
        if event == 2:
            self.stopped = True
        self.rerequest(s, callback)

    def snoop(self, peers, callback=lambda: None):  # tracker call support
        """Send a statistics-neutral 'stopped' request asking for ``peers`` peers."""
        self.rerequest(self.url
            + '&event=stopped&port=0&uploaded=0&downloaded=0&left=1&tracker=1&numwant='
            + str(peers), callback)

    def rerequest(self, s, callback):
        """Start a request cycle for query ``s`` in a new thread.

        If the previous cycle has not finished yet, the call is retried
        through ``sched`` five seconds later instead.
        """
        if not self.lock.isfinished():  # still waiting for prior cycle to complete??
            def retry(self=self, s=s, callback=callback):
                self.rerequest(s, callback)
            self.sched(retry, 5)         # retry in 5 seconds
            return
        self.lock.reset()
        rq = Thread(target=self._rerequest, args=[s, callback])
        rq.setDaemon(False)
        rq.start()

    def _rerequest(self, s, callback):
        """Thread body: try trackers in order until rerequest_single() says stop.

        When no tracker ends the cycle, _fail() is scheduled through
        ``externalsched``.
        """
        try:
            def fail(self=self, callback=callback):
                self._fail(callback)

            # Start the cycle with a clean error record *before* anything can
            # fail, so a resolve error recorded below is still there when
            # _fail() reports it.
            self.errorcodes = {}
            if self.ip:
                try:
                    s += '&ip=' + gethostbyname(self.ip)
                except Exception:
                    self.errorcodes['troublecode'] = 'unable to resolve: ' + self.ip
                    # Drop any pending one-shot tracker so it is not reused
                    # by the next, unrelated request.
                    self.special = None
                    self.externalsched(fail)
                    # The cycle has failed: stop here.  Falling through used
                    # to contact the trackers as well, after the failure
                    # handler had already been scheduled.
                    return

            if self.special is None:
                for t, tier in enumerate(self.trackerlist):
                    for tr, tracker in enumerate(tier):
                        if self.rerequest_single(tracker, s, callback):
                            if not self.last_failed and tr != 0:
                                # move the tracker that answered to the
                                # front of its tier
                                del tier[tr]
                                self.trackerlist[t] = [tracker] + tier
                            return
            else:
                tracker = self.special
                self.special = None
                if self.rerequest_single(tracker, s, callback):
                    return
            # no success from any tracker
            self.externalsched(fail)
        except:
            # Thread boundary: report whatever went wrong instead of letting
            # the thread die silently.
            self.exception(callback)

    def _fail(self, callback):
        """Finish a failed cycle: maybe report an error, release the lock, call back.

        The error is reported only when both transfer rates are below 100 or
        nothing is left to download.
        """
        if ((self.upratefunc() < 100 and self.downratefunc() < 100)
                or not self.amount_left()):
            # report the highest-priority error recorded during the cycle
            for f in ['rejected', 'bad_data', 'troublecode']:
                if f in self.errorcodes:
                    r = self.errorcodes[f]
                    break
            else:
                r = 'Problem connecting to tracker - unspecified error'
            self.errorfunc(r)

        self.last_failed = True
        self.lock.give_up()
        self.externalsched(callback)

    def rerequest_single(self, t, s, callback):
        """Query tracker ``t`` in its own thread and wait for the outcome.

        Returns True when the caller should stop trying further trackers.
        """
        l = self.lock.set()
        rq = Thread(target=self._rerequest_single,
                    args=[t, s + get_key(t), l, callback])
        rq.setDaemon(False)
        rq.start()
        self.lock.wait()
        if self.lock.success:
            self.lastsuccessful = t
            self.last_failed = False
            self.never_succeeded = False
            return True
        if not self.last_failed and self.lastsuccessful == t:
            # if the last tracker hit was successful, and you've just tried the tracker
            # you'd contacted before, don't go any further, just fail silently.
            self.last_failed = True
            self.externalsched(callback)
            self.lock.give_up()
            return True
        return False    # returns true if it wants rerequest() to exit

    def _rerequest_single(self, t, s, l, callback):
        """Thread body: fetch one tracker reply, validate it and queue postrequest().

        t -- tracker URL, which may already carry a query string.
        s -- query string to send.
        l -- attempt code obtained from ``self.lock.set()``.
        callback -- forwarded to postrequest() unless the attempt timed out.
        """
        import sys      # exc_info() keeps the handlers below version-neutral
        try:
            closer = [None]

            def timedout(self=self, l=l, closer=closer):
                if self.lock.trip(l):
                    self.errorcodes['troublecode'] = 'Problem connecting to tracker - timeout exceeded'
                    self.lock.unwait(l)
                try:
                    # closer[0] is still None until the connection is open,
                    # and closing may fail as well; both are ignored.
                    closer[0]()
                except Exception:
                    pass

            self.externalsched(timedout, self.timeout)

            if '?' in t:
                url, q = t.split('?', 1)
                q += '&' + s
            else:
                url = t
                q = s

            err = None
            h = None
            data = ''
            try:
                h = urlopen(url + '?' + q)
                closer[0] = h.close
                data = h.read()
            except (IOError, error):
                err = 'Problem connecting to tracker - ' + str(sys.exc_info()[1])
            except Exception:
                err = 'Problem connecting to tracker'
            if h is not None:
                try:
                    h.close()
                except Exception:
                    pass    # best effort; the data has been read or err is set
            if err:
                if self.lock.trip(l):
                    self.errorcodes['troublecode'] = err
                    self.lock.unwait(l)
                return

            if data == '':
                if self.lock.trip(l):
                    self.errorcodes['troublecode'] = 'no data from tracker'
                    self.lock.unwait(l)
                return

            try:
                r = bdecode(data, sloppy=1)
                check_peers(r)
            except ValueError:
                if self.lock.trip(l):
                    self.errorcodes['bad_data'] = ('bad data from tracker - '
                                                   + str(sys.exc_info()[1]))
                    self.lock.unwait(l)
                return

            if 'failure reason' in r:
                if self.lock.trip(l):
                    self.errorcodes['rejected'] = self.rejectedmessage + r['failure reason']
                    self.lock.unwait(l)
                return

            if self.lock.trip(l, True):     # success!
                self.lock.unwait(l)
            else:
                callback = lambda: None     # attempt timed out, don't do a callback

            # even if the attempt timed out, go ahead and process data
            def add(self=self, r=r, callback=callback):
                self.postrequest(r, callback)
            self.externalsched(add)
        except:
            # Thread boundary: report whatever went wrong.
            self.exception(callback)

    def postrequest(self, r, callback):
        """Apply a decoded tracker reply ``r`` and then call ``callback``.

        Updates the announce intervals, tracker id and 'last' value, builds
        ((ip, port), peer_id, crypto_flag) tuples from 'peers' (either a
        compact string of 6-byte records or a list of dicts) and passes them
        to ``connect``, unless the reply is 'seeded' and seededfunc is set.
        """
        if 'warning message' in r:
            self.errorfunc('warning from tracker - ' + r['warning message'])
        self.announce_interval = r.get('interval', self.announce_interval)
        self.interval = r.get('min interval', self.interval)
        self.trackerid = r.get('tracker id', self.trackerid)
        self.last = r.get('last')

        p = r['peers']
        compact = isinstance(p, str)
        if compact:
            lenpeers = len(p) // 6      # 4 address bytes + 2 port bytes each
        else:
            lenpeers = len(p)

        # crypto flags are used only when there is exactly one per peer
        cflags = r.get('crypto_flags')
        if isinstance(cflags, str) and len(cflags) == lenpeers:
            cflags = [ord(x) for x in cflags]
        else:
            cflags = [None] * lenpeers

        peers = []
        if compact:
            for x in range(0, len(p), 6):
                ip = '.'.join([str(ord(i)) for i in p[x:x+4]])
                port = (ord(p[x+4]) << 8) | ord(p[x+5])
                peers.append(((ip, port), 0, cflags[x // 6]))
        else:
            for i, x in enumerate(p):
                peers.append(((x['ip'].strip(), x['port']),
                              x.get('peer id', 0), cflags[i]))

        ps = len(peers) + self.howmany()
        if ps < self.maxpeers:
            # forget 'last' when the tracker reports clearly more peers
            # than are known so far
            if self.doneflag.isSet():
                if r.get('num peers', 1000) - r.get('done peers', 0) > ps * 1.2:
                    self.last = None
            else:
                if r.get('num peers', 1000) > ps * 1.2:
                    self.last = None
        if self.seededfunc and r.get('seeded'):
            self.seededfunc()
        elif peers:
            shuffle(peers)
            self.connect(peers)
        callback()

    def exception(self, callback):
        """Format the current traceback and report it through ``externalsched``.

        The text goes to ``excfunc`` when one is set and is printed
        otherwise; ``callback`` is called afterwards.
        """
        data = StringIO()
        print_exc(file=data)

        def r(s=data.getvalue(), callback=callback):
            if self.excfunc:
                self.excfunc(s)
            else:
                print(s)
            callback()
        self.externalsched(r)
 
409
 
 
410
 
 
411
class SuccessLock:
    """Bookkeeping for one request cycle made of numbered attempts.

    ``lock`` guards the state fields.  ``pause`` is held while an attempt is
    outstanding, so wait() blocks until unwait() releases it.  ``code``
    numbers the attempts so that calls made on behalf of an older attempt
    are ignored.
    """

    def __init__(self):
        self.lock = Lock()      # protects code / first / success / finished
        self.pause = Lock()     # held while an attempt is outstanding
        self.code = 0           # number of the current attempt
        self.first = True       # becomes False once the attempt was tripped
        self.success = False
        self.finished = True

    def reset(self):
        """Mark a new cycle as started: neither successful nor finished."""
        self.success = False
        self.finished = False

    def set(self):
        """Begin a new attempt and return its code.

        Acquires ``pause`` if it is not already held, so a following wait()
        blocks until unwait() is called for this attempt.
        """
        self.lock.acquire()
        try:
            if not self.pause.locked():
                self.pause.acquire()
            self.first = True
            self.code += 1
            # Read the code while still holding the lock so the value
            # returned is the one assigned to this attempt.
            return self.code
        finally:
            self.lock.release()

    def trip(self, code, s=False):
        """Record an outcome for attempt ``code``.

        Returns True for the first call made for the current attempt, False
        for later calls, and None when ``code`` is not the current attempt
        or the cycle has already finished.  With ``s`` true the cycle is
        also marked finished and successful.
        """
        self.lock.acquire()
        try:
            if code != self.code or self.finished:
                return None
            r = self.first
            self.first = False
            if s:
                self.finished = True
                self.success = True
            return r
        finally:
            self.lock.release()

    def give_up(self):
        """Mark the cycle as finished without success."""
        self.lock.acquire()
        try:
            self.success = False
            self.finished = True
        finally:
            self.lock.release()

    def wait(self):
        """Block until ``pause`` can be acquired (see unwait())."""
        self.pause.acquire()

    def unwait(self, code):
        """Release ``pause`` if ``code`` is the current attempt and it is held."""
        if code == self.code and self.pause.locked():
            self.pause.release()

    def isfinished(self):
        """Return whether the current cycle has finished."""
        self.lock.acquire()
        try:
            return self.finished
        finally:
            self.lock.release()