3
Copyright (c) 2007, Arvid Norberg
6
Redistribution and use in source and binary forms, with or without
7
modification, are permitted provided that the following conditions
10
* Redistributions of source code must retain the above copyright
11
notice, this list of conditions and the following disclaimer.
12
* Redistributions in binary form must reproduce the above copyright
13
notice, this list of conditions and the following disclaimer in
14
the documentation and/or other materials provided with the distribution.
15
* Neither the name of the author nor the names of its
16
contributors may be used to endorse or promote products derived
17
from this software without specific prior written permission.
19
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
20
AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
21
IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
22
ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
23
LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
24
CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
25
SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
26
INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
27
CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
28
ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
29
POSSIBILITY OF SUCH DAMAGE.
33
#ifndef TORRENT_BANDWIDTH_MANAGER_HPP_INCLUDED
34
#define TORRENT_BANDWIDTH_MANAGER_HPP_INCLUDED
36
#include <boost/shared_ptr.hpp>
37
#include <boost/intrusive_ptr.hpp>
38
#include <boost/function.hpp>
39
#include <boost/bind.hpp>
40
#include <boost/integer_traits.hpp>
41
#include <boost/thread/mutex.hpp>
44
#include "libtorrent/socket.hpp"
45
#include "libtorrent/invariant_check.hpp"
46
#include "libtorrent/assert.hpp"
47
#include "libtorrent/bandwidth_limit.hpp"
48
#include "libtorrent/bandwidth_queue_entry.hpp"
50
using boost::weak_ptr;
51
using boost::shared_ptr;
52
using boost::intrusive_ptr;
55
//#define TORRENT_VERBOSE_BANDWIDTH_LIMIT
57
namespace libtorrent {
59
// the maximum block of bandwidth quota to
60
// hand out is 33kB. The block size may
61
// be smaller on lower limits
64
max_bandwidth_block_size = 33000,
65
min_bandwidth_block_size = 400
68
const time_duration bw_window_size = seconds(1);
70
template<class PeerConnection, class Torrent>
73
history_entry(intrusive_ptr<PeerConnection> p, weak_ptr<Torrent> t
75
: expires_at(exp), amount(a), peer(p), tor(t) {}
78
intrusive_ptr<PeerConnection> peer;
79
weak_ptr<Torrent> tor;
83
T clamp(T val, T ceiling, T floor) throw()
85
TORRENT_ASSERT(ceiling >= floor);
86
if (val >= ceiling) return ceiling;
87
else if (val <= floor) return floor;
91
template<class PeerConnection, class Torrent>
92
struct bandwidth_manager
94
bandwidth_manager(io_service& ios, int channel) throw()
96
, m_history_timer(m_ios)
97
, m_limit(bandwidth_limit::inf)
100
, m_in_hand_out_bandwidth(false)
104
void throttle(int limit) throw()
106
mutex_t::scoped_lock l(m_mutex);
107
TORRENT_ASSERT(limit >= 0);
111
int throttle() const throw()
113
mutex_t::scoped_lock l(m_mutex);
123
m_history_timer.cancel();
127
bool is_in_history(PeerConnection const* peer) const
129
mutex_t::scoped_lock l(m_mutex);
130
return is_in_history(peer, l);
133
bool is_in_history(PeerConnection const* peer, boost::mutex::scoped_lock& l) const
135
for (typename history_t::const_iterator i
136
= m_history.begin(), end(m_history.end()); i != end; ++i)
138
if (i->peer.get() == peer) return true;
144
int queue_size() const
146
mutex_t::scoped_lock l(m_mutex);
147
return m_queue.size();
150
// non prioritized means that, if there's a line for bandwidth,
151
// others will cut in front of the non-prioritized peers.
152
// this is used by web seeds
153
void request_bandwidth(intrusive_ptr<PeerConnection> peer
154
, int blk, int priority)
156
mutex_t::scoped_lock l(m_mutex);
159
TORRENT_ASSERT(blk > 0);
161
// make sure this peer isn't already in line
162
// waiting for bandwidth
164
for (typename queue_t::iterator i = m_queue.begin()
165
, end(m_queue.end()); i != end; ++i)
167
TORRENT_ASSERT(i->peer < peer || peer < i->peer);
170
TORRENT_ASSERT(peer->max_assignable_bandwidth(m_channel) > 0);
172
typename queue_t::reverse_iterator i(m_queue.rbegin());
173
while (i != m_queue.rend() && priority > i->priority)
178
m_queue.insert(i.base(), bw_queue_entry<PeerConnection, Torrent>(peer, blk, priority));
179
if (!m_queue.empty()) hand_out_bandwidth(l);
183
void check_invariant() const
185
int current_quota = 0;
186
for (typename history_t::const_iterator i
187
= m_history.begin(), end(m_history.end()); i != end; ++i)
189
current_quota += i->amount;
191
TORRENT_ASSERT(current_quota == m_current_quota);
193
typename queue_t::const_iterator j = m_queue.begin();
194
if (j != m_queue.end())
197
for (typename queue_t::const_iterator i = m_queue.begin()
198
, end(m_queue.end()); i != end && j != end; ++i, ++j)
199
TORRENT_ASSERT(i->priority >= j->priority);
206
void add_history_entry(history_entry<PeerConnection, Torrent> const& e)
210
m_history.push_front(e);
211
m_current_quota += e.amount;
212
// in case the size > 1 there is already a timer
213
// active that will be invoked, no need to set one up
214
if (m_history.size() > 1) return;
218
m_history_timer.expires_at(e.expires_at);
219
m_history_timer.async_wait(bind(&bandwidth_manager::on_history_expire, this, _1));
221
catch (std::exception&) {}
224
void on_history_expire(asio::error_code const& e)
229
mutex_t::scoped_lock l(m_mutex);
233
TORRENT_ASSERT(!m_history.empty());
235
ptime now(time_now());
236
while (!m_history.empty() && m_history.back().expires_at <= now)
238
history_entry<PeerConnection, Torrent> e = m_history.back();
239
m_history.pop_back();
240
m_current_quota -= e.amount;
241
TORRENT_ASSERT(m_current_quota >= 0);
242
intrusive_ptr<PeerConnection> c = e.peer;
243
shared_ptr<Torrent> t = e.tor.lock();
245
if (!c->is_disconnecting()) c->expire_bandwidth(m_channel, e.amount);
246
if (t) t->expire_bandwidth(m_channel, e.amount);
250
// now, wait for the next chunk to expire
251
if (!m_history.empty() && !m_abort)
253
m_history_timer.expires_at(m_history.back().expires_at);
254
m_history_timer.async_wait(bind(&bandwidth_manager::on_history_expire, this, _1));
257
// since some bandwidth just expired, it
258
// means we can hand out more (in case there
259
// are still consumers in line)
260
if (!m_queue.empty()) hand_out_bandwidth(l);
262
catch (std::exception&) {}
265
void hand_out_bandwidth(boost::mutex::scoped_lock& l)
267
// if we're already handing out bandwidth, just return back
268
// to the loop further down on the callstack
269
if (m_in_hand_out_bandwidth) return;
270
m_in_hand_out_bandwidth = true;
275
ptime now(time_now());
279
// available bandwidth to hand out
280
int amount = limit - m_current_quota;
282
#ifdef TORRENT_VERBOSE_BANDWIDTH_LIMIT
283
std::cerr << " hand_out_bandwidht. m_queue.size() = " << m_queue.size()
284
<< " amount = " << amount
285
<< " limit = " << limit
286
<< " m_current_quota = " << m_current_quota << std::endl;
291
m_in_hand_out_bandwidth = false;
296
while (!m_queue.empty() && amount > 0)
298
bw_queue_entry<PeerConnection, Torrent> qe = m_queue.front();
299
TORRENT_ASSERT(qe.max_block_size > 0);
302
shared_ptr<Torrent> t = qe.torrent.lock();
304
if (qe.peer->is_disconnecting())
307
t->expire_bandwidth(m_channel, qe.max_block_size);
312
// at this point, max_assignable may actually be zero. Since
313
// the rate limit of the peer might have changed while it
315
int max_assignable = qe.peer->max_assignable_bandwidth(m_channel);
316
if (max_assignable == 0)
318
TORRENT_ASSERT(is_in_history(qe.peer.get(), l));
323
// this is the limit of the block size. It depends on the throttle
324
// so that it can be closer to optimal. Larger block sizes will give lower
325
// granularity to the rate but will be more efficient. At high rates
326
// the block sizes are bigger and at low rates, the granularity
327
// is more important and block sizes are smaller
329
// the minimum rate that can be given is the block size, so, the
330
// block size must be smaller for lower rates. This is because
331
// the history window is one second, and the block will be forgotten
333
int block_size = (std::min)(qe.peer->bandwidth_throttle(m_channel)
336
if (block_size < min_bandwidth_block_size)
338
block_size = (std::min)(int(min_bandwidth_block_size), limit);
340
else if (block_size > max_bandwidth_block_size)
342
if (limit == bandwidth_limit::inf)
344
block_size = max_bandwidth_block_size;
348
// try to make the block_size a divisor of
349
// m_limit to make the distributions as fair
351
// TODO: move this calculcation to where the limit
354
/ (limit / max_bandwidth_block_size);
357
if (block_size > qe.max_block_size) block_size = qe.max_block_size;
359
#ifdef TORRENT_VERBOSE_BANDWIDTH_LIMIT
360
std::cerr << " block_size = " << block_size << " amount = " << amount << std::endl;
363
// so, hand out max_assignable, but no more than
364
// the available bandwidth (amount) and no more
365
// than the max_bandwidth_block_size
366
int hand_out_amount = (std::min)((std::min)(block_size, max_assignable)
368
TORRENT_ASSERT(hand_out_amount > 0);
369
amount -= hand_out_amount;
370
TORRENT_ASSERT(hand_out_amount <= qe.max_block_size);
372
t->assign_bandwidth(m_channel, hand_out_amount, qe.max_block_size);
373
qe.peer->assign_bandwidth(m_channel, hand_out_amount);
375
add_history_entry(history_entry<PeerConnection, Torrent>(
376
qe.peer, t, hand_out_amount, now + bw_window_size));
378
if (!tmp.empty()) m_queue.insert(m_queue.begin(), tmp.begin(), tmp.end());
380
catch (std::exception&)
382
m_in_hand_out_bandwidth = false;
385
m_in_hand_out_bandwidth = false;
389
typedef boost::mutex mutex_t;
390
mutable mutex_t m_mutex;
392
// the io_service used for the timer
395
// the timer that is waiting for the entries
396
// in the history queue to expire (slide out
397
// of the history window)
398
deadline_timer m_history_timer;
400
// the rate limit (bytes per second)
403
// the sum of all recently handed out bandwidth blocks
406
// these are the consumers that want bandwidth
407
typedef std::deque<bw_queue_entry<PeerConnection, Torrent> > queue_t;
410
// these are the consumers that have received bandwidth
412
typedef std::deque<history_entry<PeerConnection, Torrent> > history_t;
415
// this is the channel within the consumers
416
// that bandwidth is assigned to (upload or download)
419
// this is true while we're in the hand_out_bandwidth loop
420
// to prevent recursive invocations to interfere
421
bool m_in_hand_out_bandwidth;