~ubuntu-branches/ubuntu/maverick/libtorrent-rasterbar/maverick

Viewing changes to src/disk_io_thread.cpp

  • Committer: Bazaar Package Importer
  • Author(s): Cristian Greco
  • Date: 2008-07-02 10:46:21 UTC
  • Revision ID: james.westby@ubuntu.com-20080702104621-jzx3pfke9lkcxfxn
Tags: upstream-0.13.1
Import upstream version 0.13.1

 
/*

Copyright (c) 2007, Arvid Norberg
All rights reserved.

Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions
are met:

    * Redistributions of source code must retain the above copyright
      notice, this list of conditions and the following disclaimer.
    * Redistributions in binary form must reproduce the above copyright
      notice, this list of conditions and the following disclaimer in
      the documentation and/or other materials provided with the distribution.
    * Neither the name of the author nor the names of its
      contributors may be used to endorse or promote products derived
      from this software without specific prior written permission.

THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
POSSIBILITY OF SUCH DAMAGE.

*/

#include "libtorrent/storage.hpp"
#include <deque>
#include "libtorrent/disk_io_thread.hpp"

#ifdef TORRENT_DISK_STATS

#include "libtorrent/time.hpp"

#endif
 
namespace libtorrent
{

	disk_io_thread::disk_io_thread(int block_size)
		: m_abort(false)
		, m_queue_buffer_size(0)
		, m_pool(block_size)
#ifndef NDEBUG
		, m_block_size(block_size)
#endif
		, m_disk_io_thread(boost::ref(*this))
	{
#ifdef TORRENT_STATS
		m_allocations = 0;
#endif
#ifdef TORRENT_DISK_STATS
		m_log.open("disk_io_thread.log", std::ios::trunc);
#endif
	}

	disk_io_thread::~disk_io_thread()
	{
		TORRENT_ASSERT(m_abort == true);
	}
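
	// debug-only helper: looks for a queued job (or the job currently
	// being executed) that matches the given storage, action and piece.
	// If no match is found, a job with action and piece set to -1 is
	// returned.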
 
#ifndef NDEBUG
	disk_io_job disk_io_thread::find_job(boost::intrusive_ptr<piece_manager> s
		, int action, int piece) const
	{
		mutex_t::scoped_lock l(m_mutex);
		for (std::list<disk_io_job>::const_iterator i = m_jobs.begin();
			i != m_jobs.end(); ++i)
		{
			if (i->storage != s)
				continue;
			if ((i->action == action || action == -1) && i->piece == piece)
				return *i;
		}
		if ((m_current.action == action || action == -1)
			&& m_current.piece == piece)
			return m_current;

		disk_io_job ret;
		ret.action = (disk_io_job::action_t)-1;
		ret.piece = -1;
		return ret;
	}

#endif
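
	// asks the disk thread to shut down and blocks until it has exited.
	// jobs still in the queue are executed before the thread returns.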
 
	void disk_io_thread::join()
	{
		mutex_t::scoped_lock l(m_mutex);
		m_abort = true;
		m_signal.notify_all();
		l.unlock();

		m_disk_io_thread.join();
	}
 
	// aborts queued read operations for the given storage
	void disk_io_thread::stop(boost::intrusive_ptr<piece_manager> s)
	{
		mutex_t::scoped_lock l(m_mutex);
		// read jobs are aborted, write and move jobs are synchronized
		for (std::list<disk_io_job>::iterator i = m_jobs.begin();
			i != m_jobs.end();)
		{
			if (i->storage != s)
			{
				++i;
				continue;
			}
			if (i->action == disk_io_job::read)
			{
				i->callback(-1, *i);
				m_jobs.erase(i++);
				continue;
			}
			++i;
		}
		m_signal.notify_all();
	}
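
	// returns true if the half-open ranges [start1, start1 + length1)
	// and [start2, start2 + length2) intersect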
 
	bool range_overlap(int start1, int length1, int start2, int length2)
	{
		return (start1 <= start2 && start1 + length1 > start2)
			|| (start2 <= start1 && start2 + length2 > start1);
	}
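
	// the comparator below defines the elevator order of the job queue:
	// higher priority jobs come first, then jobs grouped by storage and
	// sorted by piece and offset, so that requests touching nearby regions
	// of the same file end up next to each other in the queue.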
 
	namespace
	{
		// The semantics of this operator are:
		// should lhs come before rhs in the job queue?
		bool operator<(disk_io_job const& lhs, disk_io_job const& rhs)
		{
			// NOTE: comparison inverted to make higher priority
			// skip _in_front_of_ lower priority
			if (lhs.priority > rhs.priority) return true;
			if (lhs.priority < rhs.priority) return false;

			if (lhs.storage.get() < rhs.storage.get()) return true;
			if (lhs.storage.get() > rhs.storage.get()) return false;
			if (lhs.piece < rhs.piece) return true;
			if (lhs.piece > rhs.piece) return false;
			if (lhs.offset < rhs.offset) return true;
//			if (lhs.offset > rhs.offset) return false;
			return false;
		}
	}
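
	// queues a job and attaches the completion handler f. The insertion
	// point is chosen so that the queue stays in the order defined by
	// operator< above, with the special cases for reads and writes
	// described in the comments below.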
 
	void disk_io_thread::add_job(disk_io_job const& j
		, boost::function<void(int, disk_io_job const&)> const& f)
	{
		TORRENT_ASSERT(!j.callback);
		TORRENT_ASSERT(j.storage);
		mutex_t::scoped_lock l(m_mutex);

		std::list<disk_io_job>::reverse_iterator i = m_jobs.rbegin();
		if (j.action == disk_io_job::read)
		{
			// when we're reading, we may not skip
			// ahead of any write operation that overlaps
			// the region we're reading
			for (; i != m_jobs.rend(); i++)
			{
				// if *i should come before j, stop
				// and insert j before i
				if (*i < j) break;
				// if we come across a write operation that
				// overlaps the region we're reading, we need
				// to stop
				if (i->action == disk_io_job::write
					&& i->storage == j.storage
					&& i->piece == j.piece
					&& range_overlap(i->offset, i->buffer_size
						, j.offset, j.buffer_size))
					break;
			}
		}
		else if (j.action == disk_io_job::write)
		{
			for (; i != m_jobs.rend(); ++i)
			{
				if (*i < j)
				{
					if (i != m_jobs.rbegin()
						&& i.base()->storage.get() != j.storage.get())
						i = m_jobs.rbegin();
					break;
				}
			}
		}

		// if the job would be placed in front of all other jobs, put it on
		// the back of the queue instead, to sweep the disk in the same
		// direction and to avoid starvation. The exception is if its
		// priority is higher than the job at the front of the queue
		if (i == m_jobs.rend() && (m_jobs.empty() || j.priority <= m_jobs.back().priority))
			i = m_jobs.rbegin();

		std::list<disk_io_job>::iterator k = m_jobs.insert(i.base(), j);
		k->callback.swap(const_cast<boost::function<void(int, disk_io_job const&)>&>(f));
		if (j.action == disk_io_job::write)
			m_queue_buffer_size += j.buffer_size;
		TORRENT_ASSERT(j.storage.get());
		m_signal.notify_all();
	}
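
	// allocates one block-sized buffer from the pool. The calling code
	// treats a 0 return as an out-of-memory condition.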
 
	char* disk_io_thread::allocate_buffer()
	{
		mutex_t::scoped_lock l(m_mutex);
#ifdef TORRENT_STATS
		++m_allocations;
#endif
		return (char*)m_pool.ordered_malloc();
	}

	void disk_io_thread::free_buffer(char* buf)
	{
		mutex_t::scoped_lock l(m_mutex);
#ifdef TORRENT_STATS
		--m_allocations;
#endif
		m_pool.ordered_free(buf);
	}
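
	// the disk thread's main loop: wait for a job, pop it off the front of
	// the queue, execute it with the mutex released, invoke the completion
	// handler with the result, and free the buffer unless ownership was
	// handed over (as it is for read jobs, whose buffer is passed on to
	// the handler).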
 
	void disk_io_thread::operator()()
	{
		for (;;)
		{
#ifdef TORRENT_DISK_STATS
			m_log << log_time() << " idle" << std::endl;
#endif
			mutex_t::scoped_lock l(m_mutex);
#ifndef NDEBUG
			m_current.action = (disk_io_job::action_t)-1;
			m_current.piece = -1;
#endif
			while (m_jobs.empty() && !m_abort)
				m_signal.wait(l);
			if (m_abort && m_jobs.empty()) return;

			boost::function<void(int, disk_io_job const&)> handler;
			handler.swap(m_jobs.front().callback);
#ifndef NDEBUG
			m_current = m_jobs.front();
#endif
			disk_io_job j = m_jobs.front();
			m_jobs.pop_front();
			m_queue_buffer_size -= j.buffer_size;
			l.unlock();

			int ret = 0;

			bool free_current_buffer = true;
			try
			{
				TORRENT_ASSERT(j.storage);
#ifdef TORRENT_DISK_STATS
				ptime start = time_now();
#endif
//				std::cerr << "DISK THREAD: executing job: " << j.action << std::endl;
				switch (j.action)
				{
					case disk_io_job::read:
#ifdef TORRENT_DISK_STATS
						m_log << log_time() << " read " << j.buffer_size << std::endl;
#endif
						free_current_buffer = false;
						if (j.buffer == 0)
						{
							j.buffer = allocate_buffer();
							TORRENT_ASSERT(j.buffer_size <= m_block_size);
							if (j.buffer == 0)
							{
								ret = -1;
								j.str = "out of memory";
								break;
							}
						}
						ret = int(j.storage->read_impl(j.buffer, j.piece, j.offset
							, j.buffer_size));

						// simulates slow drives
						// usleep(300);
						break;
					case disk_io_job::write:
#ifdef TORRENT_DISK_STATS
						m_log << log_time() << " write " << j.buffer_size << std::endl;
#endif
						TORRENT_ASSERT(j.buffer);
						TORRENT_ASSERT(j.buffer_size <= m_block_size);
						j.storage->write_impl(j.buffer, j.piece, j.offset
							, j.buffer_size);

						// simulates a slow drive
						// usleep(300);
						break;
					case disk_io_job::hash:
						{
#ifdef TORRENT_DISK_STATS
							m_log << log_time() << " hash" << std::endl;
#endif
							sha1_hash h = j.storage->hash_for_piece_impl(j.piece);
							j.str.resize(20);
							std::memcpy(&j.str[0], &h[0], 20);
						}
						break;
					case disk_io_job::move_storage:
#ifdef TORRENT_DISK_STATS
						m_log << log_time() << " move" << std::endl;
#endif
						ret = j.storage->move_storage_impl(j.str) ? 1 : 0;
						j.str = j.storage->save_path().string();
						break;
					case disk_io_job::release_files:
#ifdef TORRENT_DISK_STATS
						m_log << log_time() << " release" << std::endl;
#endif
						j.storage->release_files_impl();
						break;
					case disk_io_job::delete_files:
#ifdef TORRENT_DISK_STATS
						m_log << log_time() << " delete" << std::endl;
#endif
						j.storage->delete_files_impl();
						break;
				}
			}
			catch (std::exception& e)
			{
//				std::cerr << "DISK THREAD: exception: " << e.what() << std::endl;
				try
				{
					j.str = e.what();
				}
				catch (std::exception&) {}
				ret = -1;
			}

//			if (!handler) std::cerr << "DISK THREAD: no callback specified" << std::endl;
//			else std::cerr << "DISK THREAD: invoking callback" << std::endl;
			try { if (handler) handler(ret, j); }
			catch (std::exception&) {}

#ifndef NDEBUG
			m_current.storage = 0;
			m_current.callback.clear();
#endif

			if (j.buffer && free_current_buffer) free_buffer(j.buffer);
		}
	}
}