1
/* Copyright (C) 2003 MySQL AB
3
This program is free software; you can redistribute it and/or modify
4
it under the terms of the GNU General Public License as published by
5
the Free Software Foundation; version 2 of the License.
7
This program is distributed in the hope that it will be useful,
8
but WITHOUT ANY WARRANTY; without even the implied warranty of
9
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10
GNU General Public License for more details.
12
You should have received a copy of the GNU General Public License
13
along with this program; if not, write to the Free Software
14
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
16
#include <ndb_global.h>
17
#include <my_pthread.h>
19
#include <TransporterRegistry.hpp>
20
#include "TransporterInternalDefinitions.hpp"
22
#include "Transporter.hpp"
23
#include <SocketAuthenticator.hpp>
25
#ifdef NDB_TCP_TRANSPORTER
26
#include "TCP_Transporter.hpp"
29
#ifdef NDB_SCI_TRANSPORTER
30
#include "SCI_Transporter.hpp"
33
#ifdef NDB_SHM_TRANSPORTER
34
#include "SHM_Transporter.hpp"
35
extern int g_ndb_shm_signum;
38
#include "TransporterCallback.hpp"
42
#include <InputStream.hpp>
43
#include <OutputStream.hpp>
45
#include <mgmapi/mgmapi.h>
46
#include <mgmapi_internal.h>
47
#include <mgmapi/mgmapi_debug.h>
49
#include <EventLogger.hpp>
50
extern EventLogger g_eventLogger;
53
// Returns the m_connect_address stored on the transporter registered for
// node_id. theTransporters[node_id] is dereferenced without a null check in
// the visible code, so node_id must refer to an existing transporter.
TransporterRegistry::get_connect_address(NodeId node_id) const
55
return theTransporters[node_id]->m_connect_address;
58
// Handles a freshly accepted socket: optionally authenticates the peer and
// then passes the socket to the registry's connect_server(). The socket is
// closed here when either step fails.
// NOTE(review): the return statements are not visible in this view.
SocketServer::Session * TransporterService::newSession(NDB_SOCKET_TYPE sockfd)
60
DBUG_ENTER("SocketServer::Session * TransporterService::newSession");
61
// Authentication is only attempted when an authenticator (m_auth) is set.
if (m_auth && !m_auth->server_authenticate(sockfd)){
62
NDB_CLOSE_SOCKET(sockfd);
66
// Run the transporter handshake; close the socket if it is refused.
if (!m_transporter_registry->connect_server(sockfd))
68
NDB_CLOSE_SOCKET(sockfd);
75
// Constructs a registry able to hold _maxTransporters transporters.
// Allocates one array per transporter kind plus the per-node type, state
// and I/O-state arrays, all sized by maxTransporters, and resets every slot.
// NOTE(review): the member-initializer list and the use of callback and
// sizeOfLongSignalMemory are not visible in this view.
TransporterRegistry::TransporterRegistry(void * callback,
76
unsigned _maxTransporters,
77
unsigned sizeOfLongSignalMemory) :
81
DBUG_ENTER("TransporterRegistry::TransporterRegistry");
83
// No local node id is known until init() is called.
nodeIdSpecified = false;
84
maxTransporters = _maxTransporters;
89
theTCPTransporters = new TCP_Transporter * [maxTransporters];
90
theSCITransporters = new SCI_Transporter * [maxTransporters];
91
theSHMTransporters = new SHM_Transporter * [maxTransporters];
92
theTransporterTypes = new TransporterType [maxTransporters];
93
theTransporters = new Transporter * [maxTransporters];
94
performStates = new PerformState [maxTransporters];
95
ioStates = new IOState [maxTransporters];
97
// Initialize member variables
100
nSCITransporters = 0;
101
nSHMTransporters = 0;
103
// Initialize the transporter arrays
104
// Every slot starts empty, DISCONNECTED and with I/O not halted.
for (unsigned i=0; i<maxTransporters; i++) {
105
theTCPTransporters[i] = NULL;
106
theSCITransporters[i] = NULL;
107
theSHMTransporters[i] = NULL;
108
theTransporters[i] = NULL;
109
performStates[i] = DISCONNECTED;
110
ioStates[i] = NoHalt;
116
// Installs h as the management-server handle used by the registry.
// NOTE(review): the guarding conditions, the assignment of h to
// m_mgm_handle and the declaration of buf are not visible in this view.
void TransporterRegistry::set_mgm_handle(NdbMgmHandle h)
118
DBUG_ENTER("TransporterRegistry::set_mgm_handle");
120
// Release the handle that is currently held.
ndb_mgm_destroy_handle(&m_mgm_handle);
122
// Timeout value 5000; presumably milliseconds -- confirm against mgmapi.
ndb_mgm_set_timeout(m_mgm_handle, 5000);
127
DBUG_PRINT("info",("handle set with connectstring: %s",
128
ndb_mgm_get_connectstring(h,buf, sizeof(buf))));
132
DBUG_PRINT("info",("handle set to NULL"));
138
// Destructor: frees the arrays allocated by the constructor and destroys
// the management handle.
// NOTE(review): some statements of this definition are not visible in this
// view (no release of ioStates is shown here) -- confirm against the full
// file.
TransporterRegistry::~TransporterRegistry()
140
DBUG_ENTER("TransporterRegistry::~TransporterRegistry");
144
// These delete[] calls free only the pointer arrays themselves.
delete[] theTCPTransporters;
145
delete[] theSCITransporters;
146
delete[] theSHMTransporters;
147
delete[] theTransporterTypes;
148
delete[] theTransporters;
149
delete[] performStates;
153
ndb_mgm_destroy_handle(&m_mgm_handle);
159
// Removes every registered transporter by calling removeTransporter() for
// each non-empty slot of theTransporters.
TransporterRegistry::removeAll(){
160
for(unsigned i = 0; i<maxTransporters; i++){
161
if(theTransporters[i] != NULL)
162
removeTransporter(theTransporters[i]->getRemoteNodeId());
167
// Calls doDisconnect() on every registered transporter; the transporters
// stay registered.
TransporterRegistry::disconnectAll(){
168
for(unsigned i = 0; i<maxTransporters; i++){
169
if(theTransporters[i] != NULL)
170
theTransporters[i]->doDisconnect();
175
// Records nodeId as the local node id of this registry and marks the node
// id as specified.
// NOTE(review): the return statement is not visible in this view.
TransporterRegistry::init(NodeId nodeId) {
176
DBUG_ENTER("TransporterRegistry::init");
177
nodeIdSpecified = true;
178
localNodeId = nodeId;
180
DEBUG("TransporterRegistry started node: " << localNodeId);
186
// Server side of the transporter handshake on an accepted socket.
// Reads one line "<nodeId> <transporter type>" from the peer, validates the
// node id and the connection state of that node, answers with the local
// node id and transporter type, and finally hands the socket over to the
// matching Transporter.
// NOTE(review): the declaration of buf, the branches on the sscanf result
// and all return statements are not visible in this view.
TransporterRegistry::connect_server(NDB_SOCKET_TYPE sockfd)
188
DBUG_ENTER("TransporterRegistry::connect_server");
190
// read node id from client
191
// read transporter type
192
// remote_transporter_type keeps the value -1 when the peer sends no type.
int nodeId, remote_transporter_type= -1;
193
SocketInputStream s_input(sockfd);
195
if (s_input.gets(buf, 256) == 0) {
196
DBUG_PRINT("error", ("Could not get node id from client"));
199
// r holds the number of fields that were successfully parsed.
int r= sscanf(buf, "%d %d", &nodeId, &remote_transporter_type);
204
// we're running version prior to 4.1.9
205
// ok, but with no checks on transporter configuration compatibility
208
DBUG_PRINT("error", ("Error in node id from client"));
212
DBUG_PRINT("info", ("nodeId=%d remote_transporter_type=%d",
213
nodeId,remote_transporter_type));
215
//check that nodeid is valid and that there is an allocated transporter
216
if ( nodeId < 0 || nodeId >= (int)maxTransporters) {
217
DBUG_PRINT("error", ("Node id out of range from client"));
220
if (theTransporters[nodeId] == 0) {
221
DBUG_PRINT("error", ("No transporter for this node id from client"));
225
//check that the transporter should be connected
226
if (performStates[nodeId] != TransporterRegistry::CONNECTING) {
227
DBUG_PRINT("error", ("Transporter in wrong state for this node id from client"));
231
Transporter *t= theTransporters[nodeId];
233
// send info about own id (just as response to acknowledge connection)
234
// send info on own transporter type
235
SocketOutputStream s_output(sockfd);
236
s_output.println("%d %d", t->getLocalNodeId(), t->m_type);
238
// The type comparison is only possible when the peer reported a type.
if (remote_transporter_type != -1)
240
if (remote_transporter_type != t->m_type)
242
DBUG_PRINT("error", ("Transporter types mismatch this=%d remote=%d",
243
t->m_type, remote_transporter_type));
244
g_eventLogger.error("Incompatible configuration: Transporter type "
245
"mismatch with node %d", nodeId);
247
// wait for socket close for 1 second to let message arrive at client
251
FD_SET(sockfd, &a_set);
252
struct timeval timeout;
253
timeout.tv_sec = 1; timeout.tv_usec = 0;
254
select(sockfd+1, &a_set, 0, 0, &timeout);
259
// No type from the peer: a warning is logged for SHM transporters.
else if (t->m_type == tt_SHM_TRANSPORTER)
261
g_eventLogger.warning("Unable to verify transporter compatability with node %d", nodeId);
264
// setup transporter (transporter responsible for closing sockfd)
265
t->connect_server(sockfd);
271
// Creates a TCP_Transporter from config and registers it in the TCP array
// and in the per-node arrays, in state DISCONNECTED.
// NOTE(review): the actions taken when a check fails, part of the
// constructor argument list and the return statements are not visible in
// this view.
TransporterRegistry::createTCPTransporter(TransporterConfiguration *config) {
272
#ifdef NDB_TCP_TRANSPORTER
274
// The first transporter created also initialises the local node id.
if(!nodeIdSpecified){
275
init(config->localNodeId);
278
// The config must describe this registry's local node.
if(config->localNodeId != localNodeId)
281
// A transporter to the remote node must not be registered already.
if(theTransporters[config->remoteNodeId] != NULL)
284
TCP_Transporter * t = new TCP_Transporter(*this,
285
config->tcp.sendBufferSize,
286
config->tcp.maxReceiveSize,
287
config->localHostName,
288
config->remoteHostName,
290
config->isMgmConnection,
292
config->remoteNodeId,
293
config->serverNodeId,
298
else if (!t->initTransporter()) {
303
// Put the transporter in the transporter arrays
304
theTCPTransporters[nTCPTransporters] = t;
305
theTransporters[t->getRemoteNodeId()] = t;
306
theTransporterTypes[t->getRemoteNodeId()] = tt_TCP_TRANSPORTER;
307
performStates[t->getRemoteNodeId()] = DISCONNECTED;
318
// Creates an SCI_Transporter from config and registers it in the SCI array
// and in the per-node arrays, in state DISCONNECTED.
// NOTE(review): the actions taken when a check fails, part of the
// constructor argument list and the return statements are not visible in
// this view.
TransporterRegistry::createSCITransporter(TransporterConfiguration *config) {
319
#ifdef NDB_SCI_TRANSPORTER
321
// SCI must initialise successfully before a transporter is created.
if(!SCI_Transporter::initSCI())
324
// The first transporter created also initialises the local node id.
if(!nodeIdSpecified){
325
init(config->localNodeId);
328
// The config must describe this registry's local node.
if(config->localNodeId != localNodeId)
331
// A transporter to the remote node must not be registered already.
if(theTransporters[config->remoteNodeId] != NULL)
334
SCI_Transporter * t = new SCI_Transporter(*this,
335
config->localHostName,
336
config->remoteHostName,
338
config->isMgmConnection,
339
config->sci.sendLimit,
340
config->sci.bufferSize,
341
config->sci.nLocalAdapters,
342
config->sci.remoteSciNodeId0,
343
config->sci.remoteSciNodeId1,
345
config->remoteNodeId,
346
config->serverNodeId,
352
else if (!t->initTransporter()) {
356
// Put the transporter in the transporter arrays
357
theSCITransporters[nSCITransporters] = t;
358
theTransporters[t->getRemoteNodeId()] = t;
359
theTransporterTypes[t->getRemoteNodeId()] = tt_SCI_TRANSPORTER;
360
performStates[t->getRemoteNodeId()] = DISCONNECTED;
371
// Creates an SHM_Transporter from config and registers it in the SHM array
// and in the per-node arrays, in state DISCONNECTED. The first call also
// fixes the process-wide signal number g_ndb_shm_signum.
// NOTE(review): the actions taken when a check fails, part of the
// constructor argument list and the return statements are not visible in
// this view.
TransporterRegistry::createSHMTransporter(TransporterConfiguration *config) {
372
DBUG_ENTER("TransporterRegistry::createTransporter SHM");
373
#ifdef NDB_SHM_TRANSPORTER
374
// The first transporter created also initialises the local node id.
if(!nodeIdSpecified){
375
init(config->localNodeId);
378
// The config must describe this registry's local node.
if(config->localNodeId != localNodeId)
381
// The signal number is taken from the first SHM config seen.
if (!g_ndb_shm_signum) {
382
g_ndb_shm_signum= config->shm.signum;
383
DBUG_PRINT("info",("Block signum %d",g_ndb_shm_signum));
385
* Make sure to block g_ndb_shm_signum
386
* TransporterRegistry::init is run from "main" thread
388
NdbThread_set_shm_sigmask(TRUE);
391
// Every SHM config must use the signal number chosen above.
if(config->shm.signum != g_ndb_shm_signum)
394
// A transporter to the remote node must not be registered already.
if(theTransporters[config->remoteNodeId] != NULL)
397
SHM_Transporter * t = new SHM_Transporter(*this,
398
config->localHostName,
399
config->remoteHostName,
401
config->isMgmConnection,
403
config->remoteNodeId,
404
config->serverNodeId,
412
else if (!t->initTransporter()) {
416
// Put the transporter in the transporter arrays
417
theSHMTransporters[nSHMTransporters] = t;
418
theTransporters[t->getRemoteNodeId()] = t;
419
theTransporterTypes[t->getRemoteNodeId()] = tt_SHM_TRANSPORTER;
420
performStates[t->getRemoteNodeId()] = DISCONNECTED;
433
// Disconnects and deletes the transporter for nodeId and removes it from
// both the type-specific array and theTransporters.
// For each transporter type the visible code first searches the
// type-specific array for the entry with this remote node id and then
// shifts the following entries one slot down.
// NOTE(review): the switch statement, the declaration of ind, the break
// statements and the counter decrements are not visible in this view.
TransporterRegistry::removeTransporter(NodeId nodeId) {
435
DEBUG("Removing transporter from " << localNodeId
436
<< " to " << nodeId);
438
if(theTransporters[nodeId] == NULL)
441
theTransporters[nodeId]->doDisconnect();
443
const TransporterType type = theTransporterTypes[nodeId];
447
case tt_TCP_TRANSPORTER:
448
#ifdef NDB_TCP_TRANSPORTER
449
for(; ind < nTCPTransporters; ind++)
450
if(theTCPTransporters[ind]->getRemoteNodeId() == nodeId)
453
for(; ind<nTCPTransporters; ind++)
454
theTCPTransporters[ind-1] = theTCPTransporters[ind];
458
case tt_SCI_TRANSPORTER:
459
#ifdef NDB_SCI_TRANSPORTER
460
for(; ind < nSCITransporters; ind++)
461
if(theSCITransporters[ind]->getRemoteNodeId() == nodeId)
464
for(; ind<nSCITransporters; ind++)
465
theSCITransporters[ind-1] = theSCITransporters[ind];
469
case tt_SHM_TRANSPORTER:
470
#ifdef NDB_SHM_TRANSPORTER
471
for(; ind < nSHMTransporters; ind++)
472
if(theSHMTransporters[ind]->getRemoteNodeId() == nodeId)
475
for(; ind<nSHMTransporters; ind++)
476
theSHMTransporters[ind-1] = theSHMTransporters[ind];
484
// Delete the transporter and remove it from theTransporters array
485
delete theTransporters[nodeId];
486
theTransporters[nodeId] = NULL;
490
// Returns get_free_buffer() of the transporter registered for node.
// NOTE(review): the declaration of t and the value returned when no
// transporter exists are not visible in this view.
TransporterRegistry::get_free_buffer(Uint32 node) const
493
if(likely((t = theTransporters[node]) != 0))
495
return t->get_free_buffer();
502
// Packs one signal with linear sections into the send buffer of the
// transporter for nodeId.
// Visible result codes: SEND_BUFFER_FULL, SEND_MESSAGE_TOO_BIG,
// SEND_DISCONNECTED and SEND_UNKNOWN_NODE.
// Sending is skipped while ioStates[nodeId] is HaltOutput or HaltIO, except
// for signals addressed to block numbers 252 and 4002.
// NOTE(review): the meaning of 252 and 4002 is not shown here -- confirm
// against the block number definitions. Part of the parameter list, the
// null checks on t and insertPtr, the declaration of sleepTime and the
// success return are not visible in this view.
TransporterRegistry::prepareSend(const SignalHeader * const signalHeader,
504
const Uint32 * const signalData,
506
const LinearSectionPtr ptr[3]){
509
Transporter *t = theTransporters[nodeId];
511
(((ioStates[nodeId] != HaltOutput) && (ioStates[nodeId] != HaltIO)) ||
512
((signalHeader->theReceiversBlockNumber == 252) ||
513
(signalHeader->theReceiversBlockNumber == 4002)))) {
515
if(t->isConnected()){
516
Uint32 lenBytes = t->m_packer.getMessageLength(signalHeader, ptr);
517
if(lenBytes <= MAX_MESSAGE_SIZE){
518
// First attempt: reserve space, pack the signal, commit the write.
Uint32 * insertPtr = t->getWritePtr(lenBytes, prio);
520
t->m_packer.pack(insertPtr, prio, signalHeader, signalData, ptr);
521
t->updateWritePtr(lenBytes, prio);
528
* @note: on linux/i386 the granularity is 10ms
529
* so sleepTime = 2 generates a 10 ms sleep.
531
// Retry up to 50 times; the sleep only happens when there are no SHM or
// SCI transporters.
for(int i = 0; i<50; i++){
532
if((nSHMTransporters+nSCITransporters) == 0)
533
NdbSleep_MilliSleep(sleepTime);
534
insertPtr = t->getWritePtr(lenBytes, prio);
536
t->m_packer.pack(insertPtr, prio, signalHeader, signalData, ptr);
537
t->updateWritePtr(lenBytes, prio);
544
* Send buffer full, but resend works
546
reportError(callbackObj, nodeId, TE_SEND_BUFFER_FULL);
550
WARNING("Signal to " << nodeId << " lost(buffer)");
551
reportError(callbackObj, nodeId, TE_SIGNAL_LOST_SEND_BUFFER_FULL);
552
return SEND_BUFFER_FULL;
554
return SEND_MESSAGE_TOO_BIG;
557
DEBUG("Signal to " << nodeId << " lost(disconnect) ");
558
return SEND_DISCONNECTED;
561
DEBUG("Discarding message to block: "
562
<< signalHeader->theReceiversBlockNumber
563
<< " node: " << nodeId);
566
return SEND_UNKNOWN_NODE;
573
// Packs one signal with segmented sections (stored in thePool) into the
// send buffer of the transporter for nodeId. The visible control flow is
// the same as in the LinearSectionPtr overload; only the pack() call
// differs by taking thePool.
// Visible result codes: SEND_BUFFER_FULL, SEND_MESSAGE_TOO_BIG,
// SEND_DISCONNECTED and SEND_UNKNOWN_NODE.
// NOTE(review): the meaning of block numbers 252 and 4002 is not shown
// here. Part of the parameter list, the null checks on t and insertPtr,
// the declaration of sleepTime and the success return are not visible in
// this view.
TransporterRegistry::prepareSend(const SignalHeader * const signalHeader,
575
const Uint32 * const signalData,
577
class SectionSegmentPool & thePool,
578
const SegmentedSectionPtr ptr[3]){
581
Transporter *t = theTransporters[nodeId];
583
(((ioStates[nodeId] != HaltOutput) && (ioStates[nodeId] != HaltIO)) ||
584
((signalHeader->theReceiversBlockNumber == 252)||
585
(signalHeader->theReceiversBlockNumber == 4002)))) {
587
if(t->isConnected()){
588
Uint32 lenBytes = t->m_packer.getMessageLength(signalHeader, ptr);
589
if(lenBytes <= MAX_MESSAGE_SIZE){
590
// First attempt: reserve space, pack the signal, commit the write.
Uint32 * insertPtr = t->getWritePtr(lenBytes, prio);
592
t->m_packer.pack(insertPtr, prio, signalHeader, signalData, thePool, ptr);
593
t->updateWritePtr(lenBytes, prio);
599
* @note: on linux/i386 the granularity is 10ms
600
* so sleepTime = 2 generates a 10 ms sleep.
603
// Retry up to 50 times; the sleep only happens when there are no SHM or
// SCI transporters.
for(int i = 0; i<50; i++){
604
if((nSHMTransporters+nSCITransporters) == 0)
605
NdbSleep_MilliSleep(sleepTime);
606
insertPtr = t->getWritePtr(lenBytes, prio);
608
t->m_packer.pack(insertPtr, prio, signalHeader, signalData, thePool, ptr);
609
t->updateWritePtr(lenBytes, prio);
616
* Send buffer full, but resend works
618
reportError(callbackObj, nodeId, TE_SEND_BUFFER_FULL);
622
WARNING("Signal to " << nodeId << " lost(buffer)");
623
reportError(callbackObj, nodeId, TE_SIGNAL_LOST_SEND_BUFFER_FULL);
624
return SEND_BUFFER_FULL;
626
return SEND_MESSAGE_TOO_BIG;
629
DEBUG("Signal to " << nodeId << " lost(disconnect) ");
630
return SEND_DISCONNECTED;
633
DEBUG("Discarding message to block: "
634
<< signalHeader->theReceiversBlockNumber
635
<< " node: " << nodeId);
638
return SEND_UNKNOWN_NODE;
645
// Runs one round of external I/O, polling for incoming data for at most
// timeOutMillis.
// NOTE(review): only the pollReceive() call is visible in this view; the
// receive and send steps described by the comment below are not shown.
TransporterRegistry::external_IO(Uint32 timeOutMillis) {
646
//-----------------------------------------------------------
647
// Most of the time we will send the buffers here and then wait
648
// for new signals. Thus we start by sending without timeout
649
// followed by the receive part where we expect to sleep for
651
//-----------------------------------------------------------
652
if(pollReceive(timeOutMillis)){
659
// Polls the transporter kinds that are compiled in and accumulates the
// results in retVal.
// SHM is polled with a zero timeout, once before and once after the other
// kinds; TCP and SCI are polled with timeOutMillis.
// NOTE(review): the declaration of retVal, the handling of res, the
// adjustment made when SCI transporters exist and the return statement are
// not visible in this view.
TransporterRegistry::pollReceive(Uint32 timeOutMillis){
662
if((nSCITransporters) > 0)
667
#ifdef NDB_SHM_TRANSPORTER
668
if(nSHMTransporters > 0)
670
Uint32 res = poll_SHM(0);
679
#ifdef NDB_TCP_TRANSPORTER
680
// poll_TCP is also called when nothing has been found so far.
if(nTCPTransporters > 0 || retVal == 0)
682
retVal |= poll_TCP(timeOutMillis);
685
tcpReadSelectReply = 0;
687
#ifdef NDB_SCI_TRANSPORTER
688
if(nSCITransporters > 0)
689
retVal |= poll_SCI(timeOutMillis);
691
#ifdef NDB_SHM_TRANSPORTER
692
// Second SHM poll, only when every earlier poll came back empty.
if(nSHMTransporters > 0 && retVal == 0)
694
int res = poll_SHM(0);
702
#ifdef NDB_SCI_TRANSPORTER
704
// Checks the connected SCI transporters for data that can be read.
// NOTE(review): the return statements are not visible in this view, and no
// use of timeOutMillis is shown here.
TransporterRegistry::poll_SCI(Uint32 timeOutMillis)
706
for (int i=0; i<nSCITransporters; i++) {
707
SCI_Transporter * t = theSCITransporters[i];
708
if (t->isConnected()) {
709
if(t->hasDataToRead())
718
#ifdef NDB_SHM_TRANSPORTER
719
// File-local counter; its use is not visible in this view.
static int g_shm_counter = 0;
721
// Checks the connected SHM transporters for data that can be read,
// repeating the scan up to 100 times.
// NOTE(review): the return statements are not visible in this view, and no
// use of timeOutMillis is shown here.
TransporterRegistry::poll_SHM(Uint32 timeOutMillis)
723
for(int j=0; j < 100; j++)
725
for (int i=0; i<nSHMTransporters; i++) {
726
SHM_Transporter * t = theSHMTransporters[i];
727
if (t->isConnected()) {
728
if(t->hasDataToRead()) {
738
#ifdef NDB_TCP_TRANSPORTER
740
// Builds the read set from all connected TCP transporters and waits in
// select() for at most timeOutMillis. The wait is shortened to zero when a
// transporter already holds received data.
// Returns non-zero when select() reported something or when a transporter
// had buffered data.
// NOTE(review): several lines are not visible in this view, among them any
// adjustment of maxSocketValue between the "plus one" comment and the
// select() call -- confirm against the full file.
TransporterRegistry::poll_TCP(Uint32 timeOutMillis)
742
bool hasdata = false;
743
// "false &&" makes this branch unreachable.
if (false && nTCPTransporters == 0)
745
tcpReadSelectReply = 0;
749
NDB_SOCKET_TYPE maxSocketValue = -1;
751
// Needed for TCP/IP connections
752
// The read- and writeset are used by select
754
FD_ZERO(&tcpReadset);
756
// Prepare for sending and receiving
757
for (int i = 0; i < nTCPTransporters; i++) {
758
TCP_Transporter * t = theTCPTransporters[i];
760
// If the transporter is connected
761
NodeId nodeId = t->getRemoteNodeId();
762
if (is_connected(nodeId) && t->isConnected()) {
764
const NDB_SOCKET_TYPE socket = t->getSocket();
765
// Find the highest socket value. It will be used by select
766
if (socket > maxSocketValue)
767
maxSocketValue = socket;
769
// Put the connected transporters in the socket read-set
770
FD_SET(socket, &tcpReadset);
772
hasdata |= t->hasReceiveData();
775
// Do not block in select() when data is already buffered.
timeOutMillis = hasdata ? 0 : timeOutMillis;
777
struct timeval timeout;
778
timeout.tv_sec = timeOutMillis / 1000;
779
timeout.tv_usec = (timeOutMillis % 1000) * 1000;
781
// The highest socket value plus one
784
tcpReadSelectReply = select(maxSocketValue, &tcpReadset, 0, 0, &timeout);
785
// "false &&" makes this branch unreachable.
if(false && tcpReadSelectReply == -1 && errno == EINTR)
786
g_eventLogger.info("woke-up by signal");
789
// On a select() error, sleep for the timeout instead.
if(tcpReadSelectReply == SOCKET_ERROR)
791
NdbSleep_MilliSleep(timeOutMillis);
795
return tcpReadSelectReply || hasdata;
801
// Receives pending data from every connected transporter and passes it to
// unpack().
// TCP: data is taken with getReceiveData() and the consumed size is
// reported back with updateReceiveDataPtr().
// SCI and SHM: the read and end-of-data pointers are fetched, unpack()
// returns the new read pointer and updateReceivePtr() stores it.
// NOTE(review): the socket receive call that follows the FD_ISSET test and
// the declaration of ptr are not visible in this view.
TransporterRegistry::performReceive()
803
#ifdef NDB_TCP_TRANSPORTER
804
for (int i=0; i<nTCPTransporters; i++)
807
TCP_Transporter *t = theTCPTransporters[i];
808
const NodeId nodeId = t->getRemoteNodeId();
809
const NDB_SOCKET_TYPE socket = t->getSocket();
810
if(is_connected(nodeId)){
813
// tcpReadset was filled in by the select() call in poll_TCP.
if (FD_ISSET(socket, &tcpReadset))
818
if (t->hasReceiveData())
821
Uint32 sz = t->getReceiveData(&ptr);
822
transporter_recv_from(callbackObj, nodeId);
823
Uint32 szUsed = unpack(ptr, sz, nodeId, ioStates[nodeId]);
824
t->updateReceiveDataPtr(szUsed);
831
#ifdef NDB_SCI_TRANSPORTER
833
//do prepareReceive on the SCI transporters (prepareReceive(t,,,,))
834
for (int i=0; i<nSCITransporters; i++)
837
SCI_Transporter *t = theSCITransporters[i];
838
const NodeId nodeId = t->getRemoteNodeId();
839
if(is_connected(nodeId))
841
if(t->isConnected() && t->checkConnected())
843
Uint32 * readPtr, * eodPtr;
844
t->getReceivePtr(&readPtr, &eodPtr);
845
transporter_recv_from(callbackObj, nodeId);
846
Uint32 *newPtr = unpack(readPtr, eodPtr, nodeId, ioStates[nodeId]);
847
t->updateReceivePtr(newPtr);
852
#ifdef NDB_SHM_TRANSPORTER
853
for (int i=0; i<nSHMTransporters; i++)
856
SHM_Transporter *t = theSHMTransporters[i];
857
const NodeId nodeId = t->getRemoteNodeId();
858
if(is_connected(nodeId)){
859
if(t->isConnected() && t->checkConnected())
861
Uint32 * readPtr, * eodPtr;
862
t->getReceivePtr(&readPtr, &eodPtr);
863
transporter_recv_from(callbackObj, nodeId);
864
Uint32 *newPtr = unpack(readPtr, eodPtr, nodeId, ioStates[nodeId]);
865
t->updateReceivePtr(newPtr);
873
// Visits every connected transporter that has data to send.
// The TCP transporters are visited in two passes: first from index
// m_transp_count to the end, then from 0 up to m_transp_count, so the
// starting point is taken from m_transp_count.
// NOTE(review): the send calls themselves, the declaration of i and the
// update of m_transp_count are not visible in this view.
TransporterRegistry::performSend()
878
#ifdef NDB_TCP_TRANSPORTER
879
for (i = m_transp_count; i < nTCPTransporters; i++)
881
TCP_Transporter *t = theTCPTransporters[i];
882
if (t && t->hasDataToSend() && t->isConnected() &&
883
is_connected(t->getRemoteNodeId()))
888
for (i = 0; i < m_transp_count && i < nTCPTransporters; i++)
890
TCP_Transporter *t = theTCPTransporters[i];
891
if (t && t->hasDataToSend() && t->isConnected() &&
892
is_connected(t->getRemoteNodeId()))
898
// Wrap the starting index once it reaches the end of the array.
if (m_transp_count == nTCPTransporters) m_transp_count = 0;
900
#ifdef NDB_SCI_TRANSPORTER
901
//scroll through the SCI transporters,
902
// get each transporter, check if connected, send data
903
for (i=0; i<nSCITransporters; i++) {
904
SCI_Transporter *t = theSCITransporters[i];
905
const NodeId nodeId = t->getRemoteNodeId();
907
if(is_connected(nodeId))
909
if(t->isConnected() && t->hasDataToSend()) {
916
#ifdef NDB_SHM_TRANSPORTER
917
for (i=0; i<nSHMTransporters; i++)
919
SHM_Transporter *t = theSHMTransporters[i];
920
const NodeId nodeId = t->getRemoteNodeId();
921
if(is_connected(nodeId))
933
// Increments sendCounter by one on every call and compares the value it
// had before the increment with sendLimit.
// NOTE(review): the body of the branch taken when the limit is reached and
// the return statements are not visible in this view.
TransporterRegistry::forceSendCheck(int sendLimit){
934
int tSendCounter = sendCounter;
935
sendCounter = tSendCounter + 1;
936
if (tSendCounter >= sendLimit) {
942
}//TransporterRegistry::forceSendCheck()
944
#ifdef DEBUG_TRANSPORTER
946
// Debug helper: writes the number of transporters to ndbout, followed by
// the PerformState and IOState of every registered transporter.
TransporterRegistry::printState(){
947
ndbout << "-- TransporterRegistry -- " << endl << endl
948
<< "Transporters = " << nTransporters << endl;
949
for(int i = 0; i<maxTransporters; i++)
950
if(theTransporters[i] != NULL){
951
const NodeId remoteNodeId = theTransporters[i]->getRemoteNodeId();
952
ndbout << "Transporter: " << remoteNodeId
953
<< " PerformState: " << performStates[remoteNodeId]
954
<< " IOState: " << ioStates[remoteNodeId] << endl;
960
// Returns the I/O state stored for nodeId. No range check on nodeId is
// performed in the visible code.
TransporterRegistry::ioState(NodeId nodeId) {
961
return ioStates[nodeId];
965
// Stores state as the I/O state of nodeId. No range check on nodeId is
// performed in the visible code.
TransporterRegistry::setIOState(NodeId nodeId, IOState state) {
966
DEBUG("TransporterRegistry::setIOState("
967
<< nodeId << ", " << state << ")");
968
ioStates[nodeId] = state;
972
// Thread entry point passed to NdbThread_Create by start_clients(): casts
// me to a TransporterRegistry and runs start_clients_thread() on it.
// NOTE(review): the return statement is not visible in this view.
run_start_clients_C(void * me)
974
((TransporterRegistry*) me)->start_clients_thread();
978
// Run by kernel thread
980
// Sets the perform state of node_id to CONNECTING.
// NOTE(review): the checks on the current state that precede the state
// change are not visible in this view.
TransporterRegistry::do_connect(NodeId node_id)
982
PerformState &curr_state = performStates[node_id];
993
DBUG_ENTER("TransporterRegistry::do_connect");
994
DBUG_PRINT("info",("performStates[%d]=CONNECTING",node_id));
995
curr_state= CONNECTING;
999
// Sets the perform state of node_id to DISCONNECTING.
// NOTE(review): the checks on the current state that precede the state
// change are not visible in this view.
TransporterRegistry::do_disconnect(NodeId node_id)
1001
PerformState &curr_state = performStates[node_id];
1012
DBUG_ENTER("TransporterRegistry::do_disconnect");
1013
DBUG_PRINT("info",("performStates[%d]=DISCONNECTING",node_id));
1014
curr_state= DISCONNECTING;
1019
// Marks node_id as CONNECTED and notifies the callback object through
// reportConnect().
TransporterRegistry::report_connect(NodeId node_id)
1021
DBUG_ENTER("TransporterRegistry::report_connect");
1022
DBUG_PRINT("info",("performStates[%d]=CONNECTED",node_id));
1023
performStates[node_id] = CONNECTED;
1024
reportConnect(callbackObj, node_id);
1029
// Marks node_id as DISCONNECTED and notifies the callback object through
// reportDisconnect(), passing errnum along.
TransporterRegistry::report_disconnect(NodeId node_id, int errnum)
1031
DBUG_ENTER("TransporterRegistry::report_disconnect");
1032
DBUG_PRINT("info",("performStates[%d]=DISCONNECTED",node_id));
1033
performStates[node_id] = DISCONNECTED;
1034
reportDisconnect(callbackObj, node_id, errnum);
1039
// Walks the registered transporters and reports state transitions:
// report_connect() when a transporter has become connected and
// report_disconnect() with error 0 when it no longer is.
// NOTE(review): the case labels, the handling of empty slots and the
// increment of n are not visible in this view.
TransporterRegistry::update_connections()
1041
for (int i= 0, n= 0; n < nTransporters; i++){
1042
Transporter * t = theTransporters[i];
1047
const NodeId nodeId = t->getRemoteNodeId();
1048
switch(performStates[nodeId]){
1053
if(t->isConnected())
1054
report_connect(nodeId);
1057
if(!t->isConnected())
1058
report_disconnect(nodeId, 0);
1064
// run as own thread
1066
// Body of the client-connect thread. While m_run_start_clients_thread is
// set it wakes up every 100 ms, tries to connect the client-side
// transporters, and for transporters with a dynamic port asks the
// management server for the current server port.
// NOTE(review): the case labels, the declarations of server_port and res,
// the increment of n and the disconnect branch are not visible in this
// view.
TransporterRegistry::start_clients_thread()
1068
int persist_mgm_count= 0;
1069
DBUG_ENTER("TransporterRegistry::start_clients_thread");
1070
while (m_run_start_clients_thread) {
1071
NdbSleep_MilliSleep(100);
1072
persist_mgm_count++;
1073
// Every 50th pass the management connection is checked.
if(persist_mgm_count==50)
1075
ndb_mgm_check_connection(m_mgm_handle);
1076
persist_mgm_count= 0;
1078
for (int i= 0, n= 0; n < nTransporters && m_run_start_clients_thread; i++){
1079
Transporter * t = theTransporters[i];
1084
const NodeId nodeId = t->getRemoteNodeId();
1085
switch(performStates[nodeId]){
1087
// Only the client side of a transporter connects actively.
if(!t->isConnected() && !t->isServer) {
1088
bool connected= false;
1090
* First, we try to connect (if we have a port number).
1092
if (t->get_s_port())
1093
connected= t->connect_client();
1096
* If dynamic, get the port for connecting from the management server
1098
if( !connected && t->get_s_port() <= 0) { // Port is dynamic
1100
struct ndb_mgm_reply mgm_reply;
1102
// Connect to the management server on demand.
if(!ndb_mgm_is_connected(m_mgm_handle))
1103
ndb_mgm_connect(m_mgm_handle, 0, 0, 0);
1105
if(ndb_mgm_is_connected(m_mgm_handle))
1108
ndb_mgm_get_connection_int_parameter(m_mgm_handle,
1109
t->getRemoteNodeId(),
1110
t->getLocalNodeId(),
1111
CFG_CONNECTION_SERVER_PORT,
1114
DBUG_PRINT("info",("Got dynamic port %d for %d -> %d (ret: %d)",
1115
server_port,t->getRemoteNodeId(),
1116
t->getLocalNodeId(),res));
1120
* Server_port == 0 just means that that a mgmt server
1121
* has not received a new port yet. Keep the old.
1124
t->set_s_port(server_port);
1126
else if(ndb_mgm_is_connected(m_mgm_handle))
1128
g_eventLogger.info("Failed to get dynamic port to connect to: %d", res);
1129
ndb_mgm_disconnect(m_mgm_handle);
1133
g_eventLogger.info("Management server closed connection early. "
1134
"It is probably being shut down (or has problems). "
1135
"We will retry the connection. %d %s %s line: %d",
1136
ndb_mgm_get_latest_error(m_mgm_handle),
1137
ndb_mgm_get_latest_error_desc(m_mgm_handle),
1138
ndb_mgm_get_latest_error_msg(m_mgm_handle),
1139
ndb_mgm_get_latest_error_line(m_mgm_handle)
1144
* We will not be able to get a new port unless
1145
* the m_mgm_handle is connected. Note that not
1146
* being connected is an ok state, just continue
1147
* until it is able to connect. Continue using the
1148
* old port until we can connect again and get a
1155
if(t->isConnected())
1167
// Starts the client-connect thread with entry point run_start_clients_C,
// thread name "ndb_start_clients" and low priority. The run flag is
// cleared again when thread creation fails.
// NOTE(review): some NdbThread_Create arguments and the return statements
// are not visible in this view.
TransporterRegistry::start_clients()
1169
// The flag is set before the thread exists so that its loop is entered.
m_run_start_clients_thread= true;
1170
m_start_clients_thread= NdbThread_Create(run_start_clients_C,
1173
"ndb_start_clients",
1174
NDB_THREAD_PRIO_LOW);
1175
if (m_start_clients_thread == 0) {
1176
m_run_start_clients_thread= false;
1183
// Stops the client-connect thread if it was started: clears the run flag,
// waits for the thread to finish and destroys the thread object.
// NOTE(review): the declaration of status and the return statement are not
// visible in this view.
TransporterRegistry::stop_clients()
1185
if (m_start_clients_thread) {
1186
m_run_start_clients_thread= false;
1188
NdbThread_WaitFor(m_start_clients_thread, &status);
1189
NdbThread_Destroy(&m_start_clients_thread);
1195
// Adds an (interface, service port) pair for remoteNodeId to
// m_transporter_interface unless an equivalent entry already exists.
// An existing entry matches when its port equals s_port, its port is not 0,
// and the interface names are equal or both null.
// NOTE(review): the remaining parameters (interf, s_port), the handling of
// an empty interf string and the skip statement for non-matching ports are
// not visible in this view.
TransporterRegistry::add_transporter_interface(NodeId remoteNodeId,
1199
DBUG_ENTER("TransporterRegistry::add_transporter_interface");
1200
DBUG_PRINT("enter",("interface=%s, s_port= %d", interf, s_port));
1201
if (interf && strlen(interf) == 0)
1204
for (unsigned i= 0; i < m_transporter_interface.size(); i++)
1206
Transporter_interface &tmp= m_transporter_interface[i];
1207
if (s_port != tmp.m_s_service_port || tmp.m_s_service_port==0)
1209
if (interf != 0 && tmp.m_interface != 0 &&
1210
strcmp(interf, tmp.m_interface) == 0)
1212
DBUG_VOID_RETURN; // found match, no need to insert
1214
if (interf == 0 && tmp.m_interface == 0)
1216
DBUG_VOID_RETURN; // found match, no need to insert
1219
// No match found: append a new entry.
Transporter_interface t;
1220
t.m_remote_nodeId= remoteNodeId;
1221
t.m_s_service_port= s_port;
1222
t.m_interface= interf;
1223
m_transporter_interface.push_back(t);
1224
DBUG_PRINT("exit",("interface and port added"));
1229
// Creates one TransporterService per entry of m_transporter_interface and
// registers it with socket_server. A negative m_s_service_port marks a
// dynamic port; for those a second setup attempt is made when the first
// one fails.
// NOTE(review): the statement that changes port before the second attempt
// and the return statements are not visible in this view.
TransporterRegistry::start_service(SocketServer& socket_server)
1231
DBUG_ENTER("TransporterRegistry::start_service");
1232
// Services cannot be started before the local node id is known.
if (m_transporter_interface.size() > 0 && !nodeIdSpecified)
1234
g_eventLogger.error("TransporterRegistry::startReceiving: localNodeId not specified");
1238
for (unsigned i= 0; i < m_transporter_interface.size(); i++)
1240
Transporter_interface &t= m_transporter_interface[i];
1242
unsigned short port= (unsigned short)t.m_s_service_port;
1243
if(t.m_s_service_port<0)
1244
port= -t.m_s_service_port; // is a dynamic port
1245
TransporterService *transporter_service =
1246
new TransporterService(new SocketAuthSimple("ndbd", "ndbd passwd"));
1247
if(!socket_server.setup(transporter_service,
1248
&port, t.m_interface))
1250
DBUG_PRINT("info", ("Trying new port"));
1252
// A fixed (positive) port is not retried.
if(t.m_s_service_port>0
1253
|| !socket_server.setup(transporter_service,
1254
&port, t.m_interface))
1257
* If it wasn't a dynamically allocated port, or
1258
* our attempts at getting a new dynamic port failed
1260
g_eventLogger.error("Unable to setup transporter service port: %s:%d!\n"
1261
"Please check if the port is already used,\n"
1262
"(perhaps the node is already running)",
1263
t.m_interface ? t.m_interface : "*", t.m_s_service_port);
1264
delete transporter_service;
1268
// Store the port that was bound, keeping the sign convention.
t.m_s_service_port= (t.m_s_service_port<=0)?-port:port; // -`ve if dynamic
1269
DBUG_PRINT("info", ("t.m_s_service_port = %d",t.m_s_service_port));
1270
transporter_service->setTransporterRegistry(this);
1275
#ifdef NDB_SHM_TRANSPORTER
1278
shm_sig_handler(int signo)
1285
// Prepares the calling thread for receiving. When SHM transporters are
// compiled in and a signal number is configured, it records the process
// id, changes the SHM signal mask and installs shm_sig_handler for
// g_ndb_shm_signum.
// NOTE(review): the remaining sigaction fields, the declaration of ret and
// the failure test are not visible in this view.
TransporterRegistry::startReceiving()
1287
DBUG_ENTER("TransporterRegistry::startReceiving");
1289
#ifdef NDB_SHM_TRANSPORTER
1290
m_shm_own_pid = getpid();
1291
if (g_ndb_shm_signum)
1293
DBUG_PRINT("info",("Install signal handler for signum %d",
1295
struct sigaction sa;
1296
NdbThread_set_shm_sigmask(FALSE);
1297
sigemptyset(&sa.sa_mask);
1298
sa.sa_handler = shm_sig_handler;
1301
// Retry sigaction() for as long as it fails with EINTR.
while((ret = sigaction(g_ndb_shm_signum, &sa, 0)) == -1 && errno == EINTR);
1304
DBUG_PRINT("error",("Install failed"));
1305
g_eventLogger.error("Failed to install signal handler for"
1306
" SHM transporter, signum %d, errno: %d (%s)",
1307
g_ndb_shm_signum, errno, strerror(errno));
1310
#endif // NDB_SHM_TRANSPORTER
1315
TransporterRegistry::stopReceiving(){
1317
* Disconnect all transporters, this includes detach from remote node
1318
* and since that must be done from the same process that called attach
1319
* it's done here in the receive thread
1325
TransporterRegistry::startSending(){
1329
TransporterRegistry::stopSending(){
1332
// Writes a multi-line, human-readable dump of the fields of sh to out.
// NOTE(review): the return statement is not visible in this view.
NdbOut & operator <<(NdbOut & out, SignalHeader & sh){
1333
out << "-- Signal Header --" << endl;
1334
out << "theLength: " << sh.theLength << endl;
1335
out << "gsn: " << sh.theVerId_signalNumber << endl;
1336
out << "recBlockNo: " << sh.theReceiversBlockNumber << endl;
1337
out << "sendBlockRef: " << sh.theSendersBlockRef << endl;
1338
out << "sendersSig: " << sh.theSendersSignalId << endl;
1339
out << "theSignalId: " << sh.theSignalId << endl;
1340
// theTrace is cast to int before it is printed.
out << "trace: " << (int)sh.theTrace << endl;
1345
// Returns the transporter registered for nodeId. No range check on nodeId
// is performed in the visible code.
TransporterRegistry::get_transporter(NodeId nodeId) {
1346
return theTransporters[nodeId];
1349
// Connects the transporter towards the management node identified by *h.
// The handle is converted into a transporter socket by connect_ndb_mgmd()
// and that socket is passed to the transporter's connect_client().
// NOTE(review): the conditions guarding the two error log calls and their
// return statements are not visible in this view.
bool TransporterRegistry::connect_client(NdbMgmHandle *h)
1351
DBUG_ENTER("TransporterRegistry::connect_client(NdbMgmHandle)");
1353
Uint32 mgm_nodeid= ndb_mgm_get_mgmd_nodeid(*h);
1357
g_eventLogger.error("%s: %d", __FILE__, __LINE__);
1360
Transporter * t = theTransporters[mgm_nodeid];
1363
g_eventLogger.error("%s: %d", __FILE__, __LINE__);
1366
DBUG_RETURN(t->connect_client(connect_ndb_mgmd(h)));
1370
* Given a connected NdbMgmHandle, turns it into a transporter
1371
* and returns the socket.
1373
// Turns a connected management handle into a transporter socket.
// First reports every dynamic service port (negative m_s_service_port) to
// the management server, then converts the handle with
// ndb_mgm_convert_to_transporter(). Returns NDB_INVALID_SOCKET when h or
// *h is null or when reporting a port fails.
// NOTE(review): some arguments of the set-parameter call and the final
// return statement are not visible in this view.
NDB_SOCKET_TYPE TransporterRegistry::connect_ndb_mgmd(NdbMgmHandle *h)
1375
struct ndb_mgm_reply mgm_reply;
1377
if ( h==NULL || *h == NULL )
1379
g_eventLogger.error("%s: %d", __FILE__, __LINE__);
1380
return NDB_INVALID_SOCKET;
1383
for(unsigned int i=0;i < m_transporter_interface.size();i++)
1384
if (m_transporter_interface[i].m_s_service_port < 0
1385
&& ndb_mgm_set_connection_int_parameter(*h,
1387
m_transporter_interface[i].m_remote_nodeId,
1388
CFG_CONNECTION_SERVER_PORT,
1389
m_transporter_interface[i].m_s_service_port,
1392
g_eventLogger.error("Error: %s: %d",
1393
ndb_mgm_get_latest_error_desc(*h),
1394
ndb_mgm_get_latest_error(*h));
1395
g_eventLogger.error("%s: %d", __FILE__, __LINE__);
1396
// The handle is destroyed on this failure path.
ndb_mgm_destroy_handle(h);
1397
return NDB_INVALID_SOCKET;
1401
* convert_to_transporter also disposes of the handle (i.e. we don't leak
1404
NDB_SOCKET_TYPE sockfd= ndb_mgm_convert_to_transporter(h);
1405
if ( sockfd == NDB_INVALID_SOCKET)
1407
// NOTE(review): *h is read here after the conversion call; confirm the
// handle is still valid when the conversion fails.
g_eventLogger.error("Error: %s: %d",
1408
ndb_mgm_get_latest_error_desc(*h),
1409
ndb_mgm_get_latest_error(*h));
1410
g_eventLogger.error("%s: %d", __FILE__, __LINE__);
1411
ndb_mgm_destroy_handle(h);
1417
* Given a SocketClient, creates a NdbMgmHandle, turns it into a transporter
1418
* and returns the socket.
1420
// Creates a management handle for the server that sc points at, connects
// it and converts it into a transporter socket through the NdbMgmHandle
// overload. Returns NDB_INVALID_SOCKET when a step fails; the handle is
// destroyed when the connect fails.
// NOTE(review): the check after ndb_mgm_create_handle() and the
// declaration of cs are not visible in this view.
NDB_SOCKET_TYPE TransporterRegistry::connect_ndb_mgmd(SocketClient *sc)
1422
NdbMgmHandle h= ndb_mgm_create_handle();
1426
return NDB_INVALID_SOCKET;
1434
// The connect string is built as "<server name>:<port>".
cs.assfmt("%s:%u",sc->get_server_name(),sc->get_port());
1435
ndb_mgm_set_connectstring(h, cs.c_str());
1438
if(ndb_mgm_connect(h, 0, 0, 0)<0)
1440
ndb_mgm_destroy_handle(&h);
1441
return NDB_INVALID_SOCKET;
1444
return connect_ndb_mgmd(&h);
1447
// Explicit instantiation of Vector for Transporter_interface, the element
// type of m_transporter_interface.
template class Vector<TransporterRegistry::Transporter_interface>;