2
$Id: io.c 261 2006-05-29 21:27:31Z titer $
4
Copyright (c) 2006 Joshua Elsasser. All rights reserved.
6
Redistribution and use in source and binary forms, with or without
7
modification, are permitted provided that the following conditions
10
1. Redistributions of source code must retain the above copyright
11
notice, this list of conditions and the following disclaimer.
12
2. Redistributions in binary form must reproduce the above copyright
13
notice, this list of conditions and the following disclaimer in the
14
documentation and/or other materials provided with the distribution.
16
THIS SOFTWARE IS PROVIDED BY AUTHOR AND CONTRIBUTORS "AS IS" AND
17
ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18
IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19
ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
20
LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
21
CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
22
SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
23
INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
24
CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
25
ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
26
POSSIBILITY OF SUCH DAMAGE.
29
#include <sys/types.h>
30
#include <sys/socket.h>
42
#define IO_BLOCKSIZE (1024)
49
iodatafunc_t received;
69
static struct iosource *
72
freeoutbuf(struct iooutbuf *buf);
74
io_prepare(GSource *source, gint *timeout_);
76
io_check(GSource *source);
78
io_dispatch(GSource *source, GSourceFunc callback, gpointer gdata);
80
io_finalize(GSource *source);
82
io_accept(struct iosource *io);
84
io_read(struct iosource *io);
86
io_write(struct iosource *io);
88
io_disconnect(struct iosource *io, int err);
90
static GSourceFuncs sourcefuncs = {
100
io_new(int fd, ioidfunc_t sent, iodatafunc_t received,
101
iofunc_t closed, void *cbdata) {
109
io->received = received;
113
io->infd.events = G_IO_IN | G_IO_HUP | G_IO_ERR;
114
io->infd.revents = 0;
116
io->outfd.events = G_IO_OUT | G_IO_ERR;
117
io->outfd.revents = 0;
119
g_source_add_poll((GSource*)io, &io->infd);
120
g_source_attach((GSource*)io, NULL);
126
io_new_listening(int fd, socklen_t len, ionewfunc_t accepted,
127
iofunc_t closed, void *cbdata) {
130
g_assert(NULL != accepted);
136
io->accepted = accepted;
140
io->infd.events = G_IO_IN | G_IO_ERR;
141
io->infd.revents = 0;
142
io->inbuf = g_new(char, len);
145
g_source_add_poll((GSource*)io, &io->infd);
146
g_source_attach((GSource*)io, NULL);
155
if(0 > (flags = fcntl(fd, F_GETFL)) ||
156
0 > fcntl(fd, F_SETFL, flags | O_NONBLOCK))
162
static struct iosource *
164
GSource *source = g_source_new(&sourcefuncs, sizeof(struct iosource));
165
struct iosource *io = (struct iosource*)source;
172
bzero(&io->infd, sizeof(io->infd));
174
bzero(&io->outfd, sizeof(io->outfd));
186
io_send(GSource *source, const char *data, unsigned int len) {
187
char *new = g_new(char, len);
189
memcpy(new, data, len);
191
return io_send_keepdata(source, new, len);
195
io_send_keepdata(GSource *source, char *data, unsigned int len) {
196
struct iosource *io = (struct iosource*)source;
197
struct iooutbuf *buf = g_new(struct iooutbuf, 1);
203
buf->id = io->lastid;
205
if(NULL != io->outbufs)
206
io->outbufs = g_list_append(io->outbufs, buf);
208
io->outbufs = g_list_append(io->outbufs, buf);
209
g_source_add_poll(source, &io->outfd);
216
freeoutbuf(struct iooutbuf *buf) {
217
if(NULL != buf->data)
223
io_prepare(GSource *source SHUTUP, gint *timeout_) {
229
io_check(GSource *source) {
230
struct iosource *io = (struct iosource*)source;
234
if(NULL != io->outbufs && io->outfd.revents)
241
io_dispatch(GSource *source, GSourceFunc callback SHUTUP,
242
gpointer gdata SHUTUP) {
243
struct iosource *io = (struct iosource*)source;
245
if(io->infd.revents & (G_IO_ERR | G_IO_HUP) ||
246
io->outfd.revents & G_IO_ERR)
247
io_disconnect(io, 0 /* XXX how do I get errors here? */ );
248
else if(io->infd.revents & G_IO_IN)
249
(NULL == io->accepted ? io_read : io_accept)(io);
250
else if(io->outfd.revents & G_IO_OUT)
260
io_finalize(GSource *source SHUTUP) {
261
struct iosource *io = (struct iosource*)source;
263
if(NULL != io->outbufs) {
264
g_list_foreach(io->outbufs, (GFunc)freeoutbuf, NULL);
265
g_list_free(io->outbufs);
268
if(NULL != io->inbuf)
273
io_biggify(char **buf, unsigned int used, unsigned int *max) {
274
if(used + IO_BLOCKSIZE > *max) {
275
*max += IO_BLOCKSIZE;
276
*buf = g_renew(char, *buf, *max);
281
io_accept(struct iosource *io) {
285
if(0 > (fd = accept(io->infd.fd, (struct sockaddr*)io->inbuf, &len))) {
286
if(EAGAIN == errno || ECONNABORTED == errno || EWOULDBLOCK == errno)
288
io_disconnect(io, errno);
291
io->accepted((GSource*)io, fd, (struct sockaddr*)io->inbuf, len, io->cbdata);
295
io_read(struct iosource *io) {
297
gboolean newdata = FALSE;
301
g_source_ref((GSource*)io);
304
if(!newdata && 0 < res)
307
io_biggify(&io->inbuf, io->inused, &io->inmax);
309
res = read(io->infd.fd, io->inbuf + io->inused, io->inmax - io->inused);
314
if(NULL == io->received)
317
used = io->received((GSource*)io, io->inbuf, io->inused, io->cbdata);
318
if(used > io->inused)
321
if(used < io->inused)
322
memmove(io->inbuf, io->inbuf + used, io->inused - used);
327
if(0 != err && EAGAIN != err)
328
io_disconnect(io, err);
330
io_disconnect(io, 0);
331
g_source_unref((GSource*)io);
335
io_write(struct iosource *io) {
336
struct iooutbuf *buf;
340
g_source_ref((GSource*)io);
342
while(NULL != io->outbufs && 0 == err) {
343
buf = io->outbufs->data;
344
while(buf->off < buf->len && 0 < res) {
346
res = write(io->outfd.fd, buf->data + buf->off, buf->len - buf->off);
353
if(buf->off >= buf->len) {
354
io->outbufs = g_list_remove(io->outbufs, buf);
355
if(NULL == io->outbufs)
356
g_source_remove_poll((GSource*)io, &io->outfd);
358
io->sent((GSource*)io, buf->id, io->cbdata);
363
if(0 != err && EAGAIN != err)
364
io_disconnect(io, err);
366
g_source_unref((GSource*)io);
370
io_disconnect(struct iosource *io, int err) {
371
if(NULL != io->closed) {
373
io->closed((GSource*)io, io->cbdata);
376
if(NULL != io->outbufs)
377
g_source_remove_poll((GSource*)io, &io->outfd);
379
g_source_remove_poll((GSource*)io, &io->infd);
380
g_source_remove(g_source_get_id((GSource*)io));
381
g_source_unref((GSource*)io);