~posulliv/drizzle/optimizer-style-cleanup

« back to all changes in this revision

Viewing changes to drizzled/replication_services.cc

  • Committer: Padraig O'Sullivan
  • Date: 2010-04-17 01:38:47 UTC
  • mfrom: (1237.9.238 bad-staging)
  • Revision ID: osullivan.padraig@gmail.com-20100417013847-ibjioqsfbmf5yg4g
Merge trunk.

Show diffs side-by-side

added added

removed removed

Lines of Context:
2
2
 *  vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
3
3
 *
4
4
 *  Copyright (C) 2008-2009 Sun Microsystems
 
5
 *  Copyright (c) 2009-2010 Jay Pipes <jaypipes@gmail.com>
5
6
 *
6
7
 *  Authors:
7
8
 *
8
 
 *    Jay Pipes <joinfu@sun.com>
 
9
 *    Jay Pipes <jaypipes@gmail.com>
9
10
 *
10
11
 *  This program is free software; you can redistribute it and/or modify
11
12
 *  it under the terms of the GNU General Public License as published by
23
24
 
24
25
/**
25
26
 * @file Server-side utility which is responsible for managing the 
26
 
 * communication between the kernel, replicator plugins, and applier plugins.
27
 
 *
28
 
 * ReplicationServices is a bridge between modules and the kernel, and its
29
 
 * primary function is to take internal events (for instance the start of a 
30
 
 * transaction, the changing of a record, or the rollback of a transaction) 
31
 
 * and construct GPB Messages that are passed to the registered replicator and
32
 
 * applier plugins.
33
 
 *
34
 
 * The reason for this functionality is to encapsulate all communication
35
 
 * between the kernel and the replicator/applier plugins into GPB Messages.
36
 
 * Instead of the plugin having to understand the (often fluidly changing)
37
 
 * mechanics of the kernel, all the plugin needs to understand is the message
38
 
 * format, and GPB messages provide a nice, clear, and versioned format for 
39
 
 * these messages.
40
 
 *
41
 
 * @see /drizzled/message/transaction.proto
42
 
 *
43
 
 * @todo
44
 
 *
45
 
 * We really should store the raw bytes in the messages, not the
46
 
 * String value of the Field.  But, to do that, the
47
 
 * statement_transform library needs first to be updated
48
 
 * to include the transformation code to convert raw
49
 
 * Drizzle-internal Field byte representation into something
50
 
 * plugins can understand.
51
 
 */
 
27
 * communication between the kernel and the replication plugins:
 
28
 *
 
29
 * - TransactionReplicator
 
30
 * - TransactionApplier
 
31
 * - Publisher
 
32
 * - Subscriber
 
33
 *
 
34
 * ReplicationServices is a bridge between replication modules and the kernel,
 
35
 * and its primary function is to  */
52
36
 
53
37
#include "config.h"
54
38
#include "drizzled/replication_services.h"
55
39
#include "drizzled/plugin/transaction_replicator.h"
56
40
#include "drizzled/plugin/transaction_applier.h"
57
41
#include "drizzled/message/transaction.pb.h"
58
 
#include "drizzled/message/table.pb.h"
59
 
#include "drizzled/message/statement_transform.h"
60
42
#include "drizzled/gettext.h"
61
43
#include "drizzled/session.h"
62
44
#include "drizzled/error.h"
63
45
 
 
46
#include <string>
64
47
#include <vector>
 
48
#include <algorithm>
65
49
 
66
50
using namespace std;
67
51
 
68
52
namespace drizzled
69
53
{
70
54
 
71
 
ReplicationServices::ReplicationServices()
 
55
ReplicationServices::ReplicationServices() :
 
56
  is_active(false)
72
57
{
73
 
  is_active= false;
74
58
}
75
59
 
76
 
void ReplicationServices::evaluateActivePlugins()
 
60
void ReplicationServices::normalizeReplicatorName(string &name)
77
61
{
78
 
  /* 
79
 
   * We loop through replicators and appliers, evaluating
80
 
   * whether or not there is at least one active replicator
81
 
   * and one active applier.  If not, we set is_active
82
 
   * to false.
83
 
   */
84
 
  bool tmp_is_active= false;
85
 
 
86
 
  if (replicators.empty() || appliers.empty())
87
 
  {
88
 
    is_active= false;
89
 
    return;
90
 
  }
91
 
 
92
 
  /* 
93
 
   * Determine if any remaining replicators and if those
94
 
   * replicators are active...if not, set is_active
95
 
   * to false
96
 
   */
97
 
  for (Replicators::iterator repl_iter= replicators.begin();
98
 
       repl_iter != replicators.end();
99
 
       ++repl_iter)
100
 
  {
101
 
    if ((*repl_iter)->isEnabled())
 
62
  transform(name.begin(),
 
63
            name.end(),
 
64
            name.begin(),
 
65
            ::tolower);
 
66
  if (name.find("replicator") == string::npos)
 
67
    name.append("replicator", 10);
 
68
  {
 
69
    size_t found_underscore= name.find('_');
 
70
    while (found_underscore != string::npos)
102
71
    {
103
 
      tmp_is_active= true;
104
 
      break;
 
72
      name.erase(found_underscore, 1);
 
73
      found_underscore= name.find('_');
105
74
    }
106
75
  }
107
 
  if (! tmp_is_active)
108
 
  {
109
 
    /* No active replicators. Set is_active to false and exit. */
110
 
    is_active= false;
111
 
    return;
112
 
  }
 
76
}
113
77
 
 
78
bool ReplicationServices::evaluateRegisteredPlugins()
 
79
{
114
80
  /* 
115
 
   * OK, we know there's at least one active replicator.
116
 
   *
117
 
   * Now determine if any remaining replicators and if those
118
 
   * replicators are active...if not, set is_active
119
 
   * to false
 
81
   * We loop through appliers that have registered with us
 
82
   * and attempts to pair the applier with its requested
 
83
   * replicator.  If an applier has requested a replicator
 
84
   * that has either not been built or has not registered
 
85
   * with the replication services, we print an error and
 
86
   * return false
120
87
   */
 
88
  if (appliers.empty())
 
89
    return true;
 
90
 
 
91
  if (replicators.empty() && not appliers.empty())
 
92
  {
 
93
    errmsg_printf(ERRMSG_LVL_ERROR,
 
94
                  N_("You registered a TransactionApplier plugin but no "
 
95
                     "TransactionReplicator plugins were registered.\n"));
 
96
    return false;
 
97
  }
 
98
 
121
99
  for (Appliers::iterator appl_iter= appliers.begin();
122
100
       appl_iter != appliers.end();
123
101
       ++appl_iter)
124
102
  {
125
 
    if ((*appl_iter)->isEnabled())
126
 
    {
127
 
      is_active= true;
128
 
      return;
 
103
    plugin::TransactionApplier *applier= (*appl_iter).second;
 
104
    string requested_replicator_name= (*appl_iter).first;
 
105
    normalizeReplicatorName(requested_replicator_name);
 
106
 
 
107
    bool found= false;
 
108
    Replicators::iterator repl_iter;
 
109
    for (repl_iter= replicators.begin();
 
110
         repl_iter != replicators.end();
 
111
         ++repl_iter)
 
112
    {
 
113
      string replicator_name= (*repl_iter)->getName();
 
114
      normalizeReplicatorName(replicator_name);
 
115
 
 
116
      if (requested_replicator_name.compare(replicator_name) == 0)
 
117
      {
 
118
        found= true;
 
119
        break;
 
120
      }
 
121
    }
 
122
    if (not found)
 
123
    {
 
124
      errmsg_printf(ERRMSG_LVL_ERROR,
 
125
                    N_("You registered a TransactionApplier plugin but no "
 
126
                       "TransactionReplicator plugins were registered that match the "
 
127
                       "requested replicator name of '%s'.\n"
 
128
                       "We have deactivated the TransactionApplier '%s'.\n"),
 
129
                       requested_replicator_name.c_str(),
 
130
                       applier->getName().c_str());
 
131
      applier->deactivate();
 
132
      return false;
 
133
    }
 
134
    else
 
135
    {
 
136
      replication_streams.push_back(make_pair(*repl_iter, applier));
129
137
    }
130
138
  }
131
 
  /* If we get here, there are no active appliers */
132
 
  is_active= false;
 
139
  is_active= true;
 
140
  return true;
133
141
}
134
142
 
135
143
void ReplicationServices::attachReplicator(plugin::TransactionReplicator *in_replicator)
136
144
{
137
145
  replicators.push_back(in_replicator);
138
 
  evaluateActivePlugins();
139
146
}
140
147
 
141
148
void ReplicationServices::detachReplicator(plugin::TransactionReplicator *in_replicator)
142
149
{
143
150
  replicators.erase(std::find(replicators.begin(), replicators.end(), in_replicator));
144
 
  evaluateActivePlugins();
145
 
}
146
 
 
147
 
void ReplicationServices::attachApplier(plugin::TransactionApplier *in_applier)
148
 
{
149
 
  appliers.push_back(in_applier);
150
 
  evaluateActivePlugins();
151
 
}
152
 
 
153
 
void ReplicationServices::detachApplier(plugin::TransactionApplier *in_applier)
154
 
{
155
 
  appliers.erase(std::find(appliers.begin(), appliers.end(), in_applier));
156
 
  evaluateActivePlugins();
 
151
}
 
152
 
 
153
void ReplicationServices::attachApplier(plugin::TransactionApplier *in_applier, const string &requested_replicator_name)
 
154
{
 
155
  appliers.push_back(make_pair(requested_replicator_name, in_applier));
 
156
}
 
157
 
 
158
void ReplicationServices::detachApplier(plugin::TransactionApplier *)
 
159
{
157
160
}
158
161
 
159
162
bool ReplicationServices::isActive() const
161
164
  return is_active;
162
165
}
163
166
 
164
 
message::Transaction *ReplicationServices::getActiveTransaction(Session *in_session) const
165
 
{
166
 
  message::Transaction *transaction= in_session->getTransactionMessage();
167
 
 
168
 
  if (unlikely(transaction == NULL))
169
 
  {
170
 
    /* 
171
 
     * Allocate and initialize a new transaction message 
172
 
     * for this Session object.  Session is responsible for
173
 
     * deleting transaction message when done with it.
174
 
     */
175
 
    transaction= new (nothrow) message::Transaction();
176
 
    initTransaction(*transaction, in_session);
177
 
    in_session->setTransactionMessage(transaction);
178
 
    return transaction;
179
 
  }
180
 
  else
181
 
    return transaction;
182
 
}
183
 
 
184
 
void ReplicationServices::initTransaction(message::Transaction &in_transaction,
185
 
                                          Session *in_session) const
186
 
{
187
 
  message::TransactionContext *trx= in_transaction.mutable_transaction_context();
188
 
  trx->set_server_id(in_session->getServerId());
189
 
  trx->set_transaction_id(in_session->getQueryId());
190
 
  trx->set_start_timestamp(in_session->getCurrentTimestamp());
191
 
}
192
 
 
193
 
void ReplicationServices::finalizeTransaction(message::Transaction &in_transaction,
194
 
                                              Session *in_session) const
195
 
{
196
 
  message::TransactionContext *trx= in_transaction.mutable_transaction_context();
197
 
  trx->set_end_timestamp(in_session->getCurrentTimestamp());
198
 
}
199
 
 
200
 
void ReplicationServices::cleanupTransaction(message::Transaction *in_transaction,
201
 
                                             Session *in_session) const
202
 
{
203
 
  delete in_transaction;
204
 
  in_session->setStatementMessage(NULL);
205
 
  in_session->setTransactionMessage(NULL);
206
 
}
207
 
 
208
 
bool ReplicationServices::transactionContainsBulkSegment(const message::Transaction &transaction) const
209
 
{
210
 
  size_t num_statements= transaction.statement_size();
211
 
  if (num_statements == 0)
212
 
    return false;
213
 
 
214
 
  /*
215
 
   * Only INSERT, UPDATE, and DELETE statements can possibly
216
 
   * have bulk segments.  So, we loop through the statements
217
 
   * checking for segment_id > 1 in those specific submessages.
218
 
   */
219
 
  size_t x;
220
 
  for (x= 0; x < num_statements; ++x)
221
 
  {
222
 
    const message::Statement &statement= transaction.statement(x);
223
 
    message::Statement::Type type= statement.type();
224
 
 
225
 
    switch (type)
226
 
    {
227
 
      case message::Statement::INSERT:
228
 
        if (statement.insert_data().segment_id() > 1)
229
 
          return true;
230
 
        break;
231
 
      case message::Statement::UPDATE:
232
 
        if (statement.update_data().segment_id() > 1)
233
 
          return true;
234
 
        break;
235
 
      case message::Statement::DELETE:
236
 
        if (statement.delete_data().segment_id() > 1)
237
 
          return true;
238
 
        break;
239
 
      default:
240
 
        break;
241
 
    }
242
 
  }
243
 
  return false;
244
 
}
245
 
void ReplicationServices::commitTransaction(Session *in_session)
246
 
{
247
 
  if (! is_active)
248
 
    return;
249
 
 
250
 
  /* If there is an active statement message, finalize it */
251
 
  message::Statement *statement= in_session->getStatementMessage();
252
 
 
253
 
  if (statement != NULL)
254
 
  {
255
 
    finalizeStatement(*statement, in_session);
256
 
  }
257
 
  else
258
 
    return; /* No data modification occurred inside the transaction */
259
 
  
260
 
  message::Transaction* transaction= getActiveTransaction(in_session);
261
 
 
262
 
  finalizeTransaction(*transaction, in_session);
263
 
  
264
 
  push(*transaction);
265
 
 
266
 
  cleanupTransaction(transaction, in_session);
267
 
}
268
 
 
269
 
void ReplicationServices::initStatement(message::Statement &statement,
270
 
                                        message::Statement::Type in_type,
271
 
                                        Session *in_session) const
272
 
{
273
 
  statement.set_type(in_type);
274
 
  statement.set_start_timestamp(in_session->getCurrentTimestamp());
275
 
  /** @TODO Set sql string optionally */
276
 
}
277
 
 
278
 
void ReplicationServices::finalizeStatement(message::Statement &statement,
279
 
                                            Session *in_session) const
280
 
{
281
 
  statement.set_end_timestamp(in_session->getCurrentTimestamp());
282
 
  in_session->setStatementMessage(NULL);
283
 
}
284
 
 
285
 
void ReplicationServices::rollbackTransaction(Session *in_session)
286
 
{
287
 
  if (! is_active)
288
 
    return;
289
 
  
290
 
  message::Transaction *transaction= getActiveTransaction(in_session);
291
 
 
292
 
  /*
293
 
   * OK, so there are two situations that we need to deal with here:
294
 
   *
295
 
   * 1) We receive an instruction to ROLLBACK the current transaction
296
 
   *    and the currently-stored Transaction message is *self-contained*, 
297
 
   *    meaning that no Statement messages in the Transaction message
298
 
   *    contain a message having its segment_id member greater than 1.  If
299
 
   *    no non-segment ID 1 members are found, we can simply clear the
300
 
   *    current Transaction message and remove it from memory.
301
 
   *
302
 
   * 2) If the Transaction message does indeed have a non-end segment, that
303
 
   *    means that a bulk update/delete/insert Transaction message segment
304
 
   *    has previously been sent over the wire to replicators.  In this case, 
305
 
   *    we need to package a Transaction with a Statement message of type
306
 
   *    ROLLBACK to indicate to replicators that previously-transmitted
307
 
   *    messages must be un-applied.
308
 
   */
309
 
  if (unlikely(transactionContainsBulkSegment(*transaction)))
310
 
  {
311
 
    /*
312
 
     * Clear the transaction, create a Rollback statement message, 
313
 
     * attach it to the transaction, and push it to replicators.
314
 
     */
315
 
    transaction->Clear();
316
 
    initTransaction(*transaction, in_session);
317
 
 
318
 
    message::Statement *statement= transaction->add_statement();
319
 
 
320
 
    initStatement(*statement, message::Statement::ROLLBACK, in_session);
321
 
    finalizeStatement(*statement, in_session);
322
 
 
323
 
    finalizeTransaction(*transaction, in_session);
324
 
    
325
 
    push(*transaction);
326
 
  }
327
 
  cleanupTransaction(transaction, in_session);
328
 
}
329
 
 
330
 
message::Statement &ReplicationServices::getInsertStatement(Session *in_session,
331
 
                                                                 Table *in_table) const
332
 
{
333
 
  message::Statement *statement= in_session->getStatementMessage();
334
 
  /*
335
 
   * We check to see if the current Statement message is of type INSERT.
336
 
   * If it is not, we finalize the current Statement and ensure a new
337
 
   * InsertStatement is created.
338
 
   */
339
 
  if (statement != NULL &&
340
 
      statement->type() != message::Statement::INSERT)
341
 
  {
342
 
    finalizeStatement(*statement, in_session);
343
 
    statement= in_session->getStatementMessage();
344
 
  }
345
 
 
346
 
  if (statement == NULL)
347
 
  {
348
 
    message::Transaction *transaction= getActiveTransaction(in_session);
349
 
    /* 
350
 
     * Transaction message initialized and set, but no statement created
351
 
     * yet.  We construct one and initialize it, here, then return the
352
 
     * message after attaching the new Statement message pointer to the 
353
 
     * Session for easy retrieval later...
354
 
     */
355
 
    statement= transaction->add_statement();
356
 
    setInsertHeader(*statement, in_session, in_table);
357
 
    in_session->setStatementMessage(statement);
358
 
  }
359
 
  return *statement;
360
 
}
361
 
 
362
 
void ReplicationServices::setInsertHeader(message::Statement &statement,
363
 
                                          Session *in_session,
364
 
                                          Table *in_table) const
365
 
{
366
 
  initStatement(statement, message::Statement::INSERT, in_session);
367
 
 
368
 
  /* 
369
 
   * Now we construct the specialized InsertHeader message inside
370
 
   * the generalized message::Statement container...
371
 
   */
372
 
  /* Set up the insert header */
373
 
  message::InsertHeader *header= statement.mutable_insert_header();
374
 
  message::TableMetadata *table_metadata= header->mutable_table_metadata();
375
 
 
376
 
  const char *schema_name= in_table->getShare()->db.str;
377
 
  const char *table_name= in_table->getShare()->table_name.str;
378
 
 
379
 
  table_metadata->set_schema_name(schema_name);
380
 
  table_metadata->set_table_name(table_name);
381
 
 
382
 
  Field *current_field;
383
 
  Field **table_fields= in_table->field;
384
 
 
385
 
  message::FieldMetadata *field_metadata;
386
 
 
387
 
  /* We will read all the table's fields... */
388
 
  in_table->setReadSet();
389
 
 
390
 
  while ((current_field= *table_fields++) != NULL) 
391
 
  {
392
 
    field_metadata= header->add_field_metadata();
393
 
    field_metadata->set_name(current_field->field_name);
394
 
    field_metadata->set_type(message::internalFieldTypeToFieldProtoType(current_field->type()));
395
 
  }
396
 
}
397
 
 
398
 
bool ReplicationServices::insertRecord(Session *in_session, Table *in_table)
399
 
{
400
 
  if (! is_active)
401
 
    return false;
402
 
  /**
403
 
   * We do this check here because we don't want to even create a 
404
 
   * statement if there isn't a primary key on the table...
405
 
   *
406
 
   * @todo
407
 
   *
408
 
   * Multi-column primary keys are handled how exactly?
409
 
   */
410
 
  if (in_table->s->primary_key == MAX_KEY)
411
 
  {
412
 
    my_error(ER_NO_PRIMARY_KEY_ON_REPLICATED_TABLE, MYF(0));
413
 
    return true;
414
 
  }
415
 
 
416
 
  message::Statement &statement= getInsertStatement(in_session, in_table);
417
 
 
418
 
  message::InsertData *data= statement.mutable_insert_data();
419
 
  data->set_segment_id(1);
420
 
  data->set_end_segment(true);
421
 
  message::InsertRecord *record= data->add_record();
422
 
 
423
 
  Field *current_field;
424
 
  Field **table_fields= in_table->field;
425
 
 
426
 
  String *string_value= new (in_session->mem_root) String(ReplicationServices::DEFAULT_RECORD_SIZE);
427
 
  string_value->set_charset(system_charset_info);
428
 
 
429
 
  /* We will read all the table's fields... */
430
 
  in_table->setReadSet();
431
 
 
432
 
  while ((current_field= *table_fields++) != NULL) 
433
 
  {
434
 
    string_value= current_field->val_str(string_value);
435
 
    record->add_insert_value(string_value->c_ptr(), string_value->length());
436
 
    string_value->free();
437
 
  }
438
 
  return false;
439
 
}
440
 
 
441
 
message::Statement &ReplicationServices::getUpdateStatement(Session *in_session,
442
 
                                                            Table *in_table,
443
 
                                                            const unsigned char *old_record, 
444
 
                                                            const unsigned char *new_record) const
445
 
{
446
 
  message::Statement *statement= in_session->getStatementMessage();
447
 
  /*
448
 
   * We check to see if the current Statement message is of type UPDATE.
449
 
   * If it is not, we finalize the current Statement and ensure a new
450
 
   * UpdateStatement is created.
451
 
   */
452
 
  if (statement != NULL &&
453
 
      statement->type() != message::Statement::UPDATE)
454
 
  {
455
 
    finalizeStatement(*statement, in_session);
456
 
    statement= in_session->getStatementMessage();
457
 
  }
458
 
 
459
 
  if (statement == NULL)
460
 
  {
461
 
    message::Transaction *transaction= getActiveTransaction(in_session);
462
 
    /* 
463
 
     * Transaction message initialized and set, but no statement created
464
 
     * yet.  We construct one and initialize it, here, then return the
465
 
     * message after attaching the new Statement message pointer to the 
466
 
     * Session for easy retrieval later...
467
 
     */
468
 
    statement= transaction->add_statement();
469
 
    setUpdateHeader(*statement, in_session, in_table, old_record, new_record);
470
 
    in_session->setStatementMessage(statement);
471
 
  }
472
 
  return *statement;
473
 
}
474
 
 
475
 
void ReplicationServices::setUpdateHeader(message::Statement &statement,
476
 
                                          Session *in_session,
477
 
                                          Table *in_table,
478
 
                                          const unsigned char *old_record, 
479
 
                                          const unsigned char *new_record) const
480
 
{
481
 
  initStatement(statement, message::Statement::UPDATE, in_session);
482
 
 
483
 
  /* 
484
 
   * Now we construct the specialized UpdateHeader message inside
485
 
   * the generalized message::Statement container...
486
 
   */
487
 
  /* Set up the update header */
488
 
  message::UpdateHeader *header= statement.mutable_update_header();
489
 
  message::TableMetadata *table_metadata= header->mutable_table_metadata();
490
 
 
491
 
  const char *schema_name= in_table->getShare()->db.str;
492
 
  const char *table_name= in_table->getShare()->table_name.str;
493
 
 
494
 
  table_metadata->set_schema_name(schema_name);
495
 
  table_metadata->set_table_name(table_name);
496
 
 
497
 
  Field *current_field;
498
 
  Field **table_fields= in_table->field;
499
 
 
500
 
  message::FieldMetadata *field_metadata;
501
 
 
502
 
  /* We will read all the table's fields... */
503
 
  in_table->setReadSet();
504
 
 
505
 
  while ((current_field= *table_fields++) != NULL) 
506
 
  {
507
 
    /*
508
 
     * We add the "key field metadata" -- i.e. the fields which is
509
 
     * the primary key for the table.
510
 
     */
511
 
    if (in_table->s->fieldInPrimaryKey(current_field))
512
 
    {
513
 
      field_metadata= header->add_key_field_metadata();
514
 
      field_metadata->set_name(current_field->field_name);
515
 
      field_metadata->set_type(message::internalFieldTypeToFieldProtoType(current_field->type()));
516
 
    }
517
 
 
518
 
    /*
519
 
     * The below really should be moved into the Field API and Record API.  But for now
520
 
     * we do this crazy pointer fiddling to figure out if the current field
521
 
     * has been updated in the supplied record raw byte pointers.
522
 
     */
523
 
    const unsigned char *old_ptr= (const unsigned char *) old_record + (ptrdiff_t) (current_field->ptr - in_table->record[0]); 
524
 
    const unsigned char *new_ptr= (const unsigned char *) new_record + (ptrdiff_t) (current_field->ptr - in_table->record[0]); 
525
 
 
526
 
    uint32_t field_length= current_field->pack_length(); /** @TODO This isn't always correct...check varchar diffs. */
527
 
 
528
 
    if (memcmp(old_ptr, new_ptr, field_length) != 0)
529
 
    {
530
 
      /* Field is changed from old to new */
531
 
      field_metadata= header->add_set_field_metadata();
532
 
      field_metadata->set_name(current_field->field_name);
533
 
      field_metadata->set_type(message::internalFieldTypeToFieldProtoType(current_field->type()));
534
 
    }
535
 
  }
536
 
}
537
 
void ReplicationServices::updateRecord(Session *in_session,
538
 
                                       Table *in_table, 
539
 
                                       const unsigned char *old_record, 
540
 
                                       const unsigned char *new_record)
541
 
{
542
 
  if (! is_active)
543
 
    return;
544
 
 
545
 
  message::Statement &statement= getUpdateStatement(in_session, in_table, old_record, new_record);
546
 
 
547
 
  message::UpdateData *data= statement.mutable_update_data();
548
 
  data->set_segment_id(1);
549
 
  data->set_end_segment(true);
550
 
  message::UpdateRecord *record= data->add_record();
551
 
 
552
 
  Field *current_field;
553
 
  Field **table_fields= in_table->field;
554
 
  String *string_value= new (in_session->mem_root) String(ReplicationServices::DEFAULT_RECORD_SIZE);
555
 
  string_value->set_charset(system_charset_info);
556
 
 
557
 
  while ((current_field= *table_fields++) != NULL) 
558
 
  {
559
 
    /*
560
 
     * Here, we add the SET field values.  We used to do this in the setUpdateHeader() method, 
561
 
     * but then realized that an UPDATE statement could potentially have different values for
562
 
     * the SET field.  For instance, imagine this SQL scenario:
563
 
     *
564
 
     * CREATE TABLE t1 (id INT NOT NULL PRIMARY KEY, count INT NOT NULL);
565
 
     * INSERT INTO t1 (id, counter) VALUES (1,1),(2,2),(3,3);
566
 
     * UPDATE t1 SET counter = counter + 1 WHERE id IN (1,2);
567
 
     *
568
 
     * We will generate two UpdateRecord messages with different set_value byte arrays.
569
 
     *
570
 
     * The below really should be moved into the Field API and Record API.  But for now
571
 
     * we do this crazy pointer fiddling to figure out if the current field
572
 
     * has been updated in the supplied record raw byte pointers.
573
 
     */
574
 
    const unsigned char *old_ptr= (const unsigned char *) old_record + (ptrdiff_t) (current_field->ptr - in_table->record[0]); 
575
 
    const unsigned char *new_ptr= (const unsigned char *) new_record + (ptrdiff_t) (current_field->ptr - in_table->record[0]); 
576
 
 
577
 
    uint32_t field_length= current_field->pack_length(); /** @TODO This isn't always correct...check varchar diffs. */
578
 
 
579
 
    if (memcmp(old_ptr, new_ptr, field_length) != 0)
580
 
    {
581
 
      /* Store the original "read bit" for this field */
582
 
      bool is_read_set= current_field->isReadSet();
583
 
 
584
 
      /* We need to mark that we will "read" this field... */
585
 
      in_table->setReadSet(current_field->field_index);
586
 
 
587
 
      /* Read the string value of this field's contents */
588
 
      string_value= current_field->val_str(string_value);
589
 
 
590
 
      /* 
591
 
       * Reset the read bit after reading field to its original state.  This 
592
 
       * prevents the field from being included in the WHERE clause
593
 
       */
594
 
      current_field->setReadSet(is_read_set);
595
 
 
596
 
      record->add_after_value(string_value->c_ptr(), string_value->length());
597
 
      string_value->free();
598
 
    }
599
 
 
600
 
    /* 
601
 
     * Add the WHERE clause values now...for now, this means the
602
 
     * primary key field value.  Replication only supports tables
603
 
     * with a primary key.
604
 
     */
605
 
    if (in_table->s->fieldInPrimaryKey(current_field))
606
 
    {
607
 
      /**
608
 
       * To say the below is ugly is an understatement. But it works.
609
 
       * 
610
 
       * @todo Move this crap into a real Record API.
611
 
       */
612
 
      string_value= current_field->val_str(string_value,
613
 
                                           old_record + 
614
 
                                           current_field->offset(const_cast<unsigned char *>(new_record)));
615
 
      record->add_key_value(string_value->c_ptr(), string_value->length());
616
 
      string_value->free();
617
 
    }
618
 
 
619
 
  }
620
 
}
621
 
 
622
 
message::Statement &ReplicationServices::getDeleteStatement(Session *in_session,
623
 
                                                            Table *in_table) const
624
 
{
625
 
  message::Statement *statement= in_session->getStatementMessage();
626
 
  /*
627
 
   * We check to see if the current Statement message is of type DELETE.
628
 
   * If it is not, we finalize the current Statement and ensure a new
629
 
   * DeleteStatement is created.
630
 
   */
631
 
  if (statement != NULL &&
632
 
      statement->type() != message::Statement::DELETE)
633
 
  {
634
 
    finalizeStatement(*statement, in_session);
635
 
    statement= in_session->getStatementMessage();
636
 
  }
637
 
 
638
 
  if (statement == NULL)
639
 
  {
640
 
    message::Transaction *transaction= getActiveTransaction(in_session);
641
 
    /* 
642
 
     * Transaction message initialized and set, but no statement created
643
 
     * yet.  We construct one and initialize it, here, then return the
644
 
     * message after attaching the new Statement message pointer to the 
645
 
     * Session for easy retrieval later...
646
 
     */
647
 
    statement= transaction->add_statement();
648
 
    setDeleteHeader(*statement, in_session, in_table);
649
 
    in_session->setStatementMessage(statement);
650
 
  }
651
 
  return *statement;
652
 
}
653
 
 
654
 
void ReplicationServices::setDeleteHeader(message::Statement &statement,
655
 
                                          Session *in_session,
656
 
                                          Table *in_table) const
657
 
{
658
 
  initStatement(statement, message::Statement::DELETE, in_session);
659
 
 
660
 
  /* 
661
 
   * Now we construct the specialized DeleteHeader message inside
662
 
   * the generalized message::Statement container...
663
 
   */
664
 
  message::DeleteHeader *header= statement.mutable_delete_header();
665
 
  message::TableMetadata *table_metadata= header->mutable_table_metadata();
666
 
 
667
 
  const char *schema_name= in_table->getShare()->db.str;
668
 
  const char *table_name= in_table->getShare()->table_name.str;
669
 
 
670
 
  table_metadata->set_schema_name(schema_name);
671
 
  table_metadata->set_table_name(table_name);
672
 
 
673
 
  Field *current_field;
674
 
  Field **table_fields= in_table->field;
675
 
 
676
 
  message::FieldMetadata *field_metadata;
677
 
 
678
 
  while ((current_field= *table_fields++) != NULL) 
679
 
  {
680
 
    /* 
681
 
     * Add the WHERE clause values now...for now, this means the
682
 
     * primary key field value.  Replication only supports tables
683
 
     * with a primary key.
684
 
     */
685
 
    if (in_table->s->fieldInPrimaryKey(current_field))
686
 
    {
687
 
      field_metadata= header->add_key_field_metadata();
688
 
      field_metadata->set_name(current_field->field_name);
689
 
      field_metadata->set_type(message::internalFieldTypeToFieldProtoType(current_field->type()));
690
 
    }
691
 
  }
692
 
}
693
 
 
694
 
void ReplicationServices::deleteRecord(Session *in_session, Table *in_table)
695
 
{
696
 
  if (! is_active)
697
 
    return;
698
 
 
699
 
  message::Statement &statement= getDeleteStatement(in_session, in_table);
700
 
 
701
 
  message::DeleteData *data= statement.mutable_delete_data();
702
 
  data->set_segment_id(1);
703
 
  data->set_end_segment(true);
704
 
  message::DeleteRecord *record= data->add_record();
705
 
 
706
 
  Field *current_field;
707
 
  Field **table_fields= in_table->field;
708
 
  String *string_value= new (in_session->mem_root) String(ReplicationServices::DEFAULT_RECORD_SIZE);
709
 
  string_value->set_charset(system_charset_info);
710
 
 
711
 
  while ((current_field= *table_fields++) != NULL) 
712
 
  {
713
 
    /* 
714
 
     * Add the WHERE clause values now...for now, this means the
715
 
     * primary key field value.  Replication only supports tables
716
 
     * with a primary key.
717
 
     */
718
 
    if (in_table->s->fieldInPrimaryKey(current_field))
719
 
    {
720
 
      string_value= current_field->val_str(string_value);
721
 
      record->add_key_value(string_value->c_ptr(), string_value->length());
722
 
      /**
723
 
       * @TODO Store optional old record value in the before data member
724
 
       */
725
 
      string_value->free();
726
 
    }
727
 
  }
728
 
}
729
 
 
730
 
void ReplicationServices::createTable(Session *in_session,
731
 
                                      const message::Table &table)
732
 
{
733
 
  if (! is_active)
734
 
    return;
735
 
  
736
 
  message::Transaction *transaction= getActiveTransaction(in_session);
737
 
  message::Statement *statement= transaction->add_statement();
738
 
 
739
 
  initStatement(*statement, message::Statement::CREATE_TABLE, in_session);
740
 
 
741
 
  /* 
742
 
   * Construct the specialized CreateTableStatement message and attach
743
 
   * it to the generic Statement message
744
 
   */
745
 
  message::CreateTableStatement *create_table_statement= statement->mutable_create_table_statement();
746
 
  message::Table *new_table_message= create_table_statement->mutable_table();
747
 
  *new_table_message= table;
748
 
 
749
 
  finalizeStatement(*statement, in_session);
750
 
 
751
 
  finalizeTransaction(*transaction, in_session);
752
 
  
753
 
  push(*transaction);
754
 
 
755
 
  cleanupTransaction(transaction, in_session);
756
 
 
757
 
}
758
 
 
759
 
void ReplicationServices::createSchema(Session *in_session,
760
 
                                       const message::Schema &schema)
761
 
{
762
 
  if (! is_active)
763
 
    return;
764
 
  
765
 
  message::Transaction *transaction= getActiveTransaction(in_session);
766
 
  message::Statement *statement= transaction->add_statement();
767
 
 
768
 
  initStatement(*statement, message::Statement::CREATE_SCHEMA, in_session);
769
 
 
770
 
  /* 
771
 
   * Construct the specialized CreateSchemaStatement message and attach
772
 
   * it to the generic Statement message
773
 
   */
774
 
  message::CreateSchemaStatement *create_schema_statement= statement->mutable_create_schema_statement();
775
 
  message::Schema *new_schema_message= create_schema_statement->mutable_schema();
776
 
  *new_schema_message= schema;
777
 
 
778
 
  finalizeStatement(*statement, in_session);
779
 
 
780
 
  finalizeTransaction(*transaction, in_session);
781
 
  
782
 
  push(*transaction);
783
 
 
784
 
  cleanupTransaction(transaction, in_session);
785
 
 
786
 
}
787
 
 
788
 
void ReplicationServices::dropSchema(Session *in_session, const string &schema_name)
789
 
{
790
 
  if (! is_active)
791
 
    return;
792
 
  
793
 
  message::Transaction *transaction= getActiveTransaction(in_session);
794
 
  message::Statement *statement= transaction->add_statement();
795
 
 
796
 
  initStatement(*statement, message::Statement::DROP_SCHEMA, in_session);
797
 
 
798
 
  /* 
799
 
   * Construct the specialized DropSchemaStatement message and attach
800
 
   * it to the generic Statement message
801
 
   */
802
 
  message::DropSchemaStatement *drop_schema_statement= statement->mutable_drop_schema_statement();
803
 
 
804
 
  drop_schema_statement->set_schema_name(schema_name);
805
 
 
806
 
  finalizeStatement(*statement, in_session);
807
 
 
808
 
  finalizeTransaction(*transaction, in_session);
809
 
  
810
 
  push(*transaction);
811
 
 
812
 
  cleanupTransaction(transaction, in_session);
813
 
}
814
 
 
815
 
void ReplicationServices::dropTable(Session *in_session,
816
 
                                    const string &schema_name,
817
 
                                    const string &table_name,
818
 
                                    bool if_exists)
819
 
{
820
 
  if (! is_active)
821
 
    return;
822
 
  
823
 
  message::Transaction *transaction= getActiveTransaction(in_session);
824
 
  message::Statement *statement= transaction->add_statement();
825
 
 
826
 
  initStatement(*statement, message::Statement::DROP_TABLE, in_session);
827
 
 
828
 
  /* 
829
 
   * Construct the specialized DropTableStatement message and attach
830
 
   * it to the generic Statement message
831
 
   */
832
 
  message::DropTableStatement *drop_table_statement= statement->mutable_drop_table_statement();
833
 
 
834
 
  drop_table_statement->set_if_exists_clause(if_exists);
835
 
 
836
 
  message::TableMetadata *table_metadata= drop_table_statement->mutable_table_metadata();
837
 
 
838
 
  table_metadata->set_schema_name(schema_name);
839
 
  table_metadata->set_table_name(table_name);
840
 
 
841
 
  finalizeStatement(*statement, in_session);
842
 
 
843
 
  finalizeTransaction(*transaction, in_session);
844
 
  
845
 
  push(*transaction);
846
 
 
847
 
  cleanupTransaction(transaction, in_session);
848
 
}
849
 
 
850
 
void ReplicationServices::truncateTable(Session *in_session, Table *in_table)
851
 
{
852
 
  if (! is_active)
853
 
    return;
854
 
  
855
 
  message::Transaction *transaction= getActiveTransaction(in_session);
856
 
  message::Statement *statement= transaction->add_statement();
857
 
 
858
 
  initStatement(*statement, message::Statement::TRUNCATE_TABLE, in_session);
859
 
 
860
 
  /* 
861
 
   * Construct the specialized TruncateTableStatement message and attach
862
 
   * it to the generic Statement message
863
 
   */
864
 
  message::TruncateTableStatement *truncate_statement= statement->mutable_truncate_table_statement();
865
 
  message::TableMetadata *table_metadata= truncate_statement->mutable_table_metadata();
866
 
 
867
 
  const char *schema_name= in_table->getShare()->db.str;
868
 
  const char *table_name= in_table->getShare()->table_name.str;
869
 
 
870
 
  table_metadata->set_schema_name(schema_name);
871
 
  table_metadata->set_table_name(table_name);
872
 
 
873
 
  finalizeStatement(*statement, in_session);
874
 
 
875
 
  finalizeTransaction(*transaction, in_session);
876
 
  
877
 
  push(*transaction);
878
 
 
879
 
  cleanupTransaction(transaction, in_session);
880
 
}
881
 
 
882
 
void ReplicationServices::rawStatement(Session *in_session, const string &query)
883
 
{
884
 
  if (! is_active)
885
 
    return;
886
 
  
887
 
  message::Transaction *transaction= getActiveTransaction(in_session);
888
 
  message::Statement *statement= transaction->add_statement();
889
 
 
890
 
  initStatement(*statement, message::Statement::RAW_SQL, in_session);
891
 
  statement->set_sql(query);
892
 
  finalizeStatement(*statement, in_session);
893
 
 
894
 
  finalizeTransaction(*transaction, in_session);
895
 
  
896
 
  push(*transaction);
897
 
 
898
 
  cleanupTransaction(transaction, in_session);
899
 
}
900
 
 
901
 
void ReplicationServices::push(message::Transaction &to_push)
902
 
{
903
 
  vector<plugin::TransactionReplicator *>::iterator repl_iter= replicators.begin();
904
 
  vector<plugin::TransactionApplier *>::iterator appl_start_iter, appl_iter;
905
 
  appl_start_iter= appliers.begin();
906
 
 
907
 
  plugin::TransactionReplicator *cur_repl;
908
 
  plugin::TransactionApplier *cur_appl;
909
 
 
910
 
  while (repl_iter != replicators.end())
911
 
  {
912
 
    cur_repl= *repl_iter;
913
 
    if (! cur_repl->isEnabled())
914
 
    {
915
 
      ++repl_iter;
916
 
      continue;
917
 
    }
918
 
    
919
 
    appl_iter= appl_start_iter;
920
 
    while (appl_iter != appliers.end())
921
 
    {
922
 
      cur_appl= *appl_iter;
923
 
 
924
 
      if (! cur_appl->isEnabled())
925
 
      {
926
 
        ++appl_iter;
927
 
        continue;
928
 
      }
929
 
 
930
 
      cur_repl->replicate(cur_appl, to_push);
931
 
      
 
167
plugin::ReplicationReturnCode ReplicationServices::pushTransactionMessage(Session &in_session,
 
168
                                                                          message::Transaction &to_push)
 
169
{
 
170
  plugin::ReplicationReturnCode result= plugin::SUCCESS;
 
171
 
 
172
  for (ReplicationStreams::iterator iter= replication_streams.begin();
 
173
       iter != replication_streams.end();
 
174
       ++iter)
 
175
  {
 
176
    plugin::TransactionReplicator *cur_repl= (*iter).first;
 
177
    plugin::TransactionApplier *cur_appl= (*iter).second;
 
178
 
 
179
    result= cur_repl->replicate(cur_appl, in_session, to_push);
 
180
 
 
181
    if (result == plugin::SUCCESS)
 
182
    {
932
183
      /* 
933
184
       * We update the timestamp for the last applied Transaction so that
934
185
       * publisher plugins can ask the replication services when the
936
187
       * method.
937
188
       */
938
189
      last_applied_timestamp.fetch_and_store(to_push.transaction_context().end_timestamp());
939
 
      ++appl_iter;
940
190
    }
941
 
    ++repl_iter;
 
191
    else
 
192
      return result;
942
193
  }
 
194
  return result;
 
195
}
 
196
 
 
197
ReplicationServices::ReplicationStreams &ReplicationServices::getReplicationStreams()
 
198
{
 
199
  return replication_streams;
943
200
}
944
201
 
945
202
} /* namespace drizzled */