~ubuntu-branches/ubuntu/maverick/libtorrent-rasterbar/maverick

« back to all changes in this revision

Viewing changes to include/libtorrent/bandwidth_manager.hpp

  • Committer: Bazaar Package Importer
  • Author(s): Cristian Greco
  • Date: 2008-07-02 10:46:21 UTC
  • Revision ID: james.westby@ubuntu.com-20080702104621-jzx3pfke9lkcxfxn
Tags: upstream-0.13.1
ImportĀ upstreamĀ versionĀ 0.13.1

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/*
 
2
 
 
3
Copyright (c) 2007, Arvid Norberg
 
4
All rights reserved.
 
5
 
 
6
Redistribution and use in source and binary forms, with or without
 
7
modification, are permitted provided that the following conditions
 
8
are met:
 
9
 
 
10
    * Redistributions of source code must retain the above copyright
 
11
      notice, this list of conditions and the following disclaimer.
 
12
    * Redistributions in binary form must reproduce the above copyright
 
13
      notice, this list of conditions and the following disclaimer in
 
14
      the documentation and/or other materials provided with the distribution.
 
15
    * Neither the name of the author nor the names of its
 
16
      contributors may be used to endorse or promote products derived
 
17
      from this software without specific prior written permission.
 
18
 
 
19
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
 
20
AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 
21
IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 
22
ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
 
23
LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
 
24
CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
 
25
SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
 
26
INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
 
27
CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 
28
ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 
29
POSSIBILITY OF SUCH DAMAGE.
 
30
 
 
31
*/
 
32
 
 
33
#ifndef TORRENT_BANDWIDTH_MANAGER_HPP_INCLUDED
 
34
#define TORRENT_BANDWIDTH_MANAGER_HPP_INCLUDED
 
35
 
 
36
#include <boost/shared_ptr.hpp>
 
37
#include <boost/intrusive_ptr.hpp>
 
38
#include <boost/function.hpp>
 
39
#include <boost/bind.hpp>
 
40
#include <boost/integer_traits.hpp>
 
41
#include <boost/thread/mutex.hpp>
 
42
#include <deque>
 
43
 
 
44
#include "libtorrent/socket.hpp"
 
45
#include "libtorrent/invariant_check.hpp"
 
46
#include "libtorrent/assert.hpp"
 
47
#include "libtorrent/bandwidth_limit.hpp"
 
48
#include "libtorrent/bandwidth_queue_entry.hpp"
 
49
 
 
50
using boost::weak_ptr;
 
51
using boost::shared_ptr;
 
52
using boost::intrusive_ptr;
 
53
using boost::bind;
 
54
 
 
55
//#define TORRENT_VERBOSE_BANDWIDTH_LIMIT
 
56
 
 
57
namespace libtorrent {
 
58
 
 
59
// the maximum block of bandwidth quota to
 
60
// hand out is 33kB. The block size may
 
61
// be smaller on lower limits
 
62
enum
 
63
{
 
64
        max_bandwidth_block_size = 33000,
 
65
        min_bandwidth_block_size = 400
 
66
};
 
67
 
 
68
const time_duration bw_window_size = seconds(1);
 
69
 
 
70
template<class PeerConnection, class Torrent>
 
71
struct history_entry
 
72
{
 
73
        history_entry(intrusive_ptr<PeerConnection> p, weak_ptr<Torrent> t
 
74
                , int a, ptime exp)
 
75
                : expires_at(exp), amount(a), peer(p), tor(t) {}
 
76
        ptime expires_at;
 
77
        int amount;
 
78
        intrusive_ptr<PeerConnection> peer;
 
79
        weak_ptr<Torrent> tor;
 
80
};
 
81
 
 
82
template<class T>
 
83
T clamp(T val, T ceiling, T floor) throw()
 
84
{
 
85
        TORRENT_ASSERT(ceiling >= floor);
 
86
        if (val >= ceiling) return ceiling;
 
87
        else if (val <= floor) return floor;
 
88
        return val;
 
89
}
 
90
 
 
91
template<class PeerConnection, class Torrent>
 
92
struct bandwidth_manager
 
93
{
 
94
        bandwidth_manager(io_service& ios, int channel) throw()
 
95
                : m_ios(ios)
 
96
                , m_history_timer(m_ios)
 
97
                , m_limit(bandwidth_limit::inf)
 
98
                , m_current_quota(0)
 
99
                , m_channel(channel)
 
100
                , m_in_hand_out_bandwidth(false)
 
101
                , m_abort(false)
 
102
        {}
 
103
 
 
104
        void throttle(int limit) throw()
 
105
        {
 
106
                mutex_t::scoped_lock l(m_mutex);
 
107
                TORRENT_ASSERT(limit >= 0);
 
108
                m_limit = limit;
 
109
        }
 
110
        
 
111
        int throttle() const throw()
 
112
        {
 
113
                mutex_t::scoped_lock l(m_mutex);
 
114
                return m_limit;
 
115
        }
 
116
 
 
117
        void close()
 
118
        {
 
119
                m_abort = true;
 
120
                m_queue.clear();
 
121
                m_history.clear();
 
122
                m_current_quota = 0;
 
123
                m_history_timer.cancel();
 
124
        }
 
125
 
 
126
#ifndef NDEBUG
 
127
        bool is_in_history(PeerConnection const* peer) const
 
128
        {
 
129
                mutex_t::scoped_lock l(m_mutex);
 
130
                return is_in_history(peer, l);
 
131
        }
 
132
 
 
133
        bool is_in_history(PeerConnection const* peer, boost::mutex::scoped_lock& l) const
 
134
        {
 
135
                for (typename history_t::const_iterator i
 
136
                        = m_history.begin(), end(m_history.end()); i != end; ++i)
 
137
                {
 
138
                        if (i->peer.get() == peer) return true;
 
139
                }
 
140
                return false;
 
141
        }
 
142
#endif
 
143
 
 
144
        int queue_size() const
 
145
        {
 
146
                mutex_t::scoped_lock l(m_mutex);
 
147
                return m_queue.size();
 
148
        }
 
149
 
 
150
        // non prioritized means that, if there's a line for bandwidth,
 
151
        // others will cut in front of the non-prioritized peers.
 
152
        // this is used by web seeds
 
153
        void request_bandwidth(intrusive_ptr<PeerConnection> peer
 
154
                , int blk, int priority)
 
155
        {
 
156
                mutex_t::scoped_lock l(m_mutex);
 
157
                INVARIANT_CHECK;
 
158
                if (m_abort) return;
 
159
                TORRENT_ASSERT(blk > 0);
 
160
 
 
161
                // make sure this peer isn't already in line
 
162
                // waiting for bandwidth
 
163
#ifndef NDEBUG
 
164
                for (typename queue_t::iterator i = m_queue.begin()
 
165
                        , end(m_queue.end()); i != end; ++i)
 
166
                {
 
167
                        TORRENT_ASSERT(i->peer < peer || peer < i->peer);
 
168
                }
 
169
#endif
 
170
                TORRENT_ASSERT(peer->max_assignable_bandwidth(m_channel) > 0);
 
171
 
 
172
                typename queue_t::reverse_iterator i(m_queue.rbegin());
 
173
                while (i != m_queue.rend() && priority > i->priority)
 
174
                {
 
175
                        ++i->priority;
 
176
                        ++i;
 
177
                }
 
178
                m_queue.insert(i.base(), bw_queue_entry<PeerConnection, Torrent>(peer, blk, priority));
 
179
                if (!m_queue.empty()) hand_out_bandwidth(l);
 
180
        }
 
181
 
 
182
#ifndef NDEBUG
 
183
        void check_invariant() const
 
184
        {
 
185
                int current_quota = 0;
 
186
                for (typename history_t::const_iterator i
 
187
                        = m_history.begin(), end(m_history.end()); i != end; ++i)
 
188
                {
 
189
                        current_quota += i->amount;
 
190
                }
 
191
                TORRENT_ASSERT(current_quota == m_current_quota);
 
192
 
 
193
                typename queue_t::const_iterator j = m_queue.begin();
 
194
                if (j != m_queue.end())
 
195
                {
 
196
                        ++j;
 
197
                        for (typename queue_t::const_iterator i = m_queue.begin()
 
198
                                , end(m_queue.end()); i != end && j != end; ++i, ++j)
 
199
                                TORRENT_ASSERT(i->priority >= j->priority);
 
200
                }
 
201
        }
 
202
#endif
 
203
 
 
204
private:
 
205
 
 
206
        void add_history_entry(history_entry<PeerConnection, Torrent> const& e)
 
207
        {
 
208
                try {
 
209
                INVARIANT_CHECK;
 
210
                m_history.push_front(e);
 
211
                m_current_quota += e.amount;
 
212
                // in case the size > 1 there is already a timer
 
213
                // active that will be invoked, no need to set one up
 
214
                if (m_history.size() > 1) return;
 
215
 
 
216
                if (m_abort) return;
 
217
 
 
218
                m_history_timer.expires_at(e.expires_at);
 
219
                m_history_timer.async_wait(bind(&bandwidth_manager::on_history_expire, this, _1));
 
220
                }
 
221
                catch (std::exception&) {}
 
222
        }
 
223
        
 
224
        void on_history_expire(asio::error_code const& e)
 
225
        {
 
226
                try {
 
227
                if (e) return;
 
228
 
 
229
                mutex_t::scoped_lock l(m_mutex);
 
230
                INVARIANT_CHECK;
 
231
                if (m_abort) return;
 
232
 
 
233
                TORRENT_ASSERT(!m_history.empty());
 
234
 
 
235
                ptime now(time_now());
 
236
                while (!m_history.empty() && m_history.back().expires_at <= now)
 
237
                {
 
238
                        history_entry<PeerConnection, Torrent> e = m_history.back();
 
239
                        m_history.pop_back();
 
240
                        m_current_quota -= e.amount;
 
241
                        TORRENT_ASSERT(m_current_quota >= 0);
 
242
                        intrusive_ptr<PeerConnection> c = e.peer;
 
243
                        shared_ptr<Torrent> t = e.tor.lock();
 
244
                        l.unlock();
 
245
                        if (!c->is_disconnecting()) c->expire_bandwidth(m_channel, e.amount);
 
246
                        if (t) t->expire_bandwidth(m_channel, e.amount);
 
247
                        l.lock();
 
248
                }
 
249
                
 
250
                // now, wait for the next chunk to expire
 
251
                if (!m_history.empty() && !m_abort)
 
252
                {
 
253
                        m_history_timer.expires_at(m_history.back().expires_at);
 
254
                        m_history_timer.async_wait(bind(&bandwidth_manager::on_history_expire, this, _1));
 
255
                }
 
256
 
 
257
                // since some bandwidth just expired, it
 
258
                // means we can hand out more (in case there
 
259
                // are still consumers in line)
 
260
                if (!m_queue.empty()) hand_out_bandwidth(l);
 
261
                }
 
262
                catch (std::exception&) {}
 
263
        }
 
264
 
 
265
        void hand_out_bandwidth(boost::mutex::scoped_lock& l)
 
266
        {
 
267
                // if we're already handing out bandwidth, just return back
 
268
                // to the loop further down on the callstack
 
269
                if (m_in_hand_out_bandwidth) return;
 
270
                m_in_hand_out_bandwidth = true;
 
271
 
 
272
                try {
 
273
                INVARIANT_CHECK;
 
274
 
 
275
                ptime now(time_now());
 
276
 
 
277
                int limit = m_limit;
 
278
 
 
279
                // available bandwidth to hand out
 
280
                int amount = limit - m_current_quota;
 
281
 
 
282
#ifdef TORRENT_VERBOSE_BANDWIDTH_LIMIT
 
283
                std::cerr << " hand_out_bandwidht. m_queue.size() = " << m_queue.size()
 
284
                        << " amount = " << amount
 
285
                        << " limit = " << limit
 
286
                        << " m_current_quota = " << m_current_quota << std::endl;
 
287
#endif
 
288
 
 
289
                if (amount <= 0)
 
290
                {
 
291
                        m_in_hand_out_bandwidth = false;
 
292
                        return;
 
293
                }
 
294
 
 
295
                queue_t tmp;
 
296
                while (!m_queue.empty() && amount > 0)
 
297
                {
 
298
                        bw_queue_entry<PeerConnection, Torrent> qe = m_queue.front();
 
299
                        TORRENT_ASSERT(qe.max_block_size > 0);
 
300
                        m_queue.pop_front();
 
301
 
 
302
                        shared_ptr<Torrent> t = qe.torrent.lock();
 
303
                        if (!t) continue;
 
304
                        if (qe.peer->is_disconnecting())
 
305
                        {
 
306
                                l.unlock();
 
307
                                t->expire_bandwidth(m_channel, qe.max_block_size);
 
308
                                l.lock();
 
309
                                continue;
 
310
                        }
 
311
 
 
312
                        // at this point, max_assignable may actually be zero. Since
 
313
                        // the rate limit of the peer might have changed while it
 
314
                        // was in the queue.
 
315
                        int max_assignable = qe.peer->max_assignable_bandwidth(m_channel);
 
316
                        if (max_assignable == 0)
 
317
                        {
 
318
                                TORRENT_ASSERT(is_in_history(qe.peer.get(), l));
 
319
                                tmp.push_back(qe);
 
320
                                continue;
 
321
                        }
 
322
 
 
323
                        // this is the limit of the block size. It depends on the throttle
 
324
                        // so that it can be closer to optimal. Larger block sizes will give lower
 
325
                        // granularity to the rate but will be more efficient. At high rates
 
326
                        // the block sizes are bigger and at low rates, the granularity
 
327
                        // is more important and block sizes are smaller
 
328
 
 
329
                        // the minimum rate that can be given is the block size, so, the
 
330
                        // block size must be smaller for lower rates. This is because
 
331
                        // the history window is one second, and the block will be forgotten
 
332
                        // after one second.
 
333
                        int block_size = (std::min)(qe.peer->bandwidth_throttle(m_channel)
 
334
                                , limit / 10);
 
335
 
 
336
                        if (block_size < min_bandwidth_block_size)
 
337
                        {
 
338
                                block_size = (std::min)(int(min_bandwidth_block_size), limit);
 
339
                        }
 
340
                        else if (block_size > max_bandwidth_block_size)
 
341
                        {
 
342
                                if (limit == bandwidth_limit::inf)
 
343
                                {
 
344
                                        block_size = max_bandwidth_block_size;
 
345
                                }
 
346
                                else
 
347
                                {
 
348
                                        // try to make the block_size a divisor of
 
349
                                        // m_limit to make the distributions as fair
 
350
                                        // as possible
 
351
                                        // TODO: move this calculcation to where the limit
 
352
                                        // is changed
 
353
                                        block_size = limit
 
354
                                                / (limit / max_bandwidth_block_size);
 
355
                                }
 
356
                        }
 
357
                        if (block_size > qe.max_block_size) block_size = qe.max_block_size;
 
358
 
 
359
#ifdef TORRENT_VERBOSE_BANDWIDTH_LIMIT
 
360
                        std::cerr << " block_size = " << block_size << " amount = " << amount << std::endl;
 
361
#endif
 
362
 
 
363
                        // so, hand out max_assignable, but no more than
 
364
                        // the available bandwidth (amount) and no more
 
365
                        // than the max_bandwidth_block_size
 
366
                        int hand_out_amount = (std::min)((std::min)(block_size, max_assignable)
 
367
                                , amount);
 
368
                        TORRENT_ASSERT(hand_out_amount > 0);
 
369
                        amount -= hand_out_amount;
 
370
                        TORRENT_ASSERT(hand_out_amount <= qe.max_block_size);
 
371
                        l.unlock();
 
372
                        t->assign_bandwidth(m_channel, hand_out_amount, qe.max_block_size);
 
373
                        qe.peer->assign_bandwidth(m_channel, hand_out_amount);
 
374
                        l.lock();
 
375
                        add_history_entry(history_entry<PeerConnection, Torrent>(
 
376
                                qe.peer, t, hand_out_amount, now + bw_window_size));
 
377
                }
 
378
                if (!tmp.empty()) m_queue.insert(m_queue.begin(), tmp.begin(), tmp.end());
 
379
                }
 
380
                catch (std::exception&)
 
381
                {
 
382
                        m_in_hand_out_bandwidth = false;
 
383
                        throw;
 
384
                }
 
385
                m_in_hand_out_bandwidth = false;
 
386
        }
 
387
 
 
388
 
 
389
        typedef boost::mutex mutex_t;
 
390
        mutable mutex_t m_mutex;
 
391
 
 
392
        // the io_service used for the timer
 
393
        io_service& m_ios;
 
394
 
 
395
        // the timer that is waiting for the entries
 
396
        // in the history queue to expire (slide out
 
397
        // of the history window)
 
398
        deadline_timer m_history_timer;
 
399
 
 
400
        // the rate limit (bytes per second)
 
401
        int m_limit;
 
402
 
 
403
        // the sum of all recently handed out bandwidth blocks
 
404
        int m_current_quota;
 
405
 
 
406
        // these are the consumers that want bandwidth
 
407
        typedef std::deque<bw_queue_entry<PeerConnection, Torrent> > queue_t;
 
408
        queue_t m_queue;
 
409
 
 
410
        // these are the consumers that have received bandwidth
 
411
        // that will expire
 
412
        typedef std::deque<history_entry<PeerConnection, Torrent> > history_t;
 
413
        history_t m_history;
 
414
 
 
415
        // this is the channel within the consumers
 
416
        // that bandwidth is assigned to (upload or download)
 
417
        int m_channel;
 
418
 
 
419
        // this is true while we're in the hand_out_bandwidth loop
 
420
        // to prevent recursive invocations to interfere
 
421
        bool m_in_hand_out_bandwidth;
 
422
 
 
423
        bool m_abort;
 
424
};
 
425
 
 
426
}
 
427
 
 
428
#endif
 
429