44
static LIST_HEAD(rs_list);
45
static LIST_HEAD(tx_list);
46
static unsigned int rs_list_len;
47
static unsigned int tx_list_len;
48
static struct queue *rs_queue;
49
static struct queue *tx_queue;
37
struct queue *rs_queue;
50
38
static uint32_t exp_seq;
51
39
static uint32_t window;
52
40
static uint32_t ack_from;
53
41
static int ack_from_set = 0;
54
42
static struct alarm_block alive_alarm;
55
static int hello_state = SAY_HELLO;
49
static int hello_state = HELLO_INIT;
50
static int say_hello_back;
57
52
/* XXX: alive message expiration configurable */
58
53
#define ALIVE_INT 1
60
55
struct cache_ftfw {
61
struct list_head rs_list;
62
struct list_head tx_list;
56
struct queue_node qnode;
66
static void cache_ftfw_add(struct us_conntrack *u, void *data)
60
static void cache_ftfw_add(struct cache_object *obj, void *data)
68
62
struct cache_ftfw *cn = data;
69
63
/* These nodes are not inserted in the list */
70
INIT_LIST_HEAD(&cn->rs_list);
71
INIT_LIST_HEAD(&cn->tx_list);
64
queue_node_init(&cn->qnode, Q_ELEM_OBJ);
74
static void cache_ftfw_del(struct us_conntrack *u, void *data)
67
static void cache_ftfw_del(struct cache_object *obj, void *data)
76
69
struct cache_ftfw *cn = data;
78
/* this node is already out of the list */
79
if (list_empty(&cn->rs_list))
82
/* no need for list_del_init since the entry is destroyed */
83
list_del(&cn->rs_list);
70
queue_del(&cn->qnode);
87
73
static struct cache_extra cache_ftfw_extra = {
90
76
.destroy = cache_ftfw_del
79
static void nethdr_set_hello(struct nethdr *net)
83
hello_state = HELLO_SAY;
86
net->flags |= NET_F_HELLO;
90
net->flags |= NET_F_HELLO_BACK;
93
95
static void tx_queue_add_ctlmsg(uint32_t flags, uint32_t from, uint32_t to)
95
struct nethdr_ack ack = {
101
switch(hello_state) {
103
ack.flags |= NET_F_HELLO;
106
ack.flags |= NET_F_HELLO_BACK;
107
hello_state = HELLO_DONE;
111
queue_add(tx_queue, &ack, NETHDR_ACK_SIZ);
112
write_evfd(STATE_SYNC(evfd));
115
static void ftfw_run(void);
97
struct queue_object *qobj;
98
struct nethdr_ack *ack;
100
qobj = queue_object_new(Q_ELEM_CTL, sizeof(struct nethdr_ack));
104
ack = (struct nethdr_ack *)qobj->data;
105
ack->type = NET_T_CTL;
110
queue_add(STATE_SYNC(tx_queue), &qobj->qnode);
113
static void tx_queue_add_ctlmsg2(uint32_t flags)
115
struct queue_object *qobj;
118
qobj = queue_object_new(Q_ELEM_CTL, sizeof(struct nethdr_ack));
122
ctl = (struct nethdr *)qobj->data;
123
ctl->type = NET_T_CTL;
126
queue_add(STATE_SYNC(tx_queue), &qobj->qnode);
117
129
/* this function is called from the alarm framework */
118
130
static void do_alive_alarm(struct alarm_block *a, void *data)
120
if (ack_from_set && mcast_track_is_seq_set()) {
132
if (ack_from_set && nethdr_track_is_seq_set()) {
121
133
/* exp_seq contains the last update received */
122
dprint("send ALIVE ACK (from=%u, to=%u)\n",
123
ack_from, STATE_SYNC(last_seq_recv));
124
134
tx_queue_add_ctlmsg(NET_F_ACK,
126
136
STATE_SYNC(last_seq_recv));
127
137
ack_from_set = 0;
129
tx_queue_add_ctlmsg(NET_F_ALIVE, 0, 0);
131
/* TODO: no need for buffered send, extracted from run_sync() */
133
mcast_buffered_pending_netmsg(STATE_SYNC(mcast_client));
139
static int rs_dump(void *data1, const void *data2)
141
struct nethdr_ack *net = data1;
143
dprint("in RS queue -> seq:%u flags:%u\n", net->seq, net->flags);
150
static void my_dump(int foo)
152
struct cache_ftfw *cn, *tmp;
154
list_for_each_entry_safe(cn, tmp, &rs_list, rs_list) {
155
struct us_conntrack *u;
157
u = cache_get_conntrack(STATE_SYNC(internal), cn);
158
dprint("in RS list -> seq:%u\n", cn->seq);
161
queue_iterate(rs_queue, NULL, rs_dump);
139
tx_queue_add_ctlmsg2(NET_F_ALIVE);
141
add_alarm(&alive_alarm, ALIVE_INT, 0);
166
144
static int ftfw_init(void)
168
tx_queue = queue_create(CONFIG(resend_queue_size));
169
if (tx_queue == NULL) {
170
dlog(LOG_ERR, "cannot create tx queue");
174
rs_queue = queue_create(CONFIG(resend_queue_size));
146
rs_queue = queue_create(CONFIG(resend_queue_size), 0);
175
147
if (rs_queue == NULL) {
176
148
dlog(LOG_ERR, "cannot create rs queue");
183
155
/* set ack window size */
184
156
window = CONFIG(window_size);
187
signal(SIGUSR1, my_dump);
193
161
static void ftfw_kill(void)
195
163
queue_destroy(rs_queue);
196
queue_destroy(tx_queue);
199
166
static int do_cache_to_tx(void *data1, void *data2)
201
struct us_conntrack *u = data2;
202
struct cache_ftfw *cn = cache_get_extra(STATE_SYNC(internal), u);
205
list_add_tail(&cn->tx_list, &tx_list);
207
write_evfd(STATE_SYNC(evfd));
168
struct cache_object *obj = data2;
169
struct cache_ftfw *cn = cache_get_extra(STATE_SYNC(internal), obj);
171
if (queue_in(rs_queue, &cn->qnode)) {
172
queue_del(&cn->qnode);
173
queue_add(STATE_SYNC(tx_queue), &cn->qnode);
175
if (queue_add(STATE_SYNC(tx_queue), &cn->qnode))
176
cache_object_get(obj);
181
static int rs_queue_dump(struct queue_node *n, const void *data2)
183
const int *fd = data2;
189
struct nethdr *net = queue_node_data(n);
190
size = sprintf(buf, "control -> seq:%u flags:%u\n",
191
net->seq, net->flags);
195
struct cache_ftfw *cn = (struct cache_ftfw *) n;
196
size = sprintf(buf, "object -> seq:%u\n", cn->seq);
202
send(*fd, buf, size, 0);
206
static void ftfw_local_queue(int fd)
211
size = sprintf(buf, "resent queue (len=%u)\n", queue_len(rs_queue));
212
send(fd, buf, size, 0);
213
queue_iterate(rs_queue, &fd, rs_queue_dump);
212
216
static int ftfw_local(int fd, int type, void *data)
233
static int rs_queue_to_tx(void *data1, const void *data2)
240
static int rs_queue_to_tx(struct queue_node *n, const void *data)
235
struct nethdr_ack *net = data1;
236
const struct nethdr_ack *nack = data2;
238
if (between(net->seq, nack->from, nack->to)) {
242
const struct nethdr_ack *nack = data;
246
struct nethdr_ack *net = queue_node_data(n);
248
if (before(net->seq, nack->from))
249
return 0; /* continue */
250
else if (after(net->seq, nack->to))
251
return 1; /* break */
239
253
dp("rs_queue_to_tx sq: %u fl:%u len:%u\n",
240
254
net->seq, net->flags, net->len);
241
queue_add(tx_queue, net, net->len);
242
write_evfd(STATE_SYNC(evfd));
243
queue_del(rs_queue, net);
257
queue_add(STATE_SYNC(tx_queue), n);
261
struct cache_ftfw *cn;
263
cn = (struct cache_ftfw *) n;
264
if (before(cn->seq, nack->from))
266
else if (after(cn->seq, nack->to))
269
dp("resending nack'ed (oldseq=%u)\n", cn->seq);
272
queue_add(STATE_SYNC(tx_queue), n);
248
static int rs_queue_empty(void *data1, const void *data2)
279
static int rs_queue_empty(struct queue_node *n, const void *data)
250
struct nethdr *net = data1;
251
const struct nethdr_ack *h = data2;
253
if (between(net->seq, h->from, h->to)) {
281
const struct nethdr_ack *h = data;
285
struct nethdr_ack *net = queue_node_data(n);
289
queue_object_free((struct queue_object *)n);
292
if (before(net->seq, h->from))
293
return 0; /* continue */
294
else if (after(net->seq, h->to))
295
return 1; /* break */
254
297
dp("remove from queue (seq=%u)\n", net->seq);
255
queue_del(rs_queue, data1);
299
queue_object_free((struct queue_object *)n);
303
struct cache_ftfw *cn;
304
struct cache_object *obj;
306
cn = (struct cache_ftfw *) n;
309
obj = cache_data_get_object(STATE_SYNC(internal), cn);
310
cache_object_put(obj);
313
if (before(cn->seq, h->from))
315
else if (after(cn->seq, h->to))
318
dp("queue: deleting from queue (seq=%u)\n", cn->seq);
320
obj = cache_data_get_object(STATE_SYNC(internal), cn);
321
cache_object_put(obj);
260
static void rs_list_to_tx(struct cache *c, unsigned int from, unsigned int to)
262
struct cache_ftfw *cn, *tmp;
264
list_for_each_entry_safe(cn, tmp, &rs_list, rs_list) {
265
struct us_conntrack *u;
267
u = cache_get_conntrack(STATE_SYNC(internal), cn);
268
if (between(cn->seq, from, to)) {
269
dp("resending nack'ed (oldseq=%u)\n", cn->seq);
270
list_del_init(&cn->rs_list);
272
list_add_tail(&cn->tx_list, &tx_list);
274
write_evfd(STATE_SYNC(evfd));
279
static void rs_list_empty(struct cache *c, unsigned int from, unsigned int to)
281
struct cache_ftfw *cn, *tmp;
283
list_for_each_entry_safe(cn, tmp, &rs_list, rs_list) {
284
struct us_conntrack *u;
286
u = cache_get_conntrack(STATE_SYNC(internal), cn);
287
if (between(cn->seq, from, to)) {
288
dp("queue: deleting from queue (seq=%u)\n", cn->seq);
289
list_del_init(&cn->rs_list);
295
328
static int digest_msg(const struct nethdr *net)
297
330
if (IS_DATA(net))
410
448
if ((ret == MSG_DATA || ret == MSG_CTL))
411
mcast_track_update_seq(net->seq);
449
nethdr_track_update_seq(net->seq);
416
static void ftfw_send(struct nethdr *net, struct us_conntrack *u)
418
struct netpld *pld = NETHDR_DATA(net);
419
struct cache_ftfw *cn;
421
switch(ntohs(pld->query)) {
425
cn = (struct cache_ftfw *)
426
cache_get_extra(STATE_SYNC(internal), u);
428
if (!list_empty(&cn->rs_list)) {
429
list_del_init(&cn->rs_list);
433
switch(hello_state) {
435
net->flags = ntohs(net->flags) | NET_F_HELLO;
436
net->flags = htons(net->flags);
439
net->flags = ntohs(net->flags) | NET_F_HELLO_BACK;
440
net->flags = htons(net->flags);
441
hello_state = HELLO_DONE;
454
static void rs_queue_purge_full(void)
456
struct queue_node *n;
458
n = queue_del_head(rs_queue);
461
struct queue_object *qobj = (struct queue_object *)n;
462
queue_object_free(qobj);
466
struct cache_ftfw *cn;
467
struct cache_object *obj;
469
cn = (struct cache_ftfw *)n;
470
obj = cache_data_get_object(STATE_SYNC(internal), cn);
471
cache_object_put(obj);
477
static int tx_queue_xmit(struct queue_node *n, const void *data)
483
struct nethdr *net = queue_node_data(n);
485
nethdr_set_hello(net);
487
if (IS_ACK(net) || IS_NACK(net) || IS_RESYNC(net)) {
492
HDR_HOST2NETWORK(net);
494
dp("tx_queue sq: %u fl:%u len:%u\n",
495
ntohl(net->seq), net->flags, ntohs(net->len));
497
multichannel_send(STATE_SYNC(channel), net);
498
HDR_NETWORK2HOST(net);
500
if (IS_ACK(net) || IS_NACK(net) || IS_RESYNC(net)) {
501
if (queue_add(rs_queue, n) < 0) {
502
if (errno == ENOSPC) {
503
rs_queue_purge_full();
504
queue_add(rs_queue, n);
508
queue_object_free((struct queue_object *)n);
512
struct cache_ftfw *cn;
513
struct cache_object *obj;
517
cn = (struct cache_ftfw *)n;
518
obj = cache_data_get_object(STATE_SYNC(internal), cn);
519
type = object_status_to_network_type(obj->status);
520
net = BUILD_NETMSG(obj->ct, type);
521
nethdr_set_hello(net);
523
dp("tx_list sq: %u fl:%u len:%u\n",
524
ntohl(net->seq), net->flags, ntohs(net->len));
526
multichannel_send(STATE_SYNC(channel), net);
445
527
cn->seq = ntohl(net->seq);
446
list_add_tail(&cn->rs_list, &rs_list);
528
if (queue_add(rs_queue, &cn->qnode) < 0) {
529
if (errno == ENOSPC) {
530
rs_queue_purge_full();
531
queue_add(rs_queue, &cn->qnode);
534
/* we release the object once we get the acknowlegment */
452
static int tx_queue_xmit(void *data1, const void *data2)
454
struct nethdr *net = data1;
455
size_t len = prepare_send_netmsg(STATE_SYNC(mcast_client), net);
457
dp("tx_queue sq: %u fl:%u len:%u\n",
458
ntohl(net->seq), ntohs(net->flags), ntohs(net->len));
460
mcast_buffered_send_netmsg(STATE_SYNC(mcast_client), net, len);
461
HDR_NETWORK2HOST(net);
463
if (IS_DATA(net) || IS_ACK(net) || IS_NACK(net)) {
464
dprint("tx_queue -> to_rs_queue sq: %u fl:%u len:%u\n",
465
net->seq, net->flags, net->len);
466
queue_add(rs_queue, net, net->len);
468
queue_del(tx_queue, net);
473
static int tx_list_xmit(struct list_head *i, struct us_conntrack *u, int type)
542
static void ftfw_xmit(void)
476
struct nethdr *net = BUILD_NETMSG(u->ct, type);
477
size_t len = prepare_send_netmsg(STATE_SYNC(mcast_client), net);
479
dp("tx_list sq: %u fl:%u len:%u\n",
480
ntohl(net->seq), ntohs(net->flags),
486
ret = mcast_buffered_send_netmsg(STATE_SYNC(mcast_client), net, len);
544
queue_iterate(STATE_SYNC(tx_queue), NULL, tx_queue_xmit);
545
add_alarm(&alive_alarm, ALIVE_INT, 0);
546
dp("tx_queue_len:%u rs_queue_len:%u\n",
547
queue_len(tx_queue), queue_len(rs_queue));
492
static void ftfw_run(void)
550
static void ftfw_enqueue(struct cache_object *obj, int type)
494
struct cache_ftfw *cn, *tmp;
496
/* send messages in the tx_queue */
497
queue_iterate(tx_queue, NULL, tx_queue_xmit);
499
/* send conntracks in the tx_list */
500
list_for_each_entry_safe(cn, tmp, &tx_list, tx_list) {
501
struct us_conntrack *u;
503
u = cache_get_conntrack(STATE_SYNC(internal), cn);
504
if (alarm_pending(&u->alarm))
505
tx_list_xmit(&cn->tx_list, u, NFCT_Q_DESTROY);
507
tx_list_xmit(&cn->tx_list, u, NFCT_Q_UPDATE);
552
struct cache_ftfw *cn = cache_get_extra(STATE_SYNC(internal), obj);
553
if (queue_in(rs_queue, &cn->qnode)) {
554
queue_del(&cn->qnode);
555
queue_add(STATE_SYNC(tx_queue), &cn->qnode);
557
if (queue_add(STATE_SYNC(tx_queue), &cn->qnode))
558
cache_object_get(obj);
510
/* reset alive alarm */
511
add_alarm(&alive_alarm, 1, 0);
513
dprint("tx_list_len:%u tx_queue_len:%u "
514
"rs_list_len: %u rs_queue_len:%u\n",
515
tx_list_len, queue_len(tx_queue),
516
rs_list_len, queue_len(rs_queue));
519
562
struct sync_mode sync_ftfw = {
520
.internal_cache_flags = LIFETIME,
521
.external_cache_flags = LIFETIME,
563
.internal_cache_flags = NO_FEATURES,
564
.external_cache_flags = NO_FEATURES,
522
565
.internal_cache_extra = &cache_ftfw_extra,
523
566
.init = ftfw_init,
524
567
.kill = ftfw_kill,
525
568
.local = ftfw_local,
526
569
.recv = ftfw_recv,
570
.enqueue = ftfw_enqueue,