~ubuntu-branches/ubuntu/maverick/transmission/maverick-updates

« back to all changes in this revision

Viewing changes to libtransmission/peer-mgr.c

  • Committer: Bazaar Package Importer
  • Author(s): Krzysztof Klimonda
  • Date: 2009-12-08 10:49:11 UTC
  • mfrom: (1.1.29 upstream) (2.1.10 sid)
  • Revision ID: james.westby@ubuntu.com-20091208104911-06gio45n2nla3vpg
Tags: 1.80~b1-0ubuntu1
* New upstream release (LP: #460620), rebased on debian unstable
  remaining changes:
  - debian/control:
    + Added replaces & provides clutch (now included as part of transmission).
      Can be removed in lucid+1
    + Added quilt, liblaunchpad-integration-dev and lsb-release to Build-Depends
  - debian/rules:
    + create a po template during package build.
  - debian/patches/01_lpi.patch:
    + integrate transmission with launchpad
  - debian/patches/20_add_x-ubuntu-gettext-domain.diff:
    + add x-ubuntu-gettext-domain to .desktop file.
  - debian/transmission-daemon.default:
    - remove --auth from OPTIONS
* Fixes bugs:
  - tray menu shows wrong status for "main window" when started minimized
    (LP: #451415)
* Refreshed patches:
  - dont_build_libevent.patch
  - 99_autoreconf.patch
* Removed patches:
  - 21_onPortTested.diff, 23_tr_torrentNext.diff and
    24_tr_torrentDeleteLocalData_do_move.diff
* debian/patches/21_fix_inhibition.patch:
  - The right value for suspend inhibition is 4
* debian/control:
  - Build-Depend on libgconf2-dev to enable magnet link registration and on
    libcanberra-gtk-dev for notification sound.
* debian/watch:
  - make it detect beta versions, to be removed after 1.80 is released.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
1
/*
2
 
 * This file Copyright (C) 2007-2009 Charles Kerr <charles@transmissionbt.com>
 
2
 * This file Copyright (C) 2007-2009 Mnemosyne LLC
3
3
 *
4
4
 * This file is licensed by the GPL version 2.  Works owned by the
5
5
 * Transmission project are granted a special exemption to clause 2(b)
7
7
 * This exemption does not extend to derived works not owned by
8
8
 * the Transmission project.
9
9
 *
10
 
 * $Id: peer-mgr.c 9090 2009-09-10 03:29:17Z charles $
 
10
 * $Id: peer-mgr.c 9671 2009-12-05 02:19:24Z charles $
11
11
 */
12
12
 
13
13
#include <assert.h>
19
19
#include <event.h>
20
20
 
21
21
#include "transmission.h"
 
22
#include "announcer.h"
22
23
#include "bandwidth.h"
23
24
#include "bencode.h"
24
25
#include "blocklist.h"
25
26
#include "clients.h"
26
27
#include "completion.h"
27
28
#include "crypto.h"
28
 
#include "fdlimit.h"
29
29
#include "handshake.h"
30
30
#include "inout.h" /* tr_ioTestPiece */
31
31
#include "net.h"
42
42
 
43
43
enum
44
44
{
 
45
    /* how frequently to cull old atoms */
 
46
    ATOM_PERIOD_MSEC = ( 60 * 1000 ),
 
47
 
45
48
    /* how frequently to change which peers are choked */
46
49
    RECHOKE_PERIOD_MSEC = ( 10 * 1000 ),
47
50
 
58
61
    RECONNECT_PERIOD_MSEC = 500,
59
62
 
60
63
    /* when many peers are available, keep idle ones this long */
61
 
    MIN_UPLOAD_IDLE_SECS = ( 30 ),
 
64
    MIN_UPLOAD_IDLE_SECS = ( 60 ),
62
65
 
63
66
    /* when few peers are available, keep idle ones this long */
64
67
    MAX_UPLOAD_IDLE_SECS = ( 60 * 5 ),
65
68
 
66
69
    /* max # of peers to ask fer per torrent per reconnect pulse */
67
 
    MAX_RECONNECTIONS_PER_PULSE = 16,
 
70
    MAX_RECONNECTIONS_PER_PULSE = 8,
68
71
 
69
72
    /* max number of peers to ask for per second overall.
70
73
    * this throttle is to avoid overloading the router */
71
 
    MAX_CONNECTIONS_PER_SECOND = 32,
 
74
    MAX_CONNECTIONS_PER_SECOND = 16,
72
75
 
73
76
    /* number of bad pieces a peer is allowed to send before we ban them */
74
77
    MAX_BAD_PIECES_PER_PEER = 5,
112
115
 */
113
116
struct peer_atom
114
117
{
 
118
    tr_peer   * peer;        /* will be NULL if not connected */
115
119
    uint8_t     from;
116
120
    uint8_t     flags;       /* these match the added_f flags */
117
121
    uint8_t     myflags;     /* flags that aren't defined in added_f */
121
125
    tr_address  addr;
122
126
    time_t      time;        /* when the peer's connection status last changed */
123
127
    time_t      piece_data_time;
 
128
 
 
129
    /* similar to a TTL field, but less rigid --
 
130
     * if the swarm is small, the atom will be kept past this date. */
 
131
    time_t      shelf_date;
124
132
};
125
133
 
126
134
static tr_bool
131
139
        && ( tr_isAddress( &atom->addr ) );
132
140
}
133
141
 
134
 
struct tr_blockIterator
135
 
{
136
 
    time_t expirationDate;
137
 
    struct tr_torrent_peers * t;
138
 
    tr_block_index_t blockIndex, blockCount, *blocks;
139
 
    tr_piece_index_t pieceIndex, pieceCount, *pieces;
 
142
static const char*
 
143
tr_atomAddrStr( const struct peer_atom * atom )
 
144
{
 
145
    return tr_peerIoAddrStr( &atom->addr, atom->port );
 
146
}
 
147
 
 
148
struct block_request
 
149
{
 
150
    tr_block_index_t block;
 
151
    tr_peer * peer;
 
152
    time_t sentAt;
 
153
};
 
154
 
 
155
struct weighted_piece
 
156
{
 
157
    tr_piece_index_t index;
 
158
    int16_t salt;
 
159
    int16_t requestCount;
140
160
};
141
161
 
142
162
typedef struct tr_torrent_peers
143
163
{
144
 
    struct event               refillTimer;
145
 
 
146
164
    tr_ptrArray                outgoingHandshakes; /* tr_handshake */
147
165
    tr_ptrArray                pool; /* struct peer_atom */
148
166
    tr_ptrArray                peers; /* tr_peer */
150
168
 
151
169
    tr_torrent               * tor;
152
170
    tr_peer                  * optimistic; /* the optimistic peer, or NULL if none */
153
 
    struct tr_blockIterator  * refillQueue; /* used in refillPulse() */
154
171
    struct tr_peerMgr        * manager;
155
 
    int                      * pendingRequestCount;
156
172
 
157
173
    tr_bool                    isRunning;
 
174
    tr_bool                    needsCompletenessCheck;
 
175
 
 
176
    struct block_request     * requests;
 
177
    int                        requestsSort;
 
178
    int                        requestCount;
 
179
    int                        requestAlloc;
 
180
 
 
181
    struct weighted_piece    * pieces;
 
182
    int                        piecesSort;
 
183
    int                        pieceCount;
 
184
 
 
185
    tr_bool                    isInEndgame;
158
186
}
159
187
Torrent;
160
188
 
166
194
    tr_timer        * rechokeTimer;
167
195
    tr_timer        * reconnectTimer;
168
196
    tr_timer        * refillUpkeepTimer;
 
197
    tr_timer        * atomTimer;
169
198
};
170
199
 
171
200
#define tordbg( t, ... ) \
172
201
    do { \
173
202
        if( tr_deepLoggingIsActive( ) ) \
174
 
            tr_deepLog( __FILE__, __LINE__, t->tor->info.name, __VA_ARGS__ ); \
 
203
            tr_deepLog( __FILE__, __LINE__, tr_torrentName( t->tor ), __VA_ARGS__ ); \
175
204
    } while( 0 )
176
205
 
177
206
#define dbgmsg( ... ) \
248
277
}
249
278
 
250
279
static int
251
 
comparePeerAtoms( const void * va, const void * vb )
 
280
compareAtomsByAddress( const void * va, const void * vb )
252
281
{
253
282
    const struct peer_atom * b = vb;
254
283
 
261
290
***
262
291
**/
263
292
 
 
293
const tr_address *
 
294
tr_peerAddress( const tr_peer * peer )
 
295
{
 
296
    return &peer->atom->addr;
 
297
}
 
298
 
264
299
static Torrent*
265
300
getExistingTorrent( tr_peerMgr *    manager,
266
301
                    const uint8_t * hash )
271
306
}
272
307
 
273
308
static int
274
 
peerCompare( const void * va, const void * vb )
275
 
{
276
 
    const tr_peer * a = va;
277
 
    const tr_peer * b = vb;
278
 
 
279
 
    return tr_compareAddresses( &a->addr, &b->addr );
280
 
}
281
 
 
282
 
static int
283
 
peerCompareToAddr( const void * va, const void * vb )
284
 
{
285
 
    const tr_peer * a = va;
286
 
 
287
 
    return tr_compareAddresses( &a->addr, vb );
288
 
}
289
 
 
290
 
static tr_peer*
291
 
getExistingPeer( Torrent          * torrent,
292
 
                 const tr_address * addr )
293
 
{
294
 
    assert( torrentIsLocked( torrent ) );
295
 
    assert( addr );
296
 
 
297
 
    return tr_ptrArrayFindSorted( &torrent->peers, addr, peerCompareToAddr );
 
309
peerCompare( const void * a, const void * b )
 
310
{
 
311
    return tr_compareAddresses( tr_peerAddress( a ), tr_peerAddress( b ) );
298
312
}
299
313
 
300
314
static struct peer_atom*
307
321
}
308
322
 
309
323
static tr_bool
310
 
peerIsInUse( const Torrent    * ct,
311
 
             const tr_address * addr )
 
324
peerIsInUse( const Torrent * ct, const struct peer_atom * atom )
312
325
{
313
326
    Torrent * t = (Torrent*) ct;
314
327
 
315
328
    assert( torrentIsLocked ( t ) );
316
329
 
317
 
    return getExistingPeer( t, addr )
318
 
        || getExistingHandshake( &t->outgoingHandshakes, addr )
319
 
        || getExistingHandshake( &t->manager->incomingHandshakes, addr );
 
330
    return ( atom->peer != NULL )
 
331
        || getExistingHandshake( &t->outgoingHandshakes, &atom->addr )
 
332
        || getExistingHandshake( &t->manager->incomingHandshakes, &atom->addr );
320
333
}
321
334
 
322
335
static tr_peer*
323
 
peerConstructor( const tr_address * addr )
 
336
peerConstructor( struct peer_atom * atom )
324
337
{
325
 
    tr_peer * p;
326
 
    p = tr_new0( tr_peer, 1 );
327
 
    p->addr = *addr;
328
 
    return p;
 
338
    tr_peer * peer = tr_new0( tr_peer, 1 );
 
339
    tr_bitsetConstructor( &peer->have, 0 );
 
340
    peer->atom = atom;
 
341
    atom->peer = peer;
 
342
    return peer;
329
343
}
330
344
 
331
345
static tr_peer*
332
 
getPeer( Torrent          * torrent,
333
 
         const tr_address * addr )
 
346
getPeer( Torrent * torrent, struct peer_atom * atom )
334
347
{
335
348
    tr_peer * peer;
336
349
 
337
350
    assert( torrentIsLocked( torrent ) );
338
351
 
339
 
    peer = getExistingPeer( torrent, addr );
 
352
    peer = atom->peer;
340
353
 
341
354
    if( peer == NULL )
342
355
    {
343
 
        peer = peerConstructor( addr );
 
356
        peer = peerConstructor( atom );
344
357
        tr_ptrArrayInsertSorted( &torrent->peers, peer, peerCompare );
345
358
    }
346
359
 
347
360
    return peer;
348
361
}
349
362
 
 
363
static void peerDeclinedAllRequests( Torrent *, const tr_peer * );
 
364
 
350
365
static void
351
 
peerDestructor( tr_peer * peer )
 
366
peerDestructor( Torrent * t, tr_peer * peer )
352
367
{
353
 
    assert( peer );
 
368
    assert( peer != NULL );
 
369
 
 
370
    peerDeclinedAllRequests( t, peer );
354
371
 
355
372
    if( peer->msgs != NULL )
356
373
    {
361
378
    tr_peerIoClear( peer->io );
362
379
    tr_peerIoUnref( peer->io ); /* balanced by the ref in handshakeDoneCB() */
363
380
 
364
 
    tr_bitfieldFree( peer->have );
 
381
    tr_bitsetDestructor( &peer->have );
365
382
    tr_bitfieldFree( peer->blame );
366
383
    tr_free( peer->client );
 
384
    peer->atom->peer = NULL;
367
385
 
368
386
    tr_free( peer );
369
387
}
377
395
    assert( torrentIsLocked( t ) );
378
396
    assert( atom );
379
397
 
380
 
    atom->time = time( NULL );
 
398
    atom->time = tr_time( );
381
399
 
382
400
    removed = tr_ptrArrayRemoveSorted( &t->peers, peer, peerCompare );
383
401
    assert( removed == peer );
384
 
    peerDestructor( removed );
 
402
    peerDestructor( t, removed );
385
403
}
386
404
 
387
405
static void
391
409
        removePeer( t, tr_ptrArrayNth( &t->peers, 0 ) );
392
410
}
393
411
 
394
 
static void blockIteratorFree( struct tr_blockIterator ** inout );
395
 
 
396
412
static void
397
413
torrentDestructor( void * vt )
398
414
{
404
420
    assert( tr_ptrArrayEmpty( &t->outgoingHandshakes ) );
405
421
    assert( tr_ptrArrayEmpty( &t->peers ) );
406
422
 
407
 
    evtimer_del( &t->refillTimer );
408
 
 
409
 
    blockIteratorFree( &t->refillQueue );
410
423
    tr_ptrArrayDestruct( &t->webseeds, (PtrArrayForeachFunc)tr_webseedFree );
411
424
    tr_ptrArrayDestruct( &t->pool, (PtrArrayForeachFunc)tr_free );
412
425
    tr_ptrArrayDestruct( &t->outgoingHandshakes, NULL );
413
426
    tr_ptrArrayDestruct( &t->peers, NULL );
414
427
 
415
 
    tr_free( t->pendingRequestCount );
 
428
    tr_free( t->requests );
 
429
    tr_free( t->pieces );
416
430
    tr_free( t );
417
431
}
418
432
 
419
 
 
420
 
static void refillPulse( int, short, void* );
421
 
 
422
 
static void peerCallbackFunc( void * vpeer,
423
 
                              void * vevent,
424
 
                              void * vt );
 
433
static void peerCallbackFunc( void * vpeer, void * vevent, void * vt );
425
434
 
426
435
static Torrent*
427
436
torrentConstructor( tr_peerMgr * manager,
437
446
    t->peers = TR_PTR_ARRAY_INIT;
438
447
    t->webseeds = TR_PTR_ARRAY_INIT;
439
448
    t->outgoingHandshakes = TR_PTR_ARRAY_INIT;
440
 
    evtimer_set( &t->refillTimer, refillPulse, t );
441
 
 
 
449
    t->requests = 0;
442
450
 
443
451
    for( i = 0; i < tor->info.webseedCount; ++i )
444
452
    {
450
458
    return t;
451
459
}
452
460
 
453
 
 
454
 
static int bandwidthPulse ( void * vmgr );
455
 
static int rechokePulse   ( void * vmgr );
456
 
static int reconnectPulse ( void * vmgr );
457
 
static int refillUpkeep   ( void * vmgr );
458
 
 
459
461
tr_peerMgr*
460
462
tr_peerMgrNew( tr_session * session )
461
463
{
468
470
static void
469
471
deleteTimers( struct tr_peerMgr * m )
470
472
{
 
473
    if( m->atomTimer )
 
474
        tr_timerFree( &m->atomTimer );
 
475
 
471
476
    if( m->bandwidthTimer )
472
477
        tr_timerFree( &m->bandwidthTimer );
473
478
 
500
505
}
501
506
 
502
507
static int
503
 
clientIsDownloadingFrom( const tr_peer * peer )
 
508
clientIsDownloadingFrom( const tr_torrent * tor, const tr_peer * peer )
504
509
{
 
510
    if( !tr_torrentHasMetadata( tor ) )
 
511
        return TRUE;
 
512
 
505
513
    return peer->clientIsInterested && !peer->clientIsChoked;
506
514
}
507
515
 
529
537
    return isSeed;
530
538
}
531
539
 
532
 
/****
533
 
*****
534
 
*****  REFILL
535
 
*****
536
 
****/
537
 
 
538
 
static void
539
 
assertValidPiece( Torrent * t, tr_piece_index_t piece )
540
 
{
541
 
    assert( t );
542
 
    assert( t->tor );
543
 
    assert( piece < t->tor->info.pieceCount );
544
 
}
545
 
 
546
 
static int
547
 
getPieceRequests( Torrent * t, tr_piece_index_t piece )
548
 
{
549
 
    assertValidPiece( t, piece );
550
 
 
551
 
    return t->pendingRequestCount ? t->pendingRequestCount[piece] : 0;
552
 
}
553
 
 
554
 
static void
555
 
incrementPieceRequests( Torrent * t, tr_piece_index_t piece )
556
 
{
557
 
    assertValidPiece( t, piece );
558
 
 
559
 
    if( t->pendingRequestCount == NULL )
560
 
        t->pendingRequestCount = tr_new0( int, t->tor->info.pieceCount );
561
 
    t->pendingRequestCount[piece]++;
562
 
}
563
 
 
564
 
static void
565
 
decrementPieceRequests( Torrent * t, tr_piece_index_t piece )
566
 
{
567
 
    assertValidPiece( t, piece );
568
 
 
569
 
    if( t->pendingRequestCount )
570
 
        t->pendingRequestCount[piece]--;
571
 
}
572
 
 
573
 
struct tr_refill_piece
574
 
{
575
 
    tr_priority_t    priority;
576
 
    uint32_t         piece;
577
 
    uint32_t         peerCount;
578
 
    int              random;
579
 
    int              pendingRequestCount;
580
 
    int              missingBlockCount;
581
 
};
582
 
 
583
 
static int
584
 
compareRefillPiece( const void * aIn, const void * bIn )
585
 
{
586
 
    const struct tr_refill_piece * a = aIn;
587
 
    const struct tr_refill_piece * b = bIn;
588
 
 
589
 
    /* if one piece has a higher priority, it goes first */
590
 
    if( a->priority != b->priority )
591
 
        return a->priority > b->priority ? -1 : 1;
592
 
 
593
 
    /* have a per-priority endgame */
594
 
    if( a->pendingRequestCount != b->pendingRequestCount )
595
 
        return a->pendingRequestCount < b->pendingRequestCount ? -1 : 1;
596
 
 
597
 
    /* fewer missing pieces goes first */
598
 
    if( a->missingBlockCount != b->missingBlockCount )
599
 
        return a->missingBlockCount < b->missingBlockCount ? -1 : 1;
600
 
 
601
 
    /* otherwise if one has fewer peers, it goes first */
602
 
    if( a->peerCount != b->peerCount )
603
 
        return a->peerCount < b->peerCount ? -1 : 1;
604
 
 
605
 
    /* otherwise go with our random seed */
606
 
    if( a->random != b->random )
607
 
        return a->random < b->random ? -1 : 1;
608
 
 
609
 
    return 0;
610
 
}
611
 
 
612
 
static tr_piece_index_t *
613
 
getPreferredPieces( Torrent * t, tr_piece_index_t * pieceCount )
614
 
{
615
 
    const tr_torrent  * tor = t->tor;
616
 
    const tr_info     * inf = &tor->info;
617
 
    tr_piece_index_t    i;
618
 
    tr_piece_index_t    poolSize = 0;
619
 
    tr_piece_index_t  * pool = tr_new( tr_piece_index_t , inf->pieceCount );
620
 
    int                 peerCount;
621
 
    const tr_peer    ** peers;
622
 
 
623
 
    assert( torrentIsLocked( t ) );
624
 
 
625
 
    peers = (const tr_peer**) tr_ptrArrayBase( &t->peers );
626
 
    peerCount = tr_ptrArraySize( &t->peers );
627
 
 
628
 
    /* make a list of the pieces that we want but don't have */
629
 
    for( i = 0; i < inf->pieceCount; ++i )
630
 
        if( !tor->info.pieces[i].dnd
631
 
                && !tr_cpPieceIsComplete( &tor->completion, i ) )
632
 
            pool[poolSize++] = i;
633
 
 
634
 
    /* sort the pool by which to request next */
635
 
    if( poolSize > 1 )
636
 
    {
637
 
        tr_piece_index_t j;
638
 
        struct tr_refill_piece * p = tr_new( struct tr_refill_piece, poolSize );
639
 
 
640
 
        for( j = 0; j < poolSize; ++j )
641
 
        {
642
 
            int k;
643
 
            const tr_piece_index_t piece = pool[j];
644
 
            struct tr_refill_piece * setme = p + j;
645
 
 
646
 
            setme->piece = piece;
647
 
            setme->priority = inf->pieces[piece].priority;
648
 
            setme->peerCount = 0;
649
 
            setme->random = tr_cryptoWeakRandInt( INT_MAX );
650
 
            setme->pendingRequestCount = getPieceRequests( t, piece );
651
 
            setme->missingBlockCount
652
 
                         = tr_cpMissingBlocksInPiece( &tor->completion, piece );
653
 
 
654
 
            for( k = 0; k < peerCount; ++k )
 
540
/**
 
541
***  REQUESTS
 
542
***
 
543
*** There are two data structures associated with managing block requests:
 
544
***
 
545
*** 1. Torrent::requests, an array of "struct block_request" which keeps
 
546
***    track of which blocks have been requested, and when, and by which peers.
 
547
***    This is list is used for (a) cancelling requests that have been pending
 
548
***    for too long and (b) avoiding duplicate requests before endgame.
 
549
***
 
550
*** 2. Torrent::pieces, an array of "struct weighted_piece" which lists the
 
551
***    pieces that we want to request.  It's used to decide which pieces to
 
552
***    return next when tr_peerMgrGetBlockRequests() is called.
 
553
**/
 
554
 
 
555
/**
 
556
*** struct block_request
 
557
**/
 
558
 
 
559
enum
 
560
{
 
561
    REQ_UNSORTED,
 
562
    REQ_SORTED_BY_BLOCK,
 
563
    REQ_SORTED_BY_TIME
 
564
};
 
565
 
 
566
static int
 
567
compareReqByBlock( const void * va, const void * vb )
 
568
{
 
569
    const struct block_request * a = va;
 
570
    const struct block_request * b = vb;
 
571
    if( a->block < b->block ) return -1;
 
572
    if( a->block > b->block ) return 1;
 
573
    return 0;
 
574
}
 
575
 
 
576
static int
 
577
compareReqByTime( const void * va, const void * vb )
 
578
{
 
579
    const struct block_request * a = va;
 
580
    const struct block_request * b = vb;
 
581
    if( a->sentAt < b->sentAt ) return -1;
 
582
    if( a->sentAt > b->sentAt ) return 1;
 
583
    return 0;
 
584
}
 
585
 
 
586
static void
 
587
requestListSort( Torrent * t, int mode )
 
588
{
 
589
    assert( mode==REQ_SORTED_BY_BLOCK || mode==REQ_SORTED_BY_TIME );
 
590
 
 
591
    if( t->requestsSort != mode )
 
592
    {
 
593
        int(*compar)(const void *, const void *);
 
594
 
 
595
        t->requestsSort = mode;
 
596
 
 
597
        switch( mode ) {
 
598
            case REQ_SORTED_BY_BLOCK: compar = compareReqByBlock; break;
 
599
            case REQ_SORTED_BY_TIME: compar = compareReqByTime; break;
 
600
            default: assert( 0 && "unhandled" );
 
601
        }
 
602
 
 
603
        qsort( t->requests, t->requestCount,
 
604
               sizeof( struct block_request ), compar );
 
605
    }
 
606
}
 
607
 
 
608
static void
 
609
requestListAdd( Torrent * t, const time_t now, tr_block_index_t block, tr_peer * peer )
 
610
{
 
611
    struct block_request key;
 
612
 
 
613
    /* ensure enough room is available... */
 
614
    if( t->requestCount + 1 >= t->requestAlloc )
 
615
    {
 
616
        const int CHUNK_SIZE = 128;
 
617
        t->requestAlloc += CHUNK_SIZE;
 
618
        t->requests = tr_renew( struct block_request,
 
619
                                t->requests, t->requestAlloc );
 
620
    }
 
621
 
 
622
    /* populate the record we're inserting */
 
623
    key.block = block;
 
624
    key.peer = peer;
 
625
    key.sentAt = now;
 
626
 
 
627
    /* insert the request to our array... */
 
628
    switch( t->requestsSort )
 
629
    {
 
630
        case REQ_UNSORTED:
 
631
        case REQ_SORTED_BY_TIME:
 
632
            t->requests[t->requestCount++] = key;
 
633
            break;
 
634
 
 
635
        case REQ_SORTED_BY_BLOCK: {
 
636
            tr_bool exact;
 
637
            const int pos = tr_lowerBound( &key, t->requests, t->requestCount,
 
638
                                           sizeof( struct block_request ),
 
639
                                           compareReqByBlock, &exact );
 
640
            assert( !exact );
 
641
            memmove( t->requests + pos + 1,
 
642
                     t->requests + pos,
 
643
                     sizeof( struct block_request ) * ( t->requestCount++ - pos ) );
 
644
            t->requests[pos] = key;
 
645
            break;
 
646
        }
 
647
    }
 
648
}
 
649
 
 
650
static struct block_request *
 
651
requestListLookup( Torrent * t, tr_block_index_t block )
 
652
{
 
653
    struct block_request key;
 
654
    key.block = block;
 
655
 
 
656
    requestListSort( t, REQ_SORTED_BY_BLOCK );
 
657
 
 
658
    return bsearch( &key, t->requests, t->requestCount,
 
659
                    sizeof( struct block_request ),
 
660
                    compareReqByBlock );
 
661
}
 
662
 
 
663
static void
 
664
requestListRemove( Torrent * t, tr_block_index_t block )
 
665
{
 
666
    const struct block_request * b = requestListLookup( t, block );
 
667
    if( b != NULL )
 
668
    {
 
669
        const int pos = b - t->requests;
 
670
        assert( pos < t->requestCount );
 
671
        memmove( t->requests + pos,
 
672
                 t->requests + pos + 1,
 
673
                 sizeof( struct block_request ) * ( --t->requestCount - pos ) );
 
674
    }
 
675
}
 
676
 
 
677
/**
 
678
*** struct weighted_piece
 
679
**/
 
680
 
 
681
enum
 
682
{
 
683
    PIECES_UNSORTED,
 
684
    PIECES_SORTED_BY_INDEX,
 
685
    PIECES_SORTED_BY_WEIGHT
 
686
};
 
687
 
 
688
const tr_torrent * weightTorrent;
 
689
 
 
690
/* we try to create a "weight" s.t. high-priority pieces come before others,
 
691
 * and that partially-complete pieces come before empty ones. */
 
692
static int
 
693
comparePieceByWeight( const void * va, const void * vb )
 
694
{
 
695
    const struct weighted_piece * a = va;
 
696
    const struct weighted_piece * b = vb;
 
697
    int ia, ib, missing, pending;
 
698
    const tr_torrent * tor = weightTorrent;
 
699
 
 
700
    /* primary key: weight */
 
701
    missing = tr_cpMissingBlocksInPiece( &tor->completion, a->index );
 
702
    pending = a->requestCount;
 
703
    ia = missing > pending ? missing - pending : (int)(tor->blockCountInPiece + pending);
 
704
    missing = tr_cpMissingBlocksInPiece( &tor->completion, b->index );
 
705
    pending = b->requestCount;
 
706
    ib = missing > pending ? missing - pending : (int)(tor->blockCountInPiece + pending);
 
707
    if( ia < ib ) return -1;
 
708
    if( ia > ib ) return 1;
 
709
 
 
710
    /* secondary key: higher priorities go first */
 
711
    ia = tor->info.pieces[a->index].priority;
 
712
    ib = tor->info.pieces[b->index].priority;
 
713
    if( ia > ib ) return -1;
 
714
    if( ia < ib ) return 1;
 
715
 
 
716
    /* tertiary key: random */
 
717
    return a->salt - b->salt;
 
718
}
 
719
 
 
720
static int
 
721
comparePieceByIndex( const void * va, const void * vb )
 
722
{
 
723
    const struct weighted_piece * a = va;
 
724
    const struct weighted_piece * b = vb;
 
725
    if( a->index < b->index ) return -1;
 
726
    if( a->index > b->index ) return 1;
 
727
    return 0;
 
728
}
 
729
 
 
730
static void
 
731
pieceListSort( Torrent * t, int mode )
 
732
{
 
733
    int(*compar)(const void *, const void *);
 
734
 
 
735
    assert( mode==PIECES_SORTED_BY_INDEX
 
736
         || mode==PIECES_SORTED_BY_WEIGHT );
 
737
 
 
738
    if( t->piecesSort != mode )
 
739
    {
 
740
        t->piecesSort = mode;
 
741
 
 
742
        switch( mode ) {
 
743
            case PIECES_SORTED_BY_WEIGHT: compar = comparePieceByWeight; break;
 
744
            case PIECES_SORTED_BY_INDEX: compar = comparePieceByIndex; break;
 
745
            default: assert( 0 && "unhandled" );  break;
 
746
        }
 
747
 
 
748
        weightTorrent = t->tor;
 
749
        qsort( t->pieces, t->pieceCount,
 
750
               sizeof( struct weighted_piece ), compar );
 
751
    }
 
752
 
 
753
    /* Also, as long as we've got the pieces sorted by weight,
 
754
     * let's also update t.isInEndgame */
 
755
    if( t->piecesSort == PIECES_SORTED_BY_WEIGHT )
 
756
    {
 
757
        tr_bool endgame = TRUE;
 
758
 
 
759
        if( ( t->pieces != NULL ) && ( t->pieceCount > 0 ) )
 
760
        {
 
761
            const tr_completion * cp = &t->tor->completion;
 
762
            const struct weighted_piece * p = t->pieces;
 
763
            const int pending = p->requestCount;
 
764
            const int missing = tr_cpMissingBlocksInPiece( cp, p->index );
 
765
            endgame = pending >= missing;
 
766
        }
 
767
 
 
768
        t->isInEndgame = endgame;
 
769
    }
 
770
}
 
771
 
 
772
static struct weighted_piece *
 
773
pieceListLookup( Torrent * t, tr_piece_index_t index )
 
774
{
 
775
    struct weighted_piece key;
 
776
    key.index = index;
 
777
 
 
778
    pieceListSort( t, PIECES_SORTED_BY_INDEX );
 
779
 
 
780
    return bsearch( &key, t->pieces, t->pieceCount,
 
781
                    sizeof( struct weighted_piece ),
 
782
                    comparePieceByIndex );
 
783
}
 
784
 
 
785
static void
 
786
pieceListRebuild( Torrent * t )
 
787
{
 
788
    if( !tr_torrentIsSeed( t->tor ) )
 
789
    {
 
790
        tr_piece_index_t i;
 
791
        tr_piece_index_t * pool;
 
792
        tr_piece_index_t poolCount = 0;
 
793
        const tr_torrent * tor = t->tor;
 
794
        const tr_info * inf = tr_torrentInfo( tor );
 
795
        struct weighted_piece * pieces;
 
796
        int pieceCount;
 
797
 
 
798
        /* build the new list */
 
799
        pool = tr_new( tr_piece_index_t, inf->pieceCount );
 
800
        for( i=0; i<inf->pieceCount; ++i )
 
801
            if( !inf->pieces[i].dnd )
 
802
                if( !tr_cpPieceIsComplete( &tor->completion, i ) )
 
803
                    pool[poolCount++] = i;
 
804
        pieceCount = poolCount;
 
805
        pieces = tr_new0( struct weighted_piece, pieceCount );
 
806
        for( i=0; i<poolCount; ++i ) {
 
807
            struct weighted_piece * piece = pieces + i;
 
808
            piece->index = pool[i];
 
809
            piece->requestCount = 0;
 
810
            piece->salt = tr_cryptoWeakRandInt( 255 );
 
811
        }
 
812
 
 
813
        /* if we already had a list of pieces, merge it into
 
814
         * the new list so we don't lose its requestCounts */
 
815
        if( t->pieces != NULL )
 
816
        {
 
817
            struct weighted_piece * o = t->pieces;
 
818
            struct weighted_piece * oend = o + t->pieceCount;
 
819
            struct weighted_piece * n = pieces;
 
820
            struct weighted_piece * nend = n + pieceCount;
 
821
 
 
822
            pieceListSort( t, PIECES_SORTED_BY_INDEX );
 
823
 
 
824
            while( o!=oend && n!=nend ) {
 
825
                if( o->index < n->index )
 
826
                    ++o;
 
827
                else if( o->index > n->index )
 
828
                    ++n;
 
829
                else
 
830
                    *n++ = *o++;
 
831
            }
 
832
 
 
833
            tr_free( t->pieces );
 
834
        }
 
835
 
 
836
        t->pieces = pieces;
 
837
        t->pieceCount = pieceCount;
 
838
        t->piecesSort = PIECES_SORTED_BY_INDEX;
 
839
 
 
840
        /* cleanup */
 
841
        tr_free( pool );
 
842
    }
 
843
}
 
844
 
 
845
static void
 
846
pieceListRemovePiece( Torrent * t, tr_piece_index_t piece )
 
847
{
 
848
    struct weighted_piece * p = pieceListLookup( t, piece );
 
849
 
 
850
    if( p != NULL )
 
851
    {
 
852
        const int pos = p - t->pieces;
 
853
 
 
854
        memmove( t->pieces + pos,
 
855
                 t->pieces + pos + 1,
 
856
                 sizeof( struct weighted_piece ) * ( --t->pieceCount - pos ) );
 
857
 
 
858
        if( t->pieceCount == 0 )
 
859
        {
 
860
            tr_free( t->pieces );
 
861
            t->pieces = NULL;
 
862
        }
 
863
    }
 
864
}
 
865
 
 
866
static void
 
867
pieceListRemoveRequest( Torrent * t, tr_block_index_t block )
 
868
{
 
869
    struct weighted_piece * p;
 
870
    const tr_piece_index_t index = tr_torBlockPiece( t->tor, block );
 
871
 
 
872
    if(( p = pieceListLookup( t, index )))
 
873
        if( p->requestCount > 0 )
 
874
            --p->requestCount;
 
875
 
 
876
    /* note: this invalidates the weighted.piece.weight field,
 
877
     * but that's OK since the call to pieceListLookup ensured
 
878
     * that we were sorted by index anyway.. next time we resort
 
879
     * by weight, pieceListSort() will update the weights */
 
880
}
 
881
 
 
882
/**
 
883
***
 
884
**/
 
885
 
 
886
void
 
887
tr_peerMgrRebuildRequests( tr_torrent * tor )
 
888
{
 
889
    assert( tr_isTorrent( tor ) );
 
890
 
 
891
    pieceListRebuild( tor->torrentPeers );
 
892
}
 
893
 
 
894
void
 
895
tr_peerMgrGetNextRequests( tr_torrent           * tor,
 
896
                           tr_peer              * peer,
 
897
                           int                    numwant,
 
898
                           tr_block_index_t     * setme,
 
899
                           int                  * numgot )
 
900
{
 
901
    int i;
 
902
    int got;
 
903
    Torrent * t;
 
904
    struct weighted_piece * pieces;
 
905
    const tr_bitset * have = &peer->have;
 
906
    const time_t now = tr_time( );
 
907
 
 
908
    /* sanity clause */
 
909
    assert( tr_isTorrent( tor ) );
 
910
    assert( numwant > 0 );
 
911
 
 
912
    /* walk through the pieces and find blocks that should be requested */
 
913
    got = 0;
 
914
    t = tor->torrentPeers;
 
915
 
 
916
    /* prep the pieces list */
 
917
    if( t->pieces == NULL )
 
918
        pieceListRebuild( t );
 
919
    pieceListSort( t, PIECES_SORTED_BY_WEIGHT );
 
920
 
 
921
    pieces = t->pieces;
 
922
    for( i=0; i<t->pieceCount && got<numwant; ++i )
 
923
    {
 
924
        struct weighted_piece * p = pieces + i;
 
925
 
 
926
        /* if the peer has this piece that we want... */
 
927
        if( tr_bitsetHasFast( have, p->index ) )
 
928
        {
 
929
            tr_block_index_t b = tr_torPieceFirstBlock( tor, p->index );
 
930
            const tr_block_index_t e = b + tr_torPieceCountBlocks( tor, p->index );
 
931
 
 
932
            for( ; b!=e && got<numwant; ++b )
655
933
            {
656
 
                const tr_peer * peer = peers[k];
657
 
                if( peer->peerIsInterested
658
 
                        && !peer->clientIsChoked
659
 
                        && tr_bitfieldHas( peer->have, piece ) )
660
 
                    ++setme->peerCount;
 
934
                struct block_request * breq;
 
935
 
 
936
                /* don't request blocks we've already got */
 
937
                if( tr_cpBlockIsCompleteFast( &tor->completion, b ) )
 
938
                    continue;
 
939
 
 
940
                /* don't request blocks we've already requested (FIXME) */
 
941
                breq = requestListLookup( t, b );
 
942
                if( breq != NULL ) {
 
943
                    assert( breq->peer != NULL );
 
944
                    if( breq->peer == peer ) continue;
 
945
                    if( !t->isInEndgame ) continue;
 
946
                }
 
947
 
 
948
                setme[got++] = b;
 
949
 
 
950
                /* update our own tables */
 
951
                if( breq == NULL )
 
952
                    requestListAdd( t, now, b, peer );
 
953
                ++p->requestCount;
661
954
            }
662
955
        }
663
 
 
664
 
        qsort( p, poolSize, sizeof( struct tr_refill_piece ),
665
 
               compareRefillPiece );
666
 
 
667
 
        for( j = 0; j < poolSize; ++j )
668
 
            pool[j] = p[j].piece;
669
 
 
670
 
        tr_free( p );
671
 
    }
672
 
 
673
 
    *pieceCount = poolSize;
674
 
    return pool;
675
 
}
676
 
 
677
 
static struct tr_blockIterator*
678
 
blockIteratorNew( Torrent * t )
679
 
{
680
 
    struct tr_blockIterator * i = tr_new0( struct tr_blockIterator, 1 );
681
 
    i->expirationDate = time( NULL ) + PIECE_LIST_SHELF_LIFE_SECS;
682
 
    i->t = t;
683
 
    i->pieces = getPreferredPieces( t, &i->pieceCount );
684
 
    i->blocks = tr_new0( tr_block_index_t, t->tor->blockCountInPiece );
685
 
    tordbg( t, "creating new refill queue.. it contains %"PRIu32" pieces", i->pieceCount );
686
 
    return i;
687
 
}
688
 
 
689
 
static tr_bool
690
 
blockIteratorNext( struct tr_blockIterator * i, tr_block_index_t * setme )
691
 
{
692
 
    tr_bool found;
693
 
    Torrent * t = i->t;
694
 
    tr_torrent * tor = t->tor;
695
 
 
696
 
    while( ( i->blockIndex == i->blockCount )
697
 
        && ( i->pieceIndex < i->pieceCount ) )
698
 
    {
699
 
        const tr_piece_index_t index = i->pieces[i->pieceIndex++];
700
 
        const tr_block_index_t b = tr_torPieceFirstBlock( tor, index );
701
 
        const tr_block_index_t e = b + tr_torPieceCountBlocks( tor, index );
702
 
        tr_block_index_t block;
703
 
 
704
 
        assert( index < tor->info.pieceCount );
705
 
 
706
 
        i->blockCount = 0;
707
 
        i->blockIndex = 0;
708
 
        for( block=b; block!=e; ++block )
709
 
            if( !tr_cpBlockIsCompleteFast( &tor->completion, block ) )
710
 
                i->blocks[i->blockCount++] = block;
711
 
    }
712
 
 
713
 
    assert( i->blockCount <= tor->blockCountInPiece );
714
 
 
715
 
    if(( found = ( i->blockIndex < i->blockCount )))
716
 
        *setme = i->blocks[i->blockIndex++];
717
 
 
718
 
    return found;
719
 
}
720
 
 
721
 
static void
722
 
blockIteratorSkipCurrentPiece( struct tr_blockIterator * i )
723
 
{
724
 
    i->blockIndex = i->blockCount;
725
 
}
726
 
 
727
 
static void
728
 
blockIteratorFree( struct tr_blockIterator ** inout )
729
 
{
730
 
    struct tr_blockIterator * it = *inout;
731
 
 
732
 
    if( it != NULL )
733
 
    {
734
 
        tr_free( it->blocks );
735
 
        tr_free( it->pieces );
736
 
        tr_free( it );
737
 
    }
738
 
 
739
 
    *inout = NULL;
740
 
}
741
 
 
742
 
static tr_peer**
743
 
getPeersUploadingToClient( Torrent * t,
744
 
                           int *     setmeCount )
745
 
{
746
 
    int j;
747
 
    int peerCount = 0;
748
 
    int retCount = 0;
749
 
    tr_peer ** peers = (tr_peer **) tr_ptrArrayPeek( &t->peers, &peerCount );
750
 
    tr_peer ** ret = tr_new( tr_peer *, peerCount );
751
 
 
752
 
    j = 0; /* this is a temporary test to make sure we walk through all the peers */
753
 
    if( peerCount )
754
 
    {
755
 
        /* Get a list of peers we're downloading from.
756
 
           Pick a different starting point each time so all peers
757
 
           get a chance at being the first in line */
758
 
        const int fencepost = tr_cryptoWeakRandInt( peerCount );
759
 
        int i = fencepost;
760
 
        do {
761
 
            if( clientIsDownloadingFrom( peers[i] ) )
762
 
                ret[retCount++] = peers[i];
763
 
            i = ( i + 1 ) % peerCount;
764
 
            ++j;
765
 
        } while( i != fencepost );
766
 
    }
767
 
    assert( j == peerCount );
768
 
    *setmeCount = retCount;
769
 
    return ret;
770
 
}
771
 
 
772
 
static uint32_t
773
 
getBlockOffsetInPiece( const tr_torrent * tor, uint64_t b )
774
 
{
775
 
    const uint64_t piecePos = tor->info.pieceSize * tr_torBlockPiece( tor, b );
776
 
    const uint64_t blockPos = tor->blockSize * b;
777
 
    assert( blockPos >= piecePos );
778
 
    return (uint32_t)( blockPos - piecePos );
779
 
}
780
 
 
 
956
    }
 
957
 
 
958
    /* In most cases we've just changed the weights of a small number of pieces.
 
959
     * So rather than qsort()ing the entire array, it's faster to sort just the
 
960
     * changed ones, then do a standard merge-two-sorted-arrays pass on the
 
961
     * changed and unchanged pieces. */
 
962
    if( got > 0 )
 
963
    {
 
964
        struct weighted_piece * p;
 
965
        struct weighted_piece * pieces;
 
966
        struct weighted_piece * a = t->pieces;
 
967
        struct weighted_piece * a_end = t->pieces + i;
 
968
        struct weighted_piece * b = a_end;
 
969
        struct weighted_piece * b_end = t->pieces + t->pieceCount;
 
970
 
 
971
        /* resort the pieces that we changed */
 
972
        weightTorrent = t->tor;
 
973
        qsort( a, a_end-a, sizeof( struct weighted_piece ), comparePieceByWeight );
 
974
 
 
975
        /* allocate a new array */
 
976
        p = pieces = tr_new( struct weighted_piece, t->pieceCount );
 
977
 
 
978
        /* merge the two sorted arrays into this new array */
 
979
        weightTorrent = t->tor;
 
980
        while( a!=a_end && b!=b_end )
 
981
            *p++ = comparePieceByWeight( a, b ) < 0 ? *a++ : *b++;
 
982
        while( a!=a_end ) *p++ = *a++;
 
983
        while( b!=b_end ) *p++ = *b++;
 
984
 
 
985
#if 0
 
986
        /* make sure we did it right */
 
987
        assert( p - pieces == t->pieceCount );
 
988
        for( it=pieces; it+1<p; ++it )
 
989
            assert( it->weight <= it[1].weight );
 
990
#endif
 
991
 
 
992
        /* update */
 
993
        tr_free( t->pieces );
 
994
        t->pieces = pieces;
 
995
    }
 
996
 
 
997
    *numgot = got;
 
998
}
 
999
 
 
1000
tr_bool
 
1001
tr_peerMgrDidPeerRequest( const tr_torrent  * tor,
 
1002
                          const tr_peer     * peer,
 
1003
                          tr_block_index_t    block )
 
1004
{
 
1005
    const Torrent * t = tor->torrentPeers;
 
1006
    const struct block_request * b = requestListLookup( (Torrent*)t, block );
 
1007
    if( b == NULL ) return FALSE;
 
1008
    if( b->peer == peer ) return TRUE;
 
1009
    if( t->isInEndgame ) return TRUE;
 
1010
    return FALSE;
 
1011
}
 
1012
 
 
1013
/* cancel requests that are too old */
781
1014
static int
782
1015
refillUpkeep( void * vmgr )
783
1016
{
784
 
    tr_torrent * tor = NULL;
785
 
    tr_peerMgr * mgr = vmgr;
786
1017
    time_t now;
 
1018
    time_t too_old;
 
1019
    tr_torrent * tor;
 
1020
    tr_peerMgr * mgr = vmgr;
787
1021
    managerLock( mgr );
788
1022
 
789
 
    now = time( NULL );
790
 
    while(( tor = tr_torrentNext( mgr->session, tor ))) {
 
1023
    now = tr_time( );
 
1024
    too_old = now - REQUEST_TTL_SECS;
 
1025
 
 
1026
    tor = NULL;
 
1027
    while(( tor = tr_torrentNext( mgr->session, tor )))
 
1028
    {
791
1029
        Torrent * t = tor->torrentPeers;
792
 
        if( t && t->refillQueue && ( t->refillQueue->expirationDate <= now ) ) {
793
 
            tordbg( t, "refill queue is past its shelf date; discarding." );
794
 
            blockIteratorFree( &t->refillQueue );
 
1030
        const int n = t->requestCount;
 
1031
        if( n > 0 )
 
1032
        {
 
1033
            int keepCount = 0;
 
1034
            int cancelCount = 0;
 
1035
            struct block_request * keep = tr_new( struct block_request, n );
 
1036
            struct block_request * cancel = tr_new( struct block_request, n );
 
1037
            const struct block_request * it;
 
1038
            const struct block_request * end;
 
1039
 
 
1040
            for( it=t->requests, end=it+n; it!=end; ++it )
 
1041
                if( it->sentAt <= too_old )
 
1042
                    cancel[cancelCount++] = *it;
 
1043
                else
 
1044
                    keep[keepCount++] = *it;
 
1045
 
 
1046
            /* prune out the ones we aren't keeping */
 
1047
            tr_free( t->requests );
 
1048
            t->requests = keep;
 
1049
            t->requestCount = keepCount;
 
1050
            t->requestAlloc = n;
 
1051
 
 
1052
            /* send cancel messages for all the "cancel" ones */
 
1053
            for( it=cancel, end=it+cancelCount; it!=end; ++it )
 
1054
                if( ( it->peer != NULL ) && ( it->peer->msgs != NULL ) )
 
1055
                    tr_peerMsgsCancel( it->peer->msgs, it->block );
 
1056
 
 
1057
            /* decrement the pending request counts for the timed-out blocks */
 
1058
            for( it=cancel, end=it+cancelCount; it!=end; ++it )
 
1059
                pieceListRemoveRequest( t, it->block );
 
1060
 
 
1061
            /* cleanup loop */
 
1062
            tr_free( cancel );
795
1063
        }
796
1064
    }
797
1065
 
800
1068
}
801
1069
 
802
1070
static void
803
 
refillPulse( int fd UNUSED, short type UNUSED, void * vtorrent )
804
 
{
805
 
    tr_block_index_t block;
806
 
    int peerCount;
807
 
    int webseedCount;
808
 
    tr_peer ** peers;
809
 
    tr_webseed ** webseeds;
810
 
    Torrent * t = vtorrent;
811
 
    tr_torrent * tor = t->tor;
812
 
    tr_bool hasNext = TRUE;
813
 
 
814
 
    if( !t->isRunning )
815
 
        return;
816
 
    if( tr_torrentIsSeed( t->tor ) )
817
 
        return;
818
 
 
819
 
    torrentLock( t );
820
 
    tordbg( t, "Refilling Request Buffers..." );
821
 
 
822
 
    if( t->refillQueue == NULL )
823
 
        t->refillQueue = blockIteratorNew( t );
824
 
 
825
 
    peers = getPeersUploadingToClient( t, &peerCount );
826
 
    webseedCount = tr_ptrArraySize( &t->webseeds );
827
 
    webseeds = tr_memdup( tr_ptrArrayBase( &t->webseeds ),
828
 
                          webseedCount * sizeof( tr_webseed* ) );
829
 
 
830
 
    while( ( webseedCount || peerCount )
831
 
        && (( hasNext = blockIteratorNext( t->refillQueue, &block ))) )
832
 
    {
833
 
        int j;
834
 
        tr_bool handled = FALSE;
835
 
 
836
 
        const tr_piece_index_t index = tr_torBlockPiece( tor, block );
837
 
        const uint32_t offset = getBlockOffsetInPiece( tor, block );
838
 
        const uint32_t length = tr_torBlockCountBytes( tor, block );
839
 
 
840
 
        assert( block < tor->blockCount );
841
 
 
842
 
        /* find a peer who can ask for this block */
843
 
        for( j=0; !handled && j<peerCount; )
844
 
        {
845
 
            const tr_addreq_t val = tr_peerMsgsAddRequest( peers[j]->msgs, index, offset, length );
846
 
            switch( val )
847
 
            {
848
 
                case TR_ADDREQ_FULL:
849
 
                case TR_ADDREQ_CLIENT_CHOKED:
850
 
                    peers[j] = peers[--peerCount];
851
 
                    break;
852
 
 
853
 
                case TR_ADDREQ_MISSING:
854
 
                case TR_ADDREQ_DUPLICATE:
855
 
                    ++j;
856
 
                    break;
857
 
 
858
 
                case TR_ADDREQ_OK:
859
 
                    incrementPieceRequests( t, index );
860
 
                    handled = TRUE;
861
 
                    break;
862
 
 
863
 
                default:
864
 
                    assert( 0 && "unhandled value" );
865
 
                    break;
866
 
            }
867
 
        }
868
 
 
869
 
        /* maybe one of the webseeds can do it */
870
 
        for( j=0; !handled && j<webseedCount; )
871
 
        {
872
 
            const tr_addreq_t val = tr_webseedAddRequest( webseeds[j], index, offset, length );
873
 
            switch( val )
874
 
            {
875
 
                case TR_ADDREQ_FULL:
876
 
                    webseeds[j] = webseeds[--webseedCount];
877
 
                    break;
878
 
 
879
 
                case TR_ADDREQ_OK:
880
 
                    incrementPieceRequests( t, index );
881
 
                    handled = TRUE;
882
 
                    break;
883
 
 
884
 
                default:
885
 
                    assert( 0 && "unhandled value" );
886
 
                    break;
887
 
            }
888
 
        }
889
 
 
890
 
        if( !handled )
891
 
            blockIteratorSkipCurrentPiece( t->refillQueue );
892
 
    }
893
 
 
894
 
    /* cleanup */
895
 
    tr_free( webseeds );
896
 
    tr_free( peers );
897
 
 
898
 
    if( !hasNext ) {
899
 
        tordbg( t, "refill queue has no more blocks to request... freeing (webseed count: %d, peer count: %d)", webseedCount, peerCount );
900
 
        blockIteratorFree( &t->refillQueue );
901
 
    }
902
 
 
903
 
    torrentUnlock( t );
904
 
}
905
 
 
906
 
static void
907
 
broadcastGotBlock( Torrent * t, uint32_t index, uint32_t offset, uint32_t length )
908
 
{
909
 
    size_t i;
910
 
    size_t peerCount;
911
 
    tr_peer ** peers;
912
 
 
913
 
    assert( torrentIsLocked( t ) );
914
 
 
915
 
    tordbg( t, "got a block; cancelling any duplicate requests from peers %"PRIu32":%"PRIu32"->%"PRIu32, index, offset, length );
916
 
 
917
 
    peerCount = tr_ptrArraySize( &t->peers );
918
 
    peers = (tr_peer**) tr_ptrArrayBase( &t->peers );
919
 
    for( i=0; i<peerCount; ++i )
920
 
        if( peers[i]->msgs )
921
 
            tr_peerMsgsCancel( peers[i]->msgs, index, offset, length );
922
 
}
923
 
 
924
 
static void
925
1071
addStrike( Torrent * t, tr_peer * peer )
926
1072
{
927
1073
    tordbg( t, "increasing peer %s strike count to %d",
928
 
            tr_peerIoAddrStr( &peer->addr,
929
 
                              peer->port ), peer->strikes + 1 );
 
1074
            tr_atomAddrStr( peer->atom ), peer->strikes + 1 );
930
1075
 
931
1076
    if( ++peer->strikes >= MAX_BAD_PIECES_PER_PEER )
932
1077
    {
933
1078
        struct peer_atom * atom = peer->atom;
934
1079
        atom->myflags |= MYFLAG_BANNED;
935
1080
        peer->doPurge = 1;
936
 
        tordbg( t, "banning peer %s", tr_peerIoAddrStr( &atom->addr, atom->port ) );
 
1081
        tordbg( t, "banning peer %s", tr_atomAddrStr( atom ) );
937
1082
    }
938
1083
}
939
1084
 
940
1085
static void
941
 
gotBadPiece( Torrent *        t,
942
 
             tr_piece_index_t pieceIndex )
 
1086
gotBadPiece( Torrent * t, tr_piece_index_t pieceIndex )
943
1087
{
944
1088
    tr_torrent *   tor = t->tor;
945
1089
    const uint32_t byteCount = tr_torPieceCountBytes( tor, pieceIndex );
949
1093
}
950
1094
 
951
1095
static void
952
 
refillSoon( Torrent * t )
953
 
{
954
 
    if( !evtimer_pending( &t->refillTimer, NULL ) )
955
 
        tr_timerAdd( &t->refillTimer, 0, REFILL_PERIOD_MSEC );
956
 
}
957
 
 
958
 
static void
959
1096
peerSuggestedPiece( Torrent            * t UNUSED,
960
1097
                    tr_peer            * peer UNUSED,
961
1098
                    tr_piece_index_t     pieceIndex UNUSED,
1004
1141
}
1005
1142
 
1006
1143
static void
 
1144
decrementDownloadedCount( tr_torrent * tor, uint32_t byteCount )
 
1145
{
 
1146
    tor->downloadedCur -= MIN( tor->downloadedCur, byteCount );
 
1147
}
 
1148
 
 
1149
static void
 
1150
clientGotUnwantedBlock( tr_torrent * tor, tr_block_index_t block )
 
1151
{
 
1152
    decrementDownloadedCount( tor, tr_torBlockCountBytes( tor, block ) );
 
1153
}
 
1154
 
 
1155
static void
 
1156
removeRequestFromTables( Torrent * t, tr_block_index_t block )
 
1157
{
 
1158
    requestListRemove( t, block );
 
1159
    pieceListRemoveRequest( t, block );
 
1160
}
 
1161
 
 
1162
/* peer choked us, or maybe it disconnected.
 
1163
   either way we need to remove all its requests */
 
1164
static void
 
1165
peerDeclinedAllRequests( Torrent * t, const tr_peer * peer )
 
1166
{
 
1167
    int i, n;
 
1168
    tr_block_index_t * blocks = tr_new( tr_block_index_t, t->requestCount );
 
1169
 
 
1170
    for( i=n=0; i<t->requestCount; ++i )
 
1171
        if( peer == t->requests[i].peer )
 
1172
            blocks[n++] = t->requests[i].block;
 
1173
 
 
1174
    for( i=0; i<n; ++i )
 
1175
        removeRequestFromTables( t, blocks[i] );
 
1176
 
 
1177
    tr_free( blocks );
 
1178
}
 
1179
 
 
1180
static void
1007
1181
peerCallbackFunc( void * vpeer, void * vevent, void * vt )
1008
1182
{
1009
1183
    tr_peer * peer = vpeer; /* may be NULL if peer is a webseed */
1027
1201
            }
1028
1202
            break;
1029
1203
 
1030
 
        case TR_PEER_NEED_REQ:
1031
 
            refillSoon( t );
1032
 
            break;
1033
 
 
1034
 
        case TR_PEER_CANCEL:
1035
 
            decrementPieceRequests( t, e->pieceIndex );
1036
 
            break;
1037
 
 
1038
1204
        case TR_PEER_PEER_GOT_DATA:
1039
1205
        {
1040
 
            const time_t now = time( NULL );
 
1206
            const time_t now = tr_time( );
1041
1207
            tr_torrent * tor = t->tor;
1042
1208
 
1043
1209
            tr_torrentSetActivityDate( tor, now );
1060
1226
            break;
1061
1227
        }
1062
1228
 
 
1229
        case TR_PEER_CLIENT_GOT_REJ:
 
1230
            removeRequestFromTables( t, _tr_block( t->tor, e->pieceIndex, e->offset ) );
 
1231
            break;
 
1232
 
 
1233
        case TR_PEER_CLIENT_GOT_CHOKE:
 
1234
            peerDeclinedAllRequests( t, peer );
 
1235
            break;
 
1236
 
 
1237
        case TR_PEER_CLIENT_GOT_PORT:
 
1238
            if( peer )
 
1239
                peer->atom->port = e->port;
 
1240
            break;
 
1241
 
1063
1242
        case TR_PEER_CLIENT_GOT_SUGGEST:
1064
1243
            if( peer )
1065
1244
                peerSuggestedPiece( t, peer, e->pieceIndex, FALSE );
1072
1251
 
1073
1252
        case TR_PEER_CLIENT_GOT_DATA:
1074
1253
        {
1075
 
            const time_t now = time( NULL );
 
1254
            const time_t now = tr_time( );
1076
1255
            tr_torrent * tor = t->tor;
1077
1256
 
1078
1257
            tr_torrentSetActivityDate( tor, now );
1105
1284
            {
1106
1285
                struct peer_atom * atom = peer->atom;
1107
1286
                if( e->progress >= 1.0 ) {
1108
 
                    tordbg( t, "marking peer %s as a seed", tr_peerIoAddrStr( &atom->addr, atom->port ) );
 
1287
                    tordbg( t, "marking peer %s as a seed",
 
1288
                            tr_atomAddrStr( atom ) );
1109
1289
                    atom->flags |= ADDED_F_SEED_FLAG;
1110
1290
                }
1111
1291
            }
1115
1295
        case TR_PEER_CLIENT_GOT_BLOCK:
1116
1296
        {
1117
1297
            tr_torrent * tor = t->tor;
1118
 
 
1119
1298
            tr_block_index_t block = _tr_block( tor, e->pieceIndex, e->offset );
1120
1299
 
1121
 
            tr_cpBlockAdd( &tor->completion, block );
1122
 
            tr_torrentSetDirty( tor );
1123
 
            decrementPieceRequests( t, e->pieceIndex );
1124
 
 
1125
 
            broadcastGotBlock( t, e->pieceIndex, e->offset, e->length );
1126
 
 
1127
 
            if( tr_cpPieceIsComplete( &tor->completion, e->pieceIndex ) )
1128
 
            {
1129
 
                const tr_piece_index_t p = e->pieceIndex;
1130
 
                const tr_bool ok = tr_ioTestPiece( tor, p, NULL, 0 );
1131
 
 
1132
 
                if( !ok )
1133
 
                {
1134
 
                    tr_torerr( tor, _( "Piece %lu, which was just downloaded, failed its checksum test" ),
1135
 
                               (unsigned long)p );
1136
 
                }
1137
 
 
1138
 
                tr_torrentSetHasPiece( tor, p, ok );
1139
 
                tr_torrentSetPieceChecked( tor, p, TRUE );
1140
 
                tr_peerMgrSetBlame( tor, p, ok );
1141
 
 
1142
 
                if( !ok )
1143
 
                {
1144
 
                    gotBadPiece( t, p );
1145
 
                }
1146
 
                else
1147
 
                {
1148
 
                    int i;
1149
 
                    int peerCount;
1150
 
                    tr_peer ** peers;
1151
 
                    tr_file_index_t fileIndex;
1152
 
 
1153
 
                    peerCount = tr_ptrArraySize( &t->peers );
1154
 
                    peers = (tr_peer**) tr_ptrArrayBase( &t->peers );
1155
 
                    for( i=0; i<peerCount; ++i )
1156
 
                        tr_peerMsgsHave( peers[i]->msgs, p );
1157
 
 
1158
 
                    for( fileIndex=0; fileIndex<tor->info.fileCount; ++fileIndex )
1159
 
                    {
1160
 
                        const tr_file * file = &tor->info.files[fileIndex];
1161
 
                        if( ( file->firstPiece <= p ) && ( p <= file->lastPiece ) && tr_cpFileIsComplete( &tor->completion, fileIndex ) )
1162
 
                        {
1163
 
                            char * path = tr_buildPath( tor->downloadDir, file->name, NULL );
1164
 
                            tordbg( t, "closing recently-completed file \"%s\"", path );
1165
 
                            tr_fdFileClose( path );
1166
 
                            tr_free( path );
 
1300
            requestListRemove( t, block );
 
1301
            pieceListRemoveRequest( t, block );
 
1302
 
 
1303
            if( tr_cpBlockIsComplete( &tor->completion, block ) )
 
1304
            {
 
1305
                tordbg( t, "we have this block already..." );
 
1306
                clientGotUnwantedBlock( tor, block );
 
1307
            }
 
1308
            else
 
1309
            {
 
1310
                tr_cpBlockAdd( &tor->completion, block );
 
1311
                tr_torrentSetDirty( tor );
 
1312
 
 
1313
                if( tr_cpPieceIsComplete( &tor->completion, e->pieceIndex ) )
 
1314
                {
 
1315
                    const tr_piece_index_t p = e->pieceIndex;
 
1316
                    const tr_bool ok = tr_ioTestPiece( tor, p, NULL, 0 );
 
1317
 
 
1318
                    if( !ok )
 
1319
                    {
 
1320
                        tr_torerr( tor, _( "Piece %lu, which was just downloaded, failed its checksum test" ),
 
1321
                                   (unsigned long)p );
 
1322
                    }
 
1323
 
 
1324
                    tr_torrentSetHasPiece( tor, p, ok );
 
1325
                    tr_torrentSetPieceChecked( tor, p, TRUE );
 
1326
                    tr_peerMgrSetBlame( tor, p, ok );
 
1327
 
 
1328
                    if( !ok )
 
1329
                    {
 
1330
                        gotBadPiece( t, p );
 
1331
                    }
 
1332
                    else
 
1333
                    {
 
1334
                        int i;
 
1335
                        int peerCount;
 
1336
                        tr_peer ** peers;
 
1337
                        tr_file_index_t fileIndex;
 
1338
 
 
1339
                        peerCount = tr_ptrArraySize( &t->peers );
 
1340
                        peers = (tr_peer**) tr_ptrArrayBase( &t->peers );
 
1341
                        for( i=0; i<peerCount; ++i )
 
1342
                            tr_peerMsgsHave( peers[i]->msgs, p );
 
1343
 
 
1344
                        for( fileIndex=0; fileIndex<tor->info.fileCount; ++fileIndex ) {
 
1345
                            const tr_file * file = &tor->info.files[fileIndex];
 
1346
                            if( ( file->firstPiece <= p ) && ( p <= file->lastPiece ) )
 
1347
                                if( tr_cpFileIsComplete( &tor->completion, fileIndex ) )
 
1348
                                    tr_torrentFileCompleted( tor, fileIndex );
1167
1349
                        }
 
1350
 
 
1351
                        pieceListRemovePiece( t, p );
1168
1352
                    }
1169
1353
                }
1170
1354
 
1171
 
                tr_torrentRecheckCompleteness( tor );
 
1355
                t->needsCompletenessCheck = TRUE;
1172
1356
            }
1173
1357
            break;
1174
1358
        }
1179
1363
                /* some protocol error from the peer */
1180
1364
                peer->doPurge = 1;
1181
1365
                tordbg( t, "setting %s doPurge flag because we got an ERANGE, EMSGSIZE, or ENOTCONN error",
1182
 
                        tr_peerIoAddrStr( &peer->addr, peer->port ) );
 
1366
                        tr_atomAddrStr( peer->atom ) );
1183
1367
            }
1184
 
            else 
 
1368
            else
1185
1369
            {
1186
1370
                tordbg( t, "unhandled error: %s", tr_strerror( e->err ) );
1187
1371
            }
1194
1378
    torrentUnlock( t );
1195
1379
}
1196
1380
 
 
1381
static int
 
1382
getDefaultShelfLife( uint8_t from )
 
1383
{
 
1384
    /* in general, peers obtained from firsthand contact
 
1385
     * are better than those from secondhand, etc etc */
 
1386
    switch( from )
 
1387
    {
 
1388
        case TR_PEER_FROM_INCOMING : return 60 * 60 * 6;
 
1389
        case TR_PEER_FROM_LTEP     : return 60 * 60 * 6;
 
1390
        case TR_PEER_FROM_TRACKER  : return 60 * 60 * 3;
 
1391
        case TR_PEER_FROM_DHT      : return 60 * 60 * 3;
 
1392
        case TR_PEER_FROM_PEX      : return 60 * 60 * 2;
 
1393
        case TR_PEER_FROM_RESUME   : return 60 * 60;
 
1394
        default                    : return 60 * 60;
 
1395
    }
 
1396
}
 
1397
 
 
1398
 
1197
1399
static void
1198
 
ensureAtomExists( Torrent          * t,
1199
 
                  const tr_address * addr,
1200
 
                  tr_port            port,
1201
 
                  uint8_t            flags,
1202
 
                  uint8_t            from )
 
1400
ensureAtomExists( Torrent           * t,
 
1401
                  const tr_address  * addr,
 
1402
                  const tr_port       port,
 
1403
                  const uint8_t       flags,
 
1404
                  const uint8_t       from )
1203
1405
{
1204
1406
    assert( tr_isAddress( addr ) );
1205
1407
    assert( from < TR_PEER_FROM__MAX );
1207
1409
    if( getExistingAtom( t, addr ) == NULL )
1208
1410
    {
1209
1411
        struct peer_atom * a;
 
1412
        const int jitter = tr_cryptoWeakRandInt( 60*10 );
 
1413
 
1210
1414
        a = tr_new0( struct peer_atom, 1 );
1211
1415
        a->addr = *addr;
1212
1416
        a->port = port;
1213
1417
        a->flags = flags;
1214
1418
        a->from = from;
1215
 
        tordbg( t, "got a new atom: %s", tr_peerIoAddrStr( &a->addr, a->port ) );
1216
 
        tr_ptrArrayInsertSorted( &t->pool, a, comparePeerAtoms );
 
1419
        a->shelf_date = tr_time( ) + getDefaultShelfLife( from ) + jitter;
 
1420
        tr_ptrArrayInsertSorted( &t->pool, a, compareAtomsByAddress );
 
1421
 
 
1422
        tordbg( t, "got a new atom: %s", tr_atomAddrStr( a ) );
1217
1423
    }
1218
1424
}
1219
1425
 
1281
1487
    else /* looking good */
1282
1488
    {
1283
1489
        struct peer_atom * atom;
 
1490
 
1284
1491
        ensureAtomExists( t, addr, port, 0, TR_PEER_FROM_INCOMING );
1285
1492
        atom = getExistingAtom( t, addr );
1286
 
        atom->time = time( NULL );
 
1493
        atom->time = tr_time( );
1287
1494
        atom->piece_data_time = 0;
1288
1495
 
1289
1496
        if( atom->myflags & MYFLAG_BANNED )
1290
1497
        {
1291
1498
            tordbg( t, "banned peer %s tried to reconnect",
1292
 
                    tr_peerIoAddrStr( &atom->addr, atom->port ) );
 
1499
                    tr_atomAddrStr( atom ) );
1293
1500
        }
1294
1501
        else if( tr_peerIoIsIncoming( io )
1295
1502
               && ( getPeerCount( t ) >= getMaxPeerCount( t->tor ) ) )
1298
1505
        }
1299
1506
        else
1300
1507
        {
1301
 
            tr_peer * peer = getExistingPeer( t, addr );
 
1508
            tr_peer * peer = atom->peer;
1302
1509
 
1303
1510
            if( peer ) /* we already have this peer */
1304
1511
            {
1305
1512
            }
1306
1513
            else
1307
1514
            {
1308
 
                peer = getPeer( t, addr );
 
1515
                peer = getPeer( t, atom );
1309
1516
                tr_free( peer->client );
1310
1517
 
1311
1518
                if( !peer_id )
1316
1523
                    peer->client = tr_strdup( client );
1317
1524
                }
1318
1525
 
1319
 
                peer->port = port;
1320
 
                peer->atom = atom;
1321
1526
                peer->io = tr_handshakeStealIO( handshake ); /* this steals its refcount too, which is
1322
1527
                                                                balanced by our unref in peerDestructor()  */
1323
1528
                tr_peerIoSetParent( peer->io, t->tor->bandwidth );
1340
1545
                       tr_port      port,
1341
1546
                       int          socket )
1342
1547
{
 
1548
    tr_session * session;
 
1549
 
1343
1550
    managerLock( manager );
1344
1551
 
1345
 
    if( tr_sessionIsAddressBlocked( manager->session, addr ) )
 
1552
    assert( tr_isSession( manager->session ) );
 
1553
    session = manager->session;
 
1554
 
 
1555
    if( tr_sessionIsAddressBlocked( session, addr ) )
1346
1556
    {
1347
1557
        tr_dbg( "Banned IP address \"%s\" tried to connect to us", tr_ntop_non_ts( addr ) );
1348
 
        tr_netClose( socket );
 
1558
        tr_netClose( session, socket );
1349
1559
    }
1350
1560
    else if( getExistingHandshake( &manager->incomingHandshakes, addr ) )
1351
1561
    {
1352
 
        tr_netClose( socket );
 
1562
        tr_netClose( session, socket );
1353
1563
    }
1354
 
    else /* we don't have a connetion to them yet... */
 
1564
    else /* we don't have a connection to them yet... */
1355
1565
    {
1356
1566
        tr_peerIo *    io;
1357
1567
        tr_handshake * handshake;
1358
1568
 
1359
 
        io = tr_peerIoNewIncoming( manager->session, manager->session->bandwidth, addr, port, socket );
 
1569
        io = tr_peerIoNewIncoming( session, session->bandwidth, addr, port, socket );
1360
1570
 
1361
1571
        handshake = tr_handshakeNew( io,
1362
 
                                     manager->session->encryptionMode,
 
1572
                                     session->encryptionMode,
1363
1573
                                     myHandshakeDoneCB,
1364
1574
                                     manager );
1365
1575
 
1492
1702
            if( tr_bitfieldHas( peer->blame, pieceIndex ) )
1493
1703
            {
1494
1704
                tordbg( t, "peer %s contributed to corrupt piece (%d); now has %d strikes",
1495
 
                        tr_peerIoAddrStr( &peer->addr, peer->port ),
 
1705
                        tr_atomAddrStr( peer->atom ),
1496
1706
                        pieceIndex, (int)peer->strikes + 1 );
1497
1707
                addStrike( t, peer );
1498
1708
            }
1554
1764
}
1555
1765
 
1556
1766
int
1557
 
tr_peerMgrGetPeers( tr_torrent * tor, tr_pex ** setme_pex, uint8_t af, int maxPeerCount )
 
1767
tr_peerMgrGetPeers( tr_torrent   * tor,
 
1768
                    tr_pex      ** setme_pex,
 
1769
                    uint8_t        af,
 
1770
                    uint8_t        list_mode,
 
1771
                    int            maxCount )
1558
1772
{
 
1773
    int i;
 
1774
    int n;
1559
1775
    int count = 0;
 
1776
    int atomCount = 0;
1560
1777
    const Torrent * t = tor->torrentPeers;
 
1778
    struct peer_atom ** atoms = NULL;
 
1779
    tr_pex * pex;
 
1780
    tr_pex * walk;
 
1781
 
 
1782
    assert( tr_isTorrent( tor ) );
 
1783
    assert( setme_pex != NULL );
 
1784
    assert( af==TR_AF_INET || af==TR_AF_INET6 );
 
1785
    assert( list_mode==TR_PEERS_CONNECTED || list_mode==TR_PEERS_ALL );
1561
1786
 
1562
1787
    managerLock( t->manager );
1563
1788
 
 
1789
    /**
 
1790
    ***  build a list of atoms
 
1791
    **/
 
1792
 
 
1793
    if( list_mode == TR_PEERS_CONNECTED ) /* connected peers only */
1564
1794
    {
1565
1795
        int i;
1566
 
        const int atomCount = tr_ptrArraySize( &t->pool );
1567
 
        const int pexCount = MIN( atomCount, maxPeerCount );
 
1796
        const tr_peer ** peers = (const tr_peer **) tr_ptrArrayBase( &t->peers );
 
1797
        atomCount = tr_ptrArraySize( &t->peers );
 
1798
        atoms = tr_new( struct peer_atom *, atomCount );
 
1799
        for( i=0; i<atomCount; ++i )
 
1800
            atoms[i] = peers[i]->atom;
 
1801
    }
 
1802
    else /* TR_PEERS_ALL */
 
1803
    {
1568
1804
        const struct peer_atom ** atomsBase = (const struct peer_atom**) tr_ptrArrayBase( &t->pool );
1569
 
        struct peer_atom ** atoms = tr_memdup( atomsBase, atomCount * sizeof( struct peer_atom * ) );
1570
 
        /* for now, this will waste memory on torrents that have both
1571
 
         * ipv6 and ipv4 peers */
1572
 
        tr_pex * pex = tr_new0( tr_pex, atomCount );
1573
 
        tr_pex * walk = pex;
1574
 
 
1575
 
        qsort( atoms, atomCount, sizeof( struct peer_atom * ), compareAtomsByUsefulness );
1576
 
 
1577
 
        for( i=0; i<atomCount && count<pexCount; ++i )
 
1805
        atomCount = tr_ptrArraySize( &t->pool );
 
1806
        atoms = tr_memdup( atomsBase, atomCount * sizeof( struct peer_atom * ) );
 
1807
    }
 
1808
 
 
1809
    qsort( atoms, atomCount, sizeof( struct peer_atom * ), compareAtomsByUsefulness );
 
1810
 
 
1811
    /**
 
1812
    ***  add the first N of them into our return list
 
1813
    **/
 
1814
 
 
1815
    n = MIN( atomCount, maxCount );
 
1816
    pex = walk = tr_new0( tr_pex, n );
 
1817
 
 
1818
    for( i=0; i<atomCount && count<n; ++i )
 
1819
    {
 
1820
        const struct peer_atom * atom = atoms[i];
 
1821
        if( atom->addr.type == af )
1578
1822
        {
1579
 
            const struct peer_atom * atom = atoms[i];
1580
 
            if( atom->addr.type == af )
1581
 
            {
1582
 
                assert( tr_isAddress( &atom->addr ) );
1583
 
                walk->addr = atom->addr;
1584
 
                walk->port = atom->port;
1585
 
                walk->flags = atom->flags;
1586
 
                ++count;
1587
 
                ++walk;
1588
 
            }
 
1823
            assert( tr_isAddress( &atom->addr ) );
 
1824
            walk->addr = atom->addr;
 
1825
            walk->port = atom->port;
 
1826
            walk->flags = atom->flags;
 
1827
            ++count;
 
1828
            ++walk;
1589
1829
        }
1590
 
 
1591
 
        assert( ( walk - pex ) == count );
1592
 
        *setme_pex = pex;
1593
 
 
1594
 
        tr_free( atoms );
1595
1830
    }
1596
1831
 
 
1832
    qsort( pex, count, sizeof( tr_pex ), tr_pexCompare );
 
1833
 
 
1834
    assert( ( walk - pex ) == count );
 
1835
    *setme_pex = pex;
 
1836
 
 
1837
    /* cleanup */
 
1838
    tr_free( atoms );
1597
1839
    managerUnlock( t->manager );
1598
1840
    return count;
1599
1841
}
1600
1842
 
 
1843
static int atomPulse      ( void * vmgr );
 
1844
static int bandwidthPulse ( void * vmgr );
 
1845
static int rechokePulse   ( void * vmgr );
 
1846
static int reconnectPulse ( void * vmgr );
 
1847
 
1601
1848
static void
1602
1849
ensureMgrTimersExist( struct tr_peerMgr * m )
1603
1850
{
1604
1851
    tr_session * s = m->session;
1605
1852
 
 
1853
    if( m->atomTimer == NULL )
 
1854
        m->atomTimer = tr_timerNew( s, atomPulse, m, ATOM_PERIOD_MSEC );
 
1855
 
1606
1856
    if( m->bandwidthTimer == NULL )
1607
1857
        m->bandwidthTimer = tr_timerNew( s, bandwidthPulse, m, BANDWIDTH_PERIOD_MSEC );
1608
1858
 
1625
1875
    managerLock( t->manager );
1626
1876
    ensureMgrTimersExist( t->manager );
1627
1877
 
1628
 
    if( !t->isRunning )
1629
 
    {
1630
 
        t->isRunning = TRUE;
1631
 
 
1632
 
        if( !tr_ptrArrayEmpty( &t->webseeds ) )
1633
 
            refillSoon( t );
1634
 
    }
 
1878
    t->isRunning = TRUE;
1635
1879
 
1636
1880
    rechokePulse( t->manager );
1637
1881
    managerUnlock( t->manager );
1640
1884
static void
1641
1885
stopTorrent( Torrent * t )
1642
1886
{
 
1887
    int i, n;
 
1888
 
1643
1889
    assert( torrentIsLocked( t ) );
1644
1890
 
1645
1891
    t->isRunning = FALSE;
1646
1892
 
1647
1893
    /* disconnect the peers. */
1648
 
    tr_ptrArrayForeach( &t->peers, (PtrArrayForeachFunc)peerDestructor );
 
1894
    for( i=0, n=tr_ptrArraySize( &t->peers ); i<n; ++i )
 
1895
        peerDestructor( t, tr_ptrArrayNth( &t->peers, i ) );
1649
1896
    tr_ptrArrayClear( &t->peers );
1650
1897
 
1651
1898
    /* disconnect the handshakes.  handshakeAbort calls handshakeDoneCB(),
1722
1969
        else if( peerCount ) {
1723
1970
            int j;
1724
1971
            for( j = 0; j < peerCount; ++j )
1725
 
                if( tr_bitfieldHas( peers[j]->have, i ) )
 
1972
                if( tr_bitsetHas( &peers[j]->have, i ) )
1726
1973
                    ++tab[i];
1727
1974
        }
1728
1975
    }
1745
1992
    peerCount = tr_ptrArraySize( &t->peers );
1746
1993
    peers = (const tr_peer**) tr_ptrArrayBase( &t->peers );
1747
1994
    for( i=0; i<peerCount; ++i )
1748
 
        tr_bitfieldOr( pieces, peers[i]->have );
 
1995
        tr_bitsetOr( pieces, &peers[i]->have );
1749
1996
 
1750
1997
    managerUnlock( t->manager );
1751
1998
    return pieces;
1793
2040
 
1794
2041
        ++setmePeersFrom[atom->from];
1795
2042
 
1796
 
        if( clientIsDownloadingFrom( peer ) )
 
2043
        if( clientIsDownloadingFrom( tor, peer ) )
1797
2044
            ++*setmePeersSendingToUs;
1798
2045
 
1799
2046
        if( clientIsUploadingTo( peer ) )
1890
2137
        const struct peer_atom * atom = peer->atom;
1891
2138
        tr_peer_stat *           stat = ret + i;
1892
2139
 
1893
 
        tr_ntop( &peer->addr, stat->addr, sizeof( stat->addr ) );
 
2140
        tr_ntop( &atom->addr, stat->addr, sizeof( stat->addr ) );
1894
2141
        tr_strlcpy( stat->client, ( peer->client ? peer->client : "" ),
1895
2142
                   sizeof( stat->client ) );
1896
 
        stat->port               = ntohs( peer->port );
 
2143
        stat->port               = ntohs( peer->atom->port );
1897
2144
        stat->from               = atom->from;
1898
2145
        stat->progress           = peer->progress;
1899
2146
        stat->isEncrypted        = tr_peerIoIsEncrypted( peer->io ) ? 1 : 0;
1904
2151
        stat->clientIsChoked     = peer->clientIsChoked;
1905
2152
        stat->clientIsInterested = peer->clientIsInterested;
1906
2153
        stat->isIncoming         = tr_peerIoIsIncoming( peer->io );
1907
 
        stat->isDownloadingFrom  = clientIsDownloadingFrom( peer );
 
2154
        stat->isDownloadingFrom  = clientIsDownloadingFrom( tor, peer );
1908
2155
        stat->isUploadingTo      = clientIsUploadingTo( peer );
1909
2156
        stat->isSeed             = ( atom->uploadOnly == UPLOAD_ONLY_YES ) || ( peer->progress >= 1.0 );
1910
2157
 
2107
2354
static tr_close_type_t
2108
2355
shouldPeerBeClosed( const Torrent    * t,
2109
2356
                    const tr_peer    * peer,
2110
 
                    int                peerCount )
 
2357
                    int                peerCount,
 
2358
                    const time_t       now )
2111
2359
{
2112
2360
    const tr_torrent *       tor = t->tor;
2113
 
    const time_t             now = time( NULL );
2114
2361
    const struct peer_atom * atom = peer->atom;
2115
2362
 
2116
2363
    /* if it's marked for purging, close it */
2117
2364
    if( peer->doPurge )
2118
2365
    {
2119
2366
        tordbg( t, "purging peer %s because its doPurge flag is set",
2120
 
                tr_peerIoAddrStr( &atom->addr, atom->port ) );
 
2367
                tr_atomAddrStr( atom ) );
2121
2368
        return TR_MUST_CLOSE;
2122
2369
    }
2123
2370
 
2132
2379
            peerHasEverything = FALSE;
2133
2380
        else {
2134
2381
            tr_bitfield * tmp = tr_bitfieldDup( tr_cpPieceBitfield( &tor->completion ) );
2135
 
            tr_bitfieldDifference( tmp, peer->have );
 
2382
            tr_bitsetDifference( tmp, &peer->have );
2136
2383
            peerHasEverything = tr_bitfieldCountTrueBits( tmp ) == 0;
2137
2384
            tr_bitfieldFree( tmp );
2138
2385
        }
2140
2387
        if( peerHasEverything && ( !tr_torrentAllowsPex(tor) || (now-atom->time>=30 )))
2141
2388
        {
2142
2389
            tordbg( t, "purging peer %s because we're both seeds",
2143
 
                    tr_peerIoAddrStr( &atom->addr, atom->port ) );
 
2390
                    tr_atomAddrStr( atom ) );
2144
2391
            return TR_MUST_CLOSE;
2145
2392
        }
2146
2393
    }
2161
2408
/*fprintf( stderr, "strictness is %.3f, limit is %d seconds... time since connect is %d, time since piece is %d ... idleTime is %d, doPurge is %d\n", (double)strictness, limit, (int)(now - atom->time), (int)(now - atom->piece_data_time), idleTime, idleTime > limit );*/
2162
2409
        if( idleTime > limit ) {
2163
2410
            tordbg( t, "purging peer %s because it's been %d secs since we shared anything",
2164
 
                       tr_peerIoAddrStr( &atom->addr, atom->port ), idleTime );
 
2411
                       tr_atomAddrStr( atom ), idleTime );
2165
2412
            return TR_CAN_CLOSE;
2166
2413
        }
2167
2414
    }
2169
2416
    return TR_CAN_KEEP;
2170
2417
}
2171
2418
 
 
2419
static void sortPeersByLivelinessReverse( tr_peer ** peers, void ** clientData, int n, uint64_t now );
 
2420
 
2172
2421
static tr_peer **
2173
 
getPeersToClose( Torrent * t, tr_close_type_t closeType, int * setmeSize )
 
2422
getPeersToClose( Torrent * t, tr_close_type_t closeType, const time_t now, int * setmeSize )
2174
2423
{
2175
2424
    int i, peerCount, outsize;
2176
2425
    tr_peer ** peers = (tr_peer**) tr_ptrArrayPeek( &t->peers, &peerCount );
2179
2428
    assert( torrentIsLocked( t ) );
2180
2429
 
2181
2430
    for( i = outsize = 0; i < peerCount; ++i )
2182
 
        if( shouldPeerBeClosed( t, peers[i], peerCount ) == closeType )
 
2431
        if( shouldPeerBeClosed( t, peers[i], peerCount, now ) == closeType )
2183
2432
            ret[outsize++] = peers[i];
2184
2433
 
 
2434
    sortPeersByLivelinessReverse ( ret, NULL, outsize, tr_date( ) );
 
2435
 
2185
2436
    *setmeSize = outsize;
2186
2437
    return ret;
2187
2438
}
2217
2468
}
2218
2469
 
2219
2470
static int
2220
 
getReconnectIntervalSecs( const struct peer_atom * atom )
 
2471
getReconnectIntervalSecs( const struct peer_atom * atom, const time_t now )
2221
2472
{
2222
 
    int          sec;
2223
 
    const time_t now = time( NULL );
 
2473
    int sec;
2224
2474
 
2225
2475
    /* if we were recently connected to this peer and transferring piece
2226
2476
     * data, try to reconnect to them sooner rather that later -- we don't
2248
2498
}
2249
2499
 
2250
2500
static struct peer_atom **
2251
 
getPeerCandidates( Torrent * t, int * setmeSize )
 
2501
getPeerCandidates( Torrent * t, const time_t now, int * setmeSize )
2252
2502
{
2253
2503
    int                 i, atomCount, retCount;
2254
2504
    struct peer_atom ** atoms;
2255
2505
    struct peer_atom ** ret;
2256
 
    const time_t        now = time( NULL );
2257
2506
    const int           seed = tr_torrentIsSeed( t->tor );
2258
2507
 
2259
2508
    assert( torrentIsLocked( t ) );
2276
2525
        if( atom->myflags & MYFLAG_UNREACHABLE )
2277
2526
            continue;
2278
2527
 
2279
 
        /* we don't need two connections to the same peer... */
2280
 
        if( peerIsInUse( t, &atom->addr ) )
2281
 
            continue;
2282
 
 
2283
2528
        /* no need to connect if we're both seeds... */
2284
2529
        if( seed && ( ( atom->flags & ADDED_F_SEED_FLAG ) ||
2285
2530
                      ( atom->uploadOnly == UPLOAD_ONLY_YES ) ) )
2286
2531
            continue;
2287
2532
 
2288
2533
        /* don't reconnect too often */
2289
 
        interval = getReconnectIntervalSecs( atom );
 
2534
        interval = getReconnectIntervalSecs( atom, now );
2290
2535
        if( ( now - atom->time ) < interval )
2291
2536
        {
2292
2537
            tordbg( t, "RECONNECT peer %d (%s) is in its grace period of %d seconds..",
2293
 
                    i, tr_peerIoAddrStr( &atom->addr, atom->port ), interval );
 
2538
                    i, tr_atomAddrStr( atom ), interval );
2294
2539
            continue;
2295
2540
        }
2296
2541
 
2298
2543
        if( tr_sessionIsAddressBlocked( t->manager->session, &atom->addr ) )
2299
2544
            continue;
2300
2545
 
 
2546
        /* we don't need two connections to the same peer... */
 
2547
        if( peerIsInUse( t, atom ) )
 
2548
            continue;
 
2549
 
2301
2550
        ret[retCount++] = atom;
2302
2551
    }
2303
2552
 
2304
 
    qsort( ret, retCount, sizeof( struct peer_atom* ), compareCandidates );
 
2553
    if( retCount != 0 )
 
2554
        qsort( ret, retCount, sizeof( struct peer_atom* ), compareCandidates );
2305
2555
    *setmeSize = retCount;
2306
2556
    return ret;
2307
2557
}
2333
2583
{
2334
2584
    static time_t prevTime = 0;
2335
2585
    static int    newConnectionsThisSecond = 0;
2336
 
    time_t        now;
 
2586
    const time_t  now = tr_time( );
2337
2587
 
2338
 
    now = time( NULL );
2339
2588
    if( prevTime != now )
2340
2589
    {
2341
2590
        prevTime = now;
2349
2598
    else
2350
2599
    {
2351
2600
        int i;
2352
 
        int canCloseCount;
2353
2601
        int mustCloseCount;
2354
 
        int candidateCount;
2355
2602
        int maxCandidates;
2356
 
        struct tr_peer ** canClose = getPeersToClose( t, TR_CAN_CLOSE, &canCloseCount );
2357
 
        struct tr_peer ** mustClose = getPeersToClose( t, TR_MUST_CLOSE, &mustCloseCount );
2358
 
        struct peer_atom ** candidates = getPeerCandidates( t, &candidateCount );
2359
 
 
2360
 
        tordbg( t, "reconnect pulse for [%s]: "
2361
 
                   "%d must-close connections, "
2362
 
                   "%d can-close connections, "
2363
 
                   "%d connection candidates, "
2364
 
                   "%d atoms, "
2365
 
                   "max per pulse is %d",
2366
 
                   t->tor->info.name,
2367
 
                   mustCloseCount,
2368
 
                   canCloseCount,
2369
 
                   candidateCount,
2370
 
                   tr_ptrArraySize( &t->pool ),
2371
 
                   MAX_RECONNECTIONS_PER_PULSE );
 
2603
        struct tr_peer ** mustClose;
2372
2604
 
2373
2605
        /* disconnect the really bad peers */
 
2606
        mustClose = getPeersToClose( t, TR_MUST_CLOSE, now, &mustCloseCount );
2374
2607
        for( i=0; i<mustCloseCount; ++i )
2375
2608
            closePeer( t, mustClose[i] );
 
2609
        tr_free( mustClose );
2376
2610
 
2377
2611
        /* decide how many peers can we try to add in this pass */
2378
 
        maxCandidates = candidateCount;
2379
 
        maxCandidates = MIN( maxCandidates, MAX_RECONNECTIONS_PER_PULSE );
 
2612
        maxCandidates = MAX_RECONNECTIONS_PER_PULSE;
 
2613
        if( tr_announcerHasBacklog( t->manager->session->announcer ) )
 
2614
            maxCandidates /= 2;
2380
2615
        maxCandidates = MIN( maxCandidates, getMaxPeerCount( t->tor ) - getPeerCount( t ) );
2381
2616
        maxCandidates = MIN( maxCandidates, MAX_CONNECTIONS_PER_SECOND - newConnectionsThisSecond );
2382
2617
 
2383
 
        /* maybe disconnect some lesser peers, if we have candidates to replace them with */
2384
 
        for( i=0; ( i<canCloseCount ) && ( i<maxCandidates ); ++i )
2385
 
            closePeer( t, canClose[i] );
2386
 
 
2387
 
        tordbg( t, "candidateCount is %d, MAX_RECONNECTIONS_PER_PULSE is %d,"
2388
 
                   " getPeerCount(t) is %d, getMaxPeerCount(t) is %d, "
2389
 
                   "newConnectionsThisSecond is %d, MAX_CONNECTIONS_PER_SECOND is %d",
2390
 
                   candidateCount,
2391
 
                   MAX_RECONNECTIONS_PER_PULSE,
2392
 
                   getPeerCount( t ),
2393
 
                   getMaxPeerCount( t->tor ),
2394
 
                   newConnectionsThisSecond, MAX_CONNECTIONS_PER_SECOND );
2395
 
 
2396
 
        /* add some new ones */
2397
 
        for( i=0; i<maxCandidates; ++i )
2398
 
        {
2399
 
            tr_peerMgr        * mgr = t->manager;
2400
 
            struct peer_atom  * atom = candidates[i];
2401
 
            tr_peerIo         * io;
2402
 
 
2403
 
            tordbg( t, "Starting an OUTGOING connection with %s",
2404
 
                   tr_peerIoAddrStr( &atom->addr, atom->port ) );
2405
 
 
2406
 
            io = tr_peerIoNewOutgoing( mgr->session, mgr->session->bandwidth, &atom->addr, atom->port, t->tor->info.hash );
2407
 
 
2408
 
            if( io == NULL )
2409
 
            {
2410
 
                tordbg( t, "peerIo not created; marking peer %s as unreachable",
2411
 
                        tr_peerIoAddrStr( &atom->addr, atom->port ) );
2412
 
                atom->myflags |= MYFLAG_UNREACHABLE;
2413
 
            }
2414
 
            else
2415
 
            {
2416
 
                tr_handshake * handshake = tr_handshakeNew( io,
2417
 
                                                            mgr->session->encryptionMode,
2418
 
                                                            myHandshakeDoneCB,
2419
 
                                                            mgr );
2420
 
 
2421
 
                assert( tr_peerIoGetTorrentHash( io ) );
2422
 
 
2423
 
                tr_peerIoUnref( io ); /* balanced by the implicit ref in tr_peerIoNewOutgoing() */
2424
 
 
2425
 
                ++newConnectionsThisSecond;
2426
 
 
2427
 
                tr_ptrArrayInsertSorted( &t->outgoingHandshakes, handshake,
2428
 
                                         handshakeCompare );
2429
 
            }
2430
 
 
2431
 
            atom->time = time( NULL );
2432
 
        }
2433
 
 
2434
 
        /* cleanup */
2435
 
        tr_free( candidates );
2436
 
        tr_free( mustClose );
2437
 
        tr_free( canClose );
 
2618
        /* select the best candidates, if they are requested */
 
2619
        if( maxCandidates == 0 )
 
2620
        {
 
2621
            tordbg( t, "reconnect pulse for [%s]: %d must-close connections, "
 
2622
                       "NO connection candidates needed, %d atoms, "
 
2623
                       "max per pulse is %d",
 
2624
                       t->tor->info.name, mustCloseCount,
 
2625
                       tr_ptrArraySize( &t->pool ),
 
2626
                       MAX_RECONNECTIONS_PER_PULSE );
 
2627
 
 
2628
            tordbg( t, "maxCandidates is %d, MAX_RECONNECTIONS_PER_PULSE is %d, "
 
2629
                       "getPeerCount(t) is %d, getMaxPeerCount(t) is %d, "
 
2630
                       "newConnectionsThisSecond is %d, MAX_CONNECTIONS_PER_SECOND is %d",
 
2631
                       maxCandidates, MAX_RECONNECTIONS_PER_PULSE,
 
2632
                       getPeerCount( t ), getMaxPeerCount( t->tor ),
 
2633
                       newConnectionsThisSecond, MAX_CONNECTIONS_PER_SECOND );
 
2634
        }
 
2635
        else
 
2636
        {
 
2637
            int canCloseCount = 0;
 
2638
            int candidateCount;
 
2639
            struct peer_atom ** candidates;
 
2640
 
 
2641
            candidates = getPeerCandidates( t, now, &candidateCount );
 
2642
            maxCandidates = MIN( maxCandidates, candidateCount );
 
2643
 
 
2644
            /* maybe disconnect some lesser peers, if we have candidates to replace them with */
 
2645
            if( maxCandidates != 0 )
 
2646
            {
 
2647
                struct tr_peer ** canClose = getPeersToClose( t, TR_CAN_CLOSE, now, &canCloseCount );
 
2648
                for( i=0; ( i<canCloseCount ) && ( i<maxCandidates ); ++i )
 
2649
                   closePeer( t, canClose[i] );
 
2650
                tr_free( canClose );
 
2651
            }
 
2652
 
 
2653
            tordbg( t, "reconnect pulse for [%s]: %d must-close connections, "
 
2654
                       "%d can-close connections, %d connection candidates, "
 
2655
                       "%d atoms, max per pulse is %d",
 
2656
                       t->tor->info.name, mustCloseCount,
 
2657
                       canCloseCount, candidateCount,
 
2658
                       tr_ptrArraySize( &t->pool ), MAX_RECONNECTIONS_PER_PULSE );
 
2659
 
 
2660
            tordbg( t, "candidateCount is %d, MAX_RECONNECTIONS_PER_PULSE is %d,"
 
2661
                       " getPeerCount(t) is %d, getMaxPeerCount(t) is %d, "
 
2662
                       "newConnectionsThisSecond is %d, MAX_CONNECTIONS_PER_SECOND is %d",
 
2663
                       candidateCount, MAX_RECONNECTIONS_PER_PULSE,
 
2664
                       getPeerCount( t ), getMaxPeerCount( t->tor ),
 
2665
                       newConnectionsThisSecond, MAX_CONNECTIONS_PER_SECOND );
 
2666
 
 
2667
            /* add some new ones */
 
2668
            for( i=0; i<maxCandidates; ++i )
 
2669
            {
 
2670
                tr_peerMgr        * mgr = t->manager;
 
2671
                struct peer_atom  * atom = candidates[i];
 
2672
                tr_peerIo         * io;
 
2673
 
 
2674
                tordbg( t, "Starting an OUTGOING connection with %s",
 
2675
                        tr_atomAddrStr( atom ) );
 
2676
 
 
2677
                io = tr_peerIoNewOutgoing( mgr->session,
 
2678
                                           mgr->session->bandwidth,
 
2679
                                           &atom->addr,
 
2680
                                           atom->port,
 
2681
                                           t->tor->info.hash,
 
2682
                                           t->tor->completeness == TR_SEED );
 
2683
 
 
2684
                if( io == NULL )
 
2685
                {
 
2686
                    tordbg( t, "peerIo not created; marking peer %s as unreachable",
 
2687
                            tr_atomAddrStr( atom ) );
 
2688
                    atom->myflags |= MYFLAG_UNREACHABLE;
 
2689
                }
 
2690
                else
 
2691
                {
 
2692
                    tr_handshake * handshake = tr_handshakeNew( io,
 
2693
                                                                mgr->session->encryptionMode,
 
2694
                                                                myHandshakeDoneCB,
 
2695
                                                                mgr );
 
2696
 
 
2697
                    assert( tr_peerIoGetTorrentHash( io ) );
 
2698
 
 
2699
                    tr_peerIoUnref( io ); /* balanced by the implicit ref in tr_peerIoNewOutgoing() */
 
2700
 
 
2701
                    ++newConnectionsThisSecond;
 
2702
 
 
2703
                    tr_ptrArrayInsertSorted( &t->outgoingHandshakes, handshake,
 
2704
                                             handshakeCompare );
 
2705
                }
 
2706
 
 
2707
                atom->time = now;
 
2708
            }
 
2709
            tr_free( candidates );
 
2710
        }
2438
2711
    }
2439
2712
}
2440
2713
 
2471
2744
    return 0;
2472
2745
}
2473
2746
 
2474
 
/* FIXME: getPeersToClose() should use this */
 
2747
static int
 
2748
comparePeerLivelinessReverse( const void * va, const void * vb )
 
2749
{
 
2750
    return -comparePeerLiveliness (va, vb);
 
2751
}
 
2752
 
2475
2753
static void
2476
 
sortPeersByLiveliness( tr_peer ** peers, void** clientData, int n, uint64_t now )
 
2754
sortPeersByLivelinessImpl( tr_peer  ** peers,
 
2755
                           void     ** clientData,
 
2756
                           int         n,
 
2757
                           uint64_t    now,
 
2758
                           int (*compare) ( const void *va, const void *vb ) )
2477
2759
{
2478
2760
    int i;
2479
2761
    struct peer_liveliness *lives, *l;
2480
2762
 
2481
2763
    /* build a sortable array of peer + extra info */
2482
2764
    lives = l = tr_new0( struct peer_liveliness, n );
2483
 
    for( i=0; i<n; ++i, ++l ) {
 
2765
    for( i=0; i<n; ++i, ++l )
 
2766
    {
2484
2767
        tr_peer * p = peers[i];
2485
2768
        l->peer = p;
2486
2769
        l->doPurge = p->doPurge;
2494
2777
 
2495
2778
    /* sort 'em */
2496
2779
    assert( n == ( l - lives ) );
2497
 
    qsort( lives, n, sizeof( struct peer_liveliness ), comparePeerLiveliness );
 
2780
    qsort( lives, n, sizeof( struct peer_liveliness ), compare );
2498
2781
 
2499
2782
    /* build the peer array */
2500
2783
    for( i=0, l=lives; i<n; ++i, ++l ) {
2509
2792
}
2510
2793
 
2511
2794
static void
 
2795
sortPeersByLiveliness( tr_peer ** peers, void ** clientData, int n, uint64_t now )
 
2796
{
 
2797
    sortPeersByLivelinessImpl( peers, clientData, n, now, comparePeerLiveliness );
 
2798
}
 
2799
 
 
2800
static void
 
2801
sortPeersByLivelinessReverse( tr_peer ** peers, void ** clientData, int n, uint64_t now )
 
2802
{
 
2803
    sortPeersByLivelinessImpl( peers, clientData, n, now, comparePeerLivelinessReverse );
 
2804
}
 
2805
 
 
2806
 
 
2807
static void
2512
2808
enforceTorrentPeerLimit( Torrent * t, uint64_t now )
2513
2809
{
2514
2810
    int n = tr_ptrArraySize( &t->peers );
2642
2938
        }
2643
2939
    }
2644
2940
 
 
2941
    /* run the completeness check for any torrents that need it */
 
2942
    tor = NULL;
 
2943
    while(( tor = tr_torrentNext( mgr->session, tor ))) {
 
2944
        if( tor->torrentPeers->needsCompletenessCheck ) {
 
2945
            tor->torrentPeers->needsCompletenessCheck  = FALSE;
 
2946
            tr_torrentRecheckCompleteness( tor );
 
2947
        }
 
2948
    }
 
2949
 
2645
2950
    /* possibly stop torrents that have an error */
2646
2951
    tor = NULL;
2647
2952
    while(( tor = tr_torrentNext( mgr->session, tor )))
2648
 
        if( tor->isRunning && ( tor->error == TR_STAT_LOCAL_ERROR )) 
 
2953
        if( tor->isRunning && ( tor->error == TR_STAT_LOCAL_ERROR ))
2649
2954
            tr_torrentStop( tor );
2650
2955
 
2651
2956
    managerUnlock( mgr );
2652
2957
    return TRUE;
2653
2958
}
 
2959
 
 
2960
/***
 
2961
****
 
2962
***/
 
2963
 
 
2964
static int
 
2965
compareAtomPtrsByAddress( const void * va, const void *vb )
 
2966
{
 
2967
    const struct peer_atom * a = * (const struct peer_atom**) va;
 
2968
    const struct peer_atom * b = * (const struct peer_atom**) vb;
 
2969
 
 
2970
    assert( tr_isAtom( a ) );
 
2971
    assert( tr_isAtom( b ) );
 
2972
 
 
2973
    return tr_compareAddresses( &a->addr, &b->addr );
 
2974
}
 
2975
 
 
2976
static time_t tr_now = 0;
 
2977
 
 
2978
/* best come first, worst go last */
 
2979
static int
 
2980
compareAtomPtrsByShelfDate( const void * va, const void *vb )
 
2981
{
 
2982
    time_t atime;
 
2983
    time_t btime;
 
2984
    const struct peer_atom * a = * (const struct peer_atom**) va;
 
2985
    const struct peer_atom * b = * (const struct peer_atom**) vb;
 
2986
    const int data_time_cutoff_secs = 60 * 60;
 
2987
 
 
2988
    assert( tr_isAtom( a ) );
 
2989
    assert( tr_isAtom( b ) );
 
2990
 
 
2991
    /* primary key: the last piece data time *if* it was within the last hour */
 
2992
    atime = a->piece_data_time; if( atime + data_time_cutoff_secs < tr_now ) atime = 0;
 
2993
    btime = b->piece_data_time; if( btime + data_time_cutoff_secs < tr_now ) btime = 0;
 
2994
    if( atime != btime )
 
2995
        return atime > btime ? -1 : 1;
 
2996
 
 
2997
    /* secondary key: shelf date. */
 
2998
    if( a->shelf_date != b->shelf_date )
 
2999
        return a->shelf_date > b->shelf_date ? -1 : 1;
 
3000
 
 
3001
    return 0;
 
3002
}
 
3003
 
 
3004
static int
 
3005
getMaxAtomCount( const tr_torrent * tor )
 
3006
{
 
3007
    /* FIXME: this curve should be smoother... */
 
3008
    const int n = tor->maxConnectedPeers;
 
3009
    if( n >= 200 ) return n * 1.5;
 
3010
    if( n >= 100 ) return n * 2;
 
3011
    if( n >=  50 ) return n * 3;
 
3012
    if( n >=  20 ) return n * 5;
 
3013
    return n * 10;
 
3014
}
 
3015
 
 
3016
static int
 
3017
atomPulse( void * vmgr )
 
3018
{
 
3019
    tr_torrent * tor = NULL;
 
3020
    tr_peerMgr * mgr = vmgr;
 
3021
    managerLock( mgr );
 
3022
 
 
3023
    while(( tor = tr_torrentNext( mgr->session, tor )))
 
3024
    {
 
3025
        int atomCount;
 
3026
        Torrent * t = tor->torrentPeers;
 
3027
        const int maxAtomCount = getMaxAtomCount( tor );
 
3028
        struct peer_atom ** atoms = (struct peer_atom**) tr_ptrArrayPeek( &t->pool, &atomCount );
 
3029
 
 
3030
        if( atomCount > maxAtomCount ) /* we've got too many atoms... time to prune */
 
3031
        {
 
3032
            int i;
 
3033
            int keepCount = 0;
 
3034
            int testCount = 0;
 
3035
            struct peer_atom ** keep = tr_new( struct peer_atom*, atomCount );
 
3036
            struct peer_atom ** test = tr_new( struct peer_atom*, atomCount );
 
3037
 
 
3038
            /* keep the ones that are in use */
 
3039
            for( i=0; i<atomCount; ++i ) {
 
3040
                struct peer_atom * atom = atoms[i];
 
3041
                if( peerIsInUse( t, atom ) )
 
3042
                    keep[keepCount++] = atom;
 
3043
                else
 
3044
                    test[testCount++] = atom;
 
3045
            }
 
3046
 
 
3047
            /* if there's room, keep the best of what's left */
 
3048
            i = 0;
 
3049
            if( keepCount < maxAtomCount ) {
 
3050
                tr_now = tr_time( );
 
3051
                qsort( test, testCount, sizeof( struct peer_atom * ), compareAtomPtrsByShelfDate );
 
3052
                while( i<testCount && keepCount<maxAtomCount )
 
3053
                    keep[keepCount++] = test[i++];
 
3054
            }
 
3055
 
 
3056
            /* free the culled atoms */
 
3057
            while( i<testCount )
 
3058
                tr_free( test[i++] );
 
3059
 
 
3060
            /* rebuild Torrent.pool with what's left */
 
3061
            tr_ptrArrayDestruct( &t->pool, NULL );
 
3062
            t->pool = TR_PTR_ARRAY_INIT;
 
3063
            qsort( keep, keepCount, sizeof( struct peer_atom * ), compareAtomPtrsByAddress );
 
3064
            for( i=0; i<keepCount; ++i )
 
3065
                tr_ptrArrayAppend( &t->pool, keep[i] );
 
3066
 
 
3067
            tordbg( t, "max atom count is %d... pruned from %d to %d\n", maxAtomCount, atomCount, keepCount );
 
3068
 
 
3069
            /* cleanup */
 
3070
            tr_free( test );
 
3071
            tr_free( keep );
 
3072
        }
 
3073
    }
 
3074
 
 
3075
    managerUnlock( mgr );
 
3076
    return TRUE;
 
3077
}