1
// libTorrent - BitTorrent library
2
// Copyright (C) 2005-2007, Jari Sundell
4
// This program is free software; you can redistribute it and/or modify
5
// it under the terms of the GNU General Public License as published by
6
// the Free Software Foundation; either version 2 of the License, or
7
// (at your option) any later version.
9
// This program is distributed in the hope that it will be useful,
10
// but WITHOUT ANY WARRANTY; without even the implied warranty of
11
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12
// GNU General Public License for more details.
14
// You should have received a copy of the GNU General Public License
15
// along with this program; if not, write to the Free Software
16
// Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
18
// In addition, as a special exception, the copyright holders give
19
// permission to link the code of portions of this program with the
20
// OpenSSL library under certain conditions as described in each
21
// individual source file, and distribute linked combinations
24
// You must obey the GNU General Public License in all respects for
25
// all of the code used other than OpenSSL. If you modify file(s)
26
// with this exception, you may extend this exception to your version
27
// of the file(s), but you are not obligated to do so. If you do not
28
// wish to do so, delete this exception statement from your version.
29
// If you delete this exception statement from all source files in the
30
// program, then also delete it here.
32
// Contact: Jari Sundell <jaris@ifi.uio.no>
35
// 3185 Skoppum, NORWAY
41
#include <rak/error_number.h>
42
#include <rak/string_manip.h>
44
#include "torrent/exceptions.h"
45
#include "torrent/data/block.h"
46
#include "torrent/chunk_manager.h"
47
#include "data/chunk_iterator.h"
48
#include "data/chunk_list.h"
49
#include "download/choke_manager.h"
50
#include "download/chunk_selector.h"
51
#include "download/chunk_statistics.h"
52
#include "download/download_main.h"
53
#include "net/socket_base.h"
54
#include "torrent/connection_manager.h"
55
#include "torrent/download_info.h"
56
#include "torrent/throttle.h"
57
#include "torrent/peer/peer_info.h"
58
#include "torrent/peer/connection_list.h"
60
#include "extensions.h"
61
#include "peer_connection_base.h"
67
PeerConnectionBase::PeerConnectionBase() :
70
m_down(new ProtocolRead()),
71
m_up(new ProtocolWrite()),
75
m_downInterested(false),
76
m_downUnchoked(false),
79
m_sendInterested(false),
83
m_encryptBuffer(NULL),
86
m_incoreContinous(false) {
91
PeerConnectionBase::~PeerConnectionBase() {
95
delete m_encryptBuffer;
97
if (m_extensions != NULL && !m_extensions->is_default())
100
m_extensionMessage.clear();
104
PeerConnectionBase::initialize(DownloadMain* download, PeerInfo* peerInfo, SocketFd fd, Bitfield* bitfield, EncryptionInfo* encryptionInfo, ProtocolExtension* extensions) {
105
if (get_fd().is_valid())
106
throw internal_error("Tried to re-set PeerConnection.");
109
throw internal_error("PeerConnectionBase::set(...) received bad input.");
111
if (encryptionInfo->is_encrypted() != encryptionInfo->decrypt_valid())
112
throw internal_error("Encryption and decryption inconsistent.");
116
m_peerInfo = peerInfo;
117
m_download = download;
119
m_encryption = *encryptionInfo;
120
m_extensions = extensions;
122
m_extensions->set_connection(this);
124
m_peerChunks.set_peer_info(m_peerInfo);
125
m_peerChunks.bitfield()->swap(*bitfield);
127
std::pair<ThrottleList*, ThrottleList*> throttles = m_download->throttles(m_peerInfo->socket_address());
128
m_up->set_throttle(throttles.first);
129
m_down->set_throttle(throttles.second);
131
m_peerChunks.upload_throttle()->set_list_iterator(m_up->throttle()->end());
132
m_peerChunks.upload_throttle()->slot_activate(rak::make_mem_fun(static_cast<SocketBase*>(this), &SocketBase::receive_throttle_up_activate));
134
m_peerChunks.download_throttle()->set_list_iterator(m_down->throttle()->end());
135
m_peerChunks.download_throttle()->slot_activate(rak::make_mem_fun(static_cast<SocketBase*>(this), &SocketBase::receive_throttle_down_activate));
137
download_queue()->set_delegator(m_download->delegator());
138
download_queue()->set_peer_chunks(&m_peerChunks);
140
manager->poll()->open(this);
141
manager->poll()->insert_read(this);
142
manager->poll()->insert_write(this);
143
manager->poll()->insert_error(this);
145
m_timeLastRead = cachedTime;
147
m_download->chunk_statistics()->received_connect(&m_peerChunks);
150
// update_interested();
152
m_peerChunks.download_cache()->clear();
154
if (!m_download->file_list()->is_done()) {
155
m_sendInterested = true;
156
m_downInterested = true;
163
PeerConnectionBase::cleanup() {
164
if (!get_fd().is_valid())
167
if (m_download == NULL)
168
throw internal_error("PeerConnection::~PeerConnection() m_fd is valid but m_state and/or m_net is NULL");
170
m_downloadQueue.clear();
173
down_chunk_release();
175
m_download->upload_choke_manager()->disconnected(this, &m_upChoke);
176
m_download->download_choke_manager()->disconnected(this, &m_downChoke);
177
m_download->chunk_statistics()->received_disconnect(&m_peerChunks);
179
if (!m_extensions->is_default())
180
m_extensions->cleanup();
182
manager->poll()->remove_read(this);
183
manager->poll()->remove_write(this);
184
manager->poll()->remove_error(this);
185
manager->poll()->close(this);
187
manager->connection_manager()->dec_socket_count();
192
m_up->throttle()->erase(m_peerChunks.upload_throttle());
193
m_down->throttle()->erase(m_peerChunks.download_throttle());
195
m_up->set_state(ProtocolWrite::INTERNAL_ERROR);
196
m_down->set_state(ProtocolRead::INTERNAL_ERROR);
202
PeerConnectionBase::set_upload_snubbed(bool v) {
204
m_download->upload_choke_manager()->set_snubbed(this, &m_upChoke);
206
m_download->upload_choke_manager()->set_not_snubbed(this, &m_upChoke);
210
PeerConnectionBase::receive_upload_choke(bool choke) {
211
if (choke == m_upChoke.choked())
212
throw internal_error("PeerConnectionBase::receive_upload_choke(...) already set to the same state.");
214
write_insert_poll_safe();
217
m_upChoke.set_unchoked(!choke);
218
m_upChoke.set_time_last_choke(cachedTime);
224
PeerConnectionBase::receive_download_choke(bool choke) {
225
if (choke == m_downChoke.choked())
226
throw internal_error("PeerConnectionBase::receive_download_choke(...) already set to the same state.");
228
write_insert_poll_safe();
230
m_downChoke.set_unchoked(!choke);
231
m_downChoke.set_time_last_choke(cachedTime);
234
m_peerChunks.download_cache()->disable();
236
// If the queue isn't empty, then we might still receive some
237
// pieces, so don't remove us from throttle.
238
if (!download_queue()->is_downloading() && download_queue()->queued_empty())
239
m_down->throttle()->erase(m_peerChunks.download_throttle());
241
// Send uninterested if unchoked, but only _after_ receiving our
244
if (m_downUnchoked) {
245
// Tell the peer we're no longer interested to avoid
246
// disconnects. We keep the connection in the queue so that
247
// ChokeManager::cycle(...) can attempt to get us unchoked
250
m_sendInterested = m_downInterested;
251
m_downInterested = false;
254
// Remove from queue so that an unchoke from the remote peer
255
// will cause the connection to be unchoked immediately by the
257
m_downChoke.set_queued(false);
264
if (!m_downInterested) {
265
// We were marked as not interested by the cycling choke and
266
// kept in the queue, thus the peer should have some pieces of
269
// We have now been 'unchoked' by the choke manager, so tell the
270
// peer that we're again interested. If the peer doesn't unchoke
271
// us within a cycle or two we're likely to be choked and left
272
// out of the queue. So if the peer unchokes us at a later time,
273
// we skip the queue and unchoke immediately.
275
m_sendInterested = !m_downInterested;
276
m_downInterested = true;
284
log_upload_chunk_mincore(Chunk* chunk, const Piece& piece, bool new_index, bool& continous) {
285
#ifdef LT_LOG_MINCORE_FILE
286
static int mincore_fd = -1;
287
static int32_t ticker = rak::timer::current().seconds() / 10 * 10;
289
static int counter_incore = 0;
290
static int counter_not_incore = 0;
291
static int counter_incore_new = 0;
292
static int counter_not_incore_new = 0;
293
static int counter_incore_break = 0;
295
if (rak::timer::current().seconds() >= ticker + 10) {
298
if (mincore_fd == -1) {
299
snprintf(buffer, 256, LT_LOG_MINCORE_FILE, getpid());
301
if ((mincore_fd = open(buffer, O_WRONLY | O_CREAT | O_TRUNC)) == -1)
302
throw internal_error("Could not open mincore log file.");
305
// Log the result of mincore for every piece uploaded to a file.
306
unsigned int buf_lenght = snprintf(buffer, 256, "%i %u %u %u %u %u\n",
308
counter_incore, counter_incore_new, counter_not_incore,
309
counter_not_incore_new, counter_incore_break);
311
write(mincore_fd, buffer, buf_lenght);
313
ticker = rak::timer::current().seconds() / 10 * 10;
316
counter_not_incore = 0;
317
counter_incore_new = 0;
318
counter_not_incore_new = 0;
319
counter_incore_break = 0;
322
bool is_incore = chunk->is_incore(piece.offset(), piece.length());
324
counter_incore += !new_index && is_incore;
325
counter_incore_new += new_index && is_incore;
326
counter_not_incore += !new_index && !is_incore;
327
counter_not_incore_new += new_index && !is_incore;
329
counter_incore_break += continous && !is_incore;
330
continous = is_incore;
335
PeerConnectionBase::load_up_chunk() {
336
if (m_upChunk.is_valid() && m_upChunk.index() == m_upPiece.index()) {
337
// Better checking needed.
338
// m_upChunk.chunk()->preload(m_upPiece.offset(), m_upChunk.chunk()->size());
339
log_upload_chunk_mincore(m_upChunk.chunk(), m_upPiece, false, m_incoreContinous);
345
m_upChunk = m_download->chunk_list()->get(m_upPiece.index(), false);
347
if (!m_upChunk.is_valid())
348
throw storage_error("File chunk read error: " + std::string(m_upChunk.error_number().c_str()));
350
if (is_encrypted() && m_encryptBuffer == NULL) {
351
m_encryptBuffer = new EncryptBuffer();
352
m_encryptBuffer->reset();
355
m_incoreContinous = false;
356
log_upload_chunk_mincore(m_upChunk.chunk(), m_upPiece, true, m_incoreContinous);
357
m_incoreContinous = true;
359
// Also check if we've already preloaded in the recent past, even
361
ChunkManager* cm = manager->chunk_manager();
362
uint32_t preloadSize = m_upChunk.chunk()->chunk_size() - m_upPiece.offset();
364
if (cm->preload_type() == 0 ||
365
m_upChunk.object()->time_preloaded() >= cachedTime - rak::timer::from_seconds(60) ||
367
preloadSize < cm->preload_min_size() ||
368
m_peerChunks.upload_throttle()->rate()->rate() < cm->preload_required_rate() * ((preloadSize + (2 << 20) - 1) / (2 << 20))) {
369
cm->inc_stats_not_preloaded();
373
cm->inc_stats_preloaded();
375
m_upChunk.object()->set_time_preloaded(cachedTime);
376
m_upChunk.chunk()->preload(m_upPiece.offset(), m_upChunk.chunk()->chunk_size(), cm->preload_type() == 1);
380
PeerConnectionBase::cancel_transfer(BlockTransfer* transfer) {
381
if (!get_fd().is_valid())
382
throw internal_error("PeerConnectionBase::cancel_transfer(...) !get_fd().is_valid().");
384
// We don't send cancel messages if the transfer has already
386
if (transfer == m_downloadQueue.transfer())
389
write_insert_poll_safe();
391
m_peerChunks.cancel_queue()->push_back(transfer->piece());
392
// m_downloadQueue.cancel_transfer(transfer);
396
PeerConnectionBase::event_error() {
397
m_download->connection_list()->erase(this, 0);
401
PeerConnectionBase::down_chunk_start(const Piece& piece) {
402
if (!download_queue()->downloading(piece)) {
403
if (piece.length() == 0)
404
m_download->info()->signal_network_log().emit("Received piece with length zero.");
409
if (!m_download->file_list()->is_valid_piece(piece))
410
throw internal_error("Incoming pieces list contains a bad piece.");
412
if (!m_downChunk.is_valid() || piece.index() != m_downChunk.index()) {
413
down_chunk_release();
414
m_downChunk = m_download->chunk_list()->get(piece.index(), true);
416
if (!m_downChunk.is_valid())
417
throw storage_error("File chunk write error: " + std::string(m_downChunk.error_number().c_str()) + ".");
420
return m_downloadQueue.transfer()->is_leader();
424
PeerConnectionBase::down_chunk_finished() {
425
if (!download_queue()->transfer()->is_finished())
426
throw internal_error("PeerConnectionBase::down_chunk_finished() Transfer not finished.");
428
if (download_queue()->transfer()->is_leader()) {
429
if (!m_downChunk.is_valid())
430
throw internal_error("PeerConnectionBase::down_chunk_finished() Transfer is the leader, but no chunk allocated.");
432
download_queue()->finished();
433
m_downChunk.object()->set_time_modified(cachedTime);
436
download_queue()->skipped();
442
// TODO: clear m_down.data?
444
// If we were choked by choke_manager but still had queued pieces,
445
// then we might still be in the throttle.
446
if (m_downChoke.choked() && download_queue()->queued_empty())
447
m_down->throttle()->erase(m_peerChunks.download_throttle());
449
write_insert_poll_safe();
453
PeerConnectionBase::down_chunk() {
454
if (!m_down->throttle()->is_throttled(m_peerChunks.download_throttle()))
455
throw internal_error("PeerConnectionBase::down_chunk() tried to read a piece but is not in throttle list");
457
if (!m_downChunk.chunk()->is_writable())
458
throw internal_error("PeerConnectionBase::down_part() chunk not writable, permission denided");
460
uint32_t quota = m_down->throttle()->node_quota(m_peerChunks.download_throttle());
463
manager->poll()->remove_read(this);
464
m_down->throttle()->node_deactivate(m_peerChunks.download_throttle());
468
uint32_t bytesTransfered = 0;
469
BlockTransfer* transfer = m_downloadQueue.transfer();
471
Chunk::data_type data;
472
ChunkIterator itr(m_downChunk.chunk(),
473
transfer->piece().offset() + transfer->position(),
474
transfer->piece().offset() + std::min(transfer->position() + quota, transfer->piece().length()));
478
data.second = read_stream_throws(data.first, data.second);
481
m_encryption.decrypt(data.first, data.second);
483
bytesTransfered += data.second;
485
} while (data.second != 0 && itr.forward(data.second));
487
transfer->adjust_position(bytesTransfered);
489
m_down->throttle()->node_used(m_peerChunks.download_throttle(), bytesTransfered);
490
m_download->info()->mutable_down_rate()->insert(bytesTransfered);
492
return transfer->is_finished();
496
PeerConnectionBase::down_chunk_from_buffer() {
497
m_down->buffer()->consume(down_chunk_process(m_down->buffer()->position(), m_down->buffer()->remaining()));
499
if (!m_downloadQueue.transfer()->is_finished() && m_down->buffer()->remaining() != 0)
500
throw internal_error("PeerConnectionBase::down_chunk_from_buffer() !transfer->is_finished() && m_down->buffer()->remaining() != 0.");
502
return m_downloadQueue.transfer()->is_finished();
505
// When this transfer again becomes the leader, we just return false
506
// and wait for the next polling. It is an exceptional case so we
507
// don't really care that much about performance.
509
PeerConnectionBase::down_chunk_skip() {
510
ThrottleList* throttle = m_down->throttle();
512
if (!throttle->is_throttled(m_peerChunks.download_throttle()))
513
throw internal_error("PeerConnectionBase::down_chunk_skip() tried to read a piece but is not in throttle list");
515
uint32_t quota = throttle->node_quota(m_peerChunks.download_throttle());
518
manager->poll()->remove_read(this);
519
throttle->node_deactivate(m_peerChunks.download_throttle());
523
uint32_t length = read_stream_throws(m_nullBuffer, std::min(quota, m_downloadQueue.transfer()->piece().length() - m_downloadQueue.transfer()->position()));
524
throttle->node_used(m_peerChunks.download_throttle(), length);
527
m_encryption.decrypt(m_nullBuffer, length);
529
if (down_chunk_skip_process(m_nullBuffer, length) != length)
530
throw internal_error("PeerConnectionBase::down_chunk_skip() down_chunk_skip_process(m_nullBuffer, length) != length.");
532
return m_downloadQueue.transfer()->is_finished();
536
PeerConnectionBase::down_chunk_skip_from_buffer() {
537
m_down->buffer()->consume(down_chunk_skip_process(m_down->buffer()->position(), m_down->buffer()->remaining()));
539
return m_downloadQueue.transfer()->is_finished();
542
// Process data from a leading transfer.
544
PeerConnectionBase::down_chunk_process(const void* buffer, uint32_t length) {
545
if (!m_downChunk.is_valid() || m_downChunk.index() != m_downloadQueue.transfer()->index())
546
throw internal_error("PeerConnectionBase::down_chunk_process(...) !m_downChunk.is_valid() || m_downChunk.index() != m_downloadQueue.transfer()->index().");
551
BlockTransfer* transfer = m_downloadQueue.transfer();
553
length = std::min(transfer->piece().length() - transfer->position(), length);
555
m_downChunk.chunk()->from_buffer(buffer, transfer->piece().offset() + transfer->position(), length);
557
transfer->adjust_position(length);
559
m_down->throttle()->node_used(m_peerChunks.download_throttle(), length);
560
m_download->info()->mutable_down_rate()->insert(length);
565
// Process data from non-leading transfer. If this transfer encounters
566
// mismatching data with the leader then bork this transfer. If we get
567
// ahead of the leader, we switch the leader.
569
PeerConnectionBase::down_chunk_skip_process(const void* buffer, uint32_t length) {
570
BlockTransfer* transfer = m_downloadQueue.transfer();
572
// Adjust 'length' to be less than or equal to what is remaining of
573
// the block to simplify the rest of the function.
574
length = std::min(length, transfer->piece().length() - transfer->position());
576
// Hmm, this might result in more bytes than nessesary being
578
m_down->throttle()->node_used(m_peerChunks.download_throttle(), length);
579
m_download->info()->mutable_down_rate()->insert(length);
580
m_download->info()->mutable_skip_rate()->insert(length);
582
if (!transfer->is_valid()) {
583
transfer->adjust_position(length);
587
if (!transfer->block()->is_transfering())
588
throw internal_error("PeerConnectionBase::down_chunk_skip_process(...) block is not transfering, yet we have non-leaders.");
591
if (transfer->position() > transfer->block()->leader()->position())
592
throw internal_error("PeerConnectionBase::down_chunk_skip_process(...) transfer is past the Block's position.");
594
// If the transfer is valid, compare the downloaded data to the
596
uint32_t compareLength = std::min(length, transfer->block()->leader()->position() - transfer->position());
598
// The data doesn't match with what has previously been downloaded,
599
// bork this transfer.
600
if (!m_downChunk.chunk()->compare_buffer(buffer, transfer->piece().offset() + transfer->position(), compareLength)) {
601
m_download->info()->signal_network_log().emit("Data does not match what was previously downloaded.");
603
m_downloadQueue.transfer_dissimilar();
604
m_downloadQueue.transfer()->adjust_position(length);
609
transfer->adjust_position(compareLength);
611
if (compareLength == length)
614
// Add another check here to see if we really want to be the new
617
transfer->block()->change_leader(transfer);
619
if (down_chunk_process(static_cast<const char*>(buffer) + compareLength, length - compareLength) != length - compareLength)
620
throw internal_error("PeerConnectionBase::down_chunk_skip_process(...) down_chunk_process(...) returned wrong value.");
626
PeerConnectionBase::down_extension() {
627
if (m_down->buffer()->remaining()) {
628
uint32_t need = std::min(m_extensions->read_need(), (uint32_t)m_down->buffer()->remaining());
629
std::memcpy(m_extensions->read_position(), m_down->buffer()->position(), need);
631
m_extensions->read_move(need);
632
m_down->buffer()->consume(need);
635
if (!m_extensions->is_complete()) {
636
uint32_t bytes = read_stream_throws(m_extensions->read_position(), m_extensions->read_need());
637
m_down->throttle()->node_used_unthrottled(bytes);
640
m_encryption.decrypt(m_extensions->read_position(), bytes);
642
m_extensions->read_move(bytes);
645
// If extension can't be processed yet (due to a pending write),
646
// disable reads until the pending message is completely sent.
647
if (m_extensions->is_complete() && !m_extensions->is_invalid() && !m_extensions->read_done()) {
648
manager->poll()->remove_read(this);
652
return m_extensions->is_complete();
656
PeerConnectionBase::up_chunk_encrypt(uint32_t quota) {
657
if (m_encryptBuffer == NULL)
658
throw internal_error("PeerConnectionBase::up_chunk: m_encryptBuffer is NULL.");
660
if (quota <= m_encryptBuffer->remaining())
663
// Also, consider checking here if the number of bytes remaining in
664
// the buffer is small enought that the cost of moving them would
665
// outweigh the extra context switches, etc.
667
if (m_encryptBuffer->remaining() == 0) {
668
// This handles reset also for new chunk transfers.
669
m_encryptBuffer->reset();
671
quota = std::min<uint32_t>(quota, m_encryptBuffer->reserved());
674
quota = std::min<uint32_t>(quota - m_encryptBuffer->remaining(), m_encryptBuffer->reserved_left());
677
m_upChunk.chunk()->to_buffer(m_encryptBuffer->end(), m_upPiece.offset() + m_encryptBuffer->remaining(), quota);
678
m_encryption.encrypt(m_encryptBuffer->end(), quota);
679
m_encryptBuffer->move_end(quota);
681
return m_encryptBuffer->remaining();
685
PeerConnectionBase::up_chunk() {
686
if (!m_up->throttle()->is_throttled(m_peerChunks.upload_throttle()))
687
throw internal_error("PeerConnectionBase::up_chunk() tried to write a piece but is not in throttle list");
689
if (!m_upChunk.chunk()->is_readable())
690
throw internal_error("ProtocolChunk::write_part() chunk not readable, permission denided");
692
uint32_t quota = m_up->throttle()->node_quota(m_peerChunks.upload_throttle());
695
manager->poll()->remove_write(this);
696
m_up->throttle()->node_deactivate(m_peerChunks.upload_throttle());
700
uint32_t bytesTransfered = 0;
702
if (is_encrypted()) {
703
// Prepare as many bytes as quota specifies, up to end of piece or
704
// buffer. Only bytes beyond remaining() are new and will be
706
quota = up_chunk_encrypt(std::min(quota, m_upPiece.length()));
708
bytesTransfered = write_stream_throws(m_encryptBuffer->position(), quota);
709
m_encryptBuffer->consume(bytesTransfered);
712
Chunk::data_type data;
713
ChunkIterator itr(m_upChunk.chunk(), m_upPiece.offset(), m_upPiece.offset() + std::min(quota, m_upPiece.length()));
717
data.second = write_stream_throws(data.first, data.second);
719
bytesTransfered += data.second;
721
} while (data.second != 0 && itr.forward(data.second));
724
m_up->throttle()->node_used(m_peerChunks.upload_throttle(), bytesTransfered);
725
m_download->info()->mutable_up_rate()->insert(bytesTransfered);
727
// Just modifying the piece to cover the remaining data ends up
728
// being much cleaner and we avoid an unnessesary position variable.
729
m_upPiece.set_offset(m_upPiece.offset() + bytesTransfered);
730
m_upPiece.set_length(m_upPiece.length() - bytesTransfered);
732
return m_upPiece.length() == 0;
736
PeerConnectionBase::up_extension() {
737
if (m_extensionOffset == extension_must_encrypt) {
738
if (m_extensionMessage.owned()) {
739
m_encryption.encrypt(m_extensionMessage.data(), m_extensionMessage.length());
742
char* buffer = new char[m_extensionMessage.length()];
744
m_encryption.encrypt(m_extensionMessage.data(), buffer, m_extensionMessage.length());
745
m_extensionMessage.set(buffer, buffer + m_extensionMessage.length(), true);
748
m_extensionOffset = 0;
751
if (m_extensionOffset >= m_extensionMessage.length())
752
throw internal_error("PeerConnectionBase::up_extension bad offset.");
754
uint32_t written = write_stream_throws(m_extensionMessage.data() + m_extensionOffset, m_extensionMessage.length() - m_extensionOffset);
755
m_up->throttle()->node_used_unthrottled(written);
756
m_extensionOffset += written;
758
if (m_extensionOffset < m_extensionMessage.length())
761
m_extensionMessage.clear();
763
// If we have an unprocessed message, process it now and enable reads again.
764
if (m_extensions->is_complete() && !m_extensions->is_invalid()) {
765
// DEBUG: What, this should fail when we block, no?
766
if (!m_extensions->read_done())
767
throw internal_error("PeerConnectionBase::up_extension could not process complete extension message.");
769
manager->poll()->insert_read(this);
776
PeerConnectionBase::down_chunk_release() {
777
if (m_downChunk.is_valid())
778
m_download->chunk_list()->release(&m_downChunk);
782
PeerConnectionBase::up_chunk_release() {
783
if (m_upChunk.is_valid())
784
m_download->chunk_list()->release(&m_upChunk);
788
PeerConnectionBase::read_request_piece(const Piece& p) {
789
PeerChunks::piece_list_type::iterator itr = std::find(m_peerChunks.upload_queue()->begin(), m_peerChunks.upload_queue()->end(), p);
791
if (m_upChoke.choked() || itr != m_peerChunks.upload_queue()->end() || p.length() > (1 << 17))
794
m_peerChunks.upload_queue()->push_back(p);
795
write_insert_poll_safe();
799
PeerConnectionBase::read_cancel_piece(const Piece& p) {
800
PeerChunks::piece_list_type::iterator itr = std::find(m_peerChunks.upload_queue()->begin(), m_peerChunks.upload_queue()->end(), p);
802
if (itr != m_peerChunks.upload_queue()->end())
803
m_peerChunks.upload_queue()->erase(itr);
807
PeerConnectionBase::write_prepare_piece() {
808
m_upPiece = m_peerChunks.upload_queue()->front();
809
m_peerChunks.upload_queue()->pop_front();
811
// Move these checks somewhere else?
812
if (!m_download->file_list()->is_valid_piece(m_upPiece) ||
813
!m_download->file_list()->bitfield()->get(m_upPiece.index())) {
815
snprintf(buffer, 128, "Peer requested an invalid piece: %u %u %u", m_upPiece.index(), m_upPiece.length(), m_upPiece.offset());
817
throw communication_error(buffer);
820
m_up->write_piece(m_upPiece);
824
PeerConnectionBase::write_prepare_extension(int type, const DataBuffer& message) {
825
m_up->write_extension(m_extensions->id(type), message.length());
827
m_extensionOffset = 0;
828
m_extensionMessage = message;
830
// Need to encrypt the buffer, but not until the m_up
831
// write buffer has been flushed, so flag it for now.
833
m_extensionOffset = extension_must_encrypt;
836
// High stall count peers should request if we're *not* in endgame, or
837
// if we're in endgame and the download is too slow. Prefere not to request
838
// from high stall counts when we are doing decent speeds.
840
PeerConnectionBase::should_request() {
841
if (m_downChoke.choked() || !m_downInterested || !m_downUnchoked)
842
// || m_down->get_state() == ProtocolRead::READ_SKIP_PIECE)
845
else if (!m_download->delegator()->get_aggressive())
849
// We check if the peer is stalled, if it is not then we should
850
// request. If the peer is stalled then we only request if the
851
// download rate is below a certain value.
852
return m_downStall <= 1 || m_download->info()->down_rate()->rate() < (10 << 10);
856
PeerConnectionBase::try_request_pieces() {
857
if (download_queue()->queued_empty())
860
uint32_t pipeSize = download_queue()->calculate_pipe_size(m_peerChunks.download_throttle()->rate()->rate());
862
// Don't start requesting if we can't do it in large enough chunks.
863
if (download_queue()->queued_size() >= (pipeSize + 10) / 2)
866
bool success = false;
868
while (download_queue()->queued_size() < pipeSize && m_up->can_write_request()) {
870
// Delegator should return a vector of pieces, and it should be
871
// passed the number of pieces it should delegate. Try to ensure
872
// it receives large enough request to fill a whole chunk if the
873
// peer is fast enough.
875
const Piece* p = download_queue()->delegate();
880
if (!m_download->file_list()->is_valid_piece(*p) || !m_peerChunks.bitfield()->get(p->index()))
881
throw internal_error("PeerConnectionBase::try_request_pieces() tried to use an invalid piece.");
883
m_up->write_request(*p);
891
// Send one peer exchange message according to bits set in m_sendPEXMask.
892
// We can only send one message at a time, because the derived class
893
// needs to flush the buffer and call up_extension before the next one.
895
PeerConnectionBase::send_pex_message() {
896
if (!m_extensions->is_remote_supported(ProtocolExtension::UT_PEX)) {
901
// Message to tell peer to stop/start doing PEX is small so send it first.
902
if (m_sendPEXMask & (PEX_ENABLE | PEX_DISABLE)) {
903
if (!m_extensions->is_remote_supported(ProtocolExtension::UT_PEX))
904
throw internal_error("PeerConnectionBase::send_pex_message() Not supported by peer.");
906
write_prepare_extension(ProtocolExtension::HANDSHAKE,
907
ProtocolExtension::generate_toggle_message(ProtocolExtension::UT_PEX, (m_sendPEXMask & PEX_ENABLE) != 0));
909
m_sendPEXMask &= ~(PEX_ENABLE | PEX_DISABLE);
911
} else if (m_sendPEXMask & PEX_DO && m_extensions->id(ProtocolExtension::UT_PEX)) {
912
const DataBuffer& pexMessage = m_download->get_ut_pex(m_extensions->is_initial_pex());
913
m_extensions->clear_initial_pex();
915
m_sendPEXMask &= ~PEX_DO;
917
if (pexMessage.empty())
920
write_prepare_extension(ProtocolExtension::UT_PEX, pexMessage);
929
// Extension protocol needs to send a reply.
931
PeerConnectionBase::send_ext_message() {
932
write_prepare_extension(m_extensions->pending_message_type(), m_extensions->pending_message_data());
933
m_extensions->clear_pending_message();
938
PeerConnectionBase::receive_metadata_piece(uint32_t piece, const char* data, uint32_t length) {