1
/* Copyright (C) 2003 MySQL AB
3
This program is free software; you can redistribute it and/or modify
4
it under the terms of the GNU General Public License as published by
5
the Free Software Foundation; version 2 of the License.
7
This program is distributed in the hope that it will be useful,
8
but WITHOUT ANY WARRANTY; without even the implied warranty of
9
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10
GNU General Public License for more details.
12
You should have received a copy of the GNU General Public License
13
along with this program; if not, write to the Free Software
14
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
16
#ifndef SocketClientRegistry_H
17
#define SocketClientRegistry_H
22
#include "SocketClient.hpp"
25
class SocketRegistry {
28
SocketRegistry(Uint32 maxSocketClients);
31
* creates and adds a SocketClient to m_socketClients[]
32
* @param host - host name
33
* @param port - port to connect to
35
bool createSocketClient(const char * host, const Uint16 port);
38
* performReceive reads from sockets should do more stuff
40
int performReceive(T &);
44
* performReceive reads from sockets should do more stuff
46
int syncPerformReceive(const char* ,T &, Uint32);
50
* performSend sends a command to a host
52
bool performSend(const char * buf, Uint32 len, const char * remotehost);
55
* pollSocketClients performs a select (for a max. of timeoutmillis) or
56
* until there is data to be read from any SocketClient
57
* @param timeOutMillis - select timeout
59
int pollSocketClients(Uint32 timeOutMillis);
62
* reconnect tries to reconnect to a cpcd given its hostname
63
* @param host - name of host to reconnect to.
65
bool reconnect(const char * host);
70
* @param host - name of host for which to remove the SocketConnection
72
bool removeSocketClient(const char * host);
75
SocketClient** m_socketClients;
76
Uint32 m_maxSocketClients;
77
Uint32 m_nSocketClients;
78
int tcpReadSelectReply;
87
SocketRegistry<T>::SocketRegistry(Uint32 maxSocketClients) {
88
m_maxSocketClients = maxSocketClients;
89
m_socketClients = new SocketClient * [m_maxSocketClients];
96
SocketRegistry<T>::~SocketRegistry() {
97
delete [] m_socketClients;
103
SocketRegistry<T>::createSocketClient(const char * host, Uint16 port) {
110
SocketClient * socketClient = new SocketClient(host, port);
112
if(socketClient->openSocket() < 0 || socketClient == NULL) {
113
ndbout << "could not connect" << endl;
118
m_socketClients[m_nSocketClients] = socketClient;
127
SocketRegistry<T>::pollSocketClients(Uint32 timeOutMillis) {
131
// Return directly if there are no TCP transporters configured
132
if (m_nSocketClients == 0){
133
tcpReadSelectReply = 0;
136
struct timeval timeout;
137
timeout.tv_sec = timeOutMillis / 1000;
138
timeout.tv_usec = (timeOutMillis % 1000) * 1000;
141
NDB_SOCKET_TYPE maxSocketValue = 0;
143
// Needed for TCP/IP connections
144
// The read- and writeset are used by select
146
FD_ZERO(&tcpReadset);
148
// Prepare for sending and receiving
149
for (Uint32 i = 0; i < m_nSocketClients; i++) {
150
SocketClient * t = m_socketClients[i];
152
// If the socketclient is connected
153
if (t->isConnected()) {
155
const NDB_SOCKET_TYPE socket = t->getSocket();
156
// Find the highest socket value. It will be used by select
157
if (socket > maxSocketValue)
158
maxSocketValue = socket;
160
// Put the connected transporters in the socket read-set
161
FD_SET(socket, &tcpReadset);
165
// The highest socket value plus one
168
tcpReadSelectReply = select(maxSocketValue, &tcpReadset, 0, 0, &timeout);
170
if(tcpReadSelectReply == SOCKET_ERROR)
172
NdbSleep_MilliSleep(timeOutMillis);
176
return tcpReadSelectReply;
183
SocketRegistry<T>::performSend(const char * buf, Uint32 len, const char * remotehost)
185
SocketClient * socketClient;
186
for(Uint32 i=0; i < m_nSocketClients; i++) {
187
socketClient = m_socketClients[i];
188
if(strcmp(socketClient->gethostname(), remotehost)==0) {
189
if(socketClient->isConnected()) {
190
if(socketClient->writeSocket(buf, len)>0)
203
SocketRegistry<T>::performReceive(T & t) {
204
char buf[255] ; //temp. just for testing. must fix better
206
if(tcpReadSelectReply > 0){
207
for (Uint32 i=0; i<m_nSocketClients; i++) {
208
SocketClient *sc = m_socketClients[i];
209
const NDB_SOCKET_TYPE socket = sc->getSocket();
210
if(sc->isConnected() && FD_ISSET(socket, &tcpReadset)) {
211
t->runSession(socket,t);
225
SocketRegistry<T>::syncPerformReceive(const char * remotehost,
227
Uint32 timeOutMillis) {
228
char buf[255] ; //temp. just for testing. must fix better
229
struct timeval timeout;
230
timeout.tv_sec = timeOutMillis / 1000;
231
timeout.tv_usec = (timeOutMillis % 1000) * 1000;
234
for(Uint32 i=0; i < m_nSocketClients; i++) {
235
sc = m_socketClients[i];
236
if(strcmp(sc->gethostname(), remotehost)==0) {
237
if(sc->isConnected()) {
238
/*FD_ZERO(&tcpReadset);
239
reply = select(sc->getSocket()+1, 0, 0, 0, &timeout);
242
t.runSession(sc->getSocket(), t);
255
SocketRegistry<T>::reconnect(const char * host){
256
for(Uint32 i=0; i < m_nSocketClients; i++) {
257
SocketClient * socketClient = m_socketClients[i];
258
if(strcmp(socketClient->gethostname(), host)==0) {
259
if(!socketClient->isConnected()) {
260
if(socketClient->openSocket() > 0)
272
SocketRegistry<T>::removeSocketClient(const char * host){
273
for(Uint32 i=0; i < m_nSocketClients; i++) {
274
SocketClient * socketClient = m_socketClients[i];
275
if(strcmp(socketClient->gethostname(), host)==0) {
276
if(!socketClient->isConnected()) {
277
if(socketClient->closeSocket() > 0) {
289
#endif // Define of SocketRegistry