1
/* Copyright (C) 2003 MySQL AB
3
This program is free software; you can redistribute it and/or modify
4
it under the terms of the GNU General Public License as published by
5
the Free Software Foundation; version 2 of the License.
7
This program is distributed in the hope that it will be useful,
8
but WITHOUT ANY WARRANTY; without even the implied warranty of
9
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10
GNU General Public License for more details.
12
You should have received a copy of the GNU General Public License
13
along with this program; if not, write to the Free Software
14
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
16
#ifndef NdbEventOperationImpl_H
17
#define NdbEventOperationImpl_H
19
#include <NdbEventOperation.hpp>
20
#include <signaldata/SumaImpl.hpp>
21
#include <transporter/TransporterDefinitions.hpp>
22
#include <NdbRecAttr.hpp>
23
#include <AttributeHeader.hpp>
24
#include <UtilBuffer.hpp>
26
#define NDB_EVENT_OP_MAGIC_NUMBER 0xA9F301B4
29
#define DBUG_ENTER_EVENT(A) DBUG_ENTER(A)
30
#define DBUG_RETURN_EVENT(A) DBUG_RETURN(A)
31
#define DBUG_VOID_RETURN_EVENT DBUG_VOID_RETURN
32
#define DBUG_PRINT_EVENT(A,B) DBUG_PRINT(A,B)
33
#define DBUG_DUMP_EVENT(A,B,C) DBUG_DUMP(A,B,C)
35
#define DBUG_ENTER_EVENT(A)
36
#define DBUG_RETURN_EVENT(A) return(A)
37
#define DBUG_VOID_RETURN_EVENT return
38
#define DBUG_PRINT_EVENT(A,B)
39
#define DBUG_DUMP_EVENT(A,B,C)
42
#undef NDB_EVENT_VERIFY_SIZE
44
// not much effect on performance, leave on
45
#define NDB_EVENT_VERIFY_SIZE
48
class NdbEventOperationImpl;
56
LinearSectionPtr ptr[3];
58
NdbEventOperationImpl *m_event_op;
61
* Blobs are stored in blob list (m_next_blob) where each entry
62
* is list of parts (m_next). TODO order by part number
64
* Processed data (m_used_data, m_free_data) keeps the old blob
65
* list intact. It is reconsumed when new data items are needed.
67
* Data item lists keep track of item count and sum(sz) and
68
* these include both main items and blob parts.
71
EventBufData *m_next; // Next wrt to global order or Next blob part
72
EventBufData *m_next_blob; // First part in next blob
74
EventBufData *m_next_hash; // Next in per-GCI hash
75
Uint32 m_pkhash; // PK hash (without op) for fast compare
79
// Get blob part number from blob data
80
Uint32 get_blob_part_no() const;
83
* Main item does not include summary of parts (space / performance
84
* tradeoff). The summary is needed when moving single data item.
85
* It is not needed when moving entire list.
87
void get_full_size(Uint32 & full_count, Uint32 & full_sz) const {
91
add_part_size(full_count, full_sz);
93
void add_part_size(Uint32 & full_count, Uint32 & full_sz) const;
96
class EventBufData_list
100
~EventBufData_list();
102
// remove first and return its size
103
void remove_first(Uint32 & full_count, Uint32 & full_sz);
104
// for remove+append avoid double call to get_full_size()
105
void append_used_data(EventBufData *data, Uint32 full_count, Uint32 full_sz);
106
// append data and insert data but ignore Gci_op list
107
void append_used_data(EventBufData *data);
108
// append data and insert data into Gci_op list with add_gci_op
109
void append_data(EventBufData *data);
110
// append list to another, will call move_gci_ops
111
void append_list(EventBufData_list *list, Uint64 gci);
115
EventBufData *m_head, *m_tail;
120
distinct ops per gci (assume no hash needed)
122
list may be in 2 versions
124
1. single list with on gci only
126
Gci_op *m_gci_op_list;
127
Uint32 m_gci_op_count;
128
Uint32 m_gci_op_alloc != 0;
130
2. multi list with several gcis
131
- linked list of gci's
132
- one linear array per gci
133
Gci_ops *m_gci_ops_list;
134
Gci_ops *m_gci_ops_list_tail;
135
Uint32 m_is_not_multi_list == 0;
138
struct Gci_op // 1 + 2
140
NdbEventOperationImpl* op;
146
Gci_op *m_gci_op_list;
148
Uint32 m_gci_op_count;
152
Gci_op *m_gci_op_list; // 1
153
Gci_ops *m_gci_ops_list; // 2
157
Uint32 m_gci_op_count; // 1
158
Gci_ops *m_gci_ops_list_tail;// 2
162
Uint32 m_gci_op_alloc; // 1
163
Uint32 m_is_not_multi_list; // 2
165
Gci_ops *first_gci_ops();
166
Gci_ops *next_gci_ops();
167
// case 1 above; add Gci_op to single list
168
void add_gci_op(Gci_op g);
170
// case 2 above; move single list or multi list from
171
// one list to another
172
void move_gci_ops(EventBufData_list *list, Uint64 gci);
176
EventBufData_list::EventBufData_list()
177
: m_head(0), m_tail(0),
181
m_gci_ops_list_tail(0),
184
DBUG_ENTER_EVENT("EventBufData_list::EventBufData_list");
185
DBUG_PRINT_EVENT("info", ("this: %p", this));
186
DBUG_VOID_RETURN_EVENT;
190
EventBufData_list::~EventBufData_list()
192
DBUG_ENTER_EVENT("EventBufData_list::~EventBufData_list");
193
DBUG_PRINT_EVENT("info", ("this: %p m_is_not_multi_list: %u",
194
this, m_is_not_multi_list));
195
if (m_is_not_multi_list)
197
DBUG_PRINT_EVENT("info", ("delete m_gci_op_list: %p", m_gci_op_list));
198
delete [] m_gci_op_list;
202
Gci_ops *op = first_gci_ops();
206
DBUG_VOID_RETURN_EVENT;
210
int EventBufData_list::is_empty()
216
void EventBufData_list::remove_first(Uint32 & full_count, Uint32 & full_sz)
218
m_head->get_full_size(full_count, full_sz);
220
assert(m_count >= full_count);
221
assert(m_sz >= full_sz);
223
m_count -= full_count;
225
m_head = m_head->m_next;
231
void EventBufData_list::append_used_data(EventBufData *data, Uint32 full_count, Uint32 full_sz)
235
m_tail->m_next = data;
240
assert(m_count == 0);
247
m_count += full_count;
252
void EventBufData_list::append_used_data(EventBufData *data)
254
Uint32 full_count, full_sz;
255
data->get_full_size(full_count, full_sz);
256
append_used_data(data, full_count, full_sz);
260
void EventBufData_list::append_data(EventBufData *data)
262
Gci_op g = { data->m_event_op,
263
1 << SubTableData::getOperation(data->sdata->requestInfo) };
266
append_used_data(data);
269
inline EventBufData_list::Gci_ops *
270
EventBufData_list::first_gci_ops()
272
assert(!m_is_not_multi_list);
273
return m_gci_ops_list;
276
inline EventBufData_list::Gci_ops *
277
EventBufData_list::next_gci_ops()
279
assert(!m_is_not_multi_list);
280
Gci_ops *first = m_gci_ops_list;
281
m_gci_ops_list = first->m_next;
282
if (first->m_gci_op_list)
284
DBUG_PRINT_EVENT("info", ("this: %p delete m_gci_op_list: %p",
285
this, first->m_gci_op_list));
286
delete [] first->m_gci_op_list;
289
if (m_gci_ops_list == 0)
290
m_gci_ops_list_tail = 0;
291
return m_gci_ops_list;
294
// GCI bucket has also a hash over data, with key event op, table PK.
295
// It can only be appended to and is invalid after remove_first().
296
class EventBufData_hash
299
struct Pos { // search result
300
Uint32 index; // index into hash array
301
EventBufData* data; // non-zero if found
302
Uint32 pkhash; // PK hash
305
static Uint32 getpkhash(NdbEventOperationImpl* op, LinearSectionPtr ptr[3]);
306
static bool getpkequal(NdbEventOperationImpl* op, LinearSectionPtr ptr1[3], LinearSectionPtr ptr2[3]);
308
void search(Pos& hpos, NdbEventOperationImpl* op, LinearSectionPtr ptr[3]);
309
void append(Pos& hpos, EventBufData* data);
311
enum { GCI_EVENT_HASH_SIZE = 101 };
312
EventBufData* m_hash[GCI_EVENT_HASH_SIZE];
316
void EventBufData_hash::append(Pos& hpos, EventBufData* data)
318
data->m_next_hash = m_hash[hpos.index];
319
m_hash[hpos.index] = data;
326
GC_COMPLETE = 0x1 // GCI is complete, but waiting for out of order
330
Uint32 m_gcp_complete_rep_count; // Remaining SUB_GCP_COMPLETE_REP until done
332
EventBufData_list m_data;
333
EventBufData_hash m_data_hash;
336
struct Gci_container_pod
338
char data[sizeof(Gci_container)];
341
class NdbEventOperationImpl : public NdbEventOperation {
343
NdbEventOperationImpl(NdbEventOperation &f,
345
const char* eventName);
346
NdbEventOperationImpl(Ndb *theNdb,
348
void init(NdbEventImpl& evnt);
349
NdbEventOperationImpl(NdbEventOperationImpl&); //unimplemented
350
NdbEventOperationImpl& operator=(const NdbEventOperationImpl&); //unimplemented
351
~NdbEventOperationImpl();
353
NdbEventOperation::State getState();
356
int execute_nolock();
358
NdbRecAttr *getValue(const char *colName, char *aValue, int n);
359
NdbRecAttr *getValue(const NdbColumnImpl *, char *aValue, int n);
360
NdbBlob *getBlobHandle(const char *colName, int n);
361
NdbBlob *getBlobHandle(const NdbColumnImpl *, int n);
362
int readBlobParts(char* buf, NdbBlob* blob, Uint32 part, Uint32 count);
364
bool tableNameChanged() const;
365
bool tableFrmChanged() const;
366
bool tableFragmentationChanged() const;
367
bool tableRangeListChanged() const;
369
Uint32 getAnyValue() const;
370
Uint64 getLatestGCI();
371
bool execSUB_TABLE_DATA(NdbApiSignal * signal,
372
LinearSectionPtr ptr[3]);
374
NdbDictionary::Event::TableEvent getEventType();
379
NdbEventOperation *m_facade;
380
Uint32 m_magic_number;
382
const NdbError & getNdbError() const;
386
NdbEventImpl *m_eventImpl;
388
NdbRecAttr *theFirstPkAttrs[2];
389
NdbRecAttr *theCurrentPkAttrs[2];
390
NdbRecAttr *theFirstDataAttrs[2];
391
NdbRecAttr *theCurrentDataAttrs[2];
393
NdbBlob* theBlobList;
394
NdbEventOperationImpl* theBlobOpList; // in main op, list of blob ops
395
NdbEventOperationImpl* theMainOp; // in blob op, the main op
397
NdbEventOperation::State m_state; /* note connection to mi_type */
398
Uint32 mi_type; /* should be == 0 if m_state != EO_EXECUTING
399
* else same as in EventImpl
405
m_node_bit_mask keeps track of which ndb nodes have reference to
409
- remove - TE_STOP, TE_NODE_FAILURE, TE_CLUSTER_FAILURE
411
TE_NODE_FAILURE and TE_CLUSTER_FAILURE are created as events
412
and added to all event ops listed as active or pending delete
413
in m_dropped_ev_op using insertDataL, includeing the blob
414
event ops referenced by a regular event op.
415
- NdbEventBuffer::report_node_failure
416
- NdbEventBuffer::completeClusterFailed
418
TE_ACTIVE is sent from the kernel on initial execute/start of the
419
event op, but is also internally generetad on node connect like
420
TE_NODE_FAILURE and TE_CLUSTER_FAILURE
421
- NdbEventBuffer::report_node_connected
423
when m_node_bit_mask becomes clear, the kernel reference is
424
removed from m_ref_count
426
node id 0 is used to denote that cluster has a reference
429
Bitmask<(unsigned int)_NDB_NODE_BITMASK_SIZE> m_node_bit_mask;
432
m_ref_count keeps track of outstanding references to an event
433
operation impl object. To make sure that the object is not
436
If on dropEventOperation there are still references to an
437
object it is queued for delete in NdbEventBuffer::m_dropped_ev_op
439
the following references exists for a _non_ blob event op:
441
- add - NdbEventBuffer::createEventOperation
442
- remove - NdbEventBuffer::dropEventOperation
444
- add - execute_nolock
445
- remove - TE_STOP, TE_CLUSTER_FAILURE
447
- add - execute_nolock on blob event
448
- remove - TE_STOP, TE_CLUSTER_FAILURE on blob event
450
- add - insertDataL/add_gci_op
451
- remove - NdbEventBuffer::deleteUsedEventOperations
453
the following references exists for a blob event op:
455
- add - execute_nolock
456
- remove - TE_STOP, TE_CLUSTER_FAILURE
462
EventBufData *m_data_item;
470
// Bit mask for what has changed in a table (for TE_ALTER event)
471
Uint32 m_change_mask;
474
Uint32 m_data_done_count;
478
// managed by the ndb object
479
NdbEventOperationImpl *m_next;
480
NdbEventOperationImpl *m_prev;
482
void receive_data(NdbRecAttr *r, const Uint32 *data, Uint32 sz);
485
class NdbEventBuffer {
487
NdbEventBuffer(Ndb*);
490
const Uint32 &m_system_nodes;
491
Vector<Gci_container_pod> m_active_gci;
492
NdbEventOperation *createEventOperation(const char* eventName,
494
NdbEventOperationImpl *createEventOperationImpl(NdbEventImpl& evnt,
496
void dropEventOperation(NdbEventOperation *);
497
static NdbEventOperationImpl* getEventOperationImpl(NdbEventOperation* tOp);
499
void add_drop_lock() { NdbMutex_Lock(m_add_drop_mutex); }
500
void add_drop_unlock() { NdbMutex_Unlock(m_add_drop_mutex); }
501
void lock() { NdbMutex_Lock(m_mutex); }
502
void unlock() { NdbMutex_Unlock(m_mutex); }
506
void init_gci_containers();
508
// accessed from the "receive thread"
509
int insertDataL(NdbEventOperationImpl *op,
510
const SubTableData * const sdata,
511
LinearSectionPtr ptr[3]);
512
void execSUB_GCP_COMPLETE_REP(const SubGcpCompleteRep * const rep);
513
void complete_outof_order_gcis();
515
void report_node_connected(Uint32 node_id);
516
void report_node_failure(Uint32 node_id);
517
void completeClusterFailed();
519
// used by user thread
520
Uint64 getLatestGCI();
521
Uint32 getEventId(int bufferId);
523
int pollEvents(int aMillisecondNumber, Uint64 *latestGCI= 0);
524
int flushIncompleteEvents(Uint64 gci);
525
NdbEventOperation *nextEvent();
526
NdbEventOperationImpl* getGCIEventOperations(Uint32* iter,
527
Uint32* event_types);
528
void deleteUsedEventOperations();
530
NdbEventOperationImpl *move_data();
532
// routines to copy/merge events
533
EventBufData* alloc_data();
534
int alloc_mem(EventBufData* data,
535
LinearSectionPtr ptr[3],
537
void dealloc_mem(EventBufData* data,
539
int copy_data(const SubTableData * const sdata,
540
LinearSectionPtr ptr[3],
543
int merge_data(const SubTableData * const sdata,
544
LinearSectionPtr ptr[3],
547
int get_main_data(Gci_container* bucket,
548
EventBufData_hash::Pos& hpos,
549
EventBufData* blob_data);
550
void add_blob_data(Gci_container* bucket,
551
EventBufData* main_data,
552
EventBufData* blob_data);
554
void free_list(EventBufData_list &list);
558
// Global Mutex used for some things
559
static NdbMutex *p_add_drop_mutex;
562
const char *m_latest_command;
567
Uint64 m_latestGCI; // latest "handover" GCI
568
Uint64 m_latest_complete_GCI; // latest complete GCI (in case of outof order)
571
struct NdbCondition *p_cond;
574
Gci_container m_complete_data;
575
EventBufData *m_free_data;
577
Uint32 m_free_data_count;
579
Uint32 m_free_data_sz;
582
EventBufData_list m_available_data;
583
EventBufData_list m_used_data;
585
unsigned m_total_alloc; // total allocated memory
587
// threshholds to report status
588
unsigned m_free_thresh, m_min_free_thresh, m_max_free_thresh;
589
unsigned m_gci_slip_thresh;
594
static void verify_size(const EventBufData* data, Uint32 count, Uint32 sz);
595
static void verify_size(const EventBufData_list & list);
599
void insert_event(NdbEventOperationImpl* impl,
601
LinearSectionPtr *ptr,
604
int expand(unsigned sz);
606
// all allocated data
607
struct EventBufData_chunk
610
EventBufData data[1];
612
Vector<EventBufData_chunk *> m_allocated_data;
616
dropped event operations (dropEventOperation) that have not yet
617
been deleted because of outstanding m_ref_count
619
check for delete is done on occations when the ref_count may have
620
changed by calling deleteUsedEventOperations:
621
- nextEvent - each time the user has completed processing a gci
623
NdbEventOperationImpl *m_dropped_ev_op;
625
Uint32 m_active_op_count;
626
NdbMutex *m_add_drop_mutex;
630
NdbEventOperationImpl*
631
NdbEventBuffer::getEventOperationImpl(NdbEventOperation* tOp)
637
NdbEventOperationImpl::receive_data(NdbRecAttr *r,
641
r->receive_data(data,sz);
645
assert((r->attrSize() * r->arraySize() + 3) >> 2 == sz);
647
memcpy(r->aRef(), data, 4 * sz);