~ubuntu-branches/ubuntu/karmic/conntrack/karmic

« back to all changes in this revision

Viewing changes to src/sync-ftfw.c

  • Committer: Bazaar Package Importer
  • Author(s): Andres Rodriguez
  • Date: 2009-06-18 18:27:31 UTC
  • mfrom: (3.1.2 squeeze)
  • Revision ID: james.westby@ubuntu.com-20090618182731-xlunt56xusrrif92
Tags: 1:0.9.12-1ubuntu1
* Merge from debian unstable (LP: #380358), remaining changes:
  - Error on fwrite failure in src/read_config_lex.c.
  - Patch from Kees Cook to not ignore return value of chdir call.
* debian/copyright: Updated download site.

Show diffs side-by-side

added added

removed removed

Lines of Context:
18
18
 
19
19
#include "conntrackd.h"
20
20
#include "sync.h"
21
 
#include "us-conntrack.h"
22
21
#include "queue.h"
23
 
#include "debug.h"
24
22
#include "network.h"
25
23
#include "alarm.h"
26
24
#include "log.h"
27
25
#include "cache.h"
28
 
#include "event.h"
 
26
#include "fds.h"
29
27
 
30
28
#include <string.h>
 
29
#include <errno.h>
31
30
 
32
31
#if 0 
33
32
#define dp printf
35
34
#define dp(...)
36
35
#endif
37
36
 
38
 
#if 0 
39
 
#define dprint printf
40
 
#else
41
 
#define dprint(...)
42
 
#endif
43
 
 
44
 
static LIST_HEAD(rs_list);
45
 
static LIST_HEAD(tx_list);
46
 
static unsigned int rs_list_len;
47
 
static unsigned int tx_list_len;
48
 
static struct queue *rs_queue;
49
 
static struct queue *tx_queue;
 
37
struct queue *rs_queue;
50
38
static uint32_t exp_seq;
51
39
static uint32_t window;
52
40
static uint32_t ack_from;
53
41
static int ack_from_set = 0;
54
42
static struct alarm_block alive_alarm;
55
 
static int hello_state = SAY_HELLO;
 
43
 
 
44
enum {
 
45
        HELLO_INIT,
 
46
        HELLO_SAY,
 
47
        HELLO_DONE,
 
48
};
 
49
static int hello_state = HELLO_INIT;
 
50
static int say_hello_back;
56
51
 
57
52
/* XXX: alive message expiration configurable */
58
53
#define ALIVE_INT 1
59
54
 
60
55
struct cache_ftfw {
61
 
        struct list_head        rs_list;
62
 
        struct list_head        tx_list;
 
56
        struct queue_node       qnode;
63
57
        uint32_t                seq;
64
58
};
65
59
 
66
 
static void cache_ftfw_add(struct us_conntrack *u, void *data)
 
60
static void cache_ftfw_add(struct cache_object *obj, void *data)
67
61
{
68
62
        struct cache_ftfw *cn = data;
69
63
        /* These nodes are not inserted in the list */
70
 
        INIT_LIST_HEAD(&cn->rs_list);
71
 
        INIT_LIST_HEAD(&cn->tx_list);
 
64
        queue_node_init(&cn->qnode, Q_ELEM_OBJ);
72
65
}
73
66
 
74
 
static void cache_ftfw_del(struct us_conntrack *u, void *data)
 
67
static void cache_ftfw_del(struct cache_object *obj, void *data)
75
68
{
76
69
        struct cache_ftfw *cn = data;
77
 
 
78
 
        /* this node is already out of the list */
79
 
        if (list_empty(&cn->rs_list))
80
 
                return;
81
 
 
82
 
        /* no need for list_del_init since the entry is destroyed */
83
 
        list_del(&cn->rs_list);
84
 
        rs_list_len--;
 
70
        queue_del(&cn->qnode);
85
71
}
86
72
 
87
73
static struct cache_extra cache_ftfw_extra = {
90
76
        .destroy        = cache_ftfw_del
91
77
};
92
78
 
 
79
static void nethdr_set_hello(struct nethdr *net)
 
80
{
 
81
        switch(hello_state) {
 
82
        case HELLO_INIT:
 
83
                hello_state = HELLO_SAY;
 
84
                /* fall through */
 
85
        case HELLO_SAY:
 
86
                net->flags |= NET_F_HELLO;
 
87
                break;
 
88
        }
 
89
        if (say_hello_back) {
 
90
                net->flags |= NET_F_HELLO_BACK;
 
91
                say_hello_back = 0;
 
92
        }
 
93
}
 
94
 
93
95
static void tx_queue_add_ctlmsg(uint32_t flags, uint32_t from, uint32_t to)
94
96
{
95
 
        struct nethdr_ack ack = {
96
 
                .flags = flags,
97
 
                .from  = from,
98
 
                .to    = to,
99
 
        };
100
 
 
101
 
        switch(hello_state) {
102
 
        case SAY_HELLO:
103
 
                ack.flags |= NET_F_HELLO;
104
 
                break;
105
 
        case HELLO_BACK:
106
 
                ack.flags |= NET_F_HELLO_BACK;
107
 
                hello_state = HELLO_DONE;
108
 
                break;
109
 
        }
110
 
 
111
 
        queue_add(tx_queue, &ack, NETHDR_ACK_SIZ);
112
 
        write_evfd(STATE_SYNC(evfd));
113
 
}
114
 
 
115
 
static void ftfw_run(void);
 
97
        struct queue_object *qobj;
 
98
        struct nethdr_ack *ack;
 
99
 
 
100
        qobj = queue_object_new(Q_ELEM_CTL, sizeof(struct nethdr_ack));
 
101
        if (qobj == NULL)
 
102
                return;
 
103
 
 
104
        ack             = (struct nethdr_ack *)qobj->data;
 
105
        ack->type       = NET_T_CTL;
 
106
        ack->flags      = flags;
 
107
        ack->from       = from;
 
108
        ack->to         = to;
 
109
 
 
110
        queue_add(STATE_SYNC(tx_queue), &qobj->qnode);
 
111
}
 
112
 
 
113
static void tx_queue_add_ctlmsg2(uint32_t flags)
 
114
{
 
115
        struct queue_object *qobj;
 
116
        struct nethdr *ctl;
 
117
 
 
118
        qobj = queue_object_new(Q_ELEM_CTL, sizeof(struct nethdr_ack));
 
119
        if (qobj == NULL)
 
120
                return;
 
121
 
 
122
        ctl             = (struct nethdr *)qobj->data;
 
123
        ctl->type       = NET_T_CTL;
 
124
        ctl->flags      = flags;
 
125
 
 
126
        queue_add(STATE_SYNC(tx_queue), &qobj->qnode);
 
127
}
116
128
 
117
129
/* this function is called from the alarm framework */
118
130
static void do_alive_alarm(struct alarm_block *a, void *data)
119
131
{
120
 
        if (ack_from_set && mcast_track_is_seq_set()) {
 
132
        if (ack_from_set && nethdr_track_is_seq_set()) {
121
133
                /* exp_seq contains the last update received */
122
 
                dprint("send ALIVE ACK (from=%u, to=%u)\n",
123
 
                        ack_from, STATE_SYNC(last_seq_recv));
124
134
                tx_queue_add_ctlmsg(NET_F_ACK,
125
135
                                    ack_from,
126
136
                                    STATE_SYNC(last_seq_recv));
127
137
                ack_from_set = 0;
128
138
        } else
129
 
                tx_queue_add_ctlmsg(NET_F_ALIVE, 0, 0);
130
 
 
131
 
        /* TODO: no need for buffered send, extracted from run_sync() */
132
 
        ftfw_run();
133
 
        mcast_buffered_pending_netmsg(STATE_SYNC(mcast_client));
134
 
}
135
 
 
136
 
#undef _SIGNAL_DEBUG
137
 
#ifdef _SIGNAL_DEBUG
138
 
 
139
 
static int rs_dump(void *data1, const void *data2)
140
 
{
141
 
        struct nethdr_ack *net = data1;
142
 
 
143
 
        dprint("in RS queue -> seq:%u flags:%u\n", net->seq, net->flags);
144
 
 
145
 
        return 0;
146
 
}
147
 
 
148
 
#include <signal.h>
149
 
 
150
 
static void my_dump(int foo)
151
 
{
152
 
        struct cache_ftfw *cn, *tmp;
153
 
 
154
 
        list_for_each_entry_safe(cn, tmp, &rs_list, rs_list) {
155
 
                struct us_conntrack *u;
156
 
                
157
 
                u = cache_get_conntrack(STATE_SYNC(internal), cn);
158
 
                dprint("in RS list -> seq:%u\n", cn->seq);
159
 
        }
160
 
 
161
 
        queue_iterate(rs_queue, NULL, rs_dump);
162
 
}
163
 
 
164
 
#endif
 
139
                tx_queue_add_ctlmsg2(NET_F_ALIVE);
 
140
 
 
141
        add_alarm(&alive_alarm, ALIVE_INT, 0);
 
142
}
165
143
 
166
144
static int ftfw_init(void)
167
145
{
168
 
        tx_queue = queue_create(CONFIG(resend_queue_size));
169
 
        if (tx_queue == NULL) {
170
 
                dlog(LOG_ERR, "cannot create tx queue");
171
 
                return -1;
172
 
        }
173
 
 
174
 
        rs_queue = queue_create(CONFIG(resend_queue_size));
 
146
        rs_queue = queue_create(CONFIG(resend_queue_size), 0);
175
147
        if (rs_queue == NULL) {
176
148
                dlog(LOG_ERR, "cannot create rs queue");
177
149
                return -1;
183
155
        /* set ack window size */
184
156
        window = CONFIG(window_size);
185
157
 
186
 
#ifdef _SIGNAL_DEBUG
187
 
        signal(SIGUSR1, my_dump);
188
 
#endif
189
 
 
190
158
        return 0;
191
159
}
192
160
 
193
161
static void ftfw_kill(void)
194
162
{
195
163
        queue_destroy(rs_queue);
196
 
        queue_destroy(tx_queue);
197
164
}
198
165
 
199
166
static int do_cache_to_tx(void *data1, void *data2)
200
167
{
201
 
        struct us_conntrack *u = data2;
202
 
        struct cache_ftfw *cn = cache_get_extra(STATE_SYNC(internal), u);
203
 
 
204
 
        /* add to tx list */
205
 
        list_add_tail(&cn->tx_list, &tx_list);
206
 
        tx_list_len++;
207
 
        write_evfd(STATE_SYNC(evfd));
208
 
 
209
 
        return 0;
 
168
        struct cache_object *obj = data2;
 
169
        struct cache_ftfw *cn = cache_get_extra(STATE_SYNC(internal), obj);
 
170
 
 
171
        if (queue_in(rs_queue, &cn->qnode)) {
 
172
                queue_del(&cn->qnode);
 
173
                queue_add(STATE_SYNC(tx_queue), &cn->qnode);
 
174
        } else {
 
175
                if (queue_add(STATE_SYNC(tx_queue), &cn->qnode))
 
176
                        cache_object_get(obj);
 
177
        }
 
178
        return 0;
 
179
}
 
180
 
 
181
static int rs_queue_dump(struct queue_node *n, const void *data2)
 
182
{
 
183
        const int *fd = data2;
 
184
        char buf[512];
 
185
        int size;
 
186
 
 
187
        switch(n->type) {
 
188
                case Q_ELEM_CTL: {
 
189
                        struct nethdr *net = queue_node_data(n);
 
190
                        size = sprintf(buf, "control -> seq:%u flags:%u\n",
 
191
                                            net->seq, net->flags);
 
192
                        break;
 
193
                }
 
194
                case Q_ELEM_OBJ: {
 
195
                        struct cache_ftfw *cn = (struct cache_ftfw *) n;
 
196
                        size = sprintf(buf, "object -> seq:%u\n", cn->seq);
 
197
                break;
 
198
                }
 
199
                default:
 
200
                        return 0;
 
201
        }
 
202
        send(*fd, buf, size, 0);
 
203
        return 0;
 
204
}
 
205
 
 
206
static void ftfw_local_queue(int fd)
 
207
{
 
208
        char buf[512];
 
209
        int size;
 
210
 
 
211
        size = sprintf(buf, "resent queue (len=%u)\n", queue_len(rs_queue));
 
212
        send(fd, buf, size, 0);
 
213
        queue_iterate(rs_queue, &fd, rs_queue_dump);
210
214
}
211
215
 
212
216
static int ftfw_local(int fd, int type, void *data)
222
226
                dlog(LOG_NOTICE, "sending bulk update");
223
227
                cache_iterate(STATE_SYNC(internal), NULL, do_cache_to_tx);
224
228
                break;
 
229
        case STATS_QUEUE:
 
230
                ftfw_local_queue(fd);
 
231
                break;
225
232
        default:
226
233
                ret = 0;
227
234
                break;
230
237
        return ret;
231
238
}
232
239
 
233
 
static int rs_queue_to_tx(void *data1, const void *data2)
 
240
static int rs_queue_to_tx(struct queue_node *n, const void *data)
234
241
{
235
 
        struct nethdr_ack *net = data1;
236
 
        const struct nethdr_ack *nack = data2;
237
 
 
238
 
        if (between(net->seq, nack->from, nack->to)) {
 
242
        const struct nethdr_ack *nack = data;
 
243
 
 
244
        switch(n->type) {
 
245
        case Q_ELEM_CTL: {
 
246
                struct nethdr_ack *net = queue_node_data(n);
 
247
 
 
248
                if (before(net->seq, nack->from))
 
249
                        return 0;       /* continue */
 
250
                else if (after(net->seq, nack->to))
 
251
                        return 1;       /* break */
 
252
 
239
253
                dp("rs_queue_to_tx sq: %u fl:%u len:%u\n",
240
254
                        net->seq, net->flags, net->len);
241
 
                queue_add(tx_queue, net, net->len);
242
 
                write_evfd(STATE_SYNC(evfd));
243
 
                queue_del(rs_queue, net);
 
255
 
 
256
                queue_del(n);
 
257
                queue_add(STATE_SYNC(tx_queue), n);
 
258
                break;
 
259
        }
 
260
        case Q_ELEM_OBJ: {
 
261
                struct cache_ftfw *cn;
 
262
 
 
263
                cn = (struct cache_ftfw *) n;
 
264
                if (before(cn->seq, nack->from))
 
265
                        return 0;
 
266
                else if (after(cn->seq, nack->to))
 
267
                        return 1;
 
268
 
 
269
                dp("resending nack'ed (oldseq=%u)\n", cn->seq);
 
270
 
 
271
                queue_del(n);
 
272
                queue_add(STATE_SYNC(tx_queue), n);
 
273
                break;
 
274
        }
244
275
        }
245
276
        return 0;
246
277
}
247
278
 
248
 
static int rs_queue_empty(void *data1, const void *data2)
 
279
static int rs_queue_empty(struct queue_node *n, const void *data)
249
280
{
250
 
        struct nethdr *net = data1;
251
 
        const struct nethdr_ack *h = data2;
252
 
 
253
 
        if (between(net->seq, h->from, h->to)) {
 
281
        const struct nethdr_ack *h = data;
 
282
 
 
283
        switch(n->type) {
 
284
        case Q_ELEM_CTL: {
 
285
                struct nethdr_ack *net = queue_node_data(n);
 
286
 
 
287
                if (h == NULL) {
 
288
                        queue_del(n);
 
289
                        queue_object_free((struct queue_object *)n);
 
290
                        return 0;
 
291
                }
 
292
                if (before(net->seq, h->from))
 
293
                        return 0;       /* continue */
 
294
                else if (after(net->seq, h->to))
 
295
                        return 1;       /* break */
 
296
 
254
297
                dp("remove from queue (seq=%u)\n", net->seq);
255
 
                queue_del(rs_queue, data1);
 
298
                queue_del(n);
 
299
                queue_object_free((struct queue_object *)n);
 
300
                break;
 
301
        }
 
302
        case Q_ELEM_OBJ: {
 
303
                struct cache_ftfw *cn;
 
304
                struct cache_object *obj;
 
305
 
 
306
                cn = (struct cache_ftfw *) n;
 
307
                if (h == NULL) {
 
308
                        queue_del(n);
 
309
                        obj = cache_data_get_object(STATE_SYNC(internal), cn);
 
310
                        cache_object_put(obj);
 
311
                        return 0;
 
312
                }
 
313
                if (before(cn->seq, h->from))
 
314
                        return 0;
 
315
                else if (after(cn->seq, h->to))
 
316
                        return 1;
 
317
 
 
318
                dp("queue: deleting from queue (seq=%u)\n", cn->seq);
 
319
                queue_del(n);
 
320
                obj = cache_data_get_object(STATE_SYNC(internal), cn);
 
321
                cache_object_put(obj);
 
322
                break;
 
323
        }
256
324
        }
257
325
        return 0;
258
326
}
259
327
 
260
 
static void rs_list_to_tx(struct cache *c, unsigned int from, unsigned int to)
261
 
{
262
 
        struct cache_ftfw *cn, *tmp;
263
 
 
264
 
        list_for_each_entry_safe(cn, tmp, &rs_list, rs_list) {
265
 
                struct us_conntrack *u;
266
 
                
267
 
                u = cache_get_conntrack(STATE_SYNC(internal), cn);
268
 
                if (between(cn->seq, from, to)) {
269
 
                        dp("resending nack'ed (oldseq=%u)\n", cn->seq);
270
 
                        list_del_init(&cn->rs_list);
271
 
                        rs_list_len--;
272
 
                        list_add_tail(&cn->tx_list, &tx_list);
273
 
                        tx_list_len++;
274
 
                        write_evfd(STATE_SYNC(evfd));
275
 
                }
276
 
        } 
277
 
}
278
 
 
279
 
static void rs_list_empty(struct cache *c, unsigned int from, unsigned int to)
280
 
{
281
 
        struct cache_ftfw *cn, *tmp;
282
 
 
283
 
        list_for_each_entry_safe(cn, tmp, &rs_list, rs_list) {
284
 
                struct us_conntrack *u;
285
 
 
286
 
                u = cache_get_conntrack(STATE_SYNC(internal), cn);
287
 
                if (between(cn->seq, from, to)) {
288
 
                        dp("queue: deleting from queue (seq=%u)\n", cn->seq);
289
 
                        list_del_init(&cn->rs_list);
290
 
                        rs_list_len--;
291
 
                }
292
 
        }
293
 
}
294
 
 
295
328
static int digest_msg(const struct nethdr *net)
296
329
{
297
330
        if (IS_DATA(net))
300
333
        else if (IS_ACK(net)) {
301
334
                const struct nethdr_ack *h = (const struct nethdr_ack *) net;
302
335
 
303
 
                dprint("ACK(%u): from seq=%u to seq=%u\n",
304
 
                        h->seq, h->from, h->to);
305
 
                rs_list_empty(STATE_SYNC(internal), h->from, h->to);
 
336
                if (before(h->to, h->from))
 
337
                        return MSG_BAD;
 
338
 
306
339
                queue_iterate(rs_queue, h, rs_queue_empty);
307
340
                return MSG_CTL;
308
341
 
309
342
        } else if (IS_NACK(net)) {
310
343
                const struct nethdr_ack *nack = (const struct nethdr_ack *) net;
311
344
 
312
 
                dprint("NACK(%u): from seq=%u to seq=%u\n",
313
 
                        nack->seq, nack->from, nack->to);
314
 
                rs_list_to_tx(STATE_SYNC(internal), nack->from, nack->to);
 
345
                if (before(nack->to, nack->from))
 
346
                        return MSG_BAD;
 
347
 
315
348
                queue_iterate(rs_queue, nack, rs_queue_to_tx);
316
349
                return MSG_CTL;
317
350
 
331
364
        int ret = 0;
332
365
 
333
366
        if (IS_HELLO(net)) {
334
 
                dlog(LOG_NOTICE, "The other node says HELLO");
335
 
                hello_state = HELLO_BACK;
 
367
                say_hello_back = 1;
336
368
                ret = 1;
337
 
        } else if (IS_HELLO_BACK(net)) {
338
 
                dlog(LOG_NOTICE, "The other node says HELLO BACK");
339
 
                hello_state = HELLO_DONE;
 
369
        }
 
370
        if (IS_HELLO_BACK(net)) {
 
371
                /* this is a hello back for a requested hello */
 
372
                if (hello_state == HELLO_SAY)
 
373
                        hello_state = HELLO_DONE;
340
374
        }
341
375
 
342
376
        return ret;
346
380
{
347
381
        int ret = MSG_DATA;
348
382
 
349
 
        if (digest_hello(net))
 
383
        if (digest_hello(net)) {
 
384
                /* we have received a hello while we had data to acknowledge.
 
385
                 * reset the window, the other doesn't know anything about it. */
 
386
                if (ack_from_set && before(net->seq, ack_from)) {
 
387
                        window = CONFIG(window_size) - 1;
 
388
                        ack_from = net->seq;
 
389
                }
 
390
 
 
391
                /* XXX: flush the resend queues since the other does not 
 
392
                 * know anything about that data, we are unreliable until 
 
393
                 * the helloing finishes */
 
394
                queue_iterate(rs_queue, NULL, rs_queue_empty);
 
395
 
350
396
                goto bypass;
 
397
        }
351
398
 
352
 
        switch (mcast_track_seq(net->seq, &exp_seq)) {
 
399
        switch (nethdr_track_seq(net->seq, &exp_seq)) {
353
400
        case SEQ_AFTER:
354
401
                ret = digest_msg(net);
355
402
                if (ret == MSG_BAD) {
359
406
 
360
407
                if (ack_from_set) {
361
408
                        tx_queue_add_ctlmsg(NET_F_ACK, ack_from, exp_seq-1);
362
 
                        dprint("OFS send half ACK: from seq=%u to seq=%u\n", 
363
 
                                ack_from, exp_seq-1);
364
409
                        ack_from_set = 0;
365
410
                }
366
411
 
367
412
                tx_queue_add_ctlmsg(NET_F_NACK, exp_seq, net->seq-1);
368
 
                dprint("OFS send NACK: from seq=%u to seq=%u\n", 
369
 
                        exp_seq, net->seq-1);
370
413
 
371
414
                /* count this message as part of the new window */
372
415
                window = CONFIG(window_size) - 1;
376
419
 
377
420
        case SEQ_BEFORE:
378
421
                /* we don't accept delayed packets */
379
 
                dlog(LOG_WARNING, "Received seq=%u before expected seq=%u",
380
 
                                   net->seq, exp_seq);
381
422
                ret = MSG_DROP;
382
423
                break;
383
424
 
397
438
 
398
439
                if (--window <= 0) {
399
440
                        /* received a window, send an acknowledgement */
400
 
                        dprint("OFS send ACK: from seq=%u to seq=%u\n",
401
 
                                ack_from, net->seq);
402
 
 
403
441
                        tx_queue_add_ctlmsg(NET_F_ACK, ack_from, net->seq);
404
442
                        window = CONFIG(window_size);
405
443
                        ack_from_set = 0;
408
446
 
409
447
out:
410
448
        if ((ret == MSG_DATA || ret == MSG_CTL))
411
 
                mcast_track_update_seq(net->seq);
 
449
                nethdr_track_update_seq(net->seq);
412
450
 
413
451
        return ret;
414
452
}
415
453
 
416
 
static void ftfw_send(struct nethdr *net, struct us_conntrack *u)
417
 
{
418
 
        struct netpld *pld = NETHDR_DATA(net);
419
 
        struct cache_ftfw *cn;
420
 
 
421
 
        switch(ntohs(pld->query)) {
422
 
        case NFCT_Q_CREATE:
423
 
        case NFCT_Q_UPDATE:
424
 
        case NFCT_Q_DESTROY:
425
 
                cn = (struct cache_ftfw *) 
426
 
                        cache_get_extra(STATE_SYNC(internal), u);
427
 
 
428
 
                if (!list_empty(&cn->rs_list)) {
429
 
                        list_del_init(&cn->rs_list);
430
 
                        rs_list_len--;
431
 
                }
432
 
 
433
 
                switch(hello_state) {
434
 
                case SAY_HELLO:
435
 
                        net->flags = ntohs(net->flags) | NET_F_HELLO;
436
 
                        net->flags = htons(net->flags);
437
 
                        break;
438
 
                case HELLO_BACK:
439
 
                        net->flags = ntohs(net->flags) | NET_F_HELLO_BACK;
440
 
                        net->flags = htons(net->flags);
441
 
                        hello_state = HELLO_DONE;
442
 
                        break;
443
 
                }
444
 
 
 
454
static void rs_queue_purge_full(void)
 
455
{
 
456
        struct queue_node *n;
 
457
 
 
458
        n = queue_del_head(rs_queue);
 
459
        switch(n->type) {
 
460
        case Q_ELEM_CTL: {
 
461
                struct queue_object *qobj = (struct queue_object *)n;
 
462
                queue_object_free(qobj);
 
463
                break;
 
464
        }
 
465
        case Q_ELEM_OBJ: {
 
466
                struct cache_ftfw *cn;
 
467
                struct cache_object *obj;
 
468
 
 
469
                cn = (struct cache_ftfw *)n;
 
470
                obj = cache_data_get_object(STATE_SYNC(internal), cn);
 
471
                cache_object_put(obj);
 
472
                break;
 
473
        }
 
474
        }
 
475
}
 
476
 
 
477
static int tx_queue_xmit(struct queue_node *n, const void *data)
 
478
{
 
479
        queue_del(n);
 
480
 
 
481
        switch(n->type) {
 
482
        case Q_ELEM_CTL: {
 
483
                struct nethdr *net = queue_node_data(n);
 
484
 
 
485
                nethdr_set_hello(net);
 
486
 
 
487
                if (IS_ACK(net) || IS_NACK(net) || IS_RESYNC(net)) {
 
488
                        nethdr_set_ack(net);
 
489
                } else {
 
490
                        nethdr_set_ctl(net);
 
491
                }
 
492
                HDR_HOST2NETWORK(net);
 
493
 
 
494
                dp("tx_queue sq: %u fl:%u len:%u\n",
 
495
                       ntohl(net->seq), net->flags, ntohs(net->len));
 
496
 
 
497
                multichannel_send(STATE_SYNC(channel), net);
 
498
                HDR_NETWORK2HOST(net);
 
499
 
 
500
                if (IS_ACK(net) || IS_NACK(net) || IS_RESYNC(net)) {
 
501
                        if (queue_add(rs_queue, n) < 0) {
 
502
                                if (errno == ENOSPC) {
 
503
                                        rs_queue_purge_full();
 
504
                                        queue_add(rs_queue, n);
 
505
                                }
 
506
                        }
 
507
                } else
 
508
                        queue_object_free((struct queue_object *)n);
 
509
                break;
 
510
        }
 
511
        case Q_ELEM_OBJ: {
 
512
                struct cache_ftfw *cn;
 
513
                struct cache_object *obj;
 
514
                int type;
 
515
                struct nethdr *net;
 
516
 
 
517
                cn = (struct cache_ftfw *)n;
 
518
                obj = cache_data_get_object(STATE_SYNC(internal), cn);
 
519
                type = object_status_to_network_type(obj->status);
 
520
                net = BUILD_NETMSG(obj->ct, type);
 
521
                nethdr_set_hello(net);
 
522
 
 
523
                dp("tx_list sq: %u fl:%u len:%u\n",
 
524
                        ntohl(net->seq), net->flags, ntohs(net->len));
 
525
 
 
526
                multichannel_send(STATE_SYNC(channel), net);
445
527
                cn->seq = ntohl(net->seq);
446
 
                list_add_tail(&cn->rs_list, &rs_list);
447
 
                rs_list_len++;
 
528
                if (queue_add(rs_queue, &cn->qnode) < 0) {
 
529
                        if (errno == ENOSPC) {
 
530
                                rs_queue_purge_full();
 
531
                                queue_add(rs_queue, &cn->qnode);
 
532
                        }
 
533
                }
 
534
                /* we release the object once we get the acknowledgment */
448
535
                break;
449
536
        }
450
 
}
451
 
 
452
 
static int tx_queue_xmit(void *data1, const void *data2)
453
 
{
454
 
        struct nethdr *net = data1;
455
 
        size_t len = prepare_send_netmsg(STATE_SYNC(mcast_client), net);
456
 
 
457
 
        dp("tx_queue sq: %u fl:%u len:%u\n",
458
 
               ntohl(net->seq), ntohs(net->flags), ntohs(net->len));
459
 
 
460
 
        mcast_buffered_send_netmsg(STATE_SYNC(mcast_client), net, len);
461
 
        HDR_NETWORK2HOST(net);
462
 
 
463
 
        if (IS_DATA(net) || IS_ACK(net) || IS_NACK(net)) {
464
 
                dprint("tx_queue -> to_rs_queue sq: %u fl:%u len:%u\n",
465
 
                       net->seq, net->flags, net->len);
466
 
                queue_add(rs_queue, net, net->len);
467
537
        }
468
 
        queue_del(tx_queue, net);
469
538
 
470
539
        return 0;
471
540
}
472
541
 
473
 
static int tx_list_xmit(struct list_head *i, struct us_conntrack *u, int type)
 
542
static void ftfw_xmit(void)
474
543
{
475
 
        int ret;
476
 
        struct nethdr *net = BUILD_NETMSG(u->ct, type);
477
 
        size_t len = prepare_send_netmsg(STATE_SYNC(mcast_client), net);
478
 
 
479
 
        dp("tx_list sq: %u fl:%u len:%u\n",
480
 
                ntohl(net->seq), ntohs(net->flags),
481
 
                ntohs(net->len));
482
 
 
483
 
        list_del_init(i);
484
 
        tx_list_len--;
485
 
 
486
 
        ret = mcast_buffered_send_netmsg(STATE_SYNC(mcast_client), net, len);
487
 
        ftfw_send(net, u);
488
 
 
489
 
        return ret;
 
544
        queue_iterate(STATE_SYNC(tx_queue), NULL, tx_queue_xmit);
 
545
        add_alarm(&alive_alarm, ALIVE_INT, 0);
 
546
        dp("tx_queue_len:%u rs_queue_len:%u\n", 
 
547
                queue_len(tx_queue), queue_len(rs_queue));
490
548
}
491
549
 
492
 
static void ftfw_run(void)
 
550
static void ftfw_enqueue(struct cache_object *obj, int type)
493
551
{
494
 
        struct cache_ftfw *cn, *tmp;
495
 
 
496
 
        /* send messages in the tx_queue */
497
 
        queue_iterate(tx_queue, NULL, tx_queue_xmit);
498
 
 
499
 
        /* send conntracks in the tx_list */
500
 
        list_for_each_entry_safe(cn, tmp, &tx_list, tx_list) {
501
 
                struct us_conntrack *u;
502
 
 
503
 
                u = cache_get_conntrack(STATE_SYNC(internal), cn);
504
 
                if (alarm_pending(&u->alarm))
505
 
                        tx_list_xmit(&cn->tx_list, u, NFCT_Q_DESTROY);
506
 
                else
507
 
                        tx_list_xmit(&cn->tx_list, u, NFCT_Q_UPDATE);
 
552
        struct cache_ftfw *cn = cache_get_extra(STATE_SYNC(internal), obj);
 
553
        if (queue_in(rs_queue, &cn->qnode)) {
 
554
                queue_del(&cn->qnode);
 
555
                queue_add(STATE_SYNC(tx_queue), &cn->qnode);
 
556
        } else {
 
557
                if (queue_add(STATE_SYNC(tx_queue), &cn->qnode))
 
558
                        cache_object_get(obj);
508
559
        }
509
 
 
510
 
        /* reset alive alarm */
511
 
        add_alarm(&alive_alarm, 1, 0);
512
 
 
513
 
        dprint("tx_list_len:%u tx_queue_len:%u "
514
 
               "rs_list_len: %u rs_queue_len:%u\n",
515
 
                tx_list_len, queue_len(tx_queue),
516
 
                rs_list_len, queue_len(rs_queue));
517
560
}
518
561
 
519
562
struct sync_mode sync_ftfw = {
520
 
        .internal_cache_flags   = LIFETIME,
521
 
        .external_cache_flags   = LIFETIME,
 
563
        .internal_cache_flags   = NO_FEATURES,
 
564
        .external_cache_flags   = NO_FEATURES,
522
565
        .internal_cache_extra   = &cache_ftfw_extra,
523
566
        .init                   = ftfw_init,
524
567
        .kill                   = ftfw_kill,
525
568
        .local                  = ftfw_local,
526
569
        .recv                   = ftfw_recv,
527
 
        .send                   = ftfw_send,
528
 
        .run                    = ftfw_run,
 
570
        .enqueue                = ftfw_enqueue,
 
571
        .xmit                   = ftfw_xmit,
529
572
};