1
/* Copyright (C) 2003 MySQL AB
3
This program is free software; you can redistribute it and/or modify
4
it under the terms of the GNU General Public License as published by
5
the Free Software Foundation; version 2 of the License.
7
This program is distributed in the hope that it will be useful,
8
but WITHOUT ANY WARRANTY; without even the implied warranty of
9
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10
GNU General Public License for more details.
12
You should have received a copy of the GNU General Public License
13
along with this program; if not, write to the Free Software
14
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
16
#include <ndb_global.h>
19
#include "TCP_Transporter.hpp"
23
#include <EventLogger.hpp>
24
extern EventLogger g_eventLogger;
25
// End of stuff to be moved
31
// Helper that wraps the message text for a system error code.
// Only fragments of the class are visible in this excerpt.
// Constructor declaration: takes the error code to describe.
ndbstrerror(int iError);
33
// Conversion operator: exposes the stored message pointer m_szError.
operator char*(void) { return m_szError; };
40
// Constructor definition. NOTE(review): the flag and language arguments below
// presumably belong to a FormatMessage call whose opening line is not visible
// here -- confirm against the full file.
ndbstrerror::ndbstrerror(int iError)
44
FORMAT_MESSAGE_ALLOCATE_BUFFER | FORMAT_MESSAGE_FROM_SYSTEM | FORMAT_MESSAGE_IGNORE_INSERTS,
47
MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT),
53
// Destructor: releases m_szError with LocalFree.
ndbstrerror::~ndbstrerror(void)
55
LocalFree( m_szError );
59
// Maps the name ndbstrerror onto strerror.
// NOTE(review): presumably the alternative branch of a platform conditional
// whose #if/#else lines are not visible here -- confirm.
#define ndbstrerror strerror
62
// Constructs a TCP transporter.
// Forwards the registry, host names, port, node ids and the checksum/signal-id
// flags to the Transporter base class (with tt_TCP_TRANSPORTER as the type),
// constructs m_sendBuffer with sendBufSize, and then sets the members below to
// their initial values. Several parameter lines are not visible in this
// excerpt (e.g. the declarations of r_port, lNodeId, rNodeId, serverNodeId and
// _reportFreq).
TCP_Transporter::TCP_Transporter(TransporterRegistry &t_reg,
63
int sendBufSize, int maxRecvSize,
64
const char *lHostName,
65
const char *rHostName,
67
bool isMgmConnection_arg,
71
bool chksm, bool signalId,
73
Transporter(t_reg, tt_TCP_TRANSPORTER,
74
lHostName, rHostName, r_port, isMgmConnection_arg,
75
lNodeId, rNodeId, serverNodeId,
76
0, false, chksm, signalId),
77
m_sendBuffer(sendBufSize)
79
// Upper bound used for each recv() call and for sizing the receive buffer.
maxReceiveSize = maxRecvSize;
81
// Initialize member variables
82
// No socket yet.
theSocket = NDB_INVALID_SOCKET;
84
// Send/receive statistics start at zero.
sendCount = receiveCount = 0;
85
sendSize = receiveSize = 0;
86
// Statistics are reported once a counter equals this value.
reportFreq = _reportFreq;
88
// Default socket option values; the buffer sizes are applied in
// setSocketOptions().
sockOptRcvBufSize = 70080;
89
sockOptSndBufSize = 71540;
91
sockOptTcpMaxSeg = 4096;
94
// Destructor: tests whether a socket is still held (the action taken in that
// case is on a line not visible in this excerpt) and destroys the receive
// buffer.
TCP_Transporter::~TCP_Transporter() {
97
if (theSocket != NDB_INVALID_SOCKET)
100
// Delete send buffers
102
// Delete receive buffer!!
103
receiveBuffer.destroy();
106
// Server-side connect hook: hands sockfd to connect_common() and passes its
// result back through DBUG_RETURN.
// NOTE(review): the trace tag below spells the class name "TCP_Transpporter";
// it is a runtime string and is left untouched here.
bool TCP_Transporter::connect_server_impl(NDB_SOCKET_TYPE sockfd)
108
DBUG_ENTER("TCP_Transpporter::connect_server_impl");
109
DBUG_RETURN(connect_common(sockfd));
112
// Client-side connect hook: hands sockfd to connect_common() and passes its
// result back through DBUG_RETURN.
// NOTE(review): the trace tag below spells the class name "TCP_Transpporter";
// it is a runtime string and is left untouched here.
bool TCP_Transporter::connect_client_impl(NDB_SOCKET_TYPE sockfd)
114
DBUG_ENTER("TCP_Transpporter::connect_client_impl");
115
DBUG_RETURN(connect_common(sockfd));
118
// Connect path shared by connect_server_impl() and connect_client_impl().
// Visible steps: switch theSocket to non-blocking mode and emit a debug trace.
// NOTE(review): presumably sockfd is stored in theSocket on a line that is not
// visible in this excerpt -- confirm against the full file.
bool TCP_Transporter::connect_common(NDB_SOCKET_TYPE sockfd)
122
setSocketNonBlocking(theSocket);
123
DBUG_PRINT("info", ("Successfully set-up TCP transporter to node %d",
129
TCP_Transporter::initTransporter() {
// Sets up the receive buffer and the send buffer for this transporter.
// What happens when either initialisation fails is on lines not visible in
// this excerpt.
131
// Allocate buffer for receiving
132
// Let it be the maximum size we receive plus slack for any earlier received
133
// incomplete messages (the code below adds MAX_MESSAGE_SIZE as the slack)
134
Uint32 recBufSize = maxReceiveSize;
135
// Never size the buffer below one full message.
if(recBufSize < MAX_MESSAGE_SIZE){
136
recBufSize = MAX_MESSAGE_SIZE;
139
if(!receiveBuffer.init(recBufSize+MAX_MESSAGE_SIZE)){
143
// Allocate buffers for sending
144
if (!m_sendBuffer.initBuffer(remoteNodeId)) {
145
// XXX What shall be done here?
146
// The same is valid for the other init-methods
154
TCP_Transporter::setSocketOptions(){
// Applies socket options to theSocket: receive buffer size, send buffer size,
// keep-alive and TCP_NODELAY. Each setsockopt() failure is only logged by the
// visible lines; three of the four messages are compiled in only when
// DEBUG_TRANSPORTER is defined.
155
int sockOptKeepAlive = 1;
157
// Receive buffer size (value set in the constructor).
if (setsockopt(theSocket, SOL_SOCKET, SO_RCVBUF,
158
(char*)&sockOptRcvBufSize, sizeof(sockOptRcvBufSize)) < 0) {
159
#ifdef DEBUG_TRANSPORTER
160
g_eventLogger.error("The setsockopt SO_RCVBUF error code = %d", InetErrno);
164
// Send buffer size (value set in the constructor).
if (setsockopt(theSocket, SOL_SOCKET, SO_SNDBUF,
165
(char*)&sockOptSndBufSize, sizeof(sockOptSndBufSize)) < 0) {
166
#ifdef DEBUG_TRANSPORTER
167
g_eventLogger.error("The setsockopt SO_SNDBUF error code = %d", InetErrno);
171
// Enable keep-alive; unlike the other options, a failure here is reported
// through ndbout_c rather than g_eventLogger.
if (setsockopt(theSocket, SOL_SOCKET, SO_KEEPALIVE,
172
(char*)&sockOptKeepAlive, sizeof(sockOptKeepAlive)) < 0) {
173
ndbout_c("The setsockopt SO_KEEPALIVE error code = %d", InetErrno);
176
//-----------------------------------------------
177
// Set the TCP_NODELAY option so also small packets are sent
178
// as soon as possible
179
//-----------------------------------------------
180
if (setsockopt(theSocket, IPPROTO_TCP, TCP_NODELAY,
181
(char*)&sockOptNodelay, sizeof(sockOptNodelay)) < 0) {
182
#ifdef DEBUG_TRANSPORTER
183
g_eventLogger.error("The setsockopt TCP_NODELAY error code = %d", InetErrno);
192
TCP_Transporter::setSocketNonBlocking(NDB_SOCKET_TYPE socket){
// ioctlsocket-based variant: requests non-blocking mode by passing a non-zero
// argument with FIONBIO. A non-zero return is treated as failure and, when
// DEBUG_TRANSPORTER is defined, logged.
// NOTE(review): presumably selected by a platform conditional that is not
// visible in this excerpt -- confirm.
193
unsigned long ul = 1;
194
if(ioctlsocket(socket, FIONBIO, &ul))
196
#ifdef DEBUG_TRANSPORTER
197
g_eventLogger.error("Set non-blocking server error3: %d", InetErrno);
206
TCP_Transporter::setSocketNonBlocking(NDB_SOCKET_TYPE socket){
// fcntl-based variant: reads the current file status flags, ORs in
// NDB_NONBLOCK and writes them back. Failures are logged only when
// DEBUG_TRANSPORTER is defined.
208
// Fetch the current flags; the check of this result is on a line not visible
// in this excerpt.
flags = fcntl(socket, F_GETFL, 0);
210
#ifdef DEBUG_TRANSPORTER
211
g_eventLogger.error("Set non-blocking server error1: %s", strerror(InetErrno));
214
flags |= NDB_NONBLOCK;
215
if (fcntl(socket, F_SETFL, flags) == -1) {
216
#ifdef DEBUG_TRANSPORTER
217
g_eventLogger.error("Set non-blocking server error2: %s", strerror(InetErrno));
226
TCP_Transporter::sendIsPossible(struct timeval * timeout) {
// Asks select() whether theSocket is writable, waiting at most `timeout`.
// Only done when a socket is held; the return statements are on lines not
// visible in this excerpt.
227
if(theSocket != NDB_INVALID_SOCKET){
230
FD_SET(theSocket, &writeset);
232
// Only the write set is passed; read and exception sets are NULL.
int selectReply = select(theSocket + 1, NULL, &writeset, NULL, timeout);
234
// Writable only if select() reported a ready descriptor and it is ours.
if ((selectReply > 0) && FD_ISSET(theSocket, &writeset))
243
TCP_Transporter::get_free_buffer() const
// Returns whatever m_sendBuffer.bufferSizeRemaining() reports.
245
return m_sendBuffer.bufferSizeRemaining();
249
TCP_Transporter::getWritePtr(Uint32 lenBytes, Uint32 prio){
// Obtains an insert position for lenBytes bytes in the send buffer. If the
// buffer cannot provide one, waits briefly for the socket to become writable
// and then asks the buffer again. The send attempt between those two steps and
// the final return are on lines not visible in this excerpt. `prio` is not
// used by any visible line.
251
Uint32 * insertPtr = m_sendBuffer.getInsertPtr(lenBytes);
253
// 0 seconds + 10000 microseconds, i.e. a 10 ms wait.
struct timeval timeout = {0, 10000};
255
if (insertPtr == 0) {
256
//-------------------------------------------------
257
// Buffer was completely full. We have severe problems.
258
// We will attempt to wait for a small time
259
//-------------------------------------------------
260
if(sendIsPossible(&timeout)) {
261
//-------------------------------------------------
262
// Send is possible after the small timeout.
263
//-------------------------------------------------
267
//-------------------------------------------------
268
// Since send was successful we will make a renewed
269
// attempt at inserting the signal into the buffer.
270
//-------------------------------------------------
271
insertPtr = m_sendBuffer.getInsertPtr(lenBytes);
281
TCP_Transporter::updateWritePtr(Uint32 lenBytes, Uint32 prio){
// Commits lenBytes bytes to the send buffer, then, once the buffered amount
// exceeds TCP_SEND_LIMIT, polls the socket without waiting to see whether a
// send can be attempted. The send call itself is on a line not visible in this
// excerpt. `prio` is not used by any visible line.
282
m_sendBuffer.updateInsertPtr(lenBytes);
284
const int bufsize = m_sendBuffer.bufferSize();
285
if(bufsize > TCP_SEND_LIMIT) {
286
//-------------------------------------------------
287
// Buffer is full and we are ready to send. We will
288
// not wait since the signal is already in the buffer.
289
// Force flag set has the same indication that we
290
// should always send. If it is not possible to send
291
// we will not worry since we will soon be back for
293
//-------------------------------------------------
294
// Zero timeout: select() only polls.
struct timeval no_timeout = {0,0};
295
if(sendIsPossible(&no_timeout)) {
296
//-------------------------------------------------
297
// Send was possible, attempt at a send.
298
//-------------------------------------------------
304
// DISCONNECT_ERRNO(e, sz): decides whether the result of a send()/recv() call
// means the connection must be treated as lost.
//   e  - the socket error code observed after the call
//   sz - the value returned by send()/recv()
// Evaluates to true when sz == 0, or when the call did not fail with one of
// the retryable errors. A retryable failure is sz == -1 together with EAGAIN,
// EWOULDBLOCK or EINTR.
// The sz == -1 test is grouped with all three error codes. Previously `&&`
// bound tighter than `||`, so it only guarded the EAGAIN comparison.
// Arguments are parenthesised so expression arguments expand safely; note
// that both are evaluated more than once.
#define DISCONNECT_ERRNO(e, sz) (((sz) == 0) || \
  (!(((sz) == -1) && \
     (((e) == EAGAIN) || ((e) == EWOULDBLOCK) || ((e) == EINTR)))))
309
TCP_Transporter::doSend() {
// Drains m_sendBuffer onto theSocket with send(), updating the send statistics
// and reporting them when sendCount equals reportFreq. On a failed send the
// error is classified with DISCONNECT_ERRNO and a disconnect is reported when
// that macro says so. Several lines (braces, branch heads, counter updates,
// the return) are not visible in this excerpt.
310
// If no sendbuffers are used nothing is done
311
// Sends the contents of the SendBuffers until they are empty
312
// or until select does not select the socket for write.
313
// Before calling send, the socket must be selected for write
315
// It writes on the external TCP/IP interface until the send buffer is empty
316
// and as long as write is possible (test it using select)
318
// Empty the SendBuffers
320
// NOTE(review): sent_any starts out true, so as far as the visible lines show
// it does not restrict the EAGAIN test below to "after at least one
// successful send" -- confirm the intent against the full file.
bool sent_any = true;
321
while (m_sendBuffer.dataSize > 0)
323
const char * const sendPtr = m_sendBuffer.sendPtr;
324
const Uint32 sizeToSend = m_sendBuffer.sendDataSize;
325
const int nBytesSent = send(theSocket, sendPtr, sizeToSend, 0);
330
// Tell the buffer how much was actually written.
m_sendBuffer.bytesSent(nBytesSent);
333
sendSize += nBytesSent;
334
if(sendCount == reportFreq)
336
reportSendLen(get_callback_obj(), remoteNodeId, sendCount, sendSize);
343
// Failed send with EAGAIN: handled separately from the disconnect check
// below (the statement taken is on a line not visible here).
if (nBytesSent < 0 && InetErrno == EAGAIN && sent_any)
347
#if defined DEBUG_TRANSPORTER
348
g_eventLogger.error("Send Failure(disconnect==%d) to node = %d nBytesSent = %d "
349
"errno = %d strerror = %s",
350
DISCONNECT_ERRNO(InetErrno, nBytesSent),
351
remoteNodeId, nBytesSent, InetErrno,
352
(char*)ndbstrerror(InetErrno));
354
if(DISCONNECT_ERRNO(InetErrno, nBytesSent)){
356
report_disconnect(InetErrno);
366
TCP_Transporter::doReceive() {
// Performs one recv() into the free part of receiveBuffer, reading at most
// maxReceiveSize bytes. On success it advances the buffer bookkeeping, checks
// it against the buffer size, and updates/reports the receive statistics. On
// failure the error is classified with DISCONNECT_ERRNO and a disconnect is
// reported when that macro says so. Several lines (braces, the else head,
// counter updates, the returns) are not visible in this excerpt.
367
// Select-function must return the socket for read
368
// before this method is called
369
// It reads the external TCP/IP interface once
370
// Free space left in the receive buffer.
Uint32 size = receiveBuffer.sizeOfBuffer - receiveBuffer.sizeOfData;
372
// Read no more than the free space and no more than maxReceiveSize.
const int nBytesRead = recv(theSocket,
373
receiveBuffer.insertPtr,
374
size < maxReceiveSize ? size : maxReceiveSize,
377
if (nBytesRead > 0) {
378
receiveBuffer.sizeOfData += nBytesRead;
379
receiveBuffer.insertPtr += nBytesRead;
381
// Consistency check: the amount of buffered data must not exceed the buffer.
if(receiveBuffer.sizeOfData > receiveBuffer.sizeOfBuffer){
382
#ifdef DEBUG_TRANSPORTER
383
g_eventLogger.error("receiveBuffer.sizeOfData(%d) > receiveBuffer.sizeOfBuffer(%d)",
384
receiveBuffer.sizeOfData, receiveBuffer.sizeOfBuffer);
385
g_eventLogger.error("nBytesRead = %d", nBytesRead);
387
g_eventLogger.error("receiveBuffer.sizeOfData(%d) > receiveBuffer.sizeOfBuffer(%d)",
388
receiveBuffer.sizeOfData, receiveBuffer.sizeOfBuffer);
389
report_error(TE_INVALID_MESSAGE_LENGTH);
394
receiveSize += nBytesRead;
396
if(receiveCount == reportFreq){
397
reportReceiveLen(get_callback_obj(), remoteNodeId, receiveCount, receiveSize);
403
// NOTE(review): the message below prints the label "nBytesSent" although the
// value passed is nBytesRead; it is a runtime string and is left untouched.
#if defined DEBUG_TRANSPORTER
404
g_eventLogger.error("Receive Failure(disconnect==%d) to node = %d nBytesSent = %d "
405
"errno = %d strerror = %s",
406
DISCONNECT_ERRNO(InetErrno, nBytesRead),
407
remoteNodeId, nBytesRead, InetErrno,
408
(char*)ndbstrerror(InetErrno));
410
if(DISCONNECT_ERRNO(InetErrno, nBytesRead)){
411
// The remote node has closed down
413
report_disconnect(InetErrno);
423
TCP_Transporter::disconnectImpl() {
// Closes the socket if one is held (reporting TE_ERROR_CLOSING_SOCKET when the
// close fails), clears both buffers and marks the socket as invalid.
424
if(theSocket != NDB_INVALID_SOCKET){
425
if(NDB_CLOSE_SOCKET(theSocket) < 0){
426
report_error(TE_ERROR_CLOSING_SOCKET);
430
// Empty send and receive buffers
431
receiveBuffer.clear();
432
m_sendBuffer.emptyBuffer();
434
theSocket = NDB_INVALID_SOCKET;