1
/* Copyright (C) 2003 MySQL AB
3
This program is free software; you can redistribute it and/or modify
4
it under the terms of the GNU General Public License as published by
5
the Free Software Foundation; version 2 of the License.
7
This program is distributed in the hope that it will be useful,
8
but WITHOUT ANY WARRANTY; without even the implied warranty of
9
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10
GNU General Public License for more details.
12
You should have received a copy of the GNU General Public License
13
along with this program; if not, write to the Free Software
14
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
17
* ndbapi_event.cpp: Using API level events in NDB API
19
* Classes and methods used in this example:
21
* Ndb_cluster_connection
28
* createEventOperation()
29
* dropEventOperation()
37
* NdbDictionary::Event
57
#include <my_global.h>
65
* Assume that there is a table which is being updated by
66
* another process (e.g. flexBench -l 0 -stdtables).
67
* We want to monitor what happens with column values.
69
* Or using the mysql client:
71
* shell> mysql -u root
72
* mysql> create database TEST_DB;
74
* mysql> create table t0
75
* (c0 int, c1 int, c2 char(4), c3 char(4), c4 text,
76
* primary key(c0, c2)) engine ndb charset latin1;
78
* In another window, start ndbapi_event and wait until it has started properly
80
insert into t0 values (1, 2, 'a', 'b', null);
81
insert into t0 values (3, 4, 'c', 'd', null);
82
update t0 set c3 = 'e' where c0 = 1 and c2 = 'a'; -- use pk
83
update t0 set c3 = 'f'; -- use scan
84
update t0 set c3 = 'F'; -- use scan update to 'same'
85
update t0 set c2 = 'g' where c0 = 1; -- update pk part
86
update t0 set c2 = 'G' where c0 = 1; -- update pk part to 'same'
87
update t0 set c0 = 5, c2 = 'H' where c0 = 3; -- update full PK
90
insert ...; update ...; -- see events w/ same pk merged (if -m option)
91
delete ...; insert ...; -- there are 5 combinations ID IU DI UD UU
92
update ...; update ...;
94
-- text requires -m flag
95
set @a = repeat('a',256); -- inline size
96
set @b = repeat('b',2000); -- part size
97
set @c = repeat('c',2000*30); -- 30 parts
99
-- update the text field using combinations of @a, @b, @c ...
101
* You should see the data appear in the example window.
105
// APIERROR(error): writes the current source file and line, followed by
// error.code and error.message, to std::cout.
// NOTE(review): the end of the macro body is not visible in this view, so
// whatever it does after printing (e.g. terminating the program) must be
// confirmed against the full file.
#define APIERROR(error) \
106
{ std::cout << "Error in " << __FILE__ << ", line:" << __LINE__ << ", code:" \
107
<< error.code << ", msg: " << error.message << "." << std::endl; \
110
// Forward declaration of myCreateEvent(), which is defined further down in
// this file.
// NOTE(review): the remaining parameter(s) and the closing parenthesis are
// not visible in this view. The parameter names here (eventColumnName,
// noEventColumnName) differ from the definition's (eventColumnNames,
// noEventColumnNames); that is legal for a declaration.
int myCreateEvent(Ndb* myNdb,
111
const char *eventName,
112
const char *eventTableName,
113
const char **eventColumnName,
114
const int noEventColumnName,
117
/**
 * Example driver: connects to an NDB cluster, opens database "TEST_DB",
 * creates an event operation for the event named "CHNG_IN_t0" and prints the
 * type, GCI and column values of every event it receives.
 *
 * argv[1] is used as the cluster connect string, argv[2] is parsed with
 * atoi() into `timeout`, and an optional argv[3] enables event merging when
 * it contains 'm' and debug tracing when it contains 'd'.
 *
 * NOTE(review): this view of the function is missing lines (braces, some
 * conditions, the declarations and updates of the loop counters, the return
 * statement), so the comments below describe only the statements that are
 * visible.
 */
int main(int argc, char** argv)
121
// Usage message describing the expected command-line arguments.
std::cout << "Arguments are <connect_string cluster> <timeout> [m(merge events)|d(debug)].\n";
124
const char *connectstring = argv[1];
125
// timeout bounds both the outer (j) and the inner (i) loop below.
int timeout = atoi(argv[2]);
127
// The optional third argument is a set of flag characters: 'm' and/or 'd'.
bool merge_events = argc > 3 && strchr(argv[3], 'm') != 0;
129
bool dbug = argc > 3 && strchr(argv[3], 'd') != 0;
130
// In debug mode, turn on DBUG tracing and set API_SIGNAL_LOG in the environment.
if (dbug) DBUG_PUSH("d:t:");
131
if (dbug) putenv("API_SIGNAL_LOG=-");
134
Ndb_cluster_connection *cluster_connection=
135
new Ndb_cluster_connection(connectstring); // Object representing the cluster
137
// NOTE(review): the rest of the connect() call and the tests on r that select
// between the two failure messages below are not visible in this view.
int r= cluster_connection->connect(5 /* retries */,
138
3 /* delay between retries */,
143
<< "Cluster connect failed, possibly resolved with more retries.\n";
149
<< "Cluster connect failed.\n";
153
// A non-zero result from wait_until_ready() is reported as "not ready".
if (cluster_connection->wait_until_ready(30,30))
155
std::cout << "Cluster was not ready within 30 secs." << std::endl;
159
Ndb* myNdb= new Ndb(cluster_connection,
160
"TEST_DB"); // Object representing the database
162
if (myNdb->init() == -1) APIERROR(myNdb->getNdbError());
164
// Name of the event, the table it refers to, and the monitored columns.
const char *eventName= "CHNG_IN_t0";
165
const char *eventTableName= "t0";
166
const int noEventColumnName= 5;
167
// NOTE(review): the initializer listing the column names is not visible here.
const char *eventColumnName[noEventColumnName]=
183
// Normal values and blobs are unfortunately handled differently, so each
// column slot holds either an NdbRecAttr pointer or an NdbBlob handle.
typedef union { NdbRecAttr* ra; NdbBlob* bh; } RA_BH;
188
// Outer loop, bounded by timeout; the declaration and update of j are not
// visible in this view.
while (j < timeout) {
190
// Start "transaction" for handling events
191
NdbEventOperation* op;
192
printf("create EventOperation\n");
193
if ((op = myNdb->createEventOperation(eventName)) == NULL)
194
APIERROR(myNdb->getNdbError());
195
// Pass the command-line merge flag on to the event operation.
op->mergeEvents(merge_events);
197
printf("get values\n");
198
// recAttr holds the post-change value handles, recAttrPre the pre-change ones
// (see the "post: " / "pre : " printing below).
RA_BH recAttr[noEventColumnName];
199
RA_BH recAttrPre[noEventColumnName];
200
// primary keys should always be a part of the result
201
for (i = 0; i < noEventColumnName; i++) {
203
// NOTE(review): the condition that selects this first branch is not visible;
// it decides which columns are read as plain attributes.
recAttr[i].ra = op->getValue(eventColumnName[i]);
204
recAttrPre[i].ra = op->getPreValue(eventColumnName[i]);
205
// The remaining columns are read through blob handles, and only when event
// merging is enabled.
} else if (merge_events) {
206
recAttr[i].bh = op->getBlobHandle(eventColumnName[i]);
207
recAttrPre[i].bh = op->getPreBlobHandle(eventColumnName[i]);
211
// set up the callbacks
213
// This starts changes to "start flowing"
215
// NOTE(review): the call whose failure triggers this error report is not
// visible in this view.
APIERROR(op->getNdbError());
217
// Keep the original pointer: op is reassigned by nextEvent() below, and
// the_op is what is passed to dropEventOperation() at the end.
NdbEventOperation* the_op = op;
220
// Inner loop, bounded by timeout; the update of i is not visible in this view.
while (i < timeout) {
221
// printf("now waiting for event...\n");
222
int r = myNdb->pollEvents(1000); // wait for event or 1000 ms
224
// printf("got data! %d\n", r);
225
// Process events for as long as nextEvent() returns a non-null operation.
while ((op= myNdb->nextEvent())) {
226
// Only one event operation was created, so every event must belong to it.
assert(the_op == op);
228
// Print the event type, prefixed with the counter i.
switch (op->getEventType()) {
229
case NdbDictionary::Event::TE_INSERT:
230
printf("%u INSERT", i);
232
case NdbDictionary::Event::TE_DELETE:
233
printf("%u DELETE", i);
235
case NdbDictionary::Event::TE_UPDATE:
236
printf("%u UPDATE", i);
239
abort(); // should not happen
241
printf(" gci=%d\n", (int)op->getGCI());
242
// k == 0 prints the post-change values, k == 1 the pre-change values.
for (k = 0; k <= 1; k++) {
243
printf(k == 0 ? "post: " : "pre : ");
244
for (l = 0; l < noEventColumnName; l++) {
246
// Plain attribute column. NOTE(review): the enclosing condition and the test
// choosing between the integer and the string printf are not visible here.
NdbRecAttr* ra = k == 0 ? recAttr[l].ra : recAttrPre[l].ra;
247
if (ra->isNULL() >= 0) { // we have a value
248
if (ra->isNULL() == 0) { // we have a non-null value
250
printf("%-5u", ra->u_32_value());
252
// Prints at most 4 characters, left-justified in a 5-character field.
printf("%-5.4s", ra->aRef());
254
printf("%-5s", "NULL");
256
printf("%-5s", "-"); // no value
257
// Blob column: only handled when event merging is enabled.
} else if (merge_events) {
259
NdbBlob* bh = k == 0 ? recAttr[l].bh : recAttrPre[l].bh;
260
// NOTE(review): the declaration of isNull is not visible in this view.
bh->getDefined(isNull);
261
if (isNull >= 0) { // we have a value
262
if (! isNull) { // we have a non-null value
264
bh->getLength(length);
266
// Read the blob into a heap buffer of `length` bytes, pre-filled with 'X'.
// NOTE(review): the matching delete[] is not visible in this view; confirm
// that buf is released.
unsigned char* buf = new unsigned char [length];
267
memset(buf, 'X', length);
269
bh->readData(buf, n); // n is in/out
275
// Print the buffer as runs: take the next byte c, skip over the following
// bytes equal to c, then print the count m and the byte.
// NOTE(review): the declarations and updates of i and m used here are not
// visible in this view.
unsigned char c = buf[i++];
277
while (i < n && buf[i] == c)
281
printf("%u%c", m, c);
287
printf("%-5s", "NULL");
289
printf("%-5s", "-"); // no value
296
printf("timed out (%i)\n", timeout);
298
// don't want to listen to events anymore
299
if (myNdb->dropEventOperation(the_op)) APIERROR(myNdb->getNdbError());
306
NdbDictionary::Dictionary *myDict = myNdb->getDictionary();
307
if (!myDict) APIERROR(myNdb->getNdbError());
308
// remove event from database
309
if (myDict->dropEvent(eventName)) APIERROR(myDict->getNdbError());
313
delete cluster_connection;
318
/**
 * Creates the event `eventName` on table `eventTableName` through the
 * dictionary of `myNdb`. The event covers all table event types (TE_ALL) and
 * the `noEventColumnNames` columns listed in `eventColumnNames`.
 *
 * If creation fails because a schema object of that name already exists, the
 * existing event is dropped and creation is attempted once more. Dictionary
 * errors are reported through APIERROR.
 *
 * NOTE(review): the end of the parameter list, the braces and the return
 * statement are not visible in this view. `merge_events` is used below and is
 * presumably the missing last parameter; confirm against the full file.
 */
int myCreateEvent(Ndb* myNdb,
319
const char *eventName,
320
const char *eventTableName,
321
const char **eventColumnNames,
322
const int noEventColumnNames,
325
NdbDictionary::Dictionary *myDict= myNdb->getDictionary();
326
if (!myDict) APIERROR(myNdb->getNdbError());
328
// Look up the table the event is defined on.
const NdbDictionary::Table *table= myDict->getTable(eventTableName);
329
if (!table) APIERROR(myDict->getNdbError());
331
NdbDictionary::Event myEvent(eventName, *table);
332
// Subscribe to every table event type; the commented-out lines below show
// how individual types could be selected instead.
myEvent.addTableEvent(NdbDictionary::Event::TE_ALL);
333
// myEvent.addTableEvent(NdbDictionary::Event::TE_INSERT);
334
// myEvent.addTableEvent(NdbDictionary::Event::TE_UPDATE);
335
// myEvent.addTableEvent(NdbDictionary::Event::TE_DELETE);
337
myEvent.addEventColumns(noEventColumnNames, eventColumnNames);
338
myEvent.mergeEvents(merge_events);
340
// Add event to database
341
// NOTE(review): the statement executed when createEvent() returns 0 is not
// visible in this view.
if (myDict->createEvent(myEvent) == 0)
343
else if (myDict->getNdbError().classification ==
344
NdbError::SchemaObjectExists) {
345
printf("Event creation failed, event exists\n");
346
printf("dropping Event...\n");
347
if (myDict->dropEvent(eventName)) APIERROR(myDict->getNdbError());
349
// Add event to database
350
if ( myDict->createEvent(myEvent)) APIERROR(myDict->getNdbError());
352
// NOTE(review): presumably the branch for any other createEvent() failure;
// the surrounding else is not visible in this view.
APIERROR(myDict->getNdbError());