~cosmin.lupu/+junk/penguintv

« back to all changes in this revision

Viewing changes to penguintv/ptvbittorrent/Downloader.py

  • Committer: cosmin.lupu at gmail
  • Date: 2010-04-27 16:47:43 UTC
  • Revision ID: cosmin.lupu@gmail.com-20100427164743-ds8xrqonipp5ovdf
initial packaging

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Written by Bram Cohen
 
2
# see LICENSE.txt for license information
 
3
 
 
4
from CurrentRateMeasure import Measure
 
5
from random import shuffle
 
6
from time import time
 
7
from bitfield import Bitfield
 
8
 
 
9
class SingleDownload:
 
10
    def __init__(self, downloader, connection):
 
11
        self.downloader = downloader
 
12
        self.connection = connection
 
13
        self.choked = True
 
14
        self.interested = False
 
15
        self.active_requests = []
 
16
        self.measure = Measure(downloader.max_rate_period)
 
17
        self.have = Bitfield(downloader.numpieces)
 
18
        self.last = 0
 
19
        self.example_interest = None
 
20
 
 
21
    def disconnected(self):
 
22
        self.downloader.downloads.remove(self)
 
23
        for i in xrange(len(self.have)):
 
24
            if self.have[i]:
 
25
                self.downloader.picker.lost_have(i)
 
26
        self._letgo()
 
27
 
 
28
    def _letgo(self):
 
29
        if not self.active_requests:
 
30
            return
 
31
        if self.downloader.storage.is_endgame():
 
32
            self.active_requests = []
 
33
            return
 
34
        lost = []
 
35
        for index, begin, length in self.active_requests:
 
36
            self.downloader.storage.request_lost(index, begin, length)
 
37
            if index not in lost:
 
38
                lost.append(index)
 
39
        self.active_requests = []
 
40
        ds = [d for d in self.downloader.downloads if not d.choked]
 
41
        shuffle(ds)
 
42
        for d in ds:
 
43
            d._request_more(lost)
 
44
        for d in self.downloader.downloads:
 
45
            if d.choked and not d.interested:
 
46
                for l in lost:
 
47
                    if d.have[l] and self.downloader.storage.do_I_have_requests(l):
 
48
                        d.interested = True
 
49
                        d.connection.send_interested()
 
50
                        break
 
51
 
 
52
    def got_choke(self):
 
53
        if not self.choked:
 
54
            self.choked = True
 
55
            self._letgo()
 
56
 
 
57
    def got_unchoke(self):
 
58
        if self.choked:
 
59
            self.choked = False
 
60
            if self.interested:
 
61
                self._request_more()
 
62
 
 
63
    def is_choked(self):
 
64
        return self.choked
 
65
 
 
66
    def is_interested(self):
 
67
        return self.interested
 
68
 
 
69
    def got_piece(self, index, begin, piece):
 
70
        try:
 
71
            self.active_requests.remove((index, begin, len(piece)))
 
72
        except ValueError:
 
73
            return False
 
74
        if self.downloader.storage.is_endgame():
 
75
            self.downloader.all_requests.remove((index, begin, len(piece)))
 
76
        self.last = time()
 
77
        self.measure.update_rate(len(piece))
 
78
        self.downloader.measurefunc(len(piece))
 
79
        self.downloader.downmeasure.update_rate(len(piece))
 
80
        if not self.downloader.storage.piece_came_in(index, begin, piece):
 
81
            if self.downloader.storage.is_endgame():
 
82
                while self.downloader.storage.do_I_have_requests(index):
 
83
                    nb, nl = self.downloader.storage.new_request(index)
 
84
                    self.downloader.all_requests.append((index, nb, nl))
 
85
                for d in self.downloader.downloads:
 
86
                    d.fix_download_endgame()
 
87
                return False
 
88
            self.downloader.picker.bump(index)
 
89
            ds = [d for d in self.downloader.downloads if not d.choked]
 
90
            shuffle(ds)
 
91
            for d in ds:
 
92
                d._request_more([index])
 
93
            return False
 
94
        if self.downloader.storage.do_I_have(index):
 
95
            self.downloader.picker.complete(index)
 
96
        if self.downloader.storage.is_endgame():
 
97
            for d in self.downloader.downloads:
 
98
                if d is not self and d.interested:
 
99
                    if d.choked:
 
100
                        d.fix_download_endgame()
 
101
                    else:
 
102
                        try:
 
103
                            d.active_requests.remove((index, begin, len(piece)))
 
104
                        except ValueError:
 
105
                            continue
 
106
                        d.connection.send_cancel(index, begin, len(piece))
 
107
                        d.fix_download_endgame()
 
108
        self._request_more()
 
109
        if self.downloader.picker.am_I_complete():
 
110
            for d in [i for i in self.downloader.downloads if i.have.numfalse == 0]:
 
111
                d.connection.close()
 
112
        return self.downloader.storage.do_I_have(index)
 
113
 
 
114
    def _want(self, index):
 
115
        return self.have[index] and self.downloader.storage.do_I_have_requests(index)
 
116
 
 
117
    def _request_more(self, indices = None):
 
118
        assert not self.choked
 
119
        if len(self.active_requests) == self.downloader.backlog:
 
120
            return
 
121
        if self.downloader.storage.is_endgame():
 
122
            self.fix_download_endgame()
 
123
            return
 
124
        lost_interests = []
 
125
        while len(self.active_requests) < self.downloader.backlog:
 
126
            if indices is None:
 
127
                interest = self.downloader.picker.next(self._want, self.have.numfalse == 0)
 
128
            else:
 
129
                interest = None
 
130
                for i in indices:
 
131
                    if self.have[i] and self.downloader.storage.do_I_have_requests(i):
 
132
                        interest = i
 
133
                        break
 
134
            if interest is None:
 
135
                break
 
136
            if not self.interested:
 
137
                self.interested = True
 
138
                self.connection.send_interested()
 
139
            self.example_interest = interest
 
140
            begin, length = self.downloader.storage.new_request(interest)
 
141
            self.downloader.picker.requested(interest, self.have.numfalse == 0)
 
142
            self.active_requests.append((interest, begin, length))
 
143
            self.connection.send_request(interest, begin, length)
 
144
            if not self.downloader.storage.do_I_have_requests(interest):
 
145
                lost_interests.append(interest)
 
146
        if not self.active_requests and self.interested:
 
147
            self.interested = False
 
148
            self.connection.send_not_interested()
 
149
        if lost_interests:
 
150
            for d in self.downloader.downloads:
 
151
                if d.active_requests or not d.interested:
 
152
                    continue
 
153
                if d.example_interest is not None and self.downloader.storage.do_I_have_requests(d.example_interest):
 
154
                    continue
 
155
                for lost in lost_interests:
 
156
                    if d.have[lost]:
 
157
                        break
 
158
                else:
 
159
                    continue
 
160
                interest = self.downloader.picker.next(d._want, d.have.numfalse == 0)
 
161
                if interest is None:
 
162
                    d.interested = False
 
163
                    d.connection.send_not_interested()
 
164
                else:
 
165
                    d.example_interest = interest
 
166
        if self.downloader.storage.is_endgame():
 
167
            self.downloader.all_requests = []
 
168
            for d in self.downloader.downloads:
 
169
                self.downloader.all_requests.extend(d.active_requests)
 
170
            for d in self.downloader.downloads:
 
171
                d.fix_download_endgame()
 
172
 
 
173
    def fix_download_endgame(self):
 
174
        want = [a for a in self.downloader.all_requests if self.have[a[0]] and a not in self.active_requests]
 
175
        if self.interested and not self.active_requests and not want:
 
176
            self.interested = False
 
177
            self.connection.send_not_interested()
 
178
            return
 
179
        if not self.interested and want:
 
180
            self.interested = True
 
181
            self.connection.send_interested()
 
182
        if self.choked:
 
183
            return
 
184
        shuffle(want)
 
185
        del want[self.downloader.backlog - len(self.active_requests):]
 
186
        self.active_requests.extend(want)
 
187
        for piece, begin, length in want:
 
188
            self.connection.send_request(piece, begin, length)
 
189
 
 
190
    def got_have(self, index):
 
191
        if self.have[index]:
 
192
            return
 
193
        self.have[index] = True
 
194
        self.downloader.picker.got_have(index)
 
195
        if self.downloader.picker.am_I_complete() and self.have.numfalse == 0:
 
196
            self.connection.close()
 
197
            return
 
198
        if self.downloader.storage.is_endgame():
 
199
            self.fix_download_endgame()
 
200
        elif self.downloader.storage.do_I_have_requests(index):
 
201
            if not self.choked:
 
202
                self._request_more([index])
 
203
            else:
 
204
                if not self.interested:
 
205
                    self.interested = True
 
206
                    self.connection.send_interested()
 
207
 
 
208
    def got_have_bitfield(self, have):
 
209
        self.have = have
 
210
        for i in xrange(len(self.have)):
 
211
            if self.have[i]:
 
212
                self.downloader.picker.got_have(i)
 
213
        if self.downloader.picker.am_I_complete() and self.have.numfalse == 0:
 
214
            self.connection.close()
 
215
            return
 
216
        if self.downloader.storage.is_endgame():
 
217
            for piece, begin, length in self.downloader.all_requests:
 
218
                if self.have[piece]:
 
219
                    self.interested = True
 
220
                    self.connection.send_interested()
 
221
                    return
 
222
        for i in xrange(len(self.have)):
 
223
            if self.have[i] and self.downloader.storage.do_I_have_requests(i):
 
224
                self.interested = True
 
225
                self.connection.send_interested()
 
226
                return
 
227
 
 
228
    def get_rate(self):
 
229
        return self.measure.get_rate()
 
230
 
 
231
    def is_snubbed(self):
 
232
        return time() - self.last > self.downloader.snub_time
 
233
 
 
234
class Downloader:
 
235
    def __init__(self, storage, picker, backlog, max_rate_period, numpieces, 
 
236
            downmeasure, snub_time, measurefunc = lambda x: None):
 
237
        self.storage = storage
 
238
        self.picker = picker
 
239
        self.backlog = backlog
 
240
        self.max_rate_period = max_rate_period
 
241
        self.downmeasure = downmeasure
 
242
        self.numpieces = numpieces
 
243
        self.snub_time = snub_time
 
244
        self.measurefunc = measurefunc
 
245
        self.downloads = []
 
246
 
 
247
    def make_download(self, connection):
 
248
        self.downloads.append(SingleDownload(self, connection))
 
249
        return self.downloads[-1]
 
250
 
 
251
class DummyPicker:
 
252
    def __init__(self, num, r):
 
253
        self.stuff = range(num)
 
254
        self.r = r
 
255
 
 
256
    def next(self, wantfunc, seed):
 
257
        for i in self.stuff:
 
258
            if wantfunc(i):
 
259
                return i
 
260
        return None
 
261
 
 
262
    def lost_have(self, pos):
 
263
        self.r.append('lost have')
 
264
 
 
265
    def got_have(self, pos):
 
266
        self.r.append('got have')
 
267
 
 
268
    def requested(self, pos, seed):
 
269
        self.r.append('requested')
 
270
 
 
271
    def complete(self, pos):
 
272
        self.stuff.remove(pos)
 
273
        self.r.append('complete')
 
274
 
 
275
    def am_I_complete(self):
 
276
        return False
 
277
 
 
278
    def bump(self, i):
 
279
        pass
 
280
 
 
281
class DummyStorage:
 
282
    def __init__(self, remaining, have_endgame = False, numpieces = 1):
 
283
        self.remaining = remaining
 
284
        self.active = [[] for i in xrange(numpieces)]
 
285
        self.endgame = False
 
286
        self.have_endgame = have_endgame
 
287
 
 
288
    def do_I_have_requests(self, index):
 
289
        return self.remaining[index] != []
 
290
        
 
291
    def request_lost(self, index, begin, length):
 
292
        x = (begin, length)
 
293
        self.active[index].remove(x)
 
294
        self.remaining[index].append(x)
 
295
        self.remaining[index].sort()
 
296
        
 
297
    def piece_came_in(self, index, begin, piece):
 
298
        self.active[index].remove((begin, len(piece)))
 
299
        return True
 
300
        
 
301
    def do_I_have(self, index):
 
302
        return (self.remaining[index] == [] and 
 
303
            self.active[index] == [])
 
304
        
 
305
    def new_request(self, index):
 
306
        x = self.remaining[index].pop()
 
307
        for i in self.remaining:
 
308
            if i:
 
309
                break
 
310
        else:
 
311
            self.endgame = True
 
312
        self.active[index].append(x)
 
313
        self.active[index].sort()
 
314
        return x
 
315
 
 
316
    def is_endgame(self):
 
317
        return self.have_endgame and self.endgame
 
318
 
 
319
class DummyConnection:
 
320
    def __init__(self, events):
 
321
        self.events = events
 
322
 
 
323
    def send_interested(self):
 
324
        self.events.append('interested')
 
325
        
 
326
    def send_not_interested(self):
 
327
        self.events.append('not interested')
 
328
        
 
329
    def send_request(self, index, begin, length):
 
330
        self.events.append(('request', index, begin, length))
 
331
 
 
332
    def send_cancel(self, index, begin, length):
 
333
        self.events.append(('cancel', index, begin, length))
 
334
 
 
335
def test_stops_at_backlog():
 
336
    ds = DummyStorage([[(0, 2), (2, 2), (4, 2), (6, 2)]])
 
337
    events = []
 
338
    d = Downloader(ds, DummyPicker(len(ds.remaining), events), 2, 15, 1, Measure(15), 10)
 
339
    sd = d.make_download(DummyConnection(events))
 
340
    assert events == []
 
341
    assert ds.remaining == [[(0, 2), (2, 2), (4, 2), (6, 2)]]
 
342
    assert ds.active == [[]]
 
343
    sd.got_have_bitfield(Bitfield(1, chr(0x80)))
 
344
    assert events == ['got have', 'interested']
 
345
    del events[:]
 
346
    assert ds.remaining == [[(0, 2), (2, 2), (4, 2), (6, 2)]]
 
347
    assert ds.active == [[]]
 
348
    sd.got_unchoke()
 
349
    assert events == ['requested', ('request', 0, 6, 2), 'requested', ('request', 0, 4, 2)]
 
350
    del events[:]
 
351
    assert ds.remaining == [[(0, 2), (2, 2)]]
 
352
    assert ds.active == [[(4, 2), (6, 2)]]
 
353
    sd.got_piece(0, 4, 'ab')
 
354
    assert events == ['requested', ('request', 0, 2, 2)]
 
355
    del events[:]
 
356
    assert ds.remaining == [[(0, 2)]]
 
357
    assert ds.active == [[(2, 2), (6, 2)]]
 
358
 
 
359
def test_got_have_single():
 
360
    ds = DummyStorage([[(0, 2)]])
 
361
    events = []
 
362
    d = Downloader(ds, DummyPicker(len(ds.remaining), events), 2, 15, 1, Measure(15), 10)
 
363
    sd = d.make_download(DummyConnection(events))
 
364
    assert events == []
 
365
    assert ds.remaining == [[(0, 2)]]
 
366
    assert ds.active == [[]]
 
367
    sd.got_unchoke()
 
368
    assert events == []
 
369
    assert ds.remaining == [[(0, 2)]]
 
370
    assert ds.active == [[]]
 
371
    sd.got_have(0)
 
372
    assert events == ['got have', 'interested', 'requested', ('request', 0, 0, 2)]
 
373
    del events[:]
 
374
    assert ds.remaining == [[]]
 
375
    assert ds.active == [[(0, 2)]]
 
376
    sd.disconnected()
 
377
    assert events == ['lost have']
 
378
 
 
379
def test_choke_clears_active():
 
380
    ds = DummyStorage([[(0, 2)]])
 
381
    events = []
 
382
    d = Downloader(ds, DummyPicker(len(ds.remaining), events), 2, 15, 1, Measure(15), 10)
 
383
    sd1 = d.make_download(DummyConnection(events))
 
384
    sd2 = d.make_download(DummyConnection(events))
 
385
    assert events == []
 
386
    assert ds.remaining == [[(0, 2)]]
 
387
    assert ds.active == [[]]
 
388
    sd1.got_unchoke()
 
389
    sd1.got_have(0)
 
390
    assert events == ['got have', 'interested', 'requested', ('request', 0, 0, 2)]
 
391
    del events[:]
 
392
    assert ds.remaining == [[]]
 
393
    assert ds.active == [[(0, 2)]]
 
394
    sd2.got_unchoke()
 
395
    sd2.got_have(0)
 
396
    assert events == ['got have']
 
397
    del events[:]
 
398
    assert ds.remaining == [[]]
 
399
    assert ds.active == [[(0, 2)]]
 
400
    sd1.got_choke()
 
401
    assert events == ['interested', 'requested', ('request', 0, 0, 2), 'not interested']
 
402
    del events[:]
 
403
    assert ds.remaining == [[]]
 
404
    assert ds.active == [[(0, 2)]]
 
405
    sd2.got_piece(0, 0, 'ab')
 
406
    assert events == ['complete', 'not interested']
 
407
    del events[:]
 
408
    assert ds.remaining == [[]]
 
409
    assert ds.active == [[]]
 
410
 
 
411
def test_endgame():
 
412
    ds = DummyStorage([[(0, 2)], [(0, 2)], [(0, 2)]], True, 3)
 
413
    events = []
 
414
    d = Downloader(ds, DummyPicker(len(ds.remaining), events), 10, 15, 3, Measure(15), 10)
 
415
    ev1 = []
 
416
    ev2 = []
 
417
    ev3 = []
 
418
    ev4 = []
 
419
    sd1 = d.make_download(DummyConnection(ev1))
 
420
    sd2 = d.make_download(DummyConnection(ev2))
 
421
    sd3 = d.make_download(DummyConnection(ev3))
 
422
    sd1.got_unchoke()
 
423
    sd1.got_have(0)
 
424
    assert ev1 == ['interested', ('request', 0, 0, 2)]
 
425
    del ev1[:]
 
426
    
 
427
    sd2.got_unchoke()
 
428
    sd2.got_have(0)
 
429
    sd2.got_have(1)
 
430
    assert ev2 == ['interested', ('request', 1, 0, 2)]
 
431
    del ev2[:]
 
432
    
 
433
    sd3.got_unchoke()
 
434
    sd3.got_have(0)
 
435
    sd3.got_have(1)
 
436
    sd3.got_have(2)
 
437
    assert (ev3 == ['interested', ('request', 2, 0, 2), ('request', 0, 0, 2), ('request', 1, 0, 2)] or 
 
438
        ev3 == ['interested', ('request', 2, 0, 2), ('request', 1, 0, 2), ('request', 0, 0, 2)])
 
439
    del ev3[:]
 
440
    assert ev2 == [('request', 0, 0, 2)]
 
441
    del ev2[:]
 
442
 
 
443
    sd2.got_piece(0, 0, 'ab')
 
444
    assert ev1 == [('cancel', 0, 0, 2), 'not interested']
 
445
    del ev1[:]
 
446
    assert ev2 == []
 
447
    assert ev3 == [('cancel', 0, 0, 2)]
 
448
    del ev3[:]
 
449
 
 
450
    sd3.got_choke()
 
451
    assert ev1 == []
 
452
    assert ev2 == []
 
453
    assert ev3 == []
 
454
 
 
455
    sd3.got_unchoke()
 
456
    assert (ev3 == [('request', 2, 0, 2), ('request', 1, 0, 2)] or 
 
457
        ev3 == [('request', 1, 0, 2), ('request', 2, 0, 2)])
 
458
    del ev3[:]
 
459
    assert ev1 == []
 
460
    assert ev2 == []
 
461
 
 
462
    sd4 = d.make_download(DummyConnection(ev4))
 
463
    sd4.got_have_bitfield([True, True, True])
 
464
    assert ev4 == ['interested']
 
465
    del ev4[:]
 
466
    sd4.got_unchoke()
 
467
    assert (ev4 == [('request', 2, 0, 2), ('request', 1, 0, 2)] or 
 
468
        ev4 == [('request', 1, 0, 2), ('request', 2, 0, 2)])
 
469
    assert ev1 == []
 
470
    assert ev2 == []
 
471
    assert ev3 == []
 
472
 
 
473
def test_stops_at_backlog_endgame():
 
474
    ds = DummyStorage([[(2, 2), (0, 2)], [(2, 2), (0, 2)], [(0, 2)]], True, 3)
 
475
    events = []
 
476
    d = Downloader(ds, DummyPicker(len(ds.remaining), events), 3, 15, 3, Measure(15), 10)
 
477
    ev1 = []
 
478
    ev2 = []
 
479
    ev3 = []
 
480
    sd1 = d.make_download(DummyConnection(ev1))
 
481
    sd2 = d.make_download(DummyConnection(ev2))
 
482
    sd3 = d.make_download(DummyConnection(ev3))
 
483
 
 
484
    sd1.got_unchoke()
 
485
    sd1.got_have(0)
 
486
    assert ev1 == ['interested', ('request', 0, 0, 2), ('request', 0, 2, 2)]
 
487
    del ev1[:]
 
488
 
 
489
    sd2.got_unchoke()
 
490
    sd2.got_have(0)
 
491
    assert ev2 == []
 
492
    sd2.got_have(1)
 
493
    assert ev2 == ['interested', ('request', 1, 0, 2), ('request', 1, 2, 2)]
 
494
    del ev2[:]
 
495
 
 
496
    sd3.got_unchoke()
 
497
    sd3.got_have(2)
 
498
    assert (ev2 == [('request', 0, 0, 2)] or 
 
499
        ev2 == [('request', 0, 2, 2)])
 
500
    n = ev2[0][2]
 
501
    del ev2[:]
 
502
 
 
503
    sd1.got_piece(0, n, 'ab')
 
504
    assert ev1 == []
 
505
    assert ev2 == [('cancel', 0, n, 2), ('request', 0, 2-n, 2)]