1
/* Copyright (c) 2007 SNAP Innovation GmbH
3
* BLOB Streaming for MySQL
5
* This program is free software; you can redistribute it and/or modify
6
* it under the terms of the GNU General Public License as published by
7
* the Free Software Foundation; either version 2 of the License, or
8
* (at your option) any later version.
10
* This program is distributed in the hope that it will be useful,
11
* but WITHOUT ANY WARRANTY; without even the implied warranty of
12
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13
* GNU General Public License for more details.
15
* You should have received a copy of the GNU General Public License
16
* along with this program; if not, write to the Free Software
17
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
33
#include "BSNetwork.h"
34
#include "BSConnectionHandler.h"
37
BSSystemThread *BSNetwork::gSystemThread;
38
time_t BSNetwork::gCurrentTime;
39
time_t BSNetwork::gLastService;
40
CSThreadList *BSNetwork::gHandlerList;
41
CSSync BSNetwork::gListenerLock;
42
CSSocket *BSNetwork::gListenerSocket;
43
BSConnectionHandler *BSNetwork::gListenerThread;
44
u_int BSNetwork::gWaitingToListen;
45
int BSNetwork::handlerCount;
48
* -------------------------------------------------------------------------
52
bool BSSystemThread::doWork()
57
BSNetwork::gCurrentTime = time(NULL);
58
if ((BSNetwork::gCurrentTime - BSNetwork::gLastService) >= (BS_IDLE_THREAD_TIMEOUT/2)) {
59
BSNetwork::gLastService = BSNetwork::gCurrentTime;
60
while (!myMustQuit && killed) {
61
killed = BSNetwork::killListener();
62
BSNetwork::gCurrentTime = time(NULL);
69
* -------------------------------------------------------------------------
73
void BSNetwork::startUp(int port)
76
gCurrentTime = time(NULL);
77
gLastService = gCurrentTime;
78
gListenerSocket = NULL;
81
CSL.log(self, CSLog::Protocol, "BLOB Streaming engine ");
82
CSL.log(self, CSLog::Protocol, bs_version());
83
CSL.log(self, CSLog::Protocol, " listening on port ");
84
CSL.log(self, CSLog::Protocol, port);
85
CSL.log(self, CSLog::Protocol, "\n");
88
new_(gHandlerList, CSThreadList());
89
gListenerSocket = CSSocket::newSocket();
90
gListenerSocket->publish(NULL, port);
92
new_(gSystemThread, BSSystemThread(1000 /* 1 sec */, NULL));
93
gSystemThread->start();
97
void BSNetwork::shutDown()
102
gSystemThread->stop();
103
gSystemThread->release();
104
gSystemThread = NULL;
107
/* This will set all threads to quiting: */
109
gHandlerList->quitAllThreads();
111
/* Close the socket: */
112
lock_(&gListenerLock);
113
if (gListenerSocket) {
115
gListenerSocket->release();
118
self->logException();
122
gListenerSocket = NULL;
123
unlock_(&gListenerLock);
127
/* This will stop any threads remaining: */
128
gHandlerList->release();
131
self->logException();
136
CSL.log(self, CSLog::Protocol, "BLOB Streaming engine shutdown\n");
140
void BSNetwork::startConnectionHandler()
143
BSConnectionHandler *thread;
147
sprintf(buffer, "NetworkHandler%d", handlerCount);
149
thread = BSConnectionHandler::newHandler(BSNetwork::gHandlerList);
150
unlock_(gHandlerList);
152
thread->threadName = CSString::newString(buffer);
159
* Return NULL of a connection cannot be openned, and the
162
CSSocket *BSNetwork::openConnection(BSConnectionHandler *handler)
167
sock = CSSocket::newSocket();
170
/* Wait for a connection: */
171
if (!lockListenerSocket(handler)) {
177
sock->open(BSNetwork::gListenerSocket);
180
unlockListenerSocket();
185
handler->lastUse = gCurrentTime;
187
unlockListenerSocket();
193
void BSNetwork::startNetwork()
196
startConnectionHandler();
200
bool BSNetwork::lockListenerSocket(BSConnectionHandler *handler)
202
bool socket_locked = false;
205
if (handler->myMustQuit)
207
lock_(&gListenerLock);
208
if (gListenerSocket) {
209
/* Wait for the listen socket to be freed: */
210
if (gListenerThread) {
212
handler->amWaitingToListen = true;
213
while (gListenerThread) {
214
if (handler->myMustQuit)
217
gListenerLock.wait(2000);
220
/* Catch any error */;
225
handler->amWaitingToListen = false;
227
if (!handler->myMustQuit) {
228
gListenerThread = handler;
229
socket_locked = true;
232
unlock_(&gListenerLock);
233
return_(socket_locked);
236
void BSNetwork::unlockListenerSocket()
239
lock_(&gListenerLock);
240
gListenerThread = NULL;
241
gListenerLock.wakeup();
242
unlock_(&gListenerLock);
246
/* Kill a listener if possible!
247
* Return true if a thread was killed.
249
bool BSNetwork::killListener()
251
BSConnectionHandler *ptr = NULL;
254
lock_(&gListenerLock);
255
if (gListenerThread && gWaitingToListen > 0) {
258
ptr = (BSConnectionHandler *) gHandlerList->getBack();
260
if (ptr->amWaitingToListen) {
261
if (gCurrentTime > ptr->lastUse && (gCurrentTime - ptr->lastUse) > BS_IDLE_THREAD_TIMEOUT) {
262
ptr->myMustQuit = true;
267
ptr = (BSConnectionHandler *) ptr->getNextLink();
269
unlock_(gHandlerList);
271
unlock_(&gListenerLock);