~pbms-core/pbms/async_read

« back to all changes in this revision

Viewing changes to mybs/src/BSNetwork.cc

  • Committer: paul-mccullagh
  • Date: 2008-03-26 11:35:17 UTC
  • Revision ID: paul-mccullagh-afb1610c21464a577ae428d72fc725eb986c05a5
Initial import

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/* Copyright (c) 2007 SNAP Innovation GmbH
 
2
 *
 
3
 * BLOB Streaming for MySQL
 
4
 *
 
5
 * This program is free software; you can redistribute it and/or modify
 
6
 * it under the terms of the GNU General Public License as published by
 
7
 * the Free Software Foundation; either version 2 of the License, or
 
8
 * (at your option) any later version.
 
9
 *
 
10
 * This program is distributed in the hope that it will be useful,
 
11
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 
12
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
13
 * GNU General Public License for more details.
 
14
 *
 
15
 * You should have received a copy of the GNU General Public License
 
16
 * along with this program; if not, write to the Free Software
 
17
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
 
18
 *
 
19
 * Paul McCullagh
 
20
 *
 
21
 * 2007-05-25
 
22
 *
 
23
 * H&G2JCtL
 
24
 *
 
25
 * Network interface.
 
26
 *
 
27
 */
 
28
 
 
29
#include "CSConfig.h"
 
30
#include "CSGlobal.h"
 
31
#include "CSLog.h"
 
32
 
 
33
#include "BSNetwork.h"
 
34
#include "BSConnectionHandler.h"
 
35
#include "BSUtil.h"
 
36
 
 
37
BSSystemThread          *BSNetwork::gSystemThread;
 
38
time_t                          BSNetwork::gCurrentTime;
 
39
time_t                          BSNetwork::gLastService;
 
40
CSThreadList            *BSNetwork::gHandlerList;
 
41
CSSync                          BSNetwork::gListenerLock;
 
42
CSSocket                        *BSNetwork::gListenerSocket;
 
43
BSConnectionHandler     *BSNetwork::gListenerThread;
 
44
u_int                           BSNetwork::gWaitingToListen;
 
45
int                                     BSNetwork::handlerCount;
 
46
 
 
47
/*
 
48
 * -------------------------------------------------------------------------
 
49
 * SYSTEM THREAD
 
50
 */
 
51
 
 
52
bool BSSystemThread::doWork()
 
53
{
 
54
        bool    killed = true;
 
55
 
 
56
        enter_();
 
57
        BSNetwork::gCurrentTime = time(NULL);
 
58
        if ((BSNetwork::gCurrentTime - BSNetwork::gLastService) >= (BS_IDLE_THREAD_TIMEOUT/2)) {
 
59
                BSNetwork::gLastService = BSNetwork::gCurrentTime;
 
60
                while (!myMustQuit && killed) {
 
61
                        killed = BSNetwork::killListener();
 
62
                        BSNetwork::gCurrentTime = time(NULL);
 
63
                }
 
64
        }
 
65
        return_(true);
 
66
}
 
67
 
 
68
/*
 
69
 * -------------------------------------------------------------------------
 
70
 * NETWORK FUNCTIONS
 
71
 */
 
72
 
 
73
void BSNetwork::startUp(int port)
 
74
{
 
75
        enter_();
 
76
        gCurrentTime = time(NULL);
 
77
        gLastService = gCurrentTime;
 
78
        gListenerSocket = NULL;
 
79
        handlerCount = 0;
 
80
        CSL.lock();
 
81
        CSL.log(self, CSLog::Protocol, "BLOB Streaming engine ");
 
82
        CSL.log(self, CSLog::Protocol, bs_version());
 
83
        CSL.log(self, CSLog::Protocol, " listening on port ");
 
84
        CSL.log(self, CSLog::Protocol, port);
 
85
        CSL.log(self, CSLog::Protocol, "\n");
 
86
        CSL.unlock();
 
87
 
 
88
        new_(gHandlerList, CSThreadList());
 
89
        gListenerSocket = CSSocket::newSocket();
 
90
        gListenerSocket->publish(NULL, port);
 
91
 
 
92
        new_(gSystemThread, BSSystemThread(1000 /* 1 sec */, NULL));
 
93
        gSystemThread->start();
 
94
        exit_();
 
95
}
 
96
 
 
97
void BSNetwork::shutDown()
 
98
{
 
99
        enter_();
 
100
 
 
101
        if (gSystemThread) {
 
102
                gSystemThread->stop();
 
103
                gSystemThread->release();
 
104
                gSystemThread = NULL;
 
105
        }
 
106
 
 
107
        /* This will set all threads to quiting: */
 
108
        if (gHandlerList)
 
109
                gHandlerList->quitAllThreads();
 
110
 
 
111
        /* Close the socket: */
 
112
        lock_(&gListenerLock);
 
113
        if (gListenerSocket) {
 
114
                try_(a) {
 
115
                        gListenerSocket->release();
 
116
                }
 
117
                catch_(a) {
 
118
                        self->logException();
 
119
                }
 
120
                cont_(a);
 
121
        }
 
122
        gListenerSocket = NULL;
 
123
        unlock_(&gListenerLock);
 
124
 
 
125
        if (gHandlerList) {
 
126
                try_(b) {
 
127
                        /* This will stop any threads remaining: */
 
128
                        gHandlerList->release();
 
129
                }
 
130
                catch_(b) {
 
131
                        self->logException();
 
132
                }
 
133
                cont_(b);
 
134
        }
 
135
 
 
136
        CSL.log(self, CSLog::Protocol, "BLOB Streaming engine shutdown\n");
 
137
        exit_();
 
138
}
 
139
 
 
140
void BSNetwork::startConnectionHandler()
 
141
{
 
142
        char                            buffer[120];
 
143
        BSConnectionHandler     *thread;
 
144
 
 
145
        enter_();
 
146
        handlerCount++;
 
147
        sprintf(buffer, "NetworkHandler%d", handlerCount);
 
148
        lock_(gHandlerList);
 
149
        thread = BSConnectionHandler::newHandler(BSNetwork::gHandlerList);
 
150
        unlock_(gHandlerList);
 
151
        push_(thread);
 
152
        thread->threadName = CSString::newString(buffer);
 
153
        thread->start();
 
154
        release_(thread);
 
155
        exit_();
 
156
}
 
157
 
 
158
/*
 
159
 * Return NULL of a connection cannot be openned, and the
 
160
 * thread must quit.
 
161
 */
 
162
CSSocket *BSNetwork::openConnection(BSConnectionHandler *handler)
 
163
{
 
164
        CSSocket *sock;
 
165
 
 
166
        enter_();
 
167
        sock = CSSocket::newSocket();
 
168
        push_(sock);
 
169
 
 
170
        /* Wait for a connection: */
 
171
        if (!lockListenerSocket(handler)) {
 
172
                release_(sock);
 
173
                return_(NULL);
 
174
        }
 
175
 
 
176
        try_(a) {
 
177
                sock->open(BSNetwork::gListenerSocket);
 
178
        }
 
179
        catch_(a) {
 
180
                unlockListenerSocket();
 
181
                throw_();
 
182
        }
 
183
        cont_(a);
 
184
 
 
185
        handler->lastUse = gCurrentTime;
 
186
 
 
187
        unlockListenerSocket();
 
188
 
 
189
        pop_(sock);
 
190
        return_(sock);
 
191
}
 
192
 
 
193
void BSNetwork::startNetwork()
 
194
{
 
195
        enter_();
 
196
        startConnectionHandler();
 
197
        exit_();
 
198
}
 
199
 
 
200
bool BSNetwork::lockListenerSocket(BSConnectionHandler *handler)
 
201
{
 
202
        bool socket_locked = false;
 
203
 
 
204
        enter_();
 
205
        if (handler->myMustQuit)
 
206
                return false;
 
207
        lock_(&gListenerLock);
 
208
        if (gListenerSocket) {
 
209
                /* Wait for the listen socket to be freed: */
 
210
                if (gListenerThread) {
 
211
                        gWaitingToListen++;
 
212
                        handler->amWaitingToListen = true;
 
213
                        while (gListenerThread) {
 
214
                                if (handler->myMustQuit)
 
215
                                        break;
 
216
                                try_(a) {
 
217
                                        gListenerLock.wait(2000);
 
218
                                }
 
219
                                catch_(a) {
 
220
                                        /* Catch any error */;
 
221
                                }
 
222
                                cont_(a);
 
223
                        }
 
224
                        gWaitingToListen--;
 
225
                        handler->amWaitingToListen = false;
 
226
                }
 
227
                if (!handler->myMustQuit) {
 
228
                        gListenerThread = handler;
 
229
                        socket_locked = true;
 
230
                }
 
231
        }
 
232
        unlock_(&gListenerLock);
 
233
        return_(socket_locked);
 
234
}
 
235
 
 
236
void BSNetwork::unlockListenerSocket()
 
237
{
 
238
        enter_();
 
239
        lock_(&gListenerLock);
 
240
        gListenerThread = NULL;
 
241
        gListenerLock.wakeup();
 
242
        unlock_(&gListenerLock);
 
243
        exit_();
 
244
}
 
245
 
 
246
/* Kill a listener if possible!
 
247
 * Return true if a thread was killed.
 
248
 */
 
249
bool BSNetwork::killListener()
 
250
{
 
251
        BSConnectionHandler     *ptr = NULL;
 
252
 
 
253
        enter_();
 
254
        lock_(&gListenerLock);
 
255
        if (gListenerThread && gWaitingToListen > 0) {
 
256
                /* Kill one: */
 
257
                lock_(gHandlerList);
 
258
                ptr = (BSConnectionHandler *) gHandlerList->getBack();
 
259
                while (ptr) {
 
260
                        if (ptr->amWaitingToListen) {
 
261
                                if (gCurrentTime > ptr->lastUse && (gCurrentTime - ptr->lastUse) > BS_IDLE_THREAD_TIMEOUT) {
 
262
                                        ptr->myMustQuit = true;
 
263
                                        ptr->wakeup();
 
264
                                        break;
 
265
                                }
 
266
                        }
 
267
                        ptr = (BSConnectionHandler *) ptr->getNextLink();
 
268
                }
 
269
                unlock_(gHandlerList);
 
270
        }
 
271
        unlock_(&gListenerLock);
 
272
        if (ptr) {
 
273
                ptr->join();
 
274
                return_(true);
 
275
        }
 
276
        return_(false);
 
277
}
 
278
 
 
279