~ubuntu-branches/ubuntu/raring/gift-gnutella/raring

« back to all changes in this revision

Viewing changes to src/io/tx_packet.c

  • Committer: Bazaar Package Importer
  • Author(s): Göran Weinholt
  • Date: 2005-07-31 13:56:53 UTC
  • mfrom: (1.1.1 upstream)
  • Revision ID: james.westby@ubuntu.com-20050731135653-3i7bcwnrbe7wfd1i
Tags: 0.0.10.1-1
* New upstream version.
  - Fixes FTBFS with gcc-4.0 (closes: #286732).
* Updated debian/patches/update-gwebcaches.patch.
* Updated debian/patches/remove-too-old-check.patch.
* debian/control:
  - Change the encoding of my name to UTF-8.
  - Updated to Standards-Version: 3.6.2 (no changes).

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/*
 
2
 * $Id: tx_packet.c,v 1.10 2005/01/04 14:59:23 mkern Exp $
 
3
 *
 
4
 * Copyright (C) 2004 giFT project (gift.sourceforge.net)
 
5
 *
 
6
 * This program is free software; you can redistribute it and/or modify it
 
7
 * under the terms of the GNU General Public License as published by the
 
8
 * Free Software Foundation; either version 2, or (at your option) any
 
9
 * later version.
 
10
 *
 
11
 * This program is distributed in the hope that it will be useful, but
 
12
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 
13
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 
14
 * General Public License for more details.
 
15
 */
 
16
 
 
17
#include "gt_gnutella.h"
 
18
#include "gt_packet.h"       /* packet manipulation macros */
 
19
 
 
20
#include "io/tx_stack.h"
 
21
#include "io/tx_layer.h"
 
22
#include "io/io_buf.h"
 
23
 
 
24
/*****************************************************************************/
 
25
 
 
26
#define TX_PACKET_DEBUG    0
 
27
 
 
28
/*****************************************************************************/
 
29
 
 
30
/*
 
31
 * Relative packet priority ratios.  These control how many packets of a
 
32
 * certain type are sent prior to looking for other types.  For each type we
 
33
 * maintain an independent FIFO queue.  Each time a packet can be sent, each
 
34
 * queue is checked.
 
35
 *
 
36
 * Packets in a queue will be sent while the ratio is greater than zero, and
 
37
 * there are no higher priority packets waiting.  Once there are no queues
 
38
 * with both waiting packets and a non-zero ratio, the queue priority ratios
 
39
 * are reset so that more packets can be sent.  This process continues until
 
40
 * the lower layer becomes saturated.
 
41
 *
 
42
 * Note that it is bad idea to reset the priority ratios only when all the
 
43
 * ratios are zero, because this could lead to starvation for some packet
 
44
 * types.
 
45
 *
 
46
 * Pushes have the highest priorty of normal messages.  Also, there is a
 
47
 * special 'urgent' queue that has higher priority, that includes replies to
 
48
 * keepalive pings and other important high-priority messages.
 
49
 */
 
50
#define URGENT_RATIO  INT_MAX
 
51
#define PUSH_RATIO    5
 
52
#define QHIT_RATIO    4
 
53
#define QUERY_RATIO   3
 
54
#define PONG_RATIO    2
 
55
#define PING_RATIO    1
 
56
#define MISC_RATIO    1
 
57
 
 
58
#define NR_QUEUES    (7)
 
59
 
 
60
/*****************************************************************************/
 
61
 
 
62
struct packet_queue
 
63
{
 
64
        gt_packet_type_t msg_type;
 
65
        size_t           ratio;        /* how many packets left on this turn? */
 
66
        size_t           bytes_queued; /* total bytes queued */
 
67
        List            *queue;
 
68
};
 
69
 
 
70
struct tx_packet
 
71
{
 
72
        struct packet_queue queues[NR_QUEUES];
 
73
        int                 total_pkts;         /* used to quickly test if empty */
 
74
};
 
75
 
 
76
/*****************************************************************************/
 
77
/* DEBUGGING/TRACING */
 
78
 
 
79
#if TX_PACKET_DEBUG
 
80
/* ripped from gt_packet.c */
 
81
static const char *packet_command_str (uint8_t cmd)
 
82
{
 
83
        static char buf[16];
 
84
 
 
85
        switch (cmd)
 
86
        {
 
87
         case GT_MSG_PING:        return "PING";
 
88
         case GT_MSG_PING_REPLY:  return "PONG";
 
89
         case GT_MSG_BYE:         return "BYE";
 
90
         case GT_MSG_QUERY_ROUTE: return "QROUTE";
 
91
         case GT_MSG_VENDOR:      return "VMSG";
 
92
         case GT_MSG_VENDOR_STD:  return "VMSG-S";
 
93
         case GT_MSG_PUSH:        return "PUSH";
 
94
         case GT_MSG_QUERY:       return "QUERY";
 
95
         case GT_MSG_QUERY_REPLY: return "HITS";
 
96
 
 
97
         default:
 
98
                snprintf (buf, sizeof (buf), "[<%02hx>]", cmd);
 
99
                return buf;
 
100
        }
 
101
}
 
102
 
 
103
static void dump_packet (struct io_buf *buf, String *str)
 
104
{
 
105
        uint8_t cmd = get_command (buf->data);
 
106
        string_appendf (str, "%s,", packet_command_str (cmd));
 
107
}
 
108
 
 
109
static void trace_queue_list (List *queue, String *str)
 
110
{
 
111
        list_foreach (queue, (ListForeachFunc)dump_packet, str);
 
112
}
 
113
#endif /* TX_PACKET_DEBUG */
 
114
 
 
115
static void trace_queue (struct tx_layer *tx, const char *id)
 
116
{
 
117
#if TX_PACKET_DEBUG
 
118
        struct tx_packet *tx_packet = tx->udata;
 
119
        int i;
 
120
        String *s;
 
121
        TCPC *c;
 
122
 
 
123
        if (!(s = string_new (NULL, 0, 0, TRUE)))
 
124
                return;
 
125
 
 
126
        c = tx->stack->c;
 
127
        string_appendf (s, "{%s totalpkts=%d ", id, tx_packet->total_pkts);
 
128
 
 
129
        for (i = 0; i < NR_QUEUES; i++)
 
130
                trace_queue_list (tx_packet->queues[i].queue, s);
 
131
 
 
132
        string_append (s, "}");
 
133
 
 
134
        GT->DBGSOCK (GT, c, "%s", s->str);
 
135
        string_free (s);
 
136
#endif
 
137
}
 
138
 
 
139
/*****************************************************************************/
 
140
 
 
141
/* return the queue on which this message should go */
 
142
static size_t get_queue (struct io_buf *msg)
 
143
{
 
144
        uint8_t cmd;
 
145
 
 
146
        cmd = get_command (msg->data);
 
147
 
 
148
        switch (cmd)
 
149
        {
 
150
         default:
 
151
                abort ();
 
152
 
 
153
         case GT_MSG_VENDOR:
 
154
         case GT_MSG_VENDOR_STD:
 
155
         case GT_MSG_QUERY_ROUTE:
 
156
                return 6;
 
157
 
 
158
         case GT_MSG_PING:
 
159
                {
 
160
                        /* queue keep-alive pings in the urgent queue */
 
161
                        if (get_ttl (msg->data) == 1 && get_hops (msg->data) == 0)
 
162
                                return 0;
 
163
                }
 
164
                return 5;
 
165
 
 
166
         case GT_MSG_PING_REPLY:
 
167
                {
 
168
                        /*
 
169
                         * Queue replies to keep-alive ping in the urgent queue.
 
170
                         *
 
171
                         * This allows the remote end to starve it's own connection
 
172
                         * with a series of keep-alive pings.  Only flow-control
 
173
                         * can handle this.
 
174
                         */
 
175
                        if (get_ttl (msg->data) == 1 && get_hops (msg->data) == 0)
 
176
                                return 0;
 
177
                }
 
178
                return 4;
 
179
 
 
180
         case GT_MSG_QUERY:
 
181
                {
 
182
                        /* make queries from this node more important */
 
183
                        if (get_ttl (msg->data) == 1 && get_hops (msg->data) == 0)
 
184
                                return 1;
 
185
                }
 
186
                return 3;
 
187
 
 
188
         case GT_MSG_QUERY_REPLY:
 
189
                return 2;
 
190
 
 
191
         case GT_MSG_PUSH:
 
192
                return 1;
 
193
 
 
194
         case GT_MSG_BYE:
 
195
                return 0;
 
196
        }
 
197
 
 
198
        abort ();
 
199
}
 
200
 
 
201
static void enqueue_packet (struct packet_queue *pkt_queue, struct io_buf *msg)
 
202
{
 
203
        pkt_queue->queue = list_append (pkt_queue->queue, msg);
 
204
}
 
205
 
 
206
/*
 
207
 * Called from upper layer when it wants to send us a message buffer.
 
208
 */
 
209
static tx_status_t tx_packet_queue (struct tx_layer *tx, struct io_buf *io_buf)
 
210
{
 
211
        struct tx_packet *tx_packet = tx->udata;
 
212
        size_t queue_nr;
 
213
 
 
214
        queue_nr = get_queue (io_buf);
 
215
 
 
216
        assert (queue_nr < NR_QUEUES);
 
217
        enqueue_packet (&tx_packet->queues[queue_nr], io_buf);
 
218
 
 
219
        tx_packet->total_pkts++;
 
220
        assert (tx_packet->total_pkts > 0);
 
221
 
 
222
        trace_queue (tx, "*0*");
 
223
 
 
224
        return TX_OK;
 
225
}
 
226
 
 
227
/*****************************************************************************/
 
228
 
 
229
static void set_queue (struct packet_queue *queue, gt_packet_type_t msg_type,
 
230
                       size_t prio)
 
231
{
 
232
        queue->msg_type = msg_type;
 
233
        queue->ratio    = prio;
 
234
}
 
235
 
 
236
static void reset_ratios (struct packet_queue *queue, size_t len)
 
237
{
 
238
        set_queue (&queue[0], 0xff,               URGENT_RATIO);
 
239
        set_queue (&queue[1], GT_MSG_PUSH,        PUSH_RATIO);
 
240
        set_queue (&queue[2], GT_MSG_QUERY_REPLY, QHIT_RATIO);
 
241
        set_queue (&queue[3], GT_MSG_QUERY,       QUERY_RATIO);
 
242
        set_queue (&queue[4], GT_MSG_PING_REPLY,  PONG_RATIO);
 
243
        set_queue (&queue[5], GT_MSG_PING,        PING_RATIO);
 
244
        set_queue (&queue[6], 0xff,               MISC_RATIO);
 
245
}
 
246
 
 
247
/*****************************************************************************/
 
248
 
 
249
/*
 
250
 * Try to send a single message buffer from the packet queue to the lower
 
251
 * layer.  If the lower layer has become saturated, return FALSE.
 
252
 *
 
253
 * The lower layer takes responsibility for the messages sent to it in
 
254
 * entirety in gt_tx_layer_queue() unless it is full.  In that case it
 
255
 * returns TX_FULL.
 
256
 */
 
257
static tx_status_t shift_queue (struct tx_layer *tx,
 
258
                                struct tx_packet *tx_packet,
 
259
                                struct packet_queue *pkt_queue)
 
260
{
 
261
        List          *msg_l;
 
262
        struct io_buf *msg;
 
263
        tx_status_t    ret;
 
264
 
 
265
        msg_l = list_nth (pkt_queue->queue, 0);
 
266
        msg   = msg_l->data;
 
267
 
 
268
        ret = gt_tx_layer_queue (tx, msg);
 
269
 
 
270
        if (ret != TX_OK)
 
271
        {
 
272
                assert (ret != TX_EMPTY); /* impossible to be empty */
 
273
                return ret;
 
274
        }
 
275
 
 
276
        /* shift this packet off the queue */
 
277
        pkt_queue->queue = list_remove_link (pkt_queue->queue, msg_l);
 
278
 
 
279
        tx_packet->total_pkts--;
 
280
        assert (tx_packet->total_pkts >= 0);
 
281
 
 
282
        if (TX_PACKET_DEBUG)
 
283
                trace_queue (tx, "*2*");
 
284
 
 
285
        return ret;
 
286
}
 
287
 
 
288
static tx_status_t service_queues (struct tx_layer *layer,
 
289
                                   struct tx_packet *tx_packet)
 
290
{
 
291
        int         i;
 
292
        tx_status_t ret;
 
293
 
 
294
        for (i = 0; i < NR_QUEUES; i++)
 
295
        {
 
296
                struct packet_queue *pkt_queue  = &tx_packet->queues[i];
 
297
 
 
298
                /* skip if ratio is small */
 
299
                while (pkt_queue->ratio > 0 && pkt_queue->queue != NULL)
 
300
                {
 
301
                        ret = shift_queue (layer, tx_packet, pkt_queue);
 
302
                        pkt_queue->ratio--;
 
303
 
 
304
                        if (ret == TX_FULL)
 
305
                                return TX_OK;
 
306
 
 
307
                        if (ret != TX_OK)
 
308
                                return ret;
 
309
                }
 
310
        }
 
311
 
 
312
        /* reset the ratios to write more data */
 
313
        reset_ratios (tx_packet->queues, NR_QUEUES);
 
314
 
 
315
        /* we wrote something, so return ok */
 
316
        if (tx_packet->total_pkts == 0)
 
317
                return TX_OK;
 
318
 
 
319
        /* tail recurse until lower layer is saturated */
 
320
        return service_queues (layer, tx_packet);
 
321
}
 
322
 
 
323
/*
 
324
 * Gets called when the lower layer is writable.
 
325
 */
 
326
static tx_status_t tx_packet_ready (struct tx_layer *tx)
 
327
{
 
328
        struct tx_packet *tx_packet = tx->udata;
 
329
 
 
330
        if (tx_packet->total_pkts == 0)
 
331
                return TX_EMPTY;
 
332
 
 
333
        if (TX_PACKET_DEBUG)
 
334
                trace_queue (tx, "*1*");
 
335
 
 
336
        return service_queues (tx, tx_packet);
 
337
}
 
338
 
 
339
/*****************************************************************************/
 
340
 
 
341
static BOOL tx_packet_init (struct tx_layer *tx)
 
342
{
 
343
        struct tx_packet *tx_packet;
 
344
        int               i;
 
345
 
 
346
        if (!(tx_packet = malloc (sizeof (struct tx_packet))))
 
347
                return FALSE;
 
348
 
 
349
        tx_packet->total_pkts = 0;
 
350
 
 
351
        for (i = 0; i < NR_QUEUES; i++)
 
352
        {
 
353
                tx_packet->queues[i].queue        = NULL;
 
354
                tx_packet->queues[i].bytes_queued = 0;
 
355
        }
 
356
 
 
357
        reset_ratios (tx_packet->queues, NR_QUEUES);
 
358
 
 
359
        tx->udata = tx_packet;
 
360
 
 
361
        return TRUE;
 
362
}
 
363
 
 
364
static BOOL free_io_buf (struct io_buf *io_buf, void *udata)
 
365
{
 
366
        io_buf_free (io_buf);
 
367
        return TRUE;
 
368
}
 
369
 
 
370
static void flush_packets (struct packet_queue *pkt_queue)
 
371
{
 
372
        list_foreach_remove (pkt_queue->queue, (ListForeachFunc)free_io_buf, NULL);
 
373
        pkt_queue = NULL;
 
374
}
 
375
 
 
376
static void tx_packet_destroy (struct tx_layer *tx)
 
377
{
 
378
        struct tx_packet *tx_packet = tx->udata;
 
379
        int i;
 
380
 
 
381
        for (i = 0; i < NR_QUEUES; i++)
 
382
                flush_packets (&tx_packet->queues[i]);
 
383
 
 
384
        FREE (tx_packet);
 
385
}
 
386
 
 
387
static void tx_packet_consume (struct tx_layer *tx, BOOL stop)
 
388
{
 
389
        /* nothing */
 
390
}
 
391
 
 
392
/*****************************************************************************/
 
393
 
 
394
static void tx_packet_enable (struct tx_layer *tx)
 
395
{
 
396
        /* TODO */
 
397
}
 
398
 
 
399
static void tx_packet_disable (struct tx_layer *tx)
 
400
{
 
401
        /* TODO */
 
402
}
 
403
 
 
404
/*****************************************************************************/
 
405
 
 
406
struct tx_layer_ops gt_tx_packet_ops =
 
407
{
 
408
        tx_packet_init,
 
409
        tx_packet_destroy,
 
410
        tx_packet_consume,
 
411
        tx_packet_queue,
 
412
        tx_packet_ready,
 
413
        tx_packet_enable,
 
414
        tx_packet_disable,
 
415
};