~ubuntu-branches/ubuntu/natty/steam/natty

« back to all changes in this revision

Viewing changes to server/kernel/socket.pike

  • Committer: Bazaar Package Importer
  • Author(s): Alain Schroeder
  • Date: 2005-05-14 16:33:35 UTC
  • Revision ID: james.westby@ubuntu.com-20050514163335-5v7lbxibmlww15dx
Tags: upstream-1.6.3
ImportĀ upstreamĀ versionĀ 1.6.3

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/* Copyright (C) 2000-2004  Thomas Bopp, Thorsten Hampel, Ludger Merkens
 
2
 *
 
3
 *  This program is free software; you can redistribute it and/or modify
 
4
 *  it under the terms of the GNU General Public License as published by
 
5
 *  the Free Software Foundation; either version 2 of the License, or
 
6
 *  (at your option) any later version.
 
7
 *
 
8
 *  This program is distributed in the hope that it will be useful,
 
9
 *  but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
 *  GNU General Public License for more details.
 
12
 *
 
13
 *  You should have received a copy of the GNU General Public License
 
14
 *  along with this program; if not, write to the Free Software
 
15
 *  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
 
16
 * 
 
17
 * $Id: socket.pike,v 1.1.1.1 2005/02/23 14:47:21 cvs Exp $
 
18
 */
 
19
 
 
20
constant cvs_version="$Id: socket.pike,v 1.1.1.1 2005/02/23 14:47:21 cvs Exp $";
 
21
 
 
22
inherit Stdio.File : socket;
 
23
 
 
24
#include <classes.h>
 
25
#include <macros.h>
 
26
#include <assert.h>
 
27
#include <config.h>
 
28
 
 
29
private static string          __buffer;
 
30
private static int                __len;
 
31
private static function  fWriteFunction;
 
32
private static function fFinishFunction;
 
33
 
 
34
#ifdef THREAD_READ
 
35
private static Thread.Condition write_cond = Thread.Condition();
 
36
private static Thread.Mutex         read_mutex = Thread.Mutex();
 
37
private static object write_lock;
 
38
private static Thread.Queue msgQueue = Thread.Queue();
 
39
private static Thread.Queue closeQueue = Thread.Queue();
 
40
#endif
 
41
 
 
42
//#define SOCKET_DEBUG
 
43
 
 
44
#ifdef SOCKET_DEBUG
 
45
#define DEBUG(s) werror("["+__id+"] "+s+"\n")
 
46
#else
 
47
#define DEBUG(s)
 
48
#endif
 
49
 
 
50
#define ISCLOSED (closeQueue->size() > 0)
 
51
 
 
52
int mythreads = 0;
 
53
int __id;
 
54
Thread.Queue threads = Thread.Queue();
 
55
 
 
56
/**
 
57
 *
 
58
 *  
 
59
 * @param 
 
60
 * @return 
 
61
 * @author Thomas Bopp 
 
62
 * @see 
 
63
 */
 
64
static void 
 
65
disconnect()
 
66
{
 
67
    DEBUG("DISCONNECT()\n");
 
68
    mixed err = catch {
 
69
        ::close();
 
70
    };
 
71
    if ( err != 0 )
 
72
        DEBUG("While disconnecting socket:\n"+sprintf("%O",err));
 
73
    if ( objectp(read_mutex) )
 
74
        destruct(read_mutex);
 
75
}
 
76
 
 
77
/**
 
78
 * send a message to the client
 
79
 *  
 
80
 * @param str - the message to send
 
81
 * @author Thomas Bopp 
 
82
 * @see write_callback
 
83
 */
 
84
static void send_message(string str) 
 
85
{
 
86
    DEBUG("send_message("+strlen(str)+" bytes...)");
 
87
    msgQueue->write(str);
 
88
}
 
89
 
 
90
/**
 
91
 * this function is called when there is free space for writting
 
92
 *  
 
93
 * @author Thomas Bopp (astra@upb.de) 
 
94
 * @see read_callback
 
95
 * @see register_send_function
 
96
 * @see send_message
 
97
 */
 
98
static void write_callback(string __buffer)
 
99
{
 
100
    int written;
 
101
    mixed     w;
 
102
    int     len;
 
103
    
 
104
    DEBUG("write_callback("+strlen(__buffer)+
 
105
          ", closed?"+(ISCLOSED?"true":"false"));
 
106
 
 
107
 
 
108
    if ( ISCLOSED ) return;
 
109
 
 
110
    len = strlen(__buffer);
 
111
 
 
112
    if ( functionp(fWriteFunction) ) {
 
113
        w = fWriteFunction(__len);
 
114
       
 
115
        if ( !stringp(w) ) {
 
116
            fFinishFunction();
 
117
            fWriteFunction  = 0;
 
118
            fFinishFunction = 0;
 
119
        }
 
120
        else {
 
121
            __len += strlen(w);
 
122
            msgQueue->write(w);
 
123
        }
 
124
    }
 
125
    while ( strlen(__buffer) > 0 ) {
 
126
        mixed err = catch {
 
127
            written = write(__buffer);
 
128
            DEBUG("written " + written + " bytes...");
 
129
        };
 
130
        if ( err != 0 ) {
 
131
            __buffer = "";
 
132
            DEBUG("error while writting:\n"+sprintf("%O\n",err));
 
133
            return;
 
134
        }
 
135
        
 
136
        if ( written < 0 ) {
 
137
            return;
 
138
        }
 
139
        if ( written < strlen(__buffer ) ) {
 
140
            if ( written > 0 )
 
141
                __buffer = __buffer[written..];
 
142
            else if ( written == 0 ) 
 
143
                sleep(0.1);
 
144
        }
 
145
        else
 
146
            return;
 
147
    }
 
148
}
 
149
 
 
150
/**
 
151
 * this function is called when data is received on the socket
 
152
 *  
 
153
 * @param id - data for the socket
 
154
 * @param data - the arriving data
 
155
 * @author Thomas Bopp 
 
156
 * @see write_callback
 
157
 */
 
158
static void read_callback(mixed id, string data) 
 
159
{
 
160
}
 
161
 
 
162
static void was_closed()
 
163
{
 
164
    closeQueue->write("");
 
165
}
 
166
 
 
167
/**
 
168
 * The read thread reads data.
 
169
 *  
 
170
 * @param 
 
171
 * @return 
 
172
 * @author Thomas Bopp (astra@upb.de) 
 
173
 * @see 
 
174
 */
 
175
final static void tread_data()
 
176
{
 
177
    string str;
 
178
 
 
179
    if ( ISCLOSED ) return;
 
180
    
 
181
    mixed err = catch {
 
182
        str = socket::read(SOCKET_READ_SIZE, 1);
 
183
    };
 
184
    DEBUG("Returning from read...\n");
 
185
    
 
186
    if ( err != 0 || !stringp(str) || strlen(str) == 0 ) {
 
187
      DEBUG("Socket was closed while in read...");
 
188
      was_closed();
 
189
    }
 
190
    else {
 
191
        DEBUG("Reading " + strlen(str) + " bytes...\n");
 
192
        read_callback(0, str);
 
193
    }
 
194
}
 
195
 
 
196
/**
 
197
 * close the connection to the client
 
198
 *  
 
199
 * @author Thomas Bopp 
 
200
 */
 
201
void
 
202
close_connection()
 
203
{
 
204
    DEBUG("close_connection()");
 
205
    closeQueue->write("");
 
206
    msgQueue->write("");
 
207
 
 
208
    mixed err = catch {
 
209
        socket::set_read_callback(0);
 
210
    };
 
211
}
 
212
 
 
213
/**
 
214
 * create the object (the constructor)
 
215
 *  
 
216
 * @param f - the portobject
 
217
 * @author Thomas Bopp 
 
218
 */
 
219
static void create(object f) 
 
220
{
 
221
    socket::assign(f);
 
222
    __buffer = "";
 
223
    fWriteFunction = 0;
 
224
    socket::set_blocking();
 
225
    socket::set_buffer(65536*10, "w");
 
226
    socket::set_buffer(65536*10, "r");
 
227
    thread_create(read_thread);
 
228
}
 
229
 
 
230
/**
 
231
 * Get the ip of this socket.
 
232
 *  
 
233
 * @return the ip number 127.0.0.0
 
234
 * @author <a href="mailto:astra@upb.de">Thomas Bopp</a>) 
 
235
 */
 
236
string|int get_ip()
 
237
{
 
238
  mixed   err;
 
239
  string addr;
 
240
 
 
241
  err = catch {
 
242
    addr = query_address();
 
243
  };
 
244
  if ( err != 0 )
 
245
    addr = "no connected";
 
246
  LOG("query_adress() returns " + addr);
 
247
  string ip = 0;
 
248
  if ( stringp(addr) )
 
249
    sscanf(addr, "%s %*d", ip);
 
250
  return ip;
 
251
}
 
252
 
 
253
 
 
254
/**
 
255
 * register a callback function for writting data to the socket
 
256
 * if there is already a function defined there will be a runtime error
 
257
 *  
 
258
 * @param f - the callback function
 
259
 * @author Thomas Bopp (astra@upb.de) 
 
260
 * @see write_callback
 
261
 */
 
262
static void register_send_function(function f, function e)
 
263
{
 
264
    ASSERTINFO(!functionp(fWriteFunction), 
 
265
               "Allready defined writer function !");
 
266
 
 
267
    string w = f(0);
 
268
    if ( !stringp(w) )
 
269
        return;
 
270
    
 
271
    fWriteFunction  = f;
 
272
    fFinishFunction = e;
 
273
    __len = strlen(w);
 
274
    msgQueue->write(w);
 
275
}
 
276
 
 
277
 
 
278
#ifdef THREAD_READ
 
279
 
 
280
/**
 
281
 * The main function for the reader thread. Calls tread_data() repeatedly.
 
282
 *  
 
283
 * @author Thomas Bopp (astra@upb.de) 
 
284
 * @see tread_data
 
285
 */
 
286
final static void read_thread()
 
287
{
 
288
    DEBUG("read_thread() created, now creating write_thread()...");
 
289
    mythreads++;
 
290
    threads->write(this_thread());
 
291
 
 
292
    thread_create(write_thread);
 
293
    while ( !ISCLOSED ) {
 
294
        DEBUG("!ISCLOSED and reading data....");
 
295
        tread_data();
 
296
    }
 
297
    DEBUG("Read Thread Ended....");
 
298
    closeQueue->write("");
 
299
    DEBUG("Read Thread Ended....: CloseQueue="+closeQueue->size());
 
300
    mythreads--;
 
301
    msgQueue->write("end");
 
302
}
 
303
 
 
304
 
 
305
/**
 
306
 * The main function for the writer thread. Calls the write_callback() 
 
307
 * repeatedly and waits for signals.
 
308
 *  
 
309
 * @param 
 
310
 * @return 
 
311
 * @author Thomas Bopp (astra@upb.de) 
 
312
 * @see 
 
313
 */
 
314
final static void
 
315
write_thread()
 
316
{
 
317
    string msg;
 
318
    DEBUG("write_thread() created ...");
 
319
    mythreads++;
 
320
    threads->write(this_thread());
 
321
    while ( !ISCLOSED && (msg = msgQueue->read()) ) {
 
322
        write_callback(msg);
 
323
    }
 
324
    DEBUG("disconnecting socket...: CloseQueue="+closeQueue->size());
 
325
    
 
326
    disconnect();
 
327
    mythreads--;
 
328
    closeQueue->write("");
 
329
}
 
330
#endif
 
331
 
 
332
string describe()
 
333
{
 
334
  return "Socket("+__id+", closed="+closeQueue->size()+","+get_ip()+")";
 
335
}
 
336
 
 
337
void set_id(int i) { __id = i; }
 
338
int get_id() { return __id; }
 
339
bool is_closed() { return closeQueue->size() >= 2; }
 
340
int is_closed_num() { return closeQueue->size(); }
 
341
string get_identifier() { return "socket"; }
 
342
 
 
343
 
 
344
 
 
345
 
 
346