~krummas/rabbitreplication/trunk

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
// Declare the syntax explicitly.  Every definition in this file relies on
// proto2 features (the `required` label), and omitting the declaration makes
// protoc emit a "No syntax specified" warning and fall back to an implicit
// default.
syntax = "proto2";

package drizzled.message;

// Schema and Table definitions referenced by the DDL statement messages.
import "schema.proto";
import "table.proto";

option optimize_for = SPEED;
option java_package = "org.drizzle.messages";
option java_outer_classname = "TransactionMessage";

/*
 * @file
 *
 * This file contains definitions for protobuffer messages that
 * are involved in Drizzle's replication system.
 *
 * @note
 *
 * We do not discuss implementation specifics about how messages
 * are transmitted across the wire in this documentation because a
 * discussion about those implementation details is immaterial to the
 * definition of the messages used in replication.
 *
 * For a discussion on the implementation of how messages are transmitted
 * to replicas, see the plugins which implement the replication system.
 *
 * @details
 *
 * Messages defined in this file are related in the following ways:
 *
 * ------------------------------------------------------------------
 * |                                                                |
 * | Transaction message                                            |
 * |                                                                |
 * |   -----------------------------------------------------------  |
 * |   |                                                         |  |
 * |   | TransactionContext message                              |  |
 * |   |                                                         |  |
 * |   -----------------------------------------------------------  |
 * |   -----------------------------------------------------------  |
 * |   |                                                         |  |
 * |   | Statement message 1                                     |  |
 * |   |                                                         |  |
 * |   -----------------------------------------------------------  |
 * |   -----------------------------------------------------------  |
 * |   |                                                         |  |
 * |   | Statement message 2                                     |  |
 * |   |                                                         |  |
 * |   -----------------------------------------------------------  |
 * |                             ...                                |
 * |   -----------------------------------------------------------  |
 * |   |                                                         |  |
 * |   | Statement message N                                     |  |
 * |   |                                                         |  |
 * |   -----------------------------------------------------------  |
 * ------------------------------------------------------------------
 *
 * with each Statement message looking like so:
 *
 * ------------------------------------------------------------------
 * |                                                                |
 * | Statement message                                              |
 * |                                                                |
 * |   -----------------------------------------------------------  |
 * |   |                                                         |  |
 * |   | Common information                                      |  |
 * |   |                                                         |  |
 * |   |  - Type of Statement (INSERT, DELETE, etc)              |  |
 * |   |  - Start Timestamp                                      |  |
 * |   |  - End Timestamp                                        |  |
 * |   |  - (OPTIONAL) Actual SQL query string                   |  |
 * |   |                                                         |  |
 * |   -----------------------------------------------------------  |
 * |   -----------------------------------------------------------  |
 * |   |                                                         |  |
 * |   | Statement subclass message 1 (see below)                |  |
 * |   |                                                         |  |
 * |   -----------------------------------------------------------  |
 * |                             ...                                |
 * |   -----------------------------------------------------------  |
 * |   |                                                         |  |
 * |   | Statement subclass message N (see below)                |  |
 * |   |                                                         |  |
 * |   -----------------------------------------------------------  |
 * ------------------------------------------------------------------
 *
 * The Transaction Message
 * =======================
 *
 * The main "envelope" message which represents an atomic transaction
 * which changed the state of a server is the Transaction message class.
 *
 * The Transaction message contains two pieces:
 *
 *   1) A TransactionContext message containing information about the
 *      transaction as a whole, such as the ID of the executing server, 
 *      the start and end timestamp of the transaction, and a globally-
 *      unique identifier for the transaction.
 *
 *   2) A vector of Statement messages representing the distinct SQL
 *      statements which modified the state of the server.  The Statement
 *      message is, itself, a generic envelope message containing a
 *      sub-message which describes the specific data modification which
 *      occurred on the server (such as, for instance, an INSERT statement).
 *
 * The Statement Message
 * =====================
 *
 * The generic "envelope" message containing information common to each
 * SQL statement executed against a server (such as a start and end timestamp
 * and the type of the SQL statement) as well as a Statement subclass message
 * describing the specific data modification event on the server.
 *
 * Each Statement message contains a type member which indicates how readers
 * of the Statement should construct the inner Statement subclass representing
 * a data change.
 *
 * For example, suppose we have read a Transaction message and have executed
 * the following code:
 *
 * @code
 *
 * message::Statement &statement= transaction.statement(0); // Get first statement
 * 
 * switch (statement.type())
 * {
 *   case Statement::INSERT:
 *   {
 *     // Assume this is where we land...so, we know that the Statement
 *     // message represents an INSERT SQL statement.  We must now get the
 *     // specific information describing the INSERT statement.
 *     //
 *     // We assume here a simple INSERT statement that does not represent
 *     // a bulk operation.
 *     message::InsertHeader &insert_header= statement.insert_header();
 *     message::TableMetadata &table_metadata= insert_header.table_metadata();
 *     
 *     // Grab table name and echo out as start of INSERT SQL string...
 *     cout << "INSERT INTO `" << table_metadata.schema_name();
 *     cout << "`.`" << table_metadata.table_name() << "` (";
 *
 *     // Add field list to SQL string...
 *     uint32_t num_fields= insert_header.field_metadata_size();
 *     uint32_t x, y;
 *
 *     for (x= 0; x < num_fields; ++x)
 *     {
 *       message::FieldMetadata &field_metadata= insert_header.field_metadata(x);
 *       if (x != 0)
 *         cout << ',';
 *       cout << "`" << field_metadata.name() << "`";
 *     }
 *
 *     cout << ") VALUES (";
 *
 *     // Add insert values
 *     message::InsertData &insert_data= statement.insert_data();
 *     uint32_t num_records= insert_data.record_size();
 *     
 *     for (x= 0; x < num_records; ++x)
 *     {
 *       if (x != 0)
 *         cout << "),(";
 *       for (y= 0; y < num_fields; ++y)
 *       {
 *         if (y != 0)
 *           cout << ',';
 *         cout << "'" << insert_data.record(x).insert_value(y) << "'";
 *       }
 *     }
 *     cout << ")";
 *   }
 *   ...
 * }
 *
 * @endcode
 * 
 * How Bulk Operations Work
 * ========================
 *
 * Certain operations which change large volumes of data on a server
 * present a specific set of problems for a transaction coordinator or
 * replication service. If all operations must complete atomically on a
 * publishing server before the complete transactional unit is delivered
 * to replicas:
 *
 * 1) The publishing server could consume a large amount of memory
 *    building an in-memory Transaction message containing all the
 *    operations contained in the entire transaction.
 * 
 * 2) A replica, or subscribing server, is wasting time waiting on the
 *    eventual completion (commit) of the large transaction on the
 *    publishing server. It could be applying pieces of the large
 *    transaction in the meantime...
 *
 * In order to prevent the problems inherent in 1) and 2) above, Drizzle's 
 * replication system uses a mechanism which provides bulk change
 * operations.
 *
 * When a regular SQL statement modifies or inserts more rows than a
 * certain threshold, Drizzle's replication services component will begin
 * sending Transaction messages to replicas which contain a chunk
 * (or "segment") of the data which has been changed on the publisher.
 *
 * When data is inserted, updated, or deleted in the database, a
 * header containing information about modified tables and fields is 
 * matched with one or more data segments which contain the actual
 * values changed in the statement.
 *
 * It's easiest to understand this mechanism by following through a real-world
 * scenario.
 *
 * Suppose the following table:
 *
 * CREATE TABLE test.person
 * (
 *   id INT NOT NULL AUTO_INCREMENT PRIMARY KEY
 * , first_name VARCHAR(50)
 * , last_name VARCHAR(50)
 * , is_active CHAR(1) NOT NULL DEFAULT 'Y'
 * );
 *
 * Also suppose that test.person contains <strong>1 million records</strong>.
 *
 * Next, suppose a client issues the SQL statement:
 *
 * UPDATE test.person SET is_active = 'N';
 * 
 * It is clear that one million records could be updated by this statement 
 * (we say, "could be" since Drizzle does not actually update a record if 
 * the UPDATE would not change the existing record...).
 *
 * In order to prevent the publishing server from having to construct an 
 * enormous Transaction message, Drizzle's replication services component 
 * will do the following:
 *
 * 1) Construct a Transaction message with a transaction context containing
 *    information about the originating server, the transaction ID, and 
 *    timestamp information.
 *
 * 2) Construct an UpdateHeader message with information about the tables
 *    and fields involved in the UPDATE statement.  Push this UpdateHeader
 *    message onto the Transaction message's statement vector.
 *
 * 3) Construct an UpdateData message.  Set the segment_id member to 1.
 *    Set the end_segment member to true.
 *
 * 4) For every record updated in a storage engine, the ReplicationServices
 *    component builds a new UpdateRecord message and appends this message
 *    to the aforementioned UpdateData message's record vector.
 *
 * 5) After a certain threshold of records is reached, the
 *    ReplicationServices component sets the current UpdateData message's
 *    end_segment member to false, and proceeds to send the Transaction
 *    message to replicators.
 *
 * 6) The ReplicationServices component then constructs a new Transaction
 *    message and constructs a transaction context with the same
 *    transaction ID and server information.
 *
 * 7) A new UpdateData message is created.  The message's segment_id is
 *    set to N+1 (where N is the segment_id of the previous UpdateData
 *    message) and, as new records are updated, new UpdateRecord messages
 *    are appended to the UpdateData message's record vector.
 *
 * 8) While records are being updated, we repeat steps 5 through 7, with 
 *    only the final UpdateData message having its end_segment member set
 *    to true.
 *
 * Handling ROLLBACKs
 * ==================
 *
 * Any time a transaction is rolled back, a single Transaction message
 * containing a Statement with type = ROLLBACK is sent to replicators.
 *
 * What the replicator does with this information depends on the
 * replicator.  For most, only rollbacks of bulk operations will actually
 * trigger any real action on a replica, since non-bulk operations won't
 * have changed any data on a replica and the ROLLBACK is only sent to a 
 * replica to notify it of a transaction being rolled back (so that the 
 * replica can understand transaction ID sequence gaps...)
 */

/*
 * Some minimal information transferred in the header of Statement
 * submessage classes which identifies metadata about a specific
 * field involved in a Statement.
 */
message FieldMetadata {
  // Type of the field.
  required Table.Field.FieldType type = 1;

  // Name of the field.
  required string name = 2;
}

/*
 * Minimal information transferred in the header of Statement submessage
 * classes which identifies metadata about the schema objects being
 * modified in a Statement.
 */
message TableMetadata {
  // Name of the schema that contains the table.
  required string schema_name = 1;

  // Name of the table.
  required string table_name = 2;
}

/*
  Context for a transaction.
*/
message TransactionContext {
  // Unique identifier of a server.
  required uint32 server_id = 1;

  // Globally-unique transaction ID.
  required uint64 transaction_id = 2;

  // Timestamp of when the transaction started.
  required uint64 start_timestamp = 3;

  // Timestamp of when the transaction ended.
  required uint64 end_timestamp = 4;
}

/*
 * Represents a single record being inserted into a single table.
 *
 * @note 
 *
 * An INSERT Statement contains one or more InsertRecord submessages, each
 * of which represents a single record being inserted into a table.
 */
message InsertRecord {
  // Values of the inserted record.  The usage example in this file's header
  // comment reads one value per InsertHeader.field_metadata entry, by index.
  repeated bytes insert_value = 1;
}

/*
 * Represents statements which insert data into the database:
 *
 * INSERT
 * INSERT SELECT
 * LOAD DATA INFILE
 * REPLACE (is a delete and an insert)
 *
 * The statement is composed of a header (InsertHeader) containing
 * metadata about affected tables and fields, as well as one or more data
 * segments (InsertData) containing the actual records
 * being inserted.
 *
 * @note
 *
 * Bulk insert operations will have >1 data segment, with the last data
 * segment having its end_segment member set to true.
 */
message InsertHeader {
  // Minimal metadata about the table affected.
  required TableMetadata table_metadata = 1;

  // Collection of metadata about the fields affected.
  repeated FieldMetadata field_metadata = 2;
}

message InsertData {
  // The segment number.
  required uint32 segment_id = 1;

  // True when this is the final segment.
  required bool end_segment = 2;

  // The records inserted.
  repeated InsertRecord record = 3;
}

/*
 * Represents a single record being updated in a single table.
 *
 * @note 
 *
 * An UPDATE Statement contains one or more UpdateRecord submessages, each
 * of which represents a single record being updated in a table.
 */
message UpdateRecord {
  // Key value(s) of the updated record (unique or primary key).
  repeated bytes key_value = 1;

  // Field value(s) after the update.
  repeated bytes after_value = 2;

  // Field value(s) before the update (optional).
  repeated bytes before_value = 3;
}

/*
 * Represents statements which update data in the database:
 *
 * INSERT ... ON DUPLICATE KEY UPDATE
 * UPDATE
 * REPLACE INTO when the UPDATE optimization occurs.
 *
 * The statement is composed of a header (UpdateHeader) containing
 * metadata about affected tables and fields, as well as one or more data
 * segments (UpdateData) containing the actual records
 * being updated.
 *
 * @note
 *
 * Bulk update operations will have >1 data segment, with the last data
 * segment having its end_segment member set to true.
 */
message UpdateHeader {
  // Minimal metadata about the table affected.
  required TableMetadata table_metadata = 1;

  // Collection of metadata about the key fields.
  repeated FieldMetadata key_field_metadata = 2;

  // Collection of metadata about the fields affected by the update.
  repeated FieldMetadata set_field_metadata = 3;
}

message UpdateData {
  // The segment number.
  required uint32 segment_id = 1;

  // True when this is the final segment.
  required bool end_segment = 2;

  // Records carried by this segment: one UpdateRecord per updated record,
  // holding the values of the fields being updated.
  repeated UpdateRecord record = 3;
}

/*
 * Represents a single record being deleted in a single table.
 *
 * @note 
 *
 * A DELETE Statement contains one or more DeleteRecord submessages, each
 * of which represents a single record being deleted from a table.
 */
message DeleteRecord {
  // Key value(s) identifying the deleted record.
  // NOTE(review): presumably parallel to DeleteHeader.key_field_metadata;
  // confirm against the writer.
  repeated bytes key_value = 1;
}

/*
 * Represents statements which delete data from the database:
 *
 * DELETE
 * REPLACE (is a delete and an insert)
 *
 * The statement is composed of a header (DeleteHeader) containing
 * metadata about affected tables and fields, as well as one or more data
 * segments (DeleteData) containing the actual records
 * being deleted.
 *
 * @note
 *
 * Bulk delete operations will have >1 data segment, with the last data
 * segment having its end_segment member set to true.
 */
message DeleteHeader {
  // Minimal metadata about the table affected.
  required TableMetadata table_metadata = 1;

  // Collection of metadata about the key fields.
  repeated FieldMetadata key_field_metadata = 2;
}

message DeleteData {
  // The segment number.
  required uint32 segment_id = 1;

  // True when this is the final segment.
  required bool end_segment = 2;

  // Records carried by this segment: one DeleteRecord per deleted record.
  repeated DeleteRecord record = 3;
}

/*
 * Represents a TRUNCATE TABLE statement
 */
message TruncateTableStatement {
  // Metadata about the table to truncate.
  required TableMetadata table_metadata = 1;
}

/*
 * Represents a CREATE SCHEMA statement
 */
message CreateSchemaStatement {
  // Definition of the new schema.
  required Schema schema = 1;
}

/*
 * Represents an ALTER SCHEMA statement
 */
message AlterSchemaStatement {
  // Definition of the old schema.
  required Schema before = 1;

  // Definition of the new schema.
  required Schema after = 2;
}

/*
 * Represents a DROP SCHEMA statement
 */
message DropSchemaStatement {
  // Name of the schema to drop.
  required string schema_name = 1;
}

/*
 * Represents a CREATE TABLE statement.
 */
message CreateTableStatement {
  // The full table definition for the new table.
  required Table table = 1;
}

/*
 * Represents an ALTER TABLE statement.
 */
message AlterTableStatement {
  // The prior full table definition.
  required Table before = 1;

  // The new full table definition.
  required Table after = 2;
}

/*
 * Represents a DROP TABLE statement
 */
message DropTableStatement {
  // Minimal metadata about the table to be dropped.
  required TableMetadata table_metadata = 1;
}

/*
 * Represents a SET statement
 *
 * @note
 *
 * This is constructed only for changes in a global variable.  For changes
 * to a session-local variable, the variable's contents are not transmitted
 * as Statement messages.
 */
message SetVariableStatement {
  // Metadata about the variable to set.
  required FieldMetadata variable_metadata = 1;

  // Value to assign to the variable.
  required bytes variable_value = 2;
}

/*
 * Base message class for a Statement.  A Transaction is composed of
 * one or more Statement messages.  The transaction message represents
 * a single atomic unit of change in the server's state.  Depending on 
 * whether the server/connection is in AUTOCOMMIT mode or not, a 
 * Transaction message may contain one or more than one Statement message.
 */
message Statement {
  // Kind of statement this message represents.  Readers use it to decide
  // which of the sub-messages declared below to inspect.
  enum Type {
    ROLLBACK = 0;        // A ROLLBACK indicator
    INSERT = 1;          // An INSERT statement
    DELETE = 2;          // A DELETE statement
    UPDATE = 3;          // An UPDATE statement
    TRUNCATE_TABLE = 4;  // A TRUNCATE TABLE statement
    CREATE_SCHEMA = 5;   // A CREATE SCHEMA statement
    ALTER_SCHEMA = 6;    // An ALTER SCHEMA statement
    DROP_SCHEMA = 7;     // A DROP SCHEMA statement
    CREATE_TABLE = 8;    // A CREATE TABLE statement
    ALTER_TABLE = 9;     // An ALTER TABLE statement
    DROP_TABLE = 10;     // A DROP TABLE statement
    SET_VARIABLE = 98;   // A SET statement
    RAW_SQL = 99;        // A raw SQL statement
  }

  // The type of the Statement.
  required Type type = 1;

  // Nanosecond precision timestamp of when the Statement was started on the
  // server.
  required uint64 start_timestamp = 2;

  // Nanosecond precision timestamp of when the Statement finished executing
  // on the server.
  required uint64 end_timestamp = 3;

  // May contain the original SQL string.
  optional string sql = 4;

  // Sub-messages.  Each Statement may contain one or more of these,
  // depending on the Statement's type.
  // NOTE(review): no dedicated sub-message is declared for RAW_SQL;
  // presumably the statement text travels in `sql` -- confirm.
  optional InsertHeader insert_header = 5;
  optional InsertData insert_data = 6;
  optional UpdateHeader update_header = 7;
  optional UpdateData update_data = 8;
  optional DeleteHeader delete_header = 9;
  optional DeleteData delete_data = 10;
  optional TruncateTableStatement truncate_table_statement = 11;
  optional CreateSchemaStatement create_schema_statement = 12;
  optional DropSchemaStatement drop_schema_statement = 13;
  optional AlterSchemaStatement alter_schema_statement = 14;
  optional CreateTableStatement create_table_statement = 15;
  optional AlterTableStatement alter_table_statement = 16;
  optional DropTableStatement drop_table_statement = 17;
  optional SetVariableStatement set_variable_statement = 18;
}

/*
 * Represents a collection of Statement messages that
 * has been executed atomically on a server.
 *
 * @note
 *
 * For bulk operations, the transaction may contain only
 * a data segment, and the atomic guarantee can only be enforced by XA.
 */
message Transaction {
  // Information about the transaction as a whole.
  required TransactionContext transaction_context = 1;

  // The Statement messages that make up this transaction.
  repeated Statement statement = 2;
}