~ubuntu-branches/ubuntu/precise/mysql-5.1/precise

« back to all changes in this revision

Viewing changes to storage/ndb/src/common/util/SocketServer.cpp

  • Committer: Bazaar Package Importer
  • Author(s): Norbert Tretkowski
  • Date: 2010-03-17 14:56:02 UTC
  • Revision ID: james.westby@ubuntu.com-20100317145602-x7e30l1b2sb5s6w6
Tags: upstream-5.1.45
ImportĀ upstreamĀ versionĀ 5.1.45

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/* Copyright (C) 2003 MySQL AB
 
2
 
 
3
   This program is free software; you can redistribute it and/or modify
 
4
   it under the terms of the GNU General Public License as published by
 
5
   the Free Software Foundation; version 2 of the License.
 
6
 
 
7
   This program is distributed in the hope that it will be useful,
 
8
   but WITHOUT ANY WARRANTY; without even the implied warranty of
 
9
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
10
   GNU General Public License for more details.
 
11
 
 
12
   You should have received a copy of the GNU General Public License
 
13
   along with this program; if not, write to the Free Software
 
14
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
 
15
 
 
16
 
 
17
#include <ndb_global.h>
 
18
#include <my_pthread.h>
 
19
 
 
20
#include <SocketServer.hpp>
 
21
 
 
22
#include <NdbTCP.h>
 
23
#include <NdbOut.hpp>
 
24
#include <NdbThread.h>
 
25
#include <NdbSleep.h>
 
26
 
 
27
#define DEBUG(x) ndbout << x << endl;
 
28
 
 
29
SocketServer::SocketServer(unsigned maxSessions) :
 
30
  m_sessions(10),
 
31
  m_services(5)
 
32
{
 
33
  m_thread = 0;
 
34
  m_stopThread = false;
 
35
  m_maxSessions = maxSessions;
 
36
}
 
37
 
 
38
SocketServer::~SocketServer() {
 
39
  unsigned i;
 
40
  for(i = 0; i<m_sessions.size(); i++){
 
41
    delete m_sessions[i].m_session;
 
42
  }
 
43
  for(i = 0; i<m_services.size(); i++){
 
44
    if(m_services[i].m_socket)
 
45
      NDB_CLOSE_SOCKET(m_services[i].m_socket);
 
46
    delete m_services[i].m_service;
 
47
  }
 
48
}
 
49
 
 
50
bool
 
51
SocketServer::tryBind(unsigned short port, const char * intface) {
 
52
  struct sockaddr_in servaddr;
 
53
  memset(&servaddr, 0, sizeof(servaddr));
 
54
  servaddr.sin_family = AF_INET;
 
55
  servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
 
56
  servaddr.sin_port = htons(port);
 
57
  
 
58
  if(intface != 0){
 
59
    if(Ndb_getInAddr(&servaddr.sin_addr, intface))
 
60
      return false;
 
61
  }
 
62
  
 
63
  const NDB_SOCKET_TYPE sock  = socket(AF_INET, SOCK_STREAM, 0);
 
64
  if (sock == NDB_INVALID_SOCKET) {
 
65
    return false;
 
66
  }
 
67
  
 
68
  DBUG_PRINT("info",("NDB_SOCKET: %d", sock));
 
69
 
 
70
  const int on = 1;
 
71
  if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, 
 
72
                 (const char*)&on, sizeof(on)) == -1) {
 
73
    NDB_CLOSE_SOCKET(sock);
 
74
    return false;
 
75
  }
 
76
  
 
77
  if (bind(sock, (struct sockaddr*) &servaddr, sizeof(servaddr)) == -1) {
 
78
    NDB_CLOSE_SOCKET(sock);
 
79
    return false;
 
80
  }
 
81
 
 
82
  NDB_CLOSE_SOCKET(sock);
 
83
  return true;
 
84
}
 
85
 
 
86
bool
 
87
SocketServer::setup(SocketServer::Service * service, 
 
88
                    unsigned short * port,
 
89
                    const char * intface){
 
90
  DBUG_ENTER("SocketServer::setup");
 
91
  DBUG_PRINT("enter",("interface=%s, port=%u", intface, *port));
 
92
  struct sockaddr_in servaddr;
 
93
  memset(&servaddr, 0, sizeof(servaddr));
 
94
  servaddr.sin_family = AF_INET;
 
95
  servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
 
96
  servaddr.sin_port = htons(*port);
 
97
  
 
98
  if(intface != 0){
 
99
    if(Ndb_getInAddr(&servaddr.sin_addr, intface))
 
100
      DBUG_RETURN(false);
 
101
  }
 
102
  
 
103
  const NDB_SOCKET_TYPE sock  = socket(AF_INET, SOCK_STREAM, 0);
 
104
  if (sock == NDB_INVALID_SOCKET) {
 
105
    DBUG_PRINT("error",("socket() - %d - %s",
 
106
                        errno, strerror(errno)));
 
107
    DBUG_RETURN(false);
 
108
  }
 
109
  
 
110
  DBUG_PRINT("info",("NDB_SOCKET: %d", sock));
 
111
 
 
112
  const int on = 1;
 
113
  if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, 
 
114
                 (const char*)&on, sizeof(on)) == -1) {
 
115
    DBUG_PRINT("error",("setsockopt() - %d - %s",
 
116
                        errno, strerror(errno)));
 
117
    NDB_CLOSE_SOCKET(sock);
 
118
    DBUG_RETURN(false);
 
119
  }
 
120
  
 
121
  if (bind(sock, (struct sockaddr*) &servaddr, sizeof(servaddr)) == -1) {
 
122
    DBUG_PRINT("error",("bind() - %d - %s",
 
123
                        errno, strerror(errno)));
 
124
    NDB_CLOSE_SOCKET(sock);
 
125
    DBUG_RETURN(false);
 
126
  }
 
127
 
 
128
  /* Get the port we bound to */
 
129
  SOCKET_SIZE_TYPE sock_len = sizeof(servaddr);
 
130
  if(getsockname(sock,(struct sockaddr*)&servaddr,&sock_len)<0) {
 
131
    ndbout_c("An error occurred while trying to find out what"
 
132
             " port we bound to. Error: %s",strerror(errno));
 
133
    NDB_CLOSE_SOCKET(sock);
 
134
    DBUG_RETURN(false);
 
135
  }
 
136
 
 
137
  DBUG_PRINT("info",("bound to %u",ntohs(servaddr.sin_port)));
 
138
  if (listen(sock, m_maxSessions > 32 ? 32 : m_maxSessions) == -1){
 
139
    DBUG_PRINT("error",("listen() - %d - %s",
 
140
                        errno, strerror(errno)));
 
141
    NDB_CLOSE_SOCKET(sock);
 
142
    DBUG_RETURN(false);
 
143
  }
 
144
  
 
145
  ServiceInstance i;
 
146
  i.m_socket = sock;
 
147
  i.m_service = service;
 
148
  m_services.push_back(i);
 
149
 
 
150
  *port = ntohs(servaddr.sin_port);
 
151
 
 
152
  DBUG_RETURN(true);
 
153
}
 
154
 
 
155
void
 
156
SocketServer::doAccept(){
 
157
  fd_set readSet, exceptionSet;
 
158
  FD_ZERO(&readSet);
 
159
  FD_ZERO(&exceptionSet);
 
160
  
 
161
  m_services.lock();
 
162
  int maxSock = 0;
 
163
  for (unsigned i = 0; i < m_services.size(); i++){
 
164
    const NDB_SOCKET_TYPE s = m_services[i].m_socket;
 
165
    FD_SET(s, &readSet);
 
166
    FD_SET(s, &exceptionSet);
 
167
    maxSock = (maxSock > s ? maxSock : s);
 
168
  }
 
169
  struct timeval timeout;
 
170
  timeout.tv_sec  = 1;
 
171
  timeout.tv_usec = 0;
 
172
  
 
173
  if(select(maxSock + 1, &readSet, 0, &exceptionSet, &timeout) > 0){
 
174
    for (unsigned i = 0; i < m_services.size(); i++){
 
175
      ServiceInstance & si = m_services[i];
 
176
      
 
177
      if(FD_ISSET(si.m_socket, &readSet)){
 
178
        NDB_SOCKET_TYPE childSock = accept(si.m_socket, 0, 0);
 
179
        if(childSock == NDB_INVALID_SOCKET){
 
180
          continue;
 
181
        }
 
182
        
 
183
        SessionInstance s;
 
184
        s.m_service = si.m_service;
 
185
        s.m_session = si.m_service->newSession(childSock);
 
186
        if(s.m_session != 0)
 
187
        {
 
188
          m_session_mutex.lock();
 
189
          m_sessions.push_back(s);
 
190
          startSession(m_sessions.back());
 
191
          m_session_mutex.unlock();
 
192
        }
 
193
        
 
194
        continue;
 
195
      }      
 
196
      
 
197
      if(FD_ISSET(si.m_socket, &exceptionSet)){
 
198
        DEBUG("socket in the exceptionSet");
 
199
        continue;
 
200
      }
 
201
    }
 
202
  }
 
203
  m_services.unlock();
 
204
}
 
205
 
 
206
extern "C"
 
207
void* 
 
208
socketServerThread_C(void* _ss){
 
209
  SocketServer * ss = (SocketServer *)_ss;
 
210
  ss->doRun();
 
211
  return 0;
 
212
}
 
213
 
 
214
void
 
215
SocketServer::startServer(){
 
216
  m_threadLock.lock();
 
217
  if(m_thread == 0 && m_stopThread == false){
 
218
    m_thread = NdbThread_Create(socketServerThread_C,
 
219
                                (void**)this,
 
220
                                32768,
 
221
                                "NdbSockServ",
 
222
                                NDB_THREAD_PRIO_LOW);
 
223
  }
 
224
  m_threadLock.unlock();
 
225
}
 
226
 
 
227
void
 
228
SocketServer::stopServer(){
 
229
  m_threadLock.lock();
 
230
  if(m_thread != 0){
 
231
    m_stopThread = true;
 
232
    
 
233
    void * res;
 
234
    NdbThread_WaitFor(m_thread, &res);
 
235
    NdbThread_Destroy(&m_thread);
 
236
    m_thread = 0;
 
237
  }
 
238
  m_threadLock.unlock();
 
239
}
 
240
 
 
241
void
 
242
SocketServer::doRun(){
 
243
 
 
244
  while(!m_stopThread){
 
245
    m_session_mutex.lock();
 
246
    checkSessionsImpl();
 
247
    if(m_sessions.size() < m_maxSessions){
 
248
      m_session_mutex.unlock();
 
249
      doAccept();
 
250
    } else {
 
251
      m_session_mutex.unlock();
 
252
      NdbSleep_MilliSleep(200);
 
253
    }
 
254
  }
 
255
}
 
256
 
 
257
void
 
258
SocketServer::startSession(SessionInstance & si){
 
259
  si.m_thread = NdbThread_Create(sessionThread_C,
 
260
                                 (void**)si.m_session,
 
261
                                 32768,
 
262
                                 "NdbSock_Session",
 
263
                                 NDB_THREAD_PRIO_LOW);
 
264
}
 
265
 
 
266
void
 
267
SocketServer::foreachSession(void (*func)(SocketServer::Session*, void *), void *data)
 
268
{
 
269
  m_session_mutex.lock();
 
270
  for(int i = m_sessions.size() - 1; i >= 0; i--){
 
271
    (*func)(m_sessions[i].m_session, data);
 
272
  }
 
273
  m_session_mutex.unlock();
 
274
}
 
275
 
 
276
void
 
277
SocketServer::checkSessions()
 
278
{
 
279
  m_session_mutex.lock();
 
280
  checkSessionsImpl();
 
281
  m_session_mutex.unlock();  
 
282
}
 
283
 
 
284
void
 
285
SocketServer::checkSessionsImpl()
 
286
{
 
287
  for(int i = m_sessions.size() - 1; i >= 0; i--)
 
288
  {
 
289
    if(m_sessions[i].m_session->m_stopped)
 
290
    {
 
291
      if(m_sessions[i].m_thread != 0)
 
292
      {
 
293
        void* ret;
 
294
        NdbThread_WaitFor(m_sessions[i].m_thread, &ret);
 
295
        NdbThread_Destroy(&m_sessions[i].m_thread);
 
296
      } 
 
297
      m_sessions[i].m_session->stopSession();
 
298
      delete m_sessions[i].m_session;
 
299
      m_sessions.erase(i);
 
300
    }
 
301
  }
 
302
}
 
303
 
 
304
void
 
305
SocketServer::stopSessions(bool wait){
 
306
  int i;
 
307
  m_session_mutex.lock();
 
308
  for(i = m_sessions.size() - 1; i>=0; i--)
 
309
  {
 
310
    m_sessions[i].m_session->stopSession();
 
311
    m_sessions[i].m_session->m_stop = true; // to make sure
 
312
  }
 
313
  m_session_mutex.unlock();
 
314
  
 
315
  for(i = m_services.size() - 1; i>=0; i--)
 
316
    m_services[i].m_service->stopSessions();
 
317
  
 
318
  if(wait){
 
319
    m_session_mutex.lock();
 
320
    while(m_sessions.size() > 0){
 
321
      checkSessionsImpl();
 
322
      m_session_mutex.unlock();
 
323
      NdbSleep_MilliSleep(100);
 
324
      m_session_mutex.lock();
 
325
    }
 
326
    m_session_mutex.unlock();
 
327
  }
 
328
}
 
329
 
 
330
/***** Session code ******/
 
331
 
 
332
extern "C"
 
333
void* 
 
334
sessionThread_C(void* _sc){
 
335
  SocketServer::Session * si = (SocketServer::Session *)_sc;
 
336
 
 
337
  /**
 
338
   * may have m_stopped set if we're transforming a mgm
 
339
   * connection into a transporter connection.
 
340
   */
 
341
  if(!si->m_stopped)
 
342
  {
 
343
    if(!si->m_stop){
 
344
      si->m_stopped = false;
 
345
      si->runSession();
 
346
    } else {
 
347
      NDB_CLOSE_SOCKET(si->m_socket);
 
348
    }
 
349
  }
 
350
  
 
351
  si->m_stopped = true;
 
352
  return 0;
 
353
}
 
354
 
 
355
template class MutexVector<SocketServer::ServiceInstance>;
 
356
template class Vector<SocketServer::SessionInstance>;