~ubuntu-branches/ubuntu/trusty/mysql-5.6/trusty

« back to all changes in this revision

Viewing changes to mysys/mf_iocache.c

  • Committer: Package Import Robot
  • Author(s): James Page
  • Date: 2014-02-12 11:54:27 UTC
  • Revision ID: package-import@ubuntu.com-20140212115427-oq6tfsqxl1wuwehi
Tags: upstream-5.6.15
ImportĀ upstreamĀ versionĀ 5.6.15

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/* Copyright (c) 2000, 2011, Oracle and/or its affiliates. All rights reserved.
 
2
 
 
3
   This program is free software; you can redistribute it and/or modify
 
4
   it under the terms of the GNU General Public License as published by
 
5
   the Free Software Foundation; version 2 of the License.
 
6
 
 
7
   This program is distributed in the hope that it will be useful,
 
8
   but WITHOUT ANY WARRANTY; without even the implied warranty of
 
9
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
10
   GNU General Public License for more details.
 
11
 
 
12
   You should have received a copy of the GNU General Public License
 
13
   along with this program; if not, write to the Free Software
 
14
   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA */
 
15
 
 
16
/*
 
17
  Cashing of files with only does (sequential) read or writes of fixed-
 
18
  length records. A read isn't allowed to go over file-length. A read is ok
 
19
  if it ends at file-length and next read can try to read after file-length
 
20
  (and get a EOF-error).
 
21
  Possibly use of asyncronic io.
 
22
  macros for read and writes for faster io.
 
23
  Used instead of FILE when reading or writing whole files.
 
24
  This code makes mf_rec_cache obsolete (currently only used by ISAM)
 
25
  One can change info->pos_in_file to a higher value to skip bytes in file if
 
26
  also info->read_pos is set to info->read_end.
 
27
  If called through open_cached_file(), then the temporary file will
 
28
  only be created if a write exeeds the file buffer or if one calls
 
29
  my_b_flush_io_cache().
 
30
 
 
31
  If one uses SEQ_READ_APPEND, then two buffers are allocated, one for
 
32
  reading and another for writing.  Reads are first done from disk and
 
33
  then done from the write buffer.  This is an efficient way to read
 
34
  from a log file when one is writing to it at the same time.
 
35
  For this to work, the file has to be opened in append mode!
 
36
  Note that when one uses SEQ_READ_APPEND, one MUST write using
 
37
  my_b_append !  This is needed because we need to lock the mutex
 
38
  every time we access the write buffer.
 
39
 
 
40
TODO:
 
41
  When one SEQ_READ_APPEND and we are reading and writing at the same time,
 
42
  each time the write buffer gets full and it's written to disk, we will
 
43
  always do a disk read to read a part of the buffer from disk to the
 
44
  read buffer.
 
45
  This should be fixed so that when we do a my_b_flush_io_cache() and
 
46
  we have been reading the write buffer, we should transfer the rest of the
 
47
  write buffer to the read buffer before we start to reuse it.
 
48
*/
 
49
 
 
50
#include "mysys_priv.h"
 
51
#include <m_string.h>
 
52
#ifdef HAVE_AIOWAIT
 
53
#include "mysys_err.h"
 
54
static void my_aiowait(my_aio_result *result);
 
55
#endif
 
56
#include <errno.h>
 
57
 
 
58
#define lock_append_buffer(info) \
 
59
  mysql_mutex_lock(&(info)->append_buffer_lock)
 
60
#define unlock_append_buffer(info) \
 
61
  mysql_mutex_unlock(&(info)->append_buffer_lock)
 
62
 
 
63
#define IO_ROUND_UP(X) (((X)+IO_SIZE-1) & ~(IO_SIZE-1))
 
64
#define IO_ROUND_DN(X) ( (X)            & ~(IO_SIZE-1))
 
65
 
 
66
/*
 
67
  Setup internal pointers inside IO_CACHE
 
68
 
 
69
  SYNOPSIS
 
70
    setup_io_cache()
 
71
    info                IO_CACHE handler
 
72
 
 
73
  NOTES
 
74
    This is called on automaticly on init or reinit of IO_CACHE
 
75
    It must be called externally if one moves or copies an IO_CACHE
 
76
    object.
 
77
*/
 
78
 
 
79
void setup_io_cache(IO_CACHE* info)
 
80
{
 
81
  /* Ensure that my_b_tell() and my_b_bytes_in_cache works */
 
82
  if (info->type == WRITE_CACHE)
 
83
  {
 
84
    info->current_pos= &info->write_pos;
 
85
    info->current_end= &info->write_end;
 
86
  }
 
87
  else
 
88
  {
 
89
    info->current_pos= &info->read_pos;
 
90
    info->current_end= &info->read_end;
 
91
  }
 
92
}
 
93
 
 
94
 
 
95
static void
 
96
init_functions(IO_CACHE* info)
 
97
{
 
98
  enum cache_type type= info->type;
 
99
  switch (type) {
 
100
  case READ_NET:
 
101
    /*
 
102
      Must be initialized by the caller. The problem is that
 
103
      _my_b_net_read has to be defined in sql directory because of
 
104
      the dependency on THD, and therefore cannot be visible to
 
105
      programs that link against mysys but know nothing about THD, such
 
106
      as myisamchk
 
107
    */
 
108
    break;
 
109
  case SEQ_READ_APPEND:
 
110
    info->read_function = _my_b_seq_read;
 
111
    info->write_function = 0;                   /* Force a core if used */
 
112
    break;
 
113
  default:
 
114
    info->read_function = info->share ? _my_b_read_r : _my_b_read;
 
115
    info->write_function = _my_b_write;
 
116
  }
 
117
 
 
118
  setup_io_cache(info);
 
119
}
 
120
 
 
121
 
 
122
/*
 
123
  Initialize an IO_CACHE object
 
124
 
 
125
  SYNOPSOS
 
126
    init_io_cache()
 
127
    info                cache handler to initialize
 
128
    file                File that should be associated to to the handler
 
129
                        If == -1 then real_open_cached_file()
 
130
                        will be called when it's time to open file.
 
131
    cachesize           Size of buffer to allocate for read/write
 
132
                        If == 0 then use my_default_record_cache_size
 
133
    type                Type of cache
 
134
    seek_offset         Where cache should start reading/writing
 
135
    use_async_io        Set to 1 of we should use async_io (if avaiable)
 
136
    cache_myflags       Bitmap of differnt flags
 
137
                        MY_WME | MY_FAE | MY_NABP | MY_FNABP |
 
138
                        MY_DONT_CHECK_FILESIZE
 
139
 
 
140
  RETURN
 
141
    0  ok
 
142
    #  error
 
143
*/
 
144
 
 
145
int init_io_cache(IO_CACHE *info, File file, size_t cachesize,
 
146
                  enum cache_type type, my_off_t seek_offset,
 
147
                  pbool use_async_io, myf cache_myflags)
 
148
{
 
149
  size_t min_cache;
 
150
  my_off_t pos;
 
151
  my_off_t end_of_file= ~(my_off_t) 0;
 
152
  DBUG_ENTER("init_io_cache");
 
153
  DBUG_PRINT("enter",("cache: 0x%lx  type: %d  pos: %ld",
 
154
                      (ulong) info, (int) type, (ulong) seek_offset));
 
155
 
 
156
  info->file= file;
 
157
  info->type= TYPE_NOT_SET;         /* Don't set it until mutex are created */
 
158
  info->pos_in_file= seek_offset;
 
159
  info->pre_close = info->pre_read = info->post_read = 0;
 
160
  info->arg = 0;
 
161
  info->alloced_buffer = 0;
 
162
  info->buffer=0;
 
163
  info->seek_not_done= 0;
 
164
 
 
165
  if (file >= 0)
 
166
  {
 
167
    pos= mysql_file_tell(file, MYF(0));
 
168
    if ((pos == (my_off_t) -1) && (my_errno == ESPIPE))
 
169
    {
 
170
      /*
 
171
         This kind of object doesn't support seek() or tell(). Don't set a
 
172
         flag that will make us again try to seek() later and fail.
 
173
      */
 
174
      info->seek_not_done= 0;
 
175
      /*
 
176
        Additionally, if we're supposed to start somewhere other than the
 
177
        the beginning of whatever this file is, then somebody made a bad
 
178
        assumption.
 
179
      */
 
180
      DBUG_ASSERT(seek_offset == 0);
 
181
    }
 
182
    else
 
183
      info->seek_not_done= MY_TEST(seek_offset != pos);
 
184
  }
 
185
 
 
186
  info->disk_writes= 0;
 
187
  info->share=0;
 
188
 
 
189
  if (!cachesize && !(cachesize= my_default_record_cache_size))
 
190
    DBUG_RETURN(1);                             /* No cache requested */
 
191
  min_cache=use_async_io ? IO_SIZE*4 : IO_SIZE*2;
 
192
  if (type == READ_CACHE || type == SEQ_READ_APPEND)
 
193
  {                                             /* Assume file isn't growing */
 
194
    if (!(cache_myflags & MY_DONT_CHECK_FILESIZE))
 
195
    {
 
196
      /* Calculate end of file to avoid allocating oversized buffers */
 
197
      end_of_file= mysql_file_seek(file, 0L, MY_SEEK_END, MYF(0));
 
198
      /* Need to reset seek_not_done now that we just did a seek. */
 
199
      info->seek_not_done= end_of_file == seek_offset ? 0 : 1;
 
200
      if (end_of_file < seek_offset)
 
201
        end_of_file=seek_offset;
 
202
      /* Trim cache size if the file is very small */
 
203
      if ((my_off_t) cachesize > end_of_file-seek_offset+IO_SIZE*2-1)
 
204
      {
 
205
        cachesize= (size_t) (end_of_file-seek_offset)+IO_SIZE*2-1;
 
206
        use_async_io=0;                         /* No need to use async */
 
207
      }
 
208
    }
 
209
  }
 
210
  cache_myflags &= ~MY_DONT_CHECK_FILESIZE;
 
211
  if (type != READ_NET && type != WRITE_NET)
 
212
  {
 
213
    /* Retry allocating memory in smaller blocks until we get one */
 
214
    cachesize= ((cachesize + min_cache-1) & ~(min_cache-1));
 
215
    for (;;)
 
216
    {
 
217
      size_t buffer_block;
 
218
      /*
 
219
        Unset MY_WAIT_IF_FULL bit if it is set, to prevent conflict with
 
220
        MY_ZEROFILL.
 
221
      */
 
222
      myf flags= (myf) (cache_myflags & ~(MY_WME | MY_WAIT_IF_FULL));
 
223
 
 
224
      if (cachesize < min_cache)
 
225
        cachesize = min_cache;
 
226
      buffer_block= cachesize;
 
227
      if (type == SEQ_READ_APPEND)
 
228
        buffer_block *= 2;
 
229
      if (cachesize == min_cache)
 
230
        flags|= (myf) MY_WME;
 
231
 
 
232
      if ((info->buffer= (uchar*) my_malloc(buffer_block, flags)) != 0)
 
233
      {
 
234
        info->write_buffer=info->buffer;
 
235
        if (type == SEQ_READ_APPEND)
 
236
          info->write_buffer = info->buffer + cachesize;
 
237
        info->alloced_buffer=1;
 
238
        break;                                  /* Enough memory found */
 
239
      }
 
240
      if (cachesize == min_cache)
 
241
        DBUG_RETURN(2);                         /* Can't alloc cache */
 
242
      /* Try with less memory */
 
243
      cachesize= (cachesize*3/4 & ~(min_cache-1));
 
244
    }
 
245
  }
 
246
 
 
247
  DBUG_PRINT("info",("init_io_cache: cachesize = %lu", (ulong) cachesize));
 
248
  info->read_length=info->buffer_length=cachesize;
 
249
  info->myflags=cache_myflags & ~(MY_NABP | MY_FNABP);
 
250
  info->request_pos= info->read_pos= info->write_pos = info->buffer;
 
251
  if (type == SEQ_READ_APPEND)
 
252
  {
 
253
    info->append_read_pos = info->write_pos = info->write_buffer;
 
254
    info->write_end = info->write_buffer + info->buffer_length;
 
255
    mysql_mutex_init(key_IO_CACHE_append_buffer_lock,
 
256
                     &info->append_buffer_lock, MY_MUTEX_INIT_FAST);
 
257
  }
 
258
#if defined(SAFE_MUTEX)
 
259
  else
 
260
  {
 
261
    /* Clear mutex so that safe_mutex will notice that it's not initialized */
 
262
    memset(&info->append_buffer_lock, 0, sizeof(info->append_buffer_lock));
 
263
  }
 
264
#endif
 
265
 
 
266
  if (type == WRITE_CACHE)
 
267
    info->write_end=
 
268
      info->buffer+info->buffer_length- (seek_offset & (IO_SIZE-1));
 
269
  else
 
270
    info->read_end=info->buffer;                /* Nothing in cache */
 
271
 
 
272
  /* End_of_file may be changed by user later */
 
273
  info->end_of_file= end_of_file;
 
274
  info->error=0;
 
275
  info->type= type;
 
276
  init_functions(info);
 
277
#ifdef HAVE_AIOWAIT
 
278
  if (use_async_io && ! my_disable_async_io)
 
279
  {
 
280
    DBUG_PRINT("info",("Using async io"));
 
281
    info->read_length/=2;
 
282
    info->read_function=_my_b_async_read;
 
283
  }
 
284
  info->inited=info->aio_result.pending=0;
 
285
#endif
 
286
  DBUG_RETURN(0);
 
287
}                                               /* init_io_cache */
 
288
 
 
289
        /* Wait until current request is ready */
 
290
 
 
291
#ifdef HAVE_AIOWAIT
 
292
static void my_aiowait(my_aio_result *result)
 
293
{
 
294
  if (result->pending)
 
295
  {
 
296
    struct aio_result_t *tmp;
 
297
    for (;;)
 
298
    {
 
299
      if ((int) (tmp=aiowait((struct timeval *) 0)) == -1)
 
300
      {
 
301
        if (errno == EINTR)
 
302
          continue;
 
303
        DBUG_PRINT("error",("No aio request, error: %d",errno));
 
304
        result->pending=0;                      /* Assume everythings is ok */
 
305
        break;
 
306
      }
 
307
      ((my_aio_result*) tmp)->pending=0;
 
308
      if ((my_aio_result*) tmp == result)
 
309
        break;
 
310
    }
 
311
  }
 
312
  return;
 
313
}
 
314
#endif
 
315
 
 
316
 
 
317
/*
 
318
  Use this to reset cache to re-start reading or to change the type
 
319
  between READ_CACHE <-> WRITE_CACHE
 
320
  If we are doing a reinit of a cache where we have the start of the file
 
321
  in the cache, we are reusing this memory without flushing it to disk.
 
322
*/
 
323
 
 
324
my_bool reinit_io_cache(IO_CACHE *info, enum cache_type type,
 
325
                        my_off_t seek_offset,
 
326
                        pbool use_async_io __attribute__((unused)),
 
327
                        pbool clear_cache)
 
328
{
 
329
  DBUG_ENTER("reinit_io_cache");
 
330
  DBUG_PRINT("enter",("cache: 0x%lx type: %d  seek_offset: %lu  clear_cache: %d",
 
331
                      (ulong) info, type, (ulong) seek_offset,
 
332
                      (int) clear_cache));
 
333
 
 
334
  /* One can't do reinit with the following types */
 
335
  DBUG_ASSERT(type != READ_NET && info->type != READ_NET &&
 
336
              type != WRITE_NET && info->type != WRITE_NET &&
 
337
              type != SEQ_READ_APPEND && info->type != SEQ_READ_APPEND);
 
338
 
 
339
  /* If the whole file is in memory, avoid flushing to disk */
 
340
  if (! clear_cache &&
 
341
      seek_offset >= info->pos_in_file &&
 
342
      seek_offset <= my_b_tell(info))
 
343
  {
 
344
    /* Reuse current buffer without flushing it to disk */
 
345
    uchar *pos;
 
346
    if (info->type == WRITE_CACHE && type == READ_CACHE)
 
347
    {
 
348
      info->read_end=info->write_pos;
 
349
      info->end_of_file=my_b_tell(info);
 
350
      /*
 
351
        Trigger a new seek only if we have a valid
 
352
        file handle.
 
353
      */
 
354
      info->seek_not_done= (info->file != -1);
 
355
    }
 
356
    else if (type == WRITE_CACHE)
 
357
    {
 
358
      if (info->type == READ_CACHE)
 
359
      {
 
360
        info->write_end=info->write_buffer+info->buffer_length;
 
361
        info->seek_not_done=1;
 
362
      }
 
363
      info->end_of_file = ~(my_off_t) 0;
 
364
    }
 
365
    pos=info->request_pos+(seek_offset-info->pos_in_file);
 
366
    if (type == WRITE_CACHE)
 
367
      info->write_pos=pos;
 
368
    else
 
369
      info->read_pos= pos;
 
370
#ifdef HAVE_AIOWAIT
 
371
    my_aiowait(&info->aio_result);              /* Wait for outstanding req */
 
372
#endif
 
373
  }
 
374
  else
 
375
  {
 
376
    /*
 
377
      If we change from WRITE_CACHE to READ_CACHE, assume that everything
 
378
      after the current positions should be ignored
 
379
    */
 
380
    if (info->type == WRITE_CACHE && type == READ_CACHE)
 
381
      info->end_of_file=my_b_tell(info);
 
382
    /* flush cache if we want to reuse it */
 
383
    if (!clear_cache && my_b_flush_io_cache(info,1))
 
384
      DBUG_RETURN(1);
 
385
    info->pos_in_file=seek_offset;
 
386
    /* Better to do always do a seek */
 
387
    info->seek_not_done=1;
 
388
    info->request_pos=info->read_pos=info->write_pos=info->buffer;
 
389
    if (type == READ_CACHE)
 
390
    {
 
391
      info->read_end=info->buffer;              /* Nothing in cache */
 
392
    }
 
393
    else
 
394
    {
 
395
      info->write_end=(info->buffer + info->buffer_length -
 
396
                       (seek_offset & (IO_SIZE-1)));
 
397
      info->end_of_file= ~(my_off_t) 0;
 
398
    }
 
399
  }
 
400
  info->type=type;
 
401
  info->error=0;
 
402
  init_functions(info);
 
403
 
 
404
#ifdef HAVE_AIOWAIT
 
405
  if (use_async_io && ! my_disable_async_io &&
 
406
      ((ulong) info->buffer_length <
 
407
       (ulong) (info->end_of_file - seek_offset)))
 
408
  {
 
409
    info->read_length=info->buffer_length/2;
 
410
    info->read_function=_my_b_async_read;
 
411
  }
 
412
  info->inited=0;
 
413
#endif
 
414
  DBUG_RETURN(0);
 
415
} /* reinit_io_cache */
 
416
 
 
417
 
 
418
 
 
419
/*
 
420
  Read buffered.
 
421
 
 
422
  SYNOPSIS
 
423
    _my_b_read()
 
424
      info                      IO_CACHE pointer
 
425
      Buffer                    Buffer to retrieve count bytes from file
 
426
      Count                     Number of bytes to read into Buffer
 
427
 
 
428
  NOTE
 
429
    This function is only called from the my_b_read() macro when there
 
430
    isn't enough characters in the buffer to satisfy the request.
 
431
 
 
432
  WARNING
 
433
 
 
434
    When changing this function, be careful with handling file offsets
 
435
    (end-of_file, pos_in_file). Do not cast them to possibly smaller
 
436
    types than my_off_t unless you can be sure that their value fits.
 
437
    Same applies to differences of file offsets.
 
438
 
 
439
    When changing this function, check _my_b_read_r(). It might need the
 
440
    same change.
 
441
 
 
442
  RETURN
 
443
    0      we succeeded in reading all data
 
444
    1      Error: couldn't read requested characters. In this case:
 
445
             If info->error == -1, we got a read error.
 
446
             Otherwise info->error contains the number of bytes in Buffer.
 
447
*/
 
448
 
 
449
int _my_b_read(register IO_CACHE *info, uchar *Buffer, size_t Count)
 
450
{
 
451
  size_t length,diff_length,left_length, max_length;
 
452
  my_off_t pos_in_file;
 
453
  DBUG_ENTER("_my_b_read");
 
454
 
 
455
  /* If the buffer is not empty yet, copy what is available. */
 
456
  if ((left_length= (size_t) (info->read_end-info->read_pos)))
 
457
  {
 
458
    DBUG_ASSERT(Count >= left_length);  /* User is not using my_b_read() */
 
459
    memcpy(Buffer,info->read_pos, left_length);
 
460
    Buffer+=left_length;
 
461
    Count-=left_length;
 
462
  }
 
463
 
 
464
  /* pos_in_file always point on where info->buffer was read */
 
465
  pos_in_file=info->pos_in_file+ (size_t) (info->read_end - info->buffer);
 
466
 
 
467
  /*
 
468
    Whenever a function which operates on IO_CACHE flushes/writes
 
469
    some part of the IO_CACHE to disk it will set the property
 
470
    "seek_not_done" to indicate this to other functions operating
 
471
    on the IO_CACHE.
 
472
  */
 
473
  if (info->seek_not_done)
 
474
  {
 
475
    if ((mysql_file_seek(info->file, pos_in_file, MY_SEEK_SET, MYF(0)) 
 
476
        != MY_FILEPOS_ERROR))
 
477
    {
 
478
      /* No error, reset seek_not_done flag. */
 
479
      info->seek_not_done= 0;
 
480
    }
 
481
    else
 
482
    {
 
483
      /*
 
484
        If the seek failed and the error number is ESPIPE, it is because
 
485
        info->file is a pipe or socket or FIFO.  We never should have tried
 
486
        to seek on that.  See Bugs#25807 and #22828 for more info.
 
487
      */
 
488
      DBUG_ASSERT(my_errno != ESPIPE);
 
489
      info->error= -1;
 
490
      DBUG_RETURN(1);
 
491
    }
 
492
  }
 
493
 
 
494
  /*
 
495
    Calculate, how much we are within a IO_SIZE block. Ideally this
 
496
    should be zero.
 
497
  */
 
498
  diff_length= (size_t) (pos_in_file & (IO_SIZE-1));
 
499
 
 
500
  /*
 
501
    If more than a block plus the rest of the current block is wanted,
 
502
    we do read directly, without filling the buffer.
 
503
  */
 
504
  if (Count >= (size_t) (IO_SIZE+(IO_SIZE-diff_length)))
 
505
  {                                     /* Fill first intern buffer */
 
506
    size_t read_length;
 
507
    if (info->end_of_file <= pos_in_file)
 
508
    {
 
509
      /* End of file. Return, what we did copy from the buffer. */
 
510
      info->error= (int) left_length;
 
511
      DBUG_RETURN(1);
 
512
    }
 
513
    /*
 
514
      Crop the wanted count to a multiple of IO_SIZE and subtract,
 
515
      what we did already read from a block. That way, the read will
 
516
      end aligned with a block.
 
517
    */
 
518
    length=(Count & (size_t) ~(IO_SIZE-1))-diff_length;
 
519
    if ((read_length= mysql_file_read(info->file,Buffer, length, info->myflags))
 
520
        != length)
 
521
    {
 
522
      /*
 
523
        If we didn't get, what we wanted, we either return -1 for a read
 
524
        error, or (it's end of file), how much we got in total.
 
525
      */
 
526
      info->error= (read_length == (size_t) -1 ? -1 :
 
527
                    (int) (read_length+left_length));
 
528
      DBUG_RETURN(1);
 
529
    }
 
530
    Count-=length;
 
531
    Buffer+=length;
 
532
    pos_in_file+=length;
 
533
    left_length+=length;
 
534
    diff_length=0;
 
535
  }
 
536
 
 
537
  /*
 
538
    At this point, we want less than one and a partial block.
 
539
    We will read a full cache, minus the number of bytes, we are
 
540
    within a block already. So we will reach new alignment.
 
541
  */
 
542
  max_length= info->read_length-diff_length;
 
543
  /* We will not read past end of file. */
 
544
  if (info->type != READ_FIFO &&
 
545
      max_length > (info->end_of_file - pos_in_file))
 
546
    max_length= (size_t) (info->end_of_file - pos_in_file);
 
547
  /*
 
548
    If there is nothing left to read,
 
549
      we either are done, or we failed to fulfill the request.
 
550
    Otherwise, we read max_length into the cache.
 
551
  */
 
552
  if (!max_length)
 
553
  {
 
554
    if (Count)
 
555
    {
 
556
      /* We couldn't fulfil the request. Return, how much we got. */
 
557
      info->error= left_length;
 
558
      DBUG_RETURN(1);
 
559
    }
 
560
    length=0;                           /* Didn't read any chars */
 
561
  }
 
562
  else if ((length= mysql_file_read(info->file,info->buffer, max_length,
 
563
                            info->myflags)) < Count ||
 
564
           length == (size_t) -1)
 
565
  {
 
566
    /*
 
567
      We got an read error, or less than requested (end of file).
 
568
      If not a read error, copy, what we got.
 
569
    */
 
570
    if (length != (size_t) -1)
 
571
      memcpy(Buffer, info->buffer, length);
 
572
    info->pos_in_file= pos_in_file;
 
573
    /* For a read error, return -1, otherwise, what we got in total. */
 
574
    info->error= length == (size_t) -1 ? -1 : (int) (length+left_length);
 
575
    info->read_pos=info->read_end=info->buffer;
 
576
    DBUG_RETURN(1);
 
577
  }
 
578
  /*
 
579
    Count is the remaining number of bytes requested.
 
580
    length is the amount of data in the cache.
 
581
    Read Count bytes from the cache.
 
582
  */
 
583
  info->read_pos=info->buffer+Count;
 
584
  info->read_end=info->buffer+length;
 
585
  info->pos_in_file=pos_in_file;
 
586
  memcpy(Buffer, info->buffer, Count);
 
587
  DBUG_RETURN(0);
 
588
}
 
589
 
 
590
 
 
591
/*
 
592
  Prepare IO_CACHE for shared use.
 
593
 
 
594
  SYNOPSIS
 
595
    init_io_cache_share()
 
596
      read_cache                A read cache. This will be copied for
 
597
                                every thread after setup.
 
598
      cshare                    The share.
 
599
      write_cache               If non-NULL a write cache that is to be
 
600
                                synchronized with the read caches.
 
601
      num_threads               Number of threads sharing the cache
 
602
                                including the write thread if any.
 
603
 
 
604
  DESCRIPTION
 
605
 
 
606
    The shared cache is used so: One IO_CACHE is initialized with
 
607
    init_io_cache(). This includes the allocation of a buffer. Then a
 
608
    share is allocated and init_io_cache_share() is called with the io
 
609
    cache and the share. Then the io cache is copied for each thread. So
 
610
    every thread has its own copy of IO_CACHE. But the allocated buffer
 
611
    is shared because cache->buffer is the same for all caches.
 
612
 
 
613
    One thread reads data from the file into the buffer. All threads
 
614
    read from the buffer, but every thread maintains its own set of
 
615
    pointers into the buffer. When all threads have used up the buffer
 
616
    contents, one of the threads reads the next block of data into the
 
617
    buffer. To accomplish this, each thread enters the cache lock before
 
618
    accessing the buffer. They wait in lock_io_cache() until all threads
 
619
    joined the lock. The last thread entering the lock is in charge of
 
620
    reading from file to buffer. It wakes all threads when done.
 
621
 
 
622
    Synchronizing a write cache to the read caches works so: Whenever
 
623
    the write buffer needs a flush, the write thread enters the lock and
 
624
    waits for all other threads to enter the lock too. They do this when
 
625
    they have used up the read buffer. When all threads are in the lock,
 
626
    the write thread copies the write buffer to the read buffer and
 
627
    wakes all threads.
 
628
 
 
629
    share->running_threads is the number of threads not being in the
 
630
    cache lock. When entering lock_io_cache() the number is decreased.
 
631
    When the thread that fills the buffer enters unlock_io_cache() the
 
632
    number is reset to the number of threads. The condition
 
633
    running_threads == 0 means that all threads are in the lock. Bumping
 
634
    up the number to the full count is non-intuitive. But increasing the
 
635
    number by one for each thread that leaves the lock could lead to a
 
636
    solo run of one thread. The last thread to join a lock reads from
 
637
    file to buffer, wakes the other threads, processes the data in the
 
638
    cache and enters the lock again. If no other thread left the lock
 
639
    meanwhile, it would think it's the last one again and read the next
 
640
    block...
 
641
 
 
642
    The share has copies of 'error', 'buffer', 'read_end', and
 
643
    'pos_in_file' from the thread that filled the buffer. We may not be
 
644
    able to access this information directly from its cache because the
 
645
    thread may be removed from the share before the variables could be
 
646
    copied by all other threads. Or, if a write buffer is synchronized,
 
647
    it would change its 'pos_in_file' after waking the other threads,
 
648
    possibly before they could copy its value.
 
649
 
 
650
    However, the 'buffer' variable in the share is for a synchronized
 
651
    write cache. It needs to know where to put the data. Otherwise it
 
652
    would need access to the read cache of one of the threads that is
 
653
    not yet removed from the share.
 
654
 
 
655
  RETURN
 
656
    void
 
657
*/
 
658
 
 
659
void init_io_cache_share(IO_CACHE *read_cache, IO_CACHE_SHARE *cshare,
 
660
                         IO_CACHE *write_cache, uint num_threads)
 
661
{
 
662
  DBUG_ENTER("init_io_cache_share");
 
663
  DBUG_PRINT("io_cache_share", ("read_cache: 0x%lx  share: 0x%lx  "
 
664
                                "write_cache: 0x%lx  threads: %u",
 
665
                                (long) read_cache, (long) cshare,
 
666
                                (long) write_cache, num_threads));
 
667
 
 
668
  DBUG_ASSERT(num_threads > 1);
 
669
  DBUG_ASSERT(read_cache->type == READ_CACHE);
 
670
  DBUG_ASSERT(!write_cache || (write_cache->type == WRITE_CACHE));
 
671
 
 
672
  mysql_mutex_init(key_IO_CACHE_SHARE_mutex,
 
673
                   &cshare->mutex, MY_MUTEX_INIT_FAST);
 
674
  mysql_cond_init(key_IO_CACHE_SHARE_cond, &cshare->cond, 0);
 
675
  mysql_cond_init(key_IO_CACHE_SHARE_cond_writer, &cshare->cond_writer, 0);
 
676
 
 
677
  cshare->running_threads= num_threads;
 
678
  cshare->total_threads=   num_threads;
 
679
  cshare->error=           0;    /* Initialize. */
 
680
  cshare->buffer=          read_cache->buffer;
 
681
  cshare->read_end=        NULL; /* See function comment of lock_io_cache(). */
 
682
  cshare->pos_in_file=     0;    /* See function comment of lock_io_cache(). */
 
683
  cshare->source_cache=    write_cache; /* Can be NULL. */
 
684
 
 
685
  read_cache->share=         cshare;
 
686
  read_cache->read_function= _my_b_read_r;
 
687
  read_cache->current_pos=   NULL;
 
688
  read_cache->current_end=   NULL;
 
689
 
 
690
  if (write_cache)
 
691
    write_cache->share= cshare;
 
692
 
 
693
  DBUG_VOID_RETURN;
 
694
}
 
695
 
 
696
 
 
697
/*
 
698
  Remove a thread from shared access to IO_CACHE.
 
699
 
 
700
  SYNOPSIS
 
701
    remove_io_thread()
 
702
      cache                     The IO_CACHE to be removed from the share.
 
703
 
 
704
  NOTE
 
705
 
 
706
    Every thread must do that on exit for not to deadlock other threads.
 
707
 
 
708
    The last thread destroys the pthread resources.
 
709
 
 
710
    A writer flushes its cache first.
 
711
 
 
712
  RETURN
 
713
    void
 
714
*/
 
715
 
 
716
void remove_io_thread(IO_CACHE *cache)
 
717
{
 
718
  IO_CACHE_SHARE *cshare= cache->share;
 
719
  uint total;
 
720
  DBUG_ENTER("remove_io_thread");
 
721
 
 
722
  /* If the writer goes, it needs to flush the write cache. */
 
723
  if (cache == cshare->source_cache)
 
724
    flush_io_cache(cache);
 
725
 
 
726
  mysql_mutex_lock(&cshare->mutex);
 
727
  DBUG_PRINT("io_cache_share", ("%s: 0x%lx",
 
728
                                (cache == cshare->source_cache) ?
 
729
                                "writer" : "reader", (long) cache));
 
730
 
 
731
  /* Remove from share. */
 
732
  total= --cshare->total_threads;
 
733
  DBUG_PRINT("io_cache_share", ("remaining threads: %u", total));
 
734
 
 
735
  /* Detach from share. */
 
736
  cache->share= NULL;
 
737
 
 
738
  /* If the writer goes, let the readers know. */
 
739
  if (cache == cshare->source_cache)
 
740
  {
 
741
    DBUG_PRINT("io_cache_share", ("writer leaves"));
 
742
    cshare->source_cache= NULL;
 
743
  }
 
744
 
 
745
  /* If all threads are waiting for me to join the lock, wake them. */
 
746
  if (!--cshare->running_threads)
 
747
  {
 
748
    DBUG_PRINT("io_cache_share", ("the last running thread leaves, wake all"));
 
749
    mysql_cond_signal(&cshare->cond_writer);
 
750
    mysql_cond_broadcast(&cshare->cond);
 
751
  }
 
752
 
 
753
  mysql_mutex_unlock(&cshare->mutex);
 
754
 
 
755
  if (!total)
 
756
  {
 
757
    DBUG_PRINT("io_cache_share", ("last thread removed, destroy share"));
 
758
    mysql_cond_destroy (&cshare->cond_writer);
 
759
    mysql_cond_destroy (&cshare->cond);
 
760
    mysql_mutex_destroy(&cshare->mutex);
 
761
  }
 
762
 
 
763
  DBUG_VOID_RETURN;
 
764
}
 
765
 
 
766
 
 
767
/*
 
768
  Lock IO cache and wait for all other threads to join.
 
769
 
 
770
  SYNOPSIS
 
771
    lock_io_cache()
 
772
      cache                     The cache of the thread entering the lock.
 
773
      pos                       File position of the block to read.
 
774
                                Unused for the write thread.
 
775
 
 
776
  DESCRIPTION
 
777
 
 
778
    Wait for all threads to finish with the current buffer. We want
 
779
    all threads to proceed in concert. The last thread to join
 
780
    lock_io_cache() will read the block from file and all threads start
 
781
    to use it. Then they will join again for reading the next block.
 
782
 
 
783
    The waiting threads detect a fresh buffer by comparing
 
784
    cshare->pos_in_file with the position they want to process next.
 
785
    Since the first block may start at position 0, we take
 
786
    cshare->read_end as an additional condition. This variable is
 
787
    initialized to NULL and will be set after a block of data is written
 
788
    to the buffer.
 
789
 
 
790
  RETURN
 
791
    1           OK, lock in place, go ahead and read.
 
792
    0           OK, unlocked, another thread did the read.
 
793
*/
 
794
 
 
795
static int lock_io_cache(IO_CACHE *cache, my_off_t pos)
 
796
{
 
797
  IO_CACHE_SHARE *cshare= cache->share;
 
798
  DBUG_ENTER("lock_io_cache");
 
799
 
 
800
  /* Enter the lock. */
 
801
  mysql_mutex_lock(&cshare->mutex);
 
802
  cshare->running_threads--;
 
803
  DBUG_PRINT("io_cache_share", ("%s: 0x%lx  pos: %lu  running: %u",
 
804
                                (cache == cshare->source_cache) ?
 
805
                                "writer" : "reader", (long) cache, (ulong) pos,
 
806
                                cshare->running_threads));
 
807
 
 
808
  if (cshare->source_cache)
 
809
  {
 
810
    /* A write cache is synchronized to the read caches. */
 
811
 
 
812
    if (cache == cshare->source_cache)
 
813
    {
 
814
      /* The writer waits until all readers are here. */
 
815
      while (cshare->running_threads)
 
816
      {
 
817
        DBUG_PRINT("io_cache_share", ("writer waits in lock"));
 
818
        mysql_cond_wait(&cshare->cond_writer, &cshare->mutex);
 
819
      }
 
820
      DBUG_PRINT("io_cache_share", ("writer awoke, going to copy"));
 
821
 
 
822
      /* Stay locked. Leave the lock later by unlock_io_cache(). */
 
823
      DBUG_RETURN(1);
 
824
    }
 
825
 
 
826
    /* The last thread wakes the writer. */
 
827
    if (!cshare->running_threads)
 
828
    {
 
829
      DBUG_PRINT("io_cache_share", ("waking writer"));
 
830
      mysql_cond_signal(&cshare->cond_writer);
 
831
    }
 
832
 
 
833
    /*
 
834
      Readers wait until the data is copied from the writer. Another
 
835
      reason to stop waiting is the removal of the write thread. If this
 
836
      happens, we leave the lock with old data in the buffer.
 
837
    */
 
838
    while ((!cshare->read_end || (cshare->pos_in_file < pos)) &&
 
839
           cshare->source_cache)
 
840
    {
 
841
      DBUG_PRINT("io_cache_share", ("reader waits in lock"));
 
842
      mysql_cond_wait(&cshare->cond, &cshare->mutex);
 
843
    }
 
844
 
 
845
    /*
 
846
      If the writer was removed from the share while this thread was
 
847
      asleep, we need to simulate an EOF condition. The writer cannot
 
848
      reset the share variables as they might still be in use by readers
 
849
      of the last block. When we awake here then because the last
 
850
      joining thread signalled us. If the writer is not the last, it
 
851
      will not signal. So it is safe to clear the buffer here.
 
852
    */
 
853
    if (!cshare->read_end || (cshare->pos_in_file < pos))
 
854
    {
 
855
      DBUG_PRINT("io_cache_share", ("reader found writer removed. EOF"));
 
856
      cshare->read_end= cshare->buffer; /* Empty buffer. */
 
857
      cshare->error= 0; /* EOF is not an error. */
 
858
    }
 
859
  }
 
860
  else
 
861
  {
 
862
    /*
 
863
      There are read caches only. The last thread arriving in
 
864
      lock_io_cache() continues with a locked cache and reads the block.
 
865
    */
 
866
    if (!cshare->running_threads)
 
867
    {
 
868
      DBUG_PRINT("io_cache_share", ("last thread joined, going to read"));
 
869
      /* Stay locked. Leave the lock later by unlock_io_cache(). */
 
870
      DBUG_RETURN(1);
 
871
    }
 
872
 
 
873
    /*
 
874
      All other threads wait until the requested block is read by the
 
875
      last thread arriving. Another reason to stop waiting is the
 
876
      removal of a thread. If this leads to all threads being in the
 
877
      lock, we have to continue also. The first of the awaken threads
 
878
      will then do the read.
 
879
    */
 
880
    while ((!cshare->read_end || (cshare->pos_in_file < pos)) &&
 
881
           cshare->running_threads)
 
882
    {
 
883
      DBUG_PRINT("io_cache_share", ("reader waits in lock"));
 
884
      mysql_cond_wait(&cshare->cond, &cshare->mutex);
 
885
    }
 
886
 
 
887
    /* If the block is not yet read, continue with a locked cache and read. */
 
888
    if (!cshare->read_end || (cshare->pos_in_file < pos))
 
889
    {
 
890
      DBUG_PRINT("io_cache_share", ("reader awoke, going to read"));
 
891
      /* Stay locked. Leave the lock later by unlock_io_cache(). */
 
892
      DBUG_RETURN(1);
 
893
    }
 
894
 
 
895
    /* Another thread did read the block already. */
 
896
  }
 
897
  DBUG_PRINT("io_cache_share", ("reader awoke, going to process %u bytes",
 
898
                                (uint) (cshare->read_end ? (size_t)
 
899
                                        (cshare->read_end - cshare->buffer) :
 
900
                                        0)));
 
901
 
 
902
  /*
 
903
    Leave the lock. Do not call unlock_io_cache() later. The thread that
 
904
    filled the buffer did this and marked all threads as running.
 
905
  */
 
906
  mysql_mutex_unlock(&cshare->mutex);
 
907
  DBUG_RETURN(0);
 
908
}
 
909
 
 
910
 
 
911
/*
 
912
  Unlock IO cache.
 
913
 
 
914
  SYNOPSIS
 
915
    unlock_io_cache()
 
916
      cache                     The cache of the thread leaving the lock.
 
917
 
 
918
  NOTE
 
919
    This is called by the thread that filled the buffer. It marks all
 
920
    threads as running and awakes them. This must not be done by any
 
921
    other thread.
 
922
 
 
923
    Do not signal cond_writer. Either there is no writer or the writer
 
924
    is the only one who can call this function.
 
925
 
 
926
    The reason for resetting running_threads to total_threads before
 
927
    waking all other threads is that it could be possible that this
 
928
    thread is so fast with processing the buffer that it enters the lock
 
929
    before even one other thread has left it. If every awoken thread
 
930
    would increase running_threads by one, this thread could think that
 
931
    he is again the last to join and would not wait for the other
 
932
    threads to process the data.
 
933
 
 
934
  RETURN
 
935
    void
 
936
*/
 
937
 
 
938
static void unlock_io_cache(IO_CACHE *cache)
 
939
{
 
940
  IO_CACHE_SHARE *cshare= cache->share;
 
941
  DBUG_ENTER("unlock_io_cache");
 
942
  DBUG_PRINT("io_cache_share", ("%s: 0x%lx  pos: %lu  running: %u",
 
943
                                (cache == cshare->source_cache) ?
 
944
                                "writer" : "reader",
 
945
                                (long) cache, (ulong) cshare->pos_in_file,
 
946
                                cshare->total_threads));
 
947
 
 
948
  cshare->running_threads= cshare->total_threads;
 
949
  mysql_cond_broadcast(&cshare->cond);
 
950
  mysql_mutex_unlock(&cshare->mutex);
 
951
  DBUG_VOID_RETURN;
 
952
}
 
953
 
 
954
 
 
955
/*
 
956
  Read from IO_CACHE when it is shared between several threads.
 
957
 
 
958
  SYNOPSIS
 
959
    _my_b_read_r()
 
960
      cache                     IO_CACHE pointer
 
961
      Buffer                    Buffer to retrieve count bytes from file
 
962
      Count                     Number of bytes to read into Buffer
 
963
 
 
964
  NOTE
 
965
    This function is only called from the my_b_read() macro when there
 
966
    isn't enough characters in the buffer to satisfy the request.
 
967
 
 
968
  IMPLEMENTATION
 
969
 
 
970
    It works as follows: when a thread tries to read from a file (that
 
971
    is, after using all the data from the (shared) buffer), it just
 
972
    hangs on lock_io_cache(), waiting for other threads. When the very
 
973
    last thread attempts a read, lock_io_cache() returns 1, the thread
 
974
    does actual IO and unlock_io_cache(), which signals all the waiting
 
975
    threads that data is in the buffer.
 
976
 
 
977
  WARNING
 
978
 
 
979
    When changing this function, be careful with handling file offsets
 
980
    (end-of_file, pos_in_file). Do not cast them to possibly smaller
 
981
    types than my_off_t unless you can be sure that their value fits.
 
982
    Same applies to differences of file offsets. (Bug #11527)
 
983
 
 
984
    When changing this function, check _my_b_read(). It might need the
 
985
    same change.
 
986
 
 
987
  RETURN
 
988
    0      we succeeded in reading all data
 
989
    1      Error: can't read requested characters
 
990
*/
 
991
 
 
992
int _my_b_read_r(register IO_CACHE *cache, uchar *Buffer, size_t Count)
 
993
{
 
994
  my_off_t pos_in_file;
 
995
  size_t length, diff_length, left_length;
 
996
  IO_CACHE_SHARE *cshare= cache->share;
 
997
  DBUG_ENTER("_my_b_read_r");
 
998
 
 
999
  if ((left_length= (size_t) (cache->read_end - cache->read_pos)))
 
1000
  {
 
1001
    DBUG_ASSERT(Count >= left_length);  /* User is not using my_b_read() */
 
1002
    memcpy(Buffer, cache->read_pos, left_length);
 
1003
    Buffer+= left_length;
 
1004
    Count-= left_length;
 
1005
  }
 
1006
  while (Count)
 
1007
  {
 
1008
    size_t cnt, len;
 
1009
 
 
1010
    pos_in_file= cache->pos_in_file + (cache->read_end - cache->buffer);
 
1011
    diff_length= (size_t) (pos_in_file & (IO_SIZE-1));
 
1012
    length=IO_ROUND_UP(Count+diff_length)-diff_length;
 
1013
    length= ((length <= cache->read_length) ?
 
1014
             length + IO_ROUND_DN(cache->read_length - length) :
 
1015
             length - IO_ROUND_UP(length - cache->read_length));
 
1016
    if (cache->type != READ_FIFO &&
 
1017
        (length > (cache->end_of_file - pos_in_file)))
 
1018
      length= (size_t) (cache->end_of_file - pos_in_file);
 
1019
    if (length == 0)
 
1020
    {
 
1021
      cache->error= (int) left_length;
 
1022
      DBUG_RETURN(1);
 
1023
    }
 
1024
    if (lock_io_cache(cache, pos_in_file))
 
1025
    {
 
1026
      /* With a synchronized write/read cache we won't come here... */
 
1027
      DBUG_ASSERT(!cshare->source_cache);
 
1028
      /*
 
1029
        ... unless the writer has gone before this thread entered the
 
1030
        lock. Simulate EOF in this case. It can be distinguished by
 
1031
        cache->file.
 
1032
      */
 
1033
      if (cache->file < 0)
 
1034
        len= 0;
 
1035
      else
 
1036
      {
 
1037
        /*
 
1038
          Whenever a function which operates on IO_CACHE flushes/writes
 
1039
          some part of the IO_CACHE to disk it will set the property
 
1040
          "seek_not_done" to indicate this to other functions operating
 
1041
          on the IO_CACHE.
 
1042
        */
 
1043
        if (cache->seek_not_done)
 
1044
        {
 
1045
          if (mysql_file_seek(cache->file, pos_in_file, MY_SEEK_SET, MYF(0))
 
1046
              == MY_FILEPOS_ERROR)
 
1047
          {
 
1048
            cache->error= -1;
 
1049
            unlock_io_cache(cache);
 
1050
            DBUG_RETURN(1);
 
1051
          }
 
1052
        }
 
1053
        len= mysql_file_read(cache->file, cache->buffer, length, cache->myflags);
 
1054
      }
 
1055
      DBUG_PRINT("io_cache_share", ("read %lu bytes", (ulong) len));
 
1056
 
 
1057
      cache->read_end=    cache->buffer + (len == (size_t) -1 ? 0 : len);
 
1058
      cache->error=       (len == length ? 0 : (int) len);
 
1059
      cache->pos_in_file= pos_in_file;
 
1060
 
 
1061
      /* Copy important values to the share. */
 
1062
      cshare->error=       cache->error;
 
1063
      cshare->read_end=    cache->read_end;
 
1064
      cshare->pos_in_file= pos_in_file;
 
1065
 
 
1066
      /* Mark all threads as running and wake them. */
 
1067
      unlock_io_cache(cache);
 
1068
    }
 
1069
    else
 
1070
    {
 
1071
      /*
 
1072
        With a synchronized write/read cache readers always come here.
 
1073
        Copy important values from the share.
 
1074
      */
 
1075
      cache->error=       cshare->error;
 
1076
      cache->read_end=    cshare->read_end;
 
1077
      cache->pos_in_file= cshare->pos_in_file;
 
1078
 
 
1079
      len= ((cache->error == -1) ? (size_t) -1 :
 
1080
            (size_t) (cache->read_end - cache->buffer));
 
1081
    }
 
1082
    cache->read_pos=      cache->buffer;
 
1083
    cache->seek_not_done= 0;
 
1084
    if (len == 0 || len == (size_t) -1)
 
1085
    {
 
1086
      DBUG_PRINT("io_cache_share", ("reader error. len %lu  left %lu",
 
1087
                                    (ulong) len, (ulong) left_length));
 
1088
      cache->error= (int) left_length;
 
1089
      DBUG_RETURN(1);
 
1090
    }
 
1091
    cnt= (len > Count) ? Count : len;
 
1092
    memcpy(Buffer, cache->read_pos, cnt);
 
1093
    Count -= cnt;
 
1094
    Buffer+= cnt;
 
1095
    left_length+= cnt;
 
1096
    cache->read_pos+= cnt;
 
1097
  }
 
1098
  DBUG_RETURN(0);
 
1099
}
 
1100
 
 
1101
 
 
1102
/*
 
1103
  Copy data from write cache to read cache.
 
1104
 
 
1105
  SYNOPSIS
 
1106
    copy_to_read_buffer()
 
1107
      write_cache               The write cache.
 
1108
      write_buffer              The source of data, mostly the cache buffer.
 
1109
      write_length              The number of bytes to copy.
 
1110
 
 
1111
  NOTE
 
1112
    The write thread will wait for all read threads to join the cache
 
1113
    lock. Then it copies the data over and wakes the read threads.
 
1114
 
 
1115
  RETURN
 
1116
    void
 
1117
*/
 
1118
 
 
1119
static void copy_to_read_buffer(IO_CACHE *write_cache,
 
1120
                                const uchar *write_buffer, size_t write_length)
 
1121
{
 
1122
  IO_CACHE_SHARE *cshare= write_cache->share;
 
1123
 
 
1124
  DBUG_ASSERT(cshare->source_cache == write_cache);
 
1125
  /*
 
1126
    write_length is usually less or equal to buffer_length.
 
1127
    It can be bigger if _my_b_write() is called with a big length.
 
1128
  */
 
1129
  while (write_length)
 
1130
  {
 
1131
    size_t copy_length= MY_MIN(write_length, write_cache->buffer_length);
 
1132
    int  __attribute__((unused)) rc;
 
1133
 
 
1134
    rc= lock_io_cache(write_cache, write_cache->pos_in_file);
 
1135
    /* The writing thread does always have the lock when it awakes. */
 
1136
    DBUG_ASSERT(rc);
 
1137
 
 
1138
    memcpy(cshare->buffer, write_buffer, copy_length);
 
1139
 
 
1140
    cshare->error=       0;
 
1141
    cshare->read_end=    cshare->buffer + copy_length;
 
1142
    cshare->pos_in_file= write_cache->pos_in_file;
 
1143
 
 
1144
    /* Mark all threads as running and wake them. */
 
1145
    unlock_io_cache(write_cache);
 
1146
 
 
1147
    write_buffer+= copy_length;
 
1148
    write_length-= copy_length;
 
1149
  }
 
1150
}
 
1151
 
 
1152
 
 
1153
/*
 
1154
  Do sequential read from the SEQ_READ_APPEND cache.
 
1155
  
 
1156
  We do this in three stages:
 
1157
   - first read from info->buffer
 
1158
   - then if there are still data to read, try the file descriptor
 
1159
   - afterwards, if there are still data to read, try append buffer
 
1160
 
 
1161
  RETURNS
 
1162
    0  Success
 
1163
    1  Failed to read
 
1164
*/
 
1165
 
 
1166
int _my_b_seq_read(register IO_CACHE *info, uchar *Buffer, size_t Count)
 
1167
{
 
1168
  size_t length, diff_length, left_length, save_count, max_length;
 
1169
  my_off_t pos_in_file;
 
1170
  save_count=Count;
 
1171
 
 
1172
  /* first, read the regular buffer */
 
1173
  if ((left_length=(size_t) (info->read_end-info->read_pos)))
 
1174
  {
 
1175
    DBUG_ASSERT(Count > left_length);   /* User is not using my_b_read() */
 
1176
    memcpy(Buffer,info->read_pos, left_length);
 
1177
    Buffer+=left_length;
 
1178
    Count-=left_length;
 
1179
  }
 
1180
  lock_append_buffer(info);
 
1181
 
 
1182
  /* pos_in_file always point on where info->buffer was read */
 
1183
  if ((pos_in_file=info->pos_in_file +
 
1184
       (size_t) (info->read_end - info->buffer)) >= info->end_of_file)
 
1185
    goto read_append_buffer;
 
1186
 
 
1187
  /*
 
1188
    With read-append cache we must always do a seek before we read,
 
1189
    because the write could have moved the file pointer astray
 
1190
  */
 
1191
  if (mysql_file_seek(info->file, pos_in_file, MY_SEEK_SET, MYF(0)) == MY_FILEPOS_ERROR)
 
1192
  {
 
1193
   info->error= -1;
 
1194
   unlock_append_buffer(info);
 
1195
   return (1);
 
1196
  }
 
1197
  info->seek_not_done=0;
 
1198
 
 
1199
  diff_length= (size_t) (pos_in_file & (IO_SIZE-1));
 
1200
 
 
1201
  /* now the second stage begins - read from file descriptor */
 
1202
  if (Count >= (size_t) (IO_SIZE+(IO_SIZE-diff_length)))
 
1203
  {
 
1204
    /* Fill first intern buffer */
 
1205
    size_t read_length;
 
1206
 
 
1207
    length=(Count & (size_t) ~(IO_SIZE-1))-diff_length;
 
1208
    if ((read_length= mysql_file_read(info->file,Buffer, length,
 
1209
                                      info->myflags)) == (size_t) -1)
 
1210
    {
 
1211
      info->error= -1;
 
1212
      unlock_append_buffer(info);
 
1213
      return 1;
 
1214
    }
 
1215
    Count-=read_length;
 
1216
    Buffer+=read_length;
 
1217
    pos_in_file+=read_length;
 
1218
 
 
1219
    if (read_length != length)
 
1220
    {
 
1221
      /*
 
1222
        We only got part of data;  Read the rest of the data from the
 
1223
        write buffer
 
1224
      */
 
1225
      goto read_append_buffer;
 
1226
    }
 
1227
    left_length+=length;
 
1228
    diff_length=0;
 
1229
  }
 
1230
 
 
1231
  max_length= info->read_length-diff_length;
 
1232
  if (max_length > (info->end_of_file - pos_in_file))
 
1233
    max_length= (size_t) (info->end_of_file - pos_in_file);
 
1234
  if (!max_length)
 
1235
  {
 
1236
    if (Count)
 
1237
      goto read_append_buffer;
 
1238
    length=0;                           /* Didn't read any more chars */
 
1239
  }
 
1240
  else
 
1241
  {
 
1242
    length= mysql_file_read(info->file,info->buffer, max_length, info->myflags);
 
1243
    if (length == (size_t) -1)
 
1244
    {
 
1245
      info->error= -1;
 
1246
      unlock_append_buffer(info);
 
1247
      return 1;
 
1248
    }
 
1249
    if (length < Count)
 
1250
    {
 
1251
      memcpy(Buffer, info->buffer, length);
 
1252
      Count -= length;
 
1253
      Buffer += length;
 
1254
 
 
1255
      /*
 
1256
         added the line below to make
 
1257
         DBUG_ASSERT(pos_in_file==info->end_of_file) pass.
 
1258
         otherwise this does not appear to be needed
 
1259
      */
 
1260
      pos_in_file += length;
 
1261
      goto read_append_buffer;
 
1262
    }
 
1263
  }
 
1264
  unlock_append_buffer(info);
 
1265
  info->read_pos=info->buffer+Count;
 
1266
  info->read_end=info->buffer+length;
 
1267
  info->pos_in_file=pos_in_file;
 
1268
  memcpy(Buffer,info->buffer,(size_t) Count);
 
1269
  return 0;
 
1270
 
 
1271
read_append_buffer:
 
1272
 
 
1273
  /*
 
1274
     Read data from the current write buffer.
 
1275
     Count should never be == 0 here (The code will work even if count is 0)
 
1276
  */
 
1277
 
 
1278
  {
 
1279
    /* First copy the data to Count */
 
1280
    size_t len_in_buff = (size_t) (info->write_pos - info->append_read_pos);
 
1281
    size_t copy_len;
 
1282
    size_t transfer_len;
 
1283
 
 
1284
    DBUG_ASSERT(info->append_read_pos <= info->write_pos);
 
1285
    /*
 
1286
      TODO: figure out if the assert below is needed or correct.
 
1287
    */
 
1288
    DBUG_ASSERT(pos_in_file == info->end_of_file);
 
1289
    copy_len= MY_MIN(Count, len_in_buff);
 
1290
    memcpy(Buffer, info->append_read_pos, copy_len);
 
1291
    info->append_read_pos += copy_len;
 
1292
    Count -= copy_len;
 
1293
    if (Count)
 
1294
      info->error = save_count - Count;
 
1295
 
 
1296
    /* Fill read buffer with data from write buffer */
 
1297
    memcpy(info->buffer, info->append_read_pos,
 
1298
           (size_t) (transfer_len=len_in_buff - copy_len));
 
1299
    info->read_pos= info->buffer;
 
1300
    info->read_end= info->buffer+transfer_len;
 
1301
    info->append_read_pos=info->write_pos;
 
1302
    info->pos_in_file=pos_in_file+copy_len;
 
1303
    info->end_of_file+=len_in_buff;
 
1304
  }
 
1305
  unlock_append_buffer(info);
 
1306
  return Count ? 1 : 0;
 
1307
}
 
1308
 
 
1309
 
 
1310
#ifdef HAVE_AIOWAIT
 
1311
 
 
1312
/*
 
1313
  Read from the IO_CACHE into a buffer and feed asynchronously
 
1314
  from disk when needed.
 
1315
 
 
1316
  SYNOPSIS
 
1317
    _my_b_async_read()
 
1318
      info                      IO_CACHE pointer
 
1319
      Buffer                    Buffer to retrieve count bytes from file
 
1320
      Count                     Number of bytes to read into Buffer
 
1321
 
 
1322
  RETURN VALUE
 
1323
    -1          An error has occurred; my_errno is set.
 
1324
     0          Success
 
1325
     1          An error has occurred; IO_CACHE to error state.
 
1326
*/
 
1327
 
 
1328
int _my_b_async_read(register IO_CACHE *info, uchar *Buffer, size_t Count)
 
1329
{
 
1330
  size_t length,read_length,diff_length,left_length,use_length,org_Count;
 
1331
  size_t max_length;
 
1332
  my_off_t next_pos_in_file;
 
1333
  uchar *read_buffer;
 
1334
 
 
1335
  memcpy(Buffer,info->read_pos,
 
1336
         (left_length= (size_t) (info->read_end-info->read_pos)));
 
1337
  Buffer+=left_length;
 
1338
  org_Count=Count;
 
1339
  Count-=left_length;
 
1340
 
 
1341
  if (info->inited)
 
1342
  {                                             /* wait for read block */
 
1343
    info->inited=0;                             /* No more block to read */
 
1344
    my_aiowait(&info->aio_result);              /* Wait for outstanding req */
 
1345
    if (info->aio_result.result.aio_errno)
 
1346
    {
 
1347
      if (info->myflags & MY_WME)
 
1348
      {
 
1349
        char errbuf[MYSYS_STRERROR_SIZE];
 
1350
        my_error(EE_READ, MYF(ME_BELL+ME_WAITTANG),
 
1351
                 my_filename(info->file),
 
1352
                 info->aio_result.result.aio_errno,
 
1353
                 my_strerror(errbuf, sizeof(errbuf),
 
1354
                             info->aio_result.result.aio_errno));
 
1355
      my_errno=info->aio_result.result.aio_errno;
 
1356
      info->error= -1;
 
1357
      return(1);
 
1358
    }
 
1359
    if (! (read_length= (size_t) info->aio_result.result.aio_return) ||
 
1360
        read_length == (size_t) -1)
 
1361
    {
 
1362
      my_errno=0;                               /* For testing */
 
1363
      info->error= (read_length == (size_t) -1 ? -1 :
 
1364
                    (int) (read_length+left_length));
 
1365
      return(1);
 
1366
    }
 
1367
    info->pos_in_file+= (size_t) (info->read_end - info->request_pos);
 
1368
 
 
1369
    if (info->request_pos != info->buffer)
 
1370
      info->request_pos=info->buffer;
 
1371
    else
 
1372
      info->request_pos=info->buffer+info->read_length;
 
1373
    info->read_pos=info->request_pos;
 
1374
    next_pos_in_file=info->aio_read_pos+read_length;
 
1375
 
 
1376
        /* Check if pos_in_file is changed
 
1377
           (_ni_read_cache may have skipped some bytes) */
 
1378
 
 
1379
    if (info->aio_read_pos < info->pos_in_file)
 
1380
    {                                           /* Fix if skipped bytes */
 
1381
      if (info->aio_read_pos + read_length < info->pos_in_file)
 
1382
      {
 
1383
        read_length=0;                          /* Skip block */
 
1384
        next_pos_in_file=info->pos_in_file;
 
1385
      }
 
1386
      else
 
1387
      {
 
1388
        my_off_t offset= (info->pos_in_file - info->aio_read_pos);
 
1389
        info->pos_in_file=info->aio_read_pos; /* Whe are here */
 
1390
        info->read_pos=info->request_pos+offset;
 
1391
        read_length-=offset;                    /* Bytes left from read_pos */
 
1392
      }
 
1393
    }
 
1394
#ifndef DBUG_OFF
 
1395
    if (info->aio_read_pos > info->pos_in_file)
 
1396
    {
 
1397
      my_errno=EINVAL;
 
1398
      return(info->read_length= (size_t) -1);
 
1399
    }
 
1400
#endif
 
1401
        /* Copy found bytes to buffer */
 
1402
    length= MY_MIN(Count, read_length);
 
1403
    memcpy(Buffer,info->read_pos,(size_t) length);
 
1404
    Buffer+=length;
 
1405
    Count-=length;
 
1406
    left_length+=length;
 
1407
    info->read_end=info->rc_pos+read_length;
 
1408
    info->read_pos+=length;
 
1409
  }
 
1410
  else
 
1411
    next_pos_in_file=(info->pos_in_file+ (size_t)
 
1412
                      (info->read_end - info->request_pos));
 
1413
 
 
1414
        /* If reading large blocks, or first read or read with skip */
 
1415
  if (Count)
 
1416
  {
 
1417
    if (next_pos_in_file == info->end_of_file)
 
1418
    {
 
1419
      info->error=(int) (read_length+left_length);
 
1420
      return 1;
 
1421
    }
 
1422
    
 
1423
    if (mysql_file_seek(info->file, next_pos_in_file, MY_SEEK_SET, MYF(0))
 
1424
        == MY_FILEPOS_ERROR)
 
1425
    {
 
1426
      info->error= -1;
 
1427
      return (1);
 
1428
    }
 
1429
 
 
1430
    read_length=IO_SIZE*2- (size_t) (next_pos_in_file & (IO_SIZE-1));
 
1431
    if (Count < read_length)
 
1432
    {                                   /* Small block, read to cache */
 
1433
      if ((read_length=mysql_file_read(info->file,info->request_pos,
 
1434
                                       read_length, info->myflags)) == (size_t) -1)
 
1435
        return info->error= -1;
 
1436
      use_length= MY_MIN(Count, read_length);
 
1437
      memcpy(Buffer,info->request_pos,(size_t) use_length);
 
1438
      info->read_pos=info->request_pos+Count;
 
1439
      info->read_end=info->request_pos+read_length;
 
1440
      info->pos_in_file=next_pos_in_file;       /* Start of block in cache */
 
1441
      next_pos_in_file+=read_length;
 
1442
 
 
1443
      if (Count != use_length)
 
1444
      {                                 /* Didn't find hole block */
 
1445
        if (info->myflags & (MY_WME | MY_FAE | MY_FNABP) && Count != org_Count)
 
1446
        {
 
1447
          char errbuf[MYSYS_STRERROR_SIZE];
 
1448
          my_error(EE_EOFERR, MYF(ME_BELL+ME_WAITTANG), my_filename(info->file),
 
1449
                   my_errno, my_strerror(errbuf, sizeof(errbuf), my_errno));
 
1450
        }
 
1451
        info->error=(int) (read_length+left_length);
 
1452
        return 1;
 
1453
      }
 
1454
    }
 
1455
    else
 
1456
    {                                           /* Big block, don't cache it */
 
1457
      if ((read_length= mysql_file_read(info->file, Buffer, Count,info->myflags))
 
1458
          != Count)
 
1459
      {
 
1460
        info->error= read_length == (size_t) -1 ? -1 : read_length+left_length;
 
1461
        return 1;
 
1462
      }
 
1463
      info->read_pos=info->read_end=info->request_pos;
 
1464
      info->pos_in_file=(next_pos_in_file+=Count);
 
1465
    }
 
1466
  }
 
1467
 
 
1468
  /* Read next block with asyncronic io */
 
1469
  diff_length=(next_pos_in_file & (IO_SIZE-1));
 
1470
  max_length= info->read_length - diff_length;
 
1471
  if (max_length > info->end_of_file - next_pos_in_file)
 
1472
    max_length= (size_t) (info->end_of_file - next_pos_in_file);
 
1473
 
 
1474
  if (info->request_pos != info->buffer)
 
1475
    read_buffer=info->buffer;
 
1476
  else
 
1477
    read_buffer=info->buffer+info->read_length;
 
1478
  info->aio_read_pos=next_pos_in_file;
 
1479
  if (max_length)
 
1480
  {
 
1481
    info->aio_result.result.aio_errno=AIO_INPROGRESS;   /* Marker for test */
 
1482
    DBUG_PRINT("aioread",("filepos: %ld  length: %lu",
 
1483
                          (ulong) next_pos_in_file, (ulong) max_length));
 
1484
    if (aioread(info->file,read_buffer, max_length,
 
1485
                (my_off_t) next_pos_in_file,MY_SEEK_SET,
 
1486
                &info->aio_result.result))
 
1487
    {                                           /* Skip async io */
 
1488
      my_errno=errno;
 
1489
      DBUG_PRINT("error",("got error: %d, aio_result: %d from aioread, async skipped",
 
1490
                          errno, info->aio_result.result.aio_errno));
 
1491
      if (info->request_pos != info->buffer)
 
1492
      {
 
1493
        bmove(info->buffer,info->request_pos,
 
1494
              (size_t) (info->read_end - info->read_pos));
 
1495
        info->request_pos=info->buffer;
 
1496
        info->read_pos-=info->read_length;
 
1497
        info->read_end-=info->read_length;
 
1498
      }
 
1499
      info->read_length=info->buffer_length;    /* Use hole buffer */
 
1500
      info->read_function=_my_b_read;           /* Use normal IO_READ next */
 
1501
    }
 
1502
    else
 
1503
      info->inited=info->aio_result.pending=1;
 
1504
  }
 
1505
  return 0;                                     /* Block read, async in use */
 
1506
} /* _my_b_async_read */
 
1507
#endif
 
1508
 
 
1509
 
 
1510
/* Read one byte when buffer is empty */
 
1511
 
 
1512
int _my_b_get(IO_CACHE *info)
 
1513
{
 
1514
  uchar buff;
 
1515
  IO_CACHE_CALLBACK pre_read,post_read;
 
1516
  if ((pre_read = info->pre_read))
 
1517
    (*pre_read)(info);
 
1518
  if ((*(info)->read_function)(info,&buff,1))
 
1519
    return my_b_EOF;
 
1520
  if ((post_read = info->post_read))
 
1521
    (*post_read)(info);
 
1522
  return (int) (uchar) buff;
 
1523
}
 
1524
 
 
1525
/* 
 
1526
   Write a byte buffer to IO_CACHE and flush to disk
 
1527
   if IO_CACHE is full.
 
1528
 
 
1529
   RETURN VALUE
 
1530
    1 On error on write
 
1531
    0 On success
 
1532
   -1 On error; my_errno contains error code.
 
1533
*/
 
1534
 
 
1535
int _my_b_write(register IO_CACHE *info, const uchar *Buffer, size_t Count)
 
1536
{
 
1537
  size_t rest_length,length;
 
1538
  my_off_t pos_in_file= info->pos_in_file;
 
1539
 
 
1540
  DBUG_EXECUTE_IF("simulate_huge_load_data_file",
 
1541
                  {
 
1542
                    pos_in_file=(my_off_t)(5000000000ULL);
 
1543
                  });
 
1544
  if (pos_in_file+info->buffer_length > info->end_of_file)
 
1545
  {
 
1546
    my_errno=errno=EFBIG;
 
1547
    return info->error = -1;
 
1548
  }
 
1549
 
 
1550
  rest_length= (size_t) (info->write_end - info->write_pos);
 
1551
  memcpy(info->write_pos,Buffer,(size_t) rest_length);
 
1552
  Buffer+=rest_length;
 
1553
  Count-=rest_length;
 
1554
  info->write_pos+=rest_length;
 
1555
 
 
1556
  if (my_b_flush_io_cache(info,1))
 
1557
    return 1;
 
1558
  if (Count >= IO_SIZE)
 
1559
  {                                     /* Fill first intern buffer */
 
1560
    length=Count & (size_t) ~(IO_SIZE-1);
 
1561
    if (info->seek_not_done)
 
1562
    {
 
1563
      /*
 
1564
        Whenever a function which operates on IO_CACHE flushes/writes
 
1565
        some part of the IO_CACHE to disk it will set the property
 
1566
        "seek_not_done" to indicate this to other functions operating
 
1567
        on the IO_CACHE.
 
1568
      */
 
1569
      if (mysql_file_seek(info->file, info->pos_in_file, MY_SEEK_SET, MYF(0)))
 
1570
      {
 
1571
        info->error= -1;
 
1572
        return (1);
 
1573
      }
 
1574
      info->seek_not_done=0;
 
1575
    }
 
1576
    if (mysql_file_write(info->file, Buffer, length, info->myflags | MY_NABP))
 
1577
      return info->error= -1;
 
1578
 
 
1579
    /*
 
1580
      In case of a shared I/O cache with a writer we normally do direct
 
1581
      write cache to read cache copy. Simulate this here by direct
 
1582
      caller buffer to read cache copy. Do it after the write so that
 
1583
      the cache readers actions on the flushed part can go in parallel
 
1584
      with the write of the extra stuff. copy_to_read_buffer()
 
1585
      synchronizes writer and readers so that after this call the
 
1586
      readers can act on the extra stuff while the writer can go ahead
 
1587
      and prepare the next output. copy_to_read_buffer() relies on
 
1588
      info->pos_in_file.
 
1589
    */
 
1590
    if (info->share)
 
1591
      copy_to_read_buffer(info, Buffer, length);
 
1592
 
 
1593
    Count-=length;
 
1594
    Buffer+=length;
 
1595
    info->pos_in_file+=length;
 
1596
  }
 
1597
  memcpy(info->write_pos,Buffer,(size_t) Count);
 
1598
  info->write_pos+=Count;
 
1599
  return 0;
 
1600
}
 
1601
 
 
1602
 
 
1603
/*
 
1604
  Append a block to the write buffer.
 
1605
  This is done with the buffer locked to ensure that we don't read from
 
1606
  the write buffer before we are ready with it.
 
1607
*/
 
1608
 
 
1609
int my_b_append(register IO_CACHE *info, const uchar *Buffer, size_t Count)
 
1610
{
 
1611
  size_t rest_length,length;
 
1612
 
 
1613
  /*
 
1614
    Assert that we cannot come here with a shared cache. If we do one
 
1615
    day, we might need to add a call to copy_to_read_buffer().
 
1616
  */
 
1617
  DBUG_ASSERT(!info->share);
 
1618
 
 
1619
  lock_append_buffer(info);
 
1620
  rest_length= (size_t) (info->write_end - info->write_pos);
 
1621
  if (Count <= rest_length)
 
1622
    goto end;
 
1623
  memcpy(info->write_pos, Buffer, rest_length);
 
1624
  Buffer+=rest_length;
 
1625
  Count-=rest_length;
 
1626
  info->write_pos+=rest_length;
 
1627
  if (my_b_flush_io_cache(info,0))
 
1628
  {
 
1629
    unlock_append_buffer(info);
 
1630
    return 1;
 
1631
  }
 
1632
  if (Count >= IO_SIZE)
 
1633
  {                                     /* Fill first intern buffer */
 
1634
    length=Count & (size_t) ~(IO_SIZE-1);
 
1635
    if (mysql_file_write(info->file,Buffer, length, info->myflags | MY_NABP))
 
1636
    {
 
1637
      unlock_append_buffer(info);
 
1638
      return info->error= -1;
 
1639
    }
 
1640
    Count-=length;
 
1641
    Buffer+=length;
 
1642
    info->end_of_file+=length;
 
1643
  }
 
1644
 
 
1645
end:
 
1646
  memcpy(info->write_pos,Buffer,(size_t) Count);
 
1647
  info->write_pos+=Count;
 
1648
  unlock_append_buffer(info);
 
1649
  return 0;
 
1650
}
 
1651
 
 
1652
 
 
1653
int my_b_safe_write(IO_CACHE *info, const uchar *Buffer, size_t Count)
 
1654
{
 
1655
  /*
 
1656
    Sasha: We are not writing this with the ? operator to avoid hitting
 
1657
    a possible compiler bug. At least gcc 2.95 cannot deal with 
 
1658
    several layers of ternary operators that evaluated comma(,) operator
 
1659
    expressions inside - I do have a test case if somebody wants it
 
1660
  */
 
1661
  if (info->type == SEQ_READ_APPEND)
 
1662
    return my_b_append(info, Buffer, Count);
 
1663
  return my_b_write(info, Buffer, Count);
 
1664
}
 
1665
 
 
1666
 
 
1667
/*
 
1668
  Write a block to disk where part of the data may be inside the record
 
1669
  buffer.  As all write calls to the data goes through the cache,
 
1670
  we will never get a seek over the end of the buffer
 
1671
*/
 
1672
 
 
1673
int my_block_write(register IO_CACHE *info, const uchar *Buffer, size_t Count,
 
1674
                   my_off_t pos)
 
1675
{
 
1676
  size_t length;
 
1677
  int error=0;
 
1678
 
 
1679
  /*
 
1680
    Assert that we cannot come here with a shared cache. If we do one
 
1681
    day, we might need to add a call to copy_to_read_buffer().
 
1682
  */
 
1683
  DBUG_ASSERT(!info->share);
 
1684
 
 
1685
  if (pos < info->pos_in_file)
 
1686
  {
 
1687
    /* Of no overlap, write everything without buffering */
 
1688
    if (pos + Count <= info->pos_in_file)
 
1689
      return mysql_file_pwrite(info->file, Buffer, Count, pos,
 
1690
                               info->myflags | MY_NABP);
 
1691
    /* Write the part of the block that is before buffer */
 
1692
    length= (uint) (info->pos_in_file - pos);
 
1693
    if (mysql_file_pwrite(info->file, Buffer, length, pos, info->myflags | MY_NABP))
 
1694
      info->error= error= -1;
 
1695
    Buffer+=length;
 
1696
    pos+=  length;
 
1697
    Count-= length;
 
1698
#ifndef HAVE_PREAD
 
1699
    info->seek_not_done=1;
 
1700
#endif
 
1701
  }
 
1702
 
 
1703
  /* Check if we want to write inside the used part of the buffer.*/
 
1704
  length= (size_t) (info->write_end - info->buffer);
 
1705
  if (pos < info->pos_in_file + length)
 
1706
  {
 
1707
    size_t offset= (size_t) (pos - info->pos_in_file);
 
1708
    length-=offset;
 
1709
    if (length > Count)
 
1710
      length=Count;
 
1711
    memcpy(info->buffer+offset, Buffer, length);
 
1712
    Buffer+=length;
 
1713
    Count-= length;
 
1714
    /* Fix length of buffer if the new data was larger */
 
1715
    if (info->buffer+length > info->write_pos)
 
1716
      info->write_pos=info->buffer+length;
 
1717
    if (!Count)
 
1718
      return (error);
 
1719
  }
 
1720
  /* Write at the end of the current buffer; This is the normal case */
 
1721
  if (_my_b_write(info, Buffer, Count))
 
1722
    error= -1;
 
1723
  return error;
 
1724
}
 
1725
 
 
1726
 
 
1727
        /* Flush write cache */
 
1728
 
 
1729
#define LOCK_APPEND_BUFFER if (need_append_buffer_lock) \
 
1730
  lock_append_buffer(info);
 
1731
#define UNLOCK_APPEND_BUFFER if (need_append_buffer_lock) \
 
1732
  unlock_append_buffer(info);
 
1733
 
 
1734
int my_b_flush_io_cache(IO_CACHE *info,
 
1735
                        int need_append_buffer_lock __attribute__((unused)))
 
1736
{
 
1737
  size_t length;
 
1738
  my_off_t pos_in_file;
 
1739
  my_bool append_cache= (info->type == SEQ_READ_APPEND);
 
1740
  DBUG_ENTER("my_b_flush_io_cache");
 
1741
  DBUG_PRINT("enter", ("cache: 0x%lx", (long) info));
 
1742
 
 
1743
  if (!append_cache)
 
1744
    need_append_buffer_lock= 0;
 
1745
 
 
1746
  if (info->type == WRITE_CACHE || append_cache)
 
1747
  {
 
1748
    if (info->file == -1)
 
1749
    {
 
1750
      if (real_open_cached_file(info))
 
1751
        DBUG_RETURN((info->error= -1));
 
1752
    }
 
1753
    LOCK_APPEND_BUFFER;
 
1754
 
 
1755
    if ((length=(size_t) (info->write_pos - info->write_buffer)))
 
1756
    {
 
1757
      /*
 
1758
        In case of a shared I/O cache with a writer we do direct write
 
1759
        cache to read cache copy. Do it before the write here so that
 
1760
        the readers can work in parallel with the write.
 
1761
        copy_to_read_buffer() relies on info->pos_in_file.
 
1762
      */
 
1763
      if (info->share)
 
1764
        copy_to_read_buffer(info, info->write_buffer, length);
 
1765
 
 
1766
      pos_in_file=info->pos_in_file;
 
1767
      /*
 
1768
        If we have append cache, we always open the file with
 
1769
        O_APPEND which moves the pos to EOF automatically on every write
 
1770
      */
 
1771
      if (!append_cache && info->seek_not_done)
 
1772
      {                                 /* File touched, do seek */
 
1773
        if (mysql_file_seek(info->file, pos_in_file, MY_SEEK_SET, MYF(0)) ==
 
1774
            MY_FILEPOS_ERROR)
 
1775
        {
 
1776
          UNLOCK_APPEND_BUFFER;
 
1777
          DBUG_RETURN((info->error= -1));
 
1778
        }
 
1779
        if (!append_cache)
 
1780
          info->seek_not_done=0;
 
1781
      }
 
1782
      if (!append_cache)
 
1783
        info->pos_in_file+=length;
 
1784
      info->write_end= (info->write_buffer+info->buffer_length-
 
1785
                        ((pos_in_file+length) & (IO_SIZE-1)));
 
1786
 
 
1787
      if (mysql_file_write(info->file,info->write_buffer,length,
 
1788
                   info->myflags | MY_NABP))
 
1789
        info->error= -1;
 
1790
      else
 
1791
        info->error= 0;
 
1792
      if (!append_cache)
 
1793
      {
 
1794
        set_if_bigger(info->end_of_file,(pos_in_file+length));
 
1795
      }
 
1796
      else
 
1797
      {
 
1798
        info->end_of_file+=(info->write_pos-info->append_read_pos);
 
1799
        DBUG_ASSERT(info->end_of_file == mysql_file_tell(info->file, MYF(0)));
 
1800
      }
 
1801
 
 
1802
      info->append_read_pos=info->write_pos=info->write_buffer;
 
1803
      ++info->disk_writes;
 
1804
      UNLOCK_APPEND_BUFFER;
 
1805
      DBUG_RETURN(info->error);
 
1806
    }
 
1807
  }
 
1808
#ifdef HAVE_AIOWAIT
 
1809
  else if (info->type != READ_NET)
 
1810
  {
 
1811
    my_aiowait(&info->aio_result);              /* Wait for outstanding req */
 
1812
    info->inited=0;
 
1813
  }
 
1814
#endif
 
1815
  UNLOCK_APPEND_BUFFER;
 
1816
  DBUG_RETURN(0);
 
1817
}
 
1818
 
 
1819
/*
 
1820
  Free an IO_CACHE object
 
1821
 
 
1822
  SYNOPSOS
 
1823
    end_io_cache()
 
1824
    info                IO_CACHE Handle to free
 
1825
 
 
1826
  NOTES
 
1827
    It's currently safe to call this if one has called init_io_cache()
 
1828
    on the 'info' object, even if init_io_cache() failed.
 
1829
    This function is also safe to call twice with the same handle.
 
1830
 
 
1831
  RETURN
 
1832
   0  ok
 
1833
   #  Error
 
1834
*/
 
1835
 
 
1836
int end_io_cache(IO_CACHE *info)
 
1837
{
 
1838
  int error=0;
 
1839
  IO_CACHE_CALLBACK pre_close;
 
1840
  DBUG_ENTER("end_io_cache");
 
1841
  DBUG_PRINT("enter",("cache: 0x%lx", (ulong) info));
 
1842
 
 
1843
  /*
 
1844
    Every thread must call remove_io_thread(). The last one destroys
 
1845
    the share elements.
 
1846
  */
 
1847
  DBUG_ASSERT(!info->share || !info->share->total_threads);
 
1848
 
 
1849
  if ((pre_close=info->pre_close))
 
1850
  {
 
1851
    (*pre_close)(info);
 
1852
    info->pre_close= 0;
 
1853
  }
 
1854
  if (info->alloced_buffer)
 
1855
  {
 
1856
    info->alloced_buffer=0;
 
1857
    if (info->file != -1)                       /* File doesn't exist */
 
1858
      error= my_b_flush_io_cache(info,1);
 
1859
    my_free(info->buffer);
 
1860
    info->buffer=info->read_pos=(uchar*) 0;
 
1861
  }
 
1862
  if (info->type == SEQ_READ_APPEND)
 
1863
  {
 
1864
    /* Destroy allocated mutex */
 
1865
    info->type= TYPE_NOT_SET;
 
1866
    mysql_mutex_destroy(&info->append_buffer_lock);
 
1867
  }
 
1868
  DBUG_RETURN(error);
 
1869
} /* end_io_cache */
 
1870
 
 
1871
 
 
1872
/**********************************************************************
 
1873
 Testing of MF_IOCACHE
 
1874
**********************************************************************/
 
1875
 
 
1876
#ifdef MAIN
 
1877
 
 
1878
#include <my_dir.h>
 
1879
 
 
1880
void die(const char* fmt, ...)
 
1881
{
 
1882
  va_list va_args;
 
1883
  va_start(va_args,fmt);
 
1884
  fprintf(stderr,"Error:");
 
1885
  vfprintf(stderr, fmt,va_args);
 
1886
  fprintf(stderr,", errno=%d\n", errno);
 
1887
  exit(1);
 
1888
}
 
1889
 
 
1890
int open_file(const char* fname, IO_CACHE* info, int cache_size)
 
1891
{
 
1892
  int fd;
 
1893
  if ((fd=my_open(fname,O_CREAT | O_RDWR,MYF(MY_WME))) < 0)
 
1894
    die("Could not open %s", fname);
 
1895
  if (init_io_cache(info, fd, cache_size, SEQ_READ_APPEND, 0,0,MYF(MY_WME)))
 
1896
    die("failed in init_io_cache()");
 
1897
  return fd;
 
1898
}
 
1899
 
 
1900
void close_file(IO_CACHE* info)
 
1901
{
 
1902
  end_io_cache(info);
 
1903
  my_close(info->file, MYF(MY_WME));
 
1904
}
 
1905
 
 
1906
int main(int argc, char** argv)
 
1907
{
 
1908
  IO_CACHE sra_cache; /* SEQ_READ_APPEND */
 
1909
  MY_STAT status;
 
1910
  const char* fname="/tmp/iocache.test";
 
1911
  int cache_size=16384;
 
1912
  char llstr_buf[22];
 
1913
  int max_block,total_bytes=0;
 
1914
  int i,num_loops=100,error=0;
 
1915
  char *p;
 
1916
  char* block, *block_end;
 
1917
  MY_INIT(argv[0]);
 
1918
  max_block = cache_size*3;
 
1919
  if (!(block=(char*)my_malloc(max_block,MYF(MY_WME))))
 
1920
    die("Not enough memory to allocate test block");
 
1921
  block_end = block + max_block;
 
1922
  for (p = block,i=0; p < block_end;i++)
 
1923
  {
 
1924
    *p++ = (char)i;
 
1925
  }
 
1926
  if (my_stat(fname,&status, MYF(0)) &&
 
1927
      my_delete(fname,MYF(MY_WME)))
 
1928
    {
 
1929
      die("Delete of %s failed, aborting", fname);
 
1930
    }
 
1931
  open_file(fname,&sra_cache, cache_size);
 
1932
  for (i = 0; i < num_loops; i++)
 
1933
  {
 
1934
    char buf[4];
 
1935
    int block_size = abs(rand() % max_block);
 
1936
    int4store(buf, block_size);
 
1937
    if (my_b_append(&sra_cache,buf,4) ||
 
1938
        my_b_append(&sra_cache, block, block_size))
 
1939
      die("write failed");
 
1940
    total_bytes += 4+block_size;
 
1941
  }
 
1942
  close_file(&sra_cache);
 
1943
  my_free(block);
 
1944
  if (!my_stat(fname,&status,MYF(MY_WME)))
 
1945
    die("%s failed to stat, but I had just closed it,\
 
1946
 wonder how that happened");
 
1947
  printf("Final size of %s is %s, wrote %d bytes\n",fname,
 
1948
         llstr(status.st_size,llstr_buf),
 
1949
         total_bytes);
 
1950
  my_delete(fname, MYF(MY_WME));
 
1951
  /* check correctness of tests */
 
1952
  if (total_bytes != status.st_size)
 
1953
  {
 
1954
    fprintf(stderr,"Not the same number of bytes acutally  in file as bytes \
 
1955
supposedly written\n");
 
1956
    error=1;
 
1957
  }
 
1958
  exit(error);
 
1959
  return 0;
 
1960
}
 
1961
#endif