~ubuntu-branches/ubuntu/precise/mysql-5.5/precise-201203300109

« back to all changes in this revision

Viewing changes to storage/ndb/ndbapi-examples/ndbapi_event/ndbapi_event.cpp

  • Committer: Package Import Robot
  • Author(s): Clint Byrum
  • Date: 2011-11-08 11:31:13 UTC
  • Revision ID: package-import@ubuntu.com-20111108113113-3ulw01fvi4vn8m25
Tags: upstream-5.5.17
ImportĀ upstreamĀ versionĀ 5.5.17

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/* Copyright (C) 2003 MySQL AB
 
2
 
 
3
   This program is free software; you can redistribute it and/or modify
 
4
   it under the terms of the GNU General Public License as published by
 
5
   the Free Software Foundation; version 2 of the License.
 
6
 
 
7
   This program is distributed in the hope that it will be useful,
 
8
   but WITHOUT ANY WARRANTY; without even the implied warranty of
 
9
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
10
   GNU General Public License for more details.
 
11
 
 
12
   You should have received a copy of the GNU General Public License
 
13
   along with this program; if not, write to the Free Software
 
14
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
 
15
 
 
16
/**
 
17
 *  ndbapi_event.cpp: Using API level events in NDB API
 
18
 *
 
19
 *  Classes and methods used in this example:
 
20
 *
 
21
 *  Ndb_cluster_connection
 
22
 *       connect()
 
23
 *       wait_until_ready()
 
24
 *
 
25
 *  Ndb
 
26
 *       init()
 
27
 *       getDictionary()
 
28
 *       createEventOperation()
 
29
 *       dropEventOperation()
 
30
 *       pollEvents()
 
31
 *       nextEvent()
 
32
 *
 
33
 *  NdbDictionary
 
34
 *       createEvent()
 
35
 *       dropEvent()
 
36
 *
 
37
 *  NdbDictionary::Event
 
38
 *       setTable()
 
39
 *       addTableEvent()
 
40
 *       addEventColumn()
 
41
 *
 
42
 *  NdbEventOperation
 
43
 *       getValue()
 
44
 *       getPreValue()
 
45
 *       execute()
 
46
 *       getEventType()
 
47
 *
 
48
 */
 
49
 
 
50
#include <NdbApi.hpp>
 
51
 
 
52
// Used for cout
 
53
#include <stdio.h>
 
54
#include <iostream>
 
55
#include <unistd.h>
 
56
#ifdef VM_TRACE
 
57
#include <my_global.h>
 
58
#endif
 
59
#ifndef assert
 
60
#include <assert.h>
 
61
#endif
 
62
 
 
63
 
 
64
/**
 
65
 * Assume that there is a table which is being updated by 
 
66
 * another process (e.g. flexBench -l 0 -stdtables).
 
67
 * We want to monitor what happens with column values.
 
68
 *
 
69
 * Or using the mysql client:
 
70
 *
 
71
 * shell> mysql -u root
 
72
 * mysql> create database TEST_DB;
 
73
 * mysql> use TEST_DB;
 
74
 * mysql> create table t0
 
75
 *        (c0 int, c1 int, c2 char(4), c3 char(4), c4 text,
 
76
 *        primary key(c0, c2)) engine ndb charset latin1;
 
77
 *
 
78
 * In another window start ndbapi_event, wait until properly started
 
79
 
 
80
   insert into t0 values (1, 2, 'a', 'b', null);
 
81
   insert into t0 values (3, 4, 'c', 'd', null);
 
82
   update t0 set c3 = 'e' where c0 = 1 and c2 = 'a'; -- use pk
 
83
   update t0 set c3 = 'f'; -- use scan
 
84
   update t0 set c3 = 'F'; -- use scan update to 'same'
 
85
   update t0 set c2 = 'g' where c0 = 1; -- update pk part
 
86
   update t0 set c2 = 'G' where c0 = 1; -- update pk part to 'same'
 
87
   update t0 set c0 = 5, c2 = 'H' where c0 = 3; -- update full PK
 
88
   delete from t0;
 
89
 
 
90
   insert ...; update ...; -- see events w/ same pk merged (if -m option)
 
91
   delete ...; insert ...; -- there are 5 combinations ID IU DI UD UU
 
92
   update ...; update ...;
 
93
 
 
94
   -- text requires -m flag
 
95
   set @a = repeat('a',256); -- inline size
 
96
   set @b = repeat('b',2000); -- part size
 
97
   set @c = repeat('c',2000*30); -- 30 parts
 
98
 
 
99
   -- update the text field using combinations of @a, @b, @c ...
 
100
 
 
101
 * you should see the data popping up in the example window
 
102
 *
 
103
 */
 
104
 
 
105
#define APIERROR(error) \
 
106
  { std::cout << "Error in " << __FILE__ << ", line:" << __LINE__ << ", code:" \
 
107
              << error.code << ", msg: " << error.message << "." << std::endl; \
 
108
    exit(-1); }
 
109
 
 
110
int myCreateEvent(Ndb* myNdb,
 
111
                  const char *eventName,
 
112
                  const char *eventTableName,
 
113
                  const char **eventColumnName,
 
114
                  const int noEventColumnName,
 
115
                  bool merge_events);
 
116
 
 
117
int main(int argc, char** argv)
 
118
{
 
119
  if (argc < 3)
 
120
  {
 
121
    std::cout << "Arguments are <connect_string cluster> <timeout> [m(merge events)|d(debug)].\n";
 
122
    exit(-1);
 
123
  }
 
124
  const char *connectstring = argv[1];
 
125
  int timeout = atoi(argv[2]);
 
126
  ndb_init();
 
127
  bool merge_events = argc > 3 && strchr(argv[3], 'm') != 0;
 
128
#ifdef VM_TRACE
 
129
  bool dbug = argc > 3 && strchr(argv[3], 'd') != 0;
 
130
  if (dbug) DBUG_PUSH("d:t:");
 
131
  if (dbug) putenv("API_SIGNAL_LOG=-");
 
132
#endif
 
133
 
 
134
  Ndb_cluster_connection *cluster_connection=
 
135
    new Ndb_cluster_connection(connectstring); // Object representing the cluster
 
136
 
 
137
  int r= cluster_connection->connect(5 /* retries               */,
 
138
                                     3 /* delay between retries */,
 
139
                                     1 /* verbose               */);
 
140
  if (r > 0)
 
141
  {
 
142
    std::cout
 
143
      << "Cluster connect failed, possibly resolved with more retries.\n";
 
144
    exit(-1);
 
145
  }
 
146
  else if (r < 0)
 
147
  {
 
148
    std::cout
 
149
      << "Cluster connect failed.\n";
 
150
    exit(-1);
 
151
  }
 
152
                                           
 
153
  if (cluster_connection->wait_until_ready(30,30))
 
154
  {
 
155
    std::cout << "Cluster was not ready within 30 secs." << std::endl;
 
156
    exit(-1);
 
157
  }
 
158
 
 
159
  Ndb* myNdb= new Ndb(cluster_connection,
 
160
                      "TEST_DB");  // Object representing the database
 
161
 
 
162
  if (myNdb->init() == -1) APIERROR(myNdb->getNdbError());
 
163
 
 
164
  const char *eventName= "CHNG_IN_t0";
 
165
  const char *eventTableName= "t0";
 
166
  const int noEventColumnName= 5;
 
167
  const char *eventColumnName[noEventColumnName]=
 
168
    {"c0",
 
169
     "c1",
 
170
     "c2",
 
171
     "c3",
 
172
     "c4"
 
173
    };
 
174
  
 
175
  // Create events
 
176
  myCreateEvent(myNdb,
 
177
                eventName,
 
178
                eventTableName,
 
179
                eventColumnName,
 
180
                noEventColumnName,
 
181
                merge_events);
 
182
 
 
183
  // Normal values and blobs are unfortunately handled differently..
 
184
  typedef union { NdbRecAttr* ra; NdbBlob* bh; } RA_BH;
 
185
 
 
186
  int i, j, k, l;
 
187
  j = 0;
 
188
  while (j < timeout) {
 
189
 
 
190
    // Start "transaction" for handling events
 
191
    NdbEventOperation* op;
 
192
    printf("create EventOperation\n");
 
193
    if ((op = myNdb->createEventOperation(eventName)) == NULL)
 
194
      APIERROR(myNdb->getNdbError());
 
195
    op->mergeEvents(merge_events);
 
196
 
 
197
    printf("get values\n");
 
198
    RA_BH recAttr[noEventColumnName];
 
199
    RA_BH recAttrPre[noEventColumnName];
 
200
    // primary keys should always be a part of the result
 
201
    for (i = 0; i < noEventColumnName; i++) {
 
202
      if (i < 4) {
 
203
        recAttr[i].ra    = op->getValue(eventColumnName[i]);
 
204
        recAttrPre[i].ra = op->getPreValue(eventColumnName[i]);
 
205
      } else if (merge_events) {
 
206
        recAttr[i].bh    = op->getBlobHandle(eventColumnName[i]);
 
207
        recAttrPre[i].bh = op->getPreBlobHandle(eventColumnName[i]);
 
208
      }
 
209
    }
 
210
 
 
211
    // set up the callbacks
 
212
    printf("execute\n");
 
213
    // This starts changes to "start flowing"
 
214
    if (op->execute())
 
215
      APIERROR(op->getNdbError());
 
216
 
 
217
    NdbEventOperation* the_op = op;
 
218
 
 
219
    i= 0;
 
220
    while (i < timeout) {
 
221
      // printf("now waiting for event...\n");
 
222
      int r = myNdb->pollEvents(1000); // wait for event or 1000 ms
 
223
      if (r > 0) {
 
224
        // printf("got data! %d\n", r);
 
225
        while ((op= myNdb->nextEvent())) {
 
226
          assert(the_op == op);
 
227
          i++;
 
228
          switch (op->getEventType()) {
 
229
          case NdbDictionary::Event::TE_INSERT:
 
230
            printf("%u INSERT", i);
 
231
            break;
 
232
          case NdbDictionary::Event::TE_DELETE:
 
233
            printf("%u DELETE", i);
 
234
            break;
 
235
          case NdbDictionary::Event::TE_UPDATE:
 
236
            printf("%u UPDATE", i);
 
237
            break;
 
238
          default:
 
239
            abort(); // should not happen
 
240
          }
 
241
          printf(" gci=%d\n", (int)op->getGCI());
 
242
          for (k = 0; k <= 1; k++) {
 
243
            printf(k == 0 ? "post: " : "pre : ");
 
244
            for (l = 0; l < noEventColumnName; l++) {
 
245
              if (l < 4) {
 
246
                NdbRecAttr* ra = k == 0 ? recAttr[l].ra : recAttrPre[l].ra;
 
247
                if (ra->isNULL() >= 0) { // we have a value
 
248
                  if (ra->isNULL() == 0) { // we have a non-null value
 
249
                    if (l < 2)
 
250
                      printf("%-5u", ra->u_32_value());
 
251
                    else
 
252
                      printf("%-5.4s", ra->aRef());
 
253
                  } else
 
254
                    printf("%-5s", "NULL");
 
255
                } else
 
256
                  printf("%-5s", "-"); // no value
 
257
              } else if (merge_events) {
 
258
                int isNull;
 
259
                NdbBlob* bh = k == 0 ? recAttr[l].bh : recAttrPre[l].bh;
 
260
                bh->getDefined(isNull);
 
261
                if (isNull >= 0) { // we have a value
 
262
                  if (! isNull) { // we have a non-null value
 
263
                    Uint64 length = 0;
 
264
                    bh->getLength(length);
 
265
                    // read into buffer
 
266
                    unsigned char* buf = new unsigned char [length];
 
267
                    memset(buf, 'X', length);
 
268
                    Uint32 n = length;
 
269
                    bh->readData(buf, n); // n is in/out
 
270
                    assert(n == length);
 
271
                    // pretty-print
 
272
                    bool first = true;
 
273
                    Uint32 i = 0;
 
274
                    while (i < n) {
 
275
                      unsigned char c = buf[i++];
 
276
                      Uint32 m = 1;
 
277
                      while (i < n && buf[i] == c)
 
278
                        i++, m++;
 
279
                      if (! first)
 
280
                        printf("+");
 
281
                      printf("%u%c", m, c);
 
282
                      first = false;
 
283
                    }
 
284
                    printf("[%u]", n);
 
285
                    delete [] buf;
 
286
                  } else
 
287
                    printf("%-5s", "NULL");
 
288
                } else
 
289
                  printf("%-5s", "-"); // no value
 
290
              }
 
291
            }
 
292
            printf("\n");
 
293
          }
 
294
        }
 
295
      } else
 
296
        printf("timed out (%i)\n", timeout);
 
297
    }
 
298
    // don't want to listen to events anymore
 
299
    if (myNdb->dropEventOperation(the_op)) APIERROR(myNdb->getNdbError());
 
300
    the_op = 0;
 
301
 
 
302
    j++;
 
303
  }
 
304
 
 
305
  {
 
306
    NdbDictionary::Dictionary *myDict = myNdb->getDictionary();
 
307
    if (!myDict) APIERROR(myNdb->getNdbError());
 
308
    // remove event from database
 
309
    if (myDict->dropEvent(eventName)) APIERROR(myDict->getNdbError());
 
310
  }
 
311
 
 
312
  delete myNdb;
 
313
  delete cluster_connection;
 
314
  ndb_end(0);
 
315
  return 0;
 
316
}
 
317
 
 
318
int myCreateEvent(Ndb* myNdb,
 
319
                  const char *eventName,
 
320
                  const char *eventTableName,
 
321
                  const char **eventColumnNames,
 
322
                  const int noEventColumnNames,
 
323
                  bool merge_events)
 
324
{
 
325
  NdbDictionary::Dictionary *myDict= myNdb->getDictionary();
 
326
  if (!myDict) APIERROR(myNdb->getNdbError());
 
327
 
 
328
  const NdbDictionary::Table *table= myDict->getTable(eventTableName);
 
329
  if (!table) APIERROR(myDict->getNdbError());
 
330
 
 
331
  NdbDictionary::Event myEvent(eventName, *table);
 
332
  myEvent.addTableEvent(NdbDictionary::Event::TE_ALL); 
 
333
  //  myEvent.addTableEvent(NdbDictionary::Event::TE_INSERT); 
 
334
  //  myEvent.addTableEvent(NdbDictionary::Event::TE_UPDATE); 
 
335
  //  myEvent.addTableEvent(NdbDictionary::Event::TE_DELETE);
 
336
 
 
337
  myEvent.addEventColumns(noEventColumnNames, eventColumnNames);
 
338
  myEvent.mergeEvents(merge_events);
 
339
 
 
340
  // Add event to database
 
341
  if (myDict->createEvent(myEvent) == 0)
 
342
    myEvent.print();
 
343
  else if (myDict->getNdbError().classification ==
 
344
           NdbError::SchemaObjectExists) {
 
345
    printf("Event creation failed, event exists\n");
 
346
    printf("dropping Event...\n");
 
347
    if (myDict->dropEvent(eventName)) APIERROR(myDict->getNdbError());
 
348
    // try again
 
349
    // Add event to database
 
350
    if ( myDict->createEvent(myEvent)) APIERROR(myDict->getNdbError());
 
351
  } else
 
352
    APIERROR(myDict->getNdbError());
 
353
 
 
354
  return 0;
 
355
}