1
/* Copyright (c) 2000, 2011, Oracle and/or its affiliates. All rights reserved.
3
This program is free software; you can redistribute it and/or modify
4
it under the terms of the GNU General Public License as published by
5
the Free Software Foundation; version 2 of the License.
7
This program is distributed in the hope that it will be useful,
8
but WITHOUT ANY WARRANTY; without even the implied warranty of
9
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10
GNU General Public License for more details.
12
You should have received a copy of the GNU General Public License
13
along with this program; if not, write to the Free Software
14
Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
17
Caching of files with only (sequential) reads or writes of fixed-
18
length records. A read isn't allowed to go over file-length. A read is ok
19
if it ends at file-length and next read can try to read after file-length
20
(and get an EOF error).
21
Possible use of asynchronous I/O.
22
Macros for reads and writes for faster I/O.
23
Used instead of FILE when reading or writing whole files.
24
This code makes mf_rec_cache obsolete (currently only used by ISAM)
25
One can change info->pos_in_file to a higher value to skip bytes in file if
26
also info->read_pos is set to info->read_end.
27
If called through open_cached_file(), then the temporary file will
28
only be created if a write exceeds the file buffer or if one calls
29
my_b_flush_io_cache().
31
If one uses SEQ_READ_APPEND, then two buffers are allocated, one for
32
reading and another for writing. Reads are first done from disk and
33
then done from the write buffer. This is an efficient way to read
34
from a log file when one is writing to it at the same time.
35
For this to work, the file has to be opened in append mode!
36
Note that when one uses SEQ_READ_APPEND, one MUST write using
37
my_b_append ! This is needed because we need to lock the mutex
38
every time we access the write buffer.
41
When one uses SEQ_READ_APPEND and we are reading and writing at the same time,
42
each time the write buffer gets full and it's written to disk, we will
43
always do a disk read to read a part of the buffer from disk to the read buffer.
45
This should be fixed so that when we do a my_b_flush_io_cache() and
46
we have been reading the write buffer, we should transfer the rest of the
47
write buffer to the read buffer before we start to reuse it.
50
#include "mysys_priv.h"
53
#include "mysys_err.h"
54
static void my_aiowait(my_aio_result *result);
58
#define lock_append_buffer(info) \
59
mysql_mutex_lock(&(info)->append_buffer_lock)
60
#define unlock_append_buffer(info) \
61
mysql_mutex_unlock(&(info)->append_buffer_lock)
63
#define IO_ROUND_UP(X) (((X)+IO_SIZE-1) & ~(IO_SIZE-1))
64
#define IO_ROUND_DN(X) ( (X) & ~(IO_SIZE-1))
67
Setup internal pointers inside IO_CACHE
74
This is called automatically on init or reinit of IO_CACHE
75
It must be called externally if one moves or copies an IO_CACHE
79
void setup_io_cache(IO_CACHE* info)
81
/* Ensure that my_b_tell() and my_b_bytes_in_cache work */
82
if (info->type == WRITE_CACHE)
84
info->current_pos= &info->write_pos;
85
info->current_end= &info->write_end;
89
info->current_pos= &info->read_pos;
90
info->current_end= &info->read_end;
96
init_functions(IO_CACHE* info)
98
enum cache_type type= info->type;
102
Must be initialized by the caller. The problem is that
103
_my_b_net_read has to be defined in sql directory because of
104
the dependency on THD, and therefore cannot be visible to
105
programs that link against mysys but know nothing about THD, such
109
case SEQ_READ_APPEND:
110
info->read_function = _my_b_seq_read;
111
info->write_function = 0; /* Force a core if used */
114
info->read_function = info->share ? _my_b_read_r : _my_b_read;
115
info->write_function = _my_b_write;
118
setup_io_cache(info);
123
Initialize an IO_CACHE object
127
info cache handler to initialize
128
file File that should be associated with the handler
129
If == -1 then real_open_cached_file()
130
will be called when it's time to open file.
131
cachesize Size of buffer to allocate for read/write
132
If == 0 then use my_default_record_cache_size
134
seek_offset Where cache should start reading/writing
135
use_async_io Set to 1 if we should use async_io (if available)
136
cache_myflags Bitmap of different flags
137
MY_WME | MY_FAE | MY_NABP | MY_FNABP |
138
MY_DONT_CHECK_FILESIZE
145
int init_io_cache(IO_CACHE *info, File file, size_t cachesize,
146
enum cache_type type, my_off_t seek_offset,
147
pbool use_async_io, myf cache_myflags)
151
my_off_t end_of_file= ~(my_off_t) 0;
152
DBUG_ENTER("init_io_cache");
153
DBUG_PRINT("enter",("cache: 0x%lx type: %d pos: %ld",
154
(ulong) info, (int) type, (ulong) seek_offset));
157
info->type= TYPE_NOT_SET; /* Don't set it until mutex are created */
158
info->pos_in_file= seek_offset;
159
info->pre_close = info->pre_read = info->post_read = 0;
161
info->alloced_buffer = 0;
163
info->seek_not_done= 0;
167
pos= mysql_file_tell(file, MYF(0));
168
if ((pos == (my_off_t) -1) && (my_errno == ESPIPE))
171
This kind of object doesn't support seek() or tell(). Don't set a
172
flag that will make us again try to seek() later and fail.
174
info->seek_not_done= 0;
176
Additionally, if we're supposed to start somewhere other than the
177
beginning of whatever this file is, then somebody made a bad assumption.
180
DBUG_ASSERT(seek_offset == 0);
183
info->seek_not_done= MY_TEST(seek_offset != pos);
186
info->disk_writes= 0;
189
if (!cachesize && !(cachesize= my_default_record_cache_size))
190
DBUG_RETURN(1); /* No cache requested */
191
min_cache=use_async_io ? IO_SIZE*4 : IO_SIZE*2;
192
if (type == READ_CACHE || type == SEQ_READ_APPEND)
193
{ /* Assume file isn't growing */
194
if (!(cache_myflags & MY_DONT_CHECK_FILESIZE))
196
/* Calculate end of file to avoid allocating oversized buffers */
197
end_of_file= mysql_file_seek(file, 0L, MY_SEEK_END, MYF(0));
198
/* Need to reset seek_not_done now that we just did a seek. */
199
info->seek_not_done= end_of_file == seek_offset ? 0 : 1;
200
if (end_of_file < seek_offset)
201
end_of_file=seek_offset;
202
/* Trim cache size if the file is very small */
203
if ((my_off_t) cachesize > end_of_file-seek_offset+IO_SIZE*2-1)
205
cachesize= (size_t) (end_of_file-seek_offset)+IO_SIZE*2-1;
206
use_async_io=0; /* No need to use async */
210
cache_myflags &= ~MY_DONT_CHECK_FILESIZE;
211
if (type != READ_NET && type != WRITE_NET)
213
/* Retry allocating memory in smaller blocks until we get one */
214
cachesize= ((cachesize + min_cache-1) & ~(min_cache-1));
219
Unset MY_WAIT_IF_FULL bit if it is set, to prevent conflict with
222
myf flags= (myf) (cache_myflags & ~(MY_WME | MY_WAIT_IF_FULL));
224
if (cachesize < min_cache)
225
cachesize = min_cache;
226
buffer_block= cachesize;
227
if (type == SEQ_READ_APPEND)
229
if (cachesize == min_cache)
230
flags|= (myf) MY_WME;
232
if ((info->buffer= (uchar*) my_malloc(buffer_block, flags)) != 0)
234
info->write_buffer=info->buffer;
235
if (type == SEQ_READ_APPEND)
236
info->write_buffer = info->buffer + cachesize;
237
info->alloced_buffer=1;
238
break; /* Enough memory found */
240
if (cachesize == min_cache)
241
DBUG_RETURN(2); /* Can't alloc cache */
242
/* Try with less memory */
243
cachesize= (cachesize*3/4 & ~(min_cache-1));
247
DBUG_PRINT("info",("init_io_cache: cachesize = %lu", (ulong) cachesize));
248
info->read_length=info->buffer_length=cachesize;
249
info->myflags=cache_myflags & ~(MY_NABP | MY_FNABP);
250
info->request_pos= info->read_pos= info->write_pos = info->buffer;
251
if (type == SEQ_READ_APPEND)
253
info->append_read_pos = info->write_pos = info->write_buffer;
254
info->write_end = info->write_buffer + info->buffer_length;
255
mysql_mutex_init(key_IO_CACHE_append_buffer_lock,
256
&info->append_buffer_lock, MY_MUTEX_INIT_FAST);
258
#if defined(SAFE_MUTEX)
261
/* Clear mutex so that safe_mutex will notice that it's not initialized */
262
memset(&info->append_buffer_lock, 0, sizeof(info->append_buffer_lock));
266
if (type == WRITE_CACHE)
268
info->buffer+info->buffer_length- (seek_offset & (IO_SIZE-1));
270
info->read_end=info->buffer; /* Nothing in cache */
272
/* End_of_file may be changed by user later */
273
info->end_of_file= end_of_file;
276
init_functions(info);
278
if (use_async_io && ! my_disable_async_io)
280
DBUG_PRINT("info",("Using async io"));
281
info->read_length/=2;
282
info->read_function=_my_b_async_read;
284
info->inited=info->aio_result.pending=0;
287
} /* init_io_cache */
289
/* Wait until current request is ready */
292
static void my_aiowait(my_aio_result *result)
296
struct aio_result_t *tmp;
299
if ((int) (tmp=aiowait((struct timeval *) 0)) == -1)
303
DBUG_PRINT("error",("No aio request, error: %d",errno));
304
result->pending=0; /* Assume everythings is ok */
307
((my_aio_result*) tmp)->pending=0;
308
if ((my_aio_result*) tmp == result)
318
Use this to reset cache to re-start reading or to change the type
319
between READ_CACHE <-> WRITE_CACHE
320
If we are doing a reinit of a cache where we have the start of the file
321
in the cache, we are reusing this memory without flushing it to disk.
324
my_bool reinit_io_cache(IO_CACHE *info, enum cache_type type,
325
my_off_t seek_offset,
326
pbool use_async_io __attribute__((unused)),
329
DBUG_ENTER("reinit_io_cache");
330
DBUG_PRINT("enter",("cache: 0x%lx type: %d seek_offset: %lu clear_cache: %d",
331
(ulong) info, type, (ulong) seek_offset,
334
/* One can't do reinit with the following types */
335
DBUG_ASSERT(type != READ_NET && info->type != READ_NET &&
336
type != WRITE_NET && info->type != WRITE_NET &&
337
type != SEQ_READ_APPEND && info->type != SEQ_READ_APPEND);
339
/* If the whole file is in memory, avoid flushing to disk */
341
seek_offset >= info->pos_in_file &&
342
seek_offset <= my_b_tell(info))
344
/* Reuse current buffer without flushing it to disk */
346
if (info->type == WRITE_CACHE && type == READ_CACHE)
348
info->read_end=info->write_pos;
349
info->end_of_file=my_b_tell(info);
351
Trigger a new seek only if we have a valid file handle.
354
info->seek_not_done= (info->file != -1);
356
else if (type == WRITE_CACHE)
358
if (info->type == READ_CACHE)
360
info->write_end=info->write_buffer+info->buffer_length;
361
info->seek_not_done=1;
363
info->end_of_file = ~(my_off_t) 0;
365
pos=info->request_pos+(seek_offset-info->pos_in_file);
366
if (type == WRITE_CACHE)
371
my_aiowait(&info->aio_result); /* Wait for outstanding req */
377
If we change from WRITE_CACHE to READ_CACHE, assume that everything
378
after the current positions should be ignored
380
if (info->type == WRITE_CACHE && type == READ_CACHE)
381
info->end_of_file=my_b_tell(info);
382
/* flush cache if we want to reuse it */
383
if (!clear_cache && my_b_flush_io_cache(info,1))
385
info->pos_in_file=seek_offset;
386
/* Better to always do a seek */
387
info->seek_not_done=1;
388
info->request_pos=info->read_pos=info->write_pos=info->buffer;
389
if (type == READ_CACHE)
391
info->read_end=info->buffer; /* Nothing in cache */
395
info->write_end=(info->buffer + info->buffer_length -
396
(seek_offset & (IO_SIZE-1)));
397
info->end_of_file= ~(my_off_t) 0;
402
init_functions(info);
405
if (use_async_io && ! my_disable_async_io &&
406
((ulong) info->buffer_length <
407
(ulong) (info->end_of_file - seek_offset)))
409
info->read_length=info->buffer_length/2;
410
info->read_function=_my_b_async_read;
415
} /* reinit_io_cache */
424
info IO_CACHE pointer
425
Buffer Buffer to retrieve count bytes from file
426
Count Number of bytes to read into Buffer
429
This function is only called from the my_b_read() macro when there
430
aren't enough characters in the buffer to satisfy the request.
434
When changing this function, be careful with handling file offsets
435
(end_of_file, pos_in_file). Do not cast them to possibly smaller
436
types than my_off_t unless you can be sure that their value fits.
437
Same applies to differences of file offsets.
439
When changing this function, check _my_b_read_r(). It might need the same change.
443
0 we succeeded in reading all data
444
1 Error: couldn't read requested characters. In this case:
445
If info->error == -1, we got a read error.
446
Otherwise info->error contains the number of bytes in Buffer.
449
int _my_b_read(register IO_CACHE *info, uchar *Buffer, size_t Count)
451
size_t length,diff_length,left_length, max_length;
452
my_off_t pos_in_file;
453
DBUG_ENTER("_my_b_read");
455
/* If the buffer is not empty yet, copy what is available. */
456
if ((left_length= (size_t) (info->read_end-info->read_pos)))
458
DBUG_ASSERT(Count >= left_length); /* User is not using my_b_read() */
459
memcpy(Buffer,info->read_pos, left_length);
464
/* pos_in_file always points to where info->buffer was read */
465
pos_in_file=info->pos_in_file+ (size_t) (info->read_end - info->buffer);
468
Whenever a function which operates on IO_CACHE flushes/writes
469
some part of the IO_CACHE to disk it will set the property
470
"seek_not_done" to indicate this to other functions operating on the IO_CACHE.
473
if (info->seek_not_done)
475
if ((mysql_file_seek(info->file, pos_in_file, MY_SEEK_SET, MYF(0))
476
!= MY_FILEPOS_ERROR))
478
/* No error, reset seek_not_done flag. */
479
info->seek_not_done= 0;
484
If the seek failed and the error number is ESPIPE, it is because
485
info->file is a pipe or socket or FIFO. We never should have tried
486
to seek on that. See Bugs#25807 and #22828 for more info.
488
DBUG_ASSERT(my_errno != ESPIPE);
495
Calculate how far we are into an IO_SIZE block. Ideally this should be zero.
498
diff_length= (size_t) (pos_in_file & (IO_SIZE-1));
501
If more than a block plus the rest of the current block is wanted,
502
we do read directly, without filling the buffer.
504
if (Count >= (size_t) (IO_SIZE+(IO_SIZE-diff_length)))
505
{ /* Fill first intern buffer */
507
if (info->end_of_file <= pos_in_file)
509
/* End of file. Return, what we did copy from the buffer. */
510
info->error= (int) left_length;
514
Crop the wanted count to a multiple of IO_SIZE and subtract,
515
what we did already read from a block. That way, the read will
516
end aligned with a block.
518
length=(Count & (size_t) ~(IO_SIZE-1))-diff_length;
519
if ((read_length= mysql_file_read(info->file,Buffer, length, info->myflags))
523
If we didn't get, what we wanted, we either return -1 for a read
524
error, or (it's end of file), how much we got in total.
526
info->error= (read_length == (size_t) -1 ? -1 :
527
(int) (read_length+left_length));
538
At this point, we want less than one and a partial block.
539
We will read a full cache, minus the number of bytes, we are
540
within a block already. So we will reach new alignment.
542
max_length= info->read_length-diff_length;
543
/* We will not read past end of file. */
544
if (info->type != READ_FIFO &&
545
max_length > (info->end_of_file - pos_in_file))
546
max_length= (size_t) (info->end_of_file - pos_in_file);
548
If there is nothing left to read,
549
we either are done, or we failed to fulfill the request.
550
Otherwise, we read max_length into the cache.
556
/* We couldn't fulfil the request. Return, how much we got. */
557
info->error= left_length;
560
length=0; /* Didn't read any chars */
562
else if ((length= mysql_file_read(info->file,info->buffer, max_length,
563
info->myflags)) < Count ||
564
length == (size_t) -1)
567
We got a read error, or less than requested (end of file).
568
If not a read error, copy, what we got.
570
if (length != (size_t) -1)
571
memcpy(Buffer, info->buffer, length);
572
info->pos_in_file= pos_in_file;
573
/* For a read error, return -1, otherwise, what we got in total. */
574
info->error= length == (size_t) -1 ? -1 : (int) (length+left_length);
575
info->read_pos=info->read_end=info->buffer;
579
Count is the remaining number of bytes requested.
580
length is the amount of data in the cache.
581
Read Count bytes from the cache.
583
info->read_pos=info->buffer+Count;
584
info->read_end=info->buffer+length;
585
info->pos_in_file=pos_in_file;
586
memcpy(Buffer, info->buffer, Count);
592
Prepare IO_CACHE for shared use.
595
init_io_cache_share()
596
read_cache A read cache. This will be copied for
597
every thread after setup.
599
write_cache If non-NULL a write cache that is to be
600
synchronized with the read caches.
601
num_threads Number of threads sharing the cache
602
including the write thread if any.
606
The shared cache is used so: One IO_CACHE is initialized with
607
init_io_cache(). This includes the allocation of a buffer. Then a
608
share is allocated and init_io_cache_share() is called with the io
609
cache and the share. Then the io cache is copied for each thread. So
610
every thread has its own copy of IO_CACHE. But the allocated buffer
611
is shared because cache->buffer is the same for all caches.
613
One thread reads data from the file into the buffer. All threads
614
read from the buffer, but every thread maintains its own set of
615
pointers into the buffer. When all threads have used up the buffer
616
contents, one of the threads reads the next block of data into the
617
buffer. To accomplish this, each thread enters the cache lock before
618
accessing the buffer. They wait in lock_io_cache() until all threads
619
joined the lock. The last thread entering the lock is in charge of
620
reading from file to buffer. It wakes all threads when done.
622
Synchronizing a write cache to the read caches works so: Whenever
623
the write buffer needs a flush, the write thread enters the lock and
624
waits for all other threads to enter the lock too. They do this when
625
they have used up the read buffer. When all threads are in the lock,
626
the write thread copies the write buffer to the read buffer and wakes all threads.
629
share->running_threads is the number of threads not being in the
630
cache lock. When entering lock_io_cache() the number is decreased.
631
When the thread that fills the buffer enters unlock_io_cache() the
632
number is reset to the number of threads. The condition
633
running_threads == 0 means that all threads are in the lock. Bumping
634
up the number to the full count is non-intuitive. But increasing the
635
number by one for each thread that leaves the lock could lead to a
636
solo run of one thread. The last thread to join a lock reads from
637
file to buffer, wakes the other threads, processes the data in the
638
cache and enters the lock again. If no other thread left the lock
639
meanwhile, it would think it's the last one again and read the next block.
642
The share has copies of 'error', 'buffer', 'read_end', and
643
'pos_in_file' from the thread that filled the buffer. We may not be
644
able to access this information directly from its cache because the
645
thread may be removed from the share before the variables could be
646
copied by all other threads. Or, if a write buffer is synchronized,
647
it would change its 'pos_in_file' after waking the other threads,
648
possibly before they could copy its value.
650
However, the 'buffer' variable in the share is for a synchronized
651
write cache. It needs to know where to put the data. Otherwise it
652
would need access to the read cache of one of the threads that is
653
not yet removed from the share.
659
void init_io_cache_share(IO_CACHE *read_cache, IO_CACHE_SHARE *cshare,
660
IO_CACHE *write_cache, uint num_threads)
662
DBUG_ENTER("init_io_cache_share");
663
DBUG_PRINT("io_cache_share", ("read_cache: 0x%lx share: 0x%lx "
664
"write_cache: 0x%lx threads: %u",
665
(long) read_cache, (long) cshare,
666
(long) write_cache, num_threads));
668
DBUG_ASSERT(num_threads > 1);
669
DBUG_ASSERT(read_cache->type == READ_CACHE);
670
DBUG_ASSERT(!write_cache || (write_cache->type == WRITE_CACHE));
672
mysql_mutex_init(key_IO_CACHE_SHARE_mutex,
673
&cshare->mutex, MY_MUTEX_INIT_FAST);
674
mysql_cond_init(key_IO_CACHE_SHARE_cond, &cshare->cond, 0);
675
mysql_cond_init(key_IO_CACHE_SHARE_cond_writer, &cshare->cond_writer, 0);
677
cshare->running_threads= num_threads;
678
cshare->total_threads= num_threads;
679
cshare->error= 0; /* Initialize. */
680
cshare->buffer= read_cache->buffer;
681
cshare->read_end= NULL; /* See function comment of lock_io_cache(). */
682
cshare->pos_in_file= 0; /* See function comment of lock_io_cache(). */
683
cshare->source_cache= write_cache; /* Can be NULL. */
685
read_cache->share= cshare;
686
read_cache->read_function= _my_b_read_r;
687
read_cache->current_pos= NULL;
688
read_cache->current_end= NULL;
691
write_cache->share= cshare;
698
Remove a thread from shared access to IO_CACHE.
702
cache The IO_CACHE to be removed from the share.
706
Every thread must do this on exit so as not to deadlock other threads.
708
The last thread destroys the pthread resources.
710
A writer flushes its cache first.
716
void remove_io_thread(IO_CACHE *cache)
718
IO_CACHE_SHARE *cshare= cache->share;
720
DBUG_ENTER("remove_io_thread");
722
/* If the writer goes, it needs to flush the write cache. */
723
if (cache == cshare->source_cache)
724
flush_io_cache(cache);
726
mysql_mutex_lock(&cshare->mutex);
727
DBUG_PRINT("io_cache_share", ("%s: 0x%lx",
728
(cache == cshare->source_cache) ?
729
"writer" : "reader", (long) cache));
731
/* Remove from share. */
732
total= --cshare->total_threads;
733
DBUG_PRINT("io_cache_share", ("remaining threads: %u", total));
735
/* Detach from share. */
738
/* If the writer goes, let the readers know. */
739
if (cache == cshare->source_cache)
741
DBUG_PRINT("io_cache_share", ("writer leaves"));
742
cshare->source_cache= NULL;
745
/* If all threads are waiting for me to join the lock, wake them. */
746
if (!--cshare->running_threads)
748
DBUG_PRINT("io_cache_share", ("the last running thread leaves, wake all"));
749
mysql_cond_signal(&cshare->cond_writer);
750
mysql_cond_broadcast(&cshare->cond);
753
mysql_mutex_unlock(&cshare->mutex);
757
DBUG_PRINT("io_cache_share", ("last thread removed, destroy share"));
758
mysql_cond_destroy (&cshare->cond_writer);
759
mysql_cond_destroy (&cshare->cond);
760
mysql_mutex_destroy(&cshare->mutex);
768
Lock IO cache and wait for all other threads to join.
772
cache The cache of the thread entering the lock.
773
pos File position of the block to read.
774
Unused for the write thread.
778
Wait for all threads to finish with the current buffer. We want
779
all threads to proceed in concert. The last thread to join
780
lock_io_cache() will read the block from file and all threads start
781
to use it. Then they will join again for reading the next block.
783
The waiting threads detect a fresh buffer by comparing
784
cshare->pos_in_file with the position they want to process next.
785
Since the first block may start at position 0, we take
786
cshare->read_end as an additional condition. This variable is
787
initialized to NULL and will be set after a block of data is written to the buffer.
791
1 OK, lock in place, go ahead and read.
792
0 OK, unlocked, another thread did the read.
795
static int lock_io_cache(IO_CACHE *cache, my_off_t pos)
797
IO_CACHE_SHARE *cshare= cache->share;
798
DBUG_ENTER("lock_io_cache");
800
/* Enter the lock. */
801
mysql_mutex_lock(&cshare->mutex);
802
cshare->running_threads--;
803
DBUG_PRINT("io_cache_share", ("%s: 0x%lx pos: %lu running: %u",
804
(cache == cshare->source_cache) ?
805
"writer" : "reader", (long) cache, (ulong) pos,
806
cshare->running_threads));
808
if (cshare->source_cache)
810
/* A write cache is synchronized to the read caches. */
812
if (cache == cshare->source_cache)
814
/* The writer waits until all readers are here. */
815
while (cshare->running_threads)
817
DBUG_PRINT("io_cache_share", ("writer waits in lock"));
818
mysql_cond_wait(&cshare->cond_writer, &cshare->mutex);
820
DBUG_PRINT("io_cache_share", ("writer awoke, going to copy"));
822
/* Stay locked. Leave the lock later by unlock_io_cache(). */
826
/* The last thread wakes the writer. */
827
if (!cshare->running_threads)
829
DBUG_PRINT("io_cache_share", ("waking writer"));
830
mysql_cond_signal(&cshare->cond_writer);
834
Readers wait until the data is copied from the writer. Another
835
reason to stop waiting is the removal of the write thread. If this
836
happens, we leave the lock with old data in the buffer.
838
while ((!cshare->read_end || (cshare->pos_in_file < pos)) &&
839
cshare->source_cache)
841
DBUG_PRINT("io_cache_share", ("reader waits in lock"));
842
mysql_cond_wait(&cshare->cond, &cshare->mutex);
846
If the writer was removed from the share while this thread was
847
asleep, we need to simulate an EOF condition. The writer cannot
848
reset the share variables as they might still be in use by readers
849
of the last block. When we awake here, it is because the last
850
joining thread signalled us. If the writer is not the last, it
851
will not signal. So it is safe to clear the buffer here.
853
if (!cshare->read_end || (cshare->pos_in_file < pos))
855
DBUG_PRINT("io_cache_share", ("reader found writer removed. EOF"));
856
cshare->read_end= cshare->buffer; /* Empty buffer. */
857
cshare->error= 0; /* EOF is not an error. */
863
There are read caches only. The last thread arriving in
864
lock_io_cache() continues with a locked cache and reads the block.
866
if (!cshare->running_threads)
868
DBUG_PRINT("io_cache_share", ("last thread joined, going to read"));
869
/* Stay locked. Leave the lock later by unlock_io_cache(). */
874
All other threads wait until the requested block is read by the
875
last thread arriving. Another reason to stop waiting is the
876
removal of a thread. If this leads to all threads being in the
877
lock, we have to continue also. The first of the awakened threads
878
will then do the read.
880
while ((!cshare->read_end || (cshare->pos_in_file < pos)) &&
881
cshare->running_threads)
883
DBUG_PRINT("io_cache_share", ("reader waits in lock"));
884
mysql_cond_wait(&cshare->cond, &cshare->mutex);
887
/* If the block is not yet read, continue with a locked cache and read. */
888
if (!cshare->read_end || (cshare->pos_in_file < pos))
890
DBUG_PRINT("io_cache_share", ("reader awoke, going to read"));
891
/* Stay locked. Leave the lock later by unlock_io_cache(). */
895
/* Another thread did read the block already. */
897
DBUG_PRINT("io_cache_share", ("reader awoke, going to process %u bytes",
898
(uint) (cshare->read_end ? (size_t)
899
(cshare->read_end - cshare->buffer) :
903
Leave the lock. Do not call unlock_io_cache() later. The thread that
904
filled the buffer did this and marked all threads as running.
906
mysql_mutex_unlock(&cshare->mutex);
916
cache The cache of the thread leaving the lock.
919
This is called by the thread that filled the buffer. It marks all
920
threads as running and awakes them. This must not be done by any other thread.
923
Do not signal cond_writer. Either there is no writer or the writer
924
is the only one who can call this function.
926
The reason for resetting running_threads to total_threads before
927
waking all other threads is that it could be possible that this
928
thread is so fast with processing the buffer that it enters the lock
929
before even one other thread has left it. If every awoken thread
930
would increase running_threads by one, this thread could think that
931
he is again the last to join and would not wait for the other
932
threads to process the data.
938
static void unlock_io_cache(IO_CACHE *cache)
940
IO_CACHE_SHARE *cshare= cache->share;
941
DBUG_ENTER("unlock_io_cache");
942
DBUG_PRINT("io_cache_share", ("%s: 0x%lx pos: %lu running: %u",
943
(cache == cshare->source_cache) ?
945
(long) cache, (ulong) cshare->pos_in_file,
946
cshare->total_threads));
948
cshare->running_threads= cshare->total_threads;
949
mysql_cond_broadcast(&cshare->cond);
950
mysql_mutex_unlock(&cshare->mutex);
956
Read from IO_CACHE when it is shared between several threads.
960
cache IO_CACHE pointer
961
Buffer Buffer to retrieve count bytes from file
962
Count Number of bytes to read into Buffer
965
This function is only called from the my_b_read() macro when there
966
aren't enough characters in the buffer to satisfy the request.
970
It works as follows: when a thread tries to read from a file (that
971
is, after using all the data from the (shared) buffer), it just
972
hangs on lock_io_cache(), waiting for other threads. When the very
973
last thread attempts a read, lock_io_cache() returns 1, the thread
974
does actual IO and unlock_io_cache(), which signals all the waiting
975
threads that data is in the buffer.
979
When changing this function, be careful with handling file offsets
980
(end_of_file, pos_in_file). Do not cast them to possibly smaller
981
types than my_off_t unless you can be sure that their value fits.
982
Same applies to differences of file offsets. (Bug #11527)
984
When changing this function, check _my_b_read(). It might need the same change.
988
0 we succeeded in reading all data
989
1 Error: can't read requested characters
992
int _my_b_read_r(register IO_CACHE *cache, uchar *Buffer, size_t Count)
994
my_off_t pos_in_file;
995
size_t length, diff_length, left_length;
996
IO_CACHE_SHARE *cshare= cache->share;
997
DBUG_ENTER("_my_b_read_r");
999
if ((left_length= (size_t) (cache->read_end - cache->read_pos)))
1001
DBUG_ASSERT(Count >= left_length); /* User is not using my_b_read() */
1002
memcpy(Buffer, cache->read_pos, left_length);
1003
Buffer+= left_length;
1004
Count-= left_length;
1010
pos_in_file= cache->pos_in_file + (cache->read_end - cache->buffer);
1011
diff_length= (size_t) (pos_in_file & (IO_SIZE-1));
1012
length=IO_ROUND_UP(Count+diff_length)-diff_length;
1013
length= ((length <= cache->read_length) ?
1014
length + IO_ROUND_DN(cache->read_length - length) :
1015
length - IO_ROUND_UP(length - cache->read_length));
1016
if (cache->type != READ_FIFO &&
1017
(length > (cache->end_of_file - pos_in_file)))
1018
length= (size_t) (cache->end_of_file - pos_in_file);
1021
cache->error= (int) left_length;
1024
if (lock_io_cache(cache, pos_in_file))
1026
/* With a synchronized write/read cache we won't come here... */
1027
DBUG_ASSERT(!cshare->source_cache);
1029
... unless the writer has gone before this thread entered the
1030
lock. Simulate EOF in this case. It can be distinguished by cache->file.
1033
if (cache->file < 0)
1038
Whenever a function which operates on IO_CACHE flushes/writes
1039
some part of the IO_CACHE to disk it will set the property
1040
"seek_not_done" to indicate this to other functions operating on the IO_CACHE.
1043
if (cache->seek_not_done)
1045
if (mysql_file_seek(cache->file, pos_in_file, MY_SEEK_SET, MYF(0))
1046
== MY_FILEPOS_ERROR)
1049
unlock_io_cache(cache);
1053
len= mysql_file_read(cache->file, cache->buffer, length, cache->myflags);
1055
DBUG_PRINT("io_cache_share", ("read %lu bytes", (ulong) len));
1057
cache->read_end= cache->buffer + (len == (size_t) -1 ? 0 : len);
1058
cache->error= (len == length ? 0 : (int) len);
1059
cache->pos_in_file= pos_in_file;
1061
/* Copy important values to the share. */
1062
cshare->error= cache->error;
1063
cshare->read_end= cache->read_end;
1064
cshare->pos_in_file= pos_in_file;
1066
/* Mark all threads as running and wake them. */
1067
unlock_io_cache(cache);
1072
With a synchronized write/read cache readers always come here.
1073
Copy important values from the share.
1075
cache->error= cshare->error;
1076
cache->read_end= cshare->read_end;
1077
cache->pos_in_file= cshare->pos_in_file;
1079
len= ((cache->error == -1) ? (size_t) -1 :
1080
(size_t) (cache->read_end - cache->buffer));
1082
cache->read_pos= cache->buffer;
1083
cache->seek_not_done= 0;
1084
if (len == 0 || len == (size_t) -1)
1086
DBUG_PRINT("io_cache_share", ("reader error. len %lu left %lu",
1087
(ulong) len, (ulong) left_length));
1088
cache->error= (int) left_length;
1091
cnt= (len > Count) ? Count : len;
1092
memcpy(Buffer, cache->read_pos, cnt);
1096
cache->read_pos+= cnt;
1103
Copy data from write cache to read cache.
1106
copy_to_read_buffer()
1107
write_cache The write cache.
1108
write_buffer The source of data, mostly the cache buffer.
1109
write_length The number of bytes to copy.
1112
The write thread will wait for all read threads to join the cache
1113
lock. Then it copies the data over and wakes the read threads.
1119
static void copy_to_read_buffer(IO_CACHE *write_cache,
1120
const uchar *write_buffer, size_t write_length)
1122
IO_CACHE_SHARE *cshare= write_cache->share;
1124
DBUG_ASSERT(cshare->source_cache == write_cache);
1126
write_length is usually less or equal to buffer_length.
1127
It can be bigger if _my_b_write() is called with a big length.
1129
while (write_length)
1131
size_t copy_length= MY_MIN(write_length, write_cache->buffer_length);
1132
int __attribute__((unused)) rc;
1134
rc= lock_io_cache(write_cache, write_cache->pos_in_file);
1135
/* The writing thread does always have the lock when it awakes. */
1138
memcpy(cshare->buffer, write_buffer, copy_length);
1141
cshare->read_end= cshare->buffer + copy_length;
1142
cshare->pos_in_file= write_cache->pos_in_file;
1144
/* Mark all threads as running and wake them. */
1145
unlock_io_cache(write_cache);
1147
write_buffer+= copy_length;
1148
write_length-= copy_length;
1154
Do sequential read from the SEQ_READ_APPEND cache.
1156
We do this in three stages:
1157
- first read from info->buffer
1158
- then if there are still data to read, try the file descriptor
1159
- afterwards, if there are still data to read, try append buffer
1166
int _my_b_seq_read(register IO_CACHE *info, uchar *Buffer, size_t Count)
1168
size_t length, diff_length, left_length, save_count, max_length;
1169
my_off_t pos_in_file;
1172
/* first, read the regular buffer */
1173
if ((left_length=(size_t) (info->read_end-info->read_pos)))
1175
DBUG_ASSERT(Count > left_length); /* User is not using my_b_read() */
1176
memcpy(Buffer,info->read_pos, left_length);
1177
Buffer+=left_length;
1180
lock_append_buffer(info);
1182
/* pos_in_file always point on where info->buffer was read */
1183
if ((pos_in_file=info->pos_in_file +
1184
(size_t) (info->read_end - info->buffer)) >= info->end_of_file)
1185
goto read_append_buffer;
1188
With read-append cache we must always do a seek before we read,
1189
because the write could have moved the file pointer astray
1191
if (mysql_file_seek(info->file, pos_in_file, MY_SEEK_SET, MYF(0)) == MY_FILEPOS_ERROR)
1194
unlock_append_buffer(info);
1197
info->seek_not_done=0;
1199
diff_length= (size_t) (pos_in_file & (IO_SIZE-1));
1201
/* now the second stage begins - read from file descriptor */
1202
if (Count >= (size_t) (IO_SIZE+(IO_SIZE-diff_length)))
1204
/* Fill first intern buffer */
1207
length=(Count & (size_t) ~(IO_SIZE-1))-diff_length;
1208
if ((read_length= mysql_file_read(info->file,Buffer, length,
1209
info->myflags)) == (size_t) -1)
1212
unlock_append_buffer(info);
1216
Buffer+=read_length;
1217
pos_in_file+=read_length;
1219
if (read_length != length)
1222
We only got part of data; Read the rest of the data from the
1225
goto read_append_buffer;
1227
left_length+=length;
1231
max_length= info->read_length-diff_length;
1232
if (max_length > (info->end_of_file - pos_in_file))
1233
max_length= (size_t) (info->end_of_file - pos_in_file);
1237
goto read_append_buffer;
1238
length=0; /* Didn't read any more chars */
1242
length= mysql_file_read(info->file,info->buffer, max_length, info->myflags);
1243
if (length == (size_t) -1)
1246
unlock_append_buffer(info);
1251
memcpy(Buffer, info->buffer, length);
1256
added the line below to make
1257
DBUG_ASSERT(pos_in_file==info->end_of_file) pass.
1258
otherwise this does not appear to be needed
1260
pos_in_file += length;
1261
goto read_append_buffer;
1264
unlock_append_buffer(info);
1265
info->read_pos=info->buffer+Count;
1266
info->read_end=info->buffer+length;
1267
info->pos_in_file=pos_in_file;
1268
memcpy(Buffer,info->buffer,(size_t) Count);
1274
Read data from the current write buffer.
1275
Count should never be == 0 here (The code will work even if count is 0)
1279
/* First copy the data to Count */
1280
size_t len_in_buff = (size_t) (info->write_pos - info->append_read_pos);
1282
size_t transfer_len;
1284
DBUG_ASSERT(info->append_read_pos <= info->write_pos);
1286
TODO: figure out if the assert below is needed or correct.
1288
DBUG_ASSERT(pos_in_file == info->end_of_file);
1289
copy_len= MY_MIN(Count, len_in_buff);
1290
memcpy(Buffer, info->append_read_pos, copy_len);
1291
info->append_read_pos += copy_len;
1294
info->error = save_count - Count;
1296
/* Fill read buffer with data from write buffer */
1297
memcpy(info->buffer, info->append_read_pos,
1298
(size_t) (transfer_len=len_in_buff - copy_len));
1299
info->read_pos= info->buffer;
1300
info->read_end= info->buffer+transfer_len;
1301
info->append_read_pos=info->write_pos;
1302
info->pos_in_file=pos_in_file+copy_len;
1303
info->end_of_file+=len_in_buff;
1305
unlock_append_buffer(info);
1306
return Count ? 1 : 0;
#ifdef HAVE_AIOWAIT

/*
  Read from the IO_CACHE into a buffer and feed asynchronously
  from disk when needed.

  SYNOPSIS
    _my_b_async_read()
      info                      IO_CACHE pointer
      Buffer                    Buffer to retrieve count bytes from file
      Count                     Number of bytes to read into Buffer

  NOTE
    The cache buffer is used as two halves of info->read_length bytes;
    info->request_pos points at the half that holds the current data and
    the other half is the target of the next asynchronous read.
    If aioread() fails, the cache falls back to _my_b_read() for all
    following reads.

  RETURN VALUE
    -1          An error has occurred; my_errno is set.
     0          Success
     1          An error has occurred; IO_CACHE to error state.
*/

int _my_b_async_read(register IO_CACHE *info, uchar *Buffer, size_t Count)
{
  size_t length,read_length,diff_length,left_length,use_length,org_Count;
  size_t max_length;
  my_off_t next_pos_in_file;
  uchar *read_buffer;

  /* Hand out whatever is left in the current buffer first. */
  memcpy(Buffer,info->read_pos,
         (left_length= (size_t) (info->read_end-info->read_pos)));
  Buffer+=left_length;
  org_Count=Count;
  Count-=left_length;

  if (info->inited)
  {                                     /* wait for read block */
    info->inited=0;                     /* No more block to read */
    my_aiowait(&info->aio_result);      /* Wait for outstanding req */
    if (info->aio_result.result.aio_errno)
    {
      if (info->myflags & MY_WME)
      {
        char errbuf[MYSYS_STRERROR_SIZE];
        my_error(EE_READ, MYF(ME_BELL+ME_WAITTANG),
                 my_filename(info->file),
                 info->aio_result.result.aio_errno,
                 my_strerror(errbuf, sizeof(errbuf),
                             info->aio_result.result.aio_errno));
      }
      my_errno=info->aio_result.result.aio_errno;
      info->error= -1;
      return(1);
    }
    if (! (read_length= (size_t) info->aio_result.result.aio_return) ||
        read_length == (size_t) -1)
    {
      my_errno=0;                       /* For testing */
      info->error= (read_length == (size_t) -1 ? -1 :
                    (int) (read_length+left_length));
      return(1);
    }
    info->pos_in_file+= (size_t) (info->read_end - info->request_pos);

    /* Switch to the buffer half the asynchronous read was filling. */
    if (info->request_pos != info->buffer)
      info->request_pos=info->buffer;
    else
      info->request_pos=info->buffer+info->read_length;
    info->read_pos=info->request_pos;
    next_pos_in_file=info->aio_read_pos+read_length;

    /*
      Check if pos_in_file is changed
      (_ni_read_cache may have skipped some bytes)
    */

    if (info->aio_read_pos < info->pos_in_file)
    {                                   /* Fix if skipped bytes */
      if (info->aio_read_pos + read_length < info->pos_in_file)
      {
        read_length=0;                  /* Skip block */
        next_pos_in_file=info->pos_in_file;
      }
      else
      {
        my_off_t offset= (info->pos_in_file - info->aio_read_pos);
        info->pos_in_file=info->aio_read_pos; /* We are here */
        info->read_pos=info->request_pos+offset;
        read_length-=offset;            /* Bytes left from read_pos */
      }
    }
#ifndef DBUG_OFF
    if (info->aio_read_pos > info->pos_in_file)
    {
      my_errno=EINVAL;
      return(info->read_length= (size_t) -1);
    }
#endif
    /* Copy found bytes to buffer */
    length= MY_MIN(Count, read_length);
    memcpy(Buffer,info->read_pos,(size_t) length);
    Buffer+=length;
    Count-=length;
    left_length+=length;
    /*
      NOTE(review): every other access in this function uses read_pos /
      request_pos; 'rc_pos' looks like a leftover member name - confirm
      against the IO_CACHE declaration before enabling HAVE_AIOWAIT.
    */
    info->read_end=info->rc_pos+read_length;
    info->read_pos+=length;
  }
  else
    next_pos_in_file=(info->pos_in_file+ (size_t)
                      (info->read_end - info->request_pos));

  /* If reading large blocks, or first read or read with skip */
  if (Count)
  {
    if (next_pos_in_file == info->end_of_file)
    {
      info->error=(int) (read_length+left_length);
      return 1;
    }

    if (mysql_file_seek(info->file, next_pos_in_file, MY_SEEK_SET, MYF(0))
        == MY_FILEPOS_ERROR)
    {
      info->error= -1;
      return (1);
    }

    read_length=IO_SIZE*2- (size_t) (next_pos_in_file & (IO_SIZE-1));
    if (Count < read_length)
    {                                   /* Small block, read to cache */
      if ((read_length=mysql_file_read(info->file,info->request_pos,
                                       read_length, info->myflags)) == (size_t) -1)
        return info->error= -1;
      use_length= MY_MIN(Count, read_length);
      memcpy(Buffer,info->request_pos,(size_t) use_length);
      info->read_pos=info->request_pos+Count;
      info->read_end=info->request_pos+read_length;
      info->pos_in_file=next_pos_in_file; /* Start of block in cache */
      next_pos_in_file+=read_length;

      if (Count != use_length)
      {                                 /* Didn't find whole block */
        if (info->myflags & (MY_WME | MY_FAE | MY_FNABP) && Count != org_Count)
        {
          char errbuf[MYSYS_STRERROR_SIZE];
          my_error(EE_EOFERR, MYF(ME_BELL+ME_WAITTANG), my_filename(info->file),
                   my_errno, my_strerror(errbuf, sizeof(errbuf), my_errno));
        }
        info->error=(int) (read_length+left_length);
        return 1;
      }
    }
    else
    {                                   /* Big block, don't cache it */
      if ((read_length= mysql_file_read(info->file, Buffer, Count,info->myflags))
          != Count)
      {
        info->error= read_length == (size_t) -1 ? -1 : read_length+left_length;
        return 1;
      }
      info->read_pos=info->read_end=info->request_pos;
      info->pos_in_file=(next_pos_in_file+=Count);
    }
  }

  /* Read next block with asynchronous io */
  diff_length=(next_pos_in_file & (IO_SIZE-1));
  max_length= info->read_length - diff_length;
  if (max_length > info->end_of_file - next_pos_in_file)
    max_length= (size_t) (info->end_of_file - next_pos_in_file);

  /* The next request goes into the half that is not in use. */
  if (info->request_pos != info->buffer)
    read_buffer=info->buffer;
  else
    read_buffer=info->buffer+info->read_length;
  info->aio_read_pos=next_pos_in_file;
  if (max_length)
  {
    info->aio_result.result.aio_errno=AIO_INPROGRESS; /* Marker for test */
    DBUG_PRINT("aioread",("filepos: %lu length: %lu",
                          (ulong) next_pos_in_file, (ulong) max_length));
    if (aioread(info->file,read_buffer, max_length,
                (my_off_t) next_pos_in_file,MY_SEEK_SET,
                &info->aio_result.result))
    {                                   /* Skip async io */
      my_errno=errno;
      DBUG_PRINT("error",("got error: %d, aio_result: %d from aioread, async skipped",
                          errno, info->aio_result.result.aio_errno));
      if (info->request_pos != info->buffer)
      {
        /* Move the pending data to the start of the buffer. */
        bmove(info->buffer,info->request_pos,
              (size_t) (info->read_end - info->read_pos));
        info->request_pos=info->buffer;
        info->read_pos-=info->read_length;
        info->read_end-=info->read_length;
      }
      info->read_length=info->buffer_length; /* Use whole buffer */
      info->read_function=_my_b_read;   /* Use normal IO_READ next */
    }
    else
      info->inited=info->aio_result.pending=1;
  }
  return 0;                             /* Block read, async in use */
} /* _my_b_async_read */
#endif
1510
/* Read one byte when buffer is empty */
1512
int _my_b_get(IO_CACHE *info)
1515
IO_CACHE_CALLBACK pre_read,post_read;
1516
if ((pre_read = info->pre_read))
1518
if ((*(info)->read_function)(info,&buff,1))
1520
if ((post_read = info->post_read))
1522
return (int) (uchar) buff;
1526
Write a byte buffer to IO_CACHE and flush to disk
1527
if IO_CACHE is full.
1532
-1 On error; my_errno contains error code.
1535
int _my_b_write(register IO_CACHE *info, const uchar *Buffer, size_t Count)
1537
size_t rest_length,length;
1538
my_off_t pos_in_file= info->pos_in_file;
1540
DBUG_EXECUTE_IF("simulate_huge_load_data_file",
1542
pos_in_file=(my_off_t)(5000000000ULL);
1544
if (pos_in_file+info->buffer_length > info->end_of_file)
1546
my_errno=errno=EFBIG;
1547
return info->error = -1;
1550
rest_length= (size_t) (info->write_end - info->write_pos);
1551
memcpy(info->write_pos,Buffer,(size_t) rest_length);
1552
Buffer+=rest_length;
1554
info->write_pos+=rest_length;
1556
if (my_b_flush_io_cache(info,1))
1558
if (Count >= IO_SIZE)
1559
{ /* Fill first intern buffer */
1560
length=Count & (size_t) ~(IO_SIZE-1);
1561
if (info->seek_not_done)
1564
Whenever a function which operates on IO_CACHE flushes/writes
1565
some part of the IO_CACHE to disk it will set the property
1566
"seek_not_done" to indicate this to other functions operating
1569
if (mysql_file_seek(info->file, info->pos_in_file, MY_SEEK_SET, MYF(0)))
1574
info->seek_not_done=0;
1576
if (mysql_file_write(info->file, Buffer, length, info->myflags | MY_NABP))
1577
return info->error= -1;
1580
In case of a shared I/O cache with a writer we normally do direct
1581
write cache to read cache copy. Simulate this here by direct
1582
caller buffer to read cache copy. Do it after the write so that
1583
the cache readers actions on the flushed part can go in parallel
1584
with the write of the extra stuff. copy_to_read_buffer()
1585
synchronizes writer and readers so that after this call the
1586
readers can act on the extra stuff while the writer can go ahead
1587
and prepare the next output. copy_to_read_buffer() relies on
1591
copy_to_read_buffer(info, Buffer, length);
1595
info->pos_in_file+=length;
1597
memcpy(info->write_pos,Buffer,(size_t) Count);
1598
info->write_pos+=Count;
1604
Append a block to the write buffer.
1605
This is done with the buffer locked to ensure that we don't read from
1606
the write buffer before we are ready with it.
1609
int my_b_append(register IO_CACHE *info, const uchar *Buffer, size_t Count)
1611
size_t rest_length,length;
1614
Assert that we cannot come here with a shared cache. If we do one
1615
day, we might need to add a call to copy_to_read_buffer().
1617
DBUG_ASSERT(!info->share);
1619
lock_append_buffer(info);
1620
rest_length= (size_t) (info->write_end - info->write_pos);
1621
if (Count <= rest_length)
1623
memcpy(info->write_pos, Buffer, rest_length);
1624
Buffer+=rest_length;
1626
info->write_pos+=rest_length;
1627
if (my_b_flush_io_cache(info,0))
1629
unlock_append_buffer(info);
1632
if (Count >= IO_SIZE)
1633
{ /* Fill first intern buffer */
1634
length=Count & (size_t) ~(IO_SIZE-1);
1635
if (mysql_file_write(info->file,Buffer, length, info->myflags | MY_NABP))
1637
unlock_append_buffer(info);
1638
return info->error= -1;
1642
info->end_of_file+=length;
1646
memcpy(info->write_pos,Buffer,(size_t) Count);
1647
info->write_pos+=Count;
1648
unlock_append_buffer(info);
1653
int my_b_safe_write(IO_CACHE *info, const uchar *Buffer, size_t Count)
1656
Sasha: We are not writing this with the ? operator to avoid hitting
1657
a possible compiler bug. At least gcc 2.95 cannot deal with
1658
several layers of ternary operators that evaluated comma(,) operator
1659
expressions inside - I do have a test case if somebody wants it
1661
if (info->type == SEQ_READ_APPEND)
1662
return my_b_append(info, Buffer, Count);
1663
return my_b_write(info, Buffer, Count);
1668
Write a block to disk where part of the data may be inside the record
1669
buffer. As all write calls to the data goes through the cache,
1670
we will never get a seek over the end of the buffer
1673
int my_block_write(register IO_CACHE *info, const uchar *Buffer, size_t Count,
1680
Assert that we cannot come here with a shared cache. If we do one
1681
day, we might need to add a call to copy_to_read_buffer().
1683
DBUG_ASSERT(!info->share);
1685
if (pos < info->pos_in_file)
1687
/* Of no overlap, write everything without buffering */
1688
if (pos + Count <= info->pos_in_file)
1689
return mysql_file_pwrite(info->file, Buffer, Count, pos,
1690
info->myflags | MY_NABP);
1691
/* Write the part of the block that is before buffer */
1692
length= (uint) (info->pos_in_file - pos);
1693
if (mysql_file_pwrite(info->file, Buffer, length, pos, info->myflags | MY_NABP))
1694
info->error= error= -1;
1699
info->seek_not_done=1;
1703
/* Check if we want to write inside the used part of the buffer.*/
1704
length= (size_t) (info->write_end - info->buffer);
1705
if (pos < info->pos_in_file + length)
1707
size_t offset= (size_t) (pos - info->pos_in_file);
1711
memcpy(info->buffer+offset, Buffer, length);
1714
/* Fix length of buffer if the new data was larger */
1715
if (info->buffer+length > info->write_pos)
1716
info->write_pos=info->buffer+length;
1720
/* Write at the end of the current buffer; This is the normal case */
1721
if (_my_b_write(info, Buffer, Count))
1727
/* Flush write cache */
1729
#define LOCK_APPEND_BUFFER if (need_append_buffer_lock) \
1730
lock_append_buffer(info);
1731
#define UNLOCK_APPEND_BUFFER if (need_append_buffer_lock) \
1732
unlock_append_buffer(info);
1734
int my_b_flush_io_cache(IO_CACHE *info,
1735
int need_append_buffer_lock __attribute__((unused)))
1738
my_off_t pos_in_file;
1739
my_bool append_cache= (info->type == SEQ_READ_APPEND);
1740
DBUG_ENTER("my_b_flush_io_cache");
1741
DBUG_PRINT("enter", ("cache: 0x%lx", (long) info));
1744
need_append_buffer_lock= 0;
1746
if (info->type == WRITE_CACHE || append_cache)
1748
if (info->file == -1)
1750
if (real_open_cached_file(info))
1751
DBUG_RETURN((info->error= -1));
1755
if ((length=(size_t) (info->write_pos - info->write_buffer)))
1758
In case of a shared I/O cache with a writer we do direct write
1759
cache to read cache copy. Do it before the write here so that
1760
the readers can work in parallel with the write.
1761
copy_to_read_buffer() relies on info->pos_in_file.
1764
copy_to_read_buffer(info, info->write_buffer, length);
1766
pos_in_file=info->pos_in_file;
1768
If we have append cache, we always open the file with
1769
O_APPEND which moves the pos to EOF automatically on every write
1771
if (!append_cache && info->seek_not_done)
1772
{ /* File touched, do seek */
1773
if (mysql_file_seek(info->file, pos_in_file, MY_SEEK_SET, MYF(0)) ==
1776
UNLOCK_APPEND_BUFFER;
1777
DBUG_RETURN((info->error= -1));
1780
info->seek_not_done=0;
1783
info->pos_in_file+=length;
1784
info->write_end= (info->write_buffer+info->buffer_length-
1785
((pos_in_file+length) & (IO_SIZE-1)));
1787
if (mysql_file_write(info->file,info->write_buffer,length,
1788
info->myflags | MY_NABP))
1794
set_if_bigger(info->end_of_file,(pos_in_file+length));
1798
info->end_of_file+=(info->write_pos-info->append_read_pos);
1799
DBUG_ASSERT(info->end_of_file == mysql_file_tell(info->file, MYF(0)));
1802
info->append_read_pos=info->write_pos=info->write_buffer;
1803
++info->disk_writes;
1804
UNLOCK_APPEND_BUFFER;
1805
DBUG_RETURN(info->error);
1809
else if (info->type != READ_NET)
1811
my_aiowait(&info->aio_result); /* Wait for outstanding req */
1815
UNLOCK_APPEND_BUFFER;
1820
Free an IO_CACHE object
1824
info IO_CACHE Handle to free
1827
It's currently safe to call this if one has called init_io_cache()
1828
on the 'info' object, even if init_io_cache() failed.
1829
This function is also safe to call twice with the same handle.
1836
int end_io_cache(IO_CACHE *info)
1839
IO_CACHE_CALLBACK pre_close;
1840
DBUG_ENTER("end_io_cache");
1841
DBUG_PRINT("enter",("cache: 0x%lx", (ulong) info));
1844
Every thread must call remove_io_thread(). The last one destroys
1847
DBUG_ASSERT(!info->share || !info->share->total_threads);
1849
if ((pre_close=info->pre_close))
1854
if (info->alloced_buffer)
1856
info->alloced_buffer=0;
1857
if (info->file != -1) /* File doesn't exist */
1858
error= my_b_flush_io_cache(info,1);
1859
my_free(info->buffer);
1860
info->buffer=info->read_pos=(uchar*) 0;
1862
if (info->type == SEQ_READ_APPEND)
1864
/* Destroy allocated mutex */
1865
info->type= TYPE_NOT_SET;
1866
mysql_mutex_destroy(&info->append_buffer_lock);
1869
} /* end_io_cache */
1872
/**********************************************************************
1873
Testing of MF_IOCACHE
1874
**********************************************************************/
/*
  Print an error message and terminate the test program.

  SYNOPSIS
    die()
      fmt                       printf() style format string
      ...                       Arguments for the format string

  NOTE
    Writes "Error:<message>, errno=<n>" to stderr and exits with
    status 1. errno is saved on entry because the stdio calls that
    print the message may change it.
*/

void die(const char* fmt, ...)
{
  int saved_errno= errno;
  va_list va_args;

  va_start(va_args,fmt);
  fprintf(stderr,"Error:");
  vfprintf(stderr, fmt,va_args);
  va_end(va_args);
  fprintf(stderr,", errno=%d\n", saved_errno);
  exit(1);
}
1890
int open_file(const char* fname, IO_CACHE* info, int cache_size)
1893
if ((fd=my_open(fname,O_CREAT | O_RDWR,MYF(MY_WME))) < 0)
1894
die("Could not open %s", fname);
1895
if (init_io_cache(info, fd, cache_size, SEQ_READ_APPEND, 0,0,MYF(MY_WME)))
1896
die("failed in init_io_cache()");
1900
void close_file(IO_CACHE* info)
1903
my_close(info->file, MYF(MY_WME));
1906
int main(int argc, char** argv)
1908
IO_CACHE sra_cache; /* SEQ_READ_APPEND */
1910
const char* fname="/tmp/iocache.test";
1911
int cache_size=16384;
1913
int max_block,total_bytes=0;
1914
int i,num_loops=100,error=0;
1916
char* block, *block_end;
1918
max_block = cache_size*3;
1919
if (!(block=(char*)my_malloc(max_block,MYF(MY_WME))))
1920
die("Not enough memory to allocate test block");
1921
block_end = block + max_block;
1922
for (p = block,i=0; p < block_end;i++)
1926
if (my_stat(fname,&status, MYF(0)) &&
1927
my_delete(fname,MYF(MY_WME)))
1929
die("Delete of %s failed, aborting", fname);
1931
open_file(fname,&sra_cache, cache_size);
1932
for (i = 0; i < num_loops; i++)
1935
int block_size = abs(rand() % max_block);
1936
int4store(buf, block_size);
1937
if (my_b_append(&sra_cache,buf,4) ||
1938
my_b_append(&sra_cache, block, block_size))
1939
die("write failed");
1940
total_bytes += 4+block_size;
1942
close_file(&sra_cache);
1944
if (!my_stat(fname,&status,MYF(MY_WME)))
1945
die("%s failed to stat, but I had just closed it,\
1946
wonder how that happened");
1947
printf("Final size of %s is %s, wrote %d bytes\n",fname,
1948
llstr(status.st_size,llstr_buf),
1950
my_delete(fname, MYF(MY_WME));
1951
/* check correctness of tests */
1952
if (total_bytes != status.st_size)
1954
fprintf(stderr,"Not the same number of bytes acutally in file as bytes \
1955
supposedly written\n");