~ubuntu-branches/ubuntu/saucy/drizzle/saucy-proposed

« back to all changes in this revision

Viewing changes to drizzled/sql_load.cc

  • Committer: Bazaar Package Importer
  • Author(s): Monty Taylor
  • Date: 2010-03-18 12:12:31 UTC
  • Revision ID: james.westby@ubuntu.com-20100318121231-k6g1xe6cshbwa0f8
Tags: upstream-2010.03.1347
ImportĀ upstreamĀ versionĀ 2010.03.1347

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/* Copyright (C) 2000-2006 MySQL AB
 
2
 
 
3
   This program is free software; you can redistribute it and/or modify
 
4
   it under the terms of the GNU General Public License as published by
 
5
   the Free Software Foundation; version 2 of the License.
 
6
 
 
7
   This program is distributed in the hope that it will be useful,
 
8
   but WITHOUT ANY WARRANTY; without even the implied warranty of
 
9
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
10
   GNU General Public License for more details.
 
11
 
 
12
   You should have received a copy of the GNU General Public License
 
13
   along with this program; if not, write to the Free Software
 
14
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
 
15
 
 
16
 
 
17
/* Copy data from a textfile to table */
 
18
 
 
19
#include "config.h"
 
20
#include <drizzled/sql_load.h>
 
21
#include <drizzled/error.h>
 
22
#include <drizzled/data_home.h>
 
23
#include <drizzled/session.h>
 
24
#include <drizzled/sql_base.h>
 
25
#include <drizzled/field/timestamp.h>
 
26
#include "drizzled/internal/my_sys.h"
 
27
#include "drizzled/internal/iocache.h"
 
28
#include <drizzled/db.h>
 
29
 
 
30
#include <sys/stat.h>
 
31
#include <fcntl.h>
 
32
#include <algorithm>
 
33
#include <climits>
 
34
 
 
35
using namespace std;
 
36
namespace drizzled
 
37
{
 
38
 
 
39
class READ_INFO {
 
40
  int   cursor;
 
41
  unsigned char *buffer;                /* Buffer for read text */
 
42
  unsigned char *end_of_buff;           /* Data in bufferts ends here */
 
43
  size_t buff_length;                   /* Length of buffert */
 
44
  size_t max_length;                    /* Max length of row */
 
45
  char  *field_term_ptr,*line_term_ptr,*line_start_ptr,*line_start_end;
 
46
  uint  field_term_length,line_term_length,enclosed_length;
 
47
  int   field_term_char,line_term_char,enclosed_char,escape_char;
 
48
  int   *stack,*stack_pos;
 
49
  bool  found_end_of_line,start_of_line,eof;
 
50
  bool  need_end_io_cache;
 
51
  internal::IO_CACHE cache;
 
52
 
 
53
public:
 
54
  bool error,line_cuted,found_null,enclosed;
 
55
  unsigned char *row_start,                     /* Found row starts here */
 
56
        *row_end;                       /* Found row ends here */
 
57
  const CHARSET_INFO *read_charset;
 
58
 
 
59
  READ_INFO(int cursor, size_t tot_length, const CHARSET_INFO * const cs,
 
60
            String &field_term,String &line_start,String &line_term,
 
61
            String &enclosed,int escape, bool is_fifo);
 
62
  ~READ_INFO();
 
63
  int read_field();
 
64
  int read_fixed_length(void);
 
65
  int next_line(void);
 
66
  char unescape(char chr);
 
67
  int terminator(char *ptr,uint32_t length);
 
68
  bool find_start_of_fields();
 
69
 
 
70
  /*
 
71
    We need to force cache close before destructor is invoked to log
 
72
    the last read block
 
73
  */
 
74
  void end_io_cache()
 
75
  {
 
76
    internal::end_io_cache(&cache);
 
77
    need_end_io_cache = 0;
 
78
  }
 
79
 
 
80
  /*
 
81
    Either this method, or we need to make cache public
 
82
    Arg must be set from mysql_load() since constructor does not see
 
83
    either the table or Session value
 
84
  */
 
85
  void set_io_cache_arg(void* arg) { cache.arg = arg; }
 
86
};
 
87
 
 
88
static int read_fixed_length(Session *session, COPY_INFO &info, TableList *table_list,
 
89
                             List<Item> &fields_vars, List<Item> &set_fields,
 
90
                             List<Item> &set_values, READ_INFO &read_info,
 
91
                             uint32_t skip_lines,
 
92
                             bool ignore_check_option_errors);
 
93
static int read_sep_field(Session *session, COPY_INFO &info, TableList *table_list,
 
94
                          List<Item> &fields_vars, List<Item> &set_fields,
 
95
                          List<Item> &set_values, READ_INFO &read_info,
 
96
                          String &enclosed, uint32_t skip_lines,
 
97
                          bool ignore_check_option_errors);
 
98
 
 
99
 
 
100
/*
 
101
  Execute LOAD DATA query
 
102
 
 
103
  SYNOPSYS
 
104
    mysql_load()
 
105
      session - current thread
 
106
      ex  - file_exchange object representing source cursor and its parsing rules
 
107
      table_list  - list of tables to which we are loading data
 
108
      fields_vars - list of fields and variables to which we read
 
109
                    data from cursor
 
110
      set_fields  - list of fields mentioned in set clause
 
111
      set_values  - expressions to assign to fields in previous list
 
112
      handle_duplicates - indicates whenever we should emit error or
 
113
                          replace row if we will meet duplicates.
 
114
      ignore -          - indicates whenever we should ignore duplicates
 
115
 
 
116
  RETURN VALUES
 
117
    true - error / false - success
 
118
*/
 
119
 
 
120
int mysql_load(Session *session,file_exchange *ex,TableList *table_list,
 
121
                List<Item> &fields_vars, List<Item> &set_fields,
 
122
                List<Item> &set_values,
 
123
                enum enum_duplicates handle_duplicates, bool ignore)
 
124
{
 
125
  char name[FN_REFLEN];
 
126
  int file;
 
127
  Table *table= NULL;
 
128
  int error;
 
129
  String *field_term=ex->field_term,*escaped=ex->escaped;
 
130
  String *enclosed=ex->enclosed;
 
131
  bool is_fifo=0;
 
132
  char *db= table_list->db;                     // This is never null
 
133
  assert(db);
 
134
  /*
 
135
    If path for cursor is not defined, we will use the current database.
 
136
    If this is not set, we will use the directory where the table to be
 
137
    loaded is located
 
138
  */
 
139
  const char *tdb= session->db.empty() ? db  : session->db.c_str();             // Result is never null
 
140
  assert(tdb);
 
141
  uint32_t skip_lines= ex->skip_lines;
 
142
  bool transactional_table;
 
143
  Session::killed_state killed_status= Session::NOT_KILLED;
 
144
 
 
145
  /* Escape and enclosed character may be a utf8 4-byte character */
 
146
  if (escaped->length() > 4 || enclosed->length() > 4)
 
147
  {
 
148
    my_error(ER_WRONG_FIELD_TERMINATORS,MYF(0),enclosed->c_ptr(), enclosed->length());
 
149
    return(true);
 
150
  }
 
151
 
 
152
  if (session->openTablesLock(table_list))
 
153
    return(true);
 
154
 
 
155
  if (setup_tables_and_check_access(session, &session->lex->select_lex.context,
 
156
                                    &session->lex->select_lex.top_join_list,
 
157
                                    table_list,
 
158
                                    &session->lex->select_lex.leaf_tables, true))
 
159
     return(-1);
 
160
 
 
161
  /*
 
162
    Let us emit an error if we are loading data to table which is used
 
163
    in subselect in SET clause like we do it for INSERT.
 
164
 
 
165
    The main thing to fix to remove this restriction is to ensure that the
 
166
    table is marked to be 'used for insert' in which case we should never
 
167
    mark this table as 'const table' (ie, one that has only one row).
 
168
  */
 
169
  if (unique_table(table_list, table_list->next_global))
 
170
  {
 
171
    my_error(ER_UPDATE_TABLE_USED, MYF(0), table_list->table_name);
 
172
    return(true);
 
173
  }
 
174
 
 
175
  table= table_list->table;
 
176
  transactional_table= table->cursor->has_transactions();
 
177
 
 
178
  if (!fields_vars.elements)
 
179
  {
 
180
    Field **field;
 
181
    for (field=table->field; *field ; field++)
 
182
      fields_vars.push_back(new Item_field(*field));
 
183
    table->setWriteSet();
 
184
    table->timestamp_field_type= TIMESTAMP_NO_AUTO_SET;
 
185
    /*
 
186
      Let us also prepare SET clause, altough it is probably empty
 
187
      in this case.
 
188
    */
 
189
    if (setup_fields(session, 0, set_fields, MARK_COLUMNS_WRITE, 0, 0) ||
 
190
        setup_fields(session, 0, set_values, MARK_COLUMNS_READ, 0, 0))
 
191
      return(true);
 
192
  }
 
193
  else
 
194
  {                                             // Part field list
 
195
    /* TODO: use this conds for 'WITH CHECK OPTIONS' */
 
196
    if (setup_fields(session, 0, fields_vars, MARK_COLUMNS_WRITE, 0, 0) ||
 
197
        setup_fields(session, 0, set_fields, MARK_COLUMNS_WRITE, 0, 0) ||
 
198
        check_that_all_fields_are_given_values(session, table, table_list))
 
199
      return(true);
 
200
    /*
 
201
      Check whenever TIMESTAMP field with auto-set feature specified
 
202
      explicitly.
 
203
    */
 
204
    if (table->timestamp_field)
 
205
    {
 
206
      if (table->isWriteSet(table->timestamp_field->field_index))
 
207
        table->timestamp_field_type= TIMESTAMP_NO_AUTO_SET;
 
208
      else
 
209
      {
 
210
        table->setWriteSet(table->timestamp_field->field_index);
 
211
      }
 
212
    }
 
213
    /* Fix the expressions in SET clause */
 
214
    if (setup_fields(session, 0, set_values, MARK_COLUMNS_READ, 0, 0))
 
215
      return(true);
 
216
  }
 
217
 
 
218
  table->mark_columns_needed_for_insert();
 
219
 
 
220
  size_t tot_length=0;
 
221
  bool use_blobs= 0, use_vars= 0;
 
222
  List_iterator_fast<Item> it(fields_vars);
 
223
  Item *item;
 
224
 
 
225
  while ((item= it++))
 
226
  {
 
227
    Item *real_item= item->real_item();
 
228
 
 
229
    if (real_item->type() == Item::FIELD_ITEM)
 
230
    {
 
231
      Field *field= ((Item_field*)real_item)->field;
 
232
      if (field->flags & BLOB_FLAG)
 
233
      {
 
234
        use_blobs= 1;
 
235
        tot_length+= 256;                       // Will be extended if needed
 
236
      }
 
237
      else
 
238
        tot_length+= field->field_length;
 
239
    }
 
240
    else if (item->type() == Item::STRING_ITEM)
 
241
      use_vars= 1;
 
242
  }
 
243
  if (use_blobs && !ex->line_term->length() && !field_term->length())
 
244
  {
 
245
    my_message(ER_BLOBS_AND_NO_TERMINATED,ER(ER_BLOBS_AND_NO_TERMINATED),
 
246
               MYF(0));
 
247
    return(true);
 
248
  }
 
249
  if (use_vars && !field_term->length() && !enclosed->length())
 
250
  {
 
251
    my_error(ER_LOAD_FROM_FIXED_SIZE_ROWS_TO_VAR, MYF(0));
 
252
    return(true);
 
253
  }
 
254
 
 
255
  {
 
256
#ifdef DONT_ALLOW_FULL_LOAD_DATA_PATHS
 
257
    ex->file_name+=dirname_length(ex->file_name);
 
258
#endif
 
259
    if (!internal::dirname_length(ex->file_name))
 
260
    {
 
261
      strcpy(name, drizzle_real_data_home);
 
262
      strncat(name, tdb, FN_REFLEN-strlen(drizzle_real_data_home)-1);
 
263
      (void) internal::fn_format(name, ex->file_name, name, "",
 
264
                       MY_RELATIVE_PATH | MY_UNPACK_FILENAME);
 
265
    }
 
266
    else
 
267
    {
 
268
      (void) internal::fn_format(name, ex->file_name, drizzle_real_data_home, "",
 
269
                       MY_RELATIVE_PATH | MY_UNPACK_FILENAME);
 
270
 
 
271
      if (opt_secure_file_priv &&
 
272
          strncmp(opt_secure_file_priv, name, strlen(opt_secure_file_priv)))
 
273
      {
 
274
        /* Read only allowed from within dir specified by secure_file_priv */
 
275
        my_error(ER_OPTION_PREVENTS_STATEMENT, MYF(0), "--secure-file-priv");
 
276
        return(true);
 
277
      }
 
278
 
 
279
      struct stat stat_info;
 
280
      if (stat(name,&stat_info))
 
281
      {
 
282
        my_error(ER_FILE_NOT_FOUND, MYF(0), name, errno);
 
283
        return(true);
 
284
      }
 
285
 
 
286
      // if we are not in slave thread, the cursor must be:
 
287
      if (!((stat_info.st_mode & S_IROTH) == S_IROTH &&  // readable by others
 
288
            (stat_info.st_mode & S_IFLNK) != S_IFLNK && // and not a symlink
 
289
            ((stat_info.st_mode & S_IFREG) == S_IFREG ||
 
290
             (stat_info.st_mode & S_IFIFO) == S_IFIFO)))
 
291
      {
 
292
        my_error(ER_TEXTFILE_NOT_READABLE, MYF(0), name);
 
293
        return(true);
 
294
      }
 
295
      if ((stat_info.st_mode & S_IFIFO) == S_IFIFO)
 
296
        is_fifo = 1;
 
297
    }
 
298
    if ((file=internal::my_open(name,O_RDONLY,MYF(MY_WME))) < 0)
 
299
    {
 
300
      my_error(ER_CANT_OPEN_FILE, MYF(0), name, errno);
 
301
      return(true);
 
302
    }
 
303
  }
 
304
 
 
305
  COPY_INFO info;
 
306
  memset(&info, 0, sizeof(info));
 
307
  info.ignore= ignore;
 
308
  info.handle_duplicates=handle_duplicates;
 
309
  info.escape_char=escaped->length() ? (*escaped)[0] : INT_MAX;
 
310
 
 
311
  READ_INFO read_info(file, tot_length,
 
312
                      ex->cs ? ex->cs : plugin::StorageEngine::getSchemaCollation(session->db.c_str()),
 
313
                      *field_term,*ex->line_start, *ex->line_term, *enclosed,
 
314
                      info.escape_char, is_fifo);
 
315
  if (read_info.error)
 
316
  {
 
317
    if  (file >= 0)
 
318
      internal::my_close(file,MYF(0));                  // no files in net reading
 
319
    return(true);                               // Can't allocate buffers
 
320
  }
 
321
 
 
322
  /*
 
323
   * Per the SQL standard, inserting NULL into a NOT NULL
 
324
   * field requires an error to be thrown.
 
325
   *
 
326
   * @NOTE
 
327
   *
 
328
   * NULL check and handling occurs in field_conv.cc
 
329
   */
 
330
  session->count_cuted_fields= CHECK_FIELD_ERROR_FOR_NULL;
 
331
  session->cuted_fields=0L;
 
332
  /* Skip lines if there is a line terminator */
 
333
  if (ex->line_term->length())
 
334
  {
 
335
    /* ex->skip_lines needs to be preserved for logging */
 
336
    while (skip_lines > 0)
 
337
    {
 
338
      skip_lines--;
 
339
      if (read_info.next_line())
 
340
        break;
 
341
    }
 
342
  }
 
343
 
 
344
  if (!(error=test(read_info.error)))
 
345
  {
 
346
 
 
347
    table->next_number_field=table->found_next_number_field;
 
348
    if (ignore ||
 
349
        handle_duplicates == DUP_REPLACE)
 
350
      table->cursor->extra(HA_EXTRA_IGNORE_DUP_KEY);
 
351
    if (handle_duplicates == DUP_REPLACE)
 
352
        table->cursor->extra(HA_EXTRA_WRITE_CAN_REPLACE);
 
353
    table->cursor->ha_start_bulk_insert((ha_rows) 0);
 
354
    table->copy_blobs=1;
 
355
 
 
356
    session->abort_on_warning= true;
 
357
 
 
358
    if (!field_term->length() && !enclosed->length())
 
359
      error= read_fixed_length(session, info, table_list, fields_vars,
 
360
                               set_fields, set_values, read_info,
 
361
                               skip_lines, ignore);
 
362
    else
 
363
      error= read_sep_field(session, info, table_list, fields_vars,
 
364
                            set_fields, set_values, read_info,
 
365
                            *enclosed, skip_lines, ignore);
 
366
    if (table->cursor->ha_end_bulk_insert() && !error)
 
367
    {
 
368
      table->print_error(errno, MYF(0));
 
369
      error= 1;
 
370
    }
 
371
    table->cursor->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
 
372
    table->cursor->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
 
373
    table->next_number_field=0;
 
374
  }
 
375
  if (file >= 0)
 
376
    internal::my_close(file,MYF(0));
 
377
  free_blobs(table);                            /* if pack_blob was used */
 
378
  table->copy_blobs=0;
 
379
  session->count_cuted_fields= CHECK_FIELD_IGNORE;
 
380
  /*
 
381
     simulated killing in the middle of per-row loop
 
382
     must be effective for binlogging
 
383
  */
 
384
  killed_status= (error == 0)? Session::NOT_KILLED : session->killed;
 
385
  if (error)
 
386
  {
 
387
    error= -1;                          // Error on read
 
388
    goto err;
 
389
  }
 
390
  sprintf(name, ER(ER_LOAD_INFO), (uint32_t) info.records, (uint32_t) info.deleted,
 
391
          (uint32_t) (info.records - info.copied), (uint32_t) session->cuted_fields);
 
392
 
 
393
  if (session->transaction.stmt.hasModifiedNonTransData())
 
394
    session->transaction.all.markModifiedNonTransData();
 
395
 
 
396
  /* ok to client sent only after binlog write and engine commit */
 
397
  session->my_ok(info.copied + info.deleted, 0, 0L, name);
 
398
err:
 
399
  assert(transactional_table || !(info.copied || info.deleted) ||
 
400
              session->transaction.stmt.hasModifiedNonTransData());
 
401
  table->cursor->ha_release_auto_increment();
 
402
  table->auto_increment_field_not_null= false;
 
403
  session->abort_on_warning= 0;
 
404
  return(error);
 
405
}
 
406
 
 
407
 
 
408
/****************************************************************************
 
409
** Read of rows of fixed size + optional garage + optonal newline
 
410
****************************************************************************/
 
411
 
 
412
static int
 
413
read_fixed_length(Session *session, COPY_INFO &info, TableList *table_list,
 
414
                  List<Item> &fields_vars, List<Item> &set_fields,
 
415
                  List<Item> &set_values, READ_INFO &read_info,
 
416
                  uint32_t skip_lines, bool ignore_check_option_errors)
 
417
{
 
418
  List_iterator_fast<Item> it(fields_vars);
 
419
  Item_field *sql_field;
 
420
  Table *table= table_list->table;
 
421
  uint64_t id;
 
422
  bool err;
 
423
 
 
424
  id= 0;
 
425
 
 
426
  while (!read_info.read_fixed_length())
 
427
  {
 
428
    if (session->killed)
 
429
    {
 
430
      session->send_kill_message();
 
431
      return(1);
 
432
    }
 
433
    if (skip_lines)
 
434
    {
 
435
      /*
 
436
        We could implement this with a simple seek if:
 
437
        - We are not using DATA INFILE LOCAL
 
438
        - escape character is  ""
 
439
        - line starting prefix is ""
 
440
      */
 
441
      skip_lines--;
 
442
      continue;
 
443
    }
 
444
    it.rewind();
 
445
    unsigned char *pos=read_info.row_start;
 
446
#ifdef HAVE_purify
 
447
    read_info.row_end[0]=0;
 
448
#endif
 
449
 
 
450
    table->restoreRecordAsDefault();
 
451
    /*
 
452
      There is no variables in fields_vars list in this format so
 
453
      this conversion is safe.
 
454
    */
 
455
    while ((sql_field= (Item_field*) it++))
 
456
    {
 
457
      Field *field= sql_field->field;
 
458
      if (field == table->next_number_field)
 
459
        table->auto_increment_field_not_null= true;
 
460
      /*
 
461
        No fields specified in fields_vars list can be null in this format.
 
462
        Mark field as not null, we should do this for each row because of
 
463
        restore_record...
 
464
      */
 
465
      field->set_notnull();
 
466
 
 
467
      if (pos == read_info.row_end)
 
468
      {
 
469
        session->cuted_fields++;                        /* Not enough fields */
 
470
        push_warning_printf(session, DRIZZLE_ERROR::WARN_LEVEL_WARN,
 
471
                            ER_WARN_TOO_FEW_RECORDS,
 
472
                            ER(ER_WARN_TOO_FEW_RECORDS), session->row_count);
 
473
        if (!field->maybe_null() && field->type() == DRIZZLE_TYPE_TIMESTAMP)
 
474
            ((Field_timestamp*) field)->set_time();
 
475
      }
 
476
      else
 
477
      {
 
478
        uint32_t length;
 
479
        unsigned char save_chr;
 
480
        if ((length=(uint32_t) (read_info.row_end-pos)) >
 
481
            field->field_length)
 
482
        {
 
483
          length=field->field_length;
 
484
        }
 
485
        save_chr=pos[length];
 
486
        pos[length]='\0'; // Add temp null terminator for store()
 
487
        field->store((char*) pos,length,read_info.read_charset);
 
488
        pos[length]=save_chr;
 
489
        if ((pos+=length) > read_info.row_end)
 
490
          pos= read_info.row_end;       /* Fills rest with space */
 
491
      }
 
492
    }
 
493
    if (pos != read_info.row_end)
 
494
    {
 
495
      session->cuted_fields++;                  /* To long row */
 
496
      push_warning_printf(session, DRIZZLE_ERROR::WARN_LEVEL_WARN,
 
497
                          ER_WARN_TOO_MANY_RECORDS,
 
498
                          ER(ER_WARN_TOO_MANY_RECORDS), session->row_count);
 
499
    }
 
500
 
 
501
    if (session->killed ||
 
502
        fill_record(session, set_fields, set_values,
 
503
                    ignore_check_option_errors))
 
504
      return(1);
 
505
 
 
506
    err= write_record(session, table, &info);
 
507
    table->auto_increment_field_not_null= false;
 
508
    if (err)
 
509
      return(1);
 
510
 
 
511
    /*
 
512
      We don't need to reset auto-increment field since we are restoring
 
513
      its default value at the beginning of each loop iteration.
 
514
    */
 
515
    if (read_info.next_line())                  // Skip to next line
 
516
      break;
 
517
    if (read_info.line_cuted)
 
518
    {
 
519
      session->cuted_fields++;                  /* To long row */
 
520
      push_warning_printf(session, DRIZZLE_ERROR::WARN_LEVEL_WARN,
 
521
                          ER_WARN_TOO_MANY_RECORDS,
 
522
                          ER(ER_WARN_TOO_MANY_RECORDS), session->row_count);
 
523
    }
 
524
    session->row_count++;
 
525
  }
 
526
  return(test(read_info.error));
 
527
}
 
528
 
 
529
 
 
530
 
 
531
static int
 
532
read_sep_field(Session *session, COPY_INFO &info, TableList *table_list,
 
533
               List<Item> &fields_vars, List<Item> &set_fields,
 
534
               List<Item> &set_values, READ_INFO &read_info,
 
535
               String &enclosed, uint32_t skip_lines,
 
536
               bool ignore_check_option_errors)
 
537
{
 
538
  List_iterator_fast<Item> it(fields_vars);
 
539
  Item *item;
 
540
  Table *table= table_list->table;
 
541
  uint32_t enclosed_length;
 
542
  uint64_t id;
 
543
  bool err;
 
544
 
 
545
  enclosed_length=enclosed.length();
 
546
  id= 0;
 
547
 
 
548
  for (;;it.rewind())
 
549
  {
 
550
    if (session->killed)
 
551
    {
 
552
      session->send_kill_message();
 
553
      return(1);
 
554
    }
 
555
 
 
556
    table->restoreRecordAsDefault();
 
557
 
 
558
    while ((item= it++))
 
559
    {
 
560
      uint32_t length;
 
561
      unsigned char *pos;
 
562
      Item *real_item;
 
563
 
 
564
      if (read_info.read_field())
 
565
        break;
 
566
 
 
567
      /* If this line is to be skipped we don't want to fill field or var */
 
568
      if (skip_lines)
 
569
        continue;
 
570
 
 
571
      pos=read_info.row_start;
 
572
      length=(uint32_t) (read_info.row_end-pos);
 
573
 
 
574
      real_item= item->real_item();
 
575
 
 
576
      if ((!read_info.enclosed && (enclosed_length && length == 4 && !memcmp(pos, STRING_WITH_LEN("NULL")))) ||
 
577
          (length == 1 && read_info.found_null))
 
578
      {
 
579
 
 
580
        if (real_item->type() == Item::FIELD_ITEM)
 
581
        {
 
582
          Field *field= ((Item_field *)real_item)->field;
 
583
          if (field->reset())
 
584
          {
 
585
            my_error(ER_WARN_NULL_TO_NOTNULL, MYF(0), field->field_name,
 
586
                     session->row_count);
 
587
            return(1);
 
588
          }
 
589
          field->set_null();
 
590
          if (!field->maybe_null())
 
591
          {
 
592
            if (field->type() == DRIZZLE_TYPE_TIMESTAMP)
 
593
              ((Field_timestamp*) field)->set_time();
 
594
            else if (field != table->next_number_field)
 
595
              field->set_warning(DRIZZLE_ERROR::WARN_LEVEL_WARN,
 
596
                                 ER_WARN_NULL_TO_NOTNULL, 1);
 
597
          }
 
598
        }
 
599
        else if (item->type() == Item::STRING_ITEM)
 
600
        {
 
601
          ((Item_user_var_as_out_param *)item)->set_null_value(
 
602
                                                  read_info.read_charset);
 
603
        }
 
604
        else
 
605
        {
 
606
          my_error(ER_LOAD_DATA_INVALID_COLUMN, MYF(0), item->full_name());
 
607
          return(1);
 
608
        }
 
609
 
 
610
        continue;
 
611
      }
 
612
 
 
613
      if (real_item->type() == Item::FIELD_ITEM)
 
614
      {
 
615
        Field *field= ((Item_field *)real_item)->field;
 
616
        field->set_notnull();
 
617
        read_info.row_end[0]=0;                 // Safe to change end marker
 
618
        if (field == table->next_number_field)
 
619
          table->auto_increment_field_not_null= true;
 
620
        field->store((char*) pos, length, read_info.read_charset);
 
621
      }
 
622
      else if (item->type() == Item::STRING_ITEM)
 
623
      {
 
624
        ((Item_user_var_as_out_param *)item)->set_value((char*) pos, length,
 
625
                                                        read_info.read_charset);
 
626
      }
 
627
      else
 
628
      {
 
629
        my_error(ER_LOAD_DATA_INVALID_COLUMN, MYF(0), item->full_name());
 
630
        return(1);
 
631
      }
 
632
    }
 
633
    if (read_info.error)
 
634
      break;
 
635
    if (skip_lines)
 
636
    {
 
637
      skip_lines--;
 
638
      continue;
 
639
    }
 
640
    if (item)
 
641
    {
 
642
      /* Have not read any field, thus input cursor is simply ended */
 
643
      if (item == fields_vars.head())
 
644
        break;
 
645
      for (; item ; item= it++)
 
646
      {
 
647
        Item *real_item= item->real_item();
 
648
        if (real_item->type() == Item::FIELD_ITEM)
 
649
        {
 
650
          Field *field= ((Item_field *)real_item)->field;
 
651
          if (field->reset())
 
652
          {
 
653
            my_error(ER_WARN_NULL_TO_NOTNULL, MYF(0),field->field_name,
 
654
                     session->row_count);
 
655
            return(1);
 
656
          }
 
657
          if (!field->maybe_null() && field->type() == DRIZZLE_TYPE_TIMESTAMP)
 
658
              ((Field_timestamp*) field)->set_time();
 
659
          /*
 
660
            QQ: We probably should not throw warning for each field.
 
661
            But how about intention to always have the same number
 
662
            of warnings in Session::cuted_fields (and get rid of cuted_fields
 
663
            in the end ?)
 
664
          */
 
665
          session->cuted_fields++;
 
666
          push_warning_printf(session, DRIZZLE_ERROR::WARN_LEVEL_WARN,
 
667
                              ER_WARN_TOO_FEW_RECORDS,
 
668
                              ER(ER_WARN_TOO_FEW_RECORDS), session->row_count);
 
669
        }
 
670
        else if (item->type() == Item::STRING_ITEM)
 
671
        {
 
672
          ((Item_user_var_as_out_param *)item)->set_null_value(
 
673
                                                  read_info.read_charset);
 
674
        }
 
675
        else
 
676
        {
 
677
          my_error(ER_LOAD_DATA_INVALID_COLUMN, MYF(0), item->full_name());
 
678
          return(1);
 
679
        }
 
680
      }
 
681
    }
 
682
 
 
683
    if (session->killed ||
 
684
        fill_record(session, set_fields, set_values,
 
685
                    ignore_check_option_errors))
 
686
      return(1);
 
687
 
 
688
    err= write_record(session, table, &info);
 
689
    table->auto_increment_field_not_null= false;
 
690
    if (err)
 
691
      return(1);
 
692
    /*
 
693
      We don't need to reset auto-increment field since we are restoring
 
694
      its default value at the beginning of each loop iteration.
 
695
    */
 
696
    if (read_info.next_line())                  // Skip to next line
 
697
      break;
 
698
    if (read_info.line_cuted)
 
699
    {
 
700
      session->cuted_fields++;                  /* To long row */
 
701
      push_warning_printf(session, DRIZZLE_ERROR::WARN_LEVEL_WARN,
 
702
                          ER_WARN_TOO_MANY_RECORDS, ER(ER_WARN_TOO_MANY_RECORDS),
 
703
                          session->row_count);
 
704
      if (session->killed)
 
705
        return(1);
 
706
    }
 
707
    session->row_count++;
 
708
  }
 
709
  return(test(read_info.error));
 
710
}
 
711
 
 
712
 
 
713
/* Unescape all escape characters, mark \N as null */
 
714
 
 
715
char
 
716
READ_INFO::unescape(char chr)
 
717
{
 
718
  /* keep this switch synchornous with the ESCAPE_CHARS macro */
 
719
  switch(chr) {
 
720
  case 'n': return '\n';
 
721
  case 't': return '\t';
 
722
  case 'r': return '\r';
 
723
  case 'b': return '\b';
 
724
  case '0': return 0;                           // Ascii null
 
725
  case 'Z': return '\032';                      // Win32 end of cursor
 
726
  case 'N': found_null=1;
 
727
 
 
728
    /* fall through */
 
729
  default:  return chr;
 
730
  }
 
731
}
 
732
 
 
733
 
 
734
/*
 
735
  Read a line using buffering
 
736
  If last line is empty (in line mode) then it isn't outputed
 
737
*/
 
738
 
 
739
 
 
740
READ_INFO::READ_INFO(int file_par, size_t tot_length,
 
741
                     const CHARSET_INFO * const cs,
 
742
                     String &field_term, String &line_start, String &line_term,
 
743
                     String &enclosed_par, int escape, bool is_fifo)
 
744
  :cursor(file_par),escape_char(escape)
 
745
{
 
746
  read_charset= cs;
 
747
  field_term_ptr=(char*) field_term.ptr();
 
748
  field_term_length= field_term.length();
 
749
  line_term_ptr=(char*) line_term.ptr();
 
750
  line_term_length= line_term.length();
 
751
  if (line_start.length() == 0)
 
752
  {
 
753
    line_start_ptr=0;
 
754
    start_of_line= 0;
 
755
  }
 
756
  else
 
757
  {
 
758
    line_start_ptr=(char*) line_start.ptr();
 
759
    line_start_end=line_start_ptr+line_start.length();
 
760
    start_of_line= 1;
 
761
  }
 
762
  /* If field_terminator == line_terminator, don't use line_terminator */
 
763
  if (field_term_length == line_term_length &&
 
764
      !memcmp(field_term_ptr,line_term_ptr,field_term_length))
 
765
  {
 
766
    line_term_length=0;
 
767
    line_term_ptr=(char*) "";
 
768
  }
 
769
  enclosed_char= (enclosed_length=enclosed_par.length()) ?
 
770
    (unsigned char) enclosed_par[0] : INT_MAX;
 
771
  field_term_char= field_term_length ? (unsigned char) field_term_ptr[0] : INT_MAX;
 
772
  line_term_char= line_term_length ? (unsigned char) line_term_ptr[0] : INT_MAX;
 
773
  error=eof=found_end_of_line=found_null=line_cuted=0;
 
774
  buff_length=tot_length;
 
775
 
 
776
 
 
777
  /* Set of a stack for unget if long terminators */
 
778
  uint32_t length= max(field_term_length,line_term_length)+1;
 
779
  set_if_bigger(length,line_start.length());
 
780
  stack= stack_pos= (int*) memory::sql_alloc(sizeof(int)*length);
 
781
 
 
782
  if (!(buffer=(unsigned char*) calloc(1, buff_length+1)))
 
783
    error=1;
 
784
  else
 
785
  {
 
786
    end_of_buff=buffer+buff_length;
 
787
    if (init_io_cache(&cache,(false) ? -1 : cursor, 0,
 
788
                      (false) ? internal::READ_NET :
 
789
                      (is_fifo ? internal::READ_FIFO : internal::READ_CACHE),0L,1,
 
790
                      MYF(MY_WME)))
 
791
    {
 
792
      free((unsigned char*) buffer);
 
793
      error=1;
 
794
    }
 
795
    else
 
796
    {
 
797
      /*
 
798
        init_io_cache() will not initialize read_function member
 
799
        if the cache is READ_NET. So we work around the problem with a
 
800
        manual assignment
 
801
      */
 
802
      need_end_io_cache = 1;
 
803
    }
 
804
  }
 
805
}
 
806
 
 
807
 
 
808
READ_INFO::~READ_INFO()
 
809
{
 
810
  if (!error)
 
811
  {
 
812
    if (need_end_io_cache)
 
813
      internal::end_io_cache(&cache);
 
814
    free(buffer);
 
815
    error=1;
 
816
  }
 
817
}
 
818
 
 
819
 
 
820
#define GET (stack_pos != stack ? *--stack_pos : my_b_get(&cache))
 
821
#define PUSH(A) *(stack_pos++)=(A)
 
822
 
 
823
 
 
824
inline int READ_INFO::terminator(char *ptr,uint32_t length)
 
825
{
 
826
  int chr=0;                                    // Keep gcc happy
 
827
  uint32_t i;
 
828
  for (i=1 ; i < length ; i++)
 
829
  {
 
830
    if ((chr=GET) != *++ptr)
 
831
    {
 
832
      break;
 
833
    }
 
834
  }
 
835
  if (i == length)
 
836
    return 1;
 
837
  PUSH(chr);
 
838
  while (i-- > 1)
 
839
    PUSH((unsigned char) *--ptr);
 
840
  return 0;
 
841
}
 
842
 
 
843
 
 
844
int READ_INFO::read_field()
 
845
{
 
846
  int chr,found_enclosed_char;
 
847
  unsigned char *to,*new_buffer;
 
848
 
 
849
  found_null=0;
 
850
  if (found_end_of_line)
 
851
    return 1;                                   // One have to call next_line
 
852
 
 
853
  /* Skip until we find 'line_start' */
 
854
 
 
855
  if (start_of_line)
 
856
  {                                             // Skip until line_start
 
857
    start_of_line=0;
 
858
    if (find_start_of_fields())
 
859
      return 1;
 
860
  }
 
861
  if ((chr=GET) == my_b_EOF)
 
862
  {
 
863
    found_end_of_line=eof=1;
 
864
    return 1;
 
865
  }
 
866
  to=buffer;
 
867
  if (chr == enclosed_char)
 
868
  {
 
869
    found_enclosed_char=enclosed_char;
 
870
    *to++=(unsigned char) chr;                          // If error
 
871
  }
 
872
  else
 
873
  {
 
874
    found_enclosed_char= INT_MAX;
 
875
    PUSH(chr);
 
876
  }
 
877
 
 
878
  for (;;)
 
879
  {
 
880
    while ( to < end_of_buff)
 
881
    {
 
882
      chr = GET;
 
883
      if ((my_mbcharlen(read_charset, chr) > 1) &&
 
884
          to+my_mbcharlen(read_charset, chr) <= end_of_buff)
 
885
      {
 
886
        unsigned char* p = (unsigned char*)to;
 
887
        *to++ = chr;
 
888
        int ml = my_mbcharlen(read_charset, chr);
 
889
        int i;
 
890
        for (i=1; i<ml; i++) {
 
891
          chr = GET;
 
892
          if (chr == my_b_EOF)
 
893
            goto found_eof;
 
894
          *to++ = chr;
 
895
        }
 
896
        if (my_ismbchar(read_charset,
 
897
              (const char *)p,
 
898
              (const char *)to))
 
899
          continue;
 
900
        for (i=0; i<ml; i++)
 
901
          PUSH((unsigned char) *--to);
 
902
        chr = GET;
 
903
      }
 
904
      if (chr == my_b_EOF)
 
905
        goto found_eof;
 
906
      if (chr == escape_char)
 
907
      {
 
908
        if ((chr=GET) == my_b_EOF)
 
909
        {
 
910
          *to++= (unsigned char) escape_char;
 
911
          goto found_eof;
 
912
        }
 
913
        /*
 
914
          When escape_char == enclosed_char, we treat it like we do for
 
915
          handling quotes in SQL parsing -- you can double-up the
 
916
          escape_char to include it literally, but it doesn't do escapes
 
917
          like \n. This allows: LOAD DATA ... ENCLOSED BY '"' ESCAPED BY '"'
 
918
          with data like: "fie""ld1", "field2"
 
919
         */
 
920
        if (escape_char != enclosed_char || chr == escape_char)
 
921
        {
 
922
          *to++ = (unsigned char) unescape((char) chr);
 
923
          continue;
 
924
        }
 
925
        PUSH(chr);
 
926
        chr= escape_char;
 
927
      }
 
928
#ifdef ALLOW_LINESEPARATOR_IN_STRINGS
 
929
      if (chr == line_term_char)
 
930
#else
 
931
        if (chr == line_term_char && found_enclosed_char == INT_MAX)
 
932
#endif
 
933
        {
 
934
          if (terminator(line_term_ptr,line_term_length))
 
935
          {                                     // Maybe unexpected linefeed
 
936
            enclosed=0;
 
937
            found_end_of_line=1;
 
938
            row_start=buffer;
 
939
            row_end=  to;
 
940
            return 0;
 
941
          }
 
942
        }
 
943
      if (chr == found_enclosed_char)
 
944
      {
 
945
        if ((chr=GET) == found_enclosed_char)
 
946
        {                                       // Remove dupplicated
 
947
          *to++ = (unsigned char) chr;
 
948
          continue;
 
949
        }
 
950
        // End of enclosed field if followed by field_term or line_term
 
951
        if (chr == my_b_EOF ||
 
952
            (chr == line_term_char && terminator(line_term_ptr, line_term_length)))
 
953
        {                                       // Maybe unexpected linefeed
 
954
          enclosed=1;
 
955
          found_end_of_line=1;
 
956
          row_start=buffer+1;
 
957
          row_end=  to;
 
958
          return 0;
 
959
        }
 
960
        if (chr == field_term_char &&
 
961
            terminator(field_term_ptr,field_term_length))
 
962
        {
 
963
          enclosed=1;
 
964
          row_start=buffer+1;
 
965
          row_end=  to;
 
966
          return 0;
 
967
        }
 
968
        /*
 
969
           The string didn't terminate yet.
 
970
           Store back next character for the loop
 
971
         */
 
972
        PUSH(chr);
 
973
        /* copy the found term character to 'to' */
 
974
        chr= found_enclosed_char;
 
975
      }
 
976
      else if (chr == field_term_char && found_enclosed_char == INT_MAX)
 
977
      {
 
978
        if (terminator(field_term_ptr,field_term_length))
 
979
        {
 
980
          enclosed=0;
 
981
          row_start=buffer;
 
982
          row_end=  to;
 
983
          return 0;
 
984
        }
 
985
      }
 
986
      *to++ = (unsigned char) chr;
 
987
    }
 
988
    /*
 
989
     ** We come here if buffer is too small. Enlarge it and continue
 
990
     */
 
991
    if (!(new_buffer=(unsigned char*) realloc(buffer, buff_length+1+IO_SIZE)))
 
992
      return (error=1);
 
993
    to=new_buffer + (to-buffer);
 
994
    buffer=new_buffer;
 
995
    buff_length+=IO_SIZE;
 
996
    end_of_buff=buffer+buff_length;
 
997
  }
 
998
 
 
999
found_eof:
 
1000
  enclosed=0;
 
1001
  found_end_of_line=eof=1;
 
1002
  row_start=buffer;
 
1003
  row_end=to;
 
1004
  return 0;
 
1005
}
 
1006
 
 
1007
/*
 
1008
  Read a row with fixed length.
 
1009
 
 
1010
  NOTES
 
1011
    The row may not be fixed size on disk if there are escape
 
1012
    characters in the cursor.
 
1013
 
 
1014
  IMPLEMENTATION NOTE
 
1015
    One can't use fixed length with multi-byte charset **
 
1016
 
 
1017
  RETURN
 
1018
    0  ok
 
1019
    1  error
 
1020
*/
 
1021
 
 
1022
int READ_INFO::read_fixed_length()
 
1023
{
 
1024
  int chr;
 
1025
  unsigned char *to;
 
1026
  if (found_end_of_line)
 
1027
    return 1;                                   // One have to call next_line
 
1028
 
 
1029
  if (start_of_line)
 
1030
  {                                             // Skip until line_start
 
1031
    start_of_line=0;
 
1032
    if (find_start_of_fields())
 
1033
      return 1;
 
1034
  }
 
1035
 
 
1036
  to=row_start=buffer;
 
1037
  while (to < end_of_buff)
 
1038
  {
 
1039
    if ((chr=GET) == my_b_EOF)
 
1040
      goto found_eof;
 
1041
    if (chr == escape_char)
 
1042
    {
 
1043
      if ((chr=GET) == my_b_EOF)
 
1044
      {
 
1045
        *to++= (unsigned char) escape_char;
 
1046
        goto found_eof;
 
1047
      }
 
1048
      *to++ =(unsigned char) unescape((char) chr);
 
1049
      continue;
 
1050
    }
 
1051
    if (chr == line_term_char)
 
1052
    {
 
1053
      if (terminator(line_term_ptr,line_term_length))
 
1054
      {                                         // Maybe unexpected linefeed
 
1055
        found_end_of_line=1;
 
1056
        row_end=  to;
 
1057
        return 0;
 
1058
      }
 
1059
    }
 
1060
    *to++ = (unsigned char) chr;
 
1061
  }
 
1062
  row_end=to;                                   // Found full line
 
1063
  return 0;
 
1064
 
 
1065
found_eof:
 
1066
  found_end_of_line=eof=1;
 
1067
  row_start=buffer;
 
1068
  row_end=to;
 
1069
  return to == buffer ? 1 : 0;
 
1070
}
 
1071
 
 
1072
 
 
1073
int READ_INFO::next_line()
 
1074
{
 
1075
  line_cuted=0;
 
1076
  start_of_line= line_start_ptr != 0;
 
1077
  if (found_end_of_line || eof)
 
1078
  {
 
1079
    found_end_of_line=0;
 
1080
    return eof;
 
1081
  }
 
1082
  found_end_of_line=0;
 
1083
  if (!line_term_length)
 
1084
    return 0;                                   // No lines
 
1085
  for (;;)
 
1086
  {
 
1087
    int chr = GET;
 
1088
    if (my_mbcharlen(read_charset, chr) > 1)
 
1089
    {
 
1090
      for (uint32_t i=1;
 
1091
          chr != my_b_EOF && i<my_mbcharlen(read_charset, chr);
 
1092
          i++)
 
1093
        chr = GET;
 
1094
      if (chr == escape_char)
 
1095
        continue;
 
1096
    }
 
1097
    if (chr == my_b_EOF)
 
1098
    {
 
1099
      eof=1;
 
1100
      return 1;
 
1101
    }
 
1102
    if (chr == escape_char)
 
1103
    {
 
1104
      line_cuted=1;
 
1105
      if (GET == my_b_EOF)
 
1106
        return 1;
 
1107
      continue;
 
1108
    }
 
1109
    if (chr == line_term_char && terminator(line_term_ptr,line_term_length))
 
1110
      return 0;
 
1111
    line_cuted=1;
 
1112
  }
 
1113
}
 
1114
 
 
1115
 
 
1116
bool READ_INFO::find_start_of_fields()
 
1117
{
 
1118
  int chr;
 
1119
 try_again:
 
1120
  do
 
1121
  {
 
1122
    if ((chr=GET) == my_b_EOF)
 
1123
    {
 
1124
      found_end_of_line=eof=1;
 
1125
      return 1;
 
1126
    }
 
1127
  } while ((char) chr != line_start_ptr[0]);
 
1128
  for (char *ptr=line_start_ptr+1 ; ptr != line_start_end ; ptr++)
 
1129
  {
 
1130
    chr=GET;                                    // Eof will be checked later
 
1131
    if ((char) chr != *ptr)
 
1132
    {                                           // Can't be line_start
 
1133
      PUSH(chr);
 
1134
      while (--ptr != line_start_ptr)
 
1135
      {                                         // Restart with next char
 
1136
        PUSH((unsigned char) *ptr);
 
1137
      }
 
1138
      goto try_again;
 
1139
    }
 
1140
  }
 
1141
  return 0;
 
1142
}
 
1143
 
 
1144
 
 
1145
} /* namespace drizzled */