1
// libTorrent - BitTorrent library
2
// Copyright (C) 2005-2007, Jari Sundell
4
// This program is free software; you can redistribute it and/or modify
5
// it under the terms of the GNU General Public License as published by
6
// the Free Software Foundation; either version 2 of the License, or
7
// (at your option) any later version.
9
// This program is distributed in the hope that it will be useful,
10
// but WITHOUT ANY WARRANTY; without even the implied warranty of
11
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12
// GNU General Public License for more details.
14
// You should have received a copy of the GNU General Public License
15
// along with this program; if not, write to the Free Software
16
// Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
18
// In addition, as a special exception, the copyright holders give
19
// permission to link the code of portions of this program with the
20
// OpenSSL library under certain conditions as described in each
21
// individual source file, and distribute linked combinations
24
// You must obey the GNU General Public License in all respects for
25
// all of the code used other than OpenSSL. If you modify file(s)
26
// with this exception, you may extend this exception to your version
27
// of the file(s), but you are not obligated to do so. If you do not
28
// wish to do so, delete this exception statement from your version.
29
// If you delete this exception statement from all source files in the
30
// program, then also delete it here.
32
// Contact: Jari Sundell <jaris@ifi.uio.no>
35
// 3185 Skoppum, NORWAY
43
#include "torrent/exceptions.h"
44
#include "torrent/connection_manager.h"
45
#include "torrent/download_info.h"
46
#include "torrent/object.h"
47
#include "torrent/object_stream.h"
48
#include "torrent/poll.h"
49
#include "torrent/object_static_map.h"
50
#include "torrent/throttle.h"
51
#include "tracker/tracker_dht.h"
53
#include "dht_bucket.h"
54
#include "dht_router.h"
55
#include "dht_transaction.h"
61
const char* DhtServer::queries[] = {
68
// List of all possible keys we need/support in a DHT message.
69
// Unsupported keys we receive are dropped (ignored) while decoding.
70
// See torrent/object_static_map.h for how this works.
72
const DhtMessage::key_list_type DhtMessage::base_type::keys = {
73
{ key_a_id, "a::id*S" },
74
{ key_a_infoHash, "a::info_hash*S" },
75
{ key_a_port, "a::port", },
76
{ key_a_target, "a::target*S" },
77
{ key_a_token, "a::token*S" },
84
{ key_r_id, "r::id*S" },
85
{ key_r_nodes, "r::nodes*S" },
86
{ key_r_token, "r::token*S" },
87
{ key_r_values, "r::values*L" },
94
// Error in DHT protocol, avoids std::string ctor from communication_error
95
class dht_error : public network_error {
97
dht_error(int code, const char* message) : m_message(message), m_code(code) {}
99
virtual int code() const throw() { return m_code; }
100
virtual const char* what() const throw() { return m_message; }
103
const char* m_message;
107
DhtServer::DhtServer(DhtRouter* router) :
113
m_uploadThrottle(manager->upload_throttle()->throttle_list()),
114
m_downloadThrottle(manager->download_throttle()->throttle_list()),
121
// Reserve a socket for the DHT server, even though we don't
122
// actually open it until the server is started, which may not
123
// happen until the first non-private torrent is started.
124
manager->connection_manager()->inc_socket_count();
127
DhtServer::~DhtServer() {
130
std::for_each(m_highQueue.begin(), m_highQueue.end(), rak::call_delete<DhtTransactionPacket>());
131
std::for_each(m_lowQueue.begin(), m_lowQueue.end(), rak::call_delete<DhtTransactionPacket>());
133
manager->connection_manager()->dec_socket_count();
137
DhtServer::start(int port) {
139
if (!get_fd().open_datagram() || !get_fd().set_nonblock())
140
throw resource_error("Could not allocate datagram socket.");
142
if (!get_fd().set_reuse_address(true))
143
throw resource_error("Could not set listening port to reuse address.");
145
rak::socket_address sa = *m_router->address();
148
if (!get_fd().bind(sa))
149
throw resource_error("Could not bind datagram socket.");
151
} catch (torrent::base_error& e) {
157
m_taskTimeout.set_slot(rak::mem_fn(this, &DhtServer::receive_timeout));
159
m_uploadNode.set_list_iterator(m_uploadThrottle->end());
160
m_uploadNode.slot_activate(rak::make_mem_fun(static_cast<SocketBase*>(this), &SocketBase::receive_throttle_up_activate));
162
m_downloadNode.set_list_iterator(m_downloadThrottle->end());
163
m_downloadThrottle->insert(&m_downloadNode);
165
manager->poll()->open(this);
166
manager->poll()->insert_read(this);
167
manager->poll()->insert_error(this);
175
clear_transactions();
177
priority_queue_erase(&taskScheduler, &m_taskTimeout);
179
m_uploadThrottle->erase(&m_uploadNode);
180
m_downloadThrottle->erase(&m_downloadNode);
182
manager->poll()->remove_read(this);
183
manager->poll()->remove_write(this);
184
manager->poll()->remove_error(this);
185
manager->poll()->close(this);
194
DhtServer::reset_statistics() {
195
m_queriesReceived = 0;
197
m_repliesReceived = 0;
198
m_errorsReceived = 0;
201
m_uploadNode.rate()->set_total(0);
202
m_downloadNode.rate()->set_total(0);
205
// Ping a node whose ID we know.
207
DhtServer::ping(const HashString& id, const rak::socket_address* sa) {
208
// No point pinging a node that we're already contacting otherwise.
209
transaction_itr itr = m_transactions.lower_bound(DhtTransaction::key(sa, 0));
210
if (itr == m_transactions.end() || !DhtTransaction::key_match(itr->first, sa))
211
add_transaction(new DhtTransactionPing(id, sa), packet_prio_low);
214
// Contact nodes in given bucket and ask for their nodes closest to target.
216
DhtServer::find_node(const DhtBucket& contacts, const HashString& target) {
217
DhtSearch* search = new DhtSearch(target, contacts);
219
DhtSearch::const_accessor n;
220
while ((n = search->get_contact()) != search->end())
221
add_transaction(new DhtTransactionFindNode(n), packet_prio_low);
223
// This shouldn't happen, it means we had no contactable nodes at all.
224
if (!search->start())
229
DhtServer::announce(const DhtBucket& contacts, const HashString& infoHash, TrackerDht* tracker) {
230
DhtAnnounce* announce = new DhtAnnounce(infoHash, tracker, contacts);
232
DhtSearch::const_accessor n;
233
while ((n = announce->get_contact()) != announce->end())
234
add_transaction(new DhtTransactionFindNode(n), packet_prio_high);
236
// This can only happen if all nodes we know are bad.
237
if (!announce->start())
240
announce->update_status();
244
DhtServer::cancel_announce(DownloadInfo* info, const TrackerDht* tracker) {
245
transaction_itr itr = m_transactions.begin();
247
while (itr != m_transactions.end()) {
248
if (itr->second->is_search() && itr->second->as_search()->search()->is_announce()) {
249
DhtAnnounce* announce = static_cast<DhtAnnounce*>(itr->second->as_search()->search());
251
if ((info == NULL || announce->target() == info->hash()) && (tracker == NULL || announce->tracker() == tracker)) {
253
m_transactions.erase(itr++);
263
DhtServer::update() {
264
// Reset this every 15 minutes. It'll get set back to true if we receive
265
// any valid packets. This allows detecting when the entire network goes
266
// down, and prevents all nodes from getting removed as unresponsive.
271
DhtServer::process_query(const HashString& id, const rak::socket_address* sa, const DhtMessage& msg) {
275
raw_string query = msg[key_q].as_raw_string();
280
if (query == raw_string::from_c_str("find_node"))
281
create_find_node_response(msg, reply);
283
else if (query == raw_string::from_c_str("get_peers"))
284
create_get_peers_response(msg, sa, reply);
286
else if (query == raw_string::from_c_str("announce_peer"))
287
create_announce_peer_response(msg, sa, reply);
289
else if (query != raw_string::from_c_str("ping"))
290
throw dht_error(dht_error_bad_method, "Unknown query type.");
292
m_router->node_queried(id, sa);
293
create_response(msg, sa, reply);
297
DhtServer::create_find_node_response(const DhtMessage& req, DhtMessage& reply) {
298
raw_string target = req[key_a_target].as_raw_string();
300
if (target.size() < HashString::size_data)
301
throw dht_error(dht_error_protocol, "target string too short");
303
reply[key_r_nodes] = m_router->get_closest_nodes(*HashString::cast_from(target.data()));
305
if (reply[key_r_nodes].as_raw_string().empty())
306
throw dht_error(dht_error_generic, "No nodes");
310
DhtServer::create_get_peers_response(const DhtMessage& req, const rak::socket_address* sa, DhtMessage& reply) {
311
reply[key_r_token] = m_router->make_token(sa, reply.data_end);
312
reply.data_end += reply[key_r_token].as_raw_string().size();
314
raw_string info_hash_str = req[key_a_infoHash].as_raw_string();
316
if (info_hash_str.size() < HashString::size_data)
317
throw dht_error(dht_error_protocol, "info hash too short");
319
const HashString* info_hash = HashString::cast_from(info_hash_str.data());
321
DhtTracker* tracker = m_router->get_tracker(*info_hash, false);
323
// If we're not tracking or have no peers, send closest nodes.
324
if (!tracker || tracker->empty()) {
325
raw_string nodes = m_router->get_closest_nodes(*info_hash);
328
throw dht_error(dht_error_generic, "No peers nor nodes");
330
reply[key_r_nodes] = nodes;
333
reply[key_r_values] = tracker->get_peers();
338
DhtServer::create_announce_peer_response(const DhtMessage& req, const rak::socket_address* sa, DhtMessage& reply) {
339
raw_string info_hash = req[key_a_infoHash].as_raw_string();
341
if (info_hash.size() < HashString::size_data)
342
throw dht_error(dht_error_protocol, "info hash too short");
344
if (!m_router->token_valid(req[key_a_token].as_raw_string(), sa))
345
throw dht_error(dht_error_protocol, "Token invalid.");
347
DhtTracker* tracker = m_router->get_tracker(*HashString::cast_from(info_hash.data()), true);
348
tracker->add_peer(sa->sa_inet()->address_n(), req[key_a_port].as_value());
352
DhtServer::process_response(const HashString& id, const rak::socket_address* sa, const DhtMessage& response) {
353
int transactionId = (unsigned char)response[key_t].as_raw_string().data()[0];
354
transaction_itr itr = m_transactions.find(DhtTransaction::key(sa, transactionId));
356
// Response to a transaction we don't have in our table. At this point it's
357
// impossible to tell whether it used to be a valid transaction but timed out
358
// the node did not return the ID we sent it, or it returned it with a
359
// different address than we sent it o. Best we can do is ignore the reply,
360
// since the protocol doesn't call for returning errors in responses.
361
if (itr == m_transactions.end())
367
// Make sure transaction is erased even if an exception is thrown.
369
DhtTransaction* transaction = itr->second;
370
#ifdef USE_EXTRA_DEBUG
371
if (DhtTransaction::key(sa, transactionId) != transaction->key(transactionId))
372
throw internal_error("DhtServer::process_response key mismatch.");
375
// If we contact a node but its ID is not the one we expect, ignore the reply
376
// to prevent interference from rogue nodes.
377
if ((id != transaction->id() && transaction->id() != m_router->zero_id))
380
switch (transaction->type()) {
381
case DhtTransaction::DHT_FIND_NODE:
382
parse_find_node_reply(transaction->as_find_node(), response[key_r_nodes].as_raw_string());
385
case DhtTransaction::DHT_GET_PEERS:
386
parse_get_peers_reply(transaction->as_get_peers(), response);
389
// Nothing to do for DHT_PING and DHT_ANNOUNCE_PEER
394
// Mark node responsive only if all processing was successful, without errors.
395
m_router->node_replied(id, sa);
397
} catch (std::exception& e) {
399
m_transactions.erase(itr);
406
m_transactions.erase(itr);
410
DhtServer::process_error(const rak::socket_address* sa, const DhtMessage& error) {
411
int transactionId = (unsigned char)error[key_t].as_raw_string().data()[0];
412
transaction_itr itr = m_transactions.find(DhtTransaction::key(sa, transactionId));
414
if (itr == m_transactions.end())
421
// Don't mark node as good (because it replied) or bad (because it returned an error).
422
// If it consistently returns errors for valid queries it's probably broken. But a
423
// few error messages are acceptable. So we do nothing and pretend the query never happened.
426
m_transactions.erase(itr);
430
DhtServer::parse_find_node_reply(DhtTransactionSearch* transaction, raw_string nodes) {
431
transaction->complete(true);
433
if (sizeof(const compact_node_info) != 26)
434
throw internal_error("DhtServer::parse_find_node_reply(...) bad struct size.");
437
std::copy(reinterpret_cast<const compact_node_info*>(nodes.data()),
438
reinterpret_cast<const compact_node_info*>(nodes.data() + nodes.size() - nodes.size() % sizeof(compact_node_info)),
439
std::back_inserter(list));
441
for (node_info_list::iterator itr = list.begin(); itr != list.end(); ++itr) {
442
if (itr->id() != m_router->id()) {
443
rak::socket_address sa = itr->address();
444
transaction->search()->add_contact(itr->id(), &sa);
448
find_node_next(transaction);
452
DhtServer::parse_get_peers_reply(DhtTransactionGetPeers* transaction, const DhtMessage& response) {
453
DhtAnnounce* announce = static_cast<DhtAnnounce*>(transaction->as_search()->search());
455
transaction->complete(true);
457
if (response[key_r_values].is_raw_list())
458
announce->receive_peers(response[key_r_values].as_raw_list());
460
if (response[key_r_token].is_raw_string())
461
add_transaction(new DhtTransactionAnnouncePeer(transaction->id(),
462
transaction->address(),
464
response[key_r_token].as_raw_string()),
467
announce->update_status();
471
DhtServer::find_node_next(DhtTransactionSearch* transaction) {
472
int priority = packet_prio_low;
473
if (transaction->search()->is_announce())
474
priority = packet_prio_high;
476
DhtSearch::const_accessor node;
477
while ((node = transaction->search()->get_contact()) != transaction->search()->end())
478
add_transaction(new DhtTransactionFindNode(node), priority);
480
if (!transaction->search()->is_announce())
483
DhtAnnounce* announce = static_cast<DhtAnnounce*>(transaction->search());
484
if (announce->complete()) {
485
// We have found the 8 closest nodes to the info hash. Retrieve peers
486
// from them and announce to them.
487
for (node = announce->start_announce(); node != announce->end(); ++node)
488
add_transaction(new DhtTransactionGetPeers(node), packet_prio_high);
491
announce->update_status();
495
DhtServer::add_packet(DhtTransactionPacket* packet, int priority) {
497
// High priority packets are for important queries, and quite small.
498
// They're added to front of high priority queue and thus will be the
499
// next packets sent.
500
case packet_prio_high:
501
m_highQueue.push_front(packet);
504
// Low priority query packets are added to the back of the high priority
505
// queue and will be sent when all high priority packets have been transmitted.
506
case packet_prio_low:
507
m_highQueue.push_back(packet);
510
// Reply packets will be processed after all of our own packets have been send.
511
case packet_prio_reply:
512
m_lowQueue.push_back(packet);
516
throw internal_error("DhtServer::add_packet called with invalid priority.");
521
DhtServer::create_query(transaction_itr itr, int tID, const rak::socket_address* sa, int priority) {
522
if (itr->second->id() == m_router->id())
523
throw internal_error("DhtServer::create_query trying to send to itself.");
527
// Transaction ID is a bencode string.
528
query[key_t] = raw_bencode(query.data_end, 3);
529
*query.data_end++ = '1';
530
*query.data_end++ = ':';
531
*query.data_end++ = tID;
533
DhtTransaction* transaction = itr->second;
534
query[key_q] = raw_string::from_c_str(queries[transaction->type()]);
535
query[key_y] = raw_bencode::from_c_str("1:q");
536
query[key_v] = raw_bencode("4:" PEER_VERSION, 6);
537
query[key_a_id] = m_router->id_raw_string();
539
switch (transaction->type()) {
540
case DhtTransaction::DHT_PING:
544
case DhtTransaction::DHT_FIND_NODE:
545
query[key_a_target] = transaction->as_find_node()->search()->target_raw_string();
548
case DhtTransaction::DHT_GET_PEERS:
549
query[key_a_infoHash] = transaction->as_get_peers()->search()->target_raw_string();
552
case DhtTransaction::DHT_ANNOUNCE_PEER:
553
query[key_a_infoHash] = transaction->as_announce_peer()->info_hash_raw_string();
554
query[key_a_token] = transaction->as_announce_peer()->token();
555
query[key_a_port] = manager->connection_manager()->listen_port();
559
DhtTransactionPacket* packet = new DhtTransactionPacket(transaction->address(), query, tID, transaction);
560
transaction->set_packet(packet);
561
add_packet(packet, priority);
567
DhtServer::create_response(const DhtMessage& req, const rak::socket_address* sa, DhtMessage& reply) {
568
reply[key_r_id] = m_router->id_raw_string();
569
reply[key_t] = req[key_t];
570
reply[key_y] = raw_bencode::from_c_str("1:r");
571
reply[key_v] = raw_bencode("4:" PEER_VERSION, 6);
573
add_packet(new DhtTransactionPacket(sa, reply), packet_prio_reply);
577
DhtServer::create_error(const DhtMessage& req, const rak::socket_address* sa, int num, const char* msg) {
580
if (req[key_t].is_raw_bencode() || req[key_t].is_raw_string())
581
error[key_t] = req[key_t];
583
error[key_y] = raw_bencode::from_c_str("1:e");
584
error[key_v] = raw_bencode("4:" PEER_VERSION, 6);
585
error[key_e_0] = num;
586
error[key_e_1] = raw_string::from_c_str(msg);
588
add_packet(new DhtTransactionPacket(sa, error), packet_prio_reply);
592
DhtServer::add_transaction(DhtTransaction* transaction, int priority) {
593
// Try random transaction ID. This is to make it less likely that we reuse
594
// a transaction ID from an earlier transaction which timed out and we forgot
595
// about it, so that if the node replies after the timeout it's less likely
596
// that we match the reply to the wrong transaction.
598
// If there's an existing transaction with the random ID we search for the next
599
// unused one. Since normally only one or two transactions will be active per
600
// node, a collision is extremely unlikely, and a linear search for the first
601
// open one is the most efficient.
602
unsigned int rnd = (uint8_t)random();
603
unsigned int id = rnd;
605
transaction_itr insertItr = m_transactions.lower_bound(transaction->key(rnd));
607
// If key matches, keep trying successive IDs.
608
while (insertItr != m_transactions.end() && insertItr->first == transaction->key(id)) {
610
id = (uint8_t)(id + 1);
612
// Give up after trying all possible IDs. This should never happen.
618
// Transaction ID wrapped around, reset iterator.
620
insertItr = m_transactions.lower_bound(transaction->key(id));
623
// We know where to insert it, so pass that as hint.
624
insertItr = m_transactions.insert(insertItr, std::make_pair(transaction->key(id), transaction));
626
create_query(insertItr, id, transaction->address(), priority);
633
// Transaction received no reply and timed out. Mark node as bad and remove
634
// transaction (except if it was only the quick timeout).
635
DhtServer::transaction_itr
636
DhtServer::failed_transaction(transaction_itr itr, bool quick) {
637
DhtTransaction* transaction = itr->second;
639
// If it was a known node, remember that it didn't reply, unless the transaction
640
// is only stalled (had quick timeout, but not full timeout). Also if the
641
// transaction still has an associated packet, the packet never got sent due to
642
// throttling, so don't blame the remote node for not replying.
643
// Finally, if we haven't received anything whatsoever so far, assume the entire
644
// network is down and so we can't blame the node either.
645
if (!quick && m_networkUp && transaction->packet() == NULL && transaction->id() != m_router->zero_id)
646
m_router->node_inactive(transaction->id(), transaction->address());
648
if (transaction->type() == DhtTransaction::DHT_FIND_NODE) {
650
transaction->as_find_node()->set_stalled();
652
transaction->as_find_node()->complete(false);
655
find_node_next(transaction->as_find_node());
657
} catch (std::exception& e) {
660
m_transactions.erase(itr);
668
return ++itr; // don't actually delete the transaction until the final timeout
672
m_transactions.erase(itr++);
678
DhtServer::clear_transactions() {
679
std::for_each(m_transactions.begin(), m_transactions.end(),
680
rak::on(rak::mem_ref(&transaction_map::value_type::second),
681
rak::call_delete<DhtTransaction>()));
682
m_transactions.clear();
686
DhtServer::event_read() {
691
rak::socket_address sa;
694
const HashString* nodeId = NULL;
698
int32_t read = read_datagram(buffer, sizeof(buffer), &sa);
704
// We can currently only process mapped-IPv4 addresses, not real IPv6.
705
// Translate them to an af_inet socket_address.
706
if (sa.family() == rak::socket_address::af_inet6)
707
sa = sa.sa_inet6()->normalize_address();
710
if (sa.family() != rak::socket_address::af_inet)
715
// If it's not a valid bencode dictionary at all, it's probably not a DHT
716
// packet at all, so we don't throw an error to prevent bounce loops.
718
static_map_read_bencode(buffer, buffer + read, message);
719
} catch (bencode_error& e) {
723
if (!message[key_t].is_raw_string())
724
throw dht_error(dht_error_protocol, "No transaction ID");
726
if (!message[key_y].is_raw_string())
727
throw dht_error(dht_error_protocol, "No message type");
729
if (message[key_y].as_raw_string().size() != 1)
730
throw dht_error(dht_error_bad_method, "Unsupported message type");
732
type = message[key_y].as_raw_string().data()[0];
734
// Queries and replies have node ID in different dictionaries.
735
if (type == 'r' || type == 'q') {
736
if (!message[type == 'q' ? key_a_id : key_r_id].is_raw_string())
737
throw dht_error(dht_error_protocol, "Invalid `id' value");
739
raw_string nodeIdStr = message[type == 'q' ? key_a_id : key_r_id].as_raw_string();
741
if (nodeIdStr.size() < HashString::size_data)
742
throw dht_error(dht_error_protocol, "`id' value too short");
744
nodeId = HashString::cast_from(nodeIdStr.data());
747
// Sanity check the returned transaction ID.
748
if ((type == 'r' || type == 'e') &&
749
(!message[key_t].is_raw_string() || message[key_t].as_raw_string().size() != 1))
750
throw dht_error(dht_error_protocol, "Invalid transaction ID type/length.");
752
// Stupid broken implementations.
753
if (nodeId != NULL && *nodeId == m_router->id())
754
throw dht_error(dht_error_protocol, "Send your own ID, not mine");
758
process_query(*nodeId, &sa, message);
762
process_response(*nodeId, &sa, message);
766
process_error(&sa, message);
770
throw dht_error(dht_error_bad_method, "Unknown message type.");
773
// If node was querying us, reply with error packet, otherwise mark the node as "query failed",
774
// so that if it repeatedly sends malformed replies we will drop it instead of propagating it
776
} catch (bencode_error& e) {
777
if ((type == 'r' || type == 'e') && nodeId != NULL) {
778
m_router->node_inactive(*nodeId, &sa);
780
snprintf(message.data_end, message.data + message.data_size - message.data_end - 1, "Malformed packet: %s", e.what());
781
message.data[message.data_size - 1] = '\0';
782
create_error(message, &sa, dht_error_protocol, message.data_end);
785
} catch (dht_error& e) {
786
if ((type == 'r' || type == 'e') && nodeId != NULL)
787
m_router->node_inactive(*nodeId, &sa);
789
create_error(message, &sa, e.code(), e.what());
791
} catch (network_error& e) {
796
m_downloadThrottle->node_used_unthrottled(total);
797
m_downloadNode.rate()->insert(total);
803
DhtServer::process_queue(packet_queue& queue, uint32_t* quota) {
806
while (!queue.empty()) {
807
DhtTransactionPacket* packet = queue.front();
809
// Make sure its transaction hasn't timed out yet, if it has/had one
810
// and don't bother sending non-transaction packets (replies) after
811
// more than 15 seconds in the queue.
812
if (packet->has_failed() || packet->age() > 15) {
818
if (packet->length() > *quota) {
819
m_uploadThrottle->node_used(&m_uploadNode, used);
826
int written = write_datagram(packet->c_str(), packet->length(), packet->address());
829
throw network_error();
834
if ((unsigned int)written != packet->length())
835
throw network_error();
837
} catch (network_error& e) {
838
// Couldn't write packet, maybe something wrong with node address or routing, so mark node as bad.
839
if (packet->has_transaction()) {
840
transaction_itr itr = m_transactions.find(packet->transaction()->key(packet->id()));
841
if (itr == m_transactions.end())
842
throw internal_error("DhtServer::process_queue could not find transaction.");
844
failed_transaction(itr, false);
848
if (packet->has_transaction())
849
packet->transaction()->set_packet(NULL);
854
m_uploadThrottle->node_used(&m_uploadNode, used);
859
DhtServer::event_write() {
860
if (m_highQueue.empty() && m_lowQueue.empty())
861
throw internal_error("DhtServer::event_write called but both write queues are empty.");
863
if (!m_uploadThrottle->is_throttled(&m_uploadNode))
864
throw internal_error("DhtServer::event_write called while not in throttle list.");
866
uint32_t quota = m_uploadThrottle->node_quota(&m_uploadNode);
868
if (quota == 0 || !process_queue(m_highQueue, "a) || !process_queue(m_lowQueue, "a)) {
869
manager->poll()->remove_write(this);
870
m_uploadThrottle->node_deactivate(&m_uploadNode);
872
} else if (m_highQueue.empty() && m_lowQueue.empty()) {
873
manager->poll()->remove_write(this);
874
m_uploadThrottle->erase(&m_uploadNode);
879
DhtServer::event_error() {
883
DhtServer::start_write() {
884
if ((!m_highQueue.empty() || !m_lowQueue.empty()) && !m_uploadThrottle->is_throttled(&m_uploadNode)) {
885
m_uploadThrottle->insert(&m_uploadNode);
886
manager->poll()->insert_write(this);
889
if (!m_taskTimeout.is_queued() && !m_transactions.empty())
890
priority_queue_insert(&taskScheduler, &m_taskTimeout, (cachedTime + rak::timer::from_seconds(5)).round_seconds());
894
DhtServer::receive_timeout() {
895
transaction_itr itr = m_transactions.begin();
896
while (itr != m_transactions.end()) {
897
if (itr->second->has_quick_timeout() && itr->second->quick_timeout() < cachedTime.seconds()) {
898
itr = failed_transaction(itr, true);
900
} else if (itr->second->timeout() < cachedTime.seconds()) {
901
itr = failed_transaction(itr, false);