~ubuntu-branches/ubuntu/precise/mysql-5.1/precise

« back to all changes in this revision

Viewing changes to sql/repl_failsafe.cc

  • Committer: Bazaar Package Importer
  • Author(s): Norbert Tretkowski
  • Date: 2010-03-17 14:56:02 UTC
  • Revision ID: james.westby@ubuntu.com-20100317145602-x7e30l1b2sb5s6w6
Tags: upstream-5.1.45
ImportĀ upstreamĀ versionĀ 5.1.45

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/* Copyright (C) 2001-2006 MySQL AB & Sasha
 
2
 
 
3
   This program is free software; you can redistribute it and/or modify
 
4
   it under the terms of the GNU General Public License as published by
 
5
   the Free Software Foundation; version 2 of the License.
 
6
 
 
7
   This program is distributed in the hope that it will be useful,
 
8
   but WITHOUT ANY WARRANTY; without even the implied warranty of
 
9
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
10
   GNU General Public License for more details.
 
11
 
 
12
   You should have received a copy of the GNU General Public License
 
13
   along with this program; if not, write to the Free Software
 
14
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
 
15
 
 
16
/**
 
17
  @file
 
18
 
 
19
  All of the functions defined in this file which are not used (the ones to
 
20
  handle failsafe) are not used; their code has not been updated for more
 
21
  than one year now so should be considered as BADLY BROKEN. Do not enable
 
22
  it. The used functions (to handle LOAD DATA FROM MASTER, plus some small
 
23
  functions like register_slave()) are working.
 
24
*/
 
25
 
 
26
#include "mysql_priv.h"
 
27
#ifdef HAVE_REPLICATION
 
28
 
 
29
#include "repl_failsafe.h"
 
30
#include "sql_repl.h"
 
31
#include "slave.h"
 
32
#include "rpl_mi.h"
 
33
#include "rpl_filter.h"
 
34
#include "log_event.h"
 
35
#include <mysql.h>
 
36
 
 
37
#define SLAVE_LIST_CHUNK 128
 
38
#define SLAVE_ERRMSG_SIZE (FN_REFLEN+64)
 
39
 
 
40
 
 
41
RPL_STATUS rpl_status=RPL_NULL;
 
42
pthread_mutex_t LOCK_rpl_status;
 
43
pthread_cond_t COND_rpl_status;
 
44
HASH slave_list;
 
45
 
 
46
const char *rpl_role_type[] = {"MASTER","SLAVE",NullS};
 
47
TYPELIB rpl_role_typelib = {array_elements(rpl_role_type)-1,"",
 
48
                            rpl_role_type, NULL};
 
49
 
 
50
const char* rpl_status_type[]=
 
51
{
 
52
  "AUTH_MASTER","ACTIVE_SLAVE","IDLE_SLAVE", "LOST_SOLDIER","TROOP_SOLDIER",
 
53
  "RECOVERY_CAPTAIN","NULL",NullS
 
54
};
 
55
TYPELIB rpl_status_typelib= {array_elements(rpl_status_type)-1,"",
 
56
                             rpl_status_type, NULL};
 
57
 
 
58
 
 
59
static Slave_log_event* find_slave_event(IO_CACHE* log,
 
60
                                         const char* log_file_name,
 
61
                                         char* errmsg);
 
62
 
 
63
/*
 
64
  All of the functions defined in this file which are not used (the ones to
 
65
  handle failsafe) are not used; their code has not been updated for more than
 
66
  one year now so should be considered as BADLY BROKEN. Do not enable it.
 
67
  The used functions (to handle LOAD DATA FROM MASTER, plus some small
 
68
  functions like register_slave()) are working.
 
69
*/
 
70
 
 
71
#if NOT_USED
 
72
static int init_failsafe_rpl_thread(THD* thd)
 
73
{
 
74
  DBUG_ENTER("init_failsafe_rpl_thread");
 
75
  thd->system_thread = SYSTEM_THREAD_DELAYED_INSERT;
 
76
  /*
 
77
    thd->bootstrap is to report errors barely to stderr; if this code is
 
78
    enable again one day, one should check if bootstrap is still needed (maybe
 
79
    this thread has no other error reporting method).
 
80
  */
 
81
  thd->bootstrap = 1;
 
82
  thd->security_ctx->skip_grants();
 
83
  my_net_init(&thd->net, 0);
 
84
  thd->net.read_timeout = slave_net_timeout;
 
85
  thd->max_client_packet_length=thd->net.max_packet;
 
86
  pthread_mutex_lock(&LOCK_thread_count);
 
87
  thd->thread_id= thd->variables.pseudo_thread_id= thread_id++;
 
88
  pthread_mutex_unlock(&LOCK_thread_count);
 
89
 
 
90
  if (init_thr_lock() || thd->store_globals())
 
91
  {
 
92
    /* purecov: begin inspected */
 
93
    close_connection(thd, ER_OUT_OF_RESOURCES, 1); // is this needed?
 
94
    statistic_increment(aborted_connects,&LOCK_status);
 
95
    one_thread_per_connection_end(thd,0);
 
96
    DBUG_RETURN(-1);
 
97
    /* purecov: end */
 
98
  }
 
99
 
 
100
  thd->mem_root->free= thd->mem_root->used= 0;
 
101
  if (thd->variables.max_join_size == HA_POS_ERROR)
 
102
    thd->options|= OPTION_BIG_SELECTS;
 
103
 
 
104
  thd_proc_info(thd, "Thread initialized");
 
105
  thd->version=refresh_version;
 
106
  thd->set_time();
 
107
  DBUG_RETURN(0);
 
108
}
 
109
#endif
 
110
 
 
111
void change_rpl_status(RPL_STATUS from_status, RPL_STATUS to_status)
 
112
{
 
113
  pthread_mutex_lock(&LOCK_rpl_status);
 
114
  if (rpl_status == from_status || rpl_status == RPL_ANY)
 
115
    rpl_status = to_status;
 
116
  pthread_cond_signal(&COND_rpl_status);
 
117
  pthread_mutex_unlock(&LOCK_rpl_status);
 
118
}
 
119
 
 
120
 
 
121
#define get_object(p, obj, msg) \
 
122
{\
 
123
  uint len = (uint)*p++;  \
 
124
  if (p + len > p_end || len >= sizeof(obj)) \
 
125
  {\
 
126
    errmsg= msg;\
 
127
    goto err; \
 
128
  }\
 
129
  strmake(obj,(char*) p,len); \
 
130
  p+= len; \
 
131
}\
 
132
 
 
133
 
 
134
static inline int cmp_master_pos(Slave_log_event* sev, LEX_MASTER_INFO* mi)
 
135
{
 
136
  return cmp_master_pos(sev->master_log, sev->master_pos, mi->log_file_name,
 
137
                        mi->pos);
 
138
}
 
139
 
 
140
 
 
141
void unregister_slave(THD* thd, bool only_mine, bool need_mutex)
 
142
{
 
143
  if (thd->server_id)
 
144
  {
 
145
    if (need_mutex)
 
146
      pthread_mutex_lock(&LOCK_slave_list);
 
147
 
 
148
    SLAVE_INFO* old_si;
 
149
    if ((old_si = (SLAVE_INFO*)hash_search(&slave_list,
 
150
                                           (uchar*)&thd->server_id, 4)) &&
 
151
        (!only_mine || old_si->thd == thd))
 
152
    hash_delete(&slave_list, (uchar*)old_si);
 
153
 
 
154
    if (need_mutex)
 
155
      pthread_mutex_unlock(&LOCK_slave_list);
 
156
  }
 
157
}
 
158
 
 
159
 
 
160
/**
 
161
  Register slave in 'slave_list' hash table.
 
162
 
 
163
  @return
 
164
    0   ok
 
165
  @return
 
166
    1   Error.   Error message sent to client
 
167
*/
 
168
 
 
169
int register_slave(THD* thd, uchar* packet, uint packet_length)
 
170
{
 
171
  int res;
 
172
  SLAVE_INFO *si;
 
173
  uchar *p= packet, *p_end= packet + packet_length;
 
174
  const char *errmsg= "Wrong parameters to function register_slave";
 
175
 
 
176
  if (check_access(thd, REPL_SLAVE_ACL, any_db,0,0,0,0))
 
177
    return 1;
 
178
  if (!(si = (SLAVE_INFO*)my_malloc(sizeof(SLAVE_INFO), MYF(MY_WME))))
 
179
    goto err2;
 
180
 
 
181
  thd->server_id= si->server_id= uint4korr(p);
 
182
  p+= 4;
 
183
  get_object(p,si->host, "Failed to register slave: too long 'report-host'");
 
184
  get_object(p,si->user, "Failed to register slave: too long 'report-user'");
 
185
  get_object(p,si->password, "Failed to register slave; too long 'report-password'");
 
186
  if (p+10 > p_end)
 
187
    goto err;
 
188
  si->port= uint2korr(p);
 
189
  p += 2;
 
190
  si->rpl_recovery_rank= uint4korr(p);
 
191
  p += 4;
 
192
  if (!(si->master_id= uint4korr(p)))
 
193
    si->master_id= server_id;
 
194
  si->thd= thd;
 
195
 
 
196
  pthread_mutex_lock(&LOCK_slave_list);
 
197
  unregister_slave(thd,0,0);
 
198
  res= my_hash_insert(&slave_list, (uchar*) si);
 
199
  pthread_mutex_unlock(&LOCK_slave_list);
 
200
  return res;
 
201
 
 
202
err:
 
203
  my_free(si, MYF(MY_WME));
 
204
  my_message(ER_UNKNOWN_ERROR, errmsg, MYF(0)); /* purecov: inspected */
 
205
err2:
 
206
  return 1;
 
207
}
 
208
 
 
209
extern "C" uint32
 
210
*slave_list_key(SLAVE_INFO* si, size_t *len,
 
211
                my_bool not_used __attribute__((unused)))
 
212
{
 
213
  *len = 4;
 
214
  return &si->server_id;
 
215
}
 
216
 
 
217
extern "C" void slave_info_free(void *s)
 
218
{
 
219
  my_free(s, MYF(MY_WME));
 
220
}
 
221
 
 
222
void init_slave_list()
 
223
{
 
224
  hash_init(&slave_list, system_charset_info, SLAVE_LIST_CHUNK, 0, 0,
 
225
            (hash_get_key) slave_list_key, (hash_free_key) slave_info_free, 0);
 
226
  pthread_mutex_init(&LOCK_slave_list, MY_MUTEX_INIT_FAST);
 
227
}
 
228
 
 
229
void end_slave_list()
 
230
{
 
231
  /* No protection by a mutex needed as we are only called at shutdown */
 
232
  if (hash_inited(&slave_list))
 
233
  {
 
234
    hash_free(&slave_list);
 
235
    pthread_mutex_destroy(&LOCK_slave_list);
 
236
  }
 
237
}
 
238
 
 
239
static int find_target_pos(LEX_MASTER_INFO *mi, IO_CACHE *log, char *errmsg)
 
240
{
 
241
  my_off_t log_pos =        (my_off_t) mi->pos;
 
242
  uint32 target_server_id = mi->server_id;
 
243
 
 
244
  for (;;)
 
245
  {
 
246
    Log_event* ev;
 
247
    if (!(ev = Log_event::read_log_event(log, (pthread_mutex_t*) 0, 0)))
 
248
    {
 
249
      if (log->error > 0)
 
250
        strmov(errmsg, "Binary log truncated in the middle of event");
 
251
      else if (log->error < 0)
 
252
        strmov(errmsg, "I/O error reading binary log");
 
253
      else
 
254
        strmov(errmsg, "Could not find target event in the binary log");
 
255
      return 1;
 
256
    }
 
257
 
 
258
    if (ev->log_pos >= log_pos && ev->server_id == target_server_id)
 
259
    {
 
260
      delete ev;
 
261
      mi->pos = my_b_tell(log);
 
262
      return 0;
 
263
    }
 
264
    delete ev;
 
265
  }
 
266
  /* Impossible */
 
267
}
 
268
 
 
269
/**
 
270
  @details 
 
271
  Before 4.0.15 we had a member of THD called log_pos, it was meant for
 
272
  failsafe replication code in repl_failsafe.cc which is disabled until
 
273
  it is reworked. Event's log_pos used to be preserved through 
 
274
  log-slave-updates to make code in repl_failsafe.cc work (this 
 
275
  function, SHOW NEW MASTER); but on the other side it caused unexpected
 
276
  values in Exec_Master_Log_Pos in A->B->C replication setup, 
 
277
  synchronization problems in master_pos_wait(), ... So we 
 
278
  (Dmitri & Guilhem) removed it.
 
279
  
 
280
  So for now this function is broken. 
 
281
*/
 
282
 
 
283
int translate_master(THD* thd, LEX_MASTER_INFO* mi, char* errmsg)
 
284
{
 
285
  LOG_INFO linfo;
 
286
  char last_log_name[FN_REFLEN];
 
287
  IO_CACHE log;
 
288
  File file = -1, last_file = -1;
 
289
  pthread_mutex_t *log_lock;
 
290
  const char* errmsg_p;
 
291
  Slave_log_event* sev = 0;
 
292
  my_off_t last_pos = 0;
 
293
  int error = 1;
 
294
  int cmp_res;
 
295
  LINT_INIT(cmp_res);
 
296
  DBUG_ENTER("translate_master");
 
297
 
 
298
  if (!mysql_bin_log.is_open())
 
299
  {
 
300
    strmov(errmsg,"Binary log is not open");
 
301
    DBUG_RETURN(1);
 
302
  }
 
303
 
 
304
  if (!server_id_supplied)
 
305
  {
 
306
    strmov(errmsg, "Misconfigured master - server id was not set");
 
307
    DBUG_RETURN(1);
 
308
  }
 
309
 
 
310
  if (mysql_bin_log.find_log_pos(&linfo, NullS, 1))
 
311
  {
 
312
    strmov(errmsg,"Could not find first log");
 
313
    DBUG_RETURN(1);
 
314
  }
 
315
  thd->current_linfo = &linfo;
 
316
 
 
317
  bzero((char*) &log,sizeof(log));
 
318
  log_lock = mysql_bin_log.get_log_lock();
 
319
  pthread_mutex_lock(log_lock);
 
320
 
 
321
  for (;;)
 
322
  {
 
323
    if ((file=open_binlog(&log, linfo.log_file_name, &errmsg_p)) < 0)
 
324
    {
 
325
      strmov(errmsg, errmsg_p);
 
326
      goto err;
 
327
    }
 
328
 
 
329
    if (!(sev = find_slave_event(&log, linfo.log_file_name, errmsg)))
 
330
      goto err;
 
331
 
 
332
    cmp_res = cmp_master_pos(sev, mi);
 
333
    delete sev;
 
334
 
 
335
    if (!cmp_res)
 
336
    {
 
337
      /* Copy basename */
 
338
      fn_format(mi->log_file_name, linfo.log_file_name, "","",1);
 
339
      mi->pos = my_b_tell(&log);
 
340
      goto mi_inited;
 
341
    }
 
342
    else if (cmp_res > 0)
 
343
    {
 
344
      if (!last_pos)
 
345
      {
 
346
        strmov(errmsg,
 
347
               "Slave event in first log points past the target position");
 
348
        goto err;
 
349
      }
 
350
      end_io_cache(&log);
 
351
      (void) my_close(file, MYF(MY_WME));
 
352
      if (init_io_cache(&log, (file = last_file), IO_SIZE, READ_CACHE, 0, 0,
 
353
                        MYF(MY_WME)))
 
354
      {
 
355
        errmsg[0] = 0;
 
356
        goto err;
 
357
      }
 
358
      break;
 
359
    }
 
360
 
 
361
    strmov(last_log_name, linfo.log_file_name);
 
362
    last_pos = my_b_tell(&log);
 
363
 
 
364
    switch (mysql_bin_log.find_next_log(&linfo, 1)) {
 
365
    case LOG_INFO_EOF:
 
366
      if (last_file >= 0)
 
367
       (void)my_close(last_file, MYF(MY_WME));
 
368
      last_file = -1;
 
369
      goto found_log;
 
370
    case 0:
 
371
      break;
 
372
    default:
 
373
      strmov(errmsg, "Error reading log index");
 
374
      goto err;
 
375
    }
 
376
 
 
377
    end_io_cache(&log);
 
378
    if (last_file >= 0)
 
379
     (void) my_close(last_file, MYF(MY_WME));
 
380
    last_file = file;
 
381
  }
 
382
 
 
383
found_log:
 
384
  my_b_seek(&log, last_pos);
 
385
  if (find_target_pos(mi,&log,errmsg))
 
386
    goto err;
 
387
  fn_format(mi->log_file_name, last_log_name, "","",1);  /* Copy basename */
 
388
 
 
389
mi_inited:
 
390
  error = 0;
 
391
err:
 
392
  pthread_mutex_unlock(log_lock);
 
393
  end_io_cache(&log);
 
394
  pthread_mutex_lock(&LOCK_thread_count);
 
395
  thd->current_linfo = 0;
 
396
  pthread_mutex_unlock(&LOCK_thread_count);
 
397
  if (file >= 0)
 
398
    (void) my_close(file, MYF(MY_WME));
 
399
  if (last_file >= 0 && last_file != file)
 
400
    (void) my_close(last_file, MYF(MY_WME));
 
401
 
 
402
  DBUG_RETURN(error);
 
403
}
 
404
 
 
405
 
 
406
/**
 
407
  Caller must delete result when done.
 
408
*/
 
409
 
 
410
static Slave_log_event* find_slave_event(IO_CACHE* log,
 
411
                                         const char* log_file_name,
 
412
                                         char* errmsg)
 
413
{
 
414
  Log_event* ev;
 
415
  int i;
 
416
  bool slave_event_found = 0;
 
417
  LINT_INIT(ev);
 
418
 
 
419
  for (i = 0; i < 2; i++)
 
420
  {
 
421
    if (!(ev = Log_event::read_log_event(log, (pthread_mutex_t*)0, 0)))
 
422
    {
 
423
      my_snprintf(errmsg, SLAVE_ERRMSG_SIZE,
 
424
                  "Error reading event in log '%s'",
 
425
                  (char*)log_file_name);
 
426
      return 0;
 
427
    }
 
428
    if (ev->get_type_code() == SLAVE_EVENT)
 
429
    {
 
430
      slave_event_found = 1;
 
431
      break;
 
432
    }
 
433
    delete ev;
 
434
  }
 
435
  if (!slave_event_found)
 
436
  {
 
437
    my_snprintf(errmsg, SLAVE_ERRMSG_SIZE,
 
438
                "Could not find slave event in log '%s'",
 
439
                (char*)log_file_name);
 
440
    return 0;
 
441
  }
 
442
 
 
443
  return (Slave_log_event*)ev;
 
444
}
 
445
 
 
446
/**
 
447
  This function is broken now. 
 
448
 
 
449
  @seealso translate_master()
 
450
*/
 
451
 
 
452
bool show_new_master(THD* thd)
 
453
{
 
454
  Protocol *protocol= thd->protocol;
 
455
  DBUG_ENTER("show_new_master");
 
456
  List<Item> field_list;
 
457
  char errmsg[SLAVE_ERRMSG_SIZE];
 
458
  LEX_MASTER_INFO* lex_mi= &thd->lex->mi;
 
459
 
 
460
  errmsg[0]=0;                                  // Safety
 
461
  if (translate_master(thd, lex_mi, errmsg))
 
462
  {
 
463
    if (errmsg[0])
 
464
      my_error(ER_ERROR_WHEN_EXECUTING_COMMAND, MYF(0),
 
465
               "SHOW NEW MASTER", errmsg);
 
466
    DBUG_RETURN(TRUE);
 
467
  }
 
468
  else
 
469
  {
 
470
    field_list.push_back(new Item_empty_string("Log_name", 20));
 
471
    field_list.push_back(new Item_return_int("Log_pos", 10,
 
472
                                             MYSQL_TYPE_LONGLONG));
 
473
    if (protocol->send_fields(&field_list,
 
474
                              Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF))
 
475
      DBUG_RETURN(TRUE);
 
476
    protocol->prepare_for_resend();
 
477
    protocol->store(lex_mi->log_file_name, &my_charset_bin);
 
478
    protocol->store((ulonglong) lex_mi->pos);
 
479
    if (protocol->write())
 
480
      DBUG_RETURN(TRUE);
 
481
    my_eof(thd);
 
482
    DBUG_RETURN(FALSE);
 
483
  }
 
484
}
 
485
 
 
486
/**
 
487
  Asks the master for the list of its other connected slaves.
 
488
 
 
489
  This is for failsafe replication:
 
490
  in order for failsafe replication to work, the servers involved in
 
491
  replication must know of each other. We accomplish this by having each
 
492
  slave report to the master how to reach it, and on connection, each
 
493
  slave receives information about where the other slaves are.
 
494
 
 
495
  @param mysql           pre-existing connection to the master
 
496
  @param mi              master info
 
497
 
 
498
  @note
 
499
    mi is used only to give detailed error messages which include the
 
500
    hostname/port of the master, the username used by the slave to connect to
 
501
    the master.
 
502
    If the user used by the slave to connect to the master does not have the
 
503
    REPLICATION SLAVE privilege, it will pop in this function because
 
504
    SHOW SLAVE HOSTS will fail on the master.
 
505
 
 
506
  @retval
 
507
    1           error
 
508
  @retval
 
509
    0           success
 
510
*/
 
511
 
 
512
int update_slave_list(MYSQL* mysql, Master_info* mi)
 
513
{
 
514
  MYSQL_RES* res=0;
 
515
  MYSQL_ROW row;
 
516
  const char* error=0;
 
517
  bool have_auth_info;
 
518
  int port_ind;
 
519
  DBUG_ENTER("update_slave_list");
 
520
 
 
521
  if (mysql_real_query(mysql, STRING_WITH_LEN("SHOW SLAVE HOSTS")) ||
 
522
      !(res = mysql_store_result(mysql)))
 
523
  {
 
524
    error= mysql_error(mysql);
 
525
    goto err;
 
526
  }
 
527
 
 
528
  switch (mysql_num_fields(res)) {
 
529
  case 5:
 
530
    have_auth_info = 0;
 
531
    port_ind=2;
 
532
    break;
 
533
  case 7:
 
534
    have_auth_info = 1;
 
535
    port_ind=4;
 
536
    break;
 
537
  default:
 
538
    error= "the master returned an invalid number of fields for SHOW SLAVE \
 
539
HOSTS";
 
540
    goto err;
 
541
  }
 
542
 
 
543
  pthread_mutex_lock(&LOCK_slave_list);
 
544
 
 
545
  while ((row= mysql_fetch_row(res)))
 
546
  {
 
547
    uint32 log_server_id;
 
548
    SLAVE_INFO* si, *old_si;
 
549
    log_server_id = atoi(row[0]);
 
550
    if ((old_si= (SLAVE_INFO*)hash_search(&slave_list,
 
551
                                          (uchar*)&log_server_id,4)))
 
552
      si = old_si;
 
553
    else
 
554
    {
 
555
      if (!(si = (SLAVE_INFO*)my_malloc(sizeof(SLAVE_INFO), MYF(MY_WME))))
 
556
      {
 
557
        error= "the slave is out of memory";
 
558
        pthread_mutex_unlock(&LOCK_slave_list);
 
559
        goto err;
 
560
      }
 
561
      si->server_id = log_server_id;
 
562
      if (my_hash_insert(&slave_list, (uchar*)si))
 
563
      {
 
564
        error= "the slave is out of memory";
 
565
        pthread_mutex_unlock(&LOCK_slave_list);
 
566
        goto err;
 
567
      }
 
568
    }
 
569
    strmake(si->host, row[1], sizeof(si->host)-1);
 
570
    si->port = atoi(row[port_ind]);
 
571
    si->rpl_recovery_rank = atoi(row[port_ind+1]);
 
572
    si->master_id = atoi(row[port_ind+2]);
 
573
    if (have_auth_info)
 
574
    {
 
575
      strmake(si->user, row[2], sizeof(si->user)-1);
 
576
      strmake(si->password, row[3], sizeof(si->password)-1);
 
577
    }
 
578
  }
 
579
  pthread_mutex_unlock(&LOCK_slave_list);
 
580
 
 
581
err:
 
582
  if (res)
 
583
    mysql_free_result(res);
 
584
  if (error)
 
585
  {
 
586
    sql_print_error("While trying to obtain the list of slaves from the master "
 
587
                    "'%s:%d', user '%s' got the following error: '%s'", 
 
588
                    mi->host, mi->port, mi->user, error);
 
589
    DBUG_RETURN(1);
 
590
  }
 
591
  DBUG_RETURN(0);
 
592
}
 
593
 
 
594
 
 
595
#if NOT_USED
 
596
int find_recovery_captain(THD* thd, MYSQL* mysql)
 
597
{
 
598
  return 0;
 
599
}
 
600
#endif
 
601
 
 
602
#if NOT_USED
 
603
pthread_handler_t handle_failsafe_rpl(void *arg)
 
604
{
 
605
  DBUG_ENTER("handle_failsafe_rpl");
 
606
  THD *thd = new THD;
 
607
  thd->thread_stack = (char*)&thd;
 
608
  MYSQL* recovery_captain = 0;
 
609
  const char* msg;
 
610
 
 
611
  pthread_detach_this_thread();
 
612
  if (init_failsafe_rpl_thread(thd) || !(recovery_captain=mysql_init(0)))
 
613
  {
 
614
    sql_print_error("Could not initialize failsafe replication thread");
 
615
    goto err;
 
616
  }
 
617
  pthread_mutex_lock(&LOCK_rpl_status);
 
618
  msg= thd->enter_cond(&COND_rpl_status,
 
619
                       &LOCK_rpl_status, "Waiting for request");
 
620
  while (!thd->killed && !abort_loop)
 
621
  {
 
622
    bool break_req_chain = 0;
 
623
    pthread_cond_wait(&COND_rpl_status, &LOCK_rpl_status);
 
624
    thd_proc_info(thd, "Processing request");
 
625
    while (!break_req_chain)
 
626
    {
 
627
      switch (rpl_status) {
 
628
      case RPL_LOST_SOLDIER:
 
629
        if (find_recovery_captain(thd, recovery_captain))
 
630
          rpl_status=RPL_TROOP_SOLDIER;
 
631
        else
 
632
          rpl_status=RPL_RECOVERY_CAPTAIN;
 
633
        break_req_chain=1; /* for now until other states are implemented */
 
634
        break;
 
635
      default:
 
636
        break_req_chain=1;
 
637
        break;
 
638
      }
 
639
    }
 
640
  }
 
641
  thd->exit_cond(msg);
 
642
err:
 
643
  if (recovery_captain)
 
644
    mysql_close(recovery_captain);
 
645
  delete thd;
 
646
 
 
647
  DBUG_LEAVE;                                   // Must match DBUG_ENTER()
 
648
  my_thread_end();
 
649
  pthread_exit(0);
 
650
  return 0;                                     // Avoid compiler warnings
 
651
}
 
652
#endif
 
653
 
 
654
 
 
655
/**
 
656
  Execute a SHOW SLAVE HOSTS statement.
 
657
 
 
658
  @param thd Pointer to THD object for the client thread executing the
 
659
  statement.
 
660
 
 
661
  @retval FALSE success
 
662
  @retval TRUE failure
 
663
*/
 
664
bool show_slave_hosts(THD* thd)
 
665
{
 
666
  List<Item> field_list;
 
667
  Protocol *protocol= thd->protocol;
 
668
  DBUG_ENTER("show_slave_hosts");
 
669
 
 
670
  field_list.push_back(new Item_return_int("Server_id", 10,
 
671
                                           MYSQL_TYPE_LONG));
 
672
  field_list.push_back(new Item_empty_string("Host", 20));
 
673
  if (opt_show_slave_auth_info)
 
674
  {
 
675
    field_list.push_back(new Item_empty_string("User",20));
 
676
    field_list.push_back(new Item_empty_string("Password",20));
 
677
  }
 
678
  field_list.push_back(new Item_return_int("Port", 7, MYSQL_TYPE_LONG));
 
679
  field_list.push_back(new Item_return_int("Rpl_recovery_rank", 7,
 
680
                                           MYSQL_TYPE_LONG));
 
681
  field_list.push_back(new Item_return_int("Master_id", 10,
 
682
                                           MYSQL_TYPE_LONG));
 
683
 
 
684
  if (protocol->send_fields(&field_list,
 
685
                            Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF))
 
686
    DBUG_RETURN(TRUE);
 
687
 
 
688
  pthread_mutex_lock(&LOCK_slave_list);
 
689
 
 
690
  for (uint i = 0; i < slave_list.records; ++i)
 
691
  {
 
692
    SLAVE_INFO* si = (SLAVE_INFO*) hash_element(&slave_list, i);
 
693
    protocol->prepare_for_resend();
 
694
    protocol->store((uint32) si->server_id);
 
695
    protocol->store(si->host, &my_charset_bin);
 
696
    if (opt_show_slave_auth_info)
 
697
    {
 
698
      protocol->store(si->user, &my_charset_bin);
 
699
      protocol->store(si->password, &my_charset_bin);
 
700
    }
 
701
    protocol->store((uint32) si->port);
 
702
    protocol->store((uint32) si->rpl_recovery_rank);
 
703
    protocol->store((uint32) si->master_id);
 
704
    if (protocol->write())
 
705
    {
 
706
      pthread_mutex_unlock(&LOCK_slave_list);
 
707
      DBUG_RETURN(TRUE);
 
708
    }
 
709
  }
 
710
  pthread_mutex_unlock(&LOCK_slave_list);
 
711
  my_eof(thd);
 
712
  DBUG_RETURN(FALSE);
 
713
}
 
714
 
 
715
 
 
716
int connect_to_master(THD *thd, MYSQL* mysql, Master_info* mi)
 
717
{
 
718
  DBUG_ENTER("connect_to_master");
 
719
 
 
720
  if (!mi->host || !*mi->host)                  /* empty host */
 
721
  {
 
722
    strmov(mysql->net.last_error, "Master is not configured");
 
723
    DBUG_RETURN(1);
 
724
  }
 
725
  mysql_options(mysql, MYSQL_OPT_CONNECT_TIMEOUT, (char *) &slave_net_timeout);
 
726
  mysql_options(mysql, MYSQL_OPT_READ_TIMEOUT, (char *) &slave_net_timeout);
 
727
 
 
728
#ifdef HAVE_OPENSSL
 
729
  if (mi->ssl)
 
730
  {
 
731
    mysql_ssl_set(mysql, 
 
732
        mi->ssl_key[0]?mi->ssl_key:0,
 
733
        mi->ssl_cert[0]?mi->ssl_cert:0,
 
734
        mi->ssl_ca[0]?mi->ssl_ca:0, 
 
735
        mi->ssl_capath[0]?mi->ssl_capath:0,
 
736
        mi->ssl_cipher[0]?mi->ssl_cipher:0);
 
737
    mysql_options(mysql, MYSQL_OPT_SSL_VERIFY_SERVER_CERT,
 
738
                  &mi->ssl_verify_server_cert);
 
739
  }
 
740
#endif
 
741
    
 
742
  mysql_options(mysql, MYSQL_SET_CHARSET_NAME, default_charset_info->csname);
 
743
  mysql_options(mysql, MYSQL_SET_CHARSET_DIR, (char *) charsets_dir);
 
744
  if (!mysql_real_connect(mysql, mi->host, mi->user, mi->password, 0,
 
745
                        mi->port, 0, 0))
 
746
    DBUG_RETURN(1);
 
747
  mysql->reconnect= 1;
 
748
  DBUG_RETURN(0);
 
749
}
 
750
 
 
751
 
 
752
static inline void cleanup_mysql_results(MYSQL_RES* db_res,
 
753
                                         MYSQL_RES** cur, MYSQL_RES** start)
 
754
{
 
755
  for (; cur >= start; --cur)
 
756
  {
 
757
    if (*cur)
 
758
      mysql_free_result(*cur);
 
759
  }
 
760
  mysql_free_result(db_res);
 
761
}
 
762
 
 
763
 
 
764
static int fetch_db_tables(THD *thd, MYSQL *mysql, const char *db,
 
765
                           MYSQL_RES *table_res, Master_info *mi)
 
766
{
 
767
  MYSQL_ROW row;
 
768
  for (row = mysql_fetch_row(table_res); row;
 
769
       row = mysql_fetch_row(table_res))
 
770
  {
 
771
    TABLE_LIST table;
 
772
    const char* table_name= row[0];
 
773
    int error;
 
774
    if (rpl_filter->is_on())
 
775
    {
 
776
      bzero((char*) &table, sizeof(table)); //just for safe
 
777
      table.db= (char*) db;
 
778
      table.table_name= (char*) table_name;
 
779
      table.updating= 1;
 
780
 
 
781
      if (!rpl_filter->tables_ok(thd->db, &table))
 
782
        continue;
 
783
    }
 
784
    /* download master's table and overwrite slave's table */
 
785
    if ((error= fetch_master_table(thd, db, table_name, mi, mysql, 1)))
 
786
      return error;
 
787
  }
 
788
  return 0;
 
789
}
 
790
 
 
791
/**
 
792
  Load all MyISAM tables from master to this slave.
 
793
 
 
794
  REQUIREMENTS
 
795
    - No active transaction (flush_relay_log_info would not work in this case).
 
796
 
 
797
  @todo
 
798
    - add special option, not enabled
 
799
    by default, to allow inclusion of mysql database into load
 
800
    data from master
 
801
*/
 
802
 
 
803
bool load_master_data(THD* thd)
 
804
{
 
805
  MYSQL mysql;
 
806
  MYSQL_RES* master_status_res = 0;
 
807
  int error = 0;
 
808
  const char* errmsg=0;
 
809
  int restart_thread_mask;
 
810
  HA_CREATE_INFO create_info;
 
811
 
 
812
  mysql_init(&mysql);
 
813
 
 
814
  /*
 
815
    We do not want anyone messing with the slave at all for the entire
 
816
    duration of the data load.
 
817
  */
 
818
  pthread_mutex_lock(&LOCK_active_mi);
 
819
  lock_slave_threads(active_mi);
 
820
  init_thread_mask(&restart_thread_mask,active_mi,0 /*not inverse*/);
 
821
  if (restart_thread_mask &&
 
822
      (error=terminate_slave_threads(active_mi,restart_thread_mask,
 
823
                                     1 /*skip lock*/)))
 
824
  {
 
825
    my_message(error, ER(error), MYF(0));
 
826
    unlock_slave_threads(active_mi);
 
827
    pthread_mutex_unlock(&LOCK_active_mi);
 
828
    return TRUE;
 
829
  }
 
830
  
 
831
  if (connect_to_master(thd, &mysql, active_mi))
 
832
  {
 
833
    my_error(error= ER_CONNECT_TO_MASTER, MYF(0), mysql_error(&mysql));
 
834
    goto err;
 
835
  }
 
836
 
 
837
  // now that we are connected, get all database and tables in each
 
838
  {
 
839
    MYSQL_RES *db_res, **table_res, **table_res_end, **cur_table_res;
 
840
    uint num_dbs;
 
841
 
 
842
    if (mysql_real_query(&mysql, STRING_WITH_LEN("SHOW DATABASES")) ||
 
843
        !(db_res = mysql_store_result(&mysql)))
 
844
    {
 
845
      my_error(error= ER_QUERY_ON_MASTER, MYF(0), mysql_error(&mysql));
 
846
      goto err;
 
847
    }
 
848
 
 
849
    if (!(num_dbs = (uint) mysql_num_rows(db_res)))
 
850
      goto err;
 
851
    /*
 
852
      In theory, the master could have no databases at all
 
853
      and run with skip-grant
 
854
    */
 
855
 
 
856
    if (!(table_res = (MYSQL_RES**)thd->alloc(num_dbs * sizeof(MYSQL_RES*))))
 
857
    {
 
858
      my_message(error = ER_OUTOFMEMORY, ER(ER_OUTOFMEMORY), MYF(0));
 
859
      goto err;
 
860
    }
 
861
 
 
862
    /*
 
863
      This is a temporary solution until we have online backup
 
864
      capabilities - to be replaced once online backup is working
 
865
      we wait to issue FLUSH TABLES WITH READ LOCK for as long as we
 
866
      can to minimize the lock time.
 
867
    */
 
868
    if (mysql_real_query(&mysql,
 
869
                         STRING_WITH_LEN("FLUSH TABLES WITH READ LOCK")) ||
 
870
        mysql_real_query(&mysql, STRING_WITH_LEN("SHOW MASTER STATUS")) ||
 
871
        !(master_status_res = mysql_store_result(&mysql)))
 
872
    {
 
873
      my_error(error= ER_QUERY_ON_MASTER, MYF(0), mysql_error(&mysql));
 
874
      goto err;
 
875
    }
 
876
 
 
877
    /*
 
878
      Go through every table in every database, and if the replication
 
879
      rules allow replicating it, get it
 
880
    */
 
881
 
 
882
    table_res_end = table_res + num_dbs;
 
883
 
 
884
    for (cur_table_res = table_res; cur_table_res < table_res_end;
 
885
         cur_table_res++)
 
886
    {
 
887
      // since we know how many rows we have, this can never be NULL
 
888
      MYSQL_ROW row = mysql_fetch_row(db_res);
 
889
      char* db = row[0];
 
890
 
 
891
      /*
 
892
        Do not replicate databases excluded by rules. We also test
 
893
        replicate_wild_*_table rules (replicate_wild_ignore_table='db1.%' will
 
894
        be considered as "ignore the 'db1' database as a whole, as it already
 
895
        works for CREATE DATABASE and DROP DATABASE).
 
896
        Also skip 'mysql' database - in most cases the user will
 
897
        mess up and not exclude mysql database with the rules when
 
898
        he actually means to - in this case, he is up for a surprise if
 
899
        his priv tables get dropped and downloaded from master
 
900
        TODO - add special option, not enabled
 
901
        by default, to allow inclusion of mysql database into load
 
902
        data from master
 
903
      */
 
904
 
 
905
      if (!rpl_filter->db_ok(db) || 
 
906
          !rpl_filter->db_ok_with_wild_table(db) || 
 
907
          !strcmp(db,"mysql") ||
 
908
          is_schema_db(db))
 
909
      {
 
910
        *cur_table_res = 0;
 
911
        continue;
 
912
      }
 
913
 
 
914
      bzero((char*) &create_info, sizeof(create_info));
 
915
      create_info.options= HA_LEX_CREATE_IF_NOT_EXISTS;
 
916
 
 
917
      if (mysql_create_db(thd, db, &create_info, 1))
 
918
      {
 
919
        cleanup_mysql_results(db_res, cur_table_res - 1, table_res);
 
920
        goto err;
 
921
      }
 
922
      /* Clear the result of mysql_create_db(). */
 
923
      thd->main_da.reset_diagnostics_area();
 
924
 
 
925
      if (mysql_select_db(&mysql, db) ||
 
926
          mysql_real_query(&mysql, STRING_WITH_LEN("SHOW TABLES")) ||
 
927
          !(*cur_table_res = mysql_store_result(&mysql)))
 
928
      {
 
929
        my_error(error= ER_QUERY_ON_MASTER, MYF(0), mysql_error(&mysql));
 
930
        cleanup_mysql_results(db_res, cur_table_res - 1, table_res);
 
931
        goto err;
 
932
      }
 
933
 
 
934
      if ((error = fetch_db_tables(thd,&mysql,db,*cur_table_res,active_mi)))
 
935
      {
 
936
        // we do not report the error - fetch_db_tables handles it
 
937
        cleanup_mysql_results(db_res, cur_table_res, table_res);
 
938
        goto err;
 
939
      }
 
940
    }
 
941
 
 
942
    cleanup_mysql_results(db_res, cur_table_res - 1, table_res);
 
943
 
 
944
    // adjust replication coordinates from the master
 
945
    if (master_status_res)
 
946
    {
 
947
      MYSQL_ROW row = mysql_fetch_row(master_status_res);
 
948
 
 
949
      /*
 
950
        We need this check because the master may not be running with
 
951
        log-bin, but it will still allow us to do all the steps
 
952
        of LOAD DATA FROM MASTER - no reason to forbid it, really,
 
953
        although it does not make much sense for the user to do it
 
954
      */
 
955
      if (row && row[0] && row[1])
 
956
      {
 
957
        /*
 
958
          If the slave's master info is not inited, we init it, then we write
 
959
          the new coordinates to it. Must call init_master_info() *before*
 
960
          setting active_mi, because init_master_info() sets active_mi with
 
961
          defaults.
 
962
        */
 
963
        int error_2;
 
964
 
 
965
        if (init_master_info(active_mi, master_info_file, relay_log_info_file, 
 
966
                             0, (SLAVE_IO | SLAVE_SQL)))
 
967
          my_message(ER_MASTER_INFO, ER(ER_MASTER_INFO), MYF(0));
 
968
        strmake(active_mi->master_log_name, row[0],
 
969
                sizeof(active_mi->master_log_name) -1);
 
970
        active_mi->master_log_pos= my_strtoll10(row[1], (char**) 0, &error_2);
 
971
        /* at least in recent versions, the condition below should be false */
 
972
        if (active_mi->master_log_pos < BIN_LOG_HEADER_SIZE)
 
973
          active_mi->master_log_pos = BIN_LOG_HEADER_SIZE;
 
974
        /*
 
975
          Relay log's IO_CACHE may not be inited (even if we are sure that some
 
976
          host was specified; there could have been a problem when replication
 
977
          started, which led to relay log's IO_CACHE to not be inited.
 
978
        */
 
979
        if (flush_master_info(active_mi, 0))
 
980
          sql_print_error("Failed to flush master info file");
 
981
      }
 
982
      mysql_free_result(master_status_res);
 
983
    }
 
984
 
 
985
    if (mysql_real_query(&mysql, STRING_WITH_LEN("UNLOCK TABLES")))
 
986
    {
 
987
      my_error(error= ER_QUERY_ON_MASTER, MYF(0), mysql_error(&mysql));
 
988
      goto err;
 
989
    }
 
990
  }
 
991
  thd_proc_info(thd, "purging old relay logs");
 
992
  if (purge_relay_logs(&active_mi->rli,thd,
 
993
                       0 /* not only reset, but also reinit */,
 
994
                       &errmsg))
 
995
  {
 
996
    my_error(ER_RELAY_LOG_FAIL, MYF(0), errmsg);
 
997
    unlock_slave_threads(active_mi);
 
998
    pthread_mutex_unlock(&LOCK_active_mi);
 
999
    return TRUE;
 
1000
  }
 
1001
  pthread_mutex_lock(&active_mi->rli.data_lock);
 
1002
  active_mi->rli.group_master_log_pos = active_mi->master_log_pos;
 
1003
  strmake(active_mi->rli.group_master_log_name,active_mi->master_log_name,
 
1004
          sizeof(active_mi->rli.group_master_log_name)-1);
 
1005
  /*
 
1006
     Cancel the previous START SLAVE UNTIL, as the fact to download
 
1007
     a new copy logically makes UNTIL irrelevant.
 
1008
  */
 
1009
  active_mi->rli.clear_until_condition();
 
1010
 
 
1011
  /*
 
1012
    No need to update rli.event* coordinates, they will be when the slave
 
1013
    threads start ; only rli.group* coordinates are necessary here.
 
1014
  */
 
1015
  flush_relay_log_info(&active_mi->rli);
 
1016
  pthread_cond_broadcast(&active_mi->rli.data_cond);
 
1017
  pthread_mutex_unlock(&active_mi->rli.data_lock);
 
1018
  thd_proc_info(thd, "starting slave");
 
1019
  if (restart_thread_mask)
 
1020
  {
 
1021
    error=start_slave_threads(0 /* mutex not needed */,
 
1022
                              1 /* wait for start */,
 
1023
                              active_mi,master_info_file,relay_log_info_file,
 
1024
                              restart_thread_mask);
 
1025
  }
 
1026
 
 
1027
err:
 
1028
  unlock_slave_threads(active_mi);
 
1029
  pthread_mutex_unlock(&LOCK_active_mi);
 
1030
  thd_proc_info(thd, 0);
 
1031
 
 
1032
  mysql_close(&mysql); // safe to call since we always do mysql_init()
 
1033
  if (!error)
 
1034
    my_ok(thd);
 
1035
 
 
1036
  return error;
 
1037
}
 
1038
 
 
1039
#endif /* HAVE_REPLICATION */
 
1040