1
/* Copyright (C) 2000 MySQL AB
3
This program is free software; you can redistribute it and/or modify
4
it under the terms of the GNU General Public License as published by
5
the Free Software Foundation; version 2 of the License.
7
This program is distributed in the hope that it will be useful,
8
but WITHOUT ANY WARRANTY; without even the implied warranty of
9
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10
GNU General Public License for more details.
12
You should have received a copy of the GNU General Public License
13
along with this program; if not, write to the Free Software
14
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
17
Cashing of files with only does (sequential) read or writes of fixed-
18
length records. A read isn't allowed to go over file-length. A read is ok
19
if it ends at file-length and next read can try to read after file-length
20
(and get a EOF-error).
21
Possibly use of asyncronic io.
22
macros for read and writes for faster io.
23
Used instead of FILE when reading or writing whole files.
24
This code makes mf_rec_cache obsolete (currently only used by ISAM)
25
One can change info->pos_in_file to a higher value to skip bytes in file if
26
also info->read_pos is set to info->read_end.
27
If called through open_cached_file(), then the temporary file will
28
only be created if a write exeeds the file buffer or if one calls
29
my_b_flush_io_cache().
31
If one uses SEQ_READ_APPEND, then two buffers are allocated, one for
32
reading and another for writing. Reads are first done from disk and
33
then done from the write buffer. This is an efficient way to read
34
from a log file when one is writing to it at the same time.
35
For this to work, the file has to be opened in append mode!
36
Note that when one uses SEQ_READ_APPEND, one MUST write using
37
my_b_append ! This is needed because we need to lock the mutex
38
every time we access the write buffer.
41
When one SEQ_READ_APPEND and we are reading and writing at the same time,
42
each time the write buffer gets full and it's written to disk, we will
43
always do a disk read to read a part of the buffer from disk to the
45
This should be fixed so that when we do a my_b_flush_io_cache() and
46
we have been reading the write buffer, we should transfer the rest of the
47
write buffer to the read buffer before we start to reuse it.
50
#define MAP_TO_USE_RAID
51
#include "mysys_priv.h"
54
#include "mysys_err.h"
55
static void my_aiowait(my_aio_result *result);
60
#define lock_append_buffer(info) \
61
pthread_mutex_lock(&(info)->append_buffer_lock)
62
#define unlock_append_buffer(info) \
63
pthread_mutex_unlock(&(info)->append_buffer_lock)
65
#define lock_append_buffer(info)
66
#define unlock_append_buffer(info)
69
#define IO_ROUND_UP(X) (((X)+IO_SIZE-1) & ~(IO_SIZE-1))
70
#define IO_ROUND_DN(X) ( (X) & ~(IO_SIZE-1))
73
Setup internal pointers inside IO_CACHE
80
This is called on automaticly on init or reinit of IO_CACHE
81
It must be called externally if one moves or copies an IO_CACHE
85
void setup_io_cache(IO_CACHE* info)
87
/* Ensure that my_b_tell() and my_b_bytes_in_cache works */
88
if (info->type == WRITE_CACHE)
90
info->current_pos= &info->write_pos;
91
info->current_end= &info->write_end;
95
info->current_pos= &info->read_pos;
96
info->current_end= &info->read_end;
102
init_functions(IO_CACHE* info)
104
enum cache_type type= info->type;
108
Must be initialized by the caller. The problem is that
109
_my_b_net_read has to be defined in sql directory because of
110
the dependency on THD, and therefore cannot be visible to
111
programs that link against mysys but know nothing about THD, such
115
case SEQ_READ_APPEND:
116
info->read_function = _my_b_seq_read;
117
info->write_function = 0; /* Force a core if used */
120
info->read_function =
122
info->share ? _my_b_read_r :
125
info->write_function = _my_b_write;
128
setup_io_cache(info);
133
Initialize an IO_CACHE object
137
info cache handler to initialize
138
file File that should be associated to to the handler
139
If == -1 then real_open_cached_file()
140
will be called when it's time to open file.
141
cachesize Size of buffer to allocate for read/write
142
If == 0 then use my_default_record_cache_size
144
seek_offset Where cache should start reading/writing
145
use_async_io Set to 1 of we should use async_io (if avaiable)
146
cache_myflags Bitmap of differnt flags
147
MY_WME | MY_FAE | MY_NABP | MY_FNABP |
148
MY_DONT_CHECK_FILESIZE
155
int init_io_cache(IO_CACHE *info, File file, size_t cachesize,
156
enum cache_type type, my_off_t seek_offset,
157
pbool use_async_io, myf cache_myflags)
161
my_off_t end_of_file= ~(my_off_t) 0;
162
DBUG_ENTER("init_io_cache");
163
DBUG_PRINT("enter",("cache: 0x%lx type: %d pos: %ld",
164
(ulong) info, (int) type, (ulong) seek_offset));
167
info->type= TYPE_NOT_SET; /* Don't set it until mutex are created */
168
info->pos_in_file= seek_offset;
169
info->pre_close = info->pre_read = info->post_read = 0;
171
info->alloced_buffer = 0;
173
info->seek_not_done= 0;
177
pos= my_tell(file, MYF(0));
178
if ((pos == (my_off_t) -1) && (my_errno == ESPIPE))
181
This kind of object doesn't support seek() or tell(). Don't set a
182
flag that will make us again try to seek() later and fail.
184
info->seek_not_done= 0;
186
Additionally, if we're supposed to start somewhere other than the
187
the beginning of whatever this file is, then somebody made a bad
190
DBUG_ASSERT(seek_offset == 0);
193
info->seek_not_done= test(seek_offset != pos);
196
info->disk_writes= 0;
201
if (!cachesize && !(cachesize= my_default_record_cache_size))
202
DBUG_RETURN(1); /* No cache requested */
203
min_cache=use_async_io ? IO_SIZE*4 : IO_SIZE*2;
204
if (type == READ_CACHE || type == SEQ_READ_APPEND)
205
{ /* Assume file isn't growing */
206
if (!(cache_myflags & MY_DONT_CHECK_FILESIZE))
208
/* Calculate end of file to avoid allocating oversized buffers */
209
end_of_file=my_seek(file,0L,MY_SEEK_END,MYF(0));
210
/* Need to reset seek_not_done now that we just did a seek. */
211
info->seek_not_done= end_of_file == seek_offset ? 0 : 1;
212
if (end_of_file < seek_offset)
213
end_of_file=seek_offset;
214
/* Trim cache size if the file is very small */
215
if ((my_off_t) cachesize > end_of_file-seek_offset+IO_SIZE*2-1)
217
cachesize= (size_t) (end_of_file-seek_offset)+IO_SIZE*2-1;
218
use_async_io=0; /* No need to use async */
222
cache_myflags &= ~MY_DONT_CHECK_FILESIZE;
223
if (type != READ_NET && type != WRITE_NET)
225
/* Retry allocating memory in smaller blocks until we get one */
226
cachesize= ((cachesize + min_cache-1) & ~(min_cache-1));
231
Unset MY_WAIT_IF_FULL bit if it is set, to prevent conflict with
234
myf flags= (myf) (cache_myflags & ~(MY_WME | MY_WAIT_IF_FULL));
236
if (cachesize < min_cache)
237
cachesize = min_cache;
238
buffer_block= cachesize;
239
if (type == SEQ_READ_APPEND)
241
if (cachesize == min_cache)
242
flags|= (myf) MY_WME;
244
if ((info->buffer= (uchar*) my_malloc(buffer_block, flags)) != 0)
246
info->write_buffer=info->buffer;
247
if (type == SEQ_READ_APPEND)
248
info->write_buffer = info->buffer + cachesize;
249
info->alloced_buffer=1;
250
break; /* Enough memory found */
252
if (cachesize == min_cache)
253
DBUG_RETURN(2); /* Can't alloc cache */
254
/* Try with less memory */
255
cachesize= (cachesize*3/4 & ~(min_cache-1));
259
DBUG_PRINT("info",("init_io_cache: cachesize = %lu", (ulong) cachesize));
260
info->read_length=info->buffer_length=cachesize;
261
info->myflags=cache_myflags & ~(MY_NABP | MY_FNABP);
262
info->request_pos= info->read_pos= info->write_pos = info->buffer;
263
if (type == SEQ_READ_APPEND)
265
info->append_read_pos = info->write_pos = info->write_buffer;
266
info->write_end = info->write_buffer + info->buffer_length;
268
pthread_mutex_init(&info->append_buffer_lock,MY_MUTEX_INIT_FAST);
271
#if defined(SAFE_MUTEX) && defined(THREAD)
274
/* Clear mutex so that safe_mutex will notice that it's not initialized */
275
bzero((char*) &info->append_buffer_lock, sizeof(info));
279
if (type == WRITE_CACHE)
281
info->buffer+info->buffer_length- (seek_offset & (IO_SIZE-1));
283
info->read_end=info->buffer; /* Nothing in cache */
285
/* End_of_file may be changed by user later */
286
info->end_of_file= end_of_file;
289
init_functions(info);
291
if (use_async_io && ! my_disable_async_io)
293
DBUG_PRINT("info",("Using async io"));
294
info->read_length/=2;
295
info->read_function=_my_b_async_read;
297
info->inited=info->aio_result.pending=0;
300
} /* init_io_cache */
302
/* Wait until current request is ready */
305
static void my_aiowait(my_aio_result *result)
309
struct aio_result_t *tmp;
312
if ((int) (tmp=aiowait((struct timeval *) 0)) == -1)
316
DBUG_PRINT("error",("No aio request, error: %d",errno));
317
result->pending=0; /* Assume everythings is ok */
320
((my_aio_result*) tmp)->pending=0;
321
if ((my_aio_result*) tmp == result)
331
Use this to reset cache to re-start reading or to change the type
332
between READ_CACHE <-> WRITE_CACHE
333
If we are doing a reinit of a cache where we have the start of the file
334
in the cache, we are reusing this memory without flushing it to disk.
337
my_bool reinit_io_cache(IO_CACHE *info, enum cache_type type,
338
my_off_t seek_offset,
339
pbool use_async_io __attribute__((unused)),
342
DBUG_ENTER("reinit_io_cache");
343
DBUG_PRINT("enter",("cache: 0x%lx type: %d seek_offset: %lu clear_cache: %d",
344
(ulong) info, type, (ulong) seek_offset,
347
/* One can't do reinit with the following types */
348
DBUG_ASSERT(type != READ_NET && info->type != READ_NET &&
349
type != WRITE_NET && info->type != WRITE_NET &&
350
type != SEQ_READ_APPEND && info->type != SEQ_READ_APPEND);
352
/* If the whole file is in memory, avoid flushing to disk */
354
seek_offset >= info->pos_in_file &&
355
seek_offset <= my_b_tell(info))
357
/* Reuse current buffer without flushing it to disk */
359
if (info->type == WRITE_CACHE && type == READ_CACHE)
361
info->read_end=info->write_pos;
362
info->end_of_file=my_b_tell(info);
364
Trigger a new seek only if we have a valid
367
info->seek_not_done= (info->file != -1);
369
else if (type == WRITE_CACHE)
371
if (info->type == READ_CACHE)
373
info->write_end=info->write_buffer+info->buffer_length;
374
info->seek_not_done=1;
376
info->end_of_file = ~(my_off_t) 0;
378
pos=info->request_pos+(seek_offset-info->pos_in_file);
379
if (type == WRITE_CACHE)
384
my_aiowait(&info->aio_result); /* Wait for outstanding req */
390
If we change from WRITE_CACHE to READ_CACHE, assume that everything
391
after the current positions should be ignored
393
if (info->type == WRITE_CACHE && type == READ_CACHE)
394
info->end_of_file=my_b_tell(info);
395
/* flush cache if we want to reuse it */
396
if (!clear_cache && my_b_flush_io_cache(info,1))
398
info->pos_in_file=seek_offset;
399
/* Better to do always do a seek */
400
info->seek_not_done=1;
401
info->request_pos=info->read_pos=info->write_pos=info->buffer;
402
if (type == READ_CACHE)
404
info->read_end=info->buffer; /* Nothing in cache */
408
info->write_end=(info->buffer + info->buffer_length -
409
(seek_offset & (IO_SIZE-1)));
410
info->end_of_file= ~(my_off_t) 0;
415
init_functions(info);
418
if (use_async_io && ! my_disable_async_io &&
419
((ulong) info->buffer_length <
420
(ulong) (info->end_of_file - seek_offset)))
422
info->read_length=info->buffer_length/2;
423
info->read_function=_my_b_async_read;
428
} /* reinit_io_cache */
437
info IO_CACHE pointer
438
Buffer Buffer to retrieve count bytes from file
439
Count Number of bytes to read into Buffer
442
This function is only called from the my_b_read() macro when there
443
isn't enough characters in the buffer to satisfy the request.
447
When changing this function, be careful with handling file offsets
448
(end-of_file, pos_in_file). Do not cast them to possibly smaller
449
types than my_off_t unless you can be sure that their value fits.
450
Same applies to differences of file offsets.
452
When changing this function, check _my_b_read_r(). It might need the
456
0 we succeeded in reading all data
457
1 Error: can't read requested characters
460
int _my_b_read(register IO_CACHE *info, uchar *Buffer, size_t Count)
462
size_t length,diff_length,left_length, max_length;
463
my_off_t pos_in_file;
464
DBUG_ENTER("_my_b_read");
466
if ((left_length= (size_t) (info->read_end-info->read_pos)))
468
DBUG_ASSERT(Count >= left_length); /* User is not using my_b_read() */
469
memcpy(Buffer,info->read_pos, left_length);
474
/* pos_in_file always point on where info->buffer was read */
475
pos_in_file=info->pos_in_file+ (size_t) (info->read_end - info->buffer);
478
Whenever a function which operates on IO_CACHE flushes/writes
479
some part of the IO_CACHE to disk it will set the property
480
"seek_not_done" to indicate this to other functions operating
483
if (info->seek_not_done)
485
if ((my_seek(info->file,pos_in_file,MY_SEEK_SET,MYF(0))
486
!= MY_FILEPOS_ERROR))
488
/* No error, reset seek_not_done flag. */
489
info->seek_not_done= 0;
494
If the seek failed and the error number is ESPIPE, it is because
495
info->file is a pipe or socket or FIFO. We never should have tried
496
to seek on that. See Bugs#25807 and #22828 for more info.
498
DBUG_ASSERT(my_errno != ESPIPE);
504
diff_length= (size_t) (pos_in_file & (IO_SIZE-1));
505
if (Count >= (size_t) (IO_SIZE+(IO_SIZE-diff_length)))
506
{ /* Fill first intern buffer */
508
if (info->end_of_file <= pos_in_file)
510
info->error= (int) left_length;
513
length=(Count & (size_t) ~(IO_SIZE-1))-diff_length;
514
if ((read_length= my_read(info->file,Buffer, length, info->myflags))
517
info->error= (read_length == (size_t) -1 ? -1 :
518
(int) (read_length+left_length));
528
max_length= info->read_length-diff_length;
529
if (info->type != READ_FIFO &&
530
max_length > (info->end_of_file - pos_in_file))
531
max_length= (size_t) (info->end_of_file - pos_in_file);
536
info->error= left_length; /* We only got this many char */
539
length=0; /* Didn't read any chars */
541
else if ((length= my_read(info->file,info->buffer, max_length,
542
info->myflags)) < Count ||
543
length == (size_t) -1)
545
if (length != (size_t) -1)
546
memcpy(Buffer, info->buffer, length);
547
info->pos_in_file= pos_in_file;
548
info->error= length == (size_t) -1 ? -1 : (int) (length+left_length);
549
info->read_pos=info->read_end=info->buffer;
552
info->read_pos=info->buffer+Count;
553
info->read_end=info->buffer+length;
554
info->pos_in_file=pos_in_file;
555
memcpy(Buffer, info->buffer, Count);
562
Prepare IO_CACHE for shared use.
565
init_io_cache_share()
566
read_cache A read cache. This will be copied for
567
every thread after setup.
569
write_cache If non-NULL a write cache that is to be
570
synchronized with the read caches.
571
num_threads Number of threads sharing the cache
572
including the write thread if any.
576
The shared cache is used so: One IO_CACHE is initialized with
577
init_io_cache(). This includes the allocation of a buffer. Then a
578
share is allocated and init_io_cache_share() is called with the io
579
cache and the share. Then the io cache is copied for each thread. So
580
every thread has its own copy of IO_CACHE. But the allocated buffer
581
is shared because cache->buffer is the same for all caches.
583
One thread reads data from the file into the buffer. All threads
584
read from the buffer, but every thread maintains its own set of
585
pointers into the buffer. When all threads have used up the buffer
586
contents, one of the threads reads the next block of data into the
587
buffer. To accomplish this, each thread enters the cache lock before
588
accessing the buffer. They wait in lock_io_cache() until all threads
589
joined the lock. The last thread entering the lock is in charge of
590
reading from file to buffer. It wakes all threads when done.
592
Synchronizing a write cache to the read caches works so: Whenever
593
the write buffer needs a flush, the write thread enters the lock and
594
waits for all other threads to enter the lock too. They do this when
595
they have used up the read buffer. When all threads are in the lock,
596
the write thread copies the write buffer to the read buffer and
599
share->running_threads is the number of threads not being in the
600
cache lock. When entering lock_io_cache() the number is decreased.
601
When the thread that fills the buffer enters unlock_io_cache() the
602
number is reset to the number of threads. The condition
603
running_threads == 0 means that all threads are in the lock. Bumping
604
up the number to the full count is non-intuitive. But increasing the
605
number by one for each thread that leaves the lock could lead to a
606
solo run of one thread. The last thread to join a lock reads from
607
file to buffer, wakes the other threads, processes the data in the
608
cache and enters the lock again. If no other thread left the lock
609
meanwhile, it would think it's the last one again and read the next
612
The share has copies of 'error', 'buffer', 'read_end', and
613
'pos_in_file' from the thread that filled the buffer. We may not be
614
able to access this information directly from its cache because the
615
thread may be removed from the share before the variables could be
616
copied by all other threads. Or, if a write buffer is synchronized,
617
it would change its 'pos_in_file' after waking the other threads,
618
possibly before they could copy its value.
620
However, the 'buffer' variable in the share is for a synchronized
621
write cache. It needs to know where to put the data. Otherwise it
622
would need access to the read cache of one of the threads that is
623
not yet removed from the share.
629
void init_io_cache_share(IO_CACHE *read_cache, IO_CACHE_SHARE *cshare,
630
IO_CACHE *write_cache, uint num_threads)
632
DBUG_ENTER("init_io_cache_share");
633
DBUG_PRINT("io_cache_share", ("read_cache: 0x%lx share: 0x%lx "
634
"write_cache: 0x%lx threads: %u",
635
(long) read_cache, (long) cshare,
636
(long) write_cache, num_threads));
638
DBUG_ASSERT(num_threads > 1);
639
DBUG_ASSERT(read_cache->type == READ_CACHE);
640
DBUG_ASSERT(!write_cache || (write_cache->type == WRITE_CACHE));
642
pthread_mutex_init(&cshare->mutex, MY_MUTEX_INIT_FAST);
643
pthread_cond_init(&cshare->cond, 0);
644
pthread_cond_init(&cshare->cond_writer, 0);
646
cshare->running_threads= num_threads;
647
cshare->total_threads= num_threads;
648
cshare->error= 0; /* Initialize. */
649
cshare->buffer= read_cache->buffer;
650
cshare->read_end= NULL; /* See function comment of lock_io_cache(). */
651
cshare->pos_in_file= 0; /* See function comment of lock_io_cache(). */
652
cshare->source_cache= write_cache; /* Can be NULL. */
654
read_cache->share= cshare;
655
read_cache->read_function= _my_b_read_r;
656
read_cache->current_pos= NULL;
657
read_cache->current_end= NULL;
660
write_cache->share= cshare;
667
Remove a thread from shared access to IO_CACHE.
671
cache The IO_CACHE to be removed from the share.
675
Every thread must do that on exit for not to deadlock other threads.
677
The last thread destroys the pthread resources.
679
A writer flushes its cache first.
685
void remove_io_thread(IO_CACHE *cache)
687
IO_CACHE_SHARE *cshare= cache->share;
689
DBUG_ENTER("remove_io_thread");
691
/* If the writer goes, it needs to flush the write cache. */
692
if (cache == cshare->source_cache)
693
flush_io_cache(cache);
695
pthread_mutex_lock(&cshare->mutex);
696
DBUG_PRINT("io_cache_share", ("%s: 0x%lx",
697
(cache == cshare->source_cache) ?
698
"writer" : "reader", (long) cache));
700
/* Remove from share. */
701
total= --cshare->total_threads;
702
DBUG_PRINT("io_cache_share", ("remaining threads: %u", total));
704
/* Detach from share. */
707
/* If the writer goes, let the readers know. */
708
if (cache == cshare->source_cache)
710
DBUG_PRINT("io_cache_share", ("writer leaves"));
711
cshare->source_cache= NULL;
714
/* If all threads are waiting for me to join the lock, wake them. */
715
if (!--cshare->running_threads)
717
DBUG_PRINT("io_cache_share", ("the last running thread leaves, wake all"));
718
pthread_cond_signal(&cshare->cond_writer);
719
pthread_cond_broadcast(&cshare->cond);
722
pthread_mutex_unlock(&cshare->mutex);
726
DBUG_PRINT("io_cache_share", ("last thread removed, destroy share"));
727
pthread_cond_destroy (&cshare->cond_writer);
728
pthread_cond_destroy (&cshare->cond);
729
pthread_mutex_destroy(&cshare->mutex);
737
Lock IO cache and wait for all other threads to join.
741
cache The cache of the thread entering the lock.
742
pos File position of the block to read.
743
Unused for the write thread.
747
Wait for all threads to finish with the current buffer. We want
748
all threads to proceed in concert. The last thread to join
749
lock_io_cache() will read the block from file and all threads start
750
to use it. Then they will join again for reading the next block.
752
The waiting threads detect a fresh buffer by comparing
753
cshare->pos_in_file with the position they want to process next.
754
Since the first block may start at position 0, we take
755
cshare->read_end as an additional condition. This variable is
756
initialized to NULL and will be set after a block of data is written
760
1 OK, lock in place, go ahead and read.
761
0 OK, unlocked, another thread did the read.
764
static int lock_io_cache(IO_CACHE *cache, my_off_t pos)
766
IO_CACHE_SHARE *cshare= cache->share;
767
DBUG_ENTER("lock_io_cache");
769
/* Enter the lock. */
770
pthread_mutex_lock(&cshare->mutex);
771
cshare->running_threads--;
772
DBUG_PRINT("io_cache_share", ("%s: 0x%lx pos: %lu running: %u",
773
(cache == cshare->source_cache) ?
774
"writer" : "reader", (long) cache, (ulong) pos,
775
cshare->running_threads));
777
if (cshare->source_cache)
779
/* A write cache is synchronized to the read caches. */
781
if (cache == cshare->source_cache)
783
/* The writer waits until all readers are here. */
784
while (cshare->running_threads)
786
DBUG_PRINT("io_cache_share", ("writer waits in lock"));
787
pthread_cond_wait(&cshare->cond_writer, &cshare->mutex);
789
DBUG_PRINT("io_cache_share", ("writer awoke, going to copy"));
791
/* Stay locked. Leave the lock later by unlock_io_cache(). */
795
/* The last thread wakes the writer. */
796
if (!cshare->running_threads)
798
DBUG_PRINT("io_cache_share", ("waking writer"));
799
pthread_cond_signal(&cshare->cond_writer);
803
Readers wait until the data is copied from the writer. Another
804
reason to stop waiting is the removal of the write thread. If this
805
happens, we leave the lock with old data in the buffer.
807
while ((!cshare->read_end || (cshare->pos_in_file < pos)) &&
808
cshare->source_cache)
810
DBUG_PRINT("io_cache_share", ("reader waits in lock"));
811
pthread_cond_wait(&cshare->cond, &cshare->mutex);
815
If the writer was removed from the share while this thread was
816
asleep, we need to simulate an EOF condition. The writer cannot
817
reset the share variables as they might still be in use by readers
818
of the last block. When we awake here then because the last
819
joining thread signalled us. If the writer is not the last, it
820
will not signal. So it is safe to clear the buffer here.
822
if (!cshare->read_end || (cshare->pos_in_file < pos))
824
DBUG_PRINT("io_cache_share", ("reader found writer removed. EOF"));
825
cshare->read_end= cshare->buffer; /* Empty buffer. */
826
cshare->error= 0; /* EOF is not an error. */
832
There are read caches only. The last thread arriving in
833
lock_io_cache() continues with a locked cache and reads the block.
835
if (!cshare->running_threads)
837
DBUG_PRINT("io_cache_share", ("last thread joined, going to read"));
838
/* Stay locked. Leave the lock later by unlock_io_cache(). */
843
All other threads wait until the requested block is read by the
844
last thread arriving. Another reason to stop waiting is the
845
removal of a thread. If this leads to all threads being in the
846
lock, we have to continue also. The first of the awaken threads
847
will then do the read.
849
while ((!cshare->read_end || (cshare->pos_in_file < pos)) &&
850
cshare->running_threads)
852
DBUG_PRINT("io_cache_share", ("reader waits in lock"));
853
pthread_cond_wait(&cshare->cond, &cshare->mutex);
856
/* If the block is not yet read, continue with a locked cache and read. */
857
if (!cshare->read_end || (cshare->pos_in_file < pos))
859
DBUG_PRINT("io_cache_share", ("reader awoke, going to read"));
860
/* Stay locked. Leave the lock later by unlock_io_cache(). */
864
/* Another thread did read the block already. */
866
DBUG_PRINT("io_cache_share", ("reader awoke, going to process %u bytes",
867
(uint) (cshare->read_end ? (size_t)
868
(cshare->read_end - cshare->buffer) :
872
Leave the lock. Do not call unlock_io_cache() later. The thread that
873
filled the buffer did this and marked all threads as running.
875
pthread_mutex_unlock(&cshare->mutex);
885
cache The cache of the thread leaving the lock.
888
This is called by the thread that filled the buffer. It marks all
889
threads as running and awakes them. This must not be done by any
892
Do not signal cond_writer. Either there is no writer or the writer
893
is the only one who can call this function.
895
The reason for resetting running_threads to total_threads before
896
waking all other threads is that it could be possible that this
897
thread is so fast with processing the buffer that it enters the lock
898
before even one other thread has left it. If every awoken thread
899
would increase running_threads by one, this thread could think that
900
he is again the last to join and would not wait for the other
901
threads to process the data.
907
static void unlock_io_cache(IO_CACHE *cache)
909
IO_CACHE_SHARE *cshare= cache->share;
910
DBUG_ENTER("unlock_io_cache");
911
DBUG_PRINT("io_cache_share", ("%s: 0x%lx pos: %lu running: %u",
912
(cache == cshare->source_cache) ?
914
(long) cache, (ulong) cshare->pos_in_file,
915
cshare->total_threads));
917
cshare->running_threads= cshare->total_threads;
918
pthread_cond_broadcast(&cshare->cond);
919
pthread_mutex_unlock(&cshare->mutex);
925
Read from IO_CACHE when it is shared between several threads.
929
cache IO_CACHE pointer
930
Buffer Buffer to retrieve count bytes from file
931
Count Number of bytes to read into Buffer
934
This function is only called from the my_b_read() macro when there
935
isn't enough characters in the buffer to satisfy the request.
939
It works as follows: when a thread tries to read from a file (that
940
is, after using all the data from the (shared) buffer), it just
941
hangs on lock_io_cache(), waiting for other threads. When the very
942
last thread attempts a read, lock_io_cache() returns 1, the thread
943
does actual IO and unlock_io_cache(), which signals all the waiting
944
threads that data is in the buffer.
948
When changing this function, be careful with handling file offsets
949
(end-of_file, pos_in_file). Do not cast them to possibly smaller
950
types than my_off_t unless you can be sure that their value fits.
951
Same applies to differences of file offsets. (Bug #11527)
953
When changing this function, check _my_b_read(). It might need the
957
0 we succeeded in reading all data
958
1 Error: can't read requested characters
961
int _my_b_read_r(register IO_CACHE *cache, uchar *Buffer, size_t Count)
963
my_off_t pos_in_file;
964
size_t length, diff_length, left_length;
965
IO_CACHE_SHARE *cshare= cache->share;
966
DBUG_ENTER("_my_b_read_r");
968
if ((left_length= (size_t) (cache->read_end - cache->read_pos)))
970
DBUG_ASSERT(Count >= left_length); /* User is not using my_b_read() */
971
memcpy(Buffer, cache->read_pos, left_length);
972
Buffer+= left_length;
979
pos_in_file= cache->pos_in_file + (cache->read_end - cache->buffer);
980
diff_length= (size_t) (pos_in_file & (IO_SIZE-1));
981
length=IO_ROUND_UP(Count+diff_length)-diff_length;
982
length= ((length <= cache->read_length) ?
983
length + IO_ROUND_DN(cache->read_length - length) :
984
length - IO_ROUND_UP(length - cache->read_length));
985
if (cache->type != READ_FIFO &&
986
(length > (cache->end_of_file - pos_in_file)))
987
length= (size_t) (cache->end_of_file - pos_in_file);
990
cache->error= (int) left_length;
993
if (lock_io_cache(cache, pos_in_file))
995
/* With a synchronized write/read cache we won't come here... */
996
DBUG_ASSERT(!cshare->source_cache);
998
... unless the writer has gone before this thread entered the
999
lock. Simulate EOF in this case. It can be distinguished by
1002
if (cache->file < 0)
1007
Whenever a function which operates on IO_CACHE flushes/writes
1008
some part of the IO_CACHE to disk it will set the property
1009
"seek_not_done" to indicate this to other functions operating
1012
if (cache->seek_not_done)
1014
if (my_seek(cache->file, pos_in_file, MY_SEEK_SET, MYF(0))
1015
== MY_FILEPOS_ERROR)
1018
unlock_io_cache(cache);
1022
len= my_read(cache->file, cache->buffer, length, cache->myflags);
1024
DBUG_PRINT("io_cache_share", ("read %lu bytes", (ulong) len));
1026
cache->read_end= cache->buffer + (len == (size_t) -1 ? 0 : len);
1027
cache->error= (len == length ? 0 : (int) len);
1028
cache->pos_in_file= pos_in_file;
1030
/* Copy important values to the share. */
1031
cshare->error= cache->error;
1032
cshare->read_end= cache->read_end;
1033
cshare->pos_in_file= pos_in_file;
1035
/* Mark all threads as running and wake them. */
1036
unlock_io_cache(cache);
1041
With a synchronized write/read cache readers always come here.
1042
Copy important values from the share.
1044
cache->error= cshare->error;
1045
cache->read_end= cshare->read_end;
1046
cache->pos_in_file= cshare->pos_in_file;
1048
len= ((cache->error == -1) ? (size_t) -1 :
1049
(size_t) (cache->read_end - cache->buffer));
1051
cache->read_pos= cache->buffer;
1052
cache->seek_not_done= 0;
1053
if (len == 0 || len == (size_t) -1)
1055
DBUG_PRINT("io_cache_share", ("reader error. len %lu left %lu",
1056
(ulong) len, (ulong) left_length));
1057
cache->error= (int) left_length;
1060
cnt= (len > Count) ? Count : len;
1061
memcpy(Buffer, cache->read_pos, cnt);
1065
cache->read_pos+= cnt;
1072
Copy data from write cache to read cache.
1075
copy_to_read_buffer()
1076
write_cache The write cache.
1077
write_buffer The source of data, mostly the cache buffer.
1078
write_length The number of bytes to copy.
1081
The write thread will wait for all read threads to join the cache
1082
lock. Then it copies the data over and wakes the read threads.
1088
static void copy_to_read_buffer(IO_CACHE *write_cache,
1089
const uchar *write_buffer, size_t write_length)
1091
IO_CACHE_SHARE *cshare= write_cache->share;
1093
DBUG_ASSERT(cshare->source_cache == write_cache);
1095
write_length is usually less or equal to buffer_length.
1096
It can be bigger if _my_b_write() is called with a big length.
1098
while (write_length)
1100
size_t copy_length= min(write_length, write_cache->buffer_length);
1101
int __attribute__((unused)) rc;
1103
rc= lock_io_cache(write_cache, write_cache->pos_in_file);
1104
/* The writing thread does always have the lock when it awakes. */
1107
memcpy(cshare->buffer, write_buffer, copy_length);
1110
cshare->read_end= cshare->buffer + copy_length;
1111
cshare->pos_in_file= write_cache->pos_in_file;
1113
/* Mark all threads as running and wake them. */
1114
unlock_io_cache(write_cache);
1116
write_buffer+= copy_length;
1117
write_length-= copy_length;
1124
Do sequential read from the SEQ_READ_APPEND cache.
1126
We do this in three stages:
1127
- first read from info->buffer
1128
- then if there are still data to read, try the file descriptor
1129
- afterwards, if there are still data to read, try append buffer
1136
int _my_b_seq_read(register IO_CACHE *info, uchar *Buffer, size_t Count)
1138
size_t length, diff_length, left_length, save_count, max_length;
1139
my_off_t pos_in_file;
1142
/* first, read the regular buffer */
1143
if ((left_length=(size_t) (info->read_end-info->read_pos)))
1145
DBUG_ASSERT(Count > left_length); /* User is not using my_b_read() */
1146
memcpy(Buffer,info->read_pos, left_length);
1147
Buffer+=left_length;
1150
lock_append_buffer(info);
1152
/* pos_in_file always point on where info->buffer was read */
1153
if ((pos_in_file=info->pos_in_file +
1154
(size_t) (info->read_end - info->buffer)) >= info->end_of_file)
1155
goto read_append_buffer;
1158
With read-append cache we must always do a seek before we read,
1159
because the write could have moved the file pointer astray
1161
if (my_seek(info->file,pos_in_file,MY_SEEK_SET,MYF(0)) == MY_FILEPOS_ERROR)
1164
unlock_append_buffer(info);
1167
info->seek_not_done=0;
1169
diff_length= (size_t) (pos_in_file & (IO_SIZE-1));
1171
/* now the second stage begins - read from file descriptor */
1172
if (Count >= (size_t) (IO_SIZE+(IO_SIZE-diff_length)))
1174
/* Fill first intern buffer */
1177
length=(Count & (size_t) ~(IO_SIZE-1))-diff_length;
1178
if ((read_length= my_read(info->file,Buffer, length,
1179
info->myflags)) == (size_t) -1)
1182
unlock_append_buffer(info);
1186
Buffer+=read_length;
1187
pos_in_file+=read_length;
1189
if (read_length != length)
1192
We only got part of data; Read the rest of the data from the
1195
goto read_append_buffer;
1197
left_length+=length;
1201
max_length= info->read_length-diff_length;
1202
if (max_length > (info->end_of_file - pos_in_file))
1203
max_length= (size_t) (info->end_of_file - pos_in_file);
1207
goto read_append_buffer;
1208
length=0; /* Didn't read any more chars */
1212
length= my_read(info->file,info->buffer, max_length, info->myflags);
1213
if (length == (size_t) -1)
1216
unlock_append_buffer(info);
1221
memcpy(Buffer, info->buffer, length);
1226
added the line below to make
1227
DBUG_ASSERT(pos_in_file==info->end_of_file) pass.
1228
otherwise this does not appear to be needed
1230
pos_in_file += length;
1231
goto read_append_buffer;
1234
unlock_append_buffer(info);
1235
info->read_pos=info->buffer+Count;
1236
info->read_end=info->buffer+length;
1237
info->pos_in_file=pos_in_file;
1238
memcpy(Buffer,info->buffer,(size_t) Count);
1244
Read data from the current write buffer.
1245
Count should never be == 0 here (The code will work even if count is 0)
1249
/* First copy the data to Count */
1250
size_t len_in_buff = (size_t) (info->write_pos - info->append_read_pos);
1252
size_t transfer_len;
1254
DBUG_ASSERT(info->append_read_pos <= info->write_pos);
1256
TODO: figure out if the assert below is needed or correct.
1258
DBUG_ASSERT(pos_in_file == info->end_of_file);
1259
copy_len=min(Count, len_in_buff);
1260
memcpy(Buffer, info->append_read_pos, copy_len);
1261
info->append_read_pos += copy_len;
1264
info->error = save_count - Count;
1266
/* Fill read buffer with data from write buffer */
1267
memcpy(info->buffer, info->append_read_pos,
1268
(size_t) (transfer_len=len_in_buff - copy_len));
1269
info->read_pos= info->buffer;
1270
info->read_end= info->buffer+transfer_len;
1271
info->append_read_pos=info->write_pos;
1272
info->pos_in_file=pos_in_file+copy_len;
1273
info->end_of_file+=len_in_buff;
1275
unlock_append_buffer(info);
1276
return Count ? 1 : 0;
1283
#ifdef HAVE_AIOWAIT

/*
  Read from the IO_CACHE into a buffer and feed asynchronously
  from disk when needed.

  SYNOPSIS
    _my_b_async_read()
      info    IO_CACHE pointer
      Buffer  Buffer to retrieve count bytes from file
      Count   Number of bytes to read into Buffer

  RETURN VALUE
    -1          An error has occurred; my_errno is set.
     0          Success
     1          An error has occurred; IO_CACHE to error state.
*/

int _my_b_async_read(register IO_CACHE *info, uchar *Buffer, size_t Count)
{
  size_t length,read_length,diff_length,left_length,use_length,org_Count;
  size_t max_length;
  my_off_t next_pos_in_file;
  uchar *read_buffer;

  /* Serve whatever is left in the current read buffer first. */
  memcpy(Buffer,info->read_pos,
         (left_length= (size_t) (info->read_end-info->read_pos)));
  Buffer+=left_length;
  org_Count=Count;
  Count-=left_length;

  if (info->inited)
  {                                     /* wait for read block */
    info->inited=0;                     /* No more block to read */
    my_aiowait(&info->aio_result);      /* Wait for outstanding req */
    if (info->aio_result.result.aio_errno)
    {
      if (info->myflags & MY_WME)
        my_error(EE_READ, MYF(ME_BELL+ME_WAITTANG),
                 my_filename(info->file),
                 info->aio_result.result.aio_errno);
      my_errno=info->aio_result.result.aio_errno;
      info->error= -1;
      return(1);
    }
    if (! (read_length= (size_t) info->aio_result.result.aio_return) ||
        read_length == (size_t) -1)
    {
      my_errno=0;                       /* For testing */
      info->error= (read_length == (size_t) -1 ? -1 :
                    (int) (read_length+left_length));
      return(1);
    }
    info->pos_in_file+= (size_t) (info->read_end - info->request_pos);

    /* The async read went into the half of the double buffer that was
       not in use; swap which half serves reads. */
    if (info->request_pos != info->buffer)
      info->request_pos=info->buffer;
    else
      info->request_pos=info->buffer+info->read_length;
    info->read_pos=info->request_pos;
    next_pos_in_file=info->aio_read_pos+read_length;

    /* Check if pos_in_file is changed
       (_ni_read_cache may have skipped some bytes) */

    if (info->aio_read_pos < info->pos_in_file)
    {                                   /* Fix if skipped bytes */
      if (info->aio_read_pos + read_length < info->pos_in_file)
      {
        read_length=0;                  /* Skip block */
        next_pos_in_file=info->pos_in_file;
      }
      else
      {
        my_off_t offset= (info->pos_in_file - info->aio_read_pos);
        info->pos_in_file=info->aio_read_pos; /* Whe are here */
        info->read_pos=info->request_pos+offset;
        read_length-=offset;            /* Bytes left from read_pos */
      }
    }
#ifndef DBUG_OFF
    if (info->aio_read_pos > info->pos_in_file)
    {
      my_errno=EINVAL;
      return(info->read_length= (size_t) -1);
    }
#endif
    /* Copy found bytes to buffer */
    length=min(Count,read_length);
    memcpy(Buffer,info->read_pos,(size_t) length);
    Buffer+=length;
    Count-=length;
    left_length+=length;
    info->read_end=info->rc_pos+read_length;
    info->read_pos+=length;
  }
  else
    next_pos_in_file=(info->pos_in_file+ (size_t)
                      (info->read_end - info->request_pos));

  /* If reading large blocks, or first read or read with skip */
  if (Count)
  {
    if (next_pos_in_file == info->end_of_file)
    {
      info->error=(int) (read_length+left_length);
      return 1;
    }

    if (my_seek(info->file,next_pos_in_file,MY_SEEK_SET,MYF(0))
        == MY_FILEPOS_ERROR)
    {
      info->error= -1;
      return (1);
    }

    read_length=IO_SIZE*2- (size_t) (next_pos_in_file & (IO_SIZE-1));
    if (Count < read_length)
    {                                   /* Small block, read to cache */
      if ((read_length=my_read(info->file,info->request_pos,
                               read_length, info->myflags)) == (size_t) -1)
        return info->error= -1;
      use_length=min(Count,read_length);
      memcpy(Buffer,info->request_pos,(size_t) use_length);
      info->read_pos=info->request_pos+Count;
      info->read_end=info->request_pos+read_length;
      info->pos_in_file=next_pos_in_file; /* Start of block in cache */
      next_pos_in_file+=read_length;

      if (Count != use_length)
      {                                 /* Didn't find hole block */
        if (info->myflags & (MY_WME | MY_FAE | MY_FNABP) && Count != org_Count)
          my_error(EE_EOFERR, MYF(ME_BELL+ME_WAITTANG),
                   my_filename(info->file),my_errno);
        info->error=(int) (read_length+left_length);
        return 1;
      }
    }
    else
    {                                   /* Big block, don't cache it */
      if ((read_length= my_read(info->file,Buffer, Count,info->myflags))
          != Count)
      {
        info->error= read_length == (size_t) -1 ? -1 : read_length+left_length;
        return 1;
      }
      info->read_pos=info->read_end=info->request_pos;
      info->pos_in_file=(next_pos_in_file+=Count);
    }
  }

  /* Read next block with asyncronic io */
  diff_length=(next_pos_in_file & (IO_SIZE-1));
  max_length= info->read_length - diff_length;
  if (max_length > info->end_of_file - next_pos_in_file)
    max_length= (size_t) (info->end_of_file - next_pos_in_file);

  /* Issue the read-ahead into the half of the double buffer not in use. */
  if (info->request_pos != info->buffer)
    read_buffer=info->buffer;
  else
    read_buffer=info->buffer+info->read_length;
  info->aio_read_pos=next_pos_in_file;
  if (max_length)
  {
    info->aio_result.result.aio_errno=AIO_INPROGRESS;   /* Marker for test */
    DBUG_PRINT("aioread",("filepos: %ld  length: %lu",
                          (ulong) next_pos_in_file, (ulong) max_length));
    if (aioread(info->file,read_buffer, max_length,
                (my_off_t) next_pos_in_file,MY_SEEK_SET,
                &info->aio_result.result))
    {                                   /* Skip async io */
      my_errno=errno;
      DBUG_PRINT("error",("got error: %d, aio_result: %d from aioread, async skipped",
                          errno, info->aio_result.result.aio_errno));
      if (info->request_pos != info->buffer)
      {
        /* Move the valid data to the start so the cache can be used as a
           plain single-buffered read cache from here on. */
        bmove(info->buffer,info->request_pos,
              (size_t) (info->read_end - info->read_pos));
        info->request_pos=info->buffer;
        info->read_pos-=info->read_length;
        info->read_end-=info->read_length;
      }
      info->read_length=info->buffer_length;    /* Use hole buffer */
      info->read_function=_my_b_read;           /* Use normal IO_READ next */
    }
    else
      info->inited=info->aio_result.pending=1;
  }
  return 0;                             /* Block read, async in use */
} /* _my_b_async_read */
#endif
/* Read one byte when buffer is empty */
1475
int _my_b_get(IO_CACHE *info)
1478
IO_CACHE_CALLBACK pre_read,post_read;
1479
if ((pre_read = info->pre_read))
1481
if ((*(info)->read_function)(info,&buff,1))
1483
if ((post_read = info->post_read))
1485
return (int) (uchar) buff;
1489
Write a byte buffer to IO_CACHE and flush to disk
1490
if IO_CACHE is full.
1495
-1 On error; my_errno contains error code.
1498
int _my_b_write(register IO_CACHE *info, const uchar *Buffer, size_t Count)
1500
size_t rest_length,length;
1502
if (info->pos_in_file+info->buffer_length > info->end_of_file)
1504
my_errno=errno=EFBIG;
1505
return info->error = -1;
1508
rest_length= (size_t) (info->write_end - info->write_pos);
1509
memcpy(info->write_pos,Buffer,(size_t) rest_length);
1510
Buffer+=rest_length;
1512
info->write_pos+=rest_length;
1514
if (my_b_flush_io_cache(info,1))
1516
if (Count >= IO_SIZE)
1517
{ /* Fill first intern buffer */
1518
length=Count & (size_t) ~(IO_SIZE-1);
1519
if (info->seek_not_done)
1522
Whenever a function which operates on IO_CACHE flushes/writes
1523
some part of the IO_CACHE to disk it will set the property
1524
"seek_not_done" to indicate this to other functions operating
1527
if (my_seek(info->file,info->pos_in_file,MY_SEEK_SET,MYF(0)))
1532
info->seek_not_done=0;
1534
if (my_write(info->file, Buffer, length, info->myflags | MY_NABP))
1535
return info->error= -1;
1539
In case of a shared I/O cache with a writer we normally do direct
1540
write cache to read cache copy. Simulate this here by direct
1541
caller buffer to read cache copy. Do it after the write so that
1542
the cache readers actions on the flushed part can go in parallel
1543
with the write of the extra stuff. copy_to_read_buffer()
1544
synchronizes writer and readers so that after this call the
1545
readers can act on the extra stuff while the writer can go ahead
1546
and prepare the next output. copy_to_read_buffer() relies on
1550
copy_to_read_buffer(info, Buffer, length);
1555
info->pos_in_file+=length;
1557
memcpy(info->write_pos,Buffer,(size_t) Count);
1558
info->write_pos+=Count;
1564
Append a block to the write buffer.
1565
This is done with the buffer locked to ensure that we don't read from
1566
the write buffer before we are ready with it.
1569
int my_b_append(register IO_CACHE *info, const uchar *Buffer, size_t Count)
1571
size_t rest_length,length;
1575
Assert that we cannot come here with a shared cache. If we do one
1576
day, we might need to add a call to copy_to_read_buffer().
1578
DBUG_ASSERT(!info->share);
1581
lock_append_buffer(info);
1582
rest_length= (size_t) (info->write_end - info->write_pos);
1583
if (Count <= rest_length)
1585
memcpy(info->write_pos, Buffer, rest_length);
1586
Buffer+=rest_length;
1588
info->write_pos+=rest_length;
1589
if (my_b_flush_io_cache(info,0))
1591
unlock_append_buffer(info);
1594
if (Count >= IO_SIZE)
1595
{ /* Fill first intern buffer */
1596
length=Count & (size_t) ~(IO_SIZE-1);
1597
if (my_write(info->file,Buffer, length, info->myflags | MY_NABP))
1599
unlock_append_buffer(info);
1600
return info->error= -1;
1604
info->end_of_file+=length;
1608
memcpy(info->write_pos,Buffer,(size_t) Count);
1609
info->write_pos+=Count;
1610
unlock_append_buffer(info);
1615
/*
  Write to the cache, dispatching to the locking append path for
  SEQ_READ_APPEND caches and to the plain write path otherwise.
*/

int my_b_safe_write(IO_CACHE *info, const uchar *Buffer, size_t Count)
{
  /*
    Sasha: We are not writing this with the ? operator to avoid hitting
    a possible compiler bug. At least gcc 2.95 cannot deal with
    several layers of ternary operators that evaluated comma(,) operator
    expressions inside - I do have a test case if somebody wants it
  */
  if (info->type == SEQ_READ_APPEND)
    return my_b_append(info, Buffer, Count);
  return my_b_write(info, Buffer, Count);
}
Write a block to disk where part of the data may be inside the record
1631
buffer. As all write calls to the data goes through the cache,
1632
we will never get a seek over the end of the buffer
1635
int my_block_write(register IO_CACHE *info, const uchar *Buffer, size_t Count,
1643
Assert that we cannot come here with a shared cache. If we do one
1644
day, we might need to add a call to copy_to_read_buffer().
1646
DBUG_ASSERT(!info->share);
1649
if (pos < info->pos_in_file)
1651
/* Of no overlap, write everything without buffering */
1652
if (pos + Count <= info->pos_in_file)
1653
return my_pwrite(info->file, Buffer, Count, pos,
1654
info->myflags | MY_NABP);
1655
/* Write the part of the block that is before buffer */
1656
length= (uint) (info->pos_in_file - pos);
1657
if (my_pwrite(info->file, Buffer, length, pos, info->myflags | MY_NABP))
1658
info->error= error= -1;
1663
info->seek_not_done=1;
1667
/* Check if we want to write inside the used part of the buffer.*/
1668
length= (size_t) (info->write_end - info->buffer);
1669
if (pos < info->pos_in_file + length)
1671
size_t offset= (size_t) (pos - info->pos_in_file);
1675
memcpy(info->buffer+offset, Buffer, length);
1678
/* Fix length of buffer if the new data was larger */
1679
if (info->buffer+length > info->write_pos)
1680
info->write_pos=info->buffer+length;
1684
/* Write at the end of the current buffer; This is the normal case */
1685
if (_my_b_write(info, Buffer, Count))
1691
/* Flush write cache */
1694
#define LOCK_APPEND_BUFFER if (need_append_buffer_lock) \
1695
lock_append_buffer(info);
1696
#define UNLOCK_APPEND_BUFFER if (need_append_buffer_lock) \
1697
unlock_append_buffer(info);
1699
#define LOCK_APPEND_BUFFER
1700
#define UNLOCK_APPEND_BUFFER
1704
int my_b_flush_io_cache(IO_CACHE *info, int need_append_buffer_lock)
1707
my_bool append_cache;
1708
my_off_t pos_in_file;
1709
DBUG_ENTER("my_b_flush_io_cache");
1710
DBUG_PRINT("enter", ("cache: 0x%lx", (long) info));
1712
if (!(append_cache = (info->type == SEQ_READ_APPEND)))
1713
need_append_buffer_lock=0;
1715
if (info->type == WRITE_CACHE || append_cache)
1717
if (info->file == -1)
1719
if (real_open_cached_file(info))
1720
DBUG_RETURN((info->error= -1));
1724
if ((length=(size_t) (info->write_pos - info->write_buffer)))
1728
In case of a shared I/O cache with a writer we do direct write
1729
cache to read cache copy. Do it before the write here so that
1730
the readers can work in parallel with the write.
1731
copy_to_read_buffer() relies on info->pos_in_file.
1734
copy_to_read_buffer(info, info->write_buffer, length);
1737
pos_in_file=info->pos_in_file;
1739
If we have append cache, we always open the file with
1740
O_APPEND which moves the pos to EOF automatically on every write
1742
if (!append_cache && info->seek_not_done)
1743
{ /* File touched, do seek */
1744
if (my_seek(info->file,pos_in_file,MY_SEEK_SET,MYF(0)) ==
1747
UNLOCK_APPEND_BUFFER;
1748
DBUG_RETURN((info->error= -1));
1751
info->seek_not_done=0;
1754
info->pos_in_file+=length;
1755
info->write_end= (info->write_buffer+info->buffer_length-
1756
((pos_in_file+length) & (IO_SIZE-1)));
1758
if (my_write(info->file,info->write_buffer,length,
1759
info->myflags | MY_NABP))
1765
set_if_bigger(info->end_of_file,(pos_in_file+length));
1769
info->end_of_file+=(info->write_pos-info->append_read_pos);
1770
DBUG_ASSERT(info->end_of_file == my_tell(info->file,MYF(0)));
1773
info->append_read_pos=info->write_pos=info->write_buffer;
1774
++info->disk_writes;
1775
UNLOCK_APPEND_BUFFER;
1776
DBUG_RETURN(info->error);
1780
else if (info->type != READ_NET)
1782
my_aiowait(&info->aio_result); /* Wait for outstanding req */
1786
UNLOCK_APPEND_BUFFER;
1791
/*
  Free an IO_CACHE object

  SYNOPSIS
    end_io_cache()
      info  IO_CACHE Handle to free

  NOTES
    It's currently safe to call this if one has called init_io_cache()
    on the 'info' object, even if init_io_cache() failed.
    This function is also safe to call twice with the same handle.

  RETURN
   0  ok
   #  Error
*/

int end_io_cache(IO_CACHE *info)
{
  int error=0;
  IO_CACHE_CALLBACK pre_close;
  DBUG_ENTER("end_io_cache");
  DBUG_PRINT("enter",("cache: 0x%lx", (ulong) info));

#ifdef THREAD
  /*
    Every thread must call remove_io_thread(). The last one destroys
    the share elements.
  */
  DBUG_ASSERT(!info->share || !info->share->total_threads);
#endif

  if ((pre_close=info->pre_close))
  {
    (*pre_close)(info);
    info->pre_close= 0;
  }
  if (info->alloced_buffer)
  {
    info->alloced_buffer=0;
    if (info->file != -1)               /* File doesn't exist */
      error= my_b_flush_io_cache(info,1);
    my_free((uchar*) info->buffer,MYF(MY_WME));
    info->buffer=info->read_pos=(uchar*) 0;
  }
  if (info->type == SEQ_READ_APPEND)
  {
    /* Destroy allocated mutex */
    info->type= TYPE_NOT_SET;
#ifdef THREAD
    pthread_mutex_destroy(&info->append_buffer_lock);
#endif
  }
  DBUG_RETURN(error);
} /* end_io_cache */
/**********************************************************************
1848
Testing of MF_IOCACHE
1849
**********************************************************************/
1855
void die(const char* fmt, ...)
1858
va_start(va_args,fmt);
1859
fprintf(stderr,"Error:");
1860
vfprintf(stderr, fmt,va_args);
1861
fprintf(stderr,", errno=%d\n", errno);
1865
int open_file(const char* fname, IO_CACHE* info, int cache_size)
1868
if ((fd=my_open(fname,O_CREAT | O_RDWR,MYF(MY_WME))) < 0)
1869
die("Could not open %s", fname);
1870
if (init_io_cache(info, fd, cache_size, SEQ_READ_APPEND, 0,0,MYF(MY_WME)))
1871
die("failed in init_io_cache()");
1875
void close_file(IO_CACHE* info)
1878
my_close(info->file, MYF(MY_WME));
1881
int main(int argc, char** argv)
1883
IO_CACHE sra_cache; /* SEQ_READ_APPEND */
1885
const char* fname="/tmp/iocache.test";
1886
int cache_size=16384;
1888
int max_block,total_bytes=0;
1889
int i,num_loops=100,error=0;
1891
char* block, *block_end;
1893
max_block = cache_size*3;
1894
if (!(block=(char*)my_malloc(max_block,MYF(MY_WME))))
1895
die("Not enough memory to allocate test block");
1896
block_end = block + max_block;
1897
for (p = block,i=0; p < block_end;i++)
1901
if (my_stat(fname,&status, MYF(0)) &&
1902
my_delete(fname,MYF(MY_WME)))
1904
die("Delete of %s failed, aborting", fname);
1906
open_file(fname,&sra_cache, cache_size);
1907
for (i = 0; i < num_loops; i++)
1910
int block_size = abs(rand() % max_block);
1911
int4store(buf, block_size);
1912
if (my_b_append(&sra_cache,buf,4) ||
1913
my_b_append(&sra_cache, block, block_size))
1914
die("write failed");
1915
total_bytes += 4+block_size;
1917
close_file(&sra_cache);
1918
my_free(block,MYF(MY_WME));
1919
if (!my_stat(fname,&status,MYF(MY_WME)))
1920
die("%s failed to stat, but I had just closed it,\
1921
wonder how that happened");
1922
printf("Final size of %s is %s, wrote %d bytes\n",fname,
1923
llstr(status.st_size,llstr_buf),
1925
my_delete(fname, MYF(MY_WME));
1926
/* check correctness of tests */
1927
if (total_bytes != status.st_size)
1929
fprintf(stderr,"Not the same number of bytes acutally in file as bytes \
1930
supposedly written\n");