~ubuntu-branches/ubuntu/precise/mysql-5.1/precise

« back to all changes in this revision

Viewing changes to mysys/mf_iocache.c

  • Committer: Bazaar Package Importer
  • Author(s): Norbert Tretkowski
  • Date: 2010-03-17 14:56:02 UTC
  • Revision ID: james.westby@ubuntu.com-20100317145602-x7e30l1b2sb5s6w6
Tags: upstream-5.1.45
ImportĀ upstreamĀ versionĀ 5.1.45

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/* Copyright (C) 2000 MySQL AB
 
2
 
 
3
   This program is free software; you can redistribute it and/or modify
 
4
   it under the terms of the GNU General Public License as published by
 
5
   the Free Software Foundation; version 2 of the License.
 
6
 
 
7
   This program is distributed in the hope that it will be useful,
 
8
   but WITHOUT ANY WARRANTY; without even the implied warranty of
 
9
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
10
   GNU General Public License for more details.
 
11
 
 
12
   You should have received a copy of the GNU General Public License
 
13
   along with this program; if not, write to the Free Software
 
14
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
 
15
 
 
16
/*
 
17
  Cashing of files with only does (sequential) read or writes of fixed-
 
18
  length records. A read isn't allowed to go over file-length. A read is ok
 
19
  if it ends at file-length and next read can try to read after file-length
 
20
  (and get a EOF-error).
 
21
  Possibly use of asyncronic io.
 
22
  macros for read and writes for faster io.
 
23
  Used instead of FILE when reading or writing whole files.
 
24
  This code makes mf_rec_cache obsolete (currently only used by ISAM)
 
25
  One can change info->pos_in_file to a higher value to skip bytes in file if
 
26
  also info->read_pos is set to info->read_end.
 
27
  If called through open_cached_file(), then the temporary file will
 
28
  only be created if a write exeeds the file buffer or if one calls
 
29
  my_b_flush_io_cache().
 
30
 
 
31
  If one uses SEQ_READ_APPEND, then two buffers are allocated, one for
 
32
  reading and another for writing.  Reads are first done from disk and
 
33
  then done from the write buffer.  This is an efficient way to read
 
34
  from a log file when one is writing to it at the same time.
 
35
  For this to work, the file has to be opened in append mode!
 
36
  Note that when one uses SEQ_READ_APPEND, one MUST write using
 
37
  my_b_append !  This is needed because we need to lock the mutex
 
38
  every time we access the write buffer.
 
39
 
 
40
TODO:
 
41
  When one SEQ_READ_APPEND and we are reading and writing at the same time,
 
42
  each time the write buffer gets full and it's written to disk, we will
 
43
  always do a disk read to read a part of the buffer from disk to the
 
44
  read buffer.
 
45
  This should be fixed so that when we do a my_b_flush_io_cache() and
 
46
  we have been reading the write buffer, we should transfer the rest of the
 
47
  write buffer to the read buffer before we start to reuse it.
 
48
*/
 
49
 
 
50
#define MAP_TO_USE_RAID
 
51
#include "mysys_priv.h"
 
52
#include <m_string.h>
 
53
#ifdef HAVE_AIOWAIT
 
54
#include "mysys_err.h"
 
55
static void my_aiowait(my_aio_result *result);
 
56
#endif
 
57
#include <errno.h>
 
58
 
 
59
#ifdef THREAD
 
60
#define lock_append_buffer(info) \
 
61
 pthread_mutex_lock(&(info)->append_buffer_lock)
 
62
#define unlock_append_buffer(info) \
 
63
 pthread_mutex_unlock(&(info)->append_buffer_lock)
 
64
#else
 
65
#define lock_append_buffer(info)
 
66
#define unlock_append_buffer(info)
 
67
#endif
 
68
 
 
69
#define IO_ROUND_UP(X) (((X)+IO_SIZE-1) & ~(IO_SIZE-1))
 
70
#define IO_ROUND_DN(X) ( (X)            & ~(IO_SIZE-1))
 
71
 
 
72
/*
 
73
  Setup internal pointers inside IO_CACHE
 
74
 
 
75
  SYNOPSIS
 
76
    setup_io_cache()
 
77
    info                IO_CACHE handler
 
78
 
 
79
  NOTES
 
80
    This is called on automaticly on init or reinit of IO_CACHE
 
81
    It must be called externally if one moves or copies an IO_CACHE
 
82
    object.
 
83
*/
 
84
 
 
85
void setup_io_cache(IO_CACHE* info)
 
86
{
 
87
  /* Ensure that my_b_tell() and my_b_bytes_in_cache works */
 
88
  if (info->type == WRITE_CACHE)
 
89
  {
 
90
    info->current_pos= &info->write_pos;
 
91
    info->current_end= &info->write_end;
 
92
  }
 
93
  else
 
94
  {
 
95
    info->current_pos= &info->read_pos;
 
96
    info->current_end= &info->read_end;
 
97
  }
 
98
}
 
99
 
 
100
 
 
101
static void
 
102
init_functions(IO_CACHE* info)
 
103
{
 
104
  enum cache_type type= info->type;
 
105
  switch (type) {
 
106
  case READ_NET:
 
107
    /*
 
108
      Must be initialized by the caller. The problem is that
 
109
      _my_b_net_read has to be defined in sql directory because of
 
110
      the dependency on THD, and therefore cannot be visible to
 
111
      programs that link against mysys but know nothing about THD, such
 
112
      as myisamchk
 
113
    */
 
114
    break;
 
115
  case SEQ_READ_APPEND:
 
116
    info->read_function = _my_b_seq_read;
 
117
    info->write_function = 0;                   /* Force a core if used */
 
118
    break;
 
119
  default:
 
120
    info->read_function =
 
121
#ifdef THREAD
 
122
                          info->share ? _my_b_read_r :
 
123
#endif
 
124
                                        _my_b_read;
 
125
    info->write_function = _my_b_write;
 
126
  }
 
127
 
 
128
  setup_io_cache(info);
 
129
}
 
130
 
 
131
 
 
132
/*
 
133
  Initialize an IO_CACHE object
 
134
 
 
135
  SYNOPSOS
 
136
    init_io_cache()
 
137
    info                cache handler to initialize
 
138
    file                File that should be associated to to the handler
 
139
                        If == -1 then real_open_cached_file()
 
140
                        will be called when it's time to open file.
 
141
    cachesize           Size of buffer to allocate for read/write
 
142
                        If == 0 then use my_default_record_cache_size
 
143
    type                Type of cache
 
144
    seek_offset         Where cache should start reading/writing
 
145
    use_async_io        Set to 1 of we should use async_io (if avaiable)
 
146
    cache_myflags       Bitmap of differnt flags
 
147
                        MY_WME | MY_FAE | MY_NABP | MY_FNABP |
 
148
                        MY_DONT_CHECK_FILESIZE
 
149
 
 
150
  RETURN
 
151
    0  ok
 
152
    #  error
 
153
*/
 
154
 
 
155
int init_io_cache(IO_CACHE *info, File file, size_t cachesize,
 
156
                  enum cache_type type, my_off_t seek_offset,
 
157
                  pbool use_async_io, myf cache_myflags)
 
158
{
 
159
  size_t min_cache;
 
160
  my_off_t pos;
 
161
  my_off_t end_of_file= ~(my_off_t) 0;
 
162
  DBUG_ENTER("init_io_cache");
 
163
  DBUG_PRINT("enter",("cache: 0x%lx  type: %d  pos: %ld",
 
164
                      (ulong) info, (int) type, (ulong) seek_offset));
 
165
 
 
166
  info->file= file;
 
167
  info->type= TYPE_NOT_SET;         /* Don't set it until mutex are created */
 
168
  info->pos_in_file= seek_offset;
 
169
  info->pre_close = info->pre_read = info->post_read = 0;
 
170
  info->arg = 0;
 
171
  info->alloced_buffer = 0;
 
172
  info->buffer=0;
 
173
  info->seek_not_done= 0;
 
174
 
 
175
  if (file >= 0)
 
176
  {
 
177
    pos= my_tell(file, MYF(0));
 
178
    if ((pos == (my_off_t) -1) && (my_errno == ESPIPE))
 
179
    {
 
180
      /*
 
181
         This kind of object doesn't support seek() or tell(). Don't set a
 
182
         flag that will make us again try to seek() later and fail.
 
183
      */
 
184
      info->seek_not_done= 0;
 
185
      /*
 
186
        Additionally, if we're supposed to start somewhere other than the
 
187
        the beginning of whatever this file is, then somebody made a bad
 
188
        assumption.
 
189
      */
 
190
      DBUG_ASSERT(seek_offset == 0);
 
191
    }
 
192
    else
 
193
      info->seek_not_done= test(seek_offset != pos);
 
194
  }
 
195
 
 
196
  info->disk_writes= 0;
 
197
#ifdef THREAD
 
198
  info->share=0;
 
199
#endif
 
200
 
 
201
  if (!cachesize && !(cachesize= my_default_record_cache_size))
 
202
    DBUG_RETURN(1);                             /* No cache requested */
 
203
  min_cache=use_async_io ? IO_SIZE*4 : IO_SIZE*2;
 
204
  if (type == READ_CACHE || type == SEQ_READ_APPEND)
 
205
  {                                             /* Assume file isn't growing */
 
206
    if (!(cache_myflags & MY_DONT_CHECK_FILESIZE))
 
207
    {
 
208
      /* Calculate end of file to avoid allocating oversized buffers */
 
209
      end_of_file=my_seek(file,0L,MY_SEEK_END,MYF(0));
 
210
      /* Need to reset seek_not_done now that we just did a seek. */
 
211
      info->seek_not_done= end_of_file == seek_offset ? 0 : 1;
 
212
      if (end_of_file < seek_offset)
 
213
        end_of_file=seek_offset;
 
214
      /* Trim cache size if the file is very small */
 
215
      if ((my_off_t) cachesize > end_of_file-seek_offset+IO_SIZE*2-1)
 
216
      {
 
217
        cachesize= (size_t) (end_of_file-seek_offset)+IO_SIZE*2-1;
 
218
        use_async_io=0;                         /* No need to use async */
 
219
      }
 
220
    }
 
221
  }
 
222
  cache_myflags &= ~MY_DONT_CHECK_FILESIZE;
 
223
  if (type != READ_NET && type != WRITE_NET)
 
224
  {
 
225
    /* Retry allocating memory in smaller blocks until we get one */
 
226
    cachesize= ((cachesize + min_cache-1) & ~(min_cache-1));
 
227
    for (;;)
 
228
    {
 
229
      size_t buffer_block;
 
230
      /*
 
231
        Unset MY_WAIT_IF_FULL bit if it is set, to prevent conflict with
 
232
        MY_ZEROFILL.
 
233
      */
 
234
      myf flags= (myf) (cache_myflags & ~(MY_WME | MY_WAIT_IF_FULL));
 
235
 
 
236
      if (cachesize < min_cache)
 
237
        cachesize = min_cache;
 
238
      buffer_block= cachesize;
 
239
      if (type == SEQ_READ_APPEND)
 
240
        buffer_block *= 2;
 
241
      if (cachesize == min_cache)
 
242
        flags|= (myf) MY_WME;
 
243
 
 
244
      if ((info->buffer= (uchar*) my_malloc(buffer_block, flags)) != 0)
 
245
      {
 
246
        info->write_buffer=info->buffer;
 
247
        if (type == SEQ_READ_APPEND)
 
248
          info->write_buffer = info->buffer + cachesize;
 
249
        info->alloced_buffer=1;
 
250
        break;                                  /* Enough memory found */
 
251
      }
 
252
      if (cachesize == min_cache)
 
253
        DBUG_RETURN(2);                         /* Can't alloc cache */
 
254
      /* Try with less memory */
 
255
      cachesize= (cachesize*3/4 & ~(min_cache-1));
 
256
    }
 
257
  }
 
258
 
 
259
  DBUG_PRINT("info",("init_io_cache: cachesize = %lu", (ulong) cachesize));
 
260
  info->read_length=info->buffer_length=cachesize;
 
261
  info->myflags=cache_myflags & ~(MY_NABP | MY_FNABP);
 
262
  info->request_pos= info->read_pos= info->write_pos = info->buffer;
 
263
  if (type == SEQ_READ_APPEND)
 
264
  {
 
265
    info->append_read_pos = info->write_pos = info->write_buffer;
 
266
    info->write_end = info->write_buffer + info->buffer_length;
 
267
#ifdef THREAD
 
268
    pthread_mutex_init(&info->append_buffer_lock,MY_MUTEX_INIT_FAST);
 
269
#endif
 
270
  }
 
271
#if defined(SAFE_MUTEX) && defined(THREAD)
 
272
  else
 
273
  {
 
274
    /* Clear mutex so that safe_mutex will notice that it's not initialized */
 
275
    bzero((char*) &info->append_buffer_lock, sizeof(info));
 
276
  }
 
277
#endif
 
278
 
 
279
  if (type == WRITE_CACHE)
 
280
    info->write_end=
 
281
      info->buffer+info->buffer_length- (seek_offset & (IO_SIZE-1));
 
282
  else
 
283
    info->read_end=info->buffer;                /* Nothing in cache */
 
284
 
 
285
  /* End_of_file may be changed by user later */
 
286
  info->end_of_file= end_of_file;
 
287
  info->error=0;
 
288
  info->type= type;
 
289
  init_functions(info);
 
290
#ifdef HAVE_AIOWAIT
 
291
  if (use_async_io && ! my_disable_async_io)
 
292
  {
 
293
    DBUG_PRINT("info",("Using async io"));
 
294
    info->read_length/=2;
 
295
    info->read_function=_my_b_async_read;
 
296
  }
 
297
  info->inited=info->aio_result.pending=0;
 
298
#endif
 
299
  DBUG_RETURN(0);
 
300
}                                               /* init_io_cache */
 
301
 
 
302
        /* Wait until current request is ready */
 
303
 
 
304
#ifdef HAVE_AIOWAIT
 
305
static void my_aiowait(my_aio_result *result)
 
306
{
 
307
  if (result->pending)
 
308
  {
 
309
    struct aio_result_t *tmp;
 
310
    for (;;)
 
311
    {
 
312
      if ((int) (tmp=aiowait((struct timeval *) 0)) == -1)
 
313
      {
 
314
        if (errno == EINTR)
 
315
          continue;
 
316
        DBUG_PRINT("error",("No aio request, error: %d",errno));
 
317
        result->pending=0;                      /* Assume everythings is ok */
 
318
        break;
 
319
      }
 
320
      ((my_aio_result*) tmp)->pending=0;
 
321
      if ((my_aio_result*) tmp == result)
 
322
        break;
 
323
    }
 
324
  }
 
325
  return;
 
326
}
 
327
#endif
 
328
 
 
329
 
 
330
/*
 
331
  Use this to reset cache to re-start reading or to change the type
 
332
  between READ_CACHE <-> WRITE_CACHE
 
333
  If we are doing a reinit of a cache where we have the start of the file
 
334
  in the cache, we are reusing this memory without flushing it to disk.
 
335
*/
 
336
 
 
337
my_bool reinit_io_cache(IO_CACHE *info, enum cache_type type,
 
338
                        my_off_t seek_offset,
 
339
                        pbool use_async_io __attribute__((unused)),
 
340
                        pbool clear_cache)
 
341
{
 
342
  DBUG_ENTER("reinit_io_cache");
 
343
  DBUG_PRINT("enter",("cache: 0x%lx type: %d  seek_offset: %lu  clear_cache: %d",
 
344
                      (ulong) info, type, (ulong) seek_offset,
 
345
                      (int) clear_cache));
 
346
 
 
347
  /* One can't do reinit with the following types */
 
348
  DBUG_ASSERT(type != READ_NET && info->type != READ_NET &&
 
349
              type != WRITE_NET && info->type != WRITE_NET &&
 
350
              type != SEQ_READ_APPEND && info->type != SEQ_READ_APPEND);
 
351
 
 
352
  /* If the whole file is in memory, avoid flushing to disk */
 
353
  if (! clear_cache &&
 
354
      seek_offset >= info->pos_in_file &&
 
355
      seek_offset <= my_b_tell(info))
 
356
  {
 
357
    /* Reuse current buffer without flushing it to disk */
 
358
    uchar *pos;
 
359
    if (info->type == WRITE_CACHE && type == READ_CACHE)
 
360
    {
 
361
      info->read_end=info->write_pos;
 
362
      info->end_of_file=my_b_tell(info);
 
363
      /*
 
364
        Trigger a new seek only if we have a valid
 
365
        file handle.
 
366
      */
 
367
      info->seek_not_done= (info->file != -1);
 
368
    }
 
369
    else if (type == WRITE_CACHE)
 
370
    {
 
371
      if (info->type == READ_CACHE)
 
372
      {
 
373
        info->write_end=info->write_buffer+info->buffer_length;
 
374
        info->seek_not_done=1;
 
375
      }
 
376
      info->end_of_file = ~(my_off_t) 0;
 
377
    }
 
378
    pos=info->request_pos+(seek_offset-info->pos_in_file);
 
379
    if (type == WRITE_CACHE)
 
380
      info->write_pos=pos;
 
381
    else
 
382
      info->read_pos= pos;
 
383
#ifdef HAVE_AIOWAIT
 
384
    my_aiowait(&info->aio_result);              /* Wait for outstanding req */
 
385
#endif
 
386
  }
 
387
  else
 
388
  {
 
389
    /*
 
390
      If we change from WRITE_CACHE to READ_CACHE, assume that everything
 
391
      after the current positions should be ignored
 
392
    */
 
393
    if (info->type == WRITE_CACHE && type == READ_CACHE)
 
394
      info->end_of_file=my_b_tell(info);
 
395
    /* flush cache if we want to reuse it */
 
396
    if (!clear_cache && my_b_flush_io_cache(info,1))
 
397
      DBUG_RETURN(1);
 
398
    info->pos_in_file=seek_offset;
 
399
    /* Better to do always do a seek */
 
400
    info->seek_not_done=1;
 
401
    info->request_pos=info->read_pos=info->write_pos=info->buffer;
 
402
    if (type == READ_CACHE)
 
403
    {
 
404
      info->read_end=info->buffer;              /* Nothing in cache */
 
405
    }
 
406
    else
 
407
    {
 
408
      info->write_end=(info->buffer + info->buffer_length -
 
409
                       (seek_offset & (IO_SIZE-1)));
 
410
      info->end_of_file= ~(my_off_t) 0;
 
411
    }
 
412
  }
 
413
  info->type=type;
 
414
  info->error=0;
 
415
  init_functions(info);
 
416
 
 
417
#ifdef HAVE_AIOWAIT
 
418
  if (use_async_io && ! my_disable_async_io &&
 
419
      ((ulong) info->buffer_length <
 
420
       (ulong) (info->end_of_file - seek_offset)))
 
421
  {
 
422
    info->read_length=info->buffer_length/2;
 
423
    info->read_function=_my_b_async_read;
 
424
  }
 
425
  info->inited=0;
 
426
#endif
 
427
  DBUG_RETURN(0);
 
428
} /* reinit_io_cache */
 
429
 
 
430
 
 
431
 
 
432
/*
 
433
  Read buffered.
 
434
 
 
435
  SYNOPSIS
 
436
    _my_b_read()
 
437
      info                      IO_CACHE pointer
 
438
      Buffer                    Buffer to retrieve count bytes from file
 
439
      Count                     Number of bytes to read into Buffer
 
440
 
 
441
  NOTE
 
442
    This function is only called from the my_b_read() macro when there
 
443
    isn't enough characters in the buffer to satisfy the request.
 
444
 
 
445
  WARNING
 
446
 
 
447
    When changing this function, be careful with handling file offsets
 
448
    (end-of_file, pos_in_file). Do not cast them to possibly smaller
 
449
    types than my_off_t unless you can be sure that their value fits.
 
450
    Same applies to differences of file offsets.
 
451
 
 
452
    When changing this function, check _my_b_read_r(). It might need the
 
453
    same change.
 
454
 
 
455
  RETURN
 
456
    0      we succeeded in reading all data
 
457
    1      Error: can't read requested characters
 
458
*/
 
459
 
 
460
int _my_b_read(register IO_CACHE *info, uchar *Buffer, size_t Count)
 
461
{
 
462
  size_t length,diff_length,left_length, max_length;
 
463
  my_off_t pos_in_file;
 
464
  DBUG_ENTER("_my_b_read");
 
465
 
 
466
  if ((left_length= (size_t) (info->read_end-info->read_pos)))
 
467
  {
 
468
    DBUG_ASSERT(Count >= left_length);  /* User is not using my_b_read() */
 
469
    memcpy(Buffer,info->read_pos, left_length);
 
470
    Buffer+=left_length;
 
471
    Count-=left_length;
 
472
  }
 
473
 
 
474
  /* pos_in_file always point on where info->buffer was read */
 
475
  pos_in_file=info->pos_in_file+ (size_t) (info->read_end - info->buffer);
 
476
 
 
477
  /* 
 
478
    Whenever a function which operates on IO_CACHE flushes/writes
 
479
    some part of the IO_CACHE to disk it will set the property
 
480
    "seek_not_done" to indicate this to other functions operating
 
481
    on the IO_CACHE.
 
482
  */
 
483
  if (info->seek_not_done)
 
484
  {
 
485
    if ((my_seek(info->file,pos_in_file,MY_SEEK_SET,MYF(0)) 
 
486
        != MY_FILEPOS_ERROR))
 
487
    {
 
488
      /* No error, reset seek_not_done flag. */
 
489
      info->seek_not_done= 0;
 
490
    }
 
491
    else
 
492
    {
 
493
      /*
 
494
        If the seek failed and the error number is ESPIPE, it is because
 
495
        info->file is a pipe or socket or FIFO.  We never should have tried
 
496
        to seek on that.  See Bugs#25807 and #22828 for more info.
 
497
      */
 
498
      DBUG_ASSERT(my_errno != ESPIPE);
 
499
      info->error= -1;
 
500
      DBUG_RETURN(1);
 
501
    }
 
502
  }
 
503
 
 
504
  diff_length= (size_t) (pos_in_file & (IO_SIZE-1));
 
505
  if (Count >= (size_t) (IO_SIZE+(IO_SIZE-diff_length)))
 
506
  {                                     /* Fill first intern buffer */
 
507
    size_t read_length;
 
508
    if (info->end_of_file <= pos_in_file)
 
509
    {                                   /* End of file */
 
510
      info->error= (int) left_length;
 
511
      DBUG_RETURN(1);
 
512
    }
 
513
    length=(Count & (size_t) ~(IO_SIZE-1))-diff_length;
 
514
    if ((read_length= my_read(info->file,Buffer, length, info->myflags))
 
515
        != length)
 
516
    {
 
517
      info->error= (read_length == (size_t) -1 ? -1 :
 
518
                    (int) (read_length+left_length));
 
519
      DBUG_RETURN(1);
 
520
    }
 
521
    Count-=length;
 
522
    Buffer+=length;
 
523
    pos_in_file+=length;
 
524
    left_length+=length;
 
525
    diff_length=0;
 
526
  }
 
527
 
 
528
  max_length= info->read_length-diff_length;
 
529
  if (info->type != READ_FIFO &&
 
530
      max_length > (info->end_of_file - pos_in_file))
 
531
    max_length= (size_t) (info->end_of_file - pos_in_file);
 
532
  if (!max_length)
 
533
  {
 
534
    if (Count)
 
535
    {
 
536
      info->error= left_length;         /* We only got this many char */
 
537
      DBUG_RETURN(1);
 
538
    }
 
539
    length=0;                           /* Didn't read any chars */
 
540
  }
 
541
  else if ((length= my_read(info->file,info->buffer, max_length,
 
542
                            info->myflags)) < Count ||
 
543
           length == (size_t) -1)
 
544
  {
 
545
    if (length != (size_t) -1)
 
546
      memcpy(Buffer, info->buffer, length);
 
547
    info->pos_in_file= pos_in_file;
 
548
    info->error= length == (size_t) -1 ? -1 : (int) (length+left_length);
 
549
    info->read_pos=info->read_end=info->buffer;
 
550
    DBUG_RETURN(1);
 
551
  }
 
552
  info->read_pos=info->buffer+Count;
 
553
  info->read_end=info->buffer+length;
 
554
  info->pos_in_file=pos_in_file;
 
555
  memcpy(Buffer, info->buffer, Count);
 
556
  DBUG_RETURN(0);
 
557
}
 
558
 
 
559
 
 
560
#ifdef THREAD
 
561
/*
 
562
  Prepare IO_CACHE for shared use.
 
563
 
 
564
  SYNOPSIS
 
565
    init_io_cache_share()
 
566
      read_cache                A read cache. This will be copied for
 
567
                                every thread after setup.
 
568
      cshare                    The share.
 
569
      write_cache               If non-NULL a write cache that is to be
 
570
                                synchronized with the read caches.
 
571
      num_threads               Number of threads sharing the cache
 
572
                                including the write thread if any.
 
573
 
 
574
  DESCRIPTION
 
575
 
 
576
    The shared cache is used so: One IO_CACHE is initialized with
 
577
    init_io_cache(). This includes the allocation of a buffer. Then a
 
578
    share is allocated and init_io_cache_share() is called with the io
 
579
    cache and the share. Then the io cache is copied for each thread. So
 
580
    every thread has its own copy of IO_CACHE. But the allocated buffer
 
581
    is shared because cache->buffer is the same for all caches.
 
582
 
 
583
    One thread reads data from the file into the buffer. All threads
 
584
    read from the buffer, but every thread maintains its own set of
 
585
    pointers into the buffer. When all threads have used up the buffer
 
586
    contents, one of the threads reads the next block of data into the
 
587
    buffer. To accomplish this, each thread enters the cache lock before
 
588
    accessing the buffer. They wait in lock_io_cache() until all threads
 
589
    joined the lock. The last thread entering the lock is in charge of
 
590
    reading from file to buffer. It wakes all threads when done.
 
591
 
 
592
    Synchronizing a write cache to the read caches works so: Whenever
 
593
    the write buffer needs a flush, the write thread enters the lock and
 
594
    waits for all other threads to enter the lock too. They do this when
 
595
    they have used up the read buffer. When all threads are in the lock,
 
596
    the write thread copies the write buffer to the read buffer and
 
597
    wakes all threads.
 
598
 
 
599
    share->running_threads is the number of threads not being in the
 
600
    cache lock. When entering lock_io_cache() the number is decreased.
 
601
    When the thread that fills the buffer enters unlock_io_cache() the
 
602
    number is reset to the number of threads. The condition
 
603
    running_threads == 0 means that all threads are in the lock. Bumping
 
604
    up the number to the full count is non-intuitive. But increasing the
 
605
    number by one for each thread that leaves the lock could lead to a
 
606
    solo run of one thread. The last thread to join a lock reads from
 
607
    file to buffer, wakes the other threads, processes the data in the
 
608
    cache and enters the lock again. If no other thread left the lock
 
609
    meanwhile, it would think it's the last one again and read the next
 
610
    block...
 
611
 
 
612
    The share has copies of 'error', 'buffer', 'read_end', and
 
613
    'pos_in_file' from the thread that filled the buffer. We may not be
 
614
    able to access this information directly from its cache because the
 
615
    thread may be removed from the share before the variables could be
 
616
    copied by all other threads. Or, if a write buffer is synchronized,
 
617
    it would change its 'pos_in_file' after waking the other threads,
 
618
    possibly before they could copy its value.
 
619
 
 
620
    However, the 'buffer' variable in the share is for a synchronized
 
621
    write cache. It needs to know where to put the data. Otherwise it
 
622
    would need access to the read cache of one of the threads that is
 
623
    not yet removed from the share.
 
624
 
 
625
  RETURN
 
626
    void
 
627
*/
 
628
 
 
629
void init_io_cache_share(IO_CACHE *read_cache, IO_CACHE_SHARE *cshare,
 
630
                         IO_CACHE *write_cache, uint num_threads)
 
631
{
 
632
  DBUG_ENTER("init_io_cache_share");
 
633
  DBUG_PRINT("io_cache_share", ("read_cache: 0x%lx  share: 0x%lx  "
 
634
                                "write_cache: 0x%lx  threads: %u",
 
635
                                (long) read_cache, (long) cshare,
 
636
                                (long) write_cache, num_threads));
 
637
 
 
638
  DBUG_ASSERT(num_threads > 1);
 
639
  DBUG_ASSERT(read_cache->type == READ_CACHE);
 
640
  DBUG_ASSERT(!write_cache || (write_cache->type == WRITE_CACHE));
 
641
 
 
642
  pthread_mutex_init(&cshare->mutex, MY_MUTEX_INIT_FAST);
 
643
  pthread_cond_init(&cshare->cond, 0);
 
644
  pthread_cond_init(&cshare->cond_writer, 0);
 
645
 
 
646
  cshare->running_threads= num_threads;
 
647
  cshare->total_threads=   num_threads;
 
648
  cshare->error=           0;    /* Initialize. */
 
649
  cshare->buffer=          read_cache->buffer;
 
650
  cshare->read_end=        NULL; /* See function comment of lock_io_cache(). */
 
651
  cshare->pos_in_file=     0;    /* See function comment of lock_io_cache(). */
 
652
  cshare->source_cache=    write_cache; /* Can be NULL. */
 
653
 
 
654
  read_cache->share=         cshare;
 
655
  read_cache->read_function= _my_b_read_r;
 
656
  read_cache->current_pos=   NULL;
 
657
  read_cache->current_end=   NULL;
 
658
 
 
659
  if (write_cache)
 
660
    write_cache->share= cshare;
 
661
 
 
662
  DBUG_VOID_RETURN;
 
663
}
 
664
 
 
665
 
 
666
/*
 
667
  Remove a thread from shared access to IO_CACHE.
 
668
 
 
669
  SYNOPSIS
 
670
    remove_io_thread()
 
671
      cache                     The IO_CACHE to be removed from the share.
 
672
 
 
673
  NOTE
 
674
 
 
675
    Every thread must do that on exit for not to deadlock other threads.
 
676
 
 
677
    The last thread destroys the pthread resources.
 
678
 
 
679
    A writer flushes its cache first.
 
680
 
 
681
  RETURN
 
682
    void
 
683
*/
 
684
 
 
685
void remove_io_thread(IO_CACHE *cache)
 
686
{
 
687
  IO_CACHE_SHARE *cshare= cache->share;
 
688
  uint total;
 
689
  DBUG_ENTER("remove_io_thread");
 
690
 
 
691
  /* If the writer goes, it needs to flush the write cache. */
 
692
  if (cache == cshare->source_cache)
 
693
    flush_io_cache(cache);
 
694
 
 
695
  pthread_mutex_lock(&cshare->mutex);
 
696
  DBUG_PRINT("io_cache_share", ("%s: 0x%lx",
 
697
                                (cache == cshare->source_cache) ?
 
698
                                "writer" : "reader", (long) cache));
 
699
 
 
700
  /* Remove from share. */
 
701
  total= --cshare->total_threads;
 
702
  DBUG_PRINT("io_cache_share", ("remaining threads: %u", total));
 
703
 
 
704
  /* Detach from share. */
 
705
  cache->share= NULL;
 
706
 
 
707
  /* If the writer goes, let the readers know. */
 
708
  if (cache == cshare->source_cache)
 
709
  {
 
710
    DBUG_PRINT("io_cache_share", ("writer leaves"));
 
711
    cshare->source_cache= NULL;
 
712
  }
 
713
 
 
714
  /* If all threads are waiting for me to join the lock, wake them. */
 
715
  if (!--cshare->running_threads)
 
716
  {
 
717
    DBUG_PRINT("io_cache_share", ("the last running thread leaves, wake all"));
 
718
    pthread_cond_signal(&cshare->cond_writer);
 
719
    pthread_cond_broadcast(&cshare->cond);
 
720
  }
 
721
 
 
722
  pthread_mutex_unlock(&cshare->mutex);
 
723
 
 
724
  if (!total)
 
725
  {
 
726
    DBUG_PRINT("io_cache_share", ("last thread removed, destroy share"));
 
727
    pthread_cond_destroy (&cshare->cond_writer);
 
728
    pthread_cond_destroy (&cshare->cond);
 
729
    pthread_mutex_destroy(&cshare->mutex);
 
730
  }
 
731
 
 
732
  DBUG_VOID_RETURN;
 
733
}
 
734
 
 
735
 
 
736
/*
 
737
  Lock IO cache and wait for all other threads to join.
 
738
 
 
739
  SYNOPSIS
 
740
    lock_io_cache()
 
741
      cache                     The cache of the thread entering the lock.
 
742
      pos                       File position of the block to read.
 
743
                                Unused for the write thread.
 
744
 
 
745
  DESCRIPTION
 
746
 
 
747
    Wait for all threads to finish with the current buffer. We want
 
748
    all threads to proceed in concert. The last thread to join
 
749
    lock_io_cache() will read the block from file and all threads start
 
750
    to use it. Then they will join again for reading the next block.
 
751
 
 
752
    The waiting threads detect a fresh buffer by comparing
 
753
    cshare->pos_in_file with the position they want to process next.
 
754
    Since the first block may start at position 0, we take
 
755
    cshare->read_end as an additional condition. This variable is
 
756
    initialized to NULL and will be set after a block of data is written
 
757
    to the buffer.
 
758
 
 
759
  RETURN
 
760
    1           OK, lock in place, go ahead and read.
 
761
    0           OK, unlocked, another thread did the read.
 
762
*/
 
763
 
 
764
static int lock_io_cache(IO_CACHE *cache, my_off_t pos)
 
765
{
 
766
  IO_CACHE_SHARE *cshare= cache->share;
 
767
  DBUG_ENTER("lock_io_cache");
 
768
 
 
769
  /* Enter the lock. */
 
770
  pthread_mutex_lock(&cshare->mutex);
 
771
  cshare->running_threads--;
 
772
  DBUG_PRINT("io_cache_share", ("%s: 0x%lx  pos: %lu  running: %u",
 
773
                                (cache == cshare->source_cache) ?
 
774
                                "writer" : "reader", (long) cache, (ulong) pos,
 
775
                                cshare->running_threads));
 
776
 
 
777
  if (cshare->source_cache)
 
778
  {
 
779
    /* A write cache is synchronized to the read caches. */
 
780
 
 
781
    if (cache == cshare->source_cache)
 
782
    {
 
783
      /* The writer waits until all readers are here. */
 
784
      while (cshare->running_threads)
 
785
      {
 
786
        DBUG_PRINT("io_cache_share", ("writer waits in lock"));
 
787
        pthread_cond_wait(&cshare->cond_writer, &cshare->mutex);
 
788
      }
 
789
      DBUG_PRINT("io_cache_share", ("writer awoke, going to copy"));
 
790
 
 
791
      /* Stay locked. Leave the lock later by unlock_io_cache(). */
 
792
      DBUG_RETURN(1);
 
793
    }
 
794
 
 
795
    /* The last thread wakes the writer. */
 
796
    if (!cshare->running_threads)
 
797
    {
 
798
      DBUG_PRINT("io_cache_share", ("waking writer"));
 
799
      pthread_cond_signal(&cshare->cond_writer);
 
800
    }
 
801
 
 
802
    /*
 
803
      Readers wait until the data is copied from the writer. Another
 
804
      reason to stop waiting is the removal of the write thread. If this
 
805
      happens, we leave the lock with old data in the buffer.
 
806
    */
 
807
    while ((!cshare->read_end || (cshare->pos_in_file < pos)) &&
 
808
           cshare->source_cache)
 
809
    {
 
810
      DBUG_PRINT("io_cache_share", ("reader waits in lock"));
 
811
      pthread_cond_wait(&cshare->cond, &cshare->mutex);
 
812
    }
 
813
 
 
814
    /*
 
815
      If the writer was removed from the share while this thread was
 
816
      asleep, we need to simulate an EOF condition. The writer cannot
 
817
      reset the share variables as they might still be in use by readers
 
818
      of the last block. When we awake here then because the last
 
819
      joining thread signalled us. If the writer is not the last, it
 
820
      will not signal. So it is safe to clear the buffer here.
 
821
    */
 
822
    if (!cshare->read_end || (cshare->pos_in_file < pos))
 
823
    {
 
824
      DBUG_PRINT("io_cache_share", ("reader found writer removed. EOF"));
 
825
      cshare->read_end= cshare->buffer; /* Empty buffer. */
 
826
      cshare->error= 0; /* EOF is not an error. */
 
827
    }
 
828
  }
 
829
  else
 
830
  {
 
831
    /*
 
832
      There are read caches only. The last thread arriving in
 
833
      lock_io_cache() continues with a locked cache and reads the block.
 
834
    */
 
835
    if (!cshare->running_threads)
 
836
    {
 
837
      DBUG_PRINT("io_cache_share", ("last thread joined, going to read"));
 
838
      /* Stay locked. Leave the lock later by unlock_io_cache(). */
 
839
      DBUG_RETURN(1);
 
840
    }
 
841
 
 
842
    /*
 
843
      All other threads wait until the requested block is read by the
 
844
      last thread arriving. Another reason to stop waiting is the
 
845
      removal of a thread. If this leads to all threads being in the
 
846
      lock, we have to continue also. The first of the awaken threads
 
847
      will then do the read.
 
848
    */
 
849
    while ((!cshare->read_end || (cshare->pos_in_file < pos)) &&
 
850
           cshare->running_threads)
 
851
    {
 
852
      DBUG_PRINT("io_cache_share", ("reader waits in lock"));
 
853
      pthread_cond_wait(&cshare->cond, &cshare->mutex);
 
854
    }
 
855
 
 
856
    /* If the block is not yet read, continue with a locked cache and read. */
 
857
    if (!cshare->read_end || (cshare->pos_in_file < pos))
 
858
    {
 
859
      DBUG_PRINT("io_cache_share", ("reader awoke, going to read"));
 
860
      /* Stay locked. Leave the lock later by unlock_io_cache(). */
 
861
      DBUG_RETURN(1);
 
862
    }
 
863
 
 
864
    /* Another thread did read the block already. */
 
865
  }
 
866
  DBUG_PRINT("io_cache_share", ("reader awoke, going to process %u bytes",
 
867
                                (uint) (cshare->read_end ? (size_t)
 
868
                                        (cshare->read_end - cshare->buffer) :
 
869
                                        0)));
 
870
 
 
871
  /*
 
872
    Leave the lock. Do not call unlock_io_cache() later. The thread that
 
873
    filled the buffer did this and marked all threads as running.
 
874
  */
 
875
  pthread_mutex_unlock(&cshare->mutex);
 
876
  DBUG_RETURN(0);
 
877
}
 
878
 
 
879
 
 
880
/*
 
881
  Unlock IO cache.
 
882
 
 
883
  SYNOPSIS
 
884
    unlock_io_cache()
 
885
      cache                     The cache of the thread leaving the lock.
 
886
 
 
887
  NOTE
 
888
    This is called by the thread that filled the buffer. It marks all
 
889
    threads as running and awakes them. This must not be done by any
 
890
    other thread.
 
891
 
 
892
    Do not signal cond_writer. Either there is no writer or the writer
 
893
    is the only one who can call this function.
 
894
 
 
895
    The reason for resetting running_threads to total_threads before
 
896
    waking all other threads is that it could be possible that this
 
897
    thread is so fast with processing the buffer that it enters the lock
 
898
    before even one other thread has left it. If every awoken thread
 
899
    would increase running_threads by one, this thread could think that
 
900
    he is again the last to join and would not wait for the other
 
901
    threads to process the data.
 
902
 
 
903
  RETURN
 
904
    void
 
905
*/
 
906
 
 
907
static void unlock_io_cache(IO_CACHE *cache)
 
908
{
 
909
  IO_CACHE_SHARE *cshare= cache->share;
 
910
  DBUG_ENTER("unlock_io_cache");
 
911
  DBUG_PRINT("io_cache_share", ("%s: 0x%lx  pos: %lu  running: %u",
 
912
                                (cache == cshare->source_cache) ?
 
913
                                "writer" : "reader",
 
914
                                (long) cache, (ulong) cshare->pos_in_file,
 
915
                                cshare->total_threads));
 
916
 
 
917
  cshare->running_threads= cshare->total_threads;
 
918
  pthread_cond_broadcast(&cshare->cond);
 
919
  pthread_mutex_unlock(&cshare->mutex);
 
920
  DBUG_VOID_RETURN;
 
921
}
 
922
 
 
923
 
 
924
/*
 
925
  Read from IO_CACHE when it is shared between several threads.
 
926
 
 
927
  SYNOPSIS
 
928
    _my_b_read_r()
 
929
      cache                     IO_CACHE pointer
 
930
      Buffer                    Buffer to retrieve count bytes from file
 
931
      Count                     Number of bytes to read into Buffer
 
932
 
 
933
  NOTE
 
934
    This function is only called from the my_b_read() macro when there
 
935
    isn't enough characters in the buffer to satisfy the request.
 
936
 
 
937
  IMPLEMENTATION
 
938
 
 
939
    It works as follows: when a thread tries to read from a file (that
 
940
    is, after using all the data from the (shared) buffer), it just
 
941
    hangs on lock_io_cache(), waiting for other threads. When the very
 
942
    last thread attempts a read, lock_io_cache() returns 1, the thread
 
943
    does actual IO and unlock_io_cache(), which signals all the waiting
 
944
    threads that data is in the buffer.
 
945
 
 
946
  WARNING
 
947
 
 
948
    When changing this function, be careful with handling file offsets
 
949
    (end-of_file, pos_in_file). Do not cast them to possibly smaller
 
950
    types than my_off_t unless you can be sure that their value fits.
 
951
    Same applies to differences of file offsets. (Bug #11527)
 
952
 
 
953
    When changing this function, check _my_b_read(). It might need the
 
954
    same change.
 
955
 
 
956
  RETURN
 
957
    0      we succeeded in reading all data
 
958
    1      Error: can't read requested characters
 
959
*/
 
960
 
 
961
int _my_b_read_r(register IO_CACHE *cache, uchar *Buffer, size_t Count)
 
962
{
 
963
  my_off_t pos_in_file;
 
964
  size_t length, diff_length, left_length;
 
965
  IO_CACHE_SHARE *cshare= cache->share;
 
966
  DBUG_ENTER("_my_b_read_r");
 
967
 
 
968
  if ((left_length= (size_t) (cache->read_end - cache->read_pos)))
 
969
  {
 
970
    DBUG_ASSERT(Count >= left_length);  /* User is not using my_b_read() */
 
971
    memcpy(Buffer, cache->read_pos, left_length);
 
972
    Buffer+= left_length;
 
973
    Count-= left_length;
 
974
  }
 
975
  while (Count)
 
976
  {
 
977
    size_t cnt, len;
 
978
 
 
979
    pos_in_file= cache->pos_in_file + (cache->read_end - cache->buffer);
 
980
    diff_length= (size_t) (pos_in_file & (IO_SIZE-1));
 
981
    length=IO_ROUND_UP(Count+diff_length)-diff_length;
 
982
    length= ((length <= cache->read_length) ?
 
983
             length + IO_ROUND_DN(cache->read_length - length) :
 
984
             length - IO_ROUND_UP(length - cache->read_length));
 
985
    if (cache->type != READ_FIFO &&
 
986
        (length > (cache->end_of_file - pos_in_file)))
 
987
      length= (size_t) (cache->end_of_file - pos_in_file);
 
988
    if (length == 0)
 
989
    {
 
990
      cache->error= (int) left_length;
 
991
      DBUG_RETURN(1);
 
992
    }
 
993
    if (lock_io_cache(cache, pos_in_file))
 
994
    {
 
995
      /* With a synchronized write/read cache we won't come here... */
 
996
      DBUG_ASSERT(!cshare->source_cache);
 
997
      /*
 
998
        ... unless the writer has gone before this thread entered the
 
999
        lock. Simulate EOF in this case. It can be distinguished by
 
1000
        cache->file.
 
1001
      */
 
1002
      if (cache->file < 0)
 
1003
        len= 0;
 
1004
      else
 
1005
      {
 
1006
        /*
 
1007
          Whenever a function which operates on IO_CACHE flushes/writes
 
1008
          some part of the IO_CACHE to disk it will set the property
 
1009
          "seek_not_done" to indicate this to other functions operating
 
1010
          on the IO_CACHE.
 
1011
        */
 
1012
        if (cache->seek_not_done)
 
1013
        {
 
1014
          if (my_seek(cache->file, pos_in_file, MY_SEEK_SET, MYF(0))
 
1015
              == MY_FILEPOS_ERROR)
 
1016
          {
 
1017
            cache->error= -1;
 
1018
            unlock_io_cache(cache);
 
1019
            DBUG_RETURN(1);
 
1020
          }
 
1021
        }
 
1022
        len= my_read(cache->file, cache->buffer, length, cache->myflags);
 
1023
      }
 
1024
      DBUG_PRINT("io_cache_share", ("read %lu bytes", (ulong) len));
 
1025
 
 
1026
      cache->read_end=    cache->buffer + (len == (size_t) -1 ? 0 : len);
 
1027
      cache->error=       (len == length ? 0 : (int) len);
 
1028
      cache->pos_in_file= pos_in_file;
 
1029
 
 
1030
      /* Copy important values to the share. */
 
1031
      cshare->error=       cache->error;
 
1032
      cshare->read_end=    cache->read_end;
 
1033
      cshare->pos_in_file= pos_in_file;
 
1034
 
 
1035
      /* Mark all threads as running and wake them. */
 
1036
      unlock_io_cache(cache);
 
1037
    }
 
1038
    else
 
1039
    {
 
1040
      /*
 
1041
        With a synchronized write/read cache readers always come here.
 
1042
        Copy important values from the share.
 
1043
      */
 
1044
      cache->error=       cshare->error;
 
1045
      cache->read_end=    cshare->read_end;
 
1046
      cache->pos_in_file= cshare->pos_in_file;
 
1047
 
 
1048
      len= ((cache->error == -1) ? (size_t) -1 :
 
1049
            (size_t) (cache->read_end - cache->buffer));
 
1050
    }
 
1051
    cache->read_pos=      cache->buffer;
 
1052
    cache->seek_not_done= 0;
 
1053
    if (len == 0 || len == (size_t) -1)
 
1054
    {
 
1055
      DBUG_PRINT("io_cache_share", ("reader error. len %lu  left %lu",
 
1056
                                    (ulong) len, (ulong) left_length));
 
1057
      cache->error= (int) left_length;
 
1058
      DBUG_RETURN(1);
 
1059
    }
 
1060
    cnt= (len > Count) ? Count : len;
 
1061
    memcpy(Buffer, cache->read_pos, cnt);
 
1062
    Count -= cnt;
 
1063
    Buffer+= cnt;
 
1064
    left_length+= cnt;
 
1065
    cache->read_pos+= cnt;
 
1066
  }
 
1067
  DBUG_RETURN(0);
 
1068
}
 
1069
 
 
1070
 
 
1071
/*
 
1072
  Copy data from write cache to read cache.
 
1073
 
 
1074
  SYNOPSIS
 
1075
    copy_to_read_buffer()
 
1076
      write_cache               The write cache.
 
1077
      write_buffer              The source of data, mostly the cache buffer.
 
1078
      write_length              The number of bytes to copy.
 
1079
 
 
1080
  NOTE
 
1081
    The write thread will wait for all read threads to join the cache
 
1082
    lock. Then it copies the data over and wakes the read threads.
 
1083
 
 
1084
  RETURN
 
1085
    void
 
1086
*/
 
1087
 
 
1088
static void copy_to_read_buffer(IO_CACHE *write_cache,
 
1089
                                const uchar *write_buffer, size_t write_length)
 
1090
{
 
1091
  IO_CACHE_SHARE *cshare= write_cache->share;
 
1092
 
 
1093
  DBUG_ASSERT(cshare->source_cache == write_cache);
 
1094
  /*
 
1095
    write_length is usually less or equal to buffer_length.
 
1096
    It can be bigger if _my_b_write() is called with a big length.
 
1097
  */
 
1098
  while (write_length)
 
1099
  {
 
1100
    size_t copy_length= min(write_length, write_cache->buffer_length);
 
1101
    int  __attribute__((unused)) rc;
 
1102
 
 
1103
    rc= lock_io_cache(write_cache, write_cache->pos_in_file);
 
1104
    /* The writing thread does always have the lock when it awakes. */
 
1105
    DBUG_ASSERT(rc);
 
1106
 
 
1107
    memcpy(cshare->buffer, write_buffer, copy_length);
 
1108
 
 
1109
    cshare->error=       0;
 
1110
    cshare->read_end=    cshare->buffer + copy_length;
 
1111
    cshare->pos_in_file= write_cache->pos_in_file;
 
1112
 
 
1113
    /* Mark all threads as running and wake them. */
 
1114
    unlock_io_cache(write_cache);
 
1115
 
 
1116
    write_buffer+= copy_length;
 
1117
    write_length-= copy_length;
 
1118
  }
 
1119
}
 
1120
#endif /*THREAD*/
 
1121
 
 
1122
 
 
1123
/*
 
1124
  Do sequential read from the SEQ_READ_APPEND cache.
 
1125
  
 
1126
  We do this in three stages:
 
1127
   - first read from info->buffer
 
1128
   - then if there are still data to read, try the file descriptor
 
1129
   - afterwards, if there are still data to read, try append buffer
 
1130
 
 
1131
  RETURNS
 
1132
    0  Success
 
1133
    1  Failed to read
 
1134
*/
 
1135
 
 
1136
int _my_b_seq_read(register IO_CACHE *info, uchar *Buffer, size_t Count)
 
1137
{
 
1138
  size_t length, diff_length, left_length, save_count, max_length;
 
1139
  my_off_t pos_in_file;
 
1140
  save_count=Count;
 
1141
 
 
1142
  /* first, read the regular buffer */
 
1143
  if ((left_length=(size_t) (info->read_end-info->read_pos)))
 
1144
  {
 
1145
    DBUG_ASSERT(Count > left_length);   /* User is not using my_b_read() */
 
1146
    memcpy(Buffer,info->read_pos, left_length);
 
1147
    Buffer+=left_length;
 
1148
    Count-=left_length;
 
1149
  }
 
1150
  lock_append_buffer(info);
 
1151
 
 
1152
  /* pos_in_file always point on where info->buffer was read */
 
1153
  if ((pos_in_file=info->pos_in_file +
 
1154
       (size_t) (info->read_end - info->buffer)) >= info->end_of_file)
 
1155
    goto read_append_buffer;
 
1156
 
 
1157
  /*
 
1158
    With read-append cache we must always do a seek before we read,
 
1159
    because the write could have moved the file pointer astray
 
1160
  */
 
1161
  if (my_seek(info->file,pos_in_file,MY_SEEK_SET,MYF(0)) == MY_FILEPOS_ERROR)
 
1162
  {
 
1163
   info->error= -1;
 
1164
   unlock_append_buffer(info);
 
1165
   return (1);
 
1166
  }
 
1167
  info->seek_not_done=0;
 
1168
 
 
1169
  diff_length= (size_t) (pos_in_file & (IO_SIZE-1));
 
1170
 
 
1171
  /* now the second stage begins - read from file descriptor */
 
1172
  if (Count >= (size_t) (IO_SIZE+(IO_SIZE-diff_length)))
 
1173
  {
 
1174
    /* Fill first intern buffer */
 
1175
    size_t read_length;
 
1176
 
 
1177
    length=(Count & (size_t) ~(IO_SIZE-1))-diff_length;
 
1178
    if ((read_length= my_read(info->file,Buffer, length,
 
1179
                              info->myflags)) == (size_t) -1)
 
1180
    {
 
1181
      info->error= -1;
 
1182
      unlock_append_buffer(info);
 
1183
      return 1;
 
1184
    }
 
1185
    Count-=read_length;
 
1186
    Buffer+=read_length;
 
1187
    pos_in_file+=read_length;
 
1188
 
 
1189
    if (read_length != length)
 
1190
    {
 
1191
      /*
 
1192
        We only got part of data;  Read the rest of the data from the
 
1193
        write buffer
 
1194
      */
 
1195
      goto read_append_buffer;
 
1196
    }
 
1197
    left_length+=length;
 
1198
    diff_length=0;
 
1199
  }
 
1200
 
 
1201
  max_length= info->read_length-diff_length;
 
1202
  if (max_length > (info->end_of_file - pos_in_file))
 
1203
    max_length= (size_t) (info->end_of_file - pos_in_file);
 
1204
  if (!max_length)
 
1205
  {
 
1206
    if (Count)
 
1207
      goto read_append_buffer;
 
1208
    length=0;                           /* Didn't read any more chars */
 
1209
  }
 
1210
  else
 
1211
  {
 
1212
    length= my_read(info->file,info->buffer, max_length, info->myflags);
 
1213
    if (length == (size_t) -1)
 
1214
    {
 
1215
      info->error= -1;
 
1216
      unlock_append_buffer(info);
 
1217
      return 1;
 
1218
    }
 
1219
    if (length < Count)
 
1220
    {
 
1221
      memcpy(Buffer, info->buffer, length);
 
1222
      Count -= length;
 
1223
      Buffer += length;
 
1224
 
 
1225
      /*
 
1226
         added the line below to make
 
1227
         DBUG_ASSERT(pos_in_file==info->end_of_file) pass.
 
1228
         otherwise this does not appear to be needed
 
1229
      */
 
1230
      pos_in_file += length;
 
1231
      goto read_append_buffer;
 
1232
    }
 
1233
  }
 
1234
  unlock_append_buffer(info);
 
1235
  info->read_pos=info->buffer+Count;
 
1236
  info->read_end=info->buffer+length;
 
1237
  info->pos_in_file=pos_in_file;
 
1238
  memcpy(Buffer,info->buffer,(size_t) Count);
 
1239
  return 0;
 
1240
 
 
1241
read_append_buffer:
 
1242
 
 
1243
  /*
 
1244
     Read data from the current write buffer.
 
1245
     Count should never be == 0 here (The code will work even if count is 0)
 
1246
  */
 
1247
 
 
1248
  {
 
1249
    /* First copy the data to Count */
 
1250
    size_t len_in_buff = (size_t) (info->write_pos - info->append_read_pos);
 
1251
    size_t copy_len;
 
1252
    size_t transfer_len;
 
1253
 
 
1254
    DBUG_ASSERT(info->append_read_pos <= info->write_pos);
 
1255
    /*
 
1256
      TODO: figure out if the assert below is needed or correct.
 
1257
    */
 
1258
    DBUG_ASSERT(pos_in_file == info->end_of_file);
 
1259
    copy_len=min(Count, len_in_buff);
 
1260
    memcpy(Buffer, info->append_read_pos, copy_len);
 
1261
    info->append_read_pos += copy_len;
 
1262
    Count -= copy_len;
 
1263
    if (Count)
 
1264
      info->error = save_count - Count;
 
1265
 
 
1266
    /* Fill read buffer with data from write buffer */
 
1267
    memcpy(info->buffer, info->append_read_pos,
 
1268
           (size_t) (transfer_len=len_in_buff - copy_len));
 
1269
    info->read_pos= info->buffer;
 
1270
    info->read_end= info->buffer+transfer_len;
 
1271
    info->append_read_pos=info->write_pos;
 
1272
    info->pos_in_file=pos_in_file+copy_len;
 
1273
    info->end_of_file+=len_in_buff;
 
1274
  }
 
1275
  unlock_append_buffer(info);
 
1276
  return Count ? 1 : 0;
 
1277
}
 
1278
 
 
1279
 
 
1280
#ifdef HAVE_AIOWAIT
 
1281
 
 
1282
/*
 
1283
  Read from the IO_CACHE into a buffer and feed asynchronously
 
1284
  from disk when needed.
 
1285
 
 
1286
  SYNOPSIS
 
1287
    _my_b_async_read()
 
1288
      info                      IO_CACHE pointer
 
1289
      Buffer                    Buffer to retrieve count bytes from file
 
1290
      Count                     Number of bytes to read into Buffer
 
1291
 
 
1292
  RETURN VALUE
 
1293
    -1          An error has occurred; my_errno is set.
 
1294
     0          Success
 
1295
     1          An error has occurred; IO_CACHE to error state.
 
1296
*/
 
1297
 
 
1298
int _my_b_async_read(register IO_CACHE *info, uchar *Buffer, size_t Count)
 
1299
{
 
1300
  size_t length,read_length,diff_length,left_length,use_length,org_Count;
 
1301
  size_t max_length;
 
1302
  my_off_t next_pos_in_file;
 
1303
  uchar *read_buffer;
 
1304
 
 
1305
  memcpy(Buffer,info->read_pos,
 
1306
         (left_length= (size_t) (info->read_end-info->read_pos)));
 
1307
  Buffer+=left_length;
 
1308
  org_Count=Count;
 
1309
  Count-=left_length;
 
1310
 
 
1311
  if (info->inited)
 
1312
  {                                             /* wait for read block */
 
1313
    info->inited=0;                             /* No more block to read */
 
1314
    my_aiowait(&info->aio_result);              /* Wait for outstanding req */
 
1315
    if (info->aio_result.result.aio_errno)
 
1316
    {
 
1317
      if (info->myflags & MY_WME)
 
1318
        my_error(EE_READ, MYF(ME_BELL+ME_WAITTANG),
 
1319
                 my_filename(info->file),
 
1320
                 info->aio_result.result.aio_errno);
 
1321
      my_errno=info->aio_result.result.aio_errno;
 
1322
      info->error= -1;
 
1323
      return(1);
 
1324
    }
 
1325
    if (! (read_length= (size_t) info->aio_result.result.aio_return) ||
 
1326
        read_length == (size_t) -1)
 
1327
    {
 
1328
      my_errno=0;                               /* For testing */
 
1329
      info->error= (read_length == (size_t) -1 ? -1 :
 
1330
                    (int) (read_length+left_length));
 
1331
      return(1);
 
1332
    }
 
1333
    info->pos_in_file+= (size_t) (info->read_end - info->request_pos);
 
1334
 
 
1335
    if (info->request_pos != info->buffer)
 
1336
      info->request_pos=info->buffer;
 
1337
    else
 
1338
      info->request_pos=info->buffer+info->read_length;
 
1339
    info->read_pos=info->request_pos;
 
1340
    next_pos_in_file=info->aio_read_pos+read_length;
 
1341
 
 
1342
        /* Check if pos_in_file is changed
 
1343
           (_ni_read_cache may have skipped some bytes) */
 
1344
 
 
1345
    if (info->aio_read_pos < info->pos_in_file)
 
1346
    {                                           /* Fix if skipped bytes */
 
1347
      if (info->aio_read_pos + read_length < info->pos_in_file)
 
1348
      {
 
1349
        read_length=0;                          /* Skip block */
 
1350
        next_pos_in_file=info->pos_in_file;
 
1351
      }
 
1352
      else
 
1353
      {
 
1354
        my_off_t offset= (info->pos_in_file - info->aio_read_pos);
 
1355
        info->pos_in_file=info->aio_read_pos; /* Whe are here */
 
1356
        info->read_pos=info->request_pos+offset;
 
1357
        read_length-=offset;                    /* Bytes left from read_pos */
 
1358
      }
 
1359
    }
 
1360
#ifndef DBUG_OFF
 
1361
    if (info->aio_read_pos > info->pos_in_file)
 
1362
    {
 
1363
      my_errno=EINVAL;
 
1364
      return(info->read_length= (size_t) -1);
 
1365
    }
 
1366
#endif
 
1367
        /* Copy found bytes to buffer */
 
1368
    length=min(Count,read_length);
 
1369
    memcpy(Buffer,info->read_pos,(size_t) length);
 
1370
    Buffer+=length;
 
1371
    Count-=length;
 
1372
    left_length+=length;
 
1373
    info->read_end=info->rc_pos+read_length;
 
1374
    info->read_pos+=length;
 
1375
  }
 
1376
  else
 
1377
    next_pos_in_file=(info->pos_in_file+ (size_t)
 
1378
                      (info->read_end - info->request_pos));
 
1379
 
 
1380
        /* If reading large blocks, or first read or read with skip */
 
1381
  if (Count)
 
1382
  {
 
1383
    if (next_pos_in_file == info->end_of_file)
 
1384
    {
 
1385
      info->error=(int) (read_length+left_length);
 
1386
      return 1;
 
1387
    }
 
1388
    
 
1389
    if (my_seek(info->file,next_pos_in_file,MY_SEEK_SET,MYF(0))
 
1390
        == MY_FILEPOS_ERROR)
 
1391
    {
 
1392
      info->error= -1;
 
1393
      return (1);
 
1394
    }
 
1395
 
 
1396
    read_length=IO_SIZE*2- (size_t) (next_pos_in_file & (IO_SIZE-1));
 
1397
    if (Count < read_length)
 
1398
    {                                   /* Small block, read to cache */
 
1399
      if ((read_length=my_read(info->file,info->request_pos,
 
1400
                               read_length, info->myflags)) == (size_t) -1)
 
1401
        return info->error= -1;
 
1402
      use_length=min(Count,read_length);
 
1403
      memcpy(Buffer,info->request_pos,(size_t) use_length);
 
1404
      info->read_pos=info->request_pos+Count;
 
1405
      info->read_end=info->request_pos+read_length;
 
1406
      info->pos_in_file=next_pos_in_file;       /* Start of block in cache */
 
1407
      next_pos_in_file+=read_length;
 
1408
 
 
1409
      if (Count != use_length)
 
1410
      {                                 /* Didn't find hole block */
 
1411
        if (info->myflags & (MY_WME | MY_FAE | MY_FNABP) && Count != org_Count)
 
1412
          my_error(EE_EOFERR, MYF(ME_BELL+ME_WAITTANG),
 
1413
                   my_filename(info->file),my_errno);
 
1414
        info->error=(int) (read_length+left_length);
 
1415
        return 1;
 
1416
      }
 
1417
    }
 
1418
    else
 
1419
    {                                           /* Big block, don't cache it */
 
1420
      if ((read_length= my_read(info->file,Buffer, Count,info->myflags))
 
1421
          != Count)
 
1422
      {
 
1423
        info->error= read_length == (size_t) -1 ? -1 : read_length+left_length;
 
1424
        return 1;
 
1425
      }
 
1426
      info->read_pos=info->read_end=info->request_pos;
 
1427
      info->pos_in_file=(next_pos_in_file+=Count);
 
1428
    }
 
1429
  }
 
1430
 
 
1431
  /* Read next block with asyncronic io */
 
1432
  diff_length=(next_pos_in_file & (IO_SIZE-1));
 
1433
  max_length= info->read_length - diff_length;
 
1434
  if (max_length > info->end_of_file - next_pos_in_file)
 
1435
    max_length= (size_t) (info->end_of_file - next_pos_in_file);
 
1436
 
 
1437
  if (info->request_pos != info->buffer)
 
1438
    read_buffer=info->buffer;
 
1439
  else
 
1440
    read_buffer=info->buffer+info->read_length;
 
1441
  info->aio_read_pos=next_pos_in_file;
 
1442
  if (max_length)
 
1443
  {
 
1444
    info->aio_result.result.aio_errno=AIO_INPROGRESS;   /* Marker for test */
 
1445
    DBUG_PRINT("aioread",("filepos: %ld  length: %lu",
 
1446
                          (ulong) next_pos_in_file, (ulong) max_length));
 
1447
    if (aioread(info->file,read_buffer, max_length,
 
1448
                (my_off_t) next_pos_in_file,MY_SEEK_SET,
 
1449
                &info->aio_result.result))
 
1450
    {                                           /* Skip async io */
 
1451
      my_errno=errno;
 
1452
      DBUG_PRINT("error",("got error: %d, aio_result: %d from aioread, async skipped",
 
1453
                          errno, info->aio_result.result.aio_errno));
 
1454
      if (info->request_pos != info->buffer)
 
1455
      {
 
1456
        bmove(info->buffer,info->request_pos,
 
1457
              (size_t) (info->read_end - info->read_pos));
 
1458
        info->request_pos=info->buffer;
 
1459
        info->read_pos-=info->read_length;
 
1460
        info->read_end-=info->read_length;
 
1461
      }
 
1462
      info->read_length=info->buffer_length;    /* Use hole buffer */
 
1463
      info->read_function=_my_b_read;           /* Use normal IO_READ next */
 
1464
    }
 
1465
    else
 
1466
      info->inited=info->aio_result.pending=1;
 
1467
  }
 
1468
  return 0;                                     /* Block read, async in use */
 
1469
} /* _my_b_async_read */
 
1470
#endif
 
1471
 
 
1472
 
 
1473
/* Read one byte when buffer is empty */
 
1474
 
 
1475
int _my_b_get(IO_CACHE *info)
 
1476
{
 
1477
  uchar buff;
 
1478
  IO_CACHE_CALLBACK pre_read,post_read;
 
1479
  if ((pre_read = info->pre_read))
 
1480
    (*pre_read)(info);
 
1481
  if ((*(info)->read_function)(info,&buff,1))
 
1482
    return my_b_EOF;
 
1483
  if ((post_read = info->post_read))
 
1484
    (*post_read)(info);
 
1485
  return (int) (uchar) buff;
 
1486
}
 
1487
 
 
1488
/* 
 
1489
   Write a byte buffer to IO_CACHE and flush to disk
 
1490
   if IO_CACHE is full.
 
1491
 
 
1492
   RETURN VALUE
 
1493
    1 On error on write
 
1494
    0 On success
 
1495
   -1 On error; my_errno contains error code.
 
1496
*/
 
1497
 
 
1498
int _my_b_write(register IO_CACHE *info, const uchar *Buffer, size_t Count)
 
1499
{
 
1500
  size_t rest_length,length;
 
1501
 
 
1502
  if (info->pos_in_file+info->buffer_length > info->end_of_file)
 
1503
  {
 
1504
    my_errno=errno=EFBIG;
 
1505
    return info->error = -1;
 
1506
  }
 
1507
 
 
1508
  rest_length= (size_t) (info->write_end - info->write_pos);
 
1509
  memcpy(info->write_pos,Buffer,(size_t) rest_length);
 
1510
  Buffer+=rest_length;
 
1511
  Count-=rest_length;
 
1512
  info->write_pos+=rest_length;
 
1513
 
 
1514
  if (my_b_flush_io_cache(info,1))
 
1515
    return 1;
 
1516
  if (Count >= IO_SIZE)
 
1517
  {                                     /* Fill first intern buffer */
 
1518
    length=Count & (size_t) ~(IO_SIZE-1);
 
1519
    if (info->seek_not_done)
 
1520
    {
 
1521
      /*
 
1522
        Whenever a function which operates on IO_CACHE flushes/writes
 
1523
        some part of the IO_CACHE to disk it will set the property
 
1524
        "seek_not_done" to indicate this to other functions operating
 
1525
        on the IO_CACHE.
 
1526
      */
 
1527
      if (my_seek(info->file,info->pos_in_file,MY_SEEK_SET,MYF(0)))
 
1528
      {
 
1529
        info->error= -1;
 
1530
        return (1);
 
1531
      }
 
1532
      info->seek_not_done=0;
 
1533
    }
 
1534
    if (my_write(info->file, Buffer, length, info->myflags | MY_NABP))
 
1535
      return info->error= -1;
 
1536
 
 
1537
#ifdef THREAD
 
1538
    /*
 
1539
      In case of a shared I/O cache with a writer we normally do direct
 
1540
      write cache to read cache copy. Simulate this here by direct
 
1541
      caller buffer to read cache copy. Do it after the write so that
 
1542
      the cache readers actions on the flushed part can go in parallel
 
1543
      with the write of the extra stuff. copy_to_read_buffer()
 
1544
      synchronizes writer and readers so that after this call the
 
1545
      readers can act on the extra stuff while the writer can go ahead
 
1546
      and prepare the next output. copy_to_read_buffer() relies on
 
1547
      info->pos_in_file.
 
1548
    */
 
1549
    if (info->share)
 
1550
      copy_to_read_buffer(info, Buffer, length);
 
1551
#endif
 
1552
 
 
1553
    Count-=length;
 
1554
    Buffer+=length;
 
1555
    info->pos_in_file+=length;
 
1556
  }
 
1557
  memcpy(info->write_pos,Buffer,(size_t) Count);
 
1558
  info->write_pos+=Count;
 
1559
  return 0;
 
1560
}
 
1561
 
 
1562
 
 
1563
/*
 
1564
  Append a block to the write buffer.
 
1565
  This is done with the buffer locked to ensure that we don't read from
 
1566
  the write buffer before we are ready with it.
 
1567
*/
 
1568
 
 
1569
int my_b_append(register IO_CACHE *info, const uchar *Buffer, size_t Count)
 
1570
{
 
1571
  size_t rest_length,length;
 
1572
 
 
1573
#ifdef THREAD
 
1574
  /*
 
1575
    Assert that we cannot come here with a shared cache. If we do one
 
1576
    day, we might need to add a call to copy_to_read_buffer().
 
1577
  */
 
1578
  DBUG_ASSERT(!info->share);
 
1579
#endif
 
1580
 
 
1581
  lock_append_buffer(info);
 
1582
  rest_length= (size_t) (info->write_end - info->write_pos);
 
1583
  if (Count <= rest_length)
 
1584
    goto end;
 
1585
  memcpy(info->write_pos, Buffer, rest_length);
 
1586
  Buffer+=rest_length;
 
1587
  Count-=rest_length;
 
1588
  info->write_pos+=rest_length;
 
1589
  if (my_b_flush_io_cache(info,0))
 
1590
  {
 
1591
    unlock_append_buffer(info);
 
1592
    return 1;
 
1593
  }
 
1594
  if (Count >= IO_SIZE)
 
1595
  {                                     /* Fill first intern buffer */
 
1596
    length=Count & (size_t) ~(IO_SIZE-1);
 
1597
    if (my_write(info->file,Buffer, length, info->myflags | MY_NABP))
 
1598
    {
 
1599
      unlock_append_buffer(info);
 
1600
      return info->error= -1;
 
1601
    }
 
1602
    Count-=length;
 
1603
    Buffer+=length;
 
1604
    info->end_of_file+=length;
 
1605
  }
 
1606
 
 
1607
end:
 
1608
  memcpy(info->write_pos,Buffer,(size_t) Count);
 
1609
  info->write_pos+=Count;
 
1610
  unlock_append_buffer(info);
 
1611
  return 0;
 
1612
}
 
1613
 
 
1614
 
 
1615
int my_b_safe_write(IO_CACHE *info, const uchar *Buffer, size_t Count)
 
1616
{
 
1617
  /*
 
1618
    Sasha: We are not writing this with the ? operator to avoid hitting
 
1619
    a possible compiler bug. At least gcc 2.95 cannot deal with 
 
1620
    several layers of ternary operators that evaluated comma(,) operator
 
1621
    expressions inside - I do have a test case if somebody wants it
 
1622
  */
 
1623
  if (info->type == SEQ_READ_APPEND)
 
1624
    return my_b_append(info, Buffer, Count);
 
1625
  return my_b_write(info, Buffer, Count);
 
1626
}
 
1627
 
 
1628
 
 
1629
/*
 
1630
  Write a block to disk where part of the data may be inside the record
 
1631
  buffer.  As all write calls to the data goes through the cache,
 
1632
  we will never get a seek over the end of the buffer
 
1633
*/
 
1634
 
 
1635
int my_block_write(register IO_CACHE *info, const uchar *Buffer, size_t Count,
 
1636
                   my_off_t pos)
 
1637
{
 
1638
  size_t length;
 
1639
  int error=0;
 
1640
 
 
1641
#ifdef THREAD
 
1642
  /*
 
1643
    Assert that we cannot come here with a shared cache. If we do one
 
1644
    day, we might need to add a call to copy_to_read_buffer().
 
1645
  */
 
1646
  DBUG_ASSERT(!info->share);
 
1647
#endif
 
1648
 
 
1649
  if (pos < info->pos_in_file)
 
1650
  {
 
1651
    /* Of no overlap, write everything without buffering */
 
1652
    if (pos + Count <= info->pos_in_file)
 
1653
      return my_pwrite(info->file, Buffer, Count, pos,
 
1654
                       info->myflags | MY_NABP);
 
1655
    /* Write the part of the block that is before buffer */
 
1656
    length= (uint) (info->pos_in_file - pos);
 
1657
    if (my_pwrite(info->file, Buffer, length, pos, info->myflags | MY_NABP))
 
1658
      info->error= error= -1;
 
1659
    Buffer+=length;
 
1660
    pos+=  length;
 
1661
    Count-= length;
 
1662
#ifndef HAVE_PREAD
 
1663
    info->seek_not_done=1;
 
1664
#endif
 
1665
  }
 
1666
 
 
1667
  /* Check if we want to write inside the used part of the buffer.*/
 
1668
  length= (size_t) (info->write_end - info->buffer);
 
1669
  if (pos < info->pos_in_file + length)
 
1670
  {
 
1671
    size_t offset= (size_t) (pos - info->pos_in_file);
 
1672
    length-=offset;
 
1673
    if (length > Count)
 
1674
      length=Count;
 
1675
    memcpy(info->buffer+offset, Buffer, length);
 
1676
    Buffer+=length;
 
1677
    Count-= length;
 
1678
    /* Fix length of buffer if the new data was larger */
 
1679
    if (info->buffer+length > info->write_pos)
 
1680
      info->write_pos=info->buffer+length;
 
1681
    if (!Count)
 
1682
      return (error);
 
1683
  }
 
1684
  /* Write at the end of the current buffer; This is the normal case */
 
1685
  if (_my_b_write(info, Buffer, Count))
 
1686
    error= -1;
 
1687
  return error;
 
1688
}
 
1689
 
 
1690
 
 
1691
        /* Flush write cache */
 
1692
 
 
1693
#ifdef THREAD
 
1694
#define LOCK_APPEND_BUFFER if (need_append_buffer_lock) \
 
1695
  lock_append_buffer(info);
 
1696
#define UNLOCK_APPEND_BUFFER if (need_append_buffer_lock) \
 
1697
  unlock_append_buffer(info);
 
1698
#else
 
1699
#define LOCK_APPEND_BUFFER
 
1700
#define UNLOCK_APPEND_BUFFER
 
1701
#endif
 
1702
 
 
1703
 
 
1704
int my_b_flush_io_cache(IO_CACHE *info, int need_append_buffer_lock)
 
1705
{
 
1706
  size_t length;
 
1707
  my_bool append_cache;
 
1708
  my_off_t pos_in_file;
 
1709
  DBUG_ENTER("my_b_flush_io_cache");
 
1710
  DBUG_PRINT("enter", ("cache: 0x%lx", (long) info));
 
1711
 
 
1712
  if (!(append_cache = (info->type == SEQ_READ_APPEND)))
 
1713
    need_append_buffer_lock=0;
 
1714
 
 
1715
  if (info->type == WRITE_CACHE || append_cache)
 
1716
  {
 
1717
    if (info->file == -1)
 
1718
    {
 
1719
      if (real_open_cached_file(info))
 
1720
        DBUG_RETURN((info->error= -1));
 
1721
    }
 
1722
    LOCK_APPEND_BUFFER;
 
1723
 
 
1724
    if ((length=(size_t) (info->write_pos - info->write_buffer)))
 
1725
    {
 
1726
#ifdef THREAD
 
1727
      /*
 
1728
        In case of a shared I/O cache with a writer we do direct write
 
1729
        cache to read cache copy. Do it before the write here so that
 
1730
        the readers can work in parallel with the write.
 
1731
        copy_to_read_buffer() relies on info->pos_in_file.
 
1732
      */
 
1733
      if (info->share)
 
1734
        copy_to_read_buffer(info, info->write_buffer, length);
 
1735
#endif
 
1736
 
 
1737
      pos_in_file=info->pos_in_file;
 
1738
      /*
 
1739
        If we have append cache, we always open the file with
 
1740
        O_APPEND which moves the pos to EOF automatically on every write
 
1741
      */
 
1742
      if (!append_cache && info->seek_not_done)
 
1743
      {                                 /* File touched, do seek */
 
1744
        if (my_seek(info->file,pos_in_file,MY_SEEK_SET,MYF(0)) ==
 
1745
            MY_FILEPOS_ERROR)
 
1746
        {
 
1747
          UNLOCK_APPEND_BUFFER;
 
1748
          DBUG_RETURN((info->error= -1));
 
1749
        }
 
1750
        if (!append_cache)
 
1751
          info->seek_not_done=0;
 
1752
      }
 
1753
      if (!append_cache)
 
1754
        info->pos_in_file+=length;
 
1755
      info->write_end= (info->write_buffer+info->buffer_length-
 
1756
                        ((pos_in_file+length) & (IO_SIZE-1)));
 
1757
 
 
1758
      if (my_write(info->file,info->write_buffer,length,
 
1759
                   info->myflags | MY_NABP))
 
1760
        info->error= -1;
 
1761
      else
 
1762
        info->error= 0;
 
1763
      if (!append_cache)
 
1764
      {
 
1765
        set_if_bigger(info->end_of_file,(pos_in_file+length));
 
1766
      }
 
1767
      else
 
1768
      {
 
1769
        info->end_of_file+=(info->write_pos-info->append_read_pos);
 
1770
        DBUG_ASSERT(info->end_of_file == my_tell(info->file,MYF(0)));
 
1771
      }
 
1772
 
 
1773
      info->append_read_pos=info->write_pos=info->write_buffer;
 
1774
      ++info->disk_writes;
 
1775
      UNLOCK_APPEND_BUFFER;
 
1776
      DBUG_RETURN(info->error);
 
1777
    }
 
1778
  }
 
1779
#ifdef HAVE_AIOWAIT
 
1780
  else if (info->type != READ_NET)
 
1781
  {
 
1782
    my_aiowait(&info->aio_result);              /* Wait for outstanding req */
 
1783
    info->inited=0;
 
1784
  }
 
1785
#endif
 
1786
  UNLOCK_APPEND_BUFFER;
 
1787
  DBUG_RETURN(0);
 
1788
}
 
1789
 
 
1790
/*
 
1791
  Free an IO_CACHE object
 
1792
 
 
1793
  SYNOPSOS
 
1794
    end_io_cache()
 
1795
    info                IO_CACHE Handle to free
 
1796
 
 
1797
  NOTES
 
1798
    It's currently safe to call this if one has called init_io_cache()
 
1799
    on the 'info' object, even if init_io_cache() failed.
 
1800
    This function is also safe to call twice with the same handle.
 
1801
 
 
1802
  RETURN
 
1803
   0  ok
 
1804
   #  Error
 
1805
*/
 
1806
 
 
1807
int end_io_cache(IO_CACHE *info)
 
1808
{
 
1809
  int error=0;
 
1810
  IO_CACHE_CALLBACK pre_close;
 
1811
  DBUG_ENTER("end_io_cache");
 
1812
  DBUG_PRINT("enter",("cache: 0x%lx", (ulong) info));
 
1813
 
 
1814
#ifdef THREAD
 
1815
  /*
 
1816
    Every thread must call remove_io_thread(). The last one destroys
 
1817
    the share elements.
 
1818
  */
 
1819
  DBUG_ASSERT(!info->share || !info->share->total_threads);
 
1820
#endif
 
1821
 
 
1822
  if ((pre_close=info->pre_close))
 
1823
  {
 
1824
    (*pre_close)(info);
 
1825
    info->pre_close= 0;
 
1826
  }
 
1827
  if (info->alloced_buffer)
 
1828
  {
 
1829
    info->alloced_buffer=0;
 
1830
    if (info->file != -1)                       /* File doesn't exist */
 
1831
      error= my_b_flush_io_cache(info,1);
 
1832
    my_free((uchar*) info->buffer,MYF(MY_WME));
 
1833
    info->buffer=info->read_pos=(uchar*) 0;
 
1834
  }
 
1835
  if (info->type == SEQ_READ_APPEND)
 
1836
  {
 
1837
    /* Destroy allocated mutex */
 
1838
    info->type= TYPE_NOT_SET;
 
1839
#ifdef THREAD
 
1840
    pthread_mutex_destroy(&info->append_buffer_lock);
 
1841
#endif
 
1842
  }
 
1843
  DBUG_RETURN(error);
 
1844
} /* end_io_cache */
 
1845
 
 
1846
 
 
1847
/**********************************************************************
 
1848
 Testing of MF_IOCACHE
 
1849
**********************************************************************/
 
1850
 
 
1851
#ifdef MAIN
 
1852
 
 
1853
#include <my_dir.h>
 
1854
 
 
1855
void die(const char* fmt, ...)
 
1856
{
 
1857
  va_list va_args;
 
1858
  va_start(va_args,fmt);
 
1859
  fprintf(stderr,"Error:");
 
1860
  vfprintf(stderr, fmt,va_args);
 
1861
  fprintf(stderr,", errno=%d\n", errno);
 
1862
  exit(1);
 
1863
}
 
1864
 
 
1865
int open_file(const char* fname, IO_CACHE* info, int cache_size)
 
1866
{
 
1867
  int fd;
 
1868
  if ((fd=my_open(fname,O_CREAT | O_RDWR,MYF(MY_WME))) < 0)
 
1869
    die("Could not open %s", fname);
 
1870
  if (init_io_cache(info, fd, cache_size, SEQ_READ_APPEND, 0,0,MYF(MY_WME)))
 
1871
    die("failed in init_io_cache()");
 
1872
  return fd;
 
1873
}
 
1874
 
 
1875
void close_file(IO_CACHE* info)
 
1876
{
 
1877
  end_io_cache(info);
 
1878
  my_close(info->file, MYF(MY_WME));
 
1879
}
 
1880
 
 
1881
int main(int argc, char** argv)
 
1882
{
 
1883
  IO_CACHE sra_cache; /* SEQ_READ_APPEND */
 
1884
  MY_STAT status;
 
1885
  const char* fname="/tmp/iocache.test";
 
1886
  int cache_size=16384;
 
1887
  char llstr_buf[22];
 
1888
  int max_block,total_bytes=0;
 
1889
  int i,num_loops=100,error=0;
 
1890
  char *p;
 
1891
  char* block, *block_end;
 
1892
  MY_INIT(argv[0]);
 
1893
  max_block = cache_size*3;
 
1894
  if (!(block=(char*)my_malloc(max_block,MYF(MY_WME))))
 
1895
    die("Not enough memory to allocate test block");
 
1896
  block_end = block + max_block;
 
1897
  for (p = block,i=0; p < block_end;i++)
 
1898
  {
 
1899
    *p++ = (char)i;
 
1900
  }
 
1901
  if (my_stat(fname,&status, MYF(0)) &&
 
1902
      my_delete(fname,MYF(MY_WME)))
 
1903
    {
 
1904
      die("Delete of %s failed, aborting", fname);
 
1905
    }
 
1906
  open_file(fname,&sra_cache, cache_size);
 
1907
  for (i = 0; i < num_loops; i++)
 
1908
  {
 
1909
    char buf[4];
 
1910
    int block_size = abs(rand() % max_block);
 
1911
    int4store(buf, block_size);
 
1912
    if (my_b_append(&sra_cache,buf,4) ||
 
1913
        my_b_append(&sra_cache, block, block_size))
 
1914
      die("write failed");
 
1915
    total_bytes += 4+block_size;
 
1916
  }
 
1917
  close_file(&sra_cache);
 
1918
  my_free(block,MYF(MY_WME));
 
1919
  if (!my_stat(fname,&status,MYF(MY_WME)))
 
1920
    die("%s failed to stat, but I had just closed it,\
 
1921
 wonder how that happened");
 
1922
  printf("Final size of %s is %s, wrote %d bytes\n",fname,
 
1923
         llstr(status.st_size,llstr_buf),
 
1924
         total_bytes);
 
1925
  my_delete(fname, MYF(MY_WME));
 
1926
  /* check correctness of tests */
 
1927
  if (total_bytes != status.st_size)
 
1928
  {
 
1929
    fprintf(stderr,"Not the same number of bytes acutally  in file as bytes \
 
1930
supposedly written\n");
 
1931
    error=1;
 
1932
  }
 
1933
  exit(error);
 
1934
  return 0;
 
1935
}
 
1936
#endif