~ubuntu-branches/ubuntu/precise/linux-lowlatency/precise

« back to all changes in this revision

Viewing changes to net/rds/threads.c

  • Committer: Package Import Robot
  • Author(s): Alessio Igor Bogani
  • Date: 2011-10-26 11:13:05 UTC
  • Revision ID: package-import@ubuntu.com-20111026111305-tz023xykf0i6eosh
Tags: upstream-3.2.0
ImportĀ upstreamĀ versionĀ 3.2.0

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/*
 
2
 * Copyright (c) 2006 Oracle.  All rights reserved.
 
3
 *
 
4
 * This software is available to you under a choice of one of two
 
5
 * licenses.  You may choose to be licensed under the terms of the GNU
 
6
 * General Public License (GPL) Version 2, available from the file
 
7
 * COPYING in the main directory of this source tree, or the
 
8
 * OpenIB.org BSD license below:
 
9
 *
 
10
 *     Redistribution and use in source and binary forms, with or
 
11
 *     without modification, are permitted provided that the following
 
12
 *     conditions are met:
 
13
 *
 
14
 *      - Redistributions of source code must retain the above
 
15
 *        copyright notice, this list of conditions and the following
 
16
 *        disclaimer.
 
17
 *
 
18
 *      - Redistributions in binary form must reproduce the above
 
19
 *        copyright notice, this list of conditions and the following
 
20
 *        disclaimer in the documentation and/or other materials
 
21
 *        provided with the distribution.
 
22
 *
 
23
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
 
24
 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
 
25
 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
 
26
 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
 
27
 * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
 
28
 * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
 
29
 * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
 
30
 * SOFTWARE.
 
31
 *
 
32
 */
 
33
#include <linux/kernel.h>
 
34
#include <linux/random.h>
 
35
#include <linux/export.h>
 
36
 
 
37
#include "rds.h"
 
38
 
 
39
/*
 
40
 * All of connection management is simplified by serializing it through
 
41
 * work queues that execute in a connection managing thread.
 
42
 *
 
43
 * TCP wants to send acks through sendpage() in response to data_ready(),
 
44
 * but it needs a process context to do so.
 
45
 *
 
46
 * The receive paths need to allocate but can't drop packets (!) so we have
 
47
 * a thread around to block allocating if the receive fast path sees an
 
48
 * allocation failure.
 
49
 */
 
50
 
 
51
/* Grand Unified Theory of connection life cycle:
 
52
 * At any point in time, the connection can be in one of these states:
 
53
 * DOWN, CONNECTING, UP, DISCONNECTING, ERROR
 
54
 *
 
55
 * The following transitions are possible:
 
56
 *  ANY           -> ERROR
 
57
 *  UP            -> DISCONNECTING
 
58
 *  ERROR         -> DISCONNECTING
 
59
 *  DISCONNECTING -> DOWN
 
60
 *  DOWN          -> CONNECTING
 
61
 *  CONNECTING    -> UP
 
62
 *
 
63
 * Transition to state DISCONNECTING/DOWN:
 
64
 *  -   Inside the shutdown worker; synchronizes with xmit path
 
65
 *      through RDS_IN_XMIT, and with connection management callbacks
 
66
 *      via c_cm_lock.
 
67
 *
 
68
 *      For receive callbacks, we rely on the underlying transport
 
69
 *      (TCP, IB/RDMA) to provide the necessary synchronisation.
 
70
 */
 
71
struct workqueue_struct *rds_wq;
 
72
EXPORT_SYMBOL_GPL(rds_wq);
 
73
 
 
74
void rds_connect_complete(struct rds_connection *conn)
 
75
{
 
76
        if (!rds_conn_transition(conn, RDS_CONN_CONNECTING, RDS_CONN_UP)) {
 
77
                printk(KERN_WARNING "%s: Cannot transition to state UP, "
 
78
                                "current state is %d\n",
 
79
                                __func__,
 
80
                                atomic_read(&conn->c_state));
 
81
                atomic_set(&conn->c_state, RDS_CONN_ERROR);
 
82
                queue_work(rds_wq, &conn->c_down_w);
 
83
                return;
 
84
        }
 
85
 
 
86
        rdsdebug("conn %p for %pI4 to %pI4 complete\n",
 
87
          conn, &conn->c_laddr, &conn->c_faddr);
 
88
 
 
89
        conn->c_reconnect_jiffies = 0;
 
90
        set_bit(0, &conn->c_map_queued);
 
91
        queue_delayed_work(rds_wq, &conn->c_send_w, 0);
 
92
        queue_delayed_work(rds_wq, &conn->c_recv_w, 0);
 
93
}
 
94
EXPORT_SYMBOL_GPL(rds_connect_complete);
 
95
 
 
96
/*
 
97
 * This random exponential backoff is relied on to eventually resolve racing
 
98
 * connects.
 
99
 *
 
100
 * If connect attempts race then both parties drop both connections and come
 
101
 * here to wait for a random amount of time before trying again.  Eventually
 
102
 * the backoff range will be so much greater than the time it takes to
 
103
 * establish a connection that one of the pair will establish the connection
 
104
 * before the other's random delay fires.
 
105
 *
 
106
 * Connection attempts that arrive while a connection is already established
 
107
 * are also considered to be racing connects.  This lets a connection from
 
108
 * a rebooted machine replace an existing stale connection before the transport
 
109
 * notices that the connection has failed.
 
110
 *
 
111
 * We should *always* start with a random backoff; otherwise a broken connection
 
112
 * will always take several iterations to be re-established.
 
113
 */
 
114
void rds_queue_reconnect(struct rds_connection *conn)
 
115
{
 
116
        unsigned long rand;
 
117
 
 
118
        rdsdebug("conn %p for %pI4 to %pI4 reconnect jiffies %lu\n",
 
119
          conn, &conn->c_laddr, &conn->c_faddr,
 
120
          conn->c_reconnect_jiffies);
 
121
 
 
122
        set_bit(RDS_RECONNECT_PENDING, &conn->c_flags);
 
123
        if (conn->c_reconnect_jiffies == 0) {
 
124
                conn->c_reconnect_jiffies = rds_sysctl_reconnect_min_jiffies;
 
125
                queue_delayed_work(rds_wq, &conn->c_conn_w, 0);
 
126
                return;
 
127
        }
 
128
 
 
129
        get_random_bytes(&rand, sizeof(rand));
 
130
        rdsdebug("%lu delay %lu ceil conn %p for %pI4 -> %pI4\n",
 
131
                 rand % conn->c_reconnect_jiffies, conn->c_reconnect_jiffies,
 
132
                 conn, &conn->c_laddr, &conn->c_faddr);
 
133
        queue_delayed_work(rds_wq, &conn->c_conn_w,
 
134
                           rand % conn->c_reconnect_jiffies);
 
135
 
 
136
        conn->c_reconnect_jiffies = min(conn->c_reconnect_jiffies * 2,
 
137
                                        rds_sysctl_reconnect_max_jiffies);
 
138
}
 
139
 
 
140
void rds_connect_worker(struct work_struct *work)
 
141
{
 
142
        struct rds_connection *conn = container_of(work, struct rds_connection, c_conn_w.work);
 
143
        int ret;
 
144
 
 
145
        clear_bit(RDS_RECONNECT_PENDING, &conn->c_flags);
 
146
        if (rds_conn_transition(conn, RDS_CONN_DOWN, RDS_CONN_CONNECTING)) {
 
147
                ret = conn->c_trans->conn_connect(conn);
 
148
                rdsdebug("conn %p for %pI4 to %pI4 dispatched, ret %d\n",
 
149
                        conn, &conn->c_laddr, &conn->c_faddr, ret);
 
150
 
 
151
                if (ret) {
 
152
                        if (rds_conn_transition(conn, RDS_CONN_CONNECTING, RDS_CONN_DOWN))
 
153
                                rds_queue_reconnect(conn);
 
154
                        else
 
155
                                rds_conn_error(conn, "RDS: connect failed\n");
 
156
                }
 
157
        }
 
158
}
 
159
 
 
160
void rds_send_worker(struct work_struct *work)
 
161
{
 
162
        struct rds_connection *conn = container_of(work, struct rds_connection, c_send_w.work);
 
163
        int ret;
 
164
 
 
165
        if (rds_conn_state(conn) == RDS_CONN_UP) {
 
166
                ret = rds_send_xmit(conn);
 
167
                rdsdebug("conn %p ret %d\n", conn, ret);
 
168
                switch (ret) {
 
169
                case -EAGAIN:
 
170
                        rds_stats_inc(s_send_immediate_retry);
 
171
                        queue_delayed_work(rds_wq, &conn->c_send_w, 0);
 
172
                        break;
 
173
                case -ENOMEM:
 
174
                        rds_stats_inc(s_send_delayed_retry);
 
175
                        queue_delayed_work(rds_wq, &conn->c_send_w, 2);
 
176
                default:
 
177
                        break;
 
178
                }
 
179
        }
 
180
}
 
181
 
 
182
void rds_recv_worker(struct work_struct *work)
 
183
{
 
184
        struct rds_connection *conn = container_of(work, struct rds_connection, c_recv_w.work);
 
185
        int ret;
 
186
 
 
187
        if (rds_conn_state(conn) == RDS_CONN_UP) {
 
188
                ret = conn->c_trans->recv(conn);
 
189
                rdsdebug("conn %p ret %d\n", conn, ret);
 
190
                switch (ret) {
 
191
                case -EAGAIN:
 
192
                        rds_stats_inc(s_recv_immediate_retry);
 
193
                        queue_delayed_work(rds_wq, &conn->c_recv_w, 0);
 
194
                        break;
 
195
                case -ENOMEM:
 
196
                        rds_stats_inc(s_recv_delayed_retry);
 
197
                        queue_delayed_work(rds_wq, &conn->c_recv_w, 2);
 
198
                default:
 
199
                        break;
 
200
                }
 
201
        }
 
202
}
 
203
 
 
204
void rds_shutdown_worker(struct work_struct *work)
 
205
{
 
206
        struct rds_connection *conn = container_of(work, struct rds_connection, c_down_w);
 
207
 
 
208
        rds_conn_shutdown(conn);
 
209
}
 
210
 
 
211
void rds_threads_exit(void)
 
212
{
 
213
        destroy_workqueue(rds_wq);
 
214
}
 
215
 
 
216
int rds_threads_init(void)
 
217
{
 
218
        rds_wq = create_singlethread_workqueue("krdsd");
 
219
        if (!rds_wq)
 
220
                return -ENOMEM;
 
221
 
 
222
        return 0;
 
223
}