Copyright (c) 2007, Arvid Norberg
All rights reserved.

Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions
are met:

    * Redistributions of source code must retain the above copyright
      notice, this list of conditions and the following disclaimer.
    * Redistributions in binary form must reproduce the above copyright
      notice, this list of conditions and the following disclaimer in
      the documentation and/or other materials provided with the distribution.
    * Neither the name of the author nor the names of its
      contributors may be used to endorse or promote products derived
      from this software without specific prior written permission.

THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
POSSIBILITY OF SUCH DAMAGE.
33
#include "libtorrent/storage.hpp"
35
#include "libtorrent/disk_io_thread.hpp"
37
#ifdef TORRENT_DISK_STATS
39
#include "libtorrent/time.hpp"
46
// Constructs the disk I/O worker. NOTE(review): this chunk is an
// extraction fragment — the bare integers are leftover original line
// numbers and interior lines (braces, other initializers) are missing.
// Code tokens are left untouched.
disk_io_thread::disk_io_thread(int block_size)
48
// no write bytes queued yet
, m_queue_buffer_size(0)
51
// size used for pool buffer allocations (see allocate_buffer())
, m_block_size(block_size)
53
// presumably a boost::thread member running this object's
// operator() — TODO confirm against the class declaration
, m_disk_io_thread(boost::ref(*this))
58
#ifdef TORRENT_DISK_STATS
59
// open the per-thread stats log, discarding previous contents
m_log.open("disk_io_thread.log", std::ios::trunc);
63
// Destructor. NOTE(review): extraction fragment — braces missing.
disk_io_thread::~disk_io_thread()
65
// the worker must already have been told to shut down; the line that
// sets m_abort is not visible in this fragment (presumably join())
TORRENT_ASSERT(m_abort == true);
69
// Look up a queued (or currently executing) job matching the given
// action and piece; action == -1 acts as a wildcard. NOTE(review):
// extraction fragment — braces, returns, the filter on the storage
// parameter `s`, and the declaration of `ret` are missing; the bare
// integers are leftover original line numbers.
disk_io_job disk_io_thread::find_job(boost::intrusive_ptr<piece_manager> s
70
, int action, int piece) const
72
// m_jobs and m_current are shared with the worker thread
mutex_t::scoped_lock l(m_mutex);
73
for (std::list<disk_io_job>::const_iterator i = m_jobs.begin();
74
i != m_jobs.end(); ++i)
78
if ((i->action == action || action == -1) && i->piece == piece)
81
// also consider the job the worker is executing right now
if ((m_current.action == action || action == -1)
82
&& m_current.piece == piece)
86
// sentinel action marking "no matching job found"
ret.action = (disk_io_job::action_t)-1;
93
// Shut down the worker thread and wait for it to exit.
// NOTE(review): extraction fragment — the statement that raises the
// abort flag is not visible here, but ~disk_io_thread() asserts
// m_abort == true after this runs.
void disk_io_thread::join()
95
mutex_t::scoped_lock l(m_mutex);
97
// wake the worker so it observes the shutdown request
m_signal.notify_all();
100
// block until operator() returns
m_disk_io_thread.join();
103
// aborts read operations
104
// Cancel outstanding work for one storage: pending read jobs are
// dropped, while write and move jobs are left to complete (see the
// comment inside). NOTE(review): extraction fragment — the loop body
// that actually removes/flags the read jobs is missing from this view.
void disk_io_thread::stop(boost::intrusive_ptr<piece_manager> s)
106
mutex_t::scoped_lock l(m_mutex);
107
// read jobs are aborted, write and move jobs are synchronized
108
for (std::list<disk_io_job>::iterator i = m_jobs.begin();
116
if (i->action == disk_io_job::read)
124
// wake the worker so it sees the updated queue
m_signal.notify_all();
// Returns true if the half-open byte ranges [start1, start1 + length1)
// and [start2, start2 + length2) intersect. A range that starts exactly
// at the other range's end does not overlap it. Used by add_job() to
// keep reads from being reordered past overlapping writes.
bool range_overlap(int start1, int length1, int start2, int length2)
{
	return (start1 <= start2 && start1 + length1 > start2)
		|| (start2 <= start1 && start2 + length2 > start1);
}
135
// The semantic of this operator is:
136
// should lhs come before rhs in the job queue
// Ordering used by add_job() when positioning new jobs: higher
// priority first, then grouped by storage pointer, then sorted by
// piece and offset so the disk is swept in one direction.
// NOTE(review): extraction fragment — braces and the final return
// statement are missing; the bare integers are leftover line numbers.
bool operator<(disk_io_job const& lhs, disk_io_job const& rhs)
139
// NOTE: comparison inverted to make higher priority
140
// skip _in_front_of_ lower priority
141
if (lhs.priority > rhs.priority) return true;
142
if (lhs.priority < rhs.priority) return false;
144
// tie-break: keep jobs for the same storage adjacent in the queue
if (lhs.storage.get() < rhs.storage.get()) return true;
145
if (lhs.storage.get() > rhs.storage.get()) return false;
146
if (lhs.piece < rhs.piece) return true;
147
if (lhs.piece > rhs.piece) return false;
148
if (lhs.offset < rhs.offset) return true;
149
// if (lhs.offset > rhs.offset) return false;
154
// Queue a new disk job and attach its completion callback `f`.
// The insertion point is found by scanning the queue from the back, so
// jobs stay sorted for one-directional (elevator) disk sweeps.
// NOTE(review): extraction fragment — loop bodies, braces and several
// statements are missing; the bare integers are leftover original line
// numbers. Code tokens are left untouched.
void disk_io_thread::add_job(disk_io_job const& j
155
, boost::function<void(int, disk_io_job const&)> const& f)
157
// the callback is passed separately in f, never pre-set on the job
TORRENT_ASSERT(!j.callback);
158
TORRENT_ASSERT(j.storage);
159
mutex_t::scoped_lock l(m_mutex);
161
// scan from the back of the queue towards the front
std::list<disk_io_job>::reverse_iterator i = m_jobs.rbegin();
162
if (j.action == disk_io_job::read)
164
// when we're reading, we may not skip
165
// ahead of any write operation that overlaps
166
// the region we're reading
167
// NOTE(review): post-increment copies the iterator each step; the
// write branch below uses ++i — worth unifying when this file is whole
for (; i != m_jobs.rend(); i++)
169
// if *i should come before j, stop
170
// and insert j before i
172
// if we come across a write operation that
173
// overlaps the region we're reading, we need
175
if (i->action == disk_io_job::write
176
&& i->storage == j.storage
177
&& i->piece == j.piece
178
&& range_overlap(i->offset, i->buffer_size
179
, j.offset, j.buffer_size))
183
else if (j.action == disk_io_job::write)
185
for (; i != m_jobs.rend(); ++i)
189
// stop once we leave the run of jobs belonging to the same storage
if (i != m_jobs.rbegin()
190
&& i.base()->storage.get() != j.storage.get())
197
// if we are placed in front of all other jobs, put it on the back of
198
// the queue, to sweep the disk in the same direction, and to avoid
199
// starvation. The exception is if the priority is higher than the
200
// job at the front of the queue
201
if (i == m_jobs.rend() && (m_jobs.empty() || j.priority <= m_jobs.back().priority))
204
std::list<disk_io_job>::iterator k = m_jobs.insert(i.base(), j);
205
// move the callback into the stored job without copying it
k->callback.swap(const_cast<boost::function<void(int, disk_io_job const&)>&>(f));
206
// account this write's bytes in the queued-buffer counter
if (j.action == disk_io_job::write)
207
m_queue_buffer_size += j.buffer_size;
208
TORRENT_ASSERT(j.storage.get());
209
// wake the worker thread to pick up the new job
m_signal.notify_all();
212
// Allocate one buffer from the shared pool; m_mutex guards m_pool.
// NOTE(review): extraction fragment — braces and any intervening
// lines are missing from this view.
char* disk_io_thread::allocate_buffer()
214
mutex_t::scoped_lock l(m_mutex);
218
// may return 0 on allocation failure — the read path in operator()
// reports "out of memory" in that case
return (char*)m_pool.ordered_malloc();
221
// Return a buffer previously obtained from allocate_buffer() to the
// pool. NOTE(review): extraction fragment — braces missing.
void disk_io_thread::free_buffer(char* buf)
223
mutex_t::scoped_lock l(m_mutex);
227
m_pool.ordered_free(buf);
230
// Worker-thread entry point: pulls jobs off m_jobs, executes them, and
// invokes each job's completion callback. NOTE(review): extraction
// fragment — the outer loop, the switch statement the `case` labels
// below belong to, braces, and several statements are missing; the
// bare integers are leftover original line numbers. Code tokens are
// left untouched.
void disk_io_thread::operator()()
234
#ifdef TORRENT_DISK_STATS
235
m_log << log_time() << " idle" << std::endl;
237
// hold the mutex while inspecting/popping the queue
mutex_t::scoped_lock l(m_mutex);
239
// sentinel action/piece = "no job currently executing"
m_current.action = (disk_io_job::action_t)-1;
240
m_current.piece = -1;
242
// sleep until there is work or shutdown is requested
while (m_jobs.empty() && !m_abort)
244
if (m_abort && m_jobs.empty()) return;
246
// take ownership of the callback before copying the job out
boost::function<void(int, disk_io_job const&)> handler;
247
handler.swap(m_jobs.front().callback);
249
// publish the executing job so find_job()/stop() can see it
m_current = m_jobs.front();
251
disk_io_job j = m_jobs.front();
253
// release this job's contribution to the queued-bytes counter
m_queue_buffer_size -= j.buffer_size;
258
// read jobs hand their buffer to the callback; others free it below
bool free_current_buffer = true;
261
TORRENT_ASSERT(j.storage);
262
#ifdef TORRENT_DISK_STATS
263
ptime start = time_now();
265
// std::cerr << "DISK THREAD: executing job: " << j.action << std::endl;
268
case disk_io_job::read:
269
#ifdef TORRENT_DISK_STATS
270
m_log << log_time() << " read " << j.buffer_size << std::endl;
272
// the callback takes ownership of the read buffer
free_current_buffer = false;
275
j.buffer = allocate_buffer();
276
TORRENT_ASSERT(j.buffer_size <= m_block_size);
280
// presumably reached when allocate_buffer() returned 0 — the
// guard itself is missing from this fragment; TODO confirm
j.str = "out of memory";
284
ret = int(j.storage->read_impl(j.buffer, j.piece, j.offset
287
// simulates slow drives
290
case disk_io_job::write:
291
#ifdef TORRENT_DISK_STATS
292
m_log << log_time() << " write " << j.buffer_size << std::endl;
294
TORRENT_ASSERT(j.buffer);
295
TORRENT_ASSERT(j.buffer_size <= m_block_size);
296
j.storage->write_impl(j.buffer, j.piece, j.offset
299
// simulates a slow drive
302
case disk_io_job::hash:
304
#ifdef TORRENT_DISK_STATS
305
m_log << log_time() << " hash" << std::endl;
307
sha1_hash h = j.storage->hash_for_piece_impl(j.piece);
309
// copy the 20-byte SHA-1 digest into the job's string field
std::memcpy(&j.str[0], &h[0], 20);
312
case disk_io_job::move_storage:
313
#ifdef TORRENT_DISK_STATS
314
m_log << log_time() << " move" << std::endl;
316
ret = j.storage->move_storage_impl(j.str) ? 1 : 0;
317
// report the resulting save path back through the job
j.str = j.storage->save_path().string();
319
case disk_io_job::release_files:
320
#ifdef TORRENT_DISK_STATS
321
m_log << log_time() << " release" << std::endl;
323
j.storage->release_files_impl();
325
case disk_io_job::delete_files:
326
#ifdef TORRENT_DISK_STATS
327
m_log << log_time() << " delete" << std::endl;
329
j.storage->delete_files_impl();
333
catch (std::exception& e)
335
// std::cerr << "DISK THREAD: exception: " << e.what() << std::endl;
340
catch (std::exception&) {}
344
// if (!handler) std::cerr << "DISK THREAD: no callback specified" << std::endl;
345
// else std::cerr << "DISK THREAD: invoking callback" << std::endl;
346
// invoke the completion callback; never let its exceptions kill the
// worker thread
try { if (handler) handler(ret, j); }
347
catch (std::exception&) {}
350
// clear the "currently executing" marker
m_current.storage = 0;
351
m_current.callback.clear();
354
// reads transferred buffer ownership to the callback; everything
// else returns the buffer to the pool here
if (j.buffer && free_current_buffer) free_buffer(j.buffer);