2
* Copyright (C) 2003-2005 Tommi Maekitalo
4
* This program is free software; you can redistribute it and/or
5
* modify it under the terms of the GNU General Public License as
6
* published by the Free Software Foundation; either version 2 of the
7
* License, or (at your option) any later version.
9
* This program is distributed in the hope that it will be useful, but
10
* is provided AS IS, WITHOUT ANY WARRANTY; without even the implied
11
* warranty of MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE, and
12
* NON-INFRINGEMENT. See the GNU General Public License for more details.
14
* You should have received a copy of the GNU General Public License
15
* along with this program; if not, write to the Free Software
16
* Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
27
#include <cxxtools/thread.h>
28
#include <cxxtools/tcpstream.h>
29
#include <tnt/httprequest.h>
30
#include <tnt/httpparser.h>
31
#include <tnt/pointer.h>
36
// in tntnet (mainthread):
42
Jobqueue::JobPtr j = new Tcpjob();
43
j->accept(poller.get());
48
// in server (workerthread):
53
Jobqueue::JobPtr j = queue.get();
54
std::iostream& socket = j->getStream();
55
processRequest(socket);
62
/** Job - one per request */
65
unsigned keepAliveCounter;
68
HttpMessage::Parser parser;
69
time_t lastAccessTime;
73
static unsigned socket_read_timeout;
74
static unsigned socket_write_timeout;
75
static unsigned keepalive_max;
76
static unsigned socket_buffer_size;
80
: keepAliveCounter(keepalive_max),
90
unsigned addRef() { return ++refs; }
102
virtual std::iostream& getStream() = 0;
103
virtual int getFd() const = 0;
104
virtual void setRead() = 0;
105
virtual void setWrite() = 0;
107
HttpRequest& getRequest() { return request; }
108
HttpMessage::Parser& getParser() { return parser; }
110
unsigned decrementKeepAliveCounter()
111
{ return keepAliveCounter > 0 ? --keepAliveCounter : 0; }
113
void touch() { time(&lastAccessTime); }
114
int msecToTimeout(time_t currentTime) const;
116
static void setSocketReadTimeout(unsigned ms) { socket_read_timeout = ms; }
117
static void setSocketWriteTimeout(unsigned ms) { socket_write_timeout = ms; }
118
static void setKeepAliveMax(unsigned n) { keepalive_max = n; }
119
static void setSocketBufferSize(unsigned b) { socket_buffer_size = b; }
121
static unsigned getSocketReadTimeout() { return socket_read_timeout; }
122
static unsigned getSocketWriteTimeout() { return socket_write_timeout; }
123
static unsigned getKeepAliveTimeout();
124
static unsigned getKeepAliveMax() { return keepalive_max; }
125
static unsigned getSocketBufferSize() { return socket_buffer_size; }
128
class Tcpjob : public Job
130
cxxtools::net::iostream socket;
134
: socket(getSocketBufferSize(), getSocketReadTimeout())
137
void accept(const cxxtools::net::Server& listener);
139
std::iostream& getStream();
146
class SslTcpjob : public Job
152
: socket(getSocketBufferSize(), getSocketReadTimeout())
155
void accept(const SslServer& listener);
157
std::iostream& getStream();
164
/** Jobqueue - one per process */
168
typedef Pointer<Job> JobPtr;
170
cxxtools::Condition noWaitThreads;
173
std::deque<JobPtr> jobs;
174
cxxtools::Mutex mutex;
175
cxxtools::Condition notEmpty;
176
cxxtools::Condition notFull;
177
unsigned waitThreads;
181
explicit Jobqueue(unsigned capacity_)
189
void setCapacity(unsigned c)
191
unsigned getCapacity() const
193
unsigned getWaitThreadCount() const
194
{ return waitThreads; }
196
{ return jobs.empty(); }