1
# Written by Bram Cohen
2
# see LICENSE.txt for license information
4
from CurrentRateMeasure import Measure
5
from random import shuffle
7
from bitfield import Bitfield
10
def __init__(self, downloader, connection):
11
self.downloader = downloader
12
self.connection = connection
14
self.interested = False
15
self.active_requests = []
16
self.measure = Measure(downloader.max_rate_period)
17
self.have = Bitfield(downloader.numpieces)
19
self.example_interest = None
21
def disconnected(self):
22
self.downloader.downloads.remove(self)
23
for i in xrange(len(self.have)):
25
self.downloader.picker.lost_have(i)
29
if not self.active_requests:
31
if self.downloader.storage.is_endgame():
32
self.active_requests = []
35
for index, begin, length in self.active_requests:
36
self.downloader.storage.request_lost(index, begin, length)
39
self.active_requests = []
40
ds = [d for d in self.downloader.downloads if not d.choked]
44
for d in self.downloader.downloads:
45
if d.choked and not d.interested:
47
if d.have[l] and self.downloader.storage.do_I_have_requests(l):
49
d.connection.send_interested()
57
def got_unchoke(self):
66
def is_interested(self):
67
return self.interested
69
def got_piece(self, index, begin, piece):
71
self.active_requests.remove((index, begin, len(piece)))
74
if self.downloader.storage.is_endgame():
75
self.downloader.all_requests.remove((index, begin, len(piece)))
77
self.measure.update_rate(len(piece))
78
self.downloader.measurefunc(len(piece))
79
self.downloader.downmeasure.update_rate(len(piece))
80
if not self.downloader.storage.piece_came_in(index, begin, piece):
81
if self.downloader.storage.is_endgame():
82
while self.downloader.storage.do_I_have_requests(index):
83
nb, nl = self.downloader.storage.new_request(index)
84
self.downloader.all_requests.append((index, nb, nl))
85
for d in self.downloader.downloads:
86
d.fix_download_endgame()
88
self.downloader.picker.bump(index)
89
ds = [d for d in self.downloader.downloads if not d.choked]
92
d._request_more([index])
94
if self.downloader.storage.do_I_have(index):
95
self.downloader.picker.complete(index)
96
if self.downloader.storage.is_endgame():
97
for d in self.downloader.downloads:
98
if d is not self and d.interested:
100
d.fix_download_endgame()
103
d.active_requests.remove((index, begin, len(piece)))
106
d.connection.send_cancel(index, begin, len(piece))
107
d.fix_download_endgame()
109
if self.downloader.picker.am_I_complete():
110
for d in [i for i in self.downloader.downloads if i.have.numfalse == 0]:
112
return self.downloader.storage.do_I_have(index)
114
def _want(self, index):
115
return self.have[index] and self.downloader.storage.do_I_have_requests(index)
117
def _request_more(self, indices = None):
118
assert not self.choked
119
if len(self.active_requests) == self.downloader.backlog:
121
if self.downloader.storage.is_endgame():
122
self.fix_download_endgame()
125
while len(self.active_requests) < self.downloader.backlog:
127
interest = self.downloader.picker.next(self._want, self.have.numfalse == 0)
131
if self.have[i] and self.downloader.storage.do_I_have_requests(i):
136
if not self.interested:
137
self.interested = True
138
self.connection.send_interested()
139
self.example_interest = interest
140
begin, length = self.downloader.storage.new_request(interest)
141
self.downloader.picker.requested(interest, self.have.numfalse == 0)
142
self.active_requests.append((interest, begin, length))
143
self.connection.send_request(interest, begin, length)
144
if not self.downloader.storage.do_I_have_requests(interest):
145
lost_interests.append(interest)
146
if not self.active_requests and self.interested:
147
self.interested = False
148
self.connection.send_not_interested()
150
for d in self.downloader.downloads:
151
if d.active_requests or not d.interested:
153
if d.example_interest is not None and self.downloader.storage.do_I_have_requests(d.example_interest):
155
for lost in lost_interests:
160
interest = self.downloader.picker.next(d._want, d.have.numfalse == 0)
163
d.connection.send_not_interested()
165
d.example_interest = interest
166
if self.downloader.storage.is_endgame():
167
self.downloader.all_requests = []
168
for d in self.downloader.downloads:
169
self.downloader.all_requests.extend(d.active_requests)
170
for d in self.downloader.downloads:
171
d.fix_download_endgame()
173
def fix_download_endgame(self):
174
want = [a for a in self.downloader.all_requests if self.have[a[0]] and a not in self.active_requests]
175
if self.interested and not self.active_requests and not want:
176
self.interested = False
177
self.connection.send_not_interested()
179
if not self.interested and want:
180
self.interested = True
181
self.connection.send_interested()
185
del want[self.downloader.backlog - len(self.active_requests):]
186
self.active_requests.extend(want)
187
for piece, begin, length in want:
188
self.connection.send_request(piece, begin, length)
190
def got_have(self, index):
193
self.have[index] = True
194
self.downloader.picker.got_have(index)
195
if self.downloader.picker.am_I_complete() and self.have.numfalse == 0:
196
self.connection.close()
198
if self.downloader.storage.is_endgame():
199
self.fix_download_endgame()
200
elif self.downloader.storage.do_I_have_requests(index):
202
self._request_more([index])
204
if not self.interested:
205
self.interested = True
206
self.connection.send_interested()
208
def got_have_bitfield(self, have):
210
for i in xrange(len(self.have)):
212
self.downloader.picker.got_have(i)
213
if self.downloader.picker.am_I_complete() and self.have.numfalse == 0:
214
self.connection.close()
216
if self.downloader.storage.is_endgame():
217
for piece, begin, length in self.downloader.all_requests:
219
self.interested = True
220
self.connection.send_interested()
222
for i in xrange(len(self.have)):
223
if self.have[i] and self.downloader.storage.do_I_have_requests(i):
224
self.interested = True
225
self.connection.send_interested()
229
return self.measure.get_rate()
231
def is_snubbed(self):
232
return time() - self.last > self.downloader.snub_time
235
def __init__(self, storage, picker, backlog, max_rate_period, numpieces,
236
downmeasure, snub_time, measurefunc = lambda x: None):
237
self.storage = storage
239
self.backlog = backlog
240
self.max_rate_period = max_rate_period
241
self.downmeasure = downmeasure
242
self.numpieces = numpieces
243
self.snub_time = snub_time
244
self.measurefunc = measurefunc
247
def make_download(self, connection):
248
self.downloads.append(SingleDownload(self, connection))
249
return self.downloads[-1]
252
def __init__(self, num, r):
253
self.stuff = range(num)
256
def next(self, wantfunc, seed):
262
def lost_have(self, pos):
263
self.r.append('lost have')
265
def got_have(self, pos):
266
self.r.append('got have')
268
def requested(self, pos, seed):
269
self.r.append('requested')
271
def complete(self, pos):
272
self.stuff.remove(pos)
273
self.r.append('complete')
275
def am_I_complete(self):
282
def __init__(self, remaining, have_endgame = False, numpieces = 1):
283
self.remaining = remaining
284
self.active = [[] for i in xrange(numpieces)]
286
self.have_endgame = have_endgame
288
def do_I_have_requests(self, index):
289
return self.remaining[index] != []
291
def request_lost(self, index, begin, length):
293
self.active[index].remove(x)
294
self.remaining[index].append(x)
295
self.remaining[index].sort()
297
def piece_came_in(self, index, begin, piece):
298
self.active[index].remove((begin, len(piece)))
301
def do_I_have(self, index):
302
return (self.remaining[index] == [] and
303
self.active[index] == [])
305
def new_request(self, index):
306
x = self.remaining[index].pop()
307
for i in self.remaining:
312
self.active[index].append(x)
313
self.active[index].sort()
316
def is_endgame(self):
317
return self.have_endgame and self.endgame
319
class DummyConnection:
320
def __init__(self, events):
323
def send_interested(self):
324
self.events.append('interested')
326
def send_not_interested(self):
327
self.events.append('not interested')
329
def send_request(self, index, begin, length):
330
self.events.append(('request', index, begin, length))
332
def send_cancel(self, index, begin, length):
333
self.events.append(('cancel', index, begin, length))
335
def test_stops_at_backlog():
336
ds = DummyStorage([[(0, 2), (2, 2), (4, 2), (6, 2)]])
338
d = Downloader(ds, DummyPicker(len(ds.remaining), events), 2, 15, 1, Measure(15), 10)
339
sd = d.make_download(DummyConnection(events))
341
assert ds.remaining == [[(0, 2), (2, 2), (4, 2), (6, 2)]]
342
assert ds.active == [[]]
343
sd.got_have_bitfield(Bitfield(1, chr(0x80)))
344
assert events == ['got have', 'interested']
346
assert ds.remaining == [[(0, 2), (2, 2), (4, 2), (6, 2)]]
347
assert ds.active == [[]]
349
assert events == ['requested', ('request', 0, 6, 2), 'requested', ('request', 0, 4, 2)]
351
assert ds.remaining == [[(0, 2), (2, 2)]]
352
assert ds.active == [[(4, 2), (6, 2)]]
353
sd.got_piece(0, 4, 'ab')
354
assert events == ['requested', ('request', 0, 2, 2)]
356
assert ds.remaining == [[(0, 2)]]
357
assert ds.active == [[(2, 2), (6, 2)]]
359
def test_got_have_single():
360
ds = DummyStorage([[(0, 2)]])
362
d = Downloader(ds, DummyPicker(len(ds.remaining), events), 2, 15, 1, Measure(15), 10)
363
sd = d.make_download(DummyConnection(events))
365
assert ds.remaining == [[(0, 2)]]
366
assert ds.active == [[]]
369
assert ds.remaining == [[(0, 2)]]
370
assert ds.active == [[]]
372
assert events == ['got have', 'interested', 'requested', ('request', 0, 0, 2)]
374
assert ds.remaining == [[]]
375
assert ds.active == [[(0, 2)]]
377
assert events == ['lost have']
379
def test_choke_clears_active():
380
ds = DummyStorage([[(0, 2)]])
382
d = Downloader(ds, DummyPicker(len(ds.remaining), events), 2, 15, 1, Measure(15), 10)
383
sd1 = d.make_download(DummyConnection(events))
384
sd2 = d.make_download(DummyConnection(events))
386
assert ds.remaining == [[(0, 2)]]
387
assert ds.active == [[]]
390
assert events == ['got have', 'interested', 'requested', ('request', 0, 0, 2)]
392
assert ds.remaining == [[]]
393
assert ds.active == [[(0, 2)]]
396
assert events == ['got have']
398
assert ds.remaining == [[]]
399
assert ds.active == [[(0, 2)]]
401
assert events == ['interested', 'requested', ('request', 0, 0, 2), 'not interested']
403
assert ds.remaining == [[]]
404
assert ds.active == [[(0, 2)]]
405
sd2.got_piece(0, 0, 'ab')
406
assert events == ['complete', 'not interested']
408
assert ds.remaining == [[]]
409
assert ds.active == [[]]
412
ds = DummyStorage([[(0, 2)], [(0, 2)], [(0, 2)]], True, 3)
414
d = Downloader(ds, DummyPicker(len(ds.remaining), events), 10, 15, 3, Measure(15), 10)
419
sd1 = d.make_download(DummyConnection(ev1))
420
sd2 = d.make_download(DummyConnection(ev2))
421
sd3 = d.make_download(DummyConnection(ev3))
424
assert ev1 == ['interested', ('request', 0, 0, 2)]
430
assert ev2 == ['interested', ('request', 1, 0, 2)]
437
assert (ev3 == ['interested', ('request', 2, 0, 2), ('request', 0, 0, 2), ('request', 1, 0, 2)] or
438
ev3 == ['interested', ('request', 2, 0, 2), ('request', 1, 0, 2), ('request', 0, 0, 2)])
440
assert ev2 == [('request', 0, 0, 2)]
443
sd2.got_piece(0, 0, 'ab')
444
assert ev1 == [('cancel', 0, 0, 2), 'not interested']
447
assert ev3 == [('cancel', 0, 0, 2)]
456
assert (ev3 == [('request', 2, 0, 2), ('request', 1, 0, 2)] or
457
ev3 == [('request', 1, 0, 2), ('request', 2, 0, 2)])
462
sd4 = d.make_download(DummyConnection(ev4))
463
sd4.got_have_bitfield([True, True, True])
464
assert ev4 == ['interested']
467
assert (ev4 == [('request', 2, 0, 2), ('request', 1, 0, 2)] or
468
ev4 == [('request', 1, 0, 2), ('request', 2, 0, 2)])
473
def test_stops_at_backlog_endgame():
474
ds = DummyStorage([[(2, 2), (0, 2)], [(2, 2), (0, 2)], [(0, 2)]], True, 3)
476
d = Downloader(ds, DummyPicker(len(ds.remaining), events), 3, 15, 3, Measure(15), 10)
480
sd1 = d.make_download(DummyConnection(ev1))
481
sd2 = d.make_download(DummyConnection(ev2))
482
sd3 = d.make_download(DummyConnection(ev3))
486
assert ev1 == ['interested', ('request', 0, 0, 2), ('request', 0, 2, 2)]
493
assert ev2 == ['interested', ('request', 1, 0, 2), ('request', 1, 2, 2)]
498
assert (ev2 == [('request', 0, 0, 2)] or
499
ev2 == [('request', 0, 2, 2)])
503
sd1.got_piece(0, n, 'ab')
505
assert ev2 == [('cancel', 0, n, 2), ('request', 0, 2-n, 2)]