165
165
ISOBJ_TYPE_assert(pThis, tcpsrv);
166
166
assert(pThis->pSessions == NULL);
168
dbgprintf("Allocating buffer for %d TCP sessions.\n", pThis->iSessMax);
168
DBGPRINTF("Allocating buffer for %d TCP sessions.\n", pThis->iSessMax);
169
169
if((pThis->pSessions = (tcps_sess_t **) calloc(pThis->iSessMax, sizeof(tcps_sess_t *))) == NULL) {
170
dbgprintf("Error: TCPSessInit() could not alloc memory for TCP session table.\n");
170
DBGPRINTF("Error: TCPSessInit() could not alloc memory for TCP session table.\n");
171
171
ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY);
239
239
if(pThis->pSessions != NULL) {
240
240
/* close all TCP connections! */
241
i = TCPSessGetNxtSess(pThis, -1);
243
tcps_sess.Destruct(&pThis->pSessions[i]);
244
/* now get next... */
245
i = TCPSessGetNxtSess(pThis, i);
241
if(!pThis->bUsingEPoll) {
242
i = TCPSessGetNxtSess(pThis, -1);
244
tcps_sess.Destruct(&pThis->pSessions[i]);
245
/* now get next... */
246
i = TCPSessGetNxtSess(pThis, i);
248
250
/* we are done with the session table - so get rid of it... */
468
/* This function is called to gather input. */
471
/* helper to close a session. Takes status of poll vs. select into consideration.
472
* rgerhards, 2009-11-25
474
static inline rsRetVal
475
closeSess(tcpsrv_t *pThis, tcps_sess_t **ppSess, nspoll_t *pPoll) {
478
CHKiRet(nspoll.Ctl(pPoll, (*ppSess)->pStrm, 0, *ppSess, NSDPOLL_IN, NSDPOLL_DEL));
480
pThis->pOnRegularClose(*ppSess);
481
tcps_sess.Destruct(ppSess);
487
/* process a receive request on one of the streams
488
* If pPoll is non-NULL, we have a netstream in epoll mode, which means we need
489
* to remove any descriptor we close from the epoll set.
490
* rgerhards, 2009-07-020
493
doReceive(tcpsrv_t *pThis, tcps_sess_t **ppSess, nspoll_t *pPoll)
495
char buf[128*1024]; /* reception buffer - may hold a partial or multiple messages */
500
ISOBJ_TYPE_assert(pThis, tcpsrv);
501
DBGPRINTF("netstream %p with new data\n", (*ppSess)->pStrm);
502
/* Receive message */
503
iRet = pThis->pRcvData(*ppSess, buf, sizeof(buf), &iRcvd);
506
if(pThis->bEmitMsgOnClose) {
510
prop.GetString((*ppSess)->fromHostIP, &pszPeer, &lenPeer);
511
errmsg.LogError(0, RS_RET_PEER_CLOSED_CONN, "Netstream session %p closed by remote peer %s.\n",
512
(*ppSess)->pStrm, pszPeer);
514
CHKiRet(closeSess(pThis, ppSess, pPoll));
517
/* we simply ignore retry - this is not an error, but we also have not received anything */
520
/* valid data received, process it! */
521
localRet = tcps_sess.DataRcvd(*ppSess, buf, iRcvd);
522
if(localRet != RS_RET_OK && localRet != RS_RET_QUEUE_FULL) {
523
/* in this case, something went awfully wrong.
524
* We are instructed to terminate the session.
526
errmsg.LogError(0, localRet, "Tearing down TCP Session - see "
527
"previous messages for reason(s)\n");
528
CHKiRet(closeSess(pThis, ppSess, pPoll));
533
errmsg.LogError(0, iRet, "netstream session %p will be closed due to error\n",
535
CHKiRet(closeSess(pThis, ppSess, pPoll));
544
/* This function is called to gather input.
545
* This variant here is only used if we need to work with a netstream driver
546
* that does not support epoll().
469
548
#pragma GCC diagnostic ignored "-Wempty-body"
549
static inline rsRetVal
550
RunSelect(tcpsrv_t *pThis)
508
586
/* wait for io to become ready */
509
587
CHKiRet(nssel.Wait(pSel, &nfds));
588
if(glbl.GetGlobalInputTermState() == 1)
589
break; /* terminate input! */
511
591
for(i = 0 ; i < pThis->iLstnCurr ; ++i) {
592
if(glbl.GetGlobalInputTermState() == 1)
593
ABORT_FINALIZE(RS_RET_FORCE_TERM);
512
594
CHKiRet(nssel.IsReady(pSel, pThis->ppLstn[i], NSDSEL_RD, &bIsReady, &nfds));
514
dbgprintf("New connect on NSD %p.\n", pThis->ppLstn[i]);
596
DBGPRINTF("New connect on NSD %p.\n", pThis->ppLstn[i]);
515
597
SessAccept(pThis, pThis->ppLstnPort[i], &pNewSess, pThis->ppLstn[i]);
516
598
--nfds; /* indicate we have processed one */
520
602
/* now check the sessions */
521
603
iTCPSess = TCPSessGetNxtSess(pThis, -1);
522
604
while(nfds && iTCPSess != -1) {
605
if(glbl.GetGlobalInputTermState() == 1)
606
ABORT_FINALIZE(RS_RET_FORCE_TERM);
523
607
CHKiRet(nssel.IsReady(pSel, pThis->pSessions[iTCPSess]->pStrm, NSDSEL_RD, &bIsReady, &nfds));
525
char buf[128*1024]; /* reception buffer - may hold a partial or multiple messages */
526
dbgprintf("netstream %p with new data\n", pThis->pSessions[iTCPSess]->pStrm);
528
/* Receive message */
529
iRet = pThis->pRcvData(pThis->pSessions[iTCPSess], buf, sizeof(buf), &iRcvd);
532
if(pThis->bEmitMsgOnClose) {
536
prop.GetString(pThis->pSessions[iTCPSess]->fromHostIP, &pszPeer, &lenPeer);
537
errmsg.LogError(0, RS_RET_PEER_CLOSED_CONN, "Netstream session %p closed by remote peer %s.\n",
538
pThis->pSessions[iTCPSess]->pStrm, pszPeer);
540
pThis->pOnRegularClose(pThis->pSessions[iTCPSess]);
541
tcps_sess.Destruct(&pThis->pSessions[iTCPSess]);
544
/* we simply ignore retry - this is not an error, but we also have not received anything */
547
/* valid data received, process it! */
548
if(tcps_sess.DataRcvd(pThis->pSessions[iTCPSess], buf, iRcvd) != RS_RET_OK) {
549
/* in this case, something went awfully wrong.
550
* We are instructed to terminate the session.
552
errmsg.LogError(0, NO_ERRCODE, "Tearing down TCP Session %d - see "
553
"previous messages for reason(s)\n", iTCPSess);
554
pThis->pOnErrClose(pThis->pSessions[iTCPSess]);
555
tcps_sess.Destruct(&pThis->pSessions[iTCPSess]);
560
errmsg.LogError(0, iRet, "netstream session %p will be closed due to error\n",
561
pThis->pSessions[iTCPSess]->pStrm);
562
pThis->pOnErrClose(pThis->pSessions[iTCPSess]);
563
tcps_sess.Destruct(&pThis->pSessions[iTCPSess]);
609
doReceive(pThis, &pThis->pSessions[iTCPSess], NULL);
566
610
--nfds; /* indicate we have processed one */
568
612
iTCPSess = TCPSessGetNxtSess(pThis, iTCPSess);
580
624
/* note that this point is usually not reached */
581
pthread_cleanup_pop(0); /* remove cleanup handler */
625
pthread_cleanup_pop(1); /* remove cleanup handler */
585
629
#pragma GCC diagnostic warning "-Wempty-body"
632
/* This function is called to gather input. It tries doing that via the epoll()
633
* interface. If the driver does not support that, it falls back to calling its
634
* select() equivalent.
635
* rgerhards, 2009-11-18
642
tcps_sess_t *pNewSess;
643
nspoll_t *pPoll = NULL;
647
ISOBJ_TYPE_assert(pThis, tcpsrv);
649
/* this is an endless loop - it is terminated by the framework canelling
650
* this thread. Thus, we also need to instantiate a cancel cleanup handler
651
* to prevent us from leaking anything. -- rgerhards, 20080-04-24
653
if((localRet = nspoll.Construct(&pPoll)) == RS_RET_OK) {
655
localRet = nspoll.ConstructFinalize(pPoll);
657
if(localRet != RS_RET_OK) {
658
/* fall back to select */
659
dbgprintf("tcpsrv could not use epoll() interface, iRet=%d, using select()\n", localRet);
660
iRet = RunSelect(pThis);
664
dbgprintf("tcpsrv uses epoll() interface, nsdpol driver found\n");
666
/* flag that we are in epoll mode */
667
pThis->bUsingEPoll = TRUE;
669
/* Add the TCP listen sockets to the list of sockets to monitor */
670
for(i = 0 ; i < pThis->iLstnCurr ; ++i) {
671
dbgprintf("Trying to add listener %d, pUsr=%p\n", i, pThis->ppLstn);
672
CHKiRet(nspoll.Ctl(pPoll, pThis->ppLstn[i], i, pThis->ppLstn, NSDPOLL_IN, NSDPOLL_ADD));
673
dbgprintf("Added listener %d\n", i);
677
localRet = nspoll.Wait(pPoll, -1, &i, &pUsr);
678
if(glbl.GetGlobalInputTermState() == 1)
679
break; /* terminate input! */
681
/* check if we need to ignore the i/o ready state. We do this if we got an invalid
682
* return state. Validly, this can happen for RS_RET_EINTR, for other cases it may
683
* not be the right thing, but what is the right thing is really hard at this point...
685
if(localRet != RS_RET_OK)
688
dbgprintf("poll returned with i %d, pUsr %p\n", i, pUsr);
690
if(pUsr == pThis->ppLstn) {
691
DBGPRINTF("New connect on NSD %p.\n", pThis->ppLstn[i]);
692
SessAccept(pThis, pThis->ppLstnPort[i], &pNewSess, pThis->ppLstn[i]);
693
CHKiRet(nspoll.Ctl(pPoll, pNewSess->pStrm, 0, pNewSess, NSDPOLL_IN, NSDPOLL_ADD));
694
DBGPRINTF("New session created with NSD %p.\n", pNewSess);
696
pNewSess = (tcps_sess_t*) pUsr;
697
doReceive(pThis, &pNewSess, pPoll);
701
/* remove the tcp listen sockets from the epoll set */
702
for(i = 0 ; i < pThis->iLstnCurr ; ++i) {
703
CHKiRet(nspoll.Ctl(pPoll, pThis->ppLstn[i], i, pThis->ppLstn, NSDPOLL_IN, NSDPOLL_DEL));
708
nspoll.Destruct(&pPoll);
588
713
/* Standard-Constructor */
589
714
BEGINobjConstruct(tcpsrv) /* be sure to specify the object type also in END macro! */
590
715
pThis->iSessMax = TCPSESS_MAX_DEFAULT;
591
716
pThis->iLstnMax = TCPLSTN_MAX_DEFAULT;
592
717
pThis->addtlFrameDelim = TCPSRV_NO_ADDTL_DELIMITER;
718
pThis->bDisableLFDelim = 0;
593
719
pThis->OnMsgReceive = NULL;
594
720
ENDobjConstruct(tcpsrv)