~ubuntu-branches/debian/stretch/ion/stretch

« back to all changes in this revision

Viewing changes to bp/tcp/tcpclo.c

  • Committer: Package Import Robot
  • Author(s): Leo Iannacone
  • Date: 2012-02-01 09:46:31 UTC
  • Revision ID: package-import@ubuntu.com-20120201094631-qpfwehc1b7ftkjgx
Tags: upstream-2.5.3~dfsg1
Import upstream version 2.5.3~dfsg1

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/*
 
2
        tcpclo.c:       BP TCP-based convergence-layer output
 
3
                        daemon.  Note that this convergence-layer
 
4
                        output daemon is a "dedicated" CLO daemon
 
5
                        suitable only for a limited number of paths,
 
6
                        because it manages just a single TCP
 
7
                        connection.
 
8
 
 
9
                        Promiscuous CLO daemons need to be based on
 
10
                        UDP, Dgr, etc.
 
11
                        
 
12
                        Modification : This has been made compliant
 
13
                        to draft-irtf-dtnrg-tcp-clayer-02.
 
14
 
 
15
        Author: Scott Burleigh, JPL
 
16
 
 
17
        Copyright (c) 2004, California Institute of Technology.
 
18
        ALL RIGHTS RESERVED.  U.S. Government Sponsorship
 
19
        acknowledged.
 
20
        
 
21
                                                                        */
 
22
#include "tcpcla.h"
 
23
 
 
24
/*	Semaphore watched by the main transmission loop; the main
 *	thread copies the outduct's semaphore into it at startup.	*/
static sm_SemId		tcpcloSemaphore;

/*	Commands CLO termination by ending tcpcloSemaphore.  Installed
 *	as the SIGTERM handler and also called directly by the
 *	keepalive thread when a keepalive transmission fails.		*/
static void	shutDownClo()
{
	sm_SemEnd(tcpcloSemaphore);
}
 
30
 
 
31
/*      *       *       Keepalive thread functions      *       *       */
 
32
 
 
33
/*	Arguments handed to the keepalive thread.  Every member points
 *	at an object owned by the main thread, so the main thread must
 *	outlive the keepalive thread.					*/
typedef struct
{
	int		*cloRunning;		/* zero => stop thread	*/
	pthread_mutex_t	*mutex;			/* held while sending	*/
	struct sockaddr	*socketName;		/* remote induct addr	*/
	int		*ductSocket;		/* fd, negative if none	*/
	int		*keepalivePeriod;	/* seconds; 0 = never	*/
} KeepaliveThreadParms;
 
41
 
 
42
static void     *sendKeepalives(void *parm)
 
43
{
 
44
        KeepaliveThreadParms    *parms = (KeepaliveThreadParms *) parm;
 
45
        int                     keepaliveTimer = 0;
 
46
        int                     bytesSent;
 
47
        int                     backoffTimer = BACKOFF_TIMER_START;
 
48
        int                     backoffTimerCount = 0;
 
49
        unsigned char           *buffer;
 
50
 
 
51
        buffer = MTAKE(TCPCLA_BUFSZ);   //To send keepalive bundle
 
52
        if (buffer == NULL)
 
53
        {
 
54
                putErrmsg("No memory for TCP buffer in tcpclo.", NULL);
 
55
                return NULL;
 
56
        }
 
57
 
 
58
        iblock(SIGTERM);
 
59
        while (*(parms->cloRunning))
 
60
        {
 
61
                snooze(1);
 
62
                keepaliveTimer++;
 
63
                if (keepaliveTimer < *(parms->keepalivePeriod))
 
64
                {
 
65
                        continue;
 
66
                }
 
67
 
 
68
                // If the negotiated keep alive interval is 0, then
 
69
                // keep alives will not be sent.
 
70
                if(*(parms->keepalivePeriod) == 0)
 
71
                {
 
72
                        continue;
 
73
                }
 
74
 
 
75
                /*      Time to send a keepalive.  Note that the
 
76
                 *      interval between keepalive attempts will be
 
77
                 *      KEEPALIVE_PERIOD plus (if the remote induct
 
78
                 *      is not reachable) the length of time taken
 
79
                 *      by TCP to determine that the connection
 
80
                 *      attempt will not succeed (e.g., 3 seconds).     */
 
81
 
 
82
                keepaliveTimer = 0;
 
83
                pthread_mutex_lock(parms->mutex);
 
84
                bytesSent = sendBundleByTCPCL(parms->socketName,
 
85
                                parms->ductSocket, 0, 0, buffer, parms->keepalivePeriod);
 
86
                pthread_mutex_unlock(parms->mutex);
 
87
                /*      if the node is unable to establish a TCP connection,
 
88
                 *      the connection should be tried only after some delay.
 
89
                 *                                                              */
 
90
                if(bytesSent == 0)
 
91
                {       
 
92
                        while((backoffTimerCount < backoffTimer) && (*(parms->ductSocket) < 0))
 
93
                        {
 
94
                                snooze(1);
 
95
                                backoffTimerCount++;
 
96
                                if(!(*(parms->cloRunning)))
 
97
                                {
 
98
                                        break;
 
99
                                }
 
100
                        }
 
101
                        backoffTimerCount = 0;
 
102
                        /*      keepaliveTimer keeps track of when the keepalive needs 
 
103
                         *      to be sent. This value is set to keepalive period.
 
104
                         *      That way at the end of backoff period a 
 
105
                         *      keepalive is sent
 
106
                         *                                                      */
 
107
                        keepaliveTimer = *(parms->keepalivePeriod);
 
108
 
 
109
                        if(backoffTimer < BACKOFF_TIMER_LIMIT)
 
110
                        {
 
111
                                backoffTimer *= 2;
 
112
                        }
 
113
                        continue;
 
114
                }
 
115
                backoffTimer = BACKOFF_TIMER_START;
 
116
                if (bytesSent < 0)
 
117
                {
 
118
                        shutDownClo();
 
119
                        break;
 
120
                }
 
121
        }
 
122
        MRELEASE(buffer);
 
123
        return NULL;
 
124
}
 
125
 
 
126
/*      *       *       Receiver thread functions       *       *       */
 
127
typedef struct
 
128
{
 
129
        int             *cloRunning;
 
130
        pthread_mutex_t *mutex;
 
131
        int             *bundleSocket;
 
132
        VInduct         *vduct;
 
133
} ReceiveThreadParms;
 
134
 
 
135
 
 
136
static void     *receiveBundles(void *parm)
 
137
{
 
138
        /*      Main loop for bundle reception thread   */
 
139
 
 
140
        ReceiveThreadParms      *parms = (ReceiveThreadParms *) parm;
 
141
        int                     threadRunning = 1;
 
142
        AcqWorkArea             *work;
 
143
        char                    *buffer;
 
144
 
 
145
        buffer = MTAKE(TCPCLA_BUFSZ);
 
146
        if (buffer == NULL)
 
147
        {
 
148
                putErrmsg("tcpclo receiver can't get TCP buffer", NULL);
 
149
                return NULL;
 
150
        }
 
151
 
 
152
        work = bpGetAcqArea(parms->vduct);
 
153
        if (work == NULL)
 
154
        {
 
155
                putErrmsg("tcpclo receiver can't get acquisition work area",
 
156
                                NULL);
 
157
                MRELEASE(buffer);
 
158
                return NULL;
 
159
        }
 
160
 
 
161
        iblock(SIGTERM);
 
162
        while (threadRunning && *(parms->cloRunning))
 
163
        {
 
164
                snooze(1);
 
165
                if(*(parms->bundleSocket) < 0)
 
166
                {
 
167
                        threadRunning = 0;
 
168
                        continue;
 
169
                }
 
170
 
 
171
                if (bpBeginAcq(work, 0, NULL) < 0)
 
172
                {
 
173
                        putErrmsg("Can't begin acquisition of bundle.", NULL);
 
174
                        threadRunning = 0;
 
175
                        continue;
 
176
                }
 
177
        
 
178
                switch (receiveBundleByTcpCL(*(parms->bundleSocket), work,
 
179
                                        buffer))
 
180
                {
 
181
                case -1:
 
182
                        putErrmsg("Can't acquire bundle.", NULL);
 
183
                        /*      Intentional fall-through to next case.  */
 
184
 
 
185
                case 0:                 /*      Shutdown message        */      
 
186
                        /*      Go back to the start of the while loop  */
 
187
                        pthread_mutex_lock(parms->mutex);
 
188
                        closesocket(*(parms->bundleSocket));
 
189
                        *(parms->bundleSocket) = -1;
 
190
                        pthread_mutex_unlock(parms->mutex);                     
 
191
                        threadRunning = 0;
 
192
                        continue;
 
193
 
 
194
                default:
 
195
                        break;                  /*      Out of switch.  */
 
196
                }
 
197
 
 
198
                if (bpEndAcq(work) < 0)
 
199
                {
 
200
                        putErrmsg("Can't end acquisition of bundle.", NULL);
 
201
                        threadRunning = 0;
 
202
                }
 
203
 
 
204
                /*      Make sure other tasks have a chance to run.     */
 
205
 
 
206
                sm_TaskYield();
 
207
        }
 
208
 
 
209
        /*      End of receiver thread; release resources.              */
 
210
 
 
211
        bpReleaseAcqArea(work);
 
212
        MRELEASE(buffer);
 
213
        return NULL;
 
214
}
 
215
 
 
216
/*      *       *       Main thread functions   *       *       *       */
 
217
 
 
218
#if defined (VXWORKS) || defined (RTEMS)
 
219
int     tcpclo(int a1, int a2, int a3, int a4, int a5,
 
220
                int a6, int a7, int a8, int a9, int a10)
 
221
{
 
222
        char    *ductName = (char *) a1;
 
223
#else
 
224
int     main(int argc, char *argv[])
 
225
{
 
226
        char    *ductName = (argc > 1 ? argv[1] : NULL);
 
227
#endif
 
228
        unsigned char           *buffer;
 
229
        VOutduct                *vduct;
 
230
        PsmAddress              vductElt;
 
231
        Sdr                     sdr;
 
232
        Outduct                 duct;
 
233
        ClProtocol              protocol;
 
234
        Outflow                 outflows[3];
 
235
        int                     i;
 
236
        char                    *hostName;
 
237
        unsigned short          portNbr;
 
238
        unsigned int            hostNbr;
 
239
        struct sockaddr         socketName;
 
240
        struct sockaddr_in      *inetName;
 
241
        int                     running = 1;
 
242
        pthread_mutex_t         mutex;
 
243
        KeepaliveThreadParms    parms;
 
244
        ReceiveThreadParms      rparms;
 
245
        pthread_t               keepaliveThread;
 
246
        pthread_t               receiverThread;
 
247
        Object                  bundleZco;
 
248
        BpExtendedCOS           extendedCOS;
 
249
        char                    destDuctName[MAX_CL_DUCT_NAME_LEN + 1];
 
250
        unsigned int            bundleLength;
 
251
        int                     ductSocket = -1;
 
252
        int                     bytesSent;
 
253
        int                     keepalivePeriod = 0;
 
254
        VInduct                 *viduct;
 
255
 
 
256
        if (ductName == NULL)
 
257
        {
 
258
                PUTS("Usage: tcpclo <remote host name>[:<port number>]");
 
259
                return 0;
 
260
        }
 
261
 
 
262
        if (bpAttach() < 0)
 
263
        {
 
264
                putErrmsg("tcpclo can't attach to BP", NULL);
 
265
                return 1;
 
266
        }
 
267
 
 
268
        buffer = MTAKE(TCPCLA_BUFSZ);
 
269
        if (buffer == NULL)
 
270
        {
 
271
                putErrmsg("No memory for TCP buffer in tcpclo.", NULL);
 
272
                return 1;
 
273
        }
 
274
 
 
275
        findOutduct("tcp", ductName, &vduct, &vductElt);
 
276
        if (vductElt == 0)
 
277
        {
 
278
                putErrmsg("No such tcp duct.", ductName);
 
279
                MRELEASE(buffer);
 
280
                return 1;
 
281
        }
 
282
 
 
283
        if (vduct->cloPid != ERROR && vduct->cloPid != sm_TaskIdSelf())
 
284
        {
 
285
                putErrmsg("CLO task is already started for this duct.",
 
286
                                itoa(vduct->cloPid));
 
287
                MRELEASE(buffer);
 
288
                return 1;
 
289
        }
 
290
 
 
291
        /*      All command-line arguments are now validated.           */
 
292
 
 
293
        sdr = getIonsdr();
 
294
        sdr_read(sdr, (char *) &duct, sdr_list_data(sdr, vduct->outductElt),
 
295
                        sizeof(Outduct));
 
296
        sdr_read(sdr, (char *) &protocol, duct.protocol, sizeof(ClProtocol));
 
297
        if (protocol.nominalRate <= 0)
 
298
        {
 
299
                vduct->xmitThrottle.nominalRate = DEFAULT_TCP_RATE;
 
300
        }
 
301
        else
 
302
        {
 
303
                vduct->xmitThrottle.nominalRate = protocol.nominalRate;
 
304
        }
 
305
 
 
306
        memset((char *) outflows, 0, sizeof outflows);
 
307
        outflows[0].outboundBundles = duct.bulkQueue;
 
308
        outflows[1].outboundBundles = duct.stdQueue;
 
309
        outflows[2].outboundBundles = duct.urgentQueue;
 
310
        for (i = 0; i < 3; i++)
 
311
        {
 
312
                outflows[i].svcFactor = 1 << i;
 
313
        }
 
314
 
 
315
        hostName = ductName;
 
316
        parseSocketSpec(ductName, &portNbr, &hostNbr);
 
317
        if (portNbr == 0)
 
318
        {
 
319
                portNbr = BpTcpDefaultPortNbr;
 
320
        }
 
321
 
 
322
        portNbr = htons(portNbr);
 
323
        if (hostNbr == 0)
 
324
        {
 
325
                putErrmsg("Can't get IP address for host.", hostName);
 
326
                MRELEASE(buffer);
 
327
                return 1;
 
328
        }
 
329
 
 
330
        hostNbr = htonl(hostNbr);
 
331
        memset((char *) &socketName, 0, sizeof socketName);
 
332
        inetName = (struct sockaddr_in *) &socketName;
 
333
        inetName->sin_family = AF_INET;
 
334
        inetName->sin_port = portNbr;
 
335
        memcpy((char *) &(inetName->sin_addr.s_addr), (char *) &hostNbr, 4);
 
336
        if (_tcpOutductId(&socketName, "tcp", ductName) < 0)
 
337
        {
 
338
                putErrmsg("Can't record TCP Outduct ID for connection.", NULL);
 
339
                MRELEASE(buffer);
 
340
                return -1;
 
341
        }
 
342
 
 
343
        /*      Set up signal handling.  SIGTERM is shutdown signal.    */
 
344
 
 
345
        tcpcloSemaphore = vduct->semaphore;
 
346
        sm_TaskVarAdd(&tcpcloSemaphore);
 
347
        isignal(SIGTERM, shutDownClo);
 
348
#ifndef mingw
 
349
        isignal(SIGPIPE, handleConnectionLoss);
 
350
#endif
 
351
 
 
352
        /*      Start the keepalive thread for the eventual connection. */
 
353
        
 
354
        tcpDesiredKeepAlivePeriod = KEEPALIVE_PERIOD;
 
355
        parms.cloRunning = &running;
 
356
        pthread_mutex_init(&mutex, NULL);
 
357
        parms.mutex = &mutex;
 
358
        parms.socketName = &socketName;
 
359
        parms.ductSocket = &ductSocket;
 
360
        parms.keepalivePeriod = &keepalivePeriod;
 
361
        if (pthread_create(&keepaliveThread, NULL, sendKeepalives, &parms))
 
362
        {
 
363
                putSysErrmsg("tcpclo can't create keepalive thread", NULL);
 
364
                MRELEASE(buffer);
 
365
                pthread_mutex_destroy(&mutex);
 
366
                return 1;
 
367
        }
 
368
 
 
369
        // Returns the VInduct Object of first induct with same protocol
 
370
        // as the outduct. The VInduct is required to create an acq area.
 
371
        // The Acq Area inturn uses the throttle information from VInduct
 
372
        // object while receiving bundles. The throttle information 
 
373
        // of all inducts of the same induct will be the same, so choosing 
 
374
        // any induct will serve the purpose.
 
375
        
 
376
        findVInduct(&viduct,protocol.name);
 
377
        if(viduct == NULL)
 
378
        {
 
379
                putErrmsg("tcpclo can't get VInduct", NULL);
 
380
                MRELEASE(buffer);
 
381
                pthread_mutex_destroy(&mutex);
 
382
                return 1;
 
383
        
 
384
        }
 
385
 
 
386
        rparms.vduct =  viduct;
 
387
        rparms.bundleSocket = &ductSocket;
 
388
        rparms.mutex = &mutex;
 
389
        rparms.cloRunning = &running;
 
390
        if (pthread_create(&receiverThread, NULL, receiveBundles, &rparms))
 
391
        {
 
392
                putSysErrmsg("tcpclo can't create receive thread", NULL);
 
393
                MRELEASE(buffer);
 
394
                pthread_mutex_destroy(&mutex);
 
395
                return 1;
 
396
        }
 
397
 
 
398
        /*      Can now begin transmitting to remote duct.              */
 
399
 
 
400
        {
 
401
                char    txt[500];
 
402
 
 
403
                isprintf(txt, sizeof(txt),
 
404
                        "[i] tcpclo is running, spec=[%s:%d].", 
 
405
                        inet_ntoa(inetName->sin_addr),
 
406
                        ntohs(inetName->sin_port));
 
407
                writeMemo(txt);
 
408
        }
 
409
 
 
410
        while (running && !(sm_SemEnded(tcpcloSemaphore)))
 
411
        {
 
412
                if (bpDequeue(vduct, outflows, &bundleZco, &extendedCOS,
 
413
                                destDuctName, 1) < 0)
 
414
                {
 
415
                        running = 0;    /*      Terminate CLO.          */
 
416
                        continue;
 
417
                }
 
418
 
 
419
                if (bundleZco == 0)     /*      Interrupted.            */
 
420
                {
 
421
                        continue;
 
422
                }
 
423
 
 
424
                bundleLength = zco_length(sdr, bundleZco);
 
425
                pthread_mutex_lock(&mutex);
 
426
                bytesSent = sendBundleByTCPCL(&socketName, &ductSocket,
 
427
                        bundleLength, bundleZco, buffer, &keepalivePeriod);
 
428
                pthread_mutex_unlock(&mutex);
 
429
                if(bytesSent < 0)
 
430
                {
 
431
                        running = 0;    /*      Terminate CLO.          */
 
432
                }
 
433
 
 
434
                /*      Make sure other tasks have a chance to run.     */
 
435
 
 
436
                sm_TaskYield();
 
437
        }
 
438
 
 
439
        if (sendShutDownMessage(&ductSocket, SHUT_DN_NO, -1, &socketName) < 0)
 
440
        {
 
441
                putErrmsg("Sending Shutdown message failed!!",NULL);
 
442
        }
 
443
 
 
444
        if (ductSocket != -1)
 
445
        {
 
446
                closesocket(ductSocket);
 
447
        }
 
448
 
 
449
        pthread_join(keepaliveThread, NULL);
 
450
        writeMemo("tcpclo keep alive thread killed");
 
451
        running = 0;
 
452
        
 
453
        pthread_join(receiverThread, NULL);
 
454
        writeMemo("tcpclo receiver thread killed");
 
455
 
 
456
        writeErrmsgMemos();
 
457
        writeMemo("[i] tcpclo duct has ended.");
 
458
        oK(_tcpOutductId(&socketName, NULL, NULL));
 
459
        MRELEASE(buffer);
 
460
        pthread_mutex_destroy(&mutex);
 
461
        bp_detach();
 
462
        return 0;
 
463
}