~ubuntu-branches/ubuntu/intrepid/miro/intrepid

« back to all changes in this revision

Viewing changes to portable/libtorrent/src/peer_connection.cpp

  • Committer: Bazaar Package Importer
  • Author(s): Christopher James Halse Rogers
  • Date: 2008-02-09 13:37:10 UTC
  • mfrom: (1.1.2 upstream)
  • Revision ID: james.westby@ubuntu.com-20080209133710-9rs90q6gckvp1b6i
Tags: 1.1.2-0ubuntu1
New upstream release

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/*
 
2
 
 
3
Copyright (c) 2003, Arvid Norberg
 
4
All rights reserved.
 
5
 
 
6
Redistribution and use in source and binary forms, with or without
 
7
modification, are permitted provided that the following conditions
 
8
are met:
 
9
 
 
10
    * Redistributions of source code must retain the above copyright
 
11
      notice, this list of conditions and the following disclaimer.
 
12
    * Redistributions in binary form must reproduce the above copyright
 
13
      notice, this list of conditions and the following disclaimer in
 
14
      the documentation and/or other materials provided with the distribution.
 
15
    * Neither the name of the author nor the names of its
 
16
      contributors may be used to endorse or promote products derived
 
17
      from this software without specific prior written permission.
 
18
 
 
19
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
 
20
AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 
21
IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 
22
ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
 
23
LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
 
24
CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
 
25
SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
 
26
INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
 
27
CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 
28
ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 
29
POSSIBILITY OF SUCH DAMAGE.
 
30
 
 
31
*/
 
32
 
 
33
#include "libtorrent/pch.hpp"
 
34
 
 
35
#include <vector>
 
36
#include <iostream>
 
37
#include <iomanip>
 
38
#include <limits>
 
39
#include <boost/bind.hpp>
 
40
 
 
41
#include "libtorrent/peer_connection.hpp"
 
42
#include "libtorrent/identify_client.hpp"
 
43
#include "libtorrent/entry.hpp"
 
44
#include "libtorrent/bencode.hpp"
 
45
#include "libtorrent/alert_types.hpp"
 
46
#include "libtorrent/invariant_check.hpp"
 
47
#include "libtorrent/io.hpp"
 
48
#include "libtorrent/file.hpp"
 
49
#include "libtorrent/version.hpp"
 
50
#include "libtorrent/extensions.hpp"
 
51
#include "libtorrent/aux_/session_impl.hpp"
 
52
#include "libtorrent/policy.hpp"
 
53
#include "libtorrent/socket_type.hpp"
 
54
#include "libtorrent/assert.hpp"
 
55
 
 
56
using boost::bind;
 
57
using boost::shared_ptr;
 
58
using libtorrent::aux::session_impl;
 
59
 
 
60
namespace libtorrent
 
61
{
 
62
 
 
63
        // outbound connection
 
64
        peer_connection::peer_connection(
 
65
                session_impl& ses
 
66
                , boost::weak_ptr<torrent> tor
 
67
                , shared_ptr<socket_type> s
 
68
                , tcp::endpoint const& remote
 
69
                , policy::peer* peerinfo)
 
70
                :
 
71
#ifndef NDEBUG
 
72
                m_last_choke(time_now() - hours(1))
 
73
                ,
 
74
#endif
 
75
                  m_ses(ses)
 
76
                , m_max_out_request_queue(m_ses.settings().max_out_request_queue)
 
77
                , m_timeout(m_ses.settings().peer_timeout)
 
78
                , m_last_piece(time_now())
 
79
                , m_last_request(time_now())
 
80
                , m_last_incoming_request(min_time())
 
81
                , m_last_unchoke(min_time())
 
82
                , m_packet_size(0)
 
83
                , m_recv_pos(0)
 
84
                , m_reading_bytes(0)
 
85
                , m_last_receive(time_now())
 
86
                , m_last_sent(time_now())
 
87
                , m_socket(s)
 
88
                , m_remote(remote)
 
89
                , m_torrent(tor)
 
90
                , m_active(true)
 
91
                , m_peer_interested(false)
 
92
                , m_peer_choked(true)
 
93
                , m_interesting(false)
 
94
                , m_choked(true)
 
95
                , m_failed(false)
 
96
                , m_ignore_bandwidth_limits(false)
 
97
                , m_have_all(false)
 
98
                , m_num_pieces(0)
 
99
                , m_desired_queue_size(2)
 
100
                , m_free_upload(0)
 
101
                , m_assume_fifo(false)
 
102
                , m_num_invalid_requests(0)
 
103
                , m_disconnecting(false)
 
104
                , m_became_uninterested(time_now())
 
105
                , m_became_uninteresting(time_now())
 
106
                , m_connecting(true)
 
107
                , m_queued(true)
 
108
                , m_writing(false)
 
109
                , m_reading(false)
 
110
                , m_prefer_whole_pieces(false)
 
111
                , m_request_large_blocks(false)
 
112
                , m_non_prioritized(false)
 
113
                , m_upload_limit(bandwidth_limit::inf)
 
114
                , m_download_limit(bandwidth_limit::inf)
 
115
                , m_peer_info(peerinfo)
 
116
                , m_speed(slow)
 
117
                , m_connection_ticket(-1)
 
118
                , m_remote_bytes_dled(0)
 
119
                , m_remote_dl_rate(0)
 
120
                , m_remote_dl_update(time_now())
 
121
                , m_outstanding_writing_bytes(0)
 
122
                , m_fast_reconnect(false)
 
123
#ifndef NDEBUG
 
124
                , m_in_constructor(true)
 
125
#endif
 
126
        {
 
127
#ifndef TORRENT_DISABLE_RESOLVE_COUNTRIES
 
128
                std::fill(m_country, m_country + 2, 0);
 
129
#endif
 
130
#ifdef TORRENT_VERBOSE_LOGGING
 
131
                m_logger = m_ses.create_log(m_remote.address().to_string() + "_"
 
132
                        + boost::lexical_cast<std::string>(m_remote.port()), m_ses.listen_port());
 
133
                (*m_logger) << "*** OUTGOING CONNECTION\n";
 
134
#endif
 
135
 
 
136
                boost::shared_ptr<torrent> t = m_torrent.lock();
 
137
                TORRENT_ASSERT(t);
 
138
                std::fill(m_peer_id.begin(), m_peer_id.end(), 0);
 
139
 
 
140
                if (t->ready_for_connections())
 
141
                        init();
 
142
        }
 
143
 
 
144
        // incoming connection
 
145
        peer_connection::peer_connection(
 
146
                session_impl& ses
 
147
                , boost::shared_ptr<socket_type> s
 
148
                , policy::peer* peerinfo)
 
149
                :
 
150
#ifndef NDEBUG
 
151
                m_last_choke(time_now() - hours(1))
 
152
                ,
 
153
#endif
 
154
                  m_ses(ses)
 
155
                , m_max_out_request_queue(m_ses.settings().max_out_request_queue)
 
156
                , m_timeout(m_ses.settings().peer_timeout)
 
157
                , m_last_piece(time_now())
 
158
                , m_last_request(time_now())
 
159
                , m_last_incoming_request(min_time())
 
160
                , m_last_unchoke(min_time())
 
161
                , m_packet_size(0)
 
162
                , m_recv_pos(0)
 
163
                , m_reading_bytes(0)
 
164
                , m_last_receive(time_now())
 
165
                , m_last_sent(time_now())
 
166
                , m_socket(s)
 
167
                , m_active(false)
 
168
                , m_peer_interested(false)
 
169
                , m_peer_choked(true)
 
170
                , m_interesting(false)
 
171
                , m_choked(true)
 
172
                , m_failed(false)
 
173
                , m_ignore_bandwidth_limits(false)
 
174
                , m_have_all(false)
 
175
                , m_num_pieces(0)
 
176
                , m_desired_queue_size(2)
 
177
                , m_free_upload(0)
 
178
                , m_assume_fifo(false)
 
179
                , m_num_invalid_requests(0)
 
180
                , m_disconnecting(false)
 
181
                , m_became_uninterested(time_now())
 
182
                , m_became_uninteresting(time_now())
 
183
                , m_connecting(false)
 
184
                , m_queued(false)
 
185
                , m_writing(false)
 
186
                , m_reading(false)
 
187
                , m_prefer_whole_pieces(false)
 
188
                , m_request_large_blocks(false)
 
189
                , m_non_prioritized(false)
 
190
                , m_upload_limit(bandwidth_limit::inf)
 
191
                , m_download_limit(bandwidth_limit::inf)
 
192
                , m_peer_info(peerinfo)
 
193
                , m_speed(slow)
 
194
                , m_connection_ticket(-1)
 
195
                , m_remote_bytes_dled(0)
 
196
                , m_remote_dl_rate(0)
 
197
                , m_remote_dl_update(time_now())
 
198
                , m_outstanding_writing_bytes(0)
 
199
                , m_fast_reconnect(false)
 
200
#ifndef NDEBUG
 
201
                , m_in_constructor(true)
 
202
#endif
 
203
        {
 
204
                tcp::socket::non_blocking_io ioc(true);
 
205
                m_socket->io_control(ioc);
 
206
#ifndef TORRENT_DISABLE_RESOLVE_COUNTRIES
 
207
                std::fill(m_country, m_country + 2, 0);
 
208
#endif
 
209
                m_remote = m_socket->remote_endpoint();
 
210
 
 
211
#ifdef TORRENT_VERBOSE_LOGGING
 
212
                TORRENT_ASSERT(m_socket->remote_endpoint() == remote());
 
213
                m_logger = m_ses.create_log(remote().address().to_string() + "_"
 
214
                        + boost::lexical_cast<std::string>(remote().port()), m_ses.listen_port());
 
215
                (*m_logger) << "*** INCOMING CONNECTION\n";
 
216
#endif
 
217
                
 
218
                std::fill(m_peer_id.begin(), m_peer_id.end(), 0);
 
219
        }
 
220
 
 
221
        void peer_connection::update_interest()
 
222
        {
 
223
                INVARIANT_CHECK;
 
224
 
 
225
                boost::shared_ptr<torrent> t = m_torrent.lock();
 
226
                TORRENT_ASSERT(t);
 
227
 
 
228
                bool interested = false;
 
229
                const std::vector<bool>& we_have = t->pieces();
 
230
                for (int j = 0; j != (int)we_have.size(); ++j)
 
231
                {
 
232
                        if (!we_have[j]
 
233
                                && t->piece_priority(j) > 0
 
234
                                && m_have_piece[j])
 
235
                        {
 
236
                                interested = true;
 
237
                                break;
 
238
                        }
 
239
                }
 
240
                try
 
241
                {
 
242
                        if (!interested)
 
243
                                send_not_interested();
 
244
                        else
 
245
                                t->get_policy().peer_is_interesting(*this);
 
246
                }
 
247
                // may throw an asio error if socket has disconnected
 
248
                catch (std::exception& e) {}
 
249
 
 
250
                TORRENT_ASSERT(is_interesting() == interested);
 
251
        }
 
252
 
 
253
#ifndef TORRENT_DISABLE_EXTENSIONS
 
254
        void peer_connection::add_extension(boost::shared_ptr<peer_plugin> ext)
 
255
        {
 
256
                m_extensions.push_back(ext);
 
257
        }
 
258
#endif
 
259
 
 
260
        void peer_connection::send_allowed_set()
 
261
        {
 
262
                INVARIANT_CHECK;
 
263
 
 
264
                boost::shared_ptr<torrent> t = m_torrent.lock();
 
265
                TORRENT_ASSERT(t);
 
266
 
 
267
                int num_allowed_pieces = m_ses.settings().allowed_fast_set_size;
 
268
                int num_pieces = t->torrent_file().num_pieces();
 
269
 
 
270
                if (num_allowed_pieces >= num_pieces)
 
271
                {
 
272
                        for (int i = 0; i < num_pieces; ++i)
 
273
                        {
 
274
#ifdef TORRENT_VERBOSE_LOGGING
 
275
                        (*m_logger) << time_now_string()
 
276
                                << " ==> ALLOWED_FAST [ " << i << " ]\n";
 
277
#endif
 
278
                                write_allow_fast(i);
 
279
                                m_accept_fast.insert(i);
 
280
                        }
 
281
                        return;
 
282
                }
 
283
 
 
284
                std::string x;
 
285
                address const& addr = m_remote.address();
 
286
                if (addr.is_v4())
 
287
                {
 
288
                        address_v4::bytes_type bytes = addr.to_v4().to_bytes();
 
289
                        x.assign((char*)&bytes[0], bytes.size());
 
290
                }
 
291
                else
 
292
                {
 
293
                        address_v6::bytes_type bytes = addr.to_v6().to_bytes();
 
294
                        x.assign((char*)&bytes[0], bytes.size());
 
295
                }
 
296
                x.append((char*)&t->torrent_file().info_hash()[0], 20);
 
297
 
 
298
                sha1_hash hash = hasher(&x[0], x.size()).final();
 
299
                for (;;)
 
300
                {
 
301
                        char* p = (char*)&hash[0];
 
302
                        for (int i = 0; i < 5; ++i)
 
303
                        {
 
304
                                int piece = detail::read_uint32(p) % num_pieces;
 
305
                                if (m_accept_fast.find(piece) == m_accept_fast.end())
 
306
                                {
 
307
#ifdef TORRENT_VERBOSE_LOGGING
 
308
                                        (*m_logger) << time_now_string()
 
309
                                                << " ==> ALLOWED_FAST [ " << piece << " ]\n";
 
310
#endif
 
311
                                        write_allow_fast(piece);
 
312
                                        m_accept_fast.insert(piece);
 
313
                                        if (int(m_accept_fast.size()) >= num_allowed_pieces
 
314
                                                || int(m_accept_fast.size()) == num_pieces) return;
 
315
                                }
 
316
                        }
 
317
                        hash = hasher((char*)&hash[0], 20).final();
 
318
                }
 
319
        }
 
320
 
 
321
        void peer_connection::init()
 
322
        {
 
323
                INVARIANT_CHECK;
 
324
 
 
325
                boost::shared_ptr<torrent> t = m_torrent.lock();
 
326
                TORRENT_ASSERT(t);
 
327
                TORRENT_ASSERT(t->valid_metadata());
 
328
                TORRENT_ASSERT(t->ready_for_connections());
 
329
 
 
330
                m_have_piece.resize(t->torrent_file().num_pieces(), m_have_all);
 
331
 
 
332
                // now that we have a piece_picker,
 
333
                // update it with this peers pieces
 
334
 
 
335
                int num_pieces = std::count(m_have_piece.begin(), m_have_piece.end(), true);
 
336
                if (num_pieces == int(m_have_piece.size()))
 
337
                {
 
338
#ifdef TORRENT_VERBOSE_LOGGING
 
339
                        (*m_logger) << " *** THIS IS A SEED ***\n";
 
340
#endif
 
341
                        // if this is a web seed. we don't have a peer_info struct
 
342
                        if (m_peer_info) m_peer_info->seed = true;
 
343
                        // if we're a seed too, disconnect
 
344
                        if (t->is_finished())
 
345
                        {
 
346
                                throw std::runtime_error("seed to seed connection redundant, disconnecting");
 
347
                        }
 
348
                        m_num_pieces = num_pieces;
 
349
                        t->peer_has_all();
 
350
                        if (!t->is_finished())
 
351
                                t->get_policy().peer_is_interesting(*this);
 
352
                        return;
 
353
                }
 
354
 
 
355
                m_num_pieces = num_pieces;
 
356
                // if we're a seed, we don't keep track of piece availability
 
357
                if (!t->is_seed())
 
358
                {
 
359
                        bool interesting = false;
 
360
                        for (int i = 0; i < int(m_have_piece.size()); ++i)
 
361
                        {
 
362
                                if (m_have_piece[i])
 
363
                                {
 
364
                                        t->peer_has(i);
 
365
                                        // if the peer has a piece and we don't, the peer is interesting
 
366
                                        if (!t->have_piece(i)
 
367
                                                && t->picker().piece_priority(i) != 0)
 
368
                                                interesting = true;
 
369
                                }
 
370
                        }
 
371
                        if (interesting)
 
372
                                t->get_policy().peer_is_interesting(*this);
 
373
                }
 
374
        }
 
375
 
 
376
        peer_connection::~peer_connection()
 
377
        {
 
378
//              INVARIANT_CHECK;
 
379
                TORRENT_ASSERT(m_disconnecting);
 
380
 
 
381
#ifdef TORRENT_VERBOSE_LOGGING
 
382
                if (m_logger)
 
383
                {
 
384
                        (*m_logger) << time_now_string()
 
385
                                << " *** CONNECTION CLOSED\n";
 
386
                }
 
387
#endif
 
388
#ifndef NDEBUG
 
389
                if (m_peer_info)
 
390
                        TORRENT_ASSERT(m_peer_info->connection == 0);
 
391
 
 
392
                boost::shared_ptr<torrent> t = m_torrent.lock();
 
393
                if (t) TORRENT_ASSERT(t->connection_for(remote()) != this);
 
394
#endif
 
395
        }
 
396
 
 
397
        void peer_connection::fast_reconnect(bool r)
 
398
        {
 
399
                if (peer_info_struct() && peer_info_struct()->fast_reconnects > 1) return;
 
400
                m_fast_reconnect = r;
 
401
                peer_info_struct()->connected = time_now()
 
402
                        - seconds(m_ses.settings().min_reconnect_time
 
403
                        * m_ses.settings().max_failcount);
 
404
                if (peer_info_struct()) ++peer_info_struct()->fast_reconnects;
 
405
        }
 
406
 
 
407
        void peer_connection::announce_piece(int index)
 
408
        {
 
409
                // dont announce during handshake
 
410
                if (in_handshake()) return;
 
411
 
 
412
                // remove suggested pieces that we have         
 
413
                std::vector<int>::iterator i = std::find(
 
414
                        m_suggested_pieces.begin(), m_suggested_pieces.end(), index);
 
415
                if (i != m_suggested_pieces.end()) m_suggested_pieces.erase(i);
 
416
 
 
417
                // optimization, don't send have messages
 
418
                // to peers that already have the piece
 
419
                if (!m_ses.settings().send_redundant_have
 
420
                        && has_piece(index)) return;
 
421
 
 
422
#ifdef TORRENT_VERBOSE_LOGGING
 
423
                (*m_logger) << time_now_string()
 
424
                        << " ==> HAVE    [ piece: " << index << "]\n";
 
425
#endif
 
426
                write_have(index);
 
427
#ifndef NDEBUG
 
428
                boost::shared_ptr<torrent> t = m_torrent.lock();
 
429
                TORRENT_ASSERT(t);
 
430
                TORRENT_ASSERT(t->have_piece(index));
 
431
#endif
 
432
        }
 
433
 
 
434
        bool peer_connection::has_piece(int i) const
 
435
        {
 
436
                INVARIANT_CHECK;
 
437
 
 
438
                boost::shared_ptr<torrent> t = m_torrent.lock();
 
439
                TORRENT_ASSERT(t);
 
440
                TORRENT_ASSERT(t->valid_metadata());
 
441
                TORRENT_ASSERT(i >= 0);
 
442
                TORRENT_ASSERT(i < t->torrent_file().num_pieces());
 
443
                return m_have_piece[i];
 
444
        }
 
445
 
 
446
        std::deque<piece_block> const& peer_connection::request_queue() const
 
447
        {
 
448
                return m_request_queue;
 
449
        }
 
450
        
 
451
        std::deque<piece_block> const& peer_connection::download_queue() const
 
452
        {
 
453
                return m_download_queue;
 
454
        }
 
455
        
 
456
        std::deque<peer_request> const& peer_connection::upload_queue() const
 
457
        {
 
458
                return m_requests;
 
459
        }
 
460
 
 
461
        void peer_connection::add_stat(size_type downloaded, size_type uploaded)
 
462
        {
 
463
                m_statistics.add_stat(downloaded, uploaded);
 
464
        }
 
465
 
 
466
        std::vector<bool> const& peer_connection::get_bitfield() const
 
467
        {
 
468
                return m_have_piece;
 
469
        }
 
470
 
 
471
        void peer_connection::received_valid_data(int index)
 
472
        {
 
473
                INVARIANT_CHECK;
 
474
 
 
475
#ifndef TORRENT_DISABLE_EXTENSIONS
 
476
                for (extension_list_t::iterator i = m_extensions.begin()
 
477
                        , end(m_extensions.end()); i != end; ++i)
 
478
                {
 
479
                        try { (*i)->on_piece_pass(index); } catch (std::exception&) {}
 
480
                }
 
481
#endif
 
482
        }
 
483
 
 
484
        void peer_connection::received_invalid_data(int index)
 
485
        {
 
486
                INVARIANT_CHECK;
 
487
 
 
488
#ifndef TORRENT_DISABLE_EXTENSIONS
 
489
                for (extension_list_t::iterator i = m_extensions.begin()
 
490
                        , end(m_extensions.end()); i != end; ++i)
 
491
                {
 
492
                        try { (*i)->on_piece_failed(index); } catch (std::exception&) {}
 
493
                }
 
494
#endif
 
495
 
 
496
                if (peer_info_struct())
 
497
                {
 
498
                        peer_info_struct()->on_parole = true;
 
499
                        ++peer_info_struct()->hashfails;
 
500
                        int& trust_points = peer_info_struct()->trust_points;
 
501
 
 
502
                        // we decrease more than we increase, to keep the
 
503
                        // allowed failed/passed ratio low.
 
504
                        // TODO: make this limit user settable
 
505
                        trust_points -= 2;
 
506
                        if (trust_points < -7) trust_points = -7;
 
507
                }
 
508
        }
 
509
        
 
510
        size_type peer_connection::total_free_upload() const
 
511
        {
 
512
                return m_free_upload;
 
513
        }
 
514
 
 
515
        void peer_connection::add_free_upload(size_type free_upload)
 
516
        {
 
517
                INVARIANT_CHECK;
 
518
 
 
519
                m_free_upload += free_upload;
 
520
        }
 
521
 
 
522
        // verifies a piece to see if it is valid (is within a valid range)
 
523
        // and if it can correspond to a request generated by libtorrent.
 
524
        bool peer_connection::verify_piece(const peer_request& p) const
 
525
        {
 
526
                INVARIANT_CHECK;
 
527
 
 
528
                boost::shared_ptr<torrent> t = m_torrent.lock();
 
529
                TORRENT_ASSERT(t);
 
530
 
 
531
                TORRENT_ASSERT(t->valid_metadata());
 
532
                torrent_info const& ti = t->torrent_file();
 
533
 
 
534
                return p.piece >= 0
 
535
                        && p.piece < t->torrent_file().num_pieces()
 
536
                        && p.length > 0
 
537
                        && p.start >= 0
 
538
                        && (p.length == t->block_size()
 
539
                                || (p.length < t->block_size()
 
540
                                        && p.piece == ti.num_pieces()-1
 
541
                                        && p.start + p.length == ti.piece_size(p.piece))
 
542
                                || (m_request_large_blocks
 
543
                                        && p.length <= ti.piece_length() * m_prefer_whole_pieces == 0 ?
 
544
                                        1 : m_prefer_whole_pieces))
 
545
                        && p.piece * size_type(ti.piece_length()) + p.start + p.length
 
546
                                <= ti.total_size()
 
547
                        && (p.start % t->block_size() == 0);
 
548
        }
 
549
 
 
550
        void peer_connection::attach_to_torrent(sha1_hash const& ih)
 
551
        {
 
552
                INVARIANT_CHECK;
 
553
 
 
554
                TORRENT_ASSERT(!m_disconnecting);
 
555
                TORRENT_ASSERT(m_torrent.expired());
 
556
                boost::weak_ptr<torrent> wpt = m_ses.find_torrent(ih);
 
557
                boost::shared_ptr<torrent> t = wpt.lock();
 
558
 
 
559
                if (t && t->is_aborted())
 
560
                {
 
561
#ifdef TORRENT_VERBOSE_LOGGING
 
562
                        (*m_logger) << " *** the torrent has been aborted\n";
 
563
#endif
 
564
                        t.reset();
 
565
                }
 
566
 
 
567
                if (!t)
 
568
                {
 
569
                        // we couldn't find the torrent!
 
570
#ifdef TORRENT_VERBOSE_LOGGING
 
571
                        (*m_logger) << " *** couldn't find a torrent with the given info_hash: " << ih << "\n";
 
572
                        (*m_logger) << " torrents:\n";
 
573
                        session_impl::torrent_map const& torrents = m_ses.m_torrents;
 
574
                        for (session_impl::torrent_map::const_iterator i = torrents.begin()
 
575
                                , end(torrents.end()); i != end; ++i)
 
576
                        {
 
577
                                (*m_logger) << "   " << i->second->torrent_file().info_hash() << "\n";
 
578
                        }
 
579
#endif
 
580
                        throw std::runtime_error("got info-hash that is not in our session");
 
581
                }
 
582
 
 
583
                if (t->is_paused())
 
584
                {
 
585
                        // paused torrents will not accept
 
586
                        // incoming connections
 
587
#ifdef TORRENT_VERBOSE_LOGGING
 
588
                        (*m_logger) << " rejected connection to paused torrent\n";
 
589
#endif
 
590
                        throw std::runtime_error("connection rejected by paused torrent");
 
591
                }
 
592
 
 
593
                TORRENT_ASSERT(m_torrent.expired());
 
594
                // check to make sure we don't have another connection with the same
 
595
                // info_hash and peer_id. If we do. close this connection.
 
596
                t->attach_peer(this);
 
597
                m_torrent = wpt;
 
598
 
 
599
                TORRENT_ASSERT(!m_torrent.expired());
 
600
 
 
601
                // if the torrent isn't ready to accept
 
602
                // connections yet, we'll have to wait with
 
603
                // our initialization
 
604
                if (t->ready_for_connections()) init();
 
605
 
 
606
                TORRENT_ASSERT(!m_torrent.expired());
 
607
 
 
608
                // assume the other end has no pieces
 
609
                // if we don't have valid metadata yet,
 
610
                // leave the vector unallocated
 
611
                TORRENT_ASSERT(m_num_pieces == 0);
 
612
                std::fill(m_have_piece.begin(), m_have_piece.end(), false);
 
613
                TORRENT_ASSERT(!m_torrent.expired());
 
614
        }
 
615
 
 
616
        // message handlers
 
617
 
 
618
        // -----------------------------
 
619
        // --------- KEEPALIVE ---------
 
620
        // -----------------------------
 
621
 
 
622
        void peer_connection::incoming_keepalive()
 
623
        {
 
624
                INVARIANT_CHECK;
 
625
 
 
626
#ifdef TORRENT_VERBOSE_LOGGING
 
627
                (*m_logger) << time_now_string() << " <== KEEPALIVE\n";
 
628
#endif
 
629
        }
 
630
 
 
631
        // -----------------------------
 
632
        // ----------- CHOKE -----------
 
633
        // -----------------------------
 
634
 
 
635
        void peer_connection::incoming_choke()
 
636
        {
 
637
                INVARIANT_CHECK;
 
638
 
 
639
                boost::shared_ptr<torrent> t = m_torrent.lock();
 
640
                TORRENT_ASSERT(t);
 
641
 
 
642
#ifndef TORRENT_DISABLE_EXTENSIONS
 
643
                for (extension_list_t::iterator i = m_extensions.begin()
 
644
                        , end(m_extensions.end()); i != end; ++i)
 
645
                {
 
646
                        if ((*i)->on_choke()) return;
 
647
                }
 
648
#endif
 
649
 
 
650
#ifdef TORRENT_VERBOSE_LOGGING
 
651
                (*m_logger) << time_now_string() << " <== CHOKE\n";
 
652
#endif
 
653
                m_peer_choked = true;
 
654
                t->get_policy().choked(*this);
 
655
                
 
656
                if (peer_info_struct() == 0 || !peer_info_struct()->on_parole)
 
657
                {
 
658
                        // if the peer is not in parole mode, clear the queued
 
659
                        // up block requests
 
660
                        if (!t->is_seed())
 
661
                        {
 
662
                                piece_picker& p = t->picker();
 
663
                                for (std::deque<piece_block>::const_iterator i = m_request_queue.begin()
 
664
                                        , end(m_request_queue.end()); i != end; ++i)
 
665
                                {
 
666
                                        // since this piece was skipped, clear it and allow it to
 
667
                                        // be requested from other peers
 
668
                                        p.abort_download(*i);
 
669
                                }
 
670
                        }
 
671
                        m_request_queue.clear();
 
672
                }
 
673
        }
 
674
 
 
675
        bool match_request(peer_request const& r, piece_block const& b, int block_size)
 
676
        {
 
677
                if (b.piece_index != r.piece) return false;
 
678
                if (b.block_index != r.start / block_size) return false;
 
679
                if (r.start % block_size != 0) return false;
 
680
                return true;
 
681
        }
 
682
 
 
683
        // -----------------------------
 
684
        // -------- REJECT PIECE -------
 
685
        // -----------------------------
 
686
 
 
687
        void peer_connection::incoming_reject_request(peer_request const& r)
 
688
        {
 
689
                INVARIANT_CHECK;
 
690
 
 
691
                boost::shared_ptr<torrent> t = m_torrent.lock();
 
692
                TORRENT_ASSERT(t);
 
693
 
 
694
#ifndef TORRENT_DISABLE_EXTENSIONS
 
695
                for (extension_list_t::iterator i = m_extensions.begin()
 
696
                        , end(m_extensions.end()); i != end; ++i)
 
697
                {
 
698
                        if ((*i)->on_reject(r)) return;
 
699
                }
 
700
#endif
 
701
 
 
702
                std::deque<piece_block>::iterator i = std::find_if(
 
703
                        m_download_queue.begin(), m_download_queue.end()
 
704
                        , bind(match_request, boost::cref(r), _1, t->block_size()));
 
705
        
 
706
#ifdef TORRENT_VERBOSE_LOGGING
 
707
                        (*m_logger) << time_now_string()
 
708
                                << " <== REJECT_PIECE [ piece: " << r.piece << " | s: " << r.start << " | l: " << r.length << " ]\n";
 
709
#endif
 
710
 
 
711
                piece_block b(-1, 0);
 
712
                if (i != m_download_queue.end())
 
713
                {
 
714
                        b = *i;
 
715
                        m_download_queue.erase(i);
 
716
                        
 
717
                        // if the peer is in parole mode, keep the request
 
718
                        if (peer_info_struct() && peer_info_struct()->on_parole)
 
719
                        {
 
720
                                m_request_queue.push_front(b);
 
721
                        }
 
722
                        else if (!t->is_seed())
 
723
                        {
 
724
                                piece_picker& p = t->picker();
 
725
                                p.abort_download(b);
 
726
                        }
 
727
                }
 
728
#ifdef TORRENT_VERBOSE_LOGGING
 
729
                else
 
730
                {
 
731
                        (*m_logger) << time_now_string()
 
732
                                << " *** PIECE NOT IN REQUEST QUEUE\n";
 
733
                }
 
734
#endif
 
735
                if (has_peer_choked())
 
736
                {
 
737
                        // if we're choked and we got a rejection of
 
738
                        // a piece in the allowed fast set, remove it
 
739
                        // from the allow fast set.
 
740
                        std::vector<int>::iterator i = std::find(
 
741
                                m_allowed_fast.begin(), m_allowed_fast.end(), r.piece);
 
742
                        if (i != m_allowed_fast.end()) m_allowed_fast.erase(i);
 
743
                }
 
744
                else
 
745
                {
 
746
                        std::vector<int>::iterator i = std::find(m_suggested_pieces.begin()
 
747
                                , m_suggested_pieces.end(), r.piece);
 
748
                        if (i != m_suggested_pieces.end())
 
749
                                m_suggested_pieces.erase(i);
 
750
                }
 
751
 
 
752
                if (m_request_queue.empty())
 
753
                {
 
754
                        if (m_download_queue.size() < 2)
 
755
                        {
 
756
                                request_a_block(*t, *this);
 
757
                        }
 
758
                        send_block_requests();
 
759
                }
 
760
        }
 
761
        
 
762
        // -----------------------------
 
763
        // -------- REJECT PIECE -------
 
764
        // -----------------------------
 
765
 
 
766
        void peer_connection::incoming_suggest(int index)
 
767
        {
 
768
                INVARIANT_CHECK;
 
769
 
 
770
#ifdef TORRENT_VERBOSE_LOGGING
 
771
                (*m_logger) << time_now_string()
 
772
                        << " <== SUGGEST_PIECE [ piece: " << index << " ]\n";
 
773
#endif
 
774
                boost::shared_ptr<torrent> t = m_torrent.lock();
 
775
                if (!t) return;
 
776
 
 
777
#ifndef TORRENT_DISABLE_EXTENSIONS
 
778
                for (extension_list_t::iterator i = m_extensions.begin()
 
779
                        , end(m_extensions.end()); i != end; ++i)
 
780
                {
 
781
                        if ((*i)->on_suggest(index)) return;
 
782
                }
 
783
#endif
 
784
 
 
785
                if (t->have_piece(index)) return;
 
786
                
 
787
                if (m_suggested_pieces.size() > 9)
 
788
                        m_suggested_pieces.erase(m_suggested_pieces.begin());
 
789
                m_suggested_pieces.push_back(index);
 
790
 
 
791
#ifdef TORRENT_VERBOSE_LOGGING
 
792
                (*m_logger) << time_now_string()
 
793
                        << " ** SUGGEST_PIECE [ piece: " << index << " added to set: " << m_suggested_pieces.size() << " ]\n";
 
794
#endif
 
795
        }
 
796
 
 
797
        // -----------------------------
 
798
        // ---------- UNCHOKE ----------
 
799
        // -----------------------------
 
800
 
 
801
        void peer_connection::incoming_unchoke()
 
802
        {
 
803
                INVARIANT_CHECK;
 
804
 
 
805
                boost::shared_ptr<torrent> t = m_torrent.lock();
 
806
                TORRENT_ASSERT(t);
 
807
 
 
808
#ifndef TORRENT_DISABLE_EXTENSIONS
 
809
                for (extension_list_t::iterator i = m_extensions.begin()
 
810
                        , end(m_extensions.end()); i != end; ++i)
 
811
                {
 
812
                        if ((*i)->on_unchoke()) return;
 
813
                }
 
814
#endif
 
815
 
 
816
#ifdef TORRENT_VERBOSE_LOGGING
 
817
                (*m_logger) << time_now_string() << " <== UNCHOKE\n";
 
818
#endif
 
819
                m_peer_choked = false;
 
820
                t->get_policy().unchoked(*this);
 
821
        }
 
822
 
 
823
        // -----------------------------
 
824
        // -------- INTERESTED ---------
 
825
        // -----------------------------
 
826
 
 
827
        void peer_connection::incoming_interested()
 
828
        {
 
829
                INVARIANT_CHECK;
 
830
 
 
831
                boost::shared_ptr<torrent> t = m_torrent.lock();
 
832
                TORRENT_ASSERT(t);
 
833
 
 
834
#ifndef TORRENT_DISABLE_EXTENSIONS
 
835
                for (extension_list_t::iterator i = m_extensions.begin()
 
836
                        , end(m_extensions.end()); i != end; ++i)
 
837
                {
 
838
                        if ((*i)->on_interested()) return;
 
839
                }
 
840
#endif
 
841
 
 
842
#ifdef TORRENT_VERBOSE_LOGGING
 
843
                (*m_logger) << time_now_string() << " <== INTERESTED\n";
 
844
#endif
 
845
                m_peer_interested = true;
 
846
                t->get_policy().interested(*this);
 
847
        }
 
848
 
 
849
        // -----------------------------
 
850
        // ------ NOT INTERESTED -------
 
851
        // -----------------------------
 
852
 
 
853
        void peer_connection::incoming_not_interested()
 
854
        {
 
855
                INVARIANT_CHECK;
 
856
 
 
857
#ifndef TORRENT_DISABLE_EXTENSIONS
 
858
                for (extension_list_t::iterator i = m_extensions.begin()
 
859
                        , end(m_extensions.end()); i != end; ++i)
 
860
                {
 
861
                        if ((*i)->on_not_interested()) return;
 
862
                }
 
863
#endif
 
864
 
 
865
                m_became_uninterested = time_now();
 
866
 
 
867
#ifdef TORRENT_VERBOSE_LOGGING
 
868
                (*m_logger) << time_now_string() << " <== NOT_INTERESTED\n";
 
869
#endif
 
870
 
 
871
                boost::shared_ptr<torrent> t = m_torrent.lock();
 
872
                TORRENT_ASSERT(t);
 
873
 
 
874
                m_peer_interested = false;
 
875
                t->get_policy().not_interested(*this);
 
876
        }
 
877
 
 
878
        // -----------------------------
 
879
        // ----------- HAVE ------------
 
880
        // -----------------------------
 
881
 
 
882
        void peer_connection::incoming_have(int index)
 
883
        {
 
884
                INVARIANT_CHECK;
 
885
 
 
886
                boost::shared_ptr<torrent> t = m_torrent.lock();
 
887
                TORRENT_ASSERT(t);
 
888
 
 
889
#ifndef TORRENT_DISABLE_EXTENSIONS
 
890
                for (extension_list_t::iterator i = m_extensions.begin()
 
891
                        , end(m_extensions.end()); i != end; ++i)
 
892
                {
 
893
                        if ((*i)->on_have(index)) return;
 
894
                }
 
895
#endif
 
896
 
 
897
#ifdef TORRENT_VERBOSE_LOGGING
 
898
                (*m_logger) << time_now_string()
 
899
                        << " <== HAVE    [ piece: " << index << "]\n";
 
900
#endif
 
901
 
 
902
                // if we got an invalid message, abort
 
903
                if (index >= (int)m_have_piece.size() || index < 0)
 
904
                        throw protocol_error("got 'have'-message with higher index "
 
905
                                "than the number of pieces");
 
906
 
 
907
                if (m_have_piece[index])
 
908
                {
 
909
#ifdef TORRENT_VERBOSE_LOGGING
 
910
                        (*m_logger) << "   got redundant HAVE message for index: " << index << "\n";
 
911
#endif
 
912
                }
 
913
                else
 
914
                {
 
915
                        m_have_piece[index] = true;
 
916
 
 
917
                        // only update the piece_picker if
 
918
                        // we have the metadata and if
 
919
                        // we're not a seed (in which case
 
920
                        // we won't have a piece picker)
 
921
                        if (t->valid_metadata())
 
922
                        {
 
923
                                ++m_num_pieces;
 
924
                                t->peer_has(index);
 
925
 
 
926
                                if (!t->have_piece(index)
 
927
                                        && !t->is_seed()
 
928
                                        && !is_interesting()
 
929
                                        && t->picker().piece_priority(index) != 0)
 
930
                                        t->get_policy().peer_is_interesting(*this);
 
931
 
 
932
                                // this will disregard all have messages we get within
 
933
                                // the first two seconds. Since some clients implements
 
934
                                // lazy bitfields, these will not be reliable to use
 
935
                                // for an estimated peer download rate.
 
936
                                if (!peer_info_struct() || time_now() - peer_info_struct()->connected > seconds(2))
 
937
                                {
 
938
                                        // update bytes downloaded since last timer
 
939
                                        m_remote_bytes_dled += t->torrent_file().piece_size(index);
 
940
                                }
 
941
                        }
 
942
                        
 
943
                        if (is_seed())
 
944
                        {
 
945
                                TORRENT_ASSERT(m_peer_info);
 
946
                                m_peer_info->seed = true;
 
947
                                if (t->is_finished())
 
948
                                {
 
949
                                        throw protocol_error("seed to seed connection redundant, disconnecting");
 
950
                                }
 
951
                        }
 
952
                }
 
953
        }
 
954
 
 
955
        // -----------------------------
 
956
        // --------- BITFIELD ----------
 
957
        // -----------------------------
 
958
 
 
959
        void peer_connection::incoming_bitfield(std::vector<bool> const& bitfield)
 
960
        {
 
961
                INVARIANT_CHECK;
 
962
 
 
963
                boost::shared_ptr<torrent> t = m_torrent.lock();
 
964
                TORRENT_ASSERT(t);
 
965
 
 
966
#ifndef TORRENT_DISABLE_EXTENSIONS
 
967
                for (extension_list_t::iterator i = m_extensions.begin()
 
968
                        , end(m_extensions.end()); i != end; ++i)
 
969
                {
 
970
                        if ((*i)->on_bitfield(bitfield)) return;
 
971
                }
 
972
#endif
 
973
 
 
974
#ifdef TORRENT_VERBOSE_LOGGING
 
975
                (*m_logger) << time_now_string() << " <== BITFIELD ";
 
976
 
 
977
                for (int i = 0; i < int(bitfield.size()); ++i)
 
978
                {
 
979
                        if (bitfield[i]) (*m_logger) << "1";
 
980
                        else (*m_logger) << "0";
 
981
                }
 
982
                (*m_logger) << "\n";
 
983
#endif
 
984
 
 
985
                // if we don't have the metedata, we cannot
 
986
                // verify the bitfield size
 
987
                if (t->valid_metadata()
 
988
                        && (bitfield.size() / 8) != (m_have_piece.size() / 8))
 
989
                        throw protocol_error("got bitfield with invalid size: "
 
990
                                + boost::lexical_cast<std::string>(bitfield.size() / 8)
 
991
                                + "bytes. expected: "
 
992
                                + boost::lexical_cast<std::string>(m_have_piece.size() / 8)
 
993
                                + "bytes");
 
994
 
 
995
                // if we don't have metadata yet
 
996
                // just remember the bitmask
 
997
                // don't update the piecepicker
 
998
                // (since it doesn't exist yet)
 
999
                if (!t->ready_for_connections())
 
1000
                {
 
1001
                        m_have_piece = bitfield;
 
1002
                        m_num_pieces = std::count(bitfield.begin(), bitfield.end(), true);
 
1003
                        if (m_peer_info) m_peer_info->seed = (m_num_pieces == int(bitfield.size()));
 
1004
                        return;
 
1005
                }
 
1006
 
 
1007
                TORRENT_ASSERT(t->valid_metadata());
 
1008
                
 
1009
                int num_pieces = std::count(bitfield.begin(), bitfield.end(), true);
 
1010
                if (num_pieces == int(m_have_piece.size()))
 
1011
                {
 
1012
#ifdef TORRENT_VERBOSE_LOGGING
 
1013
                        (*m_logger) << " *** THIS IS A SEED ***\n";
 
1014
#endif
 
1015
                        // if this is a web seed. we don't have a peer_info struct
 
1016
                        if (m_peer_info) m_peer_info->seed = true;
 
1017
                        // if we're a seed too, disconnect
 
1018
                        if (t->is_finished())
 
1019
                        {
 
1020
                                throw protocol_error("seed to seed connection redundant, disconnecting");
 
1021
                        }
 
1022
 
 
1023
                        std::fill(m_have_piece.begin(), m_have_piece.end(), true);
 
1024
                        m_num_pieces = num_pieces;
 
1025
                        t->peer_has_all();
 
1026
                        if (!t->is_finished())
 
1027
                                t->get_policy().peer_is_interesting(*this);
 
1028
                        return;
 
1029
                }
 
1030
 
 
1031
                // let the torrent know which pieces the
 
1032
                // peer has
 
1033
                // if we're a seed, we don't keep track of piece availability
 
1034
                if (!t->is_seed())
 
1035
                {
 
1036
                        bool interesting = false;
 
1037
                        for (int i = 0; i < (int)m_have_piece.size(); ++i)
 
1038
                        {
 
1039
                                bool have = bitfield[i];
 
1040
                                if (have && !m_have_piece[i])
 
1041
                                {
 
1042
                                        m_have_piece[i] = true;
 
1043
                                        ++m_num_pieces;
 
1044
                                        t->peer_has(i);
 
1045
                                        if (!t->have_piece(i) && t->picker().piece_priority(i) != 0)
 
1046
                                                interesting = true;
 
1047
                                }
 
1048
                                else if (!have && m_have_piece[i])
 
1049
                                {
 
1050
                                        // this should probably not be allowed
 
1051
                                        m_have_piece[i] = false;
 
1052
                                        --m_num_pieces;
 
1053
                                        t->peer_lost(i);
 
1054
                                }
 
1055
                        }
 
1056
 
 
1057
                        if (interesting) t->get_policy().peer_is_interesting(*this);
 
1058
                }
 
1059
                else
 
1060
                {
 
1061
                        for (int i = 0; i < (int)m_have_piece.size(); ++i)
 
1062
                        {
 
1063
                                bool have = bitfield[i];
 
1064
                                if (have && !m_have_piece[i])
 
1065
                                {
 
1066
                                        m_have_piece[i] = true;
 
1067
                                        ++m_num_pieces;
 
1068
                                }
 
1069
                                else if (!have && m_have_piece[i])
 
1070
                                {
 
1071
                                        // this should probably not be allowed
 
1072
                                        m_have_piece[i] = false;
 
1073
                                        --m_num_pieces;
 
1074
                                }
 
1075
                        }
 
1076
                }
 
1077
        }
 
1078
 
 
1079
        // -----------------------------
 
1080
        // ---------- REQUEST ----------
 
1081
        // -----------------------------
 
1082
 
 
1083
        void peer_connection::incoming_request(peer_request const& r)
 
1084
        {
 
1085
                INVARIANT_CHECK;
 
1086
 
 
1087
                boost::shared_ptr<torrent> t = m_torrent.lock();
 
1088
                TORRENT_ASSERT(t);
 
1089
 
 
1090
#ifndef TORRENT_DISABLE_EXTENSIONS
 
1091
                for (extension_list_t::iterator i = m_extensions.begin()
 
1092
                        , end(m_extensions.end()); i != end; ++i)
 
1093
                {
 
1094
                        if ((*i)->on_request(r)) return;
 
1095
                }
 
1096
#endif
 
1097
 
 
1098
                if (!t->valid_metadata())
 
1099
                {
 
1100
                        // if we don't have valid metadata yet,
 
1101
                        // we shouldn't get a request
 
1102
#ifdef TORRENT_VERBOSE_LOGGING
 
1103
                        (*m_logger) << time_now_string()
 
1104
                                << " <== UNEXPECTED_REQUEST [ "
 
1105
                                "piece: " << r.piece << " | "
 
1106
                                "s: " << r.start << " | "
 
1107
                                "l: " << r.length << " | "
 
1108
                                "i: " << m_peer_interested << " | "
 
1109
                                "t: " << (int)t->torrent_file().piece_size(r.piece) << " | "
 
1110
                                "n: " << t->torrent_file().num_pieces() << " ]\n";
 
1111
#endif
 
1112
                        write_reject_request(r);
 
1113
                        return;
 
1114
                }
 
1115
 
 
1116
                if (int(m_requests.size()) > m_ses.settings().max_allowed_in_request_queue)
 
1117
                {
 
1118
                        // don't allow clients to abuse our
 
1119
                        // memory consumption.
 
1120
                        // ignore requests if the client
 
1121
                        // is making too many of them.
 
1122
#ifdef TORRENT_VERBOSE_LOGGING
 
1123
                        (*m_logger) << time_now_string()
 
1124
                                << " <== TOO MANY REQUESTS [ "
 
1125
                                "piece: " << r.piece << " | "
 
1126
                                "s: " << r.start << " | "
 
1127
                                "l: " << r.length << " | "
 
1128
                                "i: " << m_peer_interested << " | "
 
1129
                                "t: " << (int)t->torrent_file().piece_size(r.piece) << " | "
 
1130
                                "n: " << t->torrent_file().num_pieces() << " ]\n";
 
1131
#endif
 
1132
                        write_reject_request(r);
 
1133
                        return;
 
1134
                }
 
1135
 
 
1136
                // make sure this request
 
1137
                // is legal and that the peer
 
1138
                // is not choked
 
1139
                if (r.piece >= 0
 
1140
                        && r.piece < t->torrent_file().num_pieces()
 
1141
                        && t->have_piece(r.piece)
 
1142
                        && r.start >= 0
 
1143
                        && r.start < t->torrent_file().piece_size(r.piece)
 
1144
                        && r.length > 0
 
1145
                        && r.length + r.start <= t->torrent_file().piece_size(r.piece)
 
1146
                        && m_peer_interested
 
1147
                        && r.length <= t->block_size())
 
1148
                {
 
1149
#ifdef TORRENT_VERBOSE_LOGGING
 
1150
                        (*m_logger) << time_now_string()
 
1151
                                << " <== REQUEST [ piece: " << r.piece << " | s: " << r.start << " | l: " << r.length << " ]\n";
 
1152
#endif
 
1153
                        // if we have choked the client
 
1154
                        // ignore the request
 
1155
                        if (m_choked && m_accept_fast.find(r.piece) == m_accept_fast.end())
 
1156
                        {
 
1157
                                write_reject_request(r);
 
1158
#ifdef TORRENT_VERBOSE_LOGGING
 
1159
                        (*m_logger) << time_now_string()
 
1160
                                << " *** REJECTING REQUEST [ peer choked and piece not in allowed fast set ]\n";
 
1161
#endif
 
1162
                        }
 
1163
                        else
 
1164
                        {
 
1165
                                m_requests.push_back(r);
 
1166
                                m_last_incoming_request = time_now();
 
1167
                                fill_send_buffer();
 
1168
                        }
 
1169
                }
 
1170
                else
 
1171
                {
 
1172
#ifdef TORRENT_VERBOSE_LOGGING
 
1173
                        (*m_logger) << time_now_string()
 
1174
                                << " <== INVALID_REQUEST [ "
 
1175
                                "piece: " << r.piece << " | "
 
1176
                                "s: " << r.start << " | "
 
1177
                                "l: " << r.length << " | "
 
1178
                                "i: " << m_peer_interested << " | "
 
1179
                                "t: " << (int)t->torrent_file().piece_size(r.piece) << " | "
 
1180
                                "n: " << t->torrent_file().num_pieces() << " | "
 
1181
                                "h: " << t->have_piece(r.piece) << " | "
 
1182
                                "block_limit: " << t->block_size() << " ]\n";
 
1183
#endif
 
1184
 
 
1185
                        write_reject_request(r);
 
1186
                        ++m_num_invalid_requests;
 
1187
 
 
1188
                        if (t->alerts().should_post(alert::debug))
 
1189
                        {
 
1190
                                t->alerts().post_alert(invalid_request_alert(
 
1191
                                        r
 
1192
                                        , t->get_handle()
 
1193
                                        , m_remote
 
1194
                                        , m_peer_id
 
1195
                                        , "peer sent an illegal piece request"));
 
1196
                        }
 
1197
                }
 
1198
        }
 
1199
 
 
1200
        void peer_connection::incoming_piece_fragment()
 
1201
        {
 
1202
                m_last_piece = time_now();
 
1203
        }
 
1204
 
 
1205
#ifndef NDEBUG
 
1206
        struct check_postcondition
 
1207
        {
 
1208
                check_postcondition(boost::shared_ptr<torrent> const& t_
 
1209
                        , bool init_check = true): t(t_) { if (init_check) check(); }
 
1210
        
 
1211
                ~check_postcondition() { check(); }
 
1212
                
 
1213
                void check()
 
1214
                {
 
1215
                        if (!t->is_seed())
 
1216
                        {
 
1217
                                const int blocks_per_piece = static_cast<int>(
 
1218
                                        t->torrent_file().piece_length() / t->block_size());
 
1219
 
 
1220
                                std::vector<piece_picker::downloading_piece> const& dl_queue
 
1221
                                        = t->picker().get_download_queue();
 
1222
 
 
1223
                                for (std::vector<piece_picker::downloading_piece>::const_iterator i =
 
1224
                                        dl_queue.begin(); i != dl_queue.end(); ++i)
 
1225
                                {
 
1226
                                        TORRENT_ASSERT(i->finished <= blocks_per_piece);
 
1227
                                }
 
1228
                        }
 
1229
                }
 
1230
                
 
1231
                shared_ptr<torrent> t;
 
1232
        };
 
1233
#endif
 
1234
 
 
1235
 
 
1236
        // -----------------------------
 
1237
        // ----------- PIECE -----------
 
1238
        // -----------------------------
 
1239
 
 
1240
        void peer_connection::incoming_piece(peer_request const& p, char const* data)
 
1241
        {
 
1242
                INVARIANT_CHECK;
 
1243
 
 
1244
                boost::shared_ptr<torrent> t = m_torrent.lock();
 
1245
                TORRENT_ASSERT(t);
 
1246
 
 
1247
#ifndef TORRENT_DISABLE_EXTENSIONS
 
1248
                for (extension_list_t::iterator i = m_extensions.begin()
 
1249
                        , end(m_extensions.end()); i != end; ++i)
 
1250
                {
 
1251
                        if ((*i)->on_piece(p, data)) return;
 
1252
                }
 
1253
#endif
 
1254
 
 
1255
#ifndef NDEBUG
 
1256
                check_postcondition post_checker_(t);
 
1257
                t->check_invariant();
 
1258
#endif
 
1259
 
 
1260
#ifdef TORRENT_VERBOSE_LOGGING
 
1261
                (*m_logger) << time_now_string()
 
1262
                        << " <== PIECE   [ piece: " << p.piece << " | "
 
1263
                        "s: " << p.start << " | "
 
1264
                        "l: " << p.length << " | "
 
1265
                        "ds: " << statistics().download_rate() << " | "
 
1266
                        "qs: " << m_desired_queue_size << " ]\n";
 
1267
#endif
 
1268
 
 
1269
                if (!verify_piece(p))
 
1270
                {
 
1271
#ifdef TORRENT_VERBOSE_LOGGING
 
1272
                        (*m_logger) << time_now_string()
 
1273
                                << " <== INVALID_PIECE [ piece: " << p.piece << " | "
 
1274
                                "start: " << p.start << " | "
 
1275
                                "length: " << p.length << " ]\n";
 
1276
#endif
 
1277
                        throw protocol_error("got invalid piece packet");
 
1278
                }
 
1279
 
 
1280
                // if we're already seeding, don't bother,
 
1281
                // just ignore it
 
1282
                if (t->is_seed())
 
1283
                {
 
1284
                        t->received_redundant_data(p.length);
 
1285
                        return;
 
1286
                }
 
1287
 
 
1288
                piece_picker& picker = t->picker();
 
1289
                piece_manager& fs = t->filesystem();
 
1290
 
 
1291
                std::vector<piece_block> finished_blocks;
 
1292
                piece_block block_finished(p.piece, p.start / t->block_size());
 
1293
                TORRENT_ASSERT(p.start % t->block_size() == 0);
 
1294
                TORRENT_ASSERT(p.length == t->block_size()
 
1295
                        || p.length == t->torrent_file().total_size() % t->block_size());
 
1296
 
 
1297
                std::deque<piece_block>::iterator b
 
1298
                        = std::find(
 
1299
                                m_download_queue.begin()
 
1300
                                , m_download_queue.end()
 
1301
                                , block_finished);
 
1302
 
 
1303
                if (b != m_download_queue.end())
 
1304
                {
 
1305
                        if (m_assume_fifo)
 
1306
                        {
 
1307
                                for (std::deque<piece_block>::iterator i = m_download_queue.begin();
 
1308
                                        i != b; ++i)
 
1309
                                {
 
1310
#ifdef TORRENT_VERBOSE_LOGGING
 
1311
                                        (*m_logger) << time_now_string()
 
1312
                                                << " *** SKIPPED_PIECE [ piece: " << i->piece_index << " | "
 
1313
                                                "b: " << i->block_index << " ] ***\n";
 
1314
#endif
 
1315
                                        // since this piece was skipped, clear it and allow it to
 
1316
                                        // be requested from other peers
 
1317
                                        // TODO: send cancel?
 
1318
                                        picker.abort_download(*i);
 
1319
                                }
 
1320
                        
 
1321
                                // remove the request that just finished
 
1322
                                // from the download queue plus the
 
1323
                                // skipped blocks.
 
1324
                                m_download_queue.erase(m_download_queue.begin()
 
1325
                                        , boost::next(b));
 
1326
                        }
 
1327
                        else
 
1328
                        {
 
1329
                                m_download_queue.erase(b);
 
1330
                        }
 
1331
 
 
1332
                        t->cancel_block(block_finished);
 
1333
                }
 
1334
                else
 
1335
                {
 
1336
                        if (t->alerts().should_post(alert::debug))
 
1337
                        {
 
1338
                                t->alerts().post_alert(
 
1339
                                        peer_error_alert(
 
1340
                                                m_remote
 
1341
                                                , m_peer_id
 
1342
                                                , "got a block that was not in the request queue"));
 
1343
                        }
 
1344
#ifdef TORRENT_VERBOSE_LOGGING
 
1345
                        (*m_logger) << " *** The block we just got was not in the "
 
1346
                                "request queue ***\n";
 
1347
#endif
 
1348
                        t->received_redundant_data(p.length);
 
1349
                        request_a_block(*t, *this);
 
1350
                        send_block_requests();
 
1351
                        return;
 
1352
                }
 
1353
 
 
1354
                // if the block we got is already finished, then ignore it
 
1355
                if (picker.is_downloaded(block_finished))
 
1356
                {
 
1357
                        t->received_redundant_data(p.length);
 
1358
 
 
1359
                        request_a_block(*t, *this);
 
1360
                        send_block_requests();
 
1361
                        return;
 
1362
                }
 
1363
                
 
1364
                fs.async_write(p, data, bind(&peer_connection::on_disk_write_complete
 
1365
                        , self(), _1, _2, p, t));
 
1366
                m_outstanding_writing_bytes += p.length;
 
1367
                TORRENT_ASSERT(!m_reading);
 
1368
                picker.mark_as_writing(block_finished, peer_info_struct());
 
1369
        }
 
1370
 
 
1371
        void peer_connection::on_disk_write_complete(int ret, disk_io_job const& j
 
1372
                , peer_request p, boost::shared_ptr<torrent> t)
 
1373
        {
 
1374
                session_impl::mutex_t::scoped_lock l(m_ses.m_mutex);
 
1375
 
 
1376
                INVARIANT_CHECK;
 
1377
 
 
1378
                m_outstanding_writing_bytes -= p.length;
 
1379
                TORRENT_ASSERT(m_outstanding_writing_bytes >= 0);
 
1380
 
 
1381
#if defined(TORRENT_VERBOSE_LOGGING) || defined(TORRENT_LOGGING)
 
1382
                (*m_ses.m_logger) << time_now_string() << " *** DISK_WRITE_COMPLETE [ p: "
 
1383
                        << p.piece << " o: " << p.start << " ]\n";
 
1384
#endif
 
1385
                // in case the outstanding bytes just dropped down
 
1386
                // to allow to receive more data
 
1387
                setup_receive();
 
1388
 
 
1389
                piece_block block_finished(p.piece, p.start / t->block_size());
 
1390
 
 
1391
                if (ret == -1 || !t)
 
1392
                {
 
1393
                        if (t->has_picker()) t->picker().abort_download(block_finished);
 
1394
 
 
1395
                        if (!t)
 
1396
                        {
 
1397
                                m_ses.connection_failed(m_socket, remote(), j.str.c_str());
 
1398
                                return;
 
1399
                        }
 
1400
                
 
1401
                        if (t->alerts().should_post(alert::fatal))
 
1402
                        {
 
1403
                                std::string err = "torrent paused: disk write error, " + j.str;
 
1404
                                t->alerts().post_alert(file_error_alert(t->get_handle(), err));
 
1405
                        }
 
1406
                        t->pause();
 
1407
                        return;
 
1408
                }
 
1409
 
 
1410
                if (t->is_seed()) return;
 
1411
 
 
1412
                piece_picker& picker = t->picker();
 
1413
 
 
1414
                TORRENT_ASSERT(p.piece == j.piece);
 
1415
                TORRENT_ASSERT(p.start == j.offset);
 
1416
                picker.mark_as_finished(block_finished, peer_info_struct());
 
1417
                if (t->alerts().should_post(alert::debug))
 
1418
                {
 
1419
                        t->alerts().post_alert(block_finished_alert(t->get_handle(), 
 
1420
                                block_finished.block_index, block_finished.piece_index, "block finished"));
 
1421
                }
 
1422
 
 
1423
#ifndef NDEBUG
 
1424
                try
 
1425
                {
 
1426
#endif
 
1427
 
 
1428
                // did we just finish the piece?
 
1429
                if (picker.is_piece_finished(p.piece))
 
1430
                {
 
1431
#ifndef NDEBUG
 
1432
                        check_postcondition post_checker2_(t, false);
 
1433
#endif
 
1434
                        t->async_verify_piece(p.piece, bind(&torrent::piece_finished, t
 
1435
                                , p.piece, _1));
 
1436
                }
 
1437
 
 
1438
#ifndef NDEBUG
 
1439
                }
 
1440
                catch (std::exception const& e)
 
1441
                {
 
1442
                        std::cerr << e.what() << std::endl;
 
1443
                        TORRENT_ASSERT(false);
 
1444
                }
 
1445
#endif
 
1446
 
 
1447
                if (!t->is_seed() && !m_torrent.expired())
 
1448
                {
 
1449
                        // this is a free function defined in policy.cpp
 
1450
                        request_a_block(*t, *this);
 
1451
                        send_block_requests();
 
1452
                }
 
1453
 
 
1454
        }
 
1455
 
 
1456
        // -----------------------------
 
1457
        // ---------- CANCEL -----------
 
1458
        // -----------------------------
 
1459
 
 
1460
        void peer_connection::incoming_cancel(peer_request const& r)
 
1461
        {
 
1462
                INVARIANT_CHECK;
 
1463
 
 
1464
#ifndef TORRENT_DISABLE_EXTENSIONS
 
1465
                for (extension_list_t::iterator i = m_extensions.begin()
 
1466
                        , end(m_extensions.end()); i != end; ++i)
 
1467
                {
 
1468
                        if ((*i)->on_cancel(r)) return;
 
1469
                }
 
1470
#endif
 
1471
 
 
1472
#ifdef TORRENT_VERBOSE_LOGGING
 
1473
                (*m_logger) << time_now_string()
 
1474
                        << " <== CANCEL  [ piece: " << r.piece << " | s: " << r.start << " | l: " << r.length << " ]\n";
 
1475
#endif
 
1476
 
 
1477
                std::deque<peer_request>::iterator i
 
1478
                        = std::find(m_requests.begin(), m_requests.end(), r);
 
1479
 
 
1480
                if (i != m_requests.end())
 
1481
                {
 
1482
                        m_requests.erase(i);
 
1483
                }
 
1484
                else
 
1485
                {
 
1486
#ifdef TORRENT_VERBOSE_LOGGING
 
1487
                        (*m_logger) << time_now_string() << " *** GOT CANCEL NOT IN THE QUEUE\n";
 
1488
#endif
 
1489
                }
 
1490
        }
 
1491
 
 
1492
        // -----------------------------
 
1493
        // --------- DHT PORT ----------
 
1494
        // -----------------------------
 
1495
 
 
1496
        void peer_connection::incoming_dht_port(int listen_port)
 
1497
        {
 
1498
                INVARIANT_CHECK;
 
1499
 
 
1500
#ifdef TORRENT_VERBOSE_LOGGING
 
1501
                (*m_logger) << time_now_string()
 
1502
                        << " <== DHT_PORT [ p: " << listen_port << " ]\n";
 
1503
#endif
 
1504
#ifndef TORRENT_DISABLE_DHT
 
1505
                m_ses.add_dht_node(udp::endpoint(
 
1506
                        m_remote.address(), listen_port));
 
1507
#endif
 
1508
        }
 
1509
 
 
1510
        // -----------------------------
 
1511
        // --------- HAVE ALL ----------
 
1512
        // -----------------------------
 
1513
 
 
1514
        void peer_connection::incoming_have_all()
 
1515
        {
 
1516
                INVARIANT_CHECK;
 
1517
 
 
1518
                boost::shared_ptr<torrent> t = m_torrent.lock();
 
1519
                TORRENT_ASSERT(t);
 
1520
 
 
1521
#ifdef TORRENT_VERBOSE_LOGGING
 
1522
                (*m_logger) << time_now_string() << " <== HAVE_ALL\n";
 
1523
#endif
 
1524
 
 
1525
#ifndef TORRENT_DISABLE_EXTENSIONS
 
1526
                for (extension_list_t::iterator i = m_extensions.begin()
 
1527
                        , end(m_extensions.end()); i != end; ++i)
 
1528
                {
 
1529
                        if ((*i)->on_have_all()) return;
 
1530
                }
 
1531
#endif
 
1532
 
 
1533
                m_have_all = true;
 
1534
 
 
1535
                if (m_peer_info) m_peer_info->seed = true;
 
1536
 
 
1537
                // if we don't have metadata yet
 
1538
                // just remember the bitmask
 
1539
                // don't update the piecepicker
 
1540
                // (since it doesn't exist yet)
 
1541
                if (!t->ready_for_connections())
 
1542
                {
 
1543
                        // TODO: this might need something more
 
1544
                        // so that once we have the metadata
 
1545
                        // we can construct a full bitfield
 
1546
                        return;
 
1547
                }
 
1548
 
 
1549
#ifdef TORRENT_VERBOSE_LOGGING
 
1550
                (*m_logger) << " *** THIS IS A SEED ***\n";
 
1551
#endif
 
1552
 
 
1553
                // if we're a seed too, disconnect
 
1554
                if (t->is_finished())
 
1555
                        throw protocol_error("seed to seed connection redundant, disconnecting");
 
1556
 
 
1557
                TORRENT_ASSERT(!m_have_piece.empty());
 
1558
                std::fill(m_have_piece.begin(), m_have_piece.end(), true);
 
1559
                m_num_pieces = m_have_piece.size();
 
1560
                
 
1561
                t->peer_has_all();
 
1562
                if (!t->is_finished())
 
1563
                        t->get_policy().peer_is_interesting(*this);
 
1564
        }
 
1565
        
 
1566
        // -----------------------------
 
1567
        // --------- HAVE NONE ---------
 
1568
        // -----------------------------
 
1569
 
 
1570
        void peer_connection::incoming_have_none()
 
1571
        {
 
1572
                INVARIANT_CHECK;
 
1573
 
 
1574
                boost::shared_ptr<torrent> t = m_torrent.lock();
 
1575
                TORRENT_ASSERT(t);
 
1576
 
 
1577
#ifdef TORRENT_VERBOSE_LOGGING
 
1578
                (*m_logger) << time_now_string() << " <== HAVE_NONE\n";
 
1579
#endif
 
1580
 
 
1581
#ifndef TORRENT_DISABLE_EXTENSIONS
 
1582
                for (extension_list_t::iterator i = m_extensions.begin()
 
1583
                        , end(m_extensions.end()); i != end; ++i)
 
1584
                {
 
1585
                        if ((*i)->on_have_none()) return;
 
1586
                }
 
1587
#endif
 
1588
 
 
1589
                if (m_peer_info) m_peer_info->seed = false;
 
1590
                TORRENT_ASSERT(!m_have_piece.empty() || !t->ready_for_connections());
 
1591
        }
 
1592
 
 
1593
        // -----------------------------
 
1594
        // ------- ALLOWED FAST --------
 
1595
        // -----------------------------
 
1596
 
 
1597
        void peer_connection::incoming_allowed_fast(int index)
 
1598
        {
 
1599
                INVARIANT_CHECK;
 
1600
 
 
1601
                boost::shared_ptr<torrent> t = m_torrent.lock();
 
1602
                TORRENT_ASSERT(t);
 
1603
 
 
1604
#ifdef TORRENT_VERBOSE_LOGGING
 
1605
                (*m_logger) << time_now_string() << " <== ALLOWED_FAST [ " << index << " ]\n";
 
1606
#endif
 
1607
 
 
1608
#ifndef TORRENT_DISABLE_EXTENSIONS
 
1609
                for (extension_list_t::iterator i = m_extensions.begin()
 
1610
                        , end(m_extensions.end()); i != end; ++i)
 
1611
                {
 
1612
                        if ((*i)->on_allowed_fast(index)) return;
 
1613
                }
 
1614
#endif
 
1615
 
 
1616
                // if we already have the piece, we can
 
1617
                // ignore this message
 
1618
                if (t->valid_metadata()
 
1619
                        && t->have_piece(index))
 
1620
                        return;
 
1621
 
 
1622
                if (index < 0 || index >= int(m_have_piece.size()))
 
1623
                {
 
1624
#ifdef TORRENT_VERBOSE_LOGGING
 
1625
                        (*m_logger) << time_now_string() << " <== INVALID_ALLOWED_FAST [ " << index << " | s: "
 
1626
                                << int(m_have_piece.size()) << " ]\n";
 
1627
#endif
 
1628
                        return;
 
1629
                }
 
1630
 
 
1631
                m_allowed_fast.push_back(index);
 
1632
 
 
1633
                // if the peer has the piece and we want
 
1634
                // to download it, request it
 
1635
                if (int(m_have_piece.size()) > index
 
1636
                        && m_have_piece[index]
 
1637
                        && t->has_picker()
 
1638
                        && t->picker().piece_priority(index) > 0)
 
1639
                {
 
1640
                        t->get_policy().peer_is_interesting(*this);
 
1641
                }
 
1642
        }
 
1643
 
 
1644
        std::vector<int> const& peer_connection::allowed_fast()
 
1645
        {
 
1646
                INVARIANT_CHECK;
 
1647
        
 
1648
                boost::shared_ptr<torrent> t = m_torrent.lock();
 
1649
                TORRENT_ASSERT(t);
 
1650
 
 
1651
                m_allowed_fast.erase(std::remove_if(m_allowed_fast.begin()
 
1652
                        , m_allowed_fast.end(), bind(&torrent::have_piece, t, _1))
 
1653
                        , m_allowed_fast.end());
 
1654
 
 
1655
                // TODO: sort the allowed fast set in priority order
 
1656
                return m_allowed_fast;
 
1657
        }
 
1658
 
 
1659
        void peer_connection::add_request(piece_block const& block)
 
1660
        {
 
1661
                INVARIANT_CHECK;
 
1662
 
 
1663
                boost::shared_ptr<torrent> t = m_torrent.lock();
 
1664
                TORRENT_ASSERT(t);
 
1665
 
 
1666
                TORRENT_ASSERT(t->valid_metadata());
 
1667
                TORRENT_ASSERT(block.piece_index >= 0);
 
1668
                TORRENT_ASSERT(block.piece_index < t->torrent_file().num_pieces());
 
1669
                TORRENT_ASSERT(block.block_index >= 0);
 
1670
                TORRENT_ASSERT(block.block_index < t->torrent_file().piece_size(block.piece_index));
 
1671
                TORRENT_ASSERT(!t->picker().is_requested(block) || (t->picker().num_peers(block) > 0));
 
1672
                TORRENT_ASSERT(!t->have_piece(block.piece_index));
 
1673
 
 
1674
                piece_picker::piece_state_t state;
 
1675
                peer_speed_t speed = peer_speed();
 
1676
                char const* speedmsg = 0;
 
1677
                if (speed == fast)
 
1678
                {
 
1679
                        speedmsg = "fast";
 
1680
                        state = piece_picker::fast;
 
1681
                }
 
1682
                else if (speed == medium)
 
1683
                {
 
1684
                        speedmsg = "medium";
 
1685
                        state = piece_picker::medium;
 
1686
                }
 
1687
                else
 
1688
                {
 
1689
                        speedmsg = "slow";
 
1690
                        state = piece_picker::slow;
 
1691
                }
 
1692
 
 
1693
                if (!t->picker().mark_as_downloading(block, peer_info_struct(), state))
 
1694
                        return;
 
1695
 
 
1696
                if (t->alerts().should_post(alert::debug))
 
1697
                {
 
1698
                        t->alerts().post_alert(block_downloading_alert(t->get_handle(), 
 
1699
                                speedmsg, block.block_index, block.piece_index, "block downloading"));
 
1700
                }
 
1701
 
 
1702
                m_request_queue.push_back(block);
 
1703
        }
 
1704
 
 
1705
        void peer_connection::cancel_request(piece_block const& block)
 
1706
        {
 
1707
                INVARIANT_CHECK;
 
1708
 
 
1709
                boost::shared_ptr<torrent> t = m_torrent.lock();
 
1710
                TORRENT_ASSERT(t);
 
1711
 
 
1712
                TORRENT_ASSERT(t->valid_metadata());
 
1713
 
 
1714
                TORRENT_ASSERT(block.piece_index >= 0);
 
1715
                TORRENT_ASSERT(block.piece_index < t->torrent_file().num_pieces());
 
1716
                TORRENT_ASSERT(block.block_index >= 0);
 
1717
                TORRENT_ASSERT(block.block_index < t->torrent_file().piece_size(block.piece_index));
 
1718
 
 
1719
                // if all the peers that requested this block has been
 
1720
                // cancelled, then just ignore the cancel.
 
1721
                if (!t->picker().is_requested(block)) return;
 
1722
 
 
1723
                std::deque<piece_block>::iterator it
 
1724
                        = std::find(m_download_queue.begin(), m_download_queue.end(), block);
 
1725
                if (it == m_download_queue.end())
 
1726
                {
 
1727
                        it = std::find(m_request_queue.begin(), m_request_queue.end(), block);
 
1728
                        // when a multi block is received, it is cancelled
 
1729
                        // from all peers, so if this one hasn't requested
 
1730
                        // the block, just ignore to cancel it.
 
1731
                        if (it == m_request_queue.end()) return;
 
1732
 
 
1733
                        t->picker().abort_download(block);
 
1734
                        m_request_queue.erase(it);
 
1735
                        // since we found it in the request queue, it means it hasn't been
 
1736
                        // sent yet, so we don't have to send a cancel.
 
1737
                        return;
 
1738
                }
 
1739
                else
 
1740
                {       
 
1741
                        m_download_queue.erase(it);
 
1742
                        t->picker().abort_download(block);
 
1743
                }
 
1744
 
 
1745
                int block_offset = block.block_index * t->block_size();
 
1746
                int block_size
 
1747
                        = (std::min)((int)t->torrent_file().piece_size(block.piece_index)-block_offset,
 
1748
                        t->block_size());
 
1749
                TORRENT_ASSERT(block_size > 0);
 
1750
                TORRENT_ASSERT(block_size <= t->block_size());
 
1751
 
 
1752
                peer_request r;
 
1753
                r.piece = block.piece_index;
 
1754
                r.start = block_offset;
 
1755
                r.length = block_size;
 
1756
 
 
1757
                write_cancel(r);
 
1758
 
 
1759
#ifdef TORRENT_VERBOSE_LOGGING
 
1760
                (*m_logger) << time_now_string()
 
1761
                                << " ==> CANCEL  [ piece: " << block.piece_index << " | s: "
 
1762
                                << block_offset << " | l: " << block_size << " | " << block.block_index << " ]\n";
 
1763
#endif
 
1764
        }
 
1765
 
 
1766
        void peer_connection::send_choke()
 
1767
        {
 
1768
                INVARIANT_CHECK;
 
1769
 
 
1770
                TORRENT_ASSERT(!m_peer_info || !m_peer_info->optimistically_unchoked);
 
1771
 
 
1772
                if (m_choked) return;
 
1773
                write_choke();
 
1774
                m_choked = true;
 
1775
 
 
1776
#ifdef TORRENT_VERBOSE_LOGGING
 
1777
                (*m_logger) << time_now_string() << " ==> CHOKE\n";
 
1778
#endif
 
1779
#ifndef NDEBUG
 
1780
                m_last_choke = time_now();
 
1781
#endif
 
1782
                m_num_invalid_requests = 0;
 
1783
 
 
1784
                // reject the requests we have in the queue
 
1785
                std::for_each(m_requests.begin(), m_requests.end()
 
1786
                        , bind(&peer_connection::write_reject_request, this, _1));
 
1787
                m_requests.clear();
 
1788
        }
 
1789
 
 
1790
        void peer_connection::send_unchoke()
 
1791
        {
 
1792
                INVARIANT_CHECK;
 
1793
 
 
1794
                if (!m_choked) return;
 
1795
                m_last_unchoke = time_now();
 
1796
                write_unchoke();
 
1797
                m_choked = false;
 
1798
 
 
1799
#ifdef TORRENT_VERBOSE_LOGGING
 
1800
                (*m_logger) << time_now_string() << " ==> UNCHOKE\n";
 
1801
#endif
 
1802
        }
 
1803
 
 
1804
        void peer_connection::send_interested()
 
1805
        {
 
1806
                INVARIANT_CHECK;
 
1807
 
 
1808
                if (m_interesting) return;
 
1809
                write_interested();
 
1810
                m_interesting = true;
 
1811
 
 
1812
#ifdef TORRENT_VERBOSE_LOGGING
 
1813
                (*m_logger) << time_now_string() << " ==> INTERESTED\n";
 
1814
#endif
 
1815
        }
 
1816
 
 
1817
        void peer_connection::send_not_interested()
 
1818
        {
 
1819
                INVARIANT_CHECK;
 
1820
 
 
1821
                if (!m_interesting) return;
 
1822
                write_not_interested();
 
1823
                m_interesting = false;
 
1824
 
 
1825
                m_became_uninteresting = time_now();
 
1826
 
 
1827
#ifdef TORRENT_VERBOSE_LOGGING
 
1828
                (*m_logger) << time_now_string() << " ==> NOT_INTERESTED\n";
 
1829
#endif
 
1830
        }
 
1831
 
 
1832
        void peer_connection::send_block_requests()
 
1833
        {
 
1834
                INVARIANT_CHECK;
 
1835
                
 
1836
                boost::shared_ptr<torrent> t = m_torrent.lock();
 
1837
                TORRENT_ASSERT(t);
 
1838
 
 
1839
                if ((int)m_download_queue.size() >= m_desired_queue_size) return;
 
1840
 
 
1841
                while (!m_request_queue.empty()
 
1842
                        && (int)m_download_queue.size() < m_desired_queue_size)
 
1843
                {
 
1844
                        piece_block block = m_request_queue.front();
 
1845
 
 
1846
                        int block_offset = block.block_index * t->block_size();
 
1847
                        int block_size = (std::min)((int)t->torrent_file().piece_size(
 
1848
                                block.piece_index) - block_offset, t->block_size());
 
1849
                        TORRENT_ASSERT(block_size > 0);
 
1850
                        TORRENT_ASSERT(block_size <= t->block_size());
 
1851
 
 
1852
                        peer_request r;
 
1853
                        r.piece = block.piece_index;
 
1854
                        r.start = block_offset;
 
1855
                        r.length = block_size;
 
1856
 
 
1857
                        m_request_queue.pop_front();
 
1858
                        m_download_queue.push_back(block);
 
1859
/*
 
1860
#ifdef TORRENT_VERBOSE_LOGGING
 
1861
                        (*m_logger) << time_now_string()
 
1862
                                << " *** REQUEST-QUEUE** [ "
 
1863
                                "piece: " << block.piece_index << " | "
 
1864
                                "block: " << block.block_index << " ]\n";
 
1865
#endif
 
1866
*/                      
 
1867
                        // if we are requesting large blocks, merge the smaller
 
1868
                        // blocks that are in the same piece into larger requests
 
1869
                        if (m_request_large_blocks)
 
1870
                        {
 
1871
                                int blocks_per_piece = t->torrent_file().piece_length() / t->block_size();
 
1872
 
 
1873
                                while (!m_request_queue.empty())
 
1874
                                {
 
1875
                                        // check to see if this block is connected to the previous one
 
1876
                                        // if it is, merge them, otherwise, break this merge loop
 
1877
                                        piece_block const& front = m_request_queue.front();
 
1878
                                        if (front.piece_index * blocks_per_piece + front.block_index
 
1879
                                                != block.piece_index * blocks_per_piece + block.block_index + 1)
 
1880
                                                break;
 
1881
                                        block = m_request_queue.front();
 
1882
                                        m_request_queue.pop_front();
 
1883
                                        m_download_queue.push_back(block);
 
1884
 
 
1885
#ifdef TORRENT_VERBOSE_LOGGING
 
1886
                                        (*m_logger) << time_now_string()
 
1887
                                                << " *** MERGING REQUEST ** [ "
 
1888
                                                "piece: " << block.piece_index << " | "
 
1889
                                                "block: " << block.block_index << " ]\n";
 
1890
#endif
 
1891
 
 
1892
                                        block_offset = block.block_index * t->block_size();
 
1893
                                        block_size = (std::min)((int)t->torrent_file().piece_size(
 
1894
                                                block.piece_index) - block_offset, t->block_size());
 
1895
                                        TORRENT_ASSERT(block_size > 0);
 
1896
                                        TORRENT_ASSERT(block_size <= t->block_size());
 
1897
 
 
1898
                                        r.length += block_size;
 
1899
                                }
 
1900
                        }
 
1901
 
 
1902
                        TORRENT_ASSERT(verify_piece(r));
 
1903
                        
 
1904
#ifndef TORRENT_DISABLE_EXTENSIONS
 
1905
                        bool handled = false;
 
1906
                        for (extension_list_t::iterator i = m_extensions.begin()
 
1907
                                , end(m_extensions.end()); i != end; ++i)
 
1908
                        {
 
1909
                                if (handled = (*i)->write_request(r)) break;
 
1910
                        }
 
1911
                        if (!handled)
 
1912
                        {
 
1913
                                write_request(r);
 
1914
                                m_last_request = time_now();
 
1915
                        }
 
1916
#else
 
1917
                        write_request(r);
 
1918
                        m_last_request = time_now();
 
1919
#endif
 
1920
 
 
1921
#ifdef TORRENT_VERBOSE_LOGGING
 
1922
                        (*m_logger) << time_now_string()
 
1923
                                << " ==> REQUEST [ "
 
1924
                                "piece: " << r.piece << " | "
 
1925
                                "s: " << r.start << " | "
 
1926
                                "l: " << r.length << " | "
 
1927
                                "ds: " << statistics().download_rate() << " B/s | "
 
1928
                                "qs: " << m_desired_queue_size << " "
 
1929
                                "blk: " << (m_request_large_blocks?"large":"single") << " ]\n";
 
1930
#endif
 
1931
                }
 
1932
                m_last_piece = time_now();
 
1933
        }
 
1934
 
 
1935
 
 
1936
        void close_socket_ignore_error(boost::shared_ptr<socket_type> s)
 
1937
        {
 
1938
                try { s->close(); } catch (std::exception& e) {}
 
1939
        }
 
1940
 
 
1941
        void peer_connection::timed_out()
 
1942
        {
 
1943
#if defined(TORRENT_VERBOSE_LOGGING) || defined(TORRENT_LOGGING)
 
1944
                (*m_ses.m_logger) << "CONNECTION TIMED OUT: " << m_remote.address().to_string()
 
1945
                        << "\n";
 
1946
#endif
 
1947
                m_ses.connection_failed(m_socket, m_remote, "timed out");
 
1948
        }
 
1949
 
 
1950
        void peer_connection::disconnect()
 
1951
        {
 
1952
                session_impl::mutex_t::scoped_lock l(m_ses.m_mutex);
 
1953
 
 
1954
                boost::intrusive_ptr<peer_connection> me(this);
 
1955
 
 
1956
                INVARIANT_CHECK;
 
1957
 
 
1958
                if (m_disconnecting) return;
 
1959
                m_disconnecting = true;
 
1960
                if (m_connecting)
 
1961
                        m_ses.m_half_open.done(m_connection_ticket);
 
1962
 
 
1963
                m_ses.m_io_service.post(boost::bind(&close_socket_ignore_error, m_socket));
 
1964
 
 
1965
                boost::shared_ptr<torrent> t = m_torrent.lock();
 
1966
 
 
1967
                if (t)
 
1968
                {
 
1969
                        if (t->has_picker())
 
1970
                        {
 
1971
                                piece_picker& picker = t->picker();
 
1972
 
 
1973
                                while (!m_download_queue.empty())
 
1974
                                {
 
1975
                                        picker.abort_download(m_download_queue.back());
 
1976
                                        m_download_queue.pop_back();
 
1977
                                }
 
1978
                                while (!m_request_queue.empty())
 
1979
                                {
 
1980
                                        picker.abort_download(m_request_queue.back());
 
1981
                                        m_request_queue.pop_back();
 
1982
                                }
 
1983
                        }
 
1984
 
 
1985
                        t->remove_peer(this);
 
1986
                        m_torrent.reset();
 
1987
                }
 
1988
 
 
1989
                m_ses.close_connection(me);
 
1990
        }
 
1991
 
 
1992
        void peer_connection::set_upload_limit(int limit)
 
1993
        {
 
1994
                TORRENT_ASSERT(limit >= -1);
 
1995
                if (limit == -1) limit = (std::numeric_limits<int>::max)();
 
1996
                if (limit < 10) limit = 10;
 
1997
                m_upload_limit = limit;
 
1998
                m_bandwidth_limit[upload_channel].throttle(m_upload_limit);
 
1999
        }
 
2000
 
 
2001
        void peer_connection::set_download_limit(int limit)
 
2002
        {
 
2003
                TORRENT_ASSERT(limit >= -1);
 
2004
                if (limit == -1) limit = (std::numeric_limits<int>::max)();
 
2005
                if (limit < 10) limit = 10;
 
2006
                m_download_limit = limit;
 
2007
                m_bandwidth_limit[download_channel].throttle(m_download_limit);
 
2008
        }
 
2009
 
 
2010
        size_type peer_connection::share_diff() const
 
2011
        {
 
2012
                INVARIANT_CHECK;
 
2013
 
 
2014
                boost::shared_ptr<torrent> t = m_torrent.lock();
 
2015
                TORRENT_ASSERT(t);
 
2016
 
 
2017
                float ratio = t->ratio();
 
2018
 
 
2019
                // if we have an infinite ratio, just say we have downloaded
 
2020
                // much more than we have uploaded. And we'll keep uploading.
 
2021
                if (ratio == 0.f)
 
2022
                        return (std::numeric_limits<size_type>::max)();
 
2023
 
 
2024
                return m_free_upload
 
2025
                        + static_cast<size_type>(m_statistics.total_payload_download() * ratio)
 
2026
                        - m_statistics.total_payload_upload();
 
2027
        }
 
2028
 
 
2029
        // defined in upnp.cpp
 
2030
        bool is_local(address const& a);
 
2031
 
 
2032
        bool peer_connection::on_local_network() const
 
2033
        {
 
2034
                if (libtorrent::is_local(m_remote.address())) return true;
 
2035
                return false;
 
2036
        }
 
2037
 
 
2038
        void peer_connection::get_peer_info(peer_info& p) const
 
2039
        {
 
2040
                TORRENT_ASSERT(!associated_torrent().expired());
 
2041
 
 
2042
                p.down_speed = statistics().download_rate();
 
2043
                p.up_speed = statistics().upload_rate();
 
2044
                p.payload_down_speed = statistics().download_payload_rate();
 
2045
                p.payload_up_speed = statistics().upload_payload_rate();
 
2046
                p.pid = pid();
 
2047
                p.ip = remote();
 
2048
                p.pending_disk_bytes = m_outstanding_writing_bytes;
 
2049
                
 
2050
#ifndef TORRENT_DISABLE_RESOLVE_COUNTRIES       
 
2051
                p.country[0] = m_country[0];
 
2052
                p.country[1] = m_country[1];
 
2053
#endif
 
2054
 
 
2055
                p.total_download = statistics().total_payload_download();
 
2056
                p.total_upload = statistics().total_payload_upload();
 
2057
 
 
2058
                if (m_bandwidth_limit[upload_channel].throttle() == bandwidth_limit::inf)
 
2059
                        p.upload_limit = -1;
 
2060
                else
 
2061
                        p.upload_limit = m_bandwidth_limit[upload_channel].throttle();
 
2062
 
 
2063
                if (m_bandwidth_limit[download_channel].throttle() == bandwidth_limit::inf)
 
2064
                        p.download_limit = -1;
 
2065
                else
 
2066
                        p.download_limit = m_bandwidth_limit[download_channel].throttle();
 
2067
 
 
2068
                p.load_balancing = total_free_upload();
 
2069
 
 
2070
                p.download_queue_length = int(download_queue().size() + m_request_queue.size());
 
2071
                p.target_dl_queue_length = int(desired_queue_size());
 
2072
                p.upload_queue_length = int(upload_queue().size());
 
2073
 
 
2074
                if (boost::optional<piece_block_progress> ret = downloading_piece_progress())
 
2075
                {
 
2076
                        p.downloading_piece_index = ret->piece_index;
 
2077
                        p.downloading_block_index = ret->block_index;
 
2078
                        p.downloading_progress = ret->bytes_downloaded;
 
2079
                        p.downloading_total = ret->full_block_bytes;
 
2080
                }
 
2081
                else
 
2082
                {
 
2083
                        p.downloading_piece_index = -1;
 
2084
                        p.downloading_block_index = -1;
 
2085
                        p.downloading_progress = 0;
 
2086
                        p.downloading_total = 0;
 
2087
                }
 
2088
 
 
2089
                p.pieces = get_bitfield();
 
2090
                ptime now = time_now();
 
2091
                p.last_request = now - m_last_request;
 
2092
                p.last_active = now - (std::max)(m_last_sent, m_last_receive);
 
2093
 
 
2094
                // this will set the flags so that we can update them later
 
2095
                p.flags = 0;
 
2096
                get_specific_peer_info(p);
 
2097
 
 
2098
                p.flags |= is_seed() ? peer_info::seed : 0;
 
2099
                if (peer_info_struct())
 
2100
                {
 
2101
                        p.source = peer_info_struct()->source;
 
2102
                        p.failcount = peer_info_struct()->failcount;
 
2103
                        p.num_hashfails = peer_info_struct()->hashfails;
 
2104
                        p.flags |= peer_info_struct()->on_parole ? peer_info::on_parole : 0;
 
2105
                        p.flags |= peer_info_struct()->optimistically_unchoked ? peer_info::optimistic_unchoke : 0;
 
2106
                        p.remote_dl_rate = m_remote_dl_rate;
 
2107
                }
 
2108
                else
 
2109
                {
 
2110
                        p.source = 0;
 
2111
                        p.failcount = 0;
 
2112
                        p.num_hashfails = 0;
 
2113
                        p.remote_dl_rate = 0;
 
2114
                }
 
2115
 
 
2116
                p.send_buffer_size = m_send_buffer.capacity();
 
2117
        }
 
2118
 
 
2119
        void peer_connection::cut_receive_buffer(int size, int packet_size)
 
2120
        {
 
2121
                INVARIANT_CHECK;
 
2122
 
 
2123
                TORRENT_ASSERT(packet_size > 0);
 
2124
                TORRENT_ASSERT(int(m_recv_buffer.size()) >= size);
 
2125
                TORRENT_ASSERT(int(m_recv_buffer.size()) >= m_recv_pos);
 
2126
                TORRENT_ASSERT(m_recv_pos >= size);
 
2127
 
 
2128
                if (size > 0)           
 
2129
                        std::memmove(&m_recv_buffer[0], &m_recv_buffer[0] + size, m_recv_pos - size);
 
2130
 
 
2131
                m_recv_pos -= size;
 
2132
 
 
2133
#ifndef NDEBUG
 
2134
                std::fill(m_recv_buffer.begin() + m_recv_pos, m_recv_buffer.end(), 0);
 
2135
#endif
 
2136
 
 
2137
                m_packet_size = packet_size;
 
2138
                if (m_packet_size >= m_recv_pos) m_recv_buffer.resize(m_packet_size);
 
2139
        }
 
2140
 
 
2141
        void peer_connection::second_tick(float tick_interval) throw()
 
2142
        {
 
2143
                INVARIANT_CHECK;
 
2144
 
 
2145
                try
 
2146
                {
 
2147
 
 
2148
                ptime now(time_now());
 
2149
 
 
2150
                boost::shared_ptr<torrent> t = m_torrent.lock();
 
2151
                TORRENT_ASSERT(t);
 
2152
 
 
2153
                on_tick();
 
2154
 
 
2155
#ifndef TORRENT_DISABLE_EXTENSIONS
 
2156
                for (extension_list_t::iterator i = m_extensions.begin()
 
2157
                        , end(m_extensions.end()); i != end; ++i)
 
2158
                {
 
2159
                        (*i)->tick();
 
2160
                }
 
2161
#endif
 
2162
 
 
2163
                m_ignore_bandwidth_limits = m_ses.settings().ignore_limits_on_local_network
 
2164
                        && on_local_network();
 
2165
 
 
2166
                m_statistics.second_tick(tick_interval);
 
2167
 
 
2168
                if (!t->valid_metadata()) return;
 
2169
 
 
2170
                // calculate the desired download queue size
 
2171
                const float queue_time = m_ses.settings().request_queue_time;
 
2172
                // (if the latency is more than this, the download will stall)
 
2173
                // so, the queue size is queue_time * down_rate / 16 kiB
 
2174
                // (16 kB is the size of each request)
 
2175
                // the minimum number of requests is 2 and the maximum is 48
 
2176
                // the block size doesn't have to be 16. So we first query the
 
2177
                // torrent for it
 
2178
                const int block_size = m_request_large_blocks
 
2179
                        ? t->torrent_file().piece_length() : t->block_size();
 
2180
                TORRENT_ASSERT(block_size > 0);
 
2181
                
 
2182
                m_desired_queue_size = static_cast<int>(queue_time
 
2183
                        * statistics().download_rate() / block_size);
 
2184
                if (m_desired_queue_size > m_max_out_request_queue)
 
2185
                        m_desired_queue_size = m_max_out_request_queue;
 
2186
                if (m_desired_queue_size < min_request_queue)
 
2187
                        m_desired_queue_size = min_request_queue;
 
2188
 
 
2189
                if (!m_download_queue.empty()
 
2190
                        && now - m_last_piece > seconds(m_ses.settings().piece_timeout))
 
2191
                {
 
2192
                        // this peer isn't sending the pieces we've
 
2193
                        // requested (this has been observed by BitComet)
 
2194
                        // in this case we'll clear our download queue and
 
2195
                        // re-request the blocks.
 
2196
#ifdef TORRENT_VERBOSE_LOGGING
 
2197
                        (*m_logger) << time_now_string()
 
2198
                                << " *** PIECE_REQUESTS TIMED OUT [ " << (int)m_download_queue.size()
 
2199
                                << " " << total_seconds(now - m_last_piece) << "] ***\n";
 
2200
#endif
 
2201
 
 
2202
                        if (t->is_seed())
 
2203
                        {
 
2204
                                m_download_queue.clear();
 
2205
                                m_request_queue.clear();
 
2206
                        }
 
2207
                        else
 
2208
                        {
 
2209
                                piece_picker& picker = t->picker();
 
2210
                                while (!m_download_queue.empty())
 
2211
                                {
 
2212
                                        piece_block const& r = m_download_queue.back();
 
2213
                                        picker.abort_download(r);
 
2214
                                        write_cancel(t->to_req(r));
 
2215
                                        m_download_queue.pop_back();
 
2216
                                }
 
2217
                                while (!m_request_queue.empty())
 
2218
                                {
 
2219
                                        piece_block const& r = m_request_queue.back();
 
2220
                                        picker.abort_download(r);
 
2221
                                        write_cancel(t->to_req(r));
 
2222
                                        m_request_queue.pop_back();
 
2223
                                }
 
2224
 
 
2225
                                m_assume_fifo = true;
 
2226
 
 
2227
                                request_a_block(*t, *this);
 
2228
                                send_block_requests();
 
2229
                        }
 
2230
                }
 
2231
 
 
2232
                // If the client sends more data
 
2233
                // we send it data faster, otherwise, slower.
 
2234
                // It will also depend on how much data the
 
2235
                // client has sent us. This is the mean to
 
2236
                // maintain the share ratio given by m_ratio
 
2237
                // with all peers.
 
2238
 
 
2239
                if (t->is_finished() || is_choked() || t->ratio() == 0.0f)
 
2240
                {
 
2241
                        // if we have downloaded more than one piece more
 
2242
                        // than we have uploaded OR if we are a seed
 
2243
                        // have an unlimited upload rate
 
2244
                        m_bandwidth_limit[upload_channel].throttle(m_upload_limit);
 
2245
                }
 
2246
                else
 
2247
                {
 
2248
                        size_type bias = 0x10000 + 2 * t->block_size() + m_free_upload;
 
2249
 
 
2250
                        double break_even_time = 15; // seconds.
 
2251
                        size_type have_uploaded = m_statistics.total_payload_upload();
 
2252
                        size_type have_downloaded = m_statistics.total_payload_download();
 
2253
                        double download_speed = m_statistics.download_rate();
 
2254
 
 
2255
                        size_type soon_downloaded =
 
2256
                                have_downloaded + (size_type)(download_speed * break_even_time*1.5);
 
2257
 
 
2258
                        if (t->ratio() != 1.f)
 
2259
                                soon_downloaded = (size_type)(soon_downloaded*(double)t->ratio());
 
2260
 
 
2261
                        double upload_speed_limit = (std::min)((soon_downloaded - have_uploaded
 
2262
                                + bias) / break_even_time, double(m_upload_limit));
 
2263
 
 
2264
                        upload_speed_limit = (std::min)(upload_speed_limit,
 
2265
                                (double)(std::numeric_limits<int>::max)());
 
2266
 
 
2267
                        m_bandwidth_limit[upload_channel].throttle(
 
2268
                                (std::min)((std::max)((int)upload_speed_limit, 20)
 
2269
                                , m_upload_limit));
 
2270
                }
 
2271
 
 
2272
                // update once every minute
 
2273
                if (now - m_remote_dl_update >= seconds(60))
 
2274
                {
 
2275
                        float factor = 0.6666666666667f;
 
2276
                        
 
2277
                        if (m_remote_dl_rate == 0) factor = 0.0f;
 
2278
 
 
2279
                        m_remote_dl_rate = int((m_remote_dl_rate * factor) + 
 
2280
                                ((m_remote_bytes_dled * (1.0f-factor)) / 60.f));
 
2281
                        
 
2282
                        m_remote_bytes_dled = 0;
 
2283
                        m_remote_dl_update = now;
 
2284
                }
 
2285
 
 
2286
                fill_send_buffer();
 
2287
                }
 
2288
                catch (std::exception& e)
 
2289
                {
 
2290
#ifdef TORRENT_VERBOSE_LOGGING
 
2291
                        (*m_logger) << "**ERROR**: " << e.what() << "\n";
 
2292
#endif
 
2293
                        m_ses.connection_failed(m_socket, remote(), e.what());
 
2294
                }
 
2295
        }
 
2296
 
 
2297
        void peer_connection::fill_send_buffer()
 
2298
        {
 
2299
                INVARIANT_CHECK;
 
2300
 
 
2301
                boost::shared_ptr<torrent> t = m_torrent.lock();
 
2302
                if (!t) return;
 
2303
 
 
2304
                // only add new piece-chunks if the send buffer is small enough
 
2305
                // otherwise there will be no end to how large it will be!
 
2306
                
 
2307
                int buffer_size_watermark = int(m_statistics.upload_rate()) / 2;
 
2308
                if (buffer_size_watermark < 1024) buffer_size_watermark = 1024;
 
2309
                else if (buffer_size_watermark > 80 * 1024) buffer_size_watermark = 80 * 1024;
 
2310
 
 
2311
                while (!m_requests.empty()
 
2312
                        && (send_buffer_size() + m_reading_bytes < buffer_size_watermark)
 
2313
                        && !m_choked)
 
2314
                {
 
2315
                        TORRENT_ASSERT(t->valid_metadata());
 
2316
                        peer_request& r = m_requests.front();
 
2317
                        
 
2318
                        TORRENT_ASSERT(r.piece >= 0);
 
2319
                        TORRENT_ASSERT(r.piece < (int)m_have_piece.size());
 
2320
                        TORRENT_ASSERT(t->have_piece(r.piece));
 
2321
                        TORRENT_ASSERT(r.start + r.length <= t->torrent_file().piece_size(r.piece));
 
2322
                        TORRENT_ASSERT(r.length > 0 && r.start >= 0);
 
2323
 
 
2324
                        t->filesystem().async_read(r, bind(&peer_connection::on_disk_read_complete
 
2325
                                , self(), _1, _2, r));
 
2326
                        m_reading_bytes += r.length;
 
2327
 
 
2328
                        m_requests.erase(m_requests.begin());
 
2329
                }
 
2330
        }
 
2331
 
 
2332
        void peer_connection::on_disk_read_complete(int ret, disk_io_job const& j, peer_request r)
 
2333
        {
 
2334
                session_impl::mutex_t::scoped_lock l(m_ses.m_mutex);
 
2335
 
 
2336
                m_reading_bytes -= r.length;
 
2337
 
 
2338
                if (ret != r.length || m_torrent.expired())
 
2339
                {
 
2340
                        boost::shared_ptr<torrent> t = m_torrent.lock();
 
2341
                        if (!t)
 
2342
                        {
 
2343
                                m_ses.connection_failed(m_socket, remote(), j.str.c_str());
 
2344
                                return;
 
2345
                        }
 
2346
                
 
2347
                        if (t->alerts().should_post(alert::fatal))
 
2348
                        {
 
2349
                                std::string err = "torrent paused: disk read error";
 
2350
                                if (!j.str.empty())
 
2351
                                {
 
2352
                                        err += ", ";
 
2353
                                        err += j.str;
 
2354
                                }
 
2355
                                t->alerts().post_alert(file_error_alert(t->get_handle(), err));
 
2356
                        }
 
2357
                        t->pause();
 
2358
                        return;
 
2359
                }
 
2360
 
 
2361
#ifdef TORRENT_VERBOSE_LOGGING
 
2362
                (*m_logger) << time_now_string()
 
2363
                        << " ==> PIECE   [ piece: " << r.piece << " | s: " << r.start
 
2364
                        << " | l: " << r.length << " ]\n";
 
2365
#endif
 
2366
 
 
2367
                write_piece(r, j.buffer);
 
2368
                setup_send();
 
2369
        }
 
2370
 
 
2371
        void peer_connection::assign_bandwidth(int channel, int amount)
 
2372
        {
 
2373
                session_impl::mutex_t::scoped_lock l(m_ses.m_mutex);
 
2374
 
 
2375
#ifdef TORRENT_VERBOSE_LOGGING
 
2376
                (*m_logger) << "bandwidth [ " << channel << " ] + " << amount << "\n";
 
2377
#endif
 
2378
 
 
2379
                m_bandwidth_limit[channel].assign(amount);
 
2380
                if (channel == upload_channel)
 
2381
                {
 
2382
                        TORRENT_ASSERT(m_writing);
 
2383
                        m_writing = false;
 
2384
                        setup_send();
 
2385
                }
 
2386
                else if (channel == download_channel)
 
2387
                {
 
2388
                        TORRENT_ASSERT(m_reading);
 
2389
                        m_reading = false;
 
2390
                        setup_receive();
 
2391
                }
 
2392
        }
 
2393
 
 
2394
        void peer_connection::expire_bandwidth(int channel, int amount)
 
2395
        {
 
2396
                session_impl::mutex_t::scoped_lock l(m_ses.m_mutex);
 
2397
 
 
2398
                m_bandwidth_limit[channel].expire(amount);
 
2399
                if (channel == upload_channel)
 
2400
                {
 
2401
                        setup_send();
 
2402
                }
 
2403
                else if (channel == download_channel)
 
2404
                {
 
2405
                        setup_receive();
 
2406
                }
 
2407
        }
 
2408
 
 
2409
        void peer_connection::setup_send()
 
2410
        {
 
2411
                session_impl::mutex_t::scoped_lock l(m_ses.m_mutex);
 
2412
 
 
2413
                INVARIANT_CHECK;
 
2414
 
 
2415
                if (m_writing) return;
 
2416
                
 
2417
                shared_ptr<torrent> t = m_torrent.lock();
 
2418
 
 
2419
                if (m_bandwidth_limit[upload_channel].quota_left() == 0
 
2420
                        && !m_send_buffer.empty()
 
2421
                        && !m_connecting
 
2422
                        && t
 
2423
                        && !m_ignore_bandwidth_limits)
 
2424
                {
 
2425
                        // in this case, we have data to send, but no
 
2426
                        // bandwidth. So, we simply request bandwidth
 
2427
                        // from the torrent
 
2428
                        TORRENT_ASSERT(t);
 
2429
                        if (m_bandwidth_limit[upload_channel].max_assignable() > 0)
 
2430
                        {
 
2431
#ifdef TORRENT_VERBOSE_LOGGING
 
2432
                                (*m_logger) << "req bandwidth [ " << upload_channel << " ]\n";
 
2433
#endif
 
2434
 
 
2435
                                TORRENT_ASSERT(!m_writing);
 
2436
                                // peers that we are not interested in are non-prioritized
 
2437
                                m_writing = true;
 
2438
                                t->request_bandwidth(upload_channel, self()
 
2439
                                        , !(is_interesting() && !has_peer_choked()));
 
2440
                        }
 
2441
                        return;
 
2442
                }
 
2443
 
 
2444
                if (!can_write()) return;
 
2445
 
 
2446
                TORRENT_ASSERT(!m_writing);
 
2447
 
 
2448
                // send the actual buffer
 
2449
                if (!m_send_buffer.empty())
 
2450
                {
 
2451
                        int amount_to_send = m_send_buffer.size();
 
2452
                        int quota_left = m_bandwidth_limit[upload_channel].quota_left();
 
2453
                        if (!m_ignore_bandwidth_limits && amount_to_send > quota_left)
 
2454
                                amount_to_send = quota_left;
 
2455
 
 
2456
                        TORRENT_ASSERT(amount_to_send > 0);
 
2457
 
 
2458
#ifdef TORRENT_VERBOSE_LOGGING
 
2459
                        (*m_logger) << "async_write " << amount_to_send << " bytes\n";
 
2460
#endif
 
2461
                        std::list<asio::const_buffer> const& vec = m_send_buffer.build_iovec(amount_to_send);
 
2462
                        m_socket->async_write_some(vec, bind(&peer_connection::on_send_data, self(), _1, _2));
 
2463
 
 
2464
                        m_writing = true;
 
2465
                }
 
2466
        }
 
2467
 
 
2468
        void peer_connection::setup_receive()
 
2469
        {
 
2470
                session_impl::mutex_t::scoped_lock l(m_ses.m_mutex);
 
2471
 
 
2472
                INVARIANT_CHECK;
 
2473
 
 
2474
#ifdef TORRENT_VERBOSE_LOGGING
 
2475
                (*m_logger) << "setup_receive: reading = " << m_reading << "\n";
 
2476
#endif
 
2477
                if (m_reading) return;
 
2478
 
 
2479
                shared_ptr<torrent> t = m_torrent.lock();
 
2480
                
 
2481
                if (m_bandwidth_limit[download_channel].quota_left() == 0
 
2482
                        && !m_connecting
 
2483
                        && t
 
2484
                        && !m_ignore_bandwidth_limits)
 
2485
                {
 
2486
                        if (m_bandwidth_limit[download_channel].max_assignable() > 0)
 
2487
                        {
 
2488
#ifdef TORRENT_VERBOSE_LOGGING
 
2489
                                (*m_logger) << "req bandwidth [ " << download_channel << " ]\n";
 
2490
#endif
 
2491
                                m_reading = true;
 
2492
                                t->request_bandwidth(download_channel, self(), m_non_prioritized);
 
2493
                        }
 
2494
                        return;
 
2495
                }
 
2496
                
 
2497
                if (!can_read()) return;
 
2498
 
 
2499
                TORRENT_ASSERT(m_packet_size > 0);
 
2500
                int max_receive = m_packet_size - m_recv_pos;
 
2501
                int quota_left = m_bandwidth_limit[download_channel].quota_left();
 
2502
                if (!m_ignore_bandwidth_limits && max_receive > quota_left)
 
2503
                        max_receive = quota_left;
 
2504
 
 
2505
                if (max_receive == 0) return;
 
2506
 
 
2507
                TORRENT_ASSERT(m_recv_pos >= 0);
 
2508
                TORRENT_ASSERT(m_packet_size > 0);
 
2509
 
 
2510
                TORRENT_ASSERT(can_read());
 
2511
#ifdef TORRENT_VERBOSE_LOGGING
 
2512
                (*m_logger) << "async_read " << max_receive << " bytes\n";
 
2513
#endif
 
2514
                m_socket->async_read_some(asio::buffer(&m_recv_buffer[m_recv_pos]
 
2515
                        , max_receive), bind(&peer_connection::on_receive_data, self(), _1, _2));
 
2516
                m_reading = true;
 
2517
        }
 
2518
 
 
2519
        void peer_connection::reset_recv_buffer(int packet_size)
 
2520
        {
 
2521
                TORRENT_ASSERT(packet_size > 0);
 
2522
                if (m_recv_pos > m_packet_size)
 
2523
                {
 
2524
                        cut_receive_buffer(m_packet_size, packet_size);
 
2525
                        return;
 
2526
                }
 
2527
                m_recv_pos = 0;
 
2528
                m_packet_size = packet_size;
 
2529
                if (int(m_recv_buffer.size()) < m_packet_size)
 
2530
                        m_recv_buffer.resize(m_packet_size);
 
2531
        }
 
2532
 
 
2533
        void peer_connection::send_buffer(char const* buf, int size)
 
2534
        {
 
2535
                int free_space = m_send_buffer.space_in_last_buffer();
 
2536
                if (free_space > size) free_space = size;
 
2537
                if (free_space > 0)
 
2538
                {
 
2539
                        m_send_buffer.append(buf, free_space);
 
2540
                        size -= free_space;
 
2541
                        buf += free_space;
 
2542
#ifdef TORRENT_STATS
 
2543
                        m_ses.m_buffer_usage_logger << log_time() << " send_buffer: "
 
2544
                                << free_space << std::endl;
 
2545
                        m_ses.log_buffer_usage();
 
2546
#endif
 
2547
                }
 
2548
                if (size <= 0) return;
 
2549
 
 
2550
                std::pair<char*, int> buffer = m_ses.allocate_buffer(size);
 
2551
                TORRENT_ASSERT(buffer.second >= size);
 
2552
                std::memcpy(buffer.first, buf, size);
 
2553
                m_send_buffer.append_buffer(buffer.first, buffer.second, size
 
2554
                        , bind(&session_impl::free_buffer, boost::ref(m_ses), _1, buffer.second));
 
2555
#ifdef TORRENT_STATS
 
2556
                m_ses.m_buffer_usage_logger << log_time() << " send_buffer_alloc: " << size << std::endl;
 
2557
                m_ses.log_buffer_usage();
 
2558
#endif
 
2559
                setup_send();
 
2560
        }
 
2561
 
 
2562
// TODO: change this interface to automatically call setup_send() when the
 
2563
// return value is destructed
 
2564
        buffer::interval peer_connection::allocate_send_buffer(int size)
 
2565
        {
 
2566
                char* insert = m_send_buffer.allocate_appendix(size);
 
2567
                if (insert == 0)
 
2568
                {
 
2569
                        std::pair<char*, int> buffer = m_ses.allocate_buffer(size);
 
2570
                        TORRENT_ASSERT(buffer.second >= size);
 
2571
                        m_send_buffer.append_buffer(buffer.first, buffer.second, size
 
2572
                                , bind(&session_impl::free_buffer, boost::ref(m_ses), _1, buffer.second));
 
2573
                        buffer::interval ret(buffer.first, buffer.first + size);
 
2574
#ifdef TORRENT_STATS
 
2575
                        m_ses.m_buffer_usage_logger << log_time() << " allocate_buffer_alloc: " << size << std::endl;
 
2576
                        m_ses.log_buffer_usage();
 
2577
#endif
 
2578
                        return ret;
 
2579
                }
 
2580
                else
 
2581
                {
 
2582
#ifdef TORRENT_STATS
 
2583
                        m_ses.m_buffer_usage_logger << log_time() << " allocate_buffer: " << size << std::endl;
 
2584
                        m_ses.log_buffer_usage();
 
2585
#endif
 
2586
                        buffer::interval ret(insert, insert + size);
 
2587
                        return ret;
 
2588
                }
 
2589
        }
 
2590
 
 
2591
        template<class T>
 
2592
        struct set_to_zero
 
2593
        {
 
2594
                set_to_zero(T& v, bool cond): m_val(v), m_cond(cond) {}
 
2595
                void fire() { if (!m_cond) return; m_cond = false; m_val = 0; }
 
2596
                ~set_to_zero() { if (m_cond) m_val = 0; }
 
2597
        private:
 
2598
                T& m_val;
 
2599
                bool m_cond;
 
2600
        };
 
2601
 
 
2602
        // --------------------------
 
2603
        // RECEIVE DATA
 
2604
        // --------------------------
 
2605
 
 
2606
        // throws exception when the client should be disconnected
 
2607
        void peer_connection::on_receive_data(const asio::error_code& error
 
2608
                , std::size_t bytes_transferred) try
 
2609
        {
 
2610
                session_impl::mutex_t::scoped_lock l(m_ses.m_mutex);
 
2611
 
 
2612
                INVARIANT_CHECK;
 
2613
 
 
2614
                TORRENT_ASSERT(m_reading);
 
2615
                m_reading = false;
 
2616
 
 
2617
                if (error)
 
2618
                {
 
2619
#ifdef TORRENT_VERBOSE_LOGGING
 
2620
                        (*m_logger) << "**ERROR**: " << error.message() << "[in peer_connection::on_receive_data]\n";
 
2621
#endif
 
2622
                        on_receive(error, bytes_transferred);
 
2623
                        throw std::runtime_error(error.message());
 
2624
                }
 
2625
 
 
2626
                do
 
2627
                {
 
2628
#ifdef TORRENT_VERBOSE_LOGGING
 
2629
                        (*m_logger) << "read " << bytes_transferred << " bytes\n";
 
2630
#endif
 
2631
                        // correct the dl quota usage, if not all of the buffer was actually read
 
2632
                        if (!m_ignore_bandwidth_limits)
 
2633
                                m_bandwidth_limit[download_channel].use_quota(bytes_transferred);
 
2634
 
 
2635
                        if (m_disconnecting) return;
 
2636
        
 
2637
                        TORRENT_ASSERT(m_packet_size > 0);
 
2638
                        TORRENT_ASSERT(bytes_transferred > 0);
 
2639
 
 
2640
                        m_last_receive = time_now();
 
2641
                        m_recv_pos += bytes_transferred;
 
2642
                        TORRENT_ASSERT(m_recv_pos <= int(m_recv_buffer.size()));
 
2643
                
 
2644
                        on_receive(error, bytes_transferred);
 
2645
 
 
2646
                        TORRENT_ASSERT(m_packet_size > 0);
 
2647
 
 
2648
                        if (m_peer_choked
 
2649
                                && m_recv_pos == 0
 
2650
                                && (m_recv_buffer.capacity() - m_packet_size) > 128)
 
2651
                        {
 
2652
                                buffer(m_packet_size).swap(m_recv_buffer);
 
2653
                        }
 
2654
 
 
2655
                        int max_receive = m_packet_size - m_recv_pos;
 
2656
                        int quota_left = m_bandwidth_limit[download_channel].quota_left();
 
2657
                        if (!m_ignore_bandwidth_limits && max_receive > quota_left)
 
2658
                                max_receive = quota_left;
 
2659
 
 
2660
                        if (max_receive == 0) break;
 
2661
 
 
2662
                        asio::error_code ec;    
 
2663
                        bytes_transferred = m_socket->read_some(asio::buffer(&m_recv_buffer[m_recv_pos]
 
2664
                                , max_receive), ec);
 
2665
                        if (ec && ec != asio::error::would_block)
 
2666
                                throw asio::system_error(ec);
 
2667
                }
 
2668
                while (bytes_transferred > 0);
 
2669
 
 
2670
                setup_receive();        
 
2671
        }
 
2672
        catch (file_error& e)
 
2673
        {
 
2674
                session_impl::mutex_t::scoped_lock l(m_ses.m_mutex);
 
2675
                
 
2676
                boost::shared_ptr<torrent> t = m_torrent.lock();
 
2677
                if (!t)
 
2678
                {
 
2679
                        m_ses.connection_failed(m_socket, remote(), e.what());
 
2680
                        return;
 
2681
                }
 
2682
                
 
2683
                if (t->alerts().should_post(alert::fatal))
 
2684
                {
 
2685
                        t->alerts().post_alert(
 
2686
                                file_error_alert(t->get_handle()
 
2687
                                , std::string("torrent paused: ") + e.what()));
 
2688
                }
 
2689
                t->pause();
 
2690
        }
 
2691
        catch (std::exception& e)
 
2692
        {
 
2693
                session_impl::mutex_t::scoped_lock l(m_ses.m_mutex);
 
2694
                m_ses.connection_failed(m_socket, remote(), e.what());
 
2695
        }
 
2696
        catch (...)
 
2697
        {
 
2698
                // all exceptions should derive from std::exception
 
2699
                TORRENT_ASSERT(false);
 
2700
                session_impl::mutex_t::scoped_lock l(m_ses.m_mutex);
 
2701
                m_ses.connection_failed(m_socket, remote(), "connection failed for unknown reason");
 
2702
        }
 
2703
 
 
2704
        bool peer_connection::can_write() const
 
2705
        {
 
2706
                INVARIANT_CHECK;
 
2707
 
 
2708
                // if we have requests or pending data to be sent or announcements to be made
 
2709
                // we want to send data
 
2710
                return !m_send_buffer.empty()
 
2711
                        && (m_bandwidth_limit[upload_channel].quota_left() > 0
 
2712
                                || m_ignore_bandwidth_limits)
 
2713
                        && !m_connecting;
 
2714
        }
 
2715
 
 
2716
        bool peer_connection::can_read() const
 
2717
        {
 
2718
                INVARIANT_CHECK;
 
2719
 
 
2720
                bool ret = (m_bandwidth_limit[download_channel].quota_left() > 0
 
2721
                                || m_ignore_bandwidth_limits)
 
2722
                        && !m_connecting
 
2723
                        && m_outstanding_writing_bytes <
 
2724
                                m_ses.settings().max_outstanding_disk_bytes_per_connection;
 
2725
                
 
2726
#if defined(TORRENT_VERBOSE_LOGGING)
 
2727
                (*m_logger) << "*** can_read() " << ret << " reading: " << m_reading << "\n";
 
2728
#endif
 
2729
                
 
2730
                return ret;
 
2731
        }
 
2732
 
 
2733
        void peer_connection::connect(int ticket)
 
2734
        {
 
2735
                INVARIANT_CHECK;
 
2736
 
 
2737
#if defined(TORRENT_VERBOSE_LOGGING) || defined(TORRENT_LOGGING)
 
2738
                (*m_ses.m_logger) << "CONNECTING: " << m_remote.address().to_string()
 
2739
                        << ":" << m_remote.port() << "\n";
 
2740
#endif
 
2741
 
 
2742
                m_connection_ticket = ticket;
 
2743
                boost::shared_ptr<torrent> t = m_torrent.lock();
 
2744
                TORRENT_ASSERT(t);
 
2745
 
 
2746
                m_queued = false;
 
2747
                TORRENT_ASSERT(m_connecting);
 
2748
                m_socket->open(t->get_interface().protocol());
 
2749
 
 
2750
                // set the socket to non-blocking, so that we can
 
2751
                // read the entire buffer on each read event we get
 
2752
                tcp::socket::non_blocking_io ioc(true);
 
2753
                m_socket->io_control(ioc);
 
2754
                m_socket->bind(t->get_interface());
 
2755
                m_socket->async_connect(m_remote
 
2756
                        , bind(&peer_connection::on_connection_complete, self(), _1));
 
2757
 
 
2758
                if (t->alerts().should_post(alert::debug))
 
2759
                {
 
2760
                        t->alerts().post_alert(peer_error_alert(
 
2761
                                m_remote, m_peer_id, "connecting to peer"));
 
2762
                }
 
2763
        }
 
2764
        
 
2765
        void peer_connection::on_connection_complete(asio::error_code const& e) try
 
2766
        {
 
2767
                session_impl::mutex_t::scoped_lock l(m_ses.m_mutex);
 
2768
 
 
2769
                INVARIANT_CHECK;
 
2770
 
 
2771
                if (m_disconnecting) return;
 
2772
 
 
2773
                m_connecting = false;
 
2774
                m_ses.m_half_open.done(m_connection_ticket);
 
2775
 
 
2776
                if (e)
 
2777
                {
 
2778
#if defined(TORRENT_VERBOSE_LOGGING) || defined(TORRENT_LOGGING)
 
2779
                        (*m_ses.m_logger) << "CONNECTION FAILED: " << m_remote.address().to_string()
 
2780
                                << ": " << e.message() << "\n";
 
2781
#endif
 
2782
                        m_ses.connection_failed(m_socket, m_remote, e.message().c_str());
 
2783
                        return;
 
2784
                }
 
2785
 
 
2786
                if (m_disconnecting) return;
 
2787
                m_last_receive = time_now();
 
2788
 
 
2789
                // this means the connection just succeeded
 
2790
 
 
2791
#if defined(TORRENT_VERBOSE_LOGGING) || defined(TORRENT_LOGGING)
 
2792
                (*m_ses.m_logger) << "COMPLETED: " << m_remote.address().to_string() << "\n";
 
2793
#endif
 
2794
 
 
2795
                on_connected();
 
2796
                setup_send();
 
2797
                setup_receive();
 
2798
        }
 
2799
        catch (std::exception& ex)
 
2800
        {
 
2801
                session_impl::mutex_t::scoped_lock l(m_ses.m_mutex);
 
2802
                m_ses.connection_failed(m_socket, remote(), ex.what());
 
2803
        }
 
2804
        catch (...)
 
2805
        {
 
2806
                // all exceptions should derive from std::exception
 
2807
                TORRENT_ASSERT(false);
 
2808
                session_impl::mutex_t::scoped_lock l(m_ses.m_mutex);
 
2809
                m_ses.connection_failed(m_socket, remote(), "connection failed for unkown reason");
 
2810
        }
 
2811
        
 
2812
        // --------------------------
 
2813
        // SEND DATA
 
2814
        // --------------------------
 
2815
 
 
2816
        // throws exception when the client should be disconnected
 
2817
        void peer_connection::on_send_data(asio::error_code const& error
 
2818
                , std::size_t bytes_transferred) try
 
2819
        {
 
2820
                session_impl::mutex_t::scoped_lock l(m_ses.m_mutex);
 
2821
 
 
2822
                INVARIANT_CHECK;
 
2823
 
 
2824
                TORRENT_ASSERT(m_writing);
 
2825
 
 
2826
                m_send_buffer.pop_front(bytes_transferred);
 
2827
                
 
2828
                m_writing = false;
 
2829
 
 
2830
                if (!m_ignore_bandwidth_limits)
 
2831
                        m_bandwidth_limit[upload_channel].use_quota(bytes_transferred);
 
2832
 
 
2833
#ifdef TORRENT_VERBOSE_LOGGING
 
2834
                (*m_logger) << "wrote " << bytes_transferred << " bytes\n";
 
2835
#endif
 
2836
 
 
2837
                if (error)
 
2838
                {
 
2839
#ifdef TORRENT_VERBOSE_LOGGING
 
2840
                        (*m_logger) << "**ERROR**: " << error.message() << " [in peer_connection::on_send_data]\n";
 
2841
#endif
 
2842
                        throw std::runtime_error(error.message());
 
2843
                }
 
2844
                if (m_disconnecting) return;
 
2845
 
 
2846
                TORRENT_ASSERT(!m_connecting);
 
2847
                TORRENT_ASSERT(bytes_transferred > 0);
 
2848
 
 
2849
                m_last_sent = time_now();
 
2850
 
 
2851
                on_sent(error, bytes_transferred);
 
2852
                fill_send_buffer();
 
2853
 
 
2854
                setup_send();
 
2855
        }
 
2856
        catch (std::exception& e)
 
2857
        {
 
2858
                session_impl::mutex_t::scoped_lock l(m_ses.m_mutex);
 
2859
                m_ses.connection_failed(m_socket, remote(), e.what());
 
2860
        }
 
2861
        catch (...)
 
2862
        {
 
2863
                // all exceptions should derive from std::exception
 
2864
                TORRENT_ASSERT(false);
 
2865
                session_impl::mutex_t::scoped_lock l(m_ses.m_mutex);
 
2866
                m_ses.connection_failed(m_socket, remote(), "connection failed for unknown reason");
 
2867
        }
 
2868
 
 
2869
 
 
2870
#ifndef NDEBUG
 
2871
        void peer_connection::check_invariant() const
 
2872
        {
 
2873
                if (m_peer_info)
 
2874
                {
 
2875
                        TORRENT_ASSERT(m_peer_info->connection == this
 
2876
                                || m_peer_info->connection == 0);
 
2877
 
 
2878
                        if (m_peer_info->optimistically_unchoked)
 
2879
                                TORRENT_ASSERT(!is_choked());
 
2880
                }
 
2881
 
 
2882
                boost::shared_ptr<torrent> t = m_torrent.lock();
 
2883
                if (!t)
 
2884
                {
 
2885
                        typedef session_impl::torrent_map torrent_map;
 
2886
                        torrent_map& m = m_ses.m_torrents;
 
2887
                        for (torrent_map::iterator i = m.begin(), end(m.end()); i != end; ++i)
 
2888
                        {
 
2889
                                torrent& t = *i->second;
 
2890
                                TORRENT_ASSERT(t.connection_for(m_remote) != this);
 
2891
                        }
 
2892
                        return;
 
2893
                }
 
2894
 
 
2895
                TORRENT_ASSERT(t->connection_for(remote()) != 0 || m_in_constructor);
 
2896
 
 
2897
                if (!m_in_constructor && t->connection_for(remote()) != this
 
2898
                        && !m_ses.settings().allow_multiple_connections_per_ip)
 
2899
                {
 
2900
                        TORRENT_ASSERT(false);
 
2901
                }
 
2902
 
 
2903
                if (t->has_picker() && !t->is_aborted())
 
2904
                {
 
2905
                        // make sure that pieces that have completed the download
 
2906
                        // of all their blocks are in the disk io thread's queue
 
2907
                        // to be checked.
 
2908
                        const std::vector<piece_picker::downloading_piece>& dl_queue
 
2909
                                = t->picker().get_download_queue();
 
2910
                        for (std::vector<piece_picker::downloading_piece>::const_iterator i =
 
2911
                                dl_queue.begin(); i != dl_queue.end(); ++i)
 
2912
                        {
 
2913
                                const int blocks_per_piece = t->picker().blocks_in_piece(i->index);
 
2914
 
 
2915
                                bool complete = true;
 
2916
                                for (int j = 0; j < blocks_per_piece; ++j)
 
2917
                                {
 
2918
                                        if (i->info[j].state == piece_picker::block_info::state_finished)
 
2919
                                                continue;
 
2920
                                        complete = false;
 
2921
                                        break;
 
2922
                                }
 
2923
                                if (complete)
 
2924
                                {
 
2925
                                        disk_io_job ret = m_ses.m_disk_thread.find_job(
 
2926
                                                &t->filesystem(), -1, i->index);
 
2927
                                        TORRENT_ASSERT(ret.action == disk_io_job::hash || ret.action == disk_io_job::write);
 
2928
                                        TORRENT_ASSERT(ret.piece == i->index);
 
2929
                                }
 
2930
                        }
 
2931
                }
 
2932
// expensive when using checked iterators
 
2933
/*
 
2934
                if (t->valid_metadata())
 
2935
                {
 
2936
                        int piece_count = std::count(m_have_piece.begin()
 
2937
                                , m_have_piece.end(), true);
 
2938
                        if (m_num_pieces != piece_count)
 
2939
                        {
 
2940
                                TORRENT_ASSERT(false);
 
2941
                        }
 
2942
                }
 
2943
*/
 
2944
 
 
2945
// extremely expensive invariant check
 
2946
/*
 
2947
                if (!t->is_seed())
 
2948
                {
 
2949
                        piece_picker& p = t->picker();
 
2950
                        const std::vector<piece_picker::downloading_piece>& dlq = p.get_download_queue();
 
2951
                        const int blocks_per_piece = static_cast<int>(
 
2952
                                t->torrent_file().piece_length() / t->block_size());
 
2953
 
 
2954
                        for (std::vector<piece_picker::downloading_piece>::const_iterator i =
 
2955
                                dlq.begin(); i != dlq.end(); ++i)
 
2956
                        {
 
2957
                                for (int j = 0; j < blocks_per_piece; ++j)
 
2958
                                {
 
2959
                                        if (std::find(m_request_queue.begin(), m_request_queue.end()
 
2960
                                                , piece_block(i->index, j)) != m_request_queue.end()
 
2961
                                                ||
 
2962
                                                std::find(m_download_queue.begin(), m_download_queue.end()
 
2963
                                                , piece_block(i->index, j)) != m_download_queue.end())
 
2964
                                        {
 
2965
                                                TORRENT_ASSERT(i->info[j].peer == m_remote);
 
2966
                                        }
 
2967
                                        else
 
2968
                                        {
 
2969
                                                TORRENT_ASSERT(i->info[j].peer != m_remote || i->info[j].finished);
 
2970
                                        }
 
2971
                                }
 
2972
                        }
 
2973
                }
 
2974
*/
 
2975
        }
 
2976
#endif
 
2977
 
 
2978
        bool peer_connection::has_timed_out() const
 
2979
        {
 
2980
                // TODO: the timeout should be called by an event
 
2981
                INVARIANT_CHECK;
 
2982
 
 
2983
                ptime now(time_now());
 
2984
                
 
2985
                // if the socket is still connecting, don't
 
2986
                // consider it timed out. Because Windows XP SP2
 
2987
                // may delay connection attempts.
 
2988
                if (m_connecting) return false;
 
2989
                
 
2990
                // if the peer hasn't said a thing for a certain
 
2991
                // time, it is considered to have timed out
 
2992
                time_duration d;
 
2993
                d = now - m_last_receive;
 
2994
                if (d > seconds(m_timeout))
 
2995
                {
 
2996
#ifdef TORRENT_VERBOSE_LOGGING
 
2997
                        (*m_logger) << time_now_string() << " *** LAST ACTIVITY [ "
 
2998
                                << total_seconds(d) << " seconds ago ] ***\n";
 
2999
#endif
 
3000
                        return true;
 
3001
                }
 
3002
 
 
3003
                // do not stall waiting for a handshake
 
3004
                if (in_handshake() && d > seconds(m_ses.settings().handshake_timeout))
 
3005
                {
 
3006
#ifdef TORRENT_VERBOSE_LOGGING
 
3007
                        (*m_logger) << time_now_string() << " *** NO HANDSHAKE [ waited "
 
3008
                                << total_seconds(d) << " seconds ] ***\n";
 
3009
#endif
 
3010
                        return true;
 
3011
                }
 
3012
 
 
3013
                // disconnect peers that we unchoked, but
 
3014
                // they didn't send a request within 20 seconds.
 
3015
                // but only if we're a seed
 
3016
                boost::shared_ptr<torrent> t = m_torrent.lock();
 
3017
                d = now - (std::max)(m_last_unchoke, m_last_incoming_request);
 
3018
                if (m_requests.empty()
 
3019
                        && !m_choked
 
3020
                        && m_peer_interested
 
3021
                        && t && t->is_finished()
 
3022
                        && d > seconds(20))
 
3023
                {
 
3024
#ifdef TORRENT_VERBOSE_LOGGING
 
3025
                        (*m_logger) << time_now_string() << " *** NO REQUEST [ t: "
 
3026
                                << total_seconds(d) << " ] ***\n";
 
3027
#endif
 
3028
                        return true;
 
3029
                }
 
3030
 
 
3031
                // TODO: as long as we have less than 95% of the
 
3032
                // global (or local) connection limit, connections should
 
3033
                // never time out for another reason
 
3034
 
 
3035
                // if the peer hasn't become interested and we haven't
 
3036
                // become interested in the peer for 10 minutes, it
 
3037
                // has also timed out.
 
3038
                time_duration d1;
 
3039
                time_duration d2;
 
3040
                d1 = now - m_became_uninterested;
 
3041
                d2 = now - m_became_uninteresting;
 
3042
                time_duration time_limit = seconds(
 
3043
                        m_ses.settings().inactivity_timeout);
 
3044
 
 
3045
                // don't bother disconnect peers we haven't been intersted
 
3046
                // in (and that hasn't been interested in us) for a while
 
3047
                // unless we have used up all our connection slots
 
3048
                if (!m_interesting
 
3049
                        && !m_peer_interested
 
3050
                        && d1 > time_limit
 
3051
                        && d2 > time_limit
 
3052
                        && (m_ses.num_connections() >= m_ses.max_connections()
 
3053
                        || (t && t->num_peers() >= t->max_connections())))
 
3054
                {
 
3055
#ifdef TORRENT_VERBOSE_LOGGING
 
3056
                        (*m_logger) << time_now_string() << " *** MUTUAL NO INTEREST [ "
 
3057
                                "t1: " << total_seconds(d1) << " | "
 
3058
                                "t2: " << total_seconds(d2) << " ] ***\n";
 
3059
#endif
 
3060
                        return true;
 
3061
                }
 
3062
 
 
3063
                return false;
 
3064
        }
 
3065
 
 
3066
        peer_connection::peer_speed_t peer_connection::peer_speed()
 
3067
        {
 
3068
                shared_ptr<torrent> t = m_torrent.lock();
 
3069
                TORRENT_ASSERT(t);
 
3070
 
 
3071
                int download_rate = int(statistics().download_payload_rate());
 
3072
                int torrent_download_rate = int(t->statistics().download_payload_rate());
 
3073
 
 
3074
                if (download_rate > 512 && download_rate > torrent_download_rate / 16)
 
3075
                        m_speed = fast;
 
3076
                else if (download_rate > 4096 && download_rate > torrent_download_rate / 64)
 
3077
                        m_speed = medium;
 
3078
                else if (download_rate < torrent_download_rate / 15 && m_speed == fast)
 
3079
                        m_speed = medium;
 
3080
                else if (download_rate < torrent_download_rate / 63 && m_speed == medium)
 
3081
                        m_speed = slow;
 
3082
 
 
3083
                return m_speed;
 
3084
        }
 
3085
 
 
3086
        void peer_connection::keep_alive()
 
3087
        {
 
3088
                INVARIANT_CHECK;
 
3089
 
 
3090
                time_duration d;
 
3091
                d = time_now() - m_last_sent;
 
3092
                if (total_seconds(d) < m_timeout / 2) return;
 
3093
                
 
3094
                if (m_connecting) return;
 
3095
                if (in_handshake()) return;
 
3096
 
 
3097
                // if the last send has not completed yet, do not send a keep
 
3098
                // alive
 
3099
                if (m_writing) return;
 
3100
 
 
3101
#ifdef TORRENT_VERBOSE_LOGGING
 
3102
                (*m_logger) << time_now_string() << " ==> KEEPALIVE\n";
 
3103
#endif
 
3104
 
 
3105
                m_last_sent = time_now();
 
3106
                write_keepalive();
 
3107
        }
 
3108
 
 
3109
        bool peer_connection::is_seed() const
 
3110
        {
 
3111
                INVARIANT_CHECK;
 
3112
                // if m_num_pieces == 0, we probably don't have the
 
3113
                // metadata yet.
 
3114
                return m_num_pieces == (int)m_have_piece.size() && m_num_pieces > 0;
 
3115
        }
 
3116
}
 
3117