1
/* - mode: c; c-basic-offset: 2; indent-tabs-mode: nil; -*-
2
* vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
4
* Copyright (C) 2009 Sun Microsystems
6
* This program is free software; you can redistribute it and/or modify
7
* it under the terms of the GNU General Public License as published by
8
* the Free Software Foundation; either version 2 of the License, or
9
* (at your option) any later version.
11
* This program is distributed in the hope that it will be useful,
12
* but WITHOUT ANY WARRANTY; without even the implied warranty of
13
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14
* GNU General Public License for more details.
16
* You should have received a copy of the GNU General Public License
17
* along with this program; if not, write to the Free Software
18
* Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
24
* Defines the implementation of a simple replicator that can filter
25
* events based on a schema or table name.
29
* This is a very simple implementation. All we do is maintain two
32
* 1) contains all the schema names to filter
33
* 2) contains all the table names to filter
35
* If an event is on a schema or table in the vectors described above, then
36
* the event will not be passed along to the applier.
40
#include <drizzled/gettext.h>
41
#include <drizzled/plugin/transaction_applier.h>
42
#include <drizzled/message/transaction.pb.h>
43
#include <drizzled/plugin.h>
44
#include <drizzled/plugin/registry.h>
46
#include "filtered_replicator.h"
52
using namespace drizzled;
54
static bool sysvar_filtered_replicator_enabled= false;
55
static char *sysvar_filtered_replicator_sch_filters= NULL;
56
static char *sysvar_filtered_replicator_tab_filters= NULL;
57
static char *sysvar_filtered_replicator_sch_regex= NULL;
58
static char *sysvar_filtered_replicator_tab_regex= NULL;
60
FilteredReplicator::FilteredReplicator(string name_arg,
61
const char *in_sch_filters,
62
const char *in_tab_filters)
64
plugin::TransactionReplicator(name_arg),
67
sch_filter_string(in_sch_filters),
68
tab_filter_string(in_tab_filters),
69
sch_regex_enabled(false),
70
tab_regex_enabled(false),
75
* Add each of the specified schemas to the vector of schemas
80
populateFilter(sch_filter_string, schemas_to_filter);
84
* Add each of the specified tables to the vector of tables
89
populateFilter(tab_filter_string, tables_to_filter);
93
* Compile the regular expression for schema's to filter
94
* if one is specified.
96
if (sysvar_filtered_replicator_sch_regex)
98
const char *error= NULL;
99
int32_t error_offset= 0;
100
sch_re= pcre_compile(sysvar_filtered_replicator_sch_regex,
105
sch_regex_enabled= true;
109
* Compile the regular expression for table's to filter
110
* if one is specified.
112
if (sysvar_filtered_replicator_tab_regex)
114
const char *error= NULL;
115
int32_t error_offset= 0;
116
tab_re= pcre_compile(sysvar_filtered_replicator_tab_regex,
121
tab_regex_enabled= true;
124
pthread_mutex_init(&sch_vector_lock, NULL);
125
pthread_mutex_init(&tab_vector_lock, NULL);
126
pthread_mutex_init(&sysvar_sch_lock, NULL);
127
pthread_mutex_init(&sysvar_tab_lock, NULL);
130
bool FilteredReplicator::isEnabled() const
132
return sysvar_filtered_replicator_enabled;
135
void FilteredReplicator::parseStatementTableMetadata(const message::Statement &in_statement,
136
string &in_schema_name,
137
string &in_table_name) const
139
switch (in_statement.type())
141
case message::Statement::INSERT:
143
const message::TableMetadata &metadata= in_statement.insert_header().table_metadata();
144
in_schema_name.assign(metadata.schema_name());
145
in_table_name.assign(metadata.table_name());
148
case message::Statement::UPDATE:
150
const message::TableMetadata &metadata= in_statement.update_header().table_metadata();
151
in_schema_name.assign(metadata.schema_name());
152
in_table_name.assign(metadata.table_name());
155
case message::Statement::DELETE:
157
const message::TableMetadata &metadata= in_statement.delete_header().table_metadata();
158
in_schema_name.assign(metadata.schema_name());
159
in_table_name.assign(metadata.table_name());
162
case message::Statement::CREATE_SCHEMA:
164
in_schema_name.assign(in_statement.create_schema_statement().schema().name());
165
in_table_name.clear();
168
case message::Statement::ALTER_SCHEMA:
170
in_schema_name.assign(in_statement.alter_schema_statement().after().name());
171
in_table_name.clear();
174
case message::Statement::DROP_SCHEMA:
176
in_schema_name.assign(in_statement.drop_schema_statement().schema_name());
177
in_table_name.clear();
180
case message::Statement::CREATE_TABLE:
182
// in_schema_name.assign(in_statement.create_table_statement().table().name());
183
in_table_name.assign(in_statement.create_table_statement().table().name());
186
case message::Statement::ALTER_TABLE:
188
// in_schema_name.assign(in_statement.alter_table_statement().table().name());
189
in_table_name.assign(in_statement.alter_table_statement().after().name());
192
case message::Statement::DROP_TABLE:
194
const message::TableMetadata &metadata= in_statement.drop_table_statement().table_metadata();
195
in_schema_name.assign(metadata.schema_name());
196
in_table_name.assign(metadata.table_name());
201
/* All other types have no schema and table information */
202
in_schema_name.clear();
203
in_table_name.clear();
208
void FilteredReplicator::enable()
210
sysvar_filtered_replicator_enabled= true;
213
void FilteredReplicator::disable()
215
sysvar_filtered_replicator_enabled= false;
218
void FilteredReplicator::replicate(plugin::TransactionApplier *in_applier,
219
message::Transaction &to_replicate)
224
size_t num_statements= to_replicate.statement_size();
228
* We build a new transaction message containing only Statement
229
* messages that have not been filtered.
231
* @todo A more efficient method would be to rework the pointers
232
* that the to_replicate.statement() vector contains and remove
233
* the statement pointers that are filtered...
235
message::Transaction filtered_transaction;
237
for (x= 0; x < num_statements; ++x)
242
const message::Statement &statement= to_replicate.statement(x);
245
* First, we check to see if the command consists of raw SQL. If so,
246
* we need to parse this SQL and determine whether to filter the event
247
* based on the information we obtain from the parsed SQL.
248
* If not raw SQL, check if this event should be filtered or not
249
* based on the schema and table names in the command message.
251
* The schema and table names are stored in TableMetadata headers
252
* for most types of Statement messages.
254
if (statement.type() == message::Statement::RAW_SQL)
256
parseQuery(statement.sql(), schema_name, table_name);
260
parseStatementTableMetadata(statement, schema_name, table_name);
264
* Convert the schema name and table name strings to lowercase so that it
265
* does not matter what case the table or schema name was specified in. We
266
* also keep all entries in the vectors of schemas and tables to filter in
269
std::transform(schema_name.begin(), schema_name.end(),
270
schema_name.begin(), ::tolower);
271
std::transform(table_name.begin(), table_name.end(),
272
table_name.begin(), ::tolower);
274
if (! isSchemaFiltered(schema_name) &&
275
! isTableFiltered(table_name))
277
message::Statement *s= filtered_transaction.add_statement();
278
*s= statement; /* copy contruct */
282
if (filtered_transaction.statement_size() > 0)
286
* We can now simply call the applier's apply() method, passing
287
* along the supplied command.
289
message::TransactionContext *tc= filtered_transaction.mutable_transaction_context();
290
*tc= to_replicate.transaction_context(); /* copy construct */
291
in_applier->apply(filtered_transaction);
295
void FilteredReplicator::populateFilter(std::string input,
296
vector<string> &filter)
299
* Convert the input string to lowercase so that all entries in the vector
300
* will be in lowercase.
302
std::transform(input.begin(), input.end(),
303
input.begin(), ::tolower);
304
string::size_type last_pos= input.find_first_not_of(',', 0);
305
string::size_type pos= input.find_first_of(',', last_pos);
307
while (pos != string::npos || last_pos != string::npos)
309
filter.push_back(input.substr(last_pos, pos - last_pos));
310
last_pos= input.find_first_not_of(',', pos);
311
pos= input.find_first_of(',', last_pos);
315
bool FilteredReplicator::isSchemaFiltered(const string &schema_name)
317
pthread_mutex_lock(&sch_vector_lock);
318
vector<string>::iterator it= find(schemas_to_filter.begin(),
319
schemas_to_filter.end(),
321
if (it != schemas_to_filter.end())
323
pthread_mutex_unlock(&sch_vector_lock);
326
pthread_mutex_unlock(&sch_vector_lock);
329
* If regular expression matching is enabled for schemas to filter, then
330
* we check to see if this schema name matches the regular expression that
331
* has been specified.
333
if (sch_regex_enabled)
335
int32_t result= pcre_exec(sch_re,
338
schema_name.length(),
352
bool FilteredReplicator::isTableFiltered(const string &table_name)
354
pthread_mutex_lock(&tab_vector_lock);
355
vector<string>::iterator it= find(tables_to_filter.begin(),
356
tables_to_filter.end(),
358
if (it != tables_to_filter.end())
360
pthread_mutex_unlock(&tab_vector_lock);
363
pthread_mutex_unlock(&tab_vector_lock);
366
* If regular expression matching is enabled for tables to filter, then
367
* we check to see if this table name matches the regular expression that
368
* has been specified.
370
if (tab_regex_enabled)
372
int32_t result= pcre_exec(tab_re,
389
void FilteredReplicator::parseQuery(const string &sql,
394
* Determine what type of SQL we are dealing with e.g. create table,
397
string::size_type pos= sql.find_first_of(' ', 0);
398
string type= sql.substr(0, pos);
401
* Convert the type string to uppercase here so that it doesn't
402
* matter what case the user entered the statement in.
404
std::transform(type.begin(), type.end(),
405
type.begin(), ::toupper);
407
if (type.compare("DROP") == 0)
410
* The schema and table name can be either the third word
411
* or the fifth word in a DROP TABLE statement...so we extract
412
* the third word from the SQL and see whether it is and IF or
415
pos= sql.find_first_of(' ', 11);
416
string cmp_str= sql.substr(11, pos - 11);
417
string target_name("");
418
if (cmp_str.compare("IF") == 0)
420
/* the name must be the fifth word */
421
pos= sql.find_first_of(' ', 21);
422
target_name.assign(sql.substr(21, pos - 21));
426
target_name.assign(cmp_str);
429
* Determine whether the name is a concatenation of the schema
430
* name and table name i.e. schema.table or just the table name
433
pos= target_name.find_first_of('.', 0);
434
if (pos != string::npos)
437
* There is a schema name here...
439
schema_name.assign(target_name.substr(0, pos));
441
* The rest of the name string is the table name.
443
table_name.assign(target_name.substr(pos + 1));
447
table_name.assign(target_name);
450
else if (type.compare("CREATE") == 0)
453
* The schema and table name are always the third word
454
* in a CREATE TABLE statement...always (unless there is
455
* some crazy syntax I am unaware of).
457
pos= sql.find_first_of(' ', 13);
458
string target_name= sql.substr(13, pos - 13);
460
* Determine whether the name is a concatenation of the schema
461
* name and table name i.e. schema.table or just the table name
464
pos= target_name.find_first_of('.', 0);
465
if (pos != string::npos)
468
* There is a schema name here...
470
schema_name.assign(target_name.substr(0, pos));
472
* The rest of the name string is the table name.
474
table_name.assign(target_name.substr(pos + 1));
478
table_name.assign(target_name);
483
/* we only deal with DROP and CREATE table for the moment */
488
void FilteredReplicator::setSchemaFilter(const string &input)
490
pthread_mutex_lock(&sch_vector_lock);
491
pthread_mutex_lock(&sysvar_sch_lock);
492
sch_filter_string.assign(input);
493
schemas_to_filter.clear();
494
populateFilter(sch_filter_string, schemas_to_filter);
495
pthread_mutex_unlock(&sch_vector_lock);
498
void FilteredReplicator::setTableFilter(const string &input)
500
pthread_mutex_lock(&tab_vector_lock);
501
pthread_mutex_lock(&sysvar_tab_lock);
502
tab_filter_string.assign(input);
503
tables_to_filter.clear();
504
populateFilter(tab_filter_string, tables_to_filter);
505
pthread_mutex_unlock(&tab_vector_lock);
508
static FilteredReplicator *filtered_replicator= NULL; /* The singleton replicator */
510
static int init(plugin::Registry ®istry)
512
if (sysvar_filtered_replicator_enabled)
514
filtered_replicator= new(std::nothrow)
515
FilteredReplicator("filtered_replicator",
516
sysvar_filtered_replicator_sch_filters,
517
sysvar_filtered_replicator_tab_filters);
518
if (filtered_replicator == NULL)
522
registry.add(filtered_replicator);
527
static int deinit(plugin::Registry ®istry)
529
if (filtered_replicator)
531
registry.remove(filtered_replicator);
532
delete filtered_replicator;
537
static int check_filtered_schemas(Session *,
540
drizzle_value *value)
542
char buff[STRING_BUFFER_USUAL_SIZE];
543
int len= sizeof(buff);
544
const char *input= value->val_str(value, buff, &len);
546
if (input && filtered_replicator)
548
filtered_replicator->setSchemaFilter(input);
554
static void set_filtered_schemas(Session *,
559
if (filtered_replicator)
561
if (*(bool *)save != true)
563
/* update the value of the system variable */
564
filtered_replicator->updateSchemaSysvar((const char **) var_ptr);
569
static int check_filtered_tables(Session *,
572
drizzle_value *value)
574
char buff[STRING_BUFFER_USUAL_SIZE];
575
int len= sizeof(buff);
576
const char *input= value->val_str(value, buff, &len);
578
if (input && filtered_replicator)
580
filtered_replicator->setTableFilter(input);
581
*(bool *) save= (bool) true;
584
*(bool *) save= (bool) false;
588
static void set_filtered_tables(Session *,
593
if (filtered_replicator)
595
if (*(bool *)save != false)
597
/* update the value of the system variable */
598
filtered_replicator->updateTableSysvar((const char **) var_ptr);
603
static DRIZZLE_SYSVAR_BOOL(enable,
604
sysvar_filtered_replicator_enabled,
606
N_("Enable filtered replicator"),
607
NULL, /* check func */
608
NULL, /* update func */
610
static DRIZZLE_SYSVAR_STR(filteredschemas,
611
sysvar_filtered_replicator_sch_filters,
613
N_("List of schemas to filter"),
614
check_filtered_schemas,
615
set_filtered_schemas,
617
static DRIZZLE_SYSVAR_STR(filteredtables,
618
sysvar_filtered_replicator_tab_filters,
620
N_("List of tables to filter"),
621
check_filtered_tables,
624
static DRIZZLE_SYSVAR_STR(schemaregex,
625
sysvar_filtered_replicator_sch_regex,
627
N_("Regular expression to apply to schemas to filter"),
631
static DRIZZLE_SYSVAR_STR(tableregex,
632
sysvar_filtered_replicator_tab_regex,
634
N_("Regular expression to apply to tables to filter"),
639
static drizzle_sys_var* filtered_replicator_system_variables[]= {
640
DRIZZLE_SYSVAR(enable),
641
DRIZZLE_SYSVAR(filteredschemas),
642
DRIZZLE_SYSVAR(filteredtables),
643
DRIZZLE_SYSVAR(schemaregex),
644
DRIZZLE_SYSVAR(tableregex),
648
DRIZZLE_DECLARE_PLUGIN
651
"filtered_replicator",
653
"Padraig O'Sullivan",
654
N_("Filtered Replicator"),
656
init, /* Plugin Init */
657
deinit, /* Plugin Deinit */
658
filtered_replicator_system_variables, /* system variables */
659
NULL /* config options */
661
DRIZZLE_DECLARE_PLUGIN_END;