~ubuntu-branches/ubuntu/precise/mysql-5.1/precise

« back to all changes in this revision

Viewing changes to storage/ndb/src/common/transporter/TransporterRegistry.cpp

  • Committer: Bazaar Package Importer
  • Author(s): Norbert Tretkowski
  • Date: 2010-03-17 14:56:02 UTC
  • Revision ID: james.westby@ubuntu.com-20100317145602-x7e30l1b2sb5s6w6
Tags: upstream-5.1.45
Import upstream version 5.1.45

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/* Copyright (C) 2003 MySQL AB
 
2
 
 
3
   This program is free software; you can redistribute it and/or modify
 
4
   it under the terms of the GNU General Public License as published by
 
5
   the Free Software Foundation; version 2 of the License.
 
6
 
 
7
   This program is distributed in the hope that it will be useful,
 
8
   but WITHOUT ANY WARRANTY; without even the implied warranty of
 
9
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
10
   GNU General Public License for more details.
 
11
 
 
12
   You should have received a copy of the GNU General Public License
 
13
   along with this program; if not, write to the Free Software
 
14
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
 
15
 
 
16
#include <ndb_global.h>
 
17
#include <my_pthread.h>
 
18
 
 
19
#include <TransporterRegistry.hpp>
 
20
#include "TransporterInternalDefinitions.hpp"
 
21
 
 
22
#include "Transporter.hpp"
 
23
#include <SocketAuthenticator.hpp>
 
24
 
 
25
#ifdef NDB_TCP_TRANSPORTER
 
26
#include "TCP_Transporter.hpp"
 
27
#endif
 
28
 
 
29
#ifdef NDB_SCI_TRANSPORTER
 
30
#include "SCI_Transporter.hpp"
 
31
#endif
 
32
 
 
33
#ifdef NDB_SHM_TRANSPORTER
 
34
#include "SHM_Transporter.hpp"
 
35
extern int g_ndb_shm_signum;
 
36
#endif
 
37
 
 
38
#include "TransporterCallback.hpp"
 
39
#include "NdbOut.hpp"
 
40
#include <NdbSleep.h>
 
41
#include <NdbTick.h>
 
42
#include <InputStream.hpp>
 
43
#include <OutputStream.hpp>
 
44
 
 
45
#include <mgmapi/mgmapi.h>
 
46
#include <mgmapi_internal.h>
 
47
#include <mgmapi/mgmapi_debug.h>
 
48
 
 
49
#include <EventLogger.hpp>
 
50
extern EventLogger g_eventLogger;
 
51
 
 
52
struct in_addr
 
53
TransporterRegistry::get_connect_address(NodeId node_id) const
 
54
{
 
55
  return theTransporters[node_id]->m_connect_address;
 
56
}
 
57
 
 
58
/**
 * Handle a freshly accepted socket.
 *
 * The socket is authenticated (only when an authenticator is configured)
 * and then handed to the transporter registry.  If either step fails the
 * socket is closed here.  In every case 0 is returned, i.e. no Session
 * object is ever created by this service.
 */
SocketServer::Session * TransporterService::newSession(NDB_SOCKET_TYPE sockfd)
{
  DBUG_ENTER("SocketServer::Session * TransporterService::newSession");

  // Short-circuit order matters: connect_server() must not be attempted
  // for a socket that failed authentication.
  const bool authenticated = !m_auth || m_auth->server_authenticate(sockfd);
  const bool handedOver =
    authenticated && m_transporter_registry->connect_server(sockfd);

  if (!handedOver)
  {
    NDB_CLOSE_SOCKET(sockfd);
  }

  DBUG_RETURN(0);
}
 
74
 
 
75
TransporterRegistry::TransporterRegistry(void * callback,
 
76
                                         unsigned _maxTransporters,
 
77
                                         unsigned sizeOfLongSignalMemory) :
 
78
  m_mgm_handle(0),
 
79
  m_transp_count(0)
 
80
{
 
81
  DBUG_ENTER("TransporterRegistry::TransporterRegistry");
 
82
 
 
83
  nodeIdSpecified = false;
 
84
  maxTransporters = _maxTransporters;
 
85
  sendCounter = 1;
 
86
  
 
87
  callbackObj=callback;
 
88
 
 
89
  theTCPTransporters  = new TCP_Transporter * [maxTransporters];
 
90
  theSCITransporters  = new SCI_Transporter * [maxTransporters];
 
91
  theSHMTransporters  = new SHM_Transporter * [maxTransporters];
 
92
  theTransporterTypes = new TransporterType   [maxTransporters];
 
93
  theTransporters     = new Transporter     * [maxTransporters];
 
94
  performStates       = new PerformState      [maxTransporters];
 
95
  ioStates            = new IOState           [maxTransporters]; 
 
96
  
 
97
  // Initialize member variables
 
98
  nTransporters    = 0;
 
99
  nTCPTransporters = 0;
 
100
  nSCITransporters = 0;
 
101
  nSHMTransporters = 0;
 
102
  
 
103
  // Initialize the transporter arrays
 
104
  for (unsigned i=0; i<maxTransporters; i++) {
 
105
    theTCPTransporters[i] = NULL;
 
106
    theSCITransporters[i] = NULL;
 
107
    theSHMTransporters[i] = NULL;
 
108
    theTransporters[i]    = NULL;
 
109
    performStates[i]      = DISCONNECTED;
 
110
    ioStates[i]           = NoHalt;
 
111
  }
 
112
 
 
113
  DBUG_VOID_RETURN;
 
114
}
 
115
 
 
116
void TransporterRegistry::set_mgm_handle(NdbMgmHandle h)
 
117
{
 
118
  DBUG_ENTER("TransporterRegistry::set_mgm_handle");
 
119
  if (m_mgm_handle)
 
120
    ndb_mgm_destroy_handle(&m_mgm_handle);
 
121
  m_mgm_handle= h;
 
122
  ndb_mgm_set_timeout(m_mgm_handle, 5000);
 
123
#ifndef DBUG_OFF
 
124
  if (h)
 
125
  {
 
126
    char buf[256];
 
127
    DBUG_PRINT("info",("handle set with connectstring: %s",
 
128
                       ndb_mgm_get_connectstring(h,buf, sizeof(buf))));
 
129
  }
 
130
  else
 
131
  {
 
132
    DBUG_PRINT("info",("handle set to NULL"));
 
133
  }
 
134
#endif
 
135
  DBUG_VOID_RETURN;
 
136
}
 
137
 
 
138
/**
 * Tear down the registry: remove (disconnect and delete) every transporter,
 * release the per-node arrays allocated by the constructor, and destroy the
 * management handle if one is held.
 */
TransporterRegistry::~TransporterRegistry()
{
  DBUG_ENTER("TransporterRegistry::~TransporterRegistry");

  // Must run before the arrays are freed: removeAll() walks them.
  removeAll();

  delete[] theTCPTransporters;
  delete[] theSCITransporters;
  delete[] theSHMTransporters;
  delete[] theTransporterTypes;
  delete[] theTransporters;
  delete[] performStates;
  delete[] ioStates;

  if (m_mgm_handle)
  {
    ndb_mgm_destroy_handle(&m_mgm_handle);
  }

  DBUG_VOID_RETURN;
}
 
157
 
 
158
void
 
159
TransporterRegistry::removeAll(){
 
160
  for(unsigned i = 0; i<maxTransporters; i++){
 
161
    if(theTransporters[i] != NULL)
 
162
      removeTransporter(theTransporters[i]->getRemoteNodeId());
 
163
  }
 
164
}
 
165
 
 
166
void
 
167
TransporterRegistry::disconnectAll(){
 
168
  for(unsigned i = 0; i<maxTransporters; i++){
 
169
    if(theTransporters[i] != NULL)
 
170
      theTransporters[i]->doDisconnect();
 
171
  }
 
172
}
 
173
 
 
174
bool
 
175
TransporterRegistry::init(NodeId nodeId) {
 
176
  DBUG_ENTER("TransporterRegistry::init");
 
177
  nodeIdSpecified = true;
 
178
  localNodeId = nodeId;
 
179
  
 
180
  DEBUG("TransporterRegistry started node: " << localNodeId);
 
181
  
 
182
  DBUG_RETURN(true);
 
183
}
 
184
 
 
185
bool
 
186
TransporterRegistry::connect_server(NDB_SOCKET_TYPE sockfd)
 
187
{
 
188
  DBUG_ENTER("TransporterRegistry::connect_server");
 
189
 
 
190
  // read node id from client
 
191
  // read transporter type
 
192
  int nodeId, remote_transporter_type= -1;
 
193
  SocketInputStream s_input(sockfd);
 
194
  char buf[256];
 
195
  if (s_input.gets(buf, 256) == 0) {
 
196
    DBUG_PRINT("error", ("Could not get node id from client"));
 
197
    DBUG_RETURN(false);
 
198
  }
 
199
  int r= sscanf(buf, "%d %d", &nodeId, &remote_transporter_type);
 
200
  switch (r) {
 
201
  case 2:
 
202
    break;
 
203
  case 1:
 
204
    // we're running version prior to 4.1.9
 
205
    // ok, but with no checks on transporter configuration compatability
 
206
    break;
 
207
  default:
 
208
    DBUG_PRINT("error", ("Error in node id from client"));
 
209
    DBUG_RETURN(false);
 
210
  }
 
211
 
 
212
  DBUG_PRINT("info", ("nodeId=%d remote_transporter_type=%d",
 
213
                      nodeId,remote_transporter_type));
 
214
 
 
215
  //check that nodeid is valid and that there is an allocated transporter
 
216
  if ( nodeId < 0 || nodeId >= (int)maxTransporters) {
 
217
    DBUG_PRINT("error", ("Node id out of range from client"));
 
218
    DBUG_RETURN(false);
 
219
  }
 
220
  if (theTransporters[nodeId] == 0) {
 
221
      DBUG_PRINT("error", ("No transporter for this node id from client"));
 
222
      DBUG_RETURN(false);
 
223
  }
 
224
 
 
225
  //check that the transporter should be connected
 
226
  if (performStates[nodeId] != TransporterRegistry::CONNECTING) {
 
227
    DBUG_PRINT("error", ("Transporter in wrong state for this node id from client"));
 
228
    DBUG_RETURN(false);
 
229
  }
 
230
 
 
231
  Transporter *t= theTransporters[nodeId];
 
232
 
 
233
  // send info about own id (just as response to acknowledge connection)
 
234
  // send info on own transporter type
 
235
  SocketOutputStream s_output(sockfd);
 
236
  s_output.println("%d %d", t->getLocalNodeId(), t->m_type);
 
237
 
 
238
  if (remote_transporter_type != -1)
 
239
  {
 
240
    if (remote_transporter_type != t->m_type)
 
241
    {
 
242
      DBUG_PRINT("error", ("Transporter types mismatch this=%d remote=%d",
 
243
                           t->m_type, remote_transporter_type));
 
244
      g_eventLogger.error("Incompatible configuration: Transporter type "
 
245
                          "mismatch with node %d", nodeId);
 
246
 
 
247
      // wait for socket close for 1 second to let message arrive at client
 
248
      {
 
249
        fd_set a_set;
 
250
        FD_ZERO(&a_set);
 
251
        FD_SET(sockfd, &a_set);
 
252
        struct timeval timeout;
 
253
        timeout.tv_sec  = 1; timeout.tv_usec = 0;
 
254
        select(sockfd+1, &a_set, 0, 0, &timeout);
 
255
      }
 
256
      DBUG_RETURN(false);
 
257
    }
 
258
  }
 
259
  else if (t->m_type == tt_SHM_TRANSPORTER)
 
260
  {
 
261
    g_eventLogger.warning("Unable to verify transporter compatability with node %d", nodeId);
 
262
  }
 
263
 
 
264
  // setup transporter (transporter responsible for closing sockfd)
 
265
  t->connect_server(sockfd);
 
266
 
 
267
  DBUG_RETURN(true);
 
268
}
 
269
 
 
270
bool
 
271
TransporterRegistry::createTCPTransporter(TransporterConfiguration *config) {
 
272
#ifdef NDB_TCP_TRANSPORTER
 
273
 
 
274
  if(!nodeIdSpecified){
 
275
    init(config->localNodeId);
 
276
  }
 
277
  
 
278
  if(config->localNodeId != localNodeId) 
 
279
    return false;
 
280
  
 
281
  if(theTransporters[config->remoteNodeId] != NULL)
 
282
    return false;
 
283
   
 
284
  TCP_Transporter * t = new TCP_Transporter(*this,
 
285
                                            config->tcp.sendBufferSize,
 
286
                                            config->tcp.maxReceiveSize,
 
287
                                            config->localHostName,
 
288
                                            config->remoteHostName,
 
289
                                            config->s_port,
 
290
                                            config->isMgmConnection,
 
291
                                            localNodeId,
 
292
                                            config->remoteNodeId,
 
293
                                            config->serverNodeId,
 
294
                                            config->checksum,
 
295
                                            config->signalId);
 
296
  if (t == NULL) 
 
297
    return false;
 
298
  else if (!t->initTransporter()) {
 
299
    delete t;
 
300
    return false;
 
301
  }
 
302
 
 
303
  // Put the transporter in the transporter arrays
 
304
  theTCPTransporters[nTCPTransporters]      = t;
 
305
  theTransporters[t->getRemoteNodeId()]     = t;
 
306
  theTransporterTypes[t->getRemoteNodeId()] = tt_TCP_TRANSPORTER;
 
307
  performStates[t->getRemoteNodeId()]       = DISCONNECTED;
 
308
  nTransporters++;
 
309
  nTCPTransporters++;
 
310
 
 
311
  return true;
 
312
#else
 
313
  return false;
 
314
#endif
 
315
}
 
316
 
 
317
bool
 
318
TransporterRegistry::createSCITransporter(TransporterConfiguration *config) {
 
319
#ifdef NDB_SCI_TRANSPORTER
 
320
 
 
321
  if(!SCI_Transporter::initSCI())
 
322
    abort();
 
323
  
 
324
  if(!nodeIdSpecified){
 
325
    init(config->localNodeId);
 
326
  }
 
327
  
 
328
  if(config->localNodeId != localNodeId)
 
329
    return false;
 
330
 
 
331
  if(theTransporters[config->remoteNodeId] != NULL)
 
332
    return false;
 
333
 
 
334
  SCI_Transporter * t = new SCI_Transporter(*this,
 
335
                                            config->localHostName,
 
336
                                            config->remoteHostName,
 
337
                                            config->s_port,
 
338
                                            config->isMgmConnection,
 
339
                                            config->sci.sendLimit, 
 
340
                                            config->sci.bufferSize,
 
341
                                            config->sci.nLocalAdapters,
 
342
                                            config->sci.remoteSciNodeId0,
 
343
                                            config->sci.remoteSciNodeId1,
 
344
                                            localNodeId,
 
345
                                            config->remoteNodeId,
 
346
                                            config->serverNodeId,
 
347
                                            config->checksum,
 
348
                                            config->signalId);
 
349
  
 
350
  if (t == NULL) 
 
351
    return false;
 
352
  else if (!t->initTransporter()) {
 
353
    delete t;
 
354
    return false;
 
355
  }
 
356
  // Put the transporter in the transporter arrays
 
357
  theSCITransporters[nSCITransporters]      = t;
 
358
  theTransporters[t->getRemoteNodeId()]     = t;
 
359
  theTransporterTypes[t->getRemoteNodeId()] = tt_SCI_TRANSPORTER;
 
360
  performStates[t->getRemoteNodeId()]       = DISCONNECTED;
 
361
  nTransporters++;
 
362
  nSCITransporters++;
 
363
  
 
364
  return true;
 
365
#else
 
366
  return false;
 
367
#endif
 
368
}
 
369
 
 
370
bool
 
371
TransporterRegistry::createSHMTransporter(TransporterConfiguration *config) {
 
372
  DBUG_ENTER("TransporterRegistry::createTransporter SHM");
 
373
#ifdef NDB_SHM_TRANSPORTER
 
374
  if(!nodeIdSpecified){
 
375
    init(config->localNodeId);
 
376
  }
 
377
  
 
378
  if(config->localNodeId != localNodeId)
 
379
    return false;
 
380
  
 
381
  if (!g_ndb_shm_signum) {
 
382
    g_ndb_shm_signum= config->shm.signum;
 
383
    DBUG_PRINT("info",("Block signum %d",g_ndb_shm_signum));
 
384
    /**
 
385
     * Make sure to block g_ndb_shm_signum
 
386
     *   TransporterRegistry::init is run from "main" thread
 
387
     */
 
388
    NdbThread_set_shm_sigmask(TRUE);
 
389
  }
 
390
 
 
391
  if(config->shm.signum != g_ndb_shm_signum)
 
392
    return false;
 
393
  
 
394
  if(theTransporters[config->remoteNodeId] != NULL)
 
395
    return false;
 
396
 
 
397
  SHM_Transporter * t = new SHM_Transporter(*this,
 
398
                                            config->localHostName,
 
399
                                            config->remoteHostName,
 
400
                                            config->s_port,
 
401
                                            config->isMgmConnection,
 
402
                                            localNodeId,
 
403
                                            config->remoteNodeId,
 
404
                                            config->serverNodeId,
 
405
                                            config->checksum,
 
406
                                            config->signalId,
 
407
                                            config->shm.shmKey,
 
408
                                            config->shm.shmSize
 
409
                                            );
 
410
  if (t == NULL)
 
411
    return false;
 
412
  else if (!t->initTransporter()) {
 
413
    delete t;
 
414
    return false;
 
415
  }
 
416
  // Put the transporter in the transporter arrays
 
417
  theSHMTransporters[nSHMTransporters]      = t;
 
418
  theTransporters[t->getRemoteNodeId()]     = t;
 
419
  theTransporterTypes[t->getRemoteNodeId()] = tt_SHM_TRANSPORTER;
 
420
  performStates[t->getRemoteNodeId()]       = DISCONNECTED;
 
421
  
 
422
  nTransporters++;
 
423
  nSHMTransporters++;
 
424
 
 
425
  DBUG_RETURN(true);
 
426
#else
 
427
  DBUG_RETURN(false);
 
428
#endif
 
429
}
 
430
 
 
431
 
 
432
void
 
433
TransporterRegistry::removeTransporter(NodeId nodeId) {
 
434
 
 
435
  DEBUG("Removing transporter from " << localNodeId
 
436
        << " to " << nodeId);
 
437
  
 
438
  if(theTransporters[nodeId] == NULL)
 
439
    return;
 
440
  
 
441
  theTransporters[nodeId]->doDisconnect();
 
442
  
 
443
  const TransporterType type = theTransporterTypes[nodeId];
 
444
 
 
445
  int ind = 0;
 
446
  switch(type){
 
447
  case tt_TCP_TRANSPORTER:
 
448
#ifdef NDB_TCP_TRANSPORTER
 
449
    for(; ind < nTCPTransporters; ind++)
 
450
      if(theTCPTransporters[ind]->getRemoteNodeId() == nodeId)
 
451
        break;
 
452
    ind++;
 
453
    for(; ind<nTCPTransporters; ind++)
 
454
      theTCPTransporters[ind-1] = theTCPTransporters[ind];
 
455
    nTCPTransporters --;
 
456
#endif
 
457
    break;
 
458
  case tt_SCI_TRANSPORTER:
 
459
#ifdef NDB_SCI_TRANSPORTER
 
460
    for(; ind < nSCITransporters; ind++)
 
461
      if(theSCITransporters[ind]->getRemoteNodeId() == nodeId)
 
462
        break;
 
463
    ind++;
 
464
    for(; ind<nSCITransporters; ind++)
 
465
      theSCITransporters[ind-1] = theSCITransporters[ind];
 
466
    nSCITransporters --;
 
467
#endif
 
468
    break;
 
469
  case tt_SHM_TRANSPORTER:
 
470
#ifdef NDB_SHM_TRANSPORTER
 
471
    for(; ind < nSHMTransporters; ind++)
 
472
      if(theSHMTransporters[ind]->getRemoteNodeId() == nodeId)
 
473
        break;
 
474
    ind++;
 
475
    for(; ind<nSHMTransporters; ind++)
 
476
      theSHMTransporters[ind-1] = theSHMTransporters[ind];
 
477
    nSHMTransporters --;
 
478
#endif
 
479
    break;
 
480
  }
 
481
 
 
482
  nTransporters--;
 
483
 
 
484
  // Delete the transporter and remove it from theTransporters array
 
485
  delete theTransporters[nodeId];
 
486
  theTransporters[nodeId] = NULL;        
 
487
}
 
488
 
 
489
/**
 * Ask the transporter for a node how much send buffer it has free.
 *
 * @param node  remote node id; used as index without a range check.
 * @return the transporter's get_free_buffer() value, or 0 if the node has
 *         no transporter.
 */
Uint32
TransporterRegistry::get_free_buffer(Uint32 node) const
{
  Transporter * const t = theTransporters[node];
  return likely(t != 0) ? t->get_free_buffer() : 0;
}
 
499
 
 
500
 
 
501
SendStatus
 
502
TransporterRegistry::prepareSend(const SignalHeader * const signalHeader, 
 
503
                                 Uint8 prio,
 
504
                                 const Uint32 * const signalData,
 
505
                                 NodeId nodeId, 
 
506
                                 const LinearSectionPtr ptr[3]){
 
507
 
 
508
 
 
509
  Transporter *t = theTransporters[nodeId];
 
510
  if(t != NULL && 
 
511
     (((ioStates[nodeId] != HaltOutput) && (ioStates[nodeId] != HaltIO)) || 
 
512
      ((signalHeader->theReceiversBlockNumber == 252) ||
 
513
       (signalHeader->theReceiversBlockNumber == 4002)))) {
 
514
         
 
515
    if(t->isConnected()){
 
516
      Uint32 lenBytes = t->m_packer.getMessageLength(signalHeader, ptr);
 
517
      if(lenBytes <= MAX_MESSAGE_SIZE){
 
518
        Uint32 * insertPtr = t->getWritePtr(lenBytes, prio);
 
519
        if(insertPtr != 0){
 
520
          t->m_packer.pack(insertPtr, prio, signalHeader, signalData, ptr);
 
521
          t->updateWritePtr(lenBytes, prio);
 
522
          return SEND_OK;
 
523
        }
 
524
 
 
525
        int sleepTime = 2;      
 
526
 
 
527
        /**
 
528
         * @note: on linux/i386 the granularity is 10ms
 
529
         *        so sleepTime = 2 generates a 10 ms sleep.
 
530
         */
 
531
        for(int i = 0; i<50; i++){
 
532
          if((nSHMTransporters+nSCITransporters) == 0)
 
533
            NdbSleep_MilliSleep(sleepTime); 
 
534
          insertPtr = t->getWritePtr(lenBytes, prio);
 
535
          if(insertPtr != 0){
 
536
            t->m_packer.pack(insertPtr, prio, signalHeader, signalData, ptr);
 
537
            t->updateWritePtr(lenBytes, prio);
 
538
            break;
 
539
          }
 
540
        }
 
541
        
 
542
        if(insertPtr != 0){
 
543
          /**
 
544
           * Send buffer full, but resend works
 
545
           */
 
546
          reportError(callbackObj, nodeId, TE_SEND_BUFFER_FULL);
 
547
          return SEND_OK;
 
548
        }
 
549
        
 
550
        WARNING("Signal to " << nodeId << " lost(buffer)");
 
551
        reportError(callbackObj, nodeId, TE_SIGNAL_LOST_SEND_BUFFER_FULL);
 
552
        return SEND_BUFFER_FULL;
 
553
      } else {
 
554
        return SEND_MESSAGE_TOO_BIG;
 
555
      }
 
556
    } else {
 
557
      DEBUG("Signal to " << nodeId << " lost(disconnect) ");
 
558
      return SEND_DISCONNECTED;
 
559
    }
 
560
  } else {
 
561
    DEBUG("Discarding message to block: " 
 
562
          << signalHeader->theReceiversBlockNumber 
 
563
          << " node: " << nodeId);
 
564
    
 
565
    if(t == NULL)
 
566
      return SEND_UNKNOWN_NODE;
 
567
    
 
568
    return SEND_BLOCKED;
 
569
  }
 
570
}
 
571
 
 
572
SendStatus
 
573
TransporterRegistry::prepareSend(const SignalHeader * const signalHeader, 
 
574
                                 Uint8 prio,
 
575
                                 const Uint32 * const signalData,
 
576
                                 NodeId nodeId, 
 
577
                                 class SectionSegmentPool & thePool,
 
578
                                 const SegmentedSectionPtr ptr[3]){
 
579
  
 
580
 
 
581
  Transporter *t = theTransporters[nodeId];
 
582
  if(t != NULL && 
 
583
     (((ioStates[nodeId] != HaltOutput) && (ioStates[nodeId] != HaltIO)) || 
 
584
      ((signalHeader->theReceiversBlockNumber == 252)|| 
 
585
       (signalHeader->theReceiversBlockNumber == 4002)))) {
 
586
    
 
587
    if(t->isConnected()){
 
588
      Uint32 lenBytes = t->m_packer.getMessageLength(signalHeader, ptr);
 
589
      if(lenBytes <= MAX_MESSAGE_SIZE){
 
590
        Uint32 * insertPtr = t->getWritePtr(lenBytes, prio);
 
591
        if(insertPtr != 0){
 
592
          t->m_packer.pack(insertPtr, prio, signalHeader, signalData, thePool, ptr);
 
593
          t->updateWritePtr(lenBytes, prio);
 
594
          return SEND_OK;
 
595
        }
 
596
        
 
597
        
 
598
        /**
 
599
         * @note: on linux/i386 the granularity is 10ms
 
600
         *        so sleepTime = 2 generates a 10 ms sleep.
 
601
         */
 
602
        int sleepTime = 2;
 
603
        for(int i = 0; i<50; i++){
 
604
          if((nSHMTransporters+nSCITransporters) == 0)
 
605
            NdbSleep_MilliSleep(sleepTime); 
 
606
          insertPtr = t->getWritePtr(lenBytes, prio);
 
607
          if(insertPtr != 0){
 
608
            t->m_packer.pack(insertPtr, prio, signalHeader, signalData, thePool, ptr);
 
609
            t->updateWritePtr(lenBytes, prio);
 
610
            break;
 
611
          }
 
612
        }
 
613
        
 
614
        if(insertPtr != 0){
 
615
          /**
 
616
           * Send buffer full, but resend works
 
617
           */
 
618
          reportError(callbackObj, nodeId, TE_SEND_BUFFER_FULL);
 
619
          return SEND_OK;
 
620
        }
 
621
        
 
622
        WARNING("Signal to " << nodeId << " lost(buffer)");
 
623
        reportError(callbackObj, nodeId, TE_SIGNAL_LOST_SEND_BUFFER_FULL);
 
624
        return SEND_BUFFER_FULL;
 
625
      } else {
 
626
        return SEND_MESSAGE_TOO_BIG;
 
627
      }
 
628
    } else {
 
629
      DEBUG("Signal to " << nodeId << " lost(disconnect) ");
 
630
      return SEND_DISCONNECTED;
 
631
    }
 
632
  } else {
 
633
    DEBUG("Discarding message to block: " 
 
634
          << signalHeader->theReceiversBlockNumber 
 
635
          << " node: " << nodeId);
 
636
    
 
637
    if(t == NULL)
 
638
      return SEND_UNKNOWN_NODE;
 
639
    
 
640
    return SEND_BLOCKED;
 
641
  }
 
642
}
 
643
 
 
644
void
 
645
TransporterRegistry::external_IO(Uint32 timeOutMillis) {
 
646
  //-----------------------------------------------------------
 
647
  // Most of the time we will send the buffers here and then wait
 
648
  // for new signals. Thus we start by sending without timeout
 
649
  // followed by the receive part where we expect to sleep for
 
650
  // a while.
 
651
  //-----------------------------------------------------------
 
652
  if(pollReceive(timeOutMillis)){
 
653
    performReceive();
 
654
  }
 
655
  performSend();
 
656
}
 
657
 
 
658
Uint32
 
659
TransporterRegistry::pollReceive(Uint32 timeOutMillis){
 
660
  Uint32 retVal = 0;
 
661
 
 
662
  if((nSCITransporters) > 0)
 
663
  {
 
664
    timeOutMillis=0;
 
665
  }
 
666
 
 
667
#ifdef NDB_SHM_TRANSPORTER
 
668
  if(nSHMTransporters > 0)
 
669
  {
 
670
    Uint32 res = poll_SHM(0);
 
671
    if(res)
 
672
    {
 
673
      retVal |= res;
 
674
      timeOutMillis = 0;
 
675
    }
 
676
  }
 
677
#endif
 
678
 
 
679
#ifdef NDB_TCP_TRANSPORTER
 
680
  if(nTCPTransporters > 0 || retVal == 0)
 
681
  {
 
682
    retVal |= poll_TCP(timeOutMillis);
 
683
  }
 
684
  else
 
685
    tcpReadSelectReply = 0;
 
686
#endif
 
687
#ifdef NDB_SCI_TRANSPORTER
 
688
  if(nSCITransporters > 0)
 
689
    retVal |= poll_SCI(timeOutMillis);
 
690
#endif
 
691
#ifdef NDB_SHM_TRANSPORTER
 
692
  if(nSHMTransporters > 0 && retVal == 0)
 
693
  {
 
694
    int res = poll_SHM(0);
 
695
    retVal |= res;
 
696
  }
 
697
#endif
 
698
  return retVal;
 
699
}
 
700
 
 
701
 
 
702
#ifdef NDB_SCI_TRANSPORTER
/**
 * Check whether any connected SCI transporter has data to read.
 *
 * @param timeOutMillis  not used; the check never blocks.
 * @return 1 if some connected SCI transporter has data, otherwise 0.
 */
Uint32
TransporterRegistry::poll_SCI(Uint32 timeOutMillis)
{
  for (int idx = 0; idx < nSCITransporters; idx++) {
    SCI_Transporter * const sci = theSCITransporters[idx];
    if (sci->isConnected() && sci->hasDataToRead())
      return 1;
  }
  return 0;
}
#endif
 
716
 
 
717
 
 
718
#ifdef NDB_SHM_TRANSPORTER
static int g_shm_counter = 0;
/**
 * Check whether any connected SHM transporter has data to read.
 *
 * The whole list is scanned up to 100 times in a row before giving up.
 *
 * @param timeOutMillis  not used; the check never sleeps.
 * @return 1 as soon as a connected SHM transporter has data, otherwise 0.
 */
Uint32
TransporterRegistry::poll_SHM(Uint32 timeOutMillis)
{  
  for (int round = 0; round < 100; round++)
  {
    for (int idx = 0; idx < nSHMTransporters; idx++) {
      SHM_Transporter * const shm = theSHMTransporters[idx];
      if (shm->isConnected() && shm->hasDataToRead())
        return 1;
    }
  }
  return 0;
}
#endif
 
737
 
 
738
#ifdef NDB_TCP_TRANSPORTER
/**
 * select() on the sockets of all connected TCP transporters.
 *
 * tcpReadset is rebuilt on every call and left holding the select()
 * result; tcpReadSelectReply receives select()'s return value.  If any
 * transporter already holds received data, the select is done with a zero
 * timeout.
 *
 * @param timeOutMillis  maximum time to block in select().
 * @return non-zero if select() returned non-zero or some transporter
 *         already holds received data.
 */
Uint32 
TransporterRegistry::poll_TCP(Uint32 timeOutMillis)
{
  // Disabled shortcut, kept as in the original code.
  if (false && nTCPTransporters == 0)
  {
    tcpReadSelectReply = 0;
    return 0;
  }

  // The read set is used by select
  FD_ZERO(&tcpReadset);

  bool hasdata = false;
  NDB_SOCKET_TYPE maxSocketValue = -1;

  for (int idx = 0; idx < nTCPTransporters; idx++) {
    TCP_Transporter * const tcp = theTCPTransporters[idx];
    const NodeId nodeId = tcp->getRemoteNodeId();

    // Only connected transporters take part in the select
    if (is_connected(nodeId) && tcp->isConnected()) {
      const NDB_SOCKET_TYPE socket = tcp->getSocket();

      // Track the highest socket value; select needs it
      if (socket > maxSocketValue)
        maxSocketValue = socket;

      FD_SET(socket, &tcpReadset);
    }

    // Data left over from an earlier receive counts as "ready" too
    if (tcp->hasReceiveData())
      hasdata = true;
  }

  if (hasdata)
    timeOutMillis = 0;

  struct timeval timeout;
  timeout.tv_sec  = timeOutMillis / 1000;
  timeout.tv_usec = (timeOutMillis % 1000) * 1000;

  // select wants the highest socket value plus one
  tcpReadSelectReply = select(maxSocketValue + 1, &tcpReadset, 0, 0, &timeout);
  // Disabled diagnostic, kept as in the original code.
  if (false && tcpReadSelectReply == -1 && errno == EINTR)
    g_eventLogger.info("woke-up by signal");

#ifdef NDB_WIN32
  if (tcpReadSelectReply == SOCKET_ERROR)
  {
    NdbSleep_MilliSleep(timeOutMillis);
  }
#endif

  return tcpReadSelectReply || hasdata;
}
#endif
 
798
 
 
799
 
 
800
void
 
801
TransporterRegistry::performReceive()
 
802
{
 
803
#ifdef NDB_TCP_TRANSPORTER
 
804
  for (int i=0; i<nTCPTransporters; i++) 
 
805
  {
 
806
    checkJobBuffer();
 
807
    TCP_Transporter *t = theTCPTransporters[i];
 
808
    const NodeId nodeId = t->getRemoteNodeId();
 
809
    const NDB_SOCKET_TYPE socket    = t->getSocket();
 
810
    if(is_connected(nodeId)){
 
811
      if(t->isConnected())
 
812
      {
 
813
        if (FD_ISSET(socket, &tcpReadset))
 
814
        {
 
815
          t->doReceive();
 
816
        }
 
817
 
 
818
        if (t->hasReceiveData())
 
819
        {
 
820
          Uint32 * ptr;
 
821
          Uint32 sz = t->getReceiveData(&ptr);
 
822
          transporter_recv_from(callbackObj, nodeId);
 
823
          Uint32 szUsed = unpack(ptr, sz, nodeId, ioStates[nodeId]);
 
824
          t->updateReceiveDataPtr(szUsed);
 
825
        }
 
826
      } 
 
827
    }
 
828
  }
 
829
#endif
 
830
  
 
831
#ifdef NDB_SCI_TRANSPORTER
 
832
  //performReceive
 
833
  //do prepareReceive on the SCI transporters  (prepareReceive(t,,,,))
 
834
  for (int i=0; i<nSCITransporters; i++) 
 
835
  {
 
836
    checkJobBuffer();
 
837
    SCI_Transporter  *t = theSCITransporters[i];
 
838
    const NodeId nodeId = t->getRemoteNodeId();
 
839
    if(is_connected(nodeId))
 
840
    {
 
841
      if(t->isConnected() && t->checkConnected())
 
842
      {
 
843
        Uint32 * readPtr, * eodPtr;
 
844
        t->getReceivePtr(&readPtr, &eodPtr);
 
845
        transporter_recv_from(callbackObj, nodeId);
 
846
        Uint32 *newPtr = unpack(readPtr, eodPtr, nodeId, ioStates[nodeId]);
 
847
        t->updateReceivePtr(newPtr);
 
848
      }
 
849
    } 
 
850
  }
 
851
#endif
 
852
#ifdef NDB_SHM_TRANSPORTER
 
853
  for (int i=0; i<nSHMTransporters; i++) 
 
854
  {
 
855
    checkJobBuffer();
 
856
    SHM_Transporter *t = theSHMTransporters[i];
 
857
    const NodeId nodeId = t->getRemoteNodeId();
 
858
    if(is_connected(nodeId)){
 
859
      if(t->isConnected() && t->checkConnected())
 
860
      {
 
861
        Uint32 * readPtr, * eodPtr;
 
862
        t->getReceivePtr(&readPtr, &eodPtr);
 
863
        transporter_recv_from(callbackObj, nodeId);
 
864
        Uint32 *newPtr = unpack(readPtr, eodPtr, nodeId, ioStates[nodeId]);
 
865
        t->updateReceivePtr(newPtr);
 
866
      }
 
867
    } 
 
868
  }
 
869
#endif
 
870
}
 
871
 
 
872
void
 
873
TransporterRegistry::performSend()
 
874
{
 
875
  int i; 
 
876
  sendCounter = 1;
 
877
 
 
878
#ifdef NDB_TCP_TRANSPORTER
 
879
  for (i = m_transp_count; i < nTCPTransporters; i++) 
 
880
  {
 
881
    TCP_Transporter *t = theTCPTransporters[i];
 
882
    if (t && t->hasDataToSend() && t->isConnected() &&
 
883
        is_connected(t->getRemoteNodeId())) 
 
884
    {
 
885
      t->doSend();
 
886
    }
 
887
  }
 
888
  for (i = 0; i < m_transp_count && i < nTCPTransporters; i++) 
 
889
  {
 
890
    TCP_Transporter *t = theTCPTransporters[i];
 
891
    if (t && t->hasDataToSend() && t->isConnected() &&
 
892
        is_connected(t->getRemoteNodeId())) 
 
893
    {
 
894
      t->doSend();
 
895
    }
 
896
  }
 
897
  m_transp_count++;
 
898
  if (m_transp_count == nTCPTransporters) m_transp_count = 0;
 
899
#endif
 
900
#ifdef NDB_SCI_TRANSPORTER
 
901
  //scroll through the SCI transporters, 
 
902
  // get each transporter, check if connected, send data
 
903
  for (i=0; i<nSCITransporters; i++) {
 
904
    SCI_Transporter  *t = theSCITransporters[i];
 
905
    const NodeId nodeId = t->getRemoteNodeId();
 
906
    
 
907
    if(is_connected(nodeId))
 
908
    {
 
909
      if(t->isConnected() && t->hasDataToSend()) {
 
910
        t->doSend();
 
911
      } //if
 
912
    } //if
 
913
  }
 
914
#endif
 
915
  
 
916
#ifdef NDB_SHM_TRANSPORTER
 
917
  for (i=0; i<nSHMTransporters; i++) 
 
918
  {
 
919
    SHM_Transporter  *t = theSHMTransporters[i];
 
920
    const NodeId nodeId = t->getRemoteNodeId();
 
921
    if(is_connected(nodeId))
 
922
    {
 
923
      if(t->isConnected())
 
924
      {
 
925
        t->doSend();
 
926
      }
 
927
    }
 
928
  }
 
929
#endif
 
930
}
 
931
 
 
932
int
 
933
TransporterRegistry::forceSendCheck(int sendLimit){
 
934
  int tSendCounter = sendCounter;
 
935
  sendCounter = tSendCounter + 1;
 
936
  if (tSendCounter >= sendLimit) {
 
937
    performSend();
 
938
    sendCounter = 1;
 
939
    return 1;
 
940
  }//if
 
941
  return 0;
 
942
}//TransporterRegistry::forceSendCheck()
 
943
 
 
944
#ifdef DEBUG_TRANSPORTER
/**
 * Debug helper: print the number of transporters and, for every non-NULL
 * entry of theTransporters, its remote node id together with that node's
 * perform state and IO state.
 */
void
TransporterRegistry::printState(){
  ndbout << "-- TransporterRegistry -- " << endl << endl
         << "Transporters = " << nTransporters << endl;

  for(int slot = 0; slot<maxTransporters; slot++) {
    Transporter * const tp = theTransporters[slot];
    if(tp == NULL)
      continue;

    const NodeId remote = tp->getRemoteNodeId();
    ndbout << "Transporter: " << remote 
           << " PerformState: " << performStates[remote]
           << " IOState: " << ioStates[remote] << endl;
  }
}
#endif
 
958
 
 
959
/**
 * @param nodeId  node whose IO state is requested
 * @return the entry of ioStates for nodeId (no range check is made)
 */
IOState
TransporterRegistry::ioState(NodeId nodeId) { 
  const IOState current = ioStates[nodeId];
  return current;
}
 
963
 
 
964
void
 
965
TransporterRegistry::setIOState(NodeId nodeId, IOState state) {
 
966
  DEBUG("TransporterRegistry::setIOState("
 
967
        << nodeId << ", " << state << ")");
 
968
  ioStates[nodeId] = state;
 
969
}
 
970
 
 
971
static void * 
 
972
run_start_clients_C(void * me)
 
973
{
 
974
  ((TransporterRegistry*) me)->start_clients_thread();
 
975
  return 0;
 
976
}
 
977
 
 
978
// Run by kernel thread
 
979
void
 
980
TransporterRegistry::do_connect(NodeId node_id)
 
981
{
 
982
  PerformState &curr_state = performStates[node_id];
 
983
  switch(curr_state){
 
984
  case DISCONNECTED:
 
985
    break;
 
986
  case CONNECTED:
 
987
    return;
 
988
  case CONNECTING:
 
989
    return;
 
990
  case DISCONNECTING:
 
991
    break;
 
992
  }
 
993
  DBUG_ENTER("TransporterRegistry::do_connect");
 
994
  DBUG_PRINT("info",("performStates[%d]=CONNECTING",node_id));
 
995
  curr_state= CONNECTING;
 
996
  DBUG_VOID_RETURN;
 
997
}
 
998
void
 
999
TransporterRegistry::do_disconnect(NodeId node_id)
 
1000
{
 
1001
  PerformState &curr_state = performStates[node_id];
 
1002
  switch(curr_state){
 
1003
  case DISCONNECTED:
 
1004
    return;
 
1005
  case CONNECTED:
 
1006
    break;
 
1007
  case CONNECTING:
 
1008
    break;
 
1009
  case DISCONNECTING:
 
1010
    return;
 
1011
  }
 
1012
  DBUG_ENTER("TransporterRegistry::do_disconnect");
 
1013
  DBUG_PRINT("info",("performStates[%d]=DISCONNECTING",node_id));
 
1014
  curr_state= DISCONNECTING;
 
1015
  DBUG_VOID_RETURN;
 
1016
}
 
1017
 
 
1018
void
 
1019
TransporterRegistry::report_connect(NodeId node_id)
 
1020
{
 
1021
  DBUG_ENTER("TransporterRegistry::report_connect");
 
1022
  DBUG_PRINT("info",("performStates[%d]=CONNECTED",node_id));
 
1023
  performStates[node_id] = CONNECTED;
 
1024
  reportConnect(callbackObj, node_id);
 
1025
  DBUG_VOID_RETURN;
 
1026
}
 
1027
 
 
1028
void
 
1029
TransporterRegistry::report_disconnect(NodeId node_id, int errnum)
 
1030
{
 
1031
  DBUG_ENTER("TransporterRegistry::report_disconnect");
 
1032
  DBUG_PRINT("info",("performStates[%d]=DISCONNECTED",node_id));
 
1033
  performStates[node_id] = DISCONNECTED;
 
1034
  reportDisconnect(callbackObj, node_id, errnum);
 
1035
  DBUG_VOID_RETURN;
 
1036
}
 
1037
 
 
1038
void
 
1039
TransporterRegistry::update_connections()
 
1040
{
 
1041
  for (int i= 0, n= 0; n < nTransporters; i++){
 
1042
    Transporter * t = theTransporters[i];
 
1043
    if (!t)
 
1044
      continue;
 
1045
    n++;
 
1046
 
 
1047
    const NodeId nodeId = t->getRemoteNodeId();
 
1048
    switch(performStates[nodeId]){
 
1049
    case CONNECTED:
 
1050
    case DISCONNECTED:
 
1051
      break;
 
1052
    case CONNECTING:
 
1053
      if(t->isConnected())
 
1054
        report_connect(nodeId);
 
1055
      break;
 
1056
    case DISCONNECTING:
 
1057
      if(!t->isConnected())
 
1058
        report_disconnect(nodeId, 0);
 
1059
      break;
 
1060
    }
 
1061
  }
 
1062
}
 
1063
 
 
1064
// run as own thread
 
1065
void
 
1066
TransporterRegistry::start_clients_thread()
 
1067
{
 
1068
  int persist_mgm_count= 0;
 
1069
  DBUG_ENTER("TransporterRegistry::start_clients_thread");
 
1070
  while (m_run_start_clients_thread) {
 
1071
    NdbSleep_MilliSleep(100);
 
1072
    persist_mgm_count++;
 
1073
    if(persist_mgm_count==50)
 
1074
    {
 
1075
      ndb_mgm_check_connection(m_mgm_handle);
 
1076
      persist_mgm_count= 0;
 
1077
    }
 
1078
    for (int i= 0, n= 0; n < nTransporters && m_run_start_clients_thread; i++){
 
1079
      Transporter * t = theTransporters[i];
 
1080
      if (!t)
 
1081
        continue;
 
1082
      n++;
 
1083
 
 
1084
      const NodeId nodeId = t->getRemoteNodeId();
 
1085
      switch(performStates[nodeId]){
 
1086
      case CONNECTING:
 
1087
        if(!t->isConnected() && !t->isServer) {
 
1088
          bool connected= false;
 
1089
          /**
 
1090
           * First, we try to connect (if we have a port number).
 
1091
           */
 
1092
          if (t->get_s_port())
 
1093
            connected= t->connect_client();
 
1094
 
 
1095
          /**
 
1096
           * If dynamic, get the port for connecting from the management server
 
1097
           */
 
1098
          if( !connected && t->get_s_port() <= 0) {     // Port is dynamic
 
1099
            int server_port= 0;
 
1100
            struct ndb_mgm_reply mgm_reply;
 
1101
 
 
1102
            if(!ndb_mgm_is_connected(m_mgm_handle))
 
1103
              ndb_mgm_connect(m_mgm_handle, 0, 0, 0);
 
1104
            
 
1105
            if(ndb_mgm_is_connected(m_mgm_handle))
 
1106
            {
 
1107
              int res=
 
1108
                ndb_mgm_get_connection_int_parameter(m_mgm_handle,
 
1109
                                                     t->getRemoteNodeId(),
 
1110
                                                     t->getLocalNodeId(),
 
1111
                                                     CFG_CONNECTION_SERVER_PORT,
 
1112
                                                     &server_port,
 
1113
                                                     &mgm_reply);
 
1114
              DBUG_PRINT("info",("Got dynamic port %d for %d -> %d (ret: %d)",
 
1115
                                 server_port,t->getRemoteNodeId(),
 
1116
                                 t->getLocalNodeId(),res));
 
1117
              if( res >= 0 )
 
1118
              {
 
1119
                /**
 
1120
                 * Server_port == 0 just means that that a mgmt server
 
1121
                 * has not received a new port yet. Keep the old.
 
1122
                 */
 
1123
                if (server_port)
 
1124
                  t->set_s_port(server_port);
 
1125
              }
 
1126
              else if(ndb_mgm_is_connected(m_mgm_handle))
 
1127
              {
 
1128
                g_eventLogger.info("Failed to get dynamic port to connect to: %d", res);
 
1129
                ndb_mgm_disconnect(m_mgm_handle);
 
1130
              }
 
1131
              else
 
1132
              {
 
1133
                g_eventLogger.info("Management server closed connection early. "
 
1134
                         "It is probably being shut down (or has problems). "
 
1135
                         "We will retry the connection. %d %s %s line: %d",
 
1136
                                   ndb_mgm_get_latest_error(m_mgm_handle),
 
1137
                                   ndb_mgm_get_latest_error_desc(m_mgm_handle),
 
1138
                                   ndb_mgm_get_latest_error_msg(m_mgm_handle),
 
1139
                                   ndb_mgm_get_latest_error_line(m_mgm_handle)
 
1140
                                   );
 
1141
              }
 
1142
            }
 
1143
            /** else
 
1144
             * We will not be able to get a new port unless
 
1145
             * the m_mgm_handle is connected. Note that not
 
1146
             * being connected is an ok state, just continue
 
1147
             * until it is able to connect. Continue using the
 
1148
             * old port until we can connect again and get a
 
1149
             * new port.
 
1150
             */
 
1151
          }
 
1152
        }
 
1153
        break;
 
1154
      case DISCONNECTING:
 
1155
        if(t->isConnected())
 
1156
          t->doDisconnect();
 
1157
        break;
 
1158
      default:
 
1159
        break;
 
1160
      }
 
1161
    }
 
1162
  }
 
1163
  DBUG_VOID_RETURN;
 
1164
}
 
1165
 
 
1166
bool
 
1167
TransporterRegistry::start_clients()
 
1168
{
 
1169
  m_run_start_clients_thread= true;
 
1170
  m_start_clients_thread= NdbThread_Create(run_start_clients_C,
 
1171
                                           (void**)this,
 
1172
                                           32768,
 
1173
                                           "ndb_start_clients",
 
1174
                                           NDB_THREAD_PRIO_LOW);
 
1175
  if (m_start_clients_thread == 0) {
 
1176
    m_run_start_clients_thread= false;
 
1177
    return false;
 
1178
  }
 
1179
  return true;
 
1180
}
 
1181
 
 
1182
bool
 
1183
TransporterRegistry::stop_clients()
 
1184
{
 
1185
  if (m_start_clients_thread) {
 
1186
    m_run_start_clients_thread= false;
 
1187
    void* status;
 
1188
    NdbThread_WaitFor(m_start_clients_thread, &status);
 
1189
    NdbThread_Destroy(&m_start_clients_thread);
 
1190
  }
 
1191
  return true;
 
1192
}
 
1193
 
 
1194
void
 
1195
TransporterRegistry::add_transporter_interface(NodeId remoteNodeId,
 
1196
                                               const char *interf, 
 
1197
                                               int s_port)
 
1198
{
 
1199
  DBUG_ENTER("TransporterRegistry::add_transporter_interface");
 
1200
  DBUG_PRINT("enter",("interface=%s, s_port= %d", interf, s_port));
 
1201
  if (interf && strlen(interf) == 0)
 
1202
    interf= 0;
 
1203
 
 
1204
  for (unsigned i= 0; i < m_transporter_interface.size(); i++)
 
1205
  {
 
1206
    Transporter_interface &tmp= m_transporter_interface[i];
 
1207
    if (s_port != tmp.m_s_service_port || tmp.m_s_service_port==0)
 
1208
      continue;
 
1209
    if (interf != 0 && tmp.m_interface != 0 &&
 
1210
        strcmp(interf, tmp.m_interface) == 0)
 
1211
    {
 
1212
      DBUG_VOID_RETURN; // found match, no need to insert
 
1213
    }
 
1214
    if (interf == 0 && tmp.m_interface == 0)
 
1215
    {
 
1216
      DBUG_VOID_RETURN; // found match, no need to insert
 
1217
    }
 
1218
  }
 
1219
  Transporter_interface t;
 
1220
  t.m_remote_nodeId= remoteNodeId;
 
1221
  t.m_s_service_port= s_port;
 
1222
  t.m_interface= interf;
 
1223
  m_transporter_interface.push_back(t);
 
1224
  DBUG_PRINT("exit",("interface and port added"));
 
1225
  DBUG_VOID_RETURN;
 
1226
}
 
1227
 
 
1228
bool
 
1229
TransporterRegistry::start_service(SocketServer& socket_server)
 
1230
{
 
1231
  DBUG_ENTER("TransporterRegistry::start_service");
 
1232
  if (m_transporter_interface.size() > 0 && !nodeIdSpecified)
 
1233
  {
 
1234
    g_eventLogger.error("TransporterRegistry::startReceiving: localNodeId not specified");
 
1235
    DBUG_RETURN(false);
 
1236
  }
 
1237
 
 
1238
  for (unsigned i= 0; i < m_transporter_interface.size(); i++)
 
1239
  {
 
1240
    Transporter_interface &t= m_transporter_interface[i];
 
1241
 
 
1242
    unsigned short port= (unsigned short)t.m_s_service_port;
 
1243
    if(t.m_s_service_port<0)
 
1244
      port= -t.m_s_service_port; // is a dynamic port
 
1245
    TransporterService *transporter_service =
 
1246
      new TransporterService(new SocketAuthSimple("ndbd", "ndbd passwd"));
 
1247
    if(!socket_server.setup(transporter_service,
 
1248
                            &port, t.m_interface))
 
1249
    {
 
1250
      DBUG_PRINT("info", ("Trying new port"));
 
1251
      port= 0;
 
1252
      if(t.m_s_service_port>0
 
1253
         || !socket_server.setup(transporter_service,
 
1254
                                 &port, t.m_interface))
 
1255
      {
 
1256
        /*
 
1257
         * If it wasn't a dynamically allocated port, or
 
1258
         * our attempts at getting a new dynamic port failed
 
1259
         */
 
1260
        g_eventLogger.error("Unable to setup transporter service port: %s:%d!\n"
 
1261
                 "Please check if the port is already used,\n"
 
1262
                 "(perhaps the node is already running)",
 
1263
                 t.m_interface ? t.m_interface : "*", t.m_s_service_port);
 
1264
        delete transporter_service;
 
1265
        DBUG_RETURN(false);
 
1266
      }
 
1267
    }
 
1268
    t.m_s_service_port= (t.m_s_service_port<=0)?-port:port; // -`ve if dynamic
 
1269
    DBUG_PRINT("info", ("t.m_s_service_port = %d",t.m_s_service_port));
 
1270
    transporter_service->setTransporterRegistry(this);
 
1271
  }
 
1272
  DBUG_RETURN(true);
 
1273
}
 
1274
 
 
1275
#ifdef NDB_SHM_TRANSPORTER
/**
 * Signal handler installed by startReceiving() for the SHM transporter
 * signal: it only increments g_shm_counter.
 *
 * @param signo  signal number (not used)
 */
static
RETSIGTYPE 
shm_sig_handler(int signo)
{
  (void) signo;
  ++g_shm_counter;
}
#endif
 
1283
 
 
1284
void
 
1285
TransporterRegistry::startReceiving()
 
1286
{
 
1287
  DBUG_ENTER("TransporterRegistry::startReceiving");
 
1288
 
 
1289
#ifdef NDB_SHM_TRANSPORTER
 
1290
  m_shm_own_pid = getpid();
 
1291
  if (g_ndb_shm_signum)
 
1292
  {
 
1293
    DBUG_PRINT("info",("Install signal handler for signum %d",
 
1294
                       g_ndb_shm_signum));
 
1295
    struct sigaction sa;
 
1296
    NdbThread_set_shm_sigmask(FALSE);
 
1297
    sigemptyset(&sa.sa_mask);
 
1298
    sa.sa_handler = shm_sig_handler;
 
1299
    sa.sa_flags = 0;
 
1300
    int ret;
 
1301
    while((ret = sigaction(g_ndb_shm_signum, &sa, 0)) == -1 && errno == EINTR);
 
1302
    if(ret != 0)
 
1303
    {
 
1304
      DBUG_PRINT("error",("Install failed"));
 
1305
      g_eventLogger.error("Failed to install signal handler for"
 
1306
                          " SHM transporter, signum %d, errno: %d (%s)",
 
1307
                          g_ndb_shm_signum, errno, strerror(errno));
 
1308
    }
 
1309
  }
 
1310
#endif // NDB_SHM_TRANSPORTER
 
1311
  DBUG_VOID_RETURN;
 
1312
}
 
1313
 
 
1314
void
 
1315
TransporterRegistry::stopReceiving(){
 
1316
  /**
 
1317
   * Disconnect all transporters, this includes detach from remote node
 
1318
   * and since that must be done from the same process that called attach
 
1319
   * it's done here in the receive thread
 
1320
   */
 
1321
  disconnectAll();
 
1322
}
 
1323
 
 
1324
void
 
1325
TransporterRegistry::startSending(){
 
1326
}
 
1327
 
 
1328
void
 
1329
TransporterRegistry::stopSending(){
 
1330
}
 
1331
 
 
1332
NdbOut & operator <<(NdbOut & out, SignalHeader & sh){
 
1333
  out << "-- Signal Header --" << endl;
 
1334
  out << "theLength:    " << sh.theLength << endl;
 
1335
  out << "gsn:          " << sh.theVerId_signalNumber << endl;
 
1336
  out << "recBlockNo:   " << sh.theReceiversBlockNumber << endl;
 
1337
  out << "sendBlockRef: " << sh.theSendersBlockRef << endl;
 
1338
  out << "sendersSig:   " << sh.theSendersSignalId << endl;
 
1339
  out << "theSignalId:  " << sh.theSignalId << endl;
 
1340
  out << "trace:        " << (int)sh.theTrace << endl;
 
1341
  return out;
 
1342
 
1343
 
 
1344
/**
 * @param nodeId  index into theTransporters (no range check is made)
 * @return the entry of theTransporters for nodeId
 */
Transporter*
TransporterRegistry::get_transporter(NodeId nodeId) {
  Transporter * const found = theTransporters[nodeId];
  return found;
}
 
1348
 
 
1349
bool TransporterRegistry::connect_client(NdbMgmHandle *h)
 
1350
{
 
1351
  DBUG_ENTER("TransporterRegistry::connect_client(NdbMgmHandle)");
 
1352
 
 
1353
  Uint32 mgm_nodeid= ndb_mgm_get_mgmd_nodeid(*h);
 
1354
 
 
1355
  if(!mgm_nodeid)
 
1356
  {
 
1357
    g_eventLogger.error("%s: %d", __FILE__, __LINE__);
 
1358
    return false;
 
1359
  }
 
1360
  Transporter * t = theTransporters[mgm_nodeid];
 
1361
  if (!t)
 
1362
  {
 
1363
    g_eventLogger.error("%s: %d", __FILE__, __LINE__);
 
1364
    return false;
 
1365
  }
 
1366
  DBUG_RETURN(t->connect_client(connect_ndb_mgmd(h)));
 
1367
}
 
1368
 
 
1369
/**
 
1370
 * Given a connected NdbMgmHandle, turns it into a transporter
 
1371
 * and returns the socket.
 
1372
 */
 
1373
NDB_SOCKET_TYPE TransporterRegistry::connect_ndb_mgmd(NdbMgmHandle *h)
 
1374
{
 
1375
  struct ndb_mgm_reply mgm_reply;
 
1376
 
 
1377
  if ( h==NULL || *h == NULL )
 
1378
  {
 
1379
    g_eventLogger.error("%s: %d", __FILE__, __LINE__);
 
1380
    return NDB_INVALID_SOCKET;
 
1381
  }
 
1382
 
 
1383
  for(unsigned int i=0;i < m_transporter_interface.size();i++)
 
1384
    if (m_transporter_interface[i].m_s_service_port < 0
 
1385
        && ndb_mgm_set_connection_int_parameter(*h,
 
1386
                                   get_localNodeId(),
 
1387
                                   m_transporter_interface[i].m_remote_nodeId,
 
1388
                                   CFG_CONNECTION_SERVER_PORT,
 
1389
                                   m_transporter_interface[i].m_s_service_port,
 
1390
                                   &mgm_reply) < 0)
 
1391
    {
 
1392
      g_eventLogger.error("Error: %s: %d",
 
1393
               ndb_mgm_get_latest_error_desc(*h),
 
1394
               ndb_mgm_get_latest_error(*h));
 
1395
      g_eventLogger.error("%s: %d", __FILE__, __LINE__);
 
1396
      ndb_mgm_destroy_handle(h);
 
1397
      return NDB_INVALID_SOCKET;
 
1398
    }
 
1399
 
 
1400
  /**
 
1401
   * convert_to_transporter also disposes of the handle (i.e. we don't leak
 
1402
   * memory here.
 
1403
   */
 
1404
  NDB_SOCKET_TYPE sockfd= ndb_mgm_convert_to_transporter(h);
 
1405
  if ( sockfd == NDB_INVALID_SOCKET)
 
1406
  {
 
1407
    g_eventLogger.error("Error: %s: %d",
 
1408
             ndb_mgm_get_latest_error_desc(*h),
 
1409
             ndb_mgm_get_latest_error(*h));
 
1410
    g_eventLogger.error("%s: %d", __FILE__, __LINE__);
 
1411
    ndb_mgm_destroy_handle(h);
 
1412
  }
 
1413
  return sockfd;
 
1414
}
 
1415
 
 
1416
/**
 
1417
 * Given a SocketClient, creates a NdbMgmHandle, turns it into a transporter
 
1418
 * and returns the socket.
 
1419
 */
 
1420
NDB_SOCKET_TYPE TransporterRegistry::connect_ndb_mgmd(SocketClient *sc)
 
1421
{
 
1422
  NdbMgmHandle h= ndb_mgm_create_handle();
 
1423
 
 
1424
  if ( h == NULL )
 
1425
  {
 
1426
    return NDB_INVALID_SOCKET;
 
1427
  }
 
1428
 
 
1429
  /**
 
1430
   * Set connectstring
 
1431
   */
 
1432
  {
 
1433
    BaseString cs;
 
1434
    cs.assfmt("%s:%u",sc->get_server_name(),sc->get_port());
 
1435
    ndb_mgm_set_connectstring(h, cs.c_str());
 
1436
  }
 
1437
 
 
1438
  if(ndb_mgm_connect(h, 0, 0, 0)<0)
 
1439
  {
 
1440
    ndb_mgm_destroy_handle(&h);
 
1441
    return NDB_INVALID_SOCKET;
 
1442
  }
 
1443
 
 
1444
  return connect_ndb_mgmd(&h);
 
1445
}
 
1446
 
 
1447
// Explicit instantiation of Vector for the element type of
// m_transporter_interface.
template class Vector<TransporterRegistry::Transporter_interface>;