~ubuntu-branches/ubuntu/precise/linux-lowlatency/precise

« back to all changes in this revision

Viewing changes to net/sunrpc/xprt.c

  • Committer: Package Import Robot
  • Author(s): Alessio Igor Bogani
  • Date: 2011-10-26 11:13:05 UTC
  • Revision ID: package-import@ubuntu.com-20111026111305-tz023xykf0i6eosh
Tags: upstream-3.2.0
ImportĀ upstreamĀ versionĀ 3.2.0

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/*
 
2
 *  linux/net/sunrpc/xprt.c
 
3
 *
 
4
 *  This is a generic RPC call interface supporting congestion avoidance,
 
5
 *  and asynchronous calls.
 
6
 *
 
7
 *  The interface works like this:
 
8
 *
 
9
 *  -   When a process places a call, it allocates a request slot if
 
10
 *      one is available. Otherwise, it sleeps on the backlog queue
 
11
 *      (xprt_reserve).
 
12
 *  -   Next, the caller puts together the RPC message, stuffs it into
 
13
 *      the request struct, and calls xprt_transmit().
 
14
 *  -   xprt_transmit sends the message and installs the caller on the
 
15
 *      transport's wait list. At the same time, if a reply is expected,
 
16
 *      it installs a timer that is run after the packet's timeout has
 
17
 *      expired.
 
18
 *  -   When a packet arrives, the data_ready handler walks the list of
 
19
 *      pending requests for that transport. If a matching XID is found, the
 
20
 *      caller is woken up, and the timer removed.
 
21
 *  -   When no reply arrives within the timeout interval, the timer is
 
22
 *      fired by the kernel and runs xprt_timer(). It either adjusts the
 
23
 *      timeout values (minor timeout) or wakes up the caller with a status
 
24
 *      of -ETIMEDOUT.
 
25
 *  -   When the caller receives a notification from RPC that a reply arrived,
 
26
 *      it should release the RPC slot, and process the reply.
 
27
 *      If the call timed out, it may choose to retry the operation by
 
28
 *      adjusting the initial timeout value, and simply calling rpc_call
 
29
 *      again.
 
30
 *
 
31
 *  Support for async RPC is done through a set of RPC-specific scheduling
 
32
 *  primitives that `transparently' work for processes as well as async
 
33
 *  tasks that rely on callbacks.
 
34
 *
 
35
 *  Copyright (C) 1995-1997, Olaf Kirch <okir@monad.swb.de>
 
36
 *
 
37
 *  Transport switch API copyright (C) 2005, Chuck Lever <cel@netapp.com>
 
38
 */
 
39
 
 
40
#include <linux/module.h>
 
41
 
 
42
#include <linux/types.h>
 
43
#include <linux/interrupt.h>
 
44
#include <linux/workqueue.h>
 
45
#include <linux/net.h>
 
46
#include <linux/ktime.h>
 
47
 
 
48
#include <linux/sunrpc/clnt.h>
 
49
#include <linux/sunrpc/metrics.h>
 
50
#include <linux/sunrpc/bc_xprt.h>
 
51
 
 
52
#include "sunrpc.h"
 
53
 
 
54
/*
 
55
 * Local variables
 
56
 */
 
57
 
 
58
#ifdef RPC_DEBUG
 
59
# define RPCDBG_FACILITY        RPCDBG_XPRT
 
60
#endif
 
61
 
 
62
/*
 
63
 * Local functions
 
64
 */
 
65
static void      xprt_init(struct rpc_xprt *xprt, struct net *net);
 
66
static void     xprt_request_init(struct rpc_task *, struct rpc_xprt *);
 
67
static void     xprt_connect_status(struct rpc_task *task);
 
68
static int      __xprt_get_cong(struct rpc_xprt *, struct rpc_task *);
 
69
 
 
70
static DEFINE_SPINLOCK(xprt_list_lock);
 
71
static LIST_HEAD(xprt_list);
 
72
 
 
73
/*
 
74
 * The transport code maintains an estimate on the maximum number of out-
 
75
 * standing RPC requests, using a smoothed version of the congestion
 
76
 * avoidance implemented in 44BSD. This is basically the Van Jacobson
 
77
 * congestion algorithm: If a retransmit occurs, the congestion window is
 
78
 * halved; otherwise, it is incremented by 1/cwnd when
 
79
 *
 
80
 *      -       a reply is received and
 
81
 *      -       a full number of requests are outstanding and
 
82
 *      -       the congestion window hasn't been updated recently.
 
83
 */
 
84
#define RPC_CWNDSHIFT           (8U)
 
85
#define RPC_CWNDSCALE           (1U << RPC_CWNDSHIFT)
 
86
#define RPC_INITCWND            RPC_CWNDSCALE
 
87
#define RPC_MAXCWND(xprt)       ((xprt)->max_reqs << RPC_CWNDSHIFT)
 
88
 
 
89
#define RPCXPRT_CONGESTED(xprt) ((xprt)->cong >= (xprt)->cwnd)
 
90
 
 
91
/**
 
92
 * xprt_register_transport - register a transport implementation
 
93
 * @transport: transport to register
 
94
 *
 
95
 * If a transport implementation is loaded as a kernel module, it can
 
96
 * call this interface to make itself known to the RPC client.
 
97
 *
 
98
 * Returns:
 
99
 * 0:           transport successfully registered
 
100
 * -EEXIST:     transport already registered
 
101
 * -EINVAL:     transport module being unloaded
 
102
 */
 
103
int xprt_register_transport(struct xprt_class *transport)
 
104
{
 
105
        struct xprt_class *t;
 
106
        int result;
 
107
 
 
108
        result = -EEXIST;
 
109
        spin_lock(&xprt_list_lock);
 
110
        list_for_each_entry(t, &xprt_list, list) {
 
111
                /* don't register the same transport class twice */
 
112
                if (t->ident == transport->ident)
 
113
                        goto out;
 
114
        }
 
115
 
 
116
        list_add_tail(&transport->list, &xprt_list);
 
117
        printk(KERN_INFO "RPC: Registered %s transport module.\n",
 
118
               transport->name);
 
119
        result = 0;
 
120
 
 
121
out:
 
122
        spin_unlock(&xprt_list_lock);
 
123
        return result;
 
124
}
 
125
EXPORT_SYMBOL_GPL(xprt_register_transport);
 
126
 
 
127
/**
 
128
 * xprt_unregister_transport - unregister a transport implementation
 
129
 * @transport: transport to unregister
 
130
 *
 
131
 * Returns:
 
132
 * 0:           transport successfully unregistered
 
133
 * -ENOENT:     transport never registered
 
134
 */
 
135
int xprt_unregister_transport(struct xprt_class *transport)
 
136
{
 
137
        struct xprt_class *t;
 
138
        int result;
 
139
 
 
140
        result = 0;
 
141
        spin_lock(&xprt_list_lock);
 
142
        list_for_each_entry(t, &xprt_list, list) {
 
143
                if (t == transport) {
 
144
                        printk(KERN_INFO
 
145
                                "RPC: Unregistered %s transport module.\n",
 
146
                                transport->name);
 
147
                        list_del_init(&transport->list);
 
148
                        goto out;
 
149
                }
 
150
        }
 
151
        result = -ENOENT;
 
152
 
 
153
out:
 
154
        spin_unlock(&xprt_list_lock);
 
155
        return result;
 
156
}
 
157
EXPORT_SYMBOL_GPL(xprt_unregister_transport);
 
158
 
 
159
/**
 
160
 * xprt_load_transport - load a transport implementation
 
161
 * @transport_name: transport to load
 
162
 *
 
163
 * Returns:
 
164
 * 0:           transport successfully loaded
 
165
 * -ENOENT:     transport module not available
 
166
 */
 
167
int xprt_load_transport(const char *transport_name)
 
168
{
 
169
        struct xprt_class *t;
 
170
        int result;
 
171
 
 
172
        result = 0;
 
173
        spin_lock(&xprt_list_lock);
 
174
        list_for_each_entry(t, &xprt_list, list) {
 
175
                if (strcmp(t->name, transport_name) == 0) {
 
176
                        spin_unlock(&xprt_list_lock);
 
177
                        goto out;
 
178
                }
 
179
        }
 
180
        spin_unlock(&xprt_list_lock);
 
181
        result = request_module("xprt%s", transport_name);
 
182
out:
 
183
        return result;
 
184
}
 
185
EXPORT_SYMBOL_GPL(xprt_load_transport);
 
186
 
 
187
/**
 
188
 * xprt_reserve_xprt - serialize write access to transports
 
189
 * @task: task that is requesting access to the transport
 
190
 * @xprt: pointer to the target transport
 
191
 *
 
192
 * This prevents mixing the payload of separate requests, and prevents
 
193
 * transport connects from colliding with writes.  No congestion control
 
194
 * is provided.
 
195
 */
 
196
int xprt_reserve_xprt(struct rpc_xprt *xprt, struct rpc_task *task)
 
197
{
 
198
        struct rpc_rqst *req = task->tk_rqstp;
 
199
        int priority;
 
200
 
 
201
        if (test_and_set_bit(XPRT_LOCKED, &xprt->state)) {
 
202
                if (task == xprt->snd_task)
 
203
                        return 1;
 
204
                goto out_sleep;
 
205
        }
 
206
        xprt->snd_task = task;
 
207
        if (req != NULL) {
 
208
                req->rq_bytes_sent = 0;
 
209
                req->rq_ntrans++;
 
210
        }
 
211
 
 
212
        return 1;
 
213
 
 
214
out_sleep:
 
215
        dprintk("RPC: %5u failed to lock transport %p\n",
 
216
                        task->tk_pid, xprt);
 
217
        task->tk_timeout = 0;
 
218
        task->tk_status = -EAGAIN;
 
219
        if (req == NULL)
 
220
                priority = RPC_PRIORITY_LOW;
 
221
        else if (!req->rq_ntrans)
 
222
                priority = RPC_PRIORITY_NORMAL;
 
223
        else
 
224
                priority = RPC_PRIORITY_HIGH;
 
225
        rpc_sleep_on_priority(&xprt->sending, task, NULL, priority);
 
226
        return 0;
 
227
}
 
228
EXPORT_SYMBOL_GPL(xprt_reserve_xprt);
 
229
 
 
230
static void xprt_clear_locked(struct rpc_xprt *xprt)
 
231
{
 
232
        xprt->snd_task = NULL;
 
233
        if (!test_bit(XPRT_CLOSE_WAIT, &xprt->state) || xprt->shutdown) {
 
234
                smp_mb__before_clear_bit();
 
235
                clear_bit(XPRT_LOCKED, &xprt->state);
 
236
                smp_mb__after_clear_bit();
 
237
        } else
 
238
                queue_work(rpciod_workqueue, &xprt->task_cleanup);
 
239
}
 
240
 
 
241
/*
 
242
 * xprt_reserve_xprt_cong - serialize write access to transports
 
243
 * @task: task that is requesting access to the transport
 
244
 *
 
245
 * Same as xprt_reserve_xprt, but Van Jacobson congestion control is
 
246
 * integrated into the decision of whether a request is allowed to be
 
247
 * woken up and given access to the transport.
 
248
 */
 
249
int xprt_reserve_xprt_cong(struct rpc_xprt *xprt, struct rpc_task *task)
 
250
{
 
251
        struct rpc_rqst *req = task->tk_rqstp;
 
252
        int priority;
 
253
 
 
254
        if (test_and_set_bit(XPRT_LOCKED, &xprt->state)) {
 
255
                if (task == xprt->snd_task)
 
256
                        return 1;
 
257
                goto out_sleep;
 
258
        }
 
259
        if (req == NULL) {
 
260
                xprt->snd_task = task;
 
261
                return 1;
 
262
        }
 
263
        if (__xprt_get_cong(xprt, task)) {
 
264
                xprt->snd_task = task;
 
265
                req->rq_bytes_sent = 0;
 
266
                req->rq_ntrans++;
 
267
                return 1;
 
268
        }
 
269
        xprt_clear_locked(xprt);
 
270
out_sleep:
 
271
        dprintk("RPC: %5u failed to lock transport %p\n", task->tk_pid, xprt);
 
272
        task->tk_timeout = 0;
 
273
        task->tk_status = -EAGAIN;
 
274
        if (req == NULL)
 
275
                priority = RPC_PRIORITY_LOW;
 
276
        else if (!req->rq_ntrans)
 
277
                priority = RPC_PRIORITY_NORMAL;
 
278
        else
 
279
                priority = RPC_PRIORITY_HIGH;
 
280
        rpc_sleep_on_priority(&xprt->sending, task, NULL, priority);
 
281
        return 0;
 
282
}
 
283
EXPORT_SYMBOL_GPL(xprt_reserve_xprt_cong);
 
284
 
 
285
static inline int xprt_lock_write(struct rpc_xprt *xprt, struct rpc_task *task)
 
286
{
 
287
        int retval;
 
288
 
 
289
        spin_lock_bh(&xprt->transport_lock);
 
290
        retval = xprt->ops->reserve_xprt(xprt, task);
 
291
        spin_unlock_bh(&xprt->transport_lock);
 
292
        return retval;
 
293
}
 
294
 
 
295
static void __xprt_lock_write_next(struct rpc_xprt *xprt)
 
296
{
 
297
        struct rpc_task *task;
 
298
        struct rpc_rqst *req;
 
299
 
 
300
        if (test_and_set_bit(XPRT_LOCKED, &xprt->state))
 
301
                return;
 
302
 
 
303
        task = rpc_wake_up_next(&xprt->sending);
 
304
        if (task == NULL)
 
305
                goto out_unlock;
 
306
 
 
307
        req = task->tk_rqstp;
 
308
        xprt->snd_task = task;
 
309
        if (req) {
 
310
                req->rq_bytes_sent = 0;
 
311
                req->rq_ntrans++;
 
312
        }
 
313
        return;
 
314
 
 
315
out_unlock:
 
316
        xprt_clear_locked(xprt);
 
317
}
 
318
 
 
319
static void __xprt_lock_write_next_cong(struct rpc_xprt *xprt)
 
320
{
 
321
        struct rpc_task *task;
 
322
        struct rpc_rqst *req;
 
323
 
 
324
        if (test_and_set_bit(XPRT_LOCKED, &xprt->state))
 
325
                return;
 
326
        if (RPCXPRT_CONGESTED(xprt))
 
327
                goto out_unlock;
 
328
        task = rpc_wake_up_next(&xprt->sending);
 
329
        if (task == NULL)
 
330
                goto out_unlock;
 
331
 
 
332
        req = task->tk_rqstp;
 
333
        if (req == NULL) {
 
334
                xprt->snd_task = task;
 
335
                return;
 
336
        }
 
337
        if (__xprt_get_cong(xprt, task)) {
 
338
                xprt->snd_task = task;
 
339
                req->rq_bytes_sent = 0;
 
340
                req->rq_ntrans++;
 
341
                return;
 
342
        }
 
343
out_unlock:
 
344
        xprt_clear_locked(xprt);
 
345
}
 
346
 
 
347
/**
 
348
 * xprt_release_xprt - allow other requests to use a transport
 
349
 * @xprt: transport with other tasks potentially waiting
 
350
 * @task: task that is releasing access to the transport
 
351
 *
 
352
 * Note that "task" can be NULL.  No congestion control is provided.
 
353
 */
 
354
void xprt_release_xprt(struct rpc_xprt *xprt, struct rpc_task *task)
 
355
{
 
356
        if (xprt->snd_task == task) {
 
357
                xprt_clear_locked(xprt);
 
358
                __xprt_lock_write_next(xprt);
 
359
        }
 
360
}
 
361
EXPORT_SYMBOL_GPL(xprt_release_xprt);
 
362
 
 
363
/**
 
364
 * xprt_release_xprt_cong - allow other requests to use a transport
 
365
 * @xprt: transport with other tasks potentially waiting
 
366
 * @task: task that is releasing access to the transport
 
367
 *
 
368
 * Note that "task" can be NULL.  Another task is awoken to use the
 
369
 * transport if the transport's congestion window allows it.
 
370
 */
 
371
void xprt_release_xprt_cong(struct rpc_xprt *xprt, struct rpc_task *task)
 
372
{
 
373
        if (xprt->snd_task == task) {
 
374
                xprt_clear_locked(xprt);
 
375
                __xprt_lock_write_next_cong(xprt);
 
376
        }
 
377
}
 
378
EXPORT_SYMBOL_GPL(xprt_release_xprt_cong);
 
379
 
 
380
static inline void xprt_release_write(struct rpc_xprt *xprt, struct rpc_task *task)
 
381
{
 
382
        spin_lock_bh(&xprt->transport_lock);
 
383
        xprt->ops->release_xprt(xprt, task);
 
384
        spin_unlock_bh(&xprt->transport_lock);
 
385
}
 
386
 
 
387
/*
 
388
 * Van Jacobson congestion avoidance. Check if the congestion window
 
389
 * overflowed. Put the task to sleep if this is the case.
 
390
 */
 
391
static int
 
392
__xprt_get_cong(struct rpc_xprt *xprt, struct rpc_task *task)
 
393
{
 
394
        struct rpc_rqst *req = task->tk_rqstp;
 
395
 
 
396
        if (req->rq_cong)
 
397
                return 1;
 
398
        dprintk("RPC: %5u xprt_cwnd_limited cong = %lu cwnd = %lu\n",
 
399
                        task->tk_pid, xprt->cong, xprt->cwnd);
 
400
        if (RPCXPRT_CONGESTED(xprt))
 
401
                return 0;
 
402
        req->rq_cong = 1;
 
403
        xprt->cong += RPC_CWNDSCALE;
 
404
        return 1;
 
405
}
 
406
 
 
407
/*
 
408
 * Adjust the congestion window, and wake up the next task
 
409
 * that has been sleeping due to congestion
 
410
 */
 
411
static void
 
412
__xprt_put_cong(struct rpc_xprt *xprt, struct rpc_rqst *req)
 
413
{
 
414
        if (!req->rq_cong)
 
415
                return;
 
416
        req->rq_cong = 0;
 
417
        xprt->cong -= RPC_CWNDSCALE;
 
418
        __xprt_lock_write_next_cong(xprt);
 
419
}
 
420
 
 
421
/**
 
422
 * xprt_release_rqst_cong - housekeeping when request is complete
 
423
 * @task: RPC request that recently completed
 
424
 *
 
425
 * Useful for transports that require congestion control.
 
426
 */
 
427
void xprt_release_rqst_cong(struct rpc_task *task)
 
428
{
 
429
        __xprt_put_cong(task->tk_xprt, task->tk_rqstp);
 
430
}
 
431
EXPORT_SYMBOL_GPL(xprt_release_rqst_cong);
 
432
 
 
433
/**
 
434
 * xprt_adjust_cwnd - adjust transport congestion window
 
435
 * @task: recently completed RPC request used to adjust window
 
436
 * @result: result code of completed RPC request
 
437
 *
 
438
 * We use a time-smoothed congestion estimator to avoid heavy oscillation.
 
439
 */
 
440
void xprt_adjust_cwnd(struct rpc_task *task, int result)
 
441
{
 
442
        struct rpc_rqst *req = task->tk_rqstp;
 
443
        struct rpc_xprt *xprt = task->tk_xprt;
 
444
        unsigned long cwnd = xprt->cwnd;
 
445
 
 
446
        if (result >= 0 && cwnd <= xprt->cong) {
 
447
                /* The (cwnd >> 1) term makes sure
 
448
                 * the result gets rounded properly. */
 
449
                cwnd += (RPC_CWNDSCALE * RPC_CWNDSCALE + (cwnd >> 1)) / cwnd;
 
450
                if (cwnd > RPC_MAXCWND(xprt))
 
451
                        cwnd = RPC_MAXCWND(xprt);
 
452
                __xprt_lock_write_next_cong(xprt);
 
453
        } else if (result == -ETIMEDOUT) {
 
454
                cwnd >>= 1;
 
455
                if (cwnd < RPC_CWNDSCALE)
 
456
                        cwnd = RPC_CWNDSCALE;
 
457
        }
 
458
        dprintk("RPC:       cong %ld, cwnd was %ld, now %ld\n",
 
459
                        xprt->cong, xprt->cwnd, cwnd);
 
460
        xprt->cwnd = cwnd;
 
461
        __xprt_put_cong(xprt, req);
 
462
}
 
463
EXPORT_SYMBOL_GPL(xprt_adjust_cwnd);
 
464
 
 
465
/**
 
466
 * xprt_wake_pending_tasks - wake all tasks on a transport's pending queue
 
467
 * @xprt: transport with waiting tasks
 
468
 * @status: result code to plant in each task before waking it
 
469
 *
 
470
 */
 
471
void xprt_wake_pending_tasks(struct rpc_xprt *xprt, int status)
 
472
{
 
473
        if (status < 0)
 
474
                rpc_wake_up_status(&xprt->pending, status);
 
475
        else
 
476
                rpc_wake_up(&xprt->pending);
 
477
}
 
478
EXPORT_SYMBOL_GPL(xprt_wake_pending_tasks);
 
479
 
 
480
/**
 
481
 * xprt_wait_for_buffer_space - wait for transport output buffer to clear
 
482
 * @task: task to be put to sleep
 
483
 * @action: function pointer to be executed after wait
 
484
 */
 
485
void xprt_wait_for_buffer_space(struct rpc_task *task, rpc_action action)
 
486
{
 
487
        struct rpc_rqst *req = task->tk_rqstp;
 
488
        struct rpc_xprt *xprt = req->rq_xprt;
 
489
 
 
490
        task->tk_timeout = req->rq_timeout;
 
491
        rpc_sleep_on(&xprt->pending, task, action);
 
492
}
 
493
EXPORT_SYMBOL_GPL(xprt_wait_for_buffer_space);
 
494
 
 
495
/**
 
496
 * xprt_write_space - wake the task waiting for transport output buffer space
 
497
 * @xprt: transport with waiting tasks
 
498
 *
 
499
 * Can be called in a soft IRQ context, so xprt_write_space never sleeps.
 
500
 */
 
501
void xprt_write_space(struct rpc_xprt *xprt)
 
502
{
 
503
        if (unlikely(xprt->shutdown))
 
504
                return;
 
505
 
 
506
        spin_lock_bh(&xprt->transport_lock);
 
507
        if (xprt->snd_task) {
 
508
                dprintk("RPC:       write space: waking waiting task on "
 
509
                                "xprt %p\n", xprt);
 
510
                rpc_wake_up_queued_task(&xprt->pending, xprt->snd_task);
 
511
        }
 
512
        spin_unlock_bh(&xprt->transport_lock);
 
513
}
 
514
EXPORT_SYMBOL_GPL(xprt_write_space);
 
515
 
 
516
/**
 
517
 * xprt_set_retrans_timeout_def - set a request's retransmit timeout
 
518
 * @task: task whose timeout is to be set
 
519
 *
 
520
 * Set a request's retransmit timeout based on the transport's
 
521
 * default timeout parameters.  Used by transports that don't adjust
 
522
 * the retransmit timeout based on round-trip time estimation.
 
523
 */
 
524
void xprt_set_retrans_timeout_def(struct rpc_task *task)
 
525
{
 
526
        task->tk_timeout = task->tk_rqstp->rq_timeout;
 
527
}
 
528
EXPORT_SYMBOL_GPL(xprt_set_retrans_timeout_def);
 
529
 
 
530
/*
 
531
 * xprt_set_retrans_timeout_rtt - set a request's retransmit timeout
 
532
 * @task: task whose timeout is to be set
 
533
 *
 
534
 * Set a request's retransmit timeout using the RTT estimator.
 
535
 */
 
536
void xprt_set_retrans_timeout_rtt(struct rpc_task *task)
 
537
{
 
538
        int timer = task->tk_msg.rpc_proc->p_timer;
 
539
        struct rpc_clnt *clnt = task->tk_client;
 
540
        struct rpc_rtt *rtt = clnt->cl_rtt;
 
541
        struct rpc_rqst *req = task->tk_rqstp;
 
542
        unsigned long max_timeout = clnt->cl_timeout->to_maxval;
 
543
 
 
544
        task->tk_timeout = rpc_calc_rto(rtt, timer);
 
545
        task->tk_timeout <<= rpc_ntimeo(rtt, timer) + req->rq_retries;
 
546
        if (task->tk_timeout > max_timeout || task->tk_timeout == 0)
 
547
                task->tk_timeout = max_timeout;
 
548
}
 
549
EXPORT_SYMBOL_GPL(xprt_set_retrans_timeout_rtt);
 
550
 
 
551
static void xprt_reset_majortimeo(struct rpc_rqst *req)
 
552
{
 
553
        const struct rpc_timeout *to = req->rq_task->tk_client->cl_timeout;
 
554
 
 
555
        req->rq_majortimeo = req->rq_timeout;
 
556
        if (to->to_exponential)
 
557
                req->rq_majortimeo <<= to->to_retries;
 
558
        else
 
559
                req->rq_majortimeo += to->to_increment * to->to_retries;
 
560
        if (req->rq_majortimeo > to->to_maxval || req->rq_majortimeo == 0)
 
561
                req->rq_majortimeo = to->to_maxval;
 
562
        req->rq_majortimeo += jiffies;
 
563
}
 
564
 
 
565
/**
 
566
 * xprt_adjust_timeout - adjust timeout values for next retransmit
 
567
 * @req: RPC request containing parameters to use for the adjustment
 
568
 *
 
569
 */
 
570
int xprt_adjust_timeout(struct rpc_rqst *req)
 
571
{
 
572
        struct rpc_xprt *xprt = req->rq_xprt;
 
573
        const struct rpc_timeout *to = req->rq_task->tk_client->cl_timeout;
 
574
        int status = 0;
 
575
 
 
576
        if (time_before(jiffies, req->rq_majortimeo)) {
 
577
                if (to->to_exponential)
 
578
                        req->rq_timeout <<= 1;
 
579
                else
 
580
                        req->rq_timeout += to->to_increment;
 
581
                if (to->to_maxval && req->rq_timeout >= to->to_maxval)
 
582
                        req->rq_timeout = to->to_maxval;
 
583
                req->rq_retries++;
 
584
        } else {
 
585
                req->rq_timeout = to->to_initval;
 
586
                req->rq_retries = 0;
 
587
                xprt_reset_majortimeo(req);
 
588
                /* Reset the RTT counters == "slow start" */
 
589
                spin_lock_bh(&xprt->transport_lock);
 
590
                rpc_init_rtt(req->rq_task->tk_client->cl_rtt, to->to_initval);
 
591
                spin_unlock_bh(&xprt->transport_lock);
 
592
                status = -ETIMEDOUT;
 
593
        }
 
594
 
 
595
        if (req->rq_timeout == 0) {
 
596
                printk(KERN_WARNING "xprt_adjust_timeout: rq_timeout = 0!\n");
 
597
                req->rq_timeout = 5 * HZ;
 
598
        }
 
599
        return status;
 
600
}
 
601
 
 
602
static void xprt_autoclose(struct work_struct *work)
 
603
{
 
604
        struct rpc_xprt *xprt =
 
605
                container_of(work, struct rpc_xprt, task_cleanup);
 
606
 
 
607
        xprt->ops->close(xprt);
 
608
        clear_bit(XPRT_CLOSE_WAIT, &xprt->state);
 
609
        xprt_release_write(xprt, NULL);
 
610
}
 
611
 
 
612
/**
 
613
 * xprt_disconnect_done - mark a transport as disconnected
 
614
 * @xprt: transport to flag for disconnect
 
615
 *
 
616
 */
 
617
void xprt_disconnect_done(struct rpc_xprt *xprt)
 
618
{
 
619
        dprintk("RPC:       disconnected transport %p\n", xprt);
 
620
        spin_lock_bh(&xprt->transport_lock);
 
621
        xprt_clear_connected(xprt);
 
622
        xprt_wake_pending_tasks(xprt, -EAGAIN);
 
623
        spin_unlock_bh(&xprt->transport_lock);
 
624
}
 
625
EXPORT_SYMBOL_GPL(xprt_disconnect_done);
 
626
 
 
627
/**
 
628
 * xprt_force_disconnect - force a transport to disconnect
 
629
 * @xprt: transport to disconnect
 
630
 *
 
631
 */
 
632
void xprt_force_disconnect(struct rpc_xprt *xprt)
 
633
{
 
634
        /* Don't race with the test_bit() in xprt_clear_locked() */
 
635
        spin_lock_bh(&xprt->transport_lock);
 
636
        set_bit(XPRT_CLOSE_WAIT, &xprt->state);
 
637
        /* Try to schedule an autoclose RPC call */
 
638
        if (test_and_set_bit(XPRT_LOCKED, &xprt->state) == 0)
 
639
                queue_work(rpciod_workqueue, &xprt->task_cleanup);
 
640
        xprt_wake_pending_tasks(xprt, -EAGAIN);
 
641
        spin_unlock_bh(&xprt->transport_lock);
 
642
}
 
643
 
 
644
/**
 
645
 * xprt_conditional_disconnect - force a transport to disconnect
 
646
 * @xprt: transport to disconnect
 
647
 * @cookie: 'connection cookie'
 
648
 *
 
649
 * This attempts to break the connection if and only if 'cookie' matches
 
650
 * the current transport 'connection cookie'. It ensures that we don't
 
651
 * try to break the connection more than once when we need to retransmit
 
652
 * a batch of RPC requests.
 
653
 *
 
654
 */
 
655
void xprt_conditional_disconnect(struct rpc_xprt *xprt, unsigned int cookie)
 
656
{
 
657
        /* Don't race with the test_bit() in xprt_clear_locked() */
 
658
        spin_lock_bh(&xprt->transport_lock);
 
659
        if (cookie != xprt->connect_cookie)
 
660
                goto out;
 
661
        if (test_bit(XPRT_CLOSING, &xprt->state) || !xprt_connected(xprt))
 
662
                goto out;
 
663
        set_bit(XPRT_CLOSE_WAIT, &xprt->state);
 
664
        /* Try to schedule an autoclose RPC call */
 
665
        if (test_and_set_bit(XPRT_LOCKED, &xprt->state) == 0)
 
666
                queue_work(rpciod_workqueue, &xprt->task_cleanup);
 
667
        xprt_wake_pending_tasks(xprt, -EAGAIN);
 
668
out:
 
669
        spin_unlock_bh(&xprt->transport_lock);
 
670
}
 
671
 
 
672
static void
 
673
xprt_init_autodisconnect(unsigned long data)
 
674
{
 
675
        struct rpc_xprt *xprt = (struct rpc_xprt *)data;
 
676
 
 
677
        spin_lock(&xprt->transport_lock);
 
678
        if (!list_empty(&xprt->recv) || xprt->shutdown)
 
679
                goto out_abort;
 
680
        if (test_and_set_bit(XPRT_LOCKED, &xprt->state))
 
681
                goto out_abort;
 
682
        spin_unlock(&xprt->transport_lock);
 
683
        set_bit(XPRT_CONNECTION_CLOSE, &xprt->state);
 
684
        queue_work(rpciod_workqueue, &xprt->task_cleanup);
 
685
        return;
 
686
out_abort:
 
687
        spin_unlock(&xprt->transport_lock);
 
688
}
 
689
 
 
690
/**
 
691
 * xprt_connect - schedule a transport connect operation
 
692
 * @task: RPC task that is requesting the connect
 
693
 *
 
694
 */
 
695
void xprt_connect(struct rpc_task *task)
 
696
{
 
697
        struct rpc_xprt *xprt = task->tk_xprt;
 
698
 
 
699
        dprintk("RPC: %5u xprt_connect xprt %p %s connected\n", task->tk_pid,
 
700
                        xprt, (xprt_connected(xprt) ? "is" : "is not"));
 
701
 
 
702
        if (!xprt_bound(xprt)) {
 
703
                task->tk_status = -EAGAIN;
 
704
                return;
 
705
        }
 
706
        if (!xprt_lock_write(xprt, task))
 
707
                return;
 
708
 
 
709
        if (test_and_clear_bit(XPRT_CLOSE_WAIT, &xprt->state))
 
710
                xprt->ops->close(xprt);
 
711
 
 
712
        if (xprt_connected(xprt))
 
713
                xprt_release_write(xprt, task);
 
714
        else {
 
715
                if (task->tk_rqstp)
 
716
                        task->tk_rqstp->rq_bytes_sent = 0;
 
717
 
 
718
                task->tk_timeout = task->tk_rqstp->rq_timeout;
 
719
                rpc_sleep_on(&xprt->pending, task, xprt_connect_status);
 
720
 
 
721
                if (test_bit(XPRT_CLOSING, &xprt->state))
 
722
                        return;
 
723
                if (xprt_test_and_set_connecting(xprt))
 
724
                        return;
 
725
                xprt->stat.connect_start = jiffies;
 
726
                xprt->ops->connect(task);
 
727
        }
 
728
}
 
729
 
 
730
static void xprt_connect_status(struct rpc_task *task)
 
731
{
 
732
        struct rpc_xprt *xprt = task->tk_xprt;
 
733
 
 
734
        if (task->tk_status == 0) {
 
735
                xprt->stat.connect_count++;
 
736
                xprt->stat.connect_time += (long)jiffies - xprt->stat.connect_start;
 
737
                dprintk("RPC: %5u xprt_connect_status: connection established\n",
 
738
                                task->tk_pid);
 
739
                return;
 
740
        }
 
741
 
 
742
        switch (task->tk_status) {
 
743
        case -EAGAIN:
 
744
                dprintk("RPC: %5u xprt_connect_status: retrying\n", task->tk_pid);
 
745
                break;
 
746
        case -ETIMEDOUT:
 
747
                dprintk("RPC: %5u xprt_connect_status: connect attempt timed "
 
748
                                "out\n", task->tk_pid);
 
749
                break;
 
750
        default:
 
751
                dprintk("RPC: %5u xprt_connect_status: error %d connecting to "
 
752
                                "server %s\n", task->tk_pid, -task->tk_status,
 
753
                                task->tk_client->cl_server);
 
754
                xprt_release_write(xprt, task);
 
755
                task->tk_status = -EIO;
 
756
        }
 
757
}
 
758
 
 
759
/**
 
760
 * xprt_lookup_rqst - find an RPC request corresponding to an XID
 
761
 * @xprt: transport on which the original request was transmitted
 
762
 * @xid: RPC XID of incoming reply
 
763
 *
 
764
 */
 
765
struct rpc_rqst *xprt_lookup_rqst(struct rpc_xprt *xprt, __be32 xid)
 
766
{
 
767
        struct rpc_rqst *entry;
 
768
 
 
769
        list_for_each_entry(entry, &xprt->recv, rq_list)
 
770
                if (entry->rq_xid == xid)
 
771
                        return entry;
 
772
 
 
773
        dprintk("RPC:       xprt_lookup_rqst did not find xid %08x\n",
 
774
                        ntohl(xid));
 
775
        xprt->stat.bad_xids++;
 
776
        return NULL;
 
777
}
 
778
EXPORT_SYMBOL_GPL(xprt_lookup_rqst);
 
779
 
 
780
static void xprt_update_rtt(struct rpc_task *task)
 
781
{
 
782
        struct rpc_rqst *req = task->tk_rqstp;
 
783
        struct rpc_rtt *rtt = task->tk_client->cl_rtt;
 
784
        unsigned timer = task->tk_msg.rpc_proc->p_timer;
 
785
        long m = usecs_to_jiffies(ktime_to_us(req->rq_rtt));
 
786
 
 
787
        if (timer) {
 
788
                if (req->rq_ntrans == 1)
 
789
                        rpc_update_rtt(rtt, timer, m);
 
790
                rpc_set_timeo(rtt, timer, req->rq_ntrans - 1);
 
791
        }
 
792
}
 
793
 
 
794
/**
 
795
 * xprt_complete_rqst - called when reply processing is complete
 
796
 * @task: RPC request that recently completed
 
797
 * @copied: actual number of bytes received from the transport
 
798
 *
 
799
 * Caller holds transport lock.
 
800
 */
 
801
void xprt_complete_rqst(struct rpc_task *task, int copied)
 
802
{
 
803
        struct rpc_rqst *req = task->tk_rqstp;
 
804
        struct rpc_xprt *xprt = req->rq_xprt;
 
805
 
 
806
        dprintk("RPC: %5u xid %08x complete (%d bytes received)\n",
 
807
                        task->tk_pid, ntohl(req->rq_xid), copied);
 
808
 
 
809
        xprt->stat.recvs++;
 
810
        req->rq_rtt = ktime_sub(ktime_get(), req->rq_xtime);
 
811
        if (xprt->ops->timer != NULL)
 
812
                xprt_update_rtt(task);
 
813
 
 
814
        list_del_init(&req->rq_list);
 
815
        req->rq_private_buf.len = copied;
 
816
        /* Ensure all writes are done before we update */
 
817
        /* req->rq_reply_bytes_recvd */
 
818
        smp_wmb();
 
819
        req->rq_reply_bytes_recvd = copied;
 
820
        rpc_wake_up_queued_task(&xprt->pending, task);
 
821
}
 
822
EXPORT_SYMBOL_GPL(xprt_complete_rqst);
 
823
 
 
824
static void xprt_timer(struct rpc_task *task)
 
825
{
 
826
        struct rpc_rqst *req = task->tk_rqstp;
 
827
        struct rpc_xprt *xprt = req->rq_xprt;
 
828
 
 
829
        if (task->tk_status != -ETIMEDOUT)
 
830
                return;
 
831
        dprintk("RPC: %5u xprt_timer\n", task->tk_pid);
 
832
 
 
833
        spin_lock_bh(&xprt->transport_lock);
 
834
        if (!req->rq_reply_bytes_recvd) {
 
835
                if (xprt->ops->timer)
 
836
                        xprt->ops->timer(task);
 
837
        } else
 
838
                task->tk_status = 0;
 
839
        spin_unlock_bh(&xprt->transport_lock);
 
840
}
 
841
 
 
842
static inline int xprt_has_timer(struct rpc_xprt *xprt)
 
843
{
 
844
        return xprt->idle_timeout != 0;
 
845
}
 
846
 
 
847
/**
 
848
 * xprt_prepare_transmit - reserve the transport before sending a request
 
849
 * @task: RPC task about to send a request
 
850
 *
 
851
 */
 
852
int xprt_prepare_transmit(struct rpc_task *task)
 
853
{
 
854
        struct rpc_rqst *req = task->tk_rqstp;
 
855
        struct rpc_xprt *xprt = req->rq_xprt;
 
856
        int err = 0;
 
857
 
 
858
        dprintk("RPC: %5u xprt_prepare_transmit\n", task->tk_pid);
 
859
 
 
860
        spin_lock_bh(&xprt->transport_lock);
 
861
        if (req->rq_reply_bytes_recvd && !req->rq_bytes_sent) {
 
862
                err = req->rq_reply_bytes_recvd;
 
863
                goto out_unlock;
 
864
        }
 
865
        if (!xprt->ops->reserve_xprt(xprt, task))
 
866
                err = -EAGAIN;
 
867
out_unlock:
 
868
        spin_unlock_bh(&xprt->transport_lock);
 
869
        return err;
 
870
}
 
871
 
 
872
void xprt_end_transmit(struct rpc_task *task)
 
873
{
 
874
        xprt_release_write(task->tk_rqstp->rq_xprt, task);
 
875
}
 
876
 
 
877
/**
 
878
 * xprt_transmit - send an RPC request on a transport
 
879
 * @task: controlling RPC task
 
880
 *
 
881
 * We have to copy the iovec because sendmsg fiddles with its contents.
 
882
 */
 
883
void xprt_transmit(struct rpc_task *task)
 
884
{
 
885
        struct rpc_rqst *req = task->tk_rqstp;
 
886
        struct rpc_xprt *xprt = req->rq_xprt;
 
887
        int status;
 
888
 
 
889
        dprintk("RPC: %5u xprt_transmit(%u)\n", task->tk_pid, req->rq_slen);
 
890
 
 
891
        if (!req->rq_reply_bytes_recvd) {
 
892
                if (list_empty(&req->rq_list) && rpc_reply_expected(task)) {
 
893
                        /*
 
894
                         * Add to the list only if we're expecting a reply
 
895
                         */
 
896
                        spin_lock_bh(&xprt->transport_lock);
 
897
                        /* Update the softirq receive buffer */
 
898
                        memcpy(&req->rq_private_buf, &req->rq_rcv_buf,
 
899
                                        sizeof(req->rq_private_buf));
 
900
                        /* Add request to the receive list */
 
901
                        list_add_tail(&req->rq_list, &xprt->recv);
 
902
                        spin_unlock_bh(&xprt->transport_lock);
 
903
                        xprt_reset_majortimeo(req);
 
904
                        /* Turn off autodisconnect */
 
905
                        del_singleshot_timer_sync(&xprt->timer);
 
906
                }
 
907
        } else if (!req->rq_bytes_sent)
 
908
                return;
 
909
 
 
910
        req->rq_connect_cookie = xprt->connect_cookie;
 
911
        req->rq_xtime = ktime_get();
 
912
        status = xprt->ops->send_request(task);
 
913
        if (status != 0) {
 
914
                task->tk_status = status;
 
915
                return;
 
916
        }
 
917
 
 
918
        dprintk("RPC: %5u xmit complete\n", task->tk_pid);
 
919
        task->tk_flags |= RPC_TASK_SENT;
 
920
        spin_lock_bh(&xprt->transport_lock);
 
921
 
 
922
        xprt->ops->set_retrans_timeout(task);
 
923
 
 
924
        xprt->stat.sends++;
 
925
        xprt->stat.req_u += xprt->stat.sends - xprt->stat.recvs;
 
926
        xprt->stat.bklog_u += xprt->backlog.qlen;
 
927
 
 
928
        /* Don't race with disconnect */
 
929
        if (!xprt_connected(xprt))
 
930
                task->tk_status = -ENOTCONN;
 
931
        else if (!req->rq_reply_bytes_recvd && rpc_reply_expected(task)) {
 
932
                /*
 
933
                 * Sleep on the pending queue since
 
934
                 * we're expecting a reply.
 
935
                 */
 
936
                rpc_sleep_on(&xprt->pending, task, xprt_timer);
 
937
        }
 
938
        spin_unlock_bh(&xprt->transport_lock);
 
939
}
 
940
 
 
941
static struct rpc_rqst *xprt_dynamic_alloc_slot(struct rpc_xprt *xprt, gfp_t gfp_flags)
 
942
{
 
943
        struct rpc_rqst *req = ERR_PTR(-EAGAIN);
 
944
 
 
945
        if (!atomic_add_unless(&xprt->num_reqs, 1, xprt->max_reqs))
 
946
                goto out;
 
947
        req = kzalloc(sizeof(struct rpc_rqst), gfp_flags);
 
948
        if (req != NULL)
 
949
                goto out;
 
950
        atomic_dec(&xprt->num_reqs);
 
951
        req = ERR_PTR(-ENOMEM);
 
952
out:
 
953
        return req;
 
954
}
 
955
 
 
956
static bool xprt_dynamic_free_slot(struct rpc_xprt *xprt, struct rpc_rqst *req)
 
957
{
 
958
        if (atomic_add_unless(&xprt->num_reqs, -1, xprt->min_reqs)) {
 
959
                kfree(req);
 
960
                return true;
 
961
        }
 
962
        return false;
 
963
}
 
964
 
 
965
static void xprt_alloc_slot(struct rpc_task *task)
 
966
{
 
967
        struct rpc_xprt *xprt = task->tk_xprt;
 
968
        struct rpc_rqst *req;
 
969
 
 
970
        if (!list_empty(&xprt->free)) {
 
971
                req = list_entry(xprt->free.next, struct rpc_rqst, rq_list);
 
972
                list_del(&req->rq_list);
 
973
                goto out_init_req;
 
974
        }
 
975
        req = xprt_dynamic_alloc_slot(xprt, GFP_NOWAIT);
 
976
        if (!IS_ERR(req))
 
977
                goto out_init_req;
 
978
        switch (PTR_ERR(req)) {
 
979
        case -ENOMEM:
 
980
                rpc_delay(task, HZ >> 2);
 
981
                dprintk("RPC:       dynamic allocation of request slot "
 
982
                                "failed! Retrying\n");
 
983
                break;
 
984
        case -EAGAIN:
 
985
                rpc_sleep_on(&xprt->backlog, task, NULL);
 
986
                dprintk("RPC:       waiting for request slot\n");
 
987
        }
 
988
        task->tk_status = -EAGAIN;
 
989
        return;
 
990
out_init_req:
 
991
        task->tk_status = 0;
 
992
        task->tk_rqstp = req;
 
993
        xprt_request_init(task, xprt);
 
994
}
 
995
 
 
996
static void xprt_free_slot(struct rpc_xprt *xprt, struct rpc_rqst *req)
 
997
{
 
998
        spin_lock(&xprt->reserve_lock);
 
999
        if (!xprt_dynamic_free_slot(xprt, req)) {
 
1000
                memset(req, 0, sizeof(*req));   /* mark unused */
 
1001
                list_add(&req->rq_list, &xprt->free);
 
1002
        }
 
1003
        rpc_wake_up_next(&xprt->backlog);
 
1004
        spin_unlock(&xprt->reserve_lock);
 
1005
}
 
1006
 
 
1007
static void xprt_free_all_slots(struct rpc_xprt *xprt)
 
1008
{
 
1009
        struct rpc_rqst *req;
 
1010
        while (!list_empty(&xprt->free)) {
 
1011
                req = list_first_entry(&xprt->free, struct rpc_rqst, rq_list);
 
1012
                list_del(&req->rq_list);
 
1013
                kfree(req);
 
1014
        }
 
1015
}
 
1016
 
 
1017
struct rpc_xprt *xprt_alloc(struct net *net, size_t size,
 
1018
                unsigned int num_prealloc,
 
1019
                unsigned int max_alloc)
 
1020
{
 
1021
        struct rpc_xprt *xprt;
 
1022
        struct rpc_rqst *req;
 
1023
        int i;
 
1024
 
 
1025
        xprt = kzalloc(size, GFP_KERNEL);
 
1026
        if (xprt == NULL)
 
1027
                goto out;
 
1028
 
 
1029
        xprt_init(xprt, net);
 
1030
 
 
1031
        for (i = 0; i < num_prealloc; i++) {
 
1032
                req = kzalloc(sizeof(struct rpc_rqst), GFP_KERNEL);
 
1033
                if (!req)
 
1034
                        break;
 
1035
                list_add(&req->rq_list, &xprt->free);
 
1036
        }
 
1037
        if (i < num_prealloc)
 
1038
                goto out_free;
 
1039
        if (max_alloc > num_prealloc)
 
1040
                xprt->max_reqs = max_alloc;
 
1041
        else
 
1042
                xprt->max_reqs = num_prealloc;
 
1043
        xprt->min_reqs = num_prealloc;
 
1044
        atomic_set(&xprt->num_reqs, num_prealloc);
 
1045
 
 
1046
        return xprt;
 
1047
 
 
1048
out_free:
 
1049
        xprt_free(xprt);
 
1050
out:
 
1051
        return NULL;
 
1052
}
 
1053
EXPORT_SYMBOL_GPL(xprt_alloc);
 
1054
 
 
1055
void xprt_free(struct rpc_xprt *xprt)
 
1056
{
 
1057
        put_net(xprt->xprt_net);
 
1058
        xprt_free_all_slots(xprt);
 
1059
        kfree(xprt);
 
1060
}
 
1061
EXPORT_SYMBOL_GPL(xprt_free);
 
1062
 
 
1063
/**
 
1064
 * xprt_reserve - allocate an RPC request slot
 
1065
 * @task: RPC task requesting a slot allocation
 
1066
 *
 
1067
 * If no more slots are available, place the task on the transport's
 
1068
 * backlog queue.
 
1069
 */
 
1070
void xprt_reserve(struct rpc_task *task)
 
1071
{
 
1072
        struct rpc_xprt *xprt = task->tk_xprt;
 
1073
 
 
1074
        task->tk_status = 0;
 
1075
        if (task->tk_rqstp != NULL)
 
1076
                return;
 
1077
 
 
1078
        /* Note: grabbing the xprt_lock_write() here is not strictly needed,
 
1079
         * but ensures that we throttle new slot allocation if the transport
 
1080
         * is congested (e.g. if reconnecting or if we're out of socket
 
1081
         * write buffer space).
 
1082
         */
 
1083
        task->tk_timeout = 0;
 
1084
        task->tk_status = -EAGAIN;
 
1085
        if (!xprt_lock_write(xprt, task))
 
1086
                return;
 
1087
 
 
1088
        spin_lock(&xprt->reserve_lock);
 
1089
        xprt_alloc_slot(task);
 
1090
        spin_unlock(&xprt->reserve_lock);
 
1091
        xprt_release_write(xprt, task);
 
1092
}
 
1093
 
 
1094
static inline __be32 xprt_alloc_xid(struct rpc_xprt *xprt)
 
1095
{
 
1096
        return (__force __be32)xprt->xid++;
 
1097
}
 
1098
 
 
1099
static inline void xprt_init_xid(struct rpc_xprt *xprt)
 
1100
{
 
1101
        xprt->xid = net_random();
 
1102
}
 
1103
 
 
1104
static void xprt_request_init(struct rpc_task *task, struct rpc_xprt *xprt)
 
1105
{
 
1106
        struct rpc_rqst *req = task->tk_rqstp;
 
1107
 
 
1108
        INIT_LIST_HEAD(&req->rq_list);
 
1109
        req->rq_timeout = task->tk_client->cl_timeout->to_initval;
 
1110
        req->rq_task    = task;
 
1111
        req->rq_xprt    = xprt;
 
1112
        req->rq_buffer  = NULL;
 
1113
        req->rq_xid     = xprt_alloc_xid(xprt);
 
1114
        req->rq_release_snd_buf = NULL;
 
1115
        xprt_reset_majortimeo(req);
 
1116
        dprintk("RPC: %5u reserved req %p xid %08x\n", task->tk_pid,
 
1117
                        req, ntohl(req->rq_xid));
 
1118
}
 
1119
 
 
1120
/**
 
1121
 * xprt_release - release an RPC request slot
 
1122
 * @task: task which is finished with the slot
 
1123
 *
 
1124
 */
 
1125
void xprt_release(struct rpc_task *task)
 
1126
{
 
1127
        struct rpc_xprt *xprt;
 
1128
        struct rpc_rqst *req;
 
1129
 
 
1130
        if (!(req = task->tk_rqstp))
 
1131
                return;
 
1132
 
 
1133
        xprt = req->rq_xprt;
 
1134
        rpc_count_iostats(task);
 
1135
        spin_lock_bh(&xprt->transport_lock);
 
1136
        xprt->ops->release_xprt(xprt, task);
 
1137
        if (xprt->ops->release_request)
 
1138
                xprt->ops->release_request(task);
 
1139
        if (!list_empty(&req->rq_list))
 
1140
                list_del(&req->rq_list);
 
1141
        xprt->last_used = jiffies;
 
1142
        if (list_empty(&xprt->recv) && xprt_has_timer(xprt))
 
1143
                mod_timer(&xprt->timer,
 
1144
                                xprt->last_used + xprt->idle_timeout);
 
1145
        spin_unlock_bh(&xprt->transport_lock);
 
1146
        if (req->rq_buffer)
 
1147
                xprt->ops->buf_free(req->rq_buffer);
 
1148
        if (req->rq_cred != NULL)
 
1149
                put_rpccred(req->rq_cred);
 
1150
        task->tk_rqstp = NULL;
 
1151
        if (req->rq_release_snd_buf)
 
1152
                req->rq_release_snd_buf(req);
 
1153
 
 
1154
        dprintk("RPC: %5u release request %p\n", task->tk_pid, req);
 
1155
        if (likely(!bc_prealloc(req)))
 
1156
                xprt_free_slot(xprt, req);
 
1157
        else
 
1158
                xprt_free_bc_request(req);
 
1159
}
 
1160
 
 
1161
static void xprt_init(struct rpc_xprt *xprt, struct net *net)
 
1162
{
 
1163
        atomic_set(&xprt->count, 1);
 
1164
 
 
1165
        spin_lock_init(&xprt->transport_lock);
 
1166
        spin_lock_init(&xprt->reserve_lock);
 
1167
 
 
1168
        INIT_LIST_HEAD(&xprt->free);
 
1169
        INIT_LIST_HEAD(&xprt->recv);
 
1170
#if defined(CONFIG_SUNRPC_BACKCHANNEL)
 
1171
        spin_lock_init(&xprt->bc_pa_lock);
 
1172
        INIT_LIST_HEAD(&xprt->bc_pa_list);
 
1173
#endif /* CONFIG_SUNRPC_BACKCHANNEL */
 
1174
 
 
1175
        xprt->last_used = jiffies;
 
1176
        xprt->cwnd = RPC_INITCWND;
 
1177
        xprt->bind_index = 0;
 
1178
 
 
1179
        rpc_init_wait_queue(&xprt->binding, "xprt_binding");
 
1180
        rpc_init_wait_queue(&xprt->pending, "xprt_pending");
 
1181
        rpc_init_priority_wait_queue(&xprt->sending, "xprt_sending");
 
1182
        rpc_init_priority_wait_queue(&xprt->backlog, "xprt_backlog");
 
1183
 
 
1184
        xprt_init_xid(xprt);
 
1185
 
 
1186
        xprt->xprt_net = get_net(net);
 
1187
}
 
1188
 
 
1189
/**
 
1190
 * xprt_create_transport - create an RPC transport
 
1191
 * @args: rpc transport creation arguments
 
1192
 *
 
1193
 */
 
1194
struct rpc_xprt *xprt_create_transport(struct xprt_create *args)
 
1195
{
 
1196
        struct rpc_xprt *xprt;
 
1197
        struct xprt_class *t;
 
1198
 
 
1199
        spin_lock(&xprt_list_lock);
 
1200
        list_for_each_entry(t, &xprt_list, list) {
 
1201
                if (t->ident == args->ident) {
 
1202
                        spin_unlock(&xprt_list_lock);
 
1203
                        goto found;
 
1204
                }
 
1205
        }
 
1206
        spin_unlock(&xprt_list_lock);
 
1207
        printk(KERN_ERR "RPC: transport (%d) not supported\n", args->ident);
 
1208
        return ERR_PTR(-EIO);
 
1209
 
 
1210
found:
 
1211
        xprt = t->setup(args);
 
1212
        if (IS_ERR(xprt)) {
 
1213
                dprintk("RPC:       xprt_create_transport: failed, %ld\n",
 
1214
                                -PTR_ERR(xprt));
 
1215
                goto out;
 
1216
        }
 
1217
        INIT_WORK(&xprt->task_cleanup, xprt_autoclose);
 
1218
        if (xprt_has_timer(xprt))
 
1219
                setup_timer(&xprt->timer, xprt_init_autodisconnect,
 
1220
                            (unsigned long)xprt);
 
1221
        else
 
1222
                init_timer(&xprt->timer);
 
1223
        dprintk("RPC:       created transport %p with %u slots\n", xprt,
 
1224
                        xprt->max_reqs);
 
1225
out:
 
1226
        return xprt;
 
1227
}
 
1228
 
 
1229
/**
 
1230
 * xprt_destroy - destroy an RPC transport, killing off all requests.
 
1231
 * @xprt: transport to destroy
 
1232
 *
 
1233
 */
 
1234
static void xprt_destroy(struct rpc_xprt *xprt)
 
1235
{
 
1236
        dprintk("RPC:       destroying transport %p\n", xprt);
 
1237
        xprt->shutdown = 1;
 
1238
        del_timer_sync(&xprt->timer);
 
1239
 
 
1240
        rpc_destroy_wait_queue(&xprt->binding);
 
1241
        rpc_destroy_wait_queue(&xprt->pending);
 
1242
        rpc_destroy_wait_queue(&xprt->sending);
 
1243
        rpc_destroy_wait_queue(&xprt->backlog);
 
1244
        cancel_work_sync(&xprt->task_cleanup);
 
1245
        /*
 
1246
         * Tear down transport state and free the rpc_xprt
 
1247
         */
 
1248
        xprt->ops->destroy(xprt);
 
1249
}
 
1250
 
 
1251
/**
 
1252
 * xprt_put - release a reference to an RPC transport.
 
1253
 * @xprt: pointer to the transport
 
1254
 *
 
1255
 */
 
1256
void xprt_put(struct rpc_xprt *xprt)
 
1257
{
 
1258
        if (atomic_dec_and_test(&xprt->count))
 
1259
                xprt_destroy(xprt);
 
1260
}
 
1261
 
 
1262
/**
 
1263
 * xprt_get - return a reference to an RPC transport.
 
1264
 * @xprt: pointer to the transport
 
1265
 *
 
1266
 */
 
1267
struct rpc_xprt *xprt_get(struct rpc_xprt *xprt)
 
1268
{
 
1269
        if (atomic_inc_not_zero(&xprt->count))
 
1270
                return xprt;
 
1271
        return NULL;
 
1272
}