--- a/net/sunrpc/xprt.c
+++ b/net/sunrpc/xprt.c
@@ -65,3 +65,4 @@
+static void	xprt_init(struct rpc_xprt *xprt, struct net *net);
 static void	xprt_request_init(struct rpc_task *, struct rpc_xprt *);
 static void	xprt_connect_status(struct rpc_task *task);
 static int	__xprt_get_cong(struct rpc_xprt *, struct rpc_task *);
@@ -187,14 +188,15 @@
  * xprt_reserve_xprt - serialize write access to transports
  * @task: task that is requesting access to the transport
+ * @xprt: pointer to the target transport
  *
  * This prevents mixing the payload of separate requests, and prevents
  * transport connects from colliding with writes.  No congestion control
  * is provided.
  */
-int xprt_reserve_xprt(struct rpc_task *task)
+int xprt_reserve_xprt(struct rpc_xprt *xprt, struct rpc_task *task)
 {
 	struct rpc_rqst *req = task->tk_rqstp;
-	struct rpc_xprt	*xprt = req->rq_xprt;
+	int priority;
 
 	if (test_and_set_bit(XPRT_LOCKED, &xprt->state)) {
 		if (task == xprt->snd_task)
@@ -212,10 +216,13 @@
 			task->tk_pid, xprt);
 	task->tk_timeout = 0;
 	task->tk_status = -EAGAIN;
-	if (req && req->rq_ntrans)
-		rpc_sleep_on(&xprt->resend, task, NULL);
+	if (req == NULL)
+		priority = RPC_PRIORITY_LOW;
+	else if (!req->rq_ntrans)
+		priority = RPC_PRIORITY_NORMAL;
 	else
-		rpc_sleep_on(&xprt->sending, task, NULL);
+		priority = RPC_PRIORITY_HIGH;
+	rpc_sleep_on_priority(&xprt->sending, task, NULL, priority);
 	return 0;
 }
 EXPORT_SYMBOL_GPL(xprt_reserve_xprt);
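
The hunk above replaces the old two-queue scheme (a separate xprt->resend queue serviced ahead of xprt->sending) with a single priority queue and an explicitly computed priority. A minimal userspace sketch of that mapping, assuming stand-in types; the real RPC_PRIORITY_* constants have different numeric values:

	#include <stdio.h>

	enum demo_priority { RPC_PRIORITY_LOW, RPC_PRIORITY_NORMAL, RPC_PRIORITY_HIGH };

	struct demo_rqst { unsigned int rq_ntrans; };	/* transmissions so far */

	/* Mirrors the new out_sleep logic: no request slot yet -> LOW,
	 * first transmission -> NORMAL, retransmission -> HIGH, so retries
	 * still jump the queue the way the old resend queue let them. */
	static enum demo_priority sleep_priority(const struct demo_rqst *req)
	{
		if (req == NULL)
			return RPC_PRIORITY_LOW;
		if (req->rq_ntrans == 0)
			return RPC_PRIORITY_NORMAL;
		return RPC_PRIORITY_HIGH;
	}

	int main(void)
	{
		struct demo_rqst fresh = { 0 }, retry = { 2 };
		printf("%d %d %d\n", sleep_priority(NULL),
		       sleep_priority(&fresh), sleep_priority(&retry));
		return 0;
	}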
@@ -239,33 +246,38 @@
  * integrated into the decision of whether a request is allowed to be
  * woken up and given access to the transport.
  */
-int xprt_reserve_xprt_cong(struct rpc_task *task)
+int xprt_reserve_xprt_cong(struct rpc_xprt *xprt, struct rpc_task *task)
 {
-	struct rpc_xprt	*xprt = task->tk_xprt;
 	struct rpc_rqst *req = task->tk_rqstp;
+	int priority;
 
 	if (test_and_set_bit(XPRT_LOCKED, &xprt->state)) {
 		if (task == xprt->snd_task)
 			return 1;
 		goto out_sleep;
 	}
+	if (req == NULL) {
+		xprt->snd_task = task;
+		return 1;
+	}
 	if (__xprt_get_cong(xprt, task)) {
 		xprt->snd_task = task;
-		if (req) {
-			req->rq_bytes_sent = 0;
-			req->rq_ntrans++;
-		}
+		req->rq_bytes_sent = 0;
+		req->rq_ntrans++;
 		return 1;
 	}
 	xprt_clear_locked(xprt);
 out_sleep:
 	dprintk("RPC: %5u failed to lock transport %p\n", task->tk_pid, xprt);
 	task->tk_timeout = 0;
 	task->tk_status = -EAGAIN;
-	if (req && req->rq_ntrans)
-		rpc_sleep_on(&xprt->resend, task, NULL);
+	if (req == NULL)
+		priority = RPC_PRIORITY_LOW;
+	else if (!req->rq_ntrans)
+		priority = RPC_PRIORITY_NORMAL;
 	else
-		rpc_sleep_on(&xprt->sending, task, NULL);
+		priority = RPC_PRIORITY_HIGH;
+	rpc_sleep_on_priority(&xprt->sending, task, NULL, priority);
 	return 0;
 }
 EXPORT_SYMBOL_GPL(xprt_reserve_xprt_cong);
@@ -277,3 +289,3 @@
 	spin_lock_bh(&xprt->transport_lock);
-	retval = xprt->ops->reserve_xprt(task);
+	retval = xprt->ops->reserve_xprt(xprt, task);
 	spin_unlock_bh(&xprt->transport_lock);
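
The hunks above (and the xprt_prepare_transmit hunk further down) all serve the same signature change: ops->reserve_xprt() now receives the transport explicitly instead of digging it out of task->tk_rqstp->rq_xprt, so it can be called for a task that has no request slot yet (task->tk_rqstp == NULL). A toy model of the indirection; every *_demo name here is hypothetical, not the kernel's:

	#include <errno.h>
	#include <stdbool.h>
	#include <stdio.h>

	struct task_demo { int tk_status; };

	struct xprt_ops_demo;

	struct xprt_demo {
		const struct xprt_ops_demo *ops;
		bool locked;
	};

	struct xprt_ops_demo {
		/* New shape: the transport is an explicit argument. */
		int (*reserve_xprt)(struct xprt_demo *, struct task_demo *);
	};

	static int reserve_demo(struct xprt_demo *xprt, struct task_demo *task)
	{
		if (xprt->locked) {
			task->tk_status = -EAGAIN;	/* caller must sleep and retry */
			return 0;
		}
		xprt->locked = true;
		return 1;
	}

	static const struct xprt_ops_demo ops_demo = { .reserve_xprt = reserve_demo };

	int main(void)
	{
		struct xprt_demo xprt = { .ops = &ops_demo };
		struct task_demo task = { 0 };
		printf("first=%d second=%d\n",
		       xprt.ops->reserve_xprt(&xprt, &task),
		       xprt.ops->reserve_xprt(&xprt, &task));
		return 0;
	}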
@@ -288,12 +300,9 @@
 	if (test_and_set_bit(XPRT_LOCKED, &xprt->state))
 		return;
 
-	task = rpc_wake_up_next(&xprt->resend);
-	if (!task) {
-		task = rpc_wake_up_next(&xprt->sending);
-		if (!task)
-			goto out_unlock;
-	}
+	task = rpc_wake_up_next(&xprt->sending);
+	if (task == NULL)
+		goto out_unlock;
 
 	req = task->tk_rqstp;
 	xprt->snd_task = task;
@@ -310,19 +319,21 @@
 static void __xprt_lock_write_next_cong(struct rpc_xprt *xprt)
 {
 	struct rpc_task *task;
+	struct rpc_rqst *req;
 
 	if (test_and_set_bit(XPRT_LOCKED, &xprt->state))
 		return;
 	if (RPCXPRT_CONGESTED(xprt))
 		goto out_unlock;
-	task = rpc_wake_up_next(&xprt->resend);
-	if (!task) {
-		task = rpc_wake_up_next(&xprt->sending);
-		if (!task)
-			goto out_unlock;
-	}
+	task = rpc_wake_up_next(&xprt->sending);
+	if (task == NULL)
+		goto out_unlock;
+
+	req = task->tk_rqstp;
+	if (req == NULL) {
+		xprt->snd_task = task;
+		return;
+	}
 	if (__xprt_get_cong(xprt, task)) {
-		struct rpc_rqst *req = task->tk_rqstp;
 		xprt->snd_task = task;
-		if (req) {
-			req->rq_bytes_sent = 0;
+		req->rq_bytes_sent = 0;
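
Both __xprt_lock_write_next_cong() and xprt_reserve_xprt_cong() gate the lock on the congestion window: each admitted request holds one RPC_CWNDSCALE-sized unit of xprt->cong, and RPCXPRT_CONGESTED() is essentially the test "cong >= cwnd". A self-contained model of that gate, with stand-in names and an arbitrary constant:

	#include <stdbool.h>
	#include <stdio.h>

	#define DEMO_CWNDSCALE 256

	struct xprt_demo { unsigned long cong, cwnd; };

	static bool congested(const struct xprt_demo *x)
	{
		return x->cong >= x->cwnd;	/* models RPCXPRT_CONGESTED() */
	}

	/* Like __xprt_get_cong(): admit the request only if the window allows. */
	static bool get_cong(struct xprt_demo *x)
	{
		if (congested(x))
			return false;	/* caller sleeps on the sending queue */
		x->cong += DEMO_CWNDSCALE;
		return true;
	}

	int main(void)
	{
		struct xprt_demo x = { .cong = 0, .cwnd = 2 * DEMO_CWNDSCALE };
		int admitted = 0;
		while (get_cong(&x))
			admitted++;
		printf("window admitted %d requests\n", admitted);	/* prints 2 */
		return 0;
	}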
@@ -852,7 +862,7 @@
 		err = req->rq_reply_bytes_recvd;
 		goto out_unlock;
 	}
-	if (!xprt->ops->reserve_xprt(task))
+	if (!xprt->ops->reserve_xprt(xprt, task))
 		err = -EAGAIN;
 out_unlock:
 	spin_unlock_bh(&xprt->transport_lock);
@@ -928,31 +938,69 @@
 	spin_unlock_bh(&xprt->transport_lock);
 }
 
+static struct rpc_rqst *xprt_dynamic_alloc_slot(struct rpc_xprt *xprt, gfp_t gfp_flags)
+{
+	struct rpc_rqst *req = ERR_PTR(-EAGAIN);
+
+	if (!atomic_add_unless(&xprt->num_reqs, 1, xprt->max_reqs))
+		goto out;
+	req = kzalloc(sizeof(struct rpc_rqst), gfp_flags);
+	if (req != NULL)
+		goto out;
+	atomic_dec(&xprt->num_reqs);
+	req = ERR_PTR(-ENOMEM);
+out:
+	return req;
+}
+
+static bool xprt_dynamic_free_slot(struct rpc_xprt *xprt, struct rpc_rqst *req)
+{
+	if (atomic_add_unless(&xprt->num_reqs, -1, xprt->min_reqs)) {
+		kfree(req);
+		return true;
+	}
+	return false;
+}
+
 static void xprt_alloc_slot(struct rpc_task *task)
 {
 	struct rpc_xprt	*xprt = task->tk_xprt;
+	struct rpc_rqst *req;
 
-	task->tk_status = 0;
-	if (task->tk_rqstp)
-		return;
 	if (!list_empty(&xprt->free)) {
-		struct rpc_rqst	*req = list_entry(xprt->free.next, struct rpc_rqst, rq_list);
-		list_del_init(&req->rq_list);
-		task->tk_rqstp = req;
-		xprt_request_init(task, xprt);
-		return;
+		req = list_entry(xprt->free.next, struct rpc_rqst, rq_list);
+		list_del(&req->rq_list);
+		goto out_init_req;
 	}
-	dprintk("RPC:       waiting for request slot\n");
+	req = xprt_dynamic_alloc_slot(xprt, GFP_NOWAIT);
+	if (!IS_ERR(req))
+		goto out_init_req;
+	switch (PTR_ERR(req)) {
+	case -ENOMEM:
+		rpc_delay(task, HZ >> 2);
+		dprintk("RPC:       dynamic allocation of request slot "
+				"failed! Retrying\n");
+		break;
+	case -EAGAIN:
+		rpc_sleep_on(&xprt->backlog, task, NULL);
+		dprintk("RPC:       waiting for request slot\n");
+	}
 	task->tk_status = -EAGAIN;
-	task->tk_timeout = 0;
-	rpc_sleep_on(&xprt->backlog, task, NULL);
+	return;
+out_init_req:
+	task->tk_status = 0;
+	task->tk_rqstp = req;
+	xprt_request_init(task, xprt);
 }
 
 static void xprt_free_slot(struct rpc_xprt *xprt, struct rpc_rqst *req)
 {
+	if (xprt_dynamic_free_slot(xprt, req))
+		return;
+
 	memset(req, 0, sizeof(*req));	/* mark unused */
 
 	spin_lock(&xprt->reserve_lock);
 	list_add(&req->rq_list, &xprt->free);
 	rpc_wake_up_next(&xprt->backlog);
 	spin_unlock(&xprt->reserve_lock);
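
Both new helpers bound the slot count with atomic_add_unless(): grow by one unless num_reqs already equals max_reqs, shrink by one unless it already equals min_reqs, so the table floats between the preallocated floor and the hard ceiling without taking a lock. A userspace model of that bounded-counter idiom in C11 atomics (add_unless here is a re-implementation, not the kernel primitive):

	#include <stdatomic.h>
	#include <stdbool.h>
	#include <stdio.h>

	/* Add 'a' to '*v' unless '*v' currently equals 'u'; true if added. */
	static bool add_unless(atomic_int *v, int a, int u)
	{
		int cur = atomic_load(v);
		while (cur != u) {
			/* On CAS failure 'cur' is reloaded and the bound rechecked. */
			if (atomic_compare_exchange_weak(v, &cur, cur + a))
				return true;
		}
		return false;
	}

	int main(void)
	{
		const int min_reqs = 2, max_reqs = 16;
		atomic_int num_reqs = 14;
		int grown = 0;

		while (add_unless(&num_reqs, 1, max_reqs))	/* as in xprt_dynamic_alloc_slot */
			grown++;
		printf("granted %d dynamic slots, num_reqs=%d\n", grown, atomic_load(&num_reqs));

		while (add_unless(&num_reqs, -1, min_reqs))	/* as in xprt_dynamic_free_slot */
			;
		printf("shrunk back to the floor: %d\n", atomic_load(&num_reqs));
		return 0;
	}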
@@ -961,16 +1009,40 @@
-struct rpc_xprt *xprt_alloc(struct net *net, int size, int max_req)
+static void xprt_free_all_slots(struct rpc_xprt *xprt)
+{
+	struct rpc_rqst *req;
+	while (!list_empty(&xprt->free)) {
+		req = list_first_entry(&xprt->free, struct rpc_rqst, rq_list);
+		list_del(&req->rq_list);
+		kfree(req);
+	}
+}
+
+struct rpc_xprt *xprt_alloc(struct net *net, size_t size,
+		unsigned int num_prealloc,
+		unsigned int max_alloc)
 {
 	struct rpc_xprt *xprt;
+	struct rpc_rqst *req;
+	int i;
 
 	xprt = kzalloc(size, GFP_KERNEL);
 	if (xprt == NULL)
 		goto out;
-	atomic_set(&xprt->count, 1);
 
-	xprt->max_reqs = max_req;
-	xprt->slot = kcalloc(max_req, sizeof(struct rpc_rqst), GFP_KERNEL);
-	if (xprt->slot == NULL)
-		goto out_free;
+	xprt_init(xprt, net);
+
+	for (i = 0; i < num_prealloc; i++) {
+		req = kzalloc(sizeof(struct rpc_rqst), GFP_KERNEL);
+		if (!req)
+			break;
+		list_add(&req->rq_list, &xprt->free);
+	}
+	if (i < num_prealloc)
+		goto out_free;
+	if (max_alloc > num_prealloc)
+		xprt->max_reqs = max_alloc;
+	else
+		xprt->max_reqs = num_prealloc;
+	xprt->min_reqs = num_prealloc;
+	atomic_set(&xprt->num_reqs, num_prealloc);
 
-	xprt->xprt_net = get_net(net);
 	return xprt;
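
A transport backend now passes both a preallocation floor and a growth ceiling instead of sizing a fixed slot table. A hedged sketch of a setup call under the new contract; demo_setup_xprt and its two parameter names are illustrative, though the socket transports follow this same pattern:

	static struct rpc_xprt *demo_setup_xprt(struct xprt_create *args,
						unsigned int slot_table_size,
						unsigned int max_slot_table_size)
	{
		struct rpc_xprt *xprt;

		xprt = xprt_alloc(args->net, sizeof(*xprt),
				  slot_table_size, max_slot_table_size);
		if (xprt == NULL) {
			/* xprt_alloc() is all-or-nothing: if any of the
			 * preallocations fails it frees everything and
			 * returns NULL. */
			return ERR_PTR(-ENOMEM);
		}
		return xprt;
	}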
@@ -1002,9 +1074,23 @@
 	struct rpc_xprt	*xprt = task->tk_xprt;
 
-	task->tk_status = -EIO;
+	task->tk_status = 0;
+	if (task->tk_rqstp != NULL)
+		return;
+
+	/* Note: grabbing the xprt_lock_write() here is not strictly needed,
+	 * but ensures that we throttle new slot allocation if the transport
+	 * is congested (e.g. if reconnecting or if we're out of socket
+	 * write buffer space).
+	 */
+	task->tk_timeout = 0;
+	task->tk_status = -EAGAIN;
+	if (!xprt_lock_write(xprt, task))
+		return;
+
 	spin_lock(&xprt->reserve_lock);
 	xprt_alloc_slot(task);
 	spin_unlock(&xprt->reserve_lock);
+	xprt_release_write(xprt, task);
 }
 
 static inline __be32 xprt_alloc_xid(struct rpc_xprt *xprt)
@@ -1073,6 +1160,34 @@
 		xprt_free_bc_request(req);
 }
 
+static void xprt_init(struct rpc_xprt *xprt, struct net *net)
+{
+	atomic_set(&xprt->count, 1);
+
+	spin_lock_init(&xprt->transport_lock);
+	spin_lock_init(&xprt->reserve_lock);
+
+	INIT_LIST_HEAD(&xprt->free);
+	INIT_LIST_HEAD(&xprt->recv);
+#if defined(CONFIG_SUNRPC_BACKCHANNEL)
+	spin_lock_init(&xprt->bc_pa_lock);
+	INIT_LIST_HEAD(&xprt->bc_pa_list);
+#endif /* CONFIG_SUNRPC_BACKCHANNEL */
+
+	xprt->last_used = jiffies;
+	xprt->cwnd = RPC_INITCWND;
+	xprt->bind_index = 0;
+
+	rpc_init_wait_queue(&xprt->binding, "xprt_binding");
+	rpc_init_wait_queue(&xprt->pending, "xprt_pending");
+	rpc_init_priority_wait_queue(&xprt->sending, "xprt_sending");
+	rpc_init_priority_wait_queue(&xprt->backlog, "xprt_backlog");
+
+	xprt_init_xid(xprt);
+
+	xprt->xprt_net = get_net(net);
+}
+
 /**
  * xprt_create_transport - create an RPC transport
  * @args: rpc transport creation arguments
@@ -1100,43 +1214,13 @@
 	if (IS_ERR(xprt)) {
 		dprintk("RPC:       xprt_create_transport: failed, %ld\n",
 				-PTR_ERR(xprt));
 		goto out;
 	}
-	if (test_and_set_bit(XPRT_INITIALIZED, &xprt->state))
-		/* ->setup returned a pre-initialized xprt: */
-		return xprt;
-
-	spin_lock_init(&xprt->transport_lock);
-	spin_lock_init(&xprt->reserve_lock);
-
-	INIT_LIST_HEAD(&xprt->free);
-	INIT_LIST_HEAD(&xprt->recv);
-#if defined(CONFIG_NFS_V4_1)
-	spin_lock_init(&xprt->bc_pa_lock);
-	INIT_LIST_HEAD(&xprt->bc_pa_list);
-#endif /* CONFIG_NFS_V4_1 */
-
 	INIT_WORK(&xprt->task_cleanup, xprt_autoclose);
 	if (xprt_has_timer(xprt))
 		setup_timer(&xprt->timer, xprt_init_autodisconnect,
 			    (unsigned long)xprt);
 	else
 		init_timer(&xprt->timer);
-	xprt->last_used = jiffies;
-	xprt->cwnd = RPC_INITCWND;
-	xprt->bind_index = 0;
-
-	rpc_init_wait_queue(&xprt->binding, "xprt_binding");
-	rpc_init_wait_queue(&xprt->pending, "xprt_pending");
-	rpc_init_wait_queue(&xprt->sending, "xprt_sending");
-	rpc_init_wait_queue(&xprt->resend, "xprt_resend");
-	rpc_init_priority_wait_queue(&xprt->backlog, "xprt_backlog");
-
-	/* initialize free list */
-	for (req = &xprt->slot[xprt->max_reqs-1]; req >= &xprt->slot[0]; req--)
-		list_add(&req->rq_list, &xprt->free);
-
-	xprt_init_xid(xprt);
-
 	dprintk("RPC:       created transport %p with %u slots\n", xprt,
 			xprt->max_reqs);
@@ -1157,6 +1242,5 @@
 	rpc_destroy_wait_queue(&xprt->binding);
 	rpc_destroy_wait_queue(&xprt->pending);
 	rpc_destroy_wait_queue(&xprt->sending);
-	rpc_destroy_wait_queue(&xprt->resend);
 	rpc_destroy_wait_queue(&xprt->backlog);
 	cancel_work_sync(&xprt->task_cleanup);