~mdcallag/+junk/5.1-map

« back to all changes in this revision

Viewing changes to sql/ha_ndbcluster.cc

  • Committer: msvensson at pilot
  • Date: 2007-04-24 09:11:45 UTC
  • mfrom: (2469.1.106)
  • Revision ID: sp1r-msvensson@pilot.blaudden-20070424091145-10463
Merge pilot.blaudden:/home/msvensson/mysql/my51-m-mysql_upgrade
into  pilot.blaudden:/home/msvensson/mysql/mysql-5.1-maint

Show diffs side-by-side

added added

removed removed

Lines of Context:
24
24
#endif
25
25
 
26
26
#include "mysql_priv.h"
 
27
#include "rpl_mi.h"
27
28
 
28
29
#include <my_dir.h>
29
30
#ifdef WITH_NDBCLUSTER_STORAGE_ENGINE
133
134
}
134
135
 
135
136
static int ndbcluster_inited= 0;
136
 
static int ndbcluster_terminating= 0;
 
137
int ndbcluster_terminating= 0;
137
138
 
138
139
static Ndb* g_ndb= NULL;
139
140
Ndb_cluster_connection* g_ndb_cluster_connection= NULL;
349
350
  count= 0;
350
351
  all= NULL;
351
352
  stmt= NULL;
352
 
  error= 0;
 
353
  m_error= FALSE;
353
354
  query_state&= NDB_QUERY_NORMAL;
354
355
  options= 0;
355
356
  (void) hash_init(&open_tables, &my_charset_bin, 5, 0, 0,
384
385
Thd_ndb::init_open_tables()
385
386
{
386
387
  count= 0;
387
 
  error= 0;
 
388
  m_error= FALSE;
388
389
  my_hash_reset(&open_tables);
389
390
}
390
391
 
460
461
  }
461
462
 
462
463
  THD *thd= current_thd;
463
 
  if (get_thd_ndb(thd)->error)
 
464
  if (get_thd_ndb(thd)->m_error)
464
465
    local_info->no_uncommitted_rows_count= 0;
465
466
 
466
467
  DBUG_RETURN(retval + local_info->no_uncommitted_rows_count);
480
481
  {
481
482
    Ndb *ndb= get_ndb();
482
483
    struct Ndb_statistics stat;
483
 
    ndb->setDatabaseName(m_dbname);
 
484
    if (ndb->setDatabaseName(m_dbname))
 
485
    {
 
486
      return my_errno= HA_ERR_OUT_OF_MEM;
 
487
    }
484
488
    result= ndb_get_table_statistics(this, TRUE, ndb, m_table, &stat);
485
489
    if (result == 0)
486
490
    {
491
495
  }
492
496
  {
493
497
    THD *thd= current_thd;
494
 
    if (get_thd_ndb(thd)->error)
 
498
    if (get_thd_ndb(thd)->m_error)
495
499
      local_info->no_uncommitted_rows_count= 0;
496
500
  }
497
501
  if (result == 0)
504
508
  if (m_ha_not_exact_count)
505
509
    return;
506
510
  DBUG_ENTER("ha_ndbcluster::no_uncommitted_rows_execute_failure");
507
 
  get_thd_ndb(current_thd)->error= 1;
 
511
  get_thd_ndb(current_thd)->m_error= TRUE;
508
512
  DBUG_VOID_RETURN;
509
513
}
510
514
 
528
532
  DBUG_ENTER("ha_ndbcluster::no_uncommitted_rows_reset");
529
533
  Thd_ndb *thd_ndb= get_thd_ndb(thd);
530
534
  thd_ndb->count++;
531
 
  thd_ndb->error= 0;
 
535
  thd_ndb->m_error= FALSE;
532
536
  DBUG_VOID_RETURN;
533
537
}
534
538
 
733
737
        DBUG_PRINT("info", ("bit field"));
734
738
        DBUG_DUMP("value", (char*)&bits, pack_len);
735
739
#ifdef WORDS_BIGENDIAN
736
 
        if (pack_len < 5)
737
 
        {
738
 
          DBUG_RETURN(ndb_op->setValue(fieldnr, ((char*)&bits)+4) != 0);
739
 
        }
 
740
        /* store lsw first */
 
741
        bits = ((bits >> 32) & 0x00000000FFFFFFFF)
 
742
          |    ((bits << 32) & 0xFFFFFFFF00000000);
740
743
#endif
741
744
        DBUG_RETURN(ndb_op->setValue(fieldnr, (char*)&bits) != 0);
742
745
      }
875
878
      DBUG_PRINT("info", ("allocate blobs buffer size %u", offset));
876
879
      buffer= my_malloc(offset, MYF(MY_WME));
877
880
      if (buffer == NULL)
 
881
      {
 
882
        sql_print_error("ha_ndbcluster::get_ndb_blobs_value: "
 
883
                        "my_malloc(%u) failed", offset);
878
884
        DBUG_RETURN(-1);
 
885
      }
879
886
      buffer_size= offset;
880
887
    }
881
888
  }
1008
1015
  DBUG_ASSERT(m_table == NULL);
1009
1016
  DBUG_ASSERT(m_table_info == NULL);
1010
1017
 
1011
 
  const void *data, *pack_data;
 
1018
  const void *data= NULL, *pack_data= NULL;
1012
1019
  uint length, pack_length;
1013
1020
 
1014
1021
  /*
1067
1074
  if (data.unique_index_attrid_map)
1068
1075
    my_free((char*)data.unique_index_attrid_map, MYF(0));
1069
1076
  data.unique_index_attrid_map= (uchar*)my_malloc(sz,MYF(MY_WME));
 
1077
  if (data.unique_index_attrid_map == 0)
 
1078
  {
 
1079
    sql_print_error("fix_unique_index_attr_order: my_malloc(%u) failure",
 
1080
                    (unsigned int)sz);
 
1081
    DBUG_RETURN(HA_ERR_OUT_OF_MEM);
 
1082
  }
1070
1083
 
1071
1084
  KEY_PART_INFO* key_part= key_info->key_part;
1072
1085
  KEY_PART_INFO* end= key_part+key_info->key_parts;
1104
1117
  KEY* key_info= tab->key_info;
1105
1118
  const char **key_name= tab->s->keynames.type_names;
1106
1119
  DBUG_ENTER("ha_ndbcluster::create_indexes");
1107
 
  
 
1120
 
1108
1121
  for (i= 0; i < tab->s->keys; i++, key_info++, key_name++)
1109
1122
  {
1110
1123
    index_name= *key_name;
2713
2726
    op->setValue(no_fields, part_func_value);
2714
2727
  }
2715
2728
 
 
2729
  if (unlikely(m_slow_path))
 
2730
  {
 
2731
    if (!(thd->options & OPTION_BIN_LOG))
 
2732
      op->setAnyValue(NDB_ANYVALUE_FOR_NOLOGGING);
 
2733
    else if (thd->slave_thread)
 
2734
      op->setAnyValue(thd->server_id);
 
2735
  }
2716
2736
  m_rows_changed++;
2717
2737
 
2718
2738
  /*
2993
3013
      no_fields++;
2994
3014
    op->setValue(no_fields, part_func_value);
2995
3015
  }
2996
 
  // Execute update operation
2997
 
  if (!cursor && execute_no_commit(this,trans,FALSE) != 0) {
 
3016
 
 
3017
  if (unlikely(m_slow_path))
 
3018
  {
 
3019
    if (!(thd->options & OPTION_BIN_LOG))
 
3020
      op->setAnyValue(NDB_ANYVALUE_FOR_NOLOGGING);
 
3021
    else if (thd->slave_thread)
 
3022
      op->setAnyValue(thd->server_id);
 
3023
  }
 
3024
  /*
 
3025
    Execute update operation if we are not doing a scan for update
 
3026
    and there exist UPDATE AFTER triggers
 
3027
  */
 
3028
 
 
3029
  if ((!cursor || m_update_cannot_batch) && 
 
3030
      execute_no_commit(this,trans,false) != 0) {
2998
3031
    no_uncommitted_rows_execute_failure();
2999
3032
    DBUG_RETURN(ndb_err(trans));
3000
3033
  }
3048
3081
 
3049
3082
    no_uncommitted_rows_update(-1);
3050
3083
 
3051
 
    if (!m_primary_key_update)
 
3084
    if (unlikely(m_slow_path))
 
3085
    {
 
3086
      if (!(thd->options & OPTION_BIN_LOG))
 
3087
        ((NdbOperation *)trans->getLastDefinedOperation())->
 
3088
          setAnyValue(NDB_ANYVALUE_FOR_NOLOGGING);
 
3089
      else if (thd->slave_thread)
 
3090
        ((NdbOperation *)trans->getLastDefinedOperation())->
 
3091
          setAnyValue(thd->server_id);
 
3092
    }
 
3093
    if (!(m_primary_key_update || m_delete_cannot_batch))
3052
3094
      // If deleting from cursor, NoCommit will be handled in next_result
3053
3095
      DBUG_RETURN(0);
3054
3096
  }
3077
3119
      if ((error= set_primary_key_from_record(op, record)))
3078
3120
        DBUG_RETURN(error);
3079
3121
    }
 
3122
 
 
3123
    if (unlikely(m_slow_path))
 
3124
    {
 
3125
      if (!(thd->options & OPTION_BIN_LOG))
 
3126
        op->setAnyValue(NDB_ANYVALUE_FOR_NOLOGGING);
 
3127
      else if (thd->slave_thread)
 
3128
        op->setAnyValue(thd->server_id);
 
3129
    }
3080
3130
  }
3081
3131
 
3082
3132
  // Execute delete operation
3109
3159
  my_bitmap_map *old_map= dbug_tmp_use_all_columns(table, table->write_set);
3110
3160
  DBUG_ENTER("ndb_unpack_record");
3111
3161
 
3112
 
  // Set null flag(s)
3113
 
  bzero(buf, table->s->null_bytes);
 
3162
  /*
 
3163
    Set the filler bits of the null byte, since they are
 
3164
    not touched in the code below.
 
3165
    
 
3166
    The filler bits are the MSBs in the last null byte
 
3167
  */ 
 
3168
  if (table->s->null_bytes > 0)
 
3169
       buf[table->s->null_bytes - 1]|= 256U - (1U <<
 
3170
                                               table->s->last_null_bit_pos);
 
3171
  /*
 
3172
    Set null flag(s)
 
3173
  */
3114
3174
  for ( ; field;
3115
3175
       p_field++, value++, field= *p_field)
3116
3176
  {
 
3177
    field->set_notnull(row_offset);       
3117
3178
    if ((*value).ptr)
3118
3179
    {
3119
3180
      if (!(field->flags & BLOB_FLAG))
3123
3184
        {
3124
3185
          if (is_null > 0)
3125
3186
          {
3126
 
            DBUG_PRINT("info",("[%u] NULL",
 
3187
            DBUG_PRINT("info",("[%u] NULL",
3127
3188
                               (*value).rec->getColumn()->getColumnNo()));
3128
3189
            field->set_null(row_offset);
3129
3190
          }
3155
3216
          else
3156
3217
          {
3157
3218
            DBUG_PRINT("info", ("bit field H'%.8X%.8X",
3158
 
                                *(Uint32*) (*value).rec->aRef(),
3159
 
                                *((Uint32*) (*value).rec->aRef()+1)));
3160
 
            field_bit->Field_bit::store((longlong) (*value).rec->u_64_value(), 
 
3219
                                *(Uint32 *)(*value).rec->aRef(),
 
3220
                                *((Uint32 *)(*value).rec->aRef()+1)));
 
3221
#ifdef WORDS_BIGENDIAN
 
3222
            /* lsw is stored first */
 
3223
            Uint32 *buf= (Uint32 *)(*value).rec->aRef();
 
3224
            field_bit->Field_bit::store((((longlong)*buf)
 
3225
                                         & 0x000000000FFFFFFFF)
 
3226
                                        |
 
3227
                                        ((((longlong)*(buf+1)) << 32)
 
3228
                                         & 0xFFFFFFFF00000000),
3161
3229
                                        TRUE);
 
3230
#else
 
3231
            field_bit->Field_bit::store((longlong)
 
3232
                                        (*value).rec->u_64_value(), TRUE);
 
3233
#endif
3162
3234
          }
3163
3235
          /*
3164
3236
            Move back internal field pointer to point to original
3370
3442
}
3371
3443
 
3372
3444
 
3373
 
int ha_ndbcluster::index_read_idx(byte *buf, uint index_no, 
3374
 
                              const byte *key, uint key_len, 
3375
 
                              enum ha_rkey_function find_flag)
3376
 
{
3377
 
  statistic_increment(current_thd->status_var.ha_read_key_count, &LOCK_status);
3378
 
  DBUG_ENTER("ha_ndbcluster::index_read_idx");
3379
 
  DBUG_PRINT("enter", ("index_no: %u, key_len: %u", index_no, key_len));  
3380
 
  close_scan();
3381
 
  index_init(index_no, 0);  
3382
 
  DBUG_RETURN(index_read(buf, key, key_len, find_flag));
3383
 
}
3384
 
 
3385
 
 
3386
3445
int ha_ndbcluster::index_next(byte *buf)
3387
3446
{
3388
3447
  DBUG_ENTER("ha_ndbcluster::index_next");
3549
3608
 
3550
3609
  m_multi_cursor= 0;
3551
3610
  if (!m_active_cursor && !m_multi_cursor)
3552
 
    DBUG_RETURN(1);
 
3611
    DBUG_RETURN(0);
3553
3612
 
3554
3613
  NdbScanOperation *cursor= m_active_cursor ? m_active_cursor : m_multi_cursor;
3555
 
  
 
3614
 
3556
3615
  if (m_lock_tuple)
3557
3616
  {
3558
3617
    /*
3783
3842
      Ndb *ndb= get_ndb();
3784
3843
      ndb->setDatabaseName(m_dbname);
3785
3844
      struct Ndb_statistics stat;
3786
 
      ndb->setDatabaseName(m_dbname);
 
3845
      if (ndb->setDatabaseName(m_dbname))
 
3846
      {
 
3847
        DBUG_RETURN(my_errno= HA_ERR_OUT_OF_MEM);
 
3848
      }
3787
3849
      if (current_thd->variables.ndb_use_exact_count &&
3788
3850
          (result= ndb_get_table_statistics(this, TRUE, ndb, m_table, &stat))
3789
3851
          == 0)
3876
3938
    break;
3877
3939
  case HA_EXTRA_WRITE_CAN_REPLACE:
3878
3940
    DBUG_PRINT("info", ("HA_EXTRA_WRITE_CAN_REPLACE"));
3879
 
    if (!m_has_unique_index)
 
3941
    if (!m_has_unique_index ||
 
3942
        current_thd->slave_thread) /* always set if slave, quick fix for bug 27378 */
3880
3943
    {
3881
3944
      DBUG_PRINT("info", ("Turning ON use of write instead of insert"));
3882
3945
      m_use_write= TRUE;
3887
3950
    DBUG_PRINT("info", ("Turning OFF use of write instead of insert"));
3888
3951
    m_use_write= FALSE;
3889
3952
    break;
 
3953
  case HA_EXTRA_DELETE_CANNOT_BATCH:
 
3954
    DBUG_PRINT("info", ("HA_EXTRA_DELETE_CANNOT_BATCH"));
 
3955
    m_delete_cannot_batch= TRUE;
 
3956
    break;
 
3957
  case HA_EXTRA_UPDATE_CANNOT_BATCH:
 
3958
    DBUG_PRINT("info", ("HA_EXTRA_UPDATE_CANNOT_BATCH"));
 
3959
    m_update_cannot_batch= TRUE;
 
3960
    break;
3890
3961
  default:
3891
3962
    break;
3892
3963
  }
3906
3977
  */
3907
3978
  if (m_part_info)
3908
3979
    bitmap_set_all(&m_part_info->used_partitions);
 
3980
 
 
3981
  /* reset flags set by extra calls */
 
3982
  m_ignore_dup_key= FALSE;
 
3983
  m_use_write= FALSE;
 
3984
  m_ignore_no_key= FALSE;
 
3985
  m_delete_cannot_batch= FALSE;
 
3986
  m_update_cannot_batch= FALSE;
 
3987
 
3909
3988
  DBUG_RETURN(0);
3910
3989
}
3911
3990
 
4119
4198
  - refresh list of the indexes for the table if needed (if altered)
4120
4199
 */
4121
4200
 
 
4201
#ifdef HAVE_NDB_BINLOG
 
4202
extern MASTER_INFO *active_mi;
 
4203
static int ndbcluster_update_apply_status(THD *thd, int do_update)
 
4204
{
 
4205
  return 0;
 
4206
 
 
4207
  Thd_ndb *thd_ndb= get_thd_ndb(thd);
 
4208
  Ndb *ndb= thd_ndb->ndb;
 
4209
  NDBDICT *dict= ndb->getDictionary();
 
4210
  const NDBTAB *ndbtab;
 
4211
  NdbTransaction *trans= thd_ndb->all ? thd_ndb->all : thd_ndb->stmt;
 
4212
  ndb->setDatabaseName(NDB_REP_DB);
 
4213
  Ndb_table_guard ndbtab_g(dict, NDB_APPLY_TABLE);
 
4214
  if (!(ndbtab= ndbtab_g.get_table()))
 
4215
  {
 
4216
    return -1;
 
4217
  }
 
4218
  NdbOperation *op= 0;
 
4219
  int r= 0;
 
4220
  r|= (op= trans->getNdbOperation(ndbtab)) == 0;
 
4221
  DBUG_ASSERT(r == 0);
 
4222
  if (do_update)
 
4223
    r|= op->updateTuple();
 
4224
  else
 
4225
    r|= op->writeTuple();
 
4226
  DBUG_ASSERT(r == 0);
 
4227
  // server_id
 
4228
  r|= op->equal(0u, (Uint32)thd->server_id);
 
4229
  DBUG_ASSERT(r == 0);
 
4230
  if (!do_update)
 
4231
  {
 
4232
    // epoch
 
4233
    r|= op->setValue(1u, (Uint64)0);
 
4234
    DBUG_ASSERT(r == 0);
 
4235
  }
 
4236
  // log_name
 
4237
  char tmp_buf[FN_REFLEN];
 
4238
  ndb_pack_varchar(ndbtab->getColumn(2u), tmp_buf,
 
4239
                   active_mi->rli.group_master_log_name,
 
4240
                   strlen(active_mi->rli.group_master_log_name));
 
4241
  r|= op->setValue(2u, tmp_buf);
 
4242
  DBUG_ASSERT(r == 0);
 
4243
  // start_pos
 
4244
  r|= op->setValue(3u, (Uint64)active_mi->rli.group_master_log_pos);
 
4245
  DBUG_ASSERT(r == 0);
 
4246
  // end_pos
 
4247
  r|= op->setValue(4u, (Uint64)active_mi->rli.group_master_log_pos + 
 
4248
                   ((Uint64)active_mi->rli.future_event_relay_log_pos -
 
4249
                    (Uint64)active_mi->rli.group_relay_log_pos));
 
4250
  DBUG_ASSERT(r == 0);
 
4251
  return 0;
 
4252
}
 
4253
#endif /* HAVE_NDB_BINLOG */
 
4254
 
4122
4255
int ha_ndbcluster::external_lock(THD *thd, int lock_type)
4123
4256
{
4124
4257
  int error=0;
4147
4280
    {
4148
4281
      m_transaction_on= FALSE;
4149
4282
      /* Would be simpler if has_transactions() didn't always say "yes" */
4150
 
      thd->options|= OPTION_STATUS_NO_TRANS_UPDATE;
4151
 
      thd->no_trans_update= TRUE;
 
4283
      thd->no_trans_update.all= thd->no_trans_update.stmt= TRUE;
4152
4284
    }
4153
4285
    else if (!thd->transaction.on)
4154
4286
      m_transaction_on= FALSE;
4169
4301
        thd_ndb->init_open_tables();
4170
4302
        thd_ndb->stmt= trans;
4171
4303
        thd_ndb->query_state&= NDB_QUERY_NORMAL;
 
4304
        thd_ndb->trans_options= 0;
 
4305
        thd_ndb->m_slow_path= FALSE;
 
4306
        if (thd->slave_thread ||
 
4307
            !(thd->options & OPTION_BIN_LOG))
 
4308
          thd_ndb->m_slow_path= TRUE;
4172
4309
        trans_register_ha(thd, FALSE, ndbcluster_hton);
4173
4310
      } 
4174
4311
      else 
4185
4322
          thd_ndb->init_open_tables();
4186
4323
          thd_ndb->all= trans; 
4187
4324
          thd_ndb->query_state&= NDB_QUERY_NORMAL;
 
4325
          thd_ndb->trans_options= 0;
 
4326
          thd_ndb->m_slow_path= FALSE;
 
4327
          if (thd->slave_thread ||
 
4328
              !(thd->options & OPTION_BIN_LOG))
 
4329
            thd_ndb->m_slow_path= TRUE;
4188
4330
          trans_register_ha(thd, TRUE, ndbcluster_hton);
4189
4331
 
4190
4332
          /*
4225
4367
    // Start of transaction
4226
4368
    m_rows_changed= 0;
4227
4369
    m_ops_pending= 0;
4228
 
 
 
4370
    m_slow_path= thd_ndb->m_slow_path;
 
4371
#ifdef HAVE_NDB_BINLOG
 
4372
    if (unlikely(m_slow_path))
 
4373
    {
 
4374
      if (m_share == ndb_apply_status_share && thd->slave_thread)
 
4375
        thd_ndb->trans_options|= TNTO_INJECTED_APPLY_STATUS;
 
4376
    }
 
4377
#endif
4229
4378
    // TODO remove double pointers...
4230
4379
    m_thd_ndb_share= thd_ndb->get_open_table(thd, m_table);
4231
4380
    m_table_info= &m_thd_ndb_share->stat;
4366
4515
  DBUG_PRINT("transaction",("%s",
4367
4516
                            trans == thd_ndb->stmt ?
4368
4517
                            "stmt" : "all"));
4369
 
  DBUG_ASSERT(ndb && trans);
 
4518
  DBUG_ASSERT(ndb);
 
4519
  if (trans == NULL)
 
4520
    DBUG_RETURN(0);
 
4521
 
 
4522
#ifdef HAVE_NDB_BINLOG
 
4523
  if (unlikely(thd_ndb->m_slow_path))
 
4524
  {
 
4525
    if (thd->slave_thread)
 
4526
      ndbcluster_update_apply_status
 
4527
        (thd, thd_ndb->trans_options & TNTO_INJECTED_APPLY_STATUS);
 
4528
  }
 
4529
#endif /* HAVE_NDB_BINLOG */
4370
4530
 
4371
4531
  if (execute_commit(thd,trans) != 0)
4372
4532
  {
4456
4616
                             HA_CREATE_INFO *info)
4457
4617
{
4458
4618
  // Set name
4459
 
  col.setName(field->field_name);
 
4619
  if (col.setName(field->field_name))
 
4620
  {
 
4621
    return (my_errno= errno);
 
4622
  }
4460
4623
  // Get char set
4461
4624
  CHARSET_INFO *cs= field->charset();
4462
4625
  // Set type and sizes
4745
4908
  NDBTAB tab;
4746
4909
  NDBCOL col;
4747
4910
  uint pack_length, length, i, pk_length= 0;
4748
 
  const void *data, *pack_data;
 
4911
  const void *data= NULL, *pack_data= NULL;
4749
4912
  bool create_from_engine= (create_info->table_options & HA_OPTION_CREATE_FROM_ENGINE);
4750
4913
  bool is_truncate= (thd->lex->sql_command == SQLCOM_TRUNCATE);
4751
4914
  char tablespace[FN_LEN];
 
4915
  NdbDictionary::Table::SingleUserMode single_user_mode= NdbDictionary::Table::SingleUserModeLocked;
4752
4916
 
4753
4917
  DBUG_ENTER("ha_ndbcluster::create");
4754
4918
  DBUG_PRINT("enter", ("name: %s", name));
4800
4964
    schema distribution table is setup
4801
4965
    ( unless it is a creation of the schema dist table itself )
4802
4966
  */
4803
 
  if (!ndb_schema_share &&
4804
 
      !(strcmp(m_dbname, NDB_REP_DB) == 0 &&
4805
 
        strcmp(m_tabname, NDB_SCHEMA_TABLE) == 0))
 
4967
  if (!ndb_schema_share)
4806
4968
  {
4807
 
    DBUG_PRINT("info", ("Schema distribution table not setup"));
4808
 
    DBUG_RETURN(HA_ERR_NO_CONNECTION);
 
4969
    if (!(strcmp(m_dbname, NDB_REP_DB) == 0 &&
 
4970
          strcmp(m_tabname, NDB_SCHEMA_TABLE) == 0))
 
4971
    {
 
4972
      DBUG_PRINT("info", ("Schema distribution table not setup"));
 
4973
      DBUG_RETURN(HA_ERR_NO_CONNECTION);
 
4974
    }
 
4975
    single_user_mode = NdbDictionary::Table::SingleUserModeReadWrite;
4809
4976
  }
4810
4977
#endif /* HAVE_NDB_BINLOG */
4811
4978
 
4812
4979
  DBUG_PRINT("table", ("name: %s", m_tabname));  
4813
 
  tab.setName(m_tabname);
 
4980
  if (tab.setName(m_tabname))
 
4981
  {
 
4982
    DBUG_RETURN(my_errno= errno);
 
4983
  }
4814
4984
  tab.setLogging(!(create_info->options & HA_LEX_CREATE_TMP_TABLE));    
4815
 
   
 
4985
  tab.setSingleUserMode(single_user_mode);
 
4986
 
4816
4987
  // Save frm data for this table
4817
4988
  if (readfrm(name, &data, &length))
4818
4989
    DBUG_RETURN(1);
4826
4997
  my_free((char*)data, MYF(0));
4827
4998
  my_free((char*)pack_data, MYF(0));
4828
4999
  
 
5000
  if (create_info->storage_media == HA_SM_DISK)
 
5001
  { 
 
5002
    if (create_info->tablespace)
 
5003
      tab.setTablespaceName(create_info->tablespace);
 
5004
    else
 
5005
      tab.setTablespaceName("DEFAULT-TS");
 
5006
  }
 
5007
  else if (create_info->tablespace)
 
5008
  {
 
5009
    if (create_info->storage_media == HA_SM_MEMORY)
 
5010
    {
 
5011
      push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_ERROR,
 
5012
                          ER_ILLEGAL_HA_CREATE_OPTION,
 
5013
                          ER(ER_ILLEGAL_HA_CREATE_OPTION),
 
5014
                          ndbcluster_hton_name,
 
5015
                          "TABLESPACE currently only supported for "
 
5016
                          "STORAGE DISK"); 
 
5017
      DBUG_RETURN(HA_ERR_UNSUPPORTED);
 
5018
    }
 
5019
    tab.setTablespaceName(create_info->tablespace);
 
5020
    create_info->storage_media = HA_SM_DISK;  //if use tablespace, that also means store on disk
 
5021
  }
 
5022
 
4829
5023
  for (i= 0; i < form->s->fields; i++) 
4830
5024
  {
4831
5025
    Field *field= form->field[i];
4832
 
    DBUG_PRINT("info", ("name: %s, type: %u, pack_length: %d", 
 
5026
    DBUG_PRINT("info", ("name: %s  type: %u  pack_length: %d", 
4833
5027
                        field->field_name, field->real_type(),
4834
5028
                        field->pack_length()));
4835
5029
    if ((my_errno= create_ndb_column(col, field, create_info)))
4836
5030
      DBUG_RETURN(my_errno);
4837
5031
 
4838
 
    if (create_info->storage_media == HA_SM_DISK)
 
5032
    if (create_info->storage_media == HA_SM_DISK ||
 
5033
        create_info->tablespace)
4839
5034
      col.setStorageType(NdbDictionary::Column::StorageTypeDisk);
4840
5035
    else
4841
5036
      col.setStorageType(NdbDictionary::Column::StorageTypeMemory);
4842
5037
 
4843
 
    tab.addColumn(col);
 
5038
    if (tab.addColumn(col))
 
5039
    {
 
5040
      DBUG_RETURN(my_errno= errno);
 
5041
    }
4844
5042
    if (col.getPrimaryKey())
4845
5043
      pk_length += (field->pack_length() + 3) / 4;
4846
5044
  }
4855
5053
                             NdbDictionary::Column::StorageTypeMemory);
4856
5054
  }
4857
5055
 
4858
 
  if (create_info->storage_media == HA_SM_DISK)
4859
 
  { 
4860
 
    if (create_info->tablespace)
4861
 
      tab.setTablespaceName(create_info->tablespace);
4862
 
    else
4863
 
      tab.setTablespaceName("DEFAULT-TS");
4864
 
  }
4865
 
  else if (create_info->tablespace)
4866
 
  {
4867
 
    if (create_info->storage_media == HA_SM_MEMORY)
4868
 
    {
4869
 
      push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_ERROR,
4870
 
                          ER_ILLEGAL_HA_CREATE_OPTION,
4871
 
                          ER(ER_ILLEGAL_HA_CREATE_OPTION),
4872
 
                          ndbcluster_hton_name,
4873
 
                          "TABLESPACE currently only supported for "
4874
 
                          "STORAGE DISK"); 
4875
 
      DBUG_RETURN(HA_ERR_UNSUPPORTED);
4876
 
    }
4877
 
    tab.setTablespaceName(create_info->tablespace);
4878
 
    create_info->storage_media = HA_SM_DISK;  //if use tablespace, that also means store on disk
4879
 
  }
4880
 
 
4881
5056
  // No primary key, create shadow key as 64 bit, auto increment  
4882
5057
  if (form->s->primary_key == MAX_KEY) 
4883
5058
  {
4884
5059
    DBUG_PRINT("info", ("Generating shadow key"));
4885
 
    col.setName("$PK");
 
5060
    if (col.setName("$PK"))
 
5061
    {
 
5062
      DBUG_RETURN(my_errno= errno);
 
5063
    }
4886
5064
    col.setType(NdbDictionary::Column::Bigunsigned);
4887
5065
    col.setLength(1);
4888
5066
    col.setNullable(FALSE);
4889
5067
    col.setPrimaryKey(TRUE);
4890
5068
    col.setAutoIncrement(TRUE);
4891
 
    tab.addColumn(col);
 
5069
    if (tab.addColumn(col))
 
5070
    {
 
5071
      DBUG_RETURN(my_errno= errno);
 
5072
    }
4892
5073
    pk_length += 2;
4893
5074
  }
4894
5075
 
5077
5258
5078
5259
  Ndb* ndb;
5079
5260
  const NDBTAB *tab;
5080
 
  const void *data, *pack_data;
 
5261
  const void *data= NULL, *pack_data= NULL;
5081
5262
  uint length, pack_length;
5082
5263
  int error= 0;
5083
5264
 
5236
5417
    // TODO Only temporary ordered indexes supported
5237
5418
    ndb_index.setLogging(FALSE); 
5238
5419
  }
5239
 
  ndb_index.setTable(m_tabname);
 
5420
  if (ndb_index.setTable(m_tabname))
 
5421
  {
 
5422
    DBUG_RETURN(my_errno= errno);
 
5423
  }
5240
5424
 
5241
5425
  for (; key_part != end; key_part++) 
5242
5426
  {
5243
5427
    Field *field= key_part->field;
5244
5428
    DBUG_PRINT("info", ("attr: %s", field->field_name));
5245
 
    ndb_index.addColumnName(field->field_name);
 
5429
    if (ndb_index.addColumnName(field->field_name))
 
5430
    {
 
5431
      DBUG_RETURN(my_errno= errno);
 
5432
    }
5246
5433
  }
5247
5434
  
5248
5435
  if (dict->createIndex(ndb_index, *m_table))
5403
5590
  }
5404
5591
  // Change current database to that of target table
5405
5592
  set_dbname(to);
5406
 
  ndb->setDatabaseName(m_dbname);
 
5593
  if (ndb->setDatabaseName(m_dbname))
 
5594
  {
 
5595
    ERR_RETURN(ndb->getNdbError());
 
5596
  }
5407
5597
 
5408
5598
  NdbDictionary::Table new_tab= *orig_tab;
5409
5599
  new_tab.setName(new_tabname);
5575
5765
    {
5576
5766
      ndb_table_id= h->m_table->getObjectId();
5577
5767
      ndb_table_version= h->m_table->getObjectVersion();
 
5768
      DBUG_PRINT("info", ("success 1"));
5578
5769
    }
5579
5770
    else
5580
5771
    {
5588
5779
          break;
5589
5780
      }
5590
5781
      res= ndb_to_mysql_error(&dict->getNdbError());
 
5782
      DBUG_PRINT("info", ("error(1) %u", res));
5591
5783
    }
5592
5784
    h->release_metadata(thd, ndb);
5593
5785
  }
5604
5796
        {
5605
5797
          ndb_table_id= ndbtab_g.get_table()->getObjectId();
5606
5798
          ndb_table_version= ndbtab_g.get_table()->getObjectVersion();
 
5799
          DBUG_PRINT("info", ("success 2"));
 
5800
          break;
5607
5801
        }
5608
5802
        else
5609
5803
        {
5623
5817
          }
5624
5818
        }
5625
5819
      }
5626
 
      else
5627
 
        res= ndb_to_mysql_error(&dict->getNdbError());
 
5820
      res= ndb_to_mysql_error(&dict->getNdbError());
 
5821
      DBUG_PRINT("info", ("error(2) %u", res));
5628
5822
      break;
5629
5823
    }
5630
5824
  }
5829
6023
  m_bulk_insert_rows((ha_rows) 1024),
5830
6024
  m_rows_changed((ha_rows) 0),
5831
6025
  m_bulk_insert_not_flushed(FALSE),
 
6026
  m_delete_cannot_batch(FALSE),
 
6027
  m_update_cannot_batch(FALSE),
5832
6028
  m_ops_pending(0),
5833
6029
  m_skip_auto_increment(TRUE),
5834
6030
  m_blobs_pending(0),
5971
6167
  if (!res)
5972
6168
  {
5973
6169
    Ndb *ndb= get_ndb();
5974
 
    ndb->setDatabaseName(m_dbname);
 
6170
    if (ndb->setDatabaseName(m_dbname))
 
6171
    {
 
6172
      ERR_RETURN(ndb->getNdbError());
 
6173
    }
5975
6174
    struct Ndb_statistics stat;
5976
6175
    res= ndb_get_table_statistics(NULL, FALSE, ndb, m_table, &stat);
5977
6176
    stats.mean_rec_length= stat.row_size;
6038
6237
  DBUG_ENTER("seize_thd_ndb");
6039
6238
 
6040
6239
  thd_ndb= new Thd_ndb();
 
6240
  if (thd_ndb == NULL)
 
6241
  {
 
6242
    my_errno= HA_ERR_OUT_OF_MEM;
 
6243
    return NULL;
 
6244
  }
6041
6245
  if (thd_ndb->ndb->init(max_transactions) != 0)
6042
6246
  {
6043
6247
    ERR_PRINT(thd_ndb->ndb->getNdbError());
6090
6294
  
6091
6295
  if (!(ndb= check_ndb_in_thd(thd)))
6092
6296
    DBUG_RETURN(HA_ERR_NO_CONNECTION);
6093
 
  ndb->setDatabaseName(m_dbname);
 
6297
  if (ndb->setDatabaseName(m_dbname))
 
6298
  {
 
6299
    ERR_RETURN(ndb->getNdbError());
 
6300
  }
6094
6301
  DBUG_RETURN(0);
6095
6302
}
6096
6303
 
6120
6327
  int error= 0;
6121
6328
  NdbError ndb_error;
6122
6329
  uint len;
6123
 
  const void* data;
 
6330
  const void* data= NULL;
6124
6331
  Ndb* ndb;
6125
6332
  char key[FN_REFLEN];
6126
6333
  DBUG_ENTER("ndbcluster_discover");
6128
6335
 
6129
6336
  if (!(ndb= check_ndb_in_thd(thd)))
6130
6337
    DBUG_RETURN(HA_ERR_NO_CONNECTION);  
6131
 
  ndb->setDatabaseName(db);
 
6338
  if (ndb->setDatabaseName(db))
 
6339
  {
 
6340
    ERR_RETURN(ndb->getNdbError());
 
6341
  }
6132
6342
  NDBDICT* dict= ndb->getDictionary();
6133
6343
  build_table_filename(key, sizeof(key), db, name, "", 0);
6134
6344
  /* ndb_share reference temporary */
6156
6366
    {
6157
6367
      const NdbError err= dict->getNdbError();
6158
6368
      if (err.code == 709 || err.code == 723)
 
6369
      {
6159
6370
        error= -1;
 
6371
        DBUG_PRINT("info", ("ndb_error.code: %u", ndb_error.code));
 
6372
      }
6160
6373
      else
 
6374
      {
 
6375
        error= -1;
6161
6376
        ndb_error= err;
 
6377
        DBUG_PRINT("info", ("ndb_error.code: %u", ndb_error.code));
 
6378
      }
6162
6379
      goto err;
6163
6380
    }
6164
6381
    DBUG_PRINT("info", ("Found table %s", tab->getName()));
6192
6409
 
6193
6410
  DBUG_RETURN(0);
6194
6411
err:
 
6412
  my_free((char*)data, MYF(MY_ALLOW_ZERO_PTR));
6195
6413
  if (share)
6196
6414
  {
6197
6415
    /* ndb_share reference temporary free */
6221
6439
 
6222
6440
  if (!(ndb= check_ndb_in_thd(thd)))
6223
6441
    DBUG_RETURN(HA_ERR_NO_CONNECTION);
6224
 
 
6225
6442
  NDBDICT* dict= ndb->getDictionary();
6226
6443
  NdbDictionary::Dictionary::List list;
6227
6444
  if (dict->listObjects(list, NdbDictionary::Object::UserTable) != 0)
6291
6508
  char full_path[FN_REFLEN];
6292
6509
  char *tmp= full_path +
6293
6510
    build_table_filename(full_path, sizeof(full_path), dbname, "", "", 0);
6294
 
 
6295
 
  ndb->setDatabaseName(dbname);
 
6511
  if (ndb->setDatabaseName(dbname))
 
6512
  {
 
6513
    ERR_RETURN(ndb->getNdbError());
 
6514
  }
6296
6515
  List_iterator_fast<char> it(drop_list);
6297
6516
  while ((tabname=it++))
6298
6517
  {
6345
6564
  LEX *old_lex= thd->lex, newlex;
6346
6565
  thd->lex= &newlex;
6347
6566
  newlex.current_select= NULL;
6348
 
  lex_start(thd, (const uchar*) "", 0);
 
6567
  lex_start(thd, "", 0);
6349
6568
  int res= ha_create_table_from_engine(thd, db, table_name);
6350
6569
  thd->lex= old_lex;
6351
6570
  return res;
6783
7002
  {
6784
7003
    DBUG_PRINT("error",("Ndb_cluster_connection(%s)",
6785
7004
                        opt_ndbcluster_connectstring));
 
7005
    my_errno= HA_ERR_OUT_OF_MEM;
6786
7006
    goto ndbcluster_init_error;
6787
7007
  }
6788
7008
  {
6797
7017
  if ( (g_ndb= new Ndb(g_ndb_cluster_connection, "sys")) == 0 )
6798
7018
  {
6799
7019
    DBUG_PRINT("error", ("failed to create global ndb object"));
 
7020
    my_errno= HA_ERR_OUT_OF_MEM;
6800
7021
    goto ndbcluster_init_error;
6801
7022
  }
6802
7023
  if (g_ndb->init() != 0)
7289
7510
  Ndb *ndb;
7290
7511
  if (!(ndb= check_ndb_in_thd(thd)))
7291
7512
    DBUG_RETURN(1);
7292
 
  ndb->setDatabaseName(dbname);
 
7513
  if (ndb->setDatabaseName(dbname))
 
7514
  {
 
7515
    ERR_RETURN(ndb->getNdbError());
 
7516
  }
7293
7517
  uint lock= share->commit_count_lock;
7294
7518
  pthread_mutex_unlock(&share->mutex);
7295
7519
 
7613
7837
  /*
7614
7838
    Ndb share has not been released as it should
7615
7839
  */
 
7840
#ifdef NOT_YET
7616
7841
  DBUG_ASSERT(FALSE);
 
7842
#endif
7617
7843
 
7618
7844
  /*
7619
7845
    This is probably an error.  We can however save the situation
8503
8729
    return((char*)comment);
8504
8730
  }
8505
8731
 
8506
 
  ndb->setDatabaseName(m_dbname);
 
8732
  if (ndb->setDatabaseName(m_dbname))
 
8733
  {
 
8734
    return((char*)comment);
 
8735
  }
8507
8736
  const NDBTAB* tab= m_table;
8508
8737
  DBUG_ASSERT(tab != NULL);
8509
8738
 
8512
8741
  const unsigned fmt_len_plus_extra= length + strlen(fmt);
8513
8742
  if ((str= my_malloc(fmt_len_plus_extra, MYF(0))) == NULL)
8514
8743
  {
 
8744
    sql_print_error("ha_ndbcluster::update_table_comment: "
 
8745
                    "my_malloc(%u) failed", (unsigned int)fmt_len_plus_extra);
8515
8746
    return (char*)comment;
8516
8747
  }
8517
8748
 
8527
8758
{
8528
8759
  THD *thd; /* needs to be first for thread_stack */
8529
8760
  struct timespec abstime;
8530
 
  List<NDB_SHARE> util_open_tables;
8531
8761
  Thd_ndb *thd_ndb;
 
8762
  uint share_list_size= 0;
 
8763
  NDB_SHARE **share_list= NULL;
8532
8764
 
8533
8765
  my_thread_init();
8534
8766
  DBUG_ENTER("ndb_util_thread");
8537
8769
   pthread_mutex_lock(&LOCK_ndb_util_thread);
8538
8770
 
8539
8771
  thd= new THD; /* note that contructor of THD uses DBUG_ */
 
8772
  if (thd == NULL)
 
8773
  {
 
8774
    my_errno= HA_ERR_OUT_OF_MEM;
 
8775
    DBUG_RETURN(NULL);
 
8776
  }
8540
8777
  THD_CHECK_SENTRY(thd);
8541
 
 
8542
8778
  pthread_detach_this_thread();
8543
8779
  ndb_util_thread= pthread_self();
8544
8780
 
8648
8884
    /* Lock mutex and fill list with pointers to all open tables */
8649
8885
    NDB_SHARE *share;
8650
8886
    pthread_mutex_lock(&ndbcluster_mutex);
8651
 
    for (uint i= 0; i < ndbcluster_open_tables.records; i++)
 
8887
    uint i, open_count, record_count= ndbcluster_open_tables.records;
 
8888
    if (share_list_size < record_count)
 
8889
    {
 
8890
      NDB_SHARE ** new_share_list= new NDB_SHARE * [record_count];
 
8891
      if (!new_share_list)
 
8892
      {
 
8893
        sql_print_warning("ndb util thread: malloc failure, "
 
8894
                          "query cache not maintained properly");
 
8895
        pthread_mutex_unlock(&ndbcluster_mutex);
 
8896
        goto next;                               // At least do not crash
 
8897
      }
 
8898
      delete [] share_list;
 
8899
      share_list_size= record_count;
 
8900
      share_list= new_share_list;
 
8901
    }
 
8902
    for (i= 0, open_count= 0; i < record_count; i++)
8652
8903
    {
8653
8904
      share= (NDB_SHARE *)hash_element(&ndbcluster_open_tables, i);
8654
8905
#ifdef HAVE_NDB_BINLOG
8666
8917
                  i, share->table_name, share->use_count));
8667
8918
 
8668
8919
      /* Store pointer to table */
8669
 
      util_open_tables.push_back(share);
 
8920
      share_list[open_count++]= share;
8670
8921
    }
8671
8922
    pthread_mutex_unlock(&ndbcluster_mutex);
8672
8923
 
8673
 
    /* Iterate through the  open files list */
8674
 
    List_iterator_fast<NDB_SHARE> it(util_open_tables);
8675
 
    while ((share= it++))
 
8924
    /* Iterate through the open files list */
 
8925
    for (i= 0; i < open_count; i++)
8676
8926
    {
 
8927
      share= share_list[i];
8677
8928
#ifdef HAVE_NDB_BINLOG
8678
8929
      if ((share->use_count - (int) (share->op != 0) - (int) (share->op != 0))
8679
8930
          <= 1)
8696
8947
      pthread_mutex_lock(&share->mutex);
8697
8948
      lock= share->commit_count_lock;
8698
8949
      pthread_mutex_unlock(&share->mutex);
8699
 
 
8700
8950
      {
8701
8951
        /* Contact NDB to get commit count for table */
8702
8952
        Ndb* ndb= thd_ndb->ndb;
8703
 
        ndb->setDatabaseName(share->db);
 
8953
        if (ndb->setDatabaseName(share->db))
 
8954
        {
 
8955
          goto loop_next;
 
8956
        }
8704
8957
        Ndb_table_guard ndbtab_g(ndb->getDictionary(), share->table_name);
8705
8958
        if (ndbtab_g.get_table() &&
8706
8959
            ndb_get_table_statistics(NULL, FALSE, ndb,
8723
8976
          stat.commit_count= 0;
8724
8977
        }
8725
8978
      }
8726
 
 
 
8979
  loop_next:
8727
8980
      pthread_mutex_lock(&share->mutex);
8728
8981
      if (share->commit_count_lock == lock)
8729
8982
        share->commit_count= stat.commit_count;
8734
8987
                               share->key, share->use_count));
8735
8988
      free_share(&share);
8736
8989
    }
8737
 
 
8738
 
    /* Clear the list of open tables */
8739
 
    util_open_tables.empty();
8740
 
 
 
8990
next:
8741
8991
    /* Calculate new time to wake up */
8742
8992
    int secs= 0;
8743
8993
    int msecs= ndb_cache_check_time;
8765
9015
ndb_util_thread_end:
8766
9016
  net_end(&thd->net);
8767
9017
ndb_util_thread_fail:
 
9018
  if (share_list)
 
9019
    delete [] share_list;
8768
9020
  thd->cleanup();
8769
9021
  delete thd;
8770
9022
  
8804
9056
8805
9057
  DBUG_ENTER("cond_push");
8806
9058
  Ndb_cond_stack *ndb_cond = new Ndb_cond_stack();
 
9059
  if (ndb_cond == NULL)
 
9060
  {
 
9061
    my_errno= HA_ERR_OUT_OF_MEM;
 
9062
    DBUG_RETURN(NULL);
 
9063
  }
8807
9064
  DBUG_EXECUTE("where",print_where((COND *)cond, m_tabname););
8808
9065
  if (m_cond_stack)
8809
9066
    ndb_cond->next= m_cond_stack;
10620
10877
 
10621
10878
  int pk= 0;
10622
10879
  int ai= 0;
 
10880
 
 
10881
  if (create_info->tablespace)
 
10882
    create_info->storage_media = HA_SM_DISK;
 
10883
  else
 
10884
    create_info->storage_media = HA_SM_MEMORY;
 
10885
 
10623
10886
  for (i= 0; i < table->s->fields; i++) 
10624
10887
  {
10625
10888
    Field *field= table->field[i];
10626
10889
    const NDBCOL *col= tab->getColumn(i);
 
10890
    if (col->getStorageType() == NDB_STORAGETYPE_MEMORY && create_info->storage_media != HA_SM_MEMORY ||
 
10891
        col->getStorageType() == NDB_STORAGETYPE_DISK && create_info->storage_media != HA_SM_DISK)
 
10892
    {
 
10893
      DBUG_PRINT("info", ("Column storage media is changed"));
 
10894
      DBUG_RETURN(COMPATIBLE_DATA_NO);
 
10895
    }
 
10896
    
10627
10897
    if (field->flags & FIELD_IS_RENAMED)
10628
10898
    {
10629
10899
      DBUG_PRINT("info", ("Field has been renamed, copy table"));
10641
10911
    if (field->flags & FIELD_IN_ADD_INDEX)
10642
10912
      ai=1;
10643
10913
  }
 
10914
 
 
10915
  char tablespace_name[FN_LEN]; 
 
10916
  if (get_tablespace_name(current_thd, tablespace_name, FN_LEN))
 
10917
  {
 
10918
    if (create_info->tablespace) 
 
10919
    {
 
10920
      if (strcmp(create_info->tablespace, tablespace_name))
 
10921
      {
 
10922
        DBUG_PRINT("info", ("storage media is changed, old tablespace=%s, new tablespace=%s",
 
10923
          tablespace_name, create_info->tablespace));
 
10924
        DBUG_RETURN(COMPATIBLE_DATA_NO);
 
10925
      }
 
10926
    }
 
10927
    else
 
10928
    {
 
10929
      DBUG_PRINT("info", ("storage media is changed, old is DISK and tablespace=%s, new is MEM",
 
10930
        tablespace_name));
 
10931
      DBUG_RETURN(COMPATIBLE_DATA_NO);
 
10932
    }
 
10933
  }
 
10934
  else
 
10935
  {
 
10936
    if (create_info->storage_media != HA_SM_MEMORY)
 
10937
    {
 
10938
      DBUG_PRINT("info", ("storage media is changed, old is MEM, new is DISK and tablespace=%s",
 
10939
        create_info->tablespace));
 
10940
      DBUG_RETURN(COMPATIBLE_DATA_NO);
 
10941
    }
 
10942
  }
 
10943
 
10644
10944
  if (table_changes != IS_EQUAL_YES)
10645
10945
    DBUG_RETURN(COMPATIBLE_DATA_NO);
10646
10946