1
/* Copyright (C) 2003 MySQL AB
3
This program is free software; you can redistribute it and/or modify
4
it under the terms of the GNU General Public License as published by
5
the Free Software Foundation; version 2 of the License.
7
This program is distributed in the hope that it will be useful,
8
but WITHOUT ANY WARRANTY; without even the implied warranty of
9
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10
GNU General Public License for more details.
12
You should have received a copy of the GNU General Public License
13
along with this program; if not, write to the Free Software
14
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
17
#include <ndb_global.h>
18
#include <my_pthread.h>
20
#include <SocketServer.hpp>
24
#include <NdbThread.h>
27
#define DEBUG(x) ndbout << x << endl;
29
SocketServer::SocketServer(unsigned maxSessions) :
35
m_maxSessions = maxSessions;
38
SocketServer::~SocketServer() {
40
for(i = 0; i<m_sessions.size(); i++){
41
delete m_sessions[i].m_session;
43
for(i = 0; i<m_services.size(); i++){
44
if(m_services[i].m_socket)
45
NDB_CLOSE_SOCKET(m_services[i].m_socket);
46
delete m_services[i].m_service;
51
SocketServer::tryBind(unsigned short port, const char * intface) {
52
struct sockaddr_in servaddr;
53
memset(&servaddr, 0, sizeof(servaddr));
54
servaddr.sin_family = AF_INET;
55
servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
56
servaddr.sin_port = htons(port);
59
if(Ndb_getInAddr(&servaddr.sin_addr, intface))
63
const NDB_SOCKET_TYPE sock = socket(AF_INET, SOCK_STREAM, 0);
64
if (sock == NDB_INVALID_SOCKET) {
68
DBUG_PRINT("info",("NDB_SOCKET: %d", sock));
71
if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR,
72
(const char*)&on, sizeof(on)) == -1) {
73
NDB_CLOSE_SOCKET(sock);
77
if (bind(sock, (struct sockaddr*) &servaddr, sizeof(servaddr)) == -1) {
78
NDB_CLOSE_SOCKET(sock);
82
NDB_CLOSE_SOCKET(sock);
87
SocketServer::setup(SocketServer::Service * service,
88
unsigned short * port,
89
const char * intface){
90
DBUG_ENTER("SocketServer::setup");
91
DBUG_PRINT("enter",("interface=%s, port=%u", intface, *port));
92
struct sockaddr_in servaddr;
93
memset(&servaddr, 0, sizeof(servaddr));
94
servaddr.sin_family = AF_INET;
95
servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
96
servaddr.sin_port = htons(*port);
99
if(Ndb_getInAddr(&servaddr.sin_addr, intface))
103
const NDB_SOCKET_TYPE sock = socket(AF_INET, SOCK_STREAM, 0);
104
if (sock == NDB_INVALID_SOCKET) {
105
DBUG_PRINT("error",("socket() - %d - %s",
106
errno, strerror(errno)));
110
DBUG_PRINT("info",("NDB_SOCKET: %d", sock));
113
if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR,
114
(const char*)&on, sizeof(on)) == -1) {
115
DBUG_PRINT("error",("setsockopt() - %d - %s",
116
errno, strerror(errno)));
117
NDB_CLOSE_SOCKET(sock);
121
if (bind(sock, (struct sockaddr*) &servaddr, sizeof(servaddr)) == -1) {
122
DBUG_PRINT("error",("bind() - %d - %s",
123
errno, strerror(errno)));
124
NDB_CLOSE_SOCKET(sock);
128
/* Get the port we bound to */
129
SOCKET_SIZE_TYPE sock_len = sizeof(servaddr);
130
if(getsockname(sock,(struct sockaddr*)&servaddr,&sock_len)<0) {
131
ndbout_c("An error occurred while trying to find out what"
132
" port we bound to. Error: %s",strerror(errno));
133
NDB_CLOSE_SOCKET(sock);
137
DBUG_PRINT("info",("bound to %u",ntohs(servaddr.sin_port)));
138
if (listen(sock, m_maxSessions > 32 ? 32 : m_maxSessions) == -1){
139
DBUG_PRINT("error",("listen() - %d - %s",
140
errno, strerror(errno)));
141
NDB_CLOSE_SOCKET(sock);
147
i.m_service = service;
148
m_services.push_back(i);
150
*port = ntohs(servaddr.sin_port);
156
SocketServer::doAccept(){
157
fd_set readSet, exceptionSet;
159
FD_ZERO(&exceptionSet);
163
for (unsigned i = 0; i < m_services.size(); i++){
164
const NDB_SOCKET_TYPE s = m_services[i].m_socket;
166
FD_SET(s, &exceptionSet);
167
maxSock = (maxSock > s ? maxSock : s);
169
struct timeval timeout;
173
if(select(maxSock + 1, &readSet, 0, &exceptionSet, &timeout) > 0){
174
for (unsigned i = 0; i < m_services.size(); i++){
175
ServiceInstance & si = m_services[i];
177
if(FD_ISSET(si.m_socket, &readSet)){
178
NDB_SOCKET_TYPE childSock = accept(si.m_socket, 0, 0);
179
if(childSock == NDB_INVALID_SOCKET){
184
s.m_service = si.m_service;
185
s.m_session = si.m_service->newSession(childSock);
188
m_session_mutex.lock();
189
m_sessions.push_back(s);
190
startSession(m_sessions.back());
191
m_session_mutex.unlock();
197
if(FD_ISSET(si.m_socket, &exceptionSet)){
198
DEBUG("socket in the exceptionSet");
208
socketServerThread_C(void* _ss){
209
SocketServer * ss = (SocketServer *)_ss;
215
SocketServer::startServer(){
217
if(m_thread == 0 && m_stopThread == false){
218
m_thread = NdbThread_Create(socketServerThread_C,
222
NDB_THREAD_PRIO_LOW);
224
m_threadLock.unlock();
228
SocketServer::stopServer(){
234
NdbThread_WaitFor(m_thread, &res);
235
NdbThread_Destroy(&m_thread);
238
m_threadLock.unlock();
242
SocketServer::doRun(){
244
while(!m_stopThread){
245
m_session_mutex.lock();
247
if(m_sessions.size() < m_maxSessions){
248
m_session_mutex.unlock();
251
m_session_mutex.unlock();
252
NdbSleep_MilliSleep(200);
258
SocketServer::startSession(SessionInstance & si){
259
si.m_thread = NdbThread_Create(sessionThread_C,
260
(void**)si.m_session,
263
NDB_THREAD_PRIO_LOW);
267
SocketServer::foreachSession(void (*func)(SocketServer::Session*, void *), void *data)
269
m_session_mutex.lock();
270
for(int i = m_sessions.size() - 1; i >= 0; i--){
271
(*func)(m_sessions[i].m_session, data);
273
m_session_mutex.unlock();
277
SocketServer::checkSessions()
279
m_session_mutex.lock();
281
m_session_mutex.unlock();
285
SocketServer::checkSessionsImpl()
287
for(int i = m_sessions.size() - 1; i >= 0; i--)
289
if(m_sessions[i].m_session->m_stopped)
291
if(m_sessions[i].m_thread != 0)
294
NdbThread_WaitFor(m_sessions[i].m_thread, &ret);
295
NdbThread_Destroy(&m_sessions[i].m_thread);
297
m_sessions[i].m_session->stopSession();
298
delete m_sessions[i].m_session;
305
SocketServer::stopSessions(bool wait){
307
m_session_mutex.lock();
308
for(i = m_sessions.size() - 1; i>=0; i--)
310
m_sessions[i].m_session->stopSession();
311
m_sessions[i].m_session->m_stop = true; // to make sure
313
m_session_mutex.unlock();
315
for(i = m_services.size() - 1; i>=0; i--)
316
m_services[i].m_service->stopSessions();
319
m_session_mutex.lock();
320
while(m_sessions.size() > 0){
322
m_session_mutex.unlock();
323
NdbSleep_MilliSleep(100);
324
m_session_mutex.lock();
326
m_session_mutex.unlock();
330
/***** Session code ******/
334
sessionThread_C(void* _sc){
335
SocketServer::Session * si = (SocketServer::Session *)_sc;
338
* may have m_stopped set if we're transforming a mgm
339
* connection into a transporter connection.
344
si->m_stopped = false;
347
NDB_CLOSE_SOCKET(si->m_socket);
351
si->m_stopped = true;
355
template class MutexVector<SocketServer::ServiceInstance>;
356
template class Vector<SocketServer::SessionInstance>;