48
41
#define MSG_DONTWAIT 0x40
51
/* -------------------------
52
* we don't use a circular buffer.
54
static void dsi_init_buffer(DSI *dsi)
57
/* XXX config options */
58
dsi->maxsize = 6 * dsi->server_quantum;
60
dsi->maxsize = 6 * DSI_SERVQUANT_DEF;
61
dsi->buffer = malloc(dsi->maxsize);
65
dsi->start = dsi->buffer;
66
dsi->eof = dsi->buffer;
67
dsi->end = dsi->buffer + dsi->maxsize;
71
/* ----------------------
72
afpd is sleeping too much while trying to send something.
73
Maybe there's no reader or the reader is also sleeping in write,
74
look if there's some data for us to read, hopefully it will wake up
75
the reader so we can write again.
45
* afpd is sleeping too much while trying to send something.
46
* Maybe there's no reader or the reader is also sleeping in write,
47
* look if there's some data for us to read, hopefully it will wake up
48
* the reader so we can write again.
50
* @returns 0 when it is possible to send again, -1 on error
77
52
static int dsi_peek(DSI *dsi)
54
static int warned = 0;
79
55
fd_set readfds, writefds;
86
FD_SET( dsi->socket, &readfds);
87
FD_SET( dsi->socket, &writefds);
88
maxfd = dsi->socket +1;
60
LOG(log_debug, logtype_dsi, "dsi_peek");
62
maxfd = dsi->socket + 1;
91
FD_SET( dsi->socket, &readfds);
68
if (dsi->eof < dsi->end) {
69
/* space in read buffer */
70
FD_SET( dsi->socket, &readfds);
74
LOG(log_note, logtype_dsi, "dsi_peek: readahead buffer is full, possibly increase -dsireadbuf option");
75
LOG(log_note, logtype_dsi, "dsi_peek: dsireadbuf: %d, DSI quantum: %d, effective buffer size: %d",
77
dsi->server_quantum ? dsi->server_quantum : DSI_SERVQUANT_DEF,
78
dsi->end - dsi->buffer);
92
82
FD_SET( dsi->socket, &writefds);
94
84
/* No timeout: if there's nothing to read nor nothing to write,
98
88
/* we might have been interrupted by our timer, so restart select */
91
LOG(log_error, logtype_dsi, "dsi_peek: unexpected select return: %d %s",
92
ret, ret < 0 ? strerror(errno) : "");
96
if (FD_ISSET(dsi->socket, &writefds)) {
97
/* we can write again */
98
LOG(log_debug, logtype_dsi, "dsi_peek: can write again");
104
102
/* Check if there's something to read, hopefully reading that will unblock the client */
105
103
if (FD_ISSET(dsi->socket, &readfds)) {
106
dsi_init_buffer(dsi);
107
len = dsi->end - dsi->eof;
104
len = dsi->end - dsi->eof; /* it's ensured above that there's space */
110
/* ouch, our buffer is full ! fall back to blocking IO
111
* could block and disconnect but it's better than a cpu hog */
106
if ((len = read(dsi->socket, dsi->eof, len)) <= 0) {
108
LOG(log_error, logtype_dsi, "dsi_peek: EOF");
111
LOG(log_error, logtype_dsi, "dsi_peek: read: %s", strerror(errno));
116
LOG(log_debug, logtype_dsi, "dsi_peek: read %d bytes", len);
115
len = read(dsi->socket, dsi->eof, len);
121
if (FD_ISSET(dsi->socket, &writefds))
122
/* we can write again at last */
142
138
LOG(log_maxdebug, logtype_dsi, "dsi_stream_write: sending %u bytes", length);
144
/* non blocking mode */
145
if (setnonblock(dsi->socket, 1) < 0) {
146
LOG(log_error, logtype_dsi, "dsi_stream_write: setnonblock: %s", strerror(errno));
150
140
while (written < length) {
151
141
len = send(dsi->socket, (u_int8_t *) data + written, length - written, flags);
189
LOG(log_maxdebug, logtype_dsi, "dsi_stream_read_file: sending %u bytes", length);
205
/* non blocking mode */
206
if (setnonblock(dsi->socket, 1) < 0) {
207
LOG(log_error, logtype_dsi, "dsi_stream_read_file: setnonblock: %s", strerror(errno));
211
194
while (written < length) {
212
195
len = sys_sendfile(dsi->socket, fromfd, &offset, length - written);
255
233
static size_t from_buf(DSI *dsi, u_int8_t *buf, size_t count)
237
if (dsi->buffer == NULL)
238
/* afpd master has no DSI buffering */
241
LOG(log_maxdebug, logtype_dsi, "from_buf: %u bytes", count);
260
nbe = dsi->eof - dsi->start;
263
nbe = min((size_t)nbe, count);
264
memcpy(buf, dsi->start, nbe);
267
if (dsi->eof == dsi->start)
268
dsi->start = dsi->eof = dsi->buffer;
243
nbe = dsi->eof - dsi->start;
246
nbe = min((size_t)nbe, count);
247
memcpy(buf, dsi->start, nbe);
250
if (dsi->eof == dsi->start)
251
dsi->start = dsi->eof = dsi->buffer;
254
LOG(log_debug, logtype_dsi, "from_buf(read: %u, unread:%u , space left: %u): returning %u",
255
dsi->start - dsi->buffer, dsi->eof - dsi->start, dsi->end - dsi->eof, nbe);
283
268
static ssize_t buf_read(DSI *dsi, u_int8_t *buf, size_t count)
272
LOG(log_maxdebug, logtype_dsi, "buf_read(%u bytes)", count);
290
nbe = from_buf(dsi, buf, count); /* 1. */
277
len = from_buf(dsi, buf, count); /* 1. */
294
return read(dsi->socket, buf, count); /* 3. */
281
len = readt(dsi->socket, buf, count, 0, 1); /* 3. */
283
LOG(log_maxdebug, logtype_dsi, "buf_read(%u bytes): got: %d", count, len);
298
289
* Essentially a loop around buf_read() to ensure "length" bytes are read
299
290
* from dsi->buffer and/or the socket.
292
* @returns length on success, some value smaller than length indicates an error
301
294
size_t dsi_stream_read(DSI *dsi, void *data, const size_t length)
299
LOG(log_maxdebug, logtype_dsi, "dsi_stream_read(%u bytes)", length);
307
302
while (stored < length) {
308
len = buf_read(dsi, (u_int8_t *) data + stored, length - stored);
309
if (len == -1 && errno == EINTR)
313
else { /* eof or error */
314
/* don't log EOF error if it's just after connect (OSX 10.3 probe) */
315
if (len || stored || dsi->read_count) {
316
LOG(log_error, logtype_dsi, "dsi_stream_read(%d): %s", len, (len < 0)?strerror(errno):"unexpected EOF");
303
len = buf_read(dsi, (u_int8_t *) data + stored, length - stored);
304
if (len == -1 && (errno == EINTR || errno == EAGAIN)) {
305
LOG(log_debug, logtype_dsi, "dsi_stream_read: select read loop");
307
} else if (len > 0) {
309
} else { /* eof or error */
310
/* don't log EOF error if it's just after connect (OSX 10.3 probe) */
311
if (len || stored || dsi->read_count) {
312
if (! (dsi->flags & DSI_DISCONNECTED)) {
313
LOG(log_error, logtype_dsi, "dsi_stream_read: len:%d, %s",
314
len, (len < 0) ? strerror(errno) : "unexpected EOF");
322
322
dsi->read_count += stored;
324
LOG(log_maxdebug, logtype_dsi, "dsi_stream_read(%u bytes): got: %u", length, stored);
381
377
int dsi_stream_send(DSI *dsi, void *buf, size_t length)
383
379
char block[DSI_BLOCKSIZ];
385
380
struct iovec iov[2];
388
#endif /* USE_WRITEV */
384
LOG(log_maxdebug, logtype_dsi, "dsi_stream_send: %u bytes",
385
length ? length : sizeof(block));
390
387
block[0] = dsi->header.dsi_flags;
391
388
block[1] = dsi->header.dsi_command;
412
408
towrite = sizeof(block) + length;
413
409
dsi->write_count += towrite;
414
410
while (towrite > 0) {
415
if (((len = writev(dsi->socket, iov, 2)) == -1 && errno == EINTR) ||
419
if ((size_t)len == towrite) /* wrote everything out */
421
else if (len < 0) { /* error */
422
if (errno == EAGAIN || errno == EWOULDBLOCK) {
423
if (!dsi_peek(dsi)) {
427
LOG(log_error, logtype_dsi, "dsi_stream_send: %s", strerror(errno));
433
if (towrite > length) { /* skip part of header */
434
iov[0].iov_base = (char *) iov[0].iov_base + len;
435
iov[0].iov_len -= len;
436
} else { /* skip to data */
437
if (iov[0].iov_len) {
438
len -= iov[0].iov_len;
441
iov[1].iov_base = (char *) iov[1].iov_base + len;
442
iov[1].iov_len -= len;
411
if (((len = writev(dsi->socket, iov, 2)) == -1 && errno == EINTR) || (len == 0))
414
if ((size_t)len == towrite) /* wrote everything out */
416
else if (len < 0) { /* error */
417
if (errno == EAGAIN || errno == EWOULDBLOCK) {
418
if (!dsi_peek(dsi)) {
422
LOG(log_error, logtype_dsi, "dsi_stream_send: %s", strerror(errno));
428
if (towrite > length) { /* skip part of header */
429
iov[0].iov_base = (char *) iov[0].iov_base + len;
430
iov[0].iov_len -= len;
431
} else { /* skip to data */
432
if (iov[0].iov_len) {
433
len -= iov[0].iov_len;
436
iov[1].iov_base = (char *) iov[1].iov_base + len;
437
iov[1].iov_len -= len;
446
#else /* USE_WRITEV */
447
/* write the header then data */
448
if ((dsi_stream_write(dsi, block, sizeof(block), 1) != sizeof(block)) ||
449
(dsi_stream_write(dsi, buf, length, 0) != length)) {
453
#endif /* USE_WRITEV */
455
441
unblock_sig(dsi);