~ubuntu-branches/debian/wheezy/libtorrent/wheezy

« back to all changes in this revision

Viewing changes to src/dht/dht_server.cc

  • Committer: Bazaar Package Importer
  • Author(s): Rogério Brito
  • Date: 2011-03-09 20:04:41 UTC
  • mfrom: (1.3.4 upstream) (7.1.5 sid)
  • Revision ID: james.westby@ubuntu.com-20110309200441-ffi9aaqdyd72d8xl
Tags: 0.12.7-4
* Steal patch from upstream's bug tracking system to enable IPv6.
* Refresh patches to apply cleanly.
* This should, hopefully, be a good step towards addressing #490277.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
// libTorrent - BitTorrent library
 
2
// Copyright (C) 2005-2007, Jari Sundell
 
3
//
 
4
// This program is free software; you can redistribute it and/or modify
 
5
// it under the terms of the GNU General Public License as published by
 
6
// the Free Software Foundation; either version 2 of the License, or
 
7
// (at your option) any later version.
 
8
// 
 
9
// This program is distributed in the hope that it will be useful,
 
10
// but WITHOUT ANY WARRANTY; without even the implied warranty of
 
11
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
12
// GNU General Public License for more details.
 
13
// 
 
14
// You should have received a copy of the GNU General Public License
 
15
// along with this program; if not, write to the Free Software
 
16
// Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
17
//
 
18
// In addition, as a special exception, the copyright holders give
 
19
// permission to link the code of portions of this program with the
 
20
// OpenSSL library under certain conditions as described in each
 
21
// individual source file, and distribute linked combinations
 
22
// including the two.
 
23
//
 
24
// You must obey the GNU General Public License in all respects for
 
25
// all of the code used other than OpenSSL.  If you modify file(s)
 
26
// with this exception, you may extend this exception to your version
 
27
// of the file(s), but you are not obligated to do so.  If you do not
 
28
// wish to do so, delete this exception statement from your version.
 
29
// If you delete this exception statement from all source files in the
 
30
// program, then also delete it here.
 
31
//
 
32
// Contact:  Jari Sundell <jaris@ifi.uio.no>
 
33
//
 
34
//           Skomakerveien 33
 
35
//           3185 Skoppum, NORWAY
 
36
 
 
37
#include "config.h"
 
38
#include "globals.h"
 
39
 
 
40
#include <algorithm>
 
41
#include <cstdio>
 
42
 
 
43
#include "torrent/exceptions.h"
 
44
#include "torrent/connection_manager.h"
 
45
#include "torrent/download_info.h"
 
46
#include "torrent/object.h"
 
47
#include "torrent/object_stream.h"
 
48
#include "torrent/poll.h"
 
49
#include "torrent/object_static_map.h"
 
50
#include "torrent/throttle.h"
 
51
#include "tracker/tracker_dht.h"
 
52
 
 
53
#include "dht_bucket.h"
 
54
#include "dht_router.h"
 
55
#include "dht_transaction.h"
 
56
 
 
57
#include "manager.h"
 
58
 
 
59
namespace torrent {
 
60
 
 
61
const char* DhtServer::queries[] = {
 
62
  "ping",
 
63
  "find_node",
 
64
  "get_peers",
 
65
  "announce_peer",
 
66
};
 
67
 
 
68
// List of all possible keys we need/support in a DHT message.
 
69
// Unsupported keys we receive are dropped (ignored) while decoding.
 
70
// See torrent/object_static_map.h for how this works.
 
71
template <>
 
72
const DhtMessage::key_list_type DhtMessage::base_type::keys = {
 
73
  { key_a_id,       "a::id*S" },
 
74
  { key_a_infoHash, "a::info_hash*S" },
 
75
  { key_a_port,     "a::port", },
 
76
  { key_a_target,   "a::target*S" },
 
77
  { key_a_token,    "a::token*S" },
 
78
 
 
79
  { key_e_0,        "e[]*" },
 
80
  { key_e_1,        "e[]*" },
 
81
 
 
82
  { key_q,          "q*S" },
 
83
 
 
84
  { key_r_id,       "r::id*S" },
 
85
  { key_r_nodes,    "r::nodes*S" },
 
86
  { key_r_token,    "r::token*S" },
 
87
  { key_r_values,   "r::values*L" },
 
88
 
 
89
  { key_t,          "t*S" },
 
90
  { key_v,          "v*" },
 
91
  { key_y,          "y*S" },
 
92
};
 
93
 
 
94
// Error in DHT protocol, avoids std::string ctor from communication_error
 
95
class dht_error : public network_error {
 
96
public:
 
97
  dht_error(int code, const char* message) : m_message(message), m_code(code) {}
 
98
 
 
99
  virtual int          code() const throw()   { return m_code; }
 
100
  virtual const char*  what() const throw()   { return m_message; }
 
101
 
 
102
private:
 
103
  const char*  m_message;
 
104
  int          m_code;
 
105
};
 
106
 
 
107
DhtServer::DhtServer(DhtRouter* router) :
 
108
  m_router(router),
 
109
 
 
110
  m_uploadNode(60),
 
111
  m_downloadNode(60),
 
112
 
 
113
  m_uploadThrottle(manager->upload_throttle()->throttle_list()),
 
114
  m_downloadThrottle(manager->download_throttle()->throttle_list()),
 
115
 
 
116
  m_networkUp(false) {
 
117
 
 
118
  get_fd().clear();
 
119
  reset_statistics();
 
120
 
 
121
  // Reserve a socket for the DHT server, even though we don't
 
122
  // actually open it until the server is started, which may not
 
123
  // happen until the first non-private torrent is started.
 
124
  manager->connection_manager()->inc_socket_count();
 
125
}
 
126
 
 
127
DhtServer::~DhtServer() {
 
128
  stop();
 
129
 
 
130
  std::for_each(m_highQueue.begin(), m_highQueue.end(), rak::call_delete<DhtTransactionPacket>());
 
131
  std::for_each(m_lowQueue.begin(), m_lowQueue.end(), rak::call_delete<DhtTransactionPacket>());
 
132
 
 
133
  manager->connection_manager()->dec_socket_count();
 
134
}
 
135
 
 
136
void
 
137
DhtServer::start(int port) {
 
138
  try {
 
139
    if (!get_fd().open_datagram() || !get_fd().set_nonblock())
 
140
      throw resource_error("Could not allocate datagram socket.");
 
141
 
 
142
    if (!get_fd().set_reuse_address(true))
 
143
      throw resource_error("Could not set listening port to reuse address.");
 
144
 
 
145
    rak::socket_address sa = *m_router->address();
 
146
    sa.set_port(port);
 
147
 
 
148
    if (!get_fd().bind(sa))
 
149
      throw resource_error("Could not bind datagram socket.");
 
150
 
 
151
  } catch (torrent::base_error& e) {
 
152
    get_fd().close();
 
153
    get_fd().clear();
 
154
    throw;
 
155
  }
 
156
 
 
157
  m_taskTimeout.set_slot(rak::mem_fn(this, &DhtServer::receive_timeout));
 
158
 
 
159
  m_uploadNode.set_list_iterator(m_uploadThrottle->end());
 
160
  m_uploadNode.slot_activate(rak::make_mem_fun(static_cast<SocketBase*>(this), &SocketBase::receive_throttle_up_activate));
 
161
 
 
162
  m_downloadNode.set_list_iterator(m_downloadThrottle->end());
 
163
  m_downloadThrottle->insert(&m_downloadNode);
 
164
 
 
165
  manager->poll()->open(this);
 
166
  manager->poll()->insert_read(this);
 
167
  manager->poll()->insert_error(this);
 
168
}
 
169
 
 
170
void
 
171
DhtServer::stop() {
 
172
  if (!is_active())
 
173
    return;
 
174
 
 
175
  clear_transactions();
 
176
 
 
177
  priority_queue_erase(&taskScheduler, &m_taskTimeout);
 
178
 
 
179
  m_uploadThrottle->erase(&m_uploadNode);
 
180
  m_downloadThrottle->erase(&m_downloadNode);
 
181
 
 
182
  manager->poll()->remove_read(this);
 
183
  manager->poll()->remove_write(this);
 
184
  manager->poll()->remove_error(this);
 
185
  manager->poll()->close(this);
 
186
 
 
187
  get_fd().close();
 
188
  get_fd().clear();
 
189
 
 
190
  m_networkUp = false;
 
191
}
 
192
 
 
193
void
 
194
DhtServer::reset_statistics() { 
 
195
  m_queriesReceived = 0;
 
196
  m_queriesSent = 0;
 
197
  m_repliesReceived = 0;
 
198
  m_errorsReceived = 0;
 
199
  m_errorsCaught = 0;
 
200
 
 
201
  m_uploadNode.rate()->set_total(0);
 
202
  m_downloadNode.rate()->set_total(0);
 
203
}
 
204
 
 
205
// Ping a node whose ID we know.
 
206
void
 
207
DhtServer::ping(const HashString& id, const rak::socket_address* sa) {
 
208
  // No point pinging a node that we're already contacting otherwise.
 
209
  transaction_itr itr = m_transactions.lower_bound(DhtTransaction::key(sa, 0));
 
210
  if (itr == m_transactions.end() || !DhtTransaction::key_match(itr->first, sa))
 
211
    add_transaction(new DhtTransactionPing(id, sa), packet_prio_low);
 
212
}
 
213
 
 
214
// Contact nodes in given bucket and ask for their nodes closest to target.
 
215
void
 
216
DhtServer::find_node(const DhtBucket& contacts, const HashString& target) {
 
217
  DhtSearch* search = new DhtSearch(target, contacts);
 
218
 
 
219
  DhtSearch::const_accessor n;
 
220
  while ((n = search->get_contact()) != search->end())
 
221
    add_transaction(new DhtTransactionFindNode(n), packet_prio_low);
 
222
 
 
223
  // This shouldn't happen, it means we had no contactable nodes at all.
 
224
  if (!search->start())
 
225
    delete search;
 
226
}
 
227
 
 
228
void
 
229
DhtServer::announce(const DhtBucket& contacts, const HashString& infoHash, TrackerDht* tracker) {
 
230
  DhtAnnounce* announce = new DhtAnnounce(infoHash, tracker, contacts);
 
231
 
 
232
  DhtSearch::const_accessor n;
 
233
  while ((n = announce->get_contact()) != announce->end())
 
234
    add_transaction(new DhtTransactionFindNode(n), packet_prio_high);
 
235
 
 
236
  // This can only happen if all nodes we know are bad.
 
237
  if (!announce->start())
 
238
    delete announce;
 
239
  else
 
240
    announce->update_status();
 
241
}
 
242
 
 
243
void
 
244
DhtServer::cancel_announce(DownloadInfo* info, const TrackerDht* tracker) {
 
245
  transaction_itr itr = m_transactions.begin();
 
246
 
 
247
  while (itr != m_transactions.end()) {
 
248
    if (itr->second->is_search() && itr->second->as_search()->search()->is_announce()) {
 
249
      DhtAnnounce* announce = static_cast<DhtAnnounce*>(itr->second->as_search()->search());
 
250
 
 
251
      if ((info == NULL || announce->target() == info->hash()) && (tracker == NULL || announce->tracker() == tracker)) {
 
252
        delete itr->second;
 
253
        m_transactions.erase(itr++);
 
254
        continue;
 
255
      }
 
256
    }
 
257
 
 
258
    ++itr;
 
259
  }
 
260
}
 
261
 
 
262
void
 
263
DhtServer::update() {
 
264
  // Reset this every 15 minutes. It'll get set back to true if we receive
 
265
  // any valid packets. This allows detecting when the entire network goes
 
266
  // down, and prevents all nodes from getting removed as unresponsive.
 
267
  m_networkUp = false;
 
268
}
 
269
 
 
270
void
 
271
DhtServer::process_query(const HashString& id, const rak::socket_address* sa, const DhtMessage& msg) {
 
272
  m_queriesReceived++;
 
273
  m_networkUp = true;
 
274
 
 
275
  raw_string query = msg[key_q].as_raw_string();
 
276
 
 
277
  // Construct reply.
 
278
  DhtMessage reply;
 
279
 
 
280
  if (query == raw_string::from_c_str("find_node"))
 
281
    create_find_node_response(msg, reply);
 
282
 
 
283
  else if (query == raw_string::from_c_str("get_peers"))
 
284
    create_get_peers_response(msg, sa, reply);
 
285
 
 
286
  else if (query == raw_string::from_c_str("announce_peer"))
 
287
    create_announce_peer_response(msg, sa, reply);
 
288
 
 
289
  else if (query != raw_string::from_c_str("ping"))
 
290
    throw dht_error(dht_error_bad_method, "Unknown query type.");
 
291
 
 
292
  m_router->node_queried(id, sa);
 
293
  create_response(msg, sa, reply);
 
294
}
 
295
 
 
296
void
 
297
DhtServer::create_find_node_response(const DhtMessage& req, DhtMessage& reply) {
 
298
  raw_string target = req[key_a_target].as_raw_string();
 
299
 
 
300
  if (target.size() < HashString::size_data)
 
301
    throw dht_error(dht_error_protocol, "target string too short");
 
302
 
 
303
  reply[key_r_nodes] = m_router->get_closest_nodes(*HashString::cast_from(target.data()));
 
304
 
 
305
  if (reply[key_r_nodes].as_raw_string().empty())
 
306
    throw dht_error(dht_error_generic, "No nodes");
 
307
}
 
308
 
 
309
void
 
310
DhtServer::create_get_peers_response(const DhtMessage& req, const rak::socket_address* sa, DhtMessage& reply) {
 
311
  reply[key_r_token] = m_router->make_token(sa, reply.data_end);
 
312
  reply.data_end += reply[key_r_token].as_raw_string().size();
 
313
 
 
314
  raw_string info_hash_str = req[key_a_infoHash].as_raw_string();
 
315
 
 
316
  if (info_hash_str.size() < HashString::size_data)
 
317
    throw dht_error(dht_error_protocol, "info hash too short");
 
318
 
 
319
  const HashString* info_hash = HashString::cast_from(info_hash_str.data());
 
320
 
 
321
  DhtTracker* tracker = m_router->get_tracker(*info_hash, false);
 
322
 
 
323
  // If we're not tracking or have no peers, send closest nodes.
 
324
  if (!tracker || tracker->empty()) {
 
325
    raw_string nodes = m_router->get_closest_nodes(*info_hash);
 
326
 
 
327
    if (nodes.empty())
 
328
      throw dht_error(dht_error_generic, "No peers nor nodes");
 
329
 
 
330
    reply[key_r_nodes] = nodes;
 
331
 
 
332
  } else {
 
333
    reply[key_r_values] = tracker->get_peers();
 
334
  }
 
335
}
 
336
 
 
337
void
 
338
DhtServer::create_announce_peer_response(const DhtMessage& req, const rak::socket_address* sa, DhtMessage& reply) {
 
339
  raw_string info_hash = req[key_a_infoHash].as_raw_string();
 
340
 
 
341
  if (info_hash.size() < HashString::size_data)
 
342
    throw dht_error(dht_error_protocol, "info hash too short");
 
343
 
 
344
  if (!m_router->token_valid(req[key_a_token].as_raw_string(), sa))
 
345
    throw dht_error(dht_error_protocol, "Token invalid.");
 
346
 
 
347
  DhtTracker* tracker = m_router->get_tracker(*HashString::cast_from(info_hash.data()), true);
 
348
  tracker->add_peer(sa->sa_inet()->address_n(), req[key_a_port].as_value());
 
349
}
 
350
 
 
351
void
 
352
DhtServer::process_response(const HashString& id, const rak::socket_address* sa, const DhtMessage& response) {
 
353
  int transactionId = (unsigned char)response[key_t].as_raw_string().data()[0];
 
354
  transaction_itr itr = m_transactions.find(DhtTransaction::key(sa, transactionId));
 
355
 
 
356
  // Response to a transaction we don't have in our table. At this point it's
 
357
  // impossible to tell whether it used to be a valid transaction but timed out
 
358
  // the node did not return the ID we sent it, or it returned it with a
 
359
  // different address than we sent it o. Best we can do is ignore the reply,
 
360
  // since the protocol doesn't call for returning errors in responses.
 
361
  if (itr == m_transactions.end())
 
362
    return;
 
363
 
 
364
  m_repliesReceived++;
 
365
  m_networkUp = true;
 
366
 
 
367
  // Make sure transaction is erased even if an exception is thrown.
 
368
  try {
 
369
    DhtTransaction* transaction = itr->second;
 
370
#ifdef USE_EXTRA_DEBUG
 
371
    if (DhtTransaction::key(sa, transactionId) != transaction->key(transactionId))
 
372
      throw internal_error("DhtServer::process_response key mismatch.");
 
373
#endif
 
374
 
 
375
    // If we contact a node but its ID is not the one we expect, ignore the reply
 
376
    // to prevent interference from rogue nodes.
 
377
    if ((id != transaction->id() && transaction->id() != m_router->zero_id))
 
378
      return;
 
379
 
 
380
    switch (transaction->type()) {
 
381
      case DhtTransaction::DHT_FIND_NODE:
 
382
        parse_find_node_reply(transaction->as_find_node(), response[key_r_nodes].as_raw_string());
 
383
        break;
 
384
 
 
385
      case DhtTransaction::DHT_GET_PEERS:
 
386
        parse_get_peers_reply(transaction->as_get_peers(), response);
 
387
        break;
 
388
 
 
389
      // Nothing to do for DHT_PING and DHT_ANNOUNCE_PEER
 
390
      default:
 
391
        break;
 
392
    }
 
393
 
 
394
    // Mark node responsive only if all processing was successful, without errors.
 
395
    m_router->node_replied(id, sa);
 
396
 
 
397
  } catch (std::exception& e) {
 
398
    delete itr->second;
 
399
    m_transactions.erase(itr);
 
400
 
 
401
    m_errorsCaught++;
 
402
    throw;
 
403
  }
 
404
 
 
405
  delete itr->second;
 
406
  m_transactions.erase(itr);
 
407
}
 
408
 
 
409
void
 
410
DhtServer::process_error(const rak::socket_address* sa, const DhtMessage& error) {
 
411
  int transactionId = (unsigned char)error[key_t].as_raw_string().data()[0];
 
412
  transaction_itr itr = m_transactions.find(DhtTransaction::key(sa, transactionId));
 
413
 
 
414
  if (itr == m_transactions.end())
 
415
    return;
 
416
 
 
417
  m_repliesReceived++;
 
418
  m_errorsReceived++;
 
419
  m_networkUp = true;
 
420
 
 
421
  // Don't mark node as good (because it replied) or bad (because it returned an error).
 
422
  // If it consistently returns errors for valid queries it's probably broken.  But a
 
423
  // few error messages are acceptable. So we do nothing and pretend the query never happened.
 
424
 
 
425
  delete itr->second;
 
426
  m_transactions.erase(itr);
 
427
}
 
428
 
 
429
void
 
430
DhtServer::parse_find_node_reply(DhtTransactionSearch* transaction, raw_string nodes) {
 
431
  transaction->complete(true);
 
432
 
 
433
  if (sizeof(const compact_node_info) != 26)
 
434
    throw internal_error("DhtServer::parse_find_node_reply(...) bad struct size.");
 
435
 
 
436
  node_info_list list;
 
437
  std::copy(reinterpret_cast<const compact_node_info*>(nodes.data()),
 
438
            reinterpret_cast<const compact_node_info*>(nodes.data() + nodes.size() - nodes.size() % sizeof(compact_node_info)),
 
439
            std::back_inserter(list));
 
440
 
 
441
  for (node_info_list::iterator itr = list.begin(); itr != list.end(); ++itr) {
 
442
    if (itr->id() != m_router->id()) {
 
443
      rak::socket_address sa = itr->address();
 
444
      transaction->search()->add_contact(itr->id(), &sa);
 
445
    }
 
446
  }
 
447
 
 
448
  find_node_next(transaction);
 
449
}
 
450
 
 
451
void
 
452
DhtServer::parse_get_peers_reply(DhtTransactionGetPeers* transaction, const DhtMessage& response) {
 
453
  DhtAnnounce* announce = static_cast<DhtAnnounce*>(transaction->as_search()->search());
 
454
 
 
455
  transaction->complete(true);
 
456
 
 
457
  if (response[key_r_values].is_raw_list())
 
458
    announce->receive_peers(response[key_r_values].as_raw_list());
 
459
 
 
460
  if (response[key_r_token].is_raw_string())
 
461
    add_transaction(new DhtTransactionAnnouncePeer(transaction->id(),
 
462
                                                   transaction->address(),
 
463
                                                   announce->target(),
 
464
                                                   response[key_r_token].as_raw_string()),
 
465
                    packet_prio_low);
 
466
 
 
467
  announce->update_status();
 
468
}
 
469
 
 
470
void
 
471
DhtServer::find_node_next(DhtTransactionSearch* transaction) {
 
472
  int priority = packet_prio_low;
 
473
  if (transaction->search()->is_announce())
 
474
    priority = packet_prio_high;
 
475
 
 
476
  DhtSearch::const_accessor node;
 
477
  while ((node = transaction->search()->get_contact()) != transaction->search()->end())
 
478
    add_transaction(new DhtTransactionFindNode(node), priority);
 
479
 
 
480
  if (!transaction->search()->is_announce())
 
481
    return;
 
482
 
 
483
  DhtAnnounce* announce = static_cast<DhtAnnounce*>(transaction->search());
 
484
  if (announce->complete()) {
 
485
    // We have found the 8 closest nodes to the info hash. Retrieve peers
 
486
    // from them and announce to them.
 
487
    for (node = announce->start_announce(); node != announce->end(); ++node)
 
488
      add_transaction(new DhtTransactionGetPeers(node), packet_prio_high);
 
489
  }
 
490
 
 
491
  announce->update_status();
 
492
}
 
493
 
 
494
void
 
495
DhtServer::add_packet(DhtTransactionPacket* packet, int priority) {
 
496
  switch (priority) {
 
497
    // High priority packets are for important queries, and quite small.
 
498
    // They're added to front of high priority queue and thus will be the
 
499
    // next packets sent.
 
500
    case packet_prio_high:
 
501
      m_highQueue.push_front(packet);
 
502
      break;
 
503
 
 
504
    // Low priority query packets are added to the back of the high priority
 
505
    // queue and will be sent when all high priority packets have been transmitted.
 
506
    case packet_prio_low:
 
507
      m_highQueue.push_back(packet);
 
508
      break;
 
509
 
 
510
    // Reply packets will be processed after all of our own packets have been send.
 
511
    case packet_prio_reply:
 
512
      m_lowQueue.push_back(packet);
 
513
      break;
 
514
 
 
515
    default:
 
516
      throw internal_error("DhtServer::add_packet called with invalid priority.");
 
517
  }
 
518
}
 
519
 
 
520
void
 
521
DhtServer::create_query(transaction_itr itr, int tID, const rak::socket_address* sa, int priority) {
 
522
  if (itr->second->id() == m_router->id())
 
523
    throw internal_error("DhtServer::create_query trying to send to itself.");
 
524
 
 
525
  DhtMessage query;
 
526
 
 
527
  // Transaction ID is a bencode string.
 
528
  query[key_t] = raw_bencode(query.data_end, 3);
 
529
  *query.data_end++ = '1';
 
530
  *query.data_end++ = ':';
 
531
  *query.data_end++ = tID;
 
532
 
 
533
  DhtTransaction* transaction = itr->second;
 
534
  query[key_q] = raw_string::from_c_str(queries[transaction->type()]);
 
535
  query[key_y] = raw_bencode::from_c_str("1:q");
 
536
  query[key_v] = raw_bencode("4:" PEER_VERSION, 6);
 
537
  query[key_a_id] = m_router->id_raw_string();
 
538
 
 
539
  switch (transaction->type()) {
 
540
    case DhtTransaction::DHT_PING:
 
541
      // nothing to do
 
542
      break;
 
543
 
 
544
    case DhtTransaction::DHT_FIND_NODE:
 
545
      query[key_a_target] = transaction->as_find_node()->search()->target_raw_string();
 
546
      break;
 
547
 
 
548
    case DhtTransaction::DHT_GET_PEERS:
 
549
      query[key_a_infoHash] = transaction->as_get_peers()->search()->target_raw_string();
 
550
      break;
 
551
 
 
552
    case DhtTransaction::DHT_ANNOUNCE_PEER:
 
553
      query[key_a_infoHash] = transaction->as_announce_peer()->info_hash_raw_string();
 
554
      query[key_a_token] = transaction->as_announce_peer()->token();
 
555
      query[key_a_port] = manager->connection_manager()->listen_port();
 
556
      break;
 
557
  }
 
558
 
 
559
  DhtTransactionPacket* packet = new DhtTransactionPacket(transaction->address(), query, tID, transaction);
 
560
  transaction->set_packet(packet);
 
561
  add_packet(packet, priority);
 
562
 
 
563
  m_queriesSent++;
 
564
}
 
565
 
 
566
void
 
567
DhtServer::create_response(const DhtMessage& req, const rak::socket_address* sa, DhtMessage& reply) {
 
568
  reply[key_r_id] = m_router->id_raw_string();
 
569
  reply[key_t] = req[key_t];
 
570
  reply[key_y] = raw_bencode::from_c_str("1:r");
 
571
  reply[key_v] = raw_bencode("4:" PEER_VERSION, 6);
 
572
 
 
573
  add_packet(new DhtTransactionPacket(sa, reply), packet_prio_reply);
 
574
}
 
575
 
 
576
void
 
577
DhtServer::create_error(const DhtMessage& req, const rak::socket_address* sa, int num, const char* msg) {
 
578
  DhtMessage error;
 
579
 
 
580
  if (req[key_t].is_raw_bencode() || req[key_t].is_raw_string())
 
581
    error[key_t] = req[key_t];
 
582
 
 
583
  error[key_y] = raw_bencode::from_c_str("1:e");
 
584
  error[key_v] = raw_bencode("4:" PEER_VERSION, 6);
 
585
  error[key_e_0] = num;
 
586
  error[key_e_1] = raw_string::from_c_str(msg);
 
587
 
 
588
  add_packet(new DhtTransactionPacket(sa, error), packet_prio_reply);
 
589
}
 
590
 
 
591
int
 
592
DhtServer::add_transaction(DhtTransaction* transaction, int priority) {
 
593
  // Try random transaction ID. This is to make it less likely that we reuse
 
594
  // a transaction ID from an earlier transaction which timed out and we forgot
 
595
  // about it, so that if the node replies after the timeout it's less likely
 
596
  // that we match the reply to the wrong transaction.
 
597
  //
 
598
  // If there's an existing transaction with the random ID we search for the next
 
599
  // unused one. Since normally only one or two transactions will be active per
 
600
  // node, a collision is extremely unlikely, and a linear search for the first
 
601
  // open one is the most efficient.
 
602
  unsigned int rnd = (uint8_t)random();
 
603
  unsigned int id = rnd;
 
604
 
 
605
  transaction_itr insertItr = m_transactions.lower_bound(transaction->key(rnd));
 
606
 
 
607
  // If key matches, keep trying successive IDs.
 
608
  while (insertItr != m_transactions.end() && insertItr->first == transaction->key(id)) {
 
609
    ++insertItr;
 
610
    id = (uint8_t)(id + 1);
 
611
 
 
612
    // Give up after trying all possible IDs. This should never happen.
 
613
    if (id == rnd) {
 
614
      delete transaction;
 
615
      return -1;
 
616
    }
 
617
 
 
618
    // Transaction ID wrapped around, reset iterator.
 
619
    if (id == 0)
 
620
      insertItr = m_transactions.lower_bound(transaction->key(id));
 
621
  }
 
622
 
 
623
  // We know where to insert it, so pass that as hint.
 
624
  insertItr = m_transactions.insert(insertItr, std::make_pair(transaction->key(id), transaction));
 
625
 
 
626
  create_query(insertItr, id, transaction->address(), priority);
 
627
 
 
628
  start_write();
 
629
 
 
630
  return id;
 
631
}
 
632
 
 
633
// Transaction received no reply and timed out. Mark node as bad and remove
 
634
// transaction (except if it was only the quick timeout).
 
635
DhtServer::transaction_itr
 
636
DhtServer::failed_transaction(transaction_itr itr, bool quick) {
 
637
  DhtTransaction* transaction = itr->second;
 
638
 
 
639
  // If it was a known node, remember that it didn't reply, unless the transaction
 
640
  // is only stalled (had quick timeout, but not full timeout).  Also if the
 
641
  // transaction still has an associated packet, the packet never got sent due to
 
642
  // throttling, so don't blame the remote node for not replying.
 
643
  // Finally, if we haven't received anything whatsoever so far, assume the entire
 
644
  // network is down and so we can't blame the node either.
 
645
  if (!quick && m_networkUp && transaction->packet() == NULL && transaction->id() != m_router->zero_id)
 
646
    m_router->node_inactive(transaction->id(), transaction->address());
 
647
 
 
648
  if (transaction->type() == DhtTransaction::DHT_FIND_NODE) {
 
649
    if (quick)
 
650
      transaction->as_find_node()->set_stalled();
 
651
    else
 
652
      transaction->as_find_node()->complete(false);
 
653
 
 
654
    try {
 
655
      find_node_next(transaction->as_find_node());
 
656
 
 
657
    } catch (std::exception& e) {
 
658
      if (!quick) {
 
659
        delete itr->second;
 
660
        m_transactions.erase(itr);
 
661
      }
 
662
 
 
663
      throw;
 
664
    }
 
665
  }
 
666
 
 
667
  if (quick) {
 
668
    return ++itr;         // don't actually delete the transaction until the final timeout
 
669
 
 
670
  } else {
 
671
    delete itr->second;
 
672
    m_transactions.erase(itr++);
 
673
    return itr;
 
674
  }
 
675
}
 
676
 
 
677
void
 
678
DhtServer::clear_transactions() {
 
679
  std::for_each(m_transactions.begin(), m_transactions.end(),
 
680
                rak::on(rak::mem_ref(&transaction_map::value_type::second),
 
681
                        rak::call_delete<DhtTransaction>()));
 
682
  m_transactions.clear();
 
683
}
 
684
 
 
685
void
 
686
DhtServer::event_read() {
 
687
  uint32_t total = 0;
 
688
 
 
689
  while (true) {
 
690
    Object request;
 
691
    rak::socket_address sa;
 
692
    int type = '?';
 
693
    DhtMessage message;
 
694
    const HashString* nodeId = NULL;
 
695
 
 
696
    try {
 
697
      char buffer[2048];
 
698
      int32_t read = read_datagram(buffer, sizeof(buffer), &sa);
 
699
 
 
700
      if (read < 0)
 
701
        break;
 
702
 
 
703
#ifdef RAK_USE_INET6
 
704
      // We can currently only process mapped-IPv4 addresses, not real IPv6.
 
705
      // Translate them to an af_inet socket_address.
 
706
      if (sa.family() == rak::socket_address::af_inet6)
 
707
        sa = sa.sa_inet6()->normalize_address();
 
708
#endif
 
709
 
 
710
      if (sa.family() != rak::socket_address::af_inet)
 
711
        continue;
 
712
 
 
713
      total += read;
 
714
 
 
715
      // If it's not a valid bencode dictionary at all, it's probably not a DHT
 
716
      // packet at all, so we don't throw an error to prevent bounce loops.
 
717
      try {
 
718
        static_map_read_bencode(buffer, buffer + read, message);
 
719
      } catch (bencode_error& e) {
 
720
        continue;
 
721
      }
 
722
 
 
723
      if (!message[key_t].is_raw_string())
 
724
        throw dht_error(dht_error_protocol, "No transaction ID");
 
725
 
 
726
      if (!message[key_y].is_raw_string())
 
727
        throw dht_error(dht_error_protocol, "No message type");
 
728
 
 
729
      if (message[key_y].as_raw_string().size() != 1)
 
730
        throw dht_error(dht_error_bad_method, "Unsupported message type");
 
731
 
 
732
      type = message[key_y].as_raw_string().data()[0];
 
733
 
 
734
      // Queries and replies have node ID in different dictionaries.
 
735
      if (type == 'r' || type == 'q') {
 
736
        if (!message[type == 'q' ? key_a_id : key_r_id].is_raw_string())
 
737
          throw dht_error(dht_error_protocol, "Invalid `id' value");
 
738
 
 
739
        raw_string nodeIdStr = message[type == 'q' ? key_a_id : key_r_id].as_raw_string();
 
740
 
 
741
        if (nodeIdStr.size() < HashString::size_data)
 
742
          throw dht_error(dht_error_protocol, "`id' value too short");
 
743
 
 
744
        nodeId = HashString::cast_from(nodeIdStr.data());
 
745
      }
 
746
 
 
747
      // Sanity check the returned transaction ID.
 
748
      if ((type == 'r' || type == 'e') && 
 
749
          (!message[key_t].is_raw_string() || message[key_t].as_raw_string().size() != 1))
 
750
        throw dht_error(dht_error_protocol, "Invalid transaction ID type/length.");
 
751
 
 
752
      // Stupid broken implementations.
 
753
      if (nodeId != NULL && *nodeId == m_router->id())
 
754
        throw dht_error(dht_error_protocol, "Send your own ID, not mine");
 
755
 
 
756
      switch (type) {
 
757
        case 'q':
 
758
          process_query(*nodeId, &sa, message);
 
759
          break;
 
760
 
 
761
        case 'r':
 
762
          process_response(*nodeId, &sa, message);
 
763
          break;
 
764
 
 
765
        case 'e':
 
766
          process_error(&sa, message);
 
767
          break;
 
768
 
 
769
        default:
 
770
          throw dht_error(dht_error_bad_method, "Unknown message type.");
 
771
      }
 
772
 
 
773
    // If node was querying us, reply with error packet, otherwise mark the node as "query failed",
 
774
    // so that if it repeatedly sends malformed replies we will drop it instead of propagating it
 
775
    // to other nodes.
 
776
    } catch (bencode_error& e) {
 
777
      if ((type == 'r' || type == 'e') && nodeId != NULL) {
 
778
        m_router->node_inactive(*nodeId, &sa);
 
779
      } else {
 
780
        snprintf(message.data_end, message.data + message.data_size - message.data_end - 1, "Malformed packet: %s", e.what());
 
781
        message.data[message.data_size - 1] = '\0';
 
782
        create_error(message, &sa, dht_error_protocol, message.data_end);
 
783
      }
 
784
 
 
785
    } catch (dht_error& e) {
 
786
      if ((type == 'r' || type == 'e') && nodeId != NULL)
 
787
        m_router->node_inactive(*nodeId, &sa);
 
788
      else
 
789
        create_error(message, &sa, e.code(), e.what());
 
790
 
 
791
    } catch (network_error& e) {
 
792
 
 
793
    }
 
794
  }
 
795
 
 
796
  m_downloadThrottle->node_used_unthrottled(total);
 
797
  m_downloadNode.rate()->insert(total);
 
798
 
 
799
  start_write();
 
800
}
 
801
 
 
802
bool
 
803
DhtServer::process_queue(packet_queue& queue, uint32_t* quota) {
 
804
  uint32_t used = 0;
 
805
 
 
806
  while (!queue.empty()) {
 
807
    DhtTransactionPacket* packet = queue.front();
 
808
 
 
809
    // Make sure its transaction hasn't timed out yet, if it has/had one
 
810
    // and don't bother sending non-transaction packets (replies) after 
 
811
    // more than 15 seconds in the queue.
 
812
    if (packet->has_failed() || packet->age() > 15) {
 
813
      delete packet;
 
814
      queue.pop_front();
 
815
      continue;
 
816
    }
 
817
 
 
818
    if (packet->length() > *quota) {
 
819
      m_uploadThrottle->node_used(&m_uploadNode, used);
 
820
      return false;
 
821
    }
 
822
 
 
823
    queue.pop_front();
 
824
 
 
825
    try {
 
826
      int written = write_datagram(packet->c_str(), packet->length(), packet->address());
 
827
 
 
828
      if (written == -1)
 
829
        throw network_error();
 
830
 
 
831
      used += written;
 
832
      *quota -= written;
 
833
 
 
834
      if ((unsigned int)written != packet->length())
 
835
        throw network_error();
 
836
 
 
837
    } catch (network_error& e) {
 
838
      // Couldn't write packet, maybe something wrong with node address or routing, so mark node as bad.
 
839
      if (packet->has_transaction()) {
 
840
        transaction_itr itr = m_transactions.find(packet->transaction()->key(packet->id()));
 
841
        if (itr == m_transactions.end())
 
842
          throw internal_error("DhtServer::process_queue could not find transaction.");
 
843
 
 
844
        failed_transaction(itr, false);
 
845
      }
 
846
    }
 
847
 
 
848
    if (packet->has_transaction())
 
849
      packet->transaction()->set_packet(NULL);
 
850
 
 
851
    delete packet;
 
852
  }
 
853
 
 
854
  m_uploadThrottle->node_used(&m_uploadNode, used);
 
855
  return true;
 
856
}
 
857
 
 
858
void
 
859
DhtServer::event_write() {
 
860
  if (m_highQueue.empty() && m_lowQueue.empty())
 
861
    throw internal_error("DhtServer::event_write called but both write queues are empty.");
 
862
 
 
863
  if (!m_uploadThrottle->is_throttled(&m_uploadNode))
 
864
    throw internal_error("DhtServer::event_write called while not in throttle list.");
 
865
 
 
866
  uint32_t quota = m_uploadThrottle->node_quota(&m_uploadNode);
 
867
 
 
868
  if (quota == 0 || !process_queue(m_highQueue, &quota) || !process_queue(m_lowQueue, &quota)) {
 
869
    manager->poll()->remove_write(this);
 
870
    m_uploadThrottle->node_deactivate(&m_uploadNode);
 
871
 
 
872
  } else if (m_highQueue.empty() && m_lowQueue.empty()) {
 
873
    manager->poll()->remove_write(this);
 
874
    m_uploadThrottle->erase(&m_uploadNode);
 
875
  }
 
876
}
 
877
 
 
878
void
 
879
DhtServer::event_error() {
 
880
}
 
881
 
 
882
void
 
883
DhtServer::start_write() {
 
884
  if ((!m_highQueue.empty() || !m_lowQueue.empty()) && !m_uploadThrottle->is_throttled(&m_uploadNode)) {
 
885
    m_uploadThrottle->insert(&m_uploadNode);
 
886
    manager->poll()->insert_write(this);
 
887
  }
 
888
 
 
889
  if (!m_taskTimeout.is_queued() && !m_transactions.empty())
 
890
    priority_queue_insert(&taskScheduler, &m_taskTimeout, (cachedTime + rak::timer::from_seconds(5)).round_seconds());
 
891
}
 
892
 
 
893
void
 
894
DhtServer::receive_timeout() {
 
895
  transaction_itr itr = m_transactions.begin();
 
896
  while (itr != m_transactions.end()) {
 
897
    if (itr->second->has_quick_timeout() && itr->second->quick_timeout() < cachedTime.seconds()) {
 
898
      itr = failed_transaction(itr, true);
 
899
 
 
900
    } else if (itr->second->timeout() < cachedTime.seconds()) {
 
901
      itr = failed_transaction(itr, false);
 
902
 
 
903
    } else {
 
904
      ++itr;
 
905
    }
 
906
  }
 
907
 
 
908
  start_write();
 
909
}
 
910
 
 
911
}