~ubuntu-branches/ubuntu/maverick/libtorrent-rasterbar/maverick

Viewing changes to src/disk_io_thread.cpp

  • Committer: Bazaar Package Importer
  • Author(s): Christophe Sauthier
  • Date: 2010-08-10 12:59:37 UTC
  • mfrom: (1.3.7 upstream)
  • Revision ID: james.westby@ubuntu.com-20100810125937-jbcmmf17y8yo9hgz
Tags: 0.15.0-0ubuntu1
* New upstream version.
* debian/patches/100_fix_html_docs.patch: refreshed.
* debian/control: bump up standards-version to 3.9.1 (no changes).

The diff below is rendered in unified format: lines prefixed with '-' were removed, lines prefixed with '+' were added, and '@@ -OLD +NEW @@' markers give the starting old and new line numbers of each displayed fragment.
@@ -30 +30 @@
 
 */
 
+/*
+        Disk queue elevator patch by Morten Husveit
+*/
+
 #include "libtorrent/storage.hpp"
-#include <deque>
 #include "libtorrent/disk_io_thread.hpp"
 #include "libtorrent/disk_buffer_holder.hpp"
+#include "libtorrent/alloca.hpp"
+#include "libtorrent/invariant_check.hpp"
+#include "libtorrent/error_code.hpp"
+#include "libtorrent/file_pool.hpp"
 #include <boost/scoped_array.hpp>
 
-#ifdef _WIN32
-#include <malloc.h>
-#ifndef alloca
-#define alloca(s) _alloca(s)
-#endif
-#endif
-
 #ifdef TORRENT_DISK_STATS
 #include "libtorrent/time.hpp"
 #endif
 
+#if TORRENT_USE_MLOCK && !defined TORRENT_WINDOWS
+#include <sys/mman.h>
+#endif
+
 namespace libtorrent
 {
-
-        disk_io_thread::disk_io_thread(asio::io_service& ios, int block_size)
-                : m_abort(false)
+        bool should_cancel_on_abort(disk_io_job const& j);
+        bool is_read_operation(disk_io_job const& j);
+        bool operation_has_buffer(disk_io_job const& j);
+
+        disk_buffer_pool::disk_buffer_pool(int block_size)
+                : m_block_size(block_size)
+                , m_in_use(0)
+#ifndef TORRENT_DISABLE_POOL_ALLOCATOR
+                , m_pool(block_size, m_settings.cache_buffer_chunk_size)
+#endif
+        {
+#if defined TORRENT_DISK_STATS || defined TORRENT_STATS
+                m_allocations = 0;
+#endif
+#ifdef TORRENT_DISK_STATS
+                m_log.open("disk_buffers.log", std::ios::trunc);
+                m_categories["read cache"] = 0;
+                m_categories["write cache"] = 0;
+
+                m_disk_access_log.open("disk_access.log", std::ios::trunc);
+#endif
+#ifdef TORRENT_DEBUG
+                m_magic = 0x1337;
+#endif
+        }
+
+#ifdef TORRENT_DEBUG
+        disk_buffer_pool::~disk_buffer_pool()
+        {
+                TORRENT_ASSERT(m_magic == 0x1337);
+                m_magic = 0;
+        }
+#endif
+
+#if defined TORRENT_DEBUG || defined TORRENT_DISK_STATS
+        bool disk_buffer_pool::is_disk_buffer(char* buffer
+                ,boost::mutex::scoped_lock& l) const
+        {
+                TORRENT_ASSERT(m_magic == 0x1337);
+#ifdef TORRENT_DISK_STATS
+                if (m_buf_to_category.find(buffer)
+                        == m_buf_to_category.end()) return false;
+#endif
+#ifdef TORRENT_DISABLE_POOL_ALLOCATOR
+                return true;
+#else
+                return m_pool.is_from(buffer);
+#endif
+        }
+
+        bool disk_buffer_pool::is_disk_buffer(char* buffer) const
+        {
+                mutex_t::scoped_lock l(m_pool_mutex);
+                return is_disk_buffer(buffer, l);
+        }
+#endif
+
+        char* disk_buffer_pool::allocate_buffer(char const* category)
+        {
+                mutex_t::scoped_lock l(m_pool_mutex);
+                TORRENT_ASSERT(m_magic == 0x1337);
+#ifdef TORRENT_DISABLE_POOL_ALLOCATOR
+                char* ret = page_aligned_allocator::malloc(m_block_size);
+#else
+                char* ret = (char*)m_pool.ordered_malloc();
+                m_pool.set_next_size(m_settings.cache_buffer_chunk_size);
+#endif
+                ++m_in_use;
+#if TORRENT_USE_MLOCK
+                if (m_settings.lock_disk_cache)
+                {
+#ifdef TORRENT_WINDOWS
+                        VirtualLock(ret, m_block_size);
+#else
+                        mlock(ret, m_block_size);
+#endif
+                }
+#endif
+
+#if defined TORRENT_DISK_STATS || defined TORRENT_STATS
+                ++m_allocations;
+#endif
+#ifdef TORRENT_DISK_STATS
+                ++m_categories[category];
+                m_buf_to_category[ret] = category;
+                m_log << log_time() << " " << category << ": " << m_categories[category] << "\n";
+#endif
+                TORRENT_ASSERT(ret == 0 || is_disk_buffer(ret, l));
+                return ret;
+        }
+
+#ifdef TORRENT_DISK_STATS
+        void disk_buffer_pool::rename_buffer(char* buf, char const* category)
+        {
+                mutex_t::scoped_lock l(m_pool_mutex);
+                TORRENT_ASSERT(is_disk_buffer(buf, l));
+                TORRENT_ASSERT(m_categories.find(m_buf_to_category[buf])
+                        != m_categories.end());
+                std::string const& prev_category = m_buf_to_category[buf];
+                --m_categories[prev_category];
+                m_log << log_time() << " " << prev_category << ": " << m_categories[prev_category] << "\n";
+
+                ++m_categories[category];
+                m_buf_to_category[buf] = category;
+                m_log << log_time() << " " << category << ": " << m_categories[category] << "\n";
+                TORRENT_ASSERT(m_categories.find(m_buf_to_category[buf])
+                        != m_categories.end());
+        }
+#endif
+
+        void disk_buffer_pool::free_buffer(char* buf)
+        {
+                TORRENT_ASSERT(buf);
+                mutex_t::scoped_lock l(m_pool_mutex);
+                TORRENT_ASSERT(m_magic == 0x1337);
+                TORRENT_ASSERT(is_disk_buffer(buf, l));
+#if defined TORRENT_DISK_STATS || defined TORRENT_STATS
+                --m_allocations;
+#endif
+#ifdef TORRENT_DISK_STATS
+                TORRENT_ASSERT(m_categories.find(m_buf_to_category[buf])
+                        != m_categories.end());
+                std::string const& category = m_buf_to_category[buf];
+                --m_categories[category];
+                m_log << log_time() << " " << category << ": " << m_categories[category] << "\n";
+                m_buf_to_category.erase(buf);
+#endif
+#if TORRENT_USE_MLOCK
+                if (m_settings.lock_disk_cache)
+                {
+#ifdef TORRENT_WINDOWS
+                        VirtualUnlock(buf, m_block_size);
+#else
+                        munlock(buf, m_block_size);
+#endif
+                }
+#endif
+#ifdef TORRENT_DISABLE_POOL_ALLOCATOR
+                page_aligned_allocator::free(buf);
+#else
+                m_pool.ordered_free(buf);
+#endif
+                --m_in_use;
+        }
+
+        char* disk_buffer_pool::allocate_buffers(int num_blocks, char const* category)
+        {
+                mutex_t::scoped_lock l(m_pool_mutex);
+                TORRENT_ASSERT(m_magic == 0x1337);
+#ifdef TORRENT_DISABLE_POOL_ALLOCATOR
+                char* ret = page_aligned_allocator::malloc(m_block_size * num_blocks);
+#else
+                char* ret = (char*)m_pool.ordered_malloc(num_blocks);
+                m_pool.set_next_size(m_settings.cache_buffer_chunk_size);
+#endif
+                m_in_use += num_blocks;
+#if TORRENT_USE_MLOCK
+                if (m_settings.lock_disk_cache)
+                {
+#ifdef TORRENT_WINDOWS
+                        VirtualLock(ret, m_block_size * num_blocks);
+#else
+                        mlock(ret, m_block_size * num_blocks);
+#endif
+                }
+#endif
+#if defined TORRENT_DISK_STATS || defined TORRENT_STATS
+                m_allocations += num_blocks;
+#endif
+#ifdef TORRENT_DISK_STATS
+                m_categories[category] += num_blocks;
+                m_buf_to_category[ret] = category;
+                m_log << log_time() << " " << category << ": " << m_categories[category] << "\n";
+#endif
+                TORRENT_ASSERT(ret == 0 || is_disk_buffer(ret, l));
+                return ret;
+        }
+
+        void disk_buffer_pool::free_buffers(char* buf, int num_blocks)
+        {
+                TORRENT_ASSERT(buf);
+                TORRENT_ASSERT(num_blocks >= 1);
+                mutex_t::scoped_lock l(m_pool_mutex);
+                TORRENT_ASSERT(m_magic == 0x1337);
+                TORRENT_ASSERT(is_disk_buffer(buf, l));
+#if defined TORRENT_DISK_STATS || defined TORRENT_STATS
+                m_allocations -= num_blocks;
+#endif
+#ifdef TORRENT_DISK_STATS
+                TORRENT_ASSERT(m_categories.find(m_buf_to_category[buf])
+                        != m_categories.end());
+                std::string const& category = m_buf_to_category[buf];
+                m_categories[category] -= num_blocks;
+                m_log << log_time() << " " << category << ": " << m_categories[category] << "\n";
+                m_buf_to_category.erase(buf);
+#endif
+#if TORRENT_USE_MLOCK
+                if (m_settings.lock_disk_cache)
+                {
+#ifdef TORRENT_WINDOWS
+                        VirtualUnlock(buf, m_block_size * num_blocks);
+#else
+                        munlock(buf, m_block_size * num_blocks);
+#endif
+                }
+#endif
+#ifdef TORRENT_DISABLE_POOL_ALLOCATOR
+                page_aligned_allocator::free(buf);
+#else
+                m_pool.ordered_free(buf, num_blocks);
+#endif
+                m_in_use -= num_blocks;
+        }
+
+        void disk_buffer_pool::release_memory()
+        {
+                TORRENT_ASSERT(m_magic == 0x1337);
+#ifndef TORRENT_DISABLE_POOL_ALLOCATOR
+                mutex_t::scoped_lock l(m_pool_mutex);
+                m_pool.release_memory();
+#endif
+        }
+
+// ------- disk_io_thread ------
+
+
+        disk_io_thread::disk_io_thread(asio::io_service& ios
+                , boost::function<void()> const& queue_callback
+                , file_pool& fp
+                , int block_size)
+                : disk_buffer_pool(block_size)
+                , m_abort(false)
+                , m_waiting_to_shutdown(false)
                 , m_queue_buffer_size(0)
-                , m_cache_size(512) // 512 * 16kB = 8MB
-                , m_cache_expiry(60) // 1 minute
-                , m_coalesce_writes(true)
-                , m_coalesce_reads(true)
-                , m_use_read_cache(true)
-#ifndef TORRENT_DISABLE_POOL_ALLOCATOR
-                , m_pool(block_size, 16)
-#endif
-                , m_block_size(block_size)
+                , m_last_file_check(time_now_hires())
                 , m_ios(ios)
+                , m_queue_callback(queue_callback)
                 , m_work(io_service::work(m_ios))
+                , m_file_pool(fp)
                 , m_disk_io_thread(boost::ref(*this))
         {
-#ifdef TORRENT_STATS
-                m_allocations = 0;
-#endif
 #ifdef TORRENT_DISK_STATS
                 m_log.open("disk_io_thread.log", std::ios::trunc);
 #endif
-#ifdef TORRENT_DEBUG
-        m_magic = 0x1337;
-#endif
         }
 
         disk_io_thread::~disk_io_thread()
         {
                 TORRENT_ASSERT(m_abort == true);
-                TORRENT_ASSERT(m_magic == 0x1337);
-#ifdef TORRENT_DEBUG
-                m_magic = 0;
-#endif
         }
 
         void disk_io_thread::join()
         {
                 mutex_t::scoped_lock l(m_queue_mutex);
                 disk_io_job j;
+                m_waiting_to_shutdown = true;
                 j.action = disk_io_job::abort_thread;
                 m_jobs.insert(m_jobs.begin(), j);
                 m_signal.notify_all();
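
The header comment added above credits Morten Husveit's disk queue elevator patch. The elevator (SCAN) idea is to service queued disk jobs in offset order, sweeping across the disk rather than seeking back and forth in FIFO order. The elevator code itself is not part of this excerpt; the following is a minimal standalone sketch of the ordering only, with all names invented for illustration:

    #include <map>

    // One queued disk operation, keyed by its byte offset on disk.
    struct disk_job { long long offset; };
    typedef std::multimap<long long, disk_job> job_queue;

    // Pick the next job at or above the last serviced offset and keep
    // sweeping upward; wrap around to the lowest offset at the end of
    // the sweep. This keeps head movement mostly one-directional.
    disk_job take_next(job_queue& queue, long long& elevator_pos)
    {
        job_queue::iterator it = queue.lower_bound(elevator_pos);
        if (it == queue.end()) it = queue.begin(); // wrap around
        disk_job j = it->second;
        elevator_pos = it->first;
        queue.erase(it);
        return j;
    }
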
@@ -118 +337 @@
                         int blocks_in_piece = (ti.piece_size(i->piece) + (m_block_size) - 1) / m_block_size;
                         info.blocks.resize(blocks_in_piece);
                         for (int b = 0; b < blocks_in_piece; ++b)
-                                if (i->blocks[b]) info.blocks[b] = true;
+                                if (i->blocks[b].buf) info.blocks[b] = true;
                         ret.push_back(info);
                 }
                 for (cache_t::const_iterator i = m_read_pieces.begin()
@@ -133 +352 @@
                         int blocks_in_piece = (ti.piece_size(i->piece) + (m_block_size) - 1) / m_block_size;
                         info.blocks.resize(blocks_in_piece);
                         for (int b = 0; b < blocks_in_piece; ++b)
-                                if (i->blocks[b]) info.blocks[b] = true;
+                                if (i->blocks[b].buf) info.blocks[b] = true;
                         ret.push_back(info);
                 }
         }
@@ -141 +360 @@
         cache_status disk_io_thread::status() const
         {
                 mutex_t::scoped_lock l(m_piece_mutex);
+                m_cache_stats.total_used_buffers = in_use();
+                m_cache_stats.queued_bytes = m_queue_buffer_size;
                 return m_cache_stats;
         }
 
-        void disk_io_thread::set_cache_size(int s)
-        {
-                mutex_t::scoped_lock l(m_piece_mutex);
-                TORRENT_ASSERT(s >= 0);
-                m_cache_size = s;
-        }
-
-        void disk_io_thread::set_cache_expiry(int ex)
-        {
-                mutex_t::scoped_lock l(m_piece_mutex);
-                TORRENT_ASSERT(ex > 0);
-                m_cache_expiry = ex;
-        }
-
         // aborts read operations
         void disk_io_thread::stop(boost::intrusive_ptr<piece_manager> s)
         {
@@ -171 +378 @@
                                 ++i;
                                 continue;
                         }
-                        if (i->action == disk_io_job::read)
-                        {
-                                if (i->callback) m_ios.post(boost::bind(i->callback, -1, *i));
-                                m_jobs.erase(i++);
-                                continue;
-                        }
-                        if (i->action == disk_io_job::check_files)
-                        {
-                                if (i->callback) m_ios.post(boost::bind(i->callback
-                                        , piece_manager::disk_check_aborted, *i));
+                        if (should_cancel_on_abort(*i))
+                        {
+                                if (i->action == disk_io_job::write)
+                                {
+                                        TORRENT_ASSERT(m_queue_buffer_size >= i->buffer_size);
+                                        m_queue_buffer_size -= i->buffer_size;
+                                }
+                                post_callback(i->callback, *i, -3);
                                 m_jobs.erase(i++);
                                 continue;
                         }
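
The shutdown path above now funnels every cancellable job type through a single should_cancel_on_abort() predicate and still posts each cancelled job's completion callback (with result -3), instead of special-casing read and check_files jobs. A rough sketch of the pattern, with simplified types standing in for disk_io_job and with the callback invoked directly rather than posted to an io_service:

    #include <functional>
    #include <list>

    struct job
    {
        bool cancellable;                  // what should_cancel_on_abort() decides
        std::function<void(int)> callback; // completion handler, takes a result code
    };

    // Cancelled jobs are not silently dropped: each one still gets its
    // callback, with a distinctive error result, so waiters can unwind.
    void cancel_pending(std::list<job>& queue)
    {
        for (std::list<job>::iterator i = queue.begin(); i != queue.end();)
        {
            if (!i->cancellable) { ++i; continue; }
            if (i->callback) i->callback(-3);
            i = queue.erase(i);
        }
    }
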
@@ -239 +444 @@
                 mutex_t::scoped_lock l(m_piece_mutex);
 
                 INVARIANT_CHECK;
+                // flush write cache
                 for (;;)
                 {
                         cache_t::iterator i = std::min_element(
                                 m_pieces.begin(), m_pieces.end()
                                 , bind(&cached_piece_entry::last_use, _1)
                                 < bind(&cached_piece_entry::last_use, _2));
-                        if (i == m_pieces.end()) return;
+                        if (i == m_pieces.end()) break;
                         int age = total_seconds(now - i->last_use);
-                        if (age < m_cache_expiry) return;
+                        if (age < m_settings.cache_expiry) break;
                         flush_and_remove(i, l);
                 }
+
+                // flush read cache
+                for (;;)
+                {
+                        cache_t::iterator i = std::min_element(
+                                m_read_pieces.begin(), m_read_pieces.end()
+                                , bind(&cached_piece_entry::last_use, _1)
+                                < bind(&cached_piece_entry::last_use, _2));
+                        if (i == m_read_pieces.end()) break;
+                        int age = total_seconds(now - i->last_use);
+                        if (age < m_settings.cache_expiry) break;
+                        free_piece(*i, l);
+                        m_read_pieces.erase(i);
+                }
         }
 
-        void disk_io_thread::free_piece(cached_piece_entry& p, mutex_t::scoped_lock& l)
+        // returns the number of blocks that were freed
+        int disk_io_thread::free_piece(cached_piece_entry& p, mutex_t::scoped_lock& l)
         {
                 int piece_size = p.storage->info()->piece_size(p.piece);
                 int blocks_in_piece = (piece_size + m_block_size - 1) / m_block_size;
+                int ret = 0;
 
                 for (int i = 0; i < blocks_in_piece; ++i)
                 {
-                        if (p.blocks[i] == 0) continue;
-                        free_buffer(p.blocks[i]);
-                        p.blocks[i] = 0;
+                        if (p.blocks[i].buf == 0) continue;
+                        free_buffer(p.blocks[i].buf);
+                        ++ret;
+                        p.blocks[i].buf = 0;
                         --p.num_blocks;
                         --m_cache_stats.cache_size;
                         --m_cache_stats.read_cache_size;
                 }
+                return ret;
         }
 
-        bool disk_io_thread::clear_oldest_read_piece(
-                cache_t::iterator ignore
+        // returns the number of blocks that were freed
+        int disk_io_thread::clear_oldest_read_piece(
+                int num_blocks
+                , cache_t::iterator ignore
                 , mutex_t::scoped_lock& l)
         {
                 INVARIANT_CHECK;
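
Both expiry loops above share one shape: repeatedly find the least-recently-used piece, stop as soon as the oldest entry is younger than the expiry threshold, and otherwise flush or free it. A compact sketch of that eviction loop (a C++11 lambda stands in for the boost::bind comparator used in the diff):

    #include <algorithm>
    #include <ctime>
    #include <vector>

    struct piece_entry { std::time_t last_use; };

    void expire(std::vector<piece_entry>& cache, int expiry_seconds, std::time_t now)
    {
        for (;;)
        {
            std::vector<piece_entry>::iterator i = std::min_element(
                cache.begin(), cache.end()
                , [](piece_entry const& a, piece_entry const& b)
                { return a.last_use < b.last_use; });
            if (i == cache.end()) break;                    // cache is empty
            if (now - i->last_use < expiry_seconds) break;  // oldest is still fresh
            cache.erase(i);                                 // evict (a flush would go here)
        }
    }
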
@@ -281 +507 @@
                 if (i != m_read_pieces.end() && i != ignore)
                 {
                         // don't replace an entry that is less than one second old
-                        if (time_now() - i->last_use < seconds(1)) return false;
-                        free_piece(*i, l);
-                        m_read_pieces.erase(i);
-                        return true;
-                }
-                return false;
-        }
-
-        void disk_io_thread::flush_oldest_piece(mutex_t::scoped_lock& l)
-        {
-                INVARIANT_CHECK;
+                        if (time_now() - i->last_use < seconds(1)) return 0;
+                        int blocks = 0;
+                        if (num_blocks >= i->num_blocks)
+                        {
+                                blocks = free_piece(*i, l);
+                        }
+                        else
+                        {
+                                // delete blocks from the start and from the end
+                                // until num_blocks have been freed
+                                int end = (i->storage->info()->piece_size(i->piece) + m_block_size - 1) / m_block_size - 1;
+                                int start = 0;
+
+                                while (num_blocks)
+                                {
+                                        while (i->blocks[start].buf == 0 && start <= end) ++start;
+                                        if (start > end) break;
+                                        free_buffer(i->blocks[start].buf);
+                                        i->blocks[start].buf = 0;
+                                        ++blocks;
+                                        --i->num_blocks;
+                                        --m_cache_stats.cache_size;
+                                        --m_cache_stats.read_cache_size;
+                                        --num_blocks;
+                                        if (!num_blocks) break;
+
+                                        while (i->blocks[end].buf == 0 && start <= end) --end;
+                                        if (start > end) break;
+                                        free_buffer(i->blocks[end].buf);
+                                        i->blocks[end].buf = 0;
+                                        ++blocks;
+                                        --i->num_blocks;
+                                        --m_cache_stats.cache_size;
+                                        --m_cache_stats.read_cache_size;
+                                        --num_blocks;
+                                }
+
+                        }
+                        if (i->num_blocks == 0) m_read_pieces.erase(i);
+                        return blocks;
+                }
+                return 0;
+        }
+
+        int contiguous_blocks(disk_io_thread::cached_piece_entry const& b)
+        {
+                int ret = 0;
+                int current = 0;
+                int blocks_in_piece = (b.storage->info()->piece_size(b.piece) + 16 * 1024 - 1) / (16 * 1024);
+                for (int i = 0; i < blocks_in_piece; ++i)
+                {
+                        if (b.blocks[i].buf) ++current;
+                        else
+                        {
+                                if (current > ret) ret = current;
+                                current = 0;
+                        }
+                }
+                if (current > ret) ret = current;
+                return ret;
+        }
+
+        int disk_io_thread::flush_contiguous_blocks(disk_io_thread::cache_t::iterator e
+                , mutex_t::scoped_lock& l, int lower_limit)
+        {
+                // first find the largest range of contiguous blocks
+                int len = 0;
+                int current = 0;
+                int pos = 0;
+                int start = 0;
+                int blocks_in_piece = (e->storage->info()->piece_size(e->piece)
+                        + m_block_size - 1) / m_block_size;
+                for (int i = 0; i < blocks_in_piece; ++i)
+                {
+                        if (e->blocks[i].buf) ++current;
+                        else
+                        {
+                                if (current > len)
+                                {
+                                        len = current;
+                                        pos = start;
+                                }
+                                current = 0;
+                                start = i + 1;
+                        }
+                }
+                if (current > len)
+                {
+                        len = current;
+                        pos = start;
+                }
+
+                if (len < lower_limit || len <= 0) return 0;
+                len = flush_range(e, pos, pos + len, l);
+                if (e->num_blocks == 0) m_pieces.erase(e);
+                return len;
+        }
+
+        // flushes 'blocks' blocks from the cache
+        int disk_io_thread::flush_cache_blocks(mutex_t::scoped_lock& l
+                , int blocks, cache_t::iterator ignore, int options)
+        {
                 // first look if there are any read cache entries that can
                 // be cleared
-                if (clear_oldest_read_piece(m_read_pieces.end(), l)) return;
-
-                cache_t::iterator i = std::min_element(
-                        m_pieces.begin(), m_pieces.end()
-                        , bind(&cached_piece_entry::last_use, _1)
-                        < bind(&cached_piece_entry::last_use, _2));
-                if (i == m_pieces.end()) return;
-                flush_and_remove(i, l);
+                int ret = 0;
+                int tmp = 0;
+                do {
+                        tmp = clear_oldest_read_piece(blocks, ignore, l);
+                        blocks -= tmp;
+                        ret += tmp;
+                } while (tmp > 0 && blocks > 0);
+
+                if (options & dont_flush_write_blocks) return ret;
+
+                if (m_settings.disk_cache_algorithm == session_settings::lru)
+                {
+                        while (blocks > 0)
+                        {
+                                cache_t::iterator i = std::min_element(
+                                        m_pieces.begin(), m_pieces.end()
+                                        , bind(&cached_piece_entry::last_use, _1)
+                                        < bind(&cached_piece_entry::last_use, _2));
+                                if (i == m_pieces.end()) return ret;
+                                tmp = flush_and_remove(i, l);
+                                blocks -= tmp;
+                                ret += tmp;
+                        }
+                }
+                else if (m_settings.disk_cache_algorithm == session_settings::largest_contiguous)
+                {
+                        while (blocks > 0)
+                        {
+                                cache_t::iterator i = std::max_element(
+                                        m_pieces.begin(), m_pieces.end()
+                                        , bind(&contiguous_blocks, _1)
+                                        < bind(&contiguous_blocks, _2));
+                                if (i == m_pieces.end()) return ret;
+                                tmp = flush_contiguous_blocks(i, l);
+                                blocks -= tmp;
+                                ret += tmp;
+                        }
+                }
+                return ret;
         }
 
-        void disk_io_thread::flush_and_remove(disk_io_thread::cache_t::iterator e
+        int disk_io_thread::flush_and_remove(disk_io_thread::cache_t::iterator e
                 , mutex_t::scoped_lock& l)
         {
-                flush(e, l);
+                int ret = flush_range(e, 0, INT_MAX, l);
                 m_pieces.erase(e);
+                return ret;
         }
 
-        void disk_io_thread::flush(disk_io_thread::cache_t::iterator e
-                , mutex_t::scoped_lock& l)
+        int disk_io_thread::flush_range(disk_io_thread::cache_t::iterator e
+                , int start, int end, mutex_t::scoped_lock& l)
         {
                 INVARIANT_CHECK;
+
+                TORRENT_ASSERT(start < end);
+
+                // TODO: copy *e and unlink it before unlocking
                 cached_piece_entry& p = *e;
                 int piece_size = p.storage->info()->piece_size(p.piece);
 #ifdef TORRENT_DISK_STATS
                 m_log << log_time() << " flushing " << piece_size << std::endl;
 #endif
                 TORRENT_ASSERT(piece_size > 0);
-                boost::scoped_array<char> buf;
-                if (m_coalesce_writes) buf.reset(new (std::nothrow) char[piece_size]);
 
                 int blocks_in_piece = (piece_size + m_block_size - 1) / m_block_size;
                 int buffer_size = 0;
                 int offset = 0;
-                for (int i = 0; i <= blocks_in_piece; ++i)
+
+                boost::scoped_array<char> buf;
+                file::iovec_t* iov = 0;
+                int iov_counter = 0;
+                if (m_settings.coalesce_writes) buf.reset(new (std::nothrow) char[piece_size]);
+                else iov = TORRENT_ALLOCA(file::iovec_t, blocks_in_piece);
+
+                end = (std::min)(end, blocks_in_piece);
+                for (int i = start; i <= end; ++i)
                 {
-                        if (i == blocks_in_piece || p.blocks[i] == 0)
+                        if (i == end || p.blocks[i].buf == 0)
                         {
                                 if (buffer_size == 0) continue;
-                                TORRENT_ASSERT(buf);
 
                                 TORRENT_ASSERT(buffer_size <= i * m_block_size);
                                 l.unlock();
-                                p.storage->write_impl(buf.get(), p.piece, (std::min)(
-                                        i * m_block_size, piece_size) - buffer_size, buffer_size);
+                                if (iov)
+                                {
+                                        p.storage->write_impl(iov, p.piece, (std::min)(
+                                                i * m_block_size, piece_size) - buffer_size, iov_counter);
+                                        iov_counter = 0;
+                                }
+                                else
+                                {
+                                        TORRENT_ASSERT(buf);
+                                        file::iovec_t b = { buf.get(), buffer_size };
+                                        p.storage->write_impl(&b, p.piece, (std::min)(
+                                                i * m_block_size, piece_size) - buffer_size, 1);
+                                }
                                 l.lock();
                                 ++m_cache_stats.writes;
 //                              std::cerr << " flushing p: " << p.piece << " bytes: " << buffer_size << std::endl;
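
The rewritten flush path gives write coalescing two strategies: either memcpy the blocks into one staging buffer, or fill a file::iovec_t array and hand the whole run to write_impl() in a single call. The latter is a classic scatter/gather write; on POSIX it corresponds to writev(). A minimal illustration using the POSIX call directly (a sketch on a raw file descriptor, not libtorrent's write_impl):

    #include <sys/uio.h>   // writev
    #include <unistd.h>

    // Write up to 64 cached blocks with one syscall instead of copying
    // them into a contiguous staging buffer first.
    ssize_t flush_blocks(int fd, char* const blocks[], int num_blocks, size_t block_size)
    {
        struct iovec iov[64];
        int n = num_blocks < 64 ? num_blocks : 64;
        for (int i = 0; i < n; ++i)
        {
            iov[i].iov_base = blocks[i];
            iov[i].iov_len = block_size;
        }
        return writev(fd, iov, n);
    }
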
@@ -350 +719 @@
                         TORRENT_ASSERT(offset + block_size > 0);
                         if (!buf)
                         {
-                                l.unlock();
-                                p.storage->write_impl(p.blocks[i], p.piece, i * m_block_size, block_size);
-                                l.lock();
-                                ++m_cache_stats.writes;
+                                iov[iov_counter].iov_base = p.blocks[i].buf;
+                                iov[iov_counter].iov_len = block_size;
+                                ++iov_counter;
                         }
                         else
                         {
-                                std::memcpy(buf.get() + offset, p.blocks[i], block_size);
+                                std::memcpy(buf.get() + offset, p.blocks[i].buf, block_size);
                                 offset += m_block_size;
-                                buffer_size += block_size;
                         }
-                        free_buffer(p.blocks[i]);
-                        p.blocks[i] = 0;
+                        buffer_size += block_size;
                         TORRENT_ASSERT(p.num_blocks > 0);
                         --p.num_blocks;
                         ++m_cache_stats.blocks_written;
                         --m_cache_stats.cache_size;
                 }
+
+                int ret = 0;
+                disk_io_job j;
+                j.storage = p.storage;
+                j.action = disk_io_job::write;
+                j.buffer = 0;
+                j.piece = p.piece;
+                test_error(j);
+                for (int i = start; i < end; ++i)
+                {
+                        if (p.blocks[i].buf == 0) continue;
+                        j.buffer_size = (std::min)(piece_size - i * m_block_size, m_block_size);
+                        int result = j.error ? -1 : j.buffer_size;
+                        j.offset = i * m_block_size;
+                        free_buffer(p.blocks[i].buf);
+                        post_callback(p.blocks[i].callback, j, result);
+                        p.blocks[i].callback.clear();
+                        p.blocks[i].buf = 0;
+                        ++ret;
+                }
+
                 TORRENT_ASSERT(buffer_size == 0);
 //              std::cerr << " flushing p: " << p.piece << " cached_blocks: " << m_cache_stats.cache_size << std::endl;
 #ifdef TORRENT_DEBUG
-                for (int i = 0; i < blocks_in_piece; ++i)
-                        TORRENT_ASSERT(p.blocks[i] == 0);
+                for (int i = start; i < end; ++i)
+                        TORRENT_ASSERT(p.blocks[i].buf == 0);
 #endif
+                return ret;
         }
 
         // returns -1 on failure
-        int disk_io_thread::cache_block(disk_io_job& j, mutex_t::scoped_lock& l)
+        int disk_io_thread::cache_block(disk_io_job& j
+                , boost::function<void(int,disk_io_job const&)>& handler
+                , mutex_t::scoped_lock& l)
         {
                 INVARIANT_CHECK;
                 TORRENT_ASSERT(find_cached_piece(m_pieces, j, l) == m_pieces.end());
@@ -386 +776 @@
 
                 int piece_size = j.storage->info()->piece_size(j.piece);
                 int blocks_in_piece = (piece_size + m_block_size - 1) / m_block_size;
+                // there's no point in caching the piece if
+                // there's only one block in it
+                if (blocks_in_piece <= 1) return -1;
+
+#ifdef TORRENT_DISK_STATS
+                rename_buffer(j.buffer, "write cache");
+#endif
 
                 p.piece = j.piece;
                 p.storage = j.storage;
                 p.last_use = time_now();
                 p.num_blocks = 1;
-                p.blocks.reset(new (std::nothrow) char*[blocks_in_piece]);
+                p.blocks.reset(new (std::nothrow) cached_block_entry[blocks_in_piece]);
                 if (!p.blocks) return -1;
-                std::memset(&p.blocks[0], 0, blocks_in_piece * sizeof(char*));
                 int block = j.offset / m_block_size;
 //              std::cerr << " adding cache entry for p: " << j.piece << " block: " << block << " cached_blocks: " << m_cache_stats.cache_size << std::endl;
-                p.blocks[block] = j.buffer;
+                p.blocks[block].buf = j.buffer;
+                p.blocks[block].callback.swap(handler);
                 ++m_cache_stats.cache_size;
                 m_pieces.push_back(p);
                 return 0;
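
cache_block() now stores the write's completion handler alongside its buffer (the new cached_block_entry), so the acknowledgement can be posted from flush_range() when the block actually reaches disk rather than staying attached to the job object. The pattern in miniature, with illustrative types and a direct call in place of the posted callback:

    #include <cstddef>
    #include <functional>
    #include <vector>

    struct block_entry
    {
        char* buf;                        // cached data, null if empty
        std::function<void(int)> handler; // fires when the block is flushed
    };

    void flush_piece(std::vector<block_entry>& piece, int block_size)
    {
        for (std::size_t i = 0; i < piece.size(); ++i)
        {
            if (piece[i].buf == nullptr) continue;
            // ... write piece[i].buf to disk here ...
            if (piece[i].handler) piece[i].handler(block_size); // bytes written
            piece[i].handler = nullptr;
            piece[i].buf = nullptr;
        }
    }
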
@@ -404 +801 @@
 
         // fills a piece with data from disk, returns the total number of bytes
         // read or -1 if there was an error
-        int disk_io_thread::read_into_piece(cached_piece_entry& p, int start_block, mutex_t::scoped_lock& l)
+        int disk_io_thread::read_into_piece(cached_piece_entry& p, int start_block
+                , int options, int num_blocks, mutex_t::scoped_lock& l)
         {
+                TORRENT_ASSERT(num_blocks > 0);
                 int piece_size = p.storage->info()->piece_size(p.piece);
                 int blocks_in_piece = (piece_size + m_block_size - 1) / m_block_size;
 
                 int end_block = start_block;
+                int num_read = 0;
+
+                int iov_counter = 0;
+                file::iovec_t* iov = TORRENT_ALLOCA(file::iovec_t, (std::min)(blocks_in_piece - start_block, num_blocks));
+
+                int piece_offset = start_block * m_block_size;
+
+                int ret = 0;
+
+                boost::scoped_array<char> buf;
                 for (int i = start_block; i < blocks_in_piece
-                        && m_cache_stats.cache_size < m_cache_size; ++i)
+                        && ((options & ignore_cache_size)
+                                || in_use() < m_settings.cache_size); ++i)
                 {
+                        int block_size = (std::min)(piece_size - piece_offset, m_block_size);
+                        TORRENT_ASSERT(piece_offset <= piece_size);
+
                         // this is a block that is already allocated
-                        // stop allocating and don't read more than
-                        // what we've allocated now
-                        if (p.blocks[i]) break;
-                        p.blocks[i] = allocate_buffer();
+                        // free it an allocate a new one
+                        if (p.blocks[i].buf)
+                        {
+                                free_buffer(p.blocks[i].buf);
+                                --p.num_blocks;
+                                --m_cache_stats.cache_size;
+                                --m_cache_stats.read_cache_size;
+                        }
+                        p.blocks[i].buf = allocate_buffer("read cache");
 
                         // the allocation failed, break
-                        if (p.blocks[i] == 0) break;
+                        if (p.blocks[i].buf == 0)
+                        {
+                                free_piece(p, l);
+                                return -1;
+                        }
                         ++p.num_blocks;
                         ++m_cache_stats.cache_size;
                         ++m_cache_stats.read_cache_size;
                         ++end_block;
-                }
-
-                if (end_block == start_block) return -2;
+                        ++num_read;
+                        iov[iov_counter].iov_base = p.blocks[i].buf;
+                        iov[iov_counter].iov_len = block_size;
+                        ++iov_counter;
+                        piece_offset += m_block_size;
+                        if (num_read >= num_blocks) break;
+                }
+
+                if (end_block == start_block)
+                {
+                        // something failed. Free all buffers
+                        // we just allocated
+                        free_piece(p, l);
+                        return -2;
+                }
+
+                TORRENT_ASSERT(iov_counter <= (std::min)(blocks_in_piece - start_block, num_blocks));
 
                 // the buffer_size is the size of the buffer we need to read
                 // all these blocks.
                 const int buffer_size = (std::min)((end_block - start_block) * m_block_size
                         , piece_size - start_block * m_block_size);
+                TORRENT_ASSERT(buffer_size > 0);
                 TORRENT_ASSERT(buffer_size <= piece_size);
                 TORRENT_ASSERT(buffer_size + start_block * m_block_size <= piece_size);
-                boost::scoped_array<char> buf;
-                if (m_coalesce_reads) buf.reset(new (std::nothrow) char[buffer_size]);
-                int ret = 0;
+
+                if (m_settings.coalesce_reads)
+                        buf.reset(new (std::nothrow) char[buffer_size]);
+
                 if (buf)
                 {
                         l.unlock();
-                        ret += p.storage->read_impl(buf.get(), p.piece, start_block * m_block_size, buffer_size);
+                        file::iovec_t b = { buf.get(), buffer_size };
+                        ret = p.storage->read_impl(&b, p.piece, start_block * m_block_size, 1);
                         l.lock();
-                        if (p.storage->error()) { return -1; }
                         ++m_cache_stats.reads;
-                }
+                        if (p.storage->error())
+                        {
+                                free_piece(p, l);
+                                return -1;
+                        }
+
+                        if (ret != buffer_size)
+                        {
+                                // this means the file wasn't big enough for this read
+                                p.storage->get_storage_impl()->set_error(""
+                                        , errors::file_too_short);
+                                free_piece(p, l);
+                                return -1;
+                        }
 
-                int piece_offset = start_block * m_block_size;
-                int offset = 0;
-                for (int i = start_block; i < end_block; ++i)
+                        int offset = 0;
+                        for (int i = 0; i < iov_counter; ++i)
+                        {
+                                TORRENT_ASSERT(iov[i].iov_base);
+                                TORRENT_ASSERT(iov[i].iov_len > 0);
+                                TORRENT_ASSERT(offset + iov[i].iov_len <= buffer_size);
+                                std::memcpy(iov[i].iov_base, buf.get() + offset, iov[i].iov_len);
+                                offset += iov[i].iov_len;
+                        }
+                }
+                else
                 {
-                        int block_size = (std::min)(piece_size - piece_offset, m_block_size);
-                        if (p.blocks[i] == 0) break;
-                        TORRENT_ASSERT(offset <= buffer_size);
-                        TORRENT_ASSERT(piece_offset <= piece_size);
-                        TORRENT_ASSERT(offset + block_size <= buffer_size);
-                        if (buf)
-                        {
-                                std::memcpy(p.blocks[i], buf.get() + offset, block_size);
-                        }
-                        else
-                        {
-                                l.unlock();
-                                ret += p.storage->read_impl(p.blocks[i], p.piece, piece_offset, block_size);
-                                if (p.storage->error()) { return -1; }
-                                l.lock();
-                                ++m_cache_stats.reads;
-                        }
-                        offset += m_block_size;
-                        piece_offset += m_block_size;
+                        l.unlock();
+                        ret = p.storage->read_impl(iov, p.piece, start_block * m_block_size, iov_counter);
+                        l.lock();
+                        ++m_cache_stats.reads;
+                        if (p.storage->error())
+                        {
+                                free_piece(p, l);
+                                return -1;
+                        }
+
+                        if (ret != buffer_size)
+                        {
+                                // this means the file wasn't big enough for this read
+                                p.storage->get_storage_impl()->set_error(""
+                                        , errors::file_too_short);
+                                free_piece(p, l);
+                                return -1;
+                        }
                 }
-                TORRENT_ASSERT(ret <= buffer_size);
-                return (ret != buffer_size) ? -1 : ret;
+
+                TORRENT_ASSERT(ret == buffer_size);
+                return ret;
         }
 
-        bool disk_io_thread::make_room(int num_blocks
-                , cache_t::iterator ignore
-                , mutex_t::scoped_lock& l)
+        // returns -1 on read error, -2 on out of memory error or the number of bytes read
+        // this function ignores the cache size limit, it will read the entire
+        // piece regardless of the offset in j
+        // this is used for seed-mode, where we need to read the entire piece to calculate
+        // the hash
+        int disk_io_thread::cache_read_piece(disk_io_job const& j, mutex_t::scoped_lock& l)
         {
-                if (m_cache_size - m_cache_stats.cache_size < num_blocks)
-                {
-                        // there's not enough room in the cache, clear a piece
-                        // from the read cache
-                        if (!clear_oldest_read_piece(ignore, l)) return false;
-                }
-
-                return m_cache_size - m_cache_stats.cache_size >= num_blocks;
+                INVARIANT_CHECK;
+
+                int piece_size = j.storage->info()->piece_size(j.piece);
+                int blocks_in_piece = (piece_size + m_block_size - 1) / m_block_size;
+
+                if (in_use() + blocks_in_piece > m_settings.cache_size)
+                        flush_cache_blocks(l, in_use() + blocks_in_piece - m_settings.cache_size, m_read_pieces.end());
+
+                cached_piece_entry p;
+                p.piece = j.piece;
+                p.storage = j.storage;
+                p.last_use = time_now();
+                p.num_blocks = 0;
+                p.blocks.reset(new (std::nothrow) cached_block_entry[blocks_in_piece]);
+                if (!p.blocks) return -1;
+                int ret = read_into_piece(p, 0, ignore_cache_size, INT_MAX, l);
+
+                if (ret >= 0) m_read_pieces.push_back(p);
+
+                return ret;
         }
 
         // returns -1 on read error, -2 if there isn't any space in the cache
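
read_into_piece() now mirrors the write path: it either reads the whole span into one staging buffer and copies it out to the blocks, or scatter-reads straight into the block buffers through an iovec array, and in both cases treats a short read as errors::file_too_short. A POSIX readv() sketch of the scatter branch, including the short-read check (illustrative only, on a raw file descriptor):

    #include <sys/uio.h>   // readv

    // Scatter-read up to 64 blocks in one syscall; returns -1 on a short
    // read, which in the diff above becomes errors::file_too_short.
    ssize_t read_blocks(int fd, char* const blocks[], int num_blocks, size_t block_size)
    {
        struct iovec iov[64];
        int n = num_blocks < 64 ? num_blocks : 64;
        for (int i = 0; i < n; ++i)
        {
            iov[i].iov_base = blocks[i];
            iov[i].iov_len = block_size;
        }
        ssize_t ret = readv(fd, iov, n);
        if (ret >= 0 && ret != (ssize_t)(block_size * n)) return -1;
        return ret;
    }
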
@@ -500 +974 @@
 
                 int start_block = j.offset / m_block_size;
 
-                if (!make_room(blocks_in_piece - start_block
-                        , m_read_pieces.end(), l)) return -2;
+                int blocks_to_read = blocks_in_piece - start_block;
+                blocks_to_read = (std::min)(blocks_to_read, (std::max)((m_settings.cache_size
+                        + m_cache_stats.read_cache_size - in_use())/2, 3));
+                blocks_to_read = (std::min)(blocks_to_read, m_settings.read_cache_line_size);
+
+                if (in_use() + blocks_to_read > m_settings.cache_size)
+                        if (flush_cache_blocks(l, in_use() + blocks_to_read - m_settings.cache_size
+                                , m_read_pieces.end(), dont_flush_write_blocks) == 0)
+                                return -2;
 
                 cached_piece_entry p;
                 p.piece = j.piece;
                 p.storage = j.storage;
                 p.last_use = time_now();
                 p.num_blocks = 0;
-                p.blocks.reset(new (std::nothrow) char*[blocks_in_piece]);
+                p.blocks.reset(new (std::nothrow) cached_block_entry[blocks_in_piece]);
                 if (!p.blocks) return -1;
-                std::memset(&p.blocks[0], 0, blocks_in_piece * sizeof(char*));
-                int ret = read_into_piece(p, start_block, l);
+                int ret = read_into_piece(p, start_block, 0, blocks_to_read, l);
 
-                if (ret < 0)
-                        free_piece(p, l);
-                else
-                        m_read_pieces.push_back(p);
+                if (ret >= 0) m_read_pieces.push_back(p);
 
                 return ret;
         }
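
The new read-ahead sizing above replaces the all-or-nothing make_room() call: it reads at most the rest of the piece, at most half of the budget the read cache can still claim (but at least 3 blocks), and never more than read_cache_line_size. A worked example with made-up numbers:

    #include <algorithm>
    #include <cstdio>

    int main()
    {
        int cache_size = 512;           // m_settings.cache_size, in blocks
        int read_cache_size = 128;      // blocks currently in the read cache
        int in_use = 630;               // total blocks allocated right now
        int remaining_in_piece = 10;    // blocks_in_piece - start_block
        int read_cache_line_size = 32;  // m_settings.read_cache_line_size

        int blocks_to_read = remaining_in_piece;
        blocks_to_read = std::min(blocks_to_read,
            std::max((cache_size + read_cache_size - in_use) / 2, 3));
        blocks_to_read = std::min(blocks_to_read, read_cache_line_size);

        // (512 + 128 - 630) / 2 = 5, so the read-ahead shrinks from 10 to 5
        std::printf("blocks_to_read = %d\n", blocks_to_read);
        return 0;
    }
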
@@ -537 +1014 @@
                         int blocks = 0;
                         for (int k = 0; k < blocks_in_piece; ++k)
                         {
-                                if (p.blocks[k])
+                                if (p.blocks[k].buf)
                                 {
 #ifndef TORRENT_DISABLE_POOL_ALLOCATOR
-                                        TORRENT_ASSERT(is_disk_buffer(p.blocks[k]));
+                                        TORRENT_ASSERT(is_disk_buffer(p.blocks[k].buf));
 #endif
                                         ++blocks;
                                 }
@@ -561 +1038 @@
                         int blocks = 0;
                         for (int k = 0; k < blocks_in_piece; ++k)
                         {
-                                if (p.blocks[k])
+                                if (p.blocks[k].buf)
                                 {
 #ifndef TORRENT_DISABLE_POOL_ALLOCATOR
-                                        TORRENT_ASSERT(is_disk_buffer(p.blocks[k]));
+                                        TORRENT_ASSERT(is_disk_buffer(p.blocks[k].buf));
 #endif
                                         ++blocks;
                                 }
576
1053
                TORRENT_ASSERT(cached_read_blocks + cached_write_blocks == m_cache_stats.cache_size);
577
1054
                TORRENT_ASSERT(cached_read_blocks == m_cache_stats.read_cache_size);
578
1055
 
 
1056
#ifdef TORRENT_DISK_STATS
 
1057
                int read_allocs = m_categories.find(std::string("read cache"))->second;
 
1058
                int write_allocs = m_categories.find(std::string("write cache"))->second;
 
1059
                TORRENT_ASSERT(cached_read_blocks == read_allocs);
 
1060
                TORRENT_ASSERT(cached_write_blocks == write_allocs);
 
1061
#endif
 
1062
 
579
1063
                // when writing, there may be a one block difference, right before an old piece
580
1064
                // is flushed
581
 
                TORRENT_ASSERT(m_cache_stats.cache_size <= m_cache_size + 1);
 
1065
                TORRENT_ASSERT(m_cache_stats.cache_size <= m_settings.cache_size + 1);
582
1066
        }
583
1067
#endif
584
1068
 
 
1069
        int disk_io_thread::read_piece_from_cache_and_hash(disk_io_job const& j, sha1_hash& h)
 
1070
        {
 
1071
                TORRENT_ASSERT(j.buffer);
 
1072
 
 
1073
                mutex_t::scoped_lock l(m_piece_mutex);
 
1074
        
 
1075
                cache_t::iterator p
 
1076
                        = find_cached_piece(m_read_pieces, j, l);
 
1077
 
 
1078
                bool hit = true;
 
1079
                int ret = 0;
 
1080
 
 
1081
                int piece_size = j.storage->info()->piece_size(j.piece);
 
1082
                int blocks_in_piece = (piece_size + m_block_size - 1) / m_block_size;
 
1083
 
 
1084
                if (p != m_read_pieces.end() && p->num_blocks != blocks_in_piece)
 
1085
                {
 
1086
                        // we have the piece in the cache, but not all of the blocks
 
1087
                        ret = read_into_piece(*p, 0, ignore_cache_size, blocks_in_piece, l);
 
1088
                        hit = false;
 
1089
                        if (ret < 0) return ret;
 
1090
                        TORRENT_ASSERT(!m_read_pieces.empty());
 
1091
                        TORRENT_ASSERT(p->piece == j.piece);
 
1092
                        TORRENT_ASSERT(p->storage == j.storage);
 
1093
                }
 
1094
 
 
1095
                // if the piece cannot be found in the cache,
 
1096
                // read the whole piece starting at the block
 
1097
                // we got a request for.
 
1098
                if (p == m_read_pieces.end())
 
1099
                {
 
1100
                        ret = cache_read_piece(j, l);
 
1101
                        hit = false;
 
1102
                        if (ret < 0) return ret;
 
1103
                        p = m_read_pieces.end();
 
1104
                        --p;
 
1105
                        TORRENT_ASSERT(!m_read_pieces.empty());
 
1106
                        TORRENT_ASSERT(p->piece == j.piece);
 
1107
                        TORRENT_ASSERT(p->storage == j.storage);
 
1108
                }
 
1109
 
 
1110
                hasher ctx;
 
1111
 
 
1112
                for (int i = 0; i < blocks_in_piece; ++i)
 
1113
                {
 
1114
                        TORRENT_ASSERT(p->blocks[i].buf);
 
1115
                        ctx.update((char const*)p->blocks[i].buf, (std::min)(piece_size, m_block_size));
 
1116
                        piece_size -= m_block_size;
 
1117
                }
 
1118
                h = ctx.final();
 
1119
 
 
1120
                ret = copy_from_piece(p, hit, j, l);
 
1121
                TORRENT_ASSERT(ret > 0);
 
1122
                if (ret < 0) return ret;
 
1123
 
 
1124
                // if read cache is disabled or we exceeded the
 
1125
                // limit, remove this piece from the cache
 
1126
                if (in_use() >= m_settings.cache_size
 
1127
                        || !m_settings.use_read_cache)
 
1128
                {
 
1129
                        TORRENT_ASSERT(!m_read_pieces.empty());
 
1130
                        TORRENT_ASSERT(p->piece == j.piece);
 
1131
                        TORRENT_ASSERT(p->storage == j.storage);
 
1132
                        if (p != m_read_pieces.end())
 
1133
                        {
 
1134
                                free_piece(*p, l);
 
1135
                                m_read_pieces.erase(p);
 
1136
                        }
 
1137
                }
 
1138
 
 
1139
                ret = j.buffer_size;
 
1140
                ++m_cache_stats.blocks_read;
 
1141
                if (hit) ++m_cache_stats.blocks_read_hit;
 
1142
                return ret;
 
1143
        }
 
1144
 
 
1145
        // this doesn't modify the read cache, it only
 
1146
        // checks to see if the given read request can
 
1147
        // be fully satisfied from the given cached piece
 
1148
        // this is similar to copy_from_piece() but it
 
1149
	// doesn't do anything but determine whether it's a
 
1150
        // cache hit or not
 
1151
        bool disk_io_thread::is_cache_hit(cache_t::iterator p
 
1152
                , disk_io_job const& j, mutex_t::scoped_lock& l)
 
1153
        {
 
1154
                int block = j.offset / m_block_size;
 
1155
                int block_offset = j.offset & (m_block_size-1);
 
1156
                int size = j.buffer_size;
 
1157
                int min_blocks_to_read = block_offset > 0 ? 2 : 1;
 
1158
                TORRENT_ASSERT(size <= m_block_size);
 
1159
                int start_block = block;
 
1160
                // if we have to read more than one block, and
 
1161
                // the first block is there, make sure we test
 
1162
                // for the second block
 
1163
                if (p->blocks[start_block].buf != 0 && min_blocks_to_read > 1)
 
1164
                        ++start_block;
 
1165
 
 
1166
                return p->blocks[start_block].buf != 0;
 
1167
        }
 
1168
 
 
1169
        int disk_io_thread::copy_from_piece(cache_t::iterator p, bool& hit
 
1170
                , disk_io_job const& j, mutex_t::scoped_lock& l)
 
1171
        {
 
1172
                TORRENT_ASSERT(j.buffer);
 
1173
 
 
1174
                // update timestamp early so that we
 
1175
                // don't run the risk of evicting our own piece
 
1176
                // when making more room in the cache
 
1177
                p->last_use = time_now();
 
1178
 
 
1179
                // copy from the cache and update the last use timestamp
 
1180
                int block = j.offset / m_block_size;
 
1181
                int block_offset = j.offset & (m_block_size-1);
 
1182
                int buffer_offset = 0;
 
1183
                int size = j.buffer_size;
 
1184
                int min_blocks_to_read = block_offset > 0 ? 2 : 1;
 
1185
                TORRENT_ASSERT(size <= m_block_size);
 
1186
                int start_block = block;
 
1187
                if (p->blocks[start_block].buf != 0 && min_blocks_to_read > 1)
 
1188
                        ++start_block;
 
1189
                // if block_offset > 0, we need to read two blocks, and then
 
1190
                // copy parts of both, because it's not aligned to the block
 
1191
                // boundaries
 
1192
                if (p->blocks[start_block].buf == 0)
 
1193
                {
 
1194
                        int piece_size = j.storage->info()->piece_size(j.piece);
 
1195
                        int blocks_in_piece = (piece_size + m_block_size - 1) / m_block_size;
 
1196
                        int end_block = start_block;
 
1197
                        while (end_block < blocks_in_piece && p->blocks[end_block].buf == 0) ++end_block;
 
1198
 
 
1199
                        int blocks_to_read = end_block - block;
 
1200
                        blocks_to_read = (std::min)(blocks_to_read, (std::max)((m_settings.cache_size
 
1201
                                + m_cache_stats.read_cache_size - in_use())/2, 3));
 
1202
                        blocks_to_read = (std::min)(blocks_to_read, m_settings.read_cache_line_size);
 
1203
                        blocks_to_read = (std::max)(blocks_to_read, min_blocks_to_read);
 
1204
                        if (in_use() + blocks_to_read > m_settings.cache_size)
 
1205
                                if (flush_cache_blocks(l, in_use() + blocks_to_read - m_settings.cache_size
 
1206
                                        , p, dont_flush_write_blocks) == 0)
 
1207
                                        return -2;
 
1208
 
 
1209
                        int ret = read_into_piece(*p, block, 0, blocks_to_read, l);
 
1210
                        hit = false;
 
1211
                        if (ret < 0) return ret;
 
1212
                        if (ret < size + block_offset) return -2;
 
1213
                        TORRENT_ASSERT(p->blocks[block].buf);
 
1214
                }
 
1215
 
 
1216
                while (size > 0)
 
1217
                {
 
1218
                        TORRENT_ASSERT(p->blocks[block].buf);
 
1219
                        int to_copy = (std::min)(m_block_size
 
1220
                                        - block_offset, size);
 
1221
                        std::memcpy(j.buffer + buffer_offset
 
1222
                                        , p->blocks[block].buf + block_offset
 
1223
                                        , to_copy);
 
1224
                        size -= to_copy;
 
1225
                        block_offset = 0;
 
1226
                        buffer_offset += to_copy;
 
1227
                        ++block;
 
1228
                }
 
1229
                return j.buffer_size;
 
1230
        }
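	/* Worked example (hypothetical numbers) of the arithmetic used in
	   copy_from_piece() above: a 16 kiB request at piece offset 20000 with a
	   16384-byte block size starts in block 1 at offset 3616, so
	   min_blocks_to_read == 2 and the copy loop takes 12768 bytes from
	   block 1 and the remaining 3616 bytes from block 2. */
	inline void copy_unaligned_sketch(char* dst, char const* const* blocks)
	{
		int const block_size = 16 * 1024;
		int block = 20000 / block_size;              // == 1
		int block_offset = 20000 & (block_size - 1); // == 3616
		int size = 16 * 1024;                        // bytes requested
		int buffer_offset = 0;
		while (size > 0)
		{
			int const to_copy = (std::min)(block_size - block_offset, size);
			std::memcpy(dst + buffer_offset, blocks[block] + block_offset, to_copy);
			size -= to_copy;
			block_offset = 0;
			buffer_offset += to_copy;
			++block;
		}
	}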
 
1231
 
585
1232
        int disk_io_thread::try_read_from_cache(disk_io_job const& j)
586
1233
        {
587
1234
                TORRENT_ASSERT(j.buffer);
588
1235
 
589
1236
                mutex_t::scoped_lock l(m_piece_mutex);
590
 
                if (!m_use_read_cache) return -2;
 
1237
                if (!m_settings.use_read_cache) return -2;
591
1238
 
592
1239
                cache_t::iterator p
593
1240
                        = find_cached_piece(m_read_pieces, j, l);
610
1257
                        TORRENT_ASSERT(p->storage == j.storage);
611
1258
                }
612
1259
 
613
 
                if (p != m_read_pieces.end())
614
 
                {
615
 
                        // copy from the cache and update the last use timestamp
616
 
                        int block = j.offset / m_block_size;
617
 
                        int block_offset = j.offset % m_block_size;
618
 
                        int buffer_offset = 0;
619
 
                        int size = j.buffer_size;
620
 
                        if (p->blocks[block] == 0)
621
 
                        {
622
 
                                int piece_size = j.storage->info()->piece_size(j.piece);
623
 
                                int blocks_in_piece = (piece_size + m_block_size - 1) / m_block_size;
624
 
                                int end_block = block;
625
 
                                while (end_block < blocks_in_piece && p->blocks[end_block] == 0) ++end_block;
626
 
                                if (!make_room(end_block - block, p, l)) return -2;
627
 
                                ret = read_into_piece(*p, block, l);
628
 
                                hit = false;
629
 
                                if (ret < 0) return ret;
630
 
                                TORRENT_ASSERT(p->blocks[block]);
631
 
                        }
632
 
                        
633
 
                        p->last_use = time_now();
634
 
                        while (size > 0)
635
 
                        {
636
 
                                TORRENT_ASSERT(p->blocks[block]);
637
 
                                int to_copy = (std::min)(m_block_size
638
 
                                        - block_offset, size);
639
 
                                std::memcpy(j.buffer + buffer_offset
640
 
                                        , p->blocks[block] + block_offset
641
 
                                        , to_copy);
642
 
                                size -= to_copy;
643
 
                                block_offset = 0;
644
 
                                buffer_offset += to_copy;
645
 
                                ++block;
646
 
                        }
647
 
                        ret = j.buffer_size;
648
 
                        ++m_cache_stats.blocks_read;
649
 
                        if (hit) ++m_cache_stats.blocks_read_hit;
650
 
                }
 
1260
                if (p == m_read_pieces.end()) return ret;
 
1261
 
 
1262
                ret = copy_from_piece(p, hit, j, l);
 
1263
                if (ret < 0) return ret;
 
1264
 
 
1265
                ret = j.buffer_size;
 
1266
                ++m_cache_stats.blocks_read;
 
1267
                if (hit) ++m_cache_stats.blocks_read_hit;
651
1268
                return ret;
652
1269
        }
653
1270
 
 
1271
        size_type disk_io_thread::queue_buffer_size() const
 
1272
        {
 
1273
                mutex_t::scoped_lock l(m_queue_mutex);
 
1274
                return m_queue_buffer_size;
 
1275
        }
 
1276
 
654
1277
        void disk_io_thread::add_job(disk_io_job const& j
655
1278
                , boost::function<void(int, disk_io_job const&)> const& f)
656
1279
        {
657
1280
                TORRENT_ASSERT(!m_abort);
658
 
                TORRENT_ASSERT(!j.callback);
659
 
                TORRENT_ASSERT(j.storage);
 
1281
                TORRENT_ASSERT(j.storage
 
1282
                        || j.action == disk_io_job::abort_thread
 
1283
                        || j.action == disk_io_job::update_settings);
660
1284
                TORRENT_ASSERT(j.buffer_size <= m_block_size);
661
1285
                mutex_t::scoped_lock l(m_queue_mutex);
662
1286
 
663
 
                std::list<disk_io_job>::reverse_iterator i = m_jobs.rbegin();
664
 
                if (j.action == disk_io_job::read)
665
 
                {
666
 
                        // when we're reading, we may not skip
667
 
                        // ahead of any write operation that overlaps
668
 
                        // the region we're reading
669
 
                        for (; i != m_jobs.rend(); i++)
670
 
                        {
671
 
                                // if *i should come before j, stop
672
 
                                // and insert j before i
673
 
                                if (*i < j) break;
674
 
                                // if we come across a write operation that
675
 
                                // overlaps the region we're reading, we need
676
 
                                // to stop
677
 
                                if (i->action == disk_io_job::write
678
 
                                        && i->storage == j.storage
679
 
                                        && i->piece == j.piece
680
 
                                        && range_overlap(i->offset, i->buffer_size
681
 
                                                , j.offset, j.buffer_size))
682
 
                                        break;
683
 
                        }
684
 
                }
685
 
                else if (j.action == disk_io_job::write)
686
 
                {
687
 
                        for (; i != m_jobs.rend(); ++i)
688
 
                        {
689
 
                                if (*i < j)
690
 
                                {
691
 
                                        if (i != m_jobs.rbegin()
692
 
                                                && i.base()->storage.get() != j.storage.get())
693
 
                                                i = m_jobs.rbegin();
694
 
                                        break;
695
 
                                }
696
 
                        }
697
 
                }
698
 
                
699
 
                // if we are placed in front of all other jobs, put it on the back of
700
 
                // the queue, to sweep the disk in the same direction, and to avoid
701
 
                // starvation. The exception is if the priority is higher than the
702
 
                // job at the front of the queue
703
 
                if (i == m_jobs.rend() && (m_jobs.empty() || j.priority <= m_jobs.back().priority))
704
 
                        i = m_jobs.rbegin();
705
 
 
706
 
                std::list<disk_io_job>::iterator k = m_jobs.insert(i.base(), j);
707
 
                k->callback.swap(const_cast<boost::function<void(int, disk_io_job const&)>&>(f));
 
1287
                m_jobs.push_back(j);
 
1288
                m_jobs.back().callback = f;
708
1289
                if (j.action == disk_io_job::write)
709
1290
                        m_queue_buffer_size += j.buffer_size;
710
 
                TORRENT_ASSERT(j.storage.get());
711
1291
                m_signal.notify_all();
712
1292
        }
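	/* Sketch of the ordering rule enforced in add_job() above: a read may be
	   sorted ahead of other jobs, but never past a queued write that touches
	   the same region, or it would read stale data from disk. The overlap
	   test reduces to the usual half-open interval check (range_overlap()
	   itself is defined elsewhere in libtorrent): */
	inline bool ranges_overlap_sketch(int start1, int len1, int start2, int len2)
	{
		// [start1, start1 + len1) and [start2, start2 + len2) intersect
		// iff each range starts before the other one ends
		return start1 < start2 + len2 && start2 < start1 + len1;
	}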
713
1293
 
714
 
#ifdef TORRENT_DEBUG
715
 
        bool disk_io_thread::is_disk_buffer(char* buffer) const
716
 
        {
717
 
                TORRENT_ASSERT(m_magic == 0x1337);
718
 
#ifdef TORRENT_DISABLE_POOL_ALLOCATOR
719
 
                return true;
720
 
#else
721
 
                mutex_t::scoped_lock l(m_pool_mutex);
722
 
#ifdef TORRENT_DISK_STATS
723
 
                if (m_buf_to_category.find(buffer)
724
 
                        == m_buf_to_category.end()) return false;
725
 
#endif
726
 
                return m_pool.is_from(buffer);
727
 
#endif
728
 
        }
729
 
#endif
730
 
 
731
 
        char* disk_io_thread::allocate_buffer()
732
 
        {
733
 
                mutex_t::scoped_lock l(m_pool_mutex);
734
 
                TORRENT_ASSERT(m_magic == 0x1337);
735
 
#ifdef TORRENT_STATS
736
 
                ++m_allocations;
737
 
#endif
738
 
#ifdef TORRENT_DISABLE_POOL_ALLOCATOR
739
 
                return (char*)malloc(m_block_size);
740
 
#else
741
 
                m_pool.set_next_size(16);
742
 
                return (char*)m_pool.ordered_malloc();
743
 
#endif
744
 
        }
745
 
 
746
 
        void disk_io_thread::free_buffer(char* buf)
747
 
        {
748
 
                mutex_t::scoped_lock l(m_pool_mutex);
749
 
                TORRENT_ASSERT(m_magic == 0x1337);
750
 
#ifdef TORRENT_STATS
751
 
                --m_allocations;
752
 
#endif
753
 
#ifdef TORRENT_DISABLE_POOL_ALLOCATOR
754
 
                free(buf);
755
 
#else
756
 
                m_pool.ordered_free(buf);
757
 
#endif
758
 
        }
759
 
 
760
1294
        bool disk_io_thread::test_error(disk_io_job& j)
761
1295
        {
 
1296
                TORRENT_ASSERT(j.storage);
762
1297
                error_code const& ec = j.storage->error();
763
1298
                if (ec)
764
1299
                {
765
1300
                        j.buffer = 0;
766
 
                        j.str = ec.message();
 
1301
                        j.str.clear();
767
1302
                        j.error = ec;
768
1303
                        j.error_file = j.storage->error_file();
 
1304
#ifdef TORRENT_DEBUG
 
1305
                        std::cout << "ERROR: '" << ec.message() << " in " 
 
1306
                                << j.error_file << std::endl;
 
1307
#endif
769
1308
                        j.storage->clear_error();
770
 
#ifdef TORRENT_DEBUG
771
 
                        std::cout << "ERROR: '" << j.str << "' " << j.error_file << std::endl;
772
 
#endif
773
1309
                        return true;
774
1310
                }
775
1311
                return false;
776
1312
        }
777
1313
 
 
1314
        void disk_io_thread::post_callback(
 
1315
                boost::function<void(int, disk_io_job const&)> const& handler
 
1316
                , disk_io_job const& j, int ret)
 
1317
        {
 
1318
                if (!handler) return;
 
1319
 
 
1320
                m_ios.post(boost::bind(handler, ret, j));
 
1321
        }
 
1322
 
 
1323
        enum action_flags_t
 
1324
        {
 
1325
                read_operation = 1
 
1326
                , buffer_operation = 2
 
1327
                , cancel_on_abort = 4
 
1328
        };
 
1329
 
 
1330
        static const boost::uint8_t action_flags[] =
 
1331
        {
 
1332
                read_operation + buffer_operation + cancel_on_abort // read
 
1333
                , buffer_operation // write
 
1334
                , 0 // hash
 
1335
                , 0 // move_storage
 
1336
                , 0 // release_files
 
1337
                , 0 // delete_files
 
1338
                , 0 // check_fastresume
 
1339
                , read_operation + cancel_on_abort // check_files
 
1340
                , 0 // save_resume_data
 
1341
                , 0 // rename_file
 
1342
                , 0 // abort_thread
 
1343
                , 0 // clear_read_cache
 
1344
                , 0 // abort_torrent
 
1345
                , cancel_on_abort // update_settings
 
1346
                , read_operation + cancel_on_abort // read_and_hash
 
1347
        };
 
1348
 
 
1349
        bool should_cancel_on_abort(disk_io_job const& j)
 
1350
        {
 
1351
                TORRENT_ASSERT(j.action >= 0 && j.action < sizeof(action_flags));
 
1352
                return action_flags[j.action] & cancel_on_abort;
 
1353
        }
 
1354
 
 
1355
        bool is_read_operation(disk_io_job const& j)
 
1356
        {
 
1357
                TORRENT_ASSERT(j.action >= 0 && j.action < sizeof(action_flags));
 
1358
                return action_flags[j.action] & read_operation;
 
1359
        }
 
1360
 
 
1361
        bool operation_has_buffer(disk_io_job const& j)
 
1362
        {
 
1363
                TORRENT_ASSERT(j.action >= 0 && j.action < sizeof(action_flags));
 
1364
                return action_flags[j.action] & buffer_operation;
 
1365
        }
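	/* Usage sketch (hypothetical job): the three predicates above are O(1)
	   lookups into the action_flags table, indexed by the job's action
	   enum, e.g.: */
	inline bool read_job_is_cancellable_sketch()
	{
		disk_io_job j;
		j.action = disk_io_job::read;
		// action_flags[disk_io_job::read] has all three bits set
		return is_read_operation(j)
			&& operation_has_buffer(j)
			&& should_cancel_on_abort(j);
	}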
 
1366
 
778
1367
        void disk_io_thread::operator()()
779
1368
        {
 
1369
                // 1 = forward in list, -1 = backwards in list
 
1370
                int elevator_direction = 1;
 
1371
 
 
1372
                typedef std::multimap<size_type, disk_io_job> read_jobs_t;
 
1373
                read_jobs_t sorted_read_jobs;
 
1374
                read_jobs_t::iterator elevator_job_pos = sorted_read_jobs.begin();
 
1375
 
780
1376
                for (;;)
781
1377
                {
782
1378
#ifdef TORRENT_DISK_STATS
784
1380
#endif
785
1381
                        mutex_t::scoped_lock jl(m_queue_mutex);
786
1382
 
787
 
                        while (m_jobs.empty() && !m_abort)
 
1383
                        while (m_jobs.empty() && sorted_read_jobs.empty() && !m_abort)
 
1384
                        {
 
1385
                                // if there hasn't been an event in one second
 
1386
                                // see if we should flush the cache
 
1387
//                              if (!m_signal.timed_wait(jl, boost::posix_time::seconds(1)))
 
1388
//                                      flush_expired_pieces();
788
1389
                                m_signal.wait(jl);
 
1390
                        }
 
1391
 
789
1392
                        if (m_abort && m_jobs.empty())
790
1393
                        {
791
1394
                                jl.unlock();
794
1397
                                // flush all disk caches
795
1398
                                for (cache_t::iterator i = m_pieces.begin()
796
1399
                                        , end(m_pieces.end()); i != end; ++i)
797
 
                                        flush(i, l);
 
1400
                                        flush_range(i, 0, INT_MAX, l);
798
1401
                                for (cache_t::iterator i = m_read_pieces.begin()
799
1402
                                        , end(m_read_pieces.end()); i != end; ++i)
800
1403
                                        free_piece(*i, l);
806
1409
                                return;
807
1410
                        }
808
1411
 
 
1412
                        disk_io_job j;
 
1413
 
 
1414
                        if (!m_jobs.empty())
 
1415
                        {
 
1416
                                // we have a job in the job queue. If it's
 
1417
                                // a read operation and we are allowed to
 
1418
                                // reorder jobs, sort it into the read job
 
1419
                                // list and continue, otherwise just pop it
 
1420
                                // and use it later
 
1421
                                j = m_jobs.front();
 
1422
                                m_jobs.pop_front();
 
1423
                                if (j.action == disk_io_job::write)
 
1424
                                {
 
1425
                                        TORRENT_ASSERT(m_queue_buffer_size >= j.buffer_size);
 
1426
                                        m_queue_buffer_size -= j.buffer_size;
 
1427
                                }
 
1428
 
 
1429
                                jl.unlock();
 
1430
 
 
1431
                                bool defer = false;
 
1432
 
 
1433
                                if (is_read_operation(j))
 
1434
                                {
 
1435
                                        defer = true;
 
1436
 
 
1437
                                        // at this point the operation we're looking
 
1438
                                        // at is a read operation. If this read operation
 
1439
                                        // can be fully satisfied by the read cache, handle
 
1440
                                        // it immediately
 
1441
                                        if (m_settings.use_read_cache)
 
1442
                                        {
 
1443
#ifdef TORRENT_DISK_STATS
 
1444
                                                m_log << log_time() << " check_cache_hit" << std::endl;
 
1445
#endif
 
1446
                                                // unfortunately we need to lock the cache
 
1447
						// if the cache querying function were
 
1448
						// made asynchronous, this would not be
 
1449
                                                // necessary anymore
 
1450
                                                mutex_t::scoped_lock l(m_piece_mutex);
 
1451
                                                cache_t::iterator p
 
1452
                                                        = find_cached_piece(m_read_pieces, j, l);
 
1453
                                
 
1454
                                                // if it's a cache hit, process the job immediately
 
1455
                                                if (p != m_read_pieces.end() && is_cache_hit(p, j, l))
 
1456
                                                        defer = false;
 
1457
                                        }
 
1458
                                }
 
1459
 
 
1460
                                TORRENT_ASSERT(j.offset >= 0);
 
1461
                                if (m_settings.allow_reordered_disk_operations && defer)
 
1462
                                {
 
1463
#ifdef TORRENT_DISK_STATS
 
1464
                                        m_log << log_time() << " sorting_job" << std::endl;
 
1465
#endif
 
1466
                                        size_type phys_off = j.storage->physical_offset(j.piece, j.offset);
 
1467
                                        sorted_read_jobs.insert(std::pair<size_type, disk_io_job>(phys_off, j));
 
1468
                                        continue;
 
1469
                                }
 
1470
                        }
 
1471
                        else
 
1472
                        {
 
1473
                                // the job queue is empty, pick the next read job
 
1474
                                // from the sorted job list. So we don't need the
 
1475
                                // job queue lock anymore
 
1476
                                jl.unlock();
 
1477
 
 
1478
                                TORRENT_ASSERT(!sorted_read_jobs.empty());
 
1479
 
 
1480
                                // if we've reached the end, change the elevator direction
 
1481
                                if (elevator_job_pos == sorted_read_jobs.end() && elevator_direction == 1)
 
1482
                                {
 
1483
                                        elevator_direction = -1;
 
1484
                                        --elevator_job_pos;
 
1485
                                }
 
1486
 
 
1487
                                j = elevator_job_pos->second;
 
1488
                                read_jobs_t::iterator to_erase = elevator_job_pos;
 
1489
 
 
1490
				// if we've reached the beginning of the sorted list,
 
1491
				// change the elevator direction
 
1492
                                if (elevator_job_pos == sorted_read_jobs.begin() && elevator_direction == -1)
 
1493
                                        elevator_direction = 1;
 
1494
 
 
1495
                                // move the elevator before erasing the job we're processing
 
1496
                                // to keep the iterator valid
 
1497
                                if (elevator_direction > 0) ++elevator_job_pos;
 
1498
                                else --elevator_job_pos;
 
1499
 
 
1500
                                sorted_read_jobs.erase(to_erase);
 
1501
                        }
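	/* Minimal sketch of the elevator (SCAN) pick implemented above, with
	   plain ints standing in for physical offsets and jobs; it mirrors the
	   direction flips and the advance-before-erase ordering. Assumes the
	   map is non-empty, as the surrounding assert guarantees: */
	inline int elevator_pick_sketch(std::multimap<int, int>& jobs
		, std::multimap<int, int>::iterator& pos, int& dir)
	{
		// hitting the end going forward reverses the sweep
		if (pos == jobs.end() && dir == 1) { dir = -1; --pos; }
		int const job = pos->second;
		std::multimap<int, int>::iterator to_erase = pos;
		// hitting the beginning going backward reverses it again
		if (pos == jobs.begin() && dir == -1) dir = 1;
		// advance first so the iterator stays valid across the erase
		if (dir > 0) ++pos; else --pos;
		jobs.erase(to_erase);
		return job;
	}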
 
1502
 
809
1503
                        // if there's a buffer in this job, it will be freed
810
1504
                        // when this holder is destructed, unless it has been
811
1505
                        // released.
812
1506
                        disk_buffer_holder holder(*this
813
 
                                , m_jobs.front().action != disk_io_job::check_fastresume
814
 
                                ? m_jobs.front().buffer : 0);
815
 
 
816
 
                        boost::function<void(int, disk_io_job const&)> handler;
817
 
                        handler.swap(m_jobs.front().callback);
818
 
 
819
 
                        disk_io_job j = m_jobs.front();
820
 
                        m_jobs.pop_front();
821
 
                        m_queue_buffer_size -= j.buffer_size;
822
 
                        jl.unlock();
 
1507
                                , operation_has_buffer(j) ? j.buffer : 0);
 
1508
  
 
1509
                        bool post = false;
 
1510
                        if (m_queue_buffer_size + j.buffer_size >= m_settings.max_queued_disk_bytes
 
1511
                                && m_queue_buffer_size < m_settings.max_queued_disk_bytes
 
1512
                                && m_queue_callback
 
1513
                                && m_settings.max_queued_disk_bytes > 0)
 
1514
                        {
 
1515
                                // we just dropped below the high watermark of number of bytes
 
1516
                                // queued for writing to the disk. Notify the session so that it
 
1517
                                // can trigger all the connections waiting for this event
 
1518
                                post = true;
 
1519
                        }
 
1520
 
 
1521
                        if (post) m_ios.post(m_queue_callback);
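	/* Sketch of the edge detection above: the queue callback fires only on
	   the transition from at-or-above max_queued_disk_bytes to below it,
	   i.e. once per drain rather than once per dequeued write: */
	inline bool crossed_low_watermark_sketch(int queued, int just_popped, int limit)
	{
		// 'queued' is the queue size after popping a job of size
		// 'just_popped'; a limit of 0 disables the watermark entirely
		return limit > 0
			&& queued + just_popped >= limit
			&& queued < limit;
	}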
823
1522
 
824
1523
                        flush_expired_pieces();
825
1524
 
826
1525
                        int ret = 0;
827
1526
 
828
 
                        TORRENT_ASSERT(j.storage || j.action == disk_io_job::abort_thread);
 
1527
                        TORRENT_ASSERT(j.storage
 
1528
                                || j.action == disk_io_job::abort_thread
 
1529
                                || j.action == disk_io_job::update_settings);
829
1530
#ifdef TORRENT_DISK_STATS
830
1531
                        ptime start = time_now();
831
1532
#endif
833
1534
                        try {
834
1535
#endif
835
1536
 
 
1537
                        if (j.storage && j.storage->get_storage_impl()->m_settings == 0)
 
1538
                                j.storage->get_storage_impl()->m_settings = &m_settings;
 
1539
 
836
1540
                        switch (j.action)
837
1541
                        {
 
1542
                                case disk_io_job::update_settings:
 
1543
                                {
 
1544
#ifdef TORRENT_DISK_STATS
 
1545
                                        m_log << log_time() << " update_settings " << std::endl;
 
1546
#endif
 
1547
                                        TORRENT_ASSERT(j.buffer);
 
1548
                                        session_settings const& s = *((session_settings*)j.buffer);
 
1549
                                        TORRENT_ASSERT(s.cache_size >= 0);
 
1550
                                        TORRENT_ASSERT(s.cache_expiry > 0);
 
1551
 
 
1552
#if defined TORRENT_WINDOWS
 
1553
                                        if (m_settings.low_prio_disk != s.low_prio_disk)
 
1554
                                        {
 
1555
                                                m_file_pool.set_low_prio_io(s.low_prio_disk);
 
1556
                                                // we need to close all files, since the prio
 
1557
						// only takes effect when files are opened
 
1558
                                                m_file_pool.release(0);
 
1559
                                        }
 
1560
#endif
 
1561
                                        m_settings = s;
 
1562
                                        m_file_pool.resize(m_settings.file_pool_size);
 
1563
#if defined __APPLE__ && defined __MACH__ && MAC_OS_X_VERSION_MIN_REQUIRED >= 1050
 
1564
                                        setiopolicy_np(IOPOL_TYPE_DISK, IOPOL_SCOPE_THREAD
 
1565
                                                , m_settings.low_prio_disk ? IOPOL_THROTTLE : IOPOL_DEFAULT);
 
1566
#endif
 
1567
                                        break;
 
1568
                                }
838
1569
                                case disk_io_job::abort_torrent:
839
1570
                                {
 
1571
#ifdef TORRENT_DISK_STATS
 
1572
                                        m_log << log_time() << " abort_torrent " << std::endl;
 
1573
#endif
840
1574
                                        mutex_t::scoped_lock jl(m_queue_mutex);
841
1575
                                        for (std::list<disk_io_job>::iterator i = m_jobs.begin();
842
1576
                                                i != m_jobs.end();)
846
1580
                                                        ++i;
847
1581
                                                        continue;
848
1582
                                                }
849
 
                                                if (i->action == disk_io_job::check_files)
 
1583
                                                if (should_cancel_on_abort(*i))
850
1584
                                                {
851
 
                                                        if (i->callback) m_ios.post(boost::bind(i->callback
852
 
                                                                        , piece_manager::disk_check_aborted, *i));
 
1585
                                                        if (i->action == disk_io_job::write)
 
1586
                                                        {
 
1587
                                                                TORRENT_ASSERT(m_queue_buffer_size >= i->buffer_size);
 
1588
                                                                m_queue_buffer_size -= i->buffer_size;
 
1589
                                                        }
 
1590
                                                        post_callback(i->callback, *i, -3);
853
1591
                                                        m_jobs.erase(i++);
854
1592
                                                        continue;
855
1593
                                                }
856
1594
                                                ++i;
857
1595
                                        }
 
1596
                                        // now clear all the read jobs
 
1597
                                        for (read_jobs_t::iterator i = sorted_read_jobs.begin();
 
1598
                                                i != sorted_read_jobs.end();)
 
1599
                                        {
 
1600
                                                if (i->second.storage != j.storage)
 
1601
                                                {
 
1602
                                                        ++i;
 
1603
                                                        continue;
 
1604
                                                }
 
1605
                                                post_callback(i->second.callback, i->second, -3);
 
1606
                                                sorted_read_jobs.erase(i++);
 
1607
                                        }
858
1608
                                        jl.unlock();
859
1609
 
860
1610
                                        mutex_t::scoped_lock l(m_piece_mutex);
873
1623
                                                }
874
1624
                                        }
875
1625
                                        l.unlock();
876
 
 
877
 
#ifndef TORRENT_DISABLE_POOL_ALLOCATOR
878
 
                                        {
879
 
                                                mutex_t::scoped_lock l(m_pool_mutex);
880
 
                                                m_pool.release_memory();
881
 
                                        }
882
 
#endif
 
1626
                                        release_memory();
883
1627
                                        break;
884
1628
                                }
885
1629
                                case disk_io_job::abort_thread:
886
1630
                                {
 
1631
#ifdef TORRENT_DISK_STATS
 
1632
                                        m_log << log_time() << " abort_thread " << std::endl;
 
1633
#endif
 
1634
                                        // clear all read jobs
887
1635
                                        mutex_t::scoped_lock jl(m_queue_mutex);
888
1636
 
889
1637
                                        for (std::list<disk_io_job>::iterator i = m_jobs.begin();
890
 
                                                        i != m_jobs.end();)
 
1638
                                                i != m_jobs.end();)
891
1639
                                        {
892
 
                                                if (i->action == disk_io_job::read)
893
 
                                                {
894
 
                                                        if (i->callback) m_ios.post(boost::bind(i->callback, -1, *i));
895
 
                                                        m_jobs.erase(i++);
896
 
                                                        continue;
897
 
                                                }
898
 
                                                if (i->action == disk_io_job::check_files)
899
 
                                                {
900
 
                                                        if (i->callback) m_ios.post(bind(i->callback
901
 
                                                                , piece_manager::disk_check_aborted, *i));
 
1640
                                                if (should_cancel_on_abort(*i))
 
1641
                                                {
 
1642
                                                        if (i->action == disk_io_job::write)
 
1643
                                                        {
 
1644
                                                                TORRENT_ASSERT(m_queue_buffer_size >= i->buffer_size);
 
1645
                                                                m_queue_buffer_size -= i->buffer_size;
 
1646
                                                        }
 
1647
                                                        post_callback(i->callback, *i, -3);
902
1648
                                                        m_jobs.erase(i++);
903
1649
                                                        continue;
904
1650
                                                }
905
1651
                                                ++i;
906
1652
                                        }
 
1653
                                        jl.unlock();
907
1654
 
 
1655
                                        for (read_jobs_t::iterator i = sorted_read_jobs.begin();
 
1656
                                                i != sorted_read_jobs.end();)
 
1657
                                        {
 
1658
						if (j.storage && i->second.storage != j.storage)
 
1659
                                                {
 
1660
                                                        ++i;
 
1661
                                                        continue;
 
1662
                                                }
 
1663
                                                post_callback(i->second.callback, i->second, -3);
 
1664
                                                sorted_read_jobs.erase(i++);
 
1665
                                        }
908
1666
                                        m_abort = true;
909
1667
                                        break;
910
1668
                                }
 
1669
                                case disk_io_job::read_and_hash:
 
1670
                                {
 
1671
#ifdef TORRENT_DISK_STATS
 
1672
                                        m_log << log_time() << " read_and_hash " << j.buffer_size << std::endl;
 
1673
#endif
 
1674
                                        INVARIANT_CHECK;
 
1675
                                        TORRENT_ASSERT(j.buffer == 0);
 
1676
                                        j.buffer = allocate_buffer("send buffer");
 
1677
                                        TORRENT_ASSERT(j.buffer_size <= m_block_size);
 
1678
                                        if (j.buffer == 0)
 
1679
                                        {
 
1680
                                                ret = -1;
 
1681
#if BOOST_VERSION == 103500
 
1682
                                                j.error = error_code(boost::system::posix_error::not_enough_memory
 
1683
                                                        , get_posix_category());
 
1684
#elif BOOST_VERSION > 103500
 
1685
                                                j.error = error_code(boost::system::errc::not_enough_memory
 
1686
                                                        , get_posix_category());
 
1687
#else
 
1688
                                                j.error = asio::error::no_memory;
 
1689
#endif
 
1690
                                                j.str.clear();
 
1691
                                                break;
 
1692
                                        }
 
1693
 
 
1694
                                        disk_buffer_holder read_holder(*this, j.buffer);
 
1695
 
 
1696
                                        // read the entire piece and verify the piece hash
 
1697
                                        // since we need to check the hash, this function
 
1698
                                        // will ignore the cache size limit (at least for
 
1699
                                        // reading and hashing, not for keeping it around)
 
1700
                                        sha1_hash h;
 
1701
                                        ret = read_piece_from_cache_and_hash(j, h);
 
1702
 
 
1703
                                        // -2 means there's no space in the read cache
 
1704
                                        // or that the read cache is disabled
 
1705
                                        if (ret == -1)
 
1706
                                        {
 
1707
                                                test_error(j);
 
1708
                                                break;
 
1709
                                        }
 
1710
                                        if (!m_settings.disable_hash_checks)
 
1711
                                                ret = (j.storage->info()->hash_for_piece(j.piece) == h)?ret:-3;
 
1712
                                        if (ret == -3)
 
1713
                                        {
 
1714
                                                j.storage->mark_failed(j.piece);
 
1715
                                                j.error = errors::failed_hash_check;
 
1716
                                                j.str.clear();
 
1717
                                                j.buffer = 0;
 
1718
                                                break;
 
1719
                                        }
 
1720
 
 
1721
                                        TORRENT_ASSERT(j.buffer == read_holder.get());
 
1722
                                        read_holder.release();
 
1723
#if TORRENT_DISK_STATS
 
1724
                                        rename_buffer(j.buffer, "released send buffer");
 
1725
#endif
 
1726
                                        break;
 
1727
                                }
911
1728
                                case disk_io_job::read:
912
1729
                                {
913
1730
                                        if (test_error(j))
920
1737
#endif
921
1738
                                        INVARIANT_CHECK;
922
1739
                                        TORRENT_ASSERT(j.buffer == 0);
923
 
                                        j.buffer = allocate_buffer();
 
1740
                                        j.buffer = allocate_buffer("send buffer");
924
1741
                                        TORRENT_ASSERT(j.buffer_size <= m_block_size);
925
1742
                                        if (j.buffer == 0)
926
1743
                                        {
927
1744
                                                ret = -1;
928
 
                                                j.error = error_code(ENOMEM, get_posix_category());
929
 
                                                j.str = j.error.message();
 
1745
#if BOOST_VERSION == 103500
 
1746
                                                j.error = error_code(boost::system::posix_error::not_enough_memory
 
1747
                                                        , get_posix_category());
 
1748
#elif BOOST_VERSION > 103500
 
1749
                                                j.error = error_code(boost::system::errc::not_enough_memory
 
1750
                                                        , get_posix_category());
 
1751
#else
 
1752
                                                j.error = asio::error::no_memory;
 
1753
#endif
 
1754
                                                j.str.clear();
930
1755
                                                break;
931
1756
                                        }
932
1757
 
933
1758
                                        disk_buffer_holder read_holder(*this, j.buffer);
 
1759
 
934
1760
                                        ret = try_read_from_cache(j);
935
1761
 
936
1762
                                        // -2 means there's no space in the read cache
937
1763
                                        // or that the read cache is disabled
938
1764
                                        if (ret == -1)
939
1765
                                        {
 
1766
                                                j.buffer = 0;
940
1767
                                                test_error(j);
941
1768
                                                break;
942
1769
                                        }
943
1770
                                        else if (ret == -2)
944
1771
                                        {
945
 
                                                ret = j.storage->read_impl(j.buffer, j.piece, j.offset
946
 
                                                        , j.buffer_size);
 
1772
                                                file::iovec_t b = { j.buffer, j.buffer_size };
 
1773
                                                ret = j.storage->read_impl(&b, j.piece, j.offset, 1);
947
1774
                                                if (ret < 0)
948
1775
                                                {
949
1776
                                                        test_error(j);
950
1777
                                                        break;
951
1778
                                                }
 
1779
                                                if (ret != j.buffer_size)
 
1780
                                                {
 
1781
                                                        // this means the file wasn't big enough for this read
 
1782
                                                        j.buffer = 0;
 
1783
                                                        j.error = errors::file_too_short;
 
1784
                                                        j.error_file.clear();
 
1785
                                                        j.str.clear();
 
1786
                                                        ret = -1;
 
1787
                                                        break;
 
1788
                                                }
952
1789
                                                ++m_cache_stats.blocks_read;
953
1790
                                        }
954
1791
                                        TORRENT_ASSERT(j.buffer == read_holder.get());
955
1792
                                        read_holder.release();
 
1793
#if TORRENT_DISK_STATS
 
1794
                                        rename_buffer(j.buffer, "released send buffer");
 
1795
#endif
956
1796
                                        break;
957
1797
                                }
958
1798
                                case disk_io_job::write:
959
1799
                                {
960
 
                                        if (test_error(j))
961
 
                                        {
962
 
                                                ret = -1;
963
 
                                                break;
964
 
                                        }
965
1800
#ifdef TORRENT_DISK_STATS
966
1801
                                        m_log << log_time() << " write " << j.buffer_size << std::endl;
967
1802
#endif
968
1803
                                        mutex_t::scoped_lock l(m_piece_mutex);
969
1804
                                        INVARIANT_CHECK;
 
1805
 
 
1806
                                        if (in_use() >= m_settings.cache_size)
 
1807
                                                flush_cache_blocks(l, in_use() - m_settings.cache_size + 1, m_read_pieces.end());
 
1808
 
970
1809
                                        cache_t::iterator p
971
1810
                                                = find_cached_piece(m_pieces, j, l);
972
1811
                                        int block = j.offset / m_block_size;
974
1813
                                        TORRENT_ASSERT(j.buffer_size <= m_block_size);
975
1814
                                        if (p != m_pieces.end())
976
1815
                                        {
977
 
                                                TORRENT_ASSERT(p->blocks[block] == 0);
978
 
 
979
 
                                                if (p->blocks[block])
 
1816
                                                TORRENT_ASSERT(p->blocks[block].buf == 0);
 
1817
                                                if (p->blocks[block].buf)
980
1818
                                                {
981
 
                                                        free_buffer(p->blocks[block]);
 
1819
                                                        free_buffer(p->blocks[block].buf);
 
1820
                                                        --m_cache_stats.cache_size;
982
1821
                                                        --p->num_blocks;
983
1822
                                                }
984
 
                                                p->blocks[block] = j.buffer;
 
1823
                                                p->blocks[block].buf = j.buffer;
 
1824
                                                p->blocks[block].callback.swap(j.callback);
 
1825
#ifdef TORRENT_DISK_STATS
 
1826
                                                rename_buffer(j.buffer, "write cache");
 
1827
#endif
985
1828
                                                ++m_cache_stats.cache_size;
986
1829
                                                ++p->num_blocks;
987
1830
                                                p->last_use = time_now();
 
1831
                                                // we might just have created a contiguous range
 
1832
                                                // that meets the requirement to be flushed. try it
 
1833
                                                flush_contiguous_blocks(p, l, m_settings.write_cache_line_size);
988
1834
                                        }
989
1835
                                        else
990
1836
                                        {
991
 
                                                if (cache_block(j, l) < 0)
 
1837
                                                if (cache_block(j, j.callback, l) < 0)
992
1838
                                                {
993
 
                                                        ret = j.storage->write_impl(j.buffer, j.piece, j.offset, j.buffer_size);
 
1839
                                                        l.unlock();
 
1840
                                                        file::iovec_t iov = {j.buffer, j.buffer_size};
 
1841
                                                        ret = j.storage->write_impl(&iov, j.piece, j.offset, 1);
 
1842
                                                        l.lock();
994
1843
                                                        if (ret < 0)
995
1844
                                                        {
996
1845
                                                                test_error(j);
1003
1852
                                        // in the cache, we should not
1004
1853
                                        // free it at the end
1005
1854
                                        holder.release();
1006
 
                                        if (m_cache_stats.cache_size >= m_cache_size)
1007
 
                                                flush_oldest_piece(l);
 
1855
 
 
1856
                                        if (in_use() > m_settings.cache_size)
 
1857
                                                flush_cache_blocks(l, in_use() - m_settings.cache_size, m_read_pieces.end());
 
1858
 
1008
1859
                                        break;
1009
1860
                                }
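	/* Decision sketch for the write path above, with the cache collapsed
	   into two hypothetical predicates; it mirrors the branch order, not
	   the real data structures: */
	enum write_route_sketch_t { to_existing_piece, to_new_cache_entry, write_through };
	inline write_route_sketch_t route_write_sketch(bool piece_cached, bool cache_has_room)
	{
		// a block landing next to cached blocks may complete a contiguous
		// run and trigger flush_contiguous_blocks()
		if (piece_cached) return to_existing_piece;
		// cache_block() succeeded: the block stays in the write cache
		if (cache_has_room) return to_new_cache_entry;
		// otherwise fall back to a synchronous write_impl()
		return write_through;
	}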
1010
1861
                                case disk_io_job::hash:
1028
1879
                                                }
1029
1880
                                        }
1030
1881
                                        l.unlock();
 
1882
                                        if (m_settings.disable_hash_checks)
 
1883
                                        {
 
1884
                                                ret = 0;
 
1885
                                                break;
 
1886
                                        }
1031
1887
                                        sha1_hash h = j.storage->hash_for_piece_impl(j.piece);
1032
1888
                                        if (test_error(j))
1033
1889
                                        {
1035
1891
                                                j.storage->mark_failed(j.piece);
1036
1892
                                                break;
1037
1893
                                        }
 
1894
 
1038
1895
                                        ret = (j.storage->info()->hash_for_piece(j.piece) == h)?0:-2;
1039
1896
                                        if (ret == -2) j.storage->mark_failed(j.piece);
1040
1897
                                        break;
1068
1925
                                        {
1069
1926
                                                if (i->storage == j.storage)
1070
1927
                                                {
1071
 
                                                        flush(i, l);
 
1928
                                                        flush_range(i, 0, INT_MAX, l);
1072
1929
                                                        i = m_pieces.erase(i);
1073
1930
                                                }
1074
1931
                                                else
1077
1934
                                                }
1078
1935
                                        }
1079
1936
                                        l.unlock();
1080
 
#ifndef TORRENT_DISABLE_POOL_ALLOCATOR
1081
 
                                        {
1082
 
                                                mutex_t::scoped_lock l(m_pool_mutex);
1083
 
                                                TORRENT_ASSERT(m_magic == 0x1337);
1084
 
                                                m_pool.release_memory();
1085
 
                                        }
1086
 
#endif
 
1937
                                        release_memory();
 
1938
 
1087
1939
                                        ret = j.storage->release_files_impl();
1088
1940
                                        if (ret != 0) test_error(j);
1089
1941
                                        break;
1112
1964
                                                }
1113
1965
                                        }
1114
1966
                                        l.unlock();
1115
 
#ifndef TORRENT_DISABLE_POOL_ALLOCATOR
1116
 
                                        {
1117
 
                                                mutex_t::scoped_lock l(m_pool_mutex);
1118
 
                                                m_pool.release_memory();
1119
 
                                        }
1120
 
#endif
 
1967
                                        release_memory();
1121
1968
                                        ret = 0;
1122
1969
                                        break;
1123
1970
                                }
1140
1987
                                                int blocks_in_piece = (ti.piece_size(k->piece) + m_block_size - 1) / m_block_size;
1141
1988
                                                for (int j = 0; j < blocks_in_piece; ++j)
1142
1989
                                                {
1143
 
                                                        if (k->blocks[j] == 0) continue;
1144
 
                                                        free_buffer(k->blocks[j]);
1145
 
                                                        k->blocks[j] = 0;
 
1990
                                                        if (k->blocks[j].buf == 0) continue;
 
1991
                                                        free_buffer(k->blocks[j].buf);
 
1992
                                                        k->blocks[j].buf = 0;
1146
1993
                                                        --m_cache_stats.cache_size;
1147
1994
                                                }
1148
1995
                                        }
1149
1996
                                        m_pieces.erase(i, m_pieces.end());
1150
1997
                                        l.unlock();
1151
 
#ifndef TORRENT_DISABLE_POOL_ALLOCATOR
1152
 
                                        {
1153
 
                                                mutex_t::scoped_lock l(m_pool_mutex);
1154
 
                                                m_pool.release_memory();
1155
 
                                        }
1156
 
#endif
 
1998
                                        release_memory();
 
1999
 
1157
2000
                                        ret = j.storage->delete_files_impl();
1158
2001
                                        if (ret != 0) test_error(j);
1159
2002
                                        break;
1161
2004
                                case disk_io_job::check_fastresume:
1162
2005
                                {
1163
2006
#ifdef TORRENT_DISK_STATS
1164
 
                                        m_log << log_time() << " check fastresume" << std::endl;
 
2007
                                        m_log << log_time() << " check_fastresume" << std::endl;
1165
2008
#endif
1166
2009
                                        lazy_entry const* rd = (lazy_entry const*)j.buffer;
1167
2010
                                        TORRENT_ASSERT(rd != 0);
1168
 
                                        ret = j.storage->check_fastresume(*rd, j.str);
 
2011
                                        ret = j.storage->check_fastresume(*rd, j.error);
1169
2012
                                        break;
1170
2013
                                }
 				case disk_io_job::check_files:
 				{
 #ifdef TORRENT_DISK_STATS
-					m_log << log_time() << " check files" << std::endl;
+					m_log << log_time() << " check_files" << std::endl;
 #endif
 					int piece_size = j.storage->info()->piece_length();
 					for (int processed = 0; processed < 4 * 1024 * 1024; processed += piece_size)
 					{
-						ret = j.storage->check_files(j.piece, j.offset, j.str);
+						ptime now = time_now_hires();
+						TORRENT_ASSERT(now >= m_last_file_check);
+#if BOOST_VERSION > 103600
+						if (now - m_last_file_check < milliseconds(m_settings.file_checks_delay_per_block))
+						{
+							int sleep_time = m_settings.file_checks_delay_per_block
+								* (piece_size / (16 * 1024))
+								- total_milliseconds(now - m_last_file_check);
+							if (sleep_time < 0) sleep_time = 0;
+							TORRENT_ASSERT(sleep_time < 5 * 1000);
+
+							boost::thread::sleep(boost::get_system_time()
+								+ boost::posix_time::milliseconds(sleep_time));
+						}
+						m_last_file_check = time_now_hires();
+#endif
+
+						if (m_waiting_to_shutdown) break;
+
+						ret = j.storage->check_files(j.piece, j.offset, j.error);
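
// Worked example of the throttle added above, under the assumed setting
// file_checks_delay_per_block = 5 (milliseconds per 16 kiB block): a 256 kiB
// piece has 16 blocks, so the time budget for one loop iteration is
// 5 * 16 = 80 ms. If only 3 ms have elapsed since m_last_file_check (3 < 5,
// so the branch is taken), the thread sleeps 80 - 3 = 77 ms before checking
// the next piece, which keeps a full hash check from starving regular disk
// jobs.
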
 
 #ifndef BOOST_NO_EXCEPTIONS
 						try {
 #endif
-							TORRENT_ASSERT(handler);
-							if (handler && ret == piece_manager::need_full_check)
-								m_ios.post(bind(handler, ret, j));
+							TORRENT_ASSERT(j.callback);
+							if (j.callback && ret == piece_manager::need_full_check)
+								post_callback(j.callback, j, ret);
 #ifndef BOOST_NO_EXCEPTIONS
 						} catch (std::exception&) {}
 #endif
[...]
 					// if the check is not done, add it at the end of the job queue
 					if (ret == piece_manager::need_full_check)
 					{
-						add_job(j, handler);
+						// offset needs to be reset to 0 so that the disk
+						// job sorting can be done correctly
+						j.offset = 0;
+						add_job(j, j.callback);
 						continue;
 					}
 					break;
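
// Note: the re-queued job loops back into check_files above. Zeroing j.offset
// matters because this release appears to keep the job queue sorted by
// physical offset (an elevator-style ordering); a stale offset left over from
// the last checked block would slot the job into the wrong queue position.
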
[...]
 				case disk_io_job::save_resume_data:
 				{
 #ifdef TORRENT_DISK_STATS
-					m_log << log_time() << " save resume data" << std::endl;
+					m_log << log_time() << " save_resume_data" << std::endl;
 #endif
 					j.resume_data.reset(new entry(entry::dictionary_t));
 					j.storage->write_resume_data(*j.resume_data);
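
// Note (assumption): the .reset() call implies j.resume_data is a smart
// pointer, so the dictionary built here stays alive while the job is copied
// into the completion callback; the receiving side can then bencode it to disk.
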
[...]
 				case disk_io_job::rename_file:
 				{
 #ifdef TORRENT_DISK_STATS
-					m_log << log_time() << " rename file" << std::endl;
+					m_log << log_time() << " rename_file" << std::endl;
 #endif
 					ret = j.storage->rename_file_impl(j.piece, j.str);
 					if (ret != 0)
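
// Note: for rename jobs the generic fields appear to be reused rather than
// adding rename-specific members: j.piece carries the index of the file
// within the torrent, and j.str carries the new name.
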
[...]
 				}
 			}
 #ifndef BOOST_NO_EXCEPTIONS
-			} catch (std::exception& e)
+			}
+			catch (std::exception& e)
 			{
 				ret = -1;
 				try
[...]
 			}
 #endif
 
-//			if (!handler) std::cerr << "DISK THREAD: no callback specified" << std::endl;
+//			if (!j.callback) std::cerr << "DISK THREAD: no callback specified" << std::endl;
 //			else std::cerr << "DISK THREAD: invoking callback" << std::endl;
 #ifndef BOOST_NO_EXCEPTIONS
 			try {
 #endif
 				TORRENT_ASSERT(ret != -2 || !j.str.empty()
 					|| j.action == disk_io_job::hash);
-				if (handler) m_ios.post(boost::bind(handler, ret, j));
+#ifdef TORRENT_DISK_STATS
+				if ((j.action == disk_io_job::read || j.action == disk_io_job::read_and_hash)
+					&& j.buffer != 0)
+					rename_buffer(j.buffer, "posted send buffer");
+#endif
+				post_callback(j.callback, j, ret);
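
// Sketch only: post_callback() replaces the removed inline post. A minimal
// body consistent with the 0.14.x line would just guard against an empty
// handler and hand the result to the network thread's io_service (the real
// implementation may do more, e.g. batch completions):
//
//	void disk_io_thread::post_callback(
//		boost::function<void(int, disk_io_job const&)> const& handler
//		, disk_io_job const& j, int ret)
//	{
//		if (!handler) return;
//		m_ios.post(boost::bind(handler, ret, j));
//	}
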
 #ifndef BOOST_NO_EXCEPTIONS
 			} catch (std::exception&)
 			{