~ubuntu-branches/ubuntu/wily/libtorrent/wily-proposed

« back to all changes in this revision

Viewing changes to .pc/spelling-fixes/src/protocol/peer_connection_base.cc

  • Committer: Bazaar Package Importer
  • Author(s): Rogério Brito
  • Date: 2011-03-20 01:06:18 UTC
  • mfrom: (1.1.13 upstream) (4.1.9 sid)
  • Revision ID: james.westby@ubuntu.com-20110320010618-g3wyylccqzqko73c
Tags: 0.12.7-5
* Use Steinar's "real" patch for IPv6. Addresses #490277, #618275,
  and Closes: #617791.
* Adapt libtorrent-0.12.6-ipv6-07.patch. It FTBFS otherwise.
* Add proper attibution to the IPv6 patch.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
// libTorrent - BitTorrent library
 
2
// Copyright (C) 2005-2007, Jari Sundell
 
3
//
 
4
// This program is free software; you can redistribute it and/or modify
 
5
// it under the terms of the GNU General Public License as published by
 
6
// the Free Software Foundation; either version 2 of the License, or
 
7
// (at your option) any later version.
 
8
// 
 
9
// This program is distributed in the hope that it will be useful,
 
10
// but WITHOUT ANY WARRANTY; without even the implied warranty of
 
11
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
12
// GNU General Public License for more details.
 
13
// 
 
14
// You should have received a copy of the GNU General Public License
 
15
// along with this program; if not, write to the Free Software
 
16
// Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
17
//
 
18
// In addition, as a special exception, the copyright holders give
 
19
// permission to link the code of portions of this program with the
 
20
// OpenSSL library under certain conditions as described in each
 
21
// individual source file, and distribute linked combinations
 
22
// including the two.
 
23
//
 
24
// You must obey the GNU General Public License in all respects for
 
25
// all of the code used other than OpenSSL.  If you modify file(s)
 
26
// with this exception, you may extend this exception to your version
 
27
// of the file(s), but you are not obligated to do so.  If you do not
 
28
// wish to do so, delete this exception statement from your version.
 
29
// If you delete this exception statement from all source files in the
 
30
// program, then also delete it here.
 
31
//
 
32
// Contact:  Jari Sundell <jaris@ifi.uio.no>
 
33
//
 
34
//           Skomakerveien 33
 
35
//           3185 Skoppum, NORWAY
 
36
 
 
37
#include "config.h"
 
38
 
 
39
#include <cstdio>
 
40
#include <fcntl.h>
 
41
#include <rak/error_number.h>
 
42
#include <rak/string_manip.h>
 
43
 
 
44
#include "torrent/exceptions.h"
 
45
#include "torrent/data/block.h"
 
46
#include "torrent/chunk_manager.h"
 
47
#include "data/chunk_iterator.h"
 
48
#include "data/chunk_list.h"
 
49
#include "download/choke_manager.h"
 
50
#include "download/chunk_selector.h"
 
51
#include "download/chunk_statistics.h"
 
52
#include "download/download_main.h"
 
53
#include "net/socket_base.h"
 
54
#include "torrent/connection_manager.h"
 
55
#include "torrent/download_info.h"
 
56
#include "torrent/throttle.h"
 
57
#include "torrent/peer/peer_info.h"
 
58
#include "torrent/peer/connection_list.h"
 
59
 
 
60
#include "extensions.h"
 
61
#include "peer_connection_base.h"
 
62
 
 
63
#include "manager.h"
 
64
 
 
65
namespace torrent {
 
66
 
 
67
PeerConnectionBase::PeerConnectionBase() :
 
68
  m_download(NULL),
 
69
  
 
70
  m_down(new ProtocolRead()),
 
71
  m_up(new ProtocolWrite()),
 
72
 
 
73
  m_downStall(0),
 
74
 
 
75
  m_downInterested(false),
 
76
  m_downUnchoked(false),
 
77
 
 
78
  m_sendChoked(false),
 
79
  m_sendInterested(false),
 
80
  m_tryRequest(true),
 
81
  m_sendPEXMask(0),
 
82
 
 
83
  m_encryptBuffer(NULL),
 
84
  m_extensions(NULL),
 
85
 
 
86
  m_incoreContinous(false) {
 
87
 
 
88
  m_peerInfo = NULL;
 
89
}
 
90
 
 
91
PeerConnectionBase::~PeerConnectionBase() {
 
92
  delete m_up;
 
93
  delete m_down;
 
94
 
 
95
  delete m_encryptBuffer;
 
96
 
 
97
  if (m_extensions != NULL && !m_extensions->is_default())
 
98
    delete m_extensions;
 
99
 
 
100
  m_extensionMessage.clear();
 
101
}
 
102
 
 
103
void
 
104
PeerConnectionBase::initialize(DownloadMain* download, PeerInfo* peerInfo, SocketFd fd, Bitfield* bitfield, EncryptionInfo* encryptionInfo, ProtocolExtension* extensions) {
 
105
  if (get_fd().is_valid())
 
106
    throw internal_error("Tried to re-set PeerConnection.");
 
107
 
 
108
  if (!fd.is_valid())
 
109
    throw internal_error("PeerConnectionBase::set(...) received bad input.");
 
110
 
 
111
  if (encryptionInfo->is_encrypted() != encryptionInfo->decrypt_valid())
 
112
    throw internal_error("Encryption and decryption inconsistent.");
 
113
 
 
114
  set_fd(fd);
 
115
 
 
116
  m_peerInfo = peerInfo;
 
117
  m_download = download;
 
118
 
 
119
  m_encryption = *encryptionInfo;
 
120
  m_extensions = extensions;
 
121
 
 
122
  m_extensions->set_connection(this);
 
123
 
 
124
  m_peerChunks.set_peer_info(m_peerInfo);
 
125
  m_peerChunks.bitfield()->swap(*bitfield);
 
126
 
 
127
  std::pair<ThrottleList*, ThrottleList*> throttles = m_download->throttles(m_peerInfo->socket_address());
 
128
  m_up->set_throttle(throttles.first);
 
129
  m_down->set_throttle(throttles.second);
 
130
 
 
131
  m_peerChunks.upload_throttle()->set_list_iterator(m_up->throttle()->end());
 
132
  m_peerChunks.upload_throttle()->slot_activate(rak::make_mem_fun(static_cast<SocketBase*>(this), &SocketBase::receive_throttle_up_activate));
 
133
 
 
134
  m_peerChunks.download_throttle()->set_list_iterator(m_down->throttle()->end());
 
135
  m_peerChunks.download_throttle()->slot_activate(rak::make_mem_fun(static_cast<SocketBase*>(this), &SocketBase::receive_throttle_down_activate));
 
136
 
 
137
  download_queue()->set_delegator(m_download->delegator());
 
138
  download_queue()->set_peer_chunks(&m_peerChunks);
 
139
 
 
140
  manager->poll()->open(this);
 
141
  manager->poll()->insert_read(this);
 
142
  manager->poll()->insert_write(this);
 
143
  manager->poll()->insert_error(this);
 
144
 
 
145
  m_timeLastRead = cachedTime;
 
146
 
 
147
  m_download->chunk_statistics()->received_connect(&m_peerChunks);
 
148
 
 
149
  // Hmm... cleanup?
 
150
//   update_interested();
 
151
 
 
152
  m_peerChunks.download_cache()->clear();
 
153
 
 
154
  if (!m_download->file_list()->is_done()) {
 
155
    m_sendInterested = true;
 
156
    m_downInterested = true;
 
157
  }
 
158
 
 
159
  initialize_custom();
 
160
}
 
161
 
 
162
void
 
163
PeerConnectionBase::cleanup() {
 
164
  if (!get_fd().is_valid())
 
165
    return;
 
166
 
 
167
  if (m_download == NULL)
 
168
    throw internal_error("PeerConnection::~PeerConnection() m_fd is valid but m_state and/or m_net is NULL");
 
169
 
 
170
  m_downloadQueue.clear();
 
171
 
 
172
  up_chunk_release();
 
173
  down_chunk_release();
 
174
 
 
175
  m_download->upload_choke_manager()->disconnected(this, &m_upChoke);
 
176
  m_download->download_choke_manager()->disconnected(this, &m_downChoke);
 
177
  m_download->chunk_statistics()->received_disconnect(&m_peerChunks);
 
178
 
 
179
  if (!m_extensions->is_default())
 
180
    m_extensions->cleanup();
 
181
 
 
182
  manager->poll()->remove_read(this);
 
183
  manager->poll()->remove_write(this);
 
184
  manager->poll()->remove_error(this);
 
185
  manager->poll()->close(this);
 
186
  
 
187
  manager->connection_manager()->dec_socket_count();
 
188
 
 
189
  get_fd().close();
 
190
  get_fd().clear();
 
191
 
 
192
  m_up->throttle()->erase(m_peerChunks.upload_throttle());
 
193
  m_down->throttle()->erase(m_peerChunks.download_throttle());
 
194
 
 
195
  m_up->set_state(ProtocolWrite::INTERNAL_ERROR);
 
196
  m_down->set_state(ProtocolRead::INTERNAL_ERROR);
 
197
 
 
198
  m_download = NULL;
 
199
}
 
200
 
 
201
void
 
202
PeerConnectionBase::set_upload_snubbed(bool v) {
 
203
  if (v)
 
204
    m_download->upload_choke_manager()->set_snubbed(this, &m_upChoke);
 
205
  else
 
206
    m_download->upload_choke_manager()->set_not_snubbed(this, &m_upChoke);
 
207
}
 
208
 
 
209
bool
 
210
PeerConnectionBase::receive_upload_choke(bool choke) {
 
211
  if (choke == m_upChoke.choked())
 
212
    throw internal_error("PeerConnectionBase::receive_upload_choke(...) already set to the same state.");
 
213
 
 
214
  write_insert_poll_safe();
 
215
 
 
216
  m_sendChoked = true;
 
217
  m_upChoke.set_unchoked(!choke);
 
218
  m_upChoke.set_time_last_choke(cachedTime);
 
219
 
 
220
  return true;
 
221
}
 
222
 
 
223
bool
 
224
PeerConnectionBase::receive_download_choke(bool choke) {
 
225
  if (choke == m_downChoke.choked())
 
226
    throw internal_error("PeerConnectionBase::receive_download_choke(...) already set to the same state.");
 
227
 
 
228
  write_insert_poll_safe();
 
229
 
 
230
  m_downChoke.set_unchoked(!choke);
 
231
  m_downChoke.set_time_last_choke(cachedTime);
 
232
 
 
233
  if (choke) {
 
234
    m_peerChunks.download_cache()->disable();
 
235
 
 
236
    // If the queue isn't empty, then we might still receive some
 
237
    // pieces, so don't remove us from throttle.
 
238
    if (!download_queue()->is_downloading() && download_queue()->queued_empty())
 
239
      m_down->throttle()->erase(m_peerChunks.download_throttle());
 
240
 
 
241
    // Send uninterested if unchoked, but only _after_ receiving our
 
242
    // chunks?
 
243
 
 
244
    if (m_downUnchoked) {
 
245
      // Tell the peer we're no longer interested to avoid
 
246
      // disconnects. We keep the connection in the queue so that
 
247
      // ChokeManager::cycle(...) can attempt to get us unchoked
 
248
      // again.
 
249
 
 
250
      m_sendInterested = m_downInterested;
 
251
      m_downInterested = false;
 
252
 
 
253
    } else {
 
254
      // Remove from queue so that an unchoke from the remote peer
 
255
      // will cause the connection to be unchoked immediately by the
 
256
      // choke manager.
 
257
      m_downChoke.set_queued(false);
 
258
      return false;
 
259
    }
 
260
 
 
261
  } else {
 
262
    m_tryRequest = true;
 
263
 
 
264
    if (!m_downInterested) {
 
265
      // We were marked as not interested by the cycling choke and
 
266
      // kept in the queue, thus the peer should have some pieces of
 
267
      // interest.
 
268
      //
 
269
      // We have now been 'unchoked' by the choke manager, so tell the
 
270
      // peer that we're again interested. If the peer doesn't unchoke
 
271
      // us within a cycle or two we're likely to be choked and left
 
272
      // out of the queue. So if the peer unchokes us at a later time,
 
273
      // we skip the queue and unchoke immediately.
 
274
 
 
275
      m_sendInterested = !m_downInterested;
 
276
      m_downInterested = true;
 
277
    }
 
278
  }
 
279
 
 
280
  return true;
 
281
}
 
282
 
 
283
inline static void
 
284
log_upload_chunk_mincore(Chunk* chunk, const Piece& piece, bool new_index, bool& continous) {
 
285
#ifdef LT_LOG_MINCORE_FILE
 
286
  static int mincore_fd = -1;
 
287
  static int32_t ticker = rak::timer::current().seconds() / 10 * 10;
 
288
 
 
289
  static int counter_incore = 0;
 
290
  static int counter_not_incore = 0;
 
291
  static int counter_incore_new = 0;
 
292
  static int counter_not_incore_new = 0;
 
293
  static int counter_incore_break = 0;
 
294
 
 
295
  if (rak::timer::current().seconds() >= ticker + 10) {
 
296
    char buffer[256];
 
297
 
 
298
    if (mincore_fd == -1) {
 
299
      snprintf(buffer, 256, LT_LOG_MINCORE_FILE, getpid());
 
300
    
 
301
      if ((mincore_fd = open(buffer, O_WRONLY | O_CREAT | O_TRUNC)) == -1)
 
302
        throw internal_error("Could not open mincore log file.");
 
303
    }
 
304
 
 
305
    // Log the result of mincore for every piece uploaded to a file.
 
306
    unsigned int buf_lenght = snprintf(buffer, 256, "%i %u %u %u %u %u\n",
 
307
                                       ticker,
 
308
                                       counter_incore, counter_incore_new, counter_not_incore,
 
309
                                       counter_not_incore_new, counter_incore_break);
 
310
 
 
311
    write(mincore_fd, buffer, buf_lenght);
 
312
    
 
313
    ticker = rak::timer::current().seconds() / 10 * 10;
 
314
 
 
315
    counter_incore = 0;
 
316
    counter_not_incore = 0;
 
317
    counter_incore_new = 0;
 
318
    counter_not_incore_new = 0;
 
319
    counter_incore_break = 0;
 
320
  }
 
321
 
 
322
  bool is_incore = chunk->is_incore(piece.offset(), piece.length());
 
323
 
 
324
  counter_incore += !new_index && is_incore;
 
325
  counter_incore_new += new_index && is_incore;
 
326
  counter_not_incore += !new_index && !is_incore;
 
327
  counter_not_incore_new += new_index && !is_incore;
 
328
 
 
329
  counter_incore_break += continous && !is_incore;
 
330
  continous = is_incore;
 
331
#endif
 
332
}
 
333
 
 
334
void
 
335
PeerConnectionBase::load_up_chunk() {
 
336
  if (m_upChunk.is_valid() && m_upChunk.index() == m_upPiece.index()) {
 
337
    // Better checking needed.
 
338
    //     m_upChunk.chunk()->preload(m_upPiece.offset(), m_upChunk.chunk()->size());
 
339
    log_upload_chunk_mincore(m_upChunk.chunk(), m_upPiece, false, m_incoreContinous);
 
340
    return;
 
341
  }
 
342
 
 
343
  up_chunk_release();
 
344
  
 
345
  m_upChunk = m_download->chunk_list()->get(m_upPiece.index(), false);
 
346
  
 
347
  if (!m_upChunk.is_valid())
 
348
    throw storage_error("File chunk read error: " + std::string(m_upChunk.error_number().c_str()));
 
349
 
 
350
  if (is_encrypted() && m_encryptBuffer == NULL) {
 
351
    m_encryptBuffer = new EncryptBuffer();
 
352
    m_encryptBuffer->reset();
 
353
  }
 
354
 
 
355
  m_incoreContinous = false;
 
356
  log_upload_chunk_mincore(m_upChunk.chunk(), m_upPiece, true, m_incoreContinous);
 
357
  m_incoreContinous = true;
 
358
 
 
359
  // Also check if we've already preloaded in the recent past, even
 
360
  // past unmaps.
 
361
  ChunkManager* cm = manager->chunk_manager();
 
362
  uint32_t preloadSize = m_upChunk.chunk()->chunk_size() - m_upPiece.offset();
 
363
 
 
364
  if (cm->preload_type() == 0 ||
 
365
      m_upChunk.object()->time_preloaded() >= cachedTime - rak::timer::from_seconds(60) ||
 
366
 
 
367
      preloadSize < cm->preload_min_size() ||
 
368
      m_peerChunks.upload_throttle()->rate()->rate() < cm->preload_required_rate() * ((preloadSize + (2 << 20) - 1) / (2 << 20))) {
 
369
    cm->inc_stats_not_preloaded();
 
370
    return;
 
371
  }
 
372
 
 
373
  cm->inc_stats_preloaded();
 
374
 
 
375
  m_upChunk.object()->set_time_preloaded(cachedTime);
 
376
  m_upChunk.chunk()->preload(m_upPiece.offset(), m_upChunk.chunk()->chunk_size(), cm->preload_type() == 1);
 
377
}
 
378
 
 
379
void
 
380
PeerConnectionBase::cancel_transfer(BlockTransfer* transfer) {
 
381
  if (!get_fd().is_valid())
 
382
    throw internal_error("PeerConnectionBase::cancel_transfer(...) !get_fd().is_valid().");
 
383
 
 
384
  // We don't send cancel messages if the transfer has already
 
385
  // started.
 
386
  if (transfer == m_downloadQueue.transfer())
 
387
    return;
 
388
 
 
389
  write_insert_poll_safe();
 
390
 
 
391
  m_peerChunks.cancel_queue()->push_back(transfer->piece());
 
392
//   m_downloadQueue.cancel_transfer(transfer);
 
393
}
 
394
 
 
395
void
 
396
PeerConnectionBase::event_error() {
 
397
  m_download->connection_list()->erase(this, 0);
 
398
}
 
399
 
 
400
bool
 
401
PeerConnectionBase::down_chunk_start(const Piece& piece) {
 
402
  if (!download_queue()->downloading(piece)) {
 
403
    if (piece.length() == 0)
 
404
      m_download->info()->signal_network_log().emit("Received piece with length zero.");
 
405
 
 
406
    return false;
 
407
  }
 
408
 
 
409
  if (!m_download->file_list()->is_valid_piece(piece))
 
410
    throw internal_error("Incoming pieces list contains a bad piece.");
 
411
  
 
412
  if (!m_downChunk.is_valid() || piece.index() != m_downChunk.index()) {
 
413
    down_chunk_release();
 
414
    m_downChunk = m_download->chunk_list()->get(piece.index(), true);
 
415
  
 
416
    if (!m_downChunk.is_valid())
 
417
      throw storage_error("File chunk write error: " + std::string(m_downChunk.error_number().c_str()) + ".");
 
418
  }
 
419
 
 
420
  return m_downloadQueue.transfer()->is_leader();
 
421
}
 
422
 
 
423
void
 
424
PeerConnectionBase::down_chunk_finished() {
 
425
  if (!download_queue()->transfer()->is_finished())
 
426
    throw internal_error("PeerConnectionBase::down_chunk_finished() Transfer not finished.");
 
427
 
 
428
  if (download_queue()->transfer()->is_leader()) {
 
429
    if (!m_downChunk.is_valid())
 
430
      throw internal_error("PeerConnectionBase::down_chunk_finished() Transfer is the leader, but no chunk allocated.");
 
431
 
 
432
    download_queue()->finished();
 
433
    m_downChunk.object()->set_time_modified(cachedTime);
 
434
 
 
435
  } else {
 
436
    download_queue()->skipped();
 
437
  }
 
438
        
 
439
  if (m_downStall > 0)
 
440
    m_downStall--;
 
441
        
 
442
  // TODO: clear m_down.data?
 
443
 
 
444
  // If we were choked by choke_manager but still had queued pieces,
 
445
  // then we might still be in the throttle.
 
446
  if (m_downChoke.choked() && download_queue()->queued_empty())
 
447
    m_down->throttle()->erase(m_peerChunks.download_throttle());
 
448
 
 
449
  write_insert_poll_safe();
 
450
}
 
451
 
 
452
bool
 
453
PeerConnectionBase::down_chunk() {
 
454
  if (!m_down->throttle()->is_throttled(m_peerChunks.download_throttle()))
 
455
    throw internal_error("PeerConnectionBase::down_chunk() tried to read a piece but is not in throttle list");
 
456
 
 
457
  if (!m_downChunk.chunk()->is_writable())
 
458
    throw internal_error("PeerConnectionBase::down_part() chunk not writable, permission denided");
 
459
 
 
460
  uint32_t quota = m_down->throttle()->node_quota(m_peerChunks.download_throttle());
 
461
 
 
462
  if (quota == 0) {
 
463
    manager->poll()->remove_read(this);
 
464
    m_down->throttle()->node_deactivate(m_peerChunks.download_throttle());
 
465
    return false;
 
466
  }
 
467
 
 
468
  uint32_t bytesTransfered = 0;
 
469
  BlockTransfer* transfer = m_downloadQueue.transfer();
 
470
 
 
471
  Chunk::data_type data;
 
472
  ChunkIterator itr(m_downChunk.chunk(),
 
473
                    transfer->piece().offset() + transfer->position(),
 
474
                    transfer->piece().offset() + std::min(transfer->position() + quota, transfer->piece().length()));
 
475
 
 
476
  do {
 
477
    data = itr.data();
 
478
    data.second = read_stream_throws(data.first, data.second);
 
479
 
 
480
    if (is_encrypted())
 
481
      m_encryption.decrypt(data.first, data.second);
 
482
 
 
483
    bytesTransfered += data.second;
 
484
 
 
485
  } while (data.second != 0 && itr.forward(data.second));
 
486
 
 
487
  transfer->adjust_position(bytesTransfered);
 
488
 
 
489
  m_down->throttle()->node_used(m_peerChunks.download_throttle(), bytesTransfered);
 
490
  m_download->info()->mutable_down_rate()->insert(bytesTransfered);
 
491
 
 
492
  return transfer->is_finished();
 
493
}
 
494
 
 
495
bool
 
496
PeerConnectionBase::down_chunk_from_buffer() {
 
497
  m_down->buffer()->consume(down_chunk_process(m_down->buffer()->position(), m_down->buffer()->remaining()));
 
498
 
 
499
  if (!m_downloadQueue.transfer()->is_finished() && m_down->buffer()->remaining() != 0)
 
500
    throw internal_error("PeerConnectionBase::down_chunk_from_buffer() !transfer->is_finished() && m_down->buffer()->remaining() != 0.");
 
501
 
 
502
  return m_downloadQueue.transfer()->is_finished();
 
503
}  
 
504
 
 
505
// When this transfer again becomes the leader, we just return false
 
506
// and wait for the next polling. It is an exceptional case so we
 
507
// don't really care that much about performance.
 
508
bool
 
509
PeerConnectionBase::down_chunk_skip() {
 
510
  ThrottleList* throttle = m_down->throttle();
 
511
 
 
512
  if (!throttle->is_throttled(m_peerChunks.download_throttle()))
 
513
    throw internal_error("PeerConnectionBase::down_chunk_skip() tried to read a piece but is not in throttle list");
 
514
 
 
515
  uint32_t quota = throttle->node_quota(m_peerChunks.download_throttle());
 
516
 
 
517
  if (quota == 0) {
 
518
    manager->poll()->remove_read(this);
 
519
    throttle->node_deactivate(m_peerChunks.download_throttle());
 
520
    return false;
 
521
  }
 
522
 
 
523
  uint32_t length = read_stream_throws(m_nullBuffer, std::min(quota, m_downloadQueue.transfer()->piece().length() - m_downloadQueue.transfer()->position()));
 
524
  throttle->node_used(m_peerChunks.download_throttle(), length);
 
525
 
 
526
  if (is_encrypted())
 
527
    m_encryption.decrypt(m_nullBuffer, length);
 
528
 
 
529
  if (down_chunk_skip_process(m_nullBuffer, length) != length)
 
530
    throw internal_error("PeerConnectionBase::down_chunk_skip() down_chunk_skip_process(m_nullBuffer, length) != length.");
 
531
 
 
532
  return m_downloadQueue.transfer()->is_finished();
 
533
}
 
534
 
 
535
bool
 
536
PeerConnectionBase::down_chunk_skip_from_buffer() {
 
537
  m_down->buffer()->consume(down_chunk_skip_process(m_down->buffer()->position(), m_down->buffer()->remaining()));
 
538
  
 
539
  return m_downloadQueue.transfer()->is_finished();
 
540
}
 
541
 
 
542
// Process data from a leading transfer.
 
543
uint32_t
 
544
PeerConnectionBase::down_chunk_process(const void* buffer, uint32_t length) {
 
545
  if (!m_downChunk.is_valid() || m_downChunk.index() != m_downloadQueue.transfer()->index())
 
546
    throw internal_error("PeerConnectionBase::down_chunk_process(...) !m_downChunk.is_valid() || m_downChunk.index() != m_downloadQueue.transfer()->index().");
 
547
 
 
548
  if (length == 0)
 
549
    return length;
 
550
 
 
551
  BlockTransfer* transfer = m_downloadQueue.transfer();
 
552
 
 
553
  length = std::min(transfer->piece().length() - transfer->position(), length);
 
554
 
 
555
  m_downChunk.chunk()->from_buffer(buffer, transfer->piece().offset() + transfer->position(), length);
 
556
 
 
557
  transfer->adjust_position(length);
 
558
 
 
559
  m_down->throttle()->node_used(m_peerChunks.download_throttle(), length);
 
560
  m_download->info()->mutable_down_rate()->insert(length);
 
561
 
 
562
  return length;
 
563
}
 
564
 
 
565
// Process data from non-leading transfer. If this transfer encounters
 
566
// mismatching data with the leader then bork this transfer. If we get
 
567
// ahead of the leader, we switch the leader.
 
568
uint32_t
 
569
PeerConnectionBase::down_chunk_skip_process(const void* buffer, uint32_t length) {
 
570
  BlockTransfer* transfer = m_downloadQueue.transfer();
 
571
 
 
572
  // Adjust 'length' to be less than or equal to what is remaining of
 
573
  // the block to simplify the rest of the function.
 
574
  length = std::min(length, transfer->piece().length() - transfer->position());
 
575
 
 
576
  // Hmm, this might result in more bytes than nessesary being
 
577
  // counted.
 
578
  m_down->throttle()->node_used(m_peerChunks.download_throttle(), length);
 
579
  m_download->info()->mutable_down_rate()->insert(length);
 
580
  m_download->info()->mutable_skip_rate()->insert(length);
 
581
 
 
582
  if (!transfer->is_valid()) {
 
583
    transfer->adjust_position(length);
 
584
    return length;
 
585
  }
 
586
 
 
587
  if (!transfer->block()->is_transfering())
 
588
    throw internal_error("PeerConnectionBase::down_chunk_skip_process(...) block is not transfering, yet we have non-leaders.");
 
589
 
 
590
  // Temporary test.
 
591
  if (transfer->position() > transfer->block()->leader()->position())
 
592
    throw internal_error("PeerConnectionBase::down_chunk_skip_process(...) transfer is past the Block's position.");
 
593
 
 
594
  // If the transfer is valid, compare the downloaded data to the
 
595
  // leader.
 
596
  uint32_t compareLength = std::min(length, transfer->block()->leader()->position() - transfer->position());
 
597
 
 
598
  // The data doesn't match with what has previously been downloaded,
 
599
  // bork this transfer.
 
600
  if (!m_downChunk.chunk()->compare_buffer(buffer, transfer->piece().offset() + transfer->position(), compareLength)) {
 
601
    m_download->info()->signal_network_log().emit("Data does not match what was previously downloaded.");
 
602
    
 
603
    m_downloadQueue.transfer_dissimilar();
 
604
    m_downloadQueue.transfer()->adjust_position(length);
 
605
 
 
606
    return length;
 
607
  }
 
608
 
 
609
  transfer->adjust_position(compareLength);
 
610
 
 
611
  if (compareLength == length)
 
612
    return length;
 
613
 
 
614
  // Add another check here to see if we really want to be the new
 
615
  // leader.
 
616
 
 
617
  transfer->block()->change_leader(transfer);
 
618
 
 
619
  if (down_chunk_process(static_cast<const char*>(buffer) + compareLength, length - compareLength) != length - compareLength)
 
620
    throw internal_error("PeerConnectionBase::down_chunk_skip_process(...) down_chunk_process(...) returned wrong value.");
 
621
  
 
622
  return length;
 
623
}
 
624
 
 
625
bool
 
626
PeerConnectionBase::down_extension() {
 
627
  if (m_down->buffer()->remaining()) {
 
628
    uint32_t need = std::min(m_extensions->read_need(), (uint32_t)m_down->buffer()->remaining());
 
629
    std::memcpy(m_extensions->read_position(), m_down->buffer()->position(), need);
 
630
 
 
631
    m_extensions->read_move(need);
 
632
    m_down->buffer()->consume(need);
 
633
  }
 
634
 
 
635
  if (!m_extensions->is_complete()) {
 
636
    uint32_t bytes = read_stream_throws(m_extensions->read_position(), m_extensions->read_need());
 
637
    m_down->throttle()->node_used_unthrottled(bytes);
 
638
    
 
639
    if (is_encrypted())
 
640
      m_encryption.decrypt(m_extensions->read_position(), bytes);
 
641
 
 
642
    m_extensions->read_move(bytes);
 
643
  }
 
644
 
 
645
  // If extension can't be processed yet (due to a pending write),
 
646
  // disable reads until the pending message is completely sent.
 
647
  if (m_extensions->is_complete() && !m_extensions->is_invalid() && !m_extensions->read_done()) {
 
648
    manager->poll()->remove_read(this);
 
649
    return false;
 
650
  }
 
651
 
 
652
  return m_extensions->is_complete();
 
653
}
 
654
 
 
655
inline uint32_t
 
656
PeerConnectionBase::up_chunk_encrypt(uint32_t quota) {
 
657
  if (m_encryptBuffer == NULL)
 
658
    throw internal_error("PeerConnectionBase::up_chunk: m_encryptBuffer is NULL.");
 
659
 
 
660
  if (quota <= m_encryptBuffer->remaining())
 
661
    return quota;
 
662
 
 
663
  // Also, consider checking here if the number of bytes remaining in
 
664
  // the buffer is small enought that the cost of moving them would
 
665
  // outweigh the extra context switches, etc.
 
666
 
 
667
  if (m_encryptBuffer->remaining() == 0) {
 
668
    // This handles reset also for new chunk transfers.
 
669
    m_encryptBuffer->reset();
 
670
 
 
671
    quota = std::min<uint32_t>(quota, m_encryptBuffer->reserved());
 
672
 
 
673
  } else {
 
674
    quota = std::min<uint32_t>(quota - m_encryptBuffer->remaining(), m_encryptBuffer->reserved_left());
 
675
  }
 
676
 
 
677
  m_upChunk.chunk()->to_buffer(m_encryptBuffer->end(), m_upPiece.offset() + m_encryptBuffer->remaining(), quota);
 
678
  m_encryption.encrypt(m_encryptBuffer->end(), quota);
 
679
  m_encryptBuffer->move_end(quota);
 
680
 
 
681
  return m_encryptBuffer->remaining();
 
682
}
 
683
 
 
684
bool
 
685
PeerConnectionBase::up_chunk() {
 
686
  if (!m_up->throttle()->is_throttled(m_peerChunks.upload_throttle()))
 
687
    throw internal_error("PeerConnectionBase::up_chunk() tried to write a piece but is not in throttle list");
 
688
 
 
689
  if (!m_upChunk.chunk()->is_readable())
 
690
    throw internal_error("ProtocolChunk::write_part() chunk not readable, permission denided");
 
691
 
 
692
  uint32_t quota = m_up->throttle()->node_quota(m_peerChunks.upload_throttle());
 
693
 
 
694
  if (quota == 0) {
 
695
    manager->poll()->remove_write(this);
 
696
    m_up->throttle()->node_deactivate(m_peerChunks.upload_throttle());
 
697
    return false;
 
698
  }
 
699
 
 
700
  uint32_t bytesTransfered = 0;
 
701
 
 
702
  if (is_encrypted()) {
 
703
    // Prepare as many bytes as quota specifies, up to end of piece or
 
704
    // buffer. Only bytes beyond remaining() are new and will be
 
705
    // encrypted.
 
706
    quota = up_chunk_encrypt(std::min(quota, m_upPiece.length()));
 
707
 
 
708
    bytesTransfered = write_stream_throws(m_encryptBuffer->position(), quota);
 
709
    m_encryptBuffer->consume(bytesTransfered);
 
710
 
 
711
  } else {
 
712
    Chunk::data_type data;
 
713
    ChunkIterator itr(m_upChunk.chunk(), m_upPiece.offset(), m_upPiece.offset() + std::min(quota, m_upPiece.length()));
 
714
 
 
715
    do {
 
716
      data = itr.data();
 
717
      data.second = write_stream_throws(data.first, data.second);
 
718
 
 
719
      bytesTransfered += data.second;
 
720
 
 
721
    } while (data.second != 0 && itr.forward(data.second));
 
722
  }
 
723
 
 
724
  m_up->throttle()->node_used(m_peerChunks.upload_throttle(), bytesTransfered);
 
725
  m_download->info()->mutable_up_rate()->insert(bytesTransfered);
 
726
 
 
727
  // Just modifying the piece to cover the remaining data ends up
 
728
  // being much cleaner and we avoid an unnessesary position variable.
 
729
  m_upPiece.set_offset(m_upPiece.offset() + bytesTransfered);
 
730
  m_upPiece.set_length(m_upPiece.length() - bytesTransfered);
 
731
 
 
732
  return m_upPiece.length() == 0;
 
733
}
 
734
 
 
735
bool
 
736
PeerConnectionBase::up_extension() {
 
737
  if (m_extensionOffset == extension_must_encrypt) {
 
738
    if (m_extensionMessage.owned()) {
 
739
      m_encryption.encrypt(m_extensionMessage.data(), m_extensionMessage.length());
 
740
 
 
741
    } else {
 
742
      char* buffer = new char[m_extensionMessage.length()];
 
743
 
 
744
      m_encryption.encrypt(m_extensionMessage.data(), buffer, m_extensionMessage.length());
 
745
      m_extensionMessage.set(buffer, buffer + m_extensionMessage.length(), true);
 
746
    }
 
747
 
 
748
    m_extensionOffset = 0;
 
749
  }
 
750
 
 
751
  if (m_extensionOffset >= m_extensionMessage.length())
 
752
    throw internal_error("PeerConnectionBase::up_extension bad offset.");
 
753
 
 
754
  uint32_t written = write_stream_throws(m_extensionMessage.data() + m_extensionOffset, m_extensionMessage.length() - m_extensionOffset);
 
755
  m_up->throttle()->node_used_unthrottled(written);
 
756
  m_extensionOffset += written;
 
757
 
 
758
  if (m_extensionOffset < m_extensionMessage.length())
 
759
    return false;
 
760
 
 
761
  m_extensionMessage.clear();
 
762
 
 
763
  // If we have an unprocessed message, process it now and enable reads again.
 
764
  if (m_extensions->is_complete() && !m_extensions->is_invalid()) {
 
765
    // DEBUG: What, this should fail when we block, no?
 
766
    if (!m_extensions->read_done())
 
767
      throw internal_error("PeerConnectionBase::up_extension could not process complete extension message.");
 
768
 
 
769
    manager->poll()->insert_read(this);
 
770
  }
 
771
 
 
772
  return true;
 
773
}
 
774
 
 
775
void
 
776
PeerConnectionBase::down_chunk_release() {
 
777
  if (m_downChunk.is_valid())
 
778
    m_download->chunk_list()->release(&m_downChunk);
 
779
}
 
780
 
 
781
void
 
782
PeerConnectionBase::up_chunk_release() {
 
783
  if (m_upChunk.is_valid())
 
784
    m_download->chunk_list()->release(&m_upChunk);
 
785
}
 
786
 
 
787
void
 
788
PeerConnectionBase::read_request_piece(const Piece& p) {
 
789
  PeerChunks::piece_list_type::iterator itr = std::find(m_peerChunks.upload_queue()->begin(), m_peerChunks.upload_queue()->end(), p);
 
790
  
 
791
  if (m_upChoke.choked() || itr != m_peerChunks.upload_queue()->end() || p.length() > (1 << 17))
 
792
    return;
 
793
 
 
794
  m_peerChunks.upload_queue()->push_back(p);
 
795
  write_insert_poll_safe();
 
796
}
 
797
 
 
798
void
 
799
PeerConnectionBase::read_cancel_piece(const Piece& p) {
 
800
  PeerChunks::piece_list_type::iterator itr = std::find(m_peerChunks.upload_queue()->begin(), m_peerChunks.upload_queue()->end(), p);
 
801
  
 
802
  if (itr != m_peerChunks.upload_queue()->end())
 
803
    m_peerChunks.upload_queue()->erase(itr);
 
804
}  
 
805
 
 
806
void
 
807
PeerConnectionBase::write_prepare_piece() {
 
808
  m_upPiece = m_peerChunks.upload_queue()->front();
 
809
  m_peerChunks.upload_queue()->pop_front();
 
810
 
 
811
  // Move these checks somewhere else?
 
812
  if (!m_download->file_list()->is_valid_piece(m_upPiece) ||
 
813
      !m_download->file_list()->bitfield()->get(m_upPiece.index())) {
 
814
    char buffer[128];
 
815
    snprintf(buffer, 128, "Peer requested an invalid piece: %u %u %u", m_upPiece.index(), m_upPiece.length(), m_upPiece.offset());
 
816
 
 
817
    throw communication_error(buffer);
 
818
  }
 
819
  
 
820
  m_up->write_piece(m_upPiece);
 
821
}
 
822
 
 
823
void
 
824
PeerConnectionBase::write_prepare_extension(int type, const DataBuffer& message) {
 
825
  m_up->write_extension(m_extensions->id(type), message.length());
 
826
 
 
827
  m_extensionOffset = 0;
 
828
  m_extensionMessage = message;
 
829
 
 
830
  // Need to encrypt the buffer, but not until the m_up
 
831
  // write buffer has been flushed, so flag it for now.
 
832
  if (is_encrypted())
 
833
    m_extensionOffset = extension_must_encrypt;
 
834
}
 
835
 
 
836
// High stall count peers should request if we're *not* in endgame, or
 
837
// if we're in endgame and the download is too slow. Prefere not to request
 
838
// from high stall counts when we are doing decent speeds.
 
839
bool
 
840
PeerConnectionBase::should_request() {
 
841
  if (m_downChoke.choked() || !m_downInterested || !m_downUnchoked)
 
842
    // || m_down->get_state() == ProtocolRead::READ_SKIP_PIECE)
 
843
    return false;
 
844
 
 
845
  else if (!m_download->delegator()->get_aggressive())
 
846
    return true;
 
847
 
 
848
  else
 
849
    // We check if the peer is stalled, if it is not then we should
 
850
    // request. If the peer is stalled then we only request if the
 
851
    // download rate is below a certain value.
 
852
    return m_downStall <= 1 || m_download->info()->down_rate()->rate() < (10 << 10);
 
853
}
 
854
 
 
855
bool
 
856
PeerConnectionBase::try_request_pieces() {
 
857
  if (download_queue()->queued_empty())
 
858
    m_downStall = 0;
 
859
 
 
860
  uint32_t pipeSize = download_queue()->calculate_pipe_size(m_peerChunks.download_throttle()->rate()->rate());
 
861
 
 
862
  // Don't start requesting if we can't do it in large enough chunks.
 
863
  if (download_queue()->queued_size() >= (pipeSize + 10) / 2)
 
864
    return false;
 
865
 
 
866
  bool success = false;
 
867
 
 
868
  while (download_queue()->queued_size() < pipeSize && m_up->can_write_request()) {
 
869
 
 
870
    // Delegator should return a vector of pieces, and it should be
 
871
    // passed the number of pieces it should delegate. Try to ensure
 
872
    // it receives large enough request to fill a whole chunk if the
 
873
    // peer is fast enough.
 
874
 
 
875
    const Piece* p = download_queue()->delegate();
 
876
 
 
877
    if (p == NULL)
 
878
      break;
 
879
 
 
880
    if (!m_download->file_list()->is_valid_piece(*p) || !m_peerChunks.bitfield()->get(p->index()))
 
881
      throw internal_error("PeerConnectionBase::try_request_pieces() tried to use an invalid piece.");
 
882
 
 
883
    m_up->write_request(*p);
 
884
 
 
885
    success = true;
 
886
  }
 
887
 
 
888
  return success;
 
889
}
 
890
 
 
891
// Send one peer exchange message according to bits set in m_sendPEXMask.
 
892
// We can only send one message at a time, because the derived class
 
893
// needs to flush the buffer and call up_extension before the next one.
 
894
bool
 
895
PeerConnectionBase::send_pex_message() {
 
896
  if (!m_extensions->is_remote_supported(ProtocolExtension::UT_PEX)) {
 
897
    m_sendPEXMask = 0;
 
898
    return false;
 
899
  }
 
900
 
 
901
  // Message to tell peer to stop/start doing PEX is small so send it first.
 
902
  if (m_sendPEXMask & (PEX_ENABLE | PEX_DISABLE)) {
 
903
    if (!m_extensions->is_remote_supported(ProtocolExtension::UT_PEX))
 
904
      throw internal_error("PeerConnectionBase::send_pex_message() Not supported by peer.");
 
905
 
 
906
    write_prepare_extension(ProtocolExtension::HANDSHAKE,
 
907
                            ProtocolExtension::generate_toggle_message(ProtocolExtension::UT_PEX, (m_sendPEXMask & PEX_ENABLE) != 0));
 
908
 
 
909
    m_sendPEXMask &= ~(PEX_ENABLE | PEX_DISABLE);
 
910
 
 
911
  } else if (m_sendPEXMask & PEX_DO && m_extensions->id(ProtocolExtension::UT_PEX)) {
 
912
    const DataBuffer& pexMessage = m_download->get_ut_pex(m_extensions->is_initial_pex());
 
913
    m_extensions->clear_initial_pex();
 
914
 
 
915
    m_sendPEXMask &= ~PEX_DO;
 
916
 
 
917
    if (pexMessage.empty())
 
918
      return false;
 
919
 
 
920
    write_prepare_extension(ProtocolExtension::UT_PEX, pexMessage);
 
921
 
 
922
  } else {
 
923
    m_sendPEXMask = 0;
 
924
  }
 
925
 
 
926
  return true;
 
927
}
 
928
 
 
929
// Extension protocol needs to send a reply.
 
930
bool
 
931
PeerConnectionBase::send_ext_message() {
 
932
  write_prepare_extension(m_extensions->pending_message_type(), m_extensions->pending_message_data());
 
933
  m_extensions->clear_pending_message();
 
934
  return true;
 
935
}
 
936
 
 
937
void
 
938
PeerConnectionBase::receive_metadata_piece(uint32_t piece, const char* data, uint32_t length) {
 
939
}
 
940
 
 
941
}