2
ltpcli.c: BP LTP-based convergence-layer input
3
daemon, designed to serve as an input
6
Author: Scott Burleigh, JPL
8
Copyright (c) 2007, California Institute of Technology.
9
ALL RIGHTS RESERVED. U.S. Government Sponsorship
17
/* SIGTERM handler for the ltpcli daemon.
 *
 * Re-installs itself as the SIGTERM handler via isignal() and then
 * calls ionKillMainThread("ltpcli").
 *
 * NOTE(review): presumably ionKillMainThread() is what wakes the main
 * thread out of ionPauseMainThread() so that shutdown can proceed --
 * confirm against the ION platform API. */
static void interruptThread()
19
isignal(SIGTERM, interruptThread);
20
ionKillMainThread("ltpcli");
23
/* * * Receiver thread functions * * * */
29
} ReceiverThreadParms;
31
/* Acquire bundle(s) from a received data ZCO.
 *
 * work             acquisition work area handed to the bp*Acq() calls
 * zco              ZCO reference holding the received data
 * senderEngineNbr  engine number of the sender; formatted as a decimal
 *                  string and passed to getSenderEid() to look up the
 *                  sender's endpoint ID
 *
 * The acquisition runs in three steps -- bpBeginAcq(), bpLoadAcq() with
 * the whole ZCO, bpEndAcq() -- and an error message is posted if any
 * step returns a negative value.
 *
 * The caller treats a negative return value as a fatal error.
 * NOTE(review): the return statements themselves are not visible in
 * this excerpt; confirm the exact return values. */
static int acquireRedBundles(AcqWorkArea *work, Object zco,
32
unsigned long senderEngineNbr)
34
/* NOTE(review): 21 bytes is presumably sized for the longest "%lu"
 * rendering of a 64-bit value (20 digits) plus the terminating NUL. */
char engineNbrString[21];
35
char senderEidBuffer[SDRSTRING_BUFSZ];
38
isprintf(engineNbrString, sizeof engineNbrString, "%lu",
40
/* senderEid starts out pointing at the local buffer and is passed by
 * address to getSenderEid(). */
senderEid = senderEidBuffer;
41
getSenderEid(&senderEid, engineNbrString);
42
if (bpBeginAcq(work, 0, senderEid) < 0)
44
putErrmsg("Can't begin acquisition of bundle(s).", NULL);
48
/* Load the entire ZCO into the acquisition work area. */
if (bpLoadAcq(work, zco) < 0)
50
putErrmsg("Can't continue bundle acquisition.", NULL);
54
if (bpEndAcq(work) < 0)
56
putErrmsg("Can't end acquisition of bundle(s).", NULL);
63
/* Process one green-part segment notice, building up a bundle in the
 * given acquisition work area one segment at a time.
 *
 * work        acquisition work area used for green data
 * sessionId   LTP session the segment belongs to
 * endOfBlock  end-of-block flag from the notice
 * offset      offset of this segment's data within the block
 * length      length of this segment's data
 * zco         ZCO reference holding the segment data; 0 means the
 *             import session was canceled
 * buflen      in/out: current size of *buffer
 * buffer      in/out: scratch buffer, obtained with MTAKE(), into which
 *             segment data is extracted before being appended
 *
 * Visible behavior:
 *  - zco == 0 clears the remembered session ID.
 *  - A segment whose ZCO source data length differs from `length` is
 *    discarded (return 0).
 *  - A segment for a session other than the remembered one clears the
 *    remembered session ID.
 *  - When currentOffset is 0 a new acquisition is begun with
 *    bpBeginAcq() and the session ID is remembered.
 *  - A segment with offset < currentOffset is discarded (return 0).
 *  - A segment with offset > currentOffset first appends a zero-filled
 *    gap of (offset - currentOffset) bytes.
 *  - The segment data is read out of the ZCO into *buffer and appended
 *    with bpContinueAcq(); currentOffset advances by `length`.
 *  - bpEndAcq() closes the acquisition and the session ID is cleared.
 *
 * The caller treats a negative return value as a fatal error.
 *
 * NOTE(review): the function keeps its progress in function-local
 * statics, so it can track only one green session at a time.
 * NOTE(review): fillLength and length are unsigned long but are cast
 * to int for bpContinueAcq(); values above INT_MAX would not survive
 * the cast -- confirm an upper bound is enforced somewhere.
 * NOTE(review): several guards and return paths are not visible in
 * this excerpt (including where currentOffset is reset to 0); verify
 * them against the full source before relying on this summary. */
static int handleGreenSegment(AcqWorkArea *work, LtpSessionId *sessionId,
64
unsigned char endOfBlock, unsigned long offset,
65
unsigned long length, Object zco, unsigned long *buflen, char **buffer)
67
Sdr sdr = getIonsdr();
68
/* State carried across calls: the session currently being acquired
 * and how many bytes of its block have been appended so far. */
static LtpSessionId currentSessionId = { 0, 0 };
69
static unsigned long currentOffset = 0;
70
unsigned long fillLength;
71
char engineNbrString[21];
72
char senderEidBuffer[SDRSTRING_BUFSZ];
76
if (zco == 0) /* Import session canceled. */
79
currentSessionId.sourceEngineId = 0;
80
currentSessionId.sessionNbr = 0;
85
/* The segment ZCO must hold exactly `length` bytes of source data. */
if (zco_source_data_length(sdr, zco) != length)
87
return 0; /* Just discard the segment. */
90
if (sessionId->sourceEngineId != currentSessionId.sourceEngineId
91
|| sessionId->sessionNbr != currentSessionId.sessionNbr)
93
/* Did not receive end-of-block segment for the
94
* block that was being received. Discard the
95
* partially received bundle in the work area,
99
currentSessionId.sourceEngineId = 0;
100
currentSessionId.sessionNbr = 0;
104
if (currentOffset == 0)
106
/* Start new green bundle acquisition. */
108
isprintf(engineNbrString, sizeof engineNbrString, "%lu",
109
sessionId->sourceEngineId);
110
senderEid = senderEidBuffer;
111
getSenderEid(&senderEid, engineNbrString);
112
if (bpBeginAcq(work, 0, senderEid) < 0)
114
putErrmsg("Can't begin acquisition of bundle.", NULL);
118
/* Remember which session this acquisition belongs to. */
currentSessionId.sourceEngineId = sessionId->sourceEngineId;
119
currentSessionId.sessionNbr = sessionId->sessionNbr;
122
if (offset < currentOffset) /* Out of order. */
124
return 0; /* Just discard the segment. */
127
if (offset > currentOffset)
129
/* Must insert fill data -- partial loss of
130
* bundle payload, for example, may be okay. */
132
fillLength = offset - currentOffset;
133
if (fillLength > *buflen)
135
/* Make buffer big enough for the fill
144
*buffer = MTAKE(fillLength);
147
/* Gap is too large to fill.
148
* Might be a DOS attack; cancel
152
currentSessionId.sourceEngineId = 0;
153
currentSessionId.sessionNbr = 0;
158
*buflen = fillLength;
161
memset(*buffer, 0, fillLength);
162
if (bpContinueAcq(work, *buffer, (int) fillLength) < 0)
164
putErrmsg("Can't insert bundle fill data.", NULL);
168
currentOffset += fillLength;
171
if (length > *buflen)
173
/* Make buffer big enough for the green data. */
181
*buffer = MTAKE(length);
184
/* Segment is too large. Might be a
185
* DOS attack; cancel acquisition. */
188
currentSessionId.sourceEngineId = 0;
189
currentSessionId.sessionNbr = 0;
197
/* Extract data from segment ZCO so that it can be
198
* appended to the bundle acquisition ZCO. */
201
zco_start_receiving(sdr, zco, &reader);
202
if (zco_receive_source(sdr, &reader, length, *buffer) < 0)
204
putErrmsg("Failed reading green segment data.", NULL);
208
zco_stop_receiving(sdr, &reader);
209
if (sdr_end_xn(sdr) < 0)
211
putErrmsg("Crashed on green data extraction.", NULL);
215
if (bpContinueAcq(work, *buffer, (int) length) < 0)
217
putErrmsg("Can't continue bundle acquisition.", NULL);
221
currentOffset += length;
224
/* NOTE(review): the condition guarding the end of acquisition is not
 * visible here; presumably it applies only when endOfBlock is set --
 * confirm. */
if (bpEndAcq(work) < 0)
226
putErrmsg("Can't end acquisition of bundle.", NULL);
230
currentSessionId.sourceEngineId = 0;
231
currentSessionId.sessionNbr = 0;
238
/* Receiver thread entry point; main() starts it with pthread_create().
 *
 * parm points to a ReceiverThreadParms; its vduct member is used to
 * obtain the acquisition work areas.
 *
 * Setup: sleeps one second, opens LTP client access with
 * ltp_open(BpLtpClientId), and obtains two acquisition work areas
 * (redWork and greenWork).
 *
 * Notice handling, as far as it is visible here:
 *  - LtpExportSessionComplete: if data is non-zero, reports transmit
 *    success via bpHandleXmitSuccess(), then destroys the ZCO
 *    reference.
 *  - LtpExportSessionCanceled: if data is non-zero, reports transmit
 *    failure via bpHandleXmitFailure(), then destroys the ZCO
 *    reference.
 *  - LtpImportSessionCanceled: calls handleGreenSegment() with a zero
 *    ZCO to drop any green-session state.
 *  - Red data (NOTE(review): the case label is not visible in this
 *    excerpt): a block that is partly red and partly green is
 *    discarded; otherwise the data goes to acquireRedBundles().
 *  - LtpRecvGreenSegment: passes the segment to handleGreenSegment()
 *    and then destroys the segment ZCO reference.
 *
 * On the failures shown, an error message is posted and
 * ionKillMainThread(procName) is called.
 *
 * Teardown: releases greenBuffer and both work areas, and closes LTP
 * client access.
 *
 * NOTE(review): the loop construct, the declaration of `type`, the
 * sdr_begin_xn() calls and the return statement are not visible in
 * this excerpt; verify them against the full source. */
static void *handleNotices(void *parm)
240
/* Main loop for LTP notice reception and handling. */
242
Sdr sdr = getIonsdr();
243
ReceiverThreadParms *rtp = (ReceiverThreadParms *) parm;
244
char *procName = "ltpcli";
245
AcqWorkArea *redWork;
246
AcqWorkArea *greenWork;
248
LtpSessionId sessionId;
249
unsigned char reasonCode;
250
unsigned char endOfBlock;
251
unsigned long dataOffset;
252
unsigned long dataLength;
253
Object data; /* ZCO reference. */
254
/* Scratch buffer for green data; handleGreenSegment() sizes it via
 * these two variables and it is released before the thread ends. */
unsigned long greenBuflen = 0;
255
char *greenBuffer = NULL;
257
snooze(1); /* Let main thread become interruptable. */
258
if (ltp_open(BpLtpClientId) < 0)
260
putErrmsg("ltpcli can't open client access.",
261
itoa(BpLtpClientId));
262
ionKillMainThread(procName);
266
/* One work area per kind of data: redWork is passed to
 * acquireRedBundles(), greenWork to handleGreenSegment(). */
redWork = bpGetAcqArea(rtp->vduct);
267
greenWork = bpGetAcqArea(rtp->vduct);
268
if (redWork == NULL || greenWork == NULL)
270
ltp_close(BpLtpClientId);
271
putErrmsg("ltpcli can't get acquisition work areas", NULL);
272
ionKillMainThread(procName);
276
/* Can now start receiving notices. On failure, take
281
if (ltp_get_notice(BpLtpClientId, &type, &sessionId,
282
&reasonCode, &endOfBlock, &dataOffset,
283
&dataLength, &data) < 0)
285
putErrmsg("Can't get LTP notice.", NULL);
286
ionKillMainThread(procName);
293
case LtpExportSessionComplete: /* Xmit success. */
294
if (data == 0) /* Ignore it. */
296
break; /* Out of switch. */
299
if (bpHandleXmitSuccess(data) < 0)
301
putErrmsg("Crashed on xmit success.", NULL);
302
ionKillMainThread(procName);
304
break; /* Out of switch. */
308
/* The ZCO reference is no longer needed once success is reported. */
zco_destroy_reference(sdr, data);
309
if (sdr_end_xn(sdr) < 0)
311
putErrmsg("Crashed on data cleanup.", NULL);
312
ionKillMainThread(procName);
316
break; /* Out of switch. */
318
case LtpExportSessionCanceled: /* Xmit failure. */
319
if (data == 0) /* Ignore it. */
321
break; /* Out of switch. */
324
if (bpHandleXmitFailure(data) < 0)
326
putErrmsg("Crashed on xmit failure.", NULL);
327
ionKillMainThread(procName);
329
break; /* Out of switch. */
333
zco_destroy_reference(sdr, data);
334
if (sdr_end_xn(sdr) < 0)
336
putErrmsg("Crashed on data cleanup.", NULL);
337
ionKillMainThread(procName);
341
break; /* Out of switch. */
343
case LtpImportSessionCanceled:
344
/* None of the red data for the import
345
* session (if any) have been received
346
* yet, so nothing to discard. In case
347
* part or all of the import session was
348
* green data, force deletion of retained
351
sessionId.sourceEngineId = 0;
352
sessionId.sessionNbr = 0;
353
if (handleGreenSegment(greenWork, &sessionId,
354
0, 0, 0, 0, NULL, NULL) < 0)
356
putErrmsg("Can't cancel green session.", NULL);
357
ionKillMainThread(procName);
361
break; /* Out of switch. */
366
/* Block is partially red and
367
* partially green. Too risky
368
* to wait for green EOB before
369
* clearing the work area, so
370
* just discard the data. */
373
zco_destroy_reference(sdr, data);
374
if (sdr_end_xn(sdr) < 0)
376
putErrmsg("Crashed: partially red.",
378
ionKillMainThread(procName);
382
break; /* Out of switch. */
385
/* Acquire bundle(s) from the received data ZCO, attributing them to
 * the session's source engine. */
if (acquireRedBundles(redWork, data,
386
sessionId.sourceEngineId) < 0)
388
putErrmsg("Can't acquire bundle(s).", NULL);
389
ionKillMainThread(procName);
393
break; /* Out of switch. */
395
case LtpRecvGreenSegment:
396
if (handleGreenSegment(greenWork, &sessionId,
397
endOfBlock, dataOffset, dataLength,
398
data, &greenBuflen, &greenBuffer) < 0)
400
putErrmsg("Can't handle green segment.", NULL);
401
ionKillMainThread(procName);
405
/* Discard the ZCO in any case. */
408
zco_destroy_reference(sdr, data);
409
if (sdr_end_xn(sdr) < 0)
411
putErrmsg("Crashed: green segment.", NULL);
412
ionKillMainThread(procName);
416
break; /* Out of switch. */
419
break; /* Out of switch. */
422
/* Make sure other tasks have a chance to run. */
428
writeMemo("[i] ltpcli receiver thread has ended.");
430
/* Free resources. */
434
MRELEASE(greenBuffer);
437
bpReleaseAcqArea(greenWork);
438
bpReleaseAcqArea(redWork);
439
ltp_close(BpLtpClientId);
443
/* * * Main thread functions * * * */
445
/* Program entry point for the ltpcli daemon.
 *
 * When VXWORKS or RTEMS is defined, the entry point is the function
 * ltpcli(), which takes ten int arguments and reads the duct name from
 * a1 cast to a pointer; otherwise it is main(), which takes the duct
 * name from argv[1].
 *
 * Visible sequence:
 *  1. Print a usage line if no duct name was supplied.
 *  2. Attach to BP and look up the "ltp" induct named ductName.
 *  3. Refuse to start if a different task is already recorded as this
 *     duct's CLI task.
 *  4. Attach to LTP.
 *  5. Register the main thread and install interruptThread() as the
 *     SIGTERM handler.
 *  6. Start the receiver thread running handleNotices(), passing &rtp.
 *  7. Pause the main thread until interrupted.
 *  8. Interrupt LTP client access so the receiver thread stops, join
 *     it, and log that the duct has ended.
 *
 * NOTE(review): the usage string ends with an unmatched ']'.
 * NOTE(review): `(char *) a1` converts an int to a pointer, which
 * assumes a pointer fits in an int on the VXWORKS/RTEMS targets --
 * confirm.
 * NOTE(review): return statements and some declarations and
 * initialization (vduct, vductElt, the rtp fields) are not visible in
 * this excerpt. */
#if defined (VXWORKS) || defined (RTEMS)
446
int ltpcli(int a1, int a2, int a3, int a4, int a5,
447
int a6, int a7, int a8, int a9, int a10)
449
char *ductName = (char *) a1;
451
int main(int argc, char *argv[])
453
char *ductName = (argc > 1 ? argv[1] : NULL);
457
ReceiverThreadParms rtp;
458
pthread_t receiverThread;
460
if (ductName == NULL)
462
PUTS("Usage: ltpcli <local engine number>]");
468
putErrmsg("ltpcli can't attach to BP.", NULL);
472
findInduct("ltp", ductName, &vduct, &vductElt);
475
putErrmsg("No such ltp duct.", ductName);
479
/* A cliPid other than ERROR or this task's own ID means a different
 * task is already recorded as this duct's CLI. */
if (vduct->cliPid != ERROR && vduct->cliPid != sm_TaskIdSelf())
481
putErrmsg("CLI task is already started for this duct.",
482
itoa(vduct->cliPid));
486
/* All command-line arguments are now validated. */
488
if (ltp_attach() < 0)
490
putErrmsg("ltpcli can't initialize LTP.", NULL);
494
/* Initialize sender endpoint ID lookup. */
499
/* Set up signal handling; SIGTERM is shutdown signal. */
501
ionNoteMainThread("ltpcli");
502
isignal(SIGTERM, interruptThread);
504
/* Start the receiver thread. */
508
if (pthread_create(&receiverThread, NULL, handleNotices, &rtp))
510
putSysErrmsg("ltpcli can't create receiver thread", NULL);
514
/* Now sleep until interrupted by SIGTERM, at which point
515
* it's time to stop the induct. */
517
writeMemo("[i] ltpcli is running.");
518
/* NOTE(review): -1 presumably means "no timeout" -- confirm. */
ionPauseMainThread(-1);
520
/* Time to shut down. */
524
/* Stop the receiver thread by interrupting client access. */
526
ltp_interrupt(BpLtpClientId);
527
/* Wait for handleNotices() to finish its cleanup before exiting. */
pthread_join(receiverThread, NULL);
529
writeMemo("[i] ltpcli duct has ended.");