1
/* Copyright (C) 2000-2004 Thomas Bopp, Thorsten Hampel, Ludger Merkens
3
* This program is free software; you can redistribute it and/or modify
4
* it under the terms of the GNU General Public License as published by
5
* the Free Software Foundation; either version 2 of the License, or
6
* (at your option) any later version.
8
* This program is distributed in the hope that it will be useful,
9
* but WITHOUT ANY WARRANTY; without even the implied warranty of
10
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
* GNU General Public License for more details.
13
* You should have received a copy of the GNU General Public License
14
* along with this program; if not, write to the Free Software
15
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17
* $Id: socket.pike,v 1.1.1.1 2005/02/23 14:47:21 cvs Exp $
20
constant cvs_version="$Id: socket.pike,v 1.1.1.1 2005/02/23 14:47:21 cvs Exp $";
22
inherit Stdio.File : socket;
29
private static string __buffer;
30
private static int __len;
31
// Writer callback: write_callback() invokes it as fWriteFunction(__len) when it
// is set, and register_send_function() asserts that it is not already defined.
private static function fWriteFunction;
32
private static function fFinishFunction;
35
private static Thread.Condition write_cond = Thread.Condition();
36
private static Thread.Mutex read_mutex = Thread.Mutex();
37
private static object write_lock;
38
// Queue read by the write thread's main loop (msgQueue->read()); read_thread()
// pushes the string "end" onto it when the reader terminates.
private static Thread.Queue msgQueue = Thread.Queue();
39
// Close markers: an empty string is written here whenever the socket is closed
// or one of the threads ends. ISCLOSED tests size() > 0, is_closed() tests
// size() >= 2, and is_closed_num() exposes the raw count.
private static Thread.Queue closeQueue = Thread.Queue();
42
//#define SOCKET_DEBUG
45
// Debug trace: writes s to stderr via werror(), prefixed with this socket's
// __id in square brackets and followed by a newline.
#define DEBUG(s) werror("["+__id+"] "+s+"\n")
50
// True as soon as at least one close marker has been written to closeQueue.
#define ISCLOSED (closeQueue->size() > 0)
54
// Thread objects belonging to this socket: the reader and the writer thread
// each register themselves with threads->write(this_thread()).
Thread.Queue threads = Thread.Queue();
67
DEBUG("DISCONNECT()\n");
72
DEBUG("While disconnecting socket:\n"+sprintf("%O",err));
73
if ( objectp(read_mutex) )
78
* send a message to the client
80
* @param str - the message to send
84
static void send_message(string str)
86
DEBUG("send_message("+strlen(str)+" bytes...)");
91
* this function is called when there is free space for writing
93
* @author Thomas Bopp (astra@upb.de)
95
* @see register_send_function
98
static void write_callback(string __buffer)
104
DEBUG("write_callback("+strlen(__buffer)+
105
", closed?"+(ISCLOSED?"true":"false"));
108
if ( ISCLOSED ) return;
110
len = strlen(__buffer);
112
if ( functionp(fWriteFunction) ) {
113
w = fWriteFunction(__len);
125
while ( strlen(__buffer) > 0 ) {
127
written = write(__buffer);
128
DEBUG("written " + written + " bytes...");
132
DEBUG("error while writting:\n"+sprintf("%O\n",err));
139
if ( written < strlen(__buffer ) ) {
141
__buffer = __buffer[written..];
142
else if ( written == 0 )
151
* this function is called when data is received on the socket
153
* @param id - data for the socket
154
* @param data - the arriving data
155
* @author Thomas Bopp
156
* @see write_callback
158
static void read_callback(mixed id, string data)
162
static void was_closed()
164
closeQueue->write("");
168
* The read thread reads data.
172
* @author Thomas Bopp (astra@upb.de)
175
final static void tread_data()
179
if ( ISCLOSED ) return;
182
str = socket::read(SOCKET_READ_SIZE, 1);
184
DEBUG("Returning from read...\n");
186
if ( err != 0 || !stringp(str) || strlen(str) == 0 ) {
187
DEBUG("Socket was closed while in read...");
191
DEBUG("Reading " + strlen(str) + " bytes...\n");
192
read_callback(0, str);
197
* close the connection to the client
199
* @author Thomas Bopp
204
DEBUG("close_connection()");
205
closeQueue->write("");
209
socket::set_read_callback(0);
214
* create the object (the constructor)
216
* @param f - the portobject
217
* @author Thomas Bopp
219
static void create(object f)
224
socket::set_blocking();
225
socket::set_buffer(65536*10, "w");
226
socket::set_buffer(65536*10, "r");
227
thread_create(read_thread);
231
* Get the ip of this socket.
233
* @return the ip number 127.0.0.0
234
* @author <a href="mailto:astra@upb.de">Thomas Bopp</a>
242
addr = query_address();
245
addr = "no connected";
246
LOG("query_adress() returns " + addr);
249
sscanf(addr, "%s %*d", ip);
255
* register a callback function for writing data to the socket
256
* if there is already a function defined there will be a runtime error
258
* @param f - the callback function
259
* @author Thomas Bopp (astra@upb.de)
260
* @see write_callback
262
static void register_send_function(function f, function e)
264
ASSERTINFO(!functionp(fWriteFunction),
265
"Allready defined writer function !");
281
* The main function for the reader thread. Calls tread_data() repeatedly.
283
* @author Thomas Bopp (astra@upb.de)
286
final static void read_thread()
288
DEBUG("read_thread() created, now creating write_thread()...");
290
threads->write(this_thread());
292
thread_create(write_thread);
293
while ( !ISCLOSED ) {
294
DEBUG("!ISCLOSED and reading data....");
297
DEBUG("Read Thread Ended....");
298
closeQueue->write("");
299
DEBUG("Read Thread Ended....: CloseQueue="+closeQueue->size());
301
msgQueue->write("end");
306
* The main function for the writer thread. Calls the write_callback()
307
* repeatedly and waits for signals.
311
* @author Thomas Bopp (astra@upb.de)
318
DEBUG("write_thread() created ...");
320
threads->write(this_thread());
321
while ( !ISCLOSED && (msg = msgQueue->read()) ) {
324
DEBUG("disconnecting socket...: CloseQueue="+closeQueue->size());
328
closeQueue->write("");
334
return "Socket("+__id+", closed="+closeQueue->size()+","+get_ip()+")";
337
/**
 * Store the numeric id of this socket. The id is what DEBUG() prints in
 * square brackets and what get_id() returns.
 *
 * @param i - the new id
 */
void set_id(int i)
{
    __id = i;
}
338
/**
 * Get the numeric id of this socket.
 *
 * @return the value last stored with set_id()
 */
int get_id()
{
    return __id;
}
339
/**
 * Check whether at least two close markers have been written to closeQueue.
 *
 * NOTE(review): this is stricter than the ISCLOSED macro, which is already
 * true after the first marker - presumably this waits until both the reader
 * and the writer thread have finished; confirm against the callers.
 *
 * @return true if closeQueue holds two or more entries
 */
bool is_closed()
{
    int markers = closeQueue->size();
    return markers >= 2;
}
340
/**
 * Get the number of close markers currently held in closeQueue.
 *
 * @return the size of closeQueue
 */
int is_closed_num()
{
    int markers = closeQueue->size();
    return markers;
}
341
/**
 * Get the fixed identifier of this object.
 *
 * @return always the string "socket"
 */
string get_identifier()
{
    string identifier = "socket";
    return identifier;
}