2
* linux/net/sunrpc/xprt.c
4
* This is a generic RPC call interface supporting congestion avoidance,
5
* and asynchronous calls.
7
* The interface works like this:
9
* - When a process places a call, it allocates a request slot if
10
* one is available. Otherwise, it sleeps on the backlog queue
12
* - Next, the caller puts together the RPC message, stuffs it into
13
* the request struct, and calls xprt_transmit().
14
* - xprt_transmit sends the message and installs the caller on the
15
* transport's wait list. At the same time, if a reply is expected,
16
* it installs a timer that is run after the packet's timeout has
18
* - When a packet arrives, the data_ready handler walks the list of
19
* pending requests for that transport. If a matching XID is found, the
20
* caller is woken up, and the timer removed.
21
* - When no reply arrives within the timeout interval, the timer is
22
* fired by the kernel and runs xprt_timer(). It either adjusts the
23
* timeout values (minor timeout) or wakes up the caller with a status
25
* - When the caller receives a notification from RPC that a reply arrived,
26
* it should release the RPC slot, and process the reply.
27
* If the call timed out, it may choose to retry the operation by
28
* adjusting the initial timeout value, and simply calling rpc_call
31
* Support for async RPC is done through a set of RPC-specific scheduling
32
* primitives that `transparently' work for processes as well as async
33
* tasks that rely on callbacks.
35
* Copyright (C) 1995-1997, Olaf Kirch <okir@monad.swb.de>
37
* Transport switch API copyright (C) 2005, Chuck Lever <cel@netapp.com>
40
#include <linux/module.h>
42
#include <linux/types.h>
43
#include <linux/interrupt.h>
44
#include <linux/workqueue.h>
45
#include <linux/net.h>
46
#include <linux/ktime.h>
48
#include <linux/sunrpc/clnt.h>
49
#include <linux/sunrpc/metrics.h>
50
#include <linux/sunrpc/bc_xprt.h>
59
# define RPCDBG_FACILITY RPCDBG_XPRT
65
static void xprt_init(struct rpc_xprt *xprt, struct net *net);
66
static void xprt_request_init(struct rpc_task *, struct rpc_xprt *);
67
static void xprt_connect_status(struct rpc_task *task);
68
static int __xprt_get_cong(struct rpc_xprt *, struct rpc_task *);
70
static DEFINE_SPINLOCK(xprt_list_lock);
71
static LIST_HEAD(xprt_list);
74
* The transport code maintains an estimate on the maximum number of out-
75
* standing RPC requests, using a smoothed version of the congestion
76
* avoidance implemented in 44BSD. This is basically the Van Jacobson
77
* congestion algorithm: If a retransmit occurs, the congestion window is
78
* halved; otherwise, it is incremented by 1/cwnd when
80
* - a reply is received and
81
* - a full number of requests are outstanding and
82
* - the congestion window hasn't been updated recently.
84
#define RPC_CWNDSHIFT (8U)
85
#define RPC_CWNDSCALE (1U << RPC_CWNDSHIFT)
86
#define RPC_INITCWND RPC_CWNDSCALE
87
#define RPC_MAXCWND(xprt) ((xprt)->max_reqs << RPC_CWNDSHIFT)
89
#define RPCXPRT_CONGESTED(xprt) ((xprt)->cong >= (xprt)->cwnd)
92
* xprt_register_transport - register a transport implementation
93
* @transport: transport to register
95
* If a transport implementation is loaded as a kernel module, it can
96
* call this interface to make itself known to the RPC client.
99
* 0: transport successfully registered
100
* -EEXIST: transport already registered
101
* -EINVAL: transport module being unloaded
103
int xprt_register_transport(struct xprt_class *transport)
105
struct xprt_class *t;
109
spin_lock(&xprt_list_lock);
110
list_for_each_entry(t, &xprt_list, list) {
111
/* don't register the same transport class twice */
112
if (t->ident == transport->ident)
116
list_add_tail(&transport->list, &xprt_list);
117
printk(KERN_INFO "RPC: Registered %s transport module.\n",
122
spin_unlock(&xprt_list_lock);
125
EXPORT_SYMBOL_GPL(xprt_register_transport);
128
* xprt_unregister_transport - unregister a transport implementation
129
* @transport: transport to unregister
132
* 0: transport successfully unregistered
133
* -ENOENT: transport never registered
135
int xprt_unregister_transport(struct xprt_class *transport)
137
struct xprt_class *t;
141
spin_lock(&xprt_list_lock);
142
list_for_each_entry(t, &xprt_list, list) {
143
if (t == transport) {
145
"RPC: Unregistered %s transport module.\n",
147
list_del_init(&transport->list);
154
spin_unlock(&xprt_list_lock);
157
EXPORT_SYMBOL_GPL(xprt_unregister_transport);
160
* xprt_load_transport - load a transport implementation
161
* @transport_name: transport to load
164
* 0: transport successfully loaded
165
* -ENOENT: transport module not available
167
int xprt_load_transport(const char *transport_name)
169
struct xprt_class *t;
173
spin_lock(&xprt_list_lock);
174
list_for_each_entry(t, &xprt_list, list) {
175
if (strcmp(t->name, transport_name) == 0) {
176
spin_unlock(&xprt_list_lock);
180
spin_unlock(&xprt_list_lock);
181
result = request_module("xprt%s", transport_name);
185
EXPORT_SYMBOL_GPL(xprt_load_transport);
188
* xprt_reserve_xprt - serialize write access to transports
189
* @task: task that is requesting access to the transport
190
* @xprt: pointer to the target transport
192
* This prevents mixing the payload of separate requests, and prevents
193
* transport connects from colliding with writes. No congestion control
196
int xprt_reserve_xprt(struct rpc_xprt *xprt, struct rpc_task *task)
198
struct rpc_rqst *req = task->tk_rqstp;
201
if (test_and_set_bit(XPRT_LOCKED, &xprt->state)) {
202
if (task == xprt->snd_task)
206
xprt->snd_task = task;
208
req->rq_bytes_sent = 0;
215
dprintk("RPC: %5u failed to lock transport %p\n",
217
task->tk_timeout = 0;
218
task->tk_status = -EAGAIN;
220
priority = RPC_PRIORITY_LOW;
221
else if (!req->rq_ntrans)
222
priority = RPC_PRIORITY_NORMAL;
224
priority = RPC_PRIORITY_HIGH;
225
rpc_sleep_on_priority(&xprt->sending, task, NULL, priority);
228
EXPORT_SYMBOL_GPL(xprt_reserve_xprt);
230
static void xprt_clear_locked(struct rpc_xprt *xprt)
232
xprt->snd_task = NULL;
233
if (!test_bit(XPRT_CLOSE_WAIT, &xprt->state) || xprt->shutdown) {
234
smp_mb__before_clear_bit();
235
clear_bit(XPRT_LOCKED, &xprt->state);
236
smp_mb__after_clear_bit();
238
queue_work(rpciod_workqueue, &xprt->task_cleanup);
242
* xprt_reserve_xprt_cong - serialize write access to transports
243
* @task: task that is requesting access to the transport
245
* Same as xprt_reserve_xprt, but Van Jacobson congestion control is
246
* integrated into the decision of whether a request is allowed to be
247
* woken up and given access to the transport.
249
int xprt_reserve_xprt_cong(struct rpc_xprt *xprt, struct rpc_task *task)
251
struct rpc_rqst *req = task->tk_rqstp;
254
if (test_and_set_bit(XPRT_LOCKED, &xprt->state)) {
255
if (task == xprt->snd_task)
260
xprt->snd_task = task;
263
if (__xprt_get_cong(xprt, task)) {
264
xprt->snd_task = task;
265
req->rq_bytes_sent = 0;
269
xprt_clear_locked(xprt);
271
dprintk("RPC: %5u failed to lock transport %p\n", task->tk_pid, xprt);
272
task->tk_timeout = 0;
273
task->tk_status = -EAGAIN;
275
priority = RPC_PRIORITY_LOW;
276
else if (!req->rq_ntrans)
277
priority = RPC_PRIORITY_NORMAL;
279
priority = RPC_PRIORITY_HIGH;
280
rpc_sleep_on_priority(&xprt->sending, task, NULL, priority);
283
EXPORT_SYMBOL_GPL(xprt_reserve_xprt_cong);
285
static inline int xprt_lock_write(struct rpc_xprt *xprt, struct rpc_task *task)
289
spin_lock_bh(&xprt->transport_lock);
290
retval = xprt->ops->reserve_xprt(xprt, task);
291
spin_unlock_bh(&xprt->transport_lock);
295
static void __xprt_lock_write_next(struct rpc_xprt *xprt)
297
struct rpc_task *task;
298
struct rpc_rqst *req;
300
if (test_and_set_bit(XPRT_LOCKED, &xprt->state))
303
task = rpc_wake_up_next(&xprt->sending);
307
req = task->tk_rqstp;
308
xprt->snd_task = task;
310
req->rq_bytes_sent = 0;
316
xprt_clear_locked(xprt);
319
static void __xprt_lock_write_next_cong(struct rpc_xprt *xprt)
321
struct rpc_task *task;
322
struct rpc_rqst *req;
324
if (test_and_set_bit(XPRT_LOCKED, &xprt->state))
326
if (RPCXPRT_CONGESTED(xprt))
328
task = rpc_wake_up_next(&xprt->sending);
332
req = task->tk_rqstp;
334
xprt->snd_task = task;
337
if (__xprt_get_cong(xprt, task)) {
338
xprt->snd_task = task;
339
req->rq_bytes_sent = 0;
344
xprt_clear_locked(xprt);
348
* xprt_release_xprt - allow other requests to use a transport
349
* @xprt: transport with other tasks potentially waiting
350
* @task: task that is releasing access to the transport
352
* Note that "task" can be NULL. No congestion control is provided.
354
void xprt_release_xprt(struct rpc_xprt *xprt, struct rpc_task *task)
356
if (xprt->snd_task == task) {
357
xprt_clear_locked(xprt);
358
__xprt_lock_write_next(xprt);
361
EXPORT_SYMBOL_GPL(xprt_release_xprt);
364
* xprt_release_xprt_cong - allow other requests to use a transport
365
* @xprt: transport with other tasks potentially waiting
366
* @task: task that is releasing access to the transport
368
* Note that "task" can be NULL. Another task is awoken to use the
369
* transport if the transport's congestion window allows it.
371
void xprt_release_xprt_cong(struct rpc_xprt *xprt, struct rpc_task *task)
373
if (xprt->snd_task == task) {
374
xprt_clear_locked(xprt);
375
__xprt_lock_write_next_cong(xprt);
378
EXPORT_SYMBOL_GPL(xprt_release_xprt_cong);
380
static inline void xprt_release_write(struct rpc_xprt *xprt, struct rpc_task *task)
382
spin_lock_bh(&xprt->transport_lock);
383
xprt->ops->release_xprt(xprt, task);
384
spin_unlock_bh(&xprt->transport_lock);
388
* Van Jacobson congestion avoidance. Check if the congestion window
389
* overflowed. Put the task to sleep if this is the case.
392
__xprt_get_cong(struct rpc_xprt *xprt, struct rpc_task *task)
394
struct rpc_rqst *req = task->tk_rqstp;
398
dprintk("RPC: %5u xprt_cwnd_limited cong = %lu cwnd = %lu\n",
399
task->tk_pid, xprt->cong, xprt->cwnd);
400
if (RPCXPRT_CONGESTED(xprt))
403
xprt->cong += RPC_CWNDSCALE;
408
* Adjust the congestion window, and wake up the next task
409
* that has been sleeping due to congestion
412
__xprt_put_cong(struct rpc_xprt *xprt, struct rpc_rqst *req)
417
xprt->cong -= RPC_CWNDSCALE;
418
__xprt_lock_write_next_cong(xprt);
422
* xprt_release_rqst_cong - housekeeping when request is complete
423
* @task: RPC request that recently completed
425
* Useful for transports that require congestion control.
427
void xprt_release_rqst_cong(struct rpc_task *task)
429
__xprt_put_cong(task->tk_xprt, task->tk_rqstp);
431
EXPORT_SYMBOL_GPL(xprt_release_rqst_cong);
434
* xprt_adjust_cwnd - adjust transport congestion window
435
* @task: recently completed RPC request used to adjust window
436
* @result: result code of completed RPC request
438
* We use a time-smoothed congestion estimator to avoid heavy oscillation.
440
void xprt_adjust_cwnd(struct rpc_task *task, int result)
442
struct rpc_rqst *req = task->tk_rqstp;
443
struct rpc_xprt *xprt = task->tk_xprt;
444
unsigned long cwnd = xprt->cwnd;
446
if (result >= 0 && cwnd <= xprt->cong) {
447
/* The (cwnd >> 1) term makes sure
448
* the result gets rounded properly. */
449
cwnd += (RPC_CWNDSCALE * RPC_CWNDSCALE + (cwnd >> 1)) / cwnd;
450
if (cwnd > RPC_MAXCWND(xprt))
451
cwnd = RPC_MAXCWND(xprt);
452
__xprt_lock_write_next_cong(xprt);
453
} else if (result == -ETIMEDOUT) {
455
if (cwnd < RPC_CWNDSCALE)
456
cwnd = RPC_CWNDSCALE;
458
dprintk("RPC: cong %ld, cwnd was %ld, now %ld\n",
459
xprt->cong, xprt->cwnd, cwnd);
461
__xprt_put_cong(xprt, req);
463
EXPORT_SYMBOL_GPL(xprt_adjust_cwnd);
466
* xprt_wake_pending_tasks - wake all tasks on a transport's pending queue
467
* @xprt: transport with waiting tasks
468
* @status: result code to plant in each task before waking it
471
void xprt_wake_pending_tasks(struct rpc_xprt *xprt, int status)
474
rpc_wake_up_status(&xprt->pending, status);
476
rpc_wake_up(&xprt->pending);
478
EXPORT_SYMBOL_GPL(xprt_wake_pending_tasks);
481
* xprt_wait_for_buffer_space - wait for transport output buffer to clear
482
* @task: task to be put to sleep
483
* @action: function pointer to be executed after wait
485
void xprt_wait_for_buffer_space(struct rpc_task *task, rpc_action action)
487
struct rpc_rqst *req = task->tk_rqstp;
488
struct rpc_xprt *xprt = req->rq_xprt;
490
task->tk_timeout = req->rq_timeout;
491
rpc_sleep_on(&xprt->pending, task, action);
493
EXPORT_SYMBOL_GPL(xprt_wait_for_buffer_space);
496
* xprt_write_space - wake the task waiting for transport output buffer space
497
* @xprt: transport with waiting tasks
499
* Can be called in a soft IRQ context, so xprt_write_space never sleeps.
501
void xprt_write_space(struct rpc_xprt *xprt)
503
if (unlikely(xprt->shutdown))
506
spin_lock_bh(&xprt->transport_lock);
507
if (xprt->snd_task) {
508
dprintk("RPC: write space: waking waiting task on "
510
rpc_wake_up_queued_task(&xprt->pending, xprt->snd_task);
512
spin_unlock_bh(&xprt->transport_lock);
514
EXPORT_SYMBOL_GPL(xprt_write_space);
517
* xprt_set_retrans_timeout_def - set a request's retransmit timeout
518
* @task: task whose timeout is to be set
520
* Set a request's retransmit timeout based on the transport's
521
* default timeout parameters. Used by transports that don't adjust
522
* the retransmit timeout based on round-trip time estimation.
524
void xprt_set_retrans_timeout_def(struct rpc_task *task)
526
task->tk_timeout = task->tk_rqstp->rq_timeout;
528
EXPORT_SYMBOL_GPL(xprt_set_retrans_timeout_def);
531
* xprt_set_retrans_timeout_rtt - set a request's retransmit timeout
532
* @task: task whose timeout is to be set
534
* Set a request's retransmit timeout using the RTT estimator.
536
void xprt_set_retrans_timeout_rtt(struct rpc_task *task)
538
int timer = task->tk_msg.rpc_proc->p_timer;
539
struct rpc_clnt *clnt = task->tk_client;
540
struct rpc_rtt *rtt = clnt->cl_rtt;
541
struct rpc_rqst *req = task->tk_rqstp;
542
unsigned long max_timeout = clnt->cl_timeout->to_maxval;
544
task->tk_timeout = rpc_calc_rto(rtt, timer);
545
task->tk_timeout <<= rpc_ntimeo(rtt, timer) + req->rq_retries;
546
if (task->tk_timeout > max_timeout || task->tk_timeout == 0)
547
task->tk_timeout = max_timeout;
549
EXPORT_SYMBOL_GPL(xprt_set_retrans_timeout_rtt);
551
static void xprt_reset_majortimeo(struct rpc_rqst *req)
553
const struct rpc_timeout *to = req->rq_task->tk_client->cl_timeout;
555
req->rq_majortimeo = req->rq_timeout;
556
if (to->to_exponential)
557
req->rq_majortimeo <<= to->to_retries;
559
req->rq_majortimeo += to->to_increment * to->to_retries;
560
if (req->rq_majortimeo > to->to_maxval || req->rq_majortimeo == 0)
561
req->rq_majortimeo = to->to_maxval;
562
req->rq_majortimeo += jiffies;
566
* xprt_adjust_timeout - adjust timeout values for next retransmit
567
* @req: RPC request containing parameters to use for the adjustment
570
int xprt_adjust_timeout(struct rpc_rqst *req)
572
struct rpc_xprt *xprt = req->rq_xprt;
573
const struct rpc_timeout *to = req->rq_task->tk_client->cl_timeout;
576
if (time_before(jiffies, req->rq_majortimeo)) {
577
if (to->to_exponential)
578
req->rq_timeout <<= 1;
580
req->rq_timeout += to->to_increment;
581
if (to->to_maxval && req->rq_timeout >= to->to_maxval)
582
req->rq_timeout = to->to_maxval;
585
req->rq_timeout = to->to_initval;
587
xprt_reset_majortimeo(req);
588
/* Reset the RTT counters == "slow start" */
589
spin_lock_bh(&xprt->transport_lock);
590
rpc_init_rtt(req->rq_task->tk_client->cl_rtt, to->to_initval);
591
spin_unlock_bh(&xprt->transport_lock);
595
if (req->rq_timeout == 0) {
596
printk(KERN_WARNING "xprt_adjust_timeout: rq_timeout = 0!\n");
597
req->rq_timeout = 5 * HZ;
602
static void xprt_autoclose(struct work_struct *work)
604
struct rpc_xprt *xprt =
605
container_of(work, struct rpc_xprt, task_cleanup);
607
xprt->ops->close(xprt);
608
clear_bit(XPRT_CLOSE_WAIT, &xprt->state);
609
xprt_release_write(xprt, NULL);
613
* xprt_disconnect_done - mark a transport as disconnected
614
* @xprt: transport to flag for disconnect
617
void xprt_disconnect_done(struct rpc_xprt *xprt)
619
dprintk("RPC: disconnected transport %p\n", xprt);
620
spin_lock_bh(&xprt->transport_lock);
621
xprt_clear_connected(xprt);
622
xprt_wake_pending_tasks(xprt, -EAGAIN);
623
spin_unlock_bh(&xprt->transport_lock);
625
EXPORT_SYMBOL_GPL(xprt_disconnect_done);
628
* xprt_force_disconnect - force a transport to disconnect
629
* @xprt: transport to disconnect
632
void xprt_force_disconnect(struct rpc_xprt *xprt)
634
/* Don't race with the test_bit() in xprt_clear_locked() */
635
spin_lock_bh(&xprt->transport_lock);
636
set_bit(XPRT_CLOSE_WAIT, &xprt->state);
637
/* Try to schedule an autoclose RPC call */
638
if (test_and_set_bit(XPRT_LOCKED, &xprt->state) == 0)
639
queue_work(rpciod_workqueue, &xprt->task_cleanup);
640
xprt_wake_pending_tasks(xprt, -EAGAIN);
641
spin_unlock_bh(&xprt->transport_lock);
645
* xprt_conditional_disconnect - force a transport to disconnect
646
* @xprt: transport to disconnect
647
* @cookie: 'connection cookie'
649
* This attempts to break the connection if and only if 'cookie' matches
650
* the current transport 'connection cookie'. It ensures that we don't
651
* try to break the connection more than once when we need to retransmit
652
* a batch of RPC requests.
655
void xprt_conditional_disconnect(struct rpc_xprt *xprt, unsigned int cookie)
657
/* Don't race with the test_bit() in xprt_clear_locked() */
658
spin_lock_bh(&xprt->transport_lock);
659
if (cookie != xprt->connect_cookie)
661
if (test_bit(XPRT_CLOSING, &xprt->state) || !xprt_connected(xprt))
663
set_bit(XPRT_CLOSE_WAIT, &xprt->state);
664
/* Try to schedule an autoclose RPC call */
665
if (test_and_set_bit(XPRT_LOCKED, &xprt->state) == 0)
666
queue_work(rpciod_workqueue, &xprt->task_cleanup);
667
xprt_wake_pending_tasks(xprt, -EAGAIN);
669
spin_unlock_bh(&xprt->transport_lock);
673
xprt_init_autodisconnect(unsigned long data)
675
struct rpc_xprt *xprt = (struct rpc_xprt *)data;
677
spin_lock(&xprt->transport_lock);
678
if (!list_empty(&xprt->recv) || xprt->shutdown)
680
if (test_and_set_bit(XPRT_LOCKED, &xprt->state))
682
spin_unlock(&xprt->transport_lock);
683
set_bit(XPRT_CONNECTION_CLOSE, &xprt->state);
684
queue_work(rpciod_workqueue, &xprt->task_cleanup);
687
spin_unlock(&xprt->transport_lock);
691
* xprt_connect - schedule a transport connect operation
692
* @task: RPC task that is requesting the connect
695
void xprt_connect(struct rpc_task *task)
697
struct rpc_xprt *xprt = task->tk_xprt;
699
dprintk("RPC: %5u xprt_connect xprt %p %s connected\n", task->tk_pid,
700
xprt, (xprt_connected(xprt) ? "is" : "is not"));
702
if (!xprt_bound(xprt)) {
703
task->tk_status = -EAGAIN;
706
if (!xprt_lock_write(xprt, task))
709
if (test_and_clear_bit(XPRT_CLOSE_WAIT, &xprt->state))
710
xprt->ops->close(xprt);
712
if (xprt_connected(xprt))
713
xprt_release_write(xprt, task);
716
task->tk_rqstp->rq_bytes_sent = 0;
718
task->tk_timeout = task->tk_rqstp->rq_timeout;
719
rpc_sleep_on(&xprt->pending, task, xprt_connect_status);
721
if (test_bit(XPRT_CLOSING, &xprt->state))
723
if (xprt_test_and_set_connecting(xprt))
725
xprt->stat.connect_start = jiffies;
726
xprt->ops->connect(task);
730
static void xprt_connect_status(struct rpc_task *task)
732
struct rpc_xprt *xprt = task->tk_xprt;
734
if (task->tk_status == 0) {
735
xprt->stat.connect_count++;
736
xprt->stat.connect_time += (long)jiffies - xprt->stat.connect_start;
737
dprintk("RPC: %5u xprt_connect_status: connection established\n",
742
switch (task->tk_status) {
744
dprintk("RPC: %5u xprt_connect_status: retrying\n", task->tk_pid);
747
dprintk("RPC: %5u xprt_connect_status: connect attempt timed "
748
"out\n", task->tk_pid);
751
dprintk("RPC: %5u xprt_connect_status: error %d connecting to "
752
"server %s\n", task->tk_pid, -task->tk_status,
753
task->tk_client->cl_server);
754
xprt_release_write(xprt, task);
755
task->tk_status = -EIO;
760
* xprt_lookup_rqst - find an RPC request corresponding to an XID
761
* @xprt: transport on which the original request was transmitted
762
* @xid: RPC XID of incoming reply
765
struct rpc_rqst *xprt_lookup_rqst(struct rpc_xprt *xprt, __be32 xid)
767
struct rpc_rqst *entry;
769
list_for_each_entry(entry, &xprt->recv, rq_list)
770
if (entry->rq_xid == xid)
773
dprintk("RPC: xprt_lookup_rqst did not find xid %08x\n",
775
xprt->stat.bad_xids++;
778
EXPORT_SYMBOL_GPL(xprt_lookup_rqst);
780
static void xprt_update_rtt(struct rpc_task *task)
782
struct rpc_rqst *req = task->tk_rqstp;
783
struct rpc_rtt *rtt = task->tk_client->cl_rtt;
784
unsigned timer = task->tk_msg.rpc_proc->p_timer;
785
long m = usecs_to_jiffies(ktime_to_us(req->rq_rtt));
788
if (req->rq_ntrans == 1)
789
rpc_update_rtt(rtt, timer, m);
790
rpc_set_timeo(rtt, timer, req->rq_ntrans - 1);
795
* xprt_complete_rqst - called when reply processing is complete
796
* @task: RPC request that recently completed
797
* @copied: actual number of bytes received from the transport
799
* Caller holds transport lock.
801
void xprt_complete_rqst(struct rpc_task *task, int copied)
803
struct rpc_rqst *req = task->tk_rqstp;
804
struct rpc_xprt *xprt = req->rq_xprt;
806
dprintk("RPC: %5u xid %08x complete (%d bytes received)\n",
807
task->tk_pid, ntohl(req->rq_xid), copied);
810
req->rq_rtt = ktime_sub(ktime_get(), req->rq_xtime);
811
if (xprt->ops->timer != NULL)
812
xprt_update_rtt(task);
814
list_del_init(&req->rq_list);
815
req->rq_private_buf.len = copied;
816
/* Ensure all writes are done before we update */
817
/* req->rq_reply_bytes_recvd */
819
req->rq_reply_bytes_recvd = copied;
820
rpc_wake_up_queued_task(&xprt->pending, task);
822
EXPORT_SYMBOL_GPL(xprt_complete_rqst);
824
static void xprt_timer(struct rpc_task *task)
826
struct rpc_rqst *req = task->tk_rqstp;
827
struct rpc_xprt *xprt = req->rq_xprt;
829
if (task->tk_status != -ETIMEDOUT)
831
dprintk("RPC: %5u xprt_timer\n", task->tk_pid);
833
spin_lock_bh(&xprt->transport_lock);
834
if (!req->rq_reply_bytes_recvd) {
835
if (xprt->ops->timer)
836
xprt->ops->timer(task);
839
spin_unlock_bh(&xprt->transport_lock);
842
static inline int xprt_has_timer(struct rpc_xprt *xprt)
844
return xprt->idle_timeout != 0;
848
* xprt_prepare_transmit - reserve the transport before sending a request
849
* @task: RPC task about to send a request
852
int xprt_prepare_transmit(struct rpc_task *task)
854
struct rpc_rqst *req = task->tk_rqstp;
855
struct rpc_xprt *xprt = req->rq_xprt;
858
dprintk("RPC: %5u xprt_prepare_transmit\n", task->tk_pid);
860
spin_lock_bh(&xprt->transport_lock);
861
if (req->rq_reply_bytes_recvd && !req->rq_bytes_sent) {
862
err = req->rq_reply_bytes_recvd;
865
if (!xprt->ops->reserve_xprt(xprt, task))
868
spin_unlock_bh(&xprt->transport_lock);
872
void xprt_end_transmit(struct rpc_task *task)
874
xprt_release_write(task->tk_rqstp->rq_xprt, task);
878
* xprt_transmit - send an RPC request on a transport
879
* @task: controlling RPC task
881
* We have to copy the iovec because sendmsg fiddles with its contents.
883
void xprt_transmit(struct rpc_task *task)
885
struct rpc_rqst *req = task->tk_rqstp;
886
struct rpc_xprt *xprt = req->rq_xprt;
889
dprintk("RPC: %5u xprt_transmit(%u)\n", task->tk_pid, req->rq_slen);
891
if (!req->rq_reply_bytes_recvd) {
892
if (list_empty(&req->rq_list) && rpc_reply_expected(task)) {
894
* Add to the list only if we're expecting a reply
896
spin_lock_bh(&xprt->transport_lock);
897
/* Update the softirq receive buffer */
898
memcpy(&req->rq_private_buf, &req->rq_rcv_buf,
899
sizeof(req->rq_private_buf));
900
/* Add request to the receive list */
901
list_add_tail(&req->rq_list, &xprt->recv);
902
spin_unlock_bh(&xprt->transport_lock);
903
xprt_reset_majortimeo(req);
904
/* Turn off autodisconnect */
905
del_singleshot_timer_sync(&xprt->timer);
907
} else if (!req->rq_bytes_sent)
910
req->rq_connect_cookie = xprt->connect_cookie;
911
req->rq_xtime = ktime_get();
912
status = xprt->ops->send_request(task);
914
task->tk_status = status;
918
dprintk("RPC: %5u xmit complete\n", task->tk_pid);
919
task->tk_flags |= RPC_TASK_SENT;
920
spin_lock_bh(&xprt->transport_lock);
922
xprt->ops->set_retrans_timeout(task);
925
xprt->stat.req_u += xprt->stat.sends - xprt->stat.recvs;
926
xprt->stat.bklog_u += xprt->backlog.qlen;
928
/* Don't race with disconnect */
929
if (!xprt_connected(xprt))
930
task->tk_status = -ENOTCONN;
931
else if (!req->rq_reply_bytes_recvd && rpc_reply_expected(task)) {
933
* Sleep on the pending queue since
934
* we're expecting a reply.
936
rpc_sleep_on(&xprt->pending, task, xprt_timer);
938
spin_unlock_bh(&xprt->transport_lock);
941
static struct rpc_rqst *xprt_dynamic_alloc_slot(struct rpc_xprt *xprt, gfp_t gfp_flags)
943
struct rpc_rqst *req = ERR_PTR(-EAGAIN);
945
if (!atomic_add_unless(&xprt->num_reqs, 1, xprt->max_reqs))
947
req = kzalloc(sizeof(struct rpc_rqst), gfp_flags);
950
atomic_dec(&xprt->num_reqs);
951
req = ERR_PTR(-ENOMEM);
956
static bool xprt_dynamic_free_slot(struct rpc_xprt *xprt, struct rpc_rqst *req)
958
if (atomic_add_unless(&xprt->num_reqs, -1, xprt->min_reqs)) {
965
static void xprt_alloc_slot(struct rpc_task *task)
967
struct rpc_xprt *xprt = task->tk_xprt;
968
struct rpc_rqst *req;
970
if (!list_empty(&xprt->free)) {
971
req = list_entry(xprt->free.next, struct rpc_rqst, rq_list);
972
list_del(&req->rq_list);
975
req = xprt_dynamic_alloc_slot(xprt, GFP_NOWAIT);
978
switch (PTR_ERR(req)) {
980
rpc_delay(task, HZ >> 2);
981
dprintk("RPC: dynamic allocation of request slot "
982
"failed! Retrying\n");
985
rpc_sleep_on(&xprt->backlog, task, NULL);
986
dprintk("RPC: waiting for request slot\n");
988
task->tk_status = -EAGAIN;
992
task->tk_rqstp = req;
993
xprt_request_init(task, xprt);
996
static void xprt_free_slot(struct rpc_xprt *xprt, struct rpc_rqst *req)
998
spin_lock(&xprt->reserve_lock);
999
if (!xprt_dynamic_free_slot(xprt, req)) {
1000
memset(req, 0, sizeof(*req)); /* mark unused */
1001
list_add(&req->rq_list, &xprt->free);
1003
rpc_wake_up_next(&xprt->backlog);
1004
spin_unlock(&xprt->reserve_lock);
1007
static void xprt_free_all_slots(struct rpc_xprt *xprt)
1009
struct rpc_rqst *req;
1010
while (!list_empty(&xprt->free)) {
1011
req = list_first_entry(&xprt->free, struct rpc_rqst, rq_list);
1012
list_del(&req->rq_list);
1017
struct rpc_xprt *xprt_alloc(struct net *net, size_t size,
1018
unsigned int num_prealloc,
1019
unsigned int max_alloc)
1021
struct rpc_xprt *xprt;
1022
struct rpc_rqst *req;
1025
xprt = kzalloc(size, GFP_KERNEL);
1029
xprt_init(xprt, net);
1031
for (i = 0; i < num_prealloc; i++) {
1032
req = kzalloc(sizeof(struct rpc_rqst), GFP_KERNEL);
1035
list_add(&req->rq_list, &xprt->free);
1037
if (i < num_prealloc)
1039
if (max_alloc > num_prealloc)
1040
xprt->max_reqs = max_alloc;
1042
xprt->max_reqs = num_prealloc;
1043
xprt->min_reqs = num_prealloc;
1044
atomic_set(&xprt->num_reqs, num_prealloc);
1053
EXPORT_SYMBOL_GPL(xprt_alloc);
1055
void xprt_free(struct rpc_xprt *xprt)
1057
put_net(xprt->xprt_net);
1058
xprt_free_all_slots(xprt);
1061
EXPORT_SYMBOL_GPL(xprt_free);
1064
* xprt_reserve - allocate an RPC request slot
1065
* @task: RPC task requesting a slot allocation
1067
* If no more slots are available, place the task on the transport's
1070
void xprt_reserve(struct rpc_task *task)
1072
struct rpc_xprt *xprt = task->tk_xprt;
1074
task->tk_status = 0;
1075
if (task->tk_rqstp != NULL)
1078
/* Note: grabbing the xprt_lock_write() here is not strictly needed,
1079
* but ensures that we throttle new slot allocation if the transport
1080
* is congested (e.g. if reconnecting or if we're out of socket
1081
* write buffer space).
1083
task->tk_timeout = 0;
1084
task->tk_status = -EAGAIN;
1085
if (!xprt_lock_write(xprt, task))
1088
spin_lock(&xprt->reserve_lock);
1089
xprt_alloc_slot(task);
1090
spin_unlock(&xprt->reserve_lock);
1091
xprt_release_write(xprt, task);
1094
static inline __be32 xprt_alloc_xid(struct rpc_xprt *xprt)
1096
return (__force __be32)xprt->xid++;
1099
static inline void xprt_init_xid(struct rpc_xprt *xprt)
1101
xprt->xid = net_random();
1104
static void xprt_request_init(struct rpc_task *task, struct rpc_xprt *xprt)
1106
struct rpc_rqst *req = task->tk_rqstp;
1108
INIT_LIST_HEAD(&req->rq_list);
1109
req->rq_timeout = task->tk_client->cl_timeout->to_initval;
1110
req->rq_task = task;
1111
req->rq_xprt = xprt;
1112
req->rq_buffer = NULL;
1113
req->rq_xid = xprt_alloc_xid(xprt);
1114
req->rq_release_snd_buf = NULL;
1115
xprt_reset_majortimeo(req);
1116
dprintk("RPC: %5u reserved req %p xid %08x\n", task->tk_pid,
1117
req, ntohl(req->rq_xid));
1121
* xprt_release - release an RPC request slot
1122
* @task: task which is finished with the slot
1125
void xprt_release(struct rpc_task *task)
1127
struct rpc_xprt *xprt;
1128
struct rpc_rqst *req;
1130
if (!(req = task->tk_rqstp))
1133
xprt = req->rq_xprt;
1134
rpc_count_iostats(task);
1135
spin_lock_bh(&xprt->transport_lock);
1136
xprt->ops->release_xprt(xprt, task);
1137
if (xprt->ops->release_request)
1138
xprt->ops->release_request(task);
1139
if (!list_empty(&req->rq_list))
1140
list_del(&req->rq_list);
1141
xprt->last_used = jiffies;
1142
if (list_empty(&xprt->recv) && xprt_has_timer(xprt))
1143
mod_timer(&xprt->timer,
1144
xprt->last_used + xprt->idle_timeout);
1145
spin_unlock_bh(&xprt->transport_lock);
1147
xprt->ops->buf_free(req->rq_buffer);
1148
if (req->rq_cred != NULL)
1149
put_rpccred(req->rq_cred);
1150
task->tk_rqstp = NULL;
1151
if (req->rq_release_snd_buf)
1152
req->rq_release_snd_buf(req);
1154
dprintk("RPC: %5u release request %p\n", task->tk_pid, req);
1155
if (likely(!bc_prealloc(req)))
1156
xprt_free_slot(xprt, req);
1158
xprt_free_bc_request(req);
1161
static void xprt_init(struct rpc_xprt *xprt, struct net *net)
1163
atomic_set(&xprt->count, 1);
1165
spin_lock_init(&xprt->transport_lock);
1166
spin_lock_init(&xprt->reserve_lock);
1168
INIT_LIST_HEAD(&xprt->free);
1169
INIT_LIST_HEAD(&xprt->recv);
1170
#if defined(CONFIG_SUNRPC_BACKCHANNEL)
1171
spin_lock_init(&xprt->bc_pa_lock);
1172
INIT_LIST_HEAD(&xprt->bc_pa_list);
1173
#endif /* CONFIG_SUNRPC_BACKCHANNEL */
1175
xprt->last_used = jiffies;
1176
xprt->cwnd = RPC_INITCWND;
1177
xprt->bind_index = 0;
1179
rpc_init_wait_queue(&xprt->binding, "xprt_binding");
1180
rpc_init_wait_queue(&xprt->pending, "xprt_pending");
1181
rpc_init_priority_wait_queue(&xprt->sending, "xprt_sending");
1182
rpc_init_priority_wait_queue(&xprt->backlog, "xprt_backlog");
1184
xprt_init_xid(xprt);
1186
xprt->xprt_net = get_net(net);
1190
* xprt_create_transport - create an RPC transport
1191
* @args: rpc transport creation arguments
1194
struct rpc_xprt *xprt_create_transport(struct xprt_create *args)
1196
struct rpc_xprt *xprt;
1197
struct xprt_class *t;
1199
spin_lock(&xprt_list_lock);
1200
list_for_each_entry(t, &xprt_list, list) {
1201
if (t->ident == args->ident) {
1202
spin_unlock(&xprt_list_lock);
1206
spin_unlock(&xprt_list_lock);
1207
printk(KERN_ERR "RPC: transport (%d) not supported\n", args->ident);
1208
return ERR_PTR(-EIO);
1211
xprt = t->setup(args);
1213
dprintk("RPC: xprt_create_transport: failed, %ld\n",
1217
INIT_WORK(&xprt->task_cleanup, xprt_autoclose);
1218
if (xprt_has_timer(xprt))
1219
setup_timer(&xprt->timer, xprt_init_autodisconnect,
1220
(unsigned long)xprt);
1222
init_timer(&xprt->timer);
1223
dprintk("RPC: created transport %p with %u slots\n", xprt,
1230
* xprt_destroy - destroy an RPC transport, killing off all requests.
1231
* @xprt: transport to destroy
1234
static void xprt_destroy(struct rpc_xprt *xprt)
1236
dprintk("RPC: destroying transport %p\n", xprt);
1238
del_timer_sync(&xprt->timer);
1240
rpc_destroy_wait_queue(&xprt->binding);
1241
rpc_destroy_wait_queue(&xprt->pending);
1242
rpc_destroy_wait_queue(&xprt->sending);
1243
rpc_destroy_wait_queue(&xprt->backlog);
1244
cancel_work_sync(&xprt->task_cleanup);
1246
* Tear down transport state and free the rpc_xprt
1248
xprt->ops->destroy(xprt);
1252
* xprt_put - release a reference to an RPC transport.
1253
* @xprt: pointer to the transport
1256
void xprt_put(struct rpc_xprt *xprt)
1258
if (atomic_dec_and_test(&xprt->count))
1263
* xprt_get - return a reference to an RPC transport.
1264
* @xprt: pointer to the transport
1267
struct rpc_xprt *xprt_get(struct rpc_xprt *xprt)
1269
if (atomic_inc_not_zero(&xprt->count))