169
109
// We can do the choking before the slot is called as this
170
110
// ChokeManager won't be unchoking the same peers due to the
172
adjust = choke_range(m_connectionList->begin(), beginChoked, -adjust);
112
adjust = choke_range(m_unchoked.begin(), m_unchoked.end(), -adjust);
114
m_slotUnchoke(-adjust);
179
ChokeManager::cycle(unsigned int quota) {
180
iterator lastChoked = seperate_interested(m_connectionList->begin(), m_connectionList->end());
181
iterator firstChoked = seperate_unchoked(m_connectionList->begin(), lastChoked);
183
// Partition away the connections we shall not choke.
185
if (std::distance(m_connectionList->begin(), firstChoked) != (ConnectionList::difference_type)m_currentlyUnchoked)
186
throw internal_error("ChokeManager::cycle() std::distance(m_connectionList->begin(), firstChoked) != m_currentlyUnchoked.");
188
if (std::distance(m_connectionList->begin(), lastChoked) != (ConnectionList::difference_type)m_currentlyInterested)
189
throw internal_error("ChokeManager::cycle() std::distance(m_connectionList->begin(), lastChoked) != m_currentlyInterested.");
191
iterator firstUnchoked = m_connectionList->begin();
192
iterator lastUnchoked = firstChoked;
196
// We don't call the resource manager slots, the number of un/choked
197
// connections is returned.
199
adjust = std::min(quota, m_maxUnchoked) - m_currentlyUnchoked;
202
firstChoked += (cycled = unchoke_range(firstChoked, lastChoked, adjust));
204
firstUnchoked -= (cycled = -choke_range(firstUnchoked, lastUnchoked, -adjust));
208
adjust = max_alternate() - std::abs(cycled);
210
// Consider rolling this into the above calls.
212
alternate_ranges(firstUnchoked, lastUnchoked, firstChoked, lastChoked, adjust);
214
if (m_currentlyUnchoked > quota)
215
throw internal_error("ChokeManager::cycle() m_currentlyUnchoked > quota.");
221
ChokeManager::set_interested(PeerConnectionBase* pc) {
222
m_currentlyInterested++;
224
if (!pc->is_up_choked())
227
if (m_currentlyUnchoked < m_maxUnchoked &&
228
pc->time_last_choked() + rak::timer::from_seconds(10) < cachedTime &&
229
m_slotCanUnchoke()) {
230
pc->receive_choke(false);
232
m_currentlyUnchoked++;
238
ChokeManager::set_not_interested(PeerConnectionBase* pc) {
239
m_currentlyInterested--;
241
if (pc->is_up_choked())
244
pc->receive_choke(true);
246
m_currentlyUnchoked--;
250
// We are no longer be in m_connectionList.
252
ChokeManager::disconnected(PeerConnectionBase* pc) {
253
if (pc->is_upload_wanted())
254
m_currentlyInterested--;
256
if (!pc->is_up_choked()) {
257
m_currentlyUnchoked--;
263
ChokeManager::choke_range(iterator first, iterator last, unsigned int max) {
264
max = std::min(max, (unsigned int)distance(first, last));
266
std::sort(first, last, choke_manager_read_rate_increasing());
269
split = std::stable_partition(first, last, choke_manager_not_recently_unchoked());
270
split = std::find_if(first, split, choke_manager_is_remote_uploading());
272
std::sort(first, split, choke_manager_write_rate_increasing());
274
std::for_each(first, first + max, std::bind2nd(std::mem_fun(&PeerConnectionBase::receive_choke), true));
276
m_currentlyUnchoked -= max;
281
ChokeManager::unchoke_range(iterator first, iterator last, unsigned int max) {
282
std::sort(first, last, choke_manager_read_rate_decreasing());
284
unsigned int count = 0;
286
// Find the split between the ones that are uploading to us, and
287
// those that arn't. When unchoking, circa every third unchoke is of
288
// a connection in the list of those not uploading to us.
290
// Perhaps we should prefer those we are interested in?
292
iterator split = std::find_if(first, last, choke_manager_is_remote_not_uploading());
294
for ( ; count != max && first != last; count++, first++) {
297
(*(*first)->peer_chunks()->download_throttle()->rate() < 500 || ::random() % m_generousUnchokes == 0)) {
298
// Use a random connection that is not uploading to us.
299
std::iter_swap(split, split + ::random() % std::distance(split, last));
300
swap_with_shift(first, split++);
303
(*first)->receive_choke(false);
306
m_currentlyUnchoked += count;
119
ChokeManager::cycle(uint32_t quota) {
120
quota = std::min(quota, m_maxUnchoked);
122
// Does this properly handle 'unlimited' quota?
123
uint32_t oldSize = m_unchoked.size();
124
uint32_t unchoked = unchoke_range(m_queued.begin(), m_queued.end(),
125
std::max<uint32_t>(m_unchoked.size() < quota ? quota - m_unchoked.size() : 0,
126
std::min(quota, max_alternate())));
128
if (m_unchoked.size() > quota)
129
choke_range(m_unchoked.begin(), m_unchoked.end() - unchoked, m_unchoked.size() - quota);
131
if (m_unchoked.size() > quota)
132
throw internal_error("ChokeManager::cycle() m_unchoked.size() > quota.");
134
return m_unchoked.size() - oldSize;
138
ChokeManager::set_queued(PeerConnectionBase* pc, ChokeManagerNode* base) {
139
if (base->queued() || base->unchoked())
142
base->set_queued(true);
147
if ((m_flags & flag_unchoke_all_new || (!is_full() && m_slotCanUnchoke())) &&
148
base->time_last_choke() + rak::timer::from_seconds(10) < cachedTime) {
149
m_unchoked.push_back(value_type(pc, 0));
150
m_slotConnection(pc, false);
155
m_queued.push_back(value_type(pc, 0));
160
ChokeManager::set_not_queued(PeerConnectionBase* pc, ChokeManagerNode* base) {
164
base->set_queued(false);
169
if (base->unchoked()) {
170
choke_manager_erase(&m_unchoked, pc);
171
m_slotConnection(pc, true);
175
choke_manager_erase(&m_queued, pc);
180
ChokeManager::set_snubbed(PeerConnectionBase* pc, ChokeManagerNode* base) {
184
base->set_snubbed(true);
186
if (base->unchoked()) {
187
choke_manager_erase(&m_unchoked, pc);
188
m_slotConnection(pc, true);
191
} else if (base->queued()) {
192
choke_manager_erase(&m_queued, pc);
195
base->set_queued(false);
199
ChokeManager::set_not_snubbed(PeerConnectionBase* pc, ChokeManagerNode* base) {
200
if (!base->snubbed())
203
base->set_snubbed(false);
208
if (base->unchoked())
209
throw internal_error("ChokeManager::set_not_snubbed(...) base->unchoked().");
211
if ((m_flags & flag_unchoke_all_new || (!is_full() && m_slotCanUnchoke())) &&
212
base->time_last_choke() + rak::timer::from_seconds(10) < cachedTime) {
213
m_unchoked.push_back(value_type(pc, 0));
214
m_slotConnection(pc, false);
219
m_queued.push_back(value_type(pc, 0));
223
// We are no longer in m_connectionList.
225
ChokeManager::disconnected(PeerConnectionBase* pc, ChokeManagerNode* base) {
226
if (base->snubbed()) {
229
} else if (base->unchoked()) {
230
choke_manager_erase(&m_unchoked, pc);
233
} else if (base->queued()) {
234
choke_manager_erase(&m_queued, pc);
237
base->set_queued(false);
240
struct choke_manager_less {
241
bool operator () (ChokeManager::value_type v1, ChokeManager::value_type v2) const { return v1.second < v2.second; }
245
choke_manager_allocate_slots(ChokeManager::iterator first, ChokeManager::iterator last,
246
uint32_t max, uint32_t* weights, ChokeManager::target_type* target) {
247
std::sort(first, last, choke_manager_less());
249
// 'weightTotal' only contains the weight of targets that have
250
// connections to unchoke. When all connections are in a group are
251
// to be unchoked, then the group's weight is removed.
252
uint32_t weightTotal = 0;
253
uint32_t unchoke = max;
255
target[0].second = first;
257
for (uint32_t i = 0; i < ChokeManager::order_max_size; i++) {
259
target[i + 1].second = std::find_if(target[i].second, last,
260
rak::less(i * ChokeManager::order_base + (ChokeManager::order_base - 1),
261
rak::mem_ref(&ChokeManager::value_type::second)));
263
if (std::distance(target[i].second, target[i + 1].second) != 0)
264
weightTotal += weights[i];
267
// Spread available unchoke slots as long as we can give everyone an
269
while (weightTotal != 0 && unchoke / weightTotal > 0) {
270
uint32_t base = unchoke / weightTotal;
272
for (uint32_t itr = 0; itr < ChokeManager::order_max_size; itr++) {
273
uint32_t s = std::distance(target[itr].second, target[itr + 1].second);
275
if (weights[itr] == 0 || target[itr].first >= s)
278
uint32_t u = std::min(s - target[itr].first, base * weights[itr]);
281
target[itr].first += u;
283
if (target[itr].first >= s)
284
weightTotal -= weights[itr];
288
// Spread the remainder starting from a random position based on the
289
// total weight. This will ensure that aggregated over time we
290
// spread the unchokes equally according to the weight table.
291
if (weightTotal != 0 && unchoke != 0) {
292
uint32_t start = ::random() % weightTotal;
293
unsigned int itr = 0;
296
uint32_t s = std::distance(target[itr].second, target[itr + 1].second);
298
if (weights[itr] == 0 || target[itr].first >= s)
301
if (start < weights[itr])
304
start -= weights[itr];
307
for ( ; weightTotal != 0 && unchoke != 0; itr = (itr + 1) % ChokeManager::order_max_size) {
308
uint32_t s = std::distance(target[itr].second, target[itr + 1].second);
310
if (weights[itr] == 0 || target[itr].first >= s)
313
uint32_t u = std::min(unchoke, std::min(s - target[itr].first, weights[itr] - start));
317
target[itr].first += u;
319
if (target[itr].first >= s)
320
weightTotal -= weights[itr];
326
ChokeManager::choke_range(iterator first, iterator last, uint32_t max) {
327
m_slotChokeWeight(first, last);
329
target_type target[order_max_size + 1];
330
choke_manager_allocate_slots(first, last, max, m_chokeWeight, target);
332
// Now do the actual unchoking.
335
// Iterate in reverse so that the iterators in 'target' remain vaild
336
// even though we remove random ranges.
337
for (target_type* itr = target + order_max_size; itr != target; itr--) {
338
if ((itr - 1)->first > (uint32_t)std::distance((itr - 1)->second, itr->second))
339
throw internal_error("ChokeManager::choke_range(...) itr->first > std::distance((itr - 1)->second, itr->second).");
341
if (itr->second - (itr - 1)->first > itr->second ||
342
itr->second - (itr - 1)->first < m_unchoked.begin() ||
343
itr->second - (itr - 1)->first > m_unchoked.end() ||
344
(itr - 1)->second < m_unchoked.begin() ||
345
(itr - 1)->second > m_unchoked.end())
346
throw internal_error("ChokeManager::choke_range(...) bad iterator range.");
348
count += (itr - 1)->first;
350
// We move the connections that return true, while the ones that
351
// return false get thrown out. The function called must update
352
// ChunkManager::m_queued if false is returned.
354
// The C++ standard says std::partition will call the predicate
355
// max 'last - first' times, so we can assume it gets called once
357
iterator split = std::partition(itr->second - (itr - 1)->first, itr->second,
358
rak::on(rak::mem_ref(&value_type::first), std::bind2nd(m_slotConnection, true)));
360
m_queued.insert(m_queued.end(), itr->second - (itr - 1)->first, split);
361
m_unchoked.erase(itr->second - (itr - 1)->first, itr->second);
365
throw internal_error("ChokeManager::choke_range(...) count > max.");
371
ChokeManager::unchoke_range(iterator first, iterator last, uint32_t max) {
372
m_slotUnchokeWeight(first, last);
374
target_type target[order_max_size + 1];
375
choke_manager_allocate_slots(first, last, max, m_unchokeWeight, target);
377
// Now do the actual unchoking.
380
for (target_type* itr = target + order_max_size; itr != target; itr--) {
381
if ((itr - 1)->first > (uint32_t)std::distance((itr - 1)->second, itr->second))
382
throw internal_error("ChokeManager::unchoke_range(...) itr->first > std::distance((itr - 1)->second, itr->second).");
384
if (itr->second - (itr - 1)->first > itr->second ||
385
itr->second - (itr - 1)->first < m_queued.begin() ||
386
itr->second - (itr - 1)->first > m_queued.end() ||
387
(itr - 1)->second < m_queued.begin() ||
388
(itr - 1)->second > m_queued.end())
389
throw internal_error("ChokeManager::unchoke_range(...) bad iterator range.");
391
count += (itr - 1)->first;
393
std::for_each(itr->second - (itr - 1)->first, itr->second,
394
rak::on(rak::mem_ref(&value_type::first), std::bind2nd(m_slotConnection, false)));
396
m_unchoked.insert(m_unchoked.end(), itr->second - (itr - 1)->first, itr->second);
397
m_queued.erase(itr->second - (itr - 1)->first, itr->second);
401
throw internal_error("ChokeManager::unchoke_range(...) count > max.");
406
// Note that these algorithms fail if the rate >= 2^30.
408
// Need to add the recently unchoked check here?
410
uint32_t weights_upload_choke[ChokeManager::order_max_size] = { 1, 1, 1, 1 };
411
uint32_t weights_upload_unchoke[ChokeManager::order_max_size] = { 1, 3, 9, 0 };
414
calculate_upload_choke(ChokeManager::iterator first, ChokeManager::iterator last) {
415
while (first != last) {
416
// Very crude version for now.
417
uint32_t downloadRate = first->first->peer_chunks()->download_throttle()->rate()->rate();
418
first->second = ChokeManager::order_base - 1 - downloadRate;
425
calculate_upload_unchoke(ChokeManager::iterator first, ChokeManager::iterator last) {
426
while (first != last) {
427
if (first->first->is_down_local_unchoked()) {
428
uint32_t downloadRate = first->first->peer_chunks()->download_throttle()->rate()->rate();
430
// If the peer transmits at less than 1KB, we should consider it
431
// to be a rather stingy peer, and should look for new ones.
433
if (downloadRate < 1000)
434
first->second = downloadRate;
436
first->second = 2 * ChokeManager::order_base + downloadRate;
439
// This will be our optimistic unchoke queue, should be
440
// semi-random. Give lower weights to known stingy peers.
442
first->second = 1 * ChokeManager::order_base + ::random() % (1 << 10);
449
// Fix this, but for now just use something simple.
451
uint32_t weights_download_choke[ChokeManager::order_max_size] = { 1, 1, 1, 1 };
452
uint32_t weights_download_unchoke[ChokeManager::order_max_size] = { 1, 1, 1, 1 };
455
calculate_download_choke(ChokeManager::iterator first, ChokeManager::iterator last) {
456
while (first != last) {
457
// Very crude version for now.
458
uint32_t downloadRate = first->first->peer_chunks()->download_throttle()->rate()->rate();
459
first->second = ChokeManager::order_base - 1 - downloadRate;
466
calculate_download_unchoke(ChokeManager::iterator first, ChokeManager::iterator last) {
467
while (first != last) {
468
// Very crude version for now.
469
uint32_t downloadRate = first->first->peer_chunks()->download_throttle()->rate()->rate();
470
first->second = downloadRate;