~ubuntu-branches/debian/jessie/ion/jessie

« back to all changes in this revision

Viewing changes to bp/ltp/ltpcli.c

  • Committer: Package Import Robot
  • Author(s): Leo Iannacone
  • Date: 2012-02-01 09:46:31 UTC
  • Revision ID: package-import@ubuntu.com-20120201094631-qpfwehc1b7ftkjgx
Tags: upstream-2.5.3~dfsg1
ImportĀ upstreamĀ versionĀ 2.5.3~dfsg1

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/*
 
2
        ltpcli.c:       BP LTP-based convergence-layer input
 
3
                        daemon, designed to serve as an input
 
4
                        duct.
 
5
 
 
6
        Author: Scott Burleigh, JPL
 
7
 
 
8
        Copyright (c) 2007, California Institute of Technology.
 
9
        ALL RIGHTS RESERVED.  U.S. Government Sponsorship
 
10
        acknowledged.
 
11
        
 
12
                                                                        */
 
13
#include "ltpcla.h"
 
14
#include "ipnfw.h"
 
15
#include "dtn2fw.h"
 
16
 
 
17
static void     interruptThread()
 
18
{
 
19
        isignal(SIGTERM, interruptThread);
 
20
        ionKillMainThread("ltpcli");
 
21
}
 
22
 
 
23
/*      *       *       Receiver thread functions       *       *       */
 
24
 
 
25
typedef struct
 
26
{
 
27
        VInduct         *vduct;
 
28
        int             running;
 
29
} ReceiverThreadParms;
 
30
 
 
31
static int      acquireRedBundles(AcqWorkArea *work, Object zco,
 
32
                        unsigned long senderEngineNbr)
 
33
{
 
34
        char    engineNbrString[21];
 
35
        char    senderEidBuffer[SDRSTRING_BUFSZ];
 
36
        char    *senderEid;
 
37
 
 
38
        isprintf(engineNbrString, sizeof engineNbrString, "%lu",
 
39
                        senderEngineNbr);
 
40
        senderEid = senderEidBuffer;
 
41
        getSenderEid(&senderEid, engineNbrString);
 
42
        if (bpBeginAcq(work, 0, senderEid) < 0)
 
43
        {
 
44
                putErrmsg("Can't begin acquisition of bundle(s).", NULL);
 
45
                return -1;
 
46
        }
 
47
 
 
48
        if (bpLoadAcq(work, zco) < 0)
 
49
        {
 
50
                putErrmsg("Can't continue bundle acquisition.", NULL);
 
51
                return -1;
 
52
        }
 
53
 
 
54
        if (bpEndAcq(work) < 0)
 
55
        {
 
56
                putErrmsg("Can't end acquisition of bundle(s).", NULL);
 
57
                return -1;
 
58
        }
 
59
 
 
60
        return 0;
 
61
}
 
62
 
 
63
static int      handleGreenSegment(AcqWorkArea *work, LtpSessionId *sessionId,
 
64
                        unsigned char endOfBlock, unsigned long offset,
 
65
                        unsigned long length, Object zco, unsigned long *buflen,                        char **buffer)
 
66
{
 
67
        Sdr                     sdr = getIonsdr();
 
68
        static LtpSessionId     currentSessionId = { 0, 0 };
 
69
        static unsigned long    currentOffset = 0;
 
70
        unsigned long           fillLength;
 
71
        char                    engineNbrString[21];
 
72
        char                    senderEidBuffer[SDRSTRING_BUFSZ];
 
73
        char                    *senderEid;
 
74
        ZcoReader               reader;
 
75
 
 
76
        if (zco == 0)           /*      Import session canceled.        */
 
77
        {
 
78
                bpCancelAcq(work);
 
79
                currentSessionId.sourceEngineId = 0;
 
80
                currentSessionId.sessionNbr = 0;
 
81
                currentOffset = 0;
 
82
                return 0;
 
83
        }
 
84
 
 
85
        if (zco_source_data_length(sdr, zco) != length)
 
86
        {
 
87
                return 0;       /*      Just discard the segment.       */
 
88
        }
 
89
 
 
90
        if (sessionId->sourceEngineId != currentSessionId.sourceEngineId
 
91
        || sessionId->sessionNbr != currentSessionId.sessionNbr)
 
92
        {
 
93
                /*      Did not receive end-of-block segment for the
 
94
                 *      block that was being received.  Discard the
 
95
                 *      partially received bundle in the work area,
 
96
                 *      if any.                                         */
 
97
 
 
98
                bpCancelAcq(work);
 
99
                currentSessionId.sourceEngineId = 0;
 
100
                currentSessionId.sessionNbr = 0;
 
101
                currentOffset = 0;
 
102
        }
 
103
 
 
104
        if (currentOffset == 0)
 
105
        {
 
106
                /*      Start new green bundle acquisition.             */
 
107
 
 
108
                isprintf(engineNbrString, sizeof engineNbrString, "%lu",
 
109
                                sessionId->sourceEngineId);
 
110
                senderEid = senderEidBuffer;
 
111
                getSenderEid(&senderEid, engineNbrString);
 
112
                if (bpBeginAcq(work, 0, senderEid) < 0)
 
113
                {
 
114
                        putErrmsg("Can't begin acquisition of bundle.", NULL);
 
115
                        return -1;
 
116
                }
 
117
 
 
118
                currentSessionId.sourceEngineId = sessionId->sourceEngineId;
 
119
                currentSessionId.sessionNbr = sessionId->sessionNbr;
 
120
        }
 
121
 
 
122
        if (offset < currentOffset)     /*      Out of order.           */
 
123
        {
 
124
                return 0;       /*      Just discard the segment.       */
 
125
        }
 
126
 
 
127
        if (offset > currentOffset)
 
128
        {
 
129
                /*      Must insert fill data -- partial loss of
 
130
                 *      bundle payload, for example, may be okay.       */
 
131
 
 
132
                fillLength = offset - currentOffset;
 
133
                if (fillLength > *buflen)
 
134
                {
 
135
                        /*      Make buffer big enough for the fill
 
136
                         *      data.                                   */
 
137
 
 
138
                        if (*buffer)
 
139
                        {
 
140
                                MRELEASE(*buffer);
 
141
                                *buflen = 0;
 
142
                        }
 
143
 
 
144
                        *buffer = MTAKE(fillLength);
 
145
                        if (*buffer == NULL)
 
146
                        {
 
147
                                /*      Gap is too large to fill.
 
148
                                 *      Might be a DOS attack; cancel
 
149
                                 *      acquisition.                    */
 
150
 
 
151
                                bpCancelAcq(work);
 
152
                                currentSessionId.sourceEngineId = 0;
 
153
                                currentSessionId.sessionNbr = 0;
 
154
                                currentOffset = 0;
 
155
                                return 0;
 
156
                        }
 
157
 
 
158
                        *buflen = fillLength;
 
159
                }
 
160
 
 
161
                memset(*buffer, 0, fillLength);
 
162
                if (bpContinueAcq(work, *buffer, (int) fillLength) < 0)
 
163
                {
 
164
                        putErrmsg("Can't insert bundle fill data.", NULL);
 
165
                        return -1;
 
166
                }
 
167
 
 
168
                currentOffset += fillLength;
 
169
        }
 
170
 
 
171
        if (length > *buflen)
 
172
        {
 
173
                /*      Make buffer big enough for the green data.      */
 
174
 
 
175
                if (*buffer)
 
176
                {
 
177
                        MRELEASE(*buffer);
 
178
                        *buflen = 0;
 
179
                }
 
180
 
 
181
                *buffer = MTAKE(length);
 
182
                if (*buffer == NULL)
 
183
                {
 
184
                        /*      Segment is too large.  Might be a
 
185
                         *      DOS attack; cancel acquisition.         */
 
186
 
 
187
                        bpCancelAcq(work);
 
188
                        currentSessionId.sourceEngineId = 0;
 
189
                        currentSessionId.sessionNbr = 0;
 
190
                        currentOffset = 0;
 
191
                        return 0;
 
192
                }
 
193
 
 
194
                *buflen = length;
 
195
        }
 
196
 
 
197
        /*      Extract data from segment ZCO so that it can be
 
198
         *      appended to the bundle acquisition ZCO.                 */
 
199
 
 
200
        sdr_begin_xn(sdr);
 
201
        zco_start_receiving(sdr, zco, &reader);
 
202
        if (zco_receive_source(sdr, &reader, length, *buffer) < 0)
 
203
        {
 
204
                putErrmsg("Failed reading green segment data.", NULL);
 
205
                return -1;
 
206
        }
 
207
 
 
208
        zco_stop_receiving(sdr, &reader);
 
209
        if (sdr_end_xn(sdr) < 0)
 
210
        {
 
211
                putErrmsg("Crashed on green data extraction.", NULL);
 
212
                return -1;
 
213
        }
 
214
 
 
215
        if (bpContinueAcq(work, *buffer, (int) length) < 0)
 
216
        {
 
217
                putErrmsg("Can't continue bundle acquisition.", NULL);
 
218
                return -1;
 
219
        }
 
220
 
 
221
        currentOffset += length;
 
222
        if (endOfBlock)
 
223
        {
 
224
                if (bpEndAcq(work) < 0)
 
225
                {
 
226
                        putErrmsg("Can't end acquisition of bundle.", NULL);
 
227
                        return -1;
 
228
                }
 
229
 
 
230
                currentSessionId.sourceEngineId = 0;
 
231
                currentSessionId.sessionNbr = 0;
 
232
                currentOffset = 0;
 
233
        }
 
234
 
 
235
        return 0;
 
236
}
 
237
 
 
238
static void     *handleNotices(void *parm)
 
239
{
 
240
        /*      Main loop for LTP notice reception and handling.        */
 
241
 
 
242
        Sdr                     sdr = getIonsdr();
 
243
        ReceiverThreadParms     *rtp = (ReceiverThreadParms *) parm;
 
244
        char                    *procName = "ltpcli";
 
245
        AcqWorkArea             *redWork;
 
246
        AcqWorkArea             *greenWork;
 
247
        LtpNoticeType           type;
 
248
        LtpSessionId            sessionId;
 
249
        unsigned char           reasonCode;
 
250
        unsigned char           endOfBlock;
 
251
        unsigned long           dataOffset;
 
252
        unsigned long           dataLength;
 
253
        Object                  data;           /*      ZCO reference.  */
 
254
        unsigned long           greenBuflen = 0;
 
255
        char                    *greenBuffer = NULL;
 
256
 
 
257
        snooze(1);      /*      Let main thread become interruptable.   */
 
258
        if (ltp_open(BpLtpClientId) < 0)
 
259
        {
 
260
                putErrmsg("ltpcli can't open client access.",
 
261
                                itoa(BpLtpClientId));
 
262
                ionKillMainThread(procName);
 
263
                return NULL;
 
264
        }
 
265
 
 
266
        redWork = bpGetAcqArea(rtp->vduct);
 
267
        greenWork = bpGetAcqArea(rtp->vduct);
 
268
        if (redWork == NULL || greenWork == NULL)
 
269
        {
 
270
                ltp_close(BpLtpClientId);
 
271
                putErrmsg("ltpcli can't get acquisition work areas", NULL);
 
272
                ionKillMainThread(procName);
 
273
                return NULL;
 
274
        }
 
275
 
 
276
        /*      Can now start receiving notices.  On failure, take
 
277
         *      down the CLI.                                           */
 
278
 
 
279
        while (rtp->running)
 
280
        {
 
281
                if (ltp_get_notice(BpLtpClientId, &type, &sessionId,
 
282
                                &reasonCode, &endOfBlock, &dataOffset,
 
283
                                &dataLength, &data) < 0)
 
284
                {
 
285
                        putErrmsg("Can't get LTP notice.", NULL);
 
286
                        ionKillMainThread(procName);
 
287
                        rtp->running = 0;
 
288
                        continue;
 
289
                }
 
290
 
 
291
                switch (type)
 
292
                {
 
293
                case LtpExportSessionComplete:  /*      Xmit success.   */
 
294
                        if (data == 0)          /*      Ignore it.      */
 
295
                        {
 
296
                                break;          /*      Out of switch.  */
 
297
                        }
 
298
 
 
299
                        if (bpHandleXmitSuccess(data) < 0)
 
300
                        {
 
301
                                putErrmsg("Crashed on xmit success.", NULL);
 
302
                                ionKillMainThread(procName);
 
303
                                rtp->running = 0;
 
304
                                break;          /*      Out of switch.  */
 
305
                        }
 
306
 
 
307
                        sdr_begin_xn(sdr);
 
308
                        zco_destroy_reference(sdr, data);
 
309
                        if (sdr_end_xn(sdr) < 0)
 
310
                        {
 
311
                                putErrmsg("Crashed on data cleanup.", NULL);
 
312
                                ionKillMainThread(procName);
 
313
                                rtp->running = 0;
 
314
                        }
 
315
 
 
316
                        break;          /*      Out of switch.          */
 
317
 
 
318
                case LtpExportSessionCanceled:  /*      Xmit failure.   */
 
319
                        if (data == 0)          /*      Ignore it.      */
 
320
                        {
 
321
                                break;          /*      Out of switch.  */
 
322
                        }
 
323
 
 
324
                        if (bpHandleXmitFailure(data) < 0)
 
325
                        {
 
326
                                putErrmsg("Crashed on xmit failure.", NULL);
 
327
                                ionKillMainThread(procName);
 
328
                                rtp->running = 0;
 
329
                                break;          /*      Out of switch.  */
 
330
                        }
 
331
 
 
332
                        sdr_begin_xn(sdr);
 
333
                        zco_destroy_reference(sdr, data);
 
334
                        if (sdr_end_xn(sdr) < 0)
 
335
                        {
 
336
                                putErrmsg("Crashed on data cleanup.", NULL);
 
337
                                ionKillMainThread(procName);
 
338
                                rtp->running = 0;
 
339
                        }
 
340
 
 
341
                        break;          /*      Out of switch.          */
 
342
 
 
343
                case LtpImportSessionCanceled:
 
344
                        /*      None of the red data for the import
 
345
                         *      session (if any) have been received
 
346
                         *      yet, so nothing to discard.  In case
 
347
                         *      part or all of the import session was
 
348
                         *      green data, force deletion of retained
 
349
                         *      data.                                   */
 
350
 
 
351
                        sessionId.sourceEngineId = 0;
 
352
                        sessionId.sessionNbr = 0;
 
353
                        if (handleGreenSegment(greenWork, &sessionId,
 
354
                                        0, 0, 0, 0, NULL, NULL) < 0)
 
355
                        {
 
356
                                putErrmsg("Can't cancel green session.", NULL);
 
357
                                ionKillMainThread(procName);
 
358
                                rtp->running = 0;
 
359
                        }
 
360
 
 
361
                        break;          /*      Out of switch.          */
 
362
 
 
363
                case LtpRecvRedPart:
 
364
                        if (!endOfBlock)
 
365
                        {
 
366
                                /*      Block is partially red and
 
367
                                 *      partially green.  Too risky
 
368
                                 *      to wait for green EOB before
 
369
                                 *      clearing the work area, so
 
370
                                 *      just discard the data.          */
 
371
 
 
372
                                sdr_begin_xn(sdr);
 
373
                                zco_destroy_reference(sdr, data);
 
374
                                if (sdr_end_xn(sdr) < 0)
 
375
                                {
 
376
                                        putErrmsg("Crashed: partially red.",
 
377
                                                        NULL);
 
378
                                        ionKillMainThread(procName);
 
379
                                        rtp->running = 0;
 
380
                                }
 
381
 
 
382
                                break;          /*      Out of switch.  */
 
383
                        }
 
384
 
 
385
                        if (acquireRedBundles(redWork, data,
 
386
                                        sessionId.sourceEngineId) < 0)
 
387
                        {
 
388
                                putErrmsg("Can't acquire bundle(s).", NULL);
 
389
                                ionKillMainThread(procName);
 
390
                                rtp->running = 0;
 
391
                        }
 
392
 
 
393
                        break;          /*      Out of switch.          */
 
394
 
 
395
                case LtpRecvGreenSegment:
 
396
                        if (handleGreenSegment(greenWork, &sessionId,
 
397
                                        endOfBlock, dataOffset, dataLength,
 
398
                                        data, &greenBuflen, &greenBuffer) < 0)
 
399
                        {
 
400
                                putErrmsg("Can't handle green segment.", NULL);
 
401
                                ionKillMainThread(procName);
 
402
                                rtp->running = 0;
 
403
                        }
 
404
 
 
405
                        /*      Discard the ZCO in any case.            */
 
406
 
 
407
                        sdr_begin_xn(sdr);
 
408
                        zco_destroy_reference(sdr, data);
 
409
                        if (sdr_end_xn(sdr) < 0)
 
410
                        {
 
411
                                putErrmsg("Crashed: green segment.", NULL);
 
412
                                ionKillMainThread(procName);
 
413
                                rtp->running = 0;
 
414
                        }
 
415
 
 
416
                        break;          /*      Out of switch.          */
 
417
 
 
418
                default:
 
419
                        break;          /*      Out of switch.          */
 
420
                }
 
421
 
 
422
                /*      Make sure other tasks have a chance to run.     */
 
423
 
 
424
                sm_TaskYield();
 
425
        }
 
426
 
 
427
        writeErrmsgMemos();
 
428
        writeMemo("[i] ltpcli receiver thread has ended.");
 
429
 
 
430
        /*      Free resources.                                         */
 
431
 
 
432
        if (greenBuffer)
 
433
        {
 
434
                MRELEASE(greenBuffer);
 
435
        }
 
436
 
 
437
        bpReleaseAcqArea(greenWork);
 
438
        bpReleaseAcqArea(redWork);
 
439
        ltp_close(BpLtpClientId);
 
440
        return NULL;
 
441
}
 
442
 
 
443
/*      *       *       Main thread functions   *       *       *       */
 
444
 
 
445
#if defined (VXWORKS) || defined (RTEMS)
 
446
int     ltpcli(int a1, int a2, int a3, int a4, int a5,
 
447
                int a6, int a7, int a8, int a9, int a10)
 
448
{
 
449
        char    *ductName = (char *) a1;
 
450
#else
 
451
int     main(int argc, char *argv[])
 
452
{
 
453
        char    *ductName = (argc > 1 ? argv[1] : NULL);
 
454
#endif
 
455
        VInduct                 *vduct;
 
456
        PsmAddress              vductElt;
 
457
        ReceiverThreadParms     rtp;
 
458
        pthread_t               receiverThread;
 
459
 
 
460
        if (ductName == NULL)
 
461
        {
 
462
                PUTS("Usage: ltpcli <local engine number>]");
 
463
                return 0;
 
464
        }
 
465
 
 
466
        if (bpAttach() < 0)
 
467
        {
 
468
                putErrmsg("ltpcli can't attach to BP.", NULL);
 
469
                return -1;
 
470
        }
 
471
 
 
472
        findInduct("ltp", ductName, &vduct, &vductElt);
 
473
        if (vductElt == 0)
 
474
        {
 
475
                putErrmsg("No such ltp duct.", ductName);
 
476
                return -1;
 
477
        }
 
478
 
 
479
        if (vduct->cliPid != ERROR && vduct->cliPid != sm_TaskIdSelf())
 
480
        {
 
481
                putErrmsg("CLI task is already started for this duct.",
 
482
                                itoa(vduct->cliPid));
 
483
                return -1;
 
484
        }
 
485
 
 
486
        /*      All command-line arguments are now validated.           */
 
487
 
 
488
        if (ltp_attach() < 0)
 
489
        {
 
490
                putErrmsg("ltpcli can't initialize LTP.", NULL);
 
491
                return -1;
 
492
        }
 
493
 
 
494
        /*      Initialize sender endpoint ID lookup.                   */
 
495
 
 
496
        ipnInit();
 
497
        dtn2Init();
 
498
 
 
499
        /*      Set up signal handling; SIGTERM is shutdown signal.     */
 
500
 
 
501
        ionNoteMainThread("ltpcli");
 
502
        isignal(SIGTERM, interruptThread);
 
503
 
 
504
        /*      Start the receiver thread.                              */
 
505
 
 
506
        rtp.vduct = vduct;
 
507
        rtp.running = 1;
 
508
        if (pthread_create(&receiverThread, NULL, handleNotices, &rtp))
 
509
        {
 
510
                putSysErrmsg("ltpcli can't create receiver thread", NULL);
 
511
                return 1;
 
512
        }
 
513
 
 
514
        /*      Now sleep until interrupted by SIGTERM, at which point
 
515
         *      it's time to stop the induct.                           */
 
516
 
 
517
        writeMemo("[i] ltpcli is running.");
 
518
        ionPauseMainThread(-1);
 
519
 
 
520
        /*      Time to shut down.                                      */
 
521
 
 
522
        rtp.running = 0;
 
523
 
 
524
        /*      Stop the receiver thread by interrupting client access. */
 
525
 
 
526
        ltp_interrupt(BpLtpClientId);
 
527
        pthread_join(receiverThread, NULL);
 
528
        writeErrmsgMemos();
 
529
        writeMemo("[i] ltpcli duct has ended.");
 
530
        ionDetach();
 
531
        return 0;
 
532
}