1
/* Copyright (c) 2006, 2011, Oracle and/or its affiliates.
3
This program is free software; you can redistribute it and/or modify
4
it under the terms of the GNU General Public License as published by
5
the Free Software Foundation; version 2 of the License.
7
This program is distributed in the hope that it will be useful,
8
but WITHOUT ANY WARRANTY; without even the implied warranty of
9
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10
GNU General Public License for more details.
12
You should have received a copy of the GNU General Public License
13
along with this program; if not, write to the Free Software
14
Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
17
#include "unireg.h" // REQUIRED by later includes
18
#include "rpl_injector.h"
19
#include "transaction.h"
20
#include "sql_parse.h" // begin_trans, end_trans, COMMIT
21
#include "sql_base.h" // close_thread_tables
22
#include "log_event.h" // Incident_log_event
25
injector::transaction - member definitions
28
/* inline since it's called below */
30
/*
  Construct an injector transaction for the given THD.

  The object starts in START_STATE. The binary log's current file name and
  position, as reported by log->get_current_log(), are stored in
  m_start_pos; the file name is a my_strdup() copy, so m_start_pos owns
  heap memory that is not released automatically. The THD's
  lex->start_transaction_opt is reset to 0 for the transaction begin.

  @param log  Binary log whose current position becomes the start position.
  @param thd  Thread handle the transaction operates on.
*/
injector::transaction::transaction(MYSQL_BIN_LOG *log, THD *thd)
31
: m_state(START_STATE), m_thd(thd)
34
Default initialization of m_start_pos (which initializes it to garbage).
35
We need to fill it in using the code below.
38
log->get_current_log(&log_info);
39
/* !!! binlog_pos does not follow RAII !!! */
40
m_start_pos.m_file_name= my_strdup(log_info.log_file_name, MYF(0));
41
m_start_pos.m_file_pos= log_info.pos;
43
m_thd->lex->start_transaction_opt= 0; /* for begin_trans() */
47
/*
  Destructor. Works on m_start_pos.m_file_name, the string that the
  constructor duplicated with my_strdup(), through a non-const alias so
  that it can be passed to my_free().
*/
injector::transaction::~transaction()
52
/* Needed since my_free expects a 'char*' (instead of 'void*'). */
53
char* const the_memory= const_cast<char*>(m_start_pos.m_file_name);
56
We set the first character to null just to give all the copies of the
57
start position a (minimal) chance of seeing that the memory is lost.
58
All assuming that my_free does not step over the memory, of course.
66
@retval 0 transaction committed
67
@retval 1 transaction rolled back
69
/*
  Flush any pending rows event of m_thd to the binary log, then commit the
  statement transaction (trans_commit_stmt) followed by the normal
  transaction (trans_commit).
*/
int injector::transaction::commit()
71
DBUG_ENTER("injector::transaction::commit()");
72
int error= m_thd->binlog_flush_pending_rows_event(true);
74
Cluster replication does not preserve statement or
75
transaction boundaries of the master. Instead, a new
76
transaction on replication slave is started when a new GCI
77
(global checkpoint identifier) is issued, and is committed
78
when the last event of the check point has been received and
79
processed. This ensures consistency of each cluster in
80
cluster replication, and there is no requirement for stronger
81
consistency: MySQL replication is asynchronous with other
84
A practical consequence of that is that row level replication
85
stream passed through the injector thread never contains
87
Here we should preserve the server invariant that there is no
88
outstanding statement transaction when the normal transaction
89
is committed by committing the statement transaction
92
trans_commit_stmt(m_thd);
93
/*
  The table close and the release of transactional metadata locks below
  follow this commit. NOTE(review): presumably they run only when
  trans_commit() reports success (returns 0) -- confirm.
*/
if (!trans_commit(m_thd))
95
close_thread_tables(m_thd);
96
m_thd->mdl_context.release_transactional_locks();
102
/*
  Write a table map for tbl to the binary log on behalf of server id sid.

  The transaction state is first validated with check_state(TABLE_STATE).
  The THD's own server id is saved, replaced by sid for the duration of
  binlog_write_table_map(), and then restored.

  @param sid  Server id to stamp on the table map.
  @param tbl  Table wrapper; supplies the TABLE and its transactional flag.
*/
int injector::transaction::use_table(server_id_type sid, table tbl)
104
DBUG_ENTER("injector::transaction::use_table");
108
if ((error= check_state(TABLE_STATE)))
111
server_id_type save_id= m_thd->server_id;
112
m_thd->set_server_id(sid);
113
error= m_thd->binlog_write_table_map(tbl.get_table(),
114
tbl.is_transactional());
115
m_thd->set_server_id(save_id);
120
/*
  Log a row insertion for tbl to the binary log on behalf of server id sid.

  The transaction state is validated with check_state(ROW_STATE). The THD's
  own server id is saved, replaced by sid while binlog_write_row() runs,
  and then restored.

  @param sid     Server id to stamp on the row event.
  @param tbl     Table wrapper; supplies the TABLE and transactional flag.
  @param cols    Column bitmap passed through to binlog_write_row().
  @param colcnt  Column count passed through to binlog_write_row().
*/
int injector::transaction::write_row (server_id_type sid, table tbl,
121
MY_BITMAP const* cols, size_t colcnt,
124
DBUG_ENTER("injector::transaction::write_row(...)");
126
int error= check_state(ROW_STATE);
130
server_id_type save_id= m_thd->server_id;
131
m_thd->set_server_id(sid);
132
error= m_thd->binlog_write_row(tbl.get_table(), tbl.is_transactional(),
133
cols, colcnt, record);
134
m_thd->set_server_id(save_id);
139
/*
  Log a row deletion for tbl to the binary log on behalf of server id sid.

  The transaction state is validated with check_state(ROW_STATE). The THD's
  own server id is saved, replaced by sid while binlog_delete_row() runs,
  and then restored.

  @param sid     Server id to stamp on the row event.
  @param tbl     Table wrapper; supplies the TABLE and transactional flag.
  @param cols    Column bitmap passed through to binlog_delete_row().
  @param colcnt  Column count passed through to binlog_delete_row().
*/
int injector::transaction::delete_row(server_id_type sid, table tbl,
140
MY_BITMAP const* cols, size_t colcnt,
143
DBUG_ENTER("injector::transaction::delete_row(...)");
145
int error= check_state(ROW_STATE);
149
server_id_type save_id= m_thd->server_id;
150
m_thd->set_server_id(sid);
151
error= m_thd->binlog_delete_row(tbl.get_table(), tbl.is_transactional(),
152
cols, colcnt, record);
153
m_thd->set_server_id(save_id);
158
/*
  Log a row update for tbl to the binary log on behalf of server id sid.

  The transaction state is validated with check_state(ROW_STATE). The THD's
  own server id is saved, replaced by sid while binlog_update_row() runs,
  and then restored.

  @param sid     Server id to stamp on the row event.
  @param tbl     Table wrapper; supplies the TABLE and transactional flag.
  @param cols    Column bitmap passed through to binlog_update_row().
  @param colcnt  Column count passed through to binlog_update_row().
  @param before  Row image before the update.
  @param after   Row image after the update.
*/
int injector::transaction::update_row(server_id_type sid, table tbl,
159
MY_BITMAP const* cols, size_t colcnt,
160
record_type before, record_type after)
162
DBUG_ENTER("injector::transaction::update_row(...)");
164
int error= check_state(ROW_STATE);
168
server_id_type save_id= m_thd->server_id;
169
m_thd->set_server_id(sid);
170
error= m_thd->binlog_update_row(tbl.get_table(), tbl.is_transactional(),
171
cols, colcnt, before, after);
172
m_thd->set_server_id(save_id);
177
injector::transaction::binlog_pos injector::transaction::start_pos() const
184
injector - member definitions
187
/* This constructor is called below */
188
inline injector::injector()
192
/* File-local pointer to the injector object; assigned by instance(). */
static injector *s_injector= 0;
193
/*
  Accessor for the injector object held in s_injector. The object is
  allocated with 'new'. NOTE(review): presumably the allocation happens
  only while s_injector is still 0 -- confirm.
*/
injector *injector::instance()
196
s_injector= new injector;
197
/* "There can be only one [instance]" */
201
/*
  Release the injector object held in s_injector. Begins by taking a local
  copy of the pointer. NOTE(review): presumably the object is then deleted
  and s_injector reset to 0 -- confirm.
*/
void injector::free_instance()
203
injector *inj = s_injector;
213
/*
  Create a transaction on the global mysql_bin_log for the given THD and
  return it by value.

  @param thd  Thread handle the new transaction operates on.
*/
injector::transaction injector::new_trans(THD *thd)
215
DBUG_ENTER("injector::new_trans(THD*)");
217
Currently, there is no alternative to using 'mysql_bin_log' since that
218
is hardcoded into the way the handler is using the binary log.
220
DBUG_RETURN(transaction(&mysql_bin_log, thd));
223
/*
  Variant of new_trans() that takes a caller-supplied transaction object
  instead of returning one. A local transaction is built on the global
  mysql_bin_log for thd. NOTE(review): presumably its contents are then
  handed over to *ptr -- confirm.

  @param thd  Thread handle the new transaction operates on.
  @param ptr  Caller-supplied transaction object.
*/
void injector::new_trans(THD *thd, injector::transaction *ptr)
225
DBUG_ENTER("injector::new_trans(THD *, transaction *)");
227
Currently, there is no alternative to using 'mysql_bin_log' since that
228
is hardcoded into the way the handler is using the binary log.
230
transaction trans(&mysql_bin_log, thd);
236
/*
  Write an Incident_log_event for the given incident to mysql_bin_log.
  When the write returns 0, the binary log is rotated and purged and the
  result of rotate_and_purge() is returned. NOTE(review): presumably a
  non-zero write result is returned to the caller as the error -- confirm.

  @param thd       Thread handle used to build the event.
  @param incident  Incident to record.
*/
int injector::record_incident(THD *thd, Incident incident)
238
Incident_log_event ev(thd, incident);
239
if (int error= mysql_bin_log.write(&ev))
241
return mysql_bin_log.rotate_and_purge(true);
244
/*
  Write an Incident_log_event carrying a message for the given incident to
  mysql_bin_log. When the write returns 0, the binary log is rotated and
  purged and the result of rotate_and_purge() is returned. NOTE(review):
  presumably a non-zero write result is returned to the caller as the
  error -- confirm.

  @param thd       Thread handle used to build the event.
  @param incident  Incident to record.
  @param message   Text attached to the incident event.
*/
int injector::record_incident(THD *thd, Incident incident, LEX_STRING const message)
246
Incident_log_event ev(thd, incident, message);
247
if (int error= mysql_bin_log.write(&ev))
249
return mysql_bin_log.rotate_and_purge(true);