~ubuntu-branches/ubuntu/precise/mysql-5.1/precise

« back to all changes in this revision

Viewing changes to storage/ndb/src/ndbapi/TransporterFacade.cpp

  • Committer: Bazaar Package Importer
  • Author(s): Norbert Tretkowski
  • Date: 2010-03-17 14:56:02 UTC
  • Revision ID: james.westby@ubuntu.com-20100317145602-x7e30l1b2sb5s6w6
Tags: upstream-5.1.45
ImportĀ upstreamĀ versionĀ 5.1.45

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/* Copyright (C) 2003 MySQL AB
 
2
 
 
3
   This program is free software; you can redistribute it and/or modify
 
4
   it under the terms of the GNU General Public License as published by
 
5
   the Free Software Foundation; version 2 of the License.
 
6
 
 
7
   This program is distributed in the hope that it will be useful,
 
8
   but WITHOUT ANY WARRANTY; without even the implied warranty of
 
9
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
10
   GNU General Public License for more details.
 
11
 
 
12
   You should have received a copy of the GNU General Public License
 
13
   along with this program; if not, write to the Free Software
 
14
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
 
15
 
 
16
#include <ndb_global.h>
 
17
#include <my_pthread.h>
 
18
#include <ndb_limits.h>
 
19
#include "TransporterFacade.hpp"
 
20
#include "ClusterMgr.hpp"
 
21
#include <IPCConfig.hpp>
 
22
#include <TransporterCallback.hpp>
 
23
#include <TransporterRegistry.hpp>
 
24
#include "NdbApiSignal.hpp"
 
25
#include <NdbOut.hpp>
 
26
#include <NdbEnv.h>
 
27
#include <NdbSleep.h>
 
28
 
 
29
#include "API.hpp"
 
30
#include <ConfigRetriever.hpp>
 
31
#include <mgmapi_config_parameters.h>
 
32
#include <mgmapi_configuration.hpp>
 
33
#include <NdbConfig.h>
 
34
#include <ndb_version.h>
 
35
#include <SignalLoggerManager.hpp>
 
36
#include <kernel/ndb_limits.h>
 
37
#include <signaldata/AlterTable.hpp>
 
38
#include <signaldata/SumaImpl.hpp>
 
39
 
 
40
//#define REPORT_TRANSPORTER
 
41
//#define API_TRACE;
 
42
 
 
43
static int numberToIndex(int number)
 
44
{
 
45
  return number - MIN_API_BLOCK_NO;
 
46
}
 
47
 
 
48
static int indexToNumber(int index)
 
49
{
 
50
  return index + MIN_API_BLOCK_NO;
 
51
}
 
52
 
 
53
#if defined DEBUG_TRANSPORTER
 
54
#define TRP_DEBUG(t) ndbout << __FILE__ << ":" << __LINE__ << ":" << t << endl;
 
55
#else
 
56
#define TRP_DEBUG(t)
 
57
#endif
 
58
 
 
59
/*****************************************************************************
 
60
 * Call back functions
 
61
 *****************************************************************************/
 
62
 
 
63
void
 
64
reportError(void * callbackObj, NodeId nodeId,
 
65
            TransporterError errorCode, const char *info)
 
66
{
 
67
#ifdef REPORT_TRANSPORTER
 
68
  ndbout_c("REPORT_TRANSP: reportError (nodeId=%d, errorCode=%d) %s", 
 
69
           (int)nodeId, (int)errorCode, info ? info : "");
 
70
#endif
 
71
  if(errorCode & TE_DO_DISCONNECT) {
 
72
    ndbout_c("reportError (%d, %d) %s", (int)nodeId, (int)errorCode,
 
73
             info ? info : "");
 
74
    ((TransporterFacade*)(callbackObj))->doDisconnect(nodeId);
 
75
  }
 
76
}
 
77
 
 
78
/**
 
79
 * Report average send length in bytes (4096 last sends)
 
80
 */
 
81
void
 
82
reportSendLen(void * callbackObj, NodeId nodeId, Uint32 count, Uint64 bytes){
 
83
#ifdef REPORT_TRANSPORTER
 
84
  ndbout_c("REPORT_TRANSP: reportSendLen (nodeId=%d, bytes/count=%d)", 
 
85
           (int)nodeId, (Uint32)(bytes/count));
 
86
#endif
 
87
  (void)nodeId;
 
88
  (void)count;
 
89
  (void)bytes;
 
90
}
 
91
 
 
92
/** 
 
93
 * Report average receive length in bytes (4096 last receives)
 
94
 */
 
95
void
 
96
reportReceiveLen(void * callbackObj, 
 
97
                 NodeId nodeId, Uint32 count, Uint64 bytes){
 
98
#ifdef REPORT_TRANSPORTER
 
99
  ndbout_c("REPORT_TRANSP: reportReceiveLen (nodeId=%d, bytes/count=%d)", 
 
100
           (int)nodeId, (Uint32)(bytes/count));
 
101
#endif
 
102
  (void)nodeId;
 
103
  (void)count;
 
104
  (void)bytes;
 
105
}
 
106
 
 
107
/**
 
108
 * Report connection established
 
109
 */
 
110
void
 
111
reportConnect(void * callbackObj, NodeId nodeId){
 
112
#ifdef REPORT_TRANSPORTER
 
113
  ndbout_c("REPORT_TRANSP: API reportConnect (nodeId=%d)", (int)nodeId);
 
114
#endif
 
115
  ((TransporterFacade*)(callbackObj))->reportConnected(nodeId);
 
116
}
 
117
 
 
118
/**
 
119
 * Report connection broken
 
120
 */
 
121
void
 
122
reportDisconnect(void * callbackObj, NodeId nodeId, Uint32 error){
 
123
#ifdef REPORT_TRANSPORTER
 
124
  ndbout_c("REPORT_TRANSP: API reportDisconnect (nodeId=%d)", (int)nodeId);
 
125
#endif
 
126
  ((TransporterFacade*)(callbackObj))->reportDisconnected(nodeId);
 
127
}
 
128
 
 
129
void
 
130
transporter_recv_from(void * callbackObj, NodeId nodeId){
 
131
  ((TransporterFacade*)(callbackObj))->hb_received(nodeId);
 
132
}
 
133
 
 
134
/****************************************************************************
 
135
 * 
 
136
 *****************************************************************************/
 
137
 
 
138
/**
 
139
 * Report connection broken
 
140
 */
 
141
int checkJobBuffer() {
 
142
  return 0;
 
143
}
 
144
 
 
145
#ifdef API_TRACE
 
146
static const char * API_SIGNAL_LOG = "API_SIGNAL_LOG";
 
147
static const char * apiSignalLog   = 0;
 
148
static SignalLoggerManager signalLogger;
 
149
static
 
150
inline
 
151
bool
 
152
setSignalLog(){
 
153
  signalLogger.flushSignalLog();
 
154
 
 
155
  const char * tmp = NdbEnv_GetEnv(API_SIGNAL_LOG, (char *)0, 0);
 
156
  if(tmp != 0 && apiSignalLog != 0 && strcmp(tmp,apiSignalLog) == 0){
 
157
    return true;
 
158
  } else if(tmp == 0 && apiSignalLog == 0){
 
159
    return false;
 
160
  } else if(tmp == 0 && apiSignalLog != 0){
 
161
    signalLogger.setOutputStream(0);
 
162
    apiSignalLog = tmp;
 
163
    return false;
 
164
  } else if(tmp !=0){
 
165
    if (strcmp(tmp, "-") == 0)
 
166
        signalLogger.setOutputStream(stdout);
 
167
#ifndef DBUG_OFF
 
168
    else if (strcmp(tmp, "+") == 0)
 
169
        signalLogger.setOutputStream(DBUG_FILE);
 
170
#endif
 
171
    else
 
172
        signalLogger.setOutputStream(fopen(tmp, "w"));
 
173
    apiSignalLog = tmp;
 
174
    return true;
 
175
  }
 
176
  return false;
 
177
}
 
178
#ifdef TRACE_APIREGREQ
 
179
#define TRACE_GSN(gsn) true
 
180
#else
 
181
#define TRACE_GSN(gsn) (gsn != GSN_API_REGREQ && gsn != GSN_API_REGCONF)
 
182
#endif
 
183
#endif
 
184
 
 
185
/**
 
186
 * The execute function : Handle received signal
 
187
 */
 
188
void
 
189
execute(void * callbackObj, SignalHeader * const header, 
 
190
        Uint8 prio, Uint32 * const theData,
 
191
        LinearSectionPtr ptr[3]){
 
192
 
 
193
  TransporterFacade * theFacade = (TransporterFacade*)callbackObj;
 
194
  TransporterFacade::ThreadData::Object_Execute oe; 
 
195
  Uint32 tRecBlockNo = header->theReceiversBlockNumber;
 
196
  
 
197
#ifdef API_TRACE
 
198
  if(setSignalLog() && TRACE_GSN(header->theVerId_signalNumber)){
 
199
    signalLogger.executeSignal(* header, 
 
200
                               prio,
 
201
                               theData,
 
202
                               theFacade->ownId(), 
 
203
                               ptr, header->m_noOfSections);
 
204
    signalLogger.flushSignalLog();
 
205
  }
 
206
#endif  
 
207
 
 
208
  if (tRecBlockNo >= MIN_API_BLOCK_NO) {
 
209
    oe = theFacade->m_threads.get(tRecBlockNo);
 
210
    if (oe.m_object != 0 && oe.m_executeFunction != 0) {
 
211
      /**
 
212
       * Handle received signal immediately to avoid any unnecessary
 
213
       * copying of data, allocation of memory and other things. Copying
 
214
       * of data could be interesting to support several priority levels
 
215
       * and to support a special memory structure when executing the
 
216
       * signals. Neither of those are interesting when receiving data
 
217
       * in the NDBAPI. The NDBAPI will thus read signal data directly as
 
218
       * it was written by the sender (SCI sender is other node, Shared
 
219
       * memory sender is other process and TCP/IP sender is the OS that
 
220
       * writes the TCP/IP message into a message buffer).
 
221
       */
 
222
      NdbApiSignal tmpSignal(*header);
 
223
      NdbApiSignal * tSignal = &tmpSignal;
 
224
      tSignal->setDataPtr(theData);
 
225
      (* oe.m_executeFunction) (oe.m_object, tSignal, ptr);
 
226
    }//if
 
227
  } else if (tRecBlockNo == API_PACKED) {
 
228
    /**
 
229
     * Block number == 2047 is used to signal a signal that consists of
 
230
     * multiple instances of the same signal. This is an effort to
 
231
     * package the signals so as to avoid unnecessary communication
 
232
     * overhead since TCP/IP has a great performance impact.
 
233
     */
 
234
    Uint32 Tlength = header->theLength;
 
235
    Uint32 Tsent = 0;
 
236
    /**
 
237
     * Since it contains at least two data packets we will first
 
238
     * copy the signal data to safe place.
 
239
     */
 
240
    while (Tsent < Tlength) {
 
241
      Uint32 Theader = theData[Tsent];
 
242
      Tsent++;
 
243
      Uint32 TpacketLen = (Theader & 0x1F) + 3;
 
244
      tRecBlockNo = Theader >> 16;
 
245
      if (TpacketLen <= 25) {
 
246
        if ((TpacketLen + Tsent) <= Tlength) {
 
247
          /**
 
248
           * Set the data length of the signal and the receivers block
 
249
           * reference and then call the API.
 
250
           */
 
251
          header->theLength = TpacketLen;
 
252
          header->theReceiversBlockNumber = tRecBlockNo;
 
253
          Uint32* tDataPtr = &theData[Tsent];
 
254
          Tsent += TpacketLen;
 
255
          if (tRecBlockNo >= MIN_API_BLOCK_NO) {
 
256
            oe = theFacade->m_threads.get(tRecBlockNo);
 
257
            if(oe.m_object != 0 && oe.m_executeFunction != 0){
 
258
              NdbApiSignal tmpSignal(*header);
 
259
              NdbApiSignal * tSignal = &tmpSignal;
 
260
              tSignal->setDataPtr(tDataPtr);
 
261
              (*oe.m_executeFunction)(oe.m_object, tSignal, 0);
 
262
            }
 
263
          }
 
264
        }
 
265
      }
 
266
    }
 
267
    return;
 
268
  } else if (tRecBlockNo == API_CLUSTERMGR) {
 
269
     /**
 
270
      * The signal was aimed for the Cluster Manager. 
 
271
      * We handle it immediately here.
 
272
      */     
 
273
     ClusterMgr * clusterMgr = theFacade->theClusterMgr;
 
274
     const Uint32 gsn = header->theVerId_signalNumber;
 
275
 
 
276
     switch (gsn){
 
277
     case GSN_API_REGREQ:
 
278
       clusterMgr->execAPI_REGREQ(theData);
 
279
       break;
 
280
 
 
281
     case GSN_API_REGCONF:
 
282
       clusterMgr->execAPI_REGCONF(theData);
 
283
       break;
 
284
     
 
285
     case GSN_API_REGREF:
 
286
       clusterMgr->execAPI_REGREF(theData);
 
287
       break;
 
288
 
 
289
     case GSN_NODE_FAILREP:
 
290
       clusterMgr->execNODE_FAILREP(theData);
 
291
       break;
 
292
       
 
293
     case GSN_NF_COMPLETEREP:
 
294
       clusterMgr->execNF_COMPLETEREP(theData);
 
295
       break;
 
296
 
 
297
     case GSN_ARBIT_STARTREQ:
 
298
       if (theFacade->theArbitMgr != NULL)
 
299
         theFacade->theArbitMgr->doStart(theData);
 
300
       break;
 
301
       
 
302
     case GSN_ARBIT_CHOOSEREQ:
 
303
       if (theFacade->theArbitMgr != NULL)
 
304
         theFacade->theArbitMgr->doChoose(theData);
 
305
       break;
 
306
       
 
307
     case GSN_ARBIT_STOPORD:
 
308
       if(theFacade->theArbitMgr != NULL)
 
309
         theFacade->theArbitMgr->doStop(theData);
 
310
       break;
 
311
 
 
312
     case GSN_ALTER_TABLE_REP:
 
313
     {
 
314
       const AlterTableRep* rep = (const AlterTableRep*)theData;
 
315
       theFacade->m_globalDictCache.lock();
 
316
       theFacade->m_globalDictCache.
 
317
         alter_table_rep((const char*)ptr[0].p, 
 
318
                         rep->tableId,
 
319
                         rep->tableVersion,
 
320
                         rep->changeType == AlterTableRep::CT_ALTERED);
 
321
       theFacade->m_globalDictCache.unlock();
 
322
       break;
 
323
     }
 
324
     case GSN_SUB_GCP_COMPLETE_REP:
 
325
     {
 
326
       /**
 
327
        * Report
 
328
        */
 
329
       NdbApiSignal tSignal(* header);
 
330
       tSignal.setDataPtr(theData);
 
331
       theFacade->for_each(&tSignal, ptr);
 
332
 
 
333
       /**
 
334
        * Reply
 
335
        */
 
336
       {
 
337
         Uint32* send= tSignal.getDataPtrSend();
 
338
         memcpy(send, theData, tSignal.getLength() << 2);
 
339
         ((SubGcpCompleteAck*)send)->rep.senderRef = 
 
340
           numberToRef(API_CLUSTERMGR, theFacade->theOwnId);
 
341
         Uint32 ref= header->theSendersBlockRef;
 
342
         Uint32 aNodeId= refToNode(ref);
 
343
         tSignal.theReceiversBlockNumber= refToBlock(ref);
 
344
         tSignal.theVerId_signalNumber= GSN_SUB_GCP_COMPLETE_ACK;
 
345
         theFacade->sendSignalUnCond(&tSignal, aNodeId);
 
346
       }
 
347
       break;
 
348
     }
 
349
     default:
 
350
       break;
 
351
       
 
352
     }
 
353
     return;
 
354
  } else {
 
355
    ; // Ignore all other block numbers.
 
356
    if(header->theVerId_signalNumber!=3) {
 
357
      TRP_DEBUG( "TransporterFacade received signal to unknown block no." );
 
358
      ndbout << "BLOCK NO: "  << tRecBlockNo << " sig " 
 
359
             << header->theVerId_signalNumber  << endl;
 
360
      abort();
 
361
    }
 
362
  }
 
363
}
 
364
 
 
365
// These symbols are needed, but not used in the API
 
366
void 
 
367
SignalLoggerManager::printSegmentedSection(FILE *, const SignalHeader &,
 
368
                                           const SegmentedSectionPtr ptr[3],
 
369
                                           unsigned i){
 
370
  abort();
 
371
}
 
372
 
 
373
void 
 
374
copy(Uint32 * & insertPtr, 
 
375
     class SectionSegmentPool & thePool, const SegmentedSectionPtr & _ptr){
 
376
  abort();
 
377
}
 
378
 
 
379
/**
 
380
 * Note that this function need no locking since its
 
381
 * only called from the constructor of Ndb (the NdbObject)
 
382
 * 
 
383
 * Which is protected by a mutex
 
384
 */
 
385
 
 
386
int
 
387
TransporterFacade::start_instance(int nodeId, 
 
388
                                  const ndb_mgm_configuration* props)
 
389
{
 
390
  if (! init(nodeId, props)) {
 
391
    return -1;
 
392
  }
 
393
  
 
394
  /**
 
395
   * Install signal handler for SIGPIPE
 
396
   *
 
397
   * This due to the fact that a socket connection might have
 
398
   * been closed in between a select and a corresponding send
 
399
   */
 
400
#if !defined NDB_WIN32
 
401
  signal(SIGPIPE, SIG_IGN);
 
402
#endif
 
403
 
 
404
  return 0;
 
405
}
 
406
 
 
407
/**
 
408
 * Note that this function need no locking since its
 
409
 * only called from the destructor of Ndb (the NdbObject)
 
410
 * 
 
411
 * Which is protected by a mutex
 
412
 */
 
413
void
 
414
TransporterFacade::stop_instance(){
 
415
  DBUG_ENTER("TransporterFacade::stop_instance");
 
416
  doStop();
 
417
  DBUG_VOID_RETURN;
 
418
}
 
419
 
 
420
void
 
421
TransporterFacade::doStop(){
 
422
  DBUG_ENTER("TransporterFacade::doStop");
 
423
  /**
 
424
   * First stop the ClusterMgr because it needs to send one more signal
 
425
   * and also uses theFacadeInstance to lock/unlock theMutexPtr
 
426
   */
 
427
  if (theClusterMgr != NULL) theClusterMgr->doStop();
 
428
  if (theArbitMgr != NULL) theArbitMgr->doStop(NULL);
 
429
  
 
430
  /**
 
431
   * Now stop the send and receive threads
 
432
   */
 
433
  void *status;
 
434
  theStopReceive = 1;
 
435
  if (theReceiveThread) {
 
436
    NdbThread_WaitFor(theReceiveThread, &status);
 
437
    NdbThread_Destroy(&theReceiveThread);
 
438
  }
 
439
  if (theSendThread) {
 
440
    NdbThread_WaitFor(theSendThread, &status);
 
441
    NdbThread_Destroy(&theSendThread);
 
442
  }
 
443
  DBUG_VOID_RETURN;
 
444
}
 
445
 
 
446
extern "C" 
 
447
void* 
 
448
runSendRequest_C(void * me)
 
449
{
 
450
  ((TransporterFacade*) me)->threadMainSend();
 
451
  return 0;
 
452
}
 
453
 
 
454
void TransporterFacade::threadMainSend(void)
 
455
{
 
456
  theTransporterRegistry->startSending();
 
457
  if (!theTransporterRegistry->start_clients()){
 
458
    ndbout_c("Unable to start theTransporterRegistry->start_clients");
 
459
    exit(0);
 
460
  }
 
461
 
 
462
  m_socket_server.startServer();
 
463
 
 
464
  while(!theStopReceive) {
 
465
    NdbSleep_MilliSleep(10);
 
466
    NdbMutex_Lock(theMutexPtr);
 
467
    if (sendPerformedLastInterval == 0) {
 
468
      theTransporterRegistry->performSend();
 
469
    }
 
470
    sendPerformedLastInterval = 0;
 
471
    NdbMutex_Unlock(theMutexPtr);
 
472
  }
 
473
  theTransporterRegistry->stopSending();
 
474
 
 
475
  m_socket_server.stopServer();
 
476
  m_socket_server.stopSessions(true);
 
477
 
 
478
  theTransporterRegistry->stop_clients();
 
479
}
 
480
 
 
481
extern "C" 
 
482
void* 
 
483
runReceiveResponse_C(void * me)
 
484
{
 
485
  ((TransporterFacade*) me)->threadMainReceive();
 
486
  return 0;
 
487
}
 
488
 
 
489
/*
 
490
  The receiver thread is changed to only wake up once every 10 milliseconds
 
491
  to poll. It will first check that nobody owns the poll "right" before
 
492
  polling. This means that methods using the receiveResponse and
 
493
  sendRecSignal will have a slightly longer response time if they are
 
494
  executed without any parallel key lookups. Currently also scans are
 
495
  affected but this is to be fixed.
 
496
*/
 
497
void TransporterFacade::threadMainReceive(void)
 
498
{
 
499
  theTransporterRegistry->startReceiving();
 
500
#ifdef NDB_SHM_TRANSPORTER
 
501
  NdbThread_set_shm_sigmask(TRUE);
 
502
#endif
 
503
  NdbMutex_Lock(theMutexPtr);
 
504
  theTransporterRegistry->update_connections();
 
505
  NdbMutex_Unlock(theMutexPtr);
 
506
  while(!theStopReceive) {
 
507
    for(int i = 0; i<10; i++){
 
508
      NdbSleep_MilliSleep(10);
 
509
      NdbMutex_Lock(theMutexPtr);
 
510
      if (poll_owner == NULL) {
 
511
        const int res = theTransporterRegistry->pollReceive(0);
 
512
        if(res > 0)
 
513
          theTransporterRegistry->performReceive();
 
514
      }
 
515
      NdbMutex_Unlock(theMutexPtr);
 
516
    }
 
517
    NdbMutex_Lock(theMutexPtr);
 
518
    theTransporterRegistry->update_connections();
 
519
    NdbMutex_Unlock(theMutexPtr);
 
520
  }//while
 
521
  theTransporterRegistry->stopReceiving();
 
522
}
 
523
/*
 
524
  This method is called by worker thread that owns the poll "rights".
 
525
  It waits for events and if something arrives it takes care of it
 
526
  and returns to caller. It will quickly come back here if not all
 
527
  data was received for the worker thread.
 
528
*/
 
529
void TransporterFacade::external_poll(Uint32 wait_time)
 
530
{
 
531
  NdbMutex_Unlock(theMutexPtr);
 
532
  const int res = theTransporterRegistry->pollReceive(wait_time);
 
533
  NdbMutex_Lock(theMutexPtr);
 
534
  if (res > 0) {
 
535
    theTransporterRegistry->performReceive();
 
536
  }
 
537
}
 
538
 
 
539
/*
 
540
  This Ndb object didn't get hold of the poll "right" and will wait on a
 
541
  conditional mutex wait instead. It is put into the conditional wait
 
542
  queue so that it is accessible to take over the poll "right" if needed.
 
543
  The method gets a free entry in the free list and puts it first in the
 
544
  doubly linked list. Finally it assigns the ndb object reference to the
 
545
  entry.
 
546
*/
 
547
Uint32 TransporterFacade::put_in_cond_wait_queue(NdbWaiter *aWaiter)
 
548
{
 
549
  /*
 
550
   Get first free entry
 
551
  */
 
552
  Uint32 index = first_free_cond_wait;
 
553
  assert(index < MAX_NO_THREADS);
 
554
  first_free_cond_wait = cond_wait_array[index].next_cond_wait;
 
555
 
 
556
  /*
 
557
   Put in doubly linked list
 
558
  */
 
559
  cond_wait_array[index].next_cond_wait = MAX_NO_THREADS;
 
560
  cond_wait_array[index].prev_cond_wait = last_in_cond_wait;
 
561
  if (last_in_cond_wait == MAX_NO_THREADS) {
 
562
    first_in_cond_wait = index;
 
563
  } else
 
564
    cond_wait_array[last_in_cond_wait].next_cond_wait = index;
 
565
  last_in_cond_wait = index;
 
566
 
 
567
  cond_wait_array[index].cond_wait_object = aWaiter;
 
568
  aWaiter->set_cond_wait_index(index);
 
569
  return index;
 
570
}
 
571
 
 
572
/*
 
573
  Somebody is about to signal the thread to wake it up, it could also
 
574
  be that it woke up on a timeout and found himself still in the list.
 
575
  Removes the entry from the doubly linked list.
 
576
  Inserts the entry into the free list.
 
577
  NULLifies the ndb object reference entry and sets the index in the
 
578
  Ndb object to NIL (=MAX_NO_THREADS)
 
579
*/
 
580
void TransporterFacade::remove_from_cond_wait_queue(NdbWaiter *aWaiter)
 
581
{
 
582
  Uint32 index = aWaiter->get_cond_wait_index();
 
583
  assert(index < MAX_NO_THREADS &&
 
584
         cond_wait_array[index].cond_wait_object == aWaiter);
 
585
  /*
 
586
   Remove from doubly linked list
 
587
  */
 
588
  Uint32 prev_elem, next_elem;
 
589
  prev_elem = cond_wait_array[index].prev_cond_wait;
 
590
  next_elem = cond_wait_array[index].next_cond_wait;
 
591
  if (prev_elem != MAX_NO_THREADS)
 
592
    cond_wait_array[prev_elem].next_cond_wait = next_elem;
 
593
  else
 
594
    first_in_cond_wait = next_elem;
 
595
  if (next_elem != MAX_NO_THREADS)
 
596
    cond_wait_array[next_elem].prev_cond_wait = prev_elem;
 
597
  else
 
598
    last_in_cond_wait = prev_elem;
 
599
  /*
 
600
   Insert into free list
 
601
  */
 
602
  cond_wait_array[index].next_cond_wait = first_free_cond_wait;
 
603
  cond_wait_array[index].prev_cond_wait = MAX_NO_THREADS;
 
604
  first_free_cond_wait = index;
 
605
 
 
606
  cond_wait_array[index].cond_wait_object = NULL;
 
607
  aWaiter->set_cond_wait_index(MAX_NO_THREADS);
 
608
}
 
609
 
 
610
/*
 
611
  Get the latest Ndb object from the conditional wait queue
 
612
  and also remove it from the list.
 
613
*/
 
614
NdbWaiter* TransporterFacade::rem_last_from_cond_wait_queue()
 
615
{
 
616
  NdbWaiter *tWaiter;
 
617
  Uint32 index = last_in_cond_wait;
 
618
  if (last_in_cond_wait == MAX_NO_THREADS)
 
619
    return NULL;
 
620
  tWaiter = cond_wait_array[index].cond_wait_object;
 
621
  remove_from_cond_wait_queue(tWaiter);
 
622
  return tWaiter;
 
623
}
 
624
 
 
625
void TransporterFacade::init_cond_wait_queue()
 
626
{
 
627
  Uint32 i;
 
628
  /*
 
629
   Initialise the doubly linked list as empty
 
630
  */
 
631
  first_in_cond_wait = MAX_NO_THREADS;
 
632
  last_in_cond_wait = MAX_NO_THREADS;
 
633
  /*
 
634
   Initialise free list
 
635
  */
 
636
  first_free_cond_wait = 0;
 
637
  for (i = 0; i < MAX_NO_THREADS; i++) {
 
638
    cond_wait_array[i].cond_wait_object = NULL;
 
639
    cond_wait_array[i].next_cond_wait = i+1;
 
640
    cond_wait_array[i].prev_cond_wait = MAX_NO_THREADS;
 
641
  }
 
642
}
 
643
 
 
644
TransporterFacade::TransporterFacade() :
 
645
  theTransporterRegistry(0),
 
646
  theStopReceive(0),
 
647
  theSendThread(NULL),
 
648
  theReceiveThread(NULL),
 
649
  m_fragmented_signal_id(0)
 
650
{
 
651
  DBUG_ENTER("TransporterFacade::TransporterFacade");
 
652
  init_cond_wait_queue();
 
653
  poll_owner = NULL;
 
654
  theOwnId = 0;
 
655
  theMutexPtr = NdbMutex_Create();
 
656
  sendPerformedLastInterval = 0;
 
657
 
 
658
  checkCounter = 4;
 
659
  currentSendLimit = 1;
 
660
  theClusterMgr = NULL;
 
661
  theArbitMgr = NULL;
 
662
  theStartNodeId = 1;
 
663
  m_scan_batch_size= MAX_SCAN_BATCH_SIZE;
 
664
  m_batch_byte_size= SCAN_BATCH_SIZE;
 
665
  m_batch_size= DEF_BATCH_SIZE;
 
666
  m_max_trans_id = 0;
 
667
 
 
668
  theClusterMgr = new ClusterMgr(* this);
 
669
 
 
670
#ifdef API_TRACE
 
671
  apiSignalLog = 0;
 
672
#endif
 
673
  DBUG_VOID_RETURN;
 
674
}
 
675
 
 
676
bool
 
677
TransporterFacade::init(Uint32 nodeId, const ndb_mgm_configuration* props)
 
678
{
 
679
  DBUG_ENTER("TransporterFacade::init");
 
680
 
 
681
  theOwnId = nodeId;
 
682
  theTransporterRegistry = new TransporterRegistry(this);
 
683
 
 
684
  const int res = IPCConfig::configureTransporters(nodeId, 
 
685
                                                   * props, 
 
686
                                                   * theTransporterRegistry);
 
687
  if(res <= 0){
 
688
    TRP_DEBUG( "configureTransporters returned 0 or less" );
 
689
    DBUG_RETURN(false);
 
690
  }
 
691
  
 
692
  ndb_mgm_configuration_iterator iter(* props, CFG_SECTION_NODE);
 
693
  iter.first();
 
694
  theClusterMgr->init(iter);
 
695
  
 
696
  iter.first();
 
697
  if(iter.find(CFG_NODE_ID, nodeId)){
 
698
    TRP_DEBUG( "Node info missing from config." );
 
699
    DBUG_RETURN(false);
 
700
  }
 
701
  
 
702
  Uint32 rank = 0;
 
703
  if(!iter.get(CFG_NODE_ARBIT_RANK, &rank) && rank>0){
 
704
    theArbitMgr = new ArbitMgr(* this);
 
705
    theArbitMgr->setRank(rank);
 
706
    Uint32 delay = 0;
 
707
    iter.get(CFG_NODE_ARBIT_DELAY, &delay);
 
708
    theArbitMgr->setDelay(delay);
 
709
  }
 
710
  Uint32 scan_batch_size= 0;
 
711
  if (!iter.get(CFG_MAX_SCAN_BATCH_SIZE, &scan_batch_size)) {
 
712
    m_scan_batch_size= scan_batch_size;
 
713
  }
 
714
  Uint32 batch_byte_size= 0;
 
715
  if (!iter.get(CFG_BATCH_BYTE_SIZE, &batch_byte_size)) {
 
716
    m_batch_byte_size= batch_byte_size;
 
717
  }
 
718
  Uint32 batch_size= 0;
 
719
  if (!iter.get(CFG_BATCH_SIZE, &batch_size)) {
 
720
    m_batch_size= batch_size;
 
721
  }
 
722
  
 
723
  Uint32 timeout = 120000;
 
724
  iter.first();
 
725
  for (iter.first(); iter.valid(); iter.next())
 
726
  {
 
727
    Uint32 tmp1 = 0, tmp2 = 0;
 
728
    iter.get(CFG_DB_TRANSACTION_CHECK_INTERVAL, &tmp1);
 
729
    iter.get(CFG_DB_TRANSACTION_DEADLOCK_TIMEOUT, &tmp2);
 
730
    tmp1 += tmp2;
 
731
    if (tmp1 > timeout)
 
732
      timeout = tmp1;
 
733
  }
 
734
  m_waitfor_timeout = timeout;
 
735
  
 
736
  if (!theTransporterRegistry->start_service(m_socket_server)){
 
737
    ndbout_c("Unable to start theTransporterRegistry->start_service");
 
738
    DBUG_RETURN(false);
 
739
  }
 
740
 
 
741
  theReceiveThread = NdbThread_Create(runReceiveResponse_C,
 
742
                                      (void**)this,
 
743
                                      32768,
 
744
                                      "ndb_receive",
 
745
                                      NDB_THREAD_PRIO_LOW);
 
746
 
 
747
  theSendThread = NdbThread_Create(runSendRequest_C,
 
748
                                   (void**)this,
 
749
                                   32768,
 
750
                                   "ndb_send",
 
751
                                   NDB_THREAD_PRIO_LOW);
 
752
  theClusterMgr->startThread();
 
753
  
 
754
#ifdef API_TRACE
 
755
  signalLogger.logOn(true, 0, SignalLoggerManager::LogInOut);
 
756
#endif
 
757
  
 
758
  DBUG_RETURN(true);
 
759
}
 
760
 
 
761
void
 
762
TransporterFacade::for_each(NdbApiSignal* aSignal, LinearSectionPtr ptr[3])
 
763
{
 
764
  DBUG_ENTER("TransporterFacade::for_each");
 
765
  Uint32 sz = m_threads.m_statusNext.size();
 
766
  TransporterFacade::ThreadData::Object_Execute oe; 
 
767
  for (Uint32 i = 0; i < sz ; i ++) 
 
768
  {
 
769
    oe = m_threads.m_objectExecute[i];
 
770
    if (m_threads.getInUse(i))
 
771
    {
 
772
      (* oe.m_executeFunction) (oe.m_object, aSignal, ptr);
 
773
    }
 
774
  }
 
775
  DBUG_VOID_RETURN;
 
776
}
 
777
 
 
778
void
 
779
TransporterFacade::connected()
 
780
{
 
781
  DBUG_ENTER("TransporterFacade::connected");
 
782
  Uint32 sz = m_threads.m_statusNext.size();
 
783
  for (Uint32 i = 0; i < sz ; i ++) {
 
784
    if (m_threads.getInUse(i)){
 
785
      void * obj = m_threads.m_objectExecute[i].m_object;
 
786
      NodeStatusFunction RegPC = m_threads.m_statusFunction[i];
 
787
      (*RegPC) (obj, numberToRef(indexToNumber(i), theOwnId), true, true);
 
788
    }
 
789
  }
 
790
  DBUG_VOID_RETURN;
 
791
}
 
792
 
 
793
void
 
794
TransporterFacade::ReportNodeDead(NodeId tNodeId)
 
795
{
 
796
  DBUG_ENTER("TransporterFacade::ReportNodeDead");
 
797
  DBUG_PRINT("enter",("nodeid= %d", tNodeId));
 
798
  /**
 
799
   * When a node fails we must report this to each Ndb object. 
 
800
   * The function that is used for communicating node failures is called.
 
801
   * This is to ensure that the Ndb objects do not think their connections 
 
802
   * are correct after a failure followed by a restart. 
 
803
   * After the restart the node is up again and the Ndb object 
 
804
   * might not have noticed the failure.
 
805
   */
 
806
  Uint32 sz = m_threads.m_statusNext.size();
 
807
  for (Uint32 i = 0; i < sz ; i ++) {
 
808
    if (m_threads.getInUse(i)){
 
809
      void * obj = m_threads.m_objectExecute[i].m_object;
 
810
      NodeStatusFunction RegPC = m_threads.m_statusFunction[i];
 
811
      (*RegPC) (obj, tNodeId, false, false);
 
812
    }
 
813
  }
 
814
  DBUG_VOID_RETURN;
 
815
}
 
816
 
 
817
void
 
818
TransporterFacade::ReportNodeFailureComplete(NodeId tNodeId)
 
819
{
 
820
  /**
 
821
   * When a node fails we must report this to each Ndb object. 
 
822
   * The function that is used for communicating node failures is called.
 
823
   * This is to ensure that the Ndb objects do not think their connections 
 
824
   * are correct after a failure followed by a restart. 
 
825
   * After the restart the node is up again and the Ndb object 
 
826
   * might not have noticed the failure.
 
827
   */
 
828
 
 
829
  DBUG_ENTER("TransporterFacade::ReportNodeFailureComplete");
 
830
  DBUG_PRINT("enter",("nodeid= %d", tNodeId));
 
831
  Uint32 sz = m_threads.m_statusNext.size();
 
832
  for (Uint32 i = 0; i < sz ; i ++) {
 
833
    if (m_threads.getInUse(i)){
 
834
      void * obj = m_threads.m_objectExecute[i].m_object;
 
835
      NodeStatusFunction RegPC = m_threads.m_statusFunction[i];
 
836
      (*RegPC) (obj, tNodeId, false, true);
 
837
    }
 
838
  }
 
839
  DBUG_VOID_RETURN;
 
840
}
 
841
 
 
842
void
 
843
TransporterFacade::ReportNodeAlive(NodeId tNodeId)
 
844
{
 
845
  /**
 
846
   * When a node fails we must report this to each Ndb object. 
 
847
   * The function that is used for communicating node failures is called.
 
848
   * This is to ensure that the Ndb objects do not think there connections 
 
849
   * are correct after a failure
 
850
   * followed by a restart. 
 
851
   * After the restart the node is up again and the Ndb object 
 
852
   * might not have noticed the failure.
 
853
   */
 
854
  Uint32 sz = m_threads.m_statusNext.size();
 
855
  for (Uint32 i = 0; i < sz ; i ++) {
 
856
    if (m_threads.getInUse(i)){
 
857
      void * obj = m_threads.m_objectExecute[i].m_object;
 
858
      NodeStatusFunction RegPC = m_threads.m_statusFunction[i];
 
859
      (*RegPC) (obj, tNodeId, true, false);
 
860
    }
 
861
  }
 
862
}
 
863
 
 
864
int 
 
865
TransporterFacade::close(BlockNumber blockNumber, Uint64 trans_id)
 
866
{
 
867
  NdbMutex_Lock(theMutexPtr);
 
868
  Uint32 low_bits = (Uint32)trans_id;
 
869
  m_max_trans_id = m_max_trans_id > low_bits ? m_max_trans_id : low_bits;
 
870
  close_local(blockNumber);
 
871
  NdbMutex_Unlock(theMutexPtr);
 
872
  return 0;
 
873
}
 
874
 
 
875
int 
 
876
TransporterFacade::close_local(BlockNumber blockNumber){
 
877
  m_threads.close(blockNumber);
 
878
  return 0;
 
879
}
 
880
 
 
881
int
 
882
TransporterFacade::open(void* objRef, 
 
883
                        ExecuteFunction fun, 
 
884
                        NodeStatusFunction statusFun)
 
885
{
 
886
  DBUG_ENTER("TransporterFacade::open");
 
887
  int r= m_threads.open(objRef, fun, statusFun);
 
888
  if (r < 0)
 
889
    DBUG_RETURN(r);
 
890
#if 1
 
891
  if (theOwnId > 0) {
 
892
    (*statusFun)(objRef, numberToRef(r, theOwnId), true, true);
 
893
  }
 
894
#endif
 
895
  DBUG_RETURN(r);
 
896
}
 
897
 
 
898
TransporterFacade::~TransporterFacade()
 
899
{  
 
900
  DBUG_ENTER("TransporterFacade::~TransporterFacade");
 
901
 
 
902
  NdbMutex_Lock(theMutexPtr);
 
903
  delete theClusterMgr;  
 
904
  delete theArbitMgr;
 
905
  delete theTransporterRegistry;
 
906
  NdbMutex_Unlock(theMutexPtr);
 
907
  NdbMutex_Destroy(theMutexPtr);
 
908
#ifdef API_TRACE
 
909
  signalLogger.setOutputStream(0);
 
910
#endif
 
911
  DBUG_VOID_RETURN;
 
912
}
 
913
 
 
914
void 
 
915
TransporterFacade::calculateSendLimit()
 
916
{
 
917
  Uint32 Ti;
 
918
  Uint32 TthreadCount = 0;
 
919
  
 
920
  Uint32 sz = m_threads.m_statusNext.size();
 
921
  for (Ti = 0; Ti < sz; Ti++) {
 
922
    if (m_threads.m_statusNext[Ti] == (ThreadData::ACTIVE)){
 
923
      TthreadCount++;
 
924
      m_threads.m_statusNext[Ti] = ThreadData::INACTIVE;
 
925
    }
 
926
  }
 
927
  currentSendLimit = TthreadCount;
 
928
  if (currentSendLimit == 0) {
 
929
    currentSendLimit = 1;
 
930
  }
 
931
  checkCounter = currentSendLimit << 2;
 
932
}
 
933
 
 
934
 
 
935
//-------------------------------------------------
 
936
// Force sending but still report the sending to the
 
937
// adaptive algorithm.
 
938
//-------------------------------------------------
 
939
void TransporterFacade::forceSend(Uint32 block_number) {
 
940
  checkCounter--;
 
941
  m_threads.m_statusNext[numberToIndex(block_number)] = ThreadData::ACTIVE;
 
942
  sendPerformedLastInterval = 1;
 
943
  if (checkCounter < 0) {
 
944
    calculateSendLimit();
 
945
  }
 
946
  theTransporterRegistry->forceSendCheck(0);
 
947
}
 
948
 
 
949
//-------------------------------------------------
 
950
// Improving API performance
 
951
//-------------------------------------------------
 
952
void
 
953
TransporterFacade::checkForceSend(Uint32 block_number) {  
 
954
  m_threads.m_statusNext[numberToIndex(block_number)] = ThreadData::ACTIVE;
 
955
  //-------------------------------------------------
 
956
  // This code is an adaptive algorithm to discover when
 
957
  // the API should actually send its buffers. The reason
 
958
  // is that the performance is highly dependent on the
 
959
  // size of the writes over the communication network.
 
960
  // Thus we try to ensure that the send size is as big
 
961
  // as possible. At the same time we don't want response
 
962
  // time to increase so therefore we have to keep track of
 
963
  // how the users are performing adaptively.
 
964
  //-------------------------------------------------
 
965
  
 
966
  if (theTransporterRegistry->forceSendCheck(currentSendLimit) == 1) {
 
967
    sendPerformedLastInterval = 1;
 
968
  }
 
969
  checkCounter--;
 
970
  if (checkCounter < 0) {
 
971
    calculateSendLimit();
 
972
  }
 
973
}
 
974
 
 
975
 
 
976
/******************************************************************************
 
977
 * SEND SIGNAL METHODS
 
978
 *****************************************************************************/
 
979
int
 
980
TransporterFacade::sendSignal(NdbApiSignal * aSignal, NodeId aNode){
 
981
  Uint32* tDataPtr = aSignal->getDataPtrSend();
 
982
  Uint32 Tlen = aSignal->theLength;
 
983
  Uint32 TBno = aSignal->theReceiversBlockNumber;
 
984
  if(getIsNodeSendable(aNode) == true){
 
985
#ifdef API_TRACE
 
986
    if(setSignalLog() && TRACE_GSN(aSignal->theVerId_signalNumber)){
 
987
      Uint32 tmp = aSignal->theSendersBlockRef;
 
988
      aSignal->theSendersBlockRef = numberToRef(tmp, theOwnId);
 
989
      LinearSectionPtr ptr[3];
 
990
      signalLogger.sendSignal(* aSignal,
 
991
                              1,
 
992
                              tDataPtr,
 
993
                              aNode, ptr, 0);
 
994
      signalLogger.flushSignalLog();
 
995
      aSignal->theSendersBlockRef = tmp;
 
996
    }
 
997
#endif
 
998
    if ((Tlen != 0) && (Tlen <= 25) && (TBno != 0)) {
 
999
      SendStatus ss = theTransporterRegistry->prepareSend(aSignal, 
 
1000
                                                          1, // JBB
 
1001
                                                          tDataPtr, 
 
1002
                                                          aNode, 
 
1003
                                                          0);
 
1004
      //if (ss != SEND_OK) ndbout << ss << endl;
 
1005
      return (ss == SEND_OK ? 0 : -1);
 
1006
    } else {
 
1007
      ndbout << "ERR: SigLen = " << Tlen << " BlockRec = " << TBno;
 
1008
      ndbout << " SignalNo = " << aSignal->theVerId_signalNumber << endl;
 
1009
      assert(0);
 
1010
    }//if
 
1011
  }
 
1012
  //const ClusterMgr::Node & node = theClusterMgr->getNodeInfo(aNode);
 
1013
  //const Uint32 startLevel = node.m_state.startLevel;
 
1014
  return -1; // Node Dead
 
1015
}
 
1016
 
 
1017
int
 
1018
TransporterFacade::sendSignalUnCond(NdbApiSignal * aSignal, NodeId aNode){
 
1019
  Uint32* tDataPtr = aSignal->getDataPtrSend();
 
1020
#ifdef API_TRACE
 
1021
  if(setSignalLog() && TRACE_GSN(aSignal->theVerId_signalNumber)){
 
1022
    Uint32 tmp = aSignal->theSendersBlockRef;
 
1023
    aSignal->theSendersBlockRef = numberToRef(tmp, theOwnId);
 
1024
    LinearSectionPtr ptr[3];
 
1025
    signalLogger.sendSignal(* aSignal,
 
1026
                            0,
 
1027
                            tDataPtr,
 
1028
                            aNode, ptr, 0);
 
1029
    signalLogger.flushSignalLog();
 
1030
    aSignal->theSendersBlockRef = tmp;
 
1031
  }
 
1032
#endif
 
1033
  assert((aSignal->theLength != 0) &&
 
1034
         (aSignal->theLength <= 25) &&
 
1035
         (aSignal->theReceiversBlockNumber != 0));
 
1036
  SendStatus ss = theTransporterRegistry->prepareSend(aSignal, 
 
1037
                                                      0, 
 
1038
                                                      tDataPtr,
 
1039
                                                      aNode, 
 
1040
                                                      0);
 
1041
  
 
1042
  return (ss == SEND_OK ? 0 : -1);
 
1043
}
 
1044
 
 
1045
#define CHUNK_SZ NDB_SECTION_SEGMENT_SZ*64 // related to MAX_MESSAGE_SIZE
 
1046
int
 
1047
TransporterFacade::sendFragmentedSignal(NdbApiSignal* aSignal, NodeId aNode, 
 
1048
                                        LinearSectionPtr ptr[3], Uint32 secs)
 
1049
{
 
1050
  if(getIsNodeSendable(aNode) != true)
 
1051
    return -1;
 
1052
 
 
1053
#ifdef API_TRACE
 
1054
  if(setSignalLog() && TRACE_GSN(aSignal->theVerId_signalNumber)){
 
1055
    Uint32 tmp = aSignal->theSendersBlockRef;
 
1056
    aSignal->theSendersBlockRef = numberToRef(tmp, theOwnId);
 
1057
    signalLogger.sendSignal(* aSignal,
 
1058
                            1,
 
1059
                            aSignal->getDataPtrSend(),
 
1060
                            aNode,
 
1061
                            ptr, secs);
 
1062
    aSignal->theSendersBlockRef = tmp;
 
1063
  }
 
1064
#endif
 
1065
 
 
1066
  NdbApiSignal tmp_signal(*(SignalHeader*)aSignal);
 
1067
  LinearSectionPtr tmp_ptr[3];
 
1068
  Uint32 unique_id= m_fragmented_signal_id++; // next unique id
 
1069
  unsigned i;
 
1070
  for (i= 0; i < secs; i++)
 
1071
    tmp_ptr[i]= ptr[i];
 
1072
 
 
1073
  unsigned start_i= 0;
 
1074
  unsigned chunk_sz= 0;
 
1075
  unsigned fragment_info= 0;
 
1076
  Uint32 *tmp_data= tmp_signal.getDataPtrSend();
 
1077
  for (i= 0; i < secs;) {
 
1078
    unsigned save_sz= tmp_ptr[i].sz;
 
1079
    tmp_data[i-start_i]= i;
 
1080
    if (chunk_sz + save_sz > CHUNK_SZ) {
 
1081
      // truncate
 
1082
      unsigned send_sz= CHUNK_SZ - chunk_sz;
 
1083
      if (i != start_i) // first piece of a new section has to be a multiple of NDB_SECTION_SEGMENT_SZ
 
1084
      {
 
1085
        send_sz=
 
1086
          NDB_SECTION_SEGMENT_SZ
 
1087
          *(send_sz+NDB_SECTION_SEGMENT_SZ-1)
 
1088
          /NDB_SECTION_SEGMENT_SZ;
 
1089
        if (send_sz > save_sz)
 
1090
          send_sz= save_sz;
 
1091
      }
 
1092
      tmp_ptr[i].sz= send_sz;
 
1093
      
 
1094
      if (fragment_info < 2) // 1 = first fragment, 2 = middle fragments
 
1095
        fragment_info++;
 
1096
 
 
1097
      // send tmp_signal
 
1098
      tmp_data[i-start_i+1]= unique_id;
 
1099
      tmp_signal.setLength(i-start_i+2);
 
1100
      tmp_signal.m_fragmentInfo= fragment_info;
 
1101
      tmp_signal.m_noOfSections= i-start_i+1;
 
1102
      // do prepare send
 
1103
      {
 
1104
        SendStatus ss = theTransporterRegistry->prepareSend
 
1105
          (&tmp_signal, 
 
1106
           1, /*JBB*/
 
1107
           tmp_data,
 
1108
           aNode, 
 
1109
           &tmp_ptr[start_i]);
 
1110
        assert(ss != SEND_MESSAGE_TOO_BIG);
 
1111
        if (ss != SEND_OK) return -1;
 
1112
      }
 
1113
      // setup variables for next signal
 
1114
      start_i= i;
 
1115
      chunk_sz= 0;
 
1116
      tmp_ptr[i].sz= save_sz-send_sz;
 
1117
      tmp_ptr[i].p+= send_sz;
 
1118
      if (tmp_ptr[i].sz == 0)
 
1119
        i++;
 
1120
    }
 
1121
    else
 
1122
    {
 
1123
      chunk_sz+=save_sz;
 
1124
      i++;
 
1125
    }
 
1126
  }
 
1127
 
 
1128
  unsigned a_sz= aSignal->getLength();
 
1129
 
 
1130
  if (fragment_info > 0) {
 
1131
    // update the original signal to include section info
 
1132
    Uint32 *a_data= aSignal->getDataPtrSend();
 
1133
    unsigned tmp_sz= i-start_i;
 
1134
    memcpy(a_data+a_sz,
 
1135
           tmp_data,
 
1136
           tmp_sz*sizeof(Uint32));
 
1137
    a_data[a_sz+tmp_sz]= unique_id;
 
1138
    aSignal->setLength(a_sz+tmp_sz+1);
 
1139
 
 
1140
    // send last fragment
 
1141
    aSignal->m_fragmentInfo= 3; // 3 = last fragment
 
1142
    aSignal->m_noOfSections= i-start_i;
 
1143
  } else {
 
1144
    aSignal->m_noOfSections= secs;
 
1145
  }
 
1146
 
 
1147
  // send aSignal
 
1148
  int ret;
 
1149
  {
 
1150
    SendStatus ss = theTransporterRegistry->prepareSend
 
1151
      (aSignal,
 
1152
       1/*JBB*/,
 
1153
       aSignal->getDataPtrSend(),
 
1154
       aNode,
 
1155
       &tmp_ptr[start_i]);
 
1156
    assert(ss != SEND_MESSAGE_TOO_BIG);
 
1157
    ret = (ss == SEND_OK ? 0 : -1);
 
1158
  }
 
1159
  aSignal->m_noOfSections = 0;
 
1160
  aSignal->m_fragmentInfo = 0;
 
1161
  aSignal->setLength(a_sz);
 
1162
  return ret;
 
1163
}
 
1164
 
 
1165
int
 
1166
TransporterFacade::sendSignal(NdbApiSignal* aSignal, NodeId aNode, 
 
1167
                              LinearSectionPtr ptr[3], Uint32 secs){
 
1168
  aSignal->m_noOfSections = secs;
 
1169
  if(getIsNodeSendable(aNode) == true){
 
1170
#ifdef API_TRACE
 
1171
    if(setSignalLog() && TRACE_GSN(aSignal->theVerId_signalNumber)){
 
1172
      Uint32 tmp = aSignal->theSendersBlockRef;
 
1173
      aSignal->theSendersBlockRef = numberToRef(tmp, theOwnId);
 
1174
      signalLogger.sendSignal(* aSignal,
 
1175
                              1,
 
1176
                              aSignal->getDataPtrSend(),
 
1177
                              aNode,
 
1178
                              ptr, secs);
 
1179
      signalLogger.flushSignalLog();
 
1180
      aSignal->theSendersBlockRef = tmp;
 
1181
    }
 
1182
#endif
 
1183
    SendStatus ss = theTransporterRegistry->prepareSend
 
1184
      (aSignal, 
 
1185
       1, // JBB
 
1186
       aSignal->getDataPtrSend(),
 
1187
       aNode, 
 
1188
       ptr);
 
1189
    assert(ss != SEND_MESSAGE_TOO_BIG);
 
1190
    aSignal->m_noOfSections = 0;
 
1191
    return (ss == SEND_OK ? 0 : -1);
 
1192
  }
 
1193
  aSignal->m_noOfSections = 0;
 
1194
  return -1;
 
1195
}
 
1196
 
 
1197
/******************************************************************************
 
1198
 * CONNECTION METHODS  Etc
 
1199
 ******************************************************************************/
 
1200
 
 
1201
void
 
1202
TransporterFacade::doConnect(int aNodeId){
 
1203
  theTransporterRegistry->setIOState(aNodeId, NoHalt);
 
1204
  theTransporterRegistry->do_connect(aNodeId);
 
1205
}
 
1206
 
 
1207
void
 
1208
TransporterFacade::doDisconnect(int aNodeId)
 
1209
{
 
1210
  theTransporterRegistry->do_disconnect(aNodeId);
 
1211
}
 
1212
 
 
1213
void
 
1214
TransporterFacade::reportConnected(int aNodeId)
 
1215
{
 
1216
  theClusterMgr->reportConnected(aNodeId);
 
1217
  return;
 
1218
}
 
1219
 
 
1220
void
 
1221
TransporterFacade::reportDisconnected(int aNodeId)
 
1222
{
 
1223
  theClusterMgr->reportDisconnected(aNodeId);
 
1224
  return;
 
1225
}
 
1226
 
 
1227
NodeId
 
1228
TransporterFacade::ownId() const
 
1229
{
 
1230
  return theOwnId;
 
1231
}
 
1232
 
 
1233
bool
 
1234
TransporterFacade::isConnected(NodeId aNodeId){
 
1235
  return theTransporterRegistry->is_connected(aNodeId);
 
1236
}
 
1237
 
 
1238
NodeId
 
1239
TransporterFacade::get_an_alive_node()
 
1240
{
 
1241
  DBUG_ENTER("TransporterFacade::get_an_alive_node");
 
1242
  DBUG_PRINT("enter", ("theStartNodeId: %d", theStartNodeId));
 
1243
#ifdef VM_TRACE
 
1244
  const char* p = NdbEnv_GetEnv("NDB_ALIVE_NODE_ID", (char*)0, 0);
 
1245
  if (p != 0 && *p != 0)
 
1246
    return atoi(p);
 
1247
#endif
 
1248
  NodeId i;
 
1249
  for (i = theStartNodeId; i < MAX_NDB_NODES; i++) {
 
1250
    if (get_node_alive(i)){
 
1251
      DBUG_PRINT("info", ("Node %d is alive", i));
 
1252
      theStartNodeId = ((i + 1) % MAX_NDB_NODES);
 
1253
      DBUG_RETURN(i);
 
1254
    }
 
1255
  }
 
1256
  for (i = 1; i < theStartNodeId; i++) {
 
1257
    if (get_node_alive(i)){
 
1258
      DBUG_PRINT("info", ("Node %d is alive", i));
 
1259
      theStartNodeId = ((i + 1) % MAX_NDB_NODES);
 
1260
      DBUG_RETURN(i);
 
1261
    }
 
1262
  }
 
1263
  DBUG_RETURN((NodeId)0);
 
1264
}
 
1265
 
 
1266
TransporterFacade::ThreadData::ThreadData(Uint32 size){
 
1267
  m_use_cnt = 0;
 
1268
  m_firstFree = END_OF_LIST;
 
1269
  expand(size);
 
1270
}
 
1271
 
 
1272
void
 
1273
TransporterFacade::ThreadData::expand(Uint32 size){
 
1274
  Object_Execute oe = { 0 ,0 };
 
1275
  NodeStatusFunction fun = 0;
 
1276
 
 
1277
  const Uint32 sz = m_statusNext.size();
 
1278
  m_objectExecute.fill(sz + size, oe);
 
1279
  m_statusFunction.fill(sz + size, fun);
 
1280
  for(Uint32 i = 0; i<size; i++){
 
1281
    m_statusNext.push_back(sz + i + 1);
 
1282
  }
 
1283
 
 
1284
  m_statusNext.back() = m_firstFree;
 
1285
  m_firstFree = m_statusNext.size() - size;
 
1286
}
 
1287
 
 
1288
 
 
1289
int
 
1290
TransporterFacade::ThreadData::open(void* objRef, 
 
1291
                                    ExecuteFunction fun, 
 
1292
                                    NodeStatusFunction fun2)
 
1293
{
 
1294
  Uint32 nextFree = m_firstFree;
 
1295
 
 
1296
  if(m_statusNext.size() >= MAX_NO_THREADS && nextFree == END_OF_LIST){
 
1297
    return -1;
 
1298
  }
 
1299
  
 
1300
  if(nextFree == END_OF_LIST){
 
1301
    expand(10);
 
1302
    nextFree = m_firstFree;
 
1303
  }
 
1304
  
 
1305
  m_use_cnt++;
 
1306
  m_firstFree = m_statusNext[nextFree];
 
1307
  
 
1308
  Object_Execute oe = { objRef , fun };
 
1309
 
 
1310
  m_statusNext[nextFree] = INACTIVE;
 
1311
  m_objectExecute[nextFree] = oe;
 
1312
  m_statusFunction[nextFree] = fun2;
 
1313
 
 
1314
  return indexToNumber(nextFree);
 
1315
}
 
1316
 
 
1317
int
 
1318
TransporterFacade::ThreadData::close(int number){
 
1319
  number= numberToIndex(number);
 
1320
  assert(getInUse(number));
 
1321
  m_statusNext[number] = m_firstFree;
 
1322
  assert(m_use_cnt);
 
1323
  m_use_cnt--;
 
1324
  m_firstFree = number;
 
1325
  Object_Execute oe = { 0, 0 };
 
1326
  m_objectExecute[number] = oe;
 
1327
  m_statusFunction[number] = 0;
 
1328
  return 0;
 
1329
}
 
1330
 
 
1331
Uint32
 
1332
TransporterFacade::get_active_ndb_objects() const
 
1333
{
 
1334
  return m_threads.m_use_cnt;
 
1335
}
 
1336
 
 
1337
PollGuard::PollGuard(TransporterFacade *tp, NdbWaiter *aWaiter,
 
1338
                     Uint32 block_no)
 
1339
{
 
1340
  m_tp= tp;
 
1341
  m_waiter= aWaiter;
 
1342
  m_locked= true;
 
1343
  m_block_no= block_no;
 
1344
  tp->lock_mutex();
 
1345
}
 
1346
 
 
1347
/*
 
1348
  This is a common routine for possibly forcing the send of buffered signals
 
1349
  and receiving response the thread is waiting for. It is designed to be
 
1350
  useful from:
 
1351
  1) PK, UK lookups using the asynchronous interface
 
1352
     This routine uses the wait_for_input routine instead since it has
 
1353
     special end conditions due to the asynchronous nature of its usage.
 
1354
  2) Scans
 
1355
  3) dictSignal
 
1356
  It uses a NdbWaiter object to wait on the events and this object is
 
1357
  linked into the conditional wait queue. Thus this object contains
 
1358
  a reference to its place in the queue.
 
1359
 
 
1360
  It replaces the method receiveResponse previously used on the Ndb object
 
1361
*/
 
1362
int PollGuard::wait_n_unlock(int wait_time, NodeId nodeId, Uint32 state,
 
1363
                             bool forceSend)
 
1364
{
 
1365
  int ret_val;
 
1366
  m_waiter->set_node(nodeId);
 
1367
  m_waiter->set_state(state);
 
1368
  ret_val= wait_for_input_in_loop(wait_time, forceSend);
 
1369
  unlock_and_signal();
 
1370
  return ret_val;
 
1371
}
 
1372
 
 
1373
int PollGuard::wait_scan(int wait_time, NodeId nodeId, bool forceSend)
 
1374
{
 
1375
  m_waiter->set_node(nodeId);
 
1376
  m_waiter->set_state(WAIT_SCAN);
 
1377
  return wait_for_input_in_loop(wait_time, forceSend);
 
1378
}
 
1379
 
 
1380
int PollGuard::wait_for_input_in_loop(int wait_time, bool forceSend)
 
1381
{
 
1382
  int ret_val;
 
1383
  if (forceSend)
 
1384
    m_tp->forceSend(m_block_no);
 
1385
  else
 
1386
    m_tp->checkForceSend(m_block_no);
 
1387
 
 
1388
  NDB_TICKS curr_time = NdbTick_CurrentMillisecond();
 
1389
  NDB_TICKS max_time = curr_time + (NDB_TICKS)wait_time;
 
1390
  const int maxsleep = (wait_time == -1 || wait_time > 10) ? 10 : wait_time;
 
1391
  do
 
1392
  {
 
1393
    wait_for_input(maxsleep);
 
1394
    Uint32 state= m_waiter->get_state();
 
1395
    if (state == NO_WAIT)
 
1396
    {
 
1397
      return 0;
 
1398
    }
 
1399
    else if (state == WAIT_NODE_FAILURE)
 
1400
    {
 
1401
      ret_val= -2;
 
1402
      break;
 
1403
    }
 
1404
    if (wait_time == -1)
 
1405
    {
 
1406
#ifdef NOT_USED
 
1407
      ndbout << "Waited WAITFOR_RESPONSE_TIMEOUT, continuing wait" << endl;
 
1408
#endif
 
1409
      continue;
 
1410
    }
 
1411
    wait_time= max_time - NdbTick_CurrentMillisecond();
 
1412
    if (wait_time <= 0)
 
1413
    {
 
1414
#ifdef VM_TRACE
 
1415
      ndbout << "Time-out state is " << m_waiter->get_state() << endl;
 
1416
#endif
 
1417
      m_waiter->set_state(WST_WAIT_TIMEOUT);
 
1418
      ret_val= -1;
 
1419
      break;
 
1420
    }
 
1421
  } while (1);
 
1422
#ifdef VM_TRACE
 
1423
  ndbout << "ERR: receiveResponse - theImpl->theWaiter.m_state = ";
 
1424
  ndbout << m_waiter->get_state() << endl;
 
1425
#endif
 
1426
  m_waiter->set_state(NO_WAIT);
 
1427
  return ret_val;
 
1428
}
 
1429
 
 
1430
void PollGuard::wait_for_input(int wait_time)
 
1431
{
 
1432
  NdbWaiter *t_poll_owner= m_tp->get_poll_owner();
 
1433
  if (t_poll_owner != NULL && t_poll_owner != m_waiter)
 
1434
  {
 
1435
    /*
 
1436
      We didn't get hold of the poll "right". We will sleep on a
 
1437
      conditional mutex until the thread owning the poll "right"
 
1438
      will wake us up after all data is received. If no data arrives
 
1439
      we will wake up eventually due to the timeout.
 
1440
      After receiving all data we take the object out of the cond wait
 
1441
      queue if it hasn't happened already. It is usually already out of the
 
1442
      queue but at time-out it could be that the object is still there.
 
1443
    */
 
1444
    (void) m_tp->put_in_cond_wait_queue(m_waiter);
 
1445
    m_waiter->wait(wait_time);
 
1446
    if (m_waiter->get_cond_wait_index() != TransporterFacade::MAX_NO_THREADS)
 
1447
    {
 
1448
      m_tp->remove_from_cond_wait_queue(m_waiter);
 
1449
    }
 
1450
  }
 
1451
  else
 
1452
  {
 
1453
    /*
 
1454
      We got the poll "right" and we poll until data is received. After
 
1455
      receiving data we will check if all data is received, if not we
 
1456
      poll again.
 
1457
    */
 
1458
#ifdef NDB_SHM_TRANSPORTER
 
1459
    /*
 
1460
      If shared memory transporters are used we need to set our sigmask
 
1461
      such that we wake up also on interrupts on the shared memory
 
1462
      interrupt signal.
 
1463
    */
 
1464
    NdbThread_set_shm_sigmask(FALSE);
 
1465
#endif
 
1466
    m_tp->set_poll_owner(m_waiter);
 
1467
    m_waiter->set_poll_owner(true);
 
1468
    m_tp->external_poll((Uint32)wait_time);
 
1469
  }
 
1470
}
 
1471
 
 
1472
void PollGuard::unlock_and_signal()
 
1473
{
 
1474
  NdbWaiter *t_signal_cond_waiter= 0;
 
1475
  if (!m_locked)
 
1476
    return;
 
1477
  /*
 
1478
   When completing the poll for this thread we must return the poll
 
1479
   ownership if we own it. We will give it to the last thread that
 
1480
   came here (the most recent) which is likely to be the one also
 
1481
   last to complete. We will remove that thread from the conditional
 
1482
   wait queue and set him as the new owner of the poll "right".
 
1483
   We will wait however with the signal until we have unlocked the
 
1484
   mutex for performance reasons.
 
1485
   See Stevens book on Unix NetworkProgramming: The Sockets Networking
 
1486
   API Volume 1 Third Edition on page 703-704 for a discussion on this
 
1487
   subject.
 
1488
  */
 
1489
  if (m_tp->get_poll_owner() == m_waiter)
 
1490
  {
 
1491
#ifdef NDB_SHM_TRANSPORTER
 
1492
    /*
 
1493
      If shared memory transporters are used we need to reset our sigmask
 
1494
      since we are no longer the thread to receive interrupts.
 
1495
    */
 
1496
    NdbThread_set_shm_sigmask(TRUE);
 
1497
#endif
 
1498
    m_waiter->set_poll_owner(false);
 
1499
    t_signal_cond_waiter= m_tp->rem_last_from_cond_wait_queue();
 
1500
    m_tp->set_poll_owner(t_signal_cond_waiter);
 
1501
    if (t_signal_cond_waiter)
 
1502
      t_signal_cond_waiter->set_poll_owner(true);
 
1503
  }
 
1504
  if (t_signal_cond_waiter)
 
1505
    t_signal_cond_waiter->cond_signal();
 
1506
  m_tp->unlock_mutex();
 
1507
  m_locked=false;
 
1508
}
 
1509
 
 
1510
template class Vector<NodeStatusFunction>;
 
1511
template class Vector<TransporterFacade::ThreadData::Object_Execute>;
 
1512
 
 
1513
#include "SignalSender.hpp"
 
1514
 
 
1515
SendStatus
 
1516
SignalSender::sendSignal(Uint16 nodeId, const SimpleSignal * s){
 
1517
#ifdef API_TRACE
 
1518
  if(setSignalLog() && TRACE_GSN(s->header.theVerId_signalNumber)){
 
1519
    SignalHeader tmp = s->header;
 
1520
    tmp.theSendersBlockRef = getOwnRef();
 
1521
 
 
1522
    LinearSectionPtr ptr[3];
 
1523
    signalLogger.sendSignal(tmp,
 
1524
                            1,
 
1525
                            s->theData,
 
1526
                            nodeId, ptr, 0);
 
1527
    signalLogger.flushSignalLog();
 
1528
  }
 
1529
#endif
 
1530
  assert(getNodeInfo(nodeId).m_api_reg_conf == true ||
 
1531
         s->readSignalNumber() == GSN_API_REGREQ);
 
1532
  return theFacade->theTransporterRegistry->prepareSend(&s->header,
 
1533
                                                        1, // JBB
 
1534
                                                        &s->theData[0],
 
1535
                                                        nodeId, 
 
1536
                                                        &s->ptr[0]);
 
1537
}