~ubuntu-branches/ubuntu/trusty/mariadb-5.5/trusty-proposed

« back to all changes in this revision

Viewing changes to sql/rpl_injector.cc

  • Committer: Package Import Robot
  • Author(s): Otto Kekäläinen
  • Date: 2013-12-22 10:27:05 UTC
  • Revision ID: package-import@ubuntu.com-20131222102705-mndw7s12mz0szrcn
Tags: upstream-5.5.32
Import upstream version 5.5.32

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/* Copyright (c) 2006, 2011, Oracle and/or its affiliates.
 
2
 
 
3
   This program is free software; you can redistribute it and/or modify
 
4
   it under the terms of the GNU General Public License as published by
 
5
   the Free Software Foundation; version 2 of the License.
 
6
 
 
7
   This program is distributed in the hope that it will be useful,
 
8
   but WITHOUT ANY WARRANTY; without even the implied warranty of
 
9
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
10
   GNU General Public License for more details.
 
11
 
 
12
   You should have received a copy of the GNU General Public License
 
13
   along with this program; if not, write to the Free Software
 
14
   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA */
 
15
 
 
16
#include "sql_priv.h" 
 
17
#include "unireg.h"                             // REQUIRED by later includes
 
18
#include "rpl_injector.h"
 
19
#include "transaction.h"
 
20
#include "sql_parse.h"                          // begin_trans, end_trans, COMMIT
 
21
#include "sql_base.h"                           // close_thread_tables
 
22
#include "log_event.h"                          // Incident_log_event
 
23
 
 
24
/*
 
25
  injector::transaction - member definitions
 
26
*/
 
27
 
 
28
/* inline since it's called below */
 
29
inline
 
30
injector::transaction::transaction(MYSQL_BIN_LOG *log, THD *thd)
 
31
  : m_state(START_STATE), m_thd(thd)
 
32
{
 
33
  /* 
 
34
     Default initialization of m_start_pos (which initializes it to garbage).
 
35
     We need to fill it in using the code below.
 
36
  */
 
37
  LOG_INFO log_info;
 
38
  log->get_current_log(&log_info);
 
39
  /* !!! binlog_pos does not follow RAII !!! */
 
40
  m_start_pos.m_file_name= my_strdup(log_info.log_file_name, MYF(0));
 
41
  m_start_pos.m_file_pos= log_info.pos;
 
42
 
 
43
  m_thd->lex->start_transaction_opt= 0; /* for begin_trans() */
 
44
  trans_begin(m_thd);
 
45
}
 
46
 
 
47
injector::transaction::~transaction()
 
48
{
 
49
  if (!good())
 
50
    return;
 
51
 
 
52
  /* Needed since my_free expects a 'char*' (instead of 'void*'). */
 
53
  char* const the_memory= const_cast<char*>(m_start_pos.m_file_name);
 
54
 
 
55
  /*
 
56
    We set the first character to null just to give all the copies of the
 
57
    start position a (minimal) chance of seening that the memory is lost.
 
58
    All assuming the my_free does not step over the memory, of course.
 
59
  */
 
60
  *the_memory= '\0';
 
61
 
 
62
  my_free(the_memory);
 
63
}
 
64
 
 
65
/**
 
66
   @retval 0 transaction committed
 
67
   @retval 1 transaction rolled back
 
68
 */
 
69
int injector::transaction::commit()
 
70
{
 
71
   DBUG_ENTER("injector::transaction::commit()");
 
72
   int error= m_thd->binlog_flush_pending_rows_event(true);
 
73
   /*
 
74
     Cluster replication does not preserve statement or
 
75
     transaction boundaries of the master.  Instead, a new
 
76
     transaction on replication slave is started when a new GCI
 
77
     (global checkpoint identifier) is issued, and is committed
 
78
     when the last event of the check point has been received and
 
79
     processed. This ensures consistency of each cluster in
 
80
     cluster replication, and there is no requirement for stronger
 
81
     consistency: MySQL replication is asynchronous with other
 
82
     engines as well.
 
83
 
 
84
     A practical consequence of that is that row level replication
 
85
     stream passed through the injector thread never contains
 
86
     COMMIT events.
 
87
     Here we should preserve the server invariant that there is no
 
88
     outstanding statement transaction when the normal transaction
 
89
     is committed by committing the statement transaction
 
90
     explicitly.
 
91
   */
 
92
   trans_commit_stmt(m_thd);
 
93
   if (!trans_commit(m_thd))
 
94
   {
 
95
     close_thread_tables(m_thd);
 
96
     m_thd->mdl_context.release_transactional_locks();
 
97
   }
 
98
   DBUG_RETURN(error);
 
99
}
 
100
 
 
101
 
 
102
int injector::transaction::use_table(server_id_type sid, table tbl)
 
103
{
 
104
  DBUG_ENTER("injector::transaction::use_table");
 
105
 
 
106
  int error;
 
107
 
 
108
  if ((error= check_state(TABLE_STATE)))
 
109
    DBUG_RETURN(error);
 
110
 
 
111
  server_id_type save_id= m_thd->server_id;
 
112
  m_thd->set_server_id(sid);
 
113
  error= m_thd->binlog_write_table_map(tbl.get_table(),
 
114
                                       tbl.is_transactional());
 
115
  m_thd->set_server_id(save_id);
 
116
  DBUG_RETURN(error);
 
117
}
 
118
 
 
119
 
 
120
int injector::transaction::write_row (server_id_type sid, table tbl, 
 
121
                                      MY_BITMAP const* cols, size_t colcnt,
 
122
                                      record_type record)
 
123
{
 
124
   DBUG_ENTER("injector::transaction::write_row(...)");
 
125
 
 
126
   int error= check_state(ROW_STATE);
 
127
   if (error)
 
128
     DBUG_RETURN(error);
 
129
 
 
130
   server_id_type save_id= m_thd->server_id;
 
131
   m_thd->set_server_id(sid);
 
132
   error= m_thd->binlog_write_row(tbl.get_table(), tbl.is_transactional(), 
 
133
                                  cols, colcnt, record);
 
134
   m_thd->set_server_id(save_id);
 
135
   DBUG_RETURN(error);
 
136
}
 
137
 
 
138
 
 
139
int injector::transaction::delete_row(server_id_type sid, table tbl,
 
140
                                      MY_BITMAP const* cols, size_t colcnt,
 
141
                                      record_type record)
 
142
{
 
143
   DBUG_ENTER("injector::transaction::delete_row(...)");
 
144
 
 
145
   int error= check_state(ROW_STATE);
 
146
   if (error)
 
147
     DBUG_RETURN(error);
 
148
 
 
149
   server_id_type save_id= m_thd->server_id;
 
150
   m_thd->set_server_id(sid);
 
151
   error= m_thd->binlog_delete_row(tbl.get_table(), tbl.is_transactional(), 
 
152
                                   cols, colcnt, record);
 
153
   m_thd->set_server_id(save_id);
 
154
   DBUG_RETURN(error);
 
155
}
 
156
 
 
157
 
 
158
int injector::transaction::update_row(server_id_type sid, table tbl, 
 
159
                                      MY_BITMAP const* cols, size_t colcnt,
 
160
                                      record_type before, record_type after)
 
161
{
 
162
   DBUG_ENTER("injector::transaction::update_row(...)");
 
163
 
 
164
   int error= check_state(ROW_STATE);
 
165
   if (error)
 
166
     DBUG_RETURN(error);
 
167
 
 
168
   server_id_type save_id= m_thd->server_id;
 
169
   m_thd->set_server_id(sid);
 
170
   error= m_thd->binlog_update_row(tbl.get_table(), tbl.is_transactional(),
 
171
                                   cols, colcnt, before, after);
 
172
   m_thd->set_server_id(save_id);
 
173
   DBUG_RETURN(error);
 
174
}
 
175
 
 
176
 
 
177
injector::transaction::binlog_pos injector::transaction::start_pos() const
 
178
{
 
179
   return m_start_pos;                  
 
180
}
 
181
 
 
182
 
 
183
/*
 
184
  injector - member definitions
 
185
*/
 
186
 
 
187
/* This constructor is called below */
 
188
inline injector::injector()
 
189
{
 
190
}
 
191
 
 
192
static injector *s_injector= 0;
 
193
injector *injector::instance()
 
194
{
 
195
  if (s_injector == 0)
 
196
    s_injector= new injector;
 
197
  /* "There can be only one [instance]" */
 
198
  return s_injector;
 
199
}
 
200
 
 
201
void injector::free_instance()
 
202
{
 
203
  injector *inj = s_injector;
 
204
 
 
205
  if (inj != 0)
 
206
  {
 
207
    s_injector= 0;
 
208
    delete inj;
 
209
  }
 
210
}
 
211
 
 
212
 
 
213
injector::transaction injector::new_trans(THD *thd)
 
214
{
 
215
   DBUG_ENTER("injector::new_trans(THD*)");
 
216
   /*
 
217
     Currently, there is no alternative to using 'mysql_bin_log' since that
 
218
     is hardcoded into the way the handler is using the binary log.
 
219
   */
 
220
   DBUG_RETURN(transaction(&mysql_bin_log, thd));
 
221
}
 
222
 
 
223
void injector::new_trans(THD *thd, injector::transaction *ptr)
 
224
{
 
225
   DBUG_ENTER("injector::new_trans(THD *, transaction *)");
 
226
   /*
 
227
     Currently, there is no alternative to using 'mysql_bin_log' since that
 
228
     is hardcoded into the way the handler is using the binary log. 
 
229
   */
 
230
   transaction trans(&mysql_bin_log, thd);
 
231
   ptr->swap(trans);
 
232
 
 
233
   DBUG_VOID_RETURN;
 
234
}
 
235
 
 
236
int injector::record_incident(THD *thd, Incident incident)
 
237
{
 
238
  Incident_log_event ev(thd, incident);
 
239
  if (int error= mysql_bin_log.write(&ev))
 
240
    return error;
 
241
  return mysql_bin_log.rotate_and_purge(true);
 
242
}
 
243
 
 
244
int injector::record_incident(THD *thd, Incident incident, LEX_STRING const message)
 
245
{
 
246
  Incident_log_event ev(thd, incident, message);
 
247
  if (int error= mysql_bin_log.write(&ev))
 
248
    return error;
 
249
  return mysql_bin_log.rotate_and_purge(true);
 
250
}