~ubuntu-branches/ubuntu/saucy/drizzle/saucy-proposed

« back to all changes in this revision

Viewing changes to drizzled/message/transaction_writer.cc

  • Committer: Bazaar Package Importer
  • Author(s): Monty Taylor
  • Date: 2010-03-18 12:12:31 UTC
  • Revision ID: james.westby@ubuntu.com-20100318121231-k6g1xe6cshbwa0f8
Tags: upstream-2010.03.1347
ImportĀ upstreamĀ versionĀ 2010.03.1347

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/* -*- mode: c++; c-basic-offset: 2; indent-tabs-mode: nil; -*-
 
2
 *  vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
 
3
 *
 
4
 *  Copyright (C) 2009 Sun Microsystems
 
5
 *
 
6
 *  Authors:
 
7
 *
 
8
 *    Jay Pipes <joinfu@sun.com>
 
9
 *
 
10
 *  This program is free software; you can redistribute it and/or modify
 
11
 *  it under the terms of the GNU General Public License as published by
 
12
 *  the Free Software Foundation; version 2 of the License.
 
13
 *
 
14
 *  This program is distributed in the hope that it will be useful,
 
15
 *  but WITHOUT ANY WARRANTY; without even the implied warranty of
 
16
 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
17
 *  GNU General Public License for more details.
 
18
 *
 
19
 *  You should have received a copy of the GNU General Public License
 
20
 *  along with this program; if not, write to the Free Software
 
21
 *  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
 
22
 */
 
23
 
 
24
#include "config.h"
 
25
#include "drizzled/algorithm/crc32.h"
 
26
#include "drizzled/gettext.h"
 
27
#include "drizzled/replication_services.h"
 
28
 
 
29
#include <sys/types.h>
 
30
#include <sys/stat.h>
 
31
#include <fcntl.h>
 
32
#include <string>
 
33
#include <fstream>
 
34
#include <unistd.h>
 
35
 
 
36
#if TIME_WITH_SYS_TIME
 
37
# include <sys/time.h>
 
38
# include <time.h>
 
39
#else
 
40
# if HAVE_SYS_TIME_H
 
41
#  include <sys/time.h>
 
42
# else
 
43
#  include <time.h>
 
44
# endif
 
45
#endif
 
46
 
 
47
#include <drizzled/message/transaction.pb.h>
 
48
 
 
49
#include <google/protobuf/io/coded_stream.h>
 
50
#include <google/protobuf/io/zero_copy_stream_impl.h>
 
51
 
 
52
#include "drizzled/gettext.h"
 
53
 
 
54
/** 
 
55
 * @file Example script for writing transactions to a log file.
 
56
 */
 
57
 
 
58
using namespace std;
 
59
using namespace drizzled;
 
60
using namespace google;
 
61
 
 
62
static uint32_t server_id= 1;
 
63
static uint64_t transaction_id= 1;
 
64
 
 
65
static uint64_t getNanoTimestamp()
 
66
{
 
67
#ifdef HAVE_CLOCK_GETTIME
 
68
  struct timespec tp;
 
69
  clock_gettime(CLOCK_REALTIME, &tp);
 
70
  return (uint64_t) tp.tv_sec * 10000000
 
71
       + (uint64_t) tp.tv_nsec;
 
72
#else
 
73
  struct timeval tv;
 
74
  gettimeofday(&tv,NULL);
 
75
  return (uint64_t) tv.tv_sec * 10000000
 
76
       + (uint64_t) tv.tv_usec * 1000;
 
77
#endif
 
78
}
 
79
 
 
80
static void initTransactionContext(message::Transaction &transaction)
 
81
{
 
82
  message::TransactionContext *ctx= transaction.mutable_transaction_context();
 
83
  ctx->set_transaction_id(transaction_id++);
 
84
  ctx->set_start_timestamp(getNanoTimestamp());
 
85
  ctx->set_server_id(server_id);
 
86
}
 
87
 
 
88
static void finalizeTransactionContext(message::Transaction &transaction)
 
89
{
 
90
  message::TransactionContext *ctx= transaction.mutable_transaction_context();
 
91
  ctx->set_end_timestamp(getNanoTimestamp());
 
92
}
 
93
 
 
94
static void doCreateTable1(message::Transaction &transaction)
 
95
{
 
96
  message::Statement *statement= transaction.add_statement();
 
97
 
 
98
  statement->set_type(message::Statement::RAW_SQL);
 
99
  statement->set_sql("CREATE TABLE t1 (a VARCHAR(32) NOT NULL, PRIMARY KEY a) ENGINE=InnoDB");
 
100
  statement->set_start_timestamp(getNanoTimestamp());
 
101
  statement->set_end_timestamp(getNanoTimestamp());
 
102
}
 
103
 
 
104
static void doCreateTable2(message::Transaction &transaction)
 
105
{
 
106
  message::Statement *statement= transaction.add_statement();
 
107
 
 
108
  statement->set_type(message::Statement::RAW_SQL);
 
109
  statement->set_sql("CREATE TABLE t2 (a INTEGER NOT NULL, PRIMARY KEY a) ENGINE=InnoDB");
 
110
  statement->set_start_timestamp(getNanoTimestamp());
 
111
  statement->set_end_timestamp(getNanoTimestamp());
 
112
}
 
113
 
 
114
static void doCreateTable3(message::Transaction &transaction)
 
115
{
 
116
  message::Statement *statement= transaction.add_statement();
 
117
 
 
118
  statement->set_type(message::Statement::RAW_SQL);
 
119
  statement->set_sql("CREATE TABLE t3 (a INTEGER NOT NULL, b BLOB NOT NULL, PRIMARY KEY a) ENGINE=InnoDB");
 
120
  statement->set_start_timestamp(getNanoTimestamp());
 
121
  statement->set_end_timestamp(getNanoTimestamp());
 
122
}
 
123
 
 
124
static void doSimpleInsert(message::Transaction &transaction)
 
125
{
 
126
  message::Statement *statement= transaction.add_statement();
 
127
 
 
128
  /* Do generic Statement setup */
 
129
  statement->set_type(message::Statement::INSERT);
 
130
  statement->set_sql("INSERT INTO t1 (a) VALUES (\"1\"), (\"2\")");
 
131
  statement->set_start_timestamp(getNanoTimestamp());
 
132
 
 
133
  /* Do INSERT-specific header and setup */
 
134
  message::InsertHeader *header= statement->mutable_insert_header();
 
135
 
 
136
  /* Add table and field metadata for the statement */
 
137
  message::TableMetadata *t_meta= header->mutable_table_metadata();
 
138
  t_meta->set_schema_name("test");
 
139
  t_meta->set_table_name("t1");
 
140
 
 
141
  message::FieldMetadata *f_meta= header->add_field_metadata();
 
142
  f_meta->set_name("a");
 
143
  f_meta->set_type(message::Table::Field::VARCHAR);
 
144
 
 
145
  /* Add new values... */
 
146
  message::InsertData *data= statement->mutable_insert_data();
 
147
  data->set_segment_id(1);
 
148
  data->set_end_segment(true);
 
149
 
 
150
  message::InsertRecord *record1= data->add_record();
 
151
  message::InsertRecord *record2= data->add_record();
 
152
 
 
153
  record1->add_insert_value("1");
 
154
  record2->add_insert_value("2");
 
155
 
 
156
  statement->set_end_timestamp(getNanoTimestamp());
 
157
}
 
158
 
 
159
static void doNonVarcharInsert(message::Transaction &transaction)
 
160
{
 
161
  message::Statement *statement= transaction.add_statement();
 
162
 
 
163
  /* Do generic Statement setup */
 
164
  statement->set_type(message::Statement::INSERT);
 
165
  statement->set_sql("INSERT INTO t2 (a) VALUES (1), (2)");
 
166
  statement->set_start_timestamp(getNanoTimestamp());
 
167
 
 
168
  /* Do INSERT-specific header and setup */
 
169
  message::InsertHeader *header= statement->mutable_insert_header();
 
170
 
 
171
  /* Add table and field metadata for the statement */
 
172
  message::TableMetadata *t_meta= header->mutable_table_metadata();
 
173
  t_meta->set_schema_name("test");
 
174
  t_meta->set_table_name("t2");
 
175
 
 
176
  message::FieldMetadata *f_meta= header->add_field_metadata();
 
177
  f_meta->set_name("a");
 
178
  f_meta->set_type(message::Table::Field::INTEGER);
 
179
 
 
180
  /* Add new values... */
 
181
  message::InsertData *data= statement->mutable_insert_data();
 
182
  data->set_segment_id(1);
 
183
  data->set_end_segment(true);
 
184
 
 
185
  message::InsertRecord *record1= data->add_record();
 
186
  message::InsertRecord *record2= data->add_record();
 
187
 
 
188
  record1->add_insert_value("1");
 
189
  record2->add_insert_value("2");
 
190
 
 
191
  statement->set_end_timestamp(getNanoTimestamp());
 
192
}
 
193
 
 
194
static void doBlobInsert(message::Transaction &transaction)
 
195
{
 
196
  message::Statement *statement= transaction.add_statement();
 
197
 
 
198
  /* Do generic Statement setup */
 
199
  statement->set_type(message::Statement::INSERT);
 
200
  statement->set_sql("INSERT INTO t3 (a, b) VALUES (1, 'test\0me')", 43); /* 43 == length including \0 */
 
201
  statement->set_start_timestamp(getNanoTimestamp());
 
202
 
 
203
  /* Do INSERT-specific header and setup */
 
204
  message::InsertHeader *header= statement->mutable_insert_header();
 
205
 
 
206
  /* Add table and field metadata for the statement */
 
207
  message::TableMetadata *t_meta= header->mutable_table_metadata();
 
208
  t_meta->set_schema_name("test");
 
209
  t_meta->set_table_name("t3");
 
210
 
 
211
  message::FieldMetadata *f_meta= header->add_field_metadata();
 
212
  f_meta->set_name("a");
 
213
  f_meta->set_type(message::Table::Field::INTEGER);
 
214
 
 
215
  f_meta= header->add_field_metadata();
 
216
  f_meta->set_name("b");
 
217
  f_meta->set_type(message::Table::Field::BLOB);
 
218
 
 
219
  /* Add new values... */
 
220
  message::InsertData *data= statement->mutable_insert_data();
 
221
  data->set_segment_id(1);
 
222
  data->set_end_segment(true);
 
223
 
 
224
  message::InsertRecord *record1= data->add_record();
 
225
 
 
226
  record1->add_insert_value("1");
 
227
  record1->add_insert_value("test\0me", 7); /* 7 == length including \0 */
 
228
 
 
229
  statement->set_end_timestamp(getNanoTimestamp());
 
230
}
 
231
 
 
232
static void doSimpleDelete(message::Transaction &transaction)
 
233
{
 
234
  message::Statement *statement= transaction.add_statement();
 
235
 
 
236
  /* Do generic Statement setup */
 
237
  statement->set_type(message::Statement::DELETE);
 
238
  statement->set_sql("DELETE FROM t1 WHERE a = \"1\"");
 
239
  statement->set_start_timestamp(getNanoTimestamp());
 
240
 
 
241
  /* Do DELETE-specific header and setup */
 
242
  message::DeleteHeader *header= statement->mutable_delete_header();
 
243
 
 
244
  /* Add table and field metadata for the statement */
 
245
  message::TableMetadata *t_meta= header->mutable_table_metadata();
 
246
  t_meta->set_schema_name("test");
 
247
  t_meta->set_table_name("t1");
 
248
 
 
249
  message::FieldMetadata *f_meta= header->add_key_field_metadata();
 
250
  f_meta->set_name("a");
 
251
  f_meta->set_type(message::Table::Field::VARCHAR);
 
252
 
 
253
  /* Add new values... */
 
254
  message::DeleteData *data= statement->mutable_delete_data();
 
255
  data->set_segment_id(1);
 
256
  data->set_end_segment(true);
 
257
 
 
258
  message::DeleteRecord *record1= data->add_record();
 
259
 
 
260
  record1->add_key_value("1");
 
261
 
 
262
  statement->set_end_timestamp(getNanoTimestamp());
 
263
}
 
264
 
 
265
static void doSimpleUpdate(message::Transaction &transaction)
 
266
{
 
267
  message::Statement *statement= transaction.add_statement();
 
268
 
 
269
  /* Do generic Statement setup */
 
270
  statement->set_type(message::Statement::UPDATE);
 
271
  statement->set_sql("UPDATE t1 SET a = \"5\" WHERE a = \"1\"");
 
272
  statement->set_start_timestamp(getNanoTimestamp());
 
273
 
 
274
  /* Do UPDATE-specific header and setup */
 
275
  message::UpdateHeader *header= statement->mutable_update_header();
 
276
 
 
277
  /* Add table and field metadata for the statement */
 
278
  message::TableMetadata *t_meta= header->mutable_table_metadata();
 
279
  t_meta->set_schema_name("test");
 
280
  t_meta->set_table_name("t1");
 
281
 
 
282
  message::FieldMetadata *kf_meta= header->add_key_field_metadata();
 
283
  kf_meta->set_name("a");
 
284
  kf_meta->set_type(message::Table::Field::VARCHAR);
 
285
 
 
286
  message::FieldMetadata *sf_meta= header->add_set_field_metadata();
 
287
  sf_meta->set_name("a");
 
288
  sf_meta->set_type(message::Table::Field::VARCHAR);
 
289
 
 
290
  /* Add new values... */
 
291
  message::UpdateData *data= statement->mutable_update_data();
 
292
  data->set_segment_id(1);
 
293
  data->set_end_segment(true);
 
294
 
 
295
  message::UpdateRecord *record1= data->add_record();
 
296
 
 
297
  record1->add_after_value("5");
 
298
  record1->add_key_value("1");
 
299
 
 
300
  statement->set_end_timestamp(getNanoTimestamp());
 
301
}
 
302
 
 
303
static void doMultiKeyUpdate(message::Transaction &transaction)
 
304
{
 
305
  message::Statement *statement= transaction.add_statement();
 
306
 
 
307
  /* Do generic Statement setup */
 
308
  statement->set_type(message::Statement::UPDATE);
 
309
  statement->set_sql("UPDATE t1 SET a = \"5\"");
 
310
  statement->set_start_timestamp(getNanoTimestamp());
 
311
 
 
312
  /* Do UPDATE-specific header and setup */
 
313
  message::UpdateHeader *header= statement->mutable_update_header();
 
314
 
 
315
  /* Add table and field metadata for the statement */
 
316
  message::TableMetadata *t_meta= header->mutable_table_metadata();
 
317
  t_meta->set_schema_name("test");
 
318
  t_meta->set_table_name("t1");
 
319
 
 
320
  message::FieldMetadata *kf_meta= header->add_key_field_metadata();
 
321
  kf_meta->set_name("a");
 
322
  kf_meta->set_type(message::Table::Field::VARCHAR);
 
323
 
 
324
  message::FieldMetadata *sf_meta= header->add_set_field_metadata();
 
325
  sf_meta->set_name("a");
 
326
  sf_meta->set_type(message::Table::Field::VARCHAR);
 
327
 
 
328
  /* Add new values... */
 
329
  message::UpdateData *data= statement->mutable_update_data();
 
330
  data->set_segment_id(1);
 
331
  data->set_end_segment(true);
 
332
 
 
333
  message::UpdateRecord *record1= data->add_record();
 
334
  message::UpdateRecord *record2= data->add_record();
 
335
 
 
336
  record1->add_after_value("5");
 
337
  record1->add_key_value("1");
 
338
  record2->add_after_value("5");
 
339
  record2->add_key_value("2");
 
340
 
 
341
  statement->set_end_timestamp(getNanoTimestamp());
 
342
}
 
343
 
 
344
static void writeTransaction(protobuf::io::CodedOutputStream *output, message::Transaction &transaction)
 
345
{
 
346
  std::string buffer("");
 
347
  finalizeTransactionContext(transaction);
 
348
  transaction.SerializeToString(&buffer);
 
349
 
 
350
  size_t length= buffer.length();
 
351
 
 
352
  output->WriteLittleEndian32(static_cast<uint32_t>(ReplicationServices::TRANSACTION));
 
353
  output->WriteLittleEndian32(static_cast<uint32_t>(length));
 
354
  output->WriteString(buffer);
 
355
  output->WriteLittleEndian32(drizzled::algorithm::crc32(buffer.c_str(), length)); /* checksum */
 
356
}
 
357
 
 
358
int main(int argc, char* argv[])
 
359
{
 
360
  GOOGLE_PROTOBUF_VERIFY_VERSION;
 
361
  int file;
 
362
 
 
363
  if (argc != 2) 
 
364
  {
 
365
    fprintf(stderr, _("Usage: %s TRANSACTION_LOG\n"), argv[0]);
 
366
    return -1;
 
367
  }
 
368
 
 
369
  if ((file= open(argv[1], O_APPEND|O_CREAT|O_SYNC|O_WRONLY, S_IRWXU)) == -1)
 
370
  {
 
371
    fprintf(stderr, _("Cannot open file: %s\n"), argv[1]);
 
372
    return -1;
 
373
  }
 
374
 
 
375
  protobuf::io::ZeroCopyOutputStream *raw_output= new protobuf::io::FileOutputStream(file);
 
376
  protobuf::io::CodedOutputStream *coded_output= new protobuf::io::CodedOutputStream(raw_output);
 
377
 
 
378
  /* Write a series of statements which test each type of Statement */
 
379
  message::Transaction transaction;
 
380
 
 
381
  /* Simple CREATE TABLE statements as raw sql */
 
382
  initTransactionContext(transaction);
 
383
  doCreateTable1(transaction);
 
384
  writeTransaction(coded_output, transaction);
 
385
  transaction.Clear();
 
386
 
 
387
  initTransactionContext(transaction);
 
388
  doCreateTable2(transaction);
 
389
  writeTransaction(coded_output, transaction);
 
390
  transaction.Clear();
 
391
 
 
392
  /* Simple INSERT statement */
 
393
  initTransactionContext(transaction);
 
394
  doSimpleInsert(transaction);
 
395
  writeTransaction(coded_output, transaction);
 
396
  transaction.Clear();
 
397
 
 
398
  /* Write a DELETE and an UPDATE in one transaction */
 
399
  initTransactionContext(transaction);
 
400
  doSimpleDelete(transaction);
 
401
  doSimpleUpdate(transaction);
 
402
  writeTransaction(coded_output, transaction);
 
403
  transaction.Clear();
 
404
 
 
405
  /* Test an INSERT into non-varchar columns */
 
406
  initTransactionContext(transaction);
 
407
  doNonVarcharInsert(transaction);
 
408
  writeTransaction(coded_output, transaction);
 
409
  transaction.Clear();
 
410
 
 
411
  /* Write an UPDATE which affects >1 row */
 
412
  initTransactionContext(transaction);
 
413
  doMultiKeyUpdate(transaction);
 
414
  writeTransaction(coded_output, transaction);
 
415
  transaction.Clear();
 
416
 
 
417
  /* Write an INSERT which writes BLOB data */
 
418
  initTransactionContext(transaction);
 
419
  doCreateTable3(transaction);
 
420
  doBlobInsert(transaction);
 
421
  writeTransaction(coded_output, transaction);
 
422
  transaction.Clear();
 
423
 
 
424
  delete coded_output;
 
425
  delete raw_output;
 
426
 
 
427
  return 0;
 
428
}