1
/* Copyright (C) 2003 MySQL AB
3
This program is free software; you can redistribute it and/or modify
4
it under the terms of the GNU General Public License as published by
5
the Free Software Foundation; version 2 of the License.
7
This program is distributed in the hope that it will be useful,
8
but WITHOUT ANY WARRANTY; without even the implied warranty of
9
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10
GNU General Public License for more details.
12
You should have received a copy of the GNU General Public License
13
along with this program; if not, write to the Free Software
14
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
19
#include <ndb_limits.h>
20
#include <SimulatedBlock.hpp>
22
#include <NodeBitmask.hpp>
26
#include <DLFifoList.hpp>
27
#include <KeyTable.hpp>
28
#include <DataBuffer.hpp>
29
#include <SignalCounter.hpp>
30
#include <AttributeHeader.hpp>
31
#include <AttributeList.hpp>
33
#include <signaldata/UtilSequence.hpp>
34
#include <signaldata/SumaImpl.hpp>
35
#include <ndbapi/NdbDictionary.hpp>
37
class Suma : public SimulatedBlock {
40
Suma(Block_context& ctx);
46
void execSUB_CREATE_REQ(Signal* signal);
47
void execSUB_REMOVE_REQ(Signal* signal);
49
void execSUB_START_REQ(Signal* signal);
50
void execSUB_STOP_REQ(Signal* signal);
52
void execSUB_SYNC_REQ(Signal* signal);
53
void execSUB_ABORT_SYNC_REQ(Signal* signal);
55
void execSUB_STOP_CONF(Signal* signal);
56
void execSUB_STOP_REF(Signal* signal);
62
void execLIST_TABLES_REF(Signal* signal);
63
void execLIST_TABLES_CONF(Signal* signal);
65
void execGET_TABINFOREF(Signal* signal);
66
void execGET_TABINFO_CONF(Signal* signal);
68
void execGET_TABLEID_CONF(Signal* signal);
69
void execGET_TABLEID_REF(Signal* signal);
71
void execDROP_TAB_CONF(Signal* signal);
72
void execALTER_TAB_REQ(Signal* signal);
73
void execCREATE_TAB_CONF(Signal* signal);
77
void execSCAN_HBREP(Signal* signal);
78
void execSCAN_FRAGREF(Signal* signal);
79
void execSCAN_FRAGCONF(Signal* signal);
80
void execTRANSID_AI(Signal* signal);
81
void execSUB_SYNC_CONTINUE_REF(Signal* signal);
82
void execSUB_SYNC_CONTINUE_CONF(Signal* signal);
87
void execTRIG_ATTRINFO(Signal* signal);
88
void execFIRE_TRIG_ORD(Signal* signal);
89
void execSUB_GCP_COMPLETE_REP(Signal* signal);
94
void execDI_FCOUNTREF(Signal* signal);
95
void execDI_FCOUNTCONF(Signal* signal);
96
void execDIGETPRIMREF(Signal* signal);
97
void execDIGETPRIMCONF(Signal* signal);
100
* Trigger administration
102
void execCREATE_TRIG_REF(Signal* signal);
103
void execCREATE_TRIG_CONF(Signal* signal);
104
void execDROP_TRIG_REF(Signal* signal);
105
void execDROP_TRIG_CONF(Signal* signal);
110
void execCONTINUEB(Signal* signal);
114
void suma_ndbrequire(bool v);
116
typedef DataBuffer<15> TableList;
118
union FragmentDescriptor {
127
* Used when sending SCAN_FRAG
129
union AttributeDescriptor {
140
Uint32 m_subPtrI; //reference to subscription
143
union { Uint32 nextPool; Uint32 prevList; };
145
typedef Ptr<Subscriber> SubscriberPtr;
151
struct Subscription {
155
Uint32 m_subscriptionId;
156
Uint32 m_subscriptionKey;
157
Uint32 m_subscriptionType;
162
REPORT_SUBSCRIBE = 0x2
172
Uint32 n_subscribers;
175
union { Uint32 prevHash; Uint32 nextPool; };
177
Uint32 hashValue() const {
178
return m_subscriptionId + m_subscriptionKey;
181
bool equal(const Subscription & s) const {
183
m_subscriptionId == s.m_subscriptionId &&
184
m_subscriptionKey == s.m_subscriptionKey;
187
* The following holds the tables included
188
* in the subscription.
192
Uint32 m_current_sync_ptrI;
194
typedef Ptr<Subscription> SubscriptionPtr;
198
typedef Ptr<Table> TablePtr;
201
SyncRecord(Suma& s, DataBuffer<15>::DataBufferPool & p)
202
: m_tableList(p), suma(s)
204
, cerrorInsert(s.cerrorInsert)
213
Uint32 m_subscriptionPtrI;
215
Uint32 m_currentTable;
216
TableList m_tableList; // Tables to sync
217
TableList::DataBufferIterator m_tableList_it;
222
Uint32 m_currentFragment; // Index in tabPtr.p->m_fragments
223
DataBuffer<15>::Head m_attributeList; // Attribute if other than default
224
DataBuffer<15>::Head m_tabList; // tables if other than default
226
Uint32 m_currentTableId; // Current table
227
Uint32 m_currentNoOfAttributes; // No of attributes for current table
229
void startScan(Signal*);
230
void nextScan(Signal*);
231
bool getNextFragment(TablePtr * tab, FragmentDescriptor * fd);
232
void completeScan(Signal*, int error= 0);
238
BlockNumber number() const { return suma.number(); }
239
void progError(int line, int cause, const char * extra) {
240
suma.progError(line, cause, extra);
243
Uint32 prevList; Uint32 ptrI;
244
union { Uint32 nextPool; Uint32 nextList; };
246
friend struct SyncRecord;
248
int initTable(Signal *signal,Uint32 tableId, TablePtr &tabPtr,
249
Ptr<SyncRecord> syncPtr);
250
int initTable(Signal *signal,Uint32 tableId, TablePtr &tabPtr,
251
SubscriberPtr subbPtr);
252
int initTable(Signal *signal,Uint32 tableId, TablePtr &tabPtr);
254
int completeOneSubscriber(Signal* signal, TablePtr tabPtr, SubscriberPtr subbPtr);
255
void completeAllSubscribers(Signal* signal, TablePtr tabPtr);
256
void completeInitTable(Signal* signal, TablePtr tabPtr);
259
Table() { m_tableId = ~0; n_subscribers = 0; }
261
void checkRelease(Suma &suma);
263
DLList<Subscriber>::Head c_subscribers;
264
DLList<SyncRecord>::Head c_syncRecords;
276
SubscriberPtr m_drop_subbPtr;
278
Uint32 n_subscribers;
281
bool parseTable(SegmentedSectionPtr ptr, Suma &suma);
285
int setupTrigger(Signal* signal, Suma &suma);
286
void completeTrigger(Signal* signal);
287
void createAttributeMask(AttributeMask&, Suma &suma);
292
void dropTrigger(Signal* signal,Suma&);
293
void runDropTrigger(Signal* signal, Uint32 triggerId,Suma&);
299
void runLIST_TABLES_CONF(Signal* signal);
302
union { Uint32 m_tableId; Uint32 key; };
303
Uint32 m_schemaVersion;
304
Uint8 m_hasTriggerDefined[3]; // Insert/Update/Delete
305
Uint8 m_hasOutstandingTriggerReq[3]; // Insert/Update/Delete
306
Uint32 m_triggerIds[3]; // Insert/Update/Delete
310
* Default order in which to ask for attributes during scan
311
* 1) Fixed, not nullable
314
DataBuffer<15>::Head m_attributes; // Attribute id's
320
DataBuffer<15>::Head m_fragments; // Fragment descriptors
326
union { Uint32 prevHash; Uint32 nextPool; };
327
Uint32 hashValue() const {
330
bool equal(const Table& rec) const {
331
return m_tableId == rec.m_tableId;
338
DLList<Subscriber> c_metaSubscribers;
339
DLList<Subscriber> c_removeDataSubscribers;
344
KeyTable<Table> c_tables;
345
DLHashTable<Subscription> c_subscriptions;
350
ArrayPool<Subscriber> c_subscriberPool;
351
ArrayPool<Table> c_tablePool;
352
ArrayPool<Subscription> c_subscriptionPool;
353
ArrayPool<SyncRecord> c_syncPool;
354
DataBuffer<15>::DataBufferPool c_dataBufferPool;
356
NodeBitmask c_failedApiNodes;
361
bool removeSubscribersOnNode(Signal *signal, Uint32 nodeId);
363
bool checkTableTriggers(SegmentedSectionPtr ptr);
365
void addTableId(Uint32 TableId,
366
SubscriptionPtr subPtr, SyncRecord *psyncRec);
368
void sendSubIdRef(Signal* signal,Uint32 senderRef,Uint32 senderData,Uint32 errorCode);
369
void sendSubCreateRef(Signal* signal, Uint32 errorCode);
370
void sendSubStartRef(Signal*, SubscriberPtr, Uint32 errorCode, SubscriptionData::Part);
371
void sendSubStartRef(Signal* signal, Uint32 errorCode);
372
void sendSubStopRef(Signal* signal, Uint32 errorCode);
373
void sendSubSyncRef(Signal* signal, Uint32 errorCode);
374
void sendSubRemoveRef(Signal* signal, const SubRemoveReq& ref,
376
void sendSubStartComplete(Signal*, SubscriberPtr, Uint32,
377
SubscriptionData::Part);
378
void sendSubStopComplete(Signal*, SubscriberPtr);
379
void sendSubStopReq(Signal* signal, bool unlock= false);
381
void completeSubRemove(SubscriptionPtr subPtr);
383
void reportAllSubscribers(Signal *signal,
384
NdbDictionary::Event::_TableEvent table_event,
385
SubscriptionPtr subPtr,
386
SubscriberPtr subbPtr);
388
Uint32 getFirstGCI(Signal* signal);
393
void convertNameToId( SubscriptionPtr subPtr, Signal * signal);
398
void execCREATE_SUBSCRIPTION_REQ(Signal* signal);
399
void execDROP_SUBSCRIPTION_REQ(Signal* signal);
401
void execSTART_SUBSCRIPTION_REQ(Signal* signal);
402
void execSTOP_SUBSCRIPTION_REQ(Signal* signal);
404
void execSYNC_SUBSCRIPTION_REQ(Signal* signal);
405
void execABORT_SYNC_REQ(Signal* signal);
411
void getNodeGroupMembers(Signal* signal);
413
void execREAD_CONFIG_REQ(Signal* signal);
415
void execSTTOR(Signal* signal);
416
void sendSTTORRY(Signal*);
417
void execNDB_STTOR(Signal* signal);
418
void execDUMP_STATE_ORD(Signal* signal);
419
void execREAD_NODESCONF(Signal* signal);
420
void execNODE_FAILREP(Signal* signal);
421
void execINCL_NODEREQ(Signal* signal);
422
void execSIGNAL_DROPPED_REP(Signal* signal);
423
void execAPI_START_REP(Signal* signal);
424
void execAPI_FAILREQ(Signal* signal) ;
426
void execSUB_GCP_COMPLETE_ACK(Signal* signal);
429
* Controller interface
431
void execSUB_CREATE_REF(Signal* signal);
432
void execSUB_CREATE_CONF(Signal* signal);
434
void execSUB_DROP_REF(Signal* signal);
435
void execSUB_DROP_CONF(Signal* signal);
437
void execSUB_START_REF(Signal* signal);
438
void execSUB_START_CONF(Signal* signal);
440
void execSUB_ABORT_SYNC_REF(Signal* signal);
441
void execSUB_ABORT_SYNC_CONF(Signal* signal);
443
void execSUMA_START_ME_REQ(Signal* signal);
444
void execSUMA_START_ME_REF(Signal* signal);
445
void execSUMA_START_ME_CONF(Signal* signal);
446
void execSUMA_HANDOVER_REQ(Signal* signal);
447
void execSUMA_HANDOVER_REF(Signal* signal);
448
void execSUMA_HANDOVER_CONF(Signal* signal);
451
* Subscription generation interface
453
void createSequence(Signal* signal);
454
void createSequenceReply(Signal* signal,
455
UtilSequenceConf* conf,
456
UtilSequenceRef* ref);
457
void execUTIL_SEQUENCE_CONF(Signal* signal);
458
void execUTIL_SEQUENCE_REF(Signal* signal);
459
void execCREATE_SUBID_REQ(Signal* signal);
462
* for Suma that is restarting another
471
DLHashTable<Subscription>::Iterator c_subIt;
472
KeyTable<Table>::Iterator c_tabIt;
474
void progError(int line, int cause, const char * extra) {
475
suma.progError(line, cause, extra);
478
void resetNode(Uint32 sumaRef);
479
void runSUMA_START_ME_REQ(Signal*, Uint32 sumaRef);
480
void startNode(Signal*, Uint32 sumaRef);
482
void createSubscription(Signal* signal, Uint32 sumaRef);
483
void nextSubscription(Signal* signal, Uint32 sumaRef);
484
void runSUB_CREATE_CONF(Signal* signal);
485
void completeSubscription(Signal* signal, Uint32 sumaRef);
487
void startSubscriber(Signal* signal, Uint32 sumaRef);
488
void nextSubscriber(Signal* signal, Uint32 sumaRef, SubscriberPtr subbPtr);
489
void sendSubStartReq(SubscriptionPtr subPtr, SubscriberPtr subbPtr,
490
Signal* signal, Uint32 sumaRef);
491
void runSUB_START_CONF(Signal* signal);
492
void completeSubscriber(Signal* signal, Uint32 sumaRef);
494
void completeRestartingNode(Signal* signal, Uint32 sumaRef);
495
void resetRestart(Signal* signal);
499
friend class Restart;
503
NodeId c_masterNodeId;
504
NdbNodeBitmask c_alive_nodes;
507
* for restarting Suma not to start sending data too early
511
bool m_wait_handover;
512
Uint32 m_restart_server_node_id;
513
NdbNodeBitmask m_handover_nodes;
516
NodeBitmask c_connected_nodes; // (NODE/API) START REP / (API/NODE) FAIL REQ
517
NodeBitmask c_subscriber_nodes; //
520
* for all Suma's to keep track of other Suma's in Node group
523
Uint32 c_noNodesInGroup;
524
Uint32 c_nodesInGroup[MAX_REPLICAS];
525
NdbNodeBitmask c_nodes_in_nodegroup_mask; // NodeId's of nodes in nodegroup
527
void send_start_me_req(Signal* signal);
528
void check_start_handover(Signal* signal);
529
void send_handover_req(Signal* signal);
531
Uint32 get_responsible_node(Uint32 B) const;
532
Uint32 get_responsible_node(Uint32 B, const NdbNodeBitmask& mask) const;
533
bool check_switchover(Uint32 bucket, Uint32 gci);
540
Uint32 m_max_gci; // max gci on page
541
Uint32 m_last_gci; // last gci on page
548
BUCKET_STARTING = 0x1 // On starting node
549
,BUCKET_HANDOVER = 0x2 // On running node
550
,BUCKET_TAKEOVER = 0x4 // On takeing over node
551
,BUCKET_RESEND = 0x8 // On takeing over node
554
Uint16 m_switchover_node;
555
Uint16 m_nodes[MAX_REPLICAS];
556
Uint32 m_switchover_gci;
557
Uint32 m_max_acked_gci;
558
Uint32 m_buffer_tail; // Page
559
Page_pos m_buffer_head;
564
STATIC_CONST( DATA_WORDS = 8192 - 9);
569
Uint32 m_page_state; // Used by TUP buddy algorithm
570
Uint32 m_page_chunk_ptr_i;
572
Uint32 m_words_used; //
574
Uint32 m_data[DATA_WORDS];
577
STATIC_CONST( NO_OF_BUCKETS = 24 ); // 24 = 4*3*2*1!
578
Uint32 c_no_of_buckets;
579
struct Bucket c_buckets[NO_OF_BUCKETS];
581
STATIC_CONST( BUCKET_MASK_SIZE = (((NO_OF_BUCKETS+31)>> 5)) );
582
typedef Bitmask<BUCKET_MASK_SIZE> Bucket_mask;
583
Bucket_mask m_active_buckets;
584
Bucket_mask m_switchover_buckets;
588
Uint32* get_buffer_ptr(Signal*, Uint32 buck, Uint32 gci, Uint32 sz);
590
void free_page(Uint32 page_id, Buffer_page* page);
591
void out_of_buffer(Signal*);
592
void out_of_buffer_release(Signal* signal, Uint32 buck);
594
void start_resend(Signal*, Uint32 bucket);
595
void resend_bucket(Signal*, Uint32 bucket, Uint32 gci,
596
Uint32 page_pos, Uint32 last_gci);
597
void release_gci(Signal*, Uint32 bucket, Uint32 gci);
599
Uint32 m_max_seen_gci; // FIRE_TRIG_ORD
600
Uint32 m_max_sent_gci; // FIRE_TRIG_ORD -> send
601
Uint32 m_last_complete_gci; // SUB_GCP_COMPLETE_REP
602
Uint32 m_out_of_buffer_gci;
603
Uint32 m_gcp_complete_rep_count;
608
NodeBitmask m_subscribers;
615
ArrayPool<Gcp_record> c_gcp_pool;
616
DLFifoList<Gcp_record> c_gcp_list;
630
Uint32 m_first_free_page;
631
ArrayPool<Page_chunk> c_page_chunk_pool;