2
This file is part of GNUnet
3
(C) 2004 Christian Grothoff (and other contributing authors)
5
GNUnet is free software; you can redistribute it and/or modify
6
it under the terms of the GNU General Public License as published
7
by the Free Software Foundation; either version 2, or (at your
8
option) any later version.
10
GNUnet is distributed in the hope that it will be useful, but
11
WITHOUT ANY WARRANTY; without even the implied warranty of
12
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13
General Public License for more details.
15
You should have received a copy of the GNU General Public License
16
along with GNUnet; see the file COPYING. If not, write to the
17
Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18
Boston, MA 02111-1307, USA.
21
* @file fragmentation/fragmentation.c
22
* @brief fragmentation and defragmentation, this code allows
23
* sending and receiving messages that are larger than
24
* the MTU of the transport. Messages are still limited
25
* to a maximum size of 65535 bytes, which is a good
26
* idea because otherwise we may need ungainly fragmentation
27
* buffers. Each connected peer can have at most one
28
* fragmented packet at any given point in time (prevents
29
* DoS attacks). Fragmented messages that have not been
30
* completed after a certain amount of time are discarded.
31
* @author Christian Grothoff
35
#include "gnunet_protocols.h"
36
#include "gnunet_util.h"
37
#include "gnunet_core.h"
38
#include "gnunet_stats_service.h"
39
#include "gnunet_fragmentation_service.h"
45
P2P_MESSAGE_HEADER header;
62
} P2P_fragmentation_MESSAGE;
65
* How many buckets does the fragment hash table
68
#define DEFRAG_BUCKET_COUNT 16
71
* After how long do fragments time out?
73
#ifndef DEFRAGMENTATION_TIMEOUT
74
#define DEFRAGMENTATION_TIMEOUT (3 * cronMINUTES)
78
* Entry in the linked list of fragments.
82
P2P_fragmentation_MESSAGE * frag;
86
* Entry in the hash table of fragments.
96
/**
 * Payload size (in bytes) of the fragment held by the fragment-list
 * entry fl: the total size taken from the fragment's message header
 * (stored in network byte order) minus the size of the
 * P2P_fragmentation_MESSAGE envelope.
 *
 * The argument is parenthesized so that any pointer-valued expression
 * (for example "p + 1" or "p->link") can be passed safely.  The
 * subtraction is done in an unsigned type, so the result wraps around
 * if header.size is smaller than sizeof(P2P_fragmentation_MESSAGE);
 * callers must validate the header size first.
 */
#define FRAGSIZE(fl) (ntohs((fl)->frag->header.size) - sizeof(P2P_fragmentation_MESSAGE))
98
/* Handle to the core API.  In this file it is used to request and
   release the stats service, to inject reassembled messages, to send
   fragments (unicast / unicastCallback) and to unregister the
   fragment handler. */
static CoreAPIForApplication * coreAPI;
100
/* Stats service handle, obtained with coreAPI->requestService("stats")
   and released again with coreAPI->releaseService(stats). */
static Stats_ServiceAPI * stats;
102
/* Stats handle created for "# messages defragmented"; bumped by one in
   checkComplete right before the reassembled message is injected.
   NOTE(review): tryJoin also bumps this counter on its "drop redundant
   fragment" paths -- confirm that this is intended. */
static int stat_defragmented;
104
/* Stats handle created for "# messages fragmented"; bumped by one in
   fragmentBMC before the first fragment is written. */
static int stat_fragmented;
106
/* Stats handle created for "# fragments discarded"; changed by freeFL
   using the count argument it is given. */
static int stat_discarded;
109
* Hashtable *with* collision management!
111
static FC * defragmentationCache[DEFRAG_BUCKET_COUNT];
114
* Lock for the defragmentation cache.
116
static Mutex defragCacheLock;
118
static void freeFL(FL * fl,
121
FL * link = fl->link;
123
stats->change(stat_discarded, c);
131
* This cron job ensures that we purge buffers of fragments
132
* that have timed out. It can run in much longer intervals
133
* than the defragmentationCron, e.g. every 60s.
135
* This method goes through the hashtable, finds entries that
136
* have timed out and removes them (and all the fragments that
137
* belong to the entry). It's a bit more complicated as the
138
* collision list is also collapsed.
140
static void defragmentationPurgeCron() {
146
MUTEX_LOCK(&defragCacheLock);
147
for (i=0;i<DEFRAG_BUCKET_COUNT;i++) {
149
smf = defragmentationCache[i];
150
while (smf != NULL) {
151
if (smf->ttl < cronTime(NULL)) {
152
/* free linked list of fragments */
153
freeFL(smf->head, 1);
157
defragmentationCache[i] = next;
165
} /* while smf != NULL */
166
} /* for all buckets */
167
MUTEX_UNLOCK(&defragCacheLock);
171
* Check if this fragment-list is complete. If yes, put it together,
172
* process and free all buffers. Does not free the pep
173
* itself (but sets the TTL to 0 to have the cron free it
174
* in the next iteration).
176
* @param pep the entry in the hash table
178
static void checkComplete(FC * pep) {
184
GNUNET_ASSERT(pep != NULL);
189
len = ntohs(pos->frag->len);
191
goto CLEANUP; /* really bad error! */
193
while ( (pos != NULL) &&
194
(ntohs(pos->frag->off) <= off) ) {
195
if (off >= off + FRAGSIZE(pos))
196
goto CLEANUP; /* error! */
197
if (ntohs(pos->frag->off) + FRAGSIZE(pos) > off)
198
off = ntohs(pos->frag->off) + FRAGSIZE(pos);
200
goto CLEANUP; /* error! */
204
return; /* some fragment is still missing */
208
while (pos != NULL) {
209
memcpy(&msg[ntohs(pos->frag->off)],
216
stats->change(stat_defragmented, 1);
217
/* handle message! */
218
coreAPI->injectMessage(&pep->sender,
225
/* free fragment buffers */
226
freeFL(pep->head, 0);
232
* See if the new fragment is a part of this entry and join them if
233
* yes. Return SYSERR if the fragments do not match. Return OK if
234
* the fragments do match and the fragment has been processed. The
235
* defragCacheLock is already acquired by the caller whenever this
236
* method is called.<p>
238
* @param entry the entry in the cache
239
* @param sender identity of the peer that sent the new fragment
240
* @param packet the newly received fragment message to join into the entry
242
static int tryJoin(FC * entry,
243
const PeerIdentity * sender,
244
const P2P_fragmentation_MESSAGE * packet) {
245
/* frame before ours; may end in the middle of
246
our frame or before it starts; NULL if we are
247
the earliest position we have received so far */
249
/* frame after ours; may start in the middle of
250
our frame or after it; NULL if we are the last
251
fragment we have received so far */
253
/* current position in the frame-list */
255
/* the new entry that we're inserting */
260
GNUNET_ASSERT(entry != NULL);
261
if (! hostIdentityEquals(sender,
263
return SYSERR; /* wrong fragment list, try another! */
264
if (ntohl(packet->id) != entry->id)
265
return SYSERR; /* wrong fragment list, try another! */
268
if ( (pos != NULL) &&
269
(packet->len != pos->frag->len) )
270
return SYSERR; /* wrong fragment size */
273
/* find the before-frame */
274
while ( (pos != NULL) &&
275
(ntohs(pos->frag->off) <
276
ntohs(packet->off)) ) {
281
/* find the after-frame */
282
end = ntohs(packet->off) + ntohs(packet->header.size) - sizeof(P2P_fragmentation_MESSAGE);
283
if (end <= ntohs(packet->off)) {
285
"Received invalid fragment at %s:%d\n",
287
return SYSERR; /* yuck! integer overflow! */
294
while ( (after != NULL) &&
295
(ntohs(after->frag->off)<end) )
298
if ( (before != NULL) &&
299
(before == after) ) {
300
/* this implies after or before != NULL and thereby the new
301
fragment is redundant as it is fully enclosed in an earlier
304
stats->change(stat_defragmented, 1);
305
return OK; /* drop, there is a packet that spans our range! */
308
if ( (before != NULL) &&
310
( (htons(before->frag->off) +
312
>= htons(after->frag->off)) ) {
313
/* this implies that the fragment that starts before us and the
314
fragment that comes after this one leave no space in the middle
315
or even overlap; thus we can drop this redundant piece */
317
stats->change(stat_defragmented, 1);
322
pep = MALLOC(sizeof(FC));
323
pep->frag = MALLOC(ntohs(packet->header.size));
326
ntohs(packet->header.size));
329
if (before == NULL) {
332
while (pos != after) {
340
/* end of insert first */
344
/* insert last: find the end, free everything after it */
345
freeFL(before->link, 1);
350
/* ok, we are filling the middle between two fragments; insert. If
351
there is anything else in the middle, it can be dropped as we're
352
bigger & cover that area as well */
353
/* free everything between before and after */
355
while (pos != after) {
365
entry->ttl = cronTime(NULL) + DEFRAGMENTATION_TIMEOUT;
366
checkComplete(entry);
371
* Defragment the given fragment and pass to handler once
372
* defragmentation is complete.
374
* @param frag the packet to defragment
375
* @return SYSERR if the fragment is invalid
377
static int processFragment(const PeerIdentity * sender,
378
const P2P_MESSAGE_HEADER * frag) {
382
if (ntohs(frag->size) < sizeof(P2P_fragmentation_MESSAGE))
385
MUTEX_LOCK(&defragCacheLock);
386
hash = sender->hashPubKey.bits[0] % DEFRAG_BUCKET_COUNT;
387
smf = defragmentationCache[hash];
388
while (smf != NULL) {
389
if (OK == tryJoin(smf,
391
(P2P_fragmentation_MESSAGE*) frag)) {
392
MUTEX_UNLOCK(&defragCacheLock);
395
if (hostIdentityEquals(sender,
397
freeFL(smf->head, 1);
403
smf = MALLOC(sizeof(FC));
404
smf->next = defragmentationCache[hash];
405
defragmentationCache[hash] = smf;
406
smf->ttl = cronTime(NULL) + DEFRAGMENTATION_TIMEOUT;
407
smf->sender = *sender;
409
smf->id = ntohl(((P2P_fragmentation_MESSAGE*)frag)->id);
410
smf->head = MALLOC(sizeof(FL));
411
smf->head->link = NULL;
412
smf->head->frag = MALLOC(ntohs(frag->size));
413
memcpy(smf->head->frag,
417
MUTEX_UNLOCK(&defragCacheLock);
423
/* maximum size of each fragment */
425
/** how long is this message part expected to be? */
427
/** when did we intend to transmit? */
428
cron_t transmissionTime;
432
* Send a message that had to be fragmented (right now!). First grabs
433
* the first part of the message (obtained from ctx->se) and stores
434
* that in a P2P_fragmentation_MESSAGE envelope. The remaining fragments are
435
* added to the send queue with EXTREME_PRIORITY (to ensure that they
436
* will be transmitted next). The logic here is that if the priority
437
* for the first fragment was sufficiently high, the priority should
438
* also have been sufficiently high for all of the other fragments (at
439
* this time) since they have the same priority. And we want to make
440
* sure that we send all of them since just sending the first fragment
441
* and then going to other messages of equal priority would not be
442
* such a great idea (i.e. would just waste bandwidth).
444
static int fragmentBMC(void * buf,
446
unsigned short len) {
447
static int idGen = 0;
448
P2P_fragmentation_MESSAGE * frag;
453
if ( (len < ctx->mtu) ||
459
stats->change(stat_fragmented, 1);
460
id = (idGen++) + randomi(512);
461
/* write first fragment to buf */
462
frag = (P2P_fragmentation_MESSAGE*) buf;
463
frag->header.size = htons(len);
464
frag->header.type = htons(P2P_PROTO_fragment);
466
frag->off = htons(0);
467
frag->len = htons(ctx->len);
470
len - sizeof(P2P_fragmentation_MESSAGE));
472
/* create remaining fragments, add to queue! */
473
pos = len - sizeof(P2P_fragmentation_MESSAGE);
474
frag = MALLOC(ctx->mtu);
475
while (pos < ctx->len) {
476
mlen = sizeof(P2P_fragmentation_MESSAGE) + ctx->len - pos;
479
GNUNET_ASSERT(mlen > sizeof(P2P_fragmentation_MESSAGE));
480
frag->header.size = htons(mlen);
481
frag->header.type = htons(P2P_PROTO_fragment);
483
frag->off = htons(pos);
484
frag->len = htons(ctx->len);
486
&((char*)(&ctx[1]))[pos],
487
mlen - sizeof(P2P_fragmentation_MESSAGE));
488
coreAPI->unicast(&ctx->sender,
491
ctx->transmissionTime - cronTime(NULL));
492
pos += mlen - sizeof(P2P_fragmentation_MESSAGE);
494
GNUNET_ASSERT(pos == ctx->len);
501
* The given message must be fragmented. Produce a placeholder that
502
* corresponds to the first fragment. Once that fragment is scheduled
503
* for transmission, the placeholder should automatically add all of
504
* the other fragments (with very high priority).
506
void fragment(const PeerIdentity * peer,
509
unsigned int targetTime,
511
BuildMessageCallback bmc,
516
GNUNET_ASSERT(len > mtu);
517
GNUNET_ASSERT(mtu > sizeof(P2P_fragmentation_MESSAGE));
518
fbmc = MALLOC(sizeof(FragmentBMC) + len);
520
fbmc->sender = *peer;
521
fbmc->transmissionTime = targetTime;
529
if (SYSERR == bmc(&fbmc[1],
536
xlen = mtu - sizeof(P2P_fragmentation_MESSAGE);
537
coreAPI->unicastCallback(peer,
538
(BuildMessageCallback) &fragmentBMC,
541
prio * xlen / len, /* compute new prio */
546
* Initialize Fragmentation module.
548
Fragmentation_ServiceAPI *
549
provide_module_fragmentation(CoreAPIForApplication * capi) {
550
static Fragmentation_ServiceAPI ret;
554
stats = coreAPI->requestService("stats");
556
stat_defragmented = stats->create(gettext_noop("# messages defragmented"));
557
stat_fragmented = stats->create(gettext_noop("# messages fragmented"));
558
stat_discarded = stats->create(gettext_noop("# fragments discarded"));
560
for (i=0;i<DEFRAG_BUCKET_COUNT;i++)
561
defragmentationCache[i] = NULL;
562
MUTEX_CREATE(&defragCacheLock);
563
addCronJob((CronJob) &defragmentationPurgeCron,
568
_("`%s' registering handler %d\n"),
571
capi->registerHandler(P2P_PROTO_fragment,
574
ret.fragment = &fragment;
579
* Shutdown fragmentation.
581
void release_module_fragmentation() {
584
coreAPI->unregisterHandler(P2P_PROTO_fragment,
586
delCronJob((CronJob) &defragmentationPurgeCron,
589
for (i=0;i<DEFRAG_BUCKET_COUNT;i++) {
590
FC * pos = defragmentationCache[i];
591
while (pos != NULL) {
592
FC * next = pos->next;
593
freeFL(pos->head, 1);
599
coreAPI->releaseService(stats);
602
MUTEX_DESTROY(&defragCacheLock);
606
/* end of fragmentation.c */