~ubuntu-branches/ubuntu/saucy/drizzle/saucy-proposed

« back to all changes in this revision

Viewing changes to plugin/archive/azio.cc

  • Committer: Bazaar Package Importer
  • Author(s): Monty Taylor
  • Date: 2011-03-15 10:41:18 UTC
  • mfrom: (1.2.10 upstream)
  • Revision ID: james.westby@ubuntu.com-20110315104118-eaf0hvlytjdl4zrf
Tags: 2011.03.13-0ubuntu1
* New upstream release.
* Added slave plugin.
* Removed archive, blackhole and blitzdb plugins.
* Moved location of libdrizzle headers.
* Removed drizzleadmin manpage patch.
* Add drizzle_safe_write_string to symbols.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
/*
2
 
  azio is a modified version of gzio. It  makes use of mysys and removes mallocs.
3
 
    -Brian Aker
4
 
*/
5
 
 
6
 
/* gzio.c -- IO on .gz files
7
 
 * Copyright (C) 1995-2005 Jean-loup Gailly.
8
 
 * For conditions of distribution and use, see copyright notice in zlib.h
9
 
 *
10
 
 */
11
 
 
12
 
/* @(#) $Id$ */
13
 
 
14
 
#include <config.h>
15
 
 
16
 
#include "azio.h"
17
 
 
18
 
#include <fcntl.h>
19
 
#include <unistd.h>
20
 
 
21
 
#include <cstdio>
22
 
#include <cstring>
23
 
#include <cstdlib>
24
 
#include <cassert>
25
 
 
26
 
using namespace drizzled;
27
 
 
28
 
static int const az_magic[3] = {0xfe, 0x03, 0x01}; /* az magic header */
29
 
 
30
 
static unsigned int azwrite(azio_stream *s, void *buf, unsigned int len);
31
 
static int azrewind (azio_stream *s);
32
 
static unsigned int azio_enable_aio(azio_stream *s);
33
 
static int do_flush(azio_stream *file, int flush);
34
 
static int    get_byte(azio_stream *s);
35
 
static void   check_header(azio_stream *s);
36
 
static int write_header(azio_stream *s);
37
 
static int    destroy(azio_stream *s);
38
 
static void putLong(azio_stream *s, uLong x);
39
 
static uLong  getLong(azio_stream *s);
40
 
static void read_header(azio_stream *s, unsigned char *buffer);
41
 
static void get_block(azio_stream *s);
42
 
#ifdef AZIO_AIO
43
 
static void do_aio_cleanup(azio_stream *s);
44
 
#endif
45
 
 
46
 
extern "C" pthread_handler_t run_task(void *p);
47
 
 
48
 
extern "C" pthread_handler_t run_task(void *p)
49
 
{
50
 
  int fd;
51
 
  char *buffer;
52
 
  size_t offset;
53
 
  azio_stream *s= (azio_stream *)p;
54
 
 
55
 
  while (1)
56
 
  {
57
 
    pthread_mutex_lock(&s->container.thresh_mutex);
58
 
    while (s->container.ready == AZ_THREAD_FINISHED)
59
 
    {
60
 
      pthread_cond_wait(&s->container.threshhold, &s->container.thresh_mutex);
61
 
    }
62
 
    offset= s->container.offset;
63
 
    fd= s->container.fd;
64
 
    buffer= (char *)s->container.buffer;
65
 
    pthread_mutex_unlock(&s->container.thresh_mutex);
66
 
 
67
 
    if (s->container.ready == AZ_THREAD_DEAD)
68
 
      break;
69
 
 
70
 
    s->container.read_size= pread((int)fd, (void *)buffer,
71
 
                                  (size_t)AZ_BUFSIZE_READ, (off_t)offset);
72
 
 
73
 
    pthread_mutex_lock(&s->container.thresh_mutex);
74
 
    s->container.ready= AZ_THREAD_FINISHED;
75
 
    pthread_mutex_unlock(&s->container.thresh_mutex);
76
 
  }
77
 
 
78
 
  return 0;
79
 
}
80
 
 
81
 
static void azio_kill(azio_stream *s)
82
 
{
83
 
  pthread_mutex_lock(&s->container.thresh_mutex);
84
 
  s->container.ready= AZ_THREAD_DEAD;
85
 
  pthread_mutex_unlock(&s->container.thresh_mutex);
86
 
 
87
 
  pthread_cond_signal(&s->container.threshhold);
88
 
  pthread_join(s->container.mainthread, NULL);
89
 
}
90
 
 
91
 
static size_t azio_return(azio_stream *s)
92
 
{
93
 
  return s->container.read_size;
94
 
}
95
 
 
96
 
/*
97
 
  Worried about spin?
98
 
  Don't be. In tests it never has spun more then 1 times.
99
 
*/
100
 
 
101
 
static az_thread_type azio_ready(azio_stream *s)
102
 
{
103
 
  az_thread_type temp;
104
 
 
105
 
  while (1)
106
 
  {
107
 
    pthread_mutex_lock(&s->container.thresh_mutex);
108
 
    temp= s->container.ready;
109
 
    pthread_mutex_unlock(&s->container.thresh_mutex);
110
 
 
111
 
    if (temp == AZ_THREAD_FINISHED || temp == AZ_THREAD_DEAD)
112
 
      break;
113
 
  }
114
 
 
115
 
  return temp;
116
 
}
117
 
 
118
 
static int azio_start(azio_stream *s)
119
 
{
120
 
  int rc= 0;
121
 
  pthread_attr_t attr;          /* Thread attributes */
122
 
 
123
 
  pthread_attr_init(&attr);
124
 
  pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
125
 
 
126
 
  s->container.ready= AZ_THREAD_FINISHED;
127
 
 
128
 
  /* If we don't create a thread, signal the caller */
129
 
  if (pthread_create(&s->container.mainthread, &attr, run_task,
130
 
                     (void *)s) != 0)
131
 
    rc= 1;
132
 
 
133
 
  pthread_attr_destroy(&attr);
134
 
 
135
 
  return rc;
136
 
}
137
 
 
138
 
static int azio_read(azio_stream *s)
139
 
{
140
 
  pthread_mutex_lock(&s->container.thresh_mutex);
141
 
  s->container.ready= AZ_THREAD_ACTIVE;
142
 
  pthread_mutex_unlock(&s->container.thresh_mutex);
143
 
  pthread_cond_broadcast(&s->container.threshhold);
144
 
 
145
 
  return 0;
146
 
}
147
 
 
148
 
/* ===========================================================================
149
 
  Opens a gzip (.gz) file for reading or writing. The mode parameter
150
 
  is as in fopen ("rb" or "wb"). The file is given either by file descriptor
151
 
  or path name (if fd == -1).
152
 
  az_open returns NULL if the file could not be opened or if there was
153
 
  insufficient memory to allocate the (de)compression state; errno
154
 
  can be checked to distinguish the two cases (if errno is zero, the
155
 
  zlib error is Z_MEM_ERROR).
156
 
*/
157
 
int azopen(azio_stream *s, const char *path, int Flags, az_method method)
158
 
{
159
 
  int err;
160
 
  int level = Z_DEFAULT_COMPRESSION ; /* compression level */
161
 
  int strategy = Z_DEFAULT_STRATEGY; /* compression strategy */
162
 
  int fd= -1;
163
 
 
164
 
  memset(s, 0, sizeof(azio_stream));
165
 
 
166
 
  s->stream.zalloc = (alloc_func)0;
167
 
  s->stream.zfree = (free_func)0;
168
 
  s->stream.opaque = (voidpf)0;
169
 
 
170
 
 
171
 
  s->container.offset= 0;
172
 
  s->container.buffer= (void *)s->buffer1;
173
 
  s->container.ready= AZ_THREAD_FINISHED;
174
 
 
175
 
  s->inbuf= s->buffer1;
176
 
  s->stream.next_in = s->inbuf;
177
 
  s->stream.next_out = s->outbuf;
178
 
  s->z_err = Z_OK;
179
 
  s->back = EOF;
180
 
  s->crc = crc32(0L, Z_NULL, 0);
181
 
  s->mode = 'r';
182
 
  s->version = (unsigned char)az_magic[1]; /* this needs to be a define to version */
183
 
  s->version = (unsigned char)az_magic[2]; /* minor version */
184
 
  s->method= method;
185
 
 
186
 
  /*
187
 
    We do our own version of append by nature.
188
 
    We must always have write access to take card of the header.
189
 
  */
190
 
  assert(Flags | O_APPEND);
191
 
  assert(Flags | O_WRONLY);
192
 
 
193
 
  if (Flags & O_RDWR)
194
 
    s->mode = 'w';
195
 
 
196
 
  if (s->mode == 'w')
197
 
  {
198
 
    err = deflateInit2(&(s->stream), level,
199
 
                       Z_DEFLATED, -MAX_WBITS, 8, strategy);
200
 
    /* windowBits is passed < 0 to suppress zlib header */
201
 
 
202
 
    s->stream.next_out = s->outbuf;
203
 
    if (err != Z_OK)
204
 
    {
205
 
      destroy(s);
206
 
      return Z_NULL;
207
 
    }
208
 
  } else {
209
 
    /* Threads are only used when we are running with azio */
210
 
    s->stream.next_in  = s->inbuf;
211
 
 
212
 
    err = inflateInit2(&(s->stream), -MAX_WBITS);
213
 
    /* windowBits is passed < 0 to tell that there is no zlib header.
214
 
     * Note that in this case inflate *requires* an extra "dummy" byte
215
 
     * after the compressed stream in order to complete decompression and
216
 
     * return Z_STREAM_END. Here the gzip CRC32 ensures that 4 bytes are
217
 
     * present after the compressed stream.
218
 
   */
219
 
    if (err != Z_OK)
220
 
    {
221
 
      destroy(s);
222
 
      return Z_NULL;
223
 
    }
224
 
  }
225
 
  s->stream.avail_out = AZ_BUFSIZE_WRITE;
226
 
 
227
 
  errno = 0;
228
 
  s->file = fd < 0 ? internal::my_open(path, Flags, MYF(0)) : fd;
229
 
#ifdef AZIO_AIO
230
 
  s->container.fd= s->file;
231
 
#endif
232
 
 
233
 
  if (s->file < 0 )
234
 
  {
235
 
    destroy(s);
236
 
    return Z_NULL;
237
 
  }
238
 
 
239
 
  if (Flags & O_CREAT || Flags & O_TRUNC)
240
 
  {
241
 
    s->rows= 0;
242
 
    s->forced_flushes= 0;
243
 
    s->shortest_row= 0;
244
 
    s->longest_row= 0;
245
 
    s->auto_increment= 0;
246
 
    s->check_point= 0;
247
 
    s->comment_start_pos= 0;
248
 
    s->comment_length= 0;
249
 
    s->frm_start_pos= 0;
250
 
    s->frm_length= 0;
251
 
    s->dirty= 1; /* We create the file dirty */
252
 
    s->start = AZHEADER_SIZE + AZMETA_BUFFER_SIZE;
253
 
    if(write_header(s))
254
 
      return Z_NULL;
255
 
    s->pos= (size_t)lseek(s->file, 0, SEEK_END);
256
 
  }
257
 
  else if (s->mode == 'w')
258
 
  {
259
 
    unsigned char buffer[AZHEADER_SIZE + AZMETA_BUFFER_SIZE];
260
 
    const ssize_t read_size= AZHEADER_SIZE + AZMETA_BUFFER_SIZE;
261
 
    if(pread(s->file, buffer, read_size, 0) < read_size)
262
 
      return Z_NULL;
263
 
    read_header(s, buffer);
264
 
    s->pos= (size_t)lseek(s->file, 0, SEEK_END);
265
 
  }
266
 
  else
267
 
  {
268
 
    check_header(s); /* skip the .az header */
269
 
  }
270
 
 
271
 
  switch (s->method)
272
 
  {
273
 
  case AZ_METHOD_AIO:
274
 
    azio_enable_aio(s);
275
 
    break;
276
 
  case AZ_METHOD_BLOCK:
277
 
  case AZ_METHOD_MAX:
278
 
    break;
279
 
  }
280
 
 
281
 
  return 1;
282
 
}
283
 
 
284
 
 
285
 
int write_header(azio_stream *s)
286
 
{
287
 
  char buffer[AZHEADER_SIZE + AZMETA_BUFFER_SIZE];
288
 
  char *ptr= buffer;
289
 
 
290
 
  s->block_size= AZ_BUFSIZE_WRITE;
291
 
  s->version = (unsigned char)az_magic[1];
292
 
  s->minor_version = (unsigned char)az_magic[2];
293
 
 
294
 
 
295
 
  /* Write a very simple .az header: */
296
 
  memset(buffer, 0, AZHEADER_SIZE + AZMETA_BUFFER_SIZE);
297
 
  *(ptr + AZ_MAGIC_POS)= az_magic[0];
298
 
  *(ptr + AZ_VERSION_POS)= (unsigned char)s->version;
299
 
  *(ptr + AZ_MINOR_VERSION_POS)= (unsigned char)s->minor_version;
300
 
  *(ptr + AZ_BLOCK_POS)= (unsigned char)(s->block_size/1024); /* Reserved for block size */
301
 
  *(ptr + AZ_STRATEGY_POS)= (unsigned char)Z_DEFAULT_STRATEGY; /* Compression Type */
302
 
 
303
 
  int4store(ptr + AZ_FRM_POS, s->frm_start_pos); /* FRM Block */
304
 
  int4store(ptr + AZ_FRM_LENGTH_POS, s->frm_length); /* FRM Block */
305
 
  int4store(ptr + AZ_COMMENT_POS, s->comment_start_pos); /* COMMENT Block */
306
 
  int4store(ptr + AZ_COMMENT_LENGTH_POS, s->comment_length); /* COMMENT Block */
307
 
  int4store(ptr + AZ_META_POS, 0); /* Meta Block */
308
 
  int4store(ptr + AZ_META_LENGTH_POS, 0); /* Meta Block */
309
 
  int8store(ptr + AZ_START_POS, (uint64_t)s->start); /* Start of Data Block Index Block */
310
 
  int8store(ptr + AZ_ROW_POS, (uint64_t)s->rows); /* Start of Data Block Index Block */
311
 
  int8store(ptr + AZ_FLUSH_POS, (uint64_t)s->forced_flushes); /* Start of Data Block Index Block */
312
 
  int8store(ptr + AZ_CHECK_POS, (uint64_t)s->check_point); /* Start of Data Block Index Block */
313
 
  int8store(ptr + AZ_AUTOINCREMENT_POS, (uint64_t)s->auto_increment); /* Start of Data Block Index Block */
314
 
  int4store(ptr+ AZ_LONGEST_POS , s->longest_row); /* Longest row */
315
 
  int4store(ptr+ AZ_SHORTEST_POS, s->shortest_row); /* Shorest row */
316
 
  int4store(ptr+ AZ_FRM_POS,
317
 
            AZHEADER_SIZE + AZMETA_BUFFER_SIZE); /* FRM position */
318
 
  *(ptr + AZ_DIRTY_POS)= (unsigned char)s->dirty; /* Start of Data Block Index Block */
319
 
 
320
 
  /* Always begin at the begining, and end there as well */
321
 
  const ssize_t write_size= AZHEADER_SIZE + AZMETA_BUFFER_SIZE;
322
 
  if(pwrite(s->file, (unsigned char*) buffer, write_size, 0)!=write_size)
323
 
    return -1;
324
 
 
325
 
  return 0;
326
 
}
327
 
 
328
 
/* ===========================================================================
329
 
  Read a byte from a azio_stream; update next_in and avail_in. Return EOF
330
 
  for end of file.
331
 
  IN assertion: the stream s has been sucessfully opened for reading.
332
 
*/
333
 
int get_byte(azio_stream *s)
334
 
{
335
 
  if (s->z_eof) return EOF;
336
 
  if (s->stream.avail_in == 0)
337
 
  {
338
 
    errno = 0;
339
 
    if (s->stream.avail_in == 0)
340
 
    {
341
 
      s->z_eof = 1;
342
 
      return EOF;
343
 
    }
344
 
    else if (s->stream.avail_in == (uInt) -1)
345
 
    {
346
 
      s->z_eof= 1;
347
 
      s->z_err= Z_ERRNO;
348
 
      return EOF;
349
 
    }
350
 
    s->stream.next_in = s->inbuf;
351
 
  }
352
 
  s->stream.avail_in--;
353
 
  return *(s->stream.next_in)++;
354
 
}
355
 
 
356
 
/* ===========================================================================
357
 
  Check the gzip header of a azio_stream opened for reading.
358
 
  IN assertion: the stream s has already been created sucessfully;
359
 
  s->stream.avail_in is zero for the first time, but may be non-zero
360
 
  for concatenated .gz files.
361
 
*/
362
 
void check_header(azio_stream *s)
363
 
{
364
 
  uInt len;
365
 
 
366
 
  /* Assure two bytes in the buffer so we can peek ahead -- handle case
367
 
    where first byte of header is at the end of the buffer after the last
368
 
    gzip segment */
369
 
  len = s->stream.avail_in;
370
 
  if (len < 2) {
371
 
    if (len) s->inbuf[0] = s->stream.next_in[0];
372
 
    errno = 0;
373
 
    len = (uInt)pread(s->file, (unsigned char *)s->inbuf + len, AZ_BUFSIZE_READ >> len, s->pos);
374
 
    s->pos+= len;
375
 
    if (len == (uInt)-1) s->z_err = Z_ERRNO;
376
 
    s->stream.avail_in += len;
377
 
    s->stream.next_in = s->inbuf;
378
 
  }
379
 
 
380
 
  /* Now we check the actual header */
381
 
  if ( s->stream.next_in[0] == az_magic[0]  && s->stream.next_in[1] == az_magic[1])
382
 
  {
383
 
    unsigned char buffer[AZHEADER_SIZE + AZMETA_BUFFER_SIZE];
384
 
 
385
 
    for (len = 0; len < (AZHEADER_SIZE + AZMETA_BUFFER_SIZE); len++)
386
 
      buffer[len]= get_byte(s);
387
 
    s->z_err = s->z_eof ? Z_DATA_ERROR : Z_OK;
388
 
    read_header(s, buffer);
389
 
    for (; len < s->start; len++)
390
 
      get_byte(s);
391
 
  }
392
 
  else
393
 
  {
394
 
    s->z_err = Z_OK;
395
 
 
396
 
    return;
397
 
  }
398
 
}
399
 
 
400
 
void read_header(azio_stream *s, unsigned char *buffer)
401
 
{
402
 
  if (buffer[0] == az_magic[0]  && buffer[1] == az_magic[1])
403
 
  {
404
 
    s->version= (unsigned int)buffer[AZ_VERSION_POS];
405
 
    s->minor_version= (unsigned int)buffer[AZ_MINOR_VERSION_POS];
406
 
    s->block_size= 1024 * buffer[AZ_BLOCK_POS];
407
 
    s->start= (size_t)uint8korr(buffer + AZ_START_POS);
408
 
    s->rows= (uint64_t)uint8korr(buffer + AZ_ROW_POS);
409
 
    s->check_point= (uint64_t)uint8korr(buffer + AZ_CHECK_POS);
410
 
    s->forced_flushes= (uint64_t)uint8korr(buffer + AZ_FLUSH_POS);
411
 
    s->auto_increment= (uint64_t)uint8korr(buffer + AZ_AUTOINCREMENT_POS);
412
 
    s->longest_row= (unsigned int)uint4korr(buffer + AZ_LONGEST_POS);
413
 
    s->shortest_row= (unsigned int)uint4korr(buffer + AZ_SHORTEST_POS);
414
 
    s->frm_start_pos= (unsigned int)uint4korr(buffer + AZ_FRM_POS);
415
 
    s->frm_length= (unsigned int)uint4korr(buffer + AZ_FRM_LENGTH_POS);
416
 
    s->comment_start_pos= (unsigned int)uint4korr(buffer + AZ_COMMENT_POS);
417
 
    s->comment_length= (unsigned int)uint4korr(buffer + AZ_COMMENT_LENGTH_POS);
418
 
    s->dirty= (unsigned int)buffer[AZ_DIRTY_POS];
419
 
  }
420
 
  else
421
 
  {
422
 
    s->z_err = Z_OK;
423
 
    return;
424
 
  }
425
 
}
426
 
 
427
 
/* ===========================================================================
428
 
 * Cleanup then free the given azio_stream. Return a zlib error code.
429
 
 Try freeing in the reverse order of allocations.
430
 
 */
431
 
int destroy (azio_stream *s)
432
 
{
433
 
  int err = Z_OK;
434
 
 
435
 
  if (s->stream.state != NULL)
436
 
  {
437
 
    if (s->mode == 'w')
438
 
    {
439
 
      err = deflateEnd(&(s->stream));
440
 
      internal::my_sync(s->file, MYF(0));
441
 
    }
442
 
    else if (s->mode == 'r')
443
 
      err = inflateEnd(&(s->stream));
444
 
  }
445
 
 
446
 
  do_aio_cleanup(s);
447
 
 
448
 
  if (s->file > 0 && internal::my_close(s->file, MYF(0)))
449
 
      err = Z_ERRNO;
450
 
 
451
 
  s->file= -1;
452
 
 
453
 
  if (s->z_err < 0) err = s->z_err;
454
 
 
455
 
  return err;
456
 
}
457
 
 
458
 
/* ===========================================================================
459
 
  Reads the given number of uncompressed bytes from the compressed file.
460
 
  azread returns the number of bytes actually read (0 for end of file).
461
 
*/
462
 
/*
463
 
   This function is legacy, do not use.
464
 
 
465
 
     Reads the given number of uncompressed bytes from the compressed file.
466
 
   If the input file was not in gzip format, gzread copies the given number
467
 
   of bytes into the buffer.
468
 
     gzread returns the number of uncompressed bytes actually read (0 for
469
 
   end of file, -1 for error).
470
 
*/
471
 
static unsigned int azread_internal( azio_stream *s, voidp buf, unsigned int len, int *error)
472
 
{
473
 
  Bytef *start = (Bytef*)buf; /* starting point for crc computation */
474
 
  Byte  *next_out; /* == stream.next_out but not forced far (for MSDOS) */
475
 
  *error= 0;
476
 
 
477
 
  if (s->mode != 'r')
478
 
  {
479
 
    *error= Z_STREAM_ERROR;
480
 
    return 0;
481
 
  }
482
 
 
483
 
  if (s->z_err == Z_DATA_ERROR || s->z_err == Z_ERRNO)
484
 
  {
485
 
    *error= s->z_err;
486
 
    return 0;
487
 
  }
488
 
 
489
 
  if (s->z_err == Z_STREAM_END)  /* EOF */
490
 
  {
491
 
    return 0;
492
 
  }
493
 
 
494
 
  next_out = (Byte*)buf;
495
 
  s->stream.next_out = (Bytef*)buf;
496
 
  s->stream.avail_out = len;
497
 
 
498
 
  if (s->stream.avail_out && s->back != EOF) {
499
 
    *next_out++ = s->back;
500
 
    s->stream.next_out++;
501
 
    s->stream.avail_out--;
502
 
    s->back = EOF;
503
 
    s->out++;
504
 
    start++;
505
 
    if (s->last) {
506
 
      s->z_err = Z_STREAM_END;
507
 
      {
508
 
        return 1;
509
 
      }
510
 
    }
511
 
  }
512
 
 
513
 
  while (s->stream.avail_out != 0) {
514
 
 
515
 
    if (s->stream.avail_in == 0 && !s->z_eof) {
516
 
 
517
 
      errno = 0;
518
 
      get_block(s);
519
 
      if (s->stream.avail_in == 0)
520
 
      {
521
 
        s->z_eof = 1;
522
 
      }
523
 
      s->stream.next_in = (Bytef *)s->inbuf;
524
 
    }
525
 
    s->in += s->stream.avail_in;
526
 
    s->out += s->stream.avail_out;
527
 
    s->z_err = inflate(&(s->stream), Z_NO_FLUSH);
528
 
    s->in -= s->stream.avail_in;
529
 
    s->out -= s->stream.avail_out;
530
 
 
531
 
    if (s->z_err == Z_STREAM_END) {
532
 
      /* Check CRC and original size */
533
 
      s->crc = crc32(s->crc, start, (uInt)(s->stream.next_out - start));
534
 
      start = s->stream.next_out;
535
 
 
536
 
      if (getLong(s) != s->crc) {
537
 
        s->z_err = Z_DATA_ERROR;
538
 
      } else {
539
 
        (void)getLong(s);
540
 
        /* The uncompressed length returned by above getlong() may be
541
 
         * different from s->out in case of concatenated .gz files.
542
 
         * Check for such files:
543
 
       */
544
 
        check_header(s);
545
 
        if (s->z_err == Z_OK)
546
 
        {
547
 
          inflateReset(&(s->stream));
548
 
          s->crc = crc32(0L, Z_NULL, 0);
549
 
        }
550
 
      }
551
 
    }
552
 
    if (s->z_err != Z_OK || s->z_eof) break;
553
 
  }
554
 
  s->crc = crc32(s->crc, start, (uInt)(s->stream.next_out - start));
555
 
 
556
 
  if (len == s->stream.avail_out &&
557
 
      (s->z_err == Z_DATA_ERROR || s->z_err == Z_ERRNO))
558
 
  {
559
 
    *error= s->z_err;
560
 
 
561
 
    return 0;
562
 
  }
563
 
 
564
 
  return (len - s->stream.avail_out);
565
 
}
566
 
 
567
 
/* ===========================================================================
568
 
  Experimental Interface. We abstract out a concecpt of rows
569
 
*/
570
 
size_t azwrite_row(azio_stream *s, void *buf, unsigned int len)
571
 
{
572
 
  size_t length;
573
 
  /* First we write length */
574
 
  length= azwrite(s, &len, sizeof(unsigned int));
575
 
 
576
 
  if (length != sizeof(unsigned int))
577
 
    return length;
578
 
 
579
 
  /* Now we write the actual data */
580
 
  length= (size_t)azwrite(s, buf, len);
581
 
 
582
 
  if (length > 0)
583
 
    s->rows++;
584
 
 
585
 
  if (len > s->longest_row)
586
 
    s->longest_row= len;
587
 
 
588
 
  if (len < s->shortest_row || !(s->shortest_row))
589
 
    s->shortest_row= len;
590
 
 
591
 
  return length;
592
 
}
593
 
 
594
 
size_t azread_row(azio_stream *s, int *error)
595
 
{
596
 
  unsigned int row_length; /* Currently we are limited to this size for rows */
597
 
  char buffer[sizeof(unsigned int)];
598
 
  char *new_ptr;
599
 
  size_t read;
600
 
 
601
 
  read= azread_internal(s, buffer, sizeof(unsigned int), error);
602
 
 
603
 
  /* On error the return value will be zero as well */
604
 
  if (read == 0)
605
 
    return read;
606
 
  memcpy(&row_length, buffer, sizeof(unsigned int));
607
 
 
608
 
  new_ptr= (char *)realloc(s->row_ptr, (sizeof(char) * row_length));
609
 
 
610
 
  if (!new_ptr)
611
 
    return SIZE_MAX;
612
 
 
613
 
  s->row_ptr= new_ptr;
614
 
 
615
 
  /* TODO We should now adjust the length... */
616
 
  read= azread_internal(s, (voidp)s->row_ptr, row_length, error);
617
 
 
618
 
  return read;
619
 
}
620
 
 
621
 
 
622
 
/* ===========================================================================
623
 
  Writes the given number of uncompressed bytes into the compressed file.
624
 
  azwrite returns the number of bytes actually written (0 in case of error).
625
 
*/
626
 
static unsigned int azwrite(azio_stream *s, void *buf, unsigned int len)
627
 
{
628
 
  s->stream.next_in = (Bytef*)buf;
629
 
  s->stream.avail_in = len;
630
 
 
631
 
  while (s->stream.avail_in != 0)
632
 
  {
633
 
    if (s->stream.avail_out == 0)
634
 
    {
635
 
 
636
 
      s->stream.next_out = s->outbuf;
637
 
      if (pwrite(s->file, (unsigned char *)s->outbuf, AZ_BUFSIZE_WRITE, s->pos) != AZ_BUFSIZE_WRITE)
638
 
      {
639
 
        s->z_err = Z_ERRNO;
640
 
        break;
641
 
      }
642
 
      s->pos+= AZ_BUFSIZE_WRITE;
643
 
      s->stream.avail_out = AZ_BUFSIZE_WRITE;
644
 
    }
645
 
    s->in += s->stream.avail_in;
646
 
    s->out += s->stream.avail_out;
647
 
    s->z_err = deflate(&(s->stream), Z_NO_FLUSH);
648
 
    s->in -= s->stream.avail_in;
649
 
    s->out -= s->stream.avail_out;
650
 
    if (s->z_err != Z_OK) break;
651
 
  }
652
 
  s->crc = crc32(s->crc, (const Bytef *)buf, len);
653
 
 
654
 
  return (unsigned int)(len - s->stream.avail_in);
655
 
}
656
 
 
657
 
 
658
 
/* ===========================================================================
659
 
  Flushes all pending output into the compressed file. The parameter
660
 
  flush is as in the deflate() function.
661
 
*/
662
 
int do_flush (azio_stream *s, int flush)
663
 
{
664
 
  uInt len;
665
 
  int done = 0;
666
 
  size_t afterwrite_pos;
667
 
 
668
 
  if (s == NULL || s->mode != 'w') return Z_STREAM_ERROR;
669
 
 
670
 
  s->stream.avail_in = 0; /* should be zero already anyway */
671
 
 
672
 
  for (;;)
673
 
  {
674
 
    len = AZ_BUFSIZE_WRITE - s->stream.avail_out;
675
 
 
676
 
    if (len != 0)
677
 
    {
678
 
      if ((uInt)pwrite(s->file, (unsigned char *)s->outbuf, len, s->pos) != len)
679
 
      {
680
 
        s->z_err = Z_ERRNO;
681
 
        assert(0);
682
 
        return Z_ERRNO;
683
 
      }
684
 
      s->pos+= len;
685
 
      s->check_point= s->pos;
686
 
      s->stream.next_out = s->outbuf;
687
 
      s->stream.avail_out = AZ_BUFSIZE_WRITE;
688
 
    }
689
 
    if (done) break;
690
 
    s->out += s->stream.avail_out;
691
 
    s->z_err = deflate(&(s->stream), flush);
692
 
    s->out -= s->stream.avail_out;
693
 
 
694
 
    /* Ignore the second of two consecutive flushes: */
695
 
    if (len == 0 && s->z_err == Z_BUF_ERROR) s->z_err = Z_OK;
696
 
 
697
 
    /* deflate has finished flushing only when it hasn't used up
698
 
     * all the available space in the output buffer:
699
 
   */
700
 
    done = (s->stream.avail_out != 0 || s->z_err == Z_STREAM_END);
701
 
 
702
 
    if (s->z_err != Z_OK && s->z_err != Z_STREAM_END) break;
703
 
  }
704
 
 
705
 
  if (flush == Z_FINISH)
706
 
    s->dirty= AZ_STATE_CLEAN; /* Mark it clean, we should be good now */
707
 
  else
708
 
    s->dirty= AZ_STATE_SAVED; /* Mark it clean, we should be good now */
709
 
 
710
 
  afterwrite_pos= (size_t)lseek(s->file, 0, SEEK_CUR);
711
 
  if(write_header(s))
712
 
    return Z_ERRNO;
713
 
 
714
 
  return  s->z_err == Z_STREAM_END ? Z_OK : s->z_err;
715
 
}
716
 
 
717
 
static unsigned int azio_enable_aio(azio_stream *s)
718
 
{
719
 
  pthread_cond_init(&s->container.threshhold, NULL);
720
 
  pthread_mutex_init(&s->container.thresh_mutex, NULL);
721
 
  azio_start(s);
722
 
 
723
 
  return 0;
724
 
}
725
 
 
726
 
static void azio_disable_aio(azio_stream *s)
727
 
{
728
 
  azio_kill(s);
729
 
 
730
 
  pthread_mutex_destroy(&s->container.thresh_mutex);
731
 
  pthread_cond_destroy(&s->container.threshhold);
732
 
 
733
 
  s->method= AZ_METHOD_BLOCK;
734
 
}
735
 
 
736
 
int ZEXPORT azflush (azio_stream *s,int flush)
737
 
{
738
 
  int err;
739
 
 
740
 
  if (s->mode == 'r')
741
 
  {
742
 
    unsigned char buffer[AZHEADER_SIZE + AZMETA_BUFFER_SIZE];
743
 
    const ssize_t read_size= AZHEADER_SIZE + AZMETA_BUFFER_SIZE;
744
 
    if(pread(s->file, (unsigned char*) buffer, read_size, 0)!=read_size)
745
 
      return Z_ERRNO;
746
 
    read_header(s, buffer); /* skip the .az header */
747
 
    azrewind(s);
748
 
 
749
 
    return Z_OK;
750
 
  }
751
 
  else
752
 
  {
753
 
    s->forced_flushes++;
754
 
    err= do_flush(s, flush);
755
 
 
756
 
    if (err) return err;
757
 
    internal::my_sync(s->file, MYF(0));
758
 
    return  s->z_err == Z_STREAM_END ? Z_OK : s->z_err;
759
 
  }
760
 
}
761
 
 
762
 
/* ===========================================================================
763
 
  Initiazliaze for reading
764
 
*/
765
 
int azread_init(azio_stream *s)
766
 
{
767
 
  int returnable;
768
 
 
769
 
  /* This will reset any aio reads */
770
 
  returnable= azrewind(s);
771
 
 
772
 
  if (returnable == -1)
773
 
    return returnable;
774
 
 
775
 
  /* Put one in the chamber */
776
 
  if (s->method != AZ_METHOD_BLOCK)
777
 
  {
778
 
    do_aio_cleanup(s);
779
 
    s->container.offset= s->pos;
780
 
    s->container.buffer= (unsigned char *)s->buffer1;
781
 
    azio_read(s);
782
 
    s->aio_inited= 1;
783
 
  }
784
 
 
785
 
 
786
 
  return returnable;
787
 
}
788
 
 
789
 
/* ===========================================================================
790
 
  Rewinds input file.
791
 
*/
792
 
int azrewind (azio_stream *s)
793
 
{
794
 
  if (s == NULL || s->mode != 'r') return -1;
795
 
 
796
 
#ifdef AZIO_AIO
797
 
  do_aio_cleanup(s);
798
 
#endif
799
 
  s->z_err = Z_OK;
800
 
  s->z_eof = 0;
801
 
  s->back = EOF;
802
 
  s->stream.avail_in = 0;
803
 
  s->stream.next_in = (Bytef *)s->inbuf;
804
 
  s->crc = crc32(0L, Z_NULL, 0);
805
 
  (void)inflateReset(&s->stream);
806
 
  s->in = 0;
807
 
  s->out = 0;
808
 
  s->aio_inited= 0; /* Reset the AIO reader */
809
 
  s->pos= s->start;
810
 
  return 0;
811
 
}
812
 
 
813
 
/* ===========================================================================
814
 
  Sets the starting position for the next azread or azwrite on the given
815
 
  compressed file. The offset represents a number of bytes in the
816
 
  azseek returns the resulting offset location as measured in bytes from
817
 
  the beginning of the uncompressed stream, or -1 in case of error.
818
 
  SEEK_END is not implemented, returns error.
819
 
  In this version of the library, azseek can be extremely slow.
820
 
*/
821
 
size_t azseek (azio_stream *s, size_t offset, int whence)
822
 
{
823
 
 
824
 
  if (s == NULL || whence == SEEK_END ||
825
 
      s->z_err == Z_ERRNO || s->z_err == Z_DATA_ERROR) {
826
 
    return SIZE_MAX;
827
 
  }
828
 
 
829
 
  if (s->mode == 'w')
830
 
  {
831
 
    if (whence == SEEK_SET)
832
 
      offset -= s->in;
833
 
 
834
 
    /* At this point, offset is the number of zero bytes to write. */
835
 
    /* There was a zmemzero here if inbuf was null -Brian */
836
 
    while (offset > 0)
837
 
    {
838
 
      uInt size = AZ_BUFSIZE_READ;
839
 
      if (offset < AZ_BUFSIZE_READ) size = (uInt)offset;
840
 
 
841
 
      size = azwrite(s, s->inbuf, size);
842
 
      if (size == 0)
843
 
        return SIZE_MAX;
844
 
 
845
 
      offset -= size;
846
 
    }
847
 
    return s->in;
848
 
  }
849
 
  /* Rest of function is for reading only */
850
 
 
851
 
  /* compute absolute position */
852
 
  if (whence == SEEK_CUR) {
853
 
    offset += s->out;
854
 
  }
855
 
 
856
 
  /* For a negative seek, rewind and use positive seek */
857
 
  if (offset >= s->out) {
858
 
    offset -= s->out;
859
 
  } else if (azrewind(s)) {
860
 
    return SIZE_MAX;
861
 
  }
862
 
  /* offset is now the number of bytes to skip. */
863
 
 
864
 
  if (offset && s->back != EOF) {
865
 
    s->back = EOF;
866
 
    s->out++;
867
 
    offset--;
868
 
    if (s->last) s->z_err = Z_STREAM_END;
869
 
  }
870
 
  while (offset > 0)  {
871
 
    int error;
872
 
    unsigned int size = AZ_BUFSIZE_WRITE;
873
 
    if (offset < AZ_BUFSIZE_WRITE) size = (int)offset;
874
 
 
875
 
    size = azread_internal(s, s->outbuf, size, &error);
876
 
    if (error < 0) return SIZE_MAX;
877
 
    offset -= size;
878
 
  }
879
 
  return s->out;
880
 
}
881
 
 
882
 
/* ===========================================================================
883
 
  Returns the starting position for the next azread or azwrite on the
884
 
  given compressed file. This position represents a number of bytes in the
885
 
  uncompressed data stream.
886
 
*/
887
 
size_t ZEXPORT aztell (azio_stream *file)
888
 
{
889
 
  return azseek(file, 0L, SEEK_CUR);
890
 
}
891
 
 
892
 
 
893
 
/* ===========================================================================
894
 
  Outputs a long in LSB order to the given file
895
 
*/
896
 
void putLong (azio_stream *s, uLong x)
897
 
{
898
 
  int n;
899
 
  unsigned char buffer[1];
900
 
 
901
 
  for (n = 0; n < 4; n++)
902
 
  {
903
 
    buffer[0]= (int)(x & 0xff);
904
 
    size_t ret= pwrite(s->file, buffer, 1, s->pos);
905
 
    assert(ret == 1);
906
 
    s->pos++;
907
 
    x >>= 8;
908
 
  }
909
 
}
910
 
 
911
 
/* ===========================================================================
912
 
  Reads a long in LSB order from the given azio_stream. Sets z_err in case
913
 
  of error.
914
 
*/
915
 
uLong getLong (azio_stream *s)
916
 
{
917
 
  uLong x = (uLong)get_byte(s);
918
 
  int c;
919
 
 
920
 
  x += ((uLong)get_byte(s))<<8;
921
 
  x += ((uLong)get_byte(s))<<16;
922
 
  c = get_byte(s);
923
 
  if (c == EOF) s->z_err = Z_DATA_ERROR;
924
 
  x += ((uLong)c)<<24;
925
 
  return x;
926
 
}
927
 
 
928
 
/* ===========================================================================
929
 
  Flushes all pending output if necessary, closes the compressed file
930
 
  and deallocates all the (de)compression state.
931
 
*/
932
 
int azclose (azio_stream *s)
933
 
{
934
 
  int returnable;
935
 
 
936
 
  if (s == NULL) return Z_STREAM_ERROR;
937
 
 
938
 
  if (s->file < 1) return Z_OK;
939
 
 
940
 
  if (s->mode == 'w')
941
 
  {
942
 
    if (do_flush(s, Z_FINISH) != Z_OK)
943
 
      return destroy(s);
944
 
 
945
 
    putLong(s, s->crc);
946
 
    putLong(s, (uLong)(s->in & 0xffffffff));
947
 
    s->dirty= AZ_STATE_CLEAN;
948
 
    write_header(s);
949
 
  }
950
 
 
951
 
  returnable= destroy(s);
952
 
 
953
 
  switch (s->method)
954
 
  {
955
 
  case AZ_METHOD_AIO:
956
 
    azio_disable_aio(s);
957
 
    break;
958
 
  case AZ_METHOD_BLOCK:
959
 
  case AZ_METHOD_MAX:
960
 
    break;
961
 
  }
962
 
 
963
 
  /* If we allocated memory for row reading, now free it */
964
 
  if (s->row_ptr)
965
 
    free(s->row_ptr);
966
 
 
967
 
  return returnable;
968
 
}
969
 
 
970
 
/*
971
 
  Though this was added to support MySQL's FRM file, anything can be
972
 
  stored in this location.
973
 
*/
974
 
int azwrite_frm(azio_stream *s, const char *blob, unsigned int length)
975
 
{
976
 
  if (s->mode == 'r')
977
 
    return 1;
978
 
 
979
 
  if (s->rows > 0)
980
 
    return 1;
981
 
 
982
 
  s->frm_start_pos= (uint) s->start;
983
 
  s->frm_length= length;
984
 
  s->start+= length;
985
 
 
986
 
  if (pwrite(s->file, (unsigned char*) blob, s->frm_length, s->frm_start_pos) != (ssize_t)s->frm_length)
987
 
    return 1;
988
 
 
989
 
  write_header(s);
990
 
  s->pos= (size_t)lseek(s->file, 0, SEEK_END);
991
 
 
992
 
  return 0;
993
 
}
994
 
 
995
 
int azread_frm(azio_stream *s, char *blob)
996
 
{
997
 
  ssize_t r= pread(s->file, (unsigned char*) blob,
998
 
                   s->frm_length, s->frm_start_pos);
999
 
  if (r != (ssize_t)s->frm_length)
1000
 
    return r;
1001
 
 
1002
 
  return 0;
1003
 
}
1004
 
 
1005
 
 
1006
 
/*
1007
 
  Simple comment field
1008
 
*/
1009
 
int azwrite_comment(azio_stream *s, const char *blob, unsigned int length)
1010
 
{
1011
 
  if (s->mode == 'r')
1012
 
    return -1;
1013
 
 
1014
 
  if (s->rows > 0)
1015
 
    return -1;
1016
 
 
1017
 
  s->comment_start_pos= (uint) s->start;
1018
 
  s->comment_length= length;
1019
 
  s->start+= length;
1020
 
 
1021
 
  ssize_t r= pwrite(s->file, (unsigned char*) blob,
1022
 
                    s->comment_length, s->comment_start_pos);
1023
 
  if (r != (ssize_t)s->comment_length)
1024
 
    return -1;
1025
 
 
1026
 
  write_header(s);
1027
 
  s->pos= (size_t)lseek(s->file, 0, SEEK_END);
1028
 
 
1029
 
  return 0;
1030
 
}
1031
 
 
1032
 
int azread_comment(azio_stream *s, char *blob)
1033
 
{
1034
 
  ssize_t r= pread(s->file, (unsigned char*) blob,
1035
 
                   s->comment_length, s->comment_start_pos);
1036
 
  if (r != (ssize_t)s->comment_length)
1037
 
    return r;
1038
 
 
1039
 
  return 0;
1040
 
}
1041
 
 
1042
 
#ifdef AZIO_AIO
1043
 
static void do_aio_cleanup(azio_stream *s)
1044
 
{
1045
 
  if (s->method == AZ_METHOD_BLOCK)
1046
 
    return;
1047
 
 
1048
 
  azio_ready(s);
1049
 
 
1050
 
}
1051
 
#endif
1052
 
 
1053
 
/*
1054
 
  Normally all IO goes through azio_read(), but in case of error or non-support
1055
 
  we make use of pread().
1056
 
*/
1057
 
static void get_block(azio_stream *s)
1058
 
{
1059
 
#ifdef AZIO_AIO
1060
 
  if (s->method == AZ_METHOD_AIO && s->mode == 'r'
1061
 
      && s->pos < s->check_point
1062
 
      && s->aio_inited)
1063
 
  {
1064
 
    azio_ready(s);
1065
 
    s->stream.avail_in= (unsigned int)azio_return(s);
1066
 
    if ((int)(s->stream.avail_in) == -1)
1067
 
      goto use_pread;
1068
 
    else if ((int)(s->stream.avail_in) == 0)
1069
 
    {
1070
 
      s->aio_inited= 0;
1071
 
      return;
1072
 
    }
1073
 
    s->pos+= s->stream.avail_in;
1074
 
    s->inbuf= (Byte *)s->container.buffer;
1075
 
    /* We only azio_read when we know there is more data to be read */
1076
 
    if (s->pos >= s->check_point)
1077
 
    {
1078
 
      s->aio_inited= 0;
1079
 
      return;
1080
 
    }
1081
 
    s->container.buffer= (s->container.buffer == s->buffer2) ? s->buffer1 : s->buffer2;
1082
 
    s->container.offset= s->pos;
1083
 
    azio_read(s);
1084
 
  }
1085
 
  else
1086
 
#endif
1087
 
  {
1088
 
#ifdef AZIO_AIO
1089
 
use_pread:
1090
 
#endif
1091
 
    s->stream.avail_in = (uInt)pread(s->file, (unsigned char *)s->inbuf,
1092
 
                                     AZ_BUFSIZE_READ, s->pos);
1093
 
    s->pos+= s->stream.avail_in;
1094
 
  }
1095
 
}