// Disk queue elevator patch by Morten Husveit

#include "libtorrent/storage.hpp"
#include "libtorrent/disk_io_thread.hpp"
#include "libtorrent/disk_buffer_holder.hpp"
#include "libtorrent/alloca.hpp"
#include "libtorrent/invariant_check.hpp"
#include "libtorrent/error_code.hpp"
#include "libtorrent/file_pool.hpp"
#include <boost/scoped_array.hpp>

#ifdef TORRENT_WINDOWS
#define alloca(s) _alloca(s)
#endif

#ifdef TORRENT_DISK_STATS
#include "libtorrent/time.hpp"
#endif

#if TORRENT_USE_MLOCK && !defined TORRENT_WINDOWS
#include <sys/mman.h>
#endif
namespace libtorrent
{
	bool should_cancel_on_abort(disk_io_job const& j);
	bool is_read_operation(disk_io_job const& j);
	bool operation_has_buffer(disk_io_job const& j);

	disk_buffer_pool::disk_buffer_pool(int block_size)
		: m_block_size(block_size)
#ifndef TORRENT_DISABLE_POOL_ALLOCATOR
		, m_pool(block_size, m_settings.cache_buffer_chunk_size)
#endif
	{
#if defined TORRENT_DISK_STATS || defined TORRENT_STATS
		m_allocations = 0;
#endif
#ifdef TORRENT_DISK_STATS
		m_log.open("disk_buffers.log", std::ios::trunc);
		m_categories["read cache"] = 0;
		m_categories["write cache"] = 0;

		m_disk_access_log.open("disk_access.log", std::ios::trunc);
#endif
	}
	disk_buffer_pool::~disk_buffer_pool()
	{
		TORRENT_ASSERT(m_magic == 0x1337);
	}

#if defined TORRENT_DEBUG || defined TORRENT_DISK_STATS
	bool disk_buffer_pool::is_disk_buffer(char* buffer
		, boost::mutex::scoped_lock& l) const
	{
		TORRENT_ASSERT(m_magic == 0x1337);
#ifdef TORRENT_DISK_STATS
		if (m_buf_to_category.find(buffer)
			== m_buf_to_category.end()) return false;
#endif
#ifdef TORRENT_DISABLE_POOL_ALLOCATOR
		return true;
#else
		return m_pool.is_from(buffer);
#endif
	}

	bool disk_buffer_pool::is_disk_buffer(char* buffer) const
	{
		mutex_t::scoped_lock l(m_pool_mutex);
		return is_disk_buffer(buffer, l);
	}
#endif
	char* disk_buffer_pool::allocate_buffer(char const* category)
	{
		mutex_t::scoped_lock l(m_pool_mutex);
		TORRENT_ASSERT(m_magic == 0x1337);
#ifdef TORRENT_DISABLE_POOL_ALLOCATOR
		char* ret = page_aligned_allocator::malloc(m_block_size);
#else
		char* ret = (char*)m_pool.ordered_malloc();
		m_pool.set_next_size(m_settings.cache_buffer_chunk_size);
#endif
		++m_in_use;
#if TORRENT_USE_MLOCK
		if (m_settings.lock_disk_cache)
		{
#ifdef TORRENT_WINDOWS
			VirtualLock(ret, m_block_size);
#else
			mlock(ret, m_block_size);
#endif
		}
#endif
#if defined TORRENT_DISK_STATS || defined TORRENT_STATS
		++m_allocations;
#endif
#ifdef TORRENT_DISK_STATS
		++m_categories[category];
		m_buf_to_category[ret] = category;
		m_log << log_time() << " " << category << ": " << m_categories[category] << "\n";
#endif
		TORRENT_ASSERT(ret == 0 || is_disk_buffer(ret, l));
		return ret;
	}
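	// Illustrative sketch (an assumption, not part of this patch): callers
	// normally pair allocate_buffer() with a disk_buffer_holder so the block
	// is returned to the pool on every early-exit path:
	//
	//	char* buf = pool.allocate_buffer("read cache");
	//	if (buf == 0) return;                  // allocation can fail
	//	disk_buffer_holder holder(pool, buf);  // frees buf unless released
	//	// ... use buf ...
	//	holder.release();                      // ownership handed off
	//
	// operator()() below uses this pattern for job buffers.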
#ifdef TORRENT_DISK_STATS
	void disk_buffer_pool::rename_buffer(char* buf, char const* category)
	{
		mutex_t::scoped_lock l(m_pool_mutex);
		TORRENT_ASSERT(is_disk_buffer(buf, l));
		TORRENT_ASSERT(m_categories.find(m_buf_to_category[buf])
			!= m_categories.end());
		std::string const& prev_category = m_buf_to_category[buf];
		--m_categories[prev_category];
		m_log << log_time() << " " << prev_category << ": " << m_categories[prev_category] << "\n";
		++m_categories[category];
		m_buf_to_category[buf] = category;
		m_log << log_time() << " " << category << ": " << m_categories[category] << "\n";
		TORRENT_ASSERT(m_categories.find(m_buf_to_category[buf])
			!= m_categories.end());
	}
#endif
	void disk_buffer_pool::free_buffer(char* buf)
	{
		TORRENT_ASSERT(buf);
		mutex_t::scoped_lock l(m_pool_mutex);
		TORRENT_ASSERT(m_magic == 0x1337);
		TORRENT_ASSERT(is_disk_buffer(buf, l));
#if defined TORRENT_DISK_STATS || defined TORRENT_STATS
		--m_allocations;
#endif
#ifdef TORRENT_DISK_STATS
		TORRENT_ASSERT(m_categories.find(m_buf_to_category[buf])
			!= m_categories.end());
		std::string const& category = m_buf_to_category[buf];
		--m_categories[category];
		m_log << log_time() << " " << category << ": " << m_categories[category] << "\n";
		m_buf_to_category.erase(buf);
#endif
#if TORRENT_USE_MLOCK
		if (m_settings.lock_disk_cache)
		{
#ifdef TORRENT_WINDOWS
			VirtualUnlock(buf, m_block_size);
#else
			munlock(buf, m_block_size);
#endif
		}
#endif
#ifdef TORRENT_DISABLE_POOL_ALLOCATOR
		page_aligned_allocator::free(buf);
#else
		m_pool.ordered_free(buf);
#endif
		--m_in_use;
	}
	char* disk_buffer_pool::allocate_buffers(int num_blocks, char const* category)
	{
		mutex_t::scoped_lock l(m_pool_mutex);
		TORRENT_ASSERT(m_magic == 0x1337);
#ifdef TORRENT_DISABLE_POOL_ALLOCATOR
		char* ret = page_aligned_allocator::malloc(m_block_size * num_blocks);
#else
		char* ret = (char*)m_pool.ordered_malloc(num_blocks);
		m_pool.set_next_size(m_settings.cache_buffer_chunk_size);
#endif
		m_in_use += num_blocks;
#if TORRENT_USE_MLOCK
		if (m_settings.lock_disk_cache)
		{
#ifdef TORRENT_WINDOWS
			VirtualLock(ret, m_block_size * num_blocks);
#else
			mlock(ret, m_block_size * num_blocks);
#endif
		}
#endif
#if defined TORRENT_DISK_STATS || defined TORRENT_STATS
		m_allocations += num_blocks;
#endif
#ifdef TORRENT_DISK_STATS
		m_categories[category] += num_blocks;
		m_buf_to_category[ret] = category;
		m_log << log_time() << " " << category << ": " << m_categories[category] << "\n";
#endif
		TORRENT_ASSERT(ret == 0 || is_disk_buffer(ret, l));
		return ret;
	}
	void disk_buffer_pool::free_buffers(char* buf, int num_blocks)
	{
		TORRENT_ASSERT(buf);
		TORRENT_ASSERT(num_blocks >= 1);
		mutex_t::scoped_lock l(m_pool_mutex);
		TORRENT_ASSERT(m_magic == 0x1337);
		TORRENT_ASSERT(is_disk_buffer(buf, l));
#if defined TORRENT_DISK_STATS || defined TORRENT_STATS
		m_allocations -= num_blocks;
#endif
#ifdef TORRENT_DISK_STATS
		TORRENT_ASSERT(m_categories.find(m_buf_to_category[buf])
			!= m_categories.end());
		std::string const& category = m_buf_to_category[buf];
		m_categories[category] -= num_blocks;
		m_log << log_time() << " " << category << ": " << m_categories[category] << "\n";
		m_buf_to_category.erase(buf);
#endif
#if TORRENT_USE_MLOCK
		if (m_settings.lock_disk_cache)
		{
#ifdef TORRENT_WINDOWS
			VirtualUnlock(buf, m_block_size * num_blocks);
#else
			munlock(buf, m_block_size * num_blocks);
#endif
		}
#endif
#ifdef TORRENT_DISABLE_POOL_ALLOCATOR
		page_aligned_allocator::free(buf);
#else
		m_pool.ordered_free(buf, num_blocks);
#endif
		m_in_use -= num_blocks;
	}
	void disk_buffer_pool::release_memory()
	{
		TORRENT_ASSERT(m_magic == 0x1337);
#ifndef TORRENT_DISABLE_POOL_ALLOCATOR
		mutex_t::scoped_lock l(m_pool_mutex);
		m_pool.release_memory();
#endif
	}
// ------- disk_io_thread ------

	disk_io_thread::disk_io_thread(asio::io_service& ios
		, boost::function<void()> const& queue_callback
		, int block_size)
		: disk_buffer_pool(block_size)
		, m_abort(false)
		, m_waiting_to_shutdown(false)
		, m_queue_buffer_size(0)
		, m_last_file_check(time_now_hires())
		, m_ios(ios)
		, m_queue_callback(queue_callback)
		, m_work(io_service::work(m_ios))
		, m_disk_io_thread(boost::ref(*this))
	{
#ifdef TORRENT_DISK_STATS
		m_log.open("disk_io_thread.log", std::ios::trunc);
#endif
	}

	disk_io_thread::~disk_io_thread()
	{
		TORRENT_ASSERT(m_abort == true);
		TORRENT_ASSERT(m_magic == 0x1337);
	}

	void disk_io_thread::join()
	{
		mutex_t::scoped_lock l(m_queue_mutex);
		disk_io_job j;
		m_waiting_to_shutdown = true;
		j.action = disk_io_job::abort_thread;
		m_jobs.insert(m_jobs.begin(), j);
		m_signal.notify_all();
		// ...
	}
	void disk_io_thread::flush_expired_pieces()
	{
		ptime now = time_now();

		mutex_t::scoped_lock l(m_piece_mutex);

		INVARIANT_CHECK;

		// flush expired write-cache pieces
		for (;;)
		{
			cache_t::iterator i = std::min_element(
				m_pieces.begin(), m_pieces.end()
				, bind(&cached_piece_entry::last_use, _1)
				< bind(&cached_piece_entry::last_use, _2));
			if (i == m_pieces.end()) break;
			int age = total_seconds(now - i->last_use);
			if (age < m_settings.cache_expiry) break;
			flush_and_remove(i, l);
		}

		// drop expired read-cache pieces
		for (;;)
		{
			cache_t::iterator i = std::min_element(
				m_read_pieces.begin(), m_read_pieces.end()
				, bind(&cached_piece_entry::last_use, _1)
				< bind(&cached_piece_entry::last_use, _2));
			if (i == m_read_pieces.end()) break;
			int age = total_seconds(now - i->last_use);
			if (age < m_settings.cache_expiry) break;
			free_piece(*i, l);
			m_read_pieces.erase(i);
		}
	}
	// returns the number of blocks that were freed
	int disk_io_thread::free_piece(cached_piece_entry& p, mutex_t::scoped_lock& l)
	{
		int piece_size = p.storage->info()->piece_size(p.piece);
		int blocks_in_piece = (piece_size + m_block_size - 1) / m_block_size;
		int ret = 0;
		for (int i = 0; i < blocks_in_piece; ++i)
		{
			if (p.blocks[i].buf == 0) continue;
			free_buffer(p.blocks[i].buf);
			++ret;
			p.blocks[i].buf = 0;
			--p.num_blocks;
			--m_cache_stats.cache_size;
			--m_cache_stats.read_cache_size;
		}
		return ret;
	}
	// returns the number of blocks that were freed
	int disk_io_thread::clear_oldest_read_piece(
		int num_blocks
		, cache_t::iterator ignore
		, mutex_t::scoped_lock& l)
	{
		INVARIANT_CHECK;

		cache_t::iterator i = std::min_element(
			m_read_pieces.begin(), m_read_pieces.end()
			, bind(&cached_piece_entry::last_use, _1)
			< bind(&cached_piece_entry::last_use, _2));
		if (i != m_read_pieces.end() && i != ignore)
		{
			// don't replace an entry that is less than one second old
			if (time_now() - i->last_use < seconds(1)) return 0;
			int blocks = 0;
			if (num_blocks >= i->num_blocks)
			{
				blocks = free_piece(*i, l);
			}
			else
			{
				// delete blocks from the start and from the end
				// until num_blocks have been freed
				int end = (i->storage->info()->piece_size(i->piece) + m_block_size - 1) / m_block_size - 1;
				int start = 0;
				for (;;)
				{
					while (i->blocks[start].buf == 0 && start <= end) ++start;
					if (start > end) break;
					free_buffer(i->blocks[start].buf);
					i->blocks[start].buf = 0;
					++blocks;
					--i->num_blocks;
					--m_cache_stats.cache_size;
					--m_cache_stats.read_cache_size;
					--num_blocks;
					if (!num_blocks) break;

					while (i->blocks[end].buf == 0 && start <= end) --end;
					if (start > end) break;
					free_buffer(i->blocks[end].buf);
					i->blocks[end].buf = 0;
					++blocks;
					--i->num_blocks;
					--m_cache_stats.cache_size;
					--m_cache_stats.read_cache_size;
					--num_blocks;
				}
			}
			if (i->num_blocks == 0) m_read_pieces.erase(i);
			return blocks;
		}
		return 0;
	}
	int contiguous_blocks(disk_io_thread::cached_piece_entry const& b)
	{
		int ret = 0;
		int current = 0;
		int blocks_in_piece = (b.storage->info()->piece_size(b.piece) + 16 * 1024 - 1) / (16 * 1024);
		for (int i = 0; i < blocks_in_piece; ++i)
		{
			if (b.blocks[i].buf) ++current;
			else
			{
				if (current > ret) ret = current;
				current = 0;
			}
		}
		if (current > ret) ret = current;
		return ret;
	}
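	// Worked example of the scan above: for a piece whose blocks are
	// [buf, buf, 0, buf, buf, buf], the runs have lengths 2 and 3, so
	// contiguous_blocks() returns 3.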
	int disk_io_thread::flush_contiguous_blocks(disk_io_thread::cache_t::iterator e
		, mutex_t::scoped_lock& l, int lower_limit)
	{
		// first find the largest range of contiguous blocks
		int len = 0;
		int current = 0;
		int pos = 0;
		int start = 0;
		int blocks_in_piece = (e->storage->info()->piece_size(e->piece)
			+ m_block_size - 1) / m_block_size;
		for (int i = 0; i < blocks_in_piece; ++i)
		{
			if (e->blocks[i].buf) ++current;
			else
			{
				if (current > len)
				{
					len = current;
					pos = start;
				}
				current = 0;
				start = i + 1;
			}
		}
		if (current > len)
		{
			len = current;
			pos = start;
		}

		if (len < lower_limit || len <= 0) return 0;
		len = flush_range(e, pos, pos + len, l);
		if (e->num_blocks == 0) m_pieces.erase(e);
		return len;
	}
	// flushes 'blocks' blocks from the cache
	int disk_io_thread::flush_cache_blocks(mutex_t::scoped_lock& l
		, int blocks, cache_t::iterator ignore, int options)
	{
		// first look if there are any read cache entries that can
		// be cleared
		int ret = 0;
		int tmp = 0;
		do {
			tmp = clear_oldest_read_piece(blocks, ignore, l);
			blocks -= tmp;
			ret += tmp;
		} while (tmp > 0 && blocks > 0);

		if (options & dont_flush_write_blocks) return ret;

		if (m_settings.disk_cache_algorithm == session_settings::lru)
		{
			while (blocks > 0)
			{
				cache_t::iterator i = std::min_element(
					m_pieces.begin(), m_pieces.end()
					, bind(&cached_piece_entry::last_use, _1)
					< bind(&cached_piece_entry::last_use, _2));
				if (i == m_pieces.end()) return ret;
				tmp = flush_and_remove(i, l);
				blocks -= tmp;
				ret += tmp;
			}
		}
		else if (m_settings.disk_cache_algorithm == session_settings::largest_contiguous)
		{
			while (blocks > 0)
			{
				cache_t::iterator i = std::max_element(
					m_pieces.begin(), m_pieces.end()
					, bind(&contiguous_blocks, _1)
					< bind(&contiguous_blocks, _2));
				if (i == m_pieces.end()) return ret;
				tmp = flush_contiguous_blocks(i, l);
				blocks -= tmp;
				ret += tmp;
			}
		}
		return ret;
	}
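	// The two eviction strategies trade different things: 'lru' evicts the
	// least recently used piece wholesale, which favors temporal locality;
	// 'largest_contiguous' picks the piece with the longest run of dirty
	// blocks and flushes just that run, which favors large sequential
	// writes at the cost of keeping colder pieces around longer.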
	int disk_io_thread::flush_and_remove(disk_io_thread::cache_t::iterator e
		, mutex_t::scoped_lock& l)
	{
		int ret = flush_range(e, 0, INT_MAX, l);
		m_pieces.erase(e);
		return ret;
	}

	int disk_io_thread::flush_range(disk_io_thread::cache_t::iterator e
		, int start, int end, mutex_t::scoped_lock& l)
	{
		INVARIANT_CHECK;

		TORRENT_ASSERT(start < end);

		// TODO: copy *e and unlink it before unlocking
		cached_piece_entry& p = *e;
		int piece_size = p.storage->info()->piece_size(p.piece);
#ifdef TORRENT_DISK_STATS
		m_log << log_time() << " flushing " << piece_size << std::endl;
#endif
		TORRENT_ASSERT(piece_size > 0);

		int blocks_in_piece = (piece_size + m_block_size - 1) / m_block_size;
		int buffer_size = 0;
		int offset = 0;
		int iov_counter = 0;

		boost::scoped_array<char> buf;
		file::iovec_t* iov = 0;
		if (m_settings.coalesce_writes) buf.reset(new (std::nothrow) char[piece_size]);
		else iov = TORRENT_ALLOCA(file::iovec_t, blocks_in_piece);

		end = (std::min)(end, blocks_in_piece);
		for (int i = start; i <= end; ++i)
		{
			if (i == end || p.blocks[i].buf == 0)
			{
				if (buffer_size == 0) continue;

				TORRENT_ASSERT(buffer_size <= i * m_block_size);

				if (iov)
				{
					p.storage->write_impl(iov, p.piece, (std::min)(
						i * m_block_size, piece_size) - buffer_size, iov_counter);
					iov_counter = 0;
				}
				else
				{
					file::iovec_t b = { buf.get(), buffer_size };
					p.storage->write_impl(&b, p.piece, (std::min)(
						i * m_block_size, piece_size) - buffer_size, 1);
				}
				++m_cache_stats.writes;
//				std::cerr << " flushing p: " << p.piece << " bytes: " << buffer_size << std::endl;
				buffer_size = 0;
				offset = 0;
				continue;
			}
			int block_size = (std::min)(piece_size - i * m_block_size, m_block_size);
			TORRENT_ASSERT(offset + block_size > 0);
			if (!buf)
			{
				iov[iov_counter].iov_base = p.blocks[i].buf;
				iov[iov_counter].iov_len = block_size;
				++iov_counter;
			}
			else
			{
				std::memcpy(buf.get() + offset, p.blocks[i].buf, block_size);
				offset += m_block_size;
			}
			buffer_size += block_size;
			TORRENT_ASSERT(p.num_blocks > 0);
			--p.num_blocks;
			++m_cache_stats.blocks_written;
			--m_cache_stats.cache_size;
		}

		int ret = 0;
		disk_io_job j;
		j.storage = p.storage;
		j.action = disk_io_job::write;
		for (int i = start; i < end; ++i)
		{
			if (p.blocks[i].buf == 0) continue;
			j.buffer_size = (std::min)(piece_size - i * m_block_size, m_block_size);
			int result = j.error ? -1 : j.buffer_size;
			j.offset = i * m_block_size;
			free_buffer(p.blocks[i].buf);
			post_callback(p.blocks[i].callback, j, result);
			p.blocks[i].callback.clear();
			p.blocks[i].buf = 0;
			++ret;
		}

		TORRENT_ASSERT(buffer_size == 0);
//		std::cerr << " flushing p: " << p.piece << " cached_blocks: " << m_cache_stats.cache_size << std::endl;
#ifdef TORRENT_DEBUG
		for (int i = start; i < end; ++i)
			TORRENT_ASSERT(p.blocks[i].buf == 0);
#endif
		return ret;
	}
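	// flush_range() picks one of two write paths, both ending in a single
	// write_impl() call per contiguous run of dirty blocks: with
	// coalesce_writes the blocks are memcpy'd into one temporary buffer and
	// written as a single iovec; without it, one iovec entry per block is
	// filled in and handed to write_impl() as a scatter/gather list, saving
	// the copy. E.g. four dirty 16 kiB blocks become either one 64 kiB
	// buffered write or one writev-style call with four iovecs.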
	// returns -1 on failure
	int disk_io_thread::cache_block(disk_io_job& j
		, boost::function<void(int,disk_io_job const&)>& handler
		, mutex_t::scoped_lock& l)
	{
		INVARIANT_CHECK;
		TORRENT_ASSERT(find_cached_piece(m_pieces, j, l) == m_pieces.end());
		// ...
	}
	// fills a piece with data from disk, returns the total number of bytes
	// read or -1 if there was an error
	int disk_io_thread::read_into_piece(cached_piece_entry& p, int start_block
		, int options, int num_blocks, mutex_t::scoped_lock& l)
	{
		TORRENT_ASSERT(num_blocks > 0);
		int piece_size = p.storage->info()->piece_size(p.piece);
		int blocks_in_piece = (piece_size + m_block_size - 1) / m_block_size;

		int end_block = start_block;
		int num_read = 0;

		int iov_counter = 0;
		file::iovec_t* iov = TORRENT_ALLOCA(file::iovec_t, (std::min)(blocks_in_piece - start_block, num_blocks));

		int piece_offset = start_block * m_block_size;

		boost::scoped_array<char> buf;
		for (int i = start_block; i < blocks_in_piece
			&& ((options & ignore_cache_size)
			|| in_use() < m_settings.cache_size); ++i)
		{
			int block_size = (std::min)(piece_size - piece_offset, m_block_size);
			TORRENT_ASSERT(piece_offset <= piece_size);

			// if this block is already allocated,
			// free it and allocate a new one
			if (p.blocks[i].buf)
			{
				free_buffer(p.blocks[i].buf);
				--p.num_blocks;
				--m_cache_stats.cache_size;
				--m_cache_stats.read_cache_size;
			}
			p.blocks[i].buf = allocate_buffer("read cache");

			// the allocation failed, break
			if (p.blocks[i].buf == 0) break;
			++p.num_blocks;
			++m_cache_stats.cache_size;
			++m_cache_stats.read_cache_size;
			++end_block;
			++num_read;
			iov[iov_counter].iov_base = p.blocks[i].buf;
			iov[iov_counter].iov_len = block_size;
			++iov_counter;
			piece_offset += m_block_size;
			if (num_read >= num_blocks) break;
		}

		if (end_block == start_block)
		{
			// something failed. Free all buffers
			// ...
			return -2;
		}

		TORRENT_ASSERT(iov_counter <= (std::min)(blocks_in_piece - start_block, num_blocks));

		// the buffer_size is the size of the buffer we need to read
		// all these blocks.
		const int buffer_size = (std::min)((end_block - start_block) * m_block_size
			, piece_size - start_block * m_block_size);
		TORRENT_ASSERT(buffer_size > 0);
		TORRENT_ASSERT(buffer_size <= piece_size);
		TORRENT_ASSERT(buffer_size + start_block * m_block_size <= piece_size);

		if (m_settings.coalesce_reads)
			buf.reset(new (std::nothrow) char[buffer_size]);

		int ret = 0;
		if (buf)
		{
			file::iovec_t b = { buf.get(), buffer_size };
			ret = p.storage->read_impl(&b, p.piece, start_block * m_block_size, 1);
			++m_cache_stats.reads;
			if (p.storage->error())
			{
				free_piece(p, l);
				return -1;
			}

			if (ret != buffer_size)
			{
				// this means the file wasn't big enough for this read
				p.storage->get_storage_impl()->set_error(""
					, errors::file_too_short);
				free_piece(p, l);
				return -1;
			}

			int offset = 0;
			for (int i = 0; i < iov_counter; ++i)
			{
				TORRENT_ASSERT(iov[i].iov_base);
				TORRENT_ASSERT(iov[i].iov_len > 0);
				TORRENT_ASSERT(offset + iov[i].iov_len <= buffer_size);
				std::memcpy(iov[i].iov_base, buf.get() + offset, iov[i].iov_len);
				offset += iov[i].iov_len;
			}
		}
		else
		{
			ret = p.storage->read_impl(iov, p.piece, start_block * m_block_size, iov_counter);
			++m_cache_stats.reads;
			if (p.storage->error())
			{
				free_piece(p, l);
				return -1;
			}

			if (ret != buffer_size)
			{
				// this means the file wasn't big enough for this read
				p.storage->get_storage_impl()->set_error(""
					, errors::file_too_short);
				free_piece(p, l);
				return -1;
			}
		}

		TORRENT_ASSERT(ret == buffer_size);
		return ret;
	}
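	// Note on the two read paths above: with coalesce_reads a single
	// read_impl() call fills one contiguous temporary buffer, which is then
	// scattered into the per-block cache buffers with memcpy; without it,
	// the per-block buffers are passed directly as an iovec array in one
	// call. E.g. reading four 16 kiB blocks is either one 64 kiB read plus
	// four memcpys, or one vectored read with four iovec entries.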
	// returns -1 on read error, -2 on out of memory error,
	// or the number of bytes read
	// this function ignores the cache size limit, it will read the entire
	// piece regardless of the offset in j
	// this is used for seed-mode, where we need to read the entire piece to calculate
	// the hash
	int disk_io_thread::cache_read_piece(disk_io_job const& j, mutex_t::scoped_lock& l)
	{
		INVARIANT_CHECK;

		int piece_size = j.storage->info()->piece_size(j.piece);
		int blocks_in_piece = (piece_size + m_block_size - 1) / m_block_size;

		if (in_use() + blocks_in_piece > m_settings.cache_size)
			flush_cache_blocks(l, in_use() + blocks_in_piece - m_settings.cache_size, m_read_pieces.end());

		cached_piece_entry p;
		p.piece = j.piece;
		p.storage = j.storage;
		p.last_use = time_now();
		p.num_blocks = 0;
		p.blocks.reset(new (std::nothrow) cached_block_entry[blocks_in_piece]);
		if (!p.blocks) return -1;
		int ret = read_into_piece(p, 0, ignore_cache_size, INT_MAX, l);

		if (ret >= 0) m_read_pieces.push_back(p);

		return ret;
	}

	// returns -1 on read error, -2 if there isn't any space in the cache
	// ...
#ifdef TORRENT_DEBUG
	void disk_io_thread::check_invariant() const
	{
		// ... (block counting elided)
		TORRENT_ASSERT(cached_read_blocks + cached_write_blocks == m_cache_stats.cache_size);
		TORRENT_ASSERT(cached_read_blocks == m_cache_stats.read_cache_size);

#ifdef TORRENT_DISK_STATS
		int read_allocs = m_categories.find(std::string("read cache"))->second;
		int write_allocs = m_categories.find(std::string("write cache"))->second;
		TORRENT_ASSERT(cached_read_blocks == read_allocs);
		TORRENT_ASSERT(cached_write_blocks == write_allocs);
#endif

		// when writing, there may be a one block difference, right before
		// an old piece is flushed
		TORRENT_ASSERT(m_cache_stats.cache_size <= m_settings.cache_size + 1);
	}
#endif
	int disk_io_thread::read_piece_from_cache_and_hash(disk_io_job const& j, sha1_hash& h)
	{
		TORRENT_ASSERT(j.buffer);

		mutex_t::scoped_lock l(m_piece_mutex);

		cache_t::iterator p
			= find_cached_piece(m_read_pieces, j, l);

		int ret = 0;

		int piece_size = j.storage->info()->piece_size(j.piece);
		int blocks_in_piece = (piece_size + m_block_size - 1) / m_block_size;

		if (p != m_read_pieces.end() && p->num_blocks != blocks_in_piece)
		{
			// we have the piece in the cache, but not all of the blocks
			ret = read_into_piece(*p, 0, ignore_cache_size, blocks_in_piece, l);
			if (ret < 0) return ret;
			TORRENT_ASSERT(!m_read_pieces.empty());
			TORRENT_ASSERT(p->piece == j.piece);
			TORRENT_ASSERT(p->storage == j.storage);
		}

		// if the piece cannot be found in the cache,
		// read the whole piece starting at the block
		// we got a request for.
		if (p == m_read_pieces.end())
		{
			ret = cache_read_piece(j, l);
			if (ret < 0) return ret;
			p = m_read_pieces.end();
			--p;
			TORRENT_ASSERT(!m_read_pieces.empty());
			TORRENT_ASSERT(p->piece == j.piece);
			TORRENT_ASSERT(p->storage == j.storage);
		}

		hasher ctx;
		for (int i = 0; i < blocks_in_piece; ++i)
		{
			TORRENT_ASSERT(p->blocks[i].buf);
			ctx.update((char const*)p->blocks[i].buf, (std::min)(piece_size, m_block_size));
			piece_size -= m_block_size;
		}
		h = ctx.final();

		bool hit = true;
		ret = copy_from_piece(p, hit, j, l);
		TORRENT_ASSERT(ret > 0);
		if (ret < 0) return ret;

		// if read cache is disabled or we exceeded the
		// limit, remove this piece from the cache
		if (in_use() >= m_settings.cache_size
			|| !m_settings.use_read_cache)
		{
			TORRENT_ASSERT(!m_read_pieces.empty());
			TORRENT_ASSERT(p->piece == j.piece);
			TORRENT_ASSERT(p->storage == j.storage);
			if (p != m_read_pieces.end())
			{
				free_piece(*p, l);
				m_read_pieces.erase(p);
			}
		}

		ret = j.buffer_size;
		++m_cache_stats.blocks_read;
		if (hit) ++m_cache_stats.blocks_read_hit;
		return ret;
	}
	// this doesn't modify the read cache, it only
	// checks to see if the given read request can
	// be fully satisfied from the given cached piece
	// this is similar to copy_from_piece() but it
	// doesn't do anything but determine whether it's a
	// cache hit or not
	bool disk_io_thread::is_cache_hit(cache_t::iterator p
		, disk_io_job const& j, mutex_t::scoped_lock& l)
	{
		int block = j.offset / m_block_size;
		int block_offset = j.offset & (m_block_size - 1);
		int size = j.buffer_size;
		int min_blocks_to_read = block_offset > 0 ? 2 : 1;
		TORRENT_ASSERT(size <= m_block_size);
		int start_block = block;
		// if we have to read more than one block, and
		// the first block is there, make sure we test
		// for the second block
		if (p->blocks[start_block].buf != 0 && min_blocks_to_read > 1)
			++start_block;

		return p->blocks[start_block].buf != 0;
	}
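	// Block arithmetic, worked through with the usual 16 kiB block size:
	// a request at j.offset = 20000 gives block = 20000 / 16384 = 1 and
	// block_offset = 20000 & 16383 = 3616. Since block_offset > 0 the
	// request straddles a block boundary, so min_blocks_to_read = 2 and
	// both block 1 and block 2 must be present for a cache hit.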
	int disk_io_thread::copy_from_piece(cache_t::iterator p, bool& hit
		, disk_io_job const& j, mutex_t::scoped_lock& l)
	{
		TORRENT_ASSERT(j.buffer);

		// update timestamp early so that we
		// don't run the risk of evicting our own piece
		// when making more room in the cache
		p->last_use = time_now();

		// copy from the cache and update the last use timestamp
		int block = j.offset / m_block_size;
		int block_offset = j.offset & (m_block_size - 1);
		int buffer_offset = 0;
		int size = j.buffer_size;
		int min_blocks_to_read = block_offset > 0 ? 2 : 1;
		TORRENT_ASSERT(size <= m_block_size);
		int start_block = block;
		if (p->blocks[start_block].buf != 0 && min_blocks_to_read > 1)
			++start_block;
		// if block_offset > 0, we need to read two blocks, and then
		// copy parts of both, because it's not aligned to the block
		// boundaries
		if (p->blocks[start_block].buf == 0)
		{
			int piece_size = j.storage->info()->piece_size(j.piece);
			int blocks_in_piece = (piece_size + m_block_size - 1) / m_block_size;
			int end_block = start_block;
			while (end_block < blocks_in_piece && p->blocks[end_block].buf == 0) ++end_block;

			int blocks_to_read = end_block - block;
			blocks_to_read = (std::min)(blocks_to_read, (std::max)((m_settings.cache_size
				+ m_cache_stats.read_cache_size - in_use())/2, 3));
			blocks_to_read = (std::min)(blocks_to_read, m_settings.read_cache_line_size);
			blocks_to_read = (std::max)(blocks_to_read, min_blocks_to_read);
			if (in_use() + blocks_to_read > m_settings.cache_size)
				if (flush_cache_blocks(l, in_use() + blocks_to_read - m_settings.cache_size
					, p, dont_flush_write_blocks) == 0)
					return -2;

			int ret = read_into_piece(*p, block, 0, blocks_to_read, l);
			hit = false;
			if (ret < 0) return ret;
			if (ret < size + block_offset) return -2;
			TORRENT_ASSERT(p->blocks[block].buf);
		}

		while (size > 0)
		{
			TORRENT_ASSERT(p->blocks[block].buf);
			int to_copy = (std::min)(m_block_size
				- block_offset, size);
			std::memcpy(j.buffer + buffer_offset
				, p->blocks[block].buf + block_offset
				, to_copy);
			size -= to_copy;
			block_offset = 0;
			buffer_offset += to_copy;
			++block;
		}
		return j.buffer_size;
	}
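	// Read-ahead sizing in the miss path above, with assumed example values:
	// cache_size = 256 blocks, read_cache_size = 100 and in_use() = 200 give
	// a headroom term of (256 + 100 - 200) / 2 = 78; with e.g.
	// read_cache_line_size = 32 the request is then clamped to 32 blocks,
	// and never below min_blocks_to_read. So a single 16 kiB request can
	// pull a whole cache line of subsequent blocks in with one read.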
	int disk_io_thread::try_read_from_cache(disk_io_job const& j)
	{
		TORRENT_ASSERT(j.buffer);

		mutex_t::scoped_lock l(m_piece_mutex);
		if (!m_settings.use_read_cache) return -2;

		cache_t::iterator p
			= find_cached_piece(m_read_pieces, j, l);

		bool hit = true;
		int ret = 0;

		// if the piece cannot be found in the cache,
		// read it into the cache
		if (p == m_read_pieces.end())
		{
			ret = cache_read_block(j, l);
			hit = false;
			if (ret < 0) return ret;
			p = m_read_pieces.end();
			--p;
			TORRENT_ASSERT(p->storage == j.storage);
		}

		if (p == m_read_pieces.end()) return ret;

		ret = copy_from_piece(p, hit, j, l);
		if (ret < 0) return ret;

		ret = j.buffer_size;
		++m_cache_stats.blocks_read;
		if (hit) ++m_cache_stats.blocks_read_hit;
		return ret;
	}

	size_type disk_io_thread::queue_buffer_size() const
	{
		mutex_t::scoped_lock l(m_queue_mutex);
		return m_queue_buffer_size;
	}
	void disk_io_thread::add_job(disk_io_job const& j
		, boost::function<void(int, disk_io_job const&)> const& f)
	{
		TORRENT_ASSERT(!m_abort);
		TORRENT_ASSERT(j.storage
			|| j.action == disk_io_job::abort_thread
			|| j.action == disk_io_job::update_settings);
		TORRENT_ASSERT(j.buffer_size <= m_block_size);
		mutex_t::scoped_lock l(m_queue_mutex);

		// with the elevator patch, jobs are simply appended here; read jobs
		// are sorted by physical disk offset later, in the disk thread itself
		m_jobs.push_back(j);
		m_jobs.back().callback = f;
		if (j.action == disk_io_job::write)
			m_queue_buffer_size += j.buffer_size;
		m_signal.notify_all();
	}
	bool disk_io_thread::test_error(disk_io_job& j)
	{
		TORRENT_ASSERT(j.storage);
		error_code const& ec = j.storage->error();
		if (ec)
		{
			j.buffer = 0;
			j.str.clear();
			j.error = ec;
			j.error_file = j.storage->error_file();
#ifdef TORRENT_DEBUG
			std::cout << "ERROR: '" << ec.message() << "' in "
				<< j.error_file << std::endl;
#endif
			j.storage->clear_error();
			return true;
		}
		return false;
	}
	void disk_io_thread::post_callback(
		boost::function<void(int, disk_io_job const&)> const& handler
		, disk_io_job const& j, int ret)
	{
		if (!handler) return;
		m_ios.post(boost::bind(handler, ret, j));
	}
	enum action_flags_t
	{
		read_operation = 1
		, buffer_operation = 2
		, cancel_on_abort = 4
	};

	static const boost::uint8_t action_flags[] =
	{
		read_operation + buffer_operation + cancel_on_abort // read
		, buffer_operation // write
		, 0 // hash
		, 0 // move_storage
		, 0 // release_files
		, 0 // delete_files
		, 0 // check_fastresume
		, read_operation + cancel_on_abort // check_files
		, 0 // save_resume_data
		, 0 // rename_file
		, 0 // abort_thread
		, 0 // clear_read_cache
		, 0 // abort_torrent
		, cancel_on_abort // update_settings
		, read_operation + cancel_on_abort // read_and_hash
	};

	bool should_cancel_on_abort(disk_io_job const& j)
	{
		TORRENT_ASSERT(j.action >= 0 && j.action < sizeof(action_flags));
		return action_flags[j.action] & cancel_on_abort;
	}

	bool is_read_operation(disk_io_job const& j)
	{
		TORRENT_ASSERT(j.action >= 0 && j.action < sizeof(action_flags));
		return action_flags[j.action] & read_operation;
	}

	bool operation_has_buffer(disk_io_job const& j)
	{
		TORRENT_ASSERT(j.action >= 0 && j.action < sizeof(action_flags));
		return action_flags[j.action] & buffer_operation;
	}
	void disk_io_thread::operator()()
	{
		// 1 = forward in list, -1 = backwards in list
		int elevator_direction = 1;

		typedef std::multimap<size_type, disk_io_job> read_jobs_t;
		read_jobs_t sorted_read_jobs;
		read_jobs_t::iterator elevator_job_pos = sorted_read_jobs.begin();

		for (;;)
		{
#ifdef TORRENT_DISK_STATS
			m_log << log_time() << " idle" << std::endl;
#endif
			mutex_t::scoped_lock jl(m_queue_mutex);

			// ... (idle wait and shutdown handling elided)

			disk_io_job j;
			bool defer = false;

			if (!m_jobs.empty())
			{
				// we have a job in the job queue. If it's
				// a read operation and we are allowed to
				// reorder jobs, sort it into the read job
				// list and continue, otherwise just pop it
				// and process it
				j = m_jobs.front();
				m_jobs.pop_front();
				if (j.action == disk_io_job::write)
				{
					TORRENT_ASSERT(m_queue_buffer_size >= j.buffer_size);
					m_queue_buffer_size -= j.buffer_size;
				}
				jl.unlock();

				if (is_read_operation(j))
				{
					defer = true;

					// at this point the operation we're looking
					// at is a read operation. If this read operation
					// can be fully satisfied by the read cache, handle
					// it immediately
					if (m_settings.use_read_cache)
					{
#ifdef TORRENT_DISK_STATS
						m_log << log_time() << " check_cache_hit" << std::endl;
#endif
						// unfortunately we need to lock the cache
						// if the cache querying function would be
						// made asynchronous, this would not be
						// necessary anymore
						mutex_t::scoped_lock l(m_piece_mutex);
						cache_t::iterator p
							= find_cached_piece(m_read_pieces, j, l);

						// if it's a cache hit, process the job immediately
						if (p != m_read_pieces.end() && is_cache_hit(p, j, l))
							defer = false;
					}
				}

				TORRENT_ASSERT(j.offset >= 0);
				if (m_settings.allow_reordered_disk_operations && defer)
				{
#ifdef TORRENT_DISK_STATS
					m_log << log_time() << " sorting_job" << std::endl;
#endif
					size_type phys_off = j.storage->physical_offset(j.piece, j.offset);
					sorted_read_jobs.insert(std::pair<size_type, disk_io_job>(phys_off, j));
					continue;
				}
			}
			else
			{
				// the job queue is empty, pick the next read job
				// from the sorted job list. So we don't need the
				// job queue lock anymore
				jl.unlock();

				TORRENT_ASSERT(!sorted_read_jobs.empty());

				// if we've reached the end, change the elevator direction
				if (elevator_job_pos == sorted_read_jobs.end() && elevator_direction == 1)
				{
					elevator_direction = -1;
					--elevator_job_pos;
				}

				j = elevator_job_pos->second;
				read_jobs_t::iterator to_erase = elevator_job_pos;

				// if we've reached the beginning of the sorted list,
				// change the elevator direction
				if (elevator_job_pos == sorted_read_jobs.begin() && elevator_direction == -1)
					elevator_direction = 1;

				// move the elevator before erasing the job we're processing
				// to keep the iterator valid
				if (elevator_direction > 0) ++elevator_job_pos;
				else --elevator_job_pos;

				sorted_read_jobs.erase(to_erase);
			}
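			// The above is a classic elevator / SCAN scheduler over a
			// std::multimap keyed on physical disk offset. A minimal
			// standalone sketch of the same iterator dance (an assumed
			// simplification, using int keys):
			//
			//	std::multimap<int, disk_io_job> q;  // sorted by offset
			//	std::multimap<int, disk_io_job>::iterator pos = q.begin();
			//	int dir = 1;
			//	if (pos == q.end() && dir == 1) { dir = -1; --pos; }
			//	disk_io_job next = pos->second;
			//	std::multimap<int, disk_io_job>::iterator dead = pos;
			//	if (pos == q.begin() && dir == -1) dir = 1;
			//	if (dir > 0) ++pos; else --pos;     // step before erasing
			//	q.erase(dead);                      // keeps 'pos' valid
			//
			// Sweeping to one end of the map before turning around means
			// reads are serviced in mostly-monotonic offset order instead
			// of seeking back and forth between random requests.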
			// if there's a buffer in this job, it will be freed
			// when this holder is destructed, unless it has been
			// released
			disk_buffer_holder holder(*this
				, operation_has_buffer(j) ? j.buffer : 0);

			bool post = false;
			if (m_queue_buffer_size + j.buffer_size >= m_settings.max_queued_disk_bytes
				&& m_queue_buffer_size < m_settings.max_queued_disk_bytes
				&& m_settings.max_queued_disk_bytes > 0)
			{
				// we just dropped below the high watermark of number of bytes
				// queued for writing to the disk. Notify the session so that it
				// can trigger all the connections waiting for this event
				post = true;
			}

			if (post) m_ios.post(m_queue_callback);
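			// Watermark arithmetic, with an assumed max_queued_disk_bytes of
			// 1 MiB (1048576 bytes): if the queue held 1056384 bytes before
			// this 16384-byte write job was popped, the old size was above
			// the limit and the new size (1040000 bytes) is below it, so the
			// session is woken exactly once, on the downward crossing.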
			flush_expired_pieces();

			int ret = 0;

			TORRENT_ASSERT(j.storage
				|| j.action == disk_io_job::abort_thread
				|| j.action == disk_io_job::update_settings);
#ifdef TORRENT_DISK_STATS
			ptime start = time_now();
#endif

			switch (j.action)
			{
				// ... (cases before abort_thread elided)
				case disk_io_job::abort_thread:
				{
#ifdef TORRENT_DISK_STATS
					m_log << log_time() << " abort_thread " << std::endl;
#endif
					// clear all read jobs
					mutex_t::scoped_lock jl(m_queue_mutex);

					for (std::list<disk_io_job>::iterator i = m_jobs.begin();
						i != m_jobs.end();)
					{
						if (should_cancel_on_abort(*i))
						{
							if (i->action == disk_io_job::write)
							{
								TORRENT_ASSERT(m_queue_buffer_size >= i->buffer_size);
								m_queue_buffer_size -= i->buffer_size;
							}
							post_callback(i->callback, *i, -3);
							m_jobs.erase(i++);
							continue;
						}
						++i;
					}

					for (read_jobs_t::iterator i = sorted_read_jobs.begin();
						i != sorted_read_jobs.end();)
					{
						if (i->second.storage != j.storage)
						{
							++i;
							continue;
						}
						post_callback(i->second.callback, i->second, -3);
						sorted_read_jobs.erase(i++);
					}

					m_abort = true;
					break;
				}
				case disk_io_job::read_and_hash:
				{
#ifdef TORRENT_DISK_STATS
					m_log << log_time() << " read_and_hash " << j.buffer_size << std::endl;
#endif
					INVARIANT_CHECK;
					TORRENT_ASSERT(j.buffer == 0);
					j.buffer = allocate_buffer("send buffer");
					TORRENT_ASSERT(j.buffer_size <= m_block_size);
					if (j.buffer == 0)
					{
						ret = -1;
#if BOOST_VERSION == 103500
						j.error = error_code(boost::system::posix_error::not_enough_memory
							, get_posix_category());
#elif BOOST_VERSION > 103500
						j.error = error_code(boost::system::errc::not_enough_memory
							, get_posix_category());
#else
						j.error = asio::error::no_memory;
#endif
						j.str.clear();
						break;
					}

					disk_buffer_holder read_holder(*this, j.buffer);

					// read the entire piece and verify the piece hash
					// since we need to check the hash, this function
					// will ignore the cache size limit (at least for
					// reading and hashing, not for keeping it around)
					sha1_hash h;
					ret = read_piece_from_cache_and_hash(j, h);

					// -2 means there's no space in the read cache
					// or that the read cache is disabled
					if (ret == -1)
					{
						test_error(j);
						break;
					}
					if (!m_settings.disable_hash_checks)
						ret = (j.storage->info()->hash_for_piece(j.piece) == h) ? ret : -3;
					if (ret == -3)
					{
						j.storage->mark_failed(j.piece);
						j.error = errors::failed_hash_check;
						j.str.clear();
						j.buffer = 0;
						break;
					}

					TORRENT_ASSERT(j.buffer == read_holder.get());
					read_holder.release();
#ifdef TORRENT_DISK_STATS
					rename_buffer(j.buffer, "released send buffer");
#endif
					break;
				}
				case disk_io_job::read:
				{
					if (test_error(j))
					{
						ret = -1;
						break;
					}
#ifdef TORRENT_DISK_STATS
					m_log << log_time() << " read " << j.buffer_size << std::endl;
#endif
					INVARIANT_CHECK;
					TORRENT_ASSERT(j.buffer == 0);
					j.buffer = allocate_buffer("send buffer");
					TORRENT_ASSERT(j.buffer_size <= m_block_size);
					if (j.buffer == 0)
					{
						ret = -1;
#if BOOST_VERSION == 103500
						j.error = error_code(boost::system::posix_error::not_enough_memory
							, get_posix_category());
#elif BOOST_VERSION > 103500
						j.error = error_code(boost::system::errc::not_enough_memory
							, get_posix_category());
#else
						j.error = asio::error::no_memory;
#endif
						j.str.clear();
						break;
					}

					disk_buffer_holder read_holder(*this, j.buffer);

					ret = try_read_from_cache(j);

					// -2 means there's no space in the read cache
					// or that the read cache is disabled
					if (ret == -1)
					{
						j.buffer = 0;
						test_error(j);
						break;
					}
					else if (ret == -2)
					{
						file::iovec_t b = { j.buffer, j.buffer_size };
						ret = j.storage->read_impl(&b, j.piece, j.offset, 1);
						if (ret < 0)
						{
							test_error(j);
							break;
						}
						if (ret != j.buffer_size)
						{
							// this means the file wasn't big enough for this read
							j.buffer = 0;
							j.error = errors::file_too_short;
							j.error_file.clear();
							j.str.clear();
							ret = -1;
							break;
						}
						++m_cache_stats.blocks_read;
					}
					TORRENT_ASSERT(j.buffer == read_holder.get());
					read_holder.release();
#ifdef TORRENT_DISK_STATS
					rename_buffer(j.buffer, "released send buffer");
#endif
					break;
				}
				case disk_io_job::write:
				{
#ifdef TORRENT_DISK_STATS
					m_log << log_time() << " write " << j.buffer_size << std::endl;
#endif
					mutex_t::scoped_lock l(m_piece_mutex);
					INVARIANT_CHECK;

					if (in_use() >= m_settings.cache_size)
						flush_cache_blocks(l, in_use() - m_settings.cache_size + 1, m_read_pieces.end());

					cache_t::iterator p
						= find_cached_piece(m_pieces, j, l);
					int block = j.offset / m_block_size;
					// ... (rest of the write path elided in this excerpt)
				}
				case disk_io_job::check_fastresume:
				{
#ifdef TORRENT_DISK_STATS
					m_log << log_time() << " check_fastresume" << std::endl;
#endif
					lazy_entry const* rd = (lazy_entry const*)j.buffer;
					TORRENT_ASSERT(rd != 0);
					ret = j.storage->check_fastresume(*rd, j.error);
					break;
				}
				case disk_io_job::check_files:
				{
#ifdef TORRENT_DISK_STATS
					m_log << log_time() << " check_files" << std::endl;
#endif
					int piece_size = j.storage->info()->piece_length();
					for (int processed = 0; processed < 4 * 1024 * 1024; processed += piece_size)
					{
						ptime now = time_now_hires();
						TORRENT_ASSERT(now >= m_last_file_check);
#if BOOST_VERSION > 103600
						if (now - m_last_file_check < milliseconds(m_settings.file_checks_delay_per_block))
						{
							int sleep_time = m_settings.file_checks_delay_per_block
								* (piece_size / (16 * 1024))
								- total_milliseconds(now - m_last_file_check);
							if (sleep_time < 0) sleep_time = 0;
							TORRENT_ASSERT(sleep_time < 5 * 1000);

							boost::thread::sleep(boost::get_system_time()
								+ boost::posix_time::milliseconds(sleep_time));
						}
						m_last_file_check = time_now_hires();
#endif
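						// Throttle arithmetic, with assumed example values: at
						// file_checks_delay_per_block = 5 ms and 4 MiB pieces
						// (256 blocks of 16 kiB), the budget per piece is
						// 5 * 256 = 1280 ms; if only 200 ms have elapsed since
						// the last check, the thread sleeps the remaining
						// 1080 ms, so checking cannot monopolize the disk.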
						if (m_waiting_to_shutdown) break;

						ret = j.storage->check_files(j.piece, j.offset, j.error);

#ifndef BOOST_NO_EXCEPTIONS
						try {
#endif
							TORRENT_ASSERT(j.callback);
							if (j.callback && ret == piece_manager::need_full_check)
								post_callback(j.callback, j, ret);
#ifndef BOOST_NO_EXCEPTIONS
						} catch (std::exception&) {}
#endif
						// ... (remainder of the check_files loop elided)