2
Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
4
This program is free software; you can redistribute it and/or modify
5
it under the terms of the GNU General Public License as published by
6
the Free Software Foundation; version 2 of the License.
8
This program is distributed in the hope that it will be useful,
9
but WITHOUT ANY WARRANTY; without even the implied warranty of
10
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
GNU General Public License for more details.
13
You should have received a copy of the GNU General Public License
14
along with this program; if not, write to the Free Software
15
Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
18
#ifndef TCP_TRANSPORTER_HPP
19
#define TCP_TRANSPORTER_HPP
21
#include "Transporter.hpp"
25
// Bookkeeping for a receive buffer: the allocation itself plus the read and
// insert cursors into it and the amount of unread data it holds.
// NOTE(review): the closing brace of this struct is not visible here, and
// init(), destroy(), clear() and the sizeOfBuffer member that are defined or
// used further down are not declared in the visible lines -- confirm against
// the complete header.
struct ReceiveBuffer {
26
Uint32 *startOfBuffer; // Pointer to start of the receive buffer (allocated in init(), released in destroy())
27
Uint32 *readPtr; // Pointer to start reading data
29
char *insertPtr; // Pointer to first position in the receiveBuffer
30
// in which to insert received data. Earlier
31
// received incomplete messages (slack) are
32
// copied into the first part of the receiveBuffer
34
Uint32 sizeOfData; // In bytes
42
// Moves any unread data to the front of the buffer and repositions readPtr
// and insertPtr to match; does nothing when readPtr is already at the front.
void incompleteMessage();
45
// Transporter subclass that communicates over a socket (see theSocket and the
// connect_server_impl / connect_client_impl hooks below).
// NOTE(review): access specifiers, some declarations and the closing brace of
// this class are not visible here, and the block-comment delimiters around the
// description lines that start with "*" appear to be missing -- confirm
// against the complete header before relying on this text.
class TCP_Transporter : public Transporter {
46
friend class TransporterRegistry;
47
friend class Loopback_Transporter;
49
// Initialize member variables
50
TCP_Transporter(TransporterRegistry&, const TransporterConfiguration* conf);
52
// Disconnect, delete send buffers and receive buffer
53
virtual ~TCP_Transporter();
55
// Hook taking the transporter configuration; presumably reports whether the
// configuration could be applied -- TODO confirm against the implementation.
virtual bool configure_derived(const TransporterConfiguration* conf);
58
* Allocate buffers for sending and receiving
60
bool initTransporter();
63
* Retrieves the contents of the send buffers and writes it on
64
* the external TCP/IP interface.
69
* It reads the external TCP/IP interface once
70
* and puts the data in the receiveBuffer
75
* Returns socket (used for select)
77
NDB_SOCKET_TYPE getSocket() const;
82
* Returns - no of bytes to read
85
// Stores the current read pointer of receiveBuffer in *ptr and returns the
// number of unread bytes (receiveBuffer.sizeOfData).
virtual Uint32 getReceiveData(Uint32 ** ptr);
88
* Update receive data ptr
90
// Tells the transporter that bytesRead bytes of the receive buffer have been
// consumed by the caller.
virtual void updateReceiveDataPtr(Uint32 bytesRead);
92
// True when the receive buffer currently holds at least one unread byte.
inline bool hasReceiveData () const {
93
return receiveBuffer.sizeOfData > 0;
97
* Setup client/server and perform connect/accept
98
* Is used both by clients and servers
99
* A client connects to the remote server
100
* A server accepts any new connections
102
virtual bool connect_server_impl(NDB_SOCKET_TYPE sockfd);
103
virtual bool connect_client_impl(NDB_SOCKET_TYPE sockfd);
104
// Presumably the connection setup shared by the server and client paths
// above -- verify against the implementation.
bool connect_common(NDB_SOCKET_TYPE sockfd);
107
* Disconnects a TCP/IP node. Empty receivebuffer.
109
virtual void disconnectImpl();
112
// Sending/Receiving socket used by both client and server
113
NDB_SOCKET_TYPE theSocket;
115
Uint32 maxReceiveSize;
120
// Socket option values; presumably applied by setSocketOptions() -- TODO confirm.
int sockOptRcvBufSize;
121
int sockOptSndBufSize;
123
int sockOptTcpMaxSeg;
125
void setSocketOptions(NDB_SOCKET_TYPE socket);
127
static bool setSocketNonBlocking(NDB_SOCKET_TYPE aSocket);
128
virtual int pre_connect_options(NDB_SOCKET_TYPE aSocket);
130
// Two overloads: one takes only a timeout in milliseconds, the other also
// takes an explicit socket to check.
bool send_is_possible(int timeout_millisec) const;
131
bool send_is_possible(NDB_SOCKET_TYPE fd, int timeout_millisec) const;
142
// Receive-side buffer state read by getReceiveData(), updateReceiveDataPtr()
// and hasReceiveData().
ReceiveBuffer receiveBuffer;
144
// True when bufsize is strictly greater than TCP_SEND_LIMIT (defined outside
// the visible lines).
bool send_limit_reached(int bufsize) { return bufsize > TCP_SEND_LIMIT; }
149
// Accessor for the transporter's socket; the class declaration describes it
// as "Returns socket (used for select)".
// NOTE(review): the return type, the body and the closing brace are not
// visible here -- confirm against the complete header.
TCP_Transporter::getSocket() const {
155
// Hands the caller the current read position and the amount of unread data.
//   ptr - out-parameter; receives receiveBuffer.readPtr.
// Returns receiveBuffer.sizeOfData, which the struct documents as a byte count.
// NOTE(review): the return type and closing brace are not visible here.
TCP_Transporter::getReceiveData(Uint32 ** ptr){
156
(* ptr) = receiveBuffer.readPtr;
157
return receiveBuffer.sizeOfData;
162
// Accounts for bytesRead bytes having been consumed from the receive buffer,
// then lets the buffer move any remaining unread data to its start.
//   bytesRead - number of bytes consumed; subtracted from sizeOfData.
// NOTE(review): in the visible lines ptr is copied from readPtr and written
// back without being advanced by bytesRead, and the residual numbering skips
// from 163 to 165 -- a statement such as "ptr += bytesRead;" looks to be
// missing here. Without it readPtr never moves and incompleteMessage() has
// nothing to compact. Confirm against the complete header.
TCP_Transporter::updateReceiveDataPtr(Uint32 bytesRead){
163
char * ptr = (char *)receiveBuffer.readPtr; // byte-wise view of the read position
165
receiveBuffer.readPtr = (Uint32*)ptr;
166
receiveBuffer.sizeOfData -= bytesRead; // unsigned arithmetic: assumes bytesRead <= sizeOfData -- TODO confirm callers
167
receiveBuffer.incompleteMessage(); // slide leftover data to the start of the buffer
172
// Allocates the receive buffer.
//   bytes - requested capacity in bytes.
// The allocation is ((bytes + 0) >> 2) + 1 Uint32 words, and sizeOfBuffer is
// recorded as bytes + sizeof(Uint32).
// NOTE(review): when bytes is not a multiple of 4 the recorded sizeOfBuffer is
// larger than what was allocated (e.g. bytes = 5 gives 2 words = 8 bytes but
// sizeOfBuffer = 9, assuming a 4-byte Uint32) -- confirm callers only pass
// multiples of 4, or that sizeOfBuffer is never used as a write bound.
// NOTE(review): the return type, the #endif matching the #ifdef below and the
// end of the function are not visible here.
ReceiveBuffer::init(int bytes){
173
#ifdef DEBUG_TRANSPORTER
174
ndbout << "Allocating " << bytes << " bytes as receivebuffer" << endl;
177
startOfBuffer = new Uint32[((bytes + 0) >> 2) + 1];
178
sizeOfBuffer = bytes + sizeof(Uint32);
185
// Releases the array that startOfBuffer points to (allocated with new[] in
// init(), hence delete[]).
// NOTE(review): the return type and the end of the function are not visible
// here, so it cannot be told whether the pointers are reset afterwards.
ReceiveBuffer::destroy(){
186
delete[] startOfBuffer;
194
// Resets both the read cursor and the insert cursor to the start of the buffer.
// NOTE(review): sizeOfData is not reset in the visible lines and the residual
// numbering skips ahead after 196 -- confirm whether a statement is missing.
ReceiveBuffer::clear(){
195
readPtr = startOfBuffer;
196
insertPtr = (char *)startOfBuffer;
202
// Compacts the buffer: when the read cursor is not already at the start, the
// sizeOfData unread bytes are moved to the front, readPtr is reset to the
// start, and insertPtr is placed directly after the moved data.
// NOTE(review): the return type and the closing braces are not visible here.
ReceiveBuffer::incompleteMessage() {
203
if(startOfBuffer != readPtr){
205
memmove(startOfBuffer, readPtr, sizeOfData); // memmove rather than memcpy: source and destination can overlap
206
readPtr = startOfBuffer;
207
insertPtr = ((char *)startOfBuffer) + sizeOfData; // next received bytes go right after the moved data
212
#endif // TCP_TRANSPORTER_HPP