1
/* Copyright (C) 2006 MySQL AB
3
This program is free software; you can redistribute it and/or modify
4
it under the terms of the GNU General Public License as published by
5
the Free Software Foundation; version 2 of the License.
7
This program is distributed in the hope that it will be useful,
8
but WITHOUT ANY WARRANTY; without even the implied warranty of
9
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10
GNU General Public License for more details.
12
You should have received a copy of the GNU General Public License
13
along with this program; if not, write to the Free Software
14
Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
16
#include "mysql_priv.h"
17
#include "rpl_injector.h"
20
injector::transaction - member definitions
23
/* inline since it's called below */
25
injector::transaction::transaction(MYSQL_BIN_LOG *log, THD *thd)
26
: m_state(START_STATE), m_thd(thd)
29
Default initialization of m_start_pos (which initializes it to garbage).
30
We need to fill it in using the code below.
33
log->get_current_log(&log_info);
34
/* !!! binlog_pos does not follow RAII !!! */
35
m_start_pos.m_file_name= my_strdup(log_info.log_file_name, MYF(0));
36
m_start_pos.m_file_pos= log_info.pos;
40
thd->set_current_stmt_binlog_row_based();
43
injector::transaction::~transaction()
48
/* Needed since my_free expects a 'char*' (instead of 'void*'). */
49
char* const the_memory= const_cast<char*>(m_start_pos.m_file_name);
52
We set the first character to null just to give all the copies of the
53
start position a (minimal) chance of seening that the memory is lost.
54
All assuming the my_free does not step over the memory, of course.
58
my_free(the_memory, MYF(0));
62
@retval 0 transaction committed
63
@retval 1 transaction rolled back
65
int injector::transaction::commit()
67
DBUG_ENTER("injector::transaction::commit()");
68
int error= m_thd->binlog_flush_pending_rows_event(true);
70
Cluster replication does not preserve statement or
71
transaction boundaries of the master. Instead, a new
72
transaction on replication slave is started when a new GCI
73
(global checkpoint identifier) is issued, and is committed
74
when the last event of the check point has been received and
75
processed. This ensures consistency of each cluster in
76
cluster replication, and there is no requirement for stronger
77
consistency: MySQL replication is asynchronous with other
80
A practical consequence of that is that row level replication
81
stream passed through the injector thread never contains
83
Here we should preserve the server invariant that there is no
84
outstanding statement transaction when the normal transaction
85
is committed by committing the statement transaction
88
error |= ha_autocommit_or_rollback(m_thd, error);
89
end_trans(m_thd, error ? ROLLBACK : COMMIT);
93
int injector::transaction::use_table(server_id_type sid, table tbl)
95
DBUG_ENTER("injector::transaction::use_table");
99
if ((error= check_state(TABLE_STATE)))
102
server_id_type save_id= m_thd->server_id;
103
m_thd->set_server_id(sid);
104
error= m_thd->binlog_write_table_map(tbl.get_table(),
105
tbl.is_transactional());
106
m_thd->set_server_id(save_id);
111
int injector::transaction::write_row (server_id_type sid, table tbl,
112
MY_BITMAP const* cols, size_t colcnt,
115
DBUG_ENTER("injector::transaction::write_row(...)");
117
int error= check_state(ROW_STATE);
121
server_id_type save_id= m_thd->server_id;
122
m_thd->set_server_id(sid);
123
error= m_thd->binlog_write_row(tbl.get_table(), tbl.is_transactional(),
124
cols, colcnt, record);
125
m_thd->set_server_id(save_id);
130
int injector::transaction::delete_row(server_id_type sid, table tbl,
131
MY_BITMAP const* cols, size_t colcnt,
134
DBUG_ENTER("injector::transaction::delete_row(...)");
136
int error= check_state(ROW_STATE);
140
server_id_type save_id= m_thd->server_id;
141
m_thd->set_server_id(sid);
142
error= m_thd->binlog_delete_row(tbl.get_table(), tbl.is_transactional(),
143
cols, colcnt, record);
144
m_thd->set_server_id(save_id);
149
int injector::transaction::update_row(server_id_type sid, table tbl,
150
MY_BITMAP const* cols, size_t colcnt,
151
record_type before, record_type after)
153
DBUG_ENTER("injector::transaction::update_row(...)");
155
int error= check_state(ROW_STATE);
159
server_id_type save_id= m_thd->server_id;
160
m_thd->set_server_id(sid);
161
error= m_thd->binlog_update_row(tbl.get_table(), tbl.is_transactional(),
162
cols, colcnt, before, after);
163
m_thd->set_server_id(save_id);
168
injector::transaction::binlog_pos injector::transaction::start_pos() const
175
injector - member definitions
178
/* This constructor is called below */
179
inline injector::injector()
183
static injector *s_injector= 0;
184
injector *injector::instance()
187
s_injector= new injector;
188
/* "There can be only one [instance]" */
192
void injector::free_instance()
194
injector *inj = s_injector;
204
injector::transaction injector::new_trans(THD *thd)
206
DBUG_ENTER("injector::new_trans(THD*)");
208
Currently, there is no alternative to using 'mysql_bin_log' since that
209
is hardcoded into the way the handler is using the binary log.
211
DBUG_RETURN(transaction(&mysql_bin_log, thd));
214
void injector::new_trans(THD *thd, injector::transaction *ptr)
216
DBUG_ENTER("injector::new_trans(THD *, transaction *)");
218
Currently, there is no alternative to using 'mysql_bin_log' since that
219
is hardcoded into the way the handler is using the binary log.
221
transaction trans(&mysql_bin_log, thd);
227
int injector::record_incident(THD *thd, Incident incident)
229
Incident_log_event ev(thd, incident);
230
if (int error= mysql_bin_log.write(&ev))
232
mysql_bin_log.rotate_and_purge(RP_FORCE_ROTATE);
236
int injector::record_incident(THD *thd, Incident incident, LEX_STRING const message)
238
Incident_log_event ev(thd, incident, message);
239
if (int error= mysql_bin_log.write(&ev))
241
mysql_bin_log.rotate_and_purge(RP_FORCE_ROTATE);