~ubuntu-branches/ubuntu/precise/linux-lowlatency/precise

« back to all changes in this revision

Viewing changes to fs/afs/rxrpc.c

  • Committer: Package Import Robot
  • Author(s): Alessio Igor Bogani
  • Date: 2011-10-26 11:13:05 UTC
  • Revision ID: package-import@ubuntu.com-20111026111305-tz023xykf0i6eosh
Tags: upstream-3.2.0
ImportĀ upstreamĀ versionĀ 3.2.0

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/* Maintain an RxRPC server socket to do AFS communications through
 
2
 *
 
3
 * Copyright (C) 2007 Red Hat, Inc. All Rights Reserved.
 
4
 * Written by David Howells (dhowells@redhat.com)
 
5
 *
 
6
 * This program is free software; you can redistribute it and/or
 
7
 * modify it under the terms of the GNU General Public License
 
8
 * as published by the Free Software Foundation; either version
 
9
 * 2 of the License, or (at your option) any later version.
 
10
 */
 
11
 
 
12
#include <linux/slab.h>
 
13
#include <net/sock.h>
 
14
#include <net/af_rxrpc.h>
 
15
#include <rxrpc/packet.h>
 
16
#include "internal.h"
 
17
#include "afs_cm.h"
 
18
 
 
19
static struct socket *afs_socket; /* my RxRPC socket */
 
20
static struct workqueue_struct *afs_async_calls;
 
21
static atomic_t afs_outstanding_calls;
 
22
static atomic_t afs_outstanding_skbs;
 
23
 
 
24
static void afs_wake_up_call_waiter(struct afs_call *);
 
25
static int afs_wait_for_call_to_complete(struct afs_call *);
 
26
static void afs_wake_up_async_call(struct afs_call *);
 
27
static int afs_dont_wait_for_call_to_complete(struct afs_call *);
 
28
static void afs_process_async_call(struct work_struct *);
 
29
static void afs_rx_interceptor(struct sock *, unsigned long, struct sk_buff *);
 
30
static int afs_deliver_cm_op_id(struct afs_call *, struct sk_buff *, bool);
 
31
 
 
32
/* synchronous call management */
 
33
const struct afs_wait_mode afs_sync_call = {
 
34
        .rx_wakeup      = afs_wake_up_call_waiter,
 
35
        .wait           = afs_wait_for_call_to_complete,
 
36
};
 
37
 
 
38
/* asynchronous call management */
 
39
const struct afs_wait_mode afs_async_call = {
 
40
        .rx_wakeup      = afs_wake_up_async_call,
 
41
        .wait           = afs_dont_wait_for_call_to_complete,
 
42
};
 
43
 
 
44
/* asynchronous incoming call management */
 
45
static const struct afs_wait_mode afs_async_incoming_call = {
 
46
        .rx_wakeup      = afs_wake_up_async_call,
 
47
};
 
48
 
 
49
/* asynchronous incoming call initial processing */
 
50
static const struct afs_call_type afs_RXCMxxxx = {
 
51
        .name           = "CB.xxxx",
 
52
        .deliver        = afs_deliver_cm_op_id,
 
53
        .abort_to_error = afs_abort_to_error,
 
54
};
 
55
 
 
56
static void afs_collect_incoming_call(struct work_struct *);
 
57
 
 
58
static struct sk_buff_head afs_incoming_calls;
 
59
static DECLARE_WORK(afs_collect_incoming_call_work, afs_collect_incoming_call);
 
60
 
 
61
/*
 
62
 * open an RxRPC socket and bind it to be a server for callback notifications
 
63
 * - the socket is left in blocking mode and non-blocking ops use MSG_DONTWAIT
 
64
 */
 
65
int afs_open_socket(void)
 
66
{
 
67
        struct sockaddr_rxrpc srx;
 
68
        struct socket *socket;
 
69
        int ret;
 
70
 
 
71
        _enter("");
 
72
 
 
73
        skb_queue_head_init(&afs_incoming_calls);
 
74
 
 
75
        afs_async_calls = create_singlethread_workqueue("kafsd");
 
76
        if (!afs_async_calls) {
 
77
                _leave(" = -ENOMEM [wq]");
 
78
                return -ENOMEM;
 
79
        }
 
80
 
 
81
        ret = sock_create_kern(AF_RXRPC, SOCK_DGRAM, PF_INET, &socket);
 
82
        if (ret < 0) {
 
83
                destroy_workqueue(afs_async_calls);
 
84
                _leave(" = %d [socket]", ret);
 
85
                return ret;
 
86
        }
 
87
 
 
88
        socket->sk->sk_allocation = GFP_NOFS;
 
89
 
 
90
        /* bind the callback manager's address to make this a server socket */
 
91
        srx.srx_family                  = AF_RXRPC;
 
92
        srx.srx_service                 = CM_SERVICE;
 
93
        srx.transport_type              = SOCK_DGRAM;
 
94
        srx.transport_len               = sizeof(srx.transport.sin);
 
95
        srx.transport.sin.sin_family    = AF_INET;
 
96
        srx.transport.sin.sin_port      = htons(AFS_CM_PORT);
 
97
        memset(&srx.transport.sin.sin_addr, 0,
 
98
               sizeof(srx.transport.sin.sin_addr));
 
99
 
 
100
        ret = kernel_bind(socket, (struct sockaddr *) &srx, sizeof(srx));
 
101
        if (ret < 0) {
 
102
                sock_release(socket);
 
103
                destroy_workqueue(afs_async_calls);
 
104
                _leave(" = %d [bind]", ret);
 
105
                return ret;
 
106
        }
 
107
 
 
108
        rxrpc_kernel_intercept_rx_messages(socket, afs_rx_interceptor);
 
109
 
 
110
        afs_socket = socket;
 
111
        _leave(" = 0");
 
112
        return 0;
 
113
}
 
114
 
 
115
/*
 
116
 * close the RxRPC socket AFS was using
 
117
 */
 
118
void afs_close_socket(void)
 
119
{
 
120
        _enter("");
 
121
 
 
122
        sock_release(afs_socket);
 
123
 
 
124
        _debug("dework");
 
125
        destroy_workqueue(afs_async_calls);
 
126
 
 
127
        ASSERTCMP(atomic_read(&afs_outstanding_skbs), ==, 0);
 
128
        ASSERTCMP(atomic_read(&afs_outstanding_calls), ==, 0);
 
129
        _leave("");
 
130
}
 
131
 
 
132
/*
 
133
 * note that the data in a socket buffer is now delivered and that the buffer
 
134
 * should be freed
 
135
 */
 
136
static void afs_data_delivered(struct sk_buff *skb)
 
137
{
 
138
        if (!skb) {
 
139
                _debug("DLVR NULL [%d]", atomic_read(&afs_outstanding_skbs));
 
140
                dump_stack();
 
141
        } else {
 
142
                _debug("DLVR %p{%u} [%d]",
 
143
                       skb, skb->mark, atomic_read(&afs_outstanding_skbs));
 
144
                if (atomic_dec_return(&afs_outstanding_skbs) == -1)
 
145
                        BUG();
 
146
                rxrpc_kernel_data_delivered(skb);
 
147
        }
 
148
}
 
149
 
 
150
/*
 
151
 * free a socket buffer
 
152
 */
 
153
static void afs_free_skb(struct sk_buff *skb)
 
154
{
 
155
        if (!skb) {
 
156
                _debug("FREE NULL [%d]", atomic_read(&afs_outstanding_skbs));
 
157
                dump_stack();
 
158
        } else {
 
159
                _debug("FREE %p{%u} [%d]",
 
160
                       skb, skb->mark, atomic_read(&afs_outstanding_skbs));
 
161
                if (atomic_dec_return(&afs_outstanding_skbs) == -1)
 
162
                        BUG();
 
163
                rxrpc_kernel_free_skb(skb);
 
164
        }
 
165
}
 
166
 
 
167
/*
 
168
 * free a call
 
169
 */
 
170
static void afs_free_call(struct afs_call *call)
 
171
{
 
172
        _debug("DONE %p{%s} [%d]",
 
173
               call, call->type->name, atomic_read(&afs_outstanding_calls));
 
174
        if (atomic_dec_return(&afs_outstanding_calls) == -1)
 
175
                BUG();
 
176
 
 
177
        ASSERTCMP(call->rxcall, ==, NULL);
 
178
        ASSERT(!work_pending(&call->async_work));
 
179
        ASSERT(skb_queue_empty(&call->rx_queue));
 
180
        ASSERT(call->type->name != NULL);
 
181
 
 
182
        kfree(call->request);
 
183
        kfree(call);
 
184
}
 
185
 
 
186
/*
 
187
 * allocate a call with flat request and reply buffers
 
188
 */
 
189
struct afs_call *afs_alloc_flat_call(const struct afs_call_type *type,
 
190
                                     size_t request_size, size_t reply_size)
 
191
{
 
192
        struct afs_call *call;
 
193
 
 
194
        call = kzalloc(sizeof(*call), GFP_NOFS);
 
195
        if (!call)
 
196
                goto nomem_call;
 
197
 
 
198
        _debug("CALL %p{%s} [%d]",
 
199
               call, type->name, atomic_read(&afs_outstanding_calls));
 
200
        atomic_inc(&afs_outstanding_calls);
 
201
 
 
202
        call->type = type;
 
203
        call->request_size = request_size;
 
204
        call->reply_max = reply_size;
 
205
 
 
206
        if (request_size) {
 
207
                call->request = kmalloc(request_size, GFP_NOFS);
 
208
                if (!call->request)
 
209
                        goto nomem_free;
 
210
        }
 
211
 
 
212
        if (reply_size) {
 
213
                call->buffer = kmalloc(reply_size, GFP_NOFS);
 
214
                if (!call->buffer)
 
215
                        goto nomem_free;
 
216
        }
 
217
 
 
218
        init_waitqueue_head(&call->waitq);
 
219
        skb_queue_head_init(&call->rx_queue);
 
220
        return call;
 
221
 
 
222
nomem_free:
 
223
        afs_free_call(call);
 
224
nomem_call:
 
225
        return NULL;
 
226
}
 
227
 
 
228
/*
 
229
 * clean up a call with flat buffer
 
230
 */
 
231
void afs_flat_call_destructor(struct afs_call *call)
 
232
{
 
233
        _enter("");
 
234
 
 
235
        kfree(call->request);
 
236
        call->request = NULL;
 
237
        kfree(call->buffer);
 
238
        call->buffer = NULL;
 
239
}
 
240
 
 
241
/*
 
242
 * attach the data from a bunch of pages on an inode to a call
 
243
 */
 
244
static int afs_send_pages(struct afs_call *call, struct msghdr *msg,
 
245
                          struct kvec *iov)
 
246
{
 
247
        struct page *pages[8];
 
248
        unsigned count, n, loop, offset, to;
 
249
        pgoff_t first = call->first, last = call->last;
 
250
        int ret;
 
251
 
 
252
        _enter("");
 
253
 
 
254
        offset = call->first_offset;
 
255
        call->first_offset = 0;
 
256
 
 
257
        do {
 
258
                _debug("attach %lx-%lx", first, last);
 
259
 
 
260
                count = last - first + 1;
 
261
                if (count > ARRAY_SIZE(pages))
 
262
                        count = ARRAY_SIZE(pages);
 
263
                n = find_get_pages_contig(call->mapping, first, count, pages);
 
264
                ASSERTCMP(n, ==, count);
 
265
 
 
266
                loop = 0;
 
267
                do {
 
268
                        msg->msg_flags = 0;
 
269
                        to = PAGE_SIZE;
 
270
                        if (first + loop >= last)
 
271
                                to = call->last_to;
 
272
                        else
 
273
                                msg->msg_flags = MSG_MORE;
 
274
                        iov->iov_base = kmap(pages[loop]) + offset;
 
275
                        iov->iov_len = to - offset;
 
276
                        offset = 0;
 
277
 
 
278
                        _debug("- range %u-%u%s",
 
279
                               offset, to, msg->msg_flags ? " [more]" : "");
 
280
                        msg->msg_iov = (struct iovec *) iov;
 
281
                        msg->msg_iovlen = 1;
 
282
 
 
283
                        /* have to change the state *before* sending the last
 
284
                         * packet as RxRPC might give us the reply before it
 
285
                         * returns from sending the request */
 
286
                        if (first + loop >= last)
 
287
                                call->state = AFS_CALL_AWAIT_REPLY;
 
288
                        ret = rxrpc_kernel_send_data(call->rxcall, msg,
 
289
                                                     to - offset);
 
290
                        kunmap(pages[loop]);
 
291
                        if (ret < 0)
 
292
                                break;
 
293
                } while (++loop < count);
 
294
                first += count;
 
295
 
 
296
                for (loop = 0; loop < count; loop++)
 
297
                        put_page(pages[loop]);
 
298
                if (ret < 0)
 
299
                        break;
 
300
        } while (first <= last);
 
301
 
 
302
        _leave(" = %d", ret);
 
303
        return ret;
 
304
}
 
305
 
 
306
/*
 
307
 * initiate a call
 
308
 */
 
309
int afs_make_call(struct in_addr *addr, struct afs_call *call, gfp_t gfp,
 
310
                  const struct afs_wait_mode *wait_mode)
 
311
{
 
312
        struct sockaddr_rxrpc srx;
 
313
        struct rxrpc_call *rxcall;
 
314
        struct msghdr msg;
 
315
        struct kvec iov[1];
 
316
        int ret;
 
317
 
 
318
        _enter("%x,{%d},", addr->s_addr, ntohs(call->port));
 
319
 
 
320
        ASSERT(call->type != NULL);
 
321
        ASSERT(call->type->name != NULL);
 
322
 
 
323
        _debug("____MAKE %p{%s,%x} [%d]____",
 
324
               call, call->type->name, key_serial(call->key),
 
325
               atomic_read(&afs_outstanding_calls));
 
326
 
 
327
        call->wait_mode = wait_mode;
 
328
        INIT_WORK(&call->async_work, afs_process_async_call);
 
329
 
 
330
        memset(&srx, 0, sizeof(srx));
 
331
        srx.srx_family = AF_RXRPC;
 
332
        srx.srx_service = call->service_id;
 
333
        srx.transport_type = SOCK_DGRAM;
 
334
        srx.transport_len = sizeof(srx.transport.sin);
 
335
        srx.transport.sin.sin_family = AF_INET;
 
336
        srx.transport.sin.sin_port = call->port;
 
337
        memcpy(&srx.transport.sin.sin_addr, addr, 4);
 
338
 
 
339
        /* create a call */
 
340
        rxcall = rxrpc_kernel_begin_call(afs_socket, &srx, call->key,
 
341
                                         (unsigned long) call, gfp);
 
342
        call->key = NULL;
 
343
        if (IS_ERR(rxcall)) {
 
344
                ret = PTR_ERR(rxcall);
 
345
                goto error_kill_call;
 
346
        }
 
347
 
 
348
        call->rxcall = rxcall;
 
349
 
 
350
        /* send the request */
 
351
        iov[0].iov_base = call->request;
 
352
        iov[0].iov_len  = call->request_size;
 
353
 
 
354
        msg.msg_name            = NULL;
 
355
        msg.msg_namelen         = 0;
 
356
        msg.msg_iov             = (struct iovec *) iov;
 
357
        msg.msg_iovlen          = 1;
 
358
        msg.msg_control         = NULL;
 
359
        msg.msg_controllen      = 0;
 
360
        msg.msg_flags           = (call->send_pages ? MSG_MORE : 0);
 
361
 
 
362
        /* have to change the state *before* sending the last packet as RxRPC
 
363
         * might give us the reply before it returns from sending the
 
364
         * request */
 
365
        if (!call->send_pages)
 
366
                call->state = AFS_CALL_AWAIT_REPLY;
 
367
        ret = rxrpc_kernel_send_data(rxcall, &msg, call->request_size);
 
368
        if (ret < 0)
 
369
                goto error_do_abort;
 
370
 
 
371
        if (call->send_pages) {
 
372
                ret = afs_send_pages(call, &msg, iov);
 
373
                if (ret < 0)
 
374
                        goto error_do_abort;
 
375
        }
 
376
 
 
377
        /* at this point, an async call may no longer exist as it may have
 
378
         * already completed */
 
379
        return wait_mode->wait(call);
 
380
 
 
381
error_do_abort:
 
382
        rxrpc_kernel_abort_call(rxcall, RX_USER_ABORT);
 
383
        rxrpc_kernel_end_call(rxcall);
 
384
        call->rxcall = NULL;
 
385
error_kill_call:
 
386
        call->type->destructor(call);
 
387
        afs_free_call(call);
 
388
        _leave(" = %d", ret);
 
389
        return ret;
 
390
}
 
391
 
 
392
/*
 
393
 * handles intercepted messages that were arriving in the socket's Rx queue
 
394
 * - called with the socket receive queue lock held to ensure message ordering
 
395
 * - called with softirqs disabled
 
396
 */
 
397
static void afs_rx_interceptor(struct sock *sk, unsigned long user_call_ID,
 
398
                               struct sk_buff *skb)
 
399
{
 
400
        struct afs_call *call = (struct afs_call *) user_call_ID;
 
401
 
 
402
        _enter("%p,,%u", call, skb->mark);
 
403
 
 
404
        _debug("ICPT %p{%u} [%d]",
 
405
               skb, skb->mark, atomic_read(&afs_outstanding_skbs));
 
406
 
 
407
        ASSERTCMP(sk, ==, afs_socket->sk);
 
408
        atomic_inc(&afs_outstanding_skbs);
 
409
 
 
410
        if (!call) {
 
411
                /* its an incoming call for our callback service */
 
412
                skb_queue_tail(&afs_incoming_calls, skb);
 
413
                queue_work(afs_wq, &afs_collect_incoming_call_work);
 
414
        } else {
 
415
                /* route the messages directly to the appropriate call */
 
416
                skb_queue_tail(&call->rx_queue, skb);
 
417
                call->wait_mode->rx_wakeup(call);
 
418
        }
 
419
 
 
420
        _leave("");
 
421
}
 
422
 
 
423
/*
 
424
 * deliver messages to a call
 
425
 */
 
426
static void afs_deliver_to_call(struct afs_call *call)
 
427
{
 
428
        struct sk_buff *skb;
 
429
        bool last;
 
430
        u32 abort_code;
 
431
        int ret;
 
432
 
 
433
        _enter("");
 
434
 
 
435
        while ((call->state == AFS_CALL_AWAIT_REPLY ||
 
436
                call->state == AFS_CALL_AWAIT_OP_ID ||
 
437
                call->state == AFS_CALL_AWAIT_REQUEST ||
 
438
                call->state == AFS_CALL_AWAIT_ACK) &&
 
439
               (skb = skb_dequeue(&call->rx_queue))) {
 
440
                switch (skb->mark) {
 
441
                case RXRPC_SKB_MARK_DATA:
 
442
                        _debug("Rcv DATA");
 
443
                        last = rxrpc_kernel_is_data_last(skb);
 
444
                        ret = call->type->deliver(call, skb, last);
 
445
                        switch (ret) {
 
446
                        case 0:
 
447
                                if (last &&
 
448
                                    call->state == AFS_CALL_AWAIT_REPLY)
 
449
                                        call->state = AFS_CALL_COMPLETE;
 
450
                                break;
 
451
                        case -ENOTCONN:
 
452
                                abort_code = RX_CALL_DEAD;
 
453
                                goto do_abort;
 
454
                        case -ENOTSUPP:
 
455
                                abort_code = RX_INVALID_OPERATION;
 
456
                                goto do_abort;
 
457
                        default:
 
458
                                abort_code = RXGEN_CC_UNMARSHAL;
 
459
                                if (call->state != AFS_CALL_AWAIT_REPLY)
 
460
                                        abort_code = RXGEN_SS_UNMARSHAL;
 
461
                        do_abort:
 
462
                                rxrpc_kernel_abort_call(call->rxcall,
 
463
                                                        abort_code);
 
464
                                call->error = ret;
 
465
                                call->state = AFS_CALL_ERROR;
 
466
                                break;
 
467
                        }
 
468
                        afs_data_delivered(skb);
 
469
                        skb = NULL;
 
470
                        continue;
 
471
                case RXRPC_SKB_MARK_FINAL_ACK:
 
472
                        _debug("Rcv ACK");
 
473
                        call->state = AFS_CALL_COMPLETE;
 
474
                        break;
 
475
                case RXRPC_SKB_MARK_BUSY:
 
476
                        _debug("Rcv BUSY");
 
477
                        call->error = -EBUSY;
 
478
                        call->state = AFS_CALL_BUSY;
 
479
                        break;
 
480
                case RXRPC_SKB_MARK_REMOTE_ABORT:
 
481
                        abort_code = rxrpc_kernel_get_abort_code(skb);
 
482
                        call->error = call->type->abort_to_error(abort_code);
 
483
                        call->state = AFS_CALL_ABORTED;
 
484
                        _debug("Rcv ABORT %u -> %d", abort_code, call->error);
 
485
                        break;
 
486
                case RXRPC_SKB_MARK_NET_ERROR:
 
487
                        call->error = -rxrpc_kernel_get_error_number(skb);
 
488
                        call->state = AFS_CALL_ERROR;
 
489
                        _debug("Rcv NET ERROR %d", call->error);
 
490
                        break;
 
491
                case RXRPC_SKB_MARK_LOCAL_ERROR:
 
492
                        call->error = -rxrpc_kernel_get_error_number(skb);
 
493
                        call->state = AFS_CALL_ERROR;
 
494
                        _debug("Rcv LOCAL ERROR %d", call->error);
 
495
                        break;
 
496
                default:
 
497
                        BUG();
 
498
                        break;
 
499
                }
 
500
 
 
501
                afs_free_skb(skb);
 
502
        }
 
503
 
 
504
        /* make sure the queue is empty if the call is done with (we might have
 
505
         * aborted the call early because of an unmarshalling error) */
 
506
        if (call->state >= AFS_CALL_COMPLETE) {
 
507
                while ((skb = skb_dequeue(&call->rx_queue)))
 
508
                        afs_free_skb(skb);
 
509
                if (call->incoming) {
 
510
                        rxrpc_kernel_end_call(call->rxcall);
 
511
                        call->rxcall = NULL;
 
512
                        call->type->destructor(call);
 
513
                        afs_free_call(call);
 
514
                }
 
515
        }
 
516
 
 
517
        _leave("");
 
518
}
 
519
 
 
520
/*
 
521
 * wait synchronously for a call to complete
 
522
 */
 
523
static int afs_wait_for_call_to_complete(struct afs_call *call)
 
524
{
 
525
        struct sk_buff *skb;
 
526
        int ret;
 
527
 
 
528
        DECLARE_WAITQUEUE(myself, current);
 
529
 
 
530
        _enter("");
 
531
 
 
532
        add_wait_queue(&call->waitq, &myself);
 
533
        for (;;) {
 
534
                set_current_state(TASK_INTERRUPTIBLE);
 
535
 
 
536
                /* deliver any messages that are in the queue */
 
537
                if (!skb_queue_empty(&call->rx_queue)) {
 
538
                        __set_current_state(TASK_RUNNING);
 
539
                        afs_deliver_to_call(call);
 
540
                        continue;
 
541
                }
 
542
 
 
543
                ret = call->error;
 
544
                if (call->state >= AFS_CALL_COMPLETE)
 
545
                        break;
 
546
                ret = -EINTR;
 
547
                if (signal_pending(current))
 
548
                        break;
 
549
                schedule();
 
550
        }
 
551
 
 
552
        remove_wait_queue(&call->waitq, &myself);
 
553
        __set_current_state(TASK_RUNNING);
 
554
 
 
555
        /* kill the call */
 
556
        if (call->state < AFS_CALL_COMPLETE) {
 
557
                _debug("call incomplete");
 
558
                rxrpc_kernel_abort_call(call->rxcall, RX_CALL_DEAD);
 
559
                while ((skb = skb_dequeue(&call->rx_queue)))
 
560
                        afs_free_skb(skb);
 
561
        }
 
562
 
 
563
        _debug("call complete");
 
564
        rxrpc_kernel_end_call(call->rxcall);
 
565
        call->rxcall = NULL;
 
566
        call->type->destructor(call);
 
567
        afs_free_call(call);
 
568
        _leave(" = %d", ret);
 
569
        return ret;
 
570
}
 
571
 
 
572
/*
 
573
 * wake up a waiting call
 
574
 */
 
575
static void afs_wake_up_call_waiter(struct afs_call *call)
 
576
{
 
577
        wake_up(&call->waitq);
 
578
}
 
579
 
 
580
/*
 
581
 * wake up an asynchronous call
 
582
 */
 
583
static void afs_wake_up_async_call(struct afs_call *call)
 
584
{
 
585
        _enter("");
 
586
        queue_work(afs_async_calls, &call->async_work);
 
587
}
 
588
 
 
589
/*
 
590
 * put a call into asynchronous mode
 
591
 * - mustn't touch the call descriptor as the call my have completed by the
 
592
 *   time we get here
 
593
 */
 
594
static int afs_dont_wait_for_call_to_complete(struct afs_call *call)
 
595
{
 
596
        _enter("");
 
597
        return -EINPROGRESS;
 
598
}
 
599
 
 
600
/*
 
601
 * delete an asynchronous call
 
602
 */
 
603
static void afs_delete_async_call(struct work_struct *work)
 
604
{
 
605
        struct afs_call *call =
 
606
                container_of(work, struct afs_call, async_work);
 
607
 
 
608
        _enter("");
 
609
 
 
610
        afs_free_call(call);
 
611
 
 
612
        _leave("");
 
613
}
 
614
 
 
615
/*
 
616
 * perform processing on an asynchronous call
 
617
 * - on a multiple-thread workqueue this work item may try to run on several
 
618
 *   CPUs at the same time
 
619
 */
 
620
static void afs_process_async_call(struct work_struct *work)
 
621
{
 
622
        struct afs_call *call =
 
623
                container_of(work, struct afs_call, async_work);
 
624
 
 
625
        _enter("");
 
626
 
 
627
        if (!skb_queue_empty(&call->rx_queue))
 
628
                afs_deliver_to_call(call);
 
629
 
 
630
        if (call->state >= AFS_CALL_COMPLETE && call->wait_mode) {
 
631
                if (call->wait_mode->async_complete)
 
632
                        call->wait_mode->async_complete(call->reply,
 
633
                                                        call->error);
 
634
                call->reply = NULL;
 
635
 
 
636
                /* kill the call */
 
637
                rxrpc_kernel_end_call(call->rxcall);
 
638
                call->rxcall = NULL;
 
639
                if (call->type->destructor)
 
640
                        call->type->destructor(call);
 
641
 
 
642
                /* we can't just delete the call because the work item may be
 
643
                 * queued */
 
644
                PREPARE_WORK(&call->async_work, afs_delete_async_call);
 
645
                queue_work(afs_async_calls, &call->async_work);
 
646
        }
 
647
 
 
648
        _leave("");
 
649
}
 
650
 
 
651
/*
 
652
 * empty a socket buffer into a flat reply buffer
 
653
 */
 
654
void afs_transfer_reply(struct afs_call *call, struct sk_buff *skb)
 
655
{
 
656
        size_t len = skb->len;
 
657
 
 
658
        if (skb_copy_bits(skb, 0, call->buffer + call->reply_size, len) < 0)
 
659
                BUG();
 
660
        call->reply_size += len;
 
661
}
 
662
 
 
663
/*
 
664
 * accept the backlog of incoming calls
 
665
 */
 
666
static void afs_collect_incoming_call(struct work_struct *work)
 
667
{
 
668
        struct rxrpc_call *rxcall;
 
669
        struct afs_call *call = NULL;
 
670
        struct sk_buff *skb;
 
671
 
 
672
        while ((skb = skb_dequeue(&afs_incoming_calls))) {
 
673
                _debug("new call");
 
674
 
 
675
                /* don't need the notification */
 
676
                afs_free_skb(skb);
 
677
 
 
678
                if (!call) {
 
679
                        call = kzalloc(sizeof(struct afs_call), GFP_KERNEL);
 
680
                        if (!call) {
 
681
                                rxrpc_kernel_reject_call(afs_socket);
 
682
                                return;
 
683
                        }
 
684
 
 
685
                        INIT_WORK(&call->async_work, afs_process_async_call);
 
686
                        call->wait_mode = &afs_async_incoming_call;
 
687
                        call->type = &afs_RXCMxxxx;
 
688
                        init_waitqueue_head(&call->waitq);
 
689
                        skb_queue_head_init(&call->rx_queue);
 
690
                        call->state = AFS_CALL_AWAIT_OP_ID;
 
691
 
 
692
                        _debug("CALL %p{%s} [%d]",
 
693
                               call, call->type->name,
 
694
                               atomic_read(&afs_outstanding_calls));
 
695
                        atomic_inc(&afs_outstanding_calls);
 
696
                }
 
697
 
 
698
                rxcall = rxrpc_kernel_accept_call(afs_socket,
 
699
                                                  (unsigned long) call);
 
700
                if (!IS_ERR(rxcall)) {
 
701
                        call->rxcall = rxcall;
 
702
                        call = NULL;
 
703
                }
 
704
        }
 
705
 
 
706
        if (call)
 
707
                afs_free_call(call);
 
708
}
 
709
 
 
710
/*
 
711
 * grab the operation ID from an incoming cache manager call
 
712
 */
 
713
static int afs_deliver_cm_op_id(struct afs_call *call, struct sk_buff *skb,
 
714
                                bool last)
 
715
{
 
716
        size_t len = skb->len;
 
717
        void *oibuf = (void *) &call->operation_ID;
 
718
 
 
719
        _enter("{%u},{%zu},%d", call->offset, len, last);
 
720
 
 
721
        ASSERTCMP(call->offset, <, 4);
 
722
 
 
723
        /* the operation ID forms the first four bytes of the request data */
 
724
        len = min_t(size_t, len, 4 - call->offset);
 
725
        if (skb_copy_bits(skb, 0, oibuf + call->offset, len) < 0)
 
726
                BUG();
 
727
        if (!pskb_pull(skb, len))
 
728
                BUG();
 
729
        call->offset += len;
 
730
 
 
731
        if (call->offset < 4) {
 
732
                if (last) {
 
733
                        _leave(" = -EBADMSG [op ID short]");
 
734
                        return -EBADMSG;
 
735
                }
 
736
                _leave(" = 0 [incomplete]");
 
737
                return 0;
 
738
        }
 
739
 
 
740
        call->state = AFS_CALL_AWAIT_REQUEST;
 
741
 
 
742
        /* ask the cache manager to route the call (it'll change the call type
 
743
         * if successful) */
 
744
        if (!afs_cm_incoming_call(call))
 
745
                return -ENOTSUPP;
 
746
 
 
747
        /* pass responsibility for the remainer of this message off to the
 
748
         * cache manager op */
 
749
        return call->type->deliver(call, skb, last);
 
750
}
 
751
 
 
752
/*
 
753
 * send an empty reply
 
754
 */
 
755
void afs_send_empty_reply(struct afs_call *call)
 
756
{
 
757
        struct msghdr msg;
 
758
        struct iovec iov[1];
 
759
 
 
760
        _enter("");
 
761
 
 
762
        iov[0].iov_base         = NULL;
 
763
        iov[0].iov_len          = 0;
 
764
        msg.msg_name            = NULL;
 
765
        msg.msg_namelen         = 0;
 
766
        msg.msg_iov             = iov;
 
767
        msg.msg_iovlen          = 0;
 
768
        msg.msg_control         = NULL;
 
769
        msg.msg_controllen      = 0;
 
770
        msg.msg_flags           = 0;
 
771
 
 
772
        call->state = AFS_CALL_AWAIT_ACK;
 
773
        switch (rxrpc_kernel_send_data(call->rxcall, &msg, 0)) {
 
774
        case 0:
 
775
                _leave(" [replied]");
 
776
                return;
 
777
 
 
778
        case -ENOMEM:
 
779
                _debug("oom");
 
780
                rxrpc_kernel_abort_call(call->rxcall, RX_USER_ABORT);
 
781
        default:
 
782
                rxrpc_kernel_end_call(call->rxcall);
 
783
                call->rxcall = NULL;
 
784
                call->type->destructor(call);
 
785
                afs_free_call(call);
 
786
                _leave(" [error]");
 
787
                return;
 
788
        }
 
789
}
 
790
 
 
791
/*
 
792
 * send a simple reply
 
793
 */
 
794
void afs_send_simple_reply(struct afs_call *call, const void *buf, size_t len)
 
795
{
 
796
        struct msghdr msg;
 
797
        struct iovec iov[1];
 
798
        int n;
 
799
 
 
800
        _enter("");
 
801
 
 
802
        iov[0].iov_base         = (void *) buf;
 
803
        iov[0].iov_len          = len;
 
804
        msg.msg_name            = NULL;
 
805
        msg.msg_namelen         = 0;
 
806
        msg.msg_iov             = iov;
 
807
        msg.msg_iovlen          = 1;
 
808
        msg.msg_control         = NULL;
 
809
        msg.msg_controllen      = 0;
 
810
        msg.msg_flags           = 0;
 
811
 
 
812
        call->state = AFS_CALL_AWAIT_ACK;
 
813
        n = rxrpc_kernel_send_data(call->rxcall, &msg, len);
 
814
        if (n >= 0) {
 
815
                _leave(" [replied]");
 
816
                return;
 
817
        }
 
818
        if (n == -ENOMEM) {
 
819
                _debug("oom");
 
820
                rxrpc_kernel_abort_call(call->rxcall, RX_USER_ABORT);
 
821
        }
 
822
        rxrpc_kernel_end_call(call->rxcall);
 
823
        call->rxcall = NULL;
 
824
        call->type->destructor(call);
 
825
        afs_free_call(call);
 
826
        _leave(" [error]");
 
827
}
 
828
 
 
829
/*
 
830
 * extract a piece of data from the received data socket buffers
 
831
 */
 
832
int afs_extract_data(struct afs_call *call, struct sk_buff *skb,
 
833
                     bool last, void *buf, size_t count)
 
834
{
 
835
        size_t len = skb->len;
 
836
 
 
837
        _enter("{%u},{%zu},%d,,%zu", call->offset, len, last, count);
 
838
 
 
839
        ASSERTCMP(call->offset, <, count);
 
840
 
 
841
        len = min_t(size_t, len, count - call->offset);
 
842
        if (skb_copy_bits(skb, 0, buf + call->offset, len) < 0 ||
 
843
            !pskb_pull(skb, len))
 
844
                BUG();
 
845
        call->offset += len;
 
846
 
 
847
        if (call->offset < count) {
 
848
                if (last) {
 
849
                        _leave(" = -EBADMSG [%d < %zu]", call->offset, count);
 
850
                        return -EBADMSG;
 
851
                }
 
852
                _leave(" = -EAGAIN");
 
853
                return -EAGAIN;
 
854
        }
 
855
        return 0;
 
856
}