~ubuntu-branches/ubuntu/precise/mysql-5.1/precise

« back to all changes in this revision

Viewing changes to sql/rpl_injector.cc

  • Committer: Bazaar Package Importer
  • Author(s): Norbert Tretkowski
  • Date: 2010-03-17 14:56:02 UTC
  • Revision ID: james.westby@ubuntu.com-20100317145602-x7e30l1b2sb5s6w6
Tags: upstream-5.1.45
ImportĀ upstreamĀ versionĀ 5.1.45

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/* Copyright (C) 2006 MySQL AB
 
2
 
 
3
   This program is free software; you can redistribute it and/or modify
 
4
   it under the terms of the GNU General Public License as published by
 
5
   the Free Software Foundation; version 2 of the License.
 
6
 
 
7
   This program is distributed in the hope that it will be useful,
 
8
   but WITHOUT ANY WARRANTY; without even the implied warranty of
 
9
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
10
   GNU General Public License for more details.
 
11
 
 
12
   You should have received a copy of the GNU General Public License
 
13
   along with this program; if not, write to the Free Software
 
14
   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA */
 
15
 
 
16
#include "mysql_priv.h" 
 
17
#include "rpl_injector.h"
 
18
 
 
19
/*
 
20
  injector::transaction - member definitions
 
21
*/
 
22
 
 
23
/* inline since it's called below */
 
24
inline
 
25
injector::transaction::transaction(MYSQL_BIN_LOG *log, THD *thd)
 
26
  : m_state(START_STATE), m_thd(thd)
 
27
{
 
28
  /* 
 
29
     Default initialization of m_start_pos (which initializes it to garbage).
 
30
     We need to fill it in using the code below.
 
31
  */
 
32
  LOG_INFO log_info;
 
33
  log->get_current_log(&log_info);
 
34
  /* !!! binlog_pos does not follow RAII !!! */
 
35
  m_start_pos.m_file_name= my_strdup(log_info.log_file_name, MYF(0));
 
36
  m_start_pos.m_file_pos= log_info.pos;
 
37
 
 
38
  begin_trans(m_thd);
 
39
 
 
40
  thd->set_current_stmt_binlog_row_based();
 
41
}
 
42
 
 
43
injector::transaction::~transaction()
 
44
{
 
45
  if (!good())
 
46
    return;
 
47
 
 
48
  /* Needed since my_free expects a 'char*' (instead of 'void*'). */
 
49
  char* const the_memory= const_cast<char*>(m_start_pos.m_file_name);
 
50
 
 
51
  /*
 
52
    We set the first character to null just to give all the copies of the
 
53
    start position a (minimal) chance of seening that the memory is lost.
 
54
    All assuming the my_free does not step over the memory, of course.
 
55
  */
 
56
  *the_memory= '\0';
 
57
 
 
58
  my_free(the_memory, MYF(0));
 
59
}
 
60
 
 
61
/**
 
62
   @retval 0 transaction committed
 
63
   @retval 1 transaction rolled back
 
64
 */
 
65
int injector::transaction::commit()
 
66
{
 
67
   DBUG_ENTER("injector::transaction::commit()");
 
68
   int error= m_thd->binlog_flush_pending_rows_event(true);
 
69
   /*
 
70
     Cluster replication does not preserve statement or
 
71
     transaction boundaries of the master.  Instead, a new
 
72
     transaction on replication slave is started when a new GCI
 
73
     (global checkpoint identifier) is issued, and is committed
 
74
     when the last event of the check point has been received and
 
75
     processed. This ensures consistency of each cluster in
 
76
     cluster replication, and there is no requirement for stronger
 
77
     consistency: MySQL replication is asynchronous with other
 
78
     engines as well.
 
79
 
 
80
     A practical consequence of that is that row level replication
 
81
     stream passed through the injector thread never contains
 
82
     COMMIT events.
 
83
     Here we should preserve the server invariant that there is no
 
84
     outstanding statement transaction when the normal transaction
 
85
     is committed by committing the statement transaction
 
86
     explicitly.
 
87
   */
 
88
   error |= ha_autocommit_or_rollback(m_thd, error);
 
89
   end_trans(m_thd, error ? ROLLBACK : COMMIT);
 
90
   DBUG_RETURN(error);
 
91
}
 
92
 
 
93
int injector::transaction::use_table(server_id_type sid, table tbl)
 
94
{
 
95
  DBUG_ENTER("injector::transaction::use_table");
 
96
 
 
97
  int error;
 
98
 
 
99
  if ((error= check_state(TABLE_STATE)))
 
100
    DBUG_RETURN(error);
 
101
 
 
102
  server_id_type save_id= m_thd->server_id;
 
103
  m_thd->set_server_id(sid);
 
104
  error= m_thd->binlog_write_table_map(tbl.get_table(),
 
105
                                       tbl.is_transactional());
 
106
  m_thd->set_server_id(save_id);
 
107
  DBUG_RETURN(error);
 
108
}
 
109
 
 
110
 
 
111
int injector::transaction::write_row (server_id_type sid, table tbl, 
 
112
                                      MY_BITMAP const* cols, size_t colcnt,
 
113
                                      record_type record)
 
114
{
 
115
   DBUG_ENTER("injector::transaction::write_row(...)");
 
116
 
 
117
   int error= check_state(ROW_STATE);
 
118
   if (error)
 
119
     DBUG_RETURN(error);
 
120
 
 
121
   server_id_type save_id= m_thd->server_id;
 
122
   m_thd->set_server_id(sid);
 
123
   error= m_thd->binlog_write_row(tbl.get_table(), tbl.is_transactional(), 
 
124
                                  cols, colcnt, record);
 
125
   m_thd->set_server_id(save_id);
 
126
   DBUG_RETURN(error);
 
127
}
 
128
 
 
129
 
 
130
int injector::transaction::delete_row(server_id_type sid, table tbl,
 
131
                                      MY_BITMAP const* cols, size_t colcnt,
 
132
                                      record_type record)
 
133
{
 
134
   DBUG_ENTER("injector::transaction::delete_row(...)");
 
135
 
 
136
   int error= check_state(ROW_STATE);
 
137
   if (error)
 
138
     DBUG_RETURN(error);
 
139
 
 
140
   server_id_type save_id= m_thd->server_id;
 
141
   m_thd->set_server_id(sid);
 
142
   error= m_thd->binlog_delete_row(tbl.get_table(), tbl.is_transactional(), 
 
143
                                   cols, colcnt, record);
 
144
   m_thd->set_server_id(save_id);
 
145
   DBUG_RETURN(error);
 
146
}
 
147
 
 
148
 
 
149
int injector::transaction::update_row(server_id_type sid, table tbl, 
 
150
                                      MY_BITMAP const* cols, size_t colcnt,
 
151
                                      record_type before, record_type after)
 
152
{
 
153
   DBUG_ENTER("injector::transaction::update_row(...)");
 
154
 
 
155
   int error= check_state(ROW_STATE);
 
156
   if (error)
 
157
     DBUG_RETURN(error);
 
158
 
 
159
   server_id_type save_id= m_thd->server_id;
 
160
   m_thd->set_server_id(sid);
 
161
   error= m_thd->binlog_update_row(tbl.get_table(), tbl.is_transactional(),
 
162
                                   cols, colcnt, before, after);
 
163
   m_thd->set_server_id(save_id);
 
164
   DBUG_RETURN(error);
 
165
}
 
166
 
 
167
 
 
168
injector::transaction::binlog_pos injector::transaction::start_pos() const
 
169
{
 
170
   return m_start_pos;                  
 
171
}
 
172
 
 
173
 
 
174
/*
 
175
  injector - member definitions
 
176
*/
 
177
 
 
178
/* This constructor is called below */
 
179
inline injector::injector()
 
180
{
 
181
}
 
182
 
 
183
static injector *s_injector= 0;
 
184
injector *injector::instance()
 
185
{
 
186
  if (s_injector == 0)
 
187
    s_injector= new injector;
 
188
  /* "There can be only one [instance]" */
 
189
  return s_injector;
 
190
}
 
191
 
 
192
void injector::free_instance()
 
193
{
 
194
  injector *inj = s_injector;
 
195
 
 
196
  if (inj != 0)
 
197
  {
 
198
    s_injector= 0;
 
199
    delete inj;
 
200
  }
 
201
}
 
202
 
 
203
 
 
204
injector::transaction injector::new_trans(THD *thd)
 
205
{
 
206
   DBUG_ENTER("injector::new_trans(THD*)");
 
207
   /*
 
208
     Currently, there is no alternative to using 'mysql_bin_log' since that
 
209
     is hardcoded into the way the handler is using the binary log.
 
210
   */
 
211
   DBUG_RETURN(transaction(&mysql_bin_log, thd));
 
212
}
 
213
 
 
214
void injector::new_trans(THD *thd, injector::transaction *ptr)
 
215
{
 
216
   DBUG_ENTER("injector::new_trans(THD *, transaction *)");
 
217
   /*
 
218
     Currently, there is no alternative to using 'mysql_bin_log' since that
 
219
     is hardcoded into the way the handler is using the binary log. 
 
220
   */
 
221
   transaction trans(&mysql_bin_log, thd);
 
222
   ptr->swap(trans);
 
223
 
 
224
   DBUG_VOID_RETURN;
 
225
}
 
226
 
 
227
int injector::record_incident(THD *thd, Incident incident)
 
228
{
 
229
  Incident_log_event ev(thd, incident);
 
230
  if (int error= mysql_bin_log.write(&ev))
 
231
    return error;
 
232
  mysql_bin_log.rotate_and_purge(RP_FORCE_ROTATE);
 
233
  return 0;
 
234
}
 
235
 
 
236
int injector::record_incident(THD *thd, Incident incident, LEX_STRING const message)
 
237
{
 
238
  Incident_log_event ev(thd, incident, message);
 
239
  if (int error= mysql_bin_log.write(&ev))
 
240
    return error;
 
241
  mysql_bin_log.rotate_and_purge(RP_FORCE_ROTATE);
 
242
  return 0;
 
243
}