2
tcpclo.c: BP TCP-based convergence-layer output
3
daemon. Note that this convergence-layer
4
output daemon is a "dedicated" CLO daemon
5
suitable only for a limited number of paths,
6
because it manages just a single TCP
9
Promiscuous CLO daemons need to be based on
12
Modification : This has been made compliant
13
with draft-irtf-dtnrg-tcp-clayer-02.
15
Author: Scott Burleigh, JPL
17
Copyright (c) 2004, California Institute of Technology.
18
ALL RIGHTS RESERVED. U.S. Government Sponsorship
24
// Semaphore handle shared between the signal handler below and the main
// function, which assigns it from vduct->semaphore before installing the
// handler and polls it with sm_SemEnded() in its transmit loop.
static sm_SemId tcpcloSemaphore;
26
// Handler installed for SIGTERM via isignal(): ends tcpcloSemaphore.
static void shutDownClo() /* Commands CLO termination. */
28
sm_SemEnd(tcpcloSemaphore);
31
/* * * Keepalive thread functions * * */
36
// Arguments handed to the keepalive thread (sendKeepalives). The main
// function fills the members in with the addresses of its own variables.
// Locked by sendKeepalives around its sendBundleByTCPCL() call.
pthread_mutex_t *mutex;
37
// Socket address passed as the first argument of sendBundleByTCPCL().
struct sockaddr *socketName;
40
} KeepaliveThreadParms;
42
// Entry point of the keepalive thread that the main function creates with
// pthread_create(). parm is the KeepaliveThreadParms prepared there; the
// run flag, the socket and the keepalive period are all read through it.
static void *sendKeepalives(void *parm)
44
KeepaliveThreadParms *parms = (KeepaliveThreadParms *) parm;
45
// Compared against the keepalive period to decide whether one is due.
int keepaliveTimer = 0;
47
// Upper bound for the current back-off wait; starts at BACKOFF_TIMER_START.
int backoffTimer = BACKOFF_TIMER_START;
48
// Progress through the current back-off wait; compared with backoffTimer.
int backoffTimerCount = 0;
49
unsigned char *buffer;
51
buffer = MTAKE(TCPCLA_BUFSZ); //To send keepalive bundle
54
// NOTE(review): presumably reported when the MTAKE above fails -- confirm.
putErrmsg("No memory for TCP buffer in tcpclo.", NULL);
59
// Keep running for as long as the shared run flag stays nonzero.
while (*(parms->cloRunning))
63
// Taken while the keepalive period has not yet been reached.
if (keepaliveTimer < *(parms->keepalivePeriod))
68
// If the negotiated keep alive interval is 0, then
69
// keep alives will not be sent.
70
if(*(parms->keepalivePeriod) == 0)
75
/* Time to send a keepalive. Note that the
76
* interval between keepalive attempts will be
77
* KEEPALIVE_PERIOD plus (if the remote induct
78
* is not reachable) the length of time taken
79
* by TCP to determine that the connection
80
* attempt will not succeed (e.g., 3 seconds). */
83
// The same mutex is taken by the main function around its own
// sendBundleByTCPCL() call and by the receiver thread while it closes the
// socket, so those operations on the shared socket do not overlap.
// NOTE(review): the zero length and zero ZCO arguments presumably select a
// keepalive rather than a bundle -- confirm against sendBundleByTCPCL().
pthread_mutex_lock(parms->mutex);
84
bytesSent = sendBundleByTCPCL(parms->socketName,
85
parms->ductSocket, 0, 0, buffer, parms->keepalivePeriod);
86
pthread_mutex_unlock(parms->mutex);
87
/* if the node is unable to establish a TCP connection,
88
* the connection should be tried only after some delay.
92
// Back-off wait: repeats while the socket descriptor is still negative
// and the counter has not yet reached backoffTimer.
while((backoffTimerCount < backoffTimer) && (*(parms->ductSocket) < 0))
96
// Checks the shared run flag from inside the back-off wait.
if(!(*(parms->cloRunning)))
101
backoffTimerCount = 0;
102
/* keepaliveTimer keeps track of when the keepalive needs
103
* to be sent. This value is set to keepalive period.
104
* That way at the end of backoff period a
107
keepaliveTimer = *(parms->keepalivePeriod);
109
// NOTE(review): the statement guarded by this test is not visible here;
// presumably it lengthens backoffTimer up to BACKOFF_TIMER_LIMIT -- confirm.
if(backoffTimer < BACKOFF_TIMER_LIMIT)
115
// Puts the back-off limit back to its starting value.
backoffTimer = BACKOFF_TIMER_START;
126
/* * * Receiver thread functions * * */
130
// Arguments handed to the receiver thread (receiveBundles). Besides mutex,
// that function reads vduct, bundleSocket and cloRunning through this
// structure.
// Locked by receiveBundles while it closes and invalidates the socket.
pthread_mutex_t *mutex;
133
} ReceiveThreadParms;
136
// Entry point of the receiver thread that the main function creates with
// pthread_create(). parm is the ReceiveThreadParms prepared there. The
// thread acquires inbound bundles from the socket it shares with the
// sending side, using an acquisition work area obtained for parms->vduct.
static void *receiveBundles(void *parm)
138
/* Main loop for bundle reception thread */
140
ReceiveThreadParms *parms = (ReceiveThreadParms *) parm;
141
// Local loop flag, tested together with the shared run flag below.
int threadRunning = 1;
145
buffer = MTAKE(TCPCLA_BUFSZ);
148
// NOTE(review): presumably reported when the MTAKE above fails -- confirm.
putErrmsg("tcpclo receiver can't get TCP buffer", NULL);
152
// Work area used by bpBeginAcq()/bpEndAcq() below and released at the end.
work = bpGetAcqArea(parms->vduct);
155
putErrmsg("tcpclo receiver can't get acquisition work area",
162
// Runs until either the local flag or the shared run flag becomes zero.
while (threadRunning && *(parms->cloRunning))
165
// A negative descriptor is what this thread itself stores (-1) after
// closing the socket in the shutdown case further down.
if(*(parms->bundleSocket) < 0)
171
if (bpBeginAcq(work, 0, NULL) < 0)
173
putErrmsg("Can't begin acquisition of bundle.", NULL);
178
// Dispatch on the result of reading from the shared socket.
switch (receiveBundleByTcpCL(*(parms->bundleSocket), work,
182
putErrmsg("Can't acquire bundle.", NULL);
183
/* Intentional fall-through to next case. */
185
case 0: /* Shutdown message */
186
/* Go back to the start of the while loop */
187
// Close the socket and mark it invalid under the mutex that the keepalive
// thread and the main function hold around their sendBundleByTCPCL() calls.
pthread_mutex_lock(parms->mutex);
188
closesocket(*(parms->bundleSocket));
189
*(parms->bundleSocket) = -1;
190
pthread_mutex_unlock(parms->mutex);
195
break; /* Out of switch. */
198
if (bpEndAcq(work) < 0)
200
putErrmsg("Can't end acquisition of bundle.", NULL);
204
/* Make sure other tasks have a chance to run. */
209
/* End of receiver thread; release resources. */
211
bpReleaseAcqArea(work);
216
/* * * Main thread functions * * * */
218
// Daemon entry point, in two alternative forms: tcpclo() for VxWorks and
// RTEMS builds, where the duct name arrives as a1 cast to char *, and
// main(), where it is argv[1] (NULL when no argument was given).
// The function looks up the named "tcp" outduct, builds the remote socket
// address, starts the keepalive and receiver threads, then dequeues bundles
// and sends them with sendBundleByTCPCL() until told to stop.
#if defined (VXWORKS) || defined (RTEMS)
219
int tcpclo(int a1, int a2, int a3, int a4, int a5,
220
int a6, int a7, int a8, int a9, int a10)
222
char *ductName = (char *) a1;
224
int main(int argc, char *argv[])
226
char *ductName = (argc > 1 ? argv[1] : NULL);
228
unsigned char *buffer;
237
unsigned short portNbr;
238
unsigned int hostNbr;
239
// Generic socket address; filled in below through the inetName alias.
struct sockaddr socketName;
240
struct sockaddr_in *inetName;
242
// Shared with both worker threads through parms.mutex and rparms.mutex.
pthread_mutex_t mutex;
243
KeepaliveThreadParms parms;
244
ReceiveThreadParms rparms;
245
pthread_t keepaliveThread;
246
pthread_t receiverThread;
248
BpExtendedCOS extendedCOS;
249
char destDuctName[MAX_CL_DUCT_NAME_LEN + 1];
250
unsigned int bundleLength;
253
// Starts at 0; its address is given to the keepalive thread and to
// sendBundleByTCPCL().
int keepalivePeriod = 0;
256
if (ductName == NULL)
258
PUTS("Usage: tcpclo <remote host name>[:<port number>]");
264
putErrmsg("tcpclo can't attach to BP", NULL);
268
buffer = MTAKE(TCPCLA_BUFSZ);
271
// NOTE(review): presumably reported when the MTAKE above fails -- confirm.
putErrmsg("No memory for TCP buffer in tcpclo.", NULL);
275
// Look up the outduct named on the command line under protocol "tcp".
findOutduct("tcp", ductName, &vduct, &vductElt);
278
putErrmsg("No such tcp duct.", ductName);
283
// Another task is already recorded as this duct's CLO.
if (vduct->cloPid != ERROR && vduct->cloPid != sm_TaskIdSelf())
285
putErrmsg("CLO task is already started for this duct.",
286
itoa(vduct->cloPid));
291
/* All command-line arguments are now validated. */
294
sdr_read(sdr, (char *) &duct, sdr_list_data(sdr, vduct->outductElt),
296
sdr_read(sdr, (char *) &protocol, duct.protocol, sizeof(ClProtocol));
297
// Use DEFAULT_TCP_RATE when the protocol's nominal rate is not positive.
if (protocol.nominalRate <= 0)
299
vduct->xmitThrottle.nominalRate = DEFAULT_TCP_RATE;
303
vduct->xmitThrottle.nominalRate = protocol.nominalRate;
306
// Three outflows, one per queue: bulk, standard, urgent.
memset((char *) outflows, 0, sizeof outflows);
307
outflows[0].outboundBundles = duct.bulkQueue;
308
outflows[1].outboundBundles = duct.stdQueue;
309
outflows[2].outboundBundles = duct.urgentQueue;
310
// Service factors become 1, 2 and 4 for bulk, standard and urgent.
for (i = 0; i < 3; i++)
312
outflows[i].svcFactor = 1 << i;
316
// Derive port and host numbers from the duct name.
parseSocketSpec(ductName, &portNbr, &hostNbr);
319
portNbr = BpTcpDefaultPortNbr;
322
// Port and host numbers are converted to network byte order before
// being stored in the socket address.
portNbr = htons(portNbr);
325
putErrmsg("Can't get IP address for host.", hostName);
330
hostNbr = htonl(hostNbr);
331
memset((char *) &socketName, 0, sizeof socketName);
332
inetName = (struct sockaddr_in *) &socketName;
333
inetName->sin_family = AF_INET;
334
inetName->sin_port = portNbr;
335
// Copies the 4 bytes of the IPv4 host number into the address field.
memcpy((char *) &(inetName->sin_addr.s_addr), (char *) &hostNbr, 4);
336
if (_tcpOutductId(&socketName, "tcp", ductName) < 0)
338
putErrmsg("Can't record TCP Outduct ID for connection.", NULL);
343
/* Set up signal handling. SIGTERM is shutdown signal. */
345
tcpcloSemaphore = vduct->semaphore;
346
sm_TaskVarAdd(&tcpcloSemaphore);
347
isignal(SIGTERM, shutDownClo);
349
isignal(SIGPIPE, handleConnectionLoss);
352
/* Start the keepalive thread for the eventual connection. */
354
tcpDesiredKeepAlivePeriod = KEEPALIVE_PERIOD;
355
// The keepalive thread receives pointers to this function's variables.
parms.cloRunning = &running;
356
pthread_mutex_init(&mutex, NULL);
357
parms.mutex = &mutex;
358
parms.socketName = &socketName;
359
parms.ductSocket = &ductSocket;
360
parms.keepalivePeriod = &keepalivePeriod;
361
// pthread_create() returns nonzero on failure.
if (pthread_create(&keepaliveThread, NULL, sendKeepalives, &parms))
363
putSysErrmsg("tcpclo can't create keepalive thread", NULL);
365
pthread_mutex_destroy(&mutex);
369
// Returns the VInduct Object of first induct with same protocol
370
// as the outduct. The VInduct is required to create an acq area.
371
// The Acq Area in turn uses the throttle information from VInduct
372
// object while receiving bundles. The throttle information
373
// of all inducts of the same protocol will be the same, so choosing
374
// any induct will serve the purpose.
376
findVInduct(&viduct,protocol.name);
379
putErrmsg("tcpclo can't get VInduct", NULL);
381
pthread_mutex_destroy(&mutex);
386
// The receiver thread shares the socket, mutex and run flag with the
// keepalive thread and with this function.
rparms.vduct = viduct;
387
rparms.bundleSocket = &ductSocket;
388
rparms.mutex = &mutex;
389
rparms.cloRunning = &running;
390
if (pthread_create(&receiverThread, NULL, receiveBundles, &rparms))
392
putSysErrmsg("tcpclo can't create receive thread", NULL);
394
pthread_mutex_destroy(&mutex);
398
/* Can now begin transmitting to remote duct. */
403
isprintf(txt, sizeof(txt),
404
"[i] tcpclo is running, spec=[%s:%d].",
405
inet_ntoa(inetName->sin_addr),
406
ntohs(inetName->sin_port));
410
// Transmit loop: runs until the run flag is cleared or the semaphore that
// shutDownClo() ends on SIGTERM is reported as ended.
while (running && !(sm_SemEnded(tcpcloSemaphore)))
412
if (bpDequeue(vduct, outflows, &bundleZco, &extendedCOS,
413
destDuctName, 1) < 0)
415
running = 0; /* Terminate CLO. */
419
if (bundleZco == 0) /* Interrupted. */
424
bundleLength = zco_length(sdr, bundleZco);
425
// Send under the mutex that the keepalive thread holds for its own
// sendBundleByTCPCL() call and the receiver thread holds while closing
// the socket.
pthread_mutex_lock(&mutex);
426
bytesSent = sendBundleByTCPCL(&socketName, &ductSocket,
427
bundleLength, bundleZco, buffer, &keepalivePeriod);
428
pthread_mutex_unlock(&mutex);
431
running = 0; /* Terminate CLO. */
434
/* Make sure other tasks have a chance to run. */
439
// Shutdown sequence: send the shutdown message, close the socket if it is
// still open, then wait for both worker threads.
if (sendShutDownMessage(&ductSocket, SHUT_DN_NO, -1, &socketName) < 0)
441
putErrmsg("Sending Shutdown message failed!!",NULL);
444
if (ductSocket != -1)
446
closesocket(ductSocket);
449
pthread_join(keepaliveThread, NULL);
450
writeMemo("tcpclo keep alive thread killed");
453
pthread_join(receiverThread, NULL);
454
writeMemo("tcpclo receiver thread killed");
457
writeMemo("[i] tcpclo duct has ended.");
458
// NOTE(review): presumably clears the outduct ID recorded earlier with
// the same socket name -- confirm against _tcpOutductId().
oK(_tcpOutductId(&socketName, NULL, NULL));
460
pthread_mutex_destroy(&mutex);