~jaypipes/drizzle/new-test-runner

« back to all changes in this revision

Viewing changes to storage/archive/azio.cc

  • Committer: Jay Pipes
  • Date: 2008-12-11 17:52:34 UTC
  • mfrom: (482.16.152 testable)
  • Revision ID: jpipes@serialcoder-20081211175234-uqsfvmgxejvmellq
merge with trunk

Show diffs side-by-side

added added

removed removed

Lines of Context:
43
43
  int fd;
44
44
  char *buffer;
45
45
  size_t offset;
46
 
  azio_stream *s= (azio_stream *)p;  
 
46
  azio_stream *s= (azio_stream *)p;
47
47
 
48
48
  my_thread_init();
49
49
 
78
78
static void azio_kill(azio_stream *s)
79
79
{
80
80
  pthread_mutex_lock(&s->container.thresh_mutex);
81
 
  s->container.ready= AZ_THREAD_DEAD; 
 
81
  s->container.ready= AZ_THREAD_DEAD;
82
82
  pthread_mutex_unlock(&s->container.thresh_mutex);
83
83
 
84
84
  pthread_cond_signal(&s->container.threshhold);
120
120
  pthread_attr_init(&attr);
121
121
  pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
122
122
 
123
 
  s->container.ready= AZ_THREAD_FINISHED; 
 
123
  s->container.ready= AZ_THREAD_FINISHED;
124
124
 
125
125
  /* If we don't create a thread, signal the caller */
126
126
  if (pthread_create(&s->container.mainthread, &attr, run_task,
135
135
static int azio_read(azio_stream *s)
136
136
{
137
137
  pthread_mutex_lock(&s->container.thresh_mutex);
138
 
  s->container.ready= AZ_THREAD_ACTIVE; 
 
138
  s->container.ready= AZ_THREAD_ACTIVE;
139
139
  pthread_mutex_unlock(&s->container.thresh_mutex);
140
140
  pthread_cond_broadcast(&s->container.threshhold);
141
141
 
181
181
  s->method= method;
182
182
 
183
183
  /*
184
 
    We do our own version of append by nature. 
 
184
    We do our own version of append by nature.
185
185
    We must always have write access to take card of the header.
186
186
  */
187
187
  assert(Flags | O_APPEND);
188
188
  assert(Flags | O_WRONLY);
189
189
 
190
 
  if (Flags & O_RDWR) 
 
190
  if (Flags & O_RDWR)
191
191
    s->mode = 'w';
192
192
 
193
 
  if (s->mode == 'w') 
 
193
  if (s->mode == 'w')
194
194
  {
195
195
    err = deflateInit2(&(s->stream), level,
196
196
                       Z_DEFLATED, -MAX_WBITS, 8, strategy);
227
227
  s->container.fd= s->file;
228
228
#endif
229
229
 
230
 
  if (s->file < 0 ) 
 
230
  if (s->file < 0 )
231
231
  {
232
232
    destroy(s);
233
233
    return Z_NULL;
234
234
  }
235
235
 
236
 
  if (Flags & O_CREAT || Flags & O_TRUNC) 
 
236
  if (Flags & O_CREAT || Flags & O_TRUNC)
237
237
  {
238
238
    s->rows= 0;
239
239
    s->forced_flushes= 0;
249
249
    s->start = AZHEADER_SIZE + AZMETA_BUFFER_SIZE;
250
250
    if(write_header(s))
251
251
      return Z_NULL;
252
 
    s->pos= (size_t)my_seek(s->file, 0, MY_SEEK_END, MYF(0));
 
252
    s->pos= (size_t)lseek(s->file, 0, SEEK_END);
253
253
  }
254
 
  else if (s->mode == 'w') 
 
254
  else if (s->mode == 'w')
255
255
  {
256
256
    unsigned char buffer[AZHEADER_SIZE + AZMETA_BUFFER_SIZE];
257
257
    const ssize_t read_size= AZHEADER_SIZE + AZMETA_BUFFER_SIZE;
258
258
    if(pread(s->file, buffer, read_size, 0) < read_size)
259
259
      return Z_NULL;
260
260
    read_header(s, buffer);
261
 
    s->pos= (size_t)my_seek(s->file, 0, MY_SEEK_END, MYF(0));
 
261
    s->pos= (size_t)lseek(s->file, 0, SEEK_END);
262
262
  }
263
263
  else
264
264
  {
310
310
  int8store(ptr + AZ_AUTOINCREMENT_POS, (uint64_t)s->auto_increment); /* Start of Data Block Index Block */
311
311
  int4store(ptr+ AZ_LONGEST_POS , s->longest_row); /* Longest row */
312
312
  int4store(ptr+ AZ_SHORTEST_POS, s->shortest_row); /* Shorest row */
313
 
  int4store(ptr+ AZ_FRM_POS, 
 
313
  int4store(ptr+ AZ_FRM_POS,
314
314
            AZHEADER_SIZE + AZMETA_BUFFER_SIZE); /* FRM position */
315
315
  *(ptr + AZ_DIRTY_POS)= (unsigned char)s->dirty; /* Start of Data Block Index Block */
316
316
 
330
330
int get_byte(azio_stream *s)
331
331
{
332
332
  if (s->z_eof) return EOF;
333
 
  if (s->stream.avail_in == 0) 
 
333
  if (s->stream.avail_in == 0)
334
334
  {
335
335
    errno = 0;
336
 
    if (s->stream.avail_in == 0) 
 
336
    if (s->stream.avail_in == 0)
337
337
    {
338
338
      s->z_eof = 1;
339
339
      return EOF;
429
429
{
430
430
  int err = Z_OK;
431
431
 
432
 
  if (s->stream.state != NULL) 
 
432
  if (s->stream.state != NULL)
433
433
  {
434
 
    if (s->mode == 'w') 
 
434
    if (s->mode == 'w')
435
435
    {
436
436
      err = deflateEnd(&(s->stream));
437
437
      my_sync(s->file, MYF(0));
438
438
    }
439
 
    else if (s->mode == 'r') 
 
439
    else if (s->mode == 'r')
440
440
      err = inflateEnd(&(s->stream));
441
441
  }
442
442
 
443
443
  do_aio_cleanup(s);
444
444
 
445
 
  if (s->file > 0 && my_close(s->file, MYF(0))) 
 
445
  if (s->file > 0 && my_close(s->file, MYF(0)))
446
446
      err = Z_ERRNO;
447
447
 
448
448
  s->file= -1;
463
463
  *error= 0;
464
464
 
465
465
  if (s->mode != 'r')
466
 
  { 
 
466
  {
467
467
    *error= Z_STREAM_ERROR;
468
468
    return 0;
469
469
  }
470
470
 
471
471
  if (s->z_err == Z_DATA_ERROR || s->z_err == Z_ERRNO)
472
 
  { 
 
472
  {
473
473
    *error= s->z_err;
474
474
    return 0;
475
475
  }
476
476
 
477
477
  if (s->z_err == Z_STREAM_END)  /* EOF */
478
 
  { 
 
478
  {
479
479
    return 0;
480
480
  }
481
481
 
492
492
    start++;
493
493
    if (s->last) {
494
494
      s->z_err = Z_STREAM_END;
495
 
      { 
 
495
      {
496
496
        return 1;
497
497
      }
498
498
    }
504
504
 
505
505
      errno = 0;
506
506
      get_block(s);
507
 
      if (s->stream.avail_in == 0) 
 
507
      if (s->stream.avail_in == 0)
508
508
      {
509
509
        s->z_eof = 1;
510
510
      }
530
530
         * Check for such files:
531
531
       */
532
532
        check_header(s);
533
 
        if (s->z_err == Z_OK) 
 
533
        if (s->z_err == Z_OK)
534
534
        {
535
535
          inflateReset(&(s->stream));
536
536
          s->crc = crc32(0L, Z_NULL, 0);
553
553
}
554
554
 
555
555
/* ===========================================================================
556
 
  Experimental Interface. We abstract out a concecpt of rows 
 
556
  Experimental Interface. We abstract out a concecpt of rows
557
557
*/
558
558
size_t azwrite_row(azio_stream *s, void *buf, unsigned int len)
559
559
{
587
587
  size_t read;
588
588
 
589
589
  read= azread_internal(s, buffer, sizeof(unsigned int), error);
590
 
  
 
590
 
591
591
  /* On error the return value will be zero as well */
592
592
  if (read == 0)
593
593
    return read;
616
616
  s->stream.next_in = (Bytef*)buf;
617
617
  s->stream.avail_in = len;
618
618
 
619
 
  while (s->stream.avail_in != 0) 
 
619
  while (s->stream.avail_in != 0)
620
620
  {
621
 
    if (s->stream.avail_out == 0) 
 
621
    if (s->stream.avail_out == 0)
622
622
    {
623
623
 
624
624
      s->stream.next_out = s->outbuf;
625
 
      if (pwrite(s->file, (unsigned char *)s->outbuf, AZ_BUFSIZE_WRITE, s->pos) != AZ_BUFSIZE_WRITE) 
 
625
      if (pwrite(s->file, (unsigned char *)s->outbuf, AZ_BUFSIZE_WRITE, s->pos) != AZ_BUFSIZE_WRITE)
626
626
      {
627
627
        s->z_err = Z_ERRNO;
628
628
        break;
657
657
 
658
658
  s->stream.avail_in = 0; /* should be zero already anyway */
659
659
 
660
 
  for (;;) 
 
660
  for (;;)
661
661
  {
662
662
    len = AZ_BUFSIZE_WRITE - s->stream.avail_out;
663
663
 
664
 
    if (len != 0) 
 
664
    if (len != 0)
665
665
    {
666
 
      if ((uInt)pwrite(s->file, (unsigned char *)s->outbuf, len, s->pos) != len) 
 
666
      if ((uInt)pwrite(s->file, (unsigned char *)s->outbuf, len, s->pos) != len)
667
667
      {
668
668
        s->z_err = Z_ERRNO;
669
669
        assert(0);
695
695
  else
696
696
    s->dirty= AZ_STATE_SAVED; /* Mark it clean, we should be good now */
697
697
 
698
 
  afterwrite_pos= (size_t)my_tell(s->file);
 
698
  afterwrite_pos= (size_t)lseek(s->file, 0, SEEK_CUR);
699
699
  if(write_header(s))
700
700
    return Z_ERRNO;
701
701
 
725
725
{
726
726
  int err;
727
727
 
728
 
  if (s->mode == 'r') 
 
728
  if (s->mode == 'r')
729
729
  {
730
730
    unsigned char buffer[AZHEADER_SIZE + AZMETA_BUFFER_SIZE];
731
731
    const ssize_t read_size= AZHEADER_SIZE + AZMETA_BUFFER_SIZE;
814
814
    return SIZE_MAX;
815
815
  }
816
816
 
817
 
  if (s->mode == 'w') 
 
817
  if (s->mode == 'w')
818
818
  {
819
 
    if (whence == SEEK_SET) 
 
819
    if (whence == SEEK_SET)
820
820
      offset -= s->in;
821
821
 
822
822
    /* At this point, offset is the number of zero bytes to write. */
823
823
    /* There was a zmemzero here if inbuf was null -Brian */
824
 
    while (offset > 0)  
 
824
    while (offset > 0)
825
825
    {
826
826
      uInt size = AZ_BUFSIZE_READ;
827
827
      if (offset < AZ_BUFSIZE_READ) size = (uInt)offset;
886
886
  int n;
887
887
  unsigned char buffer[1];
888
888
 
889
 
  for (n = 0; n < 4; n++) 
 
889
  for (n = 0; n < 4; n++)
890
890
  {
891
891
    buffer[0]= (int)(x & 0xff);
892
892
    assert(pwrite(s->file, buffer, 1, s->pos)==1);
921
921
  int returnable;
922
922
 
923
923
  if (s == NULL) return Z_STREAM_ERROR;
924
 
  
 
924
 
925
925
  if (s->file < 1) return Z_OK;
926
926
 
927
 
  if (s->mode == 'w') 
 
927
  if (s->mode == 'w')
928
928
  {
929
929
    if (do_flush(s, Z_FINISH) != Z_OK)
930
930
      return destroy(s);
955
955
}
956
956
 
957
957
/*
958
 
  Though this was added to support MySQL's FRM file, anything can be 
 
958
  Though this was added to support MySQL's FRM file, anything can be
959
959
  stored in this location.
960
960
*/
961
961
int azwrite_frm(azio_stream *s, char *blob, unsigned int length)
962
962
{
963
 
  if (s->mode == 'r') 
 
963
  if (s->mode == 'r')
964
964
    return 1;
965
965
 
966
 
  if (s->rows > 0) 
 
966
  if (s->rows > 0)
967
967
    return 1;
968
968
 
969
969
  s->frm_start_pos= (uint) s->start;
974
974
    return 1;
975
975
 
976
976
  write_header(s);
977
 
  s->pos= (size_t)my_seek(s->file, 0, MY_SEEK_END, MYF(0));
 
977
  s->pos= (size_t)lseek(s->file, 0, SEEK_END);
978
978
 
979
979
  return 0;
980
980
}
995
995
*/
996
996
int azwrite_comment(azio_stream *s, char *blob, unsigned int length)
997
997
{
998
 
  if (s->mode == 'r') 
 
998
  if (s->mode == 'r')
999
999
    return 1;
1000
1000
 
1001
 
  if (s->rows > 0) 
 
1001
  if (s->rows > 0)
1002
1002
    return 1;
1003
1003
 
1004
1004
  s->comment_start_pos= (uint) s->start;
1011
1011
    return r;
1012
1012
 
1013
1013
  write_header(s);
1014
 
  s->pos= (size_t)my_seek(s->file, 0, MY_SEEK_END, MYF(0));
 
1014
  s->pos= (size_t)lseek(s->file, 0, SEEK_END);
1015
1015
 
1016
1016
  return 0;
1017
1017
}
1037
1037
}
1038
1038
#endif
1039
1039
 
1040
 
/* 
 
1040
/*
1041
1041
  Normally all IO goes through azio_read(), but in case of error or non-support
1042
1042
  we make use of pread().
1043
1043
*/
1044
1044
static void get_block(azio_stream *s)
1045
1045
{
1046
1046
#ifdef AZIO_AIO
1047
 
  if (s->method == AZ_METHOD_AIO && s->mode == 'r' 
 
1047
  if (s->method == AZ_METHOD_AIO && s->mode == 'r'
1048
1048
      && s->pos < s->check_point
1049
1049
      && s->aio_inited)
1050
1050
  {