~ubuntu-branches/ubuntu/precise/mysql-5.1/precise

« back to all changes in this revision

Viewing changes to storage/ndb/src/common/transporter/TCP_Transporter.cpp

  • Committer: Bazaar Package Importer
  • Author(s): Norbert Tretkowski
  • Date: 2010-03-17 14:56:02 UTC
  • Revision ID: james.westby@ubuntu.com-20100317145602-x7e30l1b2sb5s6w6
Tags: upstream-5.1.45
ImportĀ upstreamĀ versionĀ 5.1.45

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/* Copyright (C) 2003 MySQL AB
 
2
 
 
3
   This program is free software; you can redistribute it and/or modify
 
4
   it under the terms of the GNU General Public License as published by
 
5
   the Free Software Foundation; version 2 of the License.
 
6
 
 
7
   This program is distributed in the hope that it will be useful,
 
8
   but WITHOUT ANY WARRANTY; without even the implied warranty of
 
9
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
10
   GNU General Public License for more details.
 
11
 
 
12
   You should have received a copy of the GNU General Public License
 
13
   along with this program; if not, write to the Free Software
 
14
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
 
15
 
 
16
#include <ndb_global.h>
 
17
 
 
18
#include <NdbTCP.h>
 
19
#include "TCP_Transporter.hpp"
 
20
#include <NdbOut.hpp>
 
21
#include <NdbSleep.h>
 
22
 
 
23
#include <EventLogger.hpp>
 
24
extern EventLogger g_eventLogger;
 
25
// End of stuff to be moved
 
26
 
 
27
#ifdef NDB_WIN32
 
28
class ndbstrerror
 
29
{
 
30
public:
 
31
  ndbstrerror(int iError);
 
32
  ~ndbstrerror(void);
 
33
  operator char*(void) { return m_szError; };
 
34
 
 
35
private:
 
36
  int m_iError;
 
37
  char* m_szError;
 
38
};
 
39
 
 
40
ndbstrerror::ndbstrerror(int iError)
 
41
: m_iError(iError)
 
42
{
 
43
  FormatMessage( 
 
44
    FORMAT_MESSAGE_ALLOCATE_BUFFER | FORMAT_MESSAGE_FROM_SYSTEM | FORMAT_MESSAGE_IGNORE_INSERTS,
 
45
    0,
 
46
    iError,
 
47
    MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT),
 
48
    (LPTSTR)&m_szError,
 
49
    0,
 
50
    0);
 
51
}
 
52
 
 
53
ndbstrerror::~ndbstrerror(void)
 
54
{
 
55
  LocalFree( m_szError );
 
56
  m_szError = 0;
 
57
}
 
58
#else
 
59
#define ndbstrerror strerror
 
60
#endif
 
61
 
 
62
TCP_Transporter::TCP_Transporter(TransporterRegistry &t_reg,
 
63
                                 int sendBufSize, int maxRecvSize, 
 
64
                                 const char *lHostName,
 
65
                                 const char *rHostName, 
 
66
                                 int r_port,
 
67
                                 bool isMgmConnection_arg,
 
68
                                 NodeId lNodeId,
 
69
                                 NodeId rNodeId,
 
70
                                 NodeId serverNodeId,
 
71
                                 bool chksm, bool signalId,
 
72
                                 Uint32 _reportFreq) :
 
73
  Transporter(t_reg, tt_TCP_TRANSPORTER,
 
74
              lHostName, rHostName, r_port, isMgmConnection_arg,
 
75
              lNodeId, rNodeId, serverNodeId,
 
76
              0, false, chksm, signalId),
 
77
  m_sendBuffer(sendBufSize)
 
78
{
 
79
  maxReceiveSize = maxRecvSize;
 
80
  
 
81
  // Initialize member variables
 
82
  theSocket     = NDB_INVALID_SOCKET;
 
83
  
 
84
  sendCount      = receiveCount = 0;
 
85
  sendSize       = receiveSize  = 0;
 
86
  reportFreq     = _reportFreq;
 
87
 
 
88
  sockOptRcvBufSize = 70080;
 
89
  sockOptSndBufSize = 71540;
 
90
  sockOptNodelay    = 1;
 
91
  sockOptTcpMaxSeg  = 4096;
 
92
}
 
93
 
 
94
TCP_Transporter::~TCP_Transporter() {
 
95
  
 
96
  // Disconnect
 
97
  if (theSocket != NDB_INVALID_SOCKET)
 
98
    doDisconnect();
 
99
  
 
100
  // Delete send buffers
 
101
  
 
102
  // Delete receive buffer!!
 
103
  receiveBuffer.destroy();
 
104
}
 
105
 
 
106
bool TCP_Transporter::connect_server_impl(NDB_SOCKET_TYPE sockfd)
 
107
{
 
108
  DBUG_ENTER("TCP_Transpporter::connect_server_impl");
 
109
  DBUG_RETURN(connect_common(sockfd));
 
110
}
 
111
 
 
112
bool TCP_Transporter::connect_client_impl(NDB_SOCKET_TYPE sockfd)
 
113
{
 
114
  DBUG_ENTER("TCP_Transpporter::connect_client_impl");
 
115
  DBUG_RETURN(connect_common(sockfd));
 
116
}
 
117
 
 
118
bool TCP_Transporter::connect_common(NDB_SOCKET_TYPE sockfd)
 
119
{
 
120
  theSocket = sockfd;
 
121
  setSocketOptions();
 
122
  setSocketNonBlocking(theSocket);
 
123
  DBUG_PRINT("info", ("Successfully set-up TCP transporter to node %d",
 
124
              remoteNodeId));
 
125
  return true;
 
126
}
 
127
 
 
128
bool
 
129
TCP_Transporter::initTransporter() {
 
130
  
 
131
  // Allocate buffer for receiving
 
132
  // Let it be the maximum size we receive plus 8 kB for any earlier received
 
133
  // incomplete messages (slack)
 
134
  Uint32 recBufSize = maxReceiveSize;
 
135
  if(recBufSize < MAX_MESSAGE_SIZE){
 
136
    recBufSize = MAX_MESSAGE_SIZE;
 
137
  }
 
138
  
 
139
  if(!receiveBuffer.init(recBufSize+MAX_MESSAGE_SIZE)){
 
140
    return false;
 
141
  }
 
142
  
 
143
  // Allocate buffers for sending
 
144
  if (!m_sendBuffer.initBuffer(remoteNodeId)) {
 
145
    // XXX What shall be done here? 
 
146
    // The same is valid for the other init-methods 
 
147
    return false;
 
148
  }
 
149
  
 
150
  return true;
 
151
}
 
152
 
 
153
void
 
154
TCP_Transporter::setSocketOptions(){
 
155
  int sockOptKeepAlive  = 1;
 
156
 
 
157
  if (setsockopt(theSocket, SOL_SOCKET, SO_RCVBUF,
 
158
                 (char*)&sockOptRcvBufSize, sizeof(sockOptRcvBufSize)) < 0) {
 
159
#ifdef DEBUG_TRANSPORTER
 
160
    g_eventLogger.error("The setsockopt SO_RCVBUF error code = %d", InetErrno);
 
161
#endif
 
162
  }//if
 
163
  
 
164
  if (setsockopt(theSocket, SOL_SOCKET, SO_SNDBUF,
 
165
                 (char*)&sockOptSndBufSize, sizeof(sockOptSndBufSize)) < 0) {
 
166
#ifdef DEBUG_TRANSPORTER
 
167
    g_eventLogger.error("The setsockopt SO_SNDBUF error code = %d", InetErrno);
 
168
#endif
 
169
  }//if
 
170
  
 
171
  if (setsockopt(theSocket, SOL_SOCKET, SO_KEEPALIVE,
 
172
                 (char*)&sockOptKeepAlive, sizeof(sockOptKeepAlive)) < 0) {
 
173
    ndbout_c("The setsockopt SO_KEEPALIVE error code = %d", InetErrno);
 
174
  }//if
 
175
 
 
176
  //-----------------------------------------------
 
177
  // Set the TCP_NODELAY option so also small packets are sent
 
178
  // as soon as possible
 
179
  //-----------------------------------------------
 
180
  if (setsockopt(theSocket, IPPROTO_TCP, TCP_NODELAY, 
 
181
                 (char*)&sockOptNodelay, sizeof(sockOptNodelay)) < 0) {
 
182
#ifdef DEBUG_TRANSPORTER
 
183
    g_eventLogger.error("The setsockopt TCP_NODELAY error code = %d", InetErrno);
 
184
#endif
 
185
  }//if
 
186
}
 
187
 
 
188
 
 
189
#ifdef NDB_WIN32
 
190
 
 
191
bool
 
192
TCP_Transporter::setSocketNonBlocking(NDB_SOCKET_TYPE socket){
 
193
  unsigned long  ul = 1;
 
194
  if(ioctlsocket(socket, FIONBIO, &ul))
 
195
  {
 
196
#ifdef DEBUG_TRANSPORTER
 
197
    g_eventLogger.error("Set non-blocking server error3: %d", InetErrno);
 
198
#endif
 
199
  }//if
 
200
  return true;
 
201
}
 
202
 
 
203
#else
 
204
 
 
205
bool
 
206
TCP_Transporter::setSocketNonBlocking(NDB_SOCKET_TYPE socket){
 
207
  int flags;
 
208
  flags = fcntl(socket, F_GETFL, 0);
 
209
  if (flags < 0) {
 
210
#ifdef DEBUG_TRANSPORTER
 
211
    g_eventLogger.error("Set non-blocking server error1: %s", strerror(InetErrno));
 
212
#endif
 
213
  }//if
 
214
  flags |= NDB_NONBLOCK;
 
215
  if (fcntl(socket, F_SETFL, flags) == -1) {
 
216
#ifdef DEBUG_TRANSPORTER
 
217
    g_eventLogger.error("Set non-blocking server error2: %s", strerror(InetErrno));
 
218
#endif
 
219
  }//if
 
220
  return true;
 
221
}
 
222
 
 
223
#endif
 
224
 
 
225
bool
 
226
TCP_Transporter::sendIsPossible(struct timeval * timeout) {
 
227
  if(theSocket != NDB_INVALID_SOCKET){
 
228
    fd_set   writeset;
 
229
    FD_ZERO(&writeset);
 
230
    FD_SET(theSocket, &writeset);
 
231
    
 
232
    int selectReply = select(theSocket + 1, NULL, &writeset, NULL, timeout);
 
233
 
 
234
    if ((selectReply > 0) && FD_ISSET(theSocket, &writeset)) 
 
235
      return true;
 
236
    else
 
237
      return false;
 
238
  }
 
239
  return false;
 
240
}
 
241
 
 
242
Uint32
 
243
TCP_Transporter::get_free_buffer() const 
 
244
{
 
245
  return m_sendBuffer.bufferSizeRemaining();
 
246
}
 
247
 
 
248
Uint32 *
 
249
TCP_Transporter::getWritePtr(Uint32 lenBytes, Uint32 prio){
 
250
  
 
251
  Uint32 * insertPtr = m_sendBuffer.getInsertPtr(lenBytes);
 
252
  
 
253
  struct timeval timeout = {0, 10000};
 
254
 
 
255
  if (insertPtr == 0) {
 
256
    //-------------------------------------------------
 
257
    // Buffer was completely full. We have severe problems.
 
258
    // We will attempt to wait for a small time
 
259
    //-------------------------------------------------
 
260
    if(sendIsPossible(&timeout)) {
 
261
      //-------------------------------------------------
 
262
      // Send is possible after the small timeout.
 
263
      //-------------------------------------------------
 
264
      if(!doSend()){
 
265
        return 0;
 
266
      } else {
 
267
        //-------------------------------------------------
 
268
        // Since send was successful we will make a renewed
 
269
        // attempt at inserting the signal into the buffer.
 
270
        //-------------------------------------------------
 
271
        insertPtr = m_sendBuffer.getInsertPtr(lenBytes);
 
272
      }//if
 
273
    } else {
 
274
      return 0;
 
275
    }//if
 
276
  }
 
277
  return insertPtr;
 
278
}
 
279
 
 
280
void
 
281
TCP_Transporter::updateWritePtr(Uint32 lenBytes, Uint32 prio){
 
282
  m_sendBuffer.updateInsertPtr(lenBytes);
 
283
  
 
284
  const int bufsize = m_sendBuffer.bufferSize();
 
285
  if(bufsize > TCP_SEND_LIMIT) {
 
286
    //-------------------------------------------------
 
287
    // Buffer is full and we are ready to send. We will
 
288
    // not wait since the signal is already in the buffer.
 
289
    // Force flag set has the same indication that we
 
290
    // should always send. If it is not possible to send
 
291
    // we will not worry since we will soon be back for
 
292
    // a renewed trial.
 
293
    //-------------------------------------------------
 
294
    struct timeval no_timeout = {0,0};
 
295
    if(sendIsPossible(&no_timeout)) {
 
296
      //-------------------------------------------------
 
297
      // Send was possible, attempt at a send.
 
298
      //-------------------------------------------------
 
299
      doSend();
 
300
    }//if
 
301
  }
 
302
}
 
303
 
 
304
#define DISCONNECT_ERRNO(e, sz) ((sz == 0) || \
 
305
               (!((sz == -1) && (e == EAGAIN) || (e == EWOULDBLOCK) || (e == EINTR))))
 
306
 
 
307
 
 
308
bool
 
309
TCP_Transporter::doSend() {
 
310
  // If no sendbuffers are used nothing is done
 
311
  // Sends the contents of the SendBuffers until they are empty
 
312
  // or until select does not select the socket for write.
 
313
  // Before calling send, the socket must be selected for write
 
314
  // using "select"
 
315
  // It writes on the external TCP/IP interface until the send buffer is empty
 
316
  // and as long as write is possible (test it using select)
 
317
 
 
318
  // Empty the SendBuffers
 
319
  
 
320
  bool sent_any = true;
 
321
  while (m_sendBuffer.dataSize > 0)
 
322
  {
 
323
    const char * const sendPtr = m_sendBuffer.sendPtr;
 
324
    const Uint32 sizeToSend    = m_sendBuffer.sendDataSize;
 
325
    const int nBytesSent = send(theSocket, sendPtr, sizeToSend, 0);
 
326
    
 
327
    if (nBytesSent > 0) 
 
328
    {
 
329
      sent_any = true;
 
330
      m_sendBuffer.bytesSent(nBytesSent);
 
331
      
 
332
      sendCount ++;
 
333
      sendSize  += nBytesSent;
 
334
      if(sendCount == reportFreq)
 
335
      {
 
336
        reportSendLen(get_callback_obj(), remoteNodeId, sendCount, sendSize);
 
337
        sendCount = 0;
 
338
        sendSize  = 0;
 
339
      }
 
340
    } 
 
341
    else 
 
342
    {
 
343
      if (nBytesSent < 0 && InetErrno == EAGAIN && sent_any)
 
344
        break;
 
345
 
 
346
      // Send failed
 
347
#if defined DEBUG_TRANSPORTER
 
348
      g_eventLogger.error("Send Failure(disconnect==%d) to node = %d nBytesSent = %d "
 
349
               "errno = %d strerror = %s",
 
350
               DISCONNECT_ERRNO(InetErrno, nBytesSent),
 
351
               remoteNodeId, nBytesSent, InetErrno, 
 
352
               (char*)ndbstrerror(InetErrno));
 
353
#endif   
 
354
      if(DISCONNECT_ERRNO(InetErrno, nBytesSent)){
 
355
        doDisconnect();
 
356
        report_disconnect(InetErrno);
 
357
      }
 
358
      
 
359
      return false;
 
360
    }
 
361
  }
 
362
  return true;
 
363
}
 
364
 
 
365
int
 
366
TCP_Transporter::doReceive() {
 
367
  // Select-function must return the socket for read
 
368
  // before this method is called
 
369
  // It reads the external TCP/IP interface once
 
370
  Uint32 size = receiveBuffer.sizeOfBuffer - receiveBuffer.sizeOfData;
 
371
  if(size > 0){
 
372
    const int nBytesRead = recv(theSocket, 
 
373
                                receiveBuffer.insertPtr, 
 
374
                                size < maxReceiveSize ? size : maxReceiveSize, 
 
375
                                0);
 
376
    
 
377
    if (nBytesRead > 0) {
 
378
      receiveBuffer.sizeOfData += nBytesRead;
 
379
      receiveBuffer.insertPtr  += nBytesRead;
 
380
      
 
381
      if(receiveBuffer.sizeOfData > receiveBuffer.sizeOfBuffer){
 
382
#ifdef DEBUG_TRANSPORTER
 
383
        g_eventLogger.error("receiveBuffer.sizeOfData(%d) > receiveBuffer.sizeOfBuffer(%d)",
 
384
                 receiveBuffer.sizeOfData, receiveBuffer.sizeOfBuffer);
 
385
        g_eventLogger.error("nBytesRead = %d", nBytesRead);
 
386
#endif
 
387
        g_eventLogger.error("receiveBuffer.sizeOfData(%d) > receiveBuffer.sizeOfBuffer(%d)",
 
388
                 receiveBuffer.sizeOfData, receiveBuffer.sizeOfBuffer);
 
389
        report_error(TE_INVALID_MESSAGE_LENGTH);
 
390
        return 0;
 
391
      }
 
392
      
 
393
      receiveCount ++;
 
394
      receiveSize  += nBytesRead;
 
395
      
 
396
      if(receiveCount == reportFreq){
 
397
        reportReceiveLen(get_callback_obj(), remoteNodeId, receiveCount, receiveSize);
 
398
        receiveCount = 0;
 
399
        receiveSize  = 0;
 
400
      }
 
401
      return nBytesRead;
 
402
    } else {
 
403
#if defined DEBUG_TRANSPORTER
 
404
      g_eventLogger.error("Receive Failure(disconnect==%d) to node = %d nBytesSent = %d "
 
405
               "errno = %d strerror = %s",
 
406
               DISCONNECT_ERRNO(InetErrno, nBytesRead),
 
407
               remoteNodeId, nBytesRead, InetErrno, 
 
408
               (char*)ndbstrerror(InetErrno));
 
409
#endif   
 
410
      if(DISCONNECT_ERRNO(InetErrno, nBytesRead)){
 
411
        // The remote node has closed down
 
412
        doDisconnect();
 
413
        report_disconnect(InetErrno);
 
414
      } 
 
415
    }
 
416
    return nBytesRead;
 
417
  } else {
 
418
    return 0;
 
419
  }
 
420
}
 
421
 
 
422
void
 
423
TCP_Transporter::disconnectImpl() {
 
424
  if(theSocket != NDB_INVALID_SOCKET){
 
425
    if(NDB_CLOSE_SOCKET(theSocket) < 0){
 
426
      report_error(TE_ERROR_CLOSING_SOCKET);
 
427
    }
 
428
  }
 
429
  
 
430
  // Empty send och receive buffers 
 
431
  receiveBuffer.clear();
 
432
  m_sendBuffer.emptyBuffer();
 
433
 
 
434
  theSocket = NDB_INVALID_SOCKET;
 
435
}