/*
 * $Id: tx_deflate.c,v 1.15 2004/05/02 08:55:00 hipnod Exp $
 *
 * Copyright (C) 2004 giFT project (gift.sourceforge.net)
 *
 * This program is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License as published by the
 * Free Software Foundation; either version 2, or (at your option) any
 * later version.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License for more details.
 */
17
#include "gt_gnutella.h"
18
#include "gt_packet.h" /* packet manipulation macros */
20
#include "io/tx_stack.h"
21
#include "io/tx_layer.h"
22
#include "io/io_buf.h"
26
/*****************************************************************************/
28
/* set to 1 to enable verbose tracing of the deflate layer */
#define DEFLATE_DEBUG 0

#if DEFLATE_DEBUG
/* trace entry into a function; relies on a `tx` variable in scope */
#define DEFLATE_TRACEFN(tx) \
	GT->DBGSOCK (GT, tx->stack->c, "entered")

/*
 * Dump compression statistics.  Relies on `tx` being in scope at the
 * expansion site.  Casts match the format string: %lu takes unsigned long,
 * %d takes int (the original mixed (long) casts with %lu/%d, which is a
 * printf format/argument mismatch).
 */
#define DEFLATE_DUMP(tx_deflate) \
do { \
	float percent = ((float)tx_deflate->nbytes_in - \
	                 tx_deflate->nbytes_out - tx_deflate->nbytes_unflushed) / \
	                (float)tx_deflate->nbytes_in; \
	\
	GT->DBGSOCK (GT, tx->stack->c, "in %lu out %lu flushed %lu unflushed %lu (flushing %d) " \
	             "ratio %.2f%% avg %.2f", \
	             (unsigned long)tx_deflate->nbytes_in, \
	             (unsigned long)tx_deflate->nbytes_out, \
	             (unsigned long)tx_deflate->nbytes_flushed, \
	             (unsigned long)tx_deflate->nbytes_unflushed, \
	             (int)tx_deflate->flushing, percent * 100.0, \
	             (double)tx_deflate->nbytes_out / \
	             difftime (time (NULL), tx->stack->start_time)); \
} while (0)
#else /* !DEFLATE_DEBUG */
#define DEFLATE_TRACEFN(tx)
#define DEFLATE_DUMP(tx_deflate)
#endif /* DEFLATE_DEBUG */
57
/*****************************************************************************/
59
/* size of the staging buffer for compressed data */
#define TX_DEFLATE_BUFSIZE    (1024 - 1)      /* -1 for auto-nullification */

#define FLUSH_AFTER           (4096)          /* flush after this many bytes */

#define NAGLE_TIMEOUT         (200 * MSEC)    /* 200 milliseconds */
65
/*****************************************************************************/
72
/* compressed buffer */
75
/* Nagle timer that sends stored data after NAGLE_TIMEOUT milliseconds */
78
size_t nbytes_in; /* total uncompressed bytes */
79
size_t nbytes_out; /* total compressed bytes */
80
size_t nbytes_flushed; /* total bytes written to lower layer */
81
size_t nbytes_unflushed; /* bytes currently waiting in z_stream */
84
* Whether the zstream is currently being flushed, and so whether deflate
85
* must receive a Z_SYNC_FLUSH parameter to continue flushing. The flush
86
* ends when deflate returns with avail_out > 0.
91
* When doing a flush, it's possible that there will be a partially
92
* filled buffer leftover. If there's no new data that comes in, the data
93
* will be delayed again until more data comes from the upper layer. This
94
* flag is set when this happens, so we know that we should flush the
95
* buffer to the lower layer as soon as possible, even if it isn't
101
/*****************************************************************************/
103
/* forward declarations: the Nagle timer helpers are used before they are
 * defined */
static void start_nagle_timer (struct tx_layer *tx, struct tx_deflate *deflate);
static void stop_nagle_timer  (struct tx_layer *tx, struct tx_deflate *deflate);
106
/*****************************************************************************/
108
/*
 * Enable/disable hooks for this layer.
 *
 * NOTE(review): the bodies were lost in extraction; upstream appears to
 * leave these unimplemented — confirm against the original file.
 */
static void tx_deflate_enable (struct tx_layer *tx)
{
	/* TODO */
}

static void tx_deflate_disable (struct tx_layer *tx)
{
	/* TODO */
}
118
/*****************************************************************************/
120
/* toggle hook: intentionally empty — this layer does not consume packets,
 * it only passes them along, so there is nothing to pause or resume */
static void tx_deflate_toggle (struct tx_layer *tx, BOOL stop)
{
	/* nothing, we do not consume packets, only pass along */
}
125
/*****************************************************************************/
127
static BOOL alloc_buffer (struct tx_deflate *tx_deflate)
132
if (!(tx_deflate->buf = io_buf_new (TX_DEFLATE_BUFSIZE)))
138
static void finish_flush (struct tx_deflate *tx_deflate)
140
tx_deflate->nbytes_unflushed = 0;
141
tx_deflate->flushing = FALSE;
144
static tx_status_t flush_buffer (struct tx_layer *tx,
145
struct tx_deflate *tx_deflate)
152
n = io_buf_read_avail (tx_deflate->buf);
155
* The buffer filled up. Try to send again until the lower
156
* layer is saturated.
158
ret = gt_tx_layer_queue (tx, tx_deflate->buf);
159
assert (ret != TX_EMPTY);
161
if (ret == TX_ERROR || ret == TX_FULL)
164
tx_deflate->nbytes_flushed += n;
165
assert (ret == TX_OK);
167
stop_nagle_timer (tx, tx_deflate);
169
tx_deflate->buf = NULL;
170
tx_deflate->delayed = FALSE;
176
* Try to flush the data inside the z_stream and send it to the layer beneath
179
static tx_status_t flush_stream (struct tx_layer *tx,
180
struct tx_deflate *tx_deflate)
182
z_stream *z = &tx_deflate->z;
185
size_t wlen, old_avail;
189
if (!alloc_buffer (tx_deflate))
192
old_avail = io_buf_write_avail (tx_deflate->buf);
195
z->next_in = NULL; /* don't disrupt anything else */
196
z->next_out = io_buf_write_ptr (tx_deflate->buf);
197
z->avail_out = old_avail;
199
zret = deflate (z, Z_SYNC_FLUSH);
202
* If this is true we've already flushed all possible data.
204
if (zret == Z_BUF_ERROR)
206
tx_deflate->flushing = FALSE;
208
/* send the stored data */
209
if (io_buf_read_avail (tx_deflate->buf) > 0)
210
return flush_buffer (tx, tx_deflate);
218
wlen = old_avail - z->avail_out;
220
io_buf_push (tx_deflate->buf, wlen);
221
tx_deflate->nbytes_out += wlen;
223
tx_deflate->flushing = TRUE;
225
/* if there is space, the flush completed successfully */
226
if (z->avail_out > 0)
227
finish_flush (tx_deflate);
229
if ((ret = flush_buffer (tx, tx_deflate) != TX_OK))
232
/* stop when the flush completes */
233
if (!tx_deflate->flushing)
236
/* tail recurse until the flush completes */
237
return flush_stream (tx, tx_deflate);
240
static BOOL deflate_nagle_timeout (struct tx_layer *tx)
242
struct tx_deflate *tx_deflate = tx->udata;
247
/* this assertion means we have to disarm the timer when sending the
249
assert (tx_deflate->buf != NULL);
251
ret = flush_stream (tx, tx_deflate);
253
/* no matter what, we disable the Nagle timer after this */
254
stop_nagle_timer (tx, tx_deflate);
258
gt_tx_stack_abort (tx->stack);
263
GT->DBGSOCK (GT, tx->stack->c, "buffer delayed?: %d", tx_deflate->delayed);
268
static void start_nagle_timer (struct tx_layer *tx,
269
struct tx_deflate *tx_deflate)
272
GT->DBGSOCK (GT, tx->stack->c, "nagle timer=%d", tx_deflate->nagle_timer);
274
if (tx_deflate->nagle_timer != 0)
277
tx_deflate->nagle_timer = timer_add (NAGLE_TIMEOUT,
278
(TimerCallback)deflate_nagle_timeout,
282
static void stop_nagle_timer (struct tx_layer *tx,
283
struct tx_deflate *tx_deflate)
286
GT->DBGSOCK (GT, tx->stack->c, "nagle timer=%d", tx_deflate->nagle_timer);
288
timer_remove_zero (&tx_deflate->nagle_timer);
291
/*****************************************************************************/
294
* The upper layer has sent us a buffer to process.
296
static tx_status_t tx_deflate_queue (struct tx_layer *tx, struct io_buf *msg)
298
struct tx_deflate *tx_deflate = tx->udata;
299
z_stream *z = &tx_deflate->z;
300
BOOL flush_completed = FALSE;
306
* Deflate the incoming message, adding it to the buffer.
308
* If our buffer is currently full, return TX_FULL.
311
if (!alloc_buffer (tx_deflate))
317
z->next_in = io_buf_read_ptr (msg);
318
z->avail_in = io_buf_read_avail (msg);
319
z->next_out = io_buf_write_ptr (tx_deflate->buf);
320
z->avail_out = io_buf_write_avail (tx_deflate->buf);
322
if (z->avail_out == 0)
325
while (io_buf_read_avail (msg) > 0 && z->avail_out > 0)
329
assert (z->next_in == io_buf_read_ptr (msg));
330
assert (z->next_out == io_buf_write_ptr (tx_deflate->buf));
332
/* begin flushing after a certain amount */
333
if (tx_deflate->nbytes_unflushed >= FLUSH_AFTER)
334
tx_deflate->flushing = TRUE;
336
ret = deflate (z, tx_deflate->flushing ? Z_SYNC_FLUSH : 0);
340
GT->DBGFN (GT, "deflate: error %d", ret);
345
rlen = io_buf_read_avail (msg) - z->avail_in;
346
wlen = io_buf_write_avail (tx_deflate->buf) - z->avail_out;
347
assert (rlen > 0 || wlen > 0); /* hmm, is this true when flushing? */
352
tx_deflate->nbytes_in += rlen;
353
tx_deflate->nbytes_unflushed += rlen;
354
tx_deflate->nbytes_out += wlen;
356
DEFLATE_DUMP(tx_deflate);
358
/* update the buffer lengths */
359
io_buf_push (tx_deflate->buf, wlen);
360
io_buf_pop (msg, rlen);
362
if (z->avail_out == 0)
366
* If we have available output space and no more input space,
367
* we know the flush completed, so unset flush mode.
369
* NOTE: there might be a bug here. The flush may fit exactly
370
* everytime, causing us to never leave flush mode. I think zlib may
371
* try to prevent this itself, though.
373
if (tx_deflate->flushing && z->avail_in == 0)
375
flush_completed = TRUE;
376
finish_flush (tx_deflate);
381
* If we completed a flush, and the buffer isn't full, set the delayed
382
* flag so that service_deflate() will write the buffer immediately to
383
* reduce latency, as it has already endured a Nagle timeout period.
385
if (flush_completed &&
386
io_buf_read_avail (tx_deflate->buf) < TX_DEFLATE_BUFSIZE)
390
GT->DBGSOCK (GT, tx->stack->c, "setting ->delayed flag on buf(%d)",
391
io_buf_read_avail (tx_deflate->buf));
394
tx_deflate->delayed = TRUE;
398
* If the message buffer was only partially emptied, don't free
399
* it and let tx_layer.c know to handle it specially.
401
if (io_buf_read_avail (msg) > 0)
409
/*****************************************************************************/
412
* Get more data to write.
414
static tx_status_t get_buffers (struct tx_layer *tx,
415
struct tx_deflate *tx_deflate)
417
if (tx_deflate->buf && io_buf_write_avail (tx_deflate->buf) == 0)
420
return gt_tx_layer_ready (tx);
424
* This is the most complicated part of the whole stack:
426
* [1] Call upper layer's ready routine to grab a buffer (gt_tx_layer_ready).
428
* [2] That function will call tx_deflate_queue, which compresses the data to
429
* a buffer, as many times as it can while there's more data to process.
431
* [3] If we didn't fill the buffer, or there was no data, return TX_EMPTY
432
* telling the lower layer there is no data.
434
* [4] If there's no data in the upper layer, but we're in flush mode, call
435
* flush_stream() to send whatever data is stored inside the z_stream,
438
* [5] If we filled the buffer, or if we have a paritally filled buffer that
439
* was delayed in deflate_nagle_timeout(), send it to the lower layer with
440
* flush_buffer(). If the lower layer returns TX_FULL, stop and return
441
* TX_OK. Otherwise, continue by calling this function recursively.
443
* NOTE: The buffer is filled in tx_deflate_queue but sent in this
444
* function (or from the Nagle timer if the buffer isn't full).
446
* The caller of this function has to setup a Nagle timer if any data was
447
* written and TX_FULL was not encountered.
449
static tx_status_t service_deflate (struct tx_layer *tx,
450
struct tx_deflate *tx_deflate)
457
ret = get_buffers (tx, tx_deflate);
465
assert (ret == TX_EMPTY);
467
/* [4]: continue flush even if no data avail */
468
if (tx_deflate->flushing)
469
ret = flush_stream (tx, tx_deflate);
474
assert (tx_deflate->buf != NULL);
478
if (tx_deflate->delayed)
480
GT->DBGSOCK (GT, tx->stack->c, "flushing delayed buf(%d)",
481
io_buf_read_avail (tx_deflate->buf));
485
assert (ret == TX_OK);
490
* flush_buffer will stop the Nagle timer if the buffer was
493
* We must also flush the buffer if it contains partial data from a
494
* previous flush that was delayed in the Nagle timer due to having no
497
if (tx_deflate->delayed || io_buf_write_avail (tx_deflate->buf) == 0)
498
ret = flush_buffer (tx, tx_deflate);
503
/* tail recurse until the lower layer is saturated */
504
return service_deflate (tx, tx_deflate);
508
* The lower layer is ready to write.
510
static tx_status_t tx_deflate_ready (struct tx_layer *tx)
512
struct tx_deflate *tx_deflate = tx->udata;
516
/* keep track of how much was previously flushed */
517
old_flushed = tx_deflate->nbytes_flushed;
519
ret = service_deflate (tx, tx_deflate);
521
if (ret == TX_ERROR || ret == TX_FULL)
525
/* flush buffer shouldve deactivated the Nagle timer */
526
assert (tx_deflate->nagle_timer == 0);
528
/* we wrote something -- let caller know it's ok */
535
assert (ret == TX_OK || ret == TX_EMPTY);
538
* If the lower layer was not saturated (evidenced by _not_ returning
539
* TX_FULL), and there is a partially completed buffer, the Nagle
540
* timer must be armed. This ensures the data waiting in this layer will
541
* go out in a timely manner. If the lower layer was saturated, we don't
542
* need to arm the timer because there is no buffer space to flush to
543
* anyway, and when the lower layer unsaturates it will reinvoke this
544
* layer to write more data.
546
* TODO: Still need to flush if there is some urgent data waiting. So,
547
* should add a ->flush callback.
549
* XXX: Using tx_deflate->buf != NULL as a hacky way to recognize that
550
* some data was written to the z_stream.
552
if (tx_deflate->buf != NULL)
553
start_nagle_timer (tx, tx_deflate);
557
GT->DBGSOCK (GT, tx->stack->c, "buf waiting=[%d] ret=%s",
558
tx_deflate->buf ? io_buf_read_avail (tx_deflate->buf) : 0,
559
ret == TX_EMPTY ? "TX_EMPTY" : "TX_OK");
562
DEFLATE_DUMP(tx_deflate);
565
* For the return value from this function, decipher whether
566
* service_deflate() wrote some data.
568
* If nothing was written, then we should stop sending now, by returning
569
* TX_EMPTY. That will remove the input in tx_link.c that's calling this
570
* layer, which kind of sucks, because this could be the case a lot of the
571
* time when the whole buffer hasn't been filled up, leading to a removing
572
* and adding the input a lot.
574
* Otherwise, return TX_OK if something was sent to the lower layer.
576
if (old_flushed == tx_deflate->nbytes_flushed)
582
/*****************************************************************************/
584
static BOOL tx_deflate_init (struct tx_layer *tx)
586
struct tx_deflate *tx_deflate;
588
if (!(tx_deflate = malloc (sizeof(*tx_deflate))))
591
/* zlib documents these variables as needing initialization before
593
tx_deflate->z.zalloc = Z_NULL;
594
tx_deflate->z.zfree = Z_NULL;
595
tx_deflate->z.opaque = Z_NULL;
597
if (deflateInit (&tx_deflate->z, Z_DEFAULT_COMPRESSION) != Z_OK)
603
tx_deflate->buf = NULL;
604
tx_deflate->nagle_timer = 0;
605
tx_deflate->nbytes_in = 0;
606
tx_deflate->nbytes_out = 0;
607
tx_deflate->nbytes_flushed = 0;
608
tx_deflate->nbytes_unflushed = 0;
609
tx_deflate->flushing = FALSE;
610
tx_deflate->delayed = FALSE;
612
tx->udata = tx_deflate;
616
static void tx_deflate_destroy (struct tx_layer *tx)
618
struct tx_deflate *tx_deflate = tx->udata;
620
io_buf_free (tx_deflate->buf);
621
timer_remove (tx_deflate->nagle_timer);
623
deflateEnd (&tx_deflate->z);
627
/*****************************************************************************/
629
struct tx_layer_ops gt_tx_deflate_ops =