~mdcallag/+junk/5.1-map

« back to all changes in this revision

Viewing changes to sql/ha_ndbcluster_binlog.cc

  • Committer: msvensson at pilot
  • Date: 2007-04-24 09:11:45 UTC
  • mfrom: (2469.1.106)
  • Revision ID: sp1r-msvensson@pilot.blaudden-20070424091145-10463
Merge pilot.blaudden:/home/msvensson/mysql/my51-m-mysql_upgrade
into  pilot.blaudden:/home/msvensson/mysql/mysql-5.1-maint

Show diffs side-by-side

added added

removed removed

Lines of Context:
500
500
static int
501
501
ndbcluster_binlog_index_purge_file(THD *thd, const char *file)
502
502
{
503
 
  if (!ndb_binlog_running)
 
503
  if (!ndb_binlog_running || thd->slave_thread)
504
504
    return 0;
505
505
 
506
506
  DBUG_ENTER("ndbcluster_binlog_index_purge_file");
582
582
  ndbcluster_binlog_inited= 0;
583
583
 
584
584
#ifdef HAVE_NDB_BINLOG
 
585
  if (ndb_util_thread_running > 0)
 
586
  {
 
587
    /*
 
588
      Wait for util thread to die (as this uses the injector mutex)
 
589
      There is a very small chance that ndb_util_thread dies and the
 
590
      following mutex is freed before it's accessed. This shouldn't
 
591
      however be a likely case as the ndbcluster_binlog_end is supposed to
 
592
      be called before ndb_cluster_end().
 
593
    */
 
594
    pthread_mutex_lock(&LOCK_ndb_util_thread);
 
595
    /* Ensure mutexes are not freed if ndb_cluster_end is running at the same time */
 
596
    ndb_util_thread_running++;
 
597
    ndbcluster_terminating= 1;
 
598
    pthread_cond_signal(&COND_ndb_util_thread);
 
599
    while (ndb_util_thread_running > 1)
 
600
      pthread_cond_wait(&COND_ndb_util_ready, &LOCK_ndb_util_thread);
 
601
    ndb_util_thread_running--;
 
602
    pthread_mutex_unlock(&LOCK_ndb_util_thread);
 
603
  }
 
604
 
585
605
  /* wait for injector thread to finish */
586
606
  ndbcluster_binlog_terminating= 1;
 
607
  pthread_mutex_lock(&injector_mutex);
587
608
  pthread_cond_signal(&injector_cond);
588
 
  pthread_mutex_lock(&injector_mutex);
589
609
  while (ndb_binlog_thread_running > 0)
590
610
    pthread_cond_wait(&injector_cond, &injector_mutex);
591
611
  pthread_mutex_unlock(&injector_mutex);
729
749
                   NDB_REP_DB "." NDB_APPLY_TABLE
730
750
                   " ( server_id INT UNSIGNED NOT NULL,"
731
751
                   " epoch BIGINT UNSIGNED NOT NULL, "
 
752
                   " log_name VARCHAR(255) BINARY NOT NULL, "
 
753
                   " start_pos BIGINT UNSIGNED NOT NULL, "
 
754
                   " end_pos BIGINT UNSIGNED NOT NULL, "
732
755
                   " PRIMARY KEY USING HASH (server_id) ) ENGINE=NDB");
733
756
 
734
757
  run_query(thd, buf, end, TRUE, TRUE);
869
892
  uint32 id;
870
893
  uint32 version;
871
894
  uint32 type;
 
895
  uint32 any_value;
872
896
};
873
897
 
874
898
/*
950
974
/*
951
975
  helper function to pack a ndb varchar
952
976
*/
953
 
static char *ndb_pack_varchar(const NDBCOL *col, char *buf,
954
 
                              const char *str, int sz)
 
977
char *ndb_pack_varchar(const NDBCOL *col, char *buf,
 
978
                       const char *str, int sz)
955
979
{
956
980
  switch (col->getArrayType())
957
981
  {
1389
1413
      /* type */
1390
1414
      r|= op->setValue(SCHEMA_TYPE_I, log_type);
1391
1415
      DBUG_ASSERT(r == 0);
 
1416
      /* any value */
 
1417
      if (!(thd->options & OPTION_BIN_LOG))
 
1418
        r|= op->setAnyValue(NDB_ANYVALUE_FOR_NOLOGGING);
 
1419
      else
 
1420
        r|= op->setAnyValue(thd->server_id);
 
1421
      DBUG_ASSERT(r == 0);
1392
1422
      if (log_db != new_db && new_db && new_table_name)
1393
1423
      {
1394
1424
        log_db= new_db;
1734
1764
  DBUG_RETURN(0);
1735
1765
}
1736
1766
 
 
1767
static void ndb_binlog_query(THD *thd, Cluster_schema *schema)
 
1768
{
 
1769
  if (schema->any_value & NDB_ANYVALUE_RESERVED)
 
1770
  {
 
1771
    if (schema->any_value != NDB_ANYVALUE_FOR_NOLOGGING)
 
1772
      sql_print_warning("NDB: unknown value for binlog signalling 0x%X, "
 
1773
                        "query not logged",
 
1774
                        schema->any_value);
 
1775
    return;
 
1776
  }
 
1777
  uint32 thd_server_id_save= thd->server_id;
 
1778
  DBUG_ASSERT(sizeof(thd_server_id_save) == sizeof(thd->server_id));
 
1779
  char *thd_db_save= thd->db;
 
1780
  if (schema->any_value == 0)
 
1781
    thd->server_id= ::server_id;
 
1782
  else
 
1783
    thd->server_id= schema->any_value;
 
1784
  thd->db= schema->db;
 
1785
  thd->binlog_query(THD::STMT_QUERY_TYPE, schema->query,
 
1786
                    schema->query_length, FALSE,
 
1787
                    schema->name[0] == 0 || thd->db[0] == 0);
 
1788
  thd->server_id= thd_server_id_save;
 
1789
  thd->db= thd_db_save;
 
1790
}
 
1791
 
1737
1792
static int
1738
1793
ndb_binlog_thread_handle_schema_event(THD *thd, Ndb *ndb,
1739
1794
                                      NdbEventOperation *pOp,
1758
1813
      MY_BITMAP slock;
1759
1814
      bitmap_init(&slock, schema->slock, 8*SCHEMA_SLOCK_SIZE, FALSE);
1760
1815
      uint node_id= g_ndb_cluster_connection->node_id();
1761
 
      ndbcluster_get_schema(tmp_share, schema);
 
1816
      {
 
1817
        ndbcluster_get_schema(tmp_share, schema);
 
1818
        schema->any_value= pOp->getAnyValue();
 
1819
      }
1762
1820
      enum SCHEMA_OP_TYPE schema_type= (enum SCHEMA_OP_TYPE)schema->type;
1763
1821
      DBUG_PRINT("info",
1764
1822
                 ("%s.%s: log query_length: %d  query: '%s'  type: %d",
1882
1940
          run_query(thd, schema->query,
1883
1941
                    schema->query + schema->query_length,
1884
1942
                    TRUE,    /* print error */
1885
 
                    FALSE);  /* binlog the query */
 
1943
                    TRUE);   /* don't binlog the query */
 
1944
          log_query= 1;
1886
1945
          break;
1887
1946
        case SOT_TABLESPACE:
1888
1947
        case SOT_LOGFILE_GROUP:
1892
1951
          abort();
1893
1952
        }
1894
1953
        if (log_query && ndb_binlog_running)
1895
 
        {
1896
 
          char *thd_db_save= thd->db;
1897
 
          thd->db= schema->db;
1898
 
          thd->binlog_query(THD::STMT_QUERY_TYPE, schema->query,
1899
 
                            schema->query_length, FALSE,
1900
 
                            schema->name[0] == 0 || thd->db[0] == 0);
1901
 
          thd->db= thd_db_save;
1902
 
        }
 
1954
          ndb_binlog_query(thd, schema);
1903
1955
        /* signal that schema operation has been handled */
1904
1956
        DBUG_DUMP("slock", (char*)schema->slock, schema->slock_length);
1905
1957
        if (bitmap_is_set(&slock, node_id))
2076
2128
        log_query= 1;
2077
2129
        break;
2078
2130
      case SOT_DROP_TABLE:
 
2131
        log_query= 1;
2079
2132
        // invalidation already handled by binlog thread
2080
2133
        if (share && share->op)
2081
2134
        {
2082
 
          log_query= 1;
2083
2135
          break;
2084
2136
        }
2085
2137
        // fall through
2157
2209
      }
2158
2210
    }
2159
2211
    if (ndb_binlog_running && log_query)
2160
 
    {
2161
 
      char *thd_db_save= thd->db;
2162
 
      thd->db= schema->db;
2163
 
      thd->binlog_query(THD::STMT_QUERY_TYPE, schema->query,
2164
 
                        schema->query_length, FALSE,
2165
 
                        schema->name[0] == 0);
2166
 
      thd->db= thd_db_save;
2167
 
    }
 
2212
      ndb_binlog_query(thd, schema);
2168
2213
  }
2169
2214
  while ((schema= post_epoch_unlock_list->pop()))
2170
2215
  {
2270
2315
    break;
2271
2316
  }
2272
2317
 
2273
 
  // Set all fields non-null.
2274
 
  if(ndb_binlog_index->s->null_bytes > 0)
2275
 
    bzero(ndb_binlog_index->record[0], ndb_binlog_index->s->null_bytes);
 
2318
  /*
 
2319
    Initialize ndb_binlog_index->record[0]
 
2320
  */
 
2321
  empty_record(ndb_binlog_index);
 
2322
 
2276
2323
  ndb_binlog_index->field[0]->store(row.master_log_pos);
2277
2324
  ndb_binlog_index->field[1]->store(row.master_log_file,
2278
2325
                                strlen(row.master_log_file),
2318
2365
{
2319
2366
  DBUG_ENTER("ndbcluster_binlog_start");
2320
2367
 
 
2368
  if (::server_id == 0)
 
2369
  {
 
2370
    sql_print_warning("NDB: server id set to zero will cause any other mysqld "
 
2371
                      "with bin log to log with wrong server id");
 
2372
  }
 
2373
  else if (::server_id & 0x1 << 31)
 
2374
  {
 
2375
    sql_print_error("NDB: server id's with high bit set is reserved for internal "
 
2376
                    "purposes");
 
2377
    DBUG_RETURN(-1);
 
2378
  }
 
2379
 
2321
2380
  pthread_mutex_init(&injector_mutex, MY_MUTEX_INIT_FAST);
2322
2381
  pthread_cond_init(&injector_cond, NULL);
2323
2382
  pthread_mutex_init(&ndb_schema_share_mutex, MY_MUTEX_INIT_FAST);
3186
3245
  NDB_SHARE *share= (NDB_SHARE*) pOp->getCustomData();
3187
3246
  if (share == ndb_apply_status_share)
3188
3247
    return 0;
 
3248
 
 
3249
  uint32 originating_server_id= pOp->getAnyValue();
 
3250
  if (originating_server_id == 0)
 
3251
    originating_server_id= ::server_id;
 
3252
  else if (originating_server_id & NDB_ANYVALUE_RESERVED)
 
3253
  {
 
3254
    if (originating_server_id != NDB_ANYVALUE_FOR_NOLOGGING)
 
3255
      sql_print_warning("NDB: unknown value for binlog signalling 0x%X, "
 
3256
                        "event not logged",
 
3257
                        originating_server_id);
 
3258
    return 0;
 
3259
  }
 
3260
 
3189
3261
  TABLE *table= share->table;
3190
 
 
3191
3262
  DBUG_ASSERT(trans.good());
3192
3263
  DBUG_ASSERT(table != 0);
3193
3264
 
3232
3303
        DBUG_ASSERT(ret == 0);
3233
3304
      }
3234
3305
      ndb_unpack_record(table, share->ndb_value[0], &b, table->record[0]);
3235
 
      IF_DBUG(int ret=) trans.write_row(::server_id,
 
3306
      IF_DBUG(int ret=) trans.write_row(originating_server_id,
3236
3307
                                        injector::transaction::table(table,
3237
3308
                                                                     TRUE),
3238
3309
                                        &b, n_fields, table->record[0]);
3272
3343
      }
3273
3344
      ndb_unpack_record(table, share->ndb_value[n], &b, table->record[n]);
3274
3345
      DBUG_EXECUTE("info", print_records(table, table->record[n]););
3275
 
      IF_DBUG(int ret =) trans.delete_row(::server_id,
 
3346
      IF_DBUG(int ret =) trans.delete_row(originating_server_id,
3276
3347
                                          injector::transaction::table(table,
3277
3348
                                                                       TRUE),
3278
3349
                                          &b, n_fields, table->record[n]);
3302
3373
          since table has a primary key, we can do a write
3303
3374
          using only after values
3304
3375
        */
3305
 
        trans.write_row(::server_id, injector::transaction::table(table, TRUE),
 
3376
        trans.write_row(originating_server_id,
 
3377
                        injector::transaction::table(table, TRUE),
3306
3378
                        &b, n_fields, table->record[0]);// after values
3307
3379
      }
3308
3380
      else
3322
3394
        }
3323
3395
        ndb_unpack_record(table, share->ndb_value[1], &b, table->record[1]);
3324
3396
        DBUG_EXECUTE("info", print_records(table, table->record[1]););
3325
 
        IF_DBUG(int ret =) trans.update_row(::server_id,
 
3397
        IF_DBUG(int ret =) trans.update_row(originating_server_id,
3326
3398
                                            injector::transaction::table(table,
3327
3399
                                                                         TRUE),
3328
3400
                                            &b, n_fields,
3582
3654
    Main NDB Injector loop
3583
3655
  */
3584
3656
  {
 
3657
    /*
 
3658
      Always insert a GAP event as we cannot know what has happened in the cluster
 
3659
      while not being connected.
 
3660
    */
 
3661
    LEX_STRING const msg= { C_STRING_WITH_LEN("Cluster connect") };
 
3662
    inj->record_incident(thd, INCIDENT_LOST_EVENTS, msg);
 
3663
  }
 
3664
  {
3585
3665
    thd->proc_info= "Waiting for ndbcluster to start";
3586
3666
 
3587
3667
    pthread_mutex_lock(&injector_mutex);
3893
3973
            IF_DBUG(int ret=) trans.use_table(::server_id, tbl);
3894
3974
            DBUG_ASSERT(ret == 0);
3895
3975
 
3896
 
            // Set all fields non-null.
3897
 
            if(table->s->null_bytes > 0)
3898
 
              bzero(table->record[0], table->s->null_bytes);
 
3976
            /* 
 
3977
               Initialize table->record[0] 
 
3978
            */
 
3979
            empty_record(table);
 
3980
 
3899
3981
            table->field[0]->store((longlong)::server_id);
3900
3982
            table->field[1]->store((longlong)gci);
 
3983
            table->field[2]->store("", 0, &my_charset_bin);
 
3984
            table->field[3]->store((longlong)0);
 
3985
            table->field[4]->store((longlong)0);
3901
3986
            trans.write_row(::server_id,
3902
3987
                            injector::transaction::table(table, TRUE),
3903
3988
                            &table->s->all_set, table->s->fields,