2
* This file Copyright (C) 2007 Charles Kerr <charles@rebelbase.com>
4
* This file is licensed by the GPL version 2. Works owned by the
5
* Transmission project are granted a special exemption to clause 2(b)
6
* so that the bulk of its code can remain under the MIT license.
7
* This exemption does not extend to derived works not owned by
8
* the Transmission project.
10
* $Id: peer-msgs.c 3501 2007-10-22 23:27:47Z charles $
19
#include <libgen.h> /* basename */
21
#include <arpa/inet.h>
23
#include <sys/types.h> /* event.h needs this */
26
#include "transmission.h"
28
#include "completion.h"
33
#include "peer-mgr-private.h"
34
#include "peer-msgs.h"
35
#include "ratecontrol.h"
43
#define MAX_ALLOWED_SET_COUNT 10 /* number of pieces generated for allow-fast,
44
threshold for fast-allowing others */
51
BT_NOT_INTERESTED = 3,
69
MAX_REQUEST_BYTE_COUNT = (16 * 1024), /* drop requests who want too much */
71
KEEPALIVE_INTERVAL_SECS = 90, /* idle seconds before we send a keepalive */
72
PEX_INTERVAL = (60 * 1000), /* msec between calls to sendPex() */
73
PEER_PULSE_INTERVAL = (133), /* msec between calls to pulse() */
74
RATE_PULSE_INTERVAL = (333), /* msec between calls to ratePulse() */
89
time_t time_requested;
93
compareRequest( const void * va, const void * vb )
95
struct peer_request * a = (struct peer_request*) va;
96
struct peer_request * b = (struct peer_request*) vb;
97
if( a->index != b->index ) return a->index - b->index;
98
if( a->offset != b->offset ) return a->offset - b->offset;
99
if( a->length != b->length ) return a->length - b->length;
108
tr_torrent * torrent;
111
tr_publisher_t * publisher;
113
struct evbuffer * outMessages; /* buffer of all the non-piece messages */
114
struct evbuffer * inBlock; /* the block we're currently receiving */
115
tr_list * peerAskedFor;
116
tr_list * clientAskedFor;
117
tr_list * clientWillAskFor;
119
tr_timer * rateTimer;
120
tr_timer * pulseTimer;
123
struct peer_request blockToUs; /* the block currntly being sent to us */
125
time_t lastReqAddedAt;
126
time_t clientSentPexAt;
127
time_t clientSentAnythingAt;
129
unsigned int notListening : 1;
130
unsigned int peerSupportsPex : 1;
131
unsigned int clientSentLtepHandshake : 1;
132
unsigned int peerSentLtepHandshake : 1;
134
tr_bitfield * clientAllowedPieces;
135
tr_bitfield * peerAllowedPieces;
140
uint32_t incomingMessageLength;
141
uint32_t maxActiveRequests;
142
uint32_t minActiveRequests;
152
myDebug( const char * file, int line,
153
const struct tr_peermsgs * msgs,
154
const char * fmt, ... )
156
FILE * fp = tr_getLog( );
161
struct evbuffer * buf = evbuffer_new( );
162
char * myfile = tr_strdup( file );
164
evbuffer_add_printf( buf, "[%s] %s [%s]: ",
165
tr_getLogTimeStr( timestr, sizeof(timestr) ),
166
tr_peerIoGetAddrStr( msgs->io ),
167
msgs->info->client );
168
va_start( args, fmt );
169
evbuffer_add_vprintf( buf, fmt, args );
171
evbuffer_add_printf( buf, " (%s:%d)\n", basename(myfile), line );
172
fwrite( EVBUFFER_DATA(buf), 1, EVBUFFER_LENGTH(buf), fp );
175
evbuffer_free( buf );
179
#define dbgmsg(msgs, fmt...) myDebug(__FILE__, __LINE__, msgs, ##fmt )
186
protocolSendRequest( tr_peermsgs * msgs, const struct peer_request * req )
188
tr_peerIo * io = msgs->io;
189
struct evbuffer * out = msgs->outMessages;
191
dbgmsg( msgs, "requesting %u:%u->%u", req->index, req->offset, req->length );
192
tr_peerIoWriteUint32( io, out, sizeof(uint8_t) + 3*sizeof(uint32_t) );
193
tr_peerIoWriteUint8 ( io, out, BT_REQUEST );
194
tr_peerIoWriteUint32( io, out, req->index );
195
tr_peerIoWriteUint32( io, out, req->offset );
196
tr_peerIoWriteUint32( io, out, req->length );
200
protocolSendCancel( tr_peermsgs * msgs, const struct peer_request * req )
202
tr_peerIo * io = msgs->io;
203
struct evbuffer * out = msgs->outMessages;
205
dbgmsg( msgs, "cancelling %u:%u->%u", req->index, req->offset, req->length );
206
tr_peerIoWriteUint32( io, out, sizeof(uint8_t) + 3*sizeof(uint32_t) );
207
tr_peerIoWriteUint8 ( io, out, BT_CANCEL );
208
tr_peerIoWriteUint32( io, out, req->index );
209
tr_peerIoWriteUint32( io, out, req->offset );
210
tr_peerIoWriteUint32( io, out, req->length );
214
protocolSendHave( tr_peermsgs * msgs, uint32_t index )
216
tr_peerIo * io = msgs->io;
217
struct evbuffer * out = msgs->outMessages;
219
dbgmsg( msgs, "sending Have %u", index );
220
tr_peerIoWriteUint32( io, out, sizeof(uint8_t) + sizeof(uint32_t) );
221
tr_peerIoWriteUint8 ( io, out, BT_HAVE );
222
tr_peerIoWriteUint32( io, out, index );
226
protocolSendChoke( tr_peermsgs * msgs, int choke )
228
tr_peerIo * io = msgs->io;
229
struct evbuffer * out = msgs->outMessages;
231
dbgmsg( msgs, "sending %s", (choke ? "Choke" : "Unchoke") );
232
tr_peerIoWriteUint32( io, out, sizeof(uint8_t) );
233
tr_peerIoWriteUint8 ( io, out, choke ? BT_CHOKE : BT_UNCHOKE );
237
protocolSendPiece( tr_peermsgs * msgs,
238
const struct peer_request * r,
239
const uint8_t * pieceData )
241
tr_peerIo * io = msgs->io;
242
struct evbuffer * out = evbuffer_new( );
244
dbgmsg( msgs, "sending block %u:%u->%u", r->index, r->offset, r->length );
245
tr_peerIoWriteUint32( io, out, sizeof(uint8_t) + 2*sizeof(uint32_t) + r->length );
246
tr_peerIoWriteUint8 ( io, out, BT_PIECE );
247
tr_peerIoWriteUint32( io, out, r->index );
248
tr_peerIoWriteUint32( io, out, r->offset );
249
tr_peerIoWriteBytes ( io, out, pieceData, r->length );
250
tr_peerIoWriteBuf ( io, out );
252
evbuffer_free( out );
259
static const tr_peermsgs_event blankEvent = { 0, 0, 0, 0, 0.0f };
262
publish( tr_peermsgs * msgs, tr_peermsgs_event * e )
264
tr_publisherPublish( msgs->publisher, msgs->info, e );
268
fireGotError( tr_peermsgs * msgs )
270
tr_peermsgs_event e = blankEvent;
271
e.eventType = TR_PEERMSG_GOT_ERROR;
276
fireNeedReq( tr_peermsgs * msgs )
278
tr_peermsgs_event e = blankEvent;
279
e.eventType = TR_PEERMSG_NEED_REQ;
284
firePeerProgress( tr_peermsgs * msgs )
286
tr_peermsgs_event e = blankEvent;
287
e.eventType = TR_PEERMSG_PEER_PROGRESS;
288
e.progress = msgs->info->progress;
293
fireClientHave( tr_peermsgs * msgs, uint32_t pieceIndex )
295
tr_peermsgs_event e = blankEvent;
296
e.eventType = TR_PEERMSG_CLIENT_HAVE;
297
e.pieceIndex = pieceIndex;
302
fireGotBlock( tr_peermsgs * msgs, uint32_t pieceIndex, uint32_t offset, uint32_t length )
304
tr_peermsgs_event e = blankEvent;
305
e.eventType = TR_PEERMSG_CLIENT_BLOCK;
306
e.pieceIndex = pieceIndex;
313
fireCancelledReq( tr_peermsgs * msgs, const struct peer_request * req )
315
tr_peermsgs_event e = blankEvent;
316
e.eventType = TR_PEERMSG_CANCEL;
317
e.pieceIndex = req->index;
318
e.offset = req->offset;
319
e.length = req->length;
328
isPieceInteresting( const tr_peermsgs * peer,
331
const tr_torrent * torrent = peer->torrent;
332
if( torrent->info.pieces[piece].dnd ) /* we don't want it */
334
if( tr_cpPieceIsComplete( torrent->completion, piece ) ) /* we have it */
336
if( !tr_bitfieldHas( peer->info->have, piece ) ) /* peer doesn't have it */
338
if( tr_bitfieldHas( peer->info->banned, piece ) ) /* peer is banned */
343
/* "interested" means we'll ask for piece data if they unchoke us */
345
isPeerInteresting( const tr_peermsgs * msgs )
348
const tr_torrent * torrent;
349
const tr_bitfield * bitfield;
350
const int clientIsSeed = tr_torrentIsSeed( msgs->torrent );
355
torrent = msgs->torrent;
356
bitfield = tr_cpPieceBitfield( torrent->completion );
358
if( !msgs->info->have )
361
assert( bitfield->len == msgs->info->have->len );
362
for( i=0; i<torrent->info.pieceCount; ++i )
363
if( isPieceInteresting( msgs, i ) )
370
sendInterest( tr_peermsgs * msgs, int weAreInterested )
372
assert( msgs != NULL );
373
assert( weAreInterested==0 || weAreInterested==1 );
375
msgs->info->clientIsInterested = weAreInterested;
376
dbgmsg( msgs, "Sending %s",
377
weAreInterested ? "Interested" : "Not Interested");
379
tr_peerIoWriteUint32( msgs->io, msgs->outMessages, sizeof(uint8_t) );
380
tr_peerIoWriteUint8 ( msgs->io, msgs->outMessages,
381
weAreInterested ? BT_INTERESTED : BT_NOT_INTERESTED );
385
updateInterest( tr_peermsgs * msgs )
387
const int i = isPeerInteresting( msgs );
388
if( i != msgs->info->clientIsInterested )
389
sendInterest( msgs, i );
394
#define MIN_CHOKE_PERIOD_SEC 10
397
tr_peerMsgsSetChoke( tr_peermsgs * msgs, int choke )
399
const time_t fibrillationTime = time(NULL) - MIN_CHOKE_PERIOD_SEC;
401
assert( msgs != NULL );
402
assert( msgs->info != NULL );
403
assert( choke==0 || choke==1 );
405
if( msgs->info->chokeChangedAt > fibrillationTime )
407
dbgmsg( msgs, "Not changing choke to %d to avoid fibrillation", choke );
409
else if( msgs->info->peerIsChoked != choke )
411
msgs->info->peerIsChoked = choke;
416
for( walk = msgs->peerAskedFor; walk != NULL; )
418
tr_list * next = walk->next;
419
/* don't reject a peer's fast allowed requests at choke */
420
struct peer_request *req = walk->data;
421
if ( !tr_bitfieldHas( msgs->peerAllowedPieces, req->index ) )
423
tr_list_remove_data( &msgs->peerAskedFor, req );
430
protocolSendChoke( msgs, choke );
431
msgs->info->chokeChangedAt = time( NULL );
440
tr_peerMsgsHave( tr_peermsgs * msgs,
443
protocolSendHave( msgs, index );
445
/* since we have more pieces now, we might not be interested in this peer */
446
updateInterest( msgs );
450
sendFastSuggest( tr_peermsgs * msgs,
451
uint32_t pieceIndex )
453
dbgmsg( msgs, "w00t SUGGESTing them piece #%d", pieceIndex );
454
tr_peerIoWriteUint32( msgs->io, msgs->outMessages, sizeof(uint8_t) + sizeof(uint32_t) );
455
tr_peerIoWriteUint8( msgs->io, msgs->outMessages, BT_SUGGEST );
456
tr_peerIoWriteUint32( msgs->io, msgs->outMessages, pieceIndex );
458
updateInterest( msgs );
462
sendFastHave( tr_peermsgs * msgs,
465
dbgmsg( msgs, "w00t telling them we %s pieces", (all ? "HAVE_ALL" : "HAVE_NONE" ) );
466
tr_peerIoWriteUint32( msgs->io, msgs->outMessages, sizeof(uint8_t) );
467
tr_peerIoWriteUint8( msgs->io, msgs->outMessages, ( all ? BT_HAVE_ALL : BT_HAVE_NONE ) );
469
updateInterest( msgs );
473
sendFastReject( tr_peermsgs * msgs,
478
assert( msgs != NULL );
479
assert( length > 0 );
481
/* reject the request */
482
const uint32_t len = sizeof(uint8_t) + 3 * sizeof(uint32_t);
483
tr_peerIoWriteUint32( msgs->io, msgs->outMessages, len );
484
tr_peerIoWriteUint8( msgs->io, msgs->outMessages, BT_REJECT );
485
tr_peerIoWriteUint32( msgs->io, msgs->outMessages, pieceIndex );
486
tr_peerIoWriteUint32( msgs->io, msgs->outMessages, offset );
487
tr_peerIoWriteUint32( msgs->io, msgs->outMessages, length );
491
sendFastAllowed( tr_peermsgs * msgs,
494
dbgmsg( msgs, "w00t telling them we ALLOW_FAST piece #%d", pieceIndex );
495
tr_peerIoWriteUint32( msgs->io, msgs->outMessages, sizeof(uint8_t) + sizeof(uint32_t) );
496
tr_peerIoWriteUint8( msgs->io, msgs->outMessages, BT_ALLOWED_FAST );
497
tr_peerIoWriteUint32( msgs->io, msgs->outMessages, pieceIndex );
503
sendFastAllowedSet( tr_peermsgs * msgs )
506
while (i <= msgs->torrent->info.pieceCount )
508
if ( tr_bitfieldHas( msgs->peerAllowedPieces, i) )
509
sendFastAllowed( msgs, i );
520
reqIsValid( const tr_peermsgs * msgs, uint32_t index, uint32_t offset, uint32_t length )
522
const tr_torrent * tor = msgs->torrent;
524
if( index >= (uint32_t) tor->info.pieceCount )
526
if ( (int)offset >= tr_torPieceCountBytes( tor, (int)index ) )
528
if( length > MAX_REQUEST_BYTE_COUNT )
530
if( tr_pieceOffset( tor, index, offset, length ) > tor->info.totalSize )
537
requestIsValid( const tr_peermsgs * msgs, struct peer_request * req )
539
return reqIsValid( msgs, req->index, req->offset, req->length );
543
pumpRequestQueue( tr_peermsgs * msgs )
545
const int max = msgs->maxActiveRequests;
546
const int min = msgs->minActiveRequests;
547
int count = tr_list_size( msgs->clientAskedFor );
552
if( msgs->info->clientIsChoked )
555
while( ( count < max ) && ( msgs->clientWillAskFor != NULL ) )
557
struct peer_request * req = tr_list_pop_front( &msgs->clientWillAskFor );
558
protocolSendRequest( msgs, req );
559
req->time_requested = msgs->lastReqAddedAt = time( NULL );
560
tr_list_append( &msgs->clientAskedFor, req );
566
dbgmsg( msgs, "pump sent %d requests, now have %d active and %d queued",
568
tr_list_size(msgs->clientAskedFor),
569
tr_list_size(msgs->clientWillAskFor) );
576
tr_peerMsgsAddRequest( tr_peermsgs * msgs,
581
const int req_max = msgs->maxActiveRequests;
582
struct peer_request tmp, *req;
584
assert( msgs != NULL );
585
assert( msgs->torrent != NULL );
586
assert( reqIsValid( msgs, index, offset, length ) );
589
*** Reasons to decline the request
592
/* don't send requests to choked clients */
593
if( msgs->info->clientIsChoked ) {
594
dbgmsg( msgs, "declining request because they're choking us" );
595
return TR_ADDREQ_CLIENT_CHOKED;
598
/* peer doesn't have this piece */
599
if( !tr_bitfieldHas( msgs->info->have, index ) )
600
return TR_ADDREQ_MISSING;
602
/* peer's queue is full */
603
if( tr_list_size( msgs->clientWillAskFor ) >= req_max ) {
604
dbgmsg( msgs, "declining request because we're full" );
605
return TR_ADDREQ_FULL;
608
/* have we already asked for this piece? */
612
if( tr_list_find( msgs->clientAskedFor, &tmp, compareRequest ) ) {
613
dbgmsg( msgs, "declining because it's a duplicate" );
614
return TR_ADDREQ_DUPLICATE;
616
if( tr_list_find( msgs->clientWillAskFor, &tmp, compareRequest ) ) {
617
dbgmsg( msgs, "declining because it's a duplicate" );
618
return TR_ADDREQ_DUPLICATE;
622
*** Accept this request
625
dbgmsg( msgs, "added req for piece %d, offset %d", (int)index, (int)offset );
626
req = tr_new0( struct peer_request, 1 );
628
tr_list_append( &msgs->clientWillAskFor, req );
633
tr_peerMsgsCancelAllRequests( tr_peermsgs * msgs )
635
struct peer_request * req;
637
while(( req = tr_list_pop_front( &msgs->clientWillAskFor ) ))
639
fireCancelledReq( msgs, req );
643
while(( req = tr_list_pop_front( &msgs->clientAskedFor ) ))
645
fireCancelledReq( msgs, req );
646
protocolSendCancel( msgs, req );
652
tr_peerMsgsCancel( tr_peermsgs * msgs,
657
struct peer_request *req, tmp;
659
assert( msgs != NULL );
660
assert( length > 0 );
662
/* have we asked the peer for this piece? */
663
tmp.index = pieceIndex;
667
/* if it's only in the queue and hasn't been sent yet, free it */
668
if(( req = tr_list_remove( &msgs->clientWillAskFor, &tmp, compareRequest ) ))
670
fireCancelledReq( msgs, req );
674
/* if it's already been sent, send a cancel message too */
675
if(( req = tr_list_remove( &msgs->clientAskedFor, &tmp, compareRequest ) ))
677
protocolSendCancel( msgs, req );
678
fireCancelledReq( msgs, req );
688
sendLtepHandshake( tr_peermsgs * msgs )
694
const char * v = TR_NAME " " USERAGENT_PREFIX;
695
const int port = tr_getPublicPort( msgs->handle );
696
struct evbuffer * outbuf;
698
if( msgs->clientSentLtepHandshake )
701
outbuf = evbuffer_new( );
702
dbgmsg( msgs, "sending an ltep handshake" );
703
msgs->clientSentLtepHandshake = 1;
705
/* decide if we want to advertise pex support */
706
if( !tr_torrentIsPexEnabled( msgs->torrent ) )
708
else if( msgs->peerSentLtepHandshake )
709
pex = msgs->peerSupportsPex ? 1 : 0;
713
tr_bencInit( &val, TYPE_DICT );
714
tr_bencDictReserve( &val, 4 );
715
tr_bencInitInt( tr_bencDictAdd( &val, "e" ), 1 );
716
m = tr_bencDictAdd( &val, "m" );
717
tr_bencInit( m, TYPE_DICT );
719
tr_bencDictReserve( m, 1 );
720
tr_bencInitInt( tr_bencDictAdd( m, "ut_pex" ), OUR_LTEP_PEX );
723
tr_bencInitInt( tr_bencDictAdd( &val, "p" ), port );
724
tr_bencInitStr( tr_bencDictAdd( &val, "v" ), v, 0, 1 );
725
buf = tr_bencSaveMalloc( &val, &len );
727
tr_peerIoWriteUint32( msgs->io, outbuf, 2*sizeof(uint8_t) + len );
728
tr_peerIoWriteUint8 ( msgs->io, outbuf, BT_LTEP );
729
tr_peerIoWriteUint8 ( msgs->io, outbuf, LTEP_HANDSHAKE );
730
tr_peerIoWriteBytes ( msgs->io, outbuf, buf, len );
732
tr_peerIoWriteBuf( msgs->io, outbuf );
735
dbgmsg( msgs, "here is the ltep handshake we sent:" );
736
tr_bencPrint( &val );
742
evbuffer_free( outbuf );
746
parseLtepHandshake( tr_peermsgs * msgs, int len, struct evbuffer * inbuf )
748
benc_val_t val, * sub;
749
uint8_t * tmp = tr_new( uint8_t, len );
751
tr_peerIoReadBytes( msgs->io, inbuf, tmp, len );
752
msgs->peerSentLtepHandshake = 1;
754
if( tr_bencLoad( tmp, len, &val, NULL ) || val.type!=TYPE_DICT ) {
755
dbgmsg( msgs, "GET extended-handshake, couldn't get dictionary" );
761
dbgmsg( msgs, "here is the ltep handshake we read:" );
762
tr_bencPrint( &val );
765
/* does the peer prefer encrypted connections? */
766
sub = tr_bencDictFind( &val, "e" );
767
if( tr_bencIsInt( sub ) )
768
msgs->info->encryption_preference = sub->val.i
769
? ENCRYPTION_PREFERENCE_YES
770
: ENCRYPTION_PREFERENCE_NO;
772
/* check supported messages for utorrent pex */
773
sub = tr_bencDictFind( &val, "m" );
774
if( tr_bencIsDict( sub ) ) {
775
sub = tr_bencDictFind( sub, "ut_pex" );
776
if( tr_bencIsInt( sub ) ) {
777
msgs->peerSupportsPex = 1;
778
msgs->ut_pex_id = (uint8_t) sub->val.i;
779
dbgmsg( msgs, "msgs->ut_pex is %d", (int)msgs->ut_pex_id );
783
/* get peer's listening port */
784
sub = tr_bencDictFind( &val, "p" );
785
if( tr_bencIsInt( sub ) ) {
786
msgs->info->port = htons( (uint16_t)sub->val.i );
787
dbgmsg( msgs, "msgs->port is now %hu", msgs->info->port );
795
parseUtPex( tr_peermsgs * msgs, int msglen, struct evbuffer * inbuf )
797
benc_val_t val, * sub;
800
if( !tr_torrentIsPexEnabled( msgs->torrent ) ) /* no sharing! */
803
tmp = tr_new( uint8_t, msglen );
804
tr_peerIoReadBytes( msgs->io, inbuf, tmp, msglen );
806
if( tr_bencLoad( tmp, msglen, &val, NULL ) || !tr_bencIsDict( &val ) ) {
807
dbgmsg( msgs, "GET can't read extended-pex dictionary" );
812
sub = tr_bencDictFind( &val, "added" );
813
if( tr_bencIsStr(sub) && ((sub->val.s.i % 6) == 0)) {
814
const int n = sub->val.s.i / 6 ;
815
dbgmsg( msgs, "got %d peers from uT pex", n );
816
tr_peerMgrAddPeers( msgs->handle->peerMgr,
817
msgs->torrent->info.hash,
819
(uint8_t*)sub->val.s.s, n );
827
sendPex( tr_peermsgs * msgs );
830
parseLtep( tr_peermsgs * msgs, int msglen, struct evbuffer * inbuf )
834
tr_peerIoReadUint8( msgs->io, inbuf, <ep_msgid );
837
if( ltep_msgid == LTEP_HANDSHAKE )
839
dbgmsg( msgs, "got ltep handshake" );
840
parseLtepHandshake( msgs, msglen, inbuf );
841
sendLtepHandshake( msgs );
844
else if( ltep_msgid == msgs->ut_pex_id )
846
dbgmsg( msgs, "got ut pex" );
847
msgs->peerSupportsPex = 1;
848
parseUtPex( msgs, msglen, inbuf );
852
dbgmsg( msgs, "skipping unknown ltep message (%d)", (int)ltep_msgid );
853
evbuffer_drain( inbuf, msglen );
858
readBtLength( tr_peermsgs * msgs, struct evbuffer * inbuf )
861
const size_t needlen = sizeof(uint32_t);
863
if( EVBUFFER_LENGTH(inbuf) < needlen )
866
tr_peerIoReadUint32( msgs->io, inbuf, &len );
868
if( len == 0 ) /* peer sent us a keepalive message */
869
dbgmsg( msgs, "got KeepAlive" );
871
msgs->incomingMessageLength = len;
872
msgs->state = AWAITING_BT_MESSAGE;
879
updatePeerProgress( tr_peermsgs * msgs )
881
msgs->info->progress = tr_bitfieldCountTrueBits( msgs->info->have ) / (float)msgs->torrent->info.pieceCount;
882
dbgmsg( msgs, "peer progress is %f", msgs->info->progress );
883
updateInterest( msgs );
884
firePeerProgress( msgs );
888
readBtMessage( tr_peermsgs * msgs, struct evbuffer * inbuf )
892
uint32_t msglen = msgs->incomingMessageLength;
894
if( EVBUFFER_LENGTH(inbuf) < msglen )
897
tr_peerIoReadUint8( msgs->io, inbuf, &id );
899
dbgmsg( msgs, "got BT id %d, len %d", (int)id, (int)msglen );
904
dbgmsg( msgs, "got Choke" );
905
assert( msglen == 0 );
906
msgs->info->clientIsChoked = 1;
909
for( walk = msgs->peerAskedFor; walk != NULL; )
911
tr_list * next = walk->next;
912
/* We shouldn't reject a peer's fast allowed requests at choke */
913
struct peer_request *req = walk->data;
914
if ( !tr_bitfieldHas( msgs->peerAllowedPieces, req->index ) )
916
tr_list_remove_data( &msgs->peerAskedFor, req );
922
tr_peerMsgsCancelAllRequests( msgs );
926
dbgmsg( msgs, "got Unchoke" );
927
assert( msglen == 0 );
928
msgs->info->clientIsChoked = 0;
933
dbgmsg( msgs, "got Interested" );
934
assert( msglen == 0 );
935
msgs->info->peerIsInterested = 1;
936
tr_peerMsgsSetChoke( msgs, 0 );
939
case BT_NOT_INTERESTED:
940
dbgmsg( msgs, "got Not Interested" );
941
assert( msglen == 0 );
942
msgs->info->peerIsInterested = 0;
946
assert( msglen == 4 );
947
tr_peerIoReadUint32( msgs->io, inbuf, &ui32 );
948
tr_bitfieldAdd( msgs->info->have, ui32 );
949
updatePeerProgress( msgs );
950
tr_rcTransferred( msgs->torrent->swarmspeed, msgs->torrent->info.pieceSize );
951
dbgmsg( msgs, "got Have: %u", ui32 );
955
const int clientIsSeed = tr_torrentIsSeed( msgs->torrent );
956
dbgmsg( msgs, "got a bitfield" );
957
assert( msglen == msgs->info->have->len );
958
tr_peerIoReadBytes( msgs->io, inbuf, msgs->info->have->bits, msglen );
959
updatePeerProgress( msgs );
960
tr_peerMsgsSetChoke( msgs, !clientIsSeed || (msgs->info->progress<1.0) );
966
struct peer_request * req;
967
assert( msglen == 12 );
968
req = tr_new( struct peer_request, 1 );
969
tr_peerIoReadUint32( msgs->io, inbuf, &req->index );
970
tr_peerIoReadUint32( msgs->io, inbuf, &req->offset );
971
tr_peerIoReadUint32( msgs->io, inbuf, &req->length );
972
dbgmsg( msgs, "got Request: %u:%u->%u", req->index, req->offset, req->length );
974
if ( !requestIsValid( msgs, req ) )
976
dbgmsg( msgs, "BT_REQUEST: invalid request, ignoring" );
981
If we're not choking him -> continue
983
it doesn't support FPE -> He's deaf, reCHOKE and bail...
985
If the asked piece is not allowed
986
OR he's above our threshold
987
OR we don't have the requested piece -> Reject
989
Asked piece allowed AND he's below our threshold -> continue...
993
if ( msgs->info->peerIsChoked )
995
if ( !tr_peerIoSupportsFEXT( msgs->io ) )
997
dbgmsg( msgs, "BT_REQUEST: peer is choked, ignoring" );
998
/* Didn't he get it? */
999
tr_peerMsgsSetChoke( msgs, 1 );
1005
if ( !tr_bitfieldHas( msgs->peerAllowedPieces, req->index )
1006
|| ( msgs->info->progress * (float)msgs->torrent->info.pieceCount) >= MAX_ALLOWED_SET_COUNT
1007
|| !tr_cpPieceIsComplete( msgs->torrent->completion, req->index ) )
1009
dbgmsg( msgs, "BT_REQUEST: peer requests an un-fastallowed piece" );
1010
sendFastReject( msgs, req->index, req->offset, req->length );
1014
dbgmsg( msgs, "BT_REQUEST: fast allowed piece, accepting request" );
1018
tr_list_append( &msgs->peerAskedFor, req );
1023
struct peer_request req;
1025
assert( msglen == 12 );
1026
tr_peerIoReadUint32( msgs->io, inbuf, &req.index );
1027
tr_peerIoReadUint32( msgs->io, inbuf, &req.offset );
1028
tr_peerIoReadUint32( msgs->io, inbuf, &req.length );
1029
dbgmsg( msgs, "got a Cancel %u:%u->%u", req.index, req.offset, req.length );
1030
data = tr_list_remove( &msgs->peerAskedFor, &req, compareRequest );
1036
dbgmsg( msgs, "got a Piece!" );
1037
assert( msgs->blockToUs.length == 0 );
1038
tr_peerIoReadUint32( msgs->io, inbuf, &msgs->blockToUs.index );
1039
tr_peerIoReadUint32( msgs->io, inbuf, &msgs->blockToUs.offset );
1040
msgs->blockToUs.length = msglen - 8;
1041
assert( EVBUFFER_LENGTH(msgs->inBlock) == 0 );
1042
msgs->state = msgs->blockToUs.length ? READING_BT_PIECE : AWAITING_BT_LENGTH;
1048
dbgmsg( msgs, "Got a BT_PORT" );
1049
assert( msglen == 2 );
1050
tr_peerIoReadUint16( msgs->io, inbuf, &msgs->info->port );
1060
assert( msglen == 0 );
1061
dbgmsg( msgs, "Got a BT_HAVE_ALL" );
1062
memset( msgs->info->have->bits, 1, msgs->info->have->len );
1063
updatePeerProgress( msgs );
1067
case BT_HAVE_NONE: {
1068
assert( msglen == 0 );
1069
dbgmsg( msgs, "Got a BT_HAVE_NONE" );
1070
tr_bitfieldClear( msgs->info->have );
1071
updatePeerProgress( msgs );
1076
struct peer_request req;
1078
assert( msglen == 12 );
1079
dbgmsg( msgs, "Got a BT_REJECT" );
1080
tr_peerIoReadUint32( msgs->io, inbuf, &req.index );
1081
tr_peerIoReadUint32( msgs->io, inbuf, &req.offset );
1082
tr_peerIoReadUint32( msgs->io, inbuf, &req.length );
1083
node = tr_list_find( msgs->peerAskedFor, &req, compareRequest );
1084
if( node != NULL ) {
1085
void * data = node->data;
1086
tr_list_remove_data( &msgs->peerAskedFor, data );
1088
dbgmsg( msgs, "found the req that peer has rejected... cancelled." );
1093
case BT_ALLOWED_FAST: {
1094
assert( msglen == 4 );
1095
dbgmsg( msgs, "Got a BT_ALLOWED_FAST" );
1096
tr_peerIoReadUint32( msgs->io, inbuf, &ui32 );
1097
tr_bitfieldAdd( msgs->clientAllowedPieces, ui32 );
1102
dbgmsg( msgs, "Got a BT_LTEP" );
1103
parseLtep( msgs, msglen, inbuf );
1107
dbgmsg( msgs, "peer sent us an UNKNOWN: %d", (int)id );
1108
tr_peerIoDrain( msgs->io, inbuf, msglen );
1112
msgs->incomingMessageLength = -1;
1113
msgs->state = AWAITING_BT_LENGTH;
1118
clientGotBytes( tr_peermsgs * msgs, uint32_t byteCount )
1120
tr_torrent * tor = msgs->torrent;
1121
tor->activityDate = tr_date( );
1122
tor->downloadedCur += byteCount;
1123
msgs->info->pieceDataActivityDate = time( NULL );
1124
tr_rcTransferred( msgs->info->rcToClient, byteCount );
1125
tr_rcTransferred( tor->download, byteCount );
1126
tr_rcTransferred( tor->handle->download, byteCount );
1130
peerGotBytes( tr_peermsgs * msgs, uint32_t byteCount )
1132
tr_torrent * tor = msgs->torrent;
1133
tor->activityDate = tr_date( );
1134
tor->uploadedCur += byteCount;
1135
msgs->info->pieceDataActivityDate = time( NULL );
1136
tr_rcTransferred( msgs->info->rcToPeer, byteCount );
1137
tr_rcTransferred( tor->upload, byteCount );
1138
tr_rcTransferred( tor->handle->upload, byteCount );
1142
canDownload( const tr_peermsgs * msgs )
1144
tr_torrent * tor = msgs->torrent;
1146
if( tor->downloadLimitMode == TR_SPEEDLIMIT_GLOBAL )
1147
return !tor->handle->useDownloadLimit || tr_rcCanTransfer( tor->handle->download );
1149
if( tor->downloadLimitMode == TR_SPEEDLIMIT_SINGLE )
1150
return tr_rcCanTransfer( tor->download );
1156
reassignBytesToCorrupt( tr_peermsgs * msgs, uint32_t byteCount )
1158
tr_torrent * tor = msgs->torrent;
1160
/* increment the `corrupt' field */
1161
tor->corruptCur += byteCount;
1163
/* decrement the `downloaded' field */
1164
if( tor->downloadedCur >= byteCount )
1165
tor->downloadedCur -= byteCount;
1167
tor->downloadedCur = 0;
1172
gotBadPiece( tr_peermsgs * msgs, uint32_t pieceIndex )
1174
const uint32_t byteCount = tr_torPieceCountBytes( msgs->torrent, (int)pieceIndex );
1175
reassignBytesToCorrupt( msgs, byteCount );
1179
gotUnwantedBlock( tr_peermsgs * msgs,
1180
uint32_t index UNUSED,
1181
uint32_t offset UNUSED,
1184
reassignBytesToCorrupt( msgs, length );
1188
addUsToBlamefield( tr_peermsgs * msgs, uint32_t index )
1190
if( !msgs->info->blame )
1191
msgs->info->blame = tr_bitfieldNew( msgs->torrent->info.pieceCount );
1192
tr_bitfieldAdd( msgs->info->blame, index );
1196
gotBlock( tr_peermsgs * msgs,
1197
struct evbuffer * inbuf,
1202
tr_torrent * tor = msgs->torrent;
1203
const int block = _tr_block( tor, index, offset );
1204
struct peer_request key, *req;
1207
*** Remove the block from our `we asked for this' list
1211
key.offset = offset;
1212
key.length = length;
1213
req = (struct peer_request*) tr_list_remove( &msgs->clientAskedFor, &key,
1216
gotUnwantedBlock( msgs, index, offset, length );
1217
dbgmsg( msgs, "we didn't ask for this message..." );
1220
dbgmsg( msgs, "Got block %u:%u->%u (turnaround time %d secs)",
1221
req->index, req->offset, req->length,
1222
(int)(time(NULL) - req->time_requested) );
1224
dbgmsg( msgs, "peer has %d more blocks we've asked for",
1225
tr_list_size(msgs->clientAskedFor));
1231
if( tr_cpBlockIsComplete( tor->completion, block ) ) {
1232
dbgmsg( msgs, "have this block already..." );
1233
tr_dbg( "have this block already..." );
1234
gotUnwantedBlock( msgs, index, offset, length );
1238
if( (int)length != tr_torBlockCountBytes( tor, block ) ) {
1239
dbgmsg( msgs, "block is the wrong length..." );
1240
tr_dbg( "block is the wrong length..." );
1241
gotUnwantedBlock( msgs, index, offset, length );
1249
if( tr_ioWrite( tor, index, offset, length, EVBUFFER_DATA( inbuf )))
1252
tr_cpBlockAdd( tor->completion, block );
1254
addUsToBlamefield( msgs, index );
1256
fireGotBlock( msgs, index, offset, length );
1259
*** Handle if this was the last block in the piece
1262
if( tr_cpPieceIsComplete( tor->completion, index ) )
1264
if( tr_ioHash( tor, index ) )
1266
gotBadPiece( msgs, index );
1270
fireClientHave( msgs, index );
1276
readBtPiece( tr_peermsgs * msgs, struct evbuffer * inbuf )
1281
assert( msgs != NULL );
1282
assert( msgs->blockToUs.length > 0 );
1283
assert( inbuf != NULL );
1284
assert( EVBUFFER_LENGTH( inbuf ) > 0 );
1286
/* read from the inbuf into our block buffer */
1287
inlen = MIN( EVBUFFER_LENGTH(inbuf), msgs->blockToUs.length );
1288
tmp = tr_new( uint8_t, inlen );
1289
tr_peerIoReadBytes( msgs->io, inbuf, tmp, inlen );
1290
evbuffer_add( msgs->inBlock, tmp, inlen );
1292
/* update our tables accordingly */
1293
assert( inlen >= msgs->blockToUs.length );
1294
msgs->blockToUs.length -= inlen;
1295
msgs->info->peerSentPieceDataAt = time( NULL );
1296
clientGotBytes( msgs, inlen );
1298
/* if this was the entire block, save it */
1299
if( !msgs->blockToUs.length )
1301
dbgmsg( msgs, "got block %u:%u", msgs->blockToUs.index, msgs->blockToUs.offset );
1302
assert( (int)EVBUFFER_LENGTH( msgs->inBlock ) == tr_torBlockCountBytes( msgs->torrent, _tr_block(msgs->torrent,msgs->blockToUs.index, msgs->blockToUs.offset) ) );
1303
gotBlock( msgs, msgs->inBlock,
1304
msgs->blockToUs.index,
1305
msgs->blockToUs.offset,
1306
EVBUFFER_LENGTH( msgs->inBlock ) );
1307
evbuffer_drain( msgs->inBlock, ~0 );
1308
msgs->state = AWAITING_BT_LENGTH;
1317
canRead( struct bufferevent * evin, void * vmsgs )
1320
tr_peermsgs * msgs = (tr_peermsgs *) vmsgs;
1321
struct evbuffer * inbuf = EVBUFFER_INPUT ( evin );
1323
if( !canDownload( msgs ) )
1325
msgs->notListening = 1;
1326
tr_peerIoSetIOMode ( msgs->io, 0, EV_READ );
1329
else switch( msgs->state )
1331
case AWAITING_BT_LENGTH: ret = readBtLength ( msgs, inbuf ); break;
1332
case AWAITING_BT_MESSAGE: ret = readBtMessage ( msgs, inbuf ); break;
1333
case READING_BT_PIECE: ret = readBtPiece ( msgs, inbuf ); break;
1334
default: assert( 0 );
1341
sendKeepalive( tr_peermsgs * msgs )
1343
dbgmsg( msgs, "sending a keepalive message" );
1344
tr_peerIoWriteUint32( msgs->io, msgs->outMessages, 0 );
1352
canWrite( const tr_peermsgs * msgs )
1354
/* don't let our outbuffer get too large */
1355
if( tr_peerIoWriteBytesWaiting( msgs->io ) > 8192 )
1362
canUpload( const tr_peermsgs * msgs )
1364
const tr_torrent * tor = msgs->torrent;
1366
if( !canWrite( msgs ) )
1369
if( tor->uploadLimitMode == TR_SPEEDLIMIT_GLOBAL )
1370
return !tor->handle->useUploadLimit || tr_rcCanTransfer( tor->handle->upload );
1372
if( tor->uploadLimitMode == TR_SPEEDLIMIT_SINGLE )
1373
return tr_rcCanTransfer( tor->upload );
1379
ratePulse( void * vmsgs )
1381
tr_peermsgs * msgs = (tr_peermsgs *) vmsgs;
1382
msgs->info->rateToClient = tr_rcRate( msgs->info->rcToClient );
1383
msgs->info->rateToPeer = tr_rcRate( msgs->info->rcToPeer );
1384
msgs->maxActiveRequests = MIN( 8 + (int)(msgs->info->rateToClient/10), 100 );
1385
msgs->minActiveRequests = msgs->maxActiveRequests / 2;
1390
pulse( void * vmsgs )
1392
const time_t now = time( NULL );
1393
tr_peermsgs * msgs = (tr_peermsgs *) vmsgs;
1396
/* if we froze out a downloaded block because of speed limits,
1397
start listening to the peer again */
1398
if( msgs->notListening && canDownload( msgs ) )
1400
msgs->notListening = 0;
1401
tr_peerIoSetIOMode ( msgs->io, EV_READ, 0 );
1404
pumpRequestQueue( msgs );
1406
if( !canWrite( msgs ) )
1409
else if(( len = EVBUFFER_LENGTH( msgs->outMessages ) ))
1411
tr_peerIoWriteBuf( msgs->io, msgs->outMessages );
1412
msgs->clientSentAnythingAt = now;
1414
else if(( msgs->peerAskedFor ))
1416
if( canUpload( msgs ) )
1418
struct peer_request * r = tr_list_pop_front( &msgs->peerAskedFor );
1419
uint8_t * buf = tr_new( uint8_t, r->length );
1421
if( requestIsValid( msgs, r )
1422
&& tr_cpPieceIsComplete( msgs->torrent->completion, r->index )
1423
&& !tr_ioRead( msgs->torrent, r->index, r->offset, r->length, buf ) )
1425
protocolSendPiece( msgs, r, buf );
1426
peerGotBytes( msgs, r->length );
1427
msgs->clientSentAnythingAt = now;
1434
else if( ( now - msgs->clientSentAnythingAt ) > KEEPALIVE_INTERVAL_SECS )
1436
sendKeepalive( msgs );
1439
return TRUE; /* loop forever */
1443
didWrite( struct bufferevent * evin UNUSED, void * vmsgs )
1449
gotError( struct bufferevent * evbuf UNUSED, short what, void * vmsgs )
1451
dbgmsg( vmsgs, "libevent got an error! what=%d, errno=%d (%s)",
1452
(int)what, errno, strerror(errno) );
1453
fireGotError( vmsgs );
1457
sendBitfield( tr_peermsgs * msgs )
1459
const tr_bitfield * bitfield = tr_cpPieceBitfield( msgs->torrent->completion );
1460
struct evbuffer * out = msgs->outMessages;
1462
dbgmsg( msgs, "sending peer a bitfield message" );
1463
tr_peerIoWriteUint32( msgs->io, out, sizeof(uint8_t) + bitfield->len );
1464
tr_peerIoWriteUint8 ( msgs->io, out, BT_BITFIELD );
1465
tr_peerIoWriteBytes ( msgs->io, out, bitfield->bits, bitfield->len );
1472
/* some peers give us error messages if we send
1473
more than this many peers in a single pex message */
1474
#define MAX_PEX_DIFFS 200
1489
pexAddedCb( void * vpex, void * userData )
1491
PexDiffs * diffs = (PexDiffs *) userData;
1492
tr_pex * pex = (tr_pex *) vpex;
1493
if( diffs->diffCount < MAX_PEX_DIFFS )
1496
diffs->added[diffs->addedCount++] = *pex;
1497
diffs->elements[diffs->elementCount++] = *pex;
1502
pexRemovedCb( void * vpex, void * userData )
1504
PexDiffs * diffs = (PexDiffs *) userData;
1505
tr_pex * pex = (tr_pex *) vpex;
1506
if( diffs->diffCount < MAX_PEX_DIFFS )
1509
diffs->dropped[diffs->droppedCount++] = *pex;
1514
pexElementCb( void * vpex, void * userData )
1516
PexDiffs * diffs = (PexDiffs *) userData;
1517
tr_pex * pex = (tr_pex *) vpex;
1518
if( diffs->diffCount < MAX_PEX_DIFFS )
1521
diffs->elements[diffs->elementCount++] = *pex;
1526
sendPex( tr_peermsgs * msgs )
1528
if( msgs->peerSupportsPex && tr_torrentIsPexEnabled( msgs->torrent ) )
1531
tr_pex * newPex = NULL;
1532
const int newCount = tr_peerMgrGetPeers( msgs->handle->peerMgr, msgs->torrent->info.hash, &newPex );
1534
benc_val_t val, *added, *dropped, *flags;
1535
uint8_t *tmp, *walk;
1539
/* build the diffs */
1540
diffs.added = tr_new( tr_pex, newCount );
1541
diffs.addedCount = 0;
1542
diffs.dropped = tr_new( tr_pex, msgs->pexCount );
1543
diffs.droppedCount = 0;
1544
diffs.elements = tr_new( tr_pex, newCount + msgs->pexCount );
1545
diffs.elementCount = 0;
1546
diffs.diffCount = 0;
1547
tr_set_compare( msgs->pex, msgs->pexCount,
1549
tr_pexCompare, sizeof(tr_pex),
1550
pexRemovedCb, pexAddedCb, pexElementCb, &diffs );
1551
dbgmsg( msgs, "pex: old peer count %d, new peer count %d, added %d, removed %d", msgs->pexCount, newCount, diffs.addedCount, diffs.droppedCount );
1554
tr_free( msgs->pex );
1555
msgs->pex = diffs.elements;
1556
msgs->pexCount = diffs.elementCount;
1558
/* build the pex payload */
1559
tr_bencInit( &val, TYPE_DICT );
1560
tr_bencDictReserve( &val, 3 );
1563
added = tr_bencDictAdd( &val, "added" );
1564
tmp = walk = tr_new( uint8_t, diffs.addedCount * 6 );
1565
for( i=0; i<diffs.addedCount; ++i ) {
1566
memcpy( walk, &diffs.added[i].in_addr, 4 ); walk += 4;
1567
memcpy( walk, &diffs.added[i].port, 2 ); walk += 2;
1569
assert( ( walk - tmp ) == diffs.addedCount * 6 );
1570
tr_bencInitStr( added, tmp, walk-tmp, FALSE );
1573
flags = tr_bencDictAdd( &val, "added.f" );
1574
tmp = walk = tr_new( uint8_t, diffs.addedCount );
1575
for( i=0; i<diffs.addedCount; ++i )
1576
*walk++ = diffs.added[i].flags;
1577
assert( ( walk - tmp ) == diffs.addedCount );
1578
tr_bencInitStr( flags, tmp, walk-tmp, FALSE );
1581
dropped = tr_bencDictAdd( &val, "dropped" );
1582
tmp = walk = tr_new( uint8_t, diffs.droppedCount * 6 );
1583
for( i=0; i<diffs.droppedCount; ++i ) {
1584
memcpy( walk, &diffs.dropped[i].in_addr, 4 ); walk += 4;
1585
memcpy( walk, &diffs.dropped[i].port, 2 ); walk += 2;
1587
assert( ( walk - tmp ) == diffs.droppedCount * 6 );
1588
tr_bencInitStr( dropped, tmp, walk-tmp, FALSE );
1590
/* write the pex message */
1591
benc = tr_bencSaveMalloc( &val, &bencLen );
1592
tr_peerIoWriteUint32( msgs->io, msgs->outMessages, 2*sizeof(uint8_t) + bencLen );
1593
tr_peerIoWriteUint8 ( msgs->io, msgs->outMessages, BT_LTEP );
1594
tr_peerIoWriteUint8 ( msgs->io, msgs->outMessages, OUR_LTEP_PEX );
1595
tr_peerIoWriteBytes ( msgs->io, msgs->outMessages, benc, bencLen );
1599
tr_bencFree( &val );
1600
tr_free( diffs.added );
1601
tr_free( diffs.dropped );
1604
msgs->clientSentPexAt = time( NULL );
1609
pexPulse( void * vpeer )
1620
tr_peerMsgsNew( struct tr_torrent * torrent,
1621
struct tr_peer * info,
1622
tr_delivery_func func,
1624
tr_publisher_tag * setme )
1628
assert( info != NULL );
1629
assert( info->io != NULL );
1631
m = tr_new0( tr_peermsgs, 1 );
1632
m->publisher = tr_publisherNew( );
1634
m->handle = torrent->handle;
1635
m->torrent = torrent;
1637
m->info->clientIsChoked = 1;
1638
m->info->peerIsChoked = 1;
1639
m->info->clientIsInterested = 0;
1640
m->info->peerIsInterested = 0;
1641
m->info->have = tr_bitfieldNew( torrent->info.pieceCount );
1642
m->pulseTimer = tr_timerNew( m->handle, pulse, m, PEER_PULSE_INTERVAL );
1643
m->rateTimer = tr_timerNew( m->handle, ratePulse, m, RATE_PULSE_INTERVAL );
1644
m->pexTimer = tr_timerNew( m->handle, pexPulse, m, PEX_INTERVAL );
1645
m->outMessages = evbuffer_new( );
1646
m->inBlock = evbuffer_new( );
1647
m->peerAllowedPieces = NULL;
1648
m->clientAllowedPieces = NULL;
1649
*setme = tr_publisherSubscribe( m->publisher, func, userData );
1651
if ( tr_peerIoSupportsFEXT( m->io ) )
1653
/* This peer is fastpeer-enabled, generate its allowed set
1654
* (before registering our callbacks) */
1655
if ( !m->peerAllowedPieces ) {
1656
const struct in_addr *peerAddr = tr_peerIoGetAddress( m->io, NULL );
1658
m->peerAllowedPieces = tr_peerMgrGenerateAllowedSet( MAX_ALLOWED_SET_COUNT,
1659
m->torrent->info.pieceCount,
1660
m->torrent->info.hash,
1663
m->clientAllowedPieces = tr_bitfieldNew( m->torrent->info.pieceCount );
1666
tr_peerIoSetTimeoutSecs( m->io, 150 ); /* error if we don't read or write for 2.5 minutes */
1667
tr_peerIoSetIOFuncs( m->io, canRead, didWrite, gotError, m );
1668
tr_peerIoSetIOMode( m->io, EV_READ|EV_WRITE, 0 );
1671
if ( tr_peerIoSupportsLTEP( m->io ) )
1672
sendLtepHandshake( m );
1674
if ( !tr_peerIoSupportsFEXT( m->io ) )
1677
/* This peer is fastpeer-enabled, send it have-all or have-none if appropriate */
1678
float completion = tr_cpPercentComplete( m->torrent->completion );
1679
if ( completion == 0.0f ) {
1680
sendFastHave( m, 0 );
1681
} else if ( completion == 1.0f ) {
1682
sendFastHave( m, 1 );
1686
uint32_t peerProgress = m->torrent->info.pieceCount * m->info->progress;
1688
if ( peerProgress < MAX_ALLOWED_SET_COUNT )
1689
sendFastAllowedSet( m );
1696
tr_peerMsgsFree( tr_peermsgs* msgs )
1700
tr_timerFree( &msgs->pulseTimer );
1701
tr_timerFree( &msgs->rateTimer );
1702
tr_timerFree( &msgs->pexTimer );
1703
tr_publisherFree( &msgs->publisher );
1704
tr_list_free( &msgs->clientWillAskFor, tr_free );
1705
tr_list_free( &msgs->clientAskedFor, tr_free );
1706
tr_list_free( &msgs->peerAskedFor, tr_free );
1707
evbuffer_free( msgs->outMessages );
1708
evbuffer_free( msgs->inBlock );
1709
tr_free( msgs->pex );
1716
tr_peerMsgsSubscribe( tr_peermsgs * peer,
1717
tr_delivery_func func,
1720
return tr_publisherSubscribe( peer->publisher, func, userData );
1724
tr_peerMsgsUnsubscribe( tr_peermsgs * peer,
1725
tr_publisher_tag tag )
1727
tr_publisherUnsubscribe( peer->publisher, tag );
1731
tr_peerMsgIsPieceFastAllowed( const tr_peermsgs * peer,
1734
return tr_bitfieldHas( peer->clientAllowedPieces, index );