~cosmin.lupu/+junk/penguintv

« back to all changes in this revision

Viewing changes to penguintv/ptvbittorrent/Connecter.py

  • Committer: cosmin.lupu at gmail
  • Date: 2010-04-27 16:47:43 UTC
  • Revision ID: cosmin.lupu@gmail.com-20100427164743-ds8xrqonipp5ovdf
initial packaging

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Written by Bram Cohen
 
2
# see LICENSE.txt for license information
 
3
 
 
4
from bitfield import Bitfield
 
5
from binascii import b2a_hex
 
6
from CurrentRateMeasure import Measure
 
7
 
 
8
def toint(s):
    """Decode a big-endian byte string into a non-negative integer.

    The bytes of ``s`` are hex-encoded with ``b2a_hex`` and the result is
    parsed as a base-16 number, so the first byte is the most significant.

    Raises ValueError when ``s`` is empty (there are no hex digits to parse).
    """
    # ``int`` instead of ``long``: ``long`` only exists on Python 2, while
    # ``int`` parses the same digits on both Python 2 and Python 3 and yields
    # an equal value.
    return int(b2a_hex(s), 16)
 
10
 
 
11
def tobinary(i):
    """Encode ``i`` as four characters, most significant byte first.

    The top byte (``i >> 24``) is handed to ``chr`` unmasked; the three
    lower bytes are each masked to 8 bits before conversion.
    """
    top = chr(i >> 24)
    lower = [chr((i >> shift) & 0xFF) for shift in (16, 8, 0)]
    return top + ''.join(lower)
 
14
 
 
15
# One-character message type tags.  A message starts with one of these;
# the trailing comment names the payload that follows the tag.
CHOKE = '\x00'           # no payload
UNCHOKE = '\x01'         # no payload
INTERESTED = '\x02'      # no payload
NOT_INTERESTED = '\x03'  # no payload
HAVE = '\x04'            # index
BITFIELD = '\x05'        # bitfield
REQUEST = '\x06'         # index, begin, length
PIECE = '\x07'           # index, begin, piece
CANCEL = '\x08'          # index, begin, length
 
29
 
 
30
class Connection:
    """Per-peer wrapper that pairs a raw connection with its Connecter.

    Outgoing messages are assembled here (a type tag followed by the
    payload) and handed to the raw connection's ``send_message``.  Most
    other methods simply delegate to the raw connection.
    """

    def __init__(self, connection, connecter):
        """Wrap ``connection`` on behalf of the owning ``connecter``."""
        self.connection = connection
        self.connecter = connecter
        # Becomes True once the Connecter has processed a message from
        # this peer.
        self.got_anything = False
        # Assigned by the owning Connecter after construction.  They start
        # as None so get_upload()/get_download() return None rather than
        # raising AttributeError when called before that assignment.
        self.upload = None
        self.download = None

    def get_ip(self):
        """Return the raw connection's ``get_ip()`` result."""
        return self.connection.get_ip()

    def get_id(self):
        """Return the raw connection's ``get_id()`` result."""
        return self.connection.get_id()

    def close(self):
        """Close the raw connection."""
        self.connection.close()

    def is_flushed(self):
        """Report whether more data may be sent right now.

        Always False while the Connecter is rate capped; otherwise the raw
        connection's own ``is_flushed()`` answer.
        """
        if self.connecter.rate_capped:
            return False
        return self.connection.is_flushed()

    def is_locally_initiated(self):
        """Return the raw connection's ``is_locally_initiated()`` result."""
        return self.connection.is_locally_initiated()

    def send_interested(self):
        """Send an INTERESTED message."""
        self.connection.send_message(INTERESTED)

    def send_not_interested(self):
        """Send a NOT_INTERESTED message."""
        self.connection.send_message(NOT_INTERESTED)

    def send_choke(self):
        """Send a CHOKE message."""
        self.connection.send_message(CHOKE)

    def send_unchoke(self):
        """Send an UNCHOKE message."""
        self.connection.send_message(UNCHOKE)

    def send_request(self, index, begin, length):
        """Send a REQUEST for ``length`` units at ``begin`` in piece ``index``."""
        self.connection.send_message(
            REQUEST + tobinary(index) + tobinary(begin) + tobinary(length))

    def send_cancel(self, index, begin, length):
        """Send a CANCEL carrying the same three fields as a REQUEST."""
        self.connection.send_message(
            CANCEL + tobinary(index) + tobinary(begin) + tobinary(length))

    def send_piece(self, index, begin, piece):
        """Send a PIECE message and account for it in the upload rate.

        Must not be called while the Connecter is rate capped (checked
        with an assertion).
        """
        assert not self.connecter.rate_capped
        # Rate accounting happens before the send; it may set the cap,
        # which then applies to subsequent sends.
        self.connecter._update_upload_rate(len(piece))
        self.connection.send_message(
            PIECE + tobinary(index) + tobinary(begin) + piece)

    def send_bitfield(self, bitfield):
        """Send a BITFIELD message; ``bitfield`` is appended as given."""
        self.connection.send_message(BITFIELD + bitfield)

    def send_have(self, index):
        """Send a HAVE message for piece ``index``."""
        self.connection.send_message(HAVE + tobinary(index))

    def get_upload(self):
        """Return the upload object, or None if none has been assigned."""
        return self.upload

    def get_download(self):
        """Return the download object, or None if none has been assigned."""
        return self.download
 
90
 
 
91
class Connecter:
    """Registry of live peer connections and dispatcher for their messages.

    Each raw connection is wrapped in a ``Connection`` that is given an
    upload object (from ``make_upload``) and a download object (from
    ``downloader.make_download``).  When ``max_upload_rate`` is positive,
    ``rate_capped`` is set once ``totalup`` reports a rate above the limit
    and is cleared again by ``_uncap``, which is run through ``sched``.

    ``sched`` is called as ``sched(func, delay)``; it must be supplied if a
    positive limit is used or ``change_max_upload_rate`` is called.
    """

    def __init__(self, make_upload, downloader, choker, numpieces,
            totalup, max_upload_rate = 0, sched = None):
        self.downloader = downloader
        self.make_upload = make_upload
        self.choker = choker
        self.numpieces = numpieces
        self.max_upload_rate = max_upload_rate
        self.sched = sched
        self.totalup = totalup
        self.rate_capped = False
        # Maps raw connection -> Connection wrapper.
        self.connections = {}

    def _update_upload_rate(self, amount):
        """Record ``amount`` as uploaded and engage the cap if over limit.

        A limit of 0 (or less) disables capping.
        """
        self.totalup.update_rate(amount)
        if (self.max_upload_rate > 0
                and self.totalup.get_rate_noupdate() > self.max_upload_rate):
            self.rate_capped = True
            self.sched(self._uncap,
                self.totalup.time_until_rate(self.max_upload_rate))

    def _uncap(self):
        """Lift the cap and restart ready uploads, lowest rate first.

        Repeats until the cap is set again, no upload is ready, or the
        total rate is above ``max_upload_rate``.
        """
        self.rate_capped = False
        while not self.rate_capped:
            up = None
            minrate = None
            for i in self.connections.values():
                if (not i.upload.is_choked() and i.upload.has_queries()
                        and i.connection.is_flushed()):
                    rate = i.upload.get_rate()
                    # ``up is None`` guards the first comparison, when
                    # minrate has not been set yet.
                    if up is None or rate < minrate:
                        up = i.upload
                        minrate = rate
            if up is None:
                break
            up.flushed()
            if self.totalup.get_rate_noupdate() > self.max_upload_rate:
                break

    def change_max_upload_rate(self, newval):
        """Ask the scheduler to apply a new upload limit with zero delay."""
        def apply_new_rate():
            self._change_max_upload_rate(newval)
        self.sched(apply_new_rate, 0)

    def _change_max_upload_rate(self, newval):
        """Store the new limit and re-evaluate which uploads may proceed."""
        self.max_upload_rate = newval
        self._uncap()

    def how_many_connections(self):
        """Return the number of registered connections."""
        return len(self.connections)

    def connection_made(self, connection):
        """Register ``connection`` and build its upload/download objects."""
        c = Connection(connection, self)
        self.connections[connection] = c
        c.upload = self.make_upload(c)
        c.download = self.downloader.make_download(c)
        self.choker.connection_made(c)

    def connection_lost(self, connection):
        """Unregister ``connection`` and notify its download and the choker.

        Raises KeyError if ``connection`` was never registered.
        """
        c = self.connections[connection]
        d = c.download
        del self.connections[connection]
        d.disconnected()
        self.choker.connection_lost(c)

    def connection_flushed(self, connection):
        """Tell the connection's upload that its send buffer has drained."""
        self.connections[connection].upload.flushed()

    def got_message(self, connection, message):
        """Validate one incoming message and dispatch it.

        ``message[0]`` is the type tag and the rest is the payload.  Any
        malformed message (empty, wrong length, piece index out of range,
        invalid bitfield, unknown tag, or a BITFIELD that is not the first
        message) closes the raw connection and is otherwise ignored.

        Raises KeyError if ``connection`` was never registered.
        """
        c = self.connections[connection]
        if not message:
            # No type tag at all: treat like any other malformed message
            # instead of failing with IndexError on message[0].
            connection.close()
            return
        t = message[0]
        # A BITFIELD is only acceptable as the very first message.
        if t == BITFIELD and c.got_anything:
            connection.close()
            return
        c.got_anything = True
        if (t in [CHOKE, UNCHOKE, INTERESTED, NOT_INTERESTED] and
                len(message) != 1):
            connection.close()
            return
        if t == CHOKE:
            c.download.got_choke()
        elif t == UNCHOKE:
            c.download.got_unchoke()
        elif t == INTERESTED:
            c.upload.got_interested()
        elif t == NOT_INTERESTED:
            c.upload.got_not_interested()
        elif t == HAVE:
            # tag + 4-byte index
            if len(message) != 5:
                connection.close()
                return
            i = toint(message[1:])
            if i >= self.numpieces:
                connection.close()
                return
            c.download.got_have(i)
        elif t == BITFIELD:
            try:
                b = Bitfield(self.numpieces, message[1:])
            except ValueError:
                connection.close()
                return
            c.download.got_have_bitfield(b)
        elif t == REQUEST or t == CANCEL:
            # tag + three 4-byte fields: index, begin, length
            if len(message) != 13:
                connection.close()
                return
            i = toint(message[1:5])
            if i >= self.numpieces:
                connection.close()
                return
            begin = toint(message[5:9])
            length = toint(message[9:])
            if t == REQUEST:
                c.upload.got_request(i, begin, length)
            else:
                c.upload.got_cancel(i, begin, length)
        elif t == PIECE:
            # tag + index + begin + at least one character of data
            if len(message) <= 9:
                connection.close()
                return
            i = toint(message[1:5])
            if i >= self.numpieces:
                connection.close()
                return
            # A true return from got_piece triggers a HAVE announcement
            # for that piece on every registered connection.
            if c.download.got_piece(i, toint(message[5:9]), message[9:]):
                for co in self.connections.values():
                    co.send_have(i)
        else:
            connection.close()
 
224
 
 
225
class DummyUpload:
    """Test double for an upload: logs every callback to a shared list."""

    def __init__(self, events):
        self.events = events
        self._note('made upload')

    def _note(self, entry):
        # Single place where this double writes to the event log.
        self.events.append(entry)

    def flushed(self):
        self._note('flushed')

    def got_interested(self):
        self._note('interested')

    def got_not_interested(self):
        self._note('not interested')

    def got_request(self, index, begin, length):
        self._note(('request', index, begin, length))

    def got_cancel(self, index, begin, length):
        self._note(('cancel', index, begin, length))
 
244
 
 
245
class DummyDownload:
    """Test double for a download: logs every callback to a shared list.

    ``got_piece`` returns False for the first piece received and True for
    every piece after that.
    """

    def __init__(self, events):
        self.events = events
        self._note('made download')
        # Number of got_piece calls so far.
        self.hit = 0

    def _note(self, entry):
        # Single place where this double writes to the event log.
        self.events.append(entry)

    def disconnected(self):
        self._note('disconnected')

    def got_choke(self):
        self._note('choke')

    def got_unchoke(self):
        self._note('unchoke')

    def got_have(self, i):
        self._note(('have', i))

    def got_have_bitfield(self, bitfield):
        self._note(('bitfield', bitfield.tostring()))

    def got_piece(self, index, begin, piece):
        self._note(('piece', index, begin, piece))
        self.hit = self.hit + 1
        seen_more_than_one = self.hit > 1
        return seen_more_than_one
 
270
 
 
271
class DummyDownloader:
    """Test double for a downloader: hands out DummyDownload objects."""

    def __init__(self, events):
        # Event log shared with every DummyDownload created here.
        self.events = events

    def make_download(self, connection):
        # ``connection`` is accepted but not used by the dummy.
        download = DummyDownload(self.events)
        return download
 
277
 
 
278
class DummyConnection:
    """Test double for a raw connection: logs each outgoing message."""

    def __init__(self, events):
        self.events = events

    def send_message(self, message):
        # Outgoing messages are tagged 'm' in the event log.
        record = ('m', message)
        self.events.append(record)
 
284
 
 
285
class DummyChoker:
    """Test double for a choker.

    Logs connection events to ``events`` and collects every connection it
    is told about in ``cs``.
    """

    def __init__(self, events, cs):
        self.events, self.cs = events, cs

    def connection_made(self, c):
        self.events.append('made')
        self.cs.append(c)

    def connection_lost(self, c):
        # Only the event is logged; ``c`` is not removed from ``cs``.
        self.events.append('lost')
 
296
 
 
297
def test_operation():
    """Drive a Connecter through one connection's life and check the log.

    Every dummy collaborator appends to the same ``events`` list, so the
    expected list below is the exact order of callbacks and sent messages.
    """
    events = []
    cs = []
    co = Connecter(lambda c, events = events: DummyUpload(events),
        DummyDownloader(events), DummyChoker(events, cs), 3,
        Measure(10))
    assert events == []
    assert cs == []

    dc = DummyConnection(events)
    co.connection_made(dc)
    assert len(cs) == 1
    cc = cs[0]

    # Incoming messages.
    co.got_message(dc, BITFIELD + chr(0xc0))
    co.got_message(dc, CHOKE)
    co.got_message(dc, UNCHOKE)
    co.got_message(dc, INTERESTED)
    co.got_message(dc, NOT_INTERESTED)
    co.got_message(dc, HAVE + tobinary(2))
    co.got_message(dc, REQUEST + tobinary(1) + tobinary(5) + tobinary(6))
    co.got_message(dc, CANCEL + tobinary(2) + tobinary(3) + tobinary(4))
    co.got_message(dc, PIECE + tobinary(1) + tobinary(0) + 'abc')
    # DummyDownload.got_piece returns True from the second piece on, so
    # this one triggers a HAVE broadcast.
    co.got_message(dc, PIECE + tobinary(1) + tobinary(3) + 'def')
    co.connection_flushed(dc)

    # Outgoing messages.
    cc.send_bitfield(chr(0x60))
    cc.send_interested()
    cc.send_not_interested()
    cc.send_choke()
    cc.send_unchoke()
    cc.send_have(4)
    cc.send_request(0, 2, 1)
    cc.send_cancel(1, 2, 3)
    cc.send_piece(1, 2, 'abc')
    co.connection_lost(dc)

    x = [
        'made upload', 'made download', 'made',
        ('bitfield', chr(0xC0)), 'choke', 'unchoke',
        'interested', 'not interested', ('have', 2),
        ('request', 1, 5, 6), ('cancel', 2, 3, 4),
        ('piece', 1, 0, 'abc'), ('piece', 1, 3, 'def'),
        ('m', HAVE + tobinary(1)),
        'flushed',
        ('m', BITFIELD + chr(0x60)),
        ('m', INTERESTED),
        ('m', NOT_INTERESTED),
        ('m', CHOKE),
        ('m', UNCHOKE),
        ('m', HAVE + tobinary(4)),
        ('m', REQUEST + tobinary(0) + tobinary(2) + tobinary(1)),
        ('m', CANCEL + tobinary(1) + tobinary(2) + tobinary(3)),
        ('m', PIECE + tobinary(1) + tobinary(2) + 'abc'),
        'disconnected', 'lost',
    ]
    # zip() stops at the shorter list, so check the lengths first;
    # otherwise missing or extra trailing events would pass unnoticed.
    assert len(events) == len(x), repr((events, x))
    for a, b in zip(events, x):
        assert a == b, repr((a, b))
 
345
 
 
346
def test_conversion():
    """Round trip: tobinary followed by toint gives back the input."""
    value = 50000
    encoded = tobinary(value)
    assert toint(encoded) == value