~ubuntu-branches/ubuntu/jaunty/transmission/jaunty-security

« back to all changes in this revision

Viewing changes to libtransmission/peer-msgs.c

  • Committer: Bazaar Package Importer
  • Author(s): Philipp Benner
  • Date: 2007-10-29 19:53:02 UTC
  • mfrom: (1.1.4 upstream)
  • Revision ID: james.westby@ubuntu.com-20071029195302-gyy129sjmci7ezel
Tags: 0.91.dfsg-1
* New upstream release (Closes: #448516).
* Using manpages from source package.

Show diffs side-by-side

added added

removed removed

Lines of Context:
7
7
 * This exemption does not extend to derived works not owned by
8
8
 * the Transmission project.
9
9
 *
10
 
 * $Id: peer-msgs.c 3501 2007-10-22 23:27:47Z charles $
 
10
 * $Id: peer-msgs.c 3614 2007-10-28 15:20:23Z charles $
11
11
 */
12
12
 
13
13
#include <assert.h>
18
18
#include <string.h>
19
19
#include <libgen.h> /* basename */
20
20
 
21
 
#include <arpa/inet.h>
 
21
#include <netinet/in.h> /* struct in_addr */
22
22
 
23
23
#include <sys/types.h> /* event.h needs this */
24
24
#include <event.h>
70
70
 
71
71
    KEEPALIVE_INTERVAL_SECS = 90,          /* idle seconds before we send a keepalive */
72
72
    PEX_INTERVAL            = (60 * 1000), /* msec between calls to sendPex() */
73
 
    PEER_PULSE_INTERVAL     = (133),        /* msec between calls to pulse() */
 
73
    PEER_PULSE_INTERVAL     = (100),       /* msec between calls to pulse() */
74
74
    RATE_PULSE_INTERVAL     = (333),       /* msec between calls to ratePulse() */
75
75
};
76
76
 
110
110
 
111
111
    tr_publisher_t * publisher;
112
112
 
 
113
    struct evbuffer * outBlock;    /* buffer of all the current piece message */
113
114
    struct evbuffer * outMessages; /* buffer of all the non-piece messages */
114
115
    struct evbuffer * inBlock;     /* the block we're currently receiving */
115
116
    tr_list * peerAskedFor;
 
117
    tr_list * peerAskedForFast;
116
118
    tr_list * clientAskedFor;
117
119
    tr_list * clientWillAskFor;
118
120
 
233
235
    tr_peerIoWriteUint8 ( io, out, choke ? BT_CHOKE : BT_UNCHOKE );
234
236
}
235
237
 
236
 
static void
237
 
protocolSendPiece( tr_peermsgs                * msgs,
238
 
                   const struct peer_request  * r,
239
 
                   const uint8_t              * pieceData )
240
 
{
241
 
    tr_peerIo * io = msgs->io;
242
 
    struct evbuffer * out = evbuffer_new( );
243
 
 
244
 
    dbgmsg( msgs, "sending block %u:%u->%u", r->index, r->offset, r->length );
245
 
    tr_peerIoWriteUint32( io, out, sizeof(uint8_t) + 2*sizeof(uint32_t) + r->length );
246
 
    tr_peerIoWriteUint8 ( io, out, BT_PIECE );
247
 
    tr_peerIoWriteUint32( io, out, r->index );
248
 
    tr_peerIoWriteUint32( io, out, r->offset );
249
 
    tr_peerIoWriteBytes ( io, out, pieceData, r->length );
250
 
    tr_peerIoWriteBuf   ( io, out );
251
 
 
252
 
    evbuffer_free( out );
253
 
}
254
 
 
255
238
/**
256
239
***  EVENTS
257
240
**/
393
376
 
394
377
#define MIN_CHOKE_PERIOD_SEC 10
395
378
 
 
379
static void
 
380
cancelAllRequestsToClientExceptFast( tr_peermsgs * msgs )
 
381
{
 
382
    tr_list_free( &msgs->peerAskedFor, tr_free );
 
383
}
 
384
 
396
385
void
397
386
tr_peerMsgsSetChoke( tr_peermsgs * msgs, int choke )
398
387
{
409
398
    else if( msgs->info->peerIsChoked != choke )
410
399
    {
411
400
        msgs->info->peerIsChoked = choke;
412
 
        
413
401
        if( choke )
414
 
        {
415
 
            tr_list * walk;
416
 
            for( walk = msgs->peerAskedFor; walk != NULL; )
417
 
            {
418
 
                tr_list * next = walk->next;
419
 
                /* don't reject a peer's fast allowed requests at choke */
420
 
                struct peer_request *req = walk->data;
421
 
                if ( !tr_bitfieldHas( msgs->peerAllowedPieces, req->index ) )
422
 
                {
423
 
                    tr_list_remove_data( &msgs->peerAskedFor, req );
424
 
                    tr_free( req );
425
 
                }
426
 
                walk = next;
427
 
            }
428
 
        }
429
 
 
 
402
            cancelAllRequestsToClientExceptFast( msgs );
430
403
        protocolSendChoke( msgs, choke );
431
404
        msgs->info->chokeChangedAt = time( NULL );
432
405
    }
477
450
{
478
451
    assert( msgs != NULL );
479
452
    assert( length > 0 );
480
 
    
481
 
    /* reject the request */
482
 
    const uint32_t len = sizeof(uint8_t) + 3 * sizeof(uint32_t);
483
 
    tr_peerIoWriteUint32( msgs->io, msgs->outMessages, len );
484
 
    tr_peerIoWriteUint8( msgs->io, msgs->outMessages, BT_REJECT );
485
 
    tr_peerIoWriteUint32( msgs->io, msgs->outMessages, pieceIndex );
486
 
    tr_peerIoWriteUint32( msgs->io, msgs->outMessages, offset );
487
 
    tr_peerIoWriteUint32( msgs->io, msgs->outMessages, length );
 
453
 
 
454
    if( tr_peerIoSupportsFEXT( msgs->io ) )
 
455
    {
 
456
        const uint32_t len = sizeof(uint8_t) + 3 * sizeof(uint32_t);
 
457
        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, len );
 
458
        tr_peerIoWriteUint8( msgs->io, msgs->outMessages, BT_REJECT );
 
459
        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, pieceIndex );
 
460
        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, offset );
 
461
        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, length );
 
462
    }
488
463
}
489
464
 
490
465
static void
534
509
}
535
510
 
536
511
static int
537
 
requestIsValid( const tr_peermsgs * msgs, struct peer_request * req )
 
512
requestIsValid( const tr_peermsgs * msgs, const struct peer_request * req )
538
513
{
539
514
    return reqIsValid( msgs, req->index, req->offset, req->length );
540
515
}
630
605
}
631
606
 
632
607
static void
633
 
tr_peerMsgsCancelAllRequests( tr_peermsgs * msgs )
 
608
cancelAllRequestsToPeer( tr_peermsgs * msgs )
634
609
{
635
610
    struct peer_request * req;
636
611
 
885
860
}
886
861
 
887
862
static int
 
863
clientCanSendFastBlock( const tr_peermsgs * msgs UNUSED )
 
864
{
 
865
    /* FIXME(tiennou): base this on how many blocks we've already sent this
 
866
     * peer, or maybe how many fast blocks per minute we've sent overall,
 
867
     * or maybe how much bandwidth we're already using up w/o fast peers.
 
868
     * I don't know what the Right Thing here is, but 
 
869
     * the previous measurement of how many pieces we have is not enough. */
 
870
    return FALSE;
 
871
}
 
872
 
 
873
static void
 
874
peerMadeRequest( tr_peermsgs * msgs, const struct peer_request * req )
 
875
{
 
876
    const int reqIsValid = requestIsValid( msgs, req );
 
877
    const int clientHasPiece = reqIsValid && tr_cpPieceIsComplete( msgs->torrent->completion, req->index );
 
878
    const int peerIsChoked = msgs->info->peerIsChoked;
 
879
    const int peerIsFast = tr_peerIoSupportsFEXT( msgs->io );
 
880
    const int pieceIsFast = reqIsValid && tr_bitfieldHas( msgs->peerAllowedPieces, req->index );
 
881
    const int canSendFast = clientCanSendFastBlock( msgs );
 
882
 
 
883
    if( !reqIsValid ) /* bad request */
 
884
    {
 
885
        dbgmsg( msgs, "rejecting an invalid request." );
 
886
        sendFastReject( msgs, req->index, req->offset, req->length );
 
887
    }
 
888
    else if( !clientHasPiece ) /* we don't have it */
 
889
    {
 
890
        dbgmsg( msgs, "rejecting request for a piece we don't have." );
 
891
        sendFastReject( msgs, req->index, req->offset, req->length );
 
892
    }
 
893
    else if( peerIsChoked && !peerIsFast ) /* maybe he doesn't know he's choked? */
 
894
    {
 
895
        tr_peerMsgsSetChoke( msgs, 1 );
 
896
        sendFastReject( msgs, req->index, req->offset, req->length );
 
897
    }
 
898
    else if( peerIsChoked && peerIsFast && ( !pieceIsFast || !canSendFast ) )
 
899
    {
 
900
        sendFastReject( msgs, req->index, req->offset, req->length );
 
901
    }
 
902
    else /* YAY */
 
903
    {
 
904
        struct peer_request * tmp = tr_new( struct peer_request, 1 );
 
905
        *tmp = *req;
 
906
        if( peerIsFast && pieceIsFast )
 
907
            tr_list_append( &msgs->peerAskedForFast, tmp );
 
908
        else
 
909
            tr_list_append( &msgs->peerAskedFor, tmp );
 
910
    }
 
911
}
 
912
 
 
913
static int
 
914
messageLengthIsCorrect( const tr_peermsgs * msg, uint8_t id, uint32_t len )
 
915
{
 
916
    switch( id )
 
917
    {
 
918
        case BT_CHOKE:
 
919
        case BT_UNCHOKE:
 
920
        case BT_INTERESTED:
 
921
        case BT_NOT_INTERESTED:
 
922
        case BT_HAVE_ALL:
 
923
        case BT_HAVE_NONE:
 
924
            return len==1;
 
925
 
 
926
        case BT_HAVE:
 
927
        case BT_SUGGEST:
 
928
        case BT_ALLOWED_FAST:
 
929
            return len==5;
 
930
 
 
931
        case BT_BITFIELD:
 
932
            return len == (msg->torrent->info.pieceCount+7u)/8u + 1u;
 
933
       
 
934
        case BT_REQUEST:
 
935
        case BT_CANCEL:
 
936
        case BT_REJECT:
 
937
            return len==13;
 
938
 
 
939
        case BT_PIECE:
 
940
            return len > 9;
 
941
 
 
942
        case BT_PORT:
 
943
            return len==3;
 
944
 
 
945
        case BT_LTEP:
 
946
            return len >= 2;
 
947
 
 
948
        default:
 
949
            return FALSE;
 
950
    }
 
951
}
 
952
 
 
953
 
 
954
static int
888
955
readBtMessage( tr_peermsgs * msgs, struct evbuffer * inbuf )
889
956
{
890
957
    uint8_t id;
895
962
        return READ_MORE;
896
963
 
897
964
    tr_peerIoReadUint8( msgs->io, inbuf, &id );
898
 
    msglen--;
899
965
    dbgmsg( msgs, "got BT id %d, len %d", (int)id, (int)msglen );
900
966
 
 
967
    if( !messageLengthIsCorrect( msgs, id, msglen ) )
 
968
    {
 
969
        fireGotError( msgs );
 
970
        return READ_DONE;
 
971
    }
 
972
 
 
973
    --msglen;
 
974
 
901
975
    switch( id )
902
976
    {
903
 
        case BT_CHOKE:
904
 
            dbgmsg( msgs, "got Choke" );
905
 
            assert( msglen == 0 );
906
 
            msgs->info->clientIsChoked = 1;
907
 
#if 0
908
 
            tr_list * walk;
909
 
            for( walk = msgs->peerAskedFor; walk != NULL; )
910
 
            {
911
 
                tr_list * next = walk->next;
912
 
                /* We shouldn't reject a peer's fast allowed requests at choke */
913
 
                struct peer_request *req = walk->data;
914
 
                if ( !tr_bitfieldHas( msgs->peerAllowedPieces, req->index ) )
915
 
                {
916
 
                    tr_list_remove_data( &msgs->peerAskedFor, req );
917
 
                    tr_free( req );
918
 
                }
919
 
                walk = next;
920
 
            }
921
 
#endif
922
 
            tr_peerMsgsCancelAllRequests( msgs );
923
 
            break;
924
 
 
925
 
        case BT_UNCHOKE:
926
 
            dbgmsg( msgs, "got Unchoke" );
927
 
            assert( msglen == 0 );
928
 
            msgs->info->clientIsChoked = 0;
929
 
            fireNeedReq( msgs );
930
 
            break;
931
 
 
932
 
        case BT_INTERESTED:
933
 
            dbgmsg( msgs, "got Interested" );
934
 
            assert( msglen == 0 );
935
 
            msgs->info->peerIsInterested = 1;
936
 
            tr_peerMsgsSetChoke( msgs, 0 );
937
 
            break;
938
 
 
939
 
        case BT_NOT_INTERESTED:
940
 
            dbgmsg( msgs, "got Not Interested" );
941
 
            assert( msglen == 0 );
942
 
            msgs->info->peerIsInterested = 0;
943
 
            break;
944
 
 
945
 
        case BT_HAVE:
946
 
            assert( msglen == 4 );
947
 
            tr_peerIoReadUint32( msgs->io, inbuf, &ui32 );
948
 
            tr_bitfieldAdd( msgs->info->have, ui32 );
949
 
            updatePeerProgress( msgs );
950
 
            tr_rcTransferred( msgs->torrent->swarmspeed, msgs->torrent->info.pieceSize );
951
 
            dbgmsg( msgs, "got Have: %u", ui32 );
952
 
            break;
953
 
 
954
 
        case BT_BITFIELD: {
955
 
            const int clientIsSeed = tr_torrentIsSeed( msgs->torrent );
956
 
            dbgmsg( msgs, "got a bitfield" );
957
 
            assert( msglen == msgs->info->have->len );
958
 
            tr_peerIoReadBytes( msgs->io, inbuf, msgs->info->have->bits, msglen );
959
 
            updatePeerProgress( msgs );
960
 
            tr_peerMsgsSetChoke( msgs, !clientIsSeed || (msgs->info->progress<1.0) );
961
 
            fireNeedReq( msgs );
962
 
            break;
963
 
        }
964
 
 
965
 
        case BT_REQUEST: {
966
 
            struct peer_request * req;
967
 
            assert( msglen == 12 );
968
 
            req = tr_new( struct peer_request, 1 );
969
 
            tr_peerIoReadUint32( msgs->io, inbuf, &req->index );
970
 
            tr_peerIoReadUint32( msgs->io, inbuf, &req->offset );
971
 
            tr_peerIoReadUint32( msgs->io, inbuf, &req->length );
972
 
            dbgmsg( msgs, "got Request: %u:%u->%u", req->index, req->offset, req->length );
973
 
            
974
 
            if ( !requestIsValid( msgs, req ) )
975
 
            {
976
 
                dbgmsg( msgs, "BT_REQUEST: invalid request, ignoring" );
977
 
                tr_free( req );
978
 
                break;
979
 
            }
980
 
            /* 
981
 
                If we're not choking him -> continue
982
 
                If we're choking him
983
 
                    it doesn't support FPE -> He's deaf, reCHOKE and bail...
984
 
                    it support FPE
985
 
                        If the asked piece is not allowed
986
 
                            OR he's above our threshold
987
 
                            OR we don't have the requested piece -> Reject
988
 
                        Else
989
 
                        Asked piece allowed AND he's below our threshold -> continue...
990
 
             */
 
977
        case BT_CHOKE:
 
978
            dbgmsg( msgs, "got Choke" );
 
979
            msgs->info->clientIsChoked = 1;
 
980
            cancelAllRequestsToPeer( msgs );
 
981
            cancelAllRequestsToClientExceptFast( msgs );
 
982
            break;
 
983
 
 
984
        case BT_UNCHOKE:
 
985
            dbgmsg( msgs, "got Unchoke" );
 
986
            msgs->info->clientIsChoked = 0;
 
987
            fireNeedReq( msgs );
 
988
            break;
 
989
 
 
990
        case BT_INTERESTED:
 
991
            dbgmsg( msgs, "got Interested" );
 
992
            msgs->info->peerIsInterested = 1;
 
993
            tr_peerMsgsSetChoke( msgs, 0 );
 
994
            break;
 
995
 
 
996
        case BT_NOT_INTERESTED:
 
997
            dbgmsg( msgs, "got Not Interested" );
 
998
            msgs->info->peerIsInterested = 0;
 
999
            break;
 
1000
 
 
1001
        case BT_HAVE:
 
1002
            tr_peerIoReadUint32( msgs->io, inbuf, &ui32 );
 
1003
            dbgmsg( msgs, "got Have: %u", ui32 );
 
1004
            tr_bitfieldAdd( msgs->info->have, ui32 );
 
1005
            updatePeerProgress( msgs );
 
1006
            tr_rcTransferred( msgs->torrent->swarmspeed, msgs->torrent->info.pieceSize );
 
1007
            break;
 
1008
 
 
1009
        case BT_BITFIELD: {
 
1010
            const int clientIsSeed = tr_torrentIsSeed( msgs->torrent );
 
1011
            dbgmsg( msgs, "got a bitfield" );
 
1012
            tr_peerIoReadBytes( msgs->io, inbuf, msgs->info->have->bits, msglen );
 
1013
            updatePeerProgress( msgs );
 
1014
            tr_peerMsgsSetChoke( msgs, !clientIsSeed || (msgs->info->progress<1.0) );
 
1015
            fireNeedReq( msgs );
 
1016
            break;
 
1017
        }
 
1018
 
 
1019
        case BT_REQUEST: {
 
1020
            struct peer_request req;
 
1021
            tr_peerIoReadUint32( msgs->io, inbuf, &req.index );
 
1022
            tr_peerIoReadUint32( msgs->io, inbuf, &req.offset );
 
1023
            tr_peerIoReadUint32( msgs->io, inbuf, &req.length );
 
1024
            dbgmsg( msgs, "got Request: %u:%u->%u", req.index, req.offset, req.length );
 
1025
            peerMadeRequest( msgs, &req );
 
1026
            break;
 
1027
        }
 
1028
 
 
1029
        case BT_CANCEL: {
 
1030
            struct peer_request req;
 
1031
            tr_peerIoReadUint32( msgs->io, inbuf, &req.index );
 
1032
            tr_peerIoReadUint32( msgs->io, inbuf, &req.offset );
 
1033
            tr_peerIoReadUint32( msgs->io, inbuf, &req.length );
 
1034
            dbgmsg( msgs, "got a Cancel %u:%u->%u", req.index, req.offset, req.length );
 
1035
            tr_free( tr_list_remove( &msgs->peerAskedForFast, &req, compareRequest ) );
 
1036
            tr_free( tr_list_remove( &msgs->peerAskedFor, &req, compareRequest ) );
 
1037
            break;
 
1038
        }
 
1039
 
 
1040
        case BT_PIECE: {
 
1041
            dbgmsg( msgs, "got a Piece!" );
 
1042
            assert( msgs->blockToUs.length == 0 );
 
1043
            tr_peerIoReadUint32( msgs->io, inbuf, &msgs->blockToUs.index );
 
1044
            tr_peerIoReadUint32( msgs->io, inbuf, &msgs->blockToUs.offset );
 
1045
            msgs->blockToUs.length = msglen - 8;
 
1046
            assert( EVBUFFER_LENGTH(msgs->inBlock) == 0 );
 
1047
            msgs->state = msgs->blockToUs.length ? READING_BT_PIECE : AWAITING_BT_LENGTH;
 
1048
            return READ_AGAIN;
 
1049
            break;
 
1050
        }
 
1051
 
 
1052
        case BT_PORT: {
 
1053
            dbgmsg( msgs, "Got a BT_PORT" );
 
1054
            tr_peerIoReadUint16( msgs->io, inbuf, &msgs->info->port );
 
1055
            break;
 
1056
        }
991
1057
    
992
 
 
993
 
            if ( msgs->info->peerIsChoked )
994
 
            {
995
 
                if ( !tr_peerIoSupportsFEXT( msgs->io ) )
996
 
                {
997
 
                    dbgmsg( msgs, "BT_REQUEST: peer is choked, ignoring" );
998
 
                    /* Didn't he get it? */
999
 
                    tr_peerMsgsSetChoke( msgs, 1 );
1000
 
                    tr_free( req );
1001
 
                    break;
1002
 
                }
1003
 
                else
1004
 
                {
1005
 
                    if ( !tr_bitfieldHas( msgs->peerAllowedPieces, req->index )
1006
 
                         || ( msgs->info->progress * (float)msgs->torrent->info.pieceCount) >= MAX_ALLOWED_SET_COUNT
1007
 
                         || !tr_cpPieceIsComplete( msgs->torrent->completion, req->index ) )
1008
 
                    {
1009
 
                        dbgmsg( msgs, "BT_REQUEST: peer requests an un-fastallowed piece" );
1010
 
                        sendFastReject( msgs, req->index, req->offset, req->length );
1011
 
                        tr_free( req );
1012
 
                        break;
1013
 
                    }
1014
 
                    dbgmsg( msgs, "BT_REQUEST: fast allowed piece, accepting request" );
1015
 
                }    
1016
 
            }
1017
 
            
1018
 
            tr_list_append( &msgs->peerAskedFor, req );
1019
 
            break;
1020
 
        }
1021
 
 
1022
 
        case BT_CANCEL: {
1023
 
            struct peer_request req;
1024
 
            void * data;
1025
 
            assert( msglen == 12 );
1026
 
            tr_peerIoReadUint32( msgs->io, inbuf, &req.index );
1027
 
            tr_peerIoReadUint32( msgs->io, inbuf, &req.offset );
1028
 
            tr_peerIoReadUint32( msgs->io, inbuf, &req.length );
1029
 
            dbgmsg( msgs, "got a Cancel %u:%u->%u", req.index, req.offset, req.length );
1030
 
            data = tr_list_remove( &msgs->peerAskedFor, &req, compareRequest );
1031
 
            tr_free( data );
1032
 
            break;
1033
 
        }
1034
 
 
1035
 
        case BT_PIECE: {
1036
 
            dbgmsg( msgs, "got a Piece!" );
1037
 
            assert( msgs->blockToUs.length == 0 );
1038
 
            tr_peerIoReadUint32( msgs->io, inbuf, &msgs->blockToUs.index );
1039
 
            tr_peerIoReadUint32( msgs->io, inbuf, &msgs->blockToUs.offset );
1040
 
            msgs->blockToUs.length = msglen - 8;
1041
 
            assert( EVBUFFER_LENGTH(msgs->inBlock) == 0 );
1042
 
            msgs->state = msgs->blockToUs.length ? READING_BT_PIECE : AWAITING_BT_LENGTH;
1043
 
            return READ_AGAIN;
1044
 
            break;
1045
 
        }
1046
 
 
1047
 
        case BT_PORT: {
1048
 
            dbgmsg( msgs, "Got a BT_PORT" );
1049
 
            assert( msglen == 2 );
1050
 
            tr_peerIoReadUint16( msgs->io, inbuf, &msgs->info->port );
1051
 
            break;
1052
 
        }
1053
 
        
1054
 
        case BT_SUGGEST: {
1055
 
            /* tiennou TODO */
1056
 
            break;
1057
 
        }
1058
 
            
1059
 
        case BT_HAVE_ALL: {
1060
 
            assert( msglen == 0 );
1061
 
            dbgmsg( msgs, "Got a BT_HAVE_ALL" );
1062
 
            memset( msgs->info->have->bits, 1, msgs->info->have->len );
1063
 
            updatePeerProgress( msgs );
1064
 
            break;
1065
 
        }
1066
 
            
1067
 
        case BT_HAVE_NONE: {
1068
 
            assert( msglen == 0 );
1069
 
            dbgmsg( msgs, "Got a BT_HAVE_NONE" );
1070
 
            tr_bitfieldClear( msgs->info->have );
1071
 
            updatePeerProgress( msgs );
1072
 
            break;
1073
 
        }
1074
 
            
1075
 
        case BT_REJECT: {
1076
 
            struct peer_request req;
1077
 
            tr_list * node;
1078
 
            assert( msglen == 12 );
1079
 
            dbgmsg( msgs, "Got a BT_REJECT" );
1080
 
            tr_peerIoReadUint32( msgs->io, inbuf, &req.index );
1081
 
            tr_peerIoReadUint32( msgs->io, inbuf, &req.offset );
1082
 
            tr_peerIoReadUint32( msgs->io, inbuf, &req.length );
1083
 
            node = tr_list_find( msgs->peerAskedFor, &req, compareRequest );
1084
 
            if( node != NULL ) {
1085
 
                void * data = node->data;
1086
 
                tr_list_remove_data( &msgs->peerAskedFor, data );
1087
 
                tr_free( data );
1088
 
                dbgmsg( msgs, "found the req that peer has rejected... cancelled." );
1089
 
            }
1090
 
            break;
1091
 
        }
1092
 
            
1093
 
        case BT_ALLOWED_FAST: {
1094
 
            assert( msglen == 4 );
1095
 
            dbgmsg( msgs, "Got a BT_ALLOWED_FAST" );
1096
 
            tr_peerIoReadUint32( msgs->io, inbuf, &ui32 );
1097
 
            tr_bitfieldAdd( msgs->clientAllowedPieces, ui32 );
1098
 
            break;
1099
 
        }
1100
 
            
1101
 
        case BT_LTEP:
1102
 
            dbgmsg( msgs, "Got a BT_LTEP" );
1103
 
            parseLtep( msgs, msglen, inbuf );
1104
 
            break;
1105
 
 
1106
 
        default:
1107
 
            dbgmsg( msgs, "peer sent us an UNKNOWN: %d", (int)id );
1108
 
            tr_peerIoDrain( msgs->io, inbuf, msglen );
1109
 
            assert( 0 );
 
1058
        case BT_SUGGEST: {
 
1059
            /* FIXME(tiennou) */
 
1060
            uint32_t index;
 
1061
            tr_peerIoReadUint32( msgs->io, inbuf, &index );
 
1062
            break;
 
1063
        }
 
1064
        
 
1065
        case BT_HAVE_ALL:
 
1066
            dbgmsg( msgs, "Got a BT_HAVE_ALL" );
 
1067
            tr_bitfieldAddRange( msgs->info->have, 0, msgs->torrent->info.pieceCount );
 
1068
            updatePeerProgress( msgs );
 
1069
            break;
 
1070
        
 
1071
        case BT_HAVE_NONE:
 
1072
            dbgmsg( msgs, "Got a BT_HAVE_NONE" );
 
1073
            tr_bitfieldClear( msgs->info->have );
 
1074
            updatePeerProgress( msgs );
 
1075
            break;
 
1076
        
 
1077
        case BT_REJECT: {
 
1078
            struct peer_request req;
 
1079
            dbgmsg( msgs, "Got a BT_REJECT" );
 
1080
            tr_peerIoReadUint32( msgs->io, inbuf, &req.index );
 
1081
            tr_peerIoReadUint32( msgs->io, inbuf, &req.offset );
 
1082
            tr_peerIoReadUint32( msgs->io, inbuf, &req.length );
 
1083
            tr_free( tr_list_remove( &msgs->clientAskedFor, &req, compareRequest ) );
 
1084
            break;
 
1085
        }
 
1086
        
 
1087
        case BT_ALLOWED_FAST: {
 
1088
            dbgmsg( msgs, "Got a BT_ALLOWED_FAST" );
 
1089
            tr_peerIoReadUint32( msgs->io, inbuf, &ui32 );
 
1090
            tr_bitfieldAdd( msgs->clientAllowedPieces, ui32 );
 
1091
            break;
 
1092
        }
 
1093
        
 
1094
        case BT_LTEP:
 
1095
            dbgmsg( msgs, "Got a BT_LTEP" );
 
1096
            parseLtep( msgs, msglen, inbuf );
 
1097
            break;
 
1098
 
 
1099
        default:
 
1100
            dbgmsg( msgs, "peer sent us an UNKNOWN: %d", (int)id );
 
1101
            tr_peerIoDrain( msgs->io, inbuf, msglen );
 
1102
            break;
1110
1103
    }
1111
1104
 
1112
1105
    msgs->incomingMessageLength = -1;
1352
1345
canWrite( const tr_peermsgs * msgs )
1353
1346
{
1354
1347
    /* don't let our outbuffer get too large */
1355
 
    if( tr_peerIoWriteBytesWaiting( msgs->io ) > 8192 )
 
1348
    if( tr_peerIoWriteBytesWaiting( msgs->io ) > 4096 )
1356
1349
        return FALSE;
1357
1350
 
1358
1351
    return TRUE;
1359
1352
}
1360
1353
 
1361
 
static int
1362
 
canUpload( const tr_peermsgs * msgs )
 
1354
static size_t
 
1355
getUploadMax( const tr_peermsgs * msgs )
1363
1356
{
 
1357
    static const size_t maxval = ~0;
1364
1358
    const tr_torrent * tor = msgs->torrent;
1365
1359
 
1366
1360
    if( !canWrite( msgs ) )
1367
 
        return FALSE;
 
1361
        return 0;
1368
1362
 
1369
1363
    if( tor->uploadLimitMode == TR_SPEEDLIMIT_GLOBAL )
1370
 
        return !tor->handle->useUploadLimit || tr_rcCanTransfer( tor->handle->upload );
 
1364
        return tor->handle->useUploadLimit ? tr_rcBytesLeft( tor->handle->upload ) : maxval;
1371
1365
 
1372
1366
    if( tor->uploadLimitMode == TR_SPEEDLIMIT_SINGLE )
1373
 
        return tr_rcCanTransfer( tor->upload );
 
1367
        return tr_rcBytesLeft( tor->upload );
1374
1368
 
1375
 
    return TRUE;
 
1369
    return maxval;
1376
1370
}
1377
1371
 
1378
1372
static int
1386
1380
    return TRUE;
1387
1381
}
1388
1382
 
 
1383
static struct peer_request*
 
1384
popNextRequest( tr_peermsgs * msgs )
 
1385
{
 
1386
    struct peer_request * ret;
 
1387
    ret = tr_list_pop_front( &msgs->peerAskedForFast );
 
1388
    if( !ret )
 
1389
        ret = tr_list_pop_front( &msgs->peerAskedFor);
 
1390
    return ret;
 
1391
}
 
1392
 
1389
1393
static int
1390
1394
pulse( void * vmsgs )
1391
1395
{
1392
1396
    const time_t now = time( NULL );
1393
 
    tr_peermsgs * msgs = (tr_peermsgs *) vmsgs;
 
1397
    tr_peermsgs * msgs = vmsgs;
 
1398
    struct peer_request * r;
1394
1399
    size_t len;
1395
1400
 
1396
1401
    /* if we froze out a downloaded block because of speed limits,
1406
1411
    if( !canWrite( msgs ) )
1407
1412
    {
1408
1413
    }
 
1414
    else if(( len = EVBUFFER_LENGTH( msgs->outBlock ) ))
 
1415
    {
 
1416
        const size_t uploadMax = getUploadMax( msgs );
 
1417
        const size_t outlen = MIN( len, uploadMax );
 
1418
        tr_peerIoWrite( msgs->io, EVBUFFER_DATA( msgs->outBlock ), outlen );
 
1419
        evbuffer_drain( msgs->outBlock, outlen );
 
1420
        msgs->clientSentAnythingAt = now;
 
1421
        peerGotBytes( msgs, outlen );
 
1422
        len -= outlen;
 
1423
        dbgmsg( msgs, "wrote %d bytes; %d left in block", (int)outlen, (int)len );
 
1424
        fflush( stdout );
 
1425
    }
1409
1426
    else if(( len = EVBUFFER_LENGTH( msgs->outMessages ) ))
1410
1427
    {
1411
1428
        tr_peerIoWriteBuf( msgs->io, msgs->outMessages );
1412
1429
        msgs->clientSentAnythingAt = now;
1413
1430
    }
1414
 
    else if(( msgs->peerAskedFor ))
1415
 
    {
1416
 
        if( canUpload( msgs ) )
1417
 
        {
1418
 
            struct peer_request * r = tr_list_pop_front( &msgs->peerAskedFor );
1419
 
            uint8_t * buf = tr_new( uint8_t, r->length );
1420
 
 
1421
 
            if( requestIsValid( msgs, r )
1422
 
                && tr_cpPieceIsComplete( msgs->torrent->completion, r->index )
1423
 
                && !tr_ioRead( msgs->torrent, r->index, r->offset, r->length, buf ) )
1424
 
            {
1425
 
                protocolSendPiece( msgs, r, buf );
1426
 
                peerGotBytes( msgs, r->length );
1427
 
                msgs->clientSentAnythingAt = now;
1428
 
            }
1429
 
 
1430
 
            tr_free( buf );
1431
 
            tr_free( r );
1432
 
        }
1433
 
    }
1434
1431
    else if( ( now - msgs->clientSentAnythingAt ) > KEEPALIVE_INTERVAL_SECS )
1435
1432
    {
1436
1433
        sendKeepalive( msgs );
1437
1434
    }
1438
1435
 
 
1436
    if( !EVBUFFER_LENGTH( msgs->outBlock )
 
1437
        && (( r = popNextRequest( msgs )))
 
1438
        && requestIsValid( msgs, r )
 
1439
        && tr_cpPieceIsComplete( msgs->torrent->completion, r->index ) )
 
1440
    {
 
1441
        uint8_t * buf = tr_new( uint8_t, r->length );
 
1442
 
 
1443
        if( !tr_ioRead( msgs->torrent, r->index, r->offset, r->length, buf ) )
 
1444
        {
 
1445
            tr_peerIo * io = msgs->io;
 
1446
            struct evbuffer * out = msgs->outBlock;
 
1447
 
 
1448
            dbgmsg( msgs, "sending block %u:%u->%u", r->index, r->offset, r->length );
 
1449
            tr_peerIoWriteUint32( io, out, sizeof(uint8_t) + 2*sizeof(uint32_t) + r->length );
 
1450
            tr_peerIoWriteUint8 ( io, out, BT_PIECE );
 
1451
            tr_peerIoWriteUint32( io, out, r->index );
 
1452
            tr_peerIoWriteUint32( io, out, r->offset );
 
1453
            tr_peerIoWriteBytes ( io, out, buf, r->length );
 
1454
        }
 
1455
 
 
1456
        tr_free( buf );
 
1457
        tr_free( r );
 
1458
 
 
1459
        pulse( msgs ); /* start sending it right away */
 
1460
    }
 
1461
 
1439
1462
    return TRUE; /* loop forever */
1440
1463
}
1441
1464
 
1643
1666
    m->rateTimer = tr_timerNew( m->handle, ratePulse, m, RATE_PULSE_INTERVAL );
1644
1667
    m->pexTimer = tr_timerNew( m->handle, pexPulse, m, PEX_INTERVAL );
1645
1668
    m->outMessages = evbuffer_new( );
 
1669
    m->outBlock = evbuffer_new( );
1646
1670
    m->inBlock = evbuffer_new( );
1647
1671
    m->peerAllowedPieces = NULL;
1648
1672
    m->clientAllowedPieces = NULL;
1675
1699
        sendBitfield( m );
1676
1700
    else {
1677
1701
        /* This peer is fastpeer-enabled, send it have-all or have-none if appropriate */
 
1702
        uint32_t peerProgress;
1678
1703
        float completion = tr_cpPercentComplete( m->torrent->completion );
1679
1704
        if ( completion == 0.0f ) {
1680
1705
            sendFastHave( m, 0 );
1683
1708
        } else {
1684
1709
            sendBitfield( m );
1685
1710
        }
1686
 
        uint32_t peerProgress = m->torrent->info.pieceCount * m->info->progress;
 
1711
        peerProgress = m->torrent->info.pieceCount * m->info->progress;
1687
1712
        
1688
1713
        if ( peerProgress < MAX_ALLOWED_SET_COUNT )
1689
1714
            sendFastAllowedSet( m );
1703
1728
        tr_publisherFree( &msgs->publisher );
1704
1729
        tr_list_free( &msgs->clientWillAskFor, tr_free );
1705
1730
        tr_list_free( &msgs->clientAskedFor, tr_free );
 
1731
        tr_list_free( &msgs->peerAskedForFast, tr_free );
1706
1732
        tr_list_free( &msgs->peerAskedFor, tr_free );
1707
1733
        evbuffer_free( msgs->outMessages );
 
1734
        evbuffer_free( msgs->outBlock );
1708
1735
        evbuffer_free( msgs->inBlock );
1709
1736
        tr_free( msgs->pex );
1710
1737
        msgs->pexCount = 0;