~ubuntu-branches/ubuntu/trusty/mysql-5.6/trusty

« back to all changes in this revision

Viewing changes to storage/ndb/src/common/transporter/TCP_Transporter.hpp

  • Committer: Package Import Robot
  • Author(s): James Page
  • Date: 2014-02-12 11:54:27 UTC
  • Revision ID: package-import@ubuntu.com-20140212115427-oq6tfsqxl1wuwehi
Tags: upstream-5.6.15
ImportĀ upstreamĀ versionĀ 5.6.15

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/*
 
2
   Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
 
3
 
 
4
   This program is free software; you can redistribute it and/or modify
 
5
   it under the terms of the GNU General Public License as published by
 
6
   the Free Software Foundation; version 2 of the License.
 
7
 
 
8
   This program is distributed in the hope that it will be useful,
 
9
   but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
   GNU General Public License for more details.
 
12
 
 
13
   You should have received a copy of the GNU General Public License
 
14
   along with this program; if not, write to the Free Software
 
15
   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
 
16
*/
 
17
 
 
18
#ifndef TCP_TRANSPORTER_HPP
 
19
#define TCP_TRANSPORTER_HPP
 
20
 
 
21
#include "Transporter.hpp"
 
22
 
 
23
#include <NdbTCP.h>
 
24
 
 
25
struct ReceiveBuffer {
 
26
  Uint32 *startOfBuffer;    // Pointer to start of the receive buffer 
 
27
  Uint32 *readPtr;          // Pointer to start reading data
 
28
  
 
29
  char   *insertPtr;        // Pointer to first position in the receiveBuffer
 
30
                            // in which to insert received data. Earlier
 
31
                            // received incomplete messages (slack) are 
 
32
                            // copied into the first part of the receiveBuffer
 
33
 
 
34
  Uint32 sizeOfData;        // In bytes
 
35
  Uint32 sizeOfBuffer;
 
36
  
 
37
  ReceiveBuffer() {}
 
38
  bool init(int bytes);
 
39
  void destroy();
 
40
  
 
41
  void clear();
 
42
  void incompleteMessage();
 
43
};
 
44
 
 
45
class TCP_Transporter : public Transporter {
 
46
  friend class TransporterRegistry;
 
47
  friend class Loopback_Transporter;
 
48
private:
 
49
  // Initialize member variables
 
50
  TCP_Transporter(TransporterRegistry&, const TransporterConfiguration* conf);
 
51
 
 
52
  // Disconnect, delete send buffers and receive buffer
 
53
  virtual ~TCP_Transporter();
 
54
 
 
55
  virtual bool configure_derived(const TransporterConfiguration* conf);
 
56
 
 
57
  /**
 
58
   * Allocate buffers for sending and receiving
 
59
   */
 
60
  bool initTransporter();
 
61
 
 
62
  /**
 
63
   * Retrieves the contents of the send buffers and writes it on
 
64
   * the external TCP/IP interface.
 
65
   */
 
66
  int doSend();
 
67
  
 
68
  /**
 
69
   * It reads the external TCP/IP interface once 
 
70
   * and puts the data in the receiveBuffer
 
71
   */
 
72
  int doReceive(); 
 
73
 
 
74
  /**
 
75
   * Returns socket (used for select)
 
76
   */
 
77
  NDB_SOCKET_TYPE getSocket() const;
 
78
 
 
79
  /**
 
80
   * Get Receive Data
 
81
   *
 
82
   *  Returns - no of bytes to read
 
83
   *            and set ptr
 
84
   */
 
85
  virtual Uint32 getReceiveData(Uint32 ** ptr);
 
86
  
 
87
  /**
 
88
   * Update receive data ptr
 
89
   */
 
90
  virtual void updateReceiveDataPtr(Uint32 bytesRead);
 
91
 
 
92
  inline bool hasReceiveData () const {
 
93
    return receiveBuffer.sizeOfData > 0;
 
94
  }
 
95
protected:
 
96
  /**
 
97
   * Setup client/server and perform connect/accept
 
98
   * Is used both by clients and servers
 
99
   * A client connects to the remote server
 
100
   * A server accepts any new connections
 
101
   */
 
102
  virtual bool connect_server_impl(NDB_SOCKET_TYPE sockfd);
 
103
  virtual bool connect_client_impl(NDB_SOCKET_TYPE sockfd);
 
104
  bool connect_common(NDB_SOCKET_TYPE sockfd);
 
105
  
 
106
  /**
 
107
   * Disconnects a TCP/IP node. Empty receivebuffer.
 
108
   */
 
109
  virtual void disconnectImpl();
 
110
  
 
111
private:
 
112
  // Sending/Receiving socket used by both client and server
 
113
  NDB_SOCKET_TYPE theSocket;   
 
114
  
 
115
  Uint32 maxReceiveSize;
 
116
  
 
117
  /**
 
118
   * Socket options
 
119
   */
 
120
  int sockOptRcvBufSize;
 
121
  int sockOptSndBufSize;
 
122
  int sockOptNodelay;
 
123
  int sockOptTcpMaxSeg;
 
124
 
 
125
  void setSocketOptions(NDB_SOCKET_TYPE socket);
 
126
 
 
127
  static bool setSocketNonBlocking(NDB_SOCKET_TYPE aSocket);
 
128
  virtual int pre_connect_options(NDB_SOCKET_TYPE aSocket);
 
129
  
 
130
  bool send_is_possible(int timeout_millisec) const;
 
131
  bool send_is_possible(NDB_SOCKET_TYPE fd, int timeout_millisec) const;
 
132
 
 
133
  /**
 
134
   * Statistics
 
135
   */
 
136
  Uint32 reportFreq;
 
137
  Uint32 receiveCount;
 
138
  Uint64 receiveSize;
 
139
  Uint32 sendCount;
 
140
  Uint64 sendSize;
 
141
 
 
142
  ReceiveBuffer receiveBuffer;
 
143
 
 
144
  bool send_limit_reached(int bufsize) { return bufsize > TCP_SEND_LIMIT; }
 
145
};
 
146
 
 
147
inline
 
148
NDB_SOCKET_TYPE
 
149
TCP_Transporter::getSocket() const {
 
150
  return theSocket;
 
151
}
 
152
 
 
153
inline
 
154
Uint32
 
155
TCP_Transporter::getReceiveData(Uint32 ** ptr){
 
156
  (* ptr) = receiveBuffer.readPtr;
 
157
  return receiveBuffer.sizeOfData;
 
158
}
 
159
 
 
160
inline
 
161
void
 
162
TCP_Transporter::updateReceiveDataPtr(Uint32 bytesRead){
 
163
  char * ptr = (char *)receiveBuffer.readPtr;
 
164
  ptr += bytesRead;
 
165
  receiveBuffer.readPtr = (Uint32*)ptr;
 
166
  receiveBuffer.sizeOfData -= bytesRead;
 
167
  receiveBuffer.incompleteMessage();
 
168
}
 
169
 
 
170
inline
 
171
bool
 
172
ReceiveBuffer::init(int bytes){
 
173
#ifdef DEBUG_TRANSPORTER
 
174
  ndbout << "Allocating " << bytes << " bytes as receivebuffer" << endl;
 
175
#endif
 
176
 
 
177
  startOfBuffer = new Uint32[((bytes + 0) >> 2) + 1];
 
178
  sizeOfBuffer  = bytes + sizeof(Uint32);
 
179
  clear();
 
180
  return true;
 
181
}
 
182
 
 
183
inline
 
184
void
 
185
ReceiveBuffer::destroy(){
 
186
  delete[] startOfBuffer;
 
187
  sizeOfBuffer  = 0;
 
188
  startOfBuffer = 0;
 
189
  clear();
 
190
}
 
191
 
 
192
inline
 
193
void
 
194
ReceiveBuffer::clear(){
 
195
  readPtr    = startOfBuffer;
 
196
  insertPtr  = (char *)startOfBuffer;
 
197
  sizeOfData = 0;
 
198
}
 
199
 
 
200
inline
 
201
void
 
202
ReceiveBuffer::incompleteMessage() {
 
203
  if(startOfBuffer != readPtr){
 
204
    if(sizeOfData != 0)
 
205
      memmove(startOfBuffer, readPtr, sizeOfData);
 
206
    readPtr   = startOfBuffer;
 
207
    insertPtr = ((char *)startOfBuffer) + sizeOfData;
 
208
  }
 
209
}
 
210
 
 
211
 
 
212
#endif // Define of TCP_Transporter_H