~ubuntu-branches/ubuntu/precise/mysql-5.1/precise

« back to all changes in this revision

Viewing changes to storage/csv/ha_tina.cc

  • Committer: Bazaar Package Importer
  • Author(s): Norbert Tretkowski
  • Date: 2010-03-17 14:56:02 UTC
  • Revision ID: james.westby@ubuntu.com-20100317145602-x7e30l1b2sb5s6w6
Tags: upstream-5.1.45
ImportĀ upstreamĀ versionĀ 5.1.45

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/* Copyright (C) 2003 MySQL AB
 
2
 
 
3
  This program is free software; you can redistribute it and/or modify
 
4
  it under the terms of the GNU General Public License as published by
 
5
  the Free Software Foundation; version 2 of the License.
 
6
 
 
7
  This program is distributed in the hope that it will be useful,
 
8
  but WITHOUT ANY WARRANTY; without even the implied warranty of
 
9
  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
10
  GNU General Public License for more details.
 
11
 
 
12
  You should have received a copy of the GNU General Public License
 
13
  along with this program; if not, write to the Free Software
 
14
  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
 
15
 
 
16
/*
 
17
  Make sure to look at ha_tina.h for more details.
 
18
 
 
19
  First off, this is a play thing for me, there are a number of things
 
20
  wrong with it:
 
21
    *) It was designed for csv and therefore its performance is highly
 
22
       questionable.
 
23
    *) Indexes have not been implemented. This is because the files can
 
24
       be traded in and out of the table directory without having to worry
 
25
       about rebuilding anything.
 
26
    *) NULLs and "" are treated equally (like a spreadsheet).
 
27
    *) There was in the beginning no point to anyone seeing this other
 
28
       then me, so there is a good chance that I haven't quite documented
 
29
       it well.
 
30
    *) Less design, more "make it work"
 
31
 
 
32
  Now there are a few cool things with it:
 
33
    *) Errors can result in corrupted data files.
 
34
    *) Data files can be read by spreadsheets directly.
 
35
 
 
36
TODO:
 
37
 *) Move to a block system for larger files
 
38
 *) Error recovery, its all there, just need to finish it
 
39
 *) Document how the chains work.
 
40
 
 
41
 -Brian
 
42
*/
 
43
 
 
44
#ifdef USE_PRAGMA_IMPLEMENTATION
 
45
#pragma implementation        // gcc: Class implementation
 
46
#endif
 
47
 
 
48
#include "mysql_priv.h"
 
49
#include <mysql/plugin.h>
 
50
#include "ha_tina.h"
 
51
 
 
52
 
 
53
/*
 
54
  uchar + uchar + ulonglong + ulonglong + ulonglong + ulonglong + uchar
 
55
*/
 
56
#define META_BUFFER_SIZE sizeof(uchar) + sizeof(uchar) + sizeof(ulonglong) \
 
57
  + sizeof(ulonglong) + sizeof(ulonglong) + sizeof(ulonglong) + sizeof(uchar)
 
58
#define TINA_CHECK_HEADER 254 // The number we use to determine corruption
 
59
#define BLOB_MEMROOT_ALLOC_SIZE 8192
 
60
 
 
61
/* The file extension */
 
62
#define CSV_EXT ".CSV"               // The data file
 
63
#define CSN_EXT ".CSN"               // Files used during repair and update
 
64
#define CSM_EXT ".CSM"               // Meta file
 
65
 
 
66
 
 
67
static TINA_SHARE *get_share(const char *table_name, TABLE *table);
 
68
static int free_share(TINA_SHARE *share);
 
69
static int read_meta_file(File meta_file, ha_rows *rows);
 
70
static int write_meta_file(File meta_file, ha_rows rows, bool dirty);
 
71
 
 
72
extern "C" void tina_get_status(void* param, int concurrent_insert);
 
73
extern "C" void tina_update_status(void* param);
 
74
extern "C" my_bool tina_check_status(void* param);
 
75
 
 
76
/* Stuff for shares */
 
77
pthread_mutex_t tina_mutex;
 
78
static HASH tina_open_tables;
 
79
static handler *tina_create_handler(handlerton *hton,
 
80
                                    TABLE_SHARE *table, 
 
81
                                    MEM_ROOT *mem_root);
 
82
 
 
83
 
 
84
/*****************************************************************************
 
85
 ** TINA tables
 
86
 *****************************************************************************/
 
87
 
 
88
/*
 
89
  Used for sorting chains with qsort().
 
90
*/
 
91
int sort_set (tina_set *a, tina_set *b)
 
92
{
 
93
  /*
 
94
    We assume that intervals do not intersect. So, it is enought to compare
 
95
    any two points. Here we take start of intervals for comparison.
 
96
  */
 
97
  return ( a->begin > b->begin ? 1 : ( a->begin < b->begin ? -1 : 0 ) );
 
98
}
 
99
 
 
100
static uchar* tina_get_key(TINA_SHARE *share, size_t *length,
 
101
                          my_bool not_used __attribute__((unused)))
 
102
{
 
103
  *length=share->table_name_length;
 
104
  return (uchar*) share->table_name;
 
105
}
 
106
 
 
107
static int tina_init_func(void *p)
 
108
{
 
109
  handlerton *tina_hton;
 
110
 
 
111
  tina_hton= (handlerton *)p;
 
112
  VOID(pthread_mutex_init(&tina_mutex,MY_MUTEX_INIT_FAST));
 
113
  (void) hash_init(&tina_open_tables,system_charset_info,32,0,0,
 
114
                   (hash_get_key) tina_get_key,0,0);
 
115
  tina_hton->state= SHOW_OPTION_YES;
 
116
  tina_hton->db_type= DB_TYPE_CSV_DB;
 
117
  tina_hton->create= tina_create_handler;
 
118
  tina_hton->flags= (HTON_CAN_RECREATE | HTON_SUPPORT_LOG_TABLES | 
 
119
                     HTON_NO_PARTITION);
 
120
  return 0;
 
121
}
 
122
 
 
123
static int tina_done_func(void *p)
 
124
{
 
125
  hash_free(&tina_open_tables);
 
126
  pthread_mutex_destroy(&tina_mutex);
 
127
 
 
128
  return 0;
 
129
}
 
130
 
 
131
 
 
132
/*
 
133
  Simple lock controls.
 
134
*/
 
135
static TINA_SHARE *get_share(const char *table_name, TABLE *table)
 
136
{
 
137
  TINA_SHARE *share;
 
138
  char meta_file_name[FN_REFLEN];
 
139
  MY_STAT file_stat;                /* Stat information for the data file */
 
140
  char *tmp_name;
 
141
  uint length;
 
142
 
 
143
  pthread_mutex_lock(&tina_mutex);
 
144
  length=(uint) strlen(table_name);
 
145
 
 
146
  /*
 
147
    If share is not present in the hash, create a new share and
 
148
    initialize its members.
 
149
  */
 
150
  if (!(share=(TINA_SHARE*) hash_search(&tina_open_tables,
 
151
                                        (uchar*) table_name,
 
152
                                       length)))
 
153
  {
 
154
    if (!my_multi_malloc(MYF(MY_WME | MY_ZEROFILL),
 
155
                         &share, sizeof(*share),
 
156
                         &tmp_name, length+1,
 
157
                         NullS))
 
158
    {
 
159
      pthread_mutex_unlock(&tina_mutex);
 
160
      return NULL;
 
161
    }
 
162
 
 
163
    share->use_count= 0;
 
164
    share->is_log_table= FALSE;
 
165
    share->table_name_length= length;
 
166
    share->table_name= tmp_name;
 
167
    share->crashed= FALSE;
 
168
    share->rows_recorded= 0;
 
169
    share->update_file_opened= FALSE;
 
170
    share->tina_write_opened= FALSE;
 
171
    share->data_file_version= 0;
 
172
    strmov(share->table_name, table_name);
 
173
    fn_format(share->data_file_name, table_name, "", CSV_EXT,
 
174
              MY_REPLACE_EXT|MY_UNPACK_FILENAME);
 
175
    fn_format(meta_file_name, table_name, "", CSM_EXT,
 
176
              MY_REPLACE_EXT|MY_UNPACK_FILENAME);
 
177
 
 
178
    if (my_stat(share->data_file_name, &file_stat, MYF(MY_WME)) == NULL)
 
179
      goto error;
 
180
    share->saved_data_file_length= file_stat.st_size;
 
181
 
 
182
    if (my_hash_insert(&tina_open_tables, (uchar*) share))
 
183
      goto error;
 
184
    thr_lock_init(&share->lock);
 
185
    pthread_mutex_init(&share->mutex,MY_MUTEX_INIT_FAST);
 
186
 
 
187
    /*
 
188
      Open or create the meta file. In the latter case, we'll get
 
189
      an error during read_meta_file and mark the table as crashed.
 
190
      Usually this will result in auto-repair, and we will get a good
 
191
      meta-file in the end.
 
192
    */
 
193
    if (((share->meta_file= my_open(meta_file_name,
 
194
                                    O_RDWR|O_CREAT, MYF(MY_WME))) == -1) ||
 
195
        read_meta_file(share->meta_file, &share->rows_recorded))
 
196
      share->crashed= TRUE;
 
197
  }
 
198
 
 
199
  share->use_count++;
 
200
  pthread_mutex_unlock(&tina_mutex);
 
201
 
 
202
  return share;
 
203
 
 
204
error:
 
205
  pthread_mutex_unlock(&tina_mutex);
 
206
  my_free((uchar*) share, MYF(0));
 
207
 
 
208
  return NULL;
 
209
}
 
210
 
 
211
 
 
212
/*
 
213
  Read CSV meta-file
 
214
 
 
215
  SYNOPSIS
 
216
    read_meta_file()
 
217
    meta_file   The meta-file filedes
 
218
    ha_rows     Pointer to the var we use to store rows count.
 
219
                These are read from the meta-file.
 
220
 
 
221
  DESCRIPTION
 
222
 
 
223
    Read the meta-file info. For now we are only interested in
 
224
    rows counf, crashed bit and magic number.
 
225
 
 
226
  RETURN
 
227
    0 - OK
 
228
    non-zero - error occurred
 
229
*/
 
230
 
 
231
static int read_meta_file(File meta_file, ha_rows *rows)
 
232
{
 
233
  uchar meta_buffer[META_BUFFER_SIZE];
 
234
  uchar *ptr= meta_buffer;
 
235
 
 
236
  DBUG_ENTER("ha_tina::read_meta_file");
 
237
 
 
238
  VOID(my_seek(meta_file, 0, MY_SEEK_SET, MYF(0)));
 
239
  if (my_read(meta_file, (uchar*)meta_buffer, META_BUFFER_SIZE, 0)
 
240
      != META_BUFFER_SIZE)
 
241
    DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);
 
242
 
 
243
  /*
 
244
    Parse out the meta data, we ignore version at the moment
 
245
  */
 
246
 
 
247
  ptr+= sizeof(uchar)*2; // Move past header
 
248
  *rows= (ha_rows)uint8korr(ptr);
 
249
  ptr+= sizeof(ulonglong); // Move past rows
 
250
  /*
 
251
    Move past check_point, auto_increment and forced_flushes fields.
 
252
    They are present in the format, but we do not use them yet.
 
253
  */
 
254
  ptr+= 3*sizeof(ulonglong);
 
255
 
 
256
  /* check crashed bit and magic number */
 
257
  if ((meta_buffer[0] != (uchar)TINA_CHECK_HEADER) ||
 
258
      ((bool)(*ptr)== TRUE))
 
259
    DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);
 
260
 
 
261
  my_sync(meta_file, MYF(MY_WME));
 
262
 
 
263
  DBUG_RETURN(0);
 
264
}
 
265
 
 
266
 
 
267
/*
 
268
  Write CSV meta-file
 
269
 
 
270
  SYNOPSIS
 
271
    write_meta_file()
 
272
    meta_file   The meta-file filedes
 
273
    ha_rows     The number of rows we have in the datafile.
 
274
    dirty       A flag, which marks whether we have a corrupt table
 
275
 
 
276
  DESCRIPTION
 
277
 
 
278
    Write meta-info the the file. Only rows count, crashed bit and
 
279
    magic number matter now.
 
280
 
 
281
  RETURN
 
282
    0 - OK
 
283
    non-zero - error occurred
 
284
*/
 
285
 
 
286
static int write_meta_file(File meta_file, ha_rows rows, bool dirty)
 
287
{
 
288
  uchar meta_buffer[META_BUFFER_SIZE];
 
289
  uchar *ptr= meta_buffer;
 
290
 
 
291
  DBUG_ENTER("ha_tina::write_meta_file");
 
292
 
 
293
  *ptr= (uchar)TINA_CHECK_HEADER;
 
294
  ptr+= sizeof(uchar);
 
295
  *ptr= (uchar)TINA_VERSION;
 
296
  ptr+= sizeof(uchar);
 
297
  int8store(ptr, (ulonglong)rows);
 
298
  ptr+= sizeof(ulonglong);
 
299
  memset(ptr, 0, 3*sizeof(ulonglong));
 
300
  /*
 
301
     Skip over checkpoint, autoincrement and forced_flushes fields.
 
302
     We'll need them later.
 
303
  */
 
304
  ptr+= 3*sizeof(ulonglong);
 
305
  *ptr= (uchar)dirty;
 
306
 
 
307
  VOID(my_seek(meta_file, 0, MY_SEEK_SET, MYF(0)));
 
308
  if (my_write(meta_file, (uchar *)meta_buffer, META_BUFFER_SIZE, 0)
 
309
      != META_BUFFER_SIZE)
 
310
    DBUG_RETURN(-1);
 
311
 
 
312
  my_sync(meta_file, MYF(MY_WME));
 
313
 
 
314
  DBUG_RETURN(0);
 
315
}
 
316
 
 
317
bool ha_tina::check_and_repair(THD *thd)
 
318
{
 
319
  HA_CHECK_OPT check_opt;
 
320
  DBUG_ENTER("ha_tina::check_and_repair");
 
321
 
 
322
  check_opt.init();
 
323
 
 
324
  DBUG_RETURN(repair(thd, &check_opt));
 
325
}
 
326
 
 
327
 
 
328
int ha_tina::init_tina_writer()
 
329
{
 
330
  DBUG_ENTER("ha_tina::init_tina_writer");
 
331
 
 
332
  /*
 
333
    Mark the file as crashed. We will set the flag back when we close
 
334
    the file. In the case of the crash it will remain marked crashed,
 
335
    which enforce recovery.
 
336
  */
 
337
  (void)write_meta_file(share->meta_file, share->rows_recorded, TRUE);
 
338
 
 
339
  if ((share->tina_write_filedes=
 
340
        my_open(share->data_file_name, O_RDWR|O_APPEND, MYF(MY_WME))) == -1)
 
341
  {
 
342
    DBUG_PRINT("info", ("Could not open tina file writes"));
 
343
    share->crashed= TRUE;
 
344
    DBUG_RETURN(my_errno ? my_errno : -1);
 
345
  }
 
346
  share->tina_write_opened= TRUE;
 
347
 
 
348
  DBUG_RETURN(0);
 
349
}
 
350
 
 
351
 
 
352
bool ha_tina::is_crashed() const
 
353
{
 
354
  DBUG_ENTER("ha_tina::is_crashed");
 
355
  DBUG_RETURN(share->crashed);
 
356
}
 
357
 
 
358
/*
 
359
  Free lock controls.
 
360
*/
 
361
static int free_share(TINA_SHARE *share)
 
362
{
 
363
  DBUG_ENTER("ha_tina::free_share");
 
364
  pthread_mutex_lock(&tina_mutex);
 
365
  int result_code= 0;
 
366
  if (!--share->use_count){
 
367
    /* Write the meta file. Mark it as crashed if needed. */
 
368
    (void)write_meta_file(share->meta_file, share->rows_recorded,
 
369
                          share->crashed ? TRUE :FALSE);
 
370
    if (my_close(share->meta_file, MYF(0)))
 
371
      result_code= 1;
 
372
    if (share->tina_write_opened)
 
373
    {
 
374
      if (my_close(share->tina_write_filedes, MYF(0)))
 
375
        result_code= 1;
 
376
      share->tina_write_opened= FALSE;
 
377
    }
 
378
 
 
379
    hash_delete(&tina_open_tables, (uchar*) share);
 
380
    thr_lock_delete(&share->lock);
 
381
    pthread_mutex_destroy(&share->mutex);
 
382
    my_free((uchar*) share, MYF(0));
 
383
  }
 
384
  pthread_mutex_unlock(&tina_mutex);
 
385
 
 
386
  DBUG_RETURN(result_code);
 
387
}
 
388
 
 
389
 
 
390
/*
 
391
  This function finds the end of a line and returns the length
 
392
  of the line ending.
 
393
 
 
394
  We support three kinds of line endings:
 
395
  '\r'     --  Old Mac OS line ending
 
396
  '\n'     --  Traditional Unix and Mac OS X line ending
 
397
  '\r''\n' --  DOS\Windows line ending
 
398
*/
 
399
 
 
400
my_off_t find_eoln_buff(Transparent_file *data_buff, my_off_t begin,
 
401
                     my_off_t end, int *eoln_len)
 
402
{
 
403
  *eoln_len= 0;
 
404
 
 
405
  for (my_off_t x= begin; x < end; x++)
 
406
  {
 
407
    /* Unix (includes Mac OS X) */
 
408
    if (data_buff->get_value(x) == '\n')
 
409
      *eoln_len= 1;
 
410
    else
 
411
      if (data_buff->get_value(x) == '\r') // Mac or Dos
 
412
      {
 
413
        /* old Mac line ending */
 
414
        if (x + 1 == end || (data_buff->get_value(x + 1) != '\n'))
 
415
          *eoln_len= 1;
 
416
        else // DOS style ending
 
417
          *eoln_len= 2;
 
418
      }
 
419
 
 
420
    if (*eoln_len)  // end of line was found
 
421
      return x;
 
422
  }
 
423
 
 
424
  return 0;
 
425
}
 
426
 
 
427
 
 
428
static handler *tina_create_handler(handlerton *hton,
 
429
                                    TABLE_SHARE *table, 
 
430
                                    MEM_ROOT *mem_root)
 
431
{
 
432
  return new (mem_root) ha_tina(hton, table);
 
433
}
 
434
 
 
435
 
 
436
ha_tina::ha_tina(handlerton *hton, TABLE_SHARE *table_arg)
 
437
  :handler(hton, table_arg),
 
438
  /*
 
439
    These definitions are found in handler.h
 
440
    They are not probably completely right.
 
441
  */
 
442
  current_position(0), next_position(0), local_saved_data_file_length(0),
 
443
  file_buff(0), chain_alloced(0), chain_size(DEFAULT_CHAIN_LENGTH),
 
444
  local_data_file_version(0), records_is_known(0)
 
445
{
 
446
  /* Set our original buffers from pre-allocated memory */
 
447
  buffer.set((char*)byte_buffer, IO_SIZE, &my_charset_bin);
 
448
  chain= chain_buffer;
 
449
  file_buff= new Transparent_file();
 
450
}
 
451
 
 
452
 
 
453
/*
 
454
  Encode a buffer into the quoted format.
 
455
*/
 
456
 
 
457
int ha_tina::encode_quote(uchar *buf)
 
458
{
 
459
  char attribute_buffer[1024];
 
460
  String attribute(attribute_buffer, sizeof(attribute_buffer),
 
461
                   &my_charset_bin);
 
462
 
 
463
  my_bitmap_map *org_bitmap= dbug_tmp_use_all_columns(table, table->read_set);
 
464
  buffer.length(0);
 
465
 
 
466
  for (Field **field=table->field ; *field ; field++)
 
467
  {
 
468
    const char *ptr;
 
469
    const char *end_ptr;
 
470
    const bool was_null= (*field)->is_null();
 
471
    
 
472
    /*
 
473
      assistance for backwards compatibility in production builds.
 
474
      note: this will not work for ENUM columns.
 
475
    */
 
476
    if (was_null)
 
477
    {
 
478
      (*field)->set_default();
 
479
      (*field)->set_notnull();
 
480
    }
 
481
 
 
482
    (*field)->val_str(&attribute,&attribute);
 
483
    
 
484
    if (was_null)
 
485
      (*field)->set_null();
 
486
 
 
487
    if ((*field)->str_needs_quotes())
 
488
    {
 
489
      ptr= attribute.ptr();
 
490
      end_ptr= attribute.length() + ptr;
 
491
 
 
492
      buffer.append('"');
 
493
 
 
494
      while (ptr < end_ptr) 
 
495
      {
 
496
        if (*ptr == '"')
 
497
        {
 
498
          buffer.append('\\');
 
499
          buffer.append('"');
 
500
          *ptr++;
 
501
        }
 
502
        else if (*ptr == '\r')
 
503
        {
 
504
          buffer.append('\\');
 
505
          buffer.append('r');
 
506
          *ptr++;
 
507
        }
 
508
        else if (*ptr == '\\')
 
509
        {
 
510
          buffer.append('\\');
 
511
          buffer.append('\\');
 
512
          *ptr++;
 
513
        }
 
514
        else if (*ptr == '\n')
 
515
        {
 
516
          buffer.append('\\');
 
517
          buffer.append('n');
 
518
          *ptr++;
 
519
        }
 
520
        else
 
521
          buffer.append(*ptr++);
 
522
      }
 
523
      buffer.append('"');
 
524
    }
 
525
    else
 
526
    {
 
527
      buffer.append(attribute);
 
528
    }
 
529
 
 
530
    buffer.append(',');
 
531
  }
 
532
  // Remove the comma, add a line feed
 
533
  buffer.length(buffer.length() - 1);
 
534
  buffer.append('\n');
 
535
 
 
536
  //buffer.replace(buffer.length(), 0, "\n", 1);
 
537
 
 
538
  dbug_tmp_restore_column_map(table->read_set, org_bitmap);
 
539
  return (buffer.length());
 
540
}
 
541
 
 
542
/*
 
543
  chain_append() adds delete positions to the chain that we use to keep
 
544
  track of space. Then the chain will be used to cleanup "holes", occurred
 
545
  due to deletes and updates.
 
546
*/
 
547
int ha_tina::chain_append()
 
548
{
 
549
  if ( chain_ptr != chain && (chain_ptr -1)->end == current_position)
 
550
    (chain_ptr -1)->end= next_position;
 
551
  else
 
552
  {
 
553
    /* We set up for the next position */
 
554
    if ((off_t)(chain_ptr - chain) == (chain_size -1))
 
555
    {
 
556
      my_off_t location= chain_ptr - chain;
 
557
      chain_size += DEFAULT_CHAIN_LENGTH;
 
558
      if (chain_alloced)
 
559
      {
 
560
        /* Must cast since my_malloc unlike malloc doesn't have a void ptr */
 
561
        if ((chain= (tina_set *) my_realloc((uchar*)chain,
 
562
                                            chain_size, MYF(MY_WME))) == NULL)
 
563
          return -1;
 
564
      }
 
565
      else
 
566
      {
 
567
        tina_set *ptr= (tina_set *) my_malloc(chain_size * sizeof(tina_set),
 
568
                                              MYF(MY_WME));
 
569
        memcpy(ptr, chain, DEFAULT_CHAIN_LENGTH * sizeof(tina_set));
 
570
        chain= ptr;
 
571
        chain_alloced++;
 
572
      }
 
573
      chain_ptr= chain + location;
 
574
    }
 
575
    chain_ptr->begin= current_position;
 
576
    chain_ptr->end= next_position;
 
577
    chain_ptr++;
 
578
  }
 
579
 
 
580
  return 0;
 
581
}
 
582
 
 
583
 
 
584
/*
 
585
  Scans for a row.
 
586
*/
 
587
int ha_tina::find_current_row(uchar *buf)
 
588
{
 
589
  my_off_t end_offset, curr_offset= current_position;
 
590
  int eoln_len;
 
591
  my_bitmap_map *org_bitmap;
 
592
  int error;
 
593
  bool read_all;
 
594
  DBUG_ENTER("ha_tina::find_current_row");
 
595
 
 
596
  free_root(&blobroot, MYF(MY_MARK_BLOCKS_FREE));
 
597
 
 
598
  /*
 
599
    We do not read further then local_saved_data_file_length in order
 
600
    not to conflict with undergoing concurrent insert.
 
601
  */
 
602
  if ((end_offset=
 
603
        find_eoln_buff(file_buff, current_position,
 
604
                       local_saved_data_file_length, &eoln_len)) == 0)
 
605
    DBUG_RETURN(HA_ERR_END_OF_FILE);
 
606
 
 
607
  /* We must read all columns in case a table is opened for update */
 
608
  read_all= !bitmap_is_clear_all(table->write_set);
 
609
  /* Avoid asserts in ::store() for columns that are not going to be updated */
 
610
  org_bitmap= dbug_tmp_use_all_columns(table, table->write_set);
 
611
  error= HA_ERR_CRASHED_ON_USAGE;
 
612
 
 
613
  memset(buf, 0, table->s->null_bytes);
 
614
 
 
615
  for (Field **field=table->field ; *field ; field++)
 
616
  {
 
617
    char curr_char;
 
618
    
 
619
    buffer.length(0);
 
620
    if (curr_offset >= end_offset)
 
621
      goto err;
 
622
    curr_char= file_buff->get_value(curr_offset);
 
623
    if (curr_char == '"')
 
624
    {
 
625
      curr_offset++; // Incrementpast the first quote
 
626
 
 
627
      for(; curr_offset < end_offset; curr_offset++)
 
628
      {
 
629
        curr_char= file_buff->get_value(curr_offset);
 
630
        // Need to convert line feeds!
 
631
        if (curr_char == '"' &&
 
632
            (curr_offset == end_offset - 1 ||
 
633
             file_buff->get_value(curr_offset + 1) == ','))
 
634
        {
 
635
          curr_offset+= 2; // Move past the , and the "
 
636
          break;
 
637
        }
 
638
        if (curr_char == '\\' && curr_offset != (end_offset - 1))
 
639
        {
 
640
          curr_offset++;
 
641
          curr_char= file_buff->get_value(curr_offset);
 
642
          if (curr_char == 'r')
 
643
            buffer.append('\r');
 
644
          else if (curr_char == 'n' )
 
645
            buffer.append('\n');
 
646
          else if (curr_char == '\\' || curr_char == '"')
 
647
            buffer.append(curr_char);
 
648
          else  /* This could only happed with an externally created file */
 
649
          {
 
650
            buffer.append('\\');
 
651
            buffer.append(curr_char);
 
652
          }
 
653
        }
 
654
        else // ordinary symbol
 
655
        {
 
656
          /*
 
657
            We are at final symbol and no last quote was found =>
 
658
            we are working with a damaged file.
 
659
          */
 
660
          if (curr_offset == end_offset - 1)
 
661
            goto err;
 
662
          buffer.append(curr_char);
 
663
        }
 
664
      }
 
665
    }
 
666
    else 
 
667
    {
 
668
      for(; curr_offset < end_offset; curr_offset++)
 
669
      {
 
670
        curr_char= file_buff->get_value(curr_offset);
 
671
        if (curr_char == ',')
 
672
        {
 
673
          curr_offset++;       // Skip the ,
 
674
          break;
 
675
        }
 
676
        buffer.append(curr_char);
 
677
      }
 
678
    }
 
679
 
 
680
    if (read_all || bitmap_is_set(table->read_set, (*field)->field_index))
 
681
    {
 
682
      bool is_enum= ((*field)->real_type() ==  MYSQL_TYPE_ENUM);
 
683
      /*
 
684
        Here CHECK_FIELD_WARN checks that all values in the csv file are valid
 
685
        which is normally the case, if they were written  by
 
686
        INSERT -> ha_tina::write_row. '0' values on ENUM fields are considered
 
687
        invalid by Field_enum::store() but it can store them on INSERT anyway.
 
688
        Thus, for enums we silence the warning, as it doesn't really mean
 
689
        an invalid value.
 
690
      */
 
691
      if ((*field)->store(buffer.ptr(), buffer.length(), buffer.charset(),
 
692
                          is_enum ? CHECK_FIELD_IGNORE : CHECK_FIELD_WARN))
 
693
      {
 
694
        if (!is_enum)
 
695
          goto err;
 
696
      }
 
697
      if ((*field)->flags & BLOB_FLAG)
 
698
      {
 
699
        Field_blob *blob= *(Field_blob**) field;
 
700
        uchar *src, *tgt;
 
701
        uint length, packlength;
 
702
        
 
703
        packlength= blob->pack_length_no_ptr();
 
704
        length= blob->get_length(blob->ptr);
 
705
        memcpy_fixed(&src, blob->ptr + packlength, sizeof(char*));
 
706
        if (src)
 
707
        {
 
708
          tgt= (uchar*) alloc_root(&blobroot, length);
 
709
          bmove(tgt, src, length);
 
710
          memcpy_fixed(blob->ptr + packlength, &tgt, sizeof(char*));
 
711
        }
 
712
      }
 
713
    }
 
714
  }
 
715
  next_position= end_offset + eoln_len;
 
716
  error= 0;
 
717
 
 
718
err:
 
719
  dbug_tmp_restore_column_map(table->write_set, org_bitmap);
 
720
 
 
721
  DBUG_RETURN(error);
 
722
}
 
723
 
 
724
/*
 
725
  If frm_error() is called in table.cc this is called to find out what file
 
726
  extensions exist for this handler.
 
727
*/
 
728
static const char *ha_tina_exts[] = {
 
729
  CSV_EXT,
 
730
  CSM_EXT,
 
731
  NullS
 
732
};
 
733
 
 
734
const char **ha_tina::bas_ext() const
 
735
{
 
736
  return ha_tina_exts;
 
737
}
 
738
 
 
739
/*
 
740
  Three functions below are needed to enable concurrent insert functionality
 
741
  for CSV engine. For more details see mysys/thr_lock.c
 
742
*/
 
743
 
 
744
void tina_get_status(void* param, int concurrent_insert)
 
745
{
 
746
  ha_tina *tina= (ha_tina*) param;
 
747
  tina->get_status();
 
748
}
 
749
 
 
750
void tina_update_status(void* param)
 
751
{
 
752
  ha_tina *tina= (ha_tina*) param;
 
753
  tina->update_status();
 
754
}
 
755
 
 
756
/* this should exist and return 0 for concurrent insert to work */
 
757
my_bool tina_check_status(void* param)
 
758
{
 
759
  return 0;
 
760
}
 
761
 
 
762
/*
 
763
  Save the state of the table
 
764
 
 
765
  SYNOPSIS
 
766
    get_status()
 
767
 
 
768
  DESCRIPTION
 
769
    This function is used to retrieve the file length. During the lock
 
770
    phase of concurrent insert. For more details see comment to
 
771
    ha_tina::update_status below.
 
772
*/
 
773
 
 
774
void ha_tina::get_status()
 
775
{
 
776
  if (share->is_log_table)
 
777
  {
 
778
    /*
 
779
      We have to use mutex to follow pthreads memory visibility
 
780
      rules for share->saved_data_file_length
 
781
    */
 
782
    pthread_mutex_lock(&share->mutex);
 
783
    local_saved_data_file_length= share->saved_data_file_length;
 
784
    pthread_mutex_unlock(&share->mutex);
 
785
    return;
 
786
  }
 
787
  local_saved_data_file_length= share->saved_data_file_length;
 
788
}
 
789
 
 
790
 
 
791
/*
 
792
  Correct the state of the table. Called by unlock routines
 
793
  before the write lock is released.
 
794
 
 
795
  SYNOPSIS
 
796
    update_status()
 
797
 
 
798
  DESCRIPTION
 
799
    When we employ concurrent insert lock, we save current length of the file
 
800
    during the lock phase. We do not read further saved value, as we don't
 
801
    want to interfere with undergoing concurrent insert. Writers update file
 
802
    length info during unlock with update_status().
 
803
 
 
804
  NOTE
 
805
    For log tables concurrent insert works different. The reason is that
 
806
    log tables are always opened and locked. And as they do not unlock
 
807
    tables, the file length after writes should be updated in a different
 
808
    way. For this purpose we need is_log_table flag. When this flag is set
 
809
    we call update_status() explicitly after each row write.
 
810
*/
 
811
 
 
812
void ha_tina::update_status()
 
813
{
 
814
  /* correct local_saved_data_file_length for writers */
 
815
  share->saved_data_file_length= local_saved_data_file_length;
 
816
}
 
817
 
 
818
 
 
819
/*
 
820
  Open a database file. Keep in mind that tables are caches, so
 
821
  this will not be called for every request. Any sort of positions
 
822
  that need to be reset should be kept in the ::extra() call.
 
823
*/
 
824
int ha_tina::open(const char *name, int mode, uint open_options)
 
825
{
 
826
  DBUG_ENTER("ha_tina::open");
 
827
 
 
828
  if (!(share= get_share(name, table)))
 
829
    DBUG_RETURN(HA_ERR_OUT_OF_MEM);
 
830
 
 
831
  if (share->crashed && !(open_options & HA_OPEN_FOR_REPAIR))
 
832
  {
 
833
    free_share(share);
 
834
    DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);
 
835
  }
 
836
 
 
837
  local_data_file_version= share->data_file_version;
 
838
  if ((data_file= my_open(share->data_file_name,
 
839
                          O_RDONLY, MYF(MY_WME))) == -1)
 
840
  {
 
841
    free_share(share);
 
842
    DBUG_RETURN(my_errno ? my_errno : -1);
 
843
  }
 
844
 
 
845
  /*
 
846
    Init locking. Pass handler object to the locking routines,
 
847
    so that they could save/update local_saved_data_file_length value
 
848
    during locking. This is needed to enable concurrent inserts.
 
849
  */
 
850
  thr_lock_data_init(&share->lock, &lock, (void*) this);
 
851
  ref_length= sizeof(my_off_t);
 
852
 
 
853
  share->lock.get_status= tina_get_status;
 
854
  share->lock.update_status= tina_update_status;
 
855
  share->lock.check_status= tina_check_status;
 
856
 
 
857
  DBUG_RETURN(0);
 
858
}
 
859
 
 
860
 
 
861
/*
 
862
  Close a database file. We remove ourselves from the shared strucutre.
 
863
  If it is empty we destroy it.
 
864
*/
 
865
int ha_tina::close(void)
 
866
{
 
867
  int rc= 0;
 
868
  DBUG_ENTER("ha_tina::close");
 
869
  rc= my_close(data_file, MYF(0));
 
870
  DBUG_RETURN(free_share(share) || rc);
 
871
}
 
872
 
 
873
/*
 
874
  This is an INSERT. At the moment this handler just seeks to the end
 
875
  of the file and appends the data. In an error case it really should
 
876
  just truncate to the original position (this is not done yet).
 
877
*/
 
878
int ha_tina::write_row(uchar * buf)
 
879
{
 
880
  int size;
 
881
  DBUG_ENTER("ha_tina::write_row");
 
882
 
 
883
  if (share->crashed)
 
884
      DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);
 
885
 
 
886
  ha_statistic_increment(&SSV::ha_write_count);
 
887
 
 
888
  if (table->timestamp_field_type & TIMESTAMP_AUTO_SET_ON_INSERT)
 
889
    table->timestamp_field->set_time();
 
890
 
 
891
  size= encode_quote(buf);
 
892
 
 
893
  if (!share->tina_write_opened)
 
894
    if (init_tina_writer())
 
895
      DBUG_RETURN(-1);
 
896
 
 
897
   /* use pwrite, as concurrent reader could have changed the position */
 
898
  if (my_write(share->tina_write_filedes, (uchar*)buffer.ptr(), size,
 
899
               MYF(MY_WME | MY_NABP)))
 
900
    DBUG_RETURN(-1);
 
901
 
 
902
  /* update local copy of the max position to see our own changes */
 
903
  local_saved_data_file_length+= size;
 
904
 
 
905
  /* update shared info */
 
906
  pthread_mutex_lock(&share->mutex);
 
907
  share->rows_recorded++;
 
908
  /* update status for the log tables */
 
909
  if (share->is_log_table)
 
910
    update_status();
 
911
  pthread_mutex_unlock(&share->mutex);
 
912
 
 
913
  stats.records++;
 
914
  DBUG_RETURN(0);
 
915
}
 
916
 
 
917
 
 
918
int ha_tina::open_update_temp_file_if_needed()
 
919
{
 
920
  char updated_fname[FN_REFLEN];
 
921
 
 
922
  if (!share->update_file_opened)
 
923
  {
 
924
    if ((update_temp_file=
 
925
           my_create(fn_format(updated_fname, share->table_name,
 
926
                               "", CSN_EXT,
 
927
                               MY_REPLACE_EXT | MY_UNPACK_FILENAME),
 
928
                     0, O_RDWR | O_TRUNC, MYF(MY_WME))) < 0)
 
929
      return 1;
 
930
    share->update_file_opened= TRUE;
 
931
    temp_file_length= 0;
 
932
  }
 
933
  return 0;
 
934
}
 
935
 
 
936
/*
 
937
  This is called for an update.
 
938
  Make sure you put in code to increment the auto increment, also
 
939
  update any timestamp data. Currently auto increment is not being
 
940
  fixed since autoincrements have yet to be added to this table handler.
 
941
  This will be called in a table scan right before the previous ::rnd_next()
 
942
  call.
 
943
*/
 
944
int ha_tina::update_row(const uchar * old_data, uchar * new_data)
 
945
{
 
946
  int size;
 
947
  int rc= -1;
 
948
  DBUG_ENTER("ha_tina::update_row");
 
949
 
 
950
  ha_statistic_increment(&SSV::ha_update_count);
 
951
 
 
952
  if (table->timestamp_field_type & TIMESTAMP_AUTO_SET_ON_UPDATE)
 
953
    table->timestamp_field->set_time();
 
954
 
 
955
  size= encode_quote(new_data);
 
956
 
 
957
  /*
 
958
    During update we mark each updating record as deleted 
 
959
    (see the chain_append()) then write new one to the temporary data file. 
 
960
    At the end of the sequence in the rnd_end() we append all non-marked
 
961
    records from the data file to the temporary data file then rename it.
 
962
    The temp_file_length is used to calculate new data file length.
 
963
  */
 
964
  if (chain_append())
 
965
    goto err;
 
966
 
 
967
  if (open_update_temp_file_if_needed())
 
968
    goto err;
 
969
 
 
970
  if (my_write(update_temp_file, (uchar*)buffer.ptr(), size,
 
971
               MYF(MY_WME | MY_NABP)))
 
972
    goto err;
 
973
  temp_file_length+= size;
 
974
  rc= 0;
 
975
 
 
976
  /* UPDATE should never happen on the log tables */
 
977
  DBUG_ASSERT(!share->is_log_table);
 
978
 
 
979
err:
 
980
  DBUG_PRINT("info",("rc = %d", rc));
 
981
  DBUG_RETURN(rc);
 
982
}
 
983
 
 
984
 
 
985
/*
 
986
  Deletes a row. First the database will find the row, and then call this
 
987
  method. In the case of a table scan, the previous call to this will be
 
988
  the ::rnd_next() that found this row.
 
989
  The exception to this is an ORDER BY. This will cause the table handler
 
990
  to walk the table noting the positions of all rows that match a query.
 
991
  The table will then be deleted/positioned based on the ORDER (so RANDOM,
 
992
  DESC, ASC).
 
993
*/
 
994
int ha_tina::delete_row(const uchar * buf)
 
995
{
 
996
  DBUG_ENTER("ha_tina::delete_row");
 
997
  ha_statistic_increment(&SSV::ha_delete_count);
 
998
 
 
999
  if (chain_append())
 
1000
    DBUG_RETURN(-1);
 
1001
 
 
1002
  stats.records--;
 
1003
  /* Update shared info */
 
1004
  DBUG_ASSERT(share->rows_recorded);
 
1005
  pthread_mutex_lock(&share->mutex);
 
1006
  share->rows_recorded--;
 
1007
  pthread_mutex_unlock(&share->mutex);
 
1008
 
 
1009
  /* DELETE should never happen on the log table */
 
1010
  DBUG_ASSERT(!share->is_log_table);
 
1011
 
 
1012
  DBUG_RETURN(0);
 
1013
}
 
1014
 
 
1015
 
 
1016
/**
 
1017
  @brief Initialize the data file.
 
1018
  
 
1019
  @details Compare the local version of the data file with the shared one.
 
1020
  If they differ, there are some changes behind and we have to reopen
 
1021
  the data file to make the changes visible.
 
1022
  Call @c file_buff->init_buff() at the end to read the beginning of the 
 
1023
  data file into buffer.
 
1024
  
 
1025
  @retval  0  OK.
 
1026
  @retval  1  There was an error.
 
1027
*/
 
1028
 
 
1029
int ha_tina::init_data_file()
 
1030
{
 
1031
  if (local_data_file_version != share->data_file_version)
 
1032
  {
 
1033
    local_data_file_version= share->data_file_version;
 
1034
    if (my_close(data_file, MYF(0)) ||
 
1035
        (data_file= my_open(share->data_file_name, O_RDONLY, MYF(MY_WME))) == -1)
 
1036
      return my_errno ? my_errno : -1;
 
1037
  }
 
1038
  file_buff->init_buff(data_file);
 
1039
  return 0;
 
1040
}
 
1041
 
 
1042
 
 
1043
/*
 
1044
  All table scans call this first.
 
1045
  The order of a table scan is:
 
1046
 
 
1047
  ha_tina::store_lock
 
1048
  ha_tina::external_lock
 
1049
  ha_tina::info
 
1050
  ha_tina::rnd_init
 
1051
  ha_tina::extra
 
1052
  ENUM HA_EXTRA_CACHE   Cash record in HA_rrnd()
 
1053
  ha_tina::rnd_next
 
1054
  ha_tina::rnd_next
 
1055
  ha_tina::rnd_next
 
1056
  ha_tina::rnd_next
 
1057
  ha_tina::rnd_next
 
1058
  ha_tina::rnd_next
 
1059
  ha_tina::rnd_next
 
1060
  ha_tina::rnd_next
 
1061
  ha_tina::rnd_next
 
1062
  ha_tina::extra
 
1063
  ENUM HA_EXTRA_NO_CACHE   End cacheing of records (def)
 
1064
  ha_tina::external_lock
 
1065
  ha_tina::extra
 
1066
  ENUM HA_EXTRA_RESET   Reset database to after open
 
1067
 
 
1068
  Each call to ::rnd_next() represents a row returned in the can. When no more
 
1069
  rows can be returned, rnd_next() returns a value of HA_ERR_END_OF_FILE.
 
1070
  The ::info() call is just for the optimizer.
 
1071
 
 
1072
*/
 
1073
 
 
1074
int ha_tina::rnd_init(bool scan)
 
1075
{
 
1076
  DBUG_ENTER("ha_tina::rnd_init");
 
1077
 
 
1078
  /* set buffer to the beginning of the file */
 
1079
  if (share->crashed || init_data_file())
 
1080
    DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);
 
1081
 
 
1082
  current_position= next_position= 0;
 
1083
  stats.records= 0;
 
1084
  records_is_known= 0;
 
1085
  chain_ptr= chain;
 
1086
 
 
1087
  init_alloc_root(&blobroot, BLOB_MEMROOT_ALLOC_SIZE, 0);
 
1088
 
 
1089
  DBUG_RETURN(0);
 
1090
}
 
1091
 
 
1092
/*
 
1093
  ::rnd_next() does all the heavy lifting for a table scan. You will need to
 
1094
  populate *buf with the correct field data. You can walk the field to
 
1095
  determine at what position you should store the data (take a look at how
 
1096
  ::find_current_row() works). The structure is something like:
 
1097
  0Foo  Dog  Friend
 
1098
  The first offset is for the first attribute. All space before that is
 
1099
  reserved for null count.
 
1100
  Basically this works as a mask for which rows are nulled (compared to just
 
1101
  empty).
 
1102
  This table handler doesn't do nulls and does not know the difference between
 
1103
  NULL and "". This is ok since this table handler is for spreadsheets and
 
1104
  they don't know about them either :)
 
1105
*/
 
1106
int ha_tina::rnd_next(uchar *buf)
 
1107
{
 
1108
  int rc;
 
1109
  DBUG_ENTER("ha_tina::rnd_next");
 
1110
 
 
1111
  if (share->crashed)
 
1112
      DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);
 
1113
 
 
1114
  ha_statistic_increment(&SSV::ha_read_rnd_next_count);
 
1115
 
 
1116
  current_position= next_position;
 
1117
 
 
1118
  /* don't scan an empty file */
 
1119
  if (!local_saved_data_file_length)
 
1120
    DBUG_RETURN(HA_ERR_END_OF_FILE);
 
1121
 
 
1122
  if ((rc= find_current_row(buf)))
 
1123
    DBUG_RETURN(rc);
 
1124
 
 
1125
  stats.records++;
 
1126
  DBUG_RETURN(0);
 
1127
}
 
1128
 
 
1129
/*
 
1130
  In the case of an order by rows will need to be sorted.
 
1131
  ::position() is called after each call to ::rnd_next(),
 
1132
  the data it stores is to a byte array. You can store this
 
1133
  data via my_store_ptr(). ref_length is a variable defined to the
 
1134
  class that is the sizeof() of position being stored. In our case
 
1135
  its just a position. Look at the bdb code if you want to see a case
 
1136
  where something other then a number is stored.
 
1137
*/
 
1138
void ha_tina::position(const uchar *record)
 
1139
{
 
1140
  DBUG_ENTER("ha_tina::position");
 
1141
  my_store_ptr(ref, ref_length, current_position);
 
1142
  DBUG_VOID_RETURN;
 
1143
}
 
1144
 
 
1145
 
 
1146
/*
 
1147
  Used to fetch a row from a posiion stored with ::position().
 
1148
  my_get_ptr() retrieves the data for you.
 
1149
*/
 
1150
 
 
1151
int ha_tina::rnd_pos(uchar * buf, uchar *pos)
 
1152
{
 
1153
  DBUG_ENTER("ha_tina::rnd_pos");
 
1154
  ha_statistic_increment(&SSV::ha_read_rnd_count);
 
1155
  current_position= my_get_ptr(pos,ref_length);
 
1156
  DBUG_RETURN(find_current_row(buf));
 
1157
}
 
1158
 
 
1159
/*
 
1160
  ::info() is used to return information to the optimizer.
 
1161
  Currently this table handler doesn't implement most of the fields
 
1162
  really needed. SHOW also makes use of this data
 
1163
*/
 
1164
int ha_tina::info(uint flag)
 
1165
{
 
1166
  DBUG_ENTER("ha_tina::info");
 
1167
  /* This is a lie, but you don't want the optimizer to see zero or 1 */
 
1168
  if (!records_is_known && stats.records < 2) 
 
1169
    stats.records= 2;
 
1170
  DBUG_RETURN(0);
 
1171
}
 
1172
 
 
1173
/*
 
1174
  Grab bag of flags that are sent to the able handler every so often.
 
1175
  HA_EXTRA_RESET and HA_EXTRA_RESET_STATE are the most frequently called.
 
1176
  You are not required to implement any of these.
 
1177
*/
 
1178
int ha_tina::extra(enum ha_extra_function operation)
 
1179
{
 
1180
  DBUG_ENTER("ha_tina::extra");
 
1181
 if (operation == HA_EXTRA_MARK_AS_LOG_TABLE)
 
1182
 {
 
1183
   pthread_mutex_lock(&share->mutex);
 
1184
   share->is_log_table= TRUE;
 
1185
   pthread_mutex_unlock(&share->mutex);
 
1186
 }
 
1187
  DBUG_RETURN(0);
 
1188
}
 
1189
 
 
1190
/*
 
1191
  Set end_pos to the last valid byte of continuous area, closest
 
1192
  to the given "hole", stored in the buffer. "Valid" here means,
 
1193
  not listed in the chain of deleted records ("holes").
 
1194
*/
 
1195
bool ha_tina::get_write_pos(my_off_t *end_pos, tina_set *closest_hole)
 
1196
{
 
1197
  if (closest_hole == chain_ptr) /* no more chains */
 
1198
    *end_pos= file_buff->end();
 
1199
  else
 
1200
    *end_pos= min(file_buff->end(),
 
1201
                  closest_hole->begin);
 
1202
  return (closest_hole != chain_ptr) && (*end_pos == closest_hole->begin);
 
1203
}
 
1204
 
 
1205
 
 
1206
/*
 
1207
  Called after each table scan. In particular after deletes,
 
1208
  and updates. In the last case we employ chain of deleted
 
1209
  slots to clean up all of the dead space we have collected while
 
1210
  performing deletes/updates.
 
1211
*/
 
1212
int ha_tina::rnd_end()
 
1213
{
 
1214
  char updated_fname[FN_REFLEN];
 
1215
  my_off_t file_buffer_start= 0;
 
1216
  DBUG_ENTER("ha_tina::rnd_end");
 
1217
 
 
1218
  free_root(&blobroot, MYF(0));
 
1219
  records_is_known= 1;
 
1220
 
 
1221
  if ((chain_ptr - chain)  > 0)
 
1222
  {
 
1223
    tina_set *ptr= chain;
 
1224
 
 
1225
    /*
 
1226
      Re-read the beginning of a file (as the buffer should point to the
 
1227
      end of file after the scan).
 
1228
    */
 
1229
    file_buff->init_buff(data_file);
 
1230
 
 
1231
    /*
 
1232
      The sort is needed when there were updates/deletes with random orders.
 
1233
      It sorts so that we move the firts blocks to the beginning.
 
1234
    */
 
1235
    my_qsort(chain, (size_t)(chain_ptr - chain), sizeof(tina_set),
 
1236
             (qsort_cmp)sort_set);
 
1237
 
 
1238
    my_off_t write_begin= 0, write_end;
 
1239
 
 
1240
    /* create the file to write updated table if it wasn't yet created */
 
1241
    if (open_update_temp_file_if_needed())
 
1242
      DBUG_RETURN(-1);
 
1243
 
 
1244
    /* write the file with updated info */
 
1245
    while ((file_buffer_start != (my_off_t)-1))     // while not end of file
 
1246
    {
 
1247
      bool in_hole= get_write_pos(&write_end, ptr);
 
1248
      my_off_t write_length= write_end - write_begin;
 
1249
 
 
1250
      /* if there is something to write, write it */
 
1251
      if (write_length)
 
1252
      {
 
1253
        if (my_write(update_temp_file, 
 
1254
                     (uchar*) (file_buff->ptr() +
 
1255
                               (write_begin - file_buff->start())),
 
1256
                     (size_t)write_length, MYF_RW))
 
1257
          goto error;
 
1258
        temp_file_length+= write_length;
 
1259
      }
 
1260
      if (in_hole)
 
1261
      {
 
1262
        /* skip hole */
 
1263
        while (file_buff->end() <= ptr->end &&
 
1264
               file_buffer_start != (my_off_t)-1)
 
1265
          file_buffer_start= file_buff->read_next();
 
1266
        write_begin= ptr->end;
 
1267
        ptr++;
 
1268
      }
 
1269
      else
 
1270
        write_begin= write_end;
 
1271
 
 
1272
      if (write_end == file_buff->end())
 
1273
        file_buffer_start= file_buff->read_next(); /* shift the buffer */
 
1274
 
 
1275
    }
 
1276
 
 
1277
    if (my_sync(update_temp_file, MYF(MY_WME)) ||
 
1278
        my_close(update_temp_file, MYF(0)))
 
1279
      DBUG_RETURN(-1);
 
1280
 
 
1281
    share->update_file_opened= FALSE;
 
1282
 
 
1283
    if (share->tina_write_opened)
 
1284
    {
 
1285
      if (my_close(share->tina_write_filedes, MYF(0)))
 
1286
        DBUG_RETURN(-1);
 
1287
      /*
 
1288
        Mark that the writer fd is closed, so that init_tina_writer()
 
1289
        will reopen it later.
 
1290
      */
 
1291
      share->tina_write_opened= FALSE;
 
1292
    }
 
1293
 
 
1294
    /*
 
1295
      Close opened fildes's. Then move updated file in place
 
1296
      of the old datafile.
 
1297
    */
 
1298
    if (my_close(data_file, MYF(0)) ||
 
1299
        my_rename(fn_format(updated_fname, share->table_name, "", CSN_EXT,
 
1300
                            MY_REPLACE_EXT | MY_UNPACK_FILENAME),
 
1301
                  share->data_file_name, MYF(0)))
 
1302
      DBUG_RETURN(-1);
 
1303
 
 
1304
    /* Open the file again */
 
1305
    if (((data_file= my_open(share->data_file_name,
 
1306
                             O_RDONLY, MYF(MY_WME))) == -1))
 
1307
      DBUG_RETURN(my_errno ? my_errno : -1);
 
1308
    /*
 
1309
      As we reopened the data file, increase share->data_file_version 
 
1310
      in order to force other threads waiting on a table lock and  
 
1311
      have already opened the table to reopen the data file.
 
1312
      That makes the latest changes become visible to them.
 
1313
      Update local_data_file_version as no need to reopen it in the 
 
1314
      current thread.
 
1315
    */
 
1316
    share->data_file_version++;
 
1317
    local_data_file_version= share->data_file_version;
 
1318
    /*
 
1319
      The datafile is consistent at this point and the write filedes is
 
1320
      closed, so nothing worrying will happen to it in case of a crash.
 
1321
      Here we record this fact to the meta-file.
 
1322
    */
 
1323
    (void)write_meta_file(share->meta_file, share->rows_recorded, FALSE);
 
1324
    /* 
 
1325
      Update local_saved_data_file_length with the real length of the 
 
1326
      data file.
 
1327
    */
 
1328
    local_saved_data_file_length= temp_file_length;
 
1329
  }
 
1330
 
 
1331
  DBUG_RETURN(0);
 
1332
error:
 
1333
  my_close(update_temp_file, MYF(0));
 
1334
  share->update_file_opened= FALSE;
 
1335
  DBUG_RETURN(-1);
 
1336
}
 
1337
 
 
1338
 
 
1339
/*
 
1340
  Repair CSV table in the case, it is crashed.
 
1341
 
 
1342
  SYNOPSIS
 
1343
    repair()
 
1344
    thd         The thread, performing repair
 
1345
    check_opt   The options for repair. We do not use it currently.
 
1346
 
 
1347
  DESCRIPTION
 
1348
    If the file is empty, change # of rows in the file and complete recovery.
 
1349
    Otherwise, scan the table looking for bad rows. If none were found,
 
1350
    we mark file as a good one and return. If a bad row was encountered,
 
1351
    we truncate the datafile up to the last good row.
 
1352
 
 
1353
   TODO: Make repair more clever - it should try to recover subsequent
 
1354
         rows (after the first bad one) as well.
 
1355
*/
 
1356
 
 
1357
int ha_tina::repair(THD* thd, HA_CHECK_OPT* check_opt)
 
1358
{
 
1359
  char repaired_fname[FN_REFLEN];
 
1360
  uchar *buf;
 
1361
  File repair_file;
 
1362
  int rc;
 
1363
  ha_rows rows_repaired= 0;
 
1364
  my_off_t write_begin= 0, write_end;
 
1365
  DBUG_ENTER("ha_tina::repair");
 
1366
 
 
1367
  /* empty file */
 
1368
  if (!share->saved_data_file_length)
 
1369
  {
 
1370
    share->rows_recorded= 0;
 
1371
    goto end;
 
1372
  }
 
1373
 
 
1374
  /* Don't assert in field::val() functions */
 
1375
  table->use_all_columns();
 
1376
  if (!(buf= (uchar*) my_malloc(table->s->reclength, MYF(MY_WME))))
 
1377
    DBUG_RETURN(HA_ERR_OUT_OF_MEM);
 
1378
 
 
1379
  /* position buffer to the start of the file */
 
1380
  if (init_data_file())
 
1381
    DBUG_RETURN(HA_ERR_CRASHED_ON_REPAIR);
 
1382
 
 
1383
  /*
 
1384
    Local_saved_data_file_length is initialized during the lock phase.
 
1385
    Sometimes this is not getting executed before ::repair (e.g. for
 
1386
    the log tables). We set it manually here.
 
1387
  */
 
1388
  local_saved_data_file_length= share->saved_data_file_length;
 
1389
  /* set current position to the beginning of the file */
 
1390
  current_position= next_position= 0;
 
1391
 
 
1392
  init_alloc_root(&blobroot, BLOB_MEMROOT_ALLOC_SIZE, 0);
 
1393
 
 
1394
  /* Read the file row-by-row. If everything is ok, repair is not needed. */
 
1395
  while (!(rc= find_current_row(buf)))
 
1396
  {
 
1397
    thd_inc_row_count(thd);
 
1398
    rows_repaired++;
 
1399
    current_position= next_position;
 
1400
  }
 
1401
 
 
1402
  free_root(&blobroot, MYF(0));
 
1403
 
 
1404
  my_free((char*)buf, MYF(0));
 
1405
 
 
1406
  if (rc == HA_ERR_END_OF_FILE)
 
1407
  {
 
1408
    /*
 
1409
      All rows were read ok until end of file, the file does not need repair.
 
1410
      If rows_recorded != rows_repaired, we should update rows_recorded value
 
1411
      to the current amount of rows.
 
1412
    */
 
1413
    share->rows_recorded= rows_repaired;
 
1414
    goto end;
 
1415
  }
 
1416
 
 
1417
  /*
 
1418
    Otherwise we've encountered a bad row => repair is needed.
 
1419
    Let us create a temporary file.
 
1420
  */
 
1421
  if ((repair_file= my_create(fn_format(repaired_fname, share->table_name,
 
1422
                                        "", CSN_EXT,
 
1423
                                        MY_REPLACE_EXT|MY_UNPACK_FILENAME),
 
1424
                           0, O_RDWR | O_TRUNC,MYF(MY_WME))) < 0)
 
1425
    DBUG_RETURN(HA_ERR_CRASHED_ON_REPAIR);
 
1426
 
 
1427
  file_buff->init_buff(data_file);
 
1428
 
 
1429
 
 
1430
  /* we just truncated the file up to the first bad row. update rows count. */
 
1431
  share->rows_recorded= rows_repaired;
 
1432
 
 
1433
  /* write repaired file */
 
1434
  while (1)
 
1435
  {
 
1436
    write_end= min(file_buff->end(), current_position);
 
1437
    if ((write_end - write_begin) &&
 
1438
        (my_write(repair_file, (uchar*)file_buff->ptr(),
 
1439
                  (size_t) (write_end - write_begin), MYF_RW)))
 
1440
      DBUG_RETURN(-1);
 
1441
 
 
1442
    write_begin= write_end;
 
1443
    if (write_end== current_position)
 
1444
      break;
 
1445
    else
 
1446
      file_buff->read_next(); /* shift the buffer */
 
1447
  }
 
1448
 
 
1449
  /*
 
1450
    Close the files and rename repaired file to the datafile.
 
1451
    We have to close the files, as on Windows one cannot rename
 
1452
    a file, which descriptor is still open. EACCES will be returned
 
1453
    when trying to delete the "to"-file in my_rename().
 
1454
  */
 
1455
  if (share->tina_write_opened)
 
1456
  {
 
1457
    /*
 
1458
      Data file might be opened twice, on table opening stage and
 
1459
      during write_row execution. We need to close both instances
 
1460
      to satisfy Win.
 
1461
    */
 
1462
    if (my_close(share->tina_write_filedes, MYF(0)))
 
1463
      DBUG_RETURN(my_errno ? my_errno : -1);
 
1464
    share->tina_write_opened= FALSE;
 
1465
  }
 
1466
  if (my_close(data_file,MYF(0)) || my_close(repair_file, MYF(0)) ||
 
1467
      my_rename(repaired_fname, share->data_file_name, MYF(0)))
 
1468
    DBUG_RETURN(-1);
 
1469
 
 
1470
  /* Open the file again, it should now be repaired */
 
1471
  if ((data_file= my_open(share->data_file_name, O_RDWR|O_APPEND,
 
1472
                          MYF(MY_WME))) == -1)
 
1473
     DBUG_RETURN(my_errno ? my_errno : -1);
 
1474
 
 
1475
  /* Set new file size. The file size will be updated by ::update_status() */
 
1476
  local_saved_data_file_length= (size_t) current_position;
 
1477
 
 
1478
end:
 
1479
  share->crashed= FALSE;
 
1480
  DBUG_RETURN(HA_ADMIN_OK);
 
1481
}
 
1482
 
 
1483
/*
 
1484
  DELETE without WHERE calls this
 
1485
*/
 
1486
 
 
1487
int ha_tina::delete_all_rows()
 
1488
{
 
1489
  int rc;
 
1490
  DBUG_ENTER("ha_tina::delete_all_rows");
 
1491
 
 
1492
  if (!records_is_known)
 
1493
    DBUG_RETURN(my_errno=HA_ERR_WRONG_COMMAND);
 
1494
 
 
1495
  if (!share->tina_write_opened)
 
1496
    if (init_tina_writer())
 
1497
      DBUG_RETURN(-1);
 
1498
 
 
1499
  /* Truncate the file to zero size */
 
1500
  rc= my_chsize(share->tina_write_filedes, 0, 0, MYF(MY_WME));
 
1501
 
 
1502
  stats.records=0;
 
1503
  /* Update shared info */
 
1504
  pthread_mutex_lock(&share->mutex);
 
1505
  share->rows_recorded= 0;
 
1506
  pthread_mutex_unlock(&share->mutex);
 
1507
  local_saved_data_file_length= 0;
 
1508
  DBUG_RETURN(rc);
 
1509
}
 
1510
 
 
1511
/*
 
1512
  Called by the database to lock the table. Keep in mind that this
 
1513
  is an internal lock.
 
1514
*/
 
1515
THR_LOCK_DATA **ha_tina::store_lock(THD *thd,
 
1516
                                    THR_LOCK_DATA **to,
 
1517
                                    enum thr_lock_type lock_type)
 
1518
{
 
1519
  if (lock_type != TL_IGNORE && lock.type == TL_UNLOCK)
 
1520
    lock.type=lock_type;
 
1521
  *to++= &lock;
 
1522
  return to;
 
1523
}
 
1524
 
 
1525
/* 
 
1526
  Create a table. You do not want to leave the table open after a call to
 
1527
  this (the database will call ::open() if it needs to).
 
1528
*/
 
1529
 
 
1530
int ha_tina::create(const char *name, TABLE *table_arg,
 
1531
                    HA_CREATE_INFO *create_info)
 
1532
{
 
1533
  char name_buff[FN_REFLEN];
 
1534
  File create_file;
 
1535
  DBUG_ENTER("ha_tina::create");
 
1536
 
 
1537
  /*
 
1538
    check columns
 
1539
  */
 
1540
  for (Field **field= table_arg->s->field; *field; field++)
 
1541
  {
 
1542
    if ((*field)->real_maybe_null())
 
1543
    {
 
1544
      my_error(ER_CHECK_NOT_IMPLEMENTED, MYF(0), "nullable columns");
 
1545
      DBUG_RETURN(HA_ERR_UNSUPPORTED);
 
1546
    }
 
1547
  }
 
1548
  
 
1549
 
 
1550
  if ((create_file= my_create(fn_format(name_buff, name, "", CSM_EXT,
 
1551
                                        MY_REPLACE_EXT|MY_UNPACK_FILENAME), 0,
 
1552
                              O_RDWR | O_TRUNC,MYF(MY_WME))) < 0)
 
1553
    DBUG_RETURN(-1);
 
1554
 
 
1555
  write_meta_file(create_file, 0, FALSE);
 
1556
  my_close(create_file, MYF(0));
 
1557
 
 
1558
  if ((create_file= my_create(fn_format(name_buff, name, "", CSV_EXT,
 
1559
                                        MY_REPLACE_EXT|MY_UNPACK_FILENAME),0,
 
1560
                              O_RDWR | O_TRUNC,MYF(MY_WME))) < 0)
 
1561
    DBUG_RETURN(-1);
 
1562
 
 
1563
  my_close(create_file, MYF(0));
 
1564
 
 
1565
  DBUG_RETURN(0);
 
1566
}
 
1567
 
 
1568
int ha_tina::check(THD* thd, HA_CHECK_OPT* check_opt)
 
1569
{
 
1570
  int rc= 0;
 
1571
  uchar *buf;
 
1572
  const char *old_proc_info;
 
1573
  ha_rows count= share->rows_recorded;
 
1574
  DBUG_ENTER("ha_tina::check");
 
1575
 
 
1576
  old_proc_info= thd_proc_info(thd, "Checking table");
 
1577
  if (!(buf= (uchar*) my_malloc(table->s->reclength, MYF(MY_WME))))
 
1578
    DBUG_RETURN(HA_ERR_OUT_OF_MEM);
 
1579
 
 
1580
  /* position buffer to the start of the file */
 
1581
   if (init_data_file())
 
1582
     DBUG_RETURN(HA_ERR_CRASHED);
 
1583
 
 
1584
  /*
 
1585
    Local_saved_data_file_length is initialized during the lock phase.
 
1586
    Check does not use store_lock in certain cases. So, we set it
 
1587
    manually here.
 
1588
  */
 
1589
  local_saved_data_file_length= share->saved_data_file_length;
 
1590
  /* set current position to the beginning of the file */
 
1591
  current_position= next_position= 0;
 
1592
 
 
1593
  init_alloc_root(&blobroot, BLOB_MEMROOT_ALLOC_SIZE, 0);
 
1594
 
 
1595
  /* Read the file row-by-row. If everything is ok, repair is not needed. */
 
1596
  while (!(rc= find_current_row(buf)))
 
1597
  {
 
1598
    thd_inc_row_count(thd);
 
1599
    count--;
 
1600
    current_position= next_position;
 
1601
  }
 
1602
  
 
1603
  free_root(&blobroot, MYF(0));
 
1604
 
 
1605
  my_free((char*)buf, MYF(0));
 
1606
  thd_proc_info(thd, old_proc_info);
 
1607
 
 
1608
  if ((rc != HA_ERR_END_OF_FILE) || count)
 
1609
  {
 
1610
    share->crashed= TRUE;
 
1611
    DBUG_RETURN(HA_ADMIN_CORRUPT);
 
1612
  }
 
1613
 
 
1614
  DBUG_RETURN(HA_ADMIN_OK);
 
1615
}
 
1616
 
 
1617
 
 
1618
bool ha_tina::check_if_incompatible_data(HA_CREATE_INFO *info,
 
1619
                                           uint table_changes)
 
1620
{
 
1621
  return COMPATIBLE_DATA_YES;
 
1622
}
 
1623
 
 
1624
struct st_mysql_storage_engine csv_storage_engine=
 
1625
{ MYSQL_HANDLERTON_INTERFACE_VERSION };
 
1626
 
 
1627
mysql_declare_plugin(csv)
 
1628
{
 
1629
  MYSQL_STORAGE_ENGINE_PLUGIN,
 
1630
  &csv_storage_engine,
 
1631
  "CSV",
 
1632
  "Brian Aker, MySQL AB",
 
1633
  "CSV storage engine",
 
1634
  PLUGIN_LICENSE_GPL,
 
1635
  tina_init_func, /* Plugin Init */
 
1636
  tina_done_func, /* Plugin Deinit */
 
1637
  0x0100 /* 1.0 */,
 
1638
  NULL,                       /* status variables                */
 
1639
  NULL,                       /* system variables                */
 
1640
  NULL                        /* config options                  */
 
1641
}
 
1642
mysql_declare_plugin_end;
 
1643