254
255
* to be run on multiple threads. So far, this is forbidden by the interface
255
256
* spec. -- rgerhards, 2008-01-30
257
CHKiRet(queueConstruct(&pThis->pQueue, ActionQueType, 1, iActionQueueSize, (rsRetVal (*)(void*,void*))actionCallDoAction));
258
CHKiRet(qqueueConstruct(&pThis->pQueue, ActionQueType, 1, iActionQueueSize, (rsRetVal (*)(void*,void*))actionCallDoAction));
258
259
obj.SetName((obj_t*) pThis->pQueue, pszQName);
260
261
/* ... set some properties ... */
267
268
errmsg.LogError(0, NO_ERRCODE, "Invalid " #directive ", error %d. Ignored, running with default setting", iRet); \
270
queueSetpUsr(pThis->pQueue, pThis);
271
setQPROP(queueSetsizeOnDiskMax, "$ActionQueueMaxDiskSpace", iActionQueMaxDiskSpace);
272
setQPROP(queueSetMaxFileSize, "$ActionQueueFileSize", iActionQueMaxFileSize);
273
setQPROPstr(queueSetFilePrefix, "$ActionQueueFileName", pszActionQFName);
274
setQPROP(queueSetiPersistUpdCnt, "$ActionQueueCheckpointInterval", iActionQPersistUpdCnt);
275
setQPROP(queueSettoQShutdown, "$ActionQueueTimeoutShutdown", iActionQtoQShutdown );
276
setQPROP(queueSettoActShutdown, "$ActionQueueTimeoutActionCompletion", iActionQtoActShutdown);
277
setQPROP(queueSettoWrkShutdown, "$ActionQueueWorkerTimeoutThreadShutdown", iActionQtoWrkShutdown);
278
setQPROP(queueSettoEnq, "$ActionQueueTimeoutEnqueue", iActionQtoEnq);
279
setQPROP(queueSetiHighWtrMrk, "$ActionQueueHighWaterMark", iActionQHighWtrMark);
280
setQPROP(queueSetiLowWtrMrk, "$ActionQueueLowWaterMark", iActionQLowWtrMark);
281
setQPROP(queueSetiDiscardMrk, "$ActionQueueDiscardMark", iActionQDiscardMark);
282
setQPROP(queueSetiDiscardSeverity, "$ActionQueueDiscardSeverity", iActionQDiscardSeverity);
283
setQPROP(queueSetiMinMsgsPerWrkr, "$ActionQueueWorkerThreadMinimumMessages", iActionQWrkMinMsgs);
284
setQPROP(queueSetbSaveOnShutdown, "$ActionQueueSaveOnShutdown", bActionQSaveOnShutdown);
285
setQPROP(queueSetiDeqSlowdown, "$ActionQueueDequeueSlowdown", iActionQueueDeqSlowdown);
286
setQPROP(queueSetiDeqtWinFromHr, "$ActionQueueDequeueTimeBegin", iActionQueueDeqtWinFromHr);
287
setQPROP(queueSetiDeqtWinToHr, "$ActionQueueDequeueTimeEnd", iActionQueueDeqtWinToHr);
271
qqueueSetpUsr(pThis->pQueue, pThis);
272
setQPROP(qqueueSetsizeOnDiskMax, "$ActionQueueMaxDiskSpace", iActionQueMaxDiskSpace);
273
setQPROP(qqueueSetMaxFileSize, "$ActionQueueFileSize", iActionQueMaxFileSize);
274
setQPROPstr(qqueueSetFilePrefix, "$ActionQueueFileName", pszActionQFName);
275
setQPROP(qqueueSetiPersistUpdCnt, "$ActionQueueCheckpointInterval", iActionQPersistUpdCnt);
276
setQPROP(qqueueSettoQShutdown, "$ActionQueueTimeoutShutdown", iActionQtoQShutdown );
277
setQPROP(qqueueSettoActShutdown, "$ActionQueueTimeoutActionCompletion", iActionQtoActShutdown);
278
setQPROP(qqueueSettoWrkShutdown, "$ActionQueueWorkerTimeoutThreadShutdown", iActionQtoWrkShutdown);
279
setQPROP(qqueueSettoEnq, "$ActionQueueTimeoutEnqueue", iActionQtoEnq);
280
setQPROP(qqueueSetiHighWtrMrk, "$ActionQueueHighWaterMark", iActionQHighWtrMark);
281
setQPROP(qqueueSetiLowWtrMrk, "$ActionQueueLowWaterMark", iActionQLowWtrMark);
282
setQPROP(qqueueSetiDiscardMrk, "$ActionQueueDiscardMark", iActionQDiscardMark);
283
setQPROP(qqueueSetiDiscardSeverity, "$ActionQueueDiscardSeverity", iActionQDiscardSeverity);
284
setQPROP(qqueueSetiMinMsgsPerWrkr, "$ActionQueueWorkerThreadMinimumMessages", iActionQWrkMinMsgs);
285
setQPROP(qqueueSetbSaveOnShutdown, "$ActionQueueSaveOnShutdown", bActionQSaveOnShutdown);
286
setQPROP(qqueueSetiDeqSlowdown, "$ActionQueueDequeueSlowdown", iActionQueueDeqSlowdown);
287
setQPROP(qqueueSetiDeqtWinFromHr, "$ActionQueueDequeueTimeBegin", iActionQueueDeqtWinFromHr);
288
setQPROP(qqueueSetiDeqtWinToHr, "$ActionQueueDequeueTimeEnd", iActionQueueDeqtWinToHr);
290
291
# undef setQPROPstr
499
527
#pragma GCC diagnostic warning "-Wempty-body"
530
/* call the HUP handler for a given action, if such a handler is defined. The
531
* action mutex is locked, because the HUP handler most probably needs to modify
532
* some internal state information.
533
* rgerhards, 2008-10-22
535
#pragma GCC diagnostic ignored "-Wempty-body"
537
actionCallHUPHdlr(action_t *pAction)
540
int iCancelStateSave;
542
ASSERT(pAction != NULL);
543
DBGPRINTF("Action %p checks HUP hdlr: %p\n", pAction, pAction->pMod->doHUP);
545
if(pAction->pMod->doHUP == NULL) {
546
FINALIZE; /* no HUP handler, so we are done ;) */
549
pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave);
550
d_pthread_mutex_lock(&pAction->mutActExec);
551
pthread_cleanup_push(mutexCancelCleanup, &pAction->mutActExec);
552
pthread_setcancelstate(iCancelStateSave, NULL);
553
CHKiRet(pAction->pMod->doHUP(pAction->pModData));
554
pthread_cleanup_pop(1); /* unlock mutex */
559
#pragma GCC diagnostic warning "-Wempty-body"
501
562
/* set the action message queue mode
502
563
* TODO: probably move this into queue object, merge with MainMsgQueue!
503
564
* rgerhards, 2008-01-28
598
657
ABORT_FINALIZE(RS_RET_ERR);
660
if(pAction->bRepMsgHasMsg == 0) { /* old format repeat message? */
661
snprintf((char*)szRepMsg, sizeof(szRepMsg), "last message repeated %d times",
662
pAction->f_prevcount);
664
snprintf((char*)szRepMsg, sizeof(szRepMsg), "message repeated %d times: [%.800s]",
665
pAction->f_prevcount, getMSG(pAction->f_pMsg));
601
668
/* We now need to update the other message properties.
602
669
* ... RAWMSG is a problem ... Please note that digital
603
670
* signatures inside the message are also invalidated.
605
datetime.getCurrTime(&(pMsg->tRcvdAt));
672
datetime.getCurrTime(&(pMsg->tRcvdAt), &(pMsg->ttGenTime));
606
673
memcpy(&pMsg->tTIMESTAMP, &pMsg->tRcvdAt, sizeof(struct syslogTime));
607
674
MsgSetMSG(pMsg, (char*)szRepMsg);
608
675
MsgSetRawMsg(pMsg, (char*)szRepMsg);
625
692
dbgprintf("action not yet ready again to be executed, onceInterval %d, tCurr %d, tNext %d\n",
626
693
(int) pAction->iSecsExecOnceInterval, (int) getActNow(pAction),
627
694
(int) (pAction->iSecsExecOnceInterval + pAction->tLastExec));
695
pAction->tLastExec = getActNow(pAction); /* re-init time flags */
631
pAction->f_time = pAction->tLastExec = getActNow(pAction); /* re-init time flags */
632
/* Note: tLastExec could be set in the if block above, but f_time causes us a hard time
633
* so far, I do not see a solution to getting rid of it. -- rgerhards, 2008-09-16
699
/* we use reception time, not dequeue time - this is considered more appropriate and also faster ;) -- rgerhards, 2008-09-17 */
700
pAction->f_time = pAction->f_pMsg->ttGenTime;
636
702
/* When we reach this point, we have a valid, non-disabled action.
637
703
* So let's enqueue our message for execution. -- rgerhards, 2007-07-24
639
iRet = queueEnqObj(pAction->pQueue, pAction->f_pMsg->flowCtlType, (void*) MsgAddRef(pAction->f_pMsg));
705
iRet = qqueueEnqObj(pAction->pQueue, pAction->f_pMsg->flowCtlType, (void*) MsgAddRef(pAction->f_pMsg));
641
707
if(iRet == RS_RET_OK)
642
708
pAction->f_prevcount = 0; /* message processed, so we start a new cycle */
785
851
CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuedequeuetimeend", 0, eCmdHdlrInt, NULL, &iActionQueueDeqtWinToHr, NULL));
786
852
CHKiRet(regCfSysLineHdlr((uchar *)"actionexeconlyeverynthtime", 0, eCmdHdlrInt, NULL, &iActExecEveryNthOccur, NULL));
787
853
CHKiRet(regCfSysLineHdlr((uchar *)"actionexeconlyeverynthtimetimeout", 0, eCmdHdlrInt, NULL, &iActExecEveryNthOccurTO, NULL));
854
CHKiRet(regCfSysLineHdlr((uchar *)"repeatedmsgcontainsoriginalmsg", 0, eCmdHdlrBinary, NULL, &bActionRepMsgHasMsg, NULL));