~zulcss/samba/server-dailies-3.4

« back to all changes in this revision

Viewing changes to lib/async_req/async_sock.c

  • Committer: Chuck Short
  • Date: 2010-09-28 20:38:39 UTC
  • Revision ID: zulcss@ubuntu.com-20100928203839-pgjulytsi9ue63x1
Initial version

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/*
 
2
   Unix SMB/CIFS implementation.
 
3
   async socket syscalls
 
4
   Copyright (C) Volker Lendecke 2008
 
5
 
 
6
   This program is free software; you can redistribute it and/or modify
 
7
   it under the terms of the GNU General Public License as published by
 
8
   the Free Software Foundation; either version 3 of the License, or
 
9
   (at your option) any later version.
 
10
 
 
11
   This program is distributed in the hope that it will be useful,
 
12
   but WITHOUT ANY WARRANTY; without even the implied warranty of
 
13
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
14
   GNU General Public License for more details.
 
15
 
 
16
   You should have received a copy of the GNU General Public License
 
17
   along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
18
*/
 
19
 
 
20
#include "includes.h"
 
21
#include "lib/talloc/talloc.h"
 
22
#include "lib/tevent/tevent.h"
 
23
#include "lib/async_req/async_req.h"
 
24
#include "lib/async_req/async_sock.h"
 
25
#include "lib/util/tevent_unix.h"
 
26
#include <fcntl.h>
 
27
 
 
28
#ifndef TALLOC_FREE
 
29
#define TALLOC_FREE(ctx) do { talloc_free(ctx); ctx=NULL; } while(0)
 
30
#endif
 
31
 
 
32
/**
 
33
 * @brief Map async_req states to unix-style errnos
 
34
 * @param[in]  req      The async req to get the state from
 
35
 * @param[out] err      Pointer to take the unix-style errno
 
36
 *
 
37
 * @return true if the async_req is in an error state, false otherwise
 
38
 */
 
39
 
 
40
bool async_req_is_errno(struct async_req *req, int *err)
 
41
{
 
42
        enum async_req_state state;
 
43
        uint64_t error;
 
44
 
 
45
        if (!async_req_is_error(req, &state, &error)) {
 
46
                return false;
 
47
        }
 
48
 
 
49
        switch (state) {
 
50
        case ASYNC_REQ_USER_ERROR:
 
51
                *err = (int)error;
 
52
                break;
 
53
        case ASYNC_REQ_TIMED_OUT:
 
54
#ifdef ETIMEDOUT
 
55
                *err = ETIMEDOUT;
 
56
#else
 
57
                *err = EAGAIN;
 
58
#endif
 
59
                break;
 
60
        case ASYNC_REQ_NO_MEMORY:
 
61
                *err = ENOMEM;
 
62
                break;
 
63
        default:
 
64
                *err = EIO;
 
65
                break;
 
66
        }
 
67
        return true;
 
68
}
 
69
 
 
70
int async_req_simple_recv_errno(struct async_req *req)
 
71
{
 
72
        int err;
 
73
 
 
74
        if (async_req_is_errno(req, &err)) {
 
75
                return err;
 
76
        }
 
77
 
 
78
        return 0;
 
79
}
 
80
 
 
81
struct async_send_state {
 
82
        int fd;
 
83
        const void *buf;
 
84
        size_t len;
 
85
        int flags;
 
86
        ssize_t sent;
 
87
};
 
88
 
 
89
static void async_send_handler(struct tevent_context *ev,
 
90
                               struct tevent_fd *fde,
 
91
                               uint16_t flags, void *private_data);
 
92
 
 
93
struct tevent_req *async_send_send(TALLOC_CTX *mem_ctx,
 
94
                                   struct tevent_context *ev,
 
95
                                   int fd, const void *buf, size_t len,
 
96
                                   int flags)
 
97
{
 
98
        struct tevent_req *result;
 
99
        struct async_send_state *state;
 
100
        struct tevent_fd *fde;
 
101
 
 
102
        result = tevent_req_create(mem_ctx, &state, struct async_send_state);
 
103
        if (result == NULL) {
 
104
                return result;
 
105
        }
 
106
        state->fd = fd;
 
107
        state->buf = buf;
 
108
        state->len = len;
 
109
        state->flags = flags;
 
110
 
 
111
        fde = tevent_add_fd(ev, state, fd, TEVENT_FD_WRITE, async_send_handler,
 
112
                            result);
 
113
        if (fde == NULL) {
 
114
                TALLOC_FREE(result);
 
115
                return NULL;
 
116
        }
 
117
        return result;
 
118
}
 
119
 
 
120
static void async_send_handler(struct tevent_context *ev,
 
121
                               struct tevent_fd *fde,
 
122
                               uint16_t flags, void *private_data)
 
123
{
 
124
        struct tevent_req *req = talloc_get_type_abort(
 
125
                private_data, struct tevent_req);
 
126
        struct async_send_state *state =
 
127
                tevent_req_data(req, struct async_send_state);
 
128
 
 
129
        state->sent = send(state->fd, state->buf, state->len, state->flags);
 
130
        if (state->sent == -1) {
 
131
                tevent_req_error(req, errno);
 
132
                return;
 
133
        }
 
134
        tevent_req_done(req);
 
135
}
 
136
 
 
137
ssize_t async_send_recv(struct tevent_req *req, int *perrno)
 
138
{
 
139
        struct async_send_state *state =
 
140
                tevent_req_data(req, struct async_send_state);
 
141
 
 
142
        if (tevent_req_is_unix_error(req, perrno)) {
 
143
                return -1;
 
144
        }
 
145
        return state->sent;
 
146
}
 
147
 
 
148
struct async_recv_state {
 
149
        int fd;
 
150
        void *buf;
 
151
        size_t len;
 
152
        int flags;
 
153
        ssize_t received;
 
154
};
 
155
 
 
156
static void async_recv_handler(struct tevent_context *ev,
 
157
                               struct tevent_fd *fde,
 
158
                               uint16_t flags, void *private_data);
 
159
 
 
160
struct tevent_req *async_recv_send(TALLOC_CTX *mem_ctx,
 
161
                                   struct tevent_context *ev,
 
162
                                   int fd, void *buf, size_t len, int flags)
 
163
{
 
164
        struct tevent_req *result;
 
165
        struct async_recv_state *state;
 
166
        struct tevent_fd *fde;
 
167
 
 
168
        result = tevent_req_create(mem_ctx, &state, struct async_recv_state);
 
169
        if (result == NULL) {
 
170
                return result;
 
171
        }
 
172
        state->fd = fd;
 
173
        state->buf = buf;
 
174
        state->len = len;
 
175
        state->flags = flags;
 
176
 
 
177
        fde = tevent_add_fd(ev, state, fd, TEVENT_FD_READ, async_recv_handler,
 
178
                            result);
 
179
        if (fde == NULL) {
 
180
                TALLOC_FREE(result);
 
181
                return NULL;
 
182
        }
 
183
        return result;
 
184
}
 
185
 
 
186
static void async_recv_handler(struct tevent_context *ev,
 
187
                               struct tevent_fd *fde,
 
188
                               uint16_t flags, void *private_data)
 
189
{
 
190
        struct tevent_req *req = talloc_get_type_abort(
 
191
                private_data, struct tevent_req);
 
192
        struct async_recv_state *state =
 
193
                tevent_req_data(req, struct async_recv_state);
 
194
 
 
195
        state->received = recv(state->fd, state->buf, state->len,
 
196
                               state->flags);
 
197
        if (state->received == -1) {
 
198
                tevent_req_error(req, errno);
 
199
                return;
 
200
        }
 
201
        tevent_req_done(req);
 
202
}
 
203
 
 
204
ssize_t async_recv_recv(struct tevent_req *req, int *perrno)
 
205
{
 
206
        struct async_recv_state *state =
 
207
                tevent_req_data(req, struct async_recv_state);
 
208
 
 
209
        if (tevent_req_is_unix_error(req, perrno)) {
 
210
                return -1;
 
211
        }
 
212
        return state->received;
 
213
}
 
214
 
 
215
struct async_connect_state {
 
216
        int fd;
 
217
        int result;
 
218
        int sys_errno;
 
219
        long old_sockflags;
 
220
};
 
221
 
 
222
static void async_connect_connected(struct tevent_context *ev,
 
223
                                    struct tevent_fd *fde, uint16_t flags,
 
224
                                    void *priv);
 
225
 
 
226
/**
 
227
 * @brief async version of connect(2)
 
228
 * @param[in] mem_ctx   The memory context to hang the result off
 
229
 * @param[in] ev        The event context to work from
 
230
 * @param[in] fd        The socket to recv from
 
231
 * @param[in] address   Where to connect?
 
232
 * @param[in] address_len Length of *address
 
233
 * @retval The async request
 
234
 *
 
235
 * This function sets the socket into non-blocking state to be able to call
 
236
 * connect in an async state. This will be reset when the request is finished.
 
237
 */
 
238
 
 
239
struct tevent_req *async_connect_send(TALLOC_CTX *mem_ctx,
 
240
                                      struct tevent_context *ev,
 
241
                                      int fd, const struct sockaddr *address,
 
242
                                      socklen_t address_len)
 
243
{
 
244
        struct tevent_req *result;
 
245
        struct async_connect_state *state;
 
246
        struct tevent_fd *fde;
 
247
 
 
248
        result = tevent_req_create(
 
249
                mem_ctx, &state, struct async_connect_state);
 
250
        if (result == NULL) {
 
251
                return NULL;
 
252
        }
 
253
 
 
254
        /**
 
255
         * We have to set the socket to nonblocking for async connect(2). Keep
 
256
         * the old sockflags around.
 
257
         */
 
258
 
 
259
        state->fd = fd;
 
260
        state->sys_errno = 0;
 
261
 
 
262
        state->old_sockflags = fcntl(fd, F_GETFL, 0);
 
263
        if (state->old_sockflags == -1) {
 
264
                goto post_errno;
 
265
        }
 
266
 
 
267
        set_blocking(fd, false);
 
268
 
 
269
        state->result = connect(fd, address, address_len);
 
270
        if (state->result == 0) {
 
271
                tevent_req_done(result);
 
272
                goto done;
 
273
        }
 
274
 
 
275
        /**
 
276
         * A number of error messages show that something good is progressing
 
277
         * and that we have to wait for readability.
 
278
         *
 
279
         * If none of them are present, bail out.
 
280
         */
 
281
 
 
282
        if (!(errno == EINPROGRESS || errno == EALREADY ||
 
283
#ifdef EISCONN
 
284
              errno == EISCONN ||
 
285
#endif
 
286
              errno == EAGAIN || errno == EINTR)) {
 
287
                state->sys_errno = errno;
 
288
                goto post_errno;
 
289
        }
 
290
 
 
291
        fde = tevent_add_fd(ev, state, fd, TEVENT_FD_READ | TEVENT_FD_WRITE,
 
292
                           async_connect_connected, result);
 
293
        if (fde == NULL) {
 
294
                state->sys_errno = ENOMEM;
 
295
                goto post_errno;
 
296
        }
 
297
        return result;
 
298
 
 
299
 post_errno:
 
300
        tevent_req_error(result, state->sys_errno);
 
301
 done:
 
302
        fcntl(fd, F_SETFL, state->old_sockflags);
 
303
        return tevent_req_post(result, ev);
 
304
}
 
305
 
 
306
/**
 
307
 * fde event handler for connect(2)
 
308
 * @param[in] ev        The event context that sent us here
 
309
 * @param[in] fde       The file descriptor event associated with the connect
 
310
 * @param[in] flags     Indicate read/writeability of the socket
 
311
 * @param[in] priv      private data, "struct async_req *" in this case
 
312
 */
 
313
 
 
314
static void async_connect_connected(struct tevent_context *ev,
 
315
                                    struct tevent_fd *fde, uint16_t flags,
 
316
                                    void *priv)
 
317
{
 
318
        struct tevent_req *req = talloc_get_type_abort(
 
319
                priv, struct tevent_req);
 
320
        struct async_connect_state *state =
 
321
                tevent_req_data(req, struct async_connect_state);
 
322
 
 
323
        TALLOC_FREE(fde);
 
324
 
 
325
        /*
 
326
         * Stevens, Network Programming says that if there's a
 
327
         * successful connect, the socket is only writable. Upon an
 
328
         * error, it's both readable and writable.
 
329
         */
 
330
        if ((flags & (TEVENT_FD_READ|TEVENT_FD_WRITE))
 
331
            == (TEVENT_FD_READ|TEVENT_FD_WRITE)) {
 
332
                int sockerr;
 
333
                socklen_t err_len = sizeof(sockerr);
 
334
 
 
335
                if (getsockopt(state->fd, SOL_SOCKET, SO_ERROR,
 
336
                               (void *)&sockerr, &err_len) == 0) {
 
337
                        errno = sockerr;
 
338
                }
 
339
 
 
340
                state->sys_errno = errno;
 
341
 
 
342
                DEBUG(10, ("connect returned %s\n", strerror(errno)));
 
343
 
 
344
                fcntl(state->fd, F_SETFL, state->old_sockflags);
 
345
                tevent_req_error(req, state->sys_errno);
 
346
                return;
 
347
        }
 
348
 
 
349
        state->sys_errno = 0;
 
350
        tevent_req_done(req);
 
351
}
 
352
 
 
353
int async_connect_recv(struct tevent_req *req, int *perrno)
 
354
{
 
355
        struct async_connect_state *state =
 
356
                tevent_req_data(req, struct async_connect_state);
 
357
        int err;
 
358
 
 
359
        fcntl(state->fd, F_SETFL, state->old_sockflags);
 
360
 
 
361
        if (tevent_req_is_unix_error(req, &err)) {
 
362
                *perrno = err;
 
363
                return -1;
 
364
        }
 
365
 
 
366
        if (state->sys_errno == 0) {
 
367
                return 0;
 
368
        }
 
369
 
 
370
        *perrno = state->sys_errno;
 
371
        return -1;
 
372
}
 
373
 
 
374
struct writev_state {
 
375
        struct tevent_context *ev;
 
376
        int fd;
 
377
        struct iovec *iov;
 
378
        int count;
 
379
        size_t total_size;
 
380
};
 
381
 
 
382
static void writev_trigger(struct tevent_req *req, void *private_data);
 
383
static void writev_handler(struct tevent_context *ev, struct tevent_fd *fde,
 
384
                           uint16_t flags, void *private_data);
 
385
 
 
386
struct tevent_req *writev_send(TALLOC_CTX *mem_ctx, struct tevent_context *ev,
 
387
                               struct tevent_queue *queue, int fd,
 
388
                               struct iovec *iov, int count)
 
389
{
 
390
        struct tevent_req *result;
 
391
        struct writev_state *state;
 
392
 
 
393
        result = tevent_req_create(mem_ctx, &state, struct writev_state);
 
394
        if (result == NULL) {
 
395
                return NULL;
 
396
        }
 
397
        state->ev = ev;
 
398
        state->fd = fd;
 
399
        state->total_size = 0;
 
400
        state->count = count;
 
401
        state->iov = (struct iovec *)talloc_memdup(
 
402
                state, iov, sizeof(struct iovec) * count);
 
403
        if (state->iov == NULL) {
 
404
                goto fail;
 
405
        }
 
406
 
 
407
        if (!tevent_queue_add(queue, ev, result, writev_trigger, NULL)) {
 
408
                goto fail;
 
409
        }
 
410
        return result;
 
411
 fail:
 
412
        TALLOC_FREE(result);
 
413
        return NULL;
 
414
}
 
415
 
 
416
static void writev_trigger(struct tevent_req *req, void *private_data)
 
417
{
 
418
        struct writev_state *state = tevent_req_data(req, struct writev_state);
 
419
        struct tevent_fd *fde;
 
420
 
 
421
        fde = tevent_add_fd(state->ev, state, state->fd, TEVENT_FD_WRITE,
 
422
                            writev_handler, req);
 
423
        if (fde == NULL) {
 
424
                tevent_req_error(req, ENOMEM);
 
425
        }
 
426
}
 
427
 
 
428
static void writev_handler(struct tevent_context *ev, struct tevent_fd *fde,
 
429
                           uint16_t flags, void *private_data)
 
430
{
 
431
        struct tevent_req *req = talloc_get_type_abort(
 
432
                private_data, struct tevent_req);
 
433
        struct writev_state *state =
 
434
                tevent_req_data(req, struct writev_state);
 
435
        size_t to_write, written;
 
436
        int i;
 
437
 
 
438
        to_write = 0;
 
439
 
 
440
        for (i=0; i<state->count; i++) {
 
441
                to_write += state->iov[i].iov_len;
 
442
        }
 
443
 
 
444
        written = sys_writev(state->fd, state->iov, state->count);
 
445
        if (written == -1) {
 
446
                tevent_req_error(req, errno);
 
447
                return;
 
448
        }
 
449
        if (written == 0) {
 
450
                tevent_req_error(req, EPIPE);
 
451
                return;
 
452
        }
 
453
        state->total_size += written;
 
454
 
 
455
        if (written == to_write) {
 
456
                tevent_req_done(req);
 
457
                return;
 
458
        }
 
459
 
 
460
        /*
 
461
         * We've written less than we were asked to, drop stuff from
 
462
         * state->iov.
 
463
         */
 
464
 
 
465
        while (written > 0) {
 
466
                if (written < state->iov[0].iov_len) {
 
467
                        state->iov[0].iov_base =
 
468
                                (char *)state->iov[0].iov_base + written;
 
469
                        state->iov[0].iov_len -= written;
 
470
                        break;
 
471
                }
 
472
                written -= state->iov[0].iov_len;
 
473
                state->iov += 1;
 
474
                state->count -= 1;
 
475
        }
 
476
}
 
477
 
 
478
ssize_t writev_recv(struct tevent_req *req, int *perrno)
 
479
{
 
480
        struct writev_state *state =
 
481
                tevent_req_data(req, struct writev_state);
 
482
 
 
483
        if (tevent_req_is_unix_error(req, perrno)) {
 
484
                return -1;
 
485
        }
 
486
        return state->total_size;
 
487
}
 
488
 
 
489
struct read_packet_state {
 
490
        int fd;
 
491
        uint8_t *buf;
 
492
        size_t nread;
 
493
        ssize_t (*more)(uint8_t *buf, size_t buflen, void *private_data);
 
494
        void *private_data;
 
495
};
 
496
 
 
497
static void read_packet_handler(struct tevent_context *ev,
 
498
                                struct tevent_fd *fde,
 
499
                                uint16_t flags, void *private_data);
 
500
 
 
501
struct tevent_req *read_packet_send(TALLOC_CTX *mem_ctx,
 
502
                                    struct tevent_context *ev,
 
503
                                    int fd, size_t initial,
 
504
                                    ssize_t (*more)(uint8_t *buf,
 
505
                                                    size_t buflen,
 
506
                                                    void *private_data),
 
507
                                    void *private_data)
 
508
{
 
509
        struct tevent_req *result;
 
510
        struct read_packet_state *state;
 
511
        struct tevent_fd *fde;
 
512
 
 
513
        result = tevent_req_create(mem_ctx, &state, struct read_packet_state);
 
514
        if (result == NULL) {
 
515
                return NULL;
 
516
        }
 
517
        state->fd = fd;
 
518
        state->nread = 0;
 
519
        state->more = more;
 
520
        state->private_data = private_data;
 
521
 
 
522
        state->buf = talloc_array(state, uint8_t, initial);
 
523
        if (state->buf == NULL) {
 
524
                goto fail;
 
525
        }
 
526
 
 
527
        fde = tevent_add_fd(ev, state, fd, TEVENT_FD_READ, read_packet_handler,
 
528
                            result);
 
529
        if (fde == NULL) {
 
530
                goto fail;
 
531
        }
 
532
        return result;
 
533
 fail:
 
534
        TALLOC_FREE(result);
 
535
        return NULL;
 
536
}
 
537
 
 
538
static void read_packet_handler(struct tevent_context *ev,
 
539
                                struct tevent_fd *fde,
 
540
                                uint16_t flags, void *private_data)
 
541
{
 
542
        struct tevent_req *req = talloc_get_type_abort(
 
543
                private_data, struct tevent_req);
 
544
        struct read_packet_state *state =
 
545
                tevent_req_data(req, struct read_packet_state);
 
546
        size_t total = talloc_get_size(state->buf);
 
547
        ssize_t nread, more;
 
548
        uint8_t *tmp;
 
549
 
 
550
        nread = recv(state->fd, state->buf+state->nread, total-state->nread,
 
551
                     0);
 
552
        if (nread == -1) {
 
553
                tevent_req_error(req, errno);
 
554
                return;
 
555
        }
 
556
        if (nread == 0) {
 
557
                tevent_req_error(req, EPIPE);
 
558
                return;
 
559
        }
 
560
 
 
561
        state->nread += nread;
 
562
        if (state->nread < total) {
 
563
                /* Come back later */
 
564
                return;
 
565
        }
 
566
 
 
567
        /*
 
568
         * We got what was initially requested. See if "more" asks for -- more.
 
569
         */
 
570
        if (state->more == NULL) {
 
571
                /* Nobody to ask, this is a async read_data */
 
572
                tevent_req_done(req);
 
573
                return;
 
574
        }
 
575
 
 
576
        more = state->more(state->buf, total, state->private_data);
 
577
        if (more == -1) {
 
578
                /* We got an invalid packet, tell the caller */
 
579
                tevent_req_error(req, EIO);
 
580
                return;
 
581
        }
 
582
        if (more == 0) {
 
583
                /* We're done, full packet received */
 
584
                tevent_req_done(req);
 
585
                return;
 
586
        }
 
587
 
 
588
        tmp = TALLOC_REALLOC_ARRAY(state, state->buf, uint8_t, total+more);
 
589
        if (tevent_req_nomem(tmp, req)) {
 
590
                return;
 
591
        }
 
592
        state->buf = tmp;
 
593
}
 
594
 
 
595
ssize_t read_packet_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
 
596
                         uint8_t **pbuf, int *perrno)
 
597
{
 
598
        struct read_packet_state *state =
 
599
                tevent_req_data(req, struct read_packet_state);
 
600
 
 
601
        if (tevent_req_is_unix_error(req, perrno)) {
 
602
                return -1;
 
603
        }
 
604
        *pbuf = talloc_move(mem_ctx, &state->buf);
 
605
        return talloc_get_size(*pbuf);
 
606
}