3
Copyright (c) 2003, Arvid Norberg
6
Redistribution and use in source and binary forms, with or without
7
modification, are permitted provided that the following conditions
10
* Redistributions of source code must retain the above copyright
11
notice, this list of conditions and the following disclaimer.
12
* Redistributions in binary form must reproduce the above copyright
13
notice, this list of conditions and the following disclaimer in
14
the documentation and/or other materials provided with the distribution.
15
* Neither the name of the author nor the names of its
16
contributors may be used to endorse or promote products derived
17
from this software without specific prior written permission.
19
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
20
AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
21
IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
22
ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
23
LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
24
CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
25
SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
26
INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
27
CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
28
ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
29
POSSIBILITY OF SUCH DAMAGE.
33
#include "libtorrent/pch.hpp"
39
#include <boost/bind.hpp>
41
#include "libtorrent/peer_connection.hpp"
42
#include "libtorrent/identify_client.hpp"
43
#include "libtorrent/entry.hpp"
44
#include "libtorrent/bencode.hpp"
45
#include "libtorrent/alert_types.hpp"
46
#include "libtorrent/invariant_check.hpp"
47
#include "libtorrent/io.hpp"
48
#include "libtorrent/file.hpp"
49
#include "libtorrent/version.hpp"
50
#include "libtorrent/extensions.hpp"
51
#include "libtorrent/aux_/session_impl.hpp"
52
#include "libtorrent/policy.hpp"
53
#include "libtorrent/socket_type.hpp"
54
#include "libtorrent/assert.hpp"
57
using boost::shared_ptr;
58
using libtorrent::aux::session_impl;
63
// outbound connection
64
peer_connection::peer_connection(
66
, boost::weak_ptr<torrent> tor
67
, shared_ptr<socket_type> s
68
, tcp::endpoint const& remote
69
, policy::peer* peerinfo)
72
m_last_choke(time_now() - hours(1))
76
, m_max_out_request_queue(m_ses.settings().max_out_request_queue)
77
, m_timeout(m_ses.settings().peer_timeout)
78
, m_last_piece(time_now())
79
, m_last_request(time_now())
80
, m_last_incoming_request(min_time())
81
, m_last_unchoke(min_time())
85
, m_last_receive(time_now())
86
, m_last_sent(time_now())
91
, m_peer_interested(false)
93
, m_interesting(false)
96
, m_ignore_bandwidth_limits(false)
99
, m_desired_queue_size(2)
101
, m_assume_fifo(false)
102
, m_num_invalid_requests(0)
103
, m_disconnecting(false)
104
, m_became_uninterested(time_now())
105
, m_became_uninteresting(time_now())
110
, m_prefer_whole_pieces(false)
111
, m_request_large_blocks(false)
112
, m_non_prioritized(false)
113
, m_upload_limit(bandwidth_limit::inf)
114
, m_download_limit(bandwidth_limit::inf)
115
, m_peer_info(peerinfo)
117
, m_connection_ticket(-1)
118
, m_remote_bytes_dled(0)
119
, m_remote_dl_rate(0)
120
, m_remote_dl_update(time_now())
121
, m_outstanding_writing_bytes(0)
122
, m_fast_reconnect(false)
124
, m_in_constructor(true)
127
#ifndef TORRENT_DISABLE_RESOLVE_COUNTRIES
128
std::fill(m_country, m_country + 2, 0);
130
#ifdef TORRENT_VERBOSE_LOGGING
131
m_logger = m_ses.create_log(m_remote.address().to_string() + "_"
132
+ boost::lexical_cast<std::string>(m_remote.port()), m_ses.listen_port());
133
(*m_logger) << "*** OUTGOING CONNECTION\n";
136
boost::shared_ptr<torrent> t = m_torrent.lock();
138
std::fill(m_peer_id.begin(), m_peer_id.end(), 0);
140
if (t->ready_for_connections())
144
// incoming connection
145
peer_connection::peer_connection(
147
, boost::shared_ptr<socket_type> s
148
, policy::peer* peerinfo)
151
m_last_choke(time_now() - hours(1))
155
, m_max_out_request_queue(m_ses.settings().max_out_request_queue)
156
, m_timeout(m_ses.settings().peer_timeout)
157
, m_last_piece(time_now())
158
, m_last_request(time_now())
159
, m_last_incoming_request(min_time())
160
, m_last_unchoke(min_time())
164
, m_last_receive(time_now())
165
, m_last_sent(time_now())
168
, m_peer_interested(false)
169
, m_peer_choked(true)
170
, m_interesting(false)
173
, m_ignore_bandwidth_limits(false)
176
, m_desired_queue_size(2)
178
, m_assume_fifo(false)
179
, m_num_invalid_requests(0)
180
, m_disconnecting(false)
181
, m_became_uninterested(time_now())
182
, m_became_uninteresting(time_now())
183
, m_connecting(false)
187
, m_prefer_whole_pieces(false)
188
, m_request_large_blocks(false)
189
, m_non_prioritized(false)
190
, m_upload_limit(bandwidth_limit::inf)
191
, m_download_limit(bandwidth_limit::inf)
192
, m_peer_info(peerinfo)
194
, m_connection_ticket(-1)
195
, m_remote_bytes_dled(0)
196
, m_remote_dl_rate(0)
197
, m_remote_dl_update(time_now())
198
, m_outstanding_writing_bytes(0)
199
, m_fast_reconnect(false)
201
, m_in_constructor(true)
204
tcp::socket::non_blocking_io ioc(true);
205
m_socket->io_control(ioc);
206
#ifndef TORRENT_DISABLE_RESOLVE_COUNTRIES
207
std::fill(m_country, m_country + 2, 0);
209
m_remote = m_socket->remote_endpoint();
211
#ifdef TORRENT_VERBOSE_LOGGING
212
TORRENT_ASSERT(m_socket->remote_endpoint() == remote());
213
m_logger = m_ses.create_log(remote().address().to_string() + "_"
214
+ boost::lexical_cast<std::string>(remote().port()), m_ses.listen_port());
215
(*m_logger) << "*** INCOMING CONNECTION\n";
218
std::fill(m_peer_id.begin(), m_peer_id.end(), 0);
221
void peer_connection::update_interest()
225
boost::shared_ptr<torrent> t = m_torrent.lock();
228
bool interested = false;
229
const std::vector<bool>& we_have = t->pieces();
230
for (int j = 0; j != (int)we_have.size(); ++j)
233
&& t->piece_priority(j) > 0
243
send_not_interested();
245
t->get_policy().peer_is_interesting(*this);
247
// may throw an asio error if socket has disconnected
248
catch (std::exception& e) {}
250
TORRENT_ASSERT(is_interesting() == interested);
253
#ifndef TORRENT_DISABLE_EXTENSIONS
254
void peer_connection::add_extension(boost::shared_ptr<peer_plugin> ext)
256
m_extensions.push_back(ext);
260
void peer_connection::send_allowed_set()
264
boost::shared_ptr<torrent> t = m_torrent.lock();
267
int num_allowed_pieces = m_ses.settings().allowed_fast_set_size;
268
int num_pieces = t->torrent_file().num_pieces();
270
if (num_allowed_pieces >= num_pieces)
272
for (int i = 0; i < num_pieces; ++i)
274
#ifdef TORRENT_VERBOSE_LOGGING
275
(*m_logger) << time_now_string()
276
<< " ==> ALLOWED_FAST [ " << i << " ]\n";
279
m_accept_fast.insert(i);
285
address const& addr = m_remote.address();
288
address_v4::bytes_type bytes = addr.to_v4().to_bytes();
289
x.assign((char*)&bytes[0], bytes.size());
293
address_v6::bytes_type bytes = addr.to_v6().to_bytes();
294
x.assign((char*)&bytes[0], bytes.size());
296
x.append((char*)&t->torrent_file().info_hash()[0], 20);
298
sha1_hash hash = hasher(&x[0], x.size()).final();
301
char* p = (char*)&hash[0];
302
for (int i = 0; i < 5; ++i)
304
int piece = detail::read_uint32(p) % num_pieces;
305
if (m_accept_fast.find(piece) == m_accept_fast.end())
307
#ifdef TORRENT_VERBOSE_LOGGING
308
(*m_logger) << time_now_string()
309
<< " ==> ALLOWED_FAST [ " << piece << " ]\n";
311
write_allow_fast(piece);
312
m_accept_fast.insert(piece);
313
if (int(m_accept_fast.size()) >= num_allowed_pieces
314
|| int(m_accept_fast.size()) == num_pieces) return;
317
hash = hasher((char*)&hash[0], 20).final();
321
void peer_connection::init()
325
boost::shared_ptr<torrent> t = m_torrent.lock();
327
TORRENT_ASSERT(t->valid_metadata());
328
TORRENT_ASSERT(t->ready_for_connections());
330
m_have_piece.resize(t->torrent_file().num_pieces(), m_have_all);
332
// now that we have a piece_picker,
333
// update it with this peers pieces
335
int num_pieces = std::count(m_have_piece.begin(), m_have_piece.end(), true);
336
if (num_pieces == int(m_have_piece.size()))
338
#ifdef TORRENT_VERBOSE_LOGGING
339
(*m_logger) << " *** THIS IS A SEED ***\n";
341
// if this is a web seed. we don't have a peer_info struct
342
if (m_peer_info) m_peer_info->seed = true;
343
// if we're a seed too, disconnect
344
if (t->is_finished())
346
throw std::runtime_error("seed to seed connection redundant, disconnecting");
348
m_num_pieces = num_pieces;
350
if (!t->is_finished())
351
t->get_policy().peer_is_interesting(*this);
355
m_num_pieces = num_pieces;
356
// if we're a seed, we don't keep track of piece availability
359
bool interesting = false;
360
for (int i = 0; i < int(m_have_piece.size()); ++i)
365
// if the peer has a piece and we don't, the peer is interesting
366
if (!t->have_piece(i)
367
&& t->picker().piece_priority(i) != 0)
372
t->get_policy().peer_is_interesting(*this);
376
peer_connection::~peer_connection()
379
TORRENT_ASSERT(m_disconnecting);
381
#ifdef TORRENT_VERBOSE_LOGGING
384
(*m_logger) << time_now_string()
385
<< " *** CONNECTION CLOSED\n";
390
TORRENT_ASSERT(m_peer_info->connection == 0);
392
boost::shared_ptr<torrent> t = m_torrent.lock();
393
if (t) TORRENT_ASSERT(t->connection_for(remote()) != this);
397
void peer_connection::fast_reconnect(bool r)
399
if (peer_info_struct() && peer_info_struct()->fast_reconnects > 1) return;
400
m_fast_reconnect = r;
401
peer_info_struct()->connected = time_now()
402
- seconds(m_ses.settings().min_reconnect_time
403
* m_ses.settings().max_failcount);
404
if (peer_info_struct()) ++peer_info_struct()->fast_reconnects;
407
void peer_connection::announce_piece(int index)
409
// dont announce during handshake
410
if (in_handshake()) return;
412
// remove suggested pieces that we have
413
std::vector<int>::iterator i = std::find(
414
m_suggested_pieces.begin(), m_suggested_pieces.end(), index);
415
if (i != m_suggested_pieces.end()) m_suggested_pieces.erase(i);
417
// optimization, don't send have messages
418
// to peers that already have the piece
419
if (!m_ses.settings().send_redundant_have
420
&& has_piece(index)) return;
422
#ifdef TORRENT_VERBOSE_LOGGING
423
(*m_logger) << time_now_string()
424
<< " ==> HAVE [ piece: " << index << "]\n";
428
boost::shared_ptr<torrent> t = m_torrent.lock();
430
TORRENT_ASSERT(t->have_piece(index));
434
bool peer_connection::has_piece(int i) const
438
boost::shared_ptr<torrent> t = m_torrent.lock();
440
TORRENT_ASSERT(t->valid_metadata());
441
TORRENT_ASSERT(i >= 0);
442
TORRENT_ASSERT(i < t->torrent_file().num_pieces());
443
return m_have_piece[i];
446
std::deque<piece_block> const& peer_connection::request_queue() const
448
return m_request_queue;
451
std::deque<piece_block> const& peer_connection::download_queue() const
453
return m_download_queue;
456
std::deque<peer_request> const& peer_connection::upload_queue() const
461
void peer_connection::add_stat(size_type downloaded, size_type uploaded)
463
m_statistics.add_stat(downloaded, uploaded);
466
std::vector<bool> const& peer_connection::get_bitfield() const
471
void peer_connection::received_valid_data(int index)
475
#ifndef TORRENT_DISABLE_EXTENSIONS
476
for (extension_list_t::iterator i = m_extensions.begin()
477
, end(m_extensions.end()); i != end; ++i)
479
try { (*i)->on_piece_pass(index); } catch (std::exception&) {}
484
void peer_connection::received_invalid_data(int index)
488
#ifndef TORRENT_DISABLE_EXTENSIONS
489
for (extension_list_t::iterator i = m_extensions.begin()
490
, end(m_extensions.end()); i != end; ++i)
492
try { (*i)->on_piece_failed(index); } catch (std::exception&) {}
496
if (peer_info_struct())
498
peer_info_struct()->on_parole = true;
499
++peer_info_struct()->hashfails;
500
int& trust_points = peer_info_struct()->trust_points;
502
// we decrease more than we increase, to keep the
503
// allowed failed/passed ratio low.
504
// TODO: make this limit user settable
506
if (trust_points < -7) trust_points = -7;
510
size_type peer_connection::total_free_upload() const
512
return m_free_upload;
515
void peer_connection::add_free_upload(size_type free_upload)
519
m_free_upload += free_upload;
522
// verifies a piece to see if it is valid (is within a valid range)
523
// and if it can correspond to a request generated by libtorrent.
524
bool peer_connection::verify_piece(const peer_request& p) const
528
boost::shared_ptr<torrent> t = m_torrent.lock();
531
TORRENT_ASSERT(t->valid_metadata());
532
torrent_info const& ti = t->torrent_file();
535
&& p.piece < t->torrent_file().num_pieces()
538
&& (p.length == t->block_size()
539
|| (p.length < t->block_size()
540
&& p.piece == ti.num_pieces()-1
541
&& p.start + p.length == ti.piece_size(p.piece))
542
|| (m_request_large_blocks
543
&& p.length <= ti.piece_length() * m_prefer_whole_pieces == 0 ?
544
1 : m_prefer_whole_pieces))
545
&& p.piece * size_type(ti.piece_length()) + p.start + p.length
547
&& (p.start % t->block_size() == 0);
550
void peer_connection::attach_to_torrent(sha1_hash const& ih)
554
TORRENT_ASSERT(!m_disconnecting);
555
TORRENT_ASSERT(m_torrent.expired());
556
boost::weak_ptr<torrent> wpt = m_ses.find_torrent(ih);
557
boost::shared_ptr<torrent> t = wpt.lock();
559
if (t && t->is_aborted())
561
#ifdef TORRENT_VERBOSE_LOGGING
562
(*m_logger) << " *** the torrent has been aborted\n";
569
// we couldn't find the torrent!
570
#ifdef TORRENT_VERBOSE_LOGGING
571
(*m_logger) << " *** couldn't find a torrent with the given info_hash: " << ih << "\n";
572
(*m_logger) << " torrents:\n";
573
session_impl::torrent_map const& torrents = m_ses.m_torrents;
574
for (session_impl::torrent_map::const_iterator i = torrents.begin()
575
, end(torrents.end()); i != end; ++i)
577
(*m_logger) << " " << i->second->torrent_file().info_hash() << "\n";
580
throw std::runtime_error("got info-hash that is not in our session");
585
// paused torrents will not accept
586
// incoming connections
587
#ifdef TORRENT_VERBOSE_LOGGING
588
(*m_logger) << " rejected connection to paused torrent\n";
590
throw std::runtime_error("connection rejected by paused torrent");
593
TORRENT_ASSERT(m_torrent.expired());
594
// check to make sure we don't have another connection with the same
595
// info_hash and peer_id. If we do. close this connection.
596
t->attach_peer(this);
599
TORRENT_ASSERT(!m_torrent.expired());
601
// if the torrent isn't ready to accept
602
// connections yet, we'll have to wait with
603
// our initialization
604
if (t->ready_for_connections()) init();
606
TORRENT_ASSERT(!m_torrent.expired());
608
// assume the other end has no pieces
609
// if we don't have valid metadata yet,
610
// leave the vector unallocated
611
TORRENT_ASSERT(m_num_pieces == 0);
612
std::fill(m_have_piece.begin(), m_have_piece.end(), false);
613
TORRENT_ASSERT(!m_torrent.expired());
618
// -----------------------------
619
// --------- KEEPALIVE ---------
620
// -----------------------------
622
void peer_connection::incoming_keepalive()
626
#ifdef TORRENT_VERBOSE_LOGGING
627
(*m_logger) << time_now_string() << " <== KEEPALIVE\n";
631
// -----------------------------
632
// ----------- CHOKE -----------
633
// -----------------------------
635
void peer_connection::incoming_choke()
639
boost::shared_ptr<torrent> t = m_torrent.lock();
642
#ifndef TORRENT_DISABLE_EXTENSIONS
643
for (extension_list_t::iterator i = m_extensions.begin()
644
, end(m_extensions.end()); i != end; ++i)
646
if ((*i)->on_choke()) return;
650
#ifdef TORRENT_VERBOSE_LOGGING
651
(*m_logger) << time_now_string() << " <== CHOKE\n";
653
m_peer_choked = true;
654
t->get_policy().choked(*this);
656
if (peer_info_struct() == 0 || !peer_info_struct()->on_parole)
658
// if the peer is not in parole mode, clear the queued
662
piece_picker& p = t->picker();
663
for (std::deque<piece_block>::const_iterator i = m_request_queue.begin()
664
, end(m_request_queue.end()); i != end; ++i)
666
// since this piece was skipped, clear it and allow it to
667
// be requested from other peers
668
p.abort_download(*i);
671
m_request_queue.clear();
675
bool match_request(peer_request const& r, piece_block const& b, int block_size)
677
if (b.piece_index != r.piece) return false;
678
if (b.block_index != r.start / block_size) return false;
679
if (r.start % block_size != 0) return false;
683
// -----------------------------
684
// -------- REJECT PIECE -------
685
// -----------------------------
687
void peer_connection::incoming_reject_request(peer_request const& r)
691
boost::shared_ptr<torrent> t = m_torrent.lock();
694
#ifndef TORRENT_DISABLE_EXTENSIONS
695
for (extension_list_t::iterator i = m_extensions.begin()
696
, end(m_extensions.end()); i != end; ++i)
698
if ((*i)->on_reject(r)) return;
702
std::deque<piece_block>::iterator i = std::find_if(
703
m_download_queue.begin(), m_download_queue.end()
704
, bind(match_request, boost::cref(r), _1, t->block_size()));
706
#ifdef TORRENT_VERBOSE_LOGGING
707
(*m_logger) << time_now_string()
708
<< " <== REJECT_PIECE [ piece: " << r.piece << " | s: " << r.start << " | l: " << r.length << " ]\n";
711
piece_block b(-1, 0);
712
if (i != m_download_queue.end())
715
m_download_queue.erase(i);
717
// if the peer is in parole mode, keep the request
718
if (peer_info_struct() && peer_info_struct()->on_parole)
720
m_request_queue.push_front(b);
722
else if (!t->is_seed())
724
piece_picker& p = t->picker();
728
#ifdef TORRENT_VERBOSE_LOGGING
731
(*m_logger) << time_now_string()
732
<< " *** PIECE NOT IN REQUEST QUEUE\n";
735
if (has_peer_choked())
737
// if we're choked and we got a rejection of
738
// a piece in the allowed fast set, remove it
739
// from the allow fast set.
740
std::vector<int>::iterator i = std::find(
741
m_allowed_fast.begin(), m_allowed_fast.end(), r.piece);
742
if (i != m_allowed_fast.end()) m_allowed_fast.erase(i);
746
std::vector<int>::iterator i = std::find(m_suggested_pieces.begin()
747
, m_suggested_pieces.end(), r.piece);
748
if (i != m_suggested_pieces.end())
749
m_suggested_pieces.erase(i);
752
if (m_request_queue.empty())
754
if (m_download_queue.size() < 2)
756
request_a_block(*t, *this);
758
send_block_requests();
762
// -----------------------------
763
// -------- REJECT PIECE -------
764
// -----------------------------
766
void peer_connection::incoming_suggest(int index)
770
#ifdef TORRENT_VERBOSE_LOGGING
771
(*m_logger) << time_now_string()
772
<< " <== SUGGEST_PIECE [ piece: " << index << " ]\n";
774
boost::shared_ptr<torrent> t = m_torrent.lock();
777
#ifndef TORRENT_DISABLE_EXTENSIONS
778
for (extension_list_t::iterator i = m_extensions.begin()
779
, end(m_extensions.end()); i != end; ++i)
781
if ((*i)->on_suggest(index)) return;
785
if (t->have_piece(index)) return;
787
if (m_suggested_pieces.size() > 9)
788
m_suggested_pieces.erase(m_suggested_pieces.begin());
789
m_suggested_pieces.push_back(index);
791
#ifdef TORRENT_VERBOSE_LOGGING
792
(*m_logger) << time_now_string()
793
<< " ** SUGGEST_PIECE [ piece: " << index << " added to set: " << m_suggested_pieces.size() << " ]\n";
797
// -----------------------------
798
// ---------- UNCHOKE ----------
799
// -----------------------------
801
void peer_connection::incoming_unchoke()
805
boost::shared_ptr<torrent> t = m_torrent.lock();
808
#ifndef TORRENT_DISABLE_EXTENSIONS
809
for (extension_list_t::iterator i = m_extensions.begin()
810
, end(m_extensions.end()); i != end; ++i)
812
if ((*i)->on_unchoke()) return;
816
#ifdef TORRENT_VERBOSE_LOGGING
817
(*m_logger) << time_now_string() << " <== UNCHOKE\n";
819
m_peer_choked = false;
820
t->get_policy().unchoked(*this);
823
// -----------------------------
824
// -------- INTERESTED ---------
825
// -----------------------------
827
void peer_connection::incoming_interested()
831
boost::shared_ptr<torrent> t = m_torrent.lock();
834
#ifndef TORRENT_DISABLE_EXTENSIONS
835
for (extension_list_t::iterator i = m_extensions.begin()
836
, end(m_extensions.end()); i != end; ++i)
838
if ((*i)->on_interested()) return;
842
#ifdef TORRENT_VERBOSE_LOGGING
843
(*m_logger) << time_now_string() << " <== INTERESTED\n";
845
m_peer_interested = true;
846
t->get_policy().interested(*this);
849
// -----------------------------
850
// ------ NOT INTERESTED -------
851
// -----------------------------
853
void peer_connection::incoming_not_interested()
857
#ifndef TORRENT_DISABLE_EXTENSIONS
858
for (extension_list_t::iterator i = m_extensions.begin()
859
, end(m_extensions.end()); i != end; ++i)
861
if ((*i)->on_not_interested()) return;
865
m_became_uninterested = time_now();
867
#ifdef TORRENT_VERBOSE_LOGGING
868
(*m_logger) << time_now_string() << " <== NOT_INTERESTED\n";
871
boost::shared_ptr<torrent> t = m_torrent.lock();
874
m_peer_interested = false;
875
t->get_policy().not_interested(*this);
878
// -----------------------------
879
// ----------- HAVE ------------
880
// -----------------------------
882
void peer_connection::incoming_have(int index)
886
boost::shared_ptr<torrent> t = m_torrent.lock();
889
#ifndef TORRENT_DISABLE_EXTENSIONS
890
for (extension_list_t::iterator i = m_extensions.begin()
891
, end(m_extensions.end()); i != end; ++i)
893
if ((*i)->on_have(index)) return;
897
#ifdef TORRENT_VERBOSE_LOGGING
898
(*m_logger) << time_now_string()
899
<< " <== HAVE [ piece: " << index << "]\n";
902
// if we got an invalid message, abort
903
if (index >= (int)m_have_piece.size() || index < 0)
904
throw protocol_error("got 'have'-message with higher index "
905
"than the number of pieces");
907
if (m_have_piece[index])
909
#ifdef TORRENT_VERBOSE_LOGGING
910
(*m_logger) << " got redundant HAVE message for index: " << index << "\n";
915
m_have_piece[index] = true;
917
// only update the piece_picker if
918
// we have the metadata and if
919
// we're not a seed (in which case
920
// we won't have a piece picker)
921
if (t->valid_metadata())
926
if (!t->have_piece(index)
929
&& t->picker().piece_priority(index) != 0)
930
t->get_policy().peer_is_interesting(*this);
932
// this will disregard all have messages we get within
933
// the first two seconds. Since some clients implements
934
// lazy bitfields, these will not be reliable to use
935
// for an estimated peer download rate.
936
if (!peer_info_struct() || time_now() - peer_info_struct()->connected > seconds(2))
938
// update bytes downloaded since last timer
939
m_remote_bytes_dled += t->torrent_file().piece_size(index);
945
TORRENT_ASSERT(m_peer_info);
946
m_peer_info->seed = true;
947
if (t->is_finished())
949
throw protocol_error("seed to seed connection redundant, disconnecting");
955
// -----------------------------
956
// --------- BITFIELD ----------
957
// -----------------------------
959
void peer_connection::incoming_bitfield(std::vector<bool> const& bitfield)
963
boost::shared_ptr<torrent> t = m_torrent.lock();
966
#ifndef TORRENT_DISABLE_EXTENSIONS
967
for (extension_list_t::iterator i = m_extensions.begin()
968
, end(m_extensions.end()); i != end; ++i)
970
if ((*i)->on_bitfield(bitfield)) return;
974
#ifdef TORRENT_VERBOSE_LOGGING
975
(*m_logger) << time_now_string() << " <== BITFIELD ";
977
for (int i = 0; i < int(bitfield.size()); ++i)
979
if (bitfield[i]) (*m_logger) << "1";
980
else (*m_logger) << "0";
985
// if we don't have the metedata, we cannot
986
// verify the bitfield size
987
if (t->valid_metadata()
988
&& (bitfield.size() / 8) != (m_have_piece.size() / 8))
989
throw protocol_error("got bitfield with invalid size: "
990
+ boost::lexical_cast<std::string>(bitfield.size() / 8)
991
+ "bytes. expected: "
992
+ boost::lexical_cast<std::string>(m_have_piece.size() / 8)
995
// if we don't have metadata yet
996
// just remember the bitmask
997
// don't update the piecepicker
998
// (since it doesn't exist yet)
999
if (!t->ready_for_connections())
1001
m_have_piece = bitfield;
1002
m_num_pieces = std::count(bitfield.begin(), bitfield.end(), true);
1003
if (m_peer_info) m_peer_info->seed = (m_num_pieces == int(bitfield.size()));
1007
TORRENT_ASSERT(t->valid_metadata());
1009
int num_pieces = std::count(bitfield.begin(), bitfield.end(), true);
1010
if (num_pieces == int(m_have_piece.size()))
1012
#ifdef TORRENT_VERBOSE_LOGGING
1013
(*m_logger) << " *** THIS IS A SEED ***\n";
1015
// if this is a web seed. we don't have a peer_info struct
1016
if (m_peer_info) m_peer_info->seed = true;
1017
// if we're a seed too, disconnect
1018
if (t->is_finished())
1020
throw protocol_error("seed to seed connection redundant, disconnecting");
1023
std::fill(m_have_piece.begin(), m_have_piece.end(), true);
1024
m_num_pieces = num_pieces;
1026
if (!t->is_finished())
1027
t->get_policy().peer_is_interesting(*this);
1031
// let the torrent know which pieces the
1033
// if we're a seed, we don't keep track of piece availability
1036
bool interesting = false;
1037
for (int i = 0; i < (int)m_have_piece.size(); ++i)
1039
bool have = bitfield[i];
1040
if (have && !m_have_piece[i])
1042
m_have_piece[i] = true;
1045
if (!t->have_piece(i) && t->picker().piece_priority(i) != 0)
1048
else if (!have && m_have_piece[i])
1050
// this should probably not be allowed
1051
m_have_piece[i] = false;
1057
if (interesting) t->get_policy().peer_is_interesting(*this);
1061
for (int i = 0; i < (int)m_have_piece.size(); ++i)
1063
bool have = bitfield[i];
1064
if (have && !m_have_piece[i])
1066
m_have_piece[i] = true;
1069
else if (!have && m_have_piece[i])
1071
// this should probably not be allowed
1072
m_have_piece[i] = false;
1079
// -----------------------------
1080
// ---------- REQUEST ----------
1081
// -----------------------------
1083
void peer_connection::incoming_request(peer_request const& r)
1087
boost::shared_ptr<torrent> t = m_torrent.lock();
1090
#ifndef TORRENT_DISABLE_EXTENSIONS
1091
for (extension_list_t::iterator i = m_extensions.begin()
1092
, end(m_extensions.end()); i != end; ++i)
1094
if ((*i)->on_request(r)) return;
1098
if (!t->valid_metadata())
1100
// if we don't have valid metadata yet,
1101
// we shouldn't get a request
1102
#ifdef TORRENT_VERBOSE_LOGGING
1103
(*m_logger) << time_now_string()
1104
<< " <== UNEXPECTED_REQUEST [ "
1105
"piece: " << r.piece << " | "
1106
"s: " << r.start << " | "
1107
"l: " << r.length << " | "
1108
"i: " << m_peer_interested << " | "
1109
"t: " << (int)t->torrent_file().piece_size(r.piece) << " | "
1110
"n: " << t->torrent_file().num_pieces() << " ]\n";
1112
write_reject_request(r);
1116
if (int(m_requests.size()) > m_ses.settings().max_allowed_in_request_queue)
1118
// don't allow clients to abuse our
1119
// memory consumption.
1120
// ignore requests if the client
1121
// is making too many of them.
1122
#ifdef TORRENT_VERBOSE_LOGGING
1123
(*m_logger) << time_now_string()
1124
<< " <== TOO MANY REQUESTS [ "
1125
"piece: " << r.piece << " | "
1126
"s: " << r.start << " | "
1127
"l: " << r.length << " | "
1128
"i: " << m_peer_interested << " | "
1129
"t: " << (int)t->torrent_file().piece_size(r.piece) << " | "
1130
"n: " << t->torrent_file().num_pieces() << " ]\n";
1132
write_reject_request(r);
1136
// make sure this request
1137
// is legal and that the peer
1140
&& r.piece < t->torrent_file().num_pieces()
1141
&& t->have_piece(r.piece)
1143
&& r.start < t->torrent_file().piece_size(r.piece)
1145
&& r.length + r.start <= t->torrent_file().piece_size(r.piece)
1146
&& m_peer_interested
1147
&& r.length <= t->block_size())
1149
#ifdef TORRENT_VERBOSE_LOGGING
1150
(*m_logger) << time_now_string()
1151
<< " <== REQUEST [ piece: " << r.piece << " | s: " << r.start << " | l: " << r.length << " ]\n";
1153
// if we have choked the client
1154
// ignore the request
1155
if (m_choked && m_accept_fast.find(r.piece) == m_accept_fast.end())
1157
write_reject_request(r);
1158
#ifdef TORRENT_VERBOSE_LOGGING
1159
(*m_logger) << time_now_string()
1160
<< " *** REJECTING REQUEST [ peer choked and piece not in allowed fast set ]\n";
1165
m_requests.push_back(r);
1166
m_last_incoming_request = time_now();
1172
#ifdef TORRENT_VERBOSE_LOGGING
1173
(*m_logger) << time_now_string()
1174
<< " <== INVALID_REQUEST [ "
1175
"piece: " << r.piece << " | "
1176
"s: " << r.start << " | "
1177
"l: " << r.length << " | "
1178
"i: " << m_peer_interested << " | "
1179
"t: " << (int)t->torrent_file().piece_size(r.piece) << " | "
1180
"n: " << t->torrent_file().num_pieces() << " | "
1181
"h: " << t->have_piece(r.piece) << " | "
1182
"block_limit: " << t->block_size() << " ]\n";
1185
write_reject_request(r);
1186
++m_num_invalid_requests;
1188
if (t->alerts().should_post(alert::debug))
1190
t->alerts().post_alert(invalid_request_alert(
1195
, "peer sent an illegal piece request"));
1200
void peer_connection::incoming_piece_fragment()
1202
m_last_piece = time_now();
1206
struct check_postcondition
1208
check_postcondition(boost::shared_ptr<torrent> const& t_
1209
, bool init_check = true): t(t_) { if (init_check) check(); }
1211
~check_postcondition() { check(); }
1217
const int blocks_per_piece = static_cast<int>(
1218
t->torrent_file().piece_length() / t->block_size());
1220
std::vector<piece_picker::downloading_piece> const& dl_queue
1221
= t->picker().get_download_queue();
1223
for (std::vector<piece_picker::downloading_piece>::const_iterator i =
1224
dl_queue.begin(); i != dl_queue.end(); ++i)
1226
TORRENT_ASSERT(i->finished <= blocks_per_piece);
1231
shared_ptr<torrent> t;
1236
// -----------------------------
1237
// ----------- PIECE -----------
1238
// -----------------------------
1240
void peer_connection::incoming_piece(peer_request const& p, char const* data)
1244
boost::shared_ptr<torrent> t = m_torrent.lock();
1247
#ifndef TORRENT_DISABLE_EXTENSIONS
1248
for (extension_list_t::iterator i = m_extensions.begin()
1249
, end(m_extensions.end()); i != end; ++i)
1251
if ((*i)->on_piece(p, data)) return;
1256
check_postcondition post_checker_(t);
1257
t->check_invariant();
1260
#ifdef TORRENT_VERBOSE_LOGGING
1261
(*m_logger) << time_now_string()
1262
<< " <== PIECE [ piece: " << p.piece << " | "
1263
"s: " << p.start << " | "
1264
"l: " << p.length << " | "
1265
"ds: " << statistics().download_rate() << " | "
1266
"qs: " << m_desired_queue_size << " ]\n";
1269
if (!verify_piece(p))
1271
#ifdef TORRENT_VERBOSE_LOGGING
1272
(*m_logger) << time_now_string()
1273
<< " <== INVALID_PIECE [ piece: " << p.piece << " | "
1274
"start: " << p.start << " | "
1275
"length: " << p.length << " ]\n";
1277
throw protocol_error("got invalid piece packet");
1280
// if we're already seeding, don't bother,
1284
t->received_redundant_data(p.length);
1288
piece_picker& picker = t->picker();
1289
piece_manager& fs = t->filesystem();
1291
std::vector<piece_block> finished_blocks;
1292
piece_block block_finished(p.piece, p.start / t->block_size());
1293
TORRENT_ASSERT(p.start % t->block_size() == 0);
1294
TORRENT_ASSERT(p.length == t->block_size()
1295
|| p.length == t->torrent_file().total_size() % t->block_size());
1297
std::deque<piece_block>::iterator b
1299
m_download_queue.begin()
1300
, m_download_queue.end()
1303
if (b != m_download_queue.end())
1307
for (std::deque<piece_block>::iterator i = m_download_queue.begin();
1310
#ifdef TORRENT_VERBOSE_LOGGING
1311
(*m_logger) << time_now_string()
1312
<< " *** SKIPPED_PIECE [ piece: " << i->piece_index << " | "
1313
"b: " << i->block_index << " ] ***\n";
1315
// since this piece was skipped, clear it and allow it to
1316
// be requested from other peers
1317
// TODO: send cancel?
1318
picker.abort_download(*i);
1321
// remove the request that just finished
1322
// from the download queue plus the
1324
m_download_queue.erase(m_download_queue.begin()
1329
m_download_queue.erase(b);
1332
t->cancel_block(block_finished);
1336
if (t->alerts().should_post(alert::debug))
1338
t->alerts().post_alert(
1342
, "got a block that was not in the request queue"));
1344
#ifdef TORRENT_VERBOSE_LOGGING
1345
(*m_logger) << " *** The block we just got was not in the "
1346
"request queue ***\n";
1348
t->received_redundant_data(p.length);
1349
request_a_block(*t, *this);
1350
send_block_requests();
1354
// if the block we got is already finished, then ignore it
1355
if (picker.is_downloaded(block_finished))
1357
t->received_redundant_data(p.length);
1359
request_a_block(*t, *this);
1360
send_block_requests();
1364
fs.async_write(p, data, bind(&peer_connection::on_disk_write_complete
1365
, self(), _1, _2, p, t));
1366
m_outstanding_writing_bytes += p.length;
1367
TORRENT_ASSERT(!m_reading);
1368
picker.mark_as_writing(block_finished, peer_info_struct());
1371
void peer_connection::on_disk_write_complete(int ret, disk_io_job const& j
1372
, peer_request p, boost::shared_ptr<torrent> t)
1374
session_impl::mutex_t::scoped_lock l(m_ses.m_mutex);
1378
m_outstanding_writing_bytes -= p.length;
1379
TORRENT_ASSERT(m_outstanding_writing_bytes >= 0);
1381
#if defined(TORRENT_VERBOSE_LOGGING) || defined(TORRENT_LOGGING)
1382
(*m_ses.m_logger) << time_now_string() << " *** DISK_WRITE_COMPLETE [ p: "
1383
<< p.piece << " o: " << p.start << " ]\n";
1385
// in case the outstanding bytes just dropped down
1386
// to allow to receive more data
1389
piece_block block_finished(p.piece, p.start / t->block_size());
1391
if (ret == -1 || !t)
1393
if (t->has_picker()) t->picker().abort_download(block_finished);
1397
m_ses.connection_failed(m_socket, remote(), j.str.c_str());
1401
if (t->alerts().should_post(alert::fatal))
1403
std::string err = "torrent paused: disk write error, " + j.str;
1404
t->alerts().post_alert(file_error_alert(t->get_handle(), err));
1410
if (t->is_seed()) return;
1412
piece_picker& picker = t->picker();
1414
TORRENT_ASSERT(p.piece == j.piece);
1415
TORRENT_ASSERT(p.start == j.offset);
1416
picker.mark_as_finished(block_finished, peer_info_struct());
1417
if (t->alerts().should_post(alert::debug))
1419
t->alerts().post_alert(block_finished_alert(t->get_handle(),
1420
block_finished.block_index, block_finished.piece_index, "block finished"));
1428
// did we just finish the piece?
1429
if (picker.is_piece_finished(p.piece))
1432
check_postcondition post_checker2_(t, false);
1434
t->async_verify_piece(p.piece, bind(&torrent::piece_finished, t
1440
catch (std::exception const& e)
1442
std::cerr << e.what() << std::endl;
1443
TORRENT_ASSERT(false);
1447
if (!t->is_seed() && !m_torrent.expired())
1449
// this is a free function defined in policy.cpp
1450
request_a_block(*t, *this);
1451
send_block_requests();
1456
// -----------------------------
1457
// ---------- CANCEL -----------
1458
// -----------------------------
1460
void peer_connection::incoming_cancel(peer_request const& r)
1464
#ifndef TORRENT_DISABLE_EXTENSIONS
1465
for (extension_list_t::iterator i = m_extensions.begin()
1466
, end(m_extensions.end()); i != end; ++i)
1468
if ((*i)->on_cancel(r)) return;
1472
#ifdef TORRENT_VERBOSE_LOGGING
1473
(*m_logger) << time_now_string()
1474
<< " <== CANCEL [ piece: " << r.piece << " | s: " << r.start << " | l: " << r.length << " ]\n";
1477
std::deque<peer_request>::iterator i
1478
= std::find(m_requests.begin(), m_requests.end(), r);
1480
if (i != m_requests.end())
1482
m_requests.erase(i);
1486
#ifdef TORRENT_VERBOSE_LOGGING
1487
(*m_logger) << time_now_string() << " *** GOT CANCEL NOT IN THE QUEUE\n";
1492
// -----------------------------
1493
// --------- DHT PORT ----------
1494
// -----------------------------
1496
void peer_connection::incoming_dht_port(int listen_port)
1500
#ifdef TORRENT_VERBOSE_LOGGING
1501
(*m_logger) << time_now_string()
1502
<< " <== DHT_PORT [ p: " << listen_port << " ]\n";
1504
#ifndef TORRENT_DISABLE_DHT
1505
m_ses.add_dht_node(udp::endpoint(
1506
m_remote.address(), listen_port));
1510
// -----------------------------
1511
// --------- HAVE ALL ----------
1512
// -----------------------------
1514
void peer_connection::incoming_have_all()
1518
boost::shared_ptr<torrent> t = m_torrent.lock();
1521
#ifdef TORRENT_VERBOSE_LOGGING
1522
(*m_logger) << time_now_string() << " <== HAVE_ALL\n";
1525
#ifndef TORRENT_DISABLE_EXTENSIONS
1526
for (extension_list_t::iterator i = m_extensions.begin()
1527
, end(m_extensions.end()); i != end; ++i)
1529
if ((*i)->on_have_all()) return;
1535
if (m_peer_info) m_peer_info->seed = true;
1537
// if we don't have metadata yet
1538
// just remember the bitmask
1539
// don't update the piecepicker
1540
// (since it doesn't exist yet)
1541
if (!t->ready_for_connections())
1543
// TODO: this might need something more
1544
// so that once we have the metadata
1545
// we can construct a full bitfield
1549
#ifdef TORRENT_VERBOSE_LOGGING
1550
(*m_logger) << " *** THIS IS A SEED ***\n";
1553
// if we're a seed too, disconnect
1554
if (t->is_finished())
1555
throw protocol_error("seed to seed connection redundant, disconnecting");
1557
TORRENT_ASSERT(!m_have_piece.empty());
1558
std::fill(m_have_piece.begin(), m_have_piece.end(), true);
1559
m_num_pieces = m_have_piece.size();
1562
if (!t->is_finished())
1563
t->get_policy().peer_is_interesting(*this);
1566
// -----------------------------
1567
// --------- HAVE NONE ---------
1568
// -----------------------------
1570
void peer_connection::incoming_have_none()
1574
boost::shared_ptr<torrent> t = m_torrent.lock();
1577
#ifdef TORRENT_VERBOSE_LOGGING
1578
(*m_logger) << time_now_string() << " <== HAVE_NONE\n";
1581
#ifndef TORRENT_DISABLE_EXTENSIONS
1582
for (extension_list_t::iterator i = m_extensions.begin()
1583
, end(m_extensions.end()); i != end; ++i)
1585
if ((*i)->on_have_none()) return;
1589
if (m_peer_info) m_peer_info->seed = false;
1590
TORRENT_ASSERT(!m_have_piece.empty() || !t->ready_for_connections());
1593
// -----------------------------
1594
// ------- ALLOWED FAST --------
1595
// -----------------------------
1597
void peer_connection::incoming_allowed_fast(int index)
1601
boost::shared_ptr<torrent> t = m_torrent.lock();
1604
#ifdef TORRENT_VERBOSE_LOGGING
1605
(*m_logger) << time_now_string() << " <== ALLOWED_FAST [ " << index << " ]\n";
1608
#ifndef TORRENT_DISABLE_EXTENSIONS
1609
for (extension_list_t::iterator i = m_extensions.begin()
1610
, end(m_extensions.end()); i != end; ++i)
1612
if ((*i)->on_allowed_fast(index)) return;
1616
// if we already have the piece, we can
1617
// ignore this message
1618
if (t->valid_metadata()
1619
&& t->have_piece(index))
1622
if (index < 0 || index >= int(m_have_piece.size()))
1624
#ifdef TORRENT_VERBOSE_LOGGING
1625
(*m_logger) << time_now_string() << " <== INVALID_ALLOWED_FAST [ " << index << " | s: "
1626
<< int(m_have_piece.size()) << " ]\n";
1631
m_allowed_fast.push_back(index);
1633
// if the peer has the piece and we want
1634
// to download it, request it
1635
if (int(m_have_piece.size()) > index
1636
&& m_have_piece[index]
1638
&& t->picker().piece_priority(index) > 0)
1640
t->get_policy().peer_is_interesting(*this);
1644
std::vector<int> const& peer_connection::allowed_fast()
1648
boost::shared_ptr<torrent> t = m_torrent.lock();
1651
m_allowed_fast.erase(std::remove_if(m_allowed_fast.begin()
1652
, m_allowed_fast.end(), bind(&torrent::have_piece, t, _1))
1653
, m_allowed_fast.end());
1655
// TODO: sort the allowed fast set in priority order
1656
return m_allowed_fast;
1659
void peer_connection::add_request(piece_block const& block)
1663
boost::shared_ptr<torrent> t = m_torrent.lock();
1666
TORRENT_ASSERT(t->valid_metadata());
1667
TORRENT_ASSERT(block.piece_index >= 0);
1668
TORRENT_ASSERT(block.piece_index < t->torrent_file().num_pieces());
1669
TORRENT_ASSERT(block.block_index >= 0);
1670
TORRENT_ASSERT(block.block_index < t->torrent_file().piece_size(block.piece_index));
1671
TORRENT_ASSERT(!t->picker().is_requested(block) || (t->picker().num_peers(block) > 0));
1672
TORRENT_ASSERT(!t->have_piece(block.piece_index));
1674
piece_picker::piece_state_t state;
1675
peer_speed_t speed = peer_speed();
1676
char const* speedmsg = 0;
1680
state = piece_picker::fast;
1682
else if (speed == medium)
1684
speedmsg = "medium";
1685
state = piece_picker::medium;
1690
state = piece_picker::slow;
1693
if (!t->picker().mark_as_downloading(block, peer_info_struct(), state))
1696
if (t->alerts().should_post(alert::debug))
1698
t->alerts().post_alert(block_downloading_alert(t->get_handle(),
1699
speedmsg, block.block_index, block.piece_index, "block downloading"));
1702
m_request_queue.push_back(block);
1705
void peer_connection::cancel_request(piece_block const& block)
1709
boost::shared_ptr<torrent> t = m_torrent.lock();
1712
TORRENT_ASSERT(t->valid_metadata());
1714
TORRENT_ASSERT(block.piece_index >= 0);
1715
TORRENT_ASSERT(block.piece_index < t->torrent_file().num_pieces());
1716
TORRENT_ASSERT(block.block_index >= 0);
1717
TORRENT_ASSERT(block.block_index < t->torrent_file().piece_size(block.piece_index));
1719
// if all the peers that requested this block has been
1720
// cancelled, then just ignore the cancel.
1721
if (!t->picker().is_requested(block)) return;
1723
std::deque<piece_block>::iterator it
1724
= std::find(m_download_queue.begin(), m_download_queue.end(), block);
1725
if (it == m_download_queue.end())
1727
it = std::find(m_request_queue.begin(), m_request_queue.end(), block);
1728
// when a multi block is received, it is cancelled
1729
// from all peers, so if this one hasn't requested
1730
// the block, just ignore to cancel it.
1731
if (it == m_request_queue.end()) return;
1733
t->picker().abort_download(block);
1734
m_request_queue.erase(it);
1735
// since we found it in the request queue, it means it hasn't been
1736
// sent yet, so we don't have to send a cancel.
1741
m_download_queue.erase(it);
1742
t->picker().abort_download(block);
1745
int block_offset = block.block_index * t->block_size();
1747
= (std::min)((int)t->torrent_file().piece_size(block.piece_index)-block_offset,
1749
TORRENT_ASSERT(block_size > 0);
1750
TORRENT_ASSERT(block_size <= t->block_size());
1753
r.piece = block.piece_index;
1754
r.start = block_offset;
1755
r.length = block_size;
1759
#ifdef TORRENT_VERBOSE_LOGGING
1760
(*m_logger) << time_now_string()
1761
<< " ==> CANCEL [ piece: " << block.piece_index << " | s: "
1762
<< block_offset << " | l: " << block_size << " | " << block.block_index << " ]\n";
1766
void peer_connection::send_choke()
1770
TORRENT_ASSERT(!m_peer_info || !m_peer_info->optimistically_unchoked);
1772
if (m_choked) return;
1776
#ifdef TORRENT_VERBOSE_LOGGING
1777
(*m_logger) << time_now_string() << " ==> CHOKE\n";
1780
m_last_choke = time_now();
1782
m_num_invalid_requests = 0;
1784
// reject the requests we have in the queue
1785
std::for_each(m_requests.begin(), m_requests.end()
1786
, bind(&peer_connection::write_reject_request, this, _1));
1790
void peer_connection::send_unchoke()
1794
if (!m_choked) return;
1795
m_last_unchoke = time_now();
1799
#ifdef TORRENT_VERBOSE_LOGGING
1800
(*m_logger) << time_now_string() << " ==> UNCHOKE\n";
1804
void peer_connection::send_interested()
1808
if (m_interesting) return;
1810
m_interesting = true;
1812
#ifdef TORRENT_VERBOSE_LOGGING
1813
(*m_logger) << time_now_string() << " ==> INTERESTED\n";
1817
void peer_connection::send_not_interested()
1821
if (!m_interesting) return;
1822
write_not_interested();
1823
m_interesting = false;
1825
m_became_uninteresting = time_now();
1827
#ifdef TORRENT_VERBOSE_LOGGING
1828
(*m_logger) << time_now_string() << " ==> NOT_INTERESTED\n";
1832
void peer_connection::send_block_requests()
1836
boost::shared_ptr<torrent> t = m_torrent.lock();
1839
if ((int)m_download_queue.size() >= m_desired_queue_size) return;
1841
while (!m_request_queue.empty()
1842
&& (int)m_download_queue.size() < m_desired_queue_size)
1844
piece_block block = m_request_queue.front();
1846
int block_offset = block.block_index * t->block_size();
1847
int block_size = (std::min)((int)t->torrent_file().piece_size(
1848
block.piece_index) - block_offset, t->block_size());
1849
TORRENT_ASSERT(block_size > 0);
1850
TORRENT_ASSERT(block_size <= t->block_size());
1853
r.piece = block.piece_index;
1854
r.start = block_offset;
1855
r.length = block_size;
1857
m_request_queue.pop_front();
1858
m_download_queue.push_back(block);
1860
#ifdef TORRENT_VERBOSE_LOGGING
1861
(*m_logger) << time_now_string()
1862
<< " *** REQUEST-QUEUE** [ "
1863
"piece: " << block.piece_index << " | "
1864
"block: " << block.block_index << " ]\n";
1867
// if we are requesting large blocks, merge the smaller
1868
// blocks that are in the same piece into larger requests
1869
if (m_request_large_blocks)
1871
int blocks_per_piece = t->torrent_file().piece_length() / t->block_size();
1873
while (!m_request_queue.empty())
1875
// check to see if this block is connected to the previous one
1876
// if it is, merge them, otherwise, break this merge loop
1877
piece_block const& front = m_request_queue.front();
1878
if (front.piece_index * blocks_per_piece + front.block_index
1879
!= block.piece_index * blocks_per_piece + block.block_index + 1)
1881
block = m_request_queue.front();
1882
m_request_queue.pop_front();
1883
m_download_queue.push_back(block);
1885
#ifdef TORRENT_VERBOSE_LOGGING
1886
(*m_logger) << time_now_string()
1887
<< " *** MERGING REQUEST ** [ "
1888
"piece: " << block.piece_index << " | "
1889
"block: " << block.block_index << " ]\n";
1892
block_offset = block.block_index * t->block_size();
1893
block_size = (std::min)((int)t->torrent_file().piece_size(
1894
block.piece_index) - block_offset, t->block_size());
1895
TORRENT_ASSERT(block_size > 0);
1896
TORRENT_ASSERT(block_size <= t->block_size());
1898
r.length += block_size;
1902
TORRENT_ASSERT(verify_piece(r));
1904
#ifndef TORRENT_DISABLE_EXTENSIONS
1905
bool handled = false;
1906
for (extension_list_t::iterator i = m_extensions.begin()
1907
, end(m_extensions.end()); i != end; ++i)
1909
if (handled = (*i)->write_request(r)) break;
1914
m_last_request = time_now();
1918
m_last_request = time_now();
1921
#ifdef TORRENT_VERBOSE_LOGGING
1922
(*m_logger) << time_now_string()
1923
<< " ==> REQUEST [ "
1924
"piece: " << r.piece << " | "
1925
"s: " << r.start << " | "
1926
"l: " << r.length << " | "
1927
"ds: " << statistics().download_rate() << " B/s | "
1928
"qs: " << m_desired_queue_size << " "
1929
"blk: " << (m_request_large_blocks?"large":"single") << " ]\n";
1932
m_last_piece = time_now();
1936
void close_socket_ignore_error(boost::shared_ptr<socket_type> s)
1938
try { s->close(); } catch (std::exception& e) {}
1941
void peer_connection::timed_out()
1943
#if defined(TORRENT_VERBOSE_LOGGING) || defined(TORRENT_LOGGING)
1944
(*m_ses.m_logger) << "CONNECTION TIMED OUT: " << m_remote.address().to_string()
1947
m_ses.connection_failed(m_socket, m_remote, "timed out");
1950
void peer_connection::disconnect()
1952
session_impl::mutex_t::scoped_lock l(m_ses.m_mutex);
1954
boost::intrusive_ptr<peer_connection> me(this);
1958
if (m_disconnecting) return;
1959
m_disconnecting = true;
1961
m_ses.m_half_open.done(m_connection_ticket);
1963
m_ses.m_io_service.post(boost::bind(&close_socket_ignore_error, m_socket));
1965
boost::shared_ptr<torrent> t = m_torrent.lock();
1969
if (t->has_picker())
1971
piece_picker& picker = t->picker();
1973
while (!m_download_queue.empty())
1975
picker.abort_download(m_download_queue.back());
1976
m_download_queue.pop_back();
1978
while (!m_request_queue.empty())
1980
picker.abort_download(m_request_queue.back());
1981
m_request_queue.pop_back();
1985
t->remove_peer(this);
1989
m_ses.close_connection(me);
1992
void peer_connection::set_upload_limit(int limit)
1994
TORRENT_ASSERT(limit >= -1);
1995
if (limit == -1) limit = (std::numeric_limits<int>::max)();
1996
if (limit < 10) limit = 10;
1997
m_upload_limit = limit;
1998
m_bandwidth_limit[upload_channel].throttle(m_upload_limit);
2001
void peer_connection::set_download_limit(int limit)
2003
TORRENT_ASSERT(limit >= -1);
2004
if (limit == -1) limit = (std::numeric_limits<int>::max)();
2005
if (limit < 10) limit = 10;
2006
m_download_limit = limit;
2007
m_bandwidth_limit[download_channel].throttle(m_download_limit);
2010
size_type peer_connection::share_diff() const
2014
boost::shared_ptr<torrent> t = m_torrent.lock();
2017
float ratio = t->ratio();
2019
// if we have an infinite ratio, just say we have downloaded
2020
// much more than we have uploaded. And we'll keep uploading.
2022
return (std::numeric_limits<size_type>::max)();
2024
return m_free_upload
2025
+ static_cast<size_type>(m_statistics.total_payload_download() * ratio)
2026
- m_statistics.total_payload_upload();
2029
// defined in upnp.cpp
2030
bool is_local(address const& a);
2032
bool peer_connection::on_local_network() const
2034
if (libtorrent::is_local(m_remote.address())) return true;
2038
void peer_connection::get_peer_info(peer_info& p) const
2040
TORRENT_ASSERT(!associated_torrent().expired());
2042
p.down_speed = statistics().download_rate();
2043
p.up_speed = statistics().upload_rate();
2044
p.payload_down_speed = statistics().download_payload_rate();
2045
p.payload_up_speed = statistics().upload_payload_rate();
2048
p.pending_disk_bytes = m_outstanding_writing_bytes;
2050
#ifndef TORRENT_DISABLE_RESOLVE_COUNTRIES
2051
p.country[0] = m_country[0];
2052
p.country[1] = m_country[1];
2055
p.total_download = statistics().total_payload_download();
2056
p.total_upload = statistics().total_payload_upload();
2058
if (m_bandwidth_limit[upload_channel].throttle() == bandwidth_limit::inf)
2059
p.upload_limit = -1;
2061
p.upload_limit = m_bandwidth_limit[upload_channel].throttle();
2063
if (m_bandwidth_limit[download_channel].throttle() == bandwidth_limit::inf)
2064
p.download_limit = -1;
2066
p.download_limit = m_bandwidth_limit[download_channel].throttle();
2068
p.load_balancing = total_free_upload();
2070
p.download_queue_length = int(download_queue().size() + m_request_queue.size());
2071
p.target_dl_queue_length = int(desired_queue_size());
2072
p.upload_queue_length = int(upload_queue().size());
2074
if (boost::optional<piece_block_progress> ret = downloading_piece_progress())
2076
p.downloading_piece_index = ret->piece_index;
2077
p.downloading_block_index = ret->block_index;
2078
p.downloading_progress = ret->bytes_downloaded;
2079
p.downloading_total = ret->full_block_bytes;
2083
p.downloading_piece_index = -1;
2084
p.downloading_block_index = -1;
2085
p.downloading_progress = 0;
2086
p.downloading_total = 0;
2089
p.pieces = get_bitfield();
2090
ptime now = time_now();
2091
p.last_request = now - m_last_request;
2092
p.last_active = now - (std::max)(m_last_sent, m_last_receive);
2094
// this will set the flags so that we can update them later
2096
get_specific_peer_info(p);
2098
p.flags |= is_seed() ? peer_info::seed : 0;
2099
if (peer_info_struct())
2101
p.source = peer_info_struct()->source;
2102
p.failcount = peer_info_struct()->failcount;
2103
p.num_hashfails = peer_info_struct()->hashfails;
2104
p.flags |= peer_info_struct()->on_parole ? peer_info::on_parole : 0;
2105
p.flags |= peer_info_struct()->optimistically_unchoked ? peer_info::optimistic_unchoke : 0;
2106
p.remote_dl_rate = m_remote_dl_rate;
2112
p.num_hashfails = 0;
2113
p.remote_dl_rate = 0;
2116
p.send_buffer_size = m_send_buffer.capacity();
2119
void peer_connection::cut_receive_buffer(int size, int packet_size)
2123
TORRENT_ASSERT(packet_size > 0);
2124
TORRENT_ASSERT(int(m_recv_buffer.size()) >= size);
2125
TORRENT_ASSERT(int(m_recv_buffer.size()) >= m_recv_pos);
2126
TORRENT_ASSERT(m_recv_pos >= size);
2129
std::memmove(&m_recv_buffer[0], &m_recv_buffer[0] + size, m_recv_pos - size);
2134
std::fill(m_recv_buffer.begin() + m_recv_pos, m_recv_buffer.end(), 0);
2137
m_packet_size = packet_size;
2138
if (m_packet_size >= m_recv_pos) m_recv_buffer.resize(m_packet_size);
2141
void peer_connection::second_tick(float tick_interval) throw()
2148
ptime now(time_now());
2150
boost::shared_ptr<torrent> t = m_torrent.lock();
2155
#ifndef TORRENT_DISABLE_EXTENSIONS
2156
for (extension_list_t::iterator i = m_extensions.begin()
2157
, end(m_extensions.end()); i != end; ++i)
2163
m_ignore_bandwidth_limits = m_ses.settings().ignore_limits_on_local_network
2164
&& on_local_network();
2166
m_statistics.second_tick(tick_interval);
2168
if (!t->valid_metadata()) return;
2170
// calculate the desired download queue size
2171
const float queue_time = m_ses.settings().request_queue_time;
2172
// (if the latency is more than this, the download will stall)
2173
// so, the queue size is queue_time * down_rate / 16 kiB
2174
// (16 kB is the size of each request)
2175
// the minimum number of requests is 2 and the maximum is 48
2176
// the block size doesn't have to be 16. So we first query the
2178
const int block_size = m_request_large_blocks
2179
? t->torrent_file().piece_length() : t->block_size();
2180
TORRENT_ASSERT(block_size > 0);
2182
m_desired_queue_size = static_cast<int>(queue_time
2183
* statistics().download_rate() / block_size);
2184
if (m_desired_queue_size > m_max_out_request_queue)
2185
m_desired_queue_size = m_max_out_request_queue;
2186
if (m_desired_queue_size < min_request_queue)
2187
m_desired_queue_size = min_request_queue;
2189
if (!m_download_queue.empty()
2190
&& now - m_last_piece > seconds(m_ses.settings().piece_timeout))
2192
// this peer isn't sending the pieces we've
2193
// requested (this has been observed by BitComet)
2194
// in this case we'll clear our download queue and
2195
// re-request the blocks.
2196
#ifdef TORRENT_VERBOSE_LOGGING
2197
(*m_logger) << time_now_string()
2198
<< " *** PIECE_REQUESTS TIMED OUT [ " << (int)m_download_queue.size()
2199
<< " " << total_seconds(now - m_last_piece) << "] ***\n";
2204
m_download_queue.clear();
2205
m_request_queue.clear();
2209
piece_picker& picker = t->picker();
2210
while (!m_download_queue.empty())
2212
piece_block const& r = m_download_queue.back();
2213
picker.abort_download(r);
2214
write_cancel(t->to_req(r));
2215
m_download_queue.pop_back();
2217
while (!m_request_queue.empty())
2219
piece_block const& r = m_request_queue.back();
2220
picker.abort_download(r);
2221
write_cancel(t->to_req(r));
2222
m_request_queue.pop_back();
2225
m_assume_fifo = true;
2227
request_a_block(*t, *this);
2228
send_block_requests();
2232
// If the client sends more data
2233
// we send it data faster, otherwise, slower.
2234
// It will also depend on how much data the
2235
// client has sent us. This is the mean to
2236
// maintain the share ratio given by m_ratio
2239
if (t->is_finished() || is_choked() || t->ratio() == 0.0f)
2241
// if we have downloaded more than one piece more
2242
// than we have uploaded OR if we are a seed
2243
// have an unlimited upload rate
2244
m_bandwidth_limit[upload_channel].throttle(m_upload_limit);
2248
size_type bias = 0x10000 + 2 * t->block_size() + m_free_upload;
2250
double break_even_time = 15; // seconds.
2251
size_type have_uploaded = m_statistics.total_payload_upload();
2252
size_type have_downloaded = m_statistics.total_payload_download();
2253
double download_speed = m_statistics.download_rate();
2255
size_type soon_downloaded =
2256
have_downloaded + (size_type)(download_speed * break_even_time*1.5);
2258
if (t->ratio() != 1.f)
2259
soon_downloaded = (size_type)(soon_downloaded*(double)t->ratio());
2261
double upload_speed_limit = (std::min)((soon_downloaded - have_uploaded
2262
+ bias) / break_even_time, double(m_upload_limit));
2264
upload_speed_limit = (std::min)(upload_speed_limit,
2265
(double)(std::numeric_limits<int>::max)());
2267
m_bandwidth_limit[upload_channel].throttle(
2268
(std::min)((std::max)((int)upload_speed_limit, 20)
2272
// update once every minute
2273
if (now - m_remote_dl_update >= seconds(60))
2275
float factor = 0.6666666666667f;
2277
if (m_remote_dl_rate == 0) factor = 0.0f;
2279
m_remote_dl_rate = int((m_remote_dl_rate * factor) +
2280
((m_remote_bytes_dled * (1.0f-factor)) / 60.f));
2282
m_remote_bytes_dled = 0;
2283
m_remote_dl_update = now;
2288
catch (std::exception& e)
2290
#ifdef TORRENT_VERBOSE_LOGGING
2291
(*m_logger) << "**ERROR**: " << e.what() << "\n";
2293
m_ses.connection_failed(m_socket, remote(), e.what());
2297
void peer_connection::fill_send_buffer()
2301
boost::shared_ptr<torrent> t = m_torrent.lock();
2304
// only add new piece-chunks if the send buffer is small enough
2305
// otherwise there will be no end to how large it will be!
2307
int buffer_size_watermark = int(m_statistics.upload_rate()) / 2;
2308
if (buffer_size_watermark < 1024) buffer_size_watermark = 1024;
2309
else if (buffer_size_watermark > 80 * 1024) buffer_size_watermark = 80 * 1024;
2311
while (!m_requests.empty()
2312
&& (send_buffer_size() + m_reading_bytes < buffer_size_watermark)
2315
TORRENT_ASSERT(t->valid_metadata());
2316
peer_request& r = m_requests.front();
2318
TORRENT_ASSERT(r.piece >= 0);
2319
TORRENT_ASSERT(r.piece < (int)m_have_piece.size());
2320
TORRENT_ASSERT(t->have_piece(r.piece));
2321
TORRENT_ASSERT(r.start + r.length <= t->torrent_file().piece_size(r.piece));
2322
TORRENT_ASSERT(r.length > 0 && r.start >= 0);
2324
t->filesystem().async_read(r, bind(&peer_connection::on_disk_read_complete
2325
, self(), _1, _2, r));
2326
m_reading_bytes += r.length;
2328
m_requests.erase(m_requests.begin());
2332
void peer_connection::on_disk_read_complete(int ret, disk_io_job const& j, peer_request r)
2334
session_impl::mutex_t::scoped_lock l(m_ses.m_mutex);
2336
m_reading_bytes -= r.length;
2338
if (ret != r.length || m_torrent.expired())
2340
boost::shared_ptr<torrent> t = m_torrent.lock();
2343
m_ses.connection_failed(m_socket, remote(), j.str.c_str());
2347
if (t->alerts().should_post(alert::fatal))
2349
std::string err = "torrent paused: disk read error";
2355
t->alerts().post_alert(file_error_alert(t->get_handle(), err));
2361
#ifdef TORRENT_VERBOSE_LOGGING
2362
(*m_logger) << time_now_string()
2363
<< " ==> PIECE [ piece: " << r.piece << " | s: " << r.start
2364
<< " | l: " << r.length << " ]\n";
2367
write_piece(r, j.buffer);
2371
void peer_connection::assign_bandwidth(int channel, int amount)
2373
session_impl::mutex_t::scoped_lock l(m_ses.m_mutex);
2375
#ifdef TORRENT_VERBOSE_LOGGING
2376
(*m_logger) << "bandwidth [ " << channel << " ] + " << amount << "\n";
2379
m_bandwidth_limit[channel].assign(amount);
2380
if (channel == upload_channel)
2382
TORRENT_ASSERT(m_writing);
2386
else if (channel == download_channel)
2388
TORRENT_ASSERT(m_reading);
2394
void peer_connection::expire_bandwidth(int channel, int amount)
2396
session_impl::mutex_t::scoped_lock l(m_ses.m_mutex);
2398
m_bandwidth_limit[channel].expire(amount);
2399
if (channel == upload_channel)
2403
else if (channel == download_channel)
2409
void peer_connection::setup_send()
2411
session_impl::mutex_t::scoped_lock l(m_ses.m_mutex);
2415
if (m_writing) return;
2417
shared_ptr<torrent> t = m_torrent.lock();
2419
if (m_bandwidth_limit[upload_channel].quota_left() == 0
2420
&& !m_send_buffer.empty()
2423
&& !m_ignore_bandwidth_limits)
2425
// in this case, we have data to send, but no
2426
// bandwidth. So, we simply request bandwidth
2429
if (m_bandwidth_limit[upload_channel].max_assignable() > 0)
2431
#ifdef TORRENT_VERBOSE_LOGGING
2432
(*m_logger) << "req bandwidth [ " << upload_channel << " ]\n";
2435
TORRENT_ASSERT(!m_writing);
2436
// peers that we are not interested in are non-prioritized
2438
t->request_bandwidth(upload_channel, self()
2439
, !(is_interesting() && !has_peer_choked()));
2444
if (!can_write()) return;
2446
TORRENT_ASSERT(!m_writing);
2448
// send the actual buffer
2449
if (!m_send_buffer.empty())
2451
int amount_to_send = m_send_buffer.size();
2452
int quota_left = m_bandwidth_limit[upload_channel].quota_left();
2453
if (!m_ignore_bandwidth_limits && amount_to_send > quota_left)
2454
amount_to_send = quota_left;
2456
TORRENT_ASSERT(amount_to_send > 0);
2458
#ifdef TORRENT_VERBOSE_LOGGING
2459
(*m_logger) << "async_write " << amount_to_send << " bytes\n";
2461
std::list<asio::const_buffer> const& vec = m_send_buffer.build_iovec(amount_to_send);
2462
m_socket->async_write_some(vec, bind(&peer_connection::on_send_data, self(), _1, _2));
2468
void peer_connection::setup_receive()
2470
session_impl::mutex_t::scoped_lock l(m_ses.m_mutex);
2474
#ifdef TORRENT_VERBOSE_LOGGING
2475
(*m_logger) << "setup_receive: reading = " << m_reading << "\n";
2477
if (m_reading) return;
2479
shared_ptr<torrent> t = m_torrent.lock();
2481
if (m_bandwidth_limit[download_channel].quota_left() == 0
2484
&& !m_ignore_bandwidth_limits)
2486
if (m_bandwidth_limit[download_channel].max_assignable() > 0)
2488
#ifdef TORRENT_VERBOSE_LOGGING
2489
(*m_logger) << "req bandwidth [ " << download_channel << " ]\n";
2492
t->request_bandwidth(download_channel, self(), m_non_prioritized);
2497
if (!can_read()) return;
2499
TORRENT_ASSERT(m_packet_size > 0);
2500
int max_receive = m_packet_size - m_recv_pos;
2501
int quota_left = m_bandwidth_limit[download_channel].quota_left();
2502
if (!m_ignore_bandwidth_limits && max_receive > quota_left)
2503
max_receive = quota_left;
2505
if (max_receive == 0) return;
2507
TORRENT_ASSERT(m_recv_pos >= 0);
2508
TORRENT_ASSERT(m_packet_size > 0);
2510
TORRENT_ASSERT(can_read());
2511
#ifdef TORRENT_VERBOSE_LOGGING
2512
(*m_logger) << "async_read " << max_receive << " bytes\n";
2514
m_socket->async_read_some(asio::buffer(&m_recv_buffer[m_recv_pos]
2515
, max_receive), bind(&peer_connection::on_receive_data, self(), _1, _2));
2519
void peer_connection::reset_recv_buffer(int packet_size)
2521
TORRENT_ASSERT(packet_size > 0);
2522
if (m_recv_pos > m_packet_size)
2524
cut_receive_buffer(m_packet_size, packet_size);
2528
m_packet_size = packet_size;
2529
if (int(m_recv_buffer.size()) < m_packet_size)
2530
m_recv_buffer.resize(m_packet_size);
2533
void peer_connection::send_buffer(char const* buf, int size)
2535
int free_space = m_send_buffer.space_in_last_buffer();
2536
if (free_space > size) free_space = size;
2539
m_send_buffer.append(buf, free_space);
2542
#ifdef TORRENT_STATS
2543
m_ses.m_buffer_usage_logger << log_time() << " send_buffer: "
2544
<< free_space << std::endl;
2545
m_ses.log_buffer_usage();
2548
if (size <= 0) return;
2550
std::pair<char*, int> buffer = m_ses.allocate_buffer(size);
2551
TORRENT_ASSERT(buffer.second >= size);
2552
std::memcpy(buffer.first, buf, size);
2553
m_send_buffer.append_buffer(buffer.first, buffer.second, size
2554
, bind(&session_impl::free_buffer, boost::ref(m_ses), _1, buffer.second));
2555
#ifdef TORRENT_STATS
2556
m_ses.m_buffer_usage_logger << log_time() << " send_buffer_alloc: " << size << std::endl;
2557
m_ses.log_buffer_usage();
2562
// TODO: change this interface to automatically call setup_send() when the
2563
// return value is destructed
2564
buffer::interval peer_connection::allocate_send_buffer(int size)
2566
char* insert = m_send_buffer.allocate_appendix(size);
2569
std::pair<char*, int> buffer = m_ses.allocate_buffer(size);
2570
TORRENT_ASSERT(buffer.second >= size);
2571
m_send_buffer.append_buffer(buffer.first, buffer.second, size
2572
, bind(&session_impl::free_buffer, boost::ref(m_ses), _1, buffer.second));
2573
buffer::interval ret(buffer.first, buffer.first + size);
2574
#ifdef TORRENT_STATS
2575
m_ses.m_buffer_usage_logger << log_time() << " allocate_buffer_alloc: " << size << std::endl;
2576
m_ses.log_buffer_usage();
2582
#ifdef TORRENT_STATS
2583
m_ses.m_buffer_usage_logger << log_time() << " allocate_buffer: " << size << std::endl;
2584
m_ses.log_buffer_usage();
2586
buffer::interval ret(insert, insert + size);
2594
set_to_zero(T& v, bool cond): m_val(v), m_cond(cond) {}
2595
void fire() { if (!m_cond) return; m_cond = false; m_val = 0; }
2596
~set_to_zero() { if (m_cond) m_val = 0; }
2602
// --------------------------
2604
// --------------------------
2606
// throws exception when the client should be disconnected
2607
void peer_connection::on_receive_data(const asio::error_code& error
2608
, std::size_t bytes_transferred) try
2610
session_impl::mutex_t::scoped_lock l(m_ses.m_mutex);
2614
TORRENT_ASSERT(m_reading);
2619
#ifdef TORRENT_VERBOSE_LOGGING
2620
(*m_logger) << "**ERROR**: " << error.message() << "[in peer_connection::on_receive_data]\n";
2622
on_receive(error, bytes_transferred);
2623
throw std::runtime_error(error.message());
2628
#ifdef TORRENT_VERBOSE_LOGGING
2629
(*m_logger) << "read " << bytes_transferred << " bytes\n";
2631
// correct the dl quota usage, if not all of the buffer was actually read
2632
if (!m_ignore_bandwidth_limits)
2633
m_bandwidth_limit[download_channel].use_quota(bytes_transferred);
2635
if (m_disconnecting) return;
2637
TORRENT_ASSERT(m_packet_size > 0);
2638
TORRENT_ASSERT(bytes_transferred > 0);
2640
m_last_receive = time_now();
2641
m_recv_pos += bytes_transferred;
2642
TORRENT_ASSERT(m_recv_pos <= int(m_recv_buffer.size()));
2644
on_receive(error, bytes_transferred);
2646
TORRENT_ASSERT(m_packet_size > 0);
2650
&& (m_recv_buffer.capacity() - m_packet_size) > 128)
2652
buffer(m_packet_size).swap(m_recv_buffer);
2655
int max_receive = m_packet_size - m_recv_pos;
2656
int quota_left = m_bandwidth_limit[download_channel].quota_left();
2657
if (!m_ignore_bandwidth_limits && max_receive > quota_left)
2658
max_receive = quota_left;
2660
if (max_receive == 0) break;
2662
asio::error_code ec;
2663
bytes_transferred = m_socket->read_some(asio::buffer(&m_recv_buffer[m_recv_pos]
2664
, max_receive), ec);
2665
if (ec && ec != asio::error::would_block)
2666
throw asio::system_error(ec);
2668
while (bytes_transferred > 0);
2672
catch (file_error& e)
2674
session_impl::mutex_t::scoped_lock l(m_ses.m_mutex);
2676
boost::shared_ptr<torrent> t = m_torrent.lock();
2679
m_ses.connection_failed(m_socket, remote(), e.what());
2683
if (t->alerts().should_post(alert::fatal))
2685
t->alerts().post_alert(
2686
file_error_alert(t->get_handle()
2687
, std::string("torrent paused: ") + e.what()));
2691
catch (std::exception& e)
2693
session_impl::mutex_t::scoped_lock l(m_ses.m_mutex);
2694
m_ses.connection_failed(m_socket, remote(), e.what());
2698
// all exceptions should derive from std::exception
2699
TORRENT_ASSERT(false);
2700
session_impl::mutex_t::scoped_lock l(m_ses.m_mutex);
2701
m_ses.connection_failed(m_socket, remote(), "connection failed for unknown reason");
2704
bool peer_connection::can_write() const
2708
// if we have requests or pending data to be sent or announcements to be made
2709
// we want to send data
2710
return !m_send_buffer.empty()
2711
&& (m_bandwidth_limit[upload_channel].quota_left() > 0
2712
|| m_ignore_bandwidth_limits)
2716
bool peer_connection::can_read() const
2720
bool ret = (m_bandwidth_limit[download_channel].quota_left() > 0
2721
|| m_ignore_bandwidth_limits)
2723
&& m_outstanding_writing_bytes <
2724
m_ses.settings().max_outstanding_disk_bytes_per_connection;
2726
#if defined(TORRENT_VERBOSE_LOGGING)
2727
(*m_logger) << "*** can_read() " << ret << " reading: " << m_reading << "\n";
2733
void peer_connection::connect(int ticket)
2737
#if defined(TORRENT_VERBOSE_LOGGING) || defined(TORRENT_LOGGING)
2738
(*m_ses.m_logger) << "CONNECTING: " << m_remote.address().to_string()
2739
<< ":" << m_remote.port() << "\n";
2742
m_connection_ticket = ticket;
2743
boost::shared_ptr<torrent> t = m_torrent.lock();
2747
TORRENT_ASSERT(m_connecting);
2748
m_socket->open(t->get_interface().protocol());
2750
// set the socket to non-blocking, so that we can
2751
// read the entire buffer on each read event we get
2752
tcp::socket::non_blocking_io ioc(true);
2753
m_socket->io_control(ioc);
2754
m_socket->bind(t->get_interface());
2755
m_socket->async_connect(m_remote
2756
, bind(&peer_connection::on_connection_complete, self(), _1));
2758
if (t->alerts().should_post(alert::debug))
2760
t->alerts().post_alert(peer_error_alert(
2761
m_remote, m_peer_id, "connecting to peer"));
2765
void peer_connection::on_connection_complete(asio::error_code const& e) try
2767
session_impl::mutex_t::scoped_lock l(m_ses.m_mutex);
2771
if (m_disconnecting) return;
2773
m_connecting = false;
2774
m_ses.m_half_open.done(m_connection_ticket);
2778
#if defined(TORRENT_VERBOSE_LOGGING) || defined(TORRENT_LOGGING)
2779
(*m_ses.m_logger) << "CONNECTION FAILED: " << m_remote.address().to_string()
2780
<< ": " << e.message() << "\n";
2782
m_ses.connection_failed(m_socket, m_remote, e.message().c_str());
2786
if (m_disconnecting) return;
2787
m_last_receive = time_now();
2789
// this means the connection just succeeded
2791
#if defined(TORRENT_VERBOSE_LOGGING) || defined(TORRENT_LOGGING)
2792
(*m_ses.m_logger) << "COMPLETED: " << m_remote.address().to_string() << "\n";
2799
catch (std::exception& ex)
2801
session_impl::mutex_t::scoped_lock l(m_ses.m_mutex);
2802
m_ses.connection_failed(m_socket, remote(), ex.what());
2806
// all exceptions should derive from std::exception
2807
TORRENT_ASSERT(false);
2808
session_impl::mutex_t::scoped_lock l(m_ses.m_mutex);
2809
m_ses.connection_failed(m_socket, remote(), "connection failed for unkown reason");
2812
// --------------------------
2814
// --------------------------
2816
// throws exception when the client should be disconnected
2817
void peer_connection::on_send_data(asio::error_code const& error
2818
, std::size_t bytes_transferred) try
2820
session_impl::mutex_t::scoped_lock l(m_ses.m_mutex);
2824
TORRENT_ASSERT(m_writing);
2826
m_send_buffer.pop_front(bytes_transferred);
2830
if (!m_ignore_bandwidth_limits)
2831
m_bandwidth_limit[upload_channel].use_quota(bytes_transferred);
2833
#ifdef TORRENT_VERBOSE_LOGGING
2834
(*m_logger) << "wrote " << bytes_transferred << " bytes\n";
2839
#ifdef TORRENT_VERBOSE_LOGGING
2840
(*m_logger) << "**ERROR**: " << error.message() << " [in peer_connection::on_send_data]\n";
2842
throw std::runtime_error(error.message());
2844
if (m_disconnecting) return;
2846
TORRENT_ASSERT(!m_connecting);
2847
TORRENT_ASSERT(bytes_transferred > 0);
2849
m_last_sent = time_now();
2851
on_sent(error, bytes_transferred);
2856
catch (std::exception& e)
2858
session_impl::mutex_t::scoped_lock l(m_ses.m_mutex);
2859
m_ses.connection_failed(m_socket, remote(), e.what());
2863
// all exceptions should derive from std::exception
2864
TORRENT_ASSERT(false);
2865
session_impl::mutex_t::scoped_lock l(m_ses.m_mutex);
2866
m_ses.connection_failed(m_socket, remote(), "connection failed for unknown reason");
2871
void peer_connection::check_invariant() const
2875
TORRENT_ASSERT(m_peer_info->connection == this
2876
|| m_peer_info->connection == 0);
2878
if (m_peer_info->optimistically_unchoked)
2879
TORRENT_ASSERT(!is_choked());
2882
boost::shared_ptr<torrent> t = m_torrent.lock();
2885
typedef session_impl::torrent_map torrent_map;
2886
torrent_map& m = m_ses.m_torrents;
2887
for (torrent_map::iterator i = m.begin(), end(m.end()); i != end; ++i)
2889
torrent& t = *i->second;
2890
TORRENT_ASSERT(t.connection_for(m_remote) != this);
2895
TORRENT_ASSERT(t->connection_for(remote()) != 0 || m_in_constructor);
2897
if (!m_in_constructor && t->connection_for(remote()) != this
2898
&& !m_ses.settings().allow_multiple_connections_per_ip)
2900
TORRENT_ASSERT(false);
2903
if (t->has_picker() && !t->is_aborted())
2905
// make sure that pieces that have completed the download
2906
// of all their blocks are in the disk io thread's queue
2908
const std::vector<piece_picker::downloading_piece>& dl_queue
2909
= t->picker().get_download_queue();
2910
for (std::vector<piece_picker::downloading_piece>::const_iterator i =
2911
dl_queue.begin(); i != dl_queue.end(); ++i)
2913
const int blocks_per_piece = t->picker().blocks_in_piece(i->index);
2915
bool complete = true;
2916
for (int j = 0; j < blocks_per_piece; ++j)
2918
if (i->info[j].state == piece_picker::block_info::state_finished)
2925
disk_io_job ret = m_ses.m_disk_thread.find_job(
2926
&t->filesystem(), -1, i->index);
2927
TORRENT_ASSERT(ret.action == disk_io_job::hash || ret.action == disk_io_job::write);
2928
TORRENT_ASSERT(ret.piece == i->index);
2932
// expensive when using checked iterators
2934
if (t->valid_metadata())
2936
int piece_count = std::count(m_have_piece.begin()
2937
, m_have_piece.end(), true);
2938
if (m_num_pieces != piece_count)
2940
TORRENT_ASSERT(false);
2945
// extremely expensive invariant check
2949
piece_picker& p = t->picker();
2950
const std::vector<piece_picker::downloading_piece>& dlq = p.get_download_queue();
2951
const int blocks_per_piece = static_cast<int>(
2952
t->torrent_file().piece_length() / t->block_size());
2954
for (std::vector<piece_picker::downloading_piece>::const_iterator i =
2955
dlq.begin(); i != dlq.end(); ++i)
2957
for (int j = 0; j < blocks_per_piece; ++j)
2959
if (std::find(m_request_queue.begin(), m_request_queue.end()
2960
, piece_block(i->index, j)) != m_request_queue.end()
2962
std::find(m_download_queue.begin(), m_download_queue.end()
2963
, piece_block(i->index, j)) != m_download_queue.end())
2965
TORRENT_ASSERT(i->info[j].peer == m_remote);
2969
TORRENT_ASSERT(i->info[j].peer != m_remote || i->info[j].finished);
2978
bool peer_connection::has_timed_out() const
2980
// TODO: the timeout should be called by an event
2983
ptime now(time_now());
2985
// if the socket is still connecting, don't
2986
// consider it timed out. Because Windows XP SP2
2987
// may delay connection attempts.
2988
if (m_connecting) return false;
2990
// if the peer hasn't said a thing for a certain
2991
// time, it is considered to have timed out
2993
d = now - m_last_receive;
2994
if (d > seconds(m_timeout))
2996
#ifdef TORRENT_VERBOSE_LOGGING
2997
(*m_logger) << time_now_string() << " *** LAST ACTIVITY [ "
2998
<< total_seconds(d) << " seconds ago ] ***\n";
3003
// do not stall waiting for a handshake
3004
if (in_handshake() && d > seconds(m_ses.settings().handshake_timeout))
3006
#ifdef TORRENT_VERBOSE_LOGGING
3007
(*m_logger) << time_now_string() << " *** NO HANDSHAKE [ waited "
3008
<< total_seconds(d) << " seconds ] ***\n";
3013
// disconnect peers that we unchoked, but
3014
// they didn't send a request within 20 seconds.
3015
// but only if we're a seed
3016
boost::shared_ptr<torrent> t = m_torrent.lock();
3017
d = now - (std::max)(m_last_unchoke, m_last_incoming_request);
3018
if (m_requests.empty()
3020
&& m_peer_interested
3021
&& t && t->is_finished()
3024
#ifdef TORRENT_VERBOSE_LOGGING
3025
(*m_logger) << time_now_string() << " *** NO REQUEST [ t: "
3026
<< total_seconds(d) << " ] ***\n";
3031
// TODO: as long as we have less than 95% of the
3032
// global (or local) connection limit, connections should
3033
// never time out for another reason
3035
// if the peer hasn't become interested and we haven't
3036
// become interested in the peer for 10 minutes, it
3037
// has also timed out.
3040
d1 = now - m_became_uninterested;
3041
d2 = now - m_became_uninteresting;
3042
time_duration time_limit = seconds(
3043
m_ses.settings().inactivity_timeout);
3045
// don't bother disconnect peers we haven't been intersted
3046
// in (and that hasn't been interested in us) for a while
3047
// unless we have used up all our connection slots
3049
&& !m_peer_interested
3052
&& (m_ses.num_connections() >= m_ses.max_connections()
3053
|| (t && t->num_peers() >= t->max_connections())))
3055
#ifdef TORRENT_VERBOSE_LOGGING
3056
(*m_logger) << time_now_string() << " *** MUTUAL NO INTEREST [ "
3057
"t1: " << total_seconds(d1) << " | "
3058
"t2: " << total_seconds(d2) << " ] ***\n";
3066
peer_connection::peer_speed_t peer_connection::peer_speed()
3068
shared_ptr<torrent> t = m_torrent.lock();
3071
int download_rate = int(statistics().download_payload_rate());
3072
int torrent_download_rate = int(t->statistics().download_payload_rate());
3074
if (download_rate > 512 && download_rate > torrent_download_rate / 16)
3076
else if (download_rate > 4096 && download_rate > torrent_download_rate / 64)
3078
else if (download_rate < torrent_download_rate / 15 && m_speed == fast)
3080
else if (download_rate < torrent_download_rate / 63 && m_speed == medium)
3086
void peer_connection::keep_alive()
3091
d = time_now() - m_last_sent;
3092
if (total_seconds(d) < m_timeout / 2) return;
3094
if (m_connecting) return;
3095
if (in_handshake()) return;
3097
// if the last send has not completed yet, do not send a keep
3099
if (m_writing) return;
3101
#ifdef TORRENT_VERBOSE_LOGGING
3102
(*m_logger) << time_now_string() << " ==> KEEPALIVE\n";
3105
m_last_sent = time_now();
3109
bool peer_connection::is_seed() const
3112
// if m_num_pieces == 0, we probably don't have the
3114
return m_num_pieces == (int)m_have_piece.size() && m_num_pieces > 0;