~ubuntu-branches/ubuntu/hardy/transmission/hardy-updates

« back to all changes in this revision

Viewing changes to libtransmission/peer-msgs.c

  • Committer: Bazaar Package Importer
  • Author(s): Philipp Benner
  • Date: 2007-10-26 16:02:39 UTC
  • mto: This revision was merged to the branch mainline in revision 6.
  • Revision ID: james.westby@ubuntu.com-20071026160239-2c0agn7q1ken0xsp
Tags: upstream-0.90.dfsg
ImportĀ upstreamĀ versionĀ 0.90.dfsg

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/*
 
2
 * This file Copyright (C) 2007 Charles Kerr <charles@rebelbase.com>
 
3
 *
 
4
 * This file is licensed by the GPL version 2.  Works owned by the
 
5
 * Transmission project are granted a special exemption to clause 2(b)
 
6
 * so that the bulk of its code can remain under the MIT license. 
 
7
 * This exemption does not extend to derived works not owned by
 
8
 * the Transmission project.
 
9
 *
 
10
 * $Id: peer-msgs.c 3501 2007-10-22 23:27:47Z charles $
 
11
 */
 
12
 
 
13
#include <assert.h>
 
14
#include <ctype.h>
 
15
#include <errno.h>
 
16
#include <stdio.h>
 
17
#include <stdlib.h>
 
18
#include <string.h>
 
19
#include <libgen.h> /* basename */
 
20
 
 
21
#include <arpa/inet.h>
 
22
 
 
23
#include <sys/types.h> /* event.h needs this */
 
24
#include <event.h>
 
25
 
 
26
#include "transmission.h"
 
27
#include "bencode.h"
 
28
#include "completion.h"
 
29
#include "inout.h"
 
30
#include "list.h"
 
31
#include "peer-io.h"
 
32
#include "peer-mgr.h"
 
33
#include "peer-mgr-private.h"
 
34
#include "peer-msgs.h"
 
35
#include "ratecontrol.h"
 
36
#include "trevent.h"
 
37
#include "utils.h"
 
38
 
 
39
/**
 
40
***
 
41
**/
 
42
 
 
43
#define MAX_ALLOWED_SET_COUNT   10 /* number of pieces generated for allow-fast,
 
44
                                      threshold for fast-allowing others */
 
45
 
 
46
enum
 
47
{
 
48
    BT_CHOKE                = 0,
 
49
    BT_UNCHOKE              = 1,
 
50
    BT_INTERESTED           = 2,
 
51
    BT_NOT_INTERESTED       = 3,
 
52
    BT_HAVE                 = 4,
 
53
    BT_BITFIELD             = 5,
 
54
    BT_REQUEST              = 6,
 
55
    BT_PIECE                = 7,
 
56
    BT_CANCEL               = 8,
 
57
    BT_PORT                 = 9,
 
58
    BT_SUGGEST              = 13,
 
59
    BT_HAVE_ALL             = 14,
 
60
    BT_HAVE_NONE            = 15,
 
61
    BT_REJECT               = 16,
 
62
    BT_ALLOWED_FAST         = 17,
 
63
    BT_LTEP                 = 20,
 
64
 
 
65
    LTEP_HANDSHAKE          = 0,
 
66
 
 
67
    OUR_LTEP_PEX            = 1,
 
68
 
 
69
    MAX_REQUEST_BYTE_COUNT  = (16 * 1024), /* drop requests who want too much */
 
70
 
 
71
    KEEPALIVE_INTERVAL_SECS = 90,          /* idle seconds before we send a keepalive */
 
72
    PEX_INTERVAL            = (60 * 1000), /* msec between calls to sendPex() */
 
73
    PEER_PULSE_INTERVAL     = (133),        /* msec between calls to pulse() */
 
74
    RATE_PULSE_INTERVAL     = (333),       /* msec between calls to ratePulse() */
 
75
};
 
76
 
 
77
enum
 
78
{
 
79
    AWAITING_BT_LENGTH,
 
80
    AWAITING_BT_MESSAGE,
 
81
    READING_BT_PIECE
 
82
};
 
83
 
 
84
struct peer_request
 
85
{
 
86
    uint32_t index;
 
87
    uint32_t offset;
 
88
    uint32_t length;
 
89
    time_t time_requested;
 
90
};
 
91
 
 
92
static int
 
93
compareRequest( const void * va, const void * vb )
 
94
{
 
95
    struct peer_request * a = (struct peer_request*) va;
 
96
    struct peer_request * b = (struct peer_request*) vb;
 
97
    if( a->index != b->index ) return a->index - b->index;
 
98
    if( a->offset != b->offset ) return a->offset - b->offset;
 
99
    if( a->length != b->length ) return a->length - b->length;
 
100
    return 0;
 
101
}
 
102
 
 
103
struct tr_peermsgs
 
104
{
 
105
    tr_peer * info;
 
106
 
 
107
    tr_handle * handle;
 
108
    tr_torrent * torrent;
 
109
    tr_peerIo * io;
 
110
 
 
111
    tr_publisher_t * publisher;
 
112
 
 
113
    struct evbuffer * outMessages; /* buffer of all the non-piece messages */
 
114
    struct evbuffer * inBlock;     /* the block we're currently receiving */
 
115
    tr_list * peerAskedFor;
 
116
    tr_list * clientAskedFor;
 
117
    tr_list * clientWillAskFor;
 
118
 
 
119
    tr_timer * rateTimer;
 
120
    tr_timer * pulseTimer;
 
121
    tr_timer * pexTimer;
 
122
 
 
123
    struct peer_request blockToUs; /* the block currntly being sent to us */
 
124
 
 
125
    time_t lastReqAddedAt;
 
126
    time_t clientSentPexAt;
 
127
    time_t clientSentAnythingAt;
 
128
 
 
129
    unsigned int notListening             : 1;
 
130
    unsigned int peerSupportsPex          : 1;
 
131
    unsigned int clientSentLtepHandshake  : 1;
 
132
    unsigned int peerSentLtepHandshake    : 1;
 
133
    
 
134
    tr_bitfield * clientAllowedPieces;
 
135
    tr_bitfield * peerAllowedPieces;
 
136
    
 
137
    uint8_t state;
 
138
    uint8_t ut_pex_id;
 
139
    uint16_t pexCount;
 
140
    uint32_t incomingMessageLength;
 
141
    uint32_t maxActiveRequests;
 
142
    uint32_t minActiveRequests;
 
143
 
 
144
    tr_pex * pex;
 
145
};
 
146
 
 
147
/**
 
148
***
 
149
**/
 
150
 
 
151
static void
 
152
myDebug( const char * file, int line,
 
153
         const struct tr_peermsgs * msgs,
 
154
         const char * fmt, ... )
 
155
{
 
156
    FILE * fp = tr_getLog( );
 
157
    if( fp != NULL )
 
158
    {
 
159
        va_list args;
 
160
        char timestr[64];
 
161
        struct evbuffer * buf = evbuffer_new( );
 
162
        char * myfile = tr_strdup( file );
 
163
 
 
164
        evbuffer_add_printf( buf, "[%s] %s [%s]: ",
 
165
                             tr_getLogTimeStr( timestr, sizeof(timestr) ),
 
166
                             tr_peerIoGetAddrStr( msgs->io ),
 
167
                             msgs->info->client );
 
168
        va_start( args, fmt );
 
169
        evbuffer_add_vprintf( buf, fmt, args );
 
170
        va_end( args );
 
171
        evbuffer_add_printf( buf, " (%s:%d)\n", basename(myfile), line );
 
172
        fwrite( EVBUFFER_DATA(buf), 1, EVBUFFER_LENGTH(buf), fp );
 
173
 
 
174
        tr_free( myfile );
 
175
        evbuffer_free( buf );
 
176
    }
 
177
}
 
178
 
 
179
#define dbgmsg(msgs, fmt...) myDebug(__FILE__, __LINE__, msgs, ##fmt )
 
180
 
 
181
/**
 
182
***
 
183
**/
 
184
 
 
185
static void
 
186
protocolSendRequest( tr_peermsgs * msgs, const struct peer_request * req )
 
187
{
 
188
    tr_peerIo * io = msgs->io;
 
189
    struct evbuffer * out = msgs->outMessages;
 
190
 
 
191
    dbgmsg( msgs, "requesting %u:%u->%u", req->index, req->offset, req->length );
 
192
    tr_peerIoWriteUint32( io, out, sizeof(uint8_t) + 3*sizeof(uint32_t) );
 
193
    tr_peerIoWriteUint8 ( io, out, BT_REQUEST );
 
194
    tr_peerIoWriteUint32( io, out, req->index );
 
195
    tr_peerIoWriteUint32( io, out, req->offset );
 
196
    tr_peerIoWriteUint32( io, out, req->length );
 
197
}
 
198
 
 
199
static void
 
200
protocolSendCancel( tr_peermsgs * msgs, const struct peer_request * req )
 
201
{
 
202
    tr_peerIo * io = msgs->io;
 
203
    struct evbuffer * out = msgs->outMessages;
 
204
 
 
205
    dbgmsg( msgs, "cancelling %u:%u->%u", req->index, req->offset, req->length );
 
206
    tr_peerIoWriteUint32( io, out, sizeof(uint8_t) + 3*sizeof(uint32_t) );
 
207
    tr_peerIoWriteUint8 ( io, out, BT_CANCEL );
 
208
    tr_peerIoWriteUint32( io, out, req->index );
 
209
    tr_peerIoWriteUint32( io, out, req->offset );
 
210
    tr_peerIoWriteUint32( io, out, req->length );
 
211
}
 
212
 
 
213
static void
 
214
protocolSendHave( tr_peermsgs * msgs, uint32_t index )
 
215
{
 
216
    tr_peerIo * io = msgs->io;
 
217
    struct evbuffer * out = msgs->outMessages;
 
218
 
 
219
    dbgmsg( msgs, "sending Have %u", index );
 
220
    tr_peerIoWriteUint32( io, out, sizeof(uint8_t) + sizeof(uint32_t) );
 
221
    tr_peerIoWriteUint8 ( io, out, BT_HAVE );
 
222
    tr_peerIoWriteUint32( io, out, index );
 
223
}
 
224
 
 
225
static void
 
226
protocolSendChoke( tr_peermsgs * msgs, int choke )
 
227
{
 
228
    tr_peerIo * io = msgs->io;
 
229
    struct evbuffer * out = msgs->outMessages;
 
230
 
 
231
    dbgmsg( msgs, "sending %s", (choke ? "Choke" : "Unchoke") );
 
232
    tr_peerIoWriteUint32( io, out, sizeof(uint8_t) );
 
233
    tr_peerIoWriteUint8 ( io, out, choke ? BT_CHOKE : BT_UNCHOKE );
 
234
}
 
235
 
 
236
static void
 
237
protocolSendPiece( tr_peermsgs                * msgs,
 
238
                   const struct peer_request  * r,
 
239
                   const uint8_t              * pieceData )
 
240
{
 
241
    tr_peerIo * io = msgs->io;
 
242
    struct evbuffer * out = evbuffer_new( );
 
243
 
 
244
    dbgmsg( msgs, "sending block %u:%u->%u", r->index, r->offset, r->length );
 
245
    tr_peerIoWriteUint32( io, out, sizeof(uint8_t) + 2*sizeof(uint32_t) + r->length );
 
246
    tr_peerIoWriteUint8 ( io, out, BT_PIECE );
 
247
    tr_peerIoWriteUint32( io, out, r->index );
 
248
    tr_peerIoWriteUint32( io, out, r->offset );
 
249
    tr_peerIoWriteBytes ( io, out, pieceData, r->length );
 
250
    tr_peerIoWriteBuf   ( io, out );
 
251
 
 
252
    evbuffer_free( out );
 
253
}
 
254
 
 
255
/**
 
256
***  EVENTS
 
257
**/
 
258
 
 
259
static const tr_peermsgs_event blankEvent = { 0, 0, 0, 0, 0.0f };
 
260
 
 
261
static void
 
262
publish( tr_peermsgs * msgs, tr_peermsgs_event * e )
 
263
{
 
264
    tr_publisherPublish( msgs->publisher, msgs->info, e );
 
265
}
 
266
 
 
267
static void
 
268
fireGotError( tr_peermsgs * msgs )
 
269
{
 
270
    tr_peermsgs_event e = blankEvent;
 
271
    e.eventType = TR_PEERMSG_GOT_ERROR;
 
272
    publish( msgs, &e );
 
273
}
 
274
 
 
275
static void
 
276
fireNeedReq( tr_peermsgs * msgs )
 
277
{
 
278
    tr_peermsgs_event e = blankEvent;
 
279
    e.eventType = TR_PEERMSG_NEED_REQ;
 
280
    publish( msgs, &e );
 
281
}
 
282
 
 
283
static void
 
284
firePeerProgress( tr_peermsgs * msgs )
 
285
{
 
286
    tr_peermsgs_event e = blankEvent;
 
287
    e.eventType = TR_PEERMSG_PEER_PROGRESS;
 
288
    e.progress = msgs->info->progress;
 
289
    publish( msgs, &e );
 
290
}
 
291
 
 
292
static void
 
293
fireClientHave( tr_peermsgs * msgs, uint32_t pieceIndex )
 
294
{
 
295
    tr_peermsgs_event e = blankEvent;
 
296
    e.eventType = TR_PEERMSG_CLIENT_HAVE;
 
297
    e.pieceIndex = pieceIndex;
 
298
    publish( msgs, &e );
 
299
}
 
300
 
 
301
static void
 
302
fireGotBlock( tr_peermsgs * msgs, uint32_t pieceIndex, uint32_t offset, uint32_t length )
 
303
{
 
304
    tr_peermsgs_event e = blankEvent;
 
305
    e.eventType = TR_PEERMSG_CLIENT_BLOCK;
 
306
    e.pieceIndex = pieceIndex;
 
307
    e.offset = offset;
 
308
    e.length = length;
 
309
    publish( msgs, &e );
 
310
}
 
311
 
 
312
static void
 
313
fireCancelledReq( tr_peermsgs * msgs, const struct peer_request * req )
 
314
{
 
315
    tr_peermsgs_event e = blankEvent;
 
316
    e.eventType = TR_PEERMSG_CANCEL;
 
317
    e.pieceIndex = req->index;
 
318
    e.offset = req->offset;
 
319
    e.length = req->length;
 
320
    publish( msgs, &e );
 
321
}
 
322
 
 
323
/**
 
324
***  INTEREST
 
325
**/
 
326
 
 
327
static int
 
328
isPieceInteresting( const tr_peermsgs   * peer,
 
329
                    int                   piece )
 
330
{
 
331
    const tr_torrent * torrent = peer->torrent;
 
332
    if( torrent->info.pieces[piece].dnd ) /* we don't want it */
 
333
        return FALSE;
 
334
    if( tr_cpPieceIsComplete( torrent->completion, piece ) ) /* we have it */
 
335
        return FALSE;
 
336
    if( !tr_bitfieldHas( peer->info->have, piece ) ) /* peer doesn't have it */
 
337
        return FALSE;
 
338
    if( tr_bitfieldHas( peer->info->banned, piece ) ) /* peer is banned */
 
339
        return FALSE;
 
340
    return TRUE;
 
341
}
 
342
 
 
343
/* "interested" means we'll ask for piece data if they unchoke us */
 
344
static int
 
345
isPeerInteresting( const tr_peermsgs * msgs )
 
346
{
 
347
    int i;
 
348
    const tr_torrent * torrent;
 
349
    const tr_bitfield * bitfield;
 
350
    const int clientIsSeed = tr_torrentIsSeed( msgs->torrent );
 
351
 
 
352
    if( clientIsSeed )
 
353
        return FALSE;
 
354
 
 
355
    torrent = msgs->torrent;
 
356
    bitfield = tr_cpPieceBitfield( torrent->completion );
 
357
 
 
358
    if( !msgs->info->have )
 
359
        return TRUE;
 
360
 
 
361
    assert( bitfield->len == msgs->info->have->len );
 
362
    for( i=0; i<torrent->info.pieceCount; ++i )
 
363
        if( isPieceInteresting( msgs, i ) )
 
364
            return TRUE;
 
365
 
 
366
    return FALSE;
 
367
}
 
368
 
 
369
static void
 
370
sendInterest( tr_peermsgs * msgs, int weAreInterested )
 
371
{
 
372
    assert( msgs != NULL );
 
373
    assert( weAreInterested==0 || weAreInterested==1 );
 
374
 
 
375
    msgs->info->clientIsInterested = weAreInterested;
 
376
    dbgmsg( msgs, "Sending %s",
 
377
            weAreInterested ? "Interested" : "Not Interested");
 
378
 
 
379
    tr_peerIoWriteUint32( msgs->io, msgs->outMessages, sizeof(uint8_t) );
 
380
    tr_peerIoWriteUint8 ( msgs->io, msgs->outMessages,
 
381
                   weAreInterested ? BT_INTERESTED : BT_NOT_INTERESTED );
 
382
}
 
383
 
 
384
static void
 
385
updateInterest( tr_peermsgs * msgs )
 
386
{
 
387
    const int i = isPeerInteresting( msgs );
 
388
    if( i != msgs->info->clientIsInterested )
 
389
        sendInterest( msgs, i );
 
390
    if( i )
 
391
        fireNeedReq( msgs );
 
392
}
 
393
 
 
394
#define MIN_CHOKE_PERIOD_SEC 10
 
395
 
 
396
void
 
397
tr_peerMsgsSetChoke( tr_peermsgs * msgs, int choke )
 
398
{
 
399
    const time_t fibrillationTime = time(NULL) - MIN_CHOKE_PERIOD_SEC;
 
400
 
 
401
    assert( msgs != NULL );
 
402
    assert( msgs->info != NULL );
 
403
    assert( choke==0 || choke==1 );
 
404
 
 
405
    if( msgs->info->chokeChangedAt > fibrillationTime )
 
406
    {
 
407
        dbgmsg( msgs, "Not changing choke to %d to avoid fibrillation", choke );
 
408
    }
 
409
    else if( msgs->info->peerIsChoked != choke )
 
410
    {
 
411
        msgs->info->peerIsChoked = choke;
 
412
        
 
413
        if( choke )
 
414
        {
 
415
            tr_list * walk;
 
416
            for( walk = msgs->peerAskedFor; walk != NULL; )
 
417
            {
 
418
                tr_list * next = walk->next;
 
419
                /* don't reject a peer's fast allowed requests at choke */
 
420
                struct peer_request *req = walk->data;
 
421
                if ( !tr_bitfieldHas( msgs->peerAllowedPieces, req->index ) )
 
422
                {
 
423
                    tr_list_remove_data( &msgs->peerAskedFor, req );
 
424
                    tr_free( req );
 
425
                }
 
426
                walk = next;
 
427
            }
 
428
        }
 
429
 
 
430
        protocolSendChoke( msgs, choke );
 
431
        msgs->info->chokeChangedAt = time( NULL );
 
432
    }
 
433
}
 
434
 
 
435
/**
 
436
***
 
437
**/
 
438
 
 
439
void
 
440
tr_peerMsgsHave( tr_peermsgs * msgs,
 
441
                 uint32_t      index )
 
442
{
 
443
    protocolSendHave( msgs, index );
 
444
 
 
445
    /* since we have more pieces now, we might not be interested in this peer */
 
446
    updateInterest( msgs );
 
447
}
 
448
#if 0
 
449
static void
 
450
sendFastSuggest( tr_peermsgs * msgs,
 
451
                 uint32_t      pieceIndex )
 
452
{
 
453
    dbgmsg( msgs, "w00t SUGGESTing them piece #%d", pieceIndex );
 
454
    tr_peerIoWriteUint32( msgs->io, msgs->outMessages, sizeof(uint8_t) + sizeof(uint32_t) );
 
455
    tr_peerIoWriteUint8( msgs->io, msgs->outMessages, BT_SUGGEST );
 
456
    tr_peerIoWriteUint32( msgs->io, msgs->outMessages, pieceIndex );
 
457
    
 
458
    updateInterest( msgs );
 
459
}
 
460
#endif
 
461
static void
 
462
sendFastHave( tr_peermsgs * msgs,
 
463
              int           all)
 
464
{
 
465
    dbgmsg( msgs, "w00t telling them we %s pieces", (all ? "HAVE_ALL" : "HAVE_NONE" ) );
 
466
    tr_peerIoWriteUint32( msgs->io, msgs->outMessages, sizeof(uint8_t) );
 
467
    tr_peerIoWriteUint8( msgs->io, msgs->outMessages, ( all ? BT_HAVE_ALL : BT_HAVE_NONE ) );
 
468
    
 
469
    updateInterest( msgs );
 
470
}
 
471
 
 
472
static void
 
473
sendFastReject( tr_peermsgs * msgs,
 
474
                uint32_t      pieceIndex,
 
475
                uint32_t      offset,
 
476
                uint32_t      length )
 
477
{
 
478
    assert( msgs != NULL );
 
479
    assert( length > 0 );
 
480
    
 
481
    /* reject the request */
 
482
    const uint32_t len = sizeof(uint8_t) + 3 * sizeof(uint32_t);
 
483
    tr_peerIoWriteUint32( msgs->io, msgs->outMessages, len );
 
484
    tr_peerIoWriteUint8( msgs->io, msgs->outMessages, BT_REJECT );
 
485
    tr_peerIoWriteUint32( msgs->io, msgs->outMessages, pieceIndex );
 
486
    tr_peerIoWriteUint32( msgs->io, msgs->outMessages, offset );
 
487
    tr_peerIoWriteUint32( msgs->io, msgs->outMessages, length );
 
488
}
 
489
 
 
490
static void
 
491
sendFastAllowed( tr_peermsgs * msgs,
 
492
                 uint32_t      pieceIndex)
 
493
{
 
494
    dbgmsg( msgs, "w00t telling them we ALLOW_FAST piece #%d", pieceIndex );
 
495
    tr_peerIoWriteUint32( msgs->io, msgs->outMessages, sizeof(uint8_t) + sizeof(uint32_t) );
 
496
    tr_peerIoWriteUint8( msgs->io, msgs->outMessages, BT_ALLOWED_FAST );
 
497
    tr_peerIoWriteUint32( msgs->io, msgs->outMessages, pieceIndex );
 
498
}
 
499
 
 
500
 
 
501
 
 
502
static void
 
503
sendFastAllowedSet( tr_peermsgs * msgs )
 
504
{
 
505
    int i = 0;
 
506
    while (i <= msgs->torrent->info.pieceCount )
 
507
    {
 
508
        if ( tr_bitfieldHas( msgs->peerAllowedPieces, i) )
 
509
            sendFastAllowed( msgs, i );
 
510
        i++;
 
511
    }
 
512
}
 
513
 
 
514
 
 
515
/**
 
516
***
 
517
**/
 
518
 
 
519
static int
 
520
reqIsValid( const tr_peermsgs * msgs, uint32_t index, uint32_t offset, uint32_t length )
 
521
{
 
522
    const tr_torrent * tor = msgs->torrent;
 
523
 
 
524
    if( index >= (uint32_t) tor->info.pieceCount )
 
525
        return FALSE;
 
526
    if ( (int)offset >= tr_torPieceCountBytes( tor, (int)index ) )
 
527
        return FALSE;
 
528
    if( length > MAX_REQUEST_BYTE_COUNT )
 
529
        return FALSE;
 
530
    if( tr_pieceOffset( tor, index, offset, length ) > tor->info.totalSize )
 
531
        return FALSE;
 
532
 
 
533
    return TRUE;
 
534
}
 
535
 
 
536
static int
 
537
requestIsValid( const tr_peermsgs * msgs, struct peer_request * req )
 
538
{
 
539
    return reqIsValid( msgs, req->index, req->offset, req->length );
 
540
}
 
541
 
 
542
static void
 
543
pumpRequestQueue( tr_peermsgs * msgs )
 
544
{
 
545
    const int max = msgs->maxActiveRequests;
 
546
    const int min = msgs->minActiveRequests;
 
547
    int count = tr_list_size( msgs->clientAskedFor );
 
548
    int sent = 0;
 
549
 
 
550
    if( count > min )
 
551
        return;
 
552
    if( msgs->info->clientIsChoked )
 
553
        return;
 
554
 
 
555
    while( ( count < max ) && ( msgs->clientWillAskFor != NULL ) )
 
556
    {
 
557
        struct peer_request * req = tr_list_pop_front( &msgs->clientWillAskFor );
 
558
        protocolSendRequest( msgs, req );
 
559
        req->time_requested = msgs->lastReqAddedAt = time( NULL );
 
560
        tr_list_append( &msgs->clientAskedFor, req );
 
561
        ++count;
 
562
        ++sent;
 
563
    }
 
564
 
 
565
    if( sent )
 
566
        dbgmsg( msgs, "pump sent %d requests, now have %d active and %d queued",
 
567
                sent,
 
568
                tr_list_size(msgs->clientAskedFor),
 
569
                tr_list_size(msgs->clientWillAskFor) );
 
570
 
 
571
    if( count < max )
 
572
        fireNeedReq( msgs );
 
573
}
 
574
 
 
575
int
 
576
tr_peerMsgsAddRequest( tr_peermsgs * msgs,
 
577
                       uint32_t      index, 
 
578
                       uint32_t      offset, 
 
579
                       uint32_t      length )
 
580
{
 
581
    const int req_max = msgs->maxActiveRequests;
 
582
    struct peer_request tmp, *req;
 
583
 
 
584
    assert( msgs != NULL );
 
585
    assert( msgs->torrent != NULL );
 
586
    assert( reqIsValid( msgs, index, offset, length ) );
 
587
 
 
588
    /**
 
589
    ***  Reasons to decline the request
 
590
    **/
 
591
 
 
592
    /* don't send requests to choked clients */
 
593
    if( msgs->info->clientIsChoked ) {
 
594
        dbgmsg( msgs, "declining request because they're choking us" );
 
595
        return TR_ADDREQ_CLIENT_CHOKED;
 
596
    }
 
597
 
 
598
    /* peer doesn't have this piece */
 
599
    if( !tr_bitfieldHas( msgs->info->have, index ) )
 
600
        return TR_ADDREQ_MISSING;
 
601
 
 
602
    /* peer's queue is full */
 
603
    if( tr_list_size( msgs->clientWillAskFor ) >= req_max ) {
 
604
        dbgmsg( msgs, "declining request because we're full" );
 
605
        return TR_ADDREQ_FULL;
 
606
    }
 
607
 
 
608
    /* have we already asked for this piece? */
 
609
    tmp.index = index;
 
610
    tmp.offset = offset;
 
611
    tmp.length = length;
 
612
    if( tr_list_find( msgs->clientAskedFor, &tmp, compareRequest ) ) {
 
613
        dbgmsg( msgs, "declining because it's a duplicate" );
 
614
        return TR_ADDREQ_DUPLICATE;
 
615
    }
 
616
    if( tr_list_find( msgs->clientWillAskFor, &tmp, compareRequest ) ) {
 
617
        dbgmsg( msgs, "declining because it's a duplicate" );
 
618
        return TR_ADDREQ_DUPLICATE;
 
619
    }
 
620
 
 
621
    /**
 
622
    ***  Accept this request
 
623
    **/
 
624
 
 
625
    dbgmsg( msgs, "added req for piece %d, offset %d", (int)index, (int)offset );
 
626
    req = tr_new0( struct peer_request, 1 );
 
627
    *req = tmp;
 
628
    tr_list_append( &msgs->clientWillAskFor, req );
 
629
    return TR_ADDREQ_OK;
 
630
}
 
631
 
 
632
static void
 
633
tr_peerMsgsCancelAllRequests( tr_peermsgs * msgs )
 
634
{
 
635
    struct peer_request * req;
 
636
 
 
637
    while(( req = tr_list_pop_front( &msgs->clientWillAskFor ) ))
 
638
    {
 
639
        fireCancelledReq( msgs, req );
 
640
        tr_free( req );
 
641
    }
 
642
 
 
643
    while(( req = tr_list_pop_front( &msgs->clientAskedFor ) ))
 
644
    {
 
645
        fireCancelledReq( msgs, req );
 
646
        protocolSendCancel( msgs, req );
 
647
        tr_free( req );
 
648
    }
 
649
}
 
650
 
 
651
void
 
652
tr_peerMsgsCancel( tr_peermsgs * msgs,
 
653
                   uint32_t      pieceIndex,
 
654
                   uint32_t      offset,
 
655
                   uint32_t      length )
 
656
{
 
657
    struct peer_request *req, tmp;
 
658
 
 
659
    assert( msgs != NULL );
 
660
    assert( length > 0 );
 
661
 
 
662
    /* have we asked the peer for this piece? */
 
663
    tmp.index = pieceIndex;
 
664
    tmp.offset = offset;
 
665
    tmp.length = length;
 
666
 
 
667
    /* if it's only in the queue and hasn't been sent yet, free it */
 
668
    if(( req = tr_list_remove( &msgs->clientWillAskFor, &tmp, compareRequest ) ))
 
669
    {
 
670
        fireCancelledReq( msgs, req );
 
671
        tr_free( req );
 
672
    }
 
673
 
 
674
    /* if it's already been sent, send a cancel message too */
 
675
    if(( req = tr_list_remove( &msgs->clientAskedFor, &tmp, compareRequest ) ))
 
676
    {
 
677
        protocolSendCancel( msgs, req );
 
678
        fireCancelledReq( msgs, req );
 
679
        tr_free( req );
 
680
    }
 
681
}
 
682
 
 
683
/**
 
684
***
 
685
**/
 
686
 
 
687
static void
 
688
sendLtepHandshake( tr_peermsgs * msgs )
 
689
{
 
690
    benc_val_t val, *m;
 
691
    char * buf;
 
692
    int len;
 
693
    int pex;
 
694
    const char * v = TR_NAME " " USERAGENT_PREFIX;
 
695
    const int port = tr_getPublicPort( msgs->handle );
 
696
    struct evbuffer * outbuf;
 
697
 
 
698
    if( msgs->clientSentLtepHandshake )
 
699
        return;
 
700
 
 
701
    outbuf = evbuffer_new( );
 
702
    dbgmsg( msgs, "sending an ltep handshake" );
 
703
    msgs->clientSentLtepHandshake = 1;
 
704
 
 
705
    /* decide if we want to advertise pex support */
 
706
    if( !tr_torrentIsPexEnabled( msgs->torrent ) )
 
707
        pex = 0;
 
708
    else if( msgs->peerSentLtepHandshake )
 
709
        pex = msgs->peerSupportsPex ? 1 : 0;
 
710
    else
 
711
        pex = 1;
 
712
 
 
713
    tr_bencInit( &val, TYPE_DICT );
 
714
    tr_bencDictReserve( &val, 4 );
 
715
    tr_bencInitInt( tr_bencDictAdd( &val, "e" ), 1 );
 
716
    m  = tr_bencDictAdd( &val, "m" );
 
717
    tr_bencInit( m, TYPE_DICT );
 
718
    if( pex ) {
 
719
        tr_bencDictReserve( m, 1 );
 
720
        tr_bencInitInt( tr_bencDictAdd( m, "ut_pex" ), OUR_LTEP_PEX );
 
721
    }
 
722
    if( port > 0 )
 
723
        tr_bencInitInt( tr_bencDictAdd( &val, "p" ), port );
 
724
    tr_bencInitStr( tr_bencDictAdd( &val, "v" ), v, 0, 1 );
 
725
    buf = tr_bencSaveMalloc( &val,  &len );
 
726
 
 
727
    tr_peerIoWriteUint32( msgs->io, outbuf, 2*sizeof(uint8_t) + len );
 
728
    tr_peerIoWriteUint8 ( msgs->io, outbuf, BT_LTEP );
 
729
    tr_peerIoWriteUint8 ( msgs->io, outbuf, LTEP_HANDSHAKE );
 
730
    tr_peerIoWriteBytes ( msgs->io, outbuf, buf, len );
 
731
 
 
732
    tr_peerIoWriteBuf( msgs->io, outbuf );
 
733
 
 
734
#if 0
 
735
    dbgmsg( msgs, "here is the ltep handshake we sent:" );
 
736
    tr_bencPrint( &val );
 
737
#endif
 
738
 
 
739
    /* cleanup */
 
740
    tr_bencFree( &val );
 
741
    tr_free( buf );
 
742
    evbuffer_free( outbuf );
 
743
}
 
744
 
 
745
static void
 
746
parseLtepHandshake( tr_peermsgs * msgs, int len, struct evbuffer * inbuf )
 
747
{
 
748
    benc_val_t val, * sub;
 
749
    uint8_t * tmp = tr_new( uint8_t, len );
 
750
 
 
751
    tr_peerIoReadBytes( msgs->io, inbuf, tmp, len );
 
752
    msgs->peerSentLtepHandshake = 1;
 
753
 
 
754
    if( tr_bencLoad( tmp, len, &val, NULL ) || val.type!=TYPE_DICT ) {
 
755
        dbgmsg( msgs, "GET  extended-handshake, couldn't get dictionary" );
 
756
        tr_free( tmp );
 
757
        return;
 
758
    }
 
759
 
 
760
#if 0
 
761
    dbgmsg( msgs, "here is the ltep handshake we read:" );
 
762
    tr_bencPrint( &val );
 
763
#endif
 
764
 
 
765
    /* does the peer prefer encrypted connections? */
 
766
    sub = tr_bencDictFind( &val, "e" );
 
767
    if( tr_bencIsInt( sub ) )
 
768
        msgs->info->encryption_preference = sub->val.i
 
769
                                      ? ENCRYPTION_PREFERENCE_YES
 
770
                                      : ENCRYPTION_PREFERENCE_NO;
 
771
 
 
772
    /* check supported messages for utorrent pex */
 
773
    sub = tr_bencDictFind( &val, "m" );
 
774
    if( tr_bencIsDict( sub ) ) {
 
775
        sub = tr_bencDictFind( sub, "ut_pex" );
 
776
        if( tr_bencIsInt( sub ) ) {
 
777
            msgs->peerSupportsPex = 1;
 
778
            msgs->ut_pex_id = (uint8_t) sub->val.i;
 
779
            dbgmsg( msgs, "msgs->ut_pex is %d", (int)msgs->ut_pex_id );
 
780
        }
 
781
    }
 
782
 
 
783
    /* get peer's listening port */
 
784
    sub = tr_bencDictFind( &val, "p" );
 
785
    if( tr_bencIsInt( sub ) ) {
 
786
        msgs->info->port = htons( (uint16_t)sub->val.i );
 
787
        dbgmsg( msgs, "msgs->port is now %hu", msgs->info->port );
 
788
    }
 
789
 
 
790
    tr_bencFree( &val );
 
791
    tr_free( tmp );
 
792
}
 
793
 
 
794
static void
 
795
parseUtPex( tr_peermsgs * msgs, int msglen, struct evbuffer * inbuf )
 
796
{
 
797
    benc_val_t val, * sub;
 
798
    uint8_t * tmp;
 
799
 
 
800
    if( !tr_torrentIsPexEnabled( msgs->torrent ) ) /* no sharing! */
 
801
        return;
 
802
 
 
803
    tmp = tr_new( uint8_t, msglen );
 
804
    tr_peerIoReadBytes( msgs->io, inbuf, tmp, msglen );
 
805
 
 
806
    if( tr_bencLoad( tmp, msglen, &val, NULL ) || !tr_bencIsDict( &val ) ) {
 
807
        dbgmsg( msgs, "GET can't read extended-pex dictionary" );
 
808
        tr_free( tmp );
 
809
        return;
 
810
    }
 
811
 
 
812
    sub = tr_bencDictFind( &val, "added" );
 
813
    if( tr_bencIsStr(sub) && ((sub->val.s.i % 6) == 0)) {
 
814
        const int n = sub->val.s.i / 6 ;
 
815
        dbgmsg( msgs, "got %d peers from uT pex", n );
 
816
        tr_peerMgrAddPeers( msgs->handle->peerMgr,
 
817
                            msgs->torrent->info.hash,
 
818
                            TR_PEER_FROM_PEX,
 
819
                            (uint8_t*)sub->val.s.s, n );
 
820
    }
 
821
 
 
822
    tr_bencFree( &val );
 
823
    tr_free( tmp );
 
824
}
 
825
 
 
826
static void
 
827
sendPex( tr_peermsgs * msgs );
 
828
 
 
829
static void
 
830
parseLtep( tr_peermsgs * msgs, int msglen, struct evbuffer * inbuf )
 
831
{
 
832
    uint8_t ltep_msgid;
 
833
 
 
834
    tr_peerIoReadUint8( msgs->io, inbuf, &ltep_msgid );
 
835
    msglen--;
 
836
 
 
837
    if( ltep_msgid == LTEP_HANDSHAKE )
 
838
    {
 
839
        dbgmsg( msgs, "got ltep handshake" );
 
840
        parseLtepHandshake( msgs, msglen, inbuf );
 
841
        sendLtepHandshake( msgs );
 
842
        sendPex( msgs );
 
843
    }
 
844
    else if( ltep_msgid == msgs->ut_pex_id )
 
845
    {
 
846
        dbgmsg( msgs, "got ut pex" );
 
847
        msgs->peerSupportsPex = 1;
 
848
        parseUtPex( msgs, msglen, inbuf );
 
849
    }
 
850
    else
 
851
    {
 
852
        dbgmsg( msgs, "skipping unknown ltep message (%d)", (int)ltep_msgid );
 
853
        evbuffer_drain( inbuf, msglen );
 
854
    }
 
855
}
 
856
 
 
857
static int
 
858
readBtLength( tr_peermsgs * msgs, struct evbuffer * inbuf )
 
859
{
 
860
    uint32_t len;
 
861
    const size_t needlen = sizeof(uint32_t);
 
862
 
 
863
    if( EVBUFFER_LENGTH(inbuf) < needlen )
 
864
        return READ_MORE;
 
865
 
 
866
    tr_peerIoReadUint32( msgs->io, inbuf, &len );
 
867
 
 
868
    if( len == 0 ) /* peer sent us a keepalive message */
 
869
        dbgmsg( msgs, "got KeepAlive" );
 
870
    else {
 
871
        msgs->incomingMessageLength = len;
 
872
        msgs->state = AWAITING_BT_MESSAGE;
 
873
    }
 
874
 
 
875
    return READ_AGAIN;
 
876
}
 
877
 
 
878
static void
 
879
updatePeerProgress( tr_peermsgs * msgs )
 
880
{
 
881
    msgs->info->progress = tr_bitfieldCountTrueBits( msgs->info->have ) / (float)msgs->torrent->info.pieceCount;
 
882
    dbgmsg( msgs, "peer progress is %f", msgs->info->progress );
 
883
    updateInterest( msgs );
 
884
    firePeerProgress( msgs );
 
885
}
 
886
 
 
887
static int
 
888
readBtMessage( tr_peermsgs * msgs, struct evbuffer * inbuf )
 
889
{
 
890
    uint8_t id;
 
891
    uint32_t ui32;
 
892
    uint32_t msglen = msgs->incomingMessageLength;
 
893
 
 
894
    if( EVBUFFER_LENGTH(inbuf) < msglen )
 
895
        return READ_MORE;
 
896
 
 
897
    tr_peerIoReadUint8( msgs->io, inbuf, &id );
 
898
    msglen--;
 
899
    dbgmsg( msgs, "got BT id %d, len %d", (int)id, (int)msglen );
 
900
 
 
901
    switch( id )
 
902
    {
 
903
        case BT_CHOKE:
 
904
            dbgmsg( msgs, "got Choke" );
 
905
            assert( msglen == 0 );
 
906
            msgs->info->clientIsChoked = 1;
 
907
#if 0
 
908
            tr_list * walk;
 
909
            for( walk = msgs->peerAskedFor; walk != NULL; )
 
910
            {
 
911
                tr_list * next = walk->next;
 
912
                /* We shouldn't reject a peer's fast allowed requests at choke */
 
913
                struct peer_request *req = walk->data;
 
914
                if ( !tr_bitfieldHas( msgs->peerAllowedPieces, req->index ) )
 
915
                {
 
916
                    tr_list_remove_data( &msgs->peerAskedFor, req );
 
917
                    tr_free( req );
 
918
                }
 
919
                walk = next;
 
920
            }
 
921
#endif
 
922
            tr_peerMsgsCancelAllRequests( msgs );
 
923
            break;
 
924
 
 
925
        case BT_UNCHOKE:
 
926
            dbgmsg( msgs, "got Unchoke" );
 
927
            assert( msglen == 0 );
 
928
            msgs->info->clientIsChoked = 0;
 
929
            fireNeedReq( msgs );
 
930
            break;
 
931
 
 
932
        case BT_INTERESTED:
 
933
            dbgmsg( msgs, "got Interested" );
 
934
            assert( msglen == 0 );
 
935
            msgs->info->peerIsInterested = 1;
 
936
            tr_peerMsgsSetChoke( msgs, 0 );
 
937
            break;
 
938
 
 
939
        case BT_NOT_INTERESTED:
 
940
            dbgmsg( msgs, "got Not Interested" );
 
941
            assert( msglen == 0 );
 
942
            msgs->info->peerIsInterested = 0;
 
943
            break;
 
944
 
 
945
        case BT_HAVE:
 
946
            assert( msglen == 4 );
 
947
            tr_peerIoReadUint32( msgs->io, inbuf, &ui32 );
 
948
            tr_bitfieldAdd( msgs->info->have, ui32 );
 
949
            updatePeerProgress( msgs );
 
950
            tr_rcTransferred( msgs->torrent->swarmspeed, msgs->torrent->info.pieceSize );
 
951
            dbgmsg( msgs, "got Have: %u", ui32 );
 
952
            break;
 
953
 
 
954
        case BT_BITFIELD: {
 
955
            const int clientIsSeed = tr_torrentIsSeed( msgs->torrent );
 
956
            dbgmsg( msgs, "got a bitfield" );
 
957
            assert( msglen == msgs->info->have->len );
 
958
            tr_peerIoReadBytes( msgs->io, inbuf, msgs->info->have->bits, msglen );
 
959
            updatePeerProgress( msgs );
 
960
            tr_peerMsgsSetChoke( msgs, !clientIsSeed || (msgs->info->progress<1.0) );
 
961
            fireNeedReq( msgs );
 
962
            break;
 
963
        }
 
964
 
 
965
        case BT_REQUEST: {
 
966
            struct peer_request * req;
 
967
            assert( msglen == 12 );
 
968
            req = tr_new( struct peer_request, 1 );
 
969
            tr_peerIoReadUint32( msgs->io, inbuf, &req->index );
 
970
            tr_peerIoReadUint32( msgs->io, inbuf, &req->offset );
 
971
            tr_peerIoReadUint32( msgs->io, inbuf, &req->length );
 
972
            dbgmsg( msgs, "got Request: %u:%u->%u", req->index, req->offset, req->length );
 
973
            
 
974
            if ( !requestIsValid( msgs, req ) )
 
975
            {
 
976
                dbgmsg( msgs, "BT_REQUEST: invalid request, ignoring" );
 
977
                tr_free( req );
 
978
                break;
 
979
            }
 
980
            /* 
 
981
                If we're not choking him -> continue
 
982
                If we're choking him
 
983
                    it doesn't support FPE -> He's deaf, reCHOKE and bail...
 
984
                    it support FPE
 
985
                        If the asked piece is not allowed
 
986
                            OR he's above our threshold
 
987
                            OR we don't have the requested piece -> Reject
 
988
                        Else
 
989
                        Asked piece allowed AND he's below our threshold -> continue...
 
990
             */
 
991
    
 
992
 
 
993
            if ( msgs->info->peerIsChoked )
 
994
            {
 
995
                if ( !tr_peerIoSupportsFEXT( msgs->io ) )
 
996
                {
 
997
                    dbgmsg( msgs, "BT_REQUEST: peer is choked, ignoring" );
 
998
                    /* Didn't he get it? */
 
999
                    tr_peerMsgsSetChoke( msgs, 1 );
 
1000
                    tr_free( req );
 
1001
                    break;
 
1002
                }
 
1003
                else
 
1004
                {
 
1005
                    if ( !tr_bitfieldHas( msgs->peerAllowedPieces, req->index )
 
1006
                         || ( msgs->info->progress * (float)msgs->torrent->info.pieceCount) >= MAX_ALLOWED_SET_COUNT
 
1007
                         || !tr_cpPieceIsComplete( msgs->torrent->completion, req->index ) )
 
1008
                    {
 
1009
                        dbgmsg( msgs, "BT_REQUEST: peer requests an un-fastallowed piece" );
 
1010
                        sendFastReject( msgs, req->index, req->offset, req->length );
 
1011
                        tr_free( req );
 
1012
                        break;
 
1013
                    }
 
1014
                    dbgmsg( msgs, "BT_REQUEST: fast allowed piece, accepting request" );
 
1015
                }    
 
1016
            }
 
1017
            
 
1018
            tr_list_append( &msgs->peerAskedFor, req );
 
1019
            break;
 
1020
        }
 
1021
 
 
1022
        case BT_CANCEL: {
 
1023
            struct peer_request req;
 
1024
            void * data;
 
1025
            assert( msglen == 12 );
 
1026
            tr_peerIoReadUint32( msgs->io, inbuf, &req.index );
 
1027
            tr_peerIoReadUint32( msgs->io, inbuf, &req.offset );
 
1028
            tr_peerIoReadUint32( msgs->io, inbuf, &req.length );
 
1029
            dbgmsg( msgs, "got a Cancel %u:%u->%u", req.index, req.offset, req.length );
 
1030
            data = tr_list_remove( &msgs->peerAskedFor, &req, compareRequest );
 
1031
            tr_free( data );
 
1032
            break;
 
1033
        }
 
1034
 
 
1035
        case BT_PIECE: {
 
1036
            dbgmsg( msgs, "got a Piece!" );
 
1037
            assert( msgs->blockToUs.length == 0 );
 
1038
            tr_peerIoReadUint32( msgs->io, inbuf, &msgs->blockToUs.index );
 
1039
            tr_peerIoReadUint32( msgs->io, inbuf, &msgs->blockToUs.offset );
 
1040
            msgs->blockToUs.length = msglen - 8;
 
1041
            assert( EVBUFFER_LENGTH(msgs->inBlock) == 0 );
 
1042
            msgs->state = msgs->blockToUs.length ? READING_BT_PIECE : AWAITING_BT_LENGTH;
 
1043
            return READ_AGAIN;
 
1044
            break;
 
1045
        }
 
1046
 
 
1047
        case BT_PORT: {
 
1048
            dbgmsg( msgs, "Got a BT_PORT" );
 
1049
            assert( msglen == 2 );
 
1050
            tr_peerIoReadUint16( msgs->io, inbuf, &msgs->info->port );
 
1051
            break;
 
1052
        }
 
1053
        
 
1054
        case BT_SUGGEST: {
 
1055
            /* tiennou TODO */
 
1056
            break;
 
1057
        }
 
1058
            
 
1059
        case BT_HAVE_ALL: {
 
1060
            assert( msglen == 0 );
 
1061
            dbgmsg( msgs, "Got a BT_HAVE_ALL" );
 
1062
            memset( msgs->info->have->bits, 1, msgs->info->have->len );
 
1063
            updatePeerProgress( msgs );
 
1064
            break;
 
1065
        }
 
1066
            
 
1067
        case BT_HAVE_NONE: {
 
1068
            assert( msglen == 0 );
 
1069
            dbgmsg( msgs, "Got a BT_HAVE_NONE" );
 
1070
            tr_bitfieldClear( msgs->info->have );
 
1071
            updatePeerProgress( msgs );
 
1072
            break;
 
1073
        }
 
1074
            
 
1075
        case BT_REJECT: {
 
1076
            struct peer_request req;
 
1077
            tr_list * node;
 
1078
            assert( msglen == 12 );
 
1079
            dbgmsg( msgs, "Got a BT_REJECT" );
 
1080
            tr_peerIoReadUint32( msgs->io, inbuf, &req.index );
 
1081
            tr_peerIoReadUint32( msgs->io, inbuf, &req.offset );
 
1082
            tr_peerIoReadUint32( msgs->io, inbuf, &req.length );
 
1083
            node = tr_list_find( msgs->peerAskedFor, &req, compareRequest );
 
1084
            if( node != NULL ) {
 
1085
                void * data = node->data;
 
1086
                tr_list_remove_data( &msgs->peerAskedFor, data );
 
1087
                tr_free( data );
 
1088
                dbgmsg( msgs, "found the req that peer has rejected... cancelled." );
 
1089
            }
 
1090
            break;
 
1091
        }
 
1092
            
 
1093
        case BT_ALLOWED_FAST: {
 
1094
            assert( msglen == 4 );
 
1095
            dbgmsg( msgs, "Got a BT_ALLOWED_FAST" );
 
1096
            tr_peerIoReadUint32( msgs->io, inbuf, &ui32 );
 
1097
            tr_bitfieldAdd( msgs->clientAllowedPieces, ui32 );
 
1098
            break;
 
1099
        }
 
1100
            
 
1101
        case BT_LTEP:
 
1102
            dbgmsg( msgs, "Got a BT_LTEP" );
 
1103
            parseLtep( msgs, msglen, inbuf );
 
1104
            break;
 
1105
 
 
1106
        default:
 
1107
            dbgmsg( msgs, "peer sent us an UNKNOWN: %d", (int)id );
 
1108
            tr_peerIoDrain( msgs->io, inbuf, msglen );
 
1109
            assert( 0 );
 
1110
    }
 
1111
 
 
1112
    msgs->incomingMessageLength = -1;
 
1113
    msgs->state = AWAITING_BT_LENGTH;
 
1114
    return READ_AGAIN;
 
1115
}
 
1116
 
 
1117
static void
 
1118
clientGotBytes( tr_peermsgs * msgs, uint32_t byteCount )
 
1119
{
 
1120
    tr_torrent * tor = msgs->torrent;
 
1121
    tor->activityDate = tr_date( );
 
1122
    tor->downloadedCur += byteCount;
 
1123
    msgs->info->pieceDataActivityDate = time( NULL );
 
1124
    tr_rcTransferred( msgs->info->rcToClient, byteCount );
 
1125
    tr_rcTransferred( tor->download, byteCount );
 
1126
    tr_rcTransferred( tor->handle->download, byteCount );
 
1127
}
 
1128
 
 
1129
static void
 
1130
peerGotBytes( tr_peermsgs * msgs, uint32_t byteCount )
 
1131
{
 
1132
    tr_torrent * tor = msgs->torrent;
 
1133
    tor->activityDate = tr_date( );
 
1134
    tor->uploadedCur += byteCount;
 
1135
    msgs->info->pieceDataActivityDate = time( NULL );
 
1136
    tr_rcTransferred( msgs->info->rcToPeer, byteCount );
 
1137
    tr_rcTransferred( tor->upload, byteCount );
 
1138
    tr_rcTransferred( tor->handle->upload, byteCount );
 
1139
}
 
1140
 
 
1141
static int
 
1142
canDownload( const tr_peermsgs * msgs )
 
1143
{
 
1144
    tr_torrent * tor = msgs->torrent;
 
1145
 
 
1146
    if( tor->downloadLimitMode == TR_SPEEDLIMIT_GLOBAL )
 
1147
        return !tor->handle->useDownloadLimit || tr_rcCanTransfer( tor->handle->download );
 
1148
 
 
1149
    if( tor->downloadLimitMode == TR_SPEEDLIMIT_SINGLE )
 
1150
        return tr_rcCanTransfer( tor->download );
 
1151
 
 
1152
    return TRUE;
 
1153
}
 
1154
 
 
1155
static void
 
1156
reassignBytesToCorrupt( tr_peermsgs * msgs, uint32_t byteCount )
 
1157
{
 
1158
    tr_torrent * tor = msgs->torrent;
 
1159
 
 
1160
    /* increment the `corrupt' field */
 
1161
    tor->corruptCur += byteCount;
 
1162
 
 
1163
    /* decrement the `downloaded' field */
 
1164
    if( tor->downloadedCur >= byteCount )
 
1165
        tor->downloadedCur -= byteCount;
 
1166
    else
 
1167
        tor->downloadedCur = 0;
 
1168
}
 
1169
 
 
1170
 
 
1171
static void
 
1172
gotBadPiece( tr_peermsgs * msgs, uint32_t pieceIndex )
 
1173
{
 
1174
    const uint32_t byteCount = tr_torPieceCountBytes( msgs->torrent, (int)pieceIndex );
 
1175
    reassignBytesToCorrupt( msgs, byteCount );
 
1176
}
 
1177
 
 
1178
static void
 
1179
gotUnwantedBlock( tr_peermsgs * msgs,
 
1180
                  uint32_t      index UNUSED,
 
1181
                  uint32_t      offset UNUSED,
 
1182
                  uint32_t      length )
 
1183
{
 
1184
    reassignBytesToCorrupt( msgs, length );
 
1185
}
 
1186
 
 
1187
static void
 
1188
addUsToBlamefield( tr_peermsgs * msgs, uint32_t index )
 
1189
{
 
1190
    if( !msgs->info->blame )
 
1191
         msgs->info->blame = tr_bitfieldNew( msgs->torrent->info.pieceCount );
 
1192
    tr_bitfieldAdd( msgs->info->blame, index );
 
1193
}
 
1194
 
 
1195
static void
 
1196
gotBlock( tr_peermsgs      * msgs,
 
1197
          struct evbuffer  * inbuf,
 
1198
          uint32_t           index,
 
1199
          uint32_t           offset,
 
1200
          uint32_t           length )
 
1201
{
 
1202
    tr_torrent * tor = msgs->torrent;
 
1203
    const int block = _tr_block( tor, index, offset );
 
1204
    struct peer_request key, *req;
 
1205
 
 
1206
    /**
 
1207
    *** Remove the block from our `we asked for this' list
 
1208
    **/
 
1209
 
 
1210
    key.index = index;
 
1211
    key.offset = offset;
 
1212
    key.length = length;
 
1213
    req = (struct peer_request*) tr_list_remove( &msgs->clientAskedFor, &key,
 
1214
                                                 compareRequest );
 
1215
    if( req == NULL ) {
 
1216
        gotUnwantedBlock( msgs, index, offset, length );
 
1217
        dbgmsg( msgs, "we didn't ask for this message..." );
 
1218
        return;
 
1219
    }
 
1220
    dbgmsg( msgs, "Got block %u:%u->%u (turnaround time %d secs)", 
 
1221
                     req->index, req->offset, req->length,
 
1222
                     (int)(time(NULL) - req->time_requested) );
 
1223
    tr_free( req );
 
1224
    dbgmsg( msgs, "peer has %d more blocks we've asked for",
 
1225
                  tr_list_size(msgs->clientAskedFor));
 
1226
 
 
1227
    /**
 
1228
    *** Error checks
 
1229
    **/
 
1230
 
 
1231
    if( tr_cpBlockIsComplete( tor->completion, block ) ) {
 
1232
        dbgmsg( msgs, "have this block already..." );
 
1233
        tr_dbg( "have this block already..." );
 
1234
        gotUnwantedBlock( msgs, index, offset, length );
 
1235
        return;
 
1236
    }
 
1237
 
 
1238
    if( (int)length != tr_torBlockCountBytes( tor, block ) ) {
 
1239
        dbgmsg( msgs, "block is the wrong length..." );
 
1240
        tr_dbg( "block is the wrong length..." );
 
1241
        gotUnwantedBlock( msgs, index, offset, length );
 
1242
        return;
 
1243
    }
 
1244
 
 
1245
    /**
 
1246
    ***  Write the block
 
1247
    **/
 
1248
 
 
1249
    if( tr_ioWrite( tor, index, offset, length, EVBUFFER_DATA( inbuf )))
 
1250
        return;
 
1251
 
 
1252
    tr_cpBlockAdd( tor->completion, block );
 
1253
 
 
1254
    addUsToBlamefield( msgs, index );
 
1255
 
 
1256
    fireGotBlock( msgs, index, offset, length );
 
1257
 
 
1258
    /**
 
1259
    ***  Handle if this was the last block in the piece
 
1260
    **/
 
1261
 
 
1262
    if( tr_cpPieceIsComplete( tor->completion, index ) )
 
1263
    {
 
1264
        if( tr_ioHash( tor, index ) )
 
1265
        {
 
1266
            gotBadPiece( msgs, index );
 
1267
            return;
 
1268
        }
 
1269
 
 
1270
        fireClientHave( msgs, index );
 
1271
    }
 
1272
}
 
1273
 
 
1274
 
 
1275
static ReadState
 
1276
readBtPiece( tr_peermsgs * msgs, struct evbuffer * inbuf )
 
1277
{
 
1278
    uint32_t inlen;
 
1279
    uint8_t * tmp;
 
1280
 
 
1281
    assert( msgs != NULL );
 
1282
    assert( msgs->blockToUs.length > 0 );
 
1283
    assert( inbuf != NULL );
 
1284
    assert( EVBUFFER_LENGTH( inbuf ) > 0 );
 
1285
 
 
1286
    /* read from the inbuf into our block buffer */
 
1287
    inlen = MIN( EVBUFFER_LENGTH(inbuf), msgs->blockToUs.length );
 
1288
    tmp = tr_new( uint8_t, inlen );
 
1289
    tr_peerIoReadBytes( msgs->io, inbuf, tmp, inlen );
 
1290
    evbuffer_add( msgs->inBlock, tmp, inlen );
 
1291
 
 
1292
    /* update our tables accordingly */
 
1293
    assert( inlen >= msgs->blockToUs.length );
 
1294
    msgs->blockToUs.length -= inlen;
 
1295
    msgs->info->peerSentPieceDataAt = time( NULL );
 
1296
    clientGotBytes( msgs, inlen );
 
1297
 
 
1298
    /* if this was the entire block, save it */
 
1299
    if( !msgs->blockToUs.length )
 
1300
    {
 
1301
        dbgmsg( msgs, "got block %u:%u", msgs->blockToUs.index, msgs->blockToUs.offset );
 
1302
        assert( (int)EVBUFFER_LENGTH( msgs->inBlock ) == tr_torBlockCountBytes( msgs->torrent, _tr_block(msgs->torrent,msgs->blockToUs.index, msgs->blockToUs.offset) ) );
 
1303
        gotBlock( msgs, msgs->inBlock,
 
1304
                        msgs->blockToUs.index,
 
1305
                        msgs->blockToUs.offset,
 
1306
                        EVBUFFER_LENGTH( msgs->inBlock ) );
 
1307
        evbuffer_drain( msgs->inBlock, ~0 );
 
1308
        msgs->state = AWAITING_BT_LENGTH;
 
1309
    }
 
1310
 
 
1311
    /* cleanup */
 
1312
    tr_free( tmp );
 
1313
    return READ_AGAIN;
 
1314
}
 
1315
 
 
1316
static ReadState
 
1317
canRead( struct bufferevent * evin, void * vmsgs )
 
1318
{
 
1319
    ReadState ret;
 
1320
    tr_peermsgs * msgs = (tr_peermsgs *) vmsgs;
 
1321
    struct evbuffer * inbuf = EVBUFFER_INPUT ( evin );
 
1322
 
 
1323
    if( !canDownload( msgs ) )
 
1324
    {
 
1325
        msgs->notListening = 1;
 
1326
        tr_peerIoSetIOMode ( msgs->io, 0, EV_READ );
 
1327
        ret = READ_DONE;
 
1328
    }
 
1329
    else switch( msgs->state )
 
1330
    {
 
1331
        case AWAITING_BT_LENGTH:  ret = readBtLength  ( msgs, inbuf ); break;
 
1332
        case AWAITING_BT_MESSAGE: ret = readBtMessage ( msgs, inbuf ); break;
 
1333
        case READING_BT_PIECE:    ret = readBtPiece   ( msgs, inbuf ); break;
 
1334
        default: assert( 0 );
 
1335
    }
 
1336
 
 
1337
    return ret;
 
1338
}
 
1339
 
 
1340
static void
 
1341
sendKeepalive( tr_peermsgs * msgs )
 
1342
{
 
1343
    dbgmsg( msgs, "sending a keepalive message" );
 
1344
    tr_peerIoWriteUint32( msgs->io, msgs->outMessages, 0 );
 
1345
}
 
1346
 
 
1347
/**
 
1348
***
 
1349
**/
 
1350
 
 
1351
static int
 
1352
canWrite( const tr_peermsgs * msgs )
 
1353
{
 
1354
    /* don't let our outbuffer get too large */
 
1355
    if( tr_peerIoWriteBytesWaiting( msgs->io ) > 8192 )
 
1356
        return FALSE;
 
1357
 
 
1358
    return TRUE;
 
1359
}
 
1360
 
 
1361
static int
 
1362
canUpload( const tr_peermsgs * msgs )
 
1363
{
 
1364
    const tr_torrent * tor = msgs->torrent;
 
1365
 
 
1366
    if( !canWrite( msgs ) )
 
1367
        return FALSE;
 
1368
 
 
1369
    if( tor->uploadLimitMode == TR_SPEEDLIMIT_GLOBAL )
 
1370
        return !tor->handle->useUploadLimit || tr_rcCanTransfer( tor->handle->upload );
 
1371
 
 
1372
    if( tor->uploadLimitMode == TR_SPEEDLIMIT_SINGLE )
 
1373
        return tr_rcCanTransfer( tor->upload );
 
1374
 
 
1375
    return TRUE;
 
1376
}
 
1377
 
 
1378
static int
 
1379
ratePulse( void * vmsgs )
 
1380
{
 
1381
    tr_peermsgs * msgs = (tr_peermsgs *) vmsgs;
 
1382
    msgs->info->rateToClient = tr_rcRate( msgs->info->rcToClient );
 
1383
    msgs->info->rateToPeer = tr_rcRate( msgs->info->rcToPeer );
 
1384
    msgs->maxActiveRequests = MIN( 8 + (int)(msgs->info->rateToClient/10), 100 );
 
1385
    msgs->minActiveRequests = msgs->maxActiveRequests / 2;
 
1386
    return TRUE;
 
1387
}
 
1388
 
 
1389
static int
 
1390
pulse( void * vmsgs )
 
1391
{
 
1392
    const time_t now = time( NULL );
 
1393
    tr_peermsgs * msgs = (tr_peermsgs *) vmsgs;
 
1394
    size_t len;
 
1395
 
 
1396
    /* if we froze out a downloaded block because of speed limits,
 
1397
       start listening to the peer again */
 
1398
    if( msgs->notListening && canDownload( msgs ) )
 
1399
    {
 
1400
        msgs->notListening = 0;
 
1401
        tr_peerIoSetIOMode ( msgs->io, EV_READ, 0 );
 
1402
    }
 
1403
 
 
1404
    pumpRequestQueue( msgs );
 
1405
 
 
1406
    if( !canWrite( msgs ) )
 
1407
    {
 
1408
    }
 
1409
    else if(( len = EVBUFFER_LENGTH( msgs->outMessages ) ))
 
1410
    {
 
1411
        tr_peerIoWriteBuf( msgs->io, msgs->outMessages );
 
1412
        msgs->clientSentAnythingAt = now;
 
1413
    }
 
1414
    else if(( msgs->peerAskedFor ))
 
1415
    {
 
1416
        if( canUpload( msgs ) )
 
1417
        {
 
1418
            struct peer_request * r = tr_list_pop_front( &msgs->peerAskedFor );
 
1419
            uint8_t * buf = tr_new( uint8_t, r->length );
 
1420
 
 
1421
            if( requestIsValid( msgs, r )
 
1422
                && tr_cpPieceIsComplete( msgs->torrent->completion, r->index )
 
1423
                && !tr_ioRead( msgs->torrent, r->index, r->offset, r->length, buf ) )
 
1424
            {
 
1425
                protocolSendPiece( msgs, r, buf );
 
1426
                peerGotBytes( msgs, r->length );
 
1427
                msgs->clientSentAnythingAt = now;
 
1428
            }
 
1429
 
 
1430
            tr_free( buf );
 
1431
            tr_free( r );
 
1432
        }
 
1433
    }
 
1434
    else if( ( now - msgs->clientSentAnythingAt ) > KEEPALIVE_INTERVAL_SECS )
 
1435
    {
 
1436
        sendKeepalive( msgs );
 
1437
    }
 
1438
 
 
1439
    return TRUE; /* loop forever */
 
1440
}
 
1441
 
 
1442
static void
 
1443
didWrite( struct bufferevent * evin UNUSED, void * vmsgs )
 
1444
{
 
1445
    pulse( vmsgs );
 
1446
}
 
1447
 
 
1448
static void
 
1449
gotError( struct bufferevent * evbuf UNUSED, short what, void * vmsgs )
 
1450
{
 
1451
    dbgmsg( vmsgs, "libevent got an error! what=%d, errno=%d (%s)",
 
1452
            (int)what, errno, strerror(errno) );
 
1453
    fireGotError( vmsgs );
 
1454
}
 
1455
 
 
1456
static void
 
1457
sendBitfield( tr_peermsgs * msgs )
 
1458
{
 
1459
    const tr_bitfield * bitfield = tr_cpPieceBitfield( msgs->torrent->completion );
 
1460
    struct evbuffer * out = msgs->outMessages;
 
1461
 
 
1462
    dbgmsg( msgs, "sending peer a bitfield message" );
 
1463
    tr_peerIoWriteUint32( msgs->io, out, sizeof(uint8_t) + bitfield->len );
 
1464
    tr_peerIoWriteUint8 ( msgs->io, out, BT_BITFIELD );
 
1465
    tr_peerIoWriteBytes ( msgs->io, out, bitfield->bits, bitfield->len );
 
1466
}
 
1467
 
 
1468
/**
 
1469
***
 
1470
**/
 
1471
 
 
1472
/* some peers give us error messages if we send
 
1473
   more than this many peers in a single pex message */
 
1474
#define MAX_PEX_DIFFS 200
 
1475
 
 
1476
typedef struct
 
1477
{
 
1478
    tr_pex * added;
 
1479
    tr_pex * dropped;
 
1480
    tr_pex * elements;
 
1481
    int addedCount;
 
1482
    int droppedCount;
 
1483
    int elementCount;
 
1484
    int diffCount;
 
1485
}
 
1486
PexDiffs;
 
1487
 
 
1488
static void
 
1489
pexAddedCb( void * vpex, void * userData )
 
1490
{
 
1491
    PexDiffs * diffs = (PexDiffs *) userData;
 
1492
    tr_pex * pex = (tr_pex *) vpex;
 
1493
    if( diffs->diffCount < MAX_PEX_DIFFS )
 
1494
    {
 
1495
        diffs->diffCount++;
 
1496
        diffs->added[diffs->addedCount++] = *pex;
 
1497
        diffs->elements[diffs->elementCount++] = *pex;
 
1498
    }
 
1499
}
 
1500
 
 
1501
static void
 
1502
pexRemovedCb( void * vpex, void * userData )
 
1503
{
 
1504
    PexDiffs * diffs = (PexDiffs *) userData;
 
1505
    tr_pex * pex = (tr_pex *) vpex;
 
1506
    if( diffs->diffCount < MAX_PEX_DIFFS )
 
1507
    {
 
1508
        diffs->diffCount++;
 
1509
        diffs->dropped[diffs->droppedCount++] = *pex;
 
1510
    }
 
1511
}
 
1512
 
 
1513
static void
 
1514
pexElementCb( void * vpex, void * userData )
 
1515
{
 
1516
    PexDiffs * diffs = (PexDiffs *) userData;
 
1517
    tr_pex * pex = (tr_pex *) vpex;
 
1518
    if( diffs->diffCount < MAX_PEX_DIFFS )
 
1519
    {
 
1520
        diffs->diffCount++;
 
1521
        diffs->elements[diffs->elementCount++] = *pex;
 
1522
    }
 
1523
}
 
1524
 
 
1525
static void
 
1526
sendPex( tr_peermsgs * msgs )
 
1527
{
 
1528
    if( msgs->peerSupportsPex && tr_torrentIsPexEnabled( msgs->torrent ) )
 
1529
    {
 
1530
        int i;
 
1531
        tr_pex * newPex = NULL;
 
1532
        const int newCount = tr_peerMgrGetPeers( msgs->handle->peerMgr, msgs->torrent->info.hash, &newPex );
 
1533
        PexDiffs diffs;
 
1534
        benc_val_t val, *added, *dropped, *flags;
 
1535
        uint8_t *tmp, *walk;
 
1536
        char * benc;
 
1537
        int bencLen;
 
1538
 
 
1539
        /* build the diffs */
 
1540
        diffs.added = tr_new( tr_pex, newCount );
 
1541
        diffs.addedCount = 0;
 
1542
        diffs.dropped = tr_new( tr_pex, msgs->pexCount );
 
1543
        diffs.droppedCount = 0;
 
1544
        diffs.elements = tr_new( tr_pex, newCount + msgs->pexCount );
 
1545
        diffs.elementCount = 0;
 
1546
        diffs.diffCount = 0;
 
1547
        tr_set_compare( msgs->pex, msgs->pexCount,
 
1548
                        newPex, newCount,
 
1549
                        tr_pexCompare, sizeof(tr_pex),
 
1550
                        pexRemovedCb, pexAddedCb, pexElementCb, &diffs );
 
1551
        dbgmsg( msgs, "pex: old peer count %d, new peer count %d, added %d, removed %d", msgs->pexCount, newCount, diffs.addedCount, diffs.droppedCount );
 
1552
 
 
1553
        /* update peer */
 
1554
        tr_free( msgs->pex );
 
1555
        msgs->pex = diffs.elements;
 
1556
        msgs->pexCount = diffs.elementCount;
 
1557
 
 
1558
        /* build the pex payload */
 
1559
        tr_bencInit( &val, TYPE_DICT );
 
1560
        tr_bencDictReserve( &val, 3 );
 
1561
 
 
1562
        /* "added" */
 
1563
        added = tr_bencDictAdd( &val, "added" );
 
1564
        tmp = walk = tr_new( uint8_t, diffs.addedCount * 6 );
 
1565
        for( i=0; i<diffs.addedCount; ++i ) {
 
1566
            memcpy( walk, &diffs.added[i].in_addr, 4 ); walk += 4;
 
1567
            memcpy( walk, &diffs.added[i].port, 2 ); walk += 2;
 
1568
        }
 
1569
        assert( ( walk - tmp ) == diffs.addedCount * 6 );
 
1570
        tr_bencInitStr( added, tmp, walk-tmp, FALSE );
 
1571
 
 
1572
        /* "added.f" */
 
1573
        flags = tr_bencDictAdd( &val, "added.f" );
 
1574
        tmp = walk = tr_new( uint8_t, diffs.addedCount );
 
1575
        for( i=0; i<diffs.addedCount; ++i )
 
1576
            *walk++ = diffs.added[i].flags;
 
1577
        assert( ( walk - tmp ) == diffs.addedCount );
 
1578
        tr_bencInitStr( flags, tmp, walk-tmp, FALSE );
 
1579
 
 
1580
        /* "dropped" */
 
1581
        dropped = tr_bencDictAdd( &val, "dropped" );
 
1582
        tmp = walk = tr_new( uint8_t, diffs.droppedCount * 6 );
 
1583
        for( i=0; i<diffs.droppedCount; ++i ) {
 
1584
            memcpy( walk, &diffs.dropped[i].in_addr, 4 ); walk += 4;
 
1585
            memcpy( walk, &diffs.dropped[i].port, 2 ); walk += 2;
 
1586
        }
 
1587
        assert( ( walk - tmp ) == diffs.droppedCount * 6 );
 
1588
        tr_bencInitStr( dropped, tmp, walk-tmp, FALSE );
 
1589
 
 
1590
        /* write the pex message */
 
1591
        benc = tr_bencSaveMalloc( &val, &bencLen );
 
1592
        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, 2*sizeof(uint8_t) + bencLen );
 
1593
        tr_peerIoWriteUint8 ( msgs->io, msgs->outMessages, BT_LTEP );
 
1594
        tr_peerIoWriteUint8 ( msgs->io, msgs->outMessages, OUR_LTEP_PEX );
 
1595
        tr_peerIoWriteBytes ( msgs->io, msgs->outMessages, benc, bencLen );
 
1596
 
 
1597
        /* cleanup */
 
1598
        tr_free( benc );
 
1599
        tr_bencFree( &val );
 
1600
        tr_free( diffs.added );
 
1601
        tr_free( diffs.dropped );
 
1602
        tr_free( newPex );
 
1603
 
 
1604
        msgs->clientSentPexAt = time( NULL );
 
1605
    }
 
1606
}
 
1607
 
 
1608
static int
 
1609
pexPulse( void * vpeer )
 
1610
{
 
1611
    sendPex( vpeer );
 
1612
    return TRUE;
 
1613
}
 
1614
 
 
1615
/**
 
1616
***
 
1617
**/
 
1618
 
 
1619
tr_peermsgs*
 
1620
tr_peerMsgsNew( struct tr_torrent * torrent,
 
1621
                struct tr_peer    * info,
 
1622
                tr_delivery_func    func,
 
1623
                void              * userData,
 
1624
                tr_publisher_tag  * setme )
 
1625
{
 
1626
    tr_peermsgs * m;
 
1627
 
 
1628
    assert( info != NULL );
 
1629
    assert( info->io != NULL );
 
1630
 
 
1631
    m = tr_new0( tr_peermsgs, 1 );
 
1632
    m->publisher = tr_publisherNew( );
 
1633
    m->info = info;
 
1634
    m->handle = torrent->handle;
 
1635
    m->torrent = torrent;
 
1636
    m->io = info->io;
 
1637
    m->info->clientIsChoked = 1;
 
1638
    m->info->peerIsChoked = 1;
 
1639
    m->info->clientIsInterested = 0;
 
1640
    m->info->peerIsInterested = 0;
 
1641
    m->info->have = tr_bitfieldNew( torrent->info.pieceCount );
 
1642
    m->pulseTimer = tr_timerNew( m->handle, pulse, m, PEER_PULSE_INTERVAL );
 
1643
    m->rateTimer = tr_timerNew( m->handle, ratePulse, m, RATE_PULSE_INTERVAL );
 
1644
    m->pexTimer = tr_timerNew( m->handle, pexPulse, m, PEX_INTERVAL );
 
1645
    m->outMessages = evbuffer_new( );
 
1646
    m->inBlock = evbuffer_new( );
 
1647
    m->peerAllowedPieces = NULL;
 
1648
    m->clientAllowedPieces = NULL;
 
1649
    *setme = tr_publisherSubscribe( m->publisher, func, userData );
 
1650
    
 
1651
    if ( tr_peerIoSupportsFEXT( m->io ) )
 
1652
    {
 
1653
        /* This peer is fastpeer-enabled, generate its allowed set
 
1654
         * (before registering our callbacks) */
 
1655
        if ( !m->peerAllowedPieces ) {
 
1656
            const struct in_addr *peerAddr = tr_peerIoGetAddress( m->io, NULL );
 
1657
            
 
1658
            m->peerAllowedPieces = tr_peerMgrGenerateAllowedSet( MAX_ALLOWED_SET_COUNT,
 
1659
                                                                 m->torrent->info.pieceCount,
 
1660
                                                                 m->torrent->info.hash,
 
1661
                                                                 peerAddr );
 
1662
        }
 
1663
        m->clientAllowedPieces = tr_bitfieldNew( m->torrent->info.pieceCount );
 
1664
    }
 
1665
    
 
1666
    tr_peerIoSetTimeoutSecs( m->io, 150 ); /* error if we don't read or write for 2.5 minutes */
 
1667
    tr_peerIoSetIOFuncs( m->io, canRead, didWrite, gotError, m );
 
1668
    tr_peerIoSetIOMode( m->io, EV_READ|EV_WRITE, 0 );
 
1669
    ratePulse( m );
 
1670
 
 
1671
    if ( tr_peerIoSupportsLTEP( m->io ) )
 
1672
        sendLtepHandshake( m );
 
1673
 
 
1674
    if ( !tr_peerIoSupportsFEXT( m->io ) )
 
1675
        sendBitfield( m );
 
1676
    else {
 
1677
        /* This peer is fastpeer-enabled, send it have-all or have-none if appropriate */
 
1678
        float completion = tr_cpPercentComplete( m->torrent->completion );
 
1679
        if ( completion == 0.0f ) {
 
1680
            sendFastHave( m, 0 );
 
1681
        } else if ( completion == 1.0f ) {
 
1682
            sendFastHave( m, 1 );
 
1683
        } else {
 
1684
            sendBitfield( m );
 
1685
        }
 
1686
        uint32_t peerProgress = m->torrent->info.pieceCount * m->info->progress;
 
1687
        
 
1688
        if ( peerProgress < MAX_ALLOWED_SET_COUNT )
 
1689
            sendFastAllowedSet( m );
 
1690
    }
 
1691
 
 
1692
    return m;
 
1693
}
 
1694
 
 
1695
void
 
1696
tr_peerMsgsFree( tr_peermsgs* msgs )
 
1697
{
 
1698
    if( msgs != NULL )
 
1699
    {
 
1700
        tr_timerFree( &msgs->pulseTimer );
 
1701
        tr_timerFree( &msgs->rateTimer );
 
1702
        tr_timerFree( &msgs->pexTimer );
 
1703
        tr_publisherFree( &msgs->publisher );
 
1704
        tr_list_free( &msgs->clientWillAskFor, tr_free );
 
1705
        tr_list_free( &msgs->clientAskedFor, tr_free );
 
1706
        tr_list_free( &msgs->peerAskedFor, tr_free );
 
1707
        evbuffer_free( msgs->outMessages );
 
1708
        evbuffer_free( msgs->inBlock );
 
1709
        tr_free( msgs->pex );
 
1710
        msgs->pexCount = 0;
 
1711
        tr_free( msgs );
 
1712
    }
 
1713
}
 
1714
 
 
1715
tr_publisher_tag
 
1716
tr_peerMsgsSubscribe( tr_peermsgs       * peer,
 
1717
                      tr_delivery_func    func,
 
1718
                      void              * userData )
 
1719
{
 
1720
    return tr_publisherSubscribe( peer->publisher, func, userData );
 
1721
}
 
1722
 
 
1723
void
 
1724
tr_peerMsgsUnsubscribe( tr_peermsgs       * peer,
 
1725
                        tr_publisher_tag    tag )
 
1726
{
 
1727
    tr_publisherUnsubscribe( peer->publisher, tag );
 
1728
}
 
1729
 
 
1730
int
 
1731
tr_peerMsgIsPieceFastAllowed( const tr_peermsgs * peer,
 
1732
                              uint32_t            index )
 
1733
{
 
1734
    return tr_bitfieldHas( peer->clientAllowedPieces, index );
 
1735
}