56
57
/* forward declarations */
57
58
rsRetVal queueChkPersist(queue_t *pThis);
58
59
static rsRetVal queueSetEnqOnly(queue_t *pThis, int bEnqOnly, int bLockMutex);
60
static rsRetVal queueRateLimiter(queue_t *pThis);
59
61
static int queueChkStopWrkrDA(queue_t *pThis);
60
62
static int queueIsIdleDA(queue_t *pThis);
61
63
static rsRetVal queueConsumerDA(queue_t *pThis, wti_t *pWti, int iCancelStateSave);
272
274
CHKiRet(queueSettoActShutdown(pThis->pqDA, pThis->toActShutdown));
273
275
CHKiRet(queueSettoEnq(pThis->pqDA, pThis->toEnq));
274
276
CHKiRet(queueSetEnqOnly(pThis->pqDA, pThis->bDAEnqOnly, MUTEX_ALREADY_LOCKED));
277
CHKiRet(queueSetiDeqtWinFromHr(pThis->pqDA, pThis->iDeqtWinFromHr));
278
CHKiRet(queueSetiDeqtWinToHr(pThis->pqDA, pThis->iDeqtWinToHr));
275
279
CHKiRet(queueSetiHighWtrMrk(pThis->pqDA, 0));
276
280
CHKiRet(queueSetiDiscardMrk(pThis->pqDA, 0));
277
281
if(pThis->toQShutdown == 0) {
1412
1419
* on the nail [exact value]) -- rgerhards, 2008-03-14
1414
1421
if(iQueueSize < pThis->iFullDlyMrk) {
1415
dbgoprint((obj_t*) pThis, "queue size %d below FullDlyMrk %d\n", iQueueSize, pThis->iFullDlyMrk);
1416
1422
pthread_cond_broadcast(&pThis->belowFullDlyWtrMrk);
1419
1425
if(iQueueSize < pThis->iLightDlyMrk) {
1420
dbgoprint((obj_t*) pThis, "queue size %d below LightDlyMrk %d\n", iQueueSize, pThis->iLightDlyMrk);
1421
1426
pthread_cond_broadcast(&pThis->belowLightDlyWtrMrk);
1461
* Here we may wait if a dequeue time window is defined or if we are
1462
* rate-limited. TODO: If we do so, we should also look into the
1463
* way new worker threads are spawned. Obviously, it doesn't make much
1464
* sense to spawn additional worker threads when none of them can do any
1465
* processing. However, it is deemed acceptable to allow this for an initial
1466
* implementation of the timeframe/rate limiting feature.
1467
* Please also note that these features could also be implemented at the action
1468
* level. However, that would limit them to be used together with actions. We have
1469
* taken the broader approach, moving it right into the queue. This is even
1470
* necessary if we want to prevent spawning of multiple unnecessary worker
1471
* threads as described above. -- rgerhards, 2008-04-02
1474
* time window: tCurr is current time; tFrom is start time, tTo is end time (in military, i.e. 24-hour, format).
1475
* We may have tFrom = 4, tTo = 10 --> run from 4 to 10 hrs. nice and happy
1476
* We may also have tFrom = 22, tTo = 4 -> run from 10pm to 4am, which is actually two
1477
* windows: 0-4; 22-23:59
1478
* so when to run? Let's assume we have 3am
1481
* if(tCurr < tTo [3 < 4] || tCurr > tFrom [3 > 22])
1484
* sleep for tFrom - tCurr "hours" [22 - 5 --> 17]
1486
* if(tCurr >= tFrom [3 >= 4] && tCurr < tTo [3 < 10])
1489
* sleep for tTo - tCurr "hours" [4 - 3 --> 1]
1492
* Bottom line: we need to check which type of window we have and need to adjust our
1493
* logic accordingly. Of course, sleep calculations need to be done up to the minute,
1494
* but you get the idea from the code above.
1497
queueRateLimiter(queue_t *pThis)
1505
ISOBJ_TYPE_assert(pThis, queue);
1507
dbgoprint((obj_t*) pThis, "entering rate limiter\n");
1510
if(pThis->iDeqtWinToHr != 25) { /* 25 means disabled */
1511
/* time calls are expensive, so only do them when needed */
1513
localtime_r(&tCurr, &m);
1514
iHrCurr = m.tm_hour;
1516
if(pThis->iDeqtWinToHr < pThis->iDeqtWinFromHr) {
1517
if(iHrCurr < pThis->iDeqtWinToHr || iHrCurr > pThis->iDeqtWinFromHr) {
1518
; /* do not delay */
1520
iDelay = (pThis->iDeqtWinFromHr - iHrCurr) * 3600;
1521
/* this time, we are already into the next hour, so we need
1522
* to subtract our current minute and seconds.
1524
iDelay -= m.tm_min * 60;
1528
if(iHrCurr >= pThis->iDeqtWinFromHr && iHrCurr < pThis->iDeqtWinToHr) {
1529
; /* do not delay */
1531
if(iHrCurr < pThis->iDeqtWinFromHr) {
1532
iDelay = (pThis->iDeqtWinFromHr - iHrCurr - 1) * 3600; /* -1 as we are already in the hour */
1533
iDelay += (60 - m.tm_min) * 60;
1534
iDelay += 60 - m.tm_sec;
1536
iDelay = (24 - iHrCurr + pThis->iDeqtWinFromHr) * 3600;
1537
/* this time, we are already into the next hour, so we need
1538
* to subtract our current minute and seconds.
1540
iDelay -= m.tm_min * 60;
1548
dbgoprint((obj_t*) pThis, "outside dequeue time window, delaying %d seconds\n", iDelay);
1454
1557
/* This is the queue consumer in the regular (non-DA) case. It is
1455
1558
* protected by the queue mutex, but MUST release it as soon as possible.
1456
1559
* rgerhards, 2008-01-21
1691
1794
lenBuf = snprintf((char*)pszBuf, sizeof(pszBuf), "%s:Reg", obj.GetName((obj_t*) pThis));
1692
1795
CHKiRet(wtpConstruct (&pThis->pWtpReg));
1693
1796
CHKiRet(wtpSetDbgHdr (pThis->pWtpReg, pszBuf, lenBuf));
1797
CHKiRet(wtpSetpfRateLimiter (pThis->pWtpReg, (rsRetVal (*)(void *pUsr)) queueRateLimiter));
1694
1798
CHKiRet(wtpSetpfChkStopWrkr (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, int)) queueChkStopWrkrReg));
1695
1799
CHKiRet(wtpSetpfIsIdle (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, int)) queueIsIdleReg));
1696
1800
CHKiRet(wtpSetpfDoWork (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, void *pWti, int)) queueConsumerReg));
2153
2257
/* some simple object access methods */
2154
2258
DEFpropSetMeth(queue, iPersistUpdCnt, int);
2259
/* Bounds of the dequeue time window. queueRateLimiter compares them against
 * the current local hour (tm_hour); an iDeqtWinToHr of 25 disables the
 * window check there.
 */
DEFpropSetMeth(queue, iDeqtWinFromHr, int);
2260
DEFpropSetMeth(queue, iDeqtWinToHr, int);
2155
2261
DEFpropSetMeth(queue, toQShutdown, long);
2156
2262
DEFpropSetMeth(queue, toActShutdown, long);
2157
2263
DEFpropSetMeth(queue, toWrkShutdown, long);