2
* $Id: tx_packet.c,v 1.10 2005/01/04 14:59:23 mkern Exp $
4
* Copyright (C) 2004 giFT project (gift.sourceforge.net)
6
* This program is free software; you can redistribute it and/or modify it
7
* under the terms of the GNU General Public License as published by the
8
* Free Software Foundation; either version 2, or (at your option) any
11
* This program is distributed in the hope that it will be useful, but
12
* WITHOUT ANY WARRANTY; without even the implied warranty of
13
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14
* General Public License for more details.
17
#include "gt_gnutella.h"
18
#include "gt_packet.h" /* packet manipulation macros */
20
#include "io/tx_stack.h"
21
#include "io/tx_layer.h"
22
#include "io/io_buf.h"
24
/*****************************************************************************/
26
#define TX_PACKET_DEBUG 0
28
/*****************************************************************************/
31
* Relative packet priority ratios. These control how many packets of a
32
* certain type are sent prior to looking for other types. For each type we
33
* maintain an independent FIFO queue. Each time a packet can be sent, each
36
* Packets in a queue will be sent while the ratio is greater than zero, and
37
* there are no higher priority packets waiting. Once there are no queues
38
* with both waiting packets and a non-zero ratio, the queue priority ratios
39
* are reset so that more packets can be sent. This process continues until
40
* the lower layer becomes saturated.
42
* Note that it is a bad idea to reset the priority ratios only when all the
43
* ratios are zero, because this could lead to starvation for some packet
46
* Pushes have the highest priority of normal messages. Also, there is a
47
* special 'urgent' queue that has higher priority, that includes replies to
48
* keepalive pings and other important high-priority messages.
50
#define URGENT_RATIO INT_MAX
60
/*****************************************************************************/
64
gt_packet_type_t msg_type;
65
size_t ratio; /* how many packets left on this turn? */
66
size_t bytes_queued; /* total bytes queued */
72
struct packet_queue queues[NR_QUEUES];
73
int total_pkts; /* used to quickly test if empty */
76
/*****************************************************************************/
77
/* DEBUGGING/TRACING */
80
/* ripped from gt_packet.c */
81
/*
 * Translate a message command byte into a short name for trace output.
 * Each GT_MSG_* value listed below maps to a fixed string literal; the
 * snprintf() call at the end renders cmd as two-digit hex into buf.
 * NOTE(review): the snprintf path is presumably the fallback for commands
 * not listed here -- the enclosing switch/default and the declaration of
 * buf are not visible in this excerpt, confirm against the full file.
 */
static const char *packet_command_str (uint8_t cmd)
87
case GT_MSG_PING: return "PING";
88
case GT_MSG_PING_REPLY: return "PONG";
89
case GT_MSG_BYE: return "BYE";
90
case GT_MSG_QUERY_ROUTE: return "QROUTE";
91
case GT_MSG_VENDOR: return "VMSG";
92
case GT_MSG_VENDOR_STD: return "VMSG-S";
93
case GT_MSG_PUSH: return "PUSH";
94
case GT_MSG_QUERY: return "QUERY";
95
case GT_MSG_QUERY_REPLY: return "HITS";
98
snprintf (buf, sizeof (buf), "[<%02hx>]", cmd);
103
/*
 * Append the command name of one packet, followed by a comma, to str.
 * The command byte is read from buf->data with get_command() and turned
 * into text by packet_command_str().
 */
static void dump_packet (struct io_buf *buf, String *str)
105
uint8_t cmd = get_command (buf->data);
106
string_appendf (str, "%s,", packet_command_str (cmd));
109
/*
 * Run dump_packet() over every element of queue via list_foreach(),
 * passing str along so the per-packet output accumulates in it.
 */
static void trace_queue_list (List *queue, String *str)
111
list_foreach (queue, (ListForeachFunc)dump_packet, str);
113
#endif /* TX_PACKET_DEBUG */
115
/*
 * Emit a one-line snapshot of the packet queues for debugging.  The line
 * holds the caller-supplied tag id, the total packet count, and the output
 * of trace_queue_list() for each of the NR_QUEUES queues, and is handed to
 * GT->DBGSOCK.
 */
static void trace_queue (struct tx_layer *tx, const char *id)
118
struct tx_packet *tx_packet = tx->udata;
123
/* a NULL result from string_new() is checked here; the branch body is
 * not visible in this excerpt */
if (!(s = string_new (NULL, 0, 0, TRUE)))
127
string_appendf (s, "{%s totalpkts=%d ", id, tx_packet->total_pkts);
129
for (i = 0; i < NR_QUEUES; i++)
130
trace_queue_list (tx_packet->queues[i].queue, s);
132
string_append (s, "}");
134
GT->DBGSOCK (GT, c, "%s", s->str);
139
/*****************************************************************************/
141
/* return the queue on which this message should go */
142
static size_t get_queue (struct io_buf *msg)
146
cmd = get_command (msg->data);
154
case GT_MSG_VENDOR_STD:
155
case GT_MSG_QUERY_ROUTE:
160
/* queue keep-alive pings in the urgent queue */
161
if (get_ttl (msg->data) == 1 && get_hops (msg->data) == 0)
166
case GT_MSG_PING_REPLY:
169
* Queue replies to keep-alive ping in the urgent queue.
171
* This allows the remote end to starve its own connection
172
* with a series of keep-alive pings. Only flow-control
175
if (get_ttl (msg->data) == 1 && get_hops (msg->data) == 0)
182
/* make queries from this node more important */
183
if (get_ttl (msg->data) == 1 && get_hops (msg->data) == 0)
188
case GT_MSG_QUERY_REPLY:
201
/*
 * Add msg to pkt_queue's packet list with list_append(); the queue pointer
 * is replaced with list_append()'s return value.
 */
static void enqueue_packet (struct packet_queue *pkt_queue, struct io_buf *msg)
203
pkt_queue->queue = list_append (pkt_queue->queue, msg);
207
* Called from upper layer when it wants to send us a message buffer.
209
/*
 * Choose a queue for io_buf with get_queue(), enqueue the buffer there and
 * increment the total packet count, then trace the resulting queue state.
 * NOTE(review): the returned tx_status_t is not visible in this excerpt.
 */
static tx_status_t tx_packet_queue (struct tx_layer *tx, struct io_buf *io_buf)
211
struct tx_packet *tx_packet = tx->udata;
214
queue_nr = get_queue (io_buf);
216
/* the index is used to subscript queues[] on the next line */
assert (queue_nr < NR_QUEUES);
217
enqueue_packet (&tx_packet->queues[queue_nr], io_buf);
219
tx_packet->total_pkts++;
220
/* total_pkts is a signed int; it must be positive after the increment */
assert (tx_packet->total_pkts > 0);
222
trace_queue (tx, "*0*");
227
/*****************************************************************************/
229
/*
 * Configure a single packet_queue entry; the visible line records msg_type
 * on the queue.
 * NOTE(review): callers in reset_ratios() pass a third (ratio) argument,
 * but the rest of the parameter list and body is not visible in this
 * excerpt -- confirm against the full file.
 */
static void set_queue (struct packet_queue *queue, gt_packet_type_t msg_type,
232
queue->msg_type = msg_type;
236
/*
 * (Re)load the message type and ratio constant of queue slots 0 through 6
 * via set_queue().  Slot 0 gets URGENT_RATIO and slot 6 gets MISC_RATIO;
 * both use 0xff as their message type instead of a GT_MSG_* value.  The
 * slots in between are push, query reply, query, ping reply and ping, in
 * that order.
 * NOTE(review): len is not referenced in the visible lines; the slot
 * indices are hard-coded.
 */
static void reset_ratios (struct packet_queue *queue, size_t len)
238
set_queue (&queue[0], 0xff, URGENT_RATIO);
239
set_queue (&queue[1], GT_MSG_PUSH, PUSH_RATIO);
240
set_queue (&queue[2], GT_MSG_QUERY_REPLY, QHIT_RATIO);
241
set_queue (&queue[3], GT_MSG_QUERY, QUERY_RATIO);
242
set_queue (&queue[4], GT_MSG_PING_REPLY, PONG_RATIO);
243
set_queue (&queue[5], GT_MSG_PING, PING_RATIO);
244
set_queue (&queue[6], 0xff, MISC_RATIO);
247
/*****************************************************************************/
250
* Try to send a single message buffer from the packet queue to the lower
251
* layer. If the lower layer has become saturated, return FALSE.
253
* The lower layer takes responsibility for the messages sent to it in
254
* entirety in gt_tx_layer_queue() unless it is full. In that case it
257
/*
 * Take the first node of pkt_queue's list (list_nth index 0), pass a
 * message to gt_tx_layer_queue(), then unlink that node from the list and
 * decrement the total packet count.
 * NOTE(review): how msg is obtained from msg_l, and what happens when
 * gt_tx_layer_queue() reports a non-OK status, is not visible in this
 * excerpt -- confirm against the full file.
 */
static tx_status_t shift_queue (struct tx_layer *tx,
258
struct tx_packet *tx_packet,
259
struct packet_queue *pkt_queue)
265
msg_l = list_nth (pkt_queue->queue, 0);
268
ret = gt_tx_layer_queue (tx, msg);
272
assert (ret != TX_EMPTY); /* impossible to be empty */
276
/* shift this packet off the queue */
277
pkt_queue->queue = list_remove_link (pkt_queue->queue, msg_l);
279
tx_packet->total_pkts--;
280
/* the counter must never go negative */
assert (tx_packet->total_pkts >= 0);
283
trace_queue (tx, "*2*");
288
/*
 * Make a pass over the queues starting at index 0.  A queue is serviced
 * through shift_queue() for as long as its ratio is above zero and its
 * list is non-empty.  Afterwards the ratios are reset, total_pkts is
 * tested against zero, and otherwise the function calls itself again for
 * another pass.
 * NOTE(review): the handling of shift_queue()'s status and the ratio
 * decrement are not visible in this excerpt -- confirm in the full file.
 */
static tx_status_t service_queues (struct tx_layer *layer,
289
struct tx_packet *tx_packet)
294
for (i = 0; i < NR_QUEUES; i++)
296
struct packet_queue *pkt_queue = &tx_packet->queues[i];
298
/* send from this queue while it has ratio left and packets waiting */
299
while (pkt_queue->ratio > 0 && pkt_queue->queue != NULL)
301
ret = shift_queue (layer, tx_packet, pkt_queue);
312
/* reset the ratios to write more data */
313
reset_ratios (tx_packet->queues, NR_QUEUES);
315
/* we wrote something, so return ok */
316
if (tx_packet->total_pkts == 0)
319
/* tail recurse until lower layer is saturated */
320
return service_queues (layer, tx_packet);
324
* Gets called when the lower layer is writable.
326
/*
 * Fetch the tx_packet state from tx->udata, test whether any packets are
 * queued, trace the queue state, and return the result of
 * service_queues().
 */
static tx_status_t tx_packet_ready (struct tx_layer *tx)
328
struct tx_packet *tx_packet = tx->udata;
330
/* nothing queued; the branch body is not visible in this excerpt */
if (tx_packet->total_pkts == 0)
334
trace_queue (tx, "*1*");
336
return service_queues (tx, tx_packet);
339
/*****************************************************************************/
341
/*
 * Allocate the tx_packet state for this layer, start it with a zero packet
 * count, NULL queue lists and zero byte counters, load the ratios with
 * reset_ratios(), and store the state in tx->udata.
 * NOTE(review): the returned BOOL values are not visible in this excerpt.
 */
static BOOL tx_packet_init (struct tx_layer *tx)
343
struct tx_packet *tx_packet;
346
/* allocation failure is checked; the branch body is not visible here */
if (!(tx_packet = malloc (sizeof (struct tx_packet))))
349
tx_packet->total_pkts = 0;
351
for (i = 0; i < NR_QUEUES; i++)
353
tx_packet->queues[i].queue = NULL;
354
tx_packet->queues[i].bytes_queued = 0;
357
reset_ratios (tx_packet->queues, NR_QUEUES);
359
tx->udata = tx_packet;
364
/*
 * List callback used by flush_packets(): releases one io_buf with
 * io_buf_free().  udata is not referenced in the visible lines.
 * NOTE(review): the returned BOOL is not visible in this excerpt.
 */
static BOOL free_io_buf (struct io_buf *io_buf, void *udata)
366
io_buf_free (io_buf);
370
/*
 * Run free_io_buf() over every element of pkt_queue's list through
 * list_foreach_remove().
 * NOTE(review): the return value of list_foreach_remove() is not stored on
 * the visible line -- confirm in the full file that pkt_queue->queue is
 * not left pointing at freed list nodes.
 */
static void flush_packets (struct packet_queue *pkt_queue)
372
list_foreach_remove (pkt_queue->queue, (ListForeachFunc)free_io_buf, NULL);
376
/*
 * Flush each of the NR_QUEUES queues of the tx_packet state found in
 * tx->udata.
 * NOTE(review): release of the tx_packet structure itself is not visible
 * in this excerpt.
 */
static void tx_packet_destroy (struct tx_layer *tx)
378
struct tx_packet *tx_packet = tx->udata;
381
for (i = 0; i < NR_QUEUES; i++)
382
flush_packets (&tx_packet->queues[i]);
387
static void tx_packet_consume (struct tx_layer *tx, BOOL stop)
392
/*****************************************************************************/
394
static void tx_packet_enable (struct tx_layer *tx)
399
static void tx_packet_disable (struct tx_layer *tx)
404
/*****************************************************************************/
406
struct tx_layer_ops gt_tx_packet_ops =