~skinny.moey/drizzle/branch-rev

« back to all changes in this revision

Viewing changes to storage/csv/ha_tina.cc

  • Committer: brian
  • Date: 2008-06-25 05:29:13 UTC
  • Revision ID: brian@localhost.localdomain-20080625052913-6upwo0jsrl4lnapl
clean slate

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/* Copyright (C) 2003 MySQL AB
 
2
 
 
3
  This program is free software; you can redistribute it and/or modify
 
4
  it under the terms of the GNU General Public License as published by
 
5
  the Free Software Foundation; version 2 of the License.
 
6
 
 
7
  This program is distributed in the hope that it will be useful,
 
8
  but WITHOUT ANY WARRANTY; without even the implied warranty of
 
9
  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
10
  GNU General Public License for more details.
 
11
 
 
12
  You should have received a copy of the GNU General Public License
 
13
  along with this program; if not, write to the Free Software
 
14
  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
 
15
 
 
16
/*
 
17
  Make sure to look at ha_tina.h for more details.
 
18
 
 
19
  First off, this is a play thing for me, there are a number of things
 
20
  wrong with it:
 
21
    *) It was designed for csv and therefore its performance is highly
 
22
       questionable.
 
23
    *) Indexes have not been implemented. This is because the files can
 
24
       be traded in and out of the table directory without having to worry
 
25
       about rebuilding anything.
 
26
    *) NULLs and "" are treated equally (like a spreadsheet).
 
27
    *) There was in the beginning no point to anyone seeing this other
 
28
       then me, so there is a good chance that I haven't quite documented
 
29
       it well.
 
30
    *) Less design, more "make it work"
 
31
 
 
32
  Now there are a few cool things with it:
 
33
    *) Errors can result in corrupted data files.
 
34
    *) Data files can be read by spreadsheets directly.
 
35
 
 
36
TODO:
 
37
 *) Move to a block system for larger files
 
38
 *) Error recovery, its all there, just need to finish it
 
39
 *) Document how the chains work.
 
40
 
 
41
 -Brian
 
42
*/
 
43
 
 
44
#ifdef USE_PRAGMA_IMPLEMENTATION
 
45
#pragma implementation        // gcc: Class implementation
 
46
#endif
 
47
 
 
48
#include "mysql_priv.h"
 
49
#include <mysql/plugin.h>
 
50
#include "ha_tina.h"
 
51
 
 
52
 
 
53
/*
 
54
  uchar + uchar + uint64_t + uint64_t + uint64_t + uint64_t + uchar
 
55
*/
 
56
#define META_BUFFER_SIZE sizeof(uchar) + sizeof(uchar) + sizeof(uint64_t) \
 
57
  + sizeof(uint64_t) + sizeof(uint64_t) + sizeof(uint64_t) + sizeof(uchar)
 
58
#define TINA_CHECK_HEADER 254 // The number we use to determine corruption
 
59
#define BLOB_MEMROOT_ALLOC_SIZE 8192
 
60
 
 
61
/* The file extension */
 
62
#define CSV_EXT ".CSV"               // The data file
 
63
#define CSN_EXT ".CSN"               // Files used during repair and update
 
64
#define CSM_EXT ".CSM"               // Meta file
 
65
 
 
66
 
 
67
static TINA_SHARE *get_share(const char *table_name, TABLE *table);
 
68
static int free_share(TINA_SHARE *share);
 
69
static int read_meta_file(File meta_file, ha_rows *rows);
 
70
static int write_meta_file(File meta_file, ha_rows rows, bool dirty);
 
71
 
 
72
extern "C" void tina_get_status(void* param, int concurrent_insert);
 
73
extern "C" void tina_update_status(void* param);
 
74
extern "C" my_bool tina_check_status(void* param);
 
75
 
 
76
/* Stuff for shares */
 
77
pthread_mutex_t tina_mutex;
 
78
static HASH tina_open_tables;
 
79
static handler *tina_create_handler(handlerton *hton,
 
80
                                    TABLE_SHARE *table, 
 
81
                                    MEM_ROOT *mem_root);
 
82
 
 
83
 
 
84
/*****************************************************************************
 
85
 ** TINA tables
 
86
 *****************************************************************************/
 
87
 
 
88
/*
 
89
  Used for sorting chains with qsort().
 
90
*/
 
91
int sort_set (tina_set *a, tina_set *b)
 
92
{
 
93
  /*
 
94
    We assume that intervals do not intersect. So, it is enought to compare
 
95
    any two points. Here we take start of intervals for comparison.
 
96
  */
 
97
  return ( a->begin > b->begin ? 1 : ( a->begin < b->begin ? -1 : 0 ) );
 
98
}
 
99
 
 
100
static uchar* tina_get_key(TINA_SHARE *share, size_t *length,
 
101
                          my_bool not_used __attribute__((unused)))
 
102
{
 
103
  *length=share->table_name_length;
 
104
  return (uchar*) share->table_name;
 
105
}
 
106
 
 
107
static int tina_init_func(void *p)
 
108
{
 
109
  handlerton *tina_hton;
 
110
 
 
111
  tina_hton= (handlerton *)p;
 
112
  VOID(pthread_mutex_init(&tina_mutex,MY_MUTEX_INIT_FAST));
 
113
  (void) hash_init(&tina_open_tables,system_charset_info,32,0,0,
 
114
                   (hash_get_key) tina_get_key,0,0);
 
115
  tina_hton->state= SHOW_OPTION_YES;
 
116
  tina_hton->db_type= DB_TYPE_CSV_DB;
 
117
  tina_hton->create= tina_create_handler;
 
118
  tina_hton->flags= (HTON_CAN_RECREATE | HTON_SUPPORT_LOG_TABLES | 
 
119
                     HTON_NO_PARTITION);
 
120
  return 0;
 
121
}
 
122
 
 
123
static int tina_done_func(void *p)
 
124
{
 
125
  hash_free(&tina_open_tables);
 
126
  pthread_mutex_destroy(&tina_mutex);
 
127
 
 
128
  return 0;
 
129
}
 
130
 
 
131
 
 
132
/*
 
133
  Simple lock controls.
 
134
*/
 
135
static TINA_SHARE *get_share(const char *table_name, TABLE *table)
 
136
{
 
137
  TINA_SHARE *share;
 
138
  char meta_file_name[FN_REFLEN];
 
139
  MY_STAT file_stat;                /* Stat information for the data file */
 
140
  char *tmp_name;
 
141
  uint length;
 
142
 
 
143
  pthread_mutex_lock(&tina_mutex);
 
144
  length=(uint) strlen(table_name);
 
145
 
 
146
  /*
 
147
    If share is not present in the hash, create a new share and
 
148
    initialize its members.
 
149
  */
 
150
  if (!(share=(TINA_SHARE*) hash_search(&tina_open_tables,
 
151
                                        (uchar*) table_name,
 
152
                                       length)))
 
153
  {
 
154
    if (!my_multi_malloc(MYF(MY_WME | MY_ZEROFILL),
 
155
                         &share, sizeof(*share),
 
156
                         &tmp_name, length+1,
 
157
                         NullS))
 
158
    {
 
159
      pthread_mutex_unlock(&tina_mutex);
 
160
      return NULL;
 
161
    }
 
162
 
 
163
    share->use_count= 0;
 
164
    share->is_log_table= FALSE;
 
165
    share->table_name_length= length;
 
166
    share->table_name= tmp_name;
 
167
    share->crashed= FALSE;
 
168
    share->rows_recorded= 0;
 
169
    share->update_file_opened= FALSE;
 
170
    share->tina_write_opened= FALSE;
 
171
    share->data_file_version= 0;
 
172
    strmov(share->table_name, table_name);
 
173
    fn_format(share->data_file_name, table_name, "", CSV_EXT,
 
174
              MY_REPLACE_EXT|MY_UNPACK_FILENAME);
 
175
    fn_format(meta_file_name, table_name, "", CSM_EXT,
 
176
              MY_REPLACE_EXT|MY_UNPACK_FILENAME);
 
177
 
 
178
    if (my_stat(share->data_file_name, &file_stat, MYF(MY_WME)) == NULL)
 
179
      goto error;
 
180
    share->saved_data_file_length= file_stat.st_size;
 
181
 
 
182
    if (my_hash_insert(&tina_open_tables, (uchar*) share))
 
183
      goto error;
 
184
    thr_lock_init(&share->lock);
 
185
    pthread_mutex_init(&share->mutex,MY_MUTEX_INIT_FAST);
 
186
 
 
187
    /*
 
188
      Open or create the meta file. In the latter case, we'll get
 
189
      an error during read_meta_file and mark the table as crashed.
 
190
      Usually this will result in auto-repair, and we will get a good
 
191
      meta-file in the end.
 
192
    */
 
193
    if ((share->meta_file= my_open(meta_file_name,
 
194
                                   O_RDWR|O_CREAT, MYF(0))) == -1)
 
195
      share->crashed= TRUE;
 
196
 
 
197
    /*
 
198
      If the meta file will not open we assume it is crashed and
 
199
      mark it as such.
 
200
    */
 
201
    if (read_meta_file(share->meta_file, &share->rows_recorded))
 
202
      share->crashed= TRUE;
 
203
  }
 
204
  share->use_count++;
 
205
  pthread_mutex_unlock(&tina_mutex);
 
206
 
 
207
  return share;
 
208
 
 
209
error:
 
210
  pthread_mutex_unlock(&tina_mutex);
 
211
  my_free((uchar*) share, MYF(0));
 
212
 
 
213
  return NULL;
 
214
}
 
215
 
 
216
 
 
217
/*
 
218
  Read CSV meta-file
 
219
 
 
220
  SYNOPSIS
 
221
    read_meta_file()
 
222
    meta_file   The meta-file filedes
 
223
    ha_rows     Pointer to the var we use to store rows count.
 
224
                These are read from the meta-file.
 
225
 
 
226
  DESCRIPTION
 
227
 
 
228
    Read the meta-file info. For now we are only interested in
 
229
    rows counf, crashed bit and magic number.
 
230
 
 
231
  RETURN
 
232
    0 - OK
 
233
    non-zero - error occurred
 
234
*/
 
235
 
 
236
static int read_meta_file(File meta_file, ha_rows *rows)
 
237
{
 
238
  uchar meta_buffer[META_BUFFER_SIZE];
 
239
  uchar *ptr= meta_buffer;
 
240
 
 
241
  DBUG_ENTER("ha_tina::read_meta_file");
 
242
 
 
243
  VOID(my_seek(meta_file, 0, MY_SEEK_SET, MYF(0)));
 
244
  if (my_read(meta_file, (uchar*)meta_buffer, META_BUFFER_SIZE, 0)
 
245
      != META_BUFFER_SIZE)
 
246
    DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);
 
247
 
 
248
  /*
 
249
    Parse out the meta data, we ignore version at the moment
 
250
  */
 
251
 
 
252
  ptr+= sizeof(uchar)*2; // Move past header
 
253
  *rows= (ha_rows)uint8korr(ptr);
 
254
  ptr+= sizeof(uint64_t); // Move past rows
 
255
  /*
 
256
    Move past check_point, auto_increment and forced_flushes fields.
 
257
    They are present in the format, but we do not use them yet.
 
258
  */
 
259
  ptr+= 3*sizeof(uint64_t);
 
260
 
 
261
  /* check crashed bit and magic number */
 
262
  if ((meta_buffer[0] != (uchar)TINA_CHECK_HEADER) ||
 
263
      ((bool)(*ptr)== TRUE))
 
264
    DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);
 
265
 
 
266
  my_sync(meta_file, MYF(MY_WME));
 
267
 
 
268
  DBUG_RETURN(0);
 
269
}
 
270
 
 
271
 
 
272
/*
 
273
  Write CSV meta-file
 
274
 
 
275
  SYNOPSIS
 
276
    write_meta_file()
 
277
    meta_file   The meta-file filedes
 
278
    ha_rows     The number of rows we have in the datafile.
 
279
    dirty       A flag, which marks whether we have a corrupt table
 
280
 
 
281
  DESCRIPTION
 
282
 
 
283
    Write meta-info the the file. Only rows count, crashed bit and
 
284
    magic number matter now.
 
285
 
 
286
  RETURN
 
287
    0 - OK
 
288
    non-zero - error occurred
 
289
*/
 
290
 
 
291
static int write_meta_file(File meta_file, ha_rows rows, bool dirty)
 
292
{
 
293
  uchar meta_buffer[META_BUFFER_SIZE];
 
294
  uchar *ptr= meta_buffer;
 
295
 
 
296
  DBUG_ENTER("ha_tina::write_meta_file");
 
297
 
 
298
  *ptr= (uchar)TINA_CHECK_HEADER;
 
299
  ptr+= sizeof(uchar);
 
300
  *ptr= (uchar)TINA_VERSION;
 
301
  ptr+= sizeof(uchar);
 
302
  int8store(ptr, (uint64_t)rows);
 
303
  ptr+= sizeof(uint64_t);
 
304
  memset(ptr, 0, 3*sizeof(uint64_t));
 
305
  /*
 
306
     Skip over checkpoint, autoincrement and forced_flushes fields.
 
307
     We'll need them later.
 
308
  */
 
309
  ptr+= 3*sizeof(uint64_t);
 
310
  *ptr= (uchar)dirty;
 
311
 
 
312
  VOID(my_seek(meta_file, 0, MY_SEEK_SET, MYF(0)));
 
313
  if (my_write(meta_file, (uchar *)meta_buffer, META_BUFFER_SIZE, 0)
 
314
      != META_BUFFER_SIZE)
 
315
    DBUG_RETURN(-1);
 
316
 
 
317
  my_sync(meta_file, MYF(MY_WME));
 
318
 
 
319
  DBUG_RETURN(0);
 
320
}
 
321
 
 
322
bool ha_tina::check_and_repair(THD *thd)
 
323
{
 
324
  HA_CHECK_OPT check_opt;
 
325
  DBUG_ENTER("ha_tina::check_and_repair");
 
326
 
 
327
  check_opt.init();
 
328
 
 
329
  DBUG_RETURN(repair(thd, &check_opt));
 
330
}
 
331
 
 
332
 
 
333
int ha_tina::init_tina_writer()
 
334
{
 
335
  DBUG_ENTER("ha_tina::init_tina_writer");
 
336
 
 
337
  /*
 
338
    Mark the file as crashed. We will set the flag back when we close
 
339
    the file. In the case of the crash it will remain marked crashed,
 
340
    which enforce recovery.
 
341
  */
 
342
  (void)write_meta_file(share->meta_file, share->rows_recorded, TRUE);
 
343
 
 
344
  if ((share->tina_write_filedes=
 
345
        my_open(share->data_file_name, O_RDWR|O_APPEND, MYF(0))) == -1)
 
346
  {
 
347
    DBUG_PRINT("info", ("Could not open tina file writes"));
 
348
    share->crashed= TRUE;
 
349
    DBUG_RETURN(1);
 
350
  }
 
351
  share->tina_write_opened= TRUE;
 
352
 
 
353
  DBUG_RETURN(0);
 
354
}
 
355
 
 
356
 
 
357
bool ha_tina::is_crashed() const
 
358
{
 
359
  DBUG_ENTER("ha_tina::is_crashed");
 
360
  DBUG_RETURN(share->crashed);
 
361
}
 
362
 
 
363
/*
 
364
  Free lock controls.
 
365
*/
 
366
static int free_share(TINA_SHARE *share)
 
367
{
 
368
  DBUG_ENTER("ha_tina::free_share");
 
369
  pthread_mutex_lock(&tina_mutex);
 
370
  int result_code= 0;
 
371
  if (!--share->use_count){
 
372
    /* Write the meta file. Mark it as crashed if needed. */
 
373
    (void)write_meta_file(share->meta_file, share->rows_recorded,
 
374
                          share->crashed ? TRUE :FALSE);
 
375
    if (my_close(share->meta_file, MYF(0)))
 
376
      result_code= 1;
 
377
    if (share->tina_write_opened)
 
378
    {
 
379
      if (my_close(share->tina_write_filedes, MYF(0)))
 
380
        result_code= 1;
 
381
      share->tina_write_opened= FALSE;
 
382
    }
 
383
 
 
384
    hash_delete(&tina_open_tables, (uchar*) share);
 
385
    thr_lock_delete(&share->lock);
 
386
    pthread_mutex_destroy(&share->mutex);
 
387
    my_free((uchar*) share, MYF(0));
 
388
  }
 
389
  pthread_mutex_unlock(&tina_mutex);
 
390
 
 
391
  DBUG_RETURN(result_code);
 
392
}
 
393
 
 
394
 
 
395
/*
 
396
  This function finds the end of a line and returns the length
 
397
  of the line ending.
 
398
 
 
399
  We support three kinds of line endings:
 
400
  '\r'     --  Old Mac OS line ending
 
401
  '\n'     --  Traditional Unix and Mac OS X line ending
 
402
  '\r''\n' --  DOS\Windows line ending
 
403
*/
 
404
 
 
405
off_t find_eoln_buff(Transparent_file *data_buff, off_t begin,
 
406
                     off_t end, int *eoln_len)
 
407
{
 
408
  *eoln_len= 0;
 
409
 
 
410
  for (off_t x= begin; x < end; x++)
 
411
  {
 
412
    /* Unix (includes Mac OS X) */
 
413
    if (data_buff->get_value(x) == '\n')
 
414
      *eoln_len= 1;
 
415
    else
 
416
      if (data_buff->get_value(x) == '\r') // Mac or Dos
 
417
      {
 
418
        /* old Mac line ending */
 
419
        if (x + 1 == end || (data_buff->get_value(x + 1) != '\n'))
 
420
          *eoln_len= 1;
 
421
        else // DOS style ending
 
422
          *eoln_len= 2;
 
423
      }
 
424
 
 
425
    if (*eoln_len)  // end of line was found
 
426
      return x;
 
427
  }
 
428
 
 
429
  return 0;
 
430
}
 
431
 
 
432
 
 
433
static handler *tina_create_handler(handlerton *hton,
 
434
                                    TABLE_SHARE *table, 
 
435
                                    MEM_ROOT *mem_root)
 
436
{
 
437
  return new (mem_root) ha_tina(hton, table);
 
438
}
 
439
 
 
440
 
 
441
ha_tina::ha_tina(handlerton *hton, TABLE_SHARE *table_arg)
 
442
  :handler(hton, table_arg),
 
443
  /*
 
444
    These definitions are found in handler.h
 
445
    They are not probably completely right.
 
446
  */
 
447
  current_position(0), next_position(0), local_saved_data_file_length(0),
 
448
  file_buff(0), chain_alloced(0), chain_size(DEFAULT_CHAIN_LENGTH),
 
449
  local_data_file_version(0), records_is_known(0)
 
450
{
 
451
  /* Set our original buffers from pre-allocated memory */
 
452
  buffer.set((char*)byte_buffer, IO_SIZE, &my_charset_bin);
 
453
  chain= chain_buffer;
 
454
  file_buff= new Transparent_file();
 
455
}
 
456
 
 
457
 
 
458
/*
 
459
  Encode a buffer into the quoted format.
 
460
*/
 
461
 
 
462
int ha_tina::encode_quote(uchar *buf)
 
463
{
 
464
  char attribute_buffer[1024];
 
465
  String attribute(attribute_buffer, sizeof(attribute_buffer),
 
466
                   &my_charset_bin);
 
467
 
 
468
  my_bitmap_map *org_bitmap= dbug_tmp_use_all_columns(table, table->read_set);
 
469
  buffer.length(0);
 
470
 
 
471
  for (Field **field=table->field ; *field ; field++)
 
472
  {
 
473
    const char *ptr;
 
474
    const char *end_ptr;
 
475
    const bool was_null= (*field)->is_null();
 
476
    
 
477
    /*
 
478
      assistance for backwards compatibility in production builds.
 
479
      note: this will not work for ENUM columns.
 
480
    */
 
481
    if (was_null)
 
482
    {
 
483
      (*field)->set_default();
 
484
      (*field)->set_notnull();
 
485
    }
 
486
 
 
487
    (*field)->val_str(&attribute,&attribute);
 
488
    
 
489
    if (was_null)
 
490
      (*field)->set_null();
 
491
 
 
492
    if ((*field)->str_needs_quotes())
 
493
    {
 
494
      ptr= attribute.ptr();
 
495
      end_ptr= attribute.length() + ptr;
 
496
 
 
497
      buffer.append('"');
 
498
 
 
499
      while (ptr < end_ptr) 
 
500
      {
 
501
        if (*ptr == '"')
 
502
        {
 
503
          buffer.append('\\');
 
504
          buffer.append('"');
 
505
          *ptr++;
 
506
        }
 
507
        else if (*ptr == '\r')
 
508
        {
 
509
          buffer.append('\\');
 
510
          buffer.append('r');
 
511
          *ptr++;
 
512
        }
 
513
        else if (*ptr == '\\')
 
514
        {
 
515
          buffer.append('\\');
 
516
          buffer.append('\\');
 
517
          *ptr++;
 
518
        }
 
519
        else if (*ptr == '\n')
 
520
        {
 
521
          buffer.append('\\');
 
522
          buffer.append('n');
 
523
          *ptr++;
 
524
        }
 
525
        else
 
526
          buffer.append(*ptr++);
 
527
      }
 
528
      buffer.append('"');
 
529
    }
 
530
    else
 
531
    {
 
532
      buffer.append(attribute);
 
533
    }
 
534
 
 
535
    buffer.append(',');
 
536
  }
 
537
  // Remove the comma, add a line feed
 
538
  buffer.length(buffer.length() - 1);
 
539
  buffer.append('\n');
 
540
 
 
541
  //buffer.replace(buffer.length(), 0, "\n", 1);
 
542
 
 
543
  dbug_tmp_restore_column_map(table->read_set, org_bitmap);
 
544
  return (buffer.length());
 
545
}
 
546
 
 
547
/*
 
548
  chain_append() adds delete positions to the chain that we use to keep
 
549
  track of space. Then the chain will be used to cleanup "holes", occurred
 
550
  due to deletes and updates.
 
551
*/
 
552
int ha_tina::chain_append()
 
553
{
 
554
  if ( chain_ptr != chain && (chain_ptr -1)->end == current_position)
 
555
    (chain_ptr -1)->end= next_position;
 
556
  else
 
557
  {
 
558
    /* We set up for the next position */
 
559
    if ((off_t)(chain_ptr - chain) == (chain_size -1))
 
560
    {
 
561
      off_t location= chain_ptr - chain;
 
562
      chain_size += DEFAULT_CHAIN_LENGTH;
 
563
      if (chain_alloced)
 
564
      {
 
565
        /* Must cast since my_malloc unlike malloc doesn't have a void ptr */
 
566
        if ((chain= (tina_set *) my_realloc((uchar*)chain,
 
567
                                            chain_size, MYF(MY_WME))) == NULL)
 
568
          return -1;
 
569
      }
 
570
      else
 
571
      {
 
572
        tina_set *ptr= (tina_set *) my_malloc(chain_size * sizeof(tina_set),
 
573
                                              MYF(MY_WME));
 
574
        memcpy(ptr, chain, DEFAULT_CHAIN_LENGTH * sizeof(tina_set));
 
575
        chain= ptr;
 
576
        chain_alloced++;
 
577
      }
 
578
      chain_ptr= chain + location;
 
579
    }
 
580
    chain_ptr->begin= current_position;
 
581
    chain_ptr->end= next_position;
 
582
    chain_ptr++;
 
583
  }
 
584
 
 
585
  return 0;
 
586
}
 
587
 
 
588
 
 
589
/*
 
590
  Scans for a row.
 
591
*/
 
592
int ha_tina::find_current_row(uchar *buf)
 
593
{
 
594
  off_t end_offset, curr_offset= current_position;
 
595
  int eoln_len;
 
596
  my_bitmap_map *org_bitmap;
 
597
  int error;
 
598
  bool read_all;
 
599
  DBUG_ENTER("ha_tina::find_current_row");
 
600
 
 
601
  free_root(&blobroot, MYF(MY_MARK_BLOCKS_FREE));
 
602
 
 
603
  /*
 
604
    We do not read further then local_saved_data_file_length in order
 
605
    not to conflict with undergoing concurrent insert.
 
606
  */
 
607
  if ((end_offset=
 
608
        find_eoln_buff(file_buff, current_position,
 
609
                       local_saved_data_file_length, &eoln_len)) == 0)
 
610
    DBUG_RETURN(HA_ERR_END_OF_FILE);
 
611
 
 
612
  /* We must read all columns in case a table is opened for update */
 
613
  read_all= !bitmap_is_clear_all(table->write_set);
 
614
  /* Avoid asserts in ::store() for columns that are not going to be updated */
 
615
  org_bitmap= dbug_tmp_use_all_columns(table, table->write_set);
 
616
  error= HA_ERR_CRASHED_ON_USAGE;
 
617
 
 
618
  memset(buf, 0, table->s->null_bytes);
 
619
 
 
620
  for (Field **field=table->field ; *field ; field++)
 
621
  {
 
622
    char curr_char;
 
623
    
 
624
    buffer.length(0);
 
625
    if (curr_offset >= end_offset)
 
626
      goto err;
 
627
    curr_char= file_buff->get_value(curr_offset);
 
628
    if (curr_char == '"')
 
629
    {
 
630
      curr_offset++; // Incrementpast the first quote
 
631
 
 
632
      for(; curr_offset < end_offset; curr_offset++)
 
633
      {
 
634
        curr_char= file_buff->get_value(curr_offset);
 
635
        // Need to convert line feeds!
 
636
        if (curr_char == '"' &&
 
637
            (curr_offset == end_offset - 1 ||
 
638
             file_buff->get_value(curr_offset + 1) == ','))
 
639
        {
 
640
          curr_offset+= 2; // Move past the , and the "
 
641
          break;
 
642
        }
 
643
        if (curr_char == '\\' && curr_offset != (end_offset - 1))
 
644
        {
 
645
          curr_offset++;
 
646
          curr_char= file_buff->get_value(curr_offset);
 
647
          if (curr_char == 'r')
 
648
            buffer.append('\r');
 
649
          else if (curr_char == 'n' )
 
650
            buffer.append('\n');
 
651
          else if (curr_char == '\\' || curr_char == '"')
 
652
            buffer.append(curr_char);
 
653
          else  /* This could only happed with an externally created file */
 
654
          {
 
655
            buffer.append('\\');
 
656
            buffer.append(curr_char);
 
657
          }
 
658
        }
 
659
        else // ordinary symbol
 
660
        {
 
661
          /*
 
662
            We are at final symbol and no last quote was found =>
 
663
            we are working with a damaged file.
 
664
          */
 
665
          if (curr_offset == end_offset - 1)
 
666
            goto err;
 
667
          buffer.append(curr_char);
 
668
        }
 
669
      }
 
670
    }
 
671
    else 
 
672
    {
 
673
      for(; curr_offset < end_offset; curr_offset++)
 
674
      {
 
675
        curr_char= file_buff->get_value(curr_offset);
 
676
        if (curr_char == ',')
 
677
        {
 
678
          curr_offset++;       // Skip the ,
 
679
          break;
 
680
        }
 
681
        buffer.append(curr_char);
 
682
      }
 
683
    }
 
684
 
 
685
    if (read_all || bitmap_is_set(table->read_set, (*field)->field_index))
 
686
    {
 
687
      if ((*field)->store(buffer.ptr(), buffer.length(), buffer.charset(),
 
688
                          CHECK_FIELD_WARN))
 
689
        goto err;
 
690
      if ((*field)->flags & BLOB_FLAG)
 
691
      {
 
692
        Field_blob *blob= *(Field_blob**) field;
 
693
        uchar *src, *tgt;
 
694
        uint length, packlength;
 
695
        
 
696
        packlength= blob->pack_length_no_ptr();
 
697
        length= blob->get_length(blob->ptr);
 
698
        memcpy_fixed(&src, blob->ptr + packlength, sizeof(char*));
 
699
        if (src)
 
700
        {
 
701
          tgt= (uchar*) alloc_root(&blobroot, length);
 
702
          bmove(tgt, src, length);
 
703
          memcpy_fixed(blob->ptr + packlength, &tgt, sizeof(char*));
 
704
        }
 
705
      }
 
706
    }
 
707
  }
 
708
  next_position= end_offset + eoln_len;
 
709
  error= 0;
 
710
 
 
711
err:
 
712
  dbug_tmp_restore_column_map(table->write_set, org_bitmap);
 
713
 
 
714
  DBUG_RETURN(error);
 
715
}
 
716
 
 
717
/*
 
718
  If frm_error() is called in table.cc this is called to find out what file
 
719
  extensions exist for this handler.
 
720
*/
 
721
static const char *ha_tina_exts[] = {
 
722
  CSV_EXT,
 
723
  CSM_EXT,
 
724
  NullS
 
725
};
 
726
 
 
727
const char **ha_tina::bas_ext() const
 
728
{
 
729
  return ha_tina_exts;
 
730
}
 
731
 
 
732
/*
 
733
  Three functions below are needed to enable concurrent insert functionality
 
734
  for CSV engine. For more details see mysys/thr_lock.c
 
735
*/
 
736
 
 
737
void tina_get_status(void* param, int concurrent_insert)
 
738
{
 
739
  ha_tina *tina= (ha_tina*) param;
 
740
  tina->get_status();
 
741
}
 
742
 
 
743
void tina_update_status(void* param)
 
744
{
 
745
  ha_tina *tina= (ha_tina*) param;
 
746
  tina->update_status();
 
747
}
 
748
 
 
749
/* this should exist and return 0 for concurrent insert to work */
 
750
my_bool tina_check_status(void* param)
 
751
{
 
752
  return 0;
 
753
}
 
754
 
 
755
/*
 
756
  Save the state of the table
 
757
 
 
758
  SYNOPSIS
 
759
    get_status()
 
760
 
 
761
  DESCRIPTION
 
762
    This function is used to retrieve the file length. During the lock
 
763
    phase of concurrent insert. For more details see comment to
 
764
    ha_tina::update_status below.
 
765
*/
 
766
 
 
767
void ha_tina::get_status()
 
768
{
 
769
  if (share->is_log_table)
 
770
  {
 
771
    /*
 
772
      We have to use mutex to follow pthreads memory visibility
 
773
      rules for share->saved_data_file_length
 
774
    */
 
775
    pthread_mutex_lock(&share->mutex);
 
776
    local_saved_data_file_length= share->saved_data_file_length;
 
777
    pthread_mutex_unlock(&share->mutex);
 
778
    return;
 
779
  }
 
780
  local_saved_data_file_length= share->saved_data_file_length;
 
781
}
 
782
 
 
783
 
 
784
/*
 
785
  Correct the state of the table. Called by unlock routines
 
786
  before the write lock is released.
 
787
 
 
788
  SYNOPSIS
 
789
    update_status()
 
790
 
 
791
  DESCRIPTION
 
792
    When we employ concurrent insert lock, we save current length of the file
 
793
    during the lock phase. We do not read further saved value, as we don't
 
794
    want to interfere with undergoing concurrent insert. Writers update file
 
795
    length info during unlock with update_status().
 
796
 
 
797
  NOTE
 
798
    For log tables concurrent insert works different. The reason is that
 
799
    log tables are always opened and locked. And as they do not unlock
 
800
    tables, the file length after writes should be updated in a different
 
801
    way. For this purpose we need is_log_table flag. When this flag is set
 
802
    we call update_status() explicitly after each row write.
 
803
*/
 
804
 
 
805
void ha_tina::update_status()
 
806
{
 
807
  /* correct local_saved_data_file_length for writers */
 
808
  share->saved_data_file_length= local_saved_data_file_length;
 
809
}
 
810
 
 
811
 
 
812
/*
 
813
  Open a database file. Keep in mind that tables are caches, so
 
814
  this will not be called for every request. Any sort of positions
 
815
  that need to be reset should be kept in the ::extra() call.
 
816
*/
 
817
int ha_tina::open(const char *name, int mode, uint open_options)
 
818
{
 
819
  DBUG_ENTER("ha_tina::open");
 
820
 
 
821
  if (!(share= get_share(name, table)))
 
822
    DBUG_RETURN(HA_ERR_OUT_OF_MEM);
 
823
 
 
824
  if (share->crashed && !(open_options & HA_OPEN_FOR_REPAIR))
 
825
  {
 
826
    free_share(share);
 
827
    DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);
 
828
  }
 
829
 
 
830
  local_data_file_version= share->data_file_version;
 
831
  if ((data_file= my_open(share->data_file_name, O_RDONLY, MYF(0))) == -1)
 
832
    DBUG_RETURN(0);
 
833
 
 
834
  /*
 
835
    Init locking. Pass handler object to the locking routines,
 
836
    so that they could save/update local_saved_data_file_length value
 
837
    during locking. This is needed to enable concurrent inserts.
 
838
  */
 
839
  thr_lock_data_init(&share->lock, &lock, (void*) this);
 
840
  ref_length=sizeof(off_t);
 
841
 
 
842
  share->lock.get_status= tina_get_status;
 
843
  share->lock.update_status= tina_update_status;
 
844
  share->lock.check_status= tina_check_status;
 
845
 
 
846
  DBUG_RETURN(0);
 
847
}
 
848
 
 
849
 
 
850
/*
 
851
  Close a database file. We remove ourselves from the shared strucutre.
 
852
  If it is empty we destroy it.
 
853
*/
 
854
int ha_tina::close(void)
 
855
{
 
856
  int rc= 0;
 
857
  DBUG_ENTER("ha_tina::close");
 
858
  rc= my_close(data_file, MYF(0));
 
859
  DBUG_RETURN(free_share(share) || rc);
 
860
}
 
861
 
 
862
/*
 
863
  This is an INSERT. At the moment this handler just seeks to the end
 
864
  of the file and appends the data. In an error case it really should
 
865
  just truncate to the original position (this is not done yet).
 
866
*/
 
867
int ha_tina::write_row(uchar * buf)
 
868
{
 
869
  int size;
 
870
  DBUG_ENTER("ha_tina::write_row");
 
871
 
 
872
  if (share->crashed)
 
873
      DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);
 
874
 
 
875
  ha_statistic_increment(&SSV::ha_write_count);
 
876
 
 
877
  if (table->timestamp_field_type & TIMESTAMP_AUTO_SET_ON_INSERT)
 
878
    table->timestamp_field->set_time();
 
879
 
 
880
  size= encode_quote(buf);
 
881
 
 
882
  if (!share->tina_write_opened)
 
883
    if (init_tina_writer())
 
884
      DBUG_RETURN(-1);
 
885
 
 
886
   /* use pwrite, as concurrent reader could have changed the position */
 
887
  if (my_write(share->tina_write_filedes, (uchar*)buffer.ptr(), size,
 
888
               MYF(MY_WME | MY_NABP)))
 
889
    DBUG_RETURN(-1);
 
890
 
 
891
  /* update local copy of the max position to see our own changes */
 
892
  local_saved_data_file_length+= size;
 
893
 
 
894
  /* update shared info */
 
895
  pthread_mutex_lock(&share->mutex);
 
896
  share->rows_recorded++;
 
897
  /* update status for the log tables */
 
898
  if (share->is_log_table)
 
899
    update_status();
 
900
  pthread_mutex_unlock(&share->mutex);
 
901
 
 
902
  stats.records++;
 
903
  DBUG_RETURN(0);
 
904
}
 
905
 
 
906
 
 
907
int ha_tina::open_update_temp_file_if_needed()
 
908
{
 
909
  char updated_fname[FN_REFLEN];
 
910
 
 
911
  if (!share->update_file_opened)
 
912
  {
 
913
    if ((update_temp_file=
 
914
           my_create(fn_format(updated_fname, share->table_name,
 
915
                               "", CSN_EXT,
 
916
                               MY_REPLACE_EXT | MY_UNPACK_FILENAME),
 
917
                     0, O_RDWR | O_TRUNC, MYF(MY_WME))) < 0)
 
918
      return 1;
 
919
    share->update_file_opened= TRUE;
 
920
    temp_file_length= 0;
 
921
  }
 
922
  return 0;
 
923
}
 
924
 
 
925
/*
 
926
  This is called for an update.
 
927
  Make sure you put in code to increment the auto increment, also
 
928
  update any timestamp data. Currently auto increment is not being
 
929
  fixed since autoincrements have yet to be added to this table handler.
 
930
  This will be called in a table scan right before the previous ::rnd_next()
 
931
  call.
 
932
*/
 
933
int ha_tina::update_row(const uchar * old_data, uchar * new_data)
 
934
{
 
935
  int size;
 
936
  int rc= -1;
 
937
  DBUG_ENTER("ha_tina::update_row");
 
938
 
 
939
  ha_statistic_increment(&SSV::ha_update_count);
 
940
 
 
941
  if (table->timestamp_field_type & TIMESTAMP_AUTO_SET_ON_UPDATE)
 
942
    table->timestamp_field->set_time();
 
943
 
 
944
  size= encode_quote(new_data);
 
945
 
 
946
  /*
 
947
    During update we mark each updating record as deleted 
 
948
    (see the chain_append()) then write new one to the temporary data file. 
 
949
    At the end of the sequence in the rnd_end() we append all non-marked
 
950
    records from the data file to the temporary data file then rename it.
 
951
    The temp_file_length is used to calculate new data file length.
 
952
  */
 
953
  if (chain_append())
 
954
    goto err;
 
955
 
 
956
  if (open_update_temp_file_if_needed())
 
957
    goto err;
 
958
 
 
959
  if (my_write(update_temp_file, (uchar*)buffer.ptr(), size,
 
960
               MYF(MY_WME | MY_NABP)))
 
961
    goto err;
 
962
  temp_file_length+= size;
 
963
  rc= 0;
 
964
 
 
965
  /* UPDATE should never happen on the log tables */
 
966
  DBUG_ASSERT(!share->is_log_table);
 
967
 
 
968
err:
 
969
  DBUG_PRINT("info",("rc = %d", rc));
 
970
  DBUG_RETURN(rc);
 
971
}
 
972
 
 
973
 
 
974
/*
 
975
  Deletes a row. First the database will find the row, and then call this
 
976
  method. In the case of a table scan, the previous call to this will be
 
977
  the ::rnd_next() that found this row.
 
978
  The exception to this is an ORDER BY. This will cause the table handler
 
979
  to walk the table noting the positions of all rows that match a query.
 
980
  The table will then be deleted/positioned based on the ORDER (so RANDOM,
 
981
  DESC, ASC).
 
982
*/
 
983
int ha_tina::delete_row(const uchar * buf)
 
984
{
 
985
  DBUG_ENTER("ha_tina::delete_row");
 
986
  ha_statistic_increment(&SSV::ha_delete_count);
 
987
 
 
988
  if (chain_append())
 
989
    DBUG_RETURN(-1);
 
990
 
 
991
  stats.records--;
 
992
  /* Update shared info */
 
993
  DBUG_ASSERT(share->rows_recorded);
 
994
  pthread_mutex_lock(&share->mutex);
 
995
  share->rows_recorded--;
 
996
  pthread_mutex_unlock(&share->mutex);
 
997
 
 
998
  /* DELETE should never happen on the log table */
 
999
  DBUG_ASSERT(!share->is_log_table);
 
1000
 
 
1001
  DBUG_RETURN(0);
 
1002
}
 
1003
 
 
1004
 
 
1005
/**
 
1006
  @brief Initialize the data file.
 
1007
  
 
1008
  @details Compare the local version of the data file with the shared one.
 
1009
  If they differ, there are some changes behind and we have to reopen
 
1010
  the data file to make the changes visible.
 
1011
  Call @c file_buff->init_buff() at the end to read the beginning of the 
 
1012
  data file into buffer.
 
1013
  
 
1014
  @retval  0  OK.
 
1015
  @retval  1  There was an error.
 
1016
*/
 
1017
 
 
1018
int ha_tina::init_data_file()
 
1019
{
 
1020
  if (local_data_file_version != share->data_file_version)
 
1021
  {
 
1022
    local_data_file_version= share->data_file_version;
 
1023
    if (my_close(data_file, MYF(0)) ||
 
1024
        (data_file= my_open(share->data_file_name, O_RDONLY, MYF(0))) == -1)
 
1025
      return 1;
 
1026
  }
 
1027
  file_buff->init_buff(data_file);
 
1028
  return 0;
 
1029
}
 
1030
 
 
1031
 
 
1032
/*
 
1033
  All table scans call this first.
 
1034
  The order of a table scan is:
 
1035
 
 
1036
  ha_tina::store_lock
 
1037
  ha_tina::external_lock
 
1038
  ha_tina::info
 
1039
  ha_tina::rnd_init
 
1040
  ha_tina::extra
 
1041
  ENUM HA_EXTRA_CACHE   Cash record in HA_rrnd()
 
1042
  ha_tina::rnd_next
 
1043
  ha_tina::rnd_next
 
1044
  ha_tina::rnd_next
 
1045
  ha_tina::rnd_next
 
1046
  ha_tina::rnd_next
 
1047
  ha_tina::rnd_next
 
1048
  ha_tina::rnd_next
 
1049
  ha_tina::rnd_next
 
1050
  ha_tina::rnd_next
 
1051
  ha_tina::extra
 
1052
  ENUM HA_EXTRA_NO_CACHE   End cacheing of records (def)
 
1053
  ha_tina::external_lock
 
1054
  ha_tina::extra
 
1055
  ENUM HA_EXTRA_RESET   Reset database to after open
 
1056
 
 
1057
  Each call to ::rnd_next() represents a row returned in the can. When no more
 
1058
  rows can be returned, rnd_next() returns a value of HA_ERR_END_OF_FILE.
 
1059
  The ::info() call is just for the optimizer.
 
1060
 
 
1061
*/
 
1062
 
 
1063
int ha_tina::rnd_init(bool scan)
 
1064
{
 
1065
  DBUG_ENTER("ha_tina::rnd_init");
 
1066
 
 
1067
  /* set buffer to the beginning of the file */
 
1068
  if (share->crashed || init_data_file())
 
1069
    DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);
 
1070
 
 
1071
  current_position= next_position= 0;
 
1072
  stats.records= 0;
 
1073
  records_is_known= 0;
 
1074
  chain_ptr= chain;
 
1075
 
 
1076
  init_alloc_root(&blobroot, BLOB_MEMROOT_ALLOC_SIZE, 0);
 
1077
 
 
1078
  DBUG_RETURN(0);
 
1079
}
 
1080
 
 
1081
/*
 
1082
  ::rnd_next() does all the heavy lifting for a table scan. You will need to
 
1083
  populate *buf with the correct field data. You can walk the field to
 
1084
  determine at what position you should store the data (take a look at how
 
1085
  ::find_current_row() works). The structure is something like:
 
1086
  0Foo  Dog  Friend
 
1087
  The first offset is for the first attribute. All space before that is
 
1088
  reserved for null count.
 
1089
  Basically this works as a mask for which rows are nulled (compared to just
 
1090
  empty).
 
1091
  This table handler doesn't do nulls and does not know the difference between
 
1092
  NULL and "". This is ok since this table handler is for spreadsheets and
 
1093
  they don't know about them either :)
 
1094
*/
 
1095
int ha_tina::rnd_next(uchar *buf)
 
1096
{
 
1097
  int rc;
 
1098
  DBUG_ENTER("ha_tina::rnd_next");
 
1099
 
 
1100
  if (share->crashed)
 
1101
      DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);
 
1102
 
 
1103
  ha_statistic_increment(&SSV::ha_read_rnd_next_count);
 
1104
 
 
1105
  current_position= next_position;
 
1106
 
 
1107
  /* don't scan an empty file */
 
1108
  if (!local_saved_data_file_length)
 
1109
    DBUG_RETURN(HA_ERR_END_OF_FILE);
 
1110
 
 
1111
  if ((rc= find_current_row(buf)))
 
1112
    DBUG_RETURN(rc);
 
1113
 
 
1114
  stats.records++;
 
1115
  DBUG_RETURN(0);
 
1116
}
 
1117
 
 
1118
/*
 
1119
  In the case of an order by rows will need to be sorted.
 
1120
  ::position() is called after each call to ::rnd_next(),
 
1121
  the data it stores is to a byte array. You can store this
 
1122
  data via my_store_ptr(). ref_length is a variable defined to the
 
1123
  class that is the sizeof() of position being stored. In our case
 
1124
  its just a position. Look at the bdb code if you want to see a case
 
1125
  where something other then a number is stored.
 
1126
*/
 
1127
void ha_tina::position(const uchar *record)
 
1128
{
 
1129
  DBUG_ENTER("ha_tina::position");
 
1130
  my_store_ptr(ref, ref_length, current_position);
 
1131
  DBUG_VOID_RETURN;
 
1132
}
 
1133
 
 
1134
 
 
1135
/*
 
1136
  Used to fetch a row from a posiion stored with ::position().
 
1137
  my_get_ptr() retrieves the data for you.
 
1138
*/
 
1139
 
 
1140
int ha_tina::rnd_pos(uchar * buf, uchar *pos)
 
1141
{
 
1142
  DBUG_ENTER("ha_tina::rnd_pos");
 
1143
  ha_statistic_increment(&SSV::ha_read_rnd_count);
 
1144
  current_position= (off_t)my_get_ptr(pos,ref_length);
 
1145
  DBUG_RETURN(find_current_row(buf));
 
1146
}
 
1147
 
 
1148
/*
 
1149
  ::info() is used to return information to the optimizer.
 
1150
  Currently this table handler doesn't implement most of the fields
 
1151
  really needed. SHOW also makes use of this data
 
1152
*/
 
1153
int ha_tina::info(uint flag)
 
1154
{
 
1155
  DBUG_ENTER("ha_tina::info");
 
1156
  /* This is a lie, but you don't want the optimizer to see zero or 1 */
 
1157
  if (!records_is_known && stats.records < 2) 
 
1158
    stats.records= 2;
 
1159
  DBUG_RETURN(0);
 
1160
}
 
1161
 
 
1162
/*
 
1163
  Grab bag of flags that are sent to the able handler every so often.
 
1164
  HA_EXTRA_RESET and HA_EXTRA_RESET_STATE are the most frequently called.
 
1165
  You are not required to implement any of these.
 
1166
*/
 
1167
int ha_tina::extra(enum ha_extra_function operation)
 
1168
{
 
1169
  DBUG_ENTER("ha_tina::extra");
 
1170
 if (operation == HA_EXTRA_MARK_AS_LOG_TABLE)
 
1171
 {
 
1172
   pthread_mutex_lock(&share->mutex);
 
1173
   share->is_log_table= TRUE;
 
1174
   pthread_mutex_unlock(&share->mutex);
 
1175
 }
 
1176
  DBUG_RETURN(0);
 
1177
}
 
1178
 
 
1179
/*
 
1180
  Set end_pos to the last valid byte of continuous area, closest
 
1181
  to the given "hole", stored in the buffer. "Valid" here means,
 
1182
  not listed in the chain of deleted records ("holes").
 
1183
*/
 
1184
bool ha_tina::get_write_pos(off_t *end_pos, tina_set *closest_hole)
 
1185
{
 
1186
  if (closest_hole == chain_ptr) /* no more chains */
 
1187
    *end_pos= file_buff->end();
 
1188
  else
 
1189
    *end_pos= min(file_buff->end(),
 
1190
                  closest_hole->begin);
 
1191
  return (closest_hole != chain_ptr) && (*end_pos == closest_hole->begin);
 
1192
}
 
1193
 
 
1194
 
 
1195
/*
 
1196
  Called after each table scan. In particular after deletes,
 
1197
  and updates. In the last case we employ chain of deleted
 
1198
  slots to clean up all of the dead space we have collected while
 
1199
  performing deletes/updates.
 
1200
*/
 
1201
int ha_tina::rnd_end()
 
1202
{
 
1203
  char updated_fname[FN_REFLEN];
 
1204
  off_t file_buffer_start= 0;
 
1205
  DBUG_ENTER("ha_tina::rnd_end");
 
1206
 
 
1207
  free_root(&blobroot, MYF(0));
 
1208
  records_is_known= 1;
 
1209
 
 
1210
  if ((chain_ptr - chain)  > 0)
 
1211
  {
 
1212
    tina_set *ptr= chain;
 
1213
 
 
1214
    /*
 
1215
      Re-read the beginning of a file (as the buffer should point to the
 
1216
      end of file after the scan).
 
1217
    */
 
1218
    file_buff->init_buff(data_file);
 
1219
 
 
1220
    /*
 
1221
      The sort is needed when there were updates/deletes with random orders.
 
1222
      It sorts so that we move the firts blocks to the beginning.
 
1223
    */
 
1224
    my_qsort(chain, (size_t)(chain_ptr - chain), sizeof(tina_set),
 
1225
             (qsort_cmp)sort_set);
 
1226
 
 
1227
    off_t write_begin= 0, write_end;
 
1228
 
 
1229
    /* create the file to write updated table if it wasn't yet created */
 
1230
    if (open_update_temp_file_if_needed())
 
1231
      DBUG_RETURN(-1);
 
1232
 
 
1233
    /* write the file with updated info */
 
1234
    while ((file_buffer_start != -1))     // while not end of file
 
1235
    {
 
1236
      bool in_hole= get_write_pos(&write_end, ptr);
 
1237
      off_t write_length= write_end - write_begin;
 
1238
 
 
1239
      /* if there is something to write, write it */
 
1240
      if (write_length)
 
1241
      {
 
1242
        if (my_write(update_temp_file, 
 
1243
                     (uchar*) (file_buff->ptr() +
 
1244
                               (write_begin - file_buff->start())),
 
1245
                     write_length, MYF_RW))
 
1246
          goto error;
 
1247
        temp_file_length+= write_length;
 
1248
      }
 
1249
      if (in_hole)
 
1250
      {
 
1251
        /* skip hole */
 
1252
        while (file_buff->end() <= ptr->end && file_buffer_start != -1)
 
1253
          file_buffer_start= file_buff->read_next();
 
1254
        write_begin= ptr->end;
 
1255
        ptr++;
 
1256
      }
 
1257
      else
 
1258
        write_begin= write_end;
 
1259
 
 
1260
      if (write_end == file_buff->end())
 
1261
        file_buffer_start= file_buff->read_next(); /* shift the buffer */
 
1262
 
 
1263
    }
 
1264
 
 
1265
    if (my_sync(update_temp_file, MYF(MY_WME)) ||
 
1266
        my_close(update_temp_file, MYF(0)))
 
1267
      DBUG_RETURN(-1);
 
1268
 
 
1269
    share->update_file_opened= FALSE;
 
1270
 
 
1271
    if (share->tina_write_opened)
 
1272
    {
 
1273
      if (my_close(share->tina_write_filedes, MYF(0)))
 
1274
        DBUG_RETURN(-1);
 
1275
      /*
 
1276
        Mark that the writer fd is closed, so that init_tina_writer()
 
1277
        will reopen it later.
 
1278
      */
 
1279
      share->tina_write_opened= FALSE;
 
1280
    }
 
1281
 
 
1282
    /*
 
1283
      Close opened fildes's. Then move updated file in place
 
1284
      of the old datafile.
 
1285
    */
 
1286
    if (my_close(data_file, MYF(0)) ||
 
1287
        my_rename(fn_format(updated_fname, share->table_name, "", CSN_EXT,
 
1288
                            MY_REPLACE_EXT | MY_UNPACK_FILENAME),
 
1289
                  share->data_file_name, MYF(0)))
 
1290
      DBUG_RETURN(-1);
 
1291
 
 
1292
    /* Open the file again */
 
1293
    if (((data_file= my_open(share->data_file_name, O_RDONLY, MYF(0))) == -1))
 
1294
      DBUG_RETURN(-1);
 
1295
    /*
 
1296
      As we reopened the data file, increase share->data_file_version 
 
1297
      in order to force other threads waiting on a table lock and  
 
1298
      have already opened the table to reopen the data file.
 
1299
      That makes the latest changes become visible to them.
 
1300
      Update local_data_file_version as no need to reopen it in the 
 
1301
      current thread.
 
1302
    */
 
1303
    share->data_file_version++;
 
1304
    local_data_file_version= share->data_file_version;
 
1305
    /*
 
1306
      The datafile is consistent at this point and the write filedes is
 
1307
      closed, so nothing worrying will happen to it in case of a crash.
 
1308
      Here we record this fact to the meta-file.
 
1309
    */
 
1310
    (void)write_meta_file(share->meta_file, share->rows_recorded, FALSE);
 
1311
    /* 
 
1312
      Update local_saved_data_file_length with the real length of the 
 
1313
      data file.
 
1314
    */
 
1315
    local_saved_data_file_length= temp_file_length;
 
1316
  }
 
1317
 
 
1318
  DBUG_RETURN(0);
 
1319
error:
 
1320
  my_close(update_temp_file, MYF(0));
 
1321
  share->update_file_opened= FALSE;
 
1322
  DBUG_RETURN(-1);
 
1323
}
 
1324
 
 
1325
 
 
1326
/*
 
1327
  Repair CSV table in the case, it is crashed.
 
1328
 
 
1329
  SYNOPSIS
 
1330
    repair()
 
1331
    thd         The thread, performing repair
 
1332
    check_opt   The options for repair. We do not use it currently.
 
1333
 
 
1334
  DESCRIPTION
 
1335
    If the file is empty, change # of rows in the file and complete recovery.
 
1336
    Otherwise, scan the table looking for bad rows. If none were found,
 
1337
    we mark file as a good one and return. If a bad row was encountered,
 
1338
    we truncate the datafile up to the last good row.
 
1339
 
 
1340
   TODO: Make repair more clever - it should try to recover subsequent
 
1341
         rows (after the first bad one) as well.
 
1342
*/
 
1343
 
 
1344
int ha_tina::repair(THD* thd, HA_CHECK_OPT* check_opt)
 
1345
{
 
1346
  char repaired_fname[FN_REFLEN];
 
1347
  uchar *buf;
 
1348
  File repair_file;
 
1349
  int rc;
 
1350
  ha_rows rows_repaired= 0;
 
1351
  off_t write_begin= 0, write_end;
 
1352
  DBUG_ENTER("ha_tina::repair");
 
1353
 
 
1354
  /* empty file */
 
1355
  if (!share->saved_data_file_length)
 
1356
  {
 
1357
    share->rows_recorded= 0;
 
1358
    goto end;
 
1359
  }
 
1360
 
 
1361
  /* Don't assert in field::val() functions */
 
1362
  table->use_all_columns();
 
1363
  if (!(buf= (uchar*) my_malloc(table->s->reclength, MYF(MY_WME))))
 
1364
    DBUG_RETURN(HA_ERR_OUT_OF_MEM);
 
1365
 
 
1366
  /* position buffer to the start of the file */
 
1367
  if (init_data_file())
 
1368
    DBUG_RETURN(HA_ERR_CRASHED_ON_REPAIR);
 
1369
 
 
1370
  /*
 
1371
    Local_saved_data_file_length is initialized during the lock phase.
 
1372
    Sometimes this is not getting executed before ::repair (e.g. for
 
1373
    the log tables). We set it manually here.
 
1374
  */
 
1375
  local_saved_data_file_length= share->saved_data_file_length;
 
1376
  /* set current position to the beginning of the file */
 
1377
  current_position= next_position= 0;
 
1378
 
 
1379
  init_alloc_root(&blobroot, BLOB_MEMROOT_ALLOC_SIZE, 0);
 
1380
 
 
1381
  /* Read the file row-by-row. If everything is ok, repair is not needed. */
 
1382
  while (!(rc= find_current_row(buf)))
 
1383
  {
 
1384
    thd_inc_row_count(thd);
 
1385
    rows_repaired++;
 
1386
    current_position= next_position;
 
1387
  }
 
1388
 
 
1389
  free_root(&blobroot, MYF(0));
 
1390
 
 
1391
  my_free((char*)buf, MYF(0));
 
1392
 
 
1393
  if (rc == HA_ERR_END_OF_FILE)
 
1394
  {
 
1395
    /*
 
1396
      All rows were read ok until end of file, the file does not need repair.
 
1397
      If rows_recorded != rows_repaired, we should update rows_recorded value
 
1398
      to the current amount of rows.
 
1399
    */
 
1400
    share->rows_recorded= rows_repaired;
 
1401
    goto end;
 
1402
  }
 
1403
 
 
1404
  /*
 
1405
    Otherwise we've encountered a bad row => repair is needed.
 
1406
    Let us create a temporary file.
 
1407
  */
 
1408
  if ((repair_file= my_create(fn_format(repaired_fname, share->table_name,
 
1409
                                        "", CSN_EXT,
 
1410
                                        MY_REPLACE_EXT|MY_UNPACK_FILENAME),
 
1411
                           0, O_RDWR | O_TRUNC,MYF(MY_WME))) < 0)
 
1412
    DBUG_RETURN(HA_ERR_CRASHED_ON_REPAIR);
 
1413
 
 
1414
  file_buff->init_buff(data_file);
 
1415
 
 
1416
 
 
1417
  /* we just truncated the file up to the first bad row. update rows count. */
 
1418
  share->rows_recorded= rows_repaired;
 
1419
 
 
1420
  /* write repaired file */
 
1421
  while (1)
 
1422
  {
 
1423
    write_end= min(file_buff->end(), current_position);
 
1424
    if ((write_end - write_begin) &&
 
1425
        (my_write(repair_file, (uchar*)file_buff->ptr(),
 
1426
                  write_end - write_begin, MYF_RW)))
 
1427
      DBUG_RETURN(-1);
 
1428
 
 
1429
    write_begin= write_end;
 
1430
    if (write_end== current_position)
 
1431
      break;
 
1432
    else
 
1433
      file_buff->read_next(); /* shift the buffer */
 
1434
  }
 
1435
 
 
1436
  /*
 
1437
    Close the files and rename repaired file to the datafile.
 
1438
    We have to close the files, as on Windows one cannot rename
 
1439
    a file, which descriptor is still open. EACCES will be returned
 
1440
    when trying to delete the "to"-file in my_rename().
 
1441
  */
 
1442
  if (my_close(data_file,MYF(0)) || my_close(repair_file, MYF(0)) ||
 
1443
      my_rename(repaired_fname, share->data_file_name, MYF(0)))
 
1444
    DBUG_RETURN(-1);
 
1445
 
 
1446
  /* Open the file again, it should now be repaired */
 
1447
  if ((data_file= my_open(share->data_file_name, O_RDWR|O_APPEND,
 
1448
                          MYF(0))) == -1)
 
1449
     DBUG_RETURN(-1);
 
1450
 
 
1451
  /* Set new file size. The file size will be updated by ::update_status() */
 
1452
  local_saved_data_file_length= (size_t) current_position;
 
1453
 
 
1454
end:
 
1455
  share->crashed= FALSE;
 
1456
  DBUG_RETURN(HA_ADMIN_OK);
 
1457
}
 
1458
 
 
1459
/*
 
1460
  DELETE without WHERE calls this
 
1461
*/
 
1462
 
 
1463
int ha_tina::delete_all_rows()
 
1464
{
 
1465
  int rc;
 
1466
  DBUG_ENTER("ha_tina::delete_all_rows");
 
1467
 
 
1468
  if (!records_is_known)
 
1469
    DBUG_RETURN(my_errno=HA_ERR_WRONG_COMMAND);
 
1470
 
 
1471
  if (!share->tina_write_opened)
 
1472
    if (init_tina_writer())
 
1473
      DBUG_RETURN(-1);
 
1474
 
 
1475
  /* Truncate the file to zero size */
 
1476
  rc= my_chsize(share->tina_write_filedes, 0, 0, MYF(MY_WME));
 
1477
 
 
1478
  stats.records=0;
 
1479
  /* Update shared info */
 
1480
  pthread_mutex_lock(&share->mutex);
 
1481
  share->rows_recorded= 0;
 
1482
  pthread_mutex_unlock(&share->mutex);
 
1483
  local_saved_data_file_length= 0;
 
1484
  DBUG_RETURN(rc);
 
1485
}
 
1486
 
 
1487
/*
 
1488
  Called by the database to lock the table. Keep in mind that this
 
1489
  is an internal lock.
 
1490
*/
 
1491
THR_LOCK_DATA **ha_tina::store_lock(THD *thd,
 
1492
                                    THR_LOCK_DATA **to,
 
1493
                                    enum thr_lock_type lock_type)
 
1494
{
 
1495
  if (lock_type != TL_IGNORE && lock.type == TL_UNLOCK)
 
1496
    lock.type=lock_type;
 
1497
  *to++= &lock;
 
1498
  return to;
 
1499
}
 
1500
 
 
1501
/* 
 
1502
  Create a table. You do not want to leave the table open after a call to
 
1503
  this (the database will call ::open() if it needs to).
 
1504
*/
 
1505
 
 
1506
int ha_tina::create(const char *name, TABLE *table_arg,
 
1507
                    HA_CREATE_INFO *create_info)
 
1508
{
 
1509
  char name_buff[FN_REFLEN];
 
1510
  File create_file;
 
1511
  DBUG_ENTER("ha_tina::create");
 
1512
 
 
1513
  /*
 
1514
    check columns
 
1515
  */
 
1516
  for (Field **field= table_arg->s->field; *field; field++)
 
1517
  {
 
1518
    if ((*field)->real_maybe_null())
 
1519
    {
 
1520
      my_error(ER_CHECK_NOT_IMPLEMENTED, MYF(0), "nullable columns");
 
1521
      DBUG_RETURN(HA_ERR_UNSUPPORTED);
 
1522
    }
 
1523
  }
 
1524
  
 
1525
 
 
1526
  if ((create_file= my_create(fn_format(name_buff, name, "", CSM_EXT,
 
1527
                                        MY_REPLACE_EXT|MY_UNPACK_FILENAME), 0,
 
1528
                              O_RDWR | O_TRUNC,MYF(MY_WME))) < 0)
 
1529
    DBUG_RETURN(-1);
 
1530
 
 
1531
  write_meta_file(create_file, 0, FALSE);
 
1532
  my_close(create_file, MYF(0));
 
1533
 
 
1534
  if ((create_file= my_create(fn_format(name_buff, name, "", CSV_EXT,
 
1535
                                        MY_REPLACE_EXT|MY_UNPACK_FILENAME),0,
 
1536
                              O_RDWR | O_TRUNC,MYF(MY_WME))) < 0)
 
1537
    DBUG_RETURN(-1);
 
1538
 
 
1539
  my_close(create_file, MYF(0));
 
1540
 
 
1541
  DBUG_RETURN(0);
 
1542
}
 
1543
 
 
1544
int ha_tina::check(THD* thd, HA_CHECK_OPT* check_opt)
 
1545
{
 
1546
  int rc= 0;
 
1547
  uchar *buf;
 
1548
  const char *old_proc_info;
 
1549
  ha_rows count= share->rows_recorded;
 
1550
  DBUG_ENTER("ha_tina::check");
 
1551
 
 
1552
  old_proc_info= thd_proc_info(thd, "Checking table");
 
1553
  if (!(buf= (uchar*) my_malloc(table->s->reclength, MYF(MY_WME))))
 
1554
    DBUG_RETURN(HA_ERR_OUT_OF_MEM);
 
1555
 
 
1556
  /* position buffer to the start of the file */
 
1557
   if (init_data_file())
 
1558
     DBUG_RETURN(HA_ERR_CRASHED);
 
1559
 
 
1560
  /*
 
1561
    Local_saved_data_file_length is initialized during the lock phase.
 
1562
    Check does not use store_lock in certain cases. So, we set it
 
1563
    manually here.
 
1564
  */
 
1565
  local_saved_data_file_length= share->saved_data_file_length;
 
1566
  /* set current position to the beginning of the file */
 
1567
  current_position= next_position= 0;
 
1568
 
 
1569
  init_alloc_root(&blobroot, BLOB_MEMROOT_ALLOC_SIZE, 0);
 
1570
 
 
1571
  /* Read the file row-by-row. If everything is ok, repair is not needed. */
 
1572
  while (!(rc= find_current_row(buf)))
 
1573
  {
 
1574
    thd_inc_row_count(thd);
 
1575
    count--;
 
1576
    current_position= next_position;
 
1577
  }
 
1578
  
 
1579
  free_root(&blobroot, MYF(0));
 
1580
 
 
1581
  my_free((char*)buf, MYF(0));
 
1582
  thd_proc_info(thd, old_proc_info);
 
1583
 
 
1584
  if ((rc != HA_ERR_END_OF_FILE) || count)
 
1585
  {
 
1586
    share->crashed= TRUE;
 
1587
    DBUG_RETURN(HA_ADMIN_CORRUPT);
 
1588
  }
 
1589
  else
 
1590
    DBUG_RETURN(HA_ADMIN_OK);
 
1591
}
 
1592
 
 
1593
 
 
1594
bool ha_tina::check_if_incompatible_data(HA_CREATE_INFO *info,
 
1595
                                           uint table_changes)
 
1596
{
 
1597
  return COMPATIBLE_DATA_YES;
 
1598
}
 
1599
 
 
1600
struct st_mysql_storage_engine csv_storage_engine=
 
1601
{ MYSQL_HANDLERTON_INTERFACE_VERSION };
 
1602
 
 
1603
mysql_declare_plugin(csv)
 
1604
{
 
1605
  MYSQL_STORAGE_ENGINE_PLUGIN,
 
1606
  &csv_storage_engine,
 
1607
  "CSV",
 
1608
  "Brian Aker, MySQL AB",
 
1609
  "CSV storage engine",
 
1610
  PLUGIN_LICENSE_GPL,
 
1611
  tina_init_func, /* Plugin Init */
 
1612
  tina_done_func, /* Plugin Deinit */
 
1613
  0x0100 /* 1.0 */,
 
1614
  NULL,                       /* status variables                */
 
1615
  NULL,                       /* system variables                */
 
1616
  NULL                        /* config options                  */
 
1617
}
 
1618
mysql_declare_plugin_end;
 
1619