/* Copyright (C) 2003 MySQL AB

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; version 2 of the License.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
16
#include <ndb_global.h>
17
#include <my_pthread.h>
18
#include <ndb_limits.h>
19
#include "TransporterFacade.hpp"
20
#include "ClusterMgr.hpp"
21
#include <IPCConfig.hpp>
22
#include <TransporterCallback.hpp>
23
#include <TransporterRegistry.hpp>
24
#include "NdbApiSignal.hpp"
30
#include <ConfigRetriever.hpp>
31
#include <mgmapi_config_parameters.h>
32
#include <mgmapi_configuration.hpp>
33
#include <NdbConfig.h>
34
#include <ndb_version.h>
35
#include <SignalLoggerManager.hpp>
36
#include <kernel/ndb_limits.h>
37
#include <signaldata/AlterTable.hpp>
38
#include <signaldata/SumaImpl.hpp>
40
//#define REPORT_TRANSPORTER
43
static int numberToIndex(int number)
45
return number - MIN_API_BLOCK_NO;
48
static int indexToNumber(int index)
50
return index + MIN_API_BLOCK_NO;
53
#if defined DEBUG_TRANSPORTER
54
#define TRP_DEBUG(t) ndbout << __FILE__ << ":" << __LINE__ << ":" << t << endl;
59
/*****************************************************************************
61
*****************************************************************************/
64
reportError(void * callbackObj, NodeId nodeId,
65
TransporterError errorCode, const char *info)
67
#ifdef REPORT_TRANSPORTER
68
ndbout_c("REPORT_TRANSP: reportError (nodeId=%d, errorCode=%d) %s",
69
(int)nodeId, (int)errorCode, info ? info : "");
71
if(errorCode & TE_DO_DISCONNECT) {
72
ndbout_c("reportError (%d, %d) %s", (int)nodeId, (int)errorCode,
74
((TransporterFacade*)(callbackObj))->doDisconnect(nodeId);
79
* Report average send length in bytes (4096 last sends)
82
reportSendLen(void * callbackObj, NodeId nodeId, Uint32 count, Uint64 bytes){
83
#ifdef REPORT_TRANSPORTER
84
ndbout_c("REPORT_TRANSP: reportSendLen (nodeId=%d, bytes/count=%d)",
85
(int)nodeId, (Uint32)(bytes/count));
93
* Report average receive length in bytes (4096 last receives)
96
reportReceiveLen(void * callbackObj,
97
NodeId nodeId, Uint32 count, Uint64 bytes){
98
#ifdef REPORT_TRANSPORTER
99
ndbout_c("REPORT_TRANSP: reportReceiveLen (nodeId=%d, bytes/count=%d)",
100
(int)nodeId, (Uint32)(bytes/count));
108
* Report connection established
111
reportConnect(void * callbackObj, NodeId nodeId){
112
#ifdef REPORT_TRANSPORTER
113
ndbout_c("REPORT_TRANSP: API reportConnect (nodeId=%d)", (int)nodeId);
115
((TransporterFacade*)(callbackObj))->reportConnected(nodeId);
119
* Report connection broken
122
reportDisconnect(void * callbackObj, NodeId nodeId, Uint32 error){
123
#ifdef REPORT_TRANSPORTER
124
ndbout_c("REPORT_TRANSP: API reportDisconnect (nodeId=%d)", (int)nodeId);
126
((TransporterFacade*)(callbackObj))->reportDisconnected(nodeId);
130
transporter_recv_from(void * callbackObj, NodeId nodeId){
131
((TransporterFacade*)(callbackObj))->hb_received(nodeId);
134
/****************************************************************************
136
*****************************************************************************/
139
* Report connection broken
141
int checkJobBuffer() {
146
static const char * API_SIGNAL_LOG = "API_SIGNAL_LOG";
147
static const char * apiSignalLog = 0;
148
static SignalLoggerManager signalLogger;
153
signalLogger.flushSignalLog();
155
const char * tmp = NdbEnv_GetEnv(API_SIGNAL_LOG, (char *)0, 0);
156
if(tmp != 0 && apiSignalLog != 0 && strcmp(tmp,apiSignalLog) == 0){
158
} else if(tmp == 0 && apiSignalLog == 0){
160
} else if(tmp == 0 && apiSignalLog != 0){
161
signalLogger.setOutputStream(0);
165
if (strcmp(tmp, "-") == 0)
166
signalLogger.setOutputStream(stdout);
168
else if (strcmp(tmp, "+") == 0)
169
signalLogger.setOutputStream(DBUG_FILE);
172
signalLogger.setOutputStream(fopen(tmp, "w"));
178
#ifdef TRACE_APIREGREQ
179
#define TRACE_GSN(gsn) true
181
#define TRACE_GSN(gsn) (gsn != GSN_API_REGREQ && gsn != GSN_API_REGCONF)
186
* The execute function : Handle received signal
189
execute(void * callbackObj, SignalHeader * const header,
190
Uint8 prio, Uint32 * const theData,
191
LinearSectionPtr ptr[3]){
193
TransporterFacade * theFacade = (TransporterFacade*)callbackObj;
194
TransporterFacade::ThreadData::Object_Execute oe;
195
Uint32 tRecBlockNo = header->theReceiversBlockNumber;
198
if(setSignalLog() && TRACE_GSN(header->theVerId_signalNumber)){
199
signalLogger.executeSignal(* header,
203
ptr, header->m_noOfSections);
204
signalLogger.flushSignalLog();
208
if (tRecBlockNo >= MIN_API_BLOCK_NO) {
209
oe = theFacade->m_threads.get(tRecBlockNo);
210
if (oe.m_object != 0 && oe.m_executeFunction != 0) {
212
* Handle received signal immediately to avoid any unnecessary
213
* copying of data, allocation of memory and other things. Copying
214
* of data could be interesting to support several priority levels
215
* and to support a special memory structure when executing the
216
* signals. Neither of those are interesting when receiving data
217
* in the NDBAPI. The NDBAPI will thus read signal data directly as
218
* it was written by the sender (SCI sender is other node, Shared
219
* memory sender is other process and TCP/IP sender is the OS that
220
* writes the TCP/IP message into a message buffer).
222
NdbApiSignal tmpSignal(*header);
223
NdbApiSignal * tSignal = &tmpSignal;
224
tSignal->setDataPtr(theData);
225
(* oe.m_executeFunction) (oe.m_object, tSignal, ptr);
227
} else if (tRecBlockNo == API_PACKED) {
229
* Block number == 2047 is used to signal a signal that consists of
230
* multiple instances of the same signal. This is an effort to
231
* package the signals so as to avoid unnecessary communication
232
* overhead since TCP/IP has a great performance impact.
234
Uint32 Tlength = header->theLength;
237
* Since it contains at least two data packets we will first
238
* copy the signal data to safe place.
240
while (Tsent < Tlength) {
241
Uint32 Theader = theData[Tsent];
243
Uint32 TpacketLen = (Theader & 0x1F) + 3;
244
tRecBlockNo = Theader >> 16;
245
if (TpacketLen <= 25) {
246
if ((TpacketLen + Tsent) <= Tlength) {
248
* Set the data length of the signal and the receivers block
249
* reference and then call the API.
251
header->theLength = TpacketLen;
252
header->theReceiversBlockNumber = tRecBlockNo;
253
Uint32* tDataPtr = &theData[Tsent];
255
if (tRecBlockNo >= MIN_API_BLOCK_NO) {
256
oe = theFacade->m_threads.get(tRecBlockNo);
257
if(oe.m_object != 0 && oe.m_executeFunction != 0){
258
NdbApiSignal tmpSignal(*header);
259
NdbApiSignal * tSignal = &tmpSignal;
260
tSignal->setDataPtr(tDataPtr);
261
(*oe.m_executeFunction)(oe.m_object, tSignal, 0);
268
} else if (tRecBlockNo == API_CLUSTERMGR) {
270
* The signal was aimed for the Cluster Manager.
271
* We handle it immediately here.
273
ClusterMgr * clusterMgr = theFacade->theClusterMgr;
274
const Uint32 gsn = header->theVerId_signalNumber;
278
clusterMgr->execAPI_REGREQ(theData);
281
case GSN_API_REGCONF:
282
clusterMgr->execAPI_REGCONF(theData);
286
clusterMgr->execAPI_REGREF(theData);
289
case GSN_NODE_FAILREP:
290
clusterMgr->execNODE_FAILREP(theData);
293
case GSN_NF_COMPLETEREP:
294
clusterMgr->execNF_COMPLETEREP(theData);
297
case GSN_ARBIT_STARTREQ:
298
if (theFacade->theArbitMgr != NULL)
299
theFacade->theArbitMgr->doStart(theData);
302
case GSN_ARBIT_CHOOSEREQ:
303
if (theFacade->theArbitMgr != NULL)
304
theFacade->theArbitMgr->doChoose(theData);
307
case GSN_ARBIT_STOPORD:
308
if(theFacade->theArbitMgr != NULL)
309
theFacade->theArbitMgr->doStop(theData);
312
case GSN_ALTER_TABLE_REP:
314
const AlterTableRep* rep = (const AlterTableRep*)theData;
315
theFacade->m_globalDictCache.lock();
316
theFacade->m_globalDictCache.
317
alter_table_rep((const char*)ptr[0].p,
320
rep->changeType == AlterTableRep::CT_ALTERED);
321
theFacade->m_globalDictCache.unlock();
324
case GSN_SUB_GCP_COMPLETE_REP:
329
NdbApiSignal tSignal(* header);
330
tSignal.setDataPtr(theData);
331
theFacade->for_each(&tSignal, ptr);
337
Uint32* send= tSignal.getDataPtrSend();
338
memcpy(send, theData, tSignal.getLength() << 2);
339
((SubGcpCompleteAck*)send)->rep.senderRef =
340
numberToRef(API_CLUSTERMGR, theFacade->theOwnId);
341
Uint32 ref= header->theSendersBlockRef;
342
Uint32 aNodeId= refToNode(ref);
343
tSignal.theReceiversBlockNumber= refToBlock(ref);
344
tSignal.theVerId_signalNumber= GSN_SUB_GCP_COMPLETE_ACK;
345
theFacade->sendSignalUnCond(&tSignal, aNodeId);
355
; // Ignore all other block numbers.
356
if(header->theVerId_signalNumber!=3) {
357
TRP_DEBUG( "TransporterFacade received signal to unknown block no." );
358
ndbout << "BLOCK NO: " << tRecBlockNo << " sig "
359
<< header->theVerId_signalNumber << endl;
365
// These symbols are needed, but not used in the API
367
SignalLoggerManager::printSegmentedSection(FILE *, const SignalHeader &,
368
const SegmentedSectionPtr ptr[3],
374
copy(Uint32 * & insertPtr,
375
class SectionSegmentPool & thePool, const SegmentedSectionPtr & _ptr){
380
* Note that this function need no locking since its
381
* only called from the constructor of Ndb (the NdbObject)
383
* Which is protected by a mutex
387
TransporterFacade::start_instance(int nodeId,
388
const ndb_mgm_configuration* props)
390
if (! init(nodeId, props)) {
395
* Install signal handler for SIGPIPE
397
* This due to the fact that a socket connection might have
398
* been closed in between a select and a corresponding send
400
#if !defined NDB_WIN32
401
signal(SIGPIPE, SIG_IGN);
408
* Note that this function need no locking since its
409
* only called from the destructor of Ndb (the NdbObject)
411
* Which is protected by a mutex
414
TransporterFacade::stop_instance(){
415
DBUG_ENTER("TransporterFacade::stop_instance");
421
TransporterFacade::doStop(){
422
DBUG_ENTER("TransporterFacade::doStop");
424
* First stop the ClusterMgr because it needs to send one more signal
425
* and also uses theFacadeInstance to lock/unlock theMutexPtr
427
if (theClusterMgr != NULL) theClusterMgr->doStop();
428
if (theArbitMgr != NULL) theArbitMgr->doStop(NULL);
431
* Now stop the send and receive threads
435
if (theReceiveThread) {
436
NdbThread_WaitFor(theReceiveThread, &status);
437
NdbThread_Destroy(&theReceiveThread);
440
NdbThread_WaitFor(theSendThread, &status);
441
NdbThread_Destroy(&theSendThread);
448
runSendRequest_C(void * me)
450
((TransporterFacade*) me)->threadMainSend();
454
void TransporterFacade::threadMainSend(void)
456
theTransporterRegistry->startSending();
457
if (!theTransporterRegistry->start_clients()){
458
ndbout_c("Unable to start theTransporterRegistry->start_clients");
462
m_socket_server.startServer();
464
while(!theStopReceive) {
465
NdbSleep_MilliSleep(10);
466
NdbMutex_Lock(theMutexPtr);
467
if (sendPerformedLastInterval == 0) {
468
theTransporterRegistry->performSend();
470
sendPerformedLastInterval = 0;
471
NdbMutex_Unlock(theMutexPtr);
473
theTransporterRegistry->stopSending();
475
m_socket_server.stopServer();
476
m_socket_server.stopSessions(true);
478
theTransporterRegistry->stop_clients();
483
runReceiveResponse_C(void * me)
485
((TransporterFacade*) me)->threadMainReceive();
490
The receiver thread is changed to only wake up once every 10 milliseconds
491
to poll. It will first check that nobody owns the poll "right" before
492
polling. This means that methods using the receiveResponse and
493
sendRecSignal will have a slightly longer response time if they are
494
executed without any parallel key lookups. Currently also scans are
495
affected but this is to be fixed.
497
void TransporterFacade::threadMainReceive(void)
499
theTransporterRegistry->startReceiving();
500
#ifdef NDB_SHM_TRANSPORTER
501
NdbThread_set_shm_sigmask(TRUE);
503
NdbMutex_Lock(theMutexPtr);
504
theTransporterRegistry->update_connections();
505
NdbMutex_Unlock(theMutexPtr);
506
while(!theStopReceive) {
507
for(int i = 0; i<10; i++){
508
NdbSleep_MilliSleep(10);
509
NdbMutex_Lock(theMutexPtr);
510
if (poll_owner == NULL) {
511
const int res = theTransporterRegistry->pollReceive(0);
513
theTransporterRegistry->performReceive();
515
NdbMutex_Unlock(theMutexPtr);
517
NdbMutex_Lock(theMutexPtr);
518
theTransporterRegistry->update_connections();
519
NdbMutex_Unlock(theMutexPtr);
521
theTransporterRegistry->stopReceiving();
524
This method is called by worker thread that owns the poll "rights".
525
It waits for events and if something arrives it takes care of it
526
and returns to caller. It will quickly come back here if not all
527
data was received for the worker thread.
529
void TransporterFacade::external_poll(Uint32 wait_time)
531
NdbMutex_Unlock(theMutexPtr);
532
const int res = theTransporterRegistry->pollReceive(wait_time);
533
NdbMutex_Lock(theMutexPtr);
535
theTransporterRegistry->performReceive();
540
This Ndb object didn't get hold of the poll "right" and will wait on a
541
conditional mutex wait instead. It is put into the conditional wait
542
queue so that it is accessible to take over the poll "right" if needed.
543
The method gets a free entry in the free list and puts it first in the
544
doubly linked list. Finally it assigns the ndb object reference to the
547
Uint32 TransporterFacade::put_in_cond_wait_queue(NdbWaiter *aWaiter)
552
Uint32 index = first_free_cond_wait;
553
assert(index < MAX_NO_THREADS);
554
first_free_cond_wait = cond_wait_array[index].next_cond_wait;
557
Put in doubly linked list
559
cond_wait_array[index].next_cond_wait = MAX_NO_THREADS;
560
cond_wait_array[index].prev_cond_wait = last_in_cond_wait;
561
if (last_in_cond_wait == MAX_NO_THREADS) {
562
first_in_cond_wait = index;
564
cond_wait_array[last_in_cond_wait].next_cond_wait = index;
565
last_in_cond_wait = index;
567
cond_wait_array[index].cond_wait_object = aWaiter;
568
aWaiter->set_cond_wait_index(index);
573
Somebody is about to signal the thread to wake it up, it could also
574
be that it woke up on a timeout and found himself still in the list.
575
Removes the entry from the doubly linked list.
576
Inserts the entry into the free list.
577
NULLifies the ndb object reference entry and sets the index in the
578
Ndb object to NIL (=MAX_NO_THREADS)
580
void TransporterFacade::remove_from_cond_wait_queue(NdbWaiter *aWaiter)
582
Uint32 index = aWaiter->get_cond_wait_index();
583
assert(index < MAX_NO_THREADS &&
584
cond_wait_array[index].cond_wait_object == aWaiter);
586
Remove from doubly linked list
588
Uint32 prev_elem, next_elem;
589
prev_elem = cond_wait_array[index].prev_cond_wait;
590
next_elem = cond_wait_array[index].next_cond_wait;
591
if (prev_elem != MAX_NO_THREADS)
592
cond_wait_array[prev_elem].next_cond_wait = next_elem;
594
first_in_cond_wait = next_elem;
595
if (next_elem != MAX_NO_THREADS)
596
cond_wait_array[next_elem].prev_cond_wait = prev_elem;
598
last_in_cond_wait = prev_elem;
600
Insert into free list
602
cond_wait_array[index].next_cond_wait = first_free_cond_wait;
603
cond_wait_array[index].prev_cond_wait = MAX_NO_THREADS;
604
first_free_cond_wait = index;
606
cond_wait_array[index].cond_wait_object = NULL;
607
aWaiter->set_cond_wait_index(MAX_NO_THREADS);
611
Get the latest Ndb object from the conditional wait queue
612
and also remove it from the list.
614
NdbWaiter* TransporterFacade::rem_last_from_cond_wait_queue()
617
Uint32 index = last_in_cond_wait;
618
if (last_in_cond_wait == MAX_NO_THREADS)
620
tWaiter = cond_wait_array[index].cond_wait_object;
621
remove_from_cond_wait_queue(tWaiter);
625
void TransporterFacade::init_cond_wait_queue()
629
Initialise the doubly linked list as empty
631
first_in_cond_wait = MAX_NO_THREADS;
632
last_in_cond_wait = MAX_NO_THREADS;
636
first_free_cond_wait = 0;
637
for (i = 0; i < MAX_NO_THREADS; i++) {
638
cond_wait_array[i].cond_wait_object = NULL;
639
cond_wait_array[i].next_cond_wait = i+1;
640
cond_wait_array[i].prev_cond_wait = MAX_NO_THREADS;
644
TransporterFacade::TransporterFacade() :
645
theTransporterRegistry(0),
648
theReceiveThread(NULL),
649
m_fragmented_signal_id(0)
651
DBUG_ENTER("TransporterFacade::TransporterFacade");
652
init_cond_wait_queue();
655
theMutexPtr = NdbMutex_Create();
656
sendPerformedLastInterval = 0;
659
currentSendLimit = 1;
660
theClusterMgr = NULL;
663
m_scan_batch_size= MAX_SCAN_BATCH_SIZE;
664
m_batch_byte_size= SCAN_BATCH_SIZE;
665
m_batch_size= DEF_BATCH_SIZE;
668
theClusterMgr = new ClusterMgr(* this);
677
TransporterFacade::init(Uint32 nodeId, const ndb_mgm_configuration* props)
679
DBUG_ENTER("TransporterFacade::init");
682
theTransporterRegistry = new TransporterRegistry(this);
684
const int res = IPCConfig::configureTransporters(nodeId,
686
* theTransporterRegistry);
688
TRP_DEBUG( "configureTransporters returned 0 or less" );
692
ndb_mgm_configuration_iterator iter(* props, CFG_SECTION_NODE);
694
theClusterMgr->init(iter);
697
if(iter.find(CFG_NODE_ID, nodeId)){
698
TRP_DEBUG( "Node info missing from config." );
703
if(!iter.get(CFG_NODE_ARBIT_RANK, &rank) && rank>0){
704
theArbitMgr = new ArbitMgr(* this);
705
theArbitMgr->setRank(rank);
707
iter.get(CFG_NODE_ARBIT_DELAY, &delay);
708
theArbitMgr->setDelay(delay);
710
Uint32 scan_batch_size= 0;
711
if (!iter.get(CFG_MAX_SCAN_BATCH_SIZE, &scan_batch_size)) {
712
m_scan_batch_size= scan_batch_size;
714
Uint32 batch_byte_size= 0;
715
if (!iter.get(CFG_BATCH_BYTE_SIZE, &batch_byte_size)) {
716
m_batch_byte_size= batch_byte_size;
718
Uint32 batch_size= 0;
719
if (!iter.get(CFG_BATCH_SIZE, &batch_size)) {
720
m_batch_size= batch_size;
723
Uint32 timeout = 120000;
725
for (iter.first(); iter.valid(); iter.next())
727
Uint32 tmp1 = 0, tmp2 = 0;
728
iter.get(CFG_DB_TRANSACTION_CHECK_INTERVAL, &tmp1);
729
iter.get(CFG_DB_TRANSACTION_DEADLOCK_TIMEOUT, &tmp2);
734
m_waitfor_timeout = timeout;
736
if (!theTransporterRegistry->start_service(m_socket_server)){
737
ndbout_c("Unable to start theTransporterRegistry->start_service");
741
theReceiveThread = NdbThread_Create(runReceiveResponse_C,
745
NDB_THREAD_PRIO_LOW);
747
theSendThread = NdbThread_Create(runSendRequest_C,
751
NDB_THREAD_PRIO_LOW);
752
theClusterMgr->startThread();
755
signalLogger.logOn(true, 0, SignalLoggerManager::LogInOut);
762
TransporterFacade::for_each(NdbApiSignal* aSignal, LinearSectionPtr ptr[3])
764
DBUG_ENTER("TransporterFacade::for_each");
765
Uint32 sz = m_threads.m_statusNext.size();
766
TransporterFacade::ThreadData::Object_Execute oe;
767
for (Uint32 i = 0; i < sz ; i ++)
769
oe = m_threads.m_objectExecute[i];
770
if (m_threads.getInUse(i))
772
(* oe.m_executeFunction) (oe.m_object, aSignal, ptr);
779
TransporterFacade::connected()
781
DBUG_ENTER("TransporterFacade::connected");
782
Uint32 sz = m_threads.m_statusNext.size();
783
for (Uint32 i = 0; i < sz ; i ++) {
784
if (m_threads.getInUse(i)){
785
void * obj = m_threads.m_objectExecute[i].m_object;
786
NodeStatusFunction RegPC = m_threads.m_statusFunction[i];
787
(*RegPC) (obj, numberToRef(indexToNumber(i), theOwnId), true, true);
794
TransporterFacade::ReportNodeDead(NodeId tNodeId)
796
DBUG_ENTER("TransporterFacade::ReportNodeDead");
797
DBUG_PRINT("enter",("nodeid= %d", tNodeId));
799
* When a node fails we must report this to each Ndb object.
800
* The function that is used for communicating node failures is called.
801
* This is to ensure that the Ndb objects do not think their connections
802
* are correct after a failure followed by a restart.
803
* After the restart the node is up again and the Ndb object
804
* might not have noticed the failure.
806
Uint32 sz = m_threads.m_statusNext.size();
807
for (Uint32 i = 0; i < sz ; i ++) {
808
if (m_threads.getInUse(i)){
809
void * obj = m_threads.m_objectExecute[i].m_object;
810
NodeStatusFunction RegPC = m_threads.m_statusFunction[i];
811
(*RegPC) (obj, tNodeId, false, false);
818
TransporterFacade::ReportNodeFailureComplete(NodeId tNodeId)
821
* When a node fails we must report this to each Ndb object.
822
* The function that is used for communicating node failures is called.
823
* This is to ensure that the Ndb objects do not think their connections
824
* are correct after a failure followed by a restart.
825
* After the restart the node is up again and the Ndb object
826
* might not have noticed the failure.
829
DBUG_ENTER("TransporterFacade::ReportNodeFailureComplete");
830
DBUG_PRINT("enter",("nodeid= %d", tNodeId));
831
Uint32 sz = m_threads.m_statusNext.size();
832
for (Uint32 i = 0; i < sz ; i ++) {
833
if (m_threads.getInUse(i)){
834
void * obj = m_threads.m_objectExecute[i].m_object;
835
NodeStatusFunction RegPC = m_threads.m_statusFunction[i];
836
(*RegPC) (obj, tNodeId, false, true);
843
TransporterFacade::ReportNodeAlive(NodeId tNodeId)
846
* When a node fails we must report this to each Ndb object.
847
* The function that is used for communicating node failures is called.
848
* This is to ensure that the Ndb objects do not think there connections
849
* are correct after a failure
850
* followed by a restart.
851
* After the restart the node is up again and the Ndb object
852
* might not have noticed the failure.
854
Uint32 sz = m_threads.m_statusNext.size();
855
for (Uint32 i = 0; i < sz ; i ++) {
856
if (m_threads.getInUse(i)){
857
void * obj = m_threads.m_objectExecute[i].m_object;
858
NodeStatusFunction RegPC = m_threads.m_statusFunction[i];
859
(*RegPC) (obj, tNodeId, true, false);
865
TransporterFacade::close(BlockNumber blockNumber, Uint64 trans_id)
867
NdbMutex_Lock(theMutexPtr);
868
Uint32 low_bits = (Uint32)trans_id;
869
m_max_trans_id = m_max_trans_id > low_bits ? m_max_trans_id : low_bits;
870
close_local(blockNumber);
871
NdbMutex_Unlock(theMutexPtr);
876
TransporterFacade::close_local(BlockNumber blockNumber){
877
m_threads.close(blockNumber);
882
TransporterFacade::open(void* objRef,
884
NodeStatusFunction statusFun)
886
DBUG_ENTER("TransporterFacade::open");
887
int r= m_threads.open(objRef, fun, statusFun);
892
(*statusFun)(objRef, numberToRef(r, theOwnId), true, true);
898
TransporterFacade::~TransporterFacade()
900
DBUG_ENTER("TransporterFacade::~TransporterFacade");
902
NdbMutex_Lock(theMutexPtr);
903
delete theClusterMgr;
905
delete theTransporterRegistry;
906
NdbMutex_Unlock(theMutexPtr);
907
NdbMutex_Destroy(theMutexPtr);
909
signalLogger.setOutputStream(0);
915
TransporterFacade::calculateSendLimit()
918
Uint32 TthreadCount = 0;
920
Uint32 sz = m_threads.m_statusNext.size();
921
for (Ti = 0; Ti < sz; Ti++) {
922
if (m_threads.m_statusNext[Ti] == (ThreadData::ACTIVE)){
924
m_threads.m_statusNext[Ti] = ThreadData::INACTIVE;
927
currentSendLimit = TthreadCount;
928
if (currentSendLimit == 0) {
929
currentSendLimit = 1;
931
checkCounter = currentSendLimit << 2;
935
//-------------------------------------------------
936
// Force sending but still report the sending to the
937
// adaptive algorithm.
938
//-------------------------------------------------
939
void TransporterFacade::forceSend(Uint32 block_number) {
941
m_threads.m_statusNext[numberToIndex(block_number)] = ThreadData::ACTIVE;
942
sendPerformedLastInterval = 1;
943
if (checkCounter < 0) {
944
calculateSendLimit();
946
theTransporterRegistry->forceSendCheck(0);
949
//-------------------------------------------------
950
// Improving API performance
951
//-------------------------------------------------
953
TransporterFacade::checkForceSend(Uint32 block_number) {
954
m_threads.m_statusNext[numberToIndex(block_number)] = ThreadData::ACTIVE;
955
//-------------------------------------------------
956
// This code is an adaptive algorithm to discover when
957
// the API should actually send its buffers. The reason
958
// is that the performance is highly dependent on the
959
// size of the writes over the communication network.
960
// Thus we try to ensure that the send size is as big
961
// as possible. At the same time we don't want response
962
// time to increase so therefore we have to keep track of
963
// how the users are performing adaptively.
964
//-------------------------------------------------
966
if (theTransporterRegistry->forceSendCheck(currentSendLimit) == 1) {
967
sendPerformedLastInterval = 1;
970
if (checkCounter < 0) {
971
calculateSendLimit();
976
/******************************************************************************
977
* SEND SIGNAL METHODS
978
*****************************************************************************/
980
TransporterFacade::sendSignal(NdbApiSignal * aSignal, NodeId aNode){
981
Uint32* tDataPtr = aSignal->getDataPtrSend();
982
Uint32 Tlen = aSignal->theLength;
983
Uint32 TBno = aSignal->theReceiversBlockNumber;
984
if(getIsNodeSendable(aNode) == true){
986
if(setSignalLog() && TRACE_GSN(aSignal->theVerId_signalNumber)){
987
Uint32 tmp = aSignal->theSendersBlockRef;
988
aSignal->theSendersBlockRef = numberToRef(tmp, theOwnId);
989
LinearSectionPtr ptr[3];
990
signalLogger.sendSignal(* aSignal,
994
signalLogger.flushSignalLog();
995
aSignal->theSendersBlockRef = tmp;
998
if ((Tlen != 0) && (Tlen <= 25) && (TBno != 0)) {
999
SendStatus ss = theTransporterRegistry->prepareSend(aSignal,
1004
//if (ss != SEND_OK) ndbout << ss << endl;
1005
return (ss == SEND_OK ? 0 : -1);
1007
ndbout << "ERR: SigLen = " << Tlen << " BlockRec = " << TBno;
1008
ndbout << " SignalNo = " << aSignal->theVerId_signalNumber << endl;
1012
//const ClusterMgr::Node & node = theClusterMgr->getNodeInfo(aNode);
1013
//const Uint32 startLevel = node.m_state.startLevel;
1014
return -1; // Node Dead
1018
TransporterFacade::sendSignalUnCond(NdbApiSignal * aSignal, NodeId aNode){
1019
Uint32* tDataPtr = aSignal->getDataPtrSend();
1021
if(setSignalLog() && TRACE_GSN(aSignal->theVerId_signalNumber)){
1022
Uint32 tmp = aSignal->theSendersBlockRef;
1023
aSignal->theSendersBlockRef = numberToRef(tmp, theOwnId);
1024
LinearSectionPtr ptr[3];
1025
signalLogger.sendSignal(* aSignal,
1029
signalLogger.flushSignalLog();
1030
aSignal->theSendersBlockRef = tmp;
1033
assert((aSignal->theLength != 0) &&
1034
(aSignal->theLength <= 25) &&
1035
(aSignal->theReceiversBlockNumber != 0));
1036
SendStatus ss = theTransporterRegistry->prepareSend(aSignal,
1042
return (ss == SEND_OK ? 0 : -1);
1045
#define CHUNK_SZ NDB_SECTION_SEGMENT_SZ*64 // related to MAX_MESSAGE_SIZE
1047
TransporterFacade::sendFragmentedSignal(NdbApiSignal* aSignal, NodeId aNode,
1048
LinearSectionPtr ptr[3], Uint32 secs)
1050
if(getIsNodeSendable(aNode) != true)
1054
if(setSignalLog() && TRACE_GSN(aSignal->theVerId_signalNumber)){
1055
Uint32 tmp = aSignal->theSendersBlockRef;
1056
aSignal->theSendersBlockRef = numberToRef(tmp, theOwnId);
1057
signalLogger.sendSignal(* aSignal,
1059
aSignal->getDataPtrSend(),
1062
aSignal->theSendersBlockRef = tmp;
1066
NdbApiSignal tmp_signal(*(SignalHeader*)aSignal);
1067
LinearSectionPtr tmp_ptr[3];
1068
Uint32 unique_id= m_fragmented_signal_id++; // next unique id
1070
for (i= 0; i < secs; i++)
1073
unsigned start_i= 0;
1074
unsigned chunk_sz= 0;
1075
unsigned fragment_info= 0;
1076
Uint32 *tmp_data= tmp_signal.getDataPtrSend();
1077
for (i= 0; i < secs;) {
1078
unsigned save_sz= tmp_ptr[i].sz;
1079
tmp_data[i-start_i]= i;
1080
if (chunk_sz + save_sz > CHUNK_SZ) {
1082
unsigned send_sz= CHUNK_SZ - chunk_sz;
1083
if (i != start_i) // first piece of a new section has to be a multiple of NDB_SECTION_SEGMENT_SZ
1086
NDB_SECTION_SEGMENT_SZ
1087
*(send_sz+NDB_SECTION_SEGMENT_SZ-1)
1088
/NDB_SECTION_SEGMENT_SZ;
1089
if (send_sz > save_sz)
1092
tmp_ptr[i].sz= send_sz;
1094
if (fragment_info < 2) // 1 = first fragment, 2 = middle fragments
1098
tmp_data[i-start_i+1]= unique_id;
1099
tmp_signal.setLength(i-start_i+2);
1100
tmp_signal.m_fragmentInfo= fragment_info;
1101
tmp_signal.m_noOfSections= i-start_i+1;
1104
SendStatus ss = theTransporterRegistry->prepareSend
1110
assert(ss != SEND_MESSAGE_TOO_BIG);
1111
if (ss != SEND_OK) return -1;
1113
// setup variables for next signal
1116
tmp_ptr[i].sz= save_sz-send_sz;
1117
tmp_ptr[i].p+= send_sz;
1118
if (tmp_ptr[i].sz == 0)
1128
unsigned a_sz= aSignal->getLength();
1130
if (fragment_info > 0) {
1131
// update the original signal to include section info
1132
Uint32 *a_data= aSignal->getDataPtrSend();
1133
unsigned tmp_sz= i-start_i;
1136
tmp_sz*sizeof(Uint32));
1137
a_data[a_sz+tmp_sz]= unique_id;
1138
aSignal->setLength(a_sz+tmp_sz+1);
1140
// send last fragment
1141
aSignal->m_fragmentInfo= 3; // 3 = last fragment
1142
aSignal->m_noOfSections= i-start_i;
1144
aSignal->m_noOfSections= secs;
1150
SendStatus ss = theTransporterRegistry->prepareSend
1153
aSignal->getDataPtrSend(),
1156
assert(ss != SEND_MESSAGE_TOO_BIG);
1157
ret = (ss == SEND_OK ? 0 : -1);
1159
aSignal->m_noOfSections = 0;
1160
aSignal->m_fragmentInfo = 0;
1161
aSignal->setLength(a_sz);
1166
TransporterFacade::sendSignal(NdbApiSignal* aSignal, NodeId aNode,
1167
LinearSectionPtr ptr[3], Uint32 secs){
1168
aSignal->m_noOfSections = secs;
1169
if(getIsNodeSendable(aNode) == true){
1171
if(setSignalLog() && TRACE_GSN(aSignal->theVerId_signalNumber)){
1172
Uint32 tmp = aSignal->theSendersBlockRef;
1173
aSignal->theSendersBlockRef = numberToRef(tmp, theOwnId);
1174
signalLogger.sendSignal(* aSignal,
1176
aSignal->getDataPtrSend(),
1179
signalLogger.flushSignalLog();
1180
aSignal->theSendersBlockRef = tmp;
1183
SendStatus ss = theTransporterRegistry->prepareSend
1186
aSignal->getDataPtrSend(),
1189
assert(ss != SEND_MESSAGE_TOO_BIG);
1190
aSignal->m_noOfSections = 0;
1191
return (ss == SEND_OK ? 0 : -1);
1193
aSignal->m_noOfSections = 0;
1197
/******************************************************************************
1198
* CONNECTION METHODS Etc
1199
******************************************************************************/
1202
TransporterFacade::doConnect(int aNodeId){
1203
theTransporterRegistry->setIOState(aNodeId, NoHalt);
1204
theTransporterRegistry->do_connect(aNodeId);
1208
TransporterFacade::doDisconnect(int aNodeId)
1210
theTransporterRegistry->do_disconnect(aNodeId);
1214
TransporterFacade::reportConnected(int aNodeId)
1216
theClusterMgr->reportConnected(aNodeId);
1221
TransporterFacade::reportDisconnected(int aNodeId)
1223
theClusterMgr->reportDisconnected(aNodeId);
1228
TransporterFacade::ownId() const
1234
TransporterFacade::isConnected(NodeId aNodeId){
1235
return theTransporterRegistry->is_connected(aNodeId);
1239
TransporterFacade::get_an_alive_node()
1241
DBUG_ENTER("TransporterFacade::get_an_alive_node");
1242
DBUG_PRINT("enter", ("theStartNodeId: %d", theStartNodeId));
1244
const char* p = NdbEnv_GetEnv("NDB_ALIVE_NODE_ID", (char*)0, 0);
1245
if (p != 0 && *p != 0)
1249
for (i = theStartNodeId; i < MAX_NDB_NODES; i++) {
1250
if (get_node_alive(i)){
1251
DBUG_PRINT("info", ("Node %d is alive", i));
1252
theStartNodeId = ((i + 1) % MAX_NDB_NODES);
1256
for (i = 1; i < theStartNodeId; i++) {
1257
if (get_node_alive(i)){
1258
DBUG_PRINT("info", ("Node %d is alive", i));
1259
theStartNodeId = ((i + 1) % MAX_NDB_NODES);
1263
DBUG_RETURN((NodeId)0);
1266
TransporterFacade::ThreadData::ThreadData(Uint32 size){
1268
m_firstFree = END_OF_LIST;
1273
TransporterFacade::ThreadData::expand(Uint32 size){
1274
Object_Execute oe = { 0 ,0 };
1275
NodeStatusFunction fun = 0;
1277
const Uint32 sz = m_statusNext.size();
1278
m_objectExecute.fill(sz + size, oe);
1279
m_statusFunction.fill(sz + size, fun);
1280
for(Uint32 i = 0; i<size; i++){
1281
m_statusNext.push_back(sz + i + 1);
1284
m_statusNext.back() = m_firstFree;
1285
m_firstFree = m_statusNext.size() - size;
1290
TransporterFacade::ThreadData::open(void* objRef,
1291
ExecuteFunction fun,
1292
NodeStatusFunction fun2)
1294
Uint32 nextFree = m_firstFree;
1296
if(m_statusNext.size() >= MAX_NO_THREADS && nextFree == END_OF_LIST){
1300
if(nextFree == END_OF_LIST){
1302
nextFree = m_firstFree;
1306
m_firstFree = m_statusNext[nextFree];
1308
Object_Execute oe = { objRef , fun };
1310
m_statusNext[nextFree] = INACTIVE;
1311
m_objectExecute[nextFree] = oe;
1312
m_statusFunction[nextFree] = fun2;
1314
return indexToNumber(nextFree);
1318
TransporterFacade::ThreadData::close(int number){
1319
number= numberToIndex(number);
1320
assert(getInUse(number));
1321
m_statusNext[number] = m_firstFree;
1324
m_firstFree = number;
1325
Object_Execute oe = { 0, 0 };
1326
m_objectExecute[number] = oe;
1327
m_statusFunction[number] = 0;
1332
TransporterFacade::get_active_ndb_objects() const
1334
return m_threads.m_use_cnt;
1337
PollGuard::PollGuard(TransporterFacade *tp, NdbWaiter *aWaiter,
1343
m_block_no= block_no;
1348
This is a common routine for possibly forcing the send of buffered signals
1349
and receiving response the thread is waiting for. It is designed to be
1351
1) PK, UK lookups using the asynchronous interface
1352
This routine uses the wait_for_input routine instead since it has
1353
special end conditions due to the asynchronous nature of its usage.
1356
It uses a NdbWaiter object to wait on the events and this object is
1357
linked into the conditional wait queue. Thus this object contains
1358
a reference to its place in the queue.
1360
It replaces the method receiveResponse previously used on the Ndb object
1362
int PollGuard::wait_n_unlock(int wait_time, NodeId nodeId, Uint32 state,
1366
m_waiter->set_node(nodeId);
1367
m_waiter->set_state(state);
1368
ret_val= wait_for_input_in_loop(wait_time, forceSend);
1369
unlock_and_signal();
1373
int PollGuard::wait_scan(int wait_time, NodeId nodeId, bool forceSend)
1375
m_waiter->set_node(nodeId);
1376
m_waiter->set_state(WAIT_SCAN);
1377
return wait_for_input_in_loop(wait_time, forceSend);
1380
int PollGuard::wait_for_input_in_loop(int wait_time, bool forceSend)
1384
m_tp->forceSend(m_block_no);
1386
m_tp->checkForceSend(m_block_no);
1388
NDB_TICKS curr_time = NdbTick_CurrentMillisecond();
1389
NDB_TICKS max_time = curr_time + (NDB_TICKS)wait_time;
1390
const int maxsleep = (wait_time == -1 || wait_time > 10) ? 10 : wait_time;
1393
wait_for_input(maxsleep);
1394
Uint32 state= m_waiter->get_state();
1395
if (state == NO_WAIT)
1399
else if (state == WAIT_NODE_FAILURE)
1404
if (wait_time == -1)
1407
ndbout << "Waited WAITFOR_RESPONSE_TIMEOUT, continuing wait" << endl;
1411
wait_time= max_time - NdbTick_CurrentMillisecond();
1415
ndbout << "Time-out state is " << m_waiter->get_state() << endl;
1417
m_waiter->set_state(WST_WAIT_TIMEOUT);
1423
ndbout << "ERR: receiveResponse - theImpl->theWaiter.m_state = ";
1424
ndbout << m_waiter->get_state() << endl;
1426
m_waiter->set_state(NO_WAIT);
1430
void PollGuard::wait_for_input(int wait_time)
1432
NdbWaiter *t_poll_owner= m_tp->get_poll_owner();
1433
if (t_poll_owner != NULL && t_poll_owner != m_waiter)
1436
We didn't get hold of the poll "right". We will sleep on a
1437
conditional mutex until the thread owning the poll "right"
1438
will wake us up after all data is received. If no data arrives
1439
we will wake up eventually due to the timeout.
1440
After receiving all data we take the object out of the cond wait
1441
queue if it hasn't happened already. It is usually already out of the
1442
queue but at time-out it could be that the object is still there.
1444
(void) m_tp->put_in_cond_wait_queue(m_waiter);
1445
m_waiter->wait(wait_time);
1446
if (m_waiter->get_cond_wait_index() != TransporterFacade::MAX_NO_THREADS)
1448
m_tp->remove_from_cond_wait_queue(m_waiter);
1454
We got the poll "right" and we poll until data is received. After
1455
receiving data we will check if all data is received, if not we
1458
#ifdef NDB_SHM_TRANSPORTER
1460
If shared memory transporters are used we need to set our sigmask
1461
such that we wake up also on interrupts on the shared memory
1464
NdbThread_set_shm_sigmask(FALSE);
1466
m_tp->set_poll_owner(m_waiter);
1467
m_waiter->set_poll_owner(true);
1468
m_tp->external_poll((Uint32)wait_time);
1472
void PollGuard::unlock_and_signal()
1474
NdbWaiter *t_signal_cond_waiter= 0;
1478
When completing the poll for this thread we must return the poll
1479
ownership if we own it. We will give it to the last thread that
1480
came here (the most recent) which is likely to be the one also
1481
last to complete. We will remove that thread from the conditional
1482
wait queue and set him as the new owner of the poll "right".
1483
We will wait however with the signal until we have unlocked the
1484
mutex for performance reasons.
1485
See Stevens book on Unix NetworkProgramming: The Sockets Networking
1486
API Volume 1 Third Edition on page 703-704 for a discussion on this
1489
if (m_tp->get_poll_owner() == m_waiter)
1491
#ifdef NDB_SHM_TRANSPORTER
1493
If shared memory transporters are used we need to reset our sigmask
1494
since we are no longer the thread to receive interrupts.
1496
NdbThread_set_shm_sigmask(TRUE);
1498
m_waiter->set_poll_owner(false);
1499
t_signal_cond_waiter= m_tp->rem_last_from_cond_wait_queue();
1500
m_tp->set_poll_owner(t_signal_cond_waiter);
1501
if (t_signal_cond_waiter)
1502
t_signal_cond_waiter->set_poll_owner(true);
1504
if (t_signal_cond_waiter)
1505
t_signal_cond_waiter->cond_signal();
1506
m_tp->unlock_mutex();
1510
template class Vector<NodeStatusFunction>;
1511
template class Vector<TransporterFacade::ThreadData::Object_Execute>;
1513
#include "SignalSender.hpp"
1516
SignalSender::sendSignal(Uint16 nodeId, const SimpleSignal * s){
1518
if(setSignalLog() && TRACE_GSN(s->header.theVerId_signalNumber)){
1519
SignalHeader tmp = s->header;
1520
tmp.theSendersBlockRef = getOwnRef();
1522
LinearSectionPtr ptr[3];
1523
signalLogger.sendSignal(tmp,
1527
signalLogger.flushSignalLog();
1530
assert(getNodeInfo(nodeId).m_api_reg_conf == true ||
1531
s->readSignalNumber() == GSN_API_REGREQ);
1532
return theFacade->theTransporterRegistry->prepareSend(&s->header,