56
/* Alias pthread_yield() to sched_yield(), so later calls to pthread_yield()
 * in this file expand to sched_yield().
 * NOTE(review): presumably guarded by a platform check for systems without a
 * native pthread_yield() -- the enclosing conditional is not visible here.
 */
# define pthread_yield() sched_yield()
54
60
DEFobjStaticHelpers
57
63
/* forward-definitions */
58
rsRetVal queueChkPersist(queue_t *pThis);
59
static rsRetVal queueSetEnqOnly(queue_t *pThis, int bEnqOnly, int bLockMutex);
60
static rsRetVal queueRateLimiter(queue_t *pThis);
61
static int queueChkStopWrkrDA(queue_t *pThis);
62
static int queueIsIdleDA(queue_t *pThis);
63
static rsRetVal queueConsumerDA(queue_t *pThis, wti_t *pWti, int iCancelStateSave);
64
static rsRetVal queueConsumerCancelCleanup(void *arg1, void *arg2);
65
static rsRetVal queueUngetObj(queue_t *pThis, obj_t *pUsr, int bLockMutex);
64
rsRetVal qqueueChkPersist(qqueue_t *pThis);
65
static rsRetVal qqueueSetEnqOnly(qqueue_t *pThis, int bEnqOnly, int bLockMutex);
66
static rsRetVal qqueueRateLimiter(qqueue_t *pThis);
67
static int qqueueChkStopWrkrDA(qqueue_t *pThis);
68
static int qqueueIsIdleDA(qqueue_t *pThis);
69
static rsRetVal qqueueConsumerDA(qqueue_t *pThis, wti_t *pWti, int iCancelStateSave);
70
static rsRetVal qqueueConsumerCancelCleanup(void *arg1, void *arg2);
71
static rsRetVal qqueueUngetObj(qqueue_t *pThis, obj_t *pUsr, int bLockMutex);
67
73
/* some constants for queuePersist () */
68
74
#define QUEUE_CHECKPOINT 1
118
124
* this point in time. The mutex must be locked when
119
125
* this function is called. -- rgerhards, 2008-01-25
121
static inline rsRetVal queueAdviseMaxWorkers(queue_t *pThis)
127
static inline rsRetVal qqueueAdviseMaxWorkers(qqueue_t *pThis)
126
ISOBJ_TYPE_assert(pThis, queue);
132
ISOBJ_TYPE_assert(pThis, qqueue);
128
134
if(!pThis->bEnqOnly) {
129
135
if(pThis->bRunsDA) {
130
136
/* if we have not yet reached the high water mark, there is no need to start a
131
137
* worker. -- rgerhards, 2008-01-26
133
if(queueGetOverallQueueSize(pThis) >= pThis->iHighWtrMrk || pThis->bQueueStarted == 0) {
139
if(qqueueGetOverallQueueSize(pThis) >= pThis->iHighWtrMrk || pThis->bQueueStarted == 0) {
134
140
wtpAdviseMaxWorkers(pThis->pWtpDA, 1); /* disk queues have always one worker */
137
143
if(pThis->qType == QUEUETYPE_DISK || pThis->iMinMsgsPerWrkr == 0) {
140
iMaxWorkers = queueGetOverallQueueSize(pThis) / pThis->iMinMsgsPerWrkr + 1;
146
iMaxWorkers = qqueueGetOverallQueueSize(pThis) / pThis->iMinMsgsPerWrkr + 1;
142
148
wtpAdviseMaxWorkers(pThis->pWtpReg, iMaxWorkers); /* disk queues have always one worker */
178
184
* rgerhards, 2008-01-15
181
queueTurnOffDAMode(queue_t *pThis)
187
qqueueTurnOffDAMode(qqueue_t *pThis)
185
ISOBJ_TYPE_assert(pThis, queue);
191
ISOBJ_TYPE_assert(pThis, qqueue);
186
192
ASSERT(pThis->bRunsDA);
188
194
/* at this point, we need a fully initialized DA queue. So if it isn't, we finally need
189
195
* to wait for its startup... -- rgerhards, 2008-01-25
191
queueWaitDAModeInitialized(pThis);
197
qqueueWaitDAModeInitialized(pThis);
193
199
/* if we need to pull any data that we still need from the (child) disk queue,
194
200
* now would be the time to do so. At present, we do not need this, but I'd like to
207
213
/* we destruct the queue object, which will also shutdown the queue worker. As the queue is empty,
208
214
* this will be quick.
210
queueDestruct(&pThis->pqDA); /* and now we are ready to destruct the DA queue */
216
qqueueDestruct(&pThis->pqDA); /* and now we are ready to destruct the DA queue */
211
217
dbgoprint((obj_t*) pThis, "disk-assistance has been turned off, disk queue was empty (iRet %d)\n",
213
219
/* now we need to check if the regular queue has some messages. This may be the case
214
220
* when it is waiting that the high water mark is reached again. If so, we need to start up
215
221
* a regular worker. -- rgerhards, 2008-01-26
217
if(queueGetOverallQueueSize(pThis) > 0) {
218
queueAdviseMaxWorkers(pThis);
223
if(qqueueGetOverallQueueSize(pThis) > 0) {
224
qqueueAdviseMaxWorkers(pThis);
259
265
* rgerhards, 2008-01-15
262
queueStartDA(queue_t *pThis)
268
qqueueStartDA(qqueue_t *pThis)
265
271
uchar pszDAQName[128];
267
ISOBJ_TYPE_assert(pThis, queue);
273
ISOBJ_TYPE_assert(pThis, qqueue);
269
275
if(pThis->bRunsDA == 2) /* check if already in (fully initialized) DA mode... */
270
276
FINALIZE; /* ... then we are already done! */
272
278
/* create message queue */
273
CHKiRet(queueConstruct(&pThis->pqDA, QUEUETYPE_DISK , 1, 0, pThis->pConsumer));
279
CHKiRet(qqueueConstruct(&pThis->pqDA, QUEUETYPE_DISK , 1, 0, pThis->pConsumer));
275
281
/* give it a name */
276
282
snprintf((char*) pszDAQName, sizeof(pszDAQName)/sizeof(uchar), "%s[DA]", obj.GetName((obj_t*) pThis));
282
288
pThis->pqDA->pqParent = pThis;
284
CHKiRet(queueSetpUsr(pThis->pqDA, pThis->pUsr));
285
CHKiRet(queueSetsizeOnDiskMax(pThis->pqDA, pThis->sizeOnDiskMax));
286
CHKiRet(queueSetiDeqSlowdown(pThis->pqDA, pThis->iDeqSlowdown));
287
CHKiRet(queueSetMaxFileSize(pThis->pqDA, pThis->iMaxFileSize));
288
CHKiRet(queueSetFilePrefix(pThis->pqDA, pThis->pszFilePrefix, pThis->lenFilePrefix));
289
CHKiRet(queueSetiPersistUpdCnt(pThis->pqDA, pThis->iPersistUpdCnt));
290
CHKiRet(queueSettoActShutdown(pThis->pqDA, pThis->toActShutdown));
291
CHKiRet(queueSettoEnq(pThis->pqDA, pThis->toEnq));
292
CHKiRet(queueSetEnqOnly(pThis->pqDA, pThis->bDAEnqOnly, MUTEX_ALREADY_LOCKED));
293
CHKiRet(queueSetiDeqtWinFromHr(pThis->pqDA, pThis->iDeqtWinFromHr));
294
CHKiRet(queueSetiDeqtWinToHr(pThis->pqDA, pThis->iDeqtWinToHr));
295
CHKiRet(queueSetiHighWtrMrk(pThis->pqDA, 0));
296
CHKiRet(queueSetiDiscardMrk(pThis->pqDA, 0));
290
CHKiRet(qqueueSetpUsr(pThis->pqDA, pThis->pUsr));
291
CHKiRet(qqueueSetsizeOnDiskMax(pThis->pqDA, pThis->sizeOnDiskMax));
292
CHKiRet(qqueueSetiDeqSlowdown(pThis->pqDA, pThis->iDeqSlowdown));
293
CHKiRet(qqueueSetMaxFileSize(pThis->pqDA, pThis->iMaxFileSize));
294
CHKiRet(qqueueSetFilePrefix(pThis->pqDA, pThis->pszFilePrefix, pThis->lenFilePrefix));
295
CHKiRet(qqueueSetiPersistUpdCnt(pThis->pqDA, pThis->iPersistUpdCnt));
296
CHKiRet(qqueueSettoActShutdown(pThis->pqDA, pThis->toActShutdown));
297
CHKiRet(qqueueSettoEnq(pThis->pqDA, pThis->toEnq));
298
CHKiRet(qqueueSetEnqOnly(pThis->pqDA, pThis->bDAEnqOnly, MUTEX_ALREADY_LOCKED));
299
CHKiRet(qqueueSetiDeqtWinFromHr(pThis->pqDA, pThis->iDeqtWinFromHr));
300
CHKiRet(qqueueSetiDeqtWinToHr(pThis->pqDA, pThis->iDeqtWinToHr));
301
CHKiRet(qqueueSetiHighWtrMrk(pThis->pqDA, 0));
302
CHKiRet(qqueueSetiDiscardMrk(pThis->pqDA, 0));
297
303
if(pThis->toQShutdown == 0) {
298
CHKiRet(queueSettoQShutdown(pThis->pqDA, 0)); /* if the user really wants... */
304
CHKiRet(qqueueSettoQShutdown(pThis->pqDA, 0)); /* if the user really wants... */
300
306
/* we use the shortest possible shutdown (0 is endless!) because when we run on disk AND
301
307
* have an obviously large backlog, we can't finish it in any case. So there is no point
302
308
* in holding shutdown longer than necessary. -- rgerhards, 2008-01-15
304
CHKiRet(queueSettoQShutdown(pThis->pqDA, 1));
310
CHKiRet(qqueueSettoQShutdown(pThis->pqDA, 1));
307
iRet = queueStart(pThis->pqDA);
313
iRet = qqueueStart(pThis->pqDA);
308
314
/* file not found is expected, that means there is no previous QIF available */
309
315
if(iRet != RS_RET_OK && iRet != RS_RET_FILE_NOT_FOUND)
310
316
FINALIZE; /* something is wrong */
322
328
pthread_cond_broadcast(&pThis->condDAReady); /* signal we are now initialized and ready to go ;) */
324
330
dbgoprint((obj_t*) pThis, "is now running in disk assisted mode, disk queue 0x%lx\n",
325
queueGetID(pThis->pqDA));
331
qqueueGetID(pThis->pqDA));
328
334
if(iRet != RS_RET_OK) {
329
335
if(pThis->pqDA != NULL) {
330
queueDestruct(&pThis->pqDA);
336
qqueueDestruct(&pThis->pqDA);
332
338
dbgoprint((obj_t*) pThis, "error %d creating disk queue - giving up.\n", iRet);
333
339
pThis->bIsDA = 0;
362
368
lenBuf = snprintf((char*)pszBuf, sizeof(pszBuf), "%s:DA", obj.GetName((obj_t*) pThis));
363
369
CHKiRet(wtpConstruct (&pThis->pWtpDA));
364
370
CHKiRet(wtpSetDbgHdr (pThis->pWtpDA, pszBuf, lenBuf));
365
CHKiRet(wtpSetpfChkStopWrkr (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, int)) queueChkStopWrkrDA));
366
CHKiRet(wtpSetpfIsIdle (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, int)) queueIsIdleDA));
367
CHKiRet(wtpSetpfDoWork (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, void *pWti, int)) queueConsumerDA));
368
CHKiRet(wtpSetpfOnWorkerCancel (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, void*pWti)) queueConsumerCancelCleanup));
369
CHKiRet(wtpSetpfOnWorkerStartup (pThis->pWtpDA, (rsRetVal (*)(void *pUsr)) queueStartDA));
370
CHKiRet(wtpSetpfOnWorkerShutdown(pThis->pWtpDA, (rsRetVal (*)(void *pUsr)) queueTurnOffDAMode));
371
CHKiRet(wtpSetpfChkStopWrkr (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, int)) qqueueChkStopWrkrDA));
372
CHKiRet(wtpSetpfIsIdle (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, int)) qqueueIsIdleDA));
373
CHKiRet(wtpSetpfDoWork (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, void *pWti, int)) qqueueConsumerDA));
374
CHKiRet(wtpSetpfOnWorkerCancel (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, void*pWti)) qqueueConsumerCancelCleanup));
375
CHKiRet(wtpSetpfOnWorkerStartup (pThis->pWtpDA, (rsRetVal (*)(void *pUsr)) qqueueStartDA));
376
CHKiRet(wtpSetpfOnWorkerShutdown(pThis->pWtpDA, (rsRetVal (*)(void *pUsr)) qqueueTurnOffDAMode));
371
377
CHKiRet(wtpSetpmutUsr (pThis->pWtpDA, pThis->mut));
372
378
CHKiRet(wtpSetpcondBusy (pThis->pWtpDA, &pThis->notEmpty));
373
379
CHKiRet(wtpSetiNumWorkerThreads (pThis->pWtpDA, 1));
421
427
* we need at least one).
423
429
dbgoprint((obj_t*) pThis, "%d entries - passed high water mark in DA mode, send notify\n",
424
queueGetOverallQueueSize(pThis));
425
queueAdviseMaxWorkers(pThis);
430
qqueueGetOverallQueueSize(pThis));
431
qqueueAdviseMaxWorkers(pThis);
427
433
/* this is the case when we are currently not running in DA mode. So it is time
428
434
* to turn it back on.
430
436
dbgoprint((obj_t*) pThis, "%d entries - passed high water mark for disk-assisted mode, initiating...\n",
431
queueGetOverallQueueSize(pThis));
432
queueInitDA(pThis, QUEUE_MODE_ENQDEQ, MUTEX_ALREADY_LOCKED); /* initiate DA mode */
437
qqueueGetOverallQueueSize(pThis));
438
qqueueInitDA(pThis, QUEUE_MODE_ENQDEQ, MUTEX_ALREADY_LOCKED); /* initiate DA mode */
754
760
while(iUngottenObjs > 0) {
755
761
/* fill the queue from disk */
756
762
CHKiRet(obj.Deserialize((void*) &pUsr, (uchar*)"msg", psQIF, NULL, NULL));
757
queueUngetObj(pThis, pUsr, MUTEX_ALREADY_LOCKED);
763
qqueueUngetObj(pThis, pUsr, MUTEX_ALREADY_LOCKED);
758
764
--iUngottenObjs; /* one less */
761
767
/* and now the stream objects (same order as when persisted!) */
762
768
CHKiRet(obj.Deserialize(&pThis->tVars.disk.pWrite, (uchar*) "strm", psQIF,
763
(rsRetVal(*)(obj_t*,void*))queueLoadPersStrmInfoFixup, pThis));
769
(rsRetVal(*)(obj_t*,void*))qqueueLoadPersStrmInfoFixup, pThis));
764
770
CHKiRet(obj.Deserialize(&pThis->tVars.disk.pRead, (uchar*) "strm", psQIF,
765
(rsRetVal(*)(obj_t*,void*))queueLoadPersStrmInfoFixup, pThis));
771
(rsRetVal(*)(obj_t*,void*))qqueueLoadPersStrmInfoFixup, pThis));
767
773
CHKiRet(strmSeekCurrOffs(pThis->tVars.disk.pWrite));
768
774
CHKiRet(strmSeekCurrOffs(pThis->tVars.disk.pRead));
955
961
* rgerhards, 2008-01-20
958
queueUngetObj(queue_t *pThis, obj_t *pUsr, int bLockMutex)
964
qqueueUngetObj(qqueue_t *pThis, obj_t *pUsr, int bLockMutex)
961
967
DEFVARS_mutexProtection;
963
ISOBJ_TYPE_assert(pThis, queue);
969
ISOBJ_TYPE_assert(pThis, qqueue);
964
970
ISOBJ_assert(pUsr); /* TODO: we aborted right at this place at least 3 times -- race? 2008-02-28, -03-10, -03-15
965
971
The second time I noticed it the queue was in destruction with NO worker threads
966
972
running. The pUsr ptr was totally off and provided no clue what it may be pointing
985
991
* rgerhards, 2008-01-29
988
queueGetUngottenObj(queue_t *pThis, obj_t **ppUsr)
994
qqueueGetUngottenObj(qqueue_t *pThis, obj_t **ppUsr)
992
ISOBJ_TYPE_assert(pThis, queue);
998
ISOBJ_TYPE_assert(pThis, qqueue);
993
999
ASSERT(ppUsr != NULL);
995
iRet = queueDelLinkedList(&pThis->pUngetRoot, &pThis->pUngetLast, ppUsr);
1001
iRet = qqueueDelLinkedList(&pThis->pUngetRoot, &pThis->pUngetLast, ppUsr);
996
1002
--pThis->iUngottenObjs; /* indicate one less */
997
1003
dbgoprint((obj_t*) pThis, "dequeued ungotten user object %s\n", obj.GetName(*ppUsr));
1041
1047
* losing the whole process because it loops... -- rgerhards, 2008-01-03
1043
1049
if(pThis->iUngottenObjs > 0) {
1044
iRet = queueGetUngottenObj(pThis, (obj_t**) pUsr);
1050
iRet = qqueueGetUngottenObj(pThis, (obj_t**) pUsr);
1046
1052
iRet = pThis->qDel(pThis, pUsr);
1047
--pThis->iQueueSize;
1053
ATOMIC_DEC(pThis->iQueueSize);
1050
1056
dbgoprint((obj_t*) pThis, "entry deleted, state %d, size now %d entries\n",
1065
1071
* complex) if each would have its own shutdown. The function does not self check
1066
1072
* this condition - the caller must make sure it is not called with a parent.
1068
static rsRetVal queueShutdownWorkers(queue_t *pThis)
1074
static rsRetVal qqueueShutdownWorkers(qqueue_t *pThis)
1071
1077
DEFVARS_mutexProtection;
1072
1078
struct timespec tTimeout;
1073
1079
rsRetVal iRetLocal;
1075
ISOBJ_TYPE_assert(pThis, queue);
1081
ISOBJ_TYPE_assert(pThis, qqueue);
1076
1082
ASSERT(pThis->pqParent == NULL); /* detect invalid calling sequence */
1078
1084
dbgoprint((obj_t*) pThis, "initiating worker thread shutdown sequence\n");
1154
1160
/* at this stage, we need to have the DA worker properly initialized and running (if there is one) */
1155
1161
if(pThis->bRunsDA)
1156
queueWaitDAModeInitialized(pThis);
1162
qqueueWaitDAModeInitialized(pThis);
1158
1164
BEGIN_MTX_PROTECTED_OPERATIONS(pThis->mut, LOCK_MUTEX); /* some workers may be running in parallel! */
1159
1165
/* optimize parameters for shutdown of DA-enabled queues */
1160
if(pThis->bIsDA && queueGetOverallQueueSize(pThis) > 0 && pThis->bSaveOnShutdown) {
1166
if(pThis->bIsDA && qqueueGetOverallQueueSize(pThis) > 0 && pThis->bSaveOnShutdown) {
1161
1167
/* switch to enqueue-only mode so that no more actions happen */
1162
1168
if(pThis->bRunsDA == 0) {
1163
queueInitDA(pThis, QUEUE_MODE_ENQONLY, MUTEX_ALREADY_LOCKED); /* switch to DA mode */
1169
qqueueInitDA(pThis, QUEUE_MODE_ENQONLY, MUTEX_ALREADY_LOCKED); /* switch to DA mode */
1165
1171
/* TODO: RACE: we may reach this point when the DA worker has been initialized (state 1)
1166
1172
* but is not yet running (state 2). In this case, pThis->pqDA is NULL! rgerhards, 2008-02-27
1168
queueSetEnqOnly(pThis->pqDA, QUEUE_MODE_ENQONLY, MUTEX_ALREADY_LOCKED); /* switch to enqueue-only mode */
1174
qqueueSetEnqOnly(pThis->pqDA, QUEUE_MODE_ENQONLY, MUTEX_ALREADY_LOCKED); /* switch to enqueue-only mode */
1170
1176
END_MTX_PROTECTED_OPERATIONS(pThis->mut);
1171
1177
/* make sure we do not timeout before we are done */
1268
1274
* is done by queueStart(). The reason is that we want to give the caller a chance
1269
1275
* to modify some parameters before the queue is actually started.
1271
rsRetVal queueConstruct(queue_t **ppThis, queueType_t qType, int iWorkerThreads,
1277
rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThreads,
1272
1278
int iMaxQueueSize, rsRetVal (*pConsumer)(void*,void*))
1277
1283
ASSERT(ppThis != NULL);
1278
1284
ASSERT(pConsumer != NULL);
1279
1285
ASSERT(iWorkerThreads >= 0);
1281
if((pThis = (queue_t *)calloc(1, sizeof(queue_t))) == NULL) {
1287
if((pThis = (qqueue_t *)calloc(1, sizeof(qqueue_t))) == NULL) {
1282
1288
ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY);
1285
1291
/* we have an object, so let's fill the properties */
1286
1292
objConstructSetObjInfo(pThis);
1293
pThis->bOptimizeUniProc = glbl.GetOptimizeUniProc();
1287
1294
if((pThis->pszSpoolDir = (uchar*) strdup((char*)glbl.GetWorkDir())) == NULL)
1288
1295
ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY);
1290
1297
/* set some water marks so that we have useful defaults if none are set specifically */
1291
pThis->iFullDlyMrk = (iMaxQueueSize < 100) ? iMaxQueueSize : 100; /* 100 should be far sufficient */
1292
pThis->iLightDlyMrk = iMaxQueueSize - (iMaxQueueSize / 100) * 70; /* default 70% */
1298
pThis->iFullDlyMrk = iMaxQueueSize - (iMaxQueueSize / 100) * 3; /* default 97% */
1299
pThis->iLightDlyMrk = iMaxQueueSize - (iMaxQueueSize / 100) * 30; /* default 70% */
1294
1301
pThis->lenSpoolDir = strlen((char*)pThis->pszSpoolDir);
1295
1302
pThis->iMaxFileSize = 1024 * 1024; /* default is 1 MiB */
1341
1348
/* cancellation cleanup handler for queueWorker ()
1342
1349
* Updates admin structure and frees resources.
1344
* arg1 - user pointer (in this case a queue_t)
1351
* arg1 - user pointer (in this case a qqueue_t)
1345
1352
* arg2 - user data pointer (in this case a queue data element, any object [queue's pUsr ptr!])
1346
1353
* Note that arg2 may be NULL, in which case no dequeued but unprocessed pUsr exists!
1347
1354
* rgerhards, 2008-01-16
1349
1356
static rsRetVal
1350
queueConsumerCancelCleanup(void *arg1, void *arg2)
1357
qqueueConsumerCancelCleanup(void *arg1, void *arg2)
1354
queue_t *pThis = (queue_t*) arg1;
1361
qqueue_t *pThis = (qqueue_t*) arg1;
1355
1362
obj_t *pUsr = (obj_t*) arg2;
1357
ISOBJ_TYPE_assert(pThis, queue);
1364
ISOBJ_TYPE_assert(pThis, qqueue);
1359
1366
if(pUsr != NULL) {
1360
1367
/* make sure the data element is not lost */
1361
1368
dbgoprint((obj_t*) pThis, "cancelation cleanup handler consumer called, we need to unget one user data element\n");
1362
CHKiRet(queueUngetObj(pThis, pUsr, LOCK_MUTEX));
1369
CHKiRet(qqueueUngetObj(pThis, pUsr, LOCK_MUTEX));
1587
1592
* rgerhards, 2008-01-21
1589
1594
static rsRetVal
1590
queueConsumerReg(queue_t *pThis, wti_t *pWti, int iCancelStateSave)
1595
qqueueConsumerReg(qqueue_t *pThis, wti_t *pWti, int iCancelStateSave)
1594
ISOBJ_TYPE_assert(pThis, queue);
1599
ISOBJ_TYPE_assert(pThis, qqueue);
1595
1600
ISOBJ_TYPE_assert(pWti, wti);
1597
CHKiRet(queueDequeueConsumable(pThis, pWti, iCancelStateSave));
1602
CHKiRet(qqueueDequeueConsumable(pThis, pWti, iCancelStateSave));
1598
1603
CHKiRet(pThis->pConsumer(pThis->pUsr, pWti->pUsrp));
1600
1605
/* we now need to check if we should deliberately delay processing a bit
1621
1626
* rgerhards, 2008-01-14
1623
1628
static rsRetVal
1624
queueConsumerDA(queue_t *pThis, wti_t *pWti, int iCancelStateSave)
1629
qqueueConsumerDA(qqueue_t *pThis, wti_t *pWti, int iCancelStateSave)
1628
ISOBJ_TYPE_assert(pThis, queue);
1633
ISOBJ_TYPE_assert(pThis, qqueue);
1629
1634
ISOBJ_TYPE_assert(pWti, wti);
1631
CHKiRet(queueDequeueConsumable(pThis, pWti, iCancelStateSave));
1632
CHKiRet(queueEnqObj(pThis->pqDA, eFLOWCTL_NO_DELAY, pWti->pUsrp));
1636
CHKiRet(qqueueDequeueConsumable(pThis, pWti, iCancelStateSave));
1637
CHKiRet(qqueueEnqObj(pThis->pqDA, eFLOWCTL_NO_DELAY, pWti->pUsrp));
1635
1640
dbgoprint((obj_t*) pThis, "DAConsumer returns with iRet %d\n", iRet);
1697
1702
* are not stable! DA queue version
1700
queueIsIdleDA(queue_t *pThis)
1705
qqueueIsIdleDA(qqueue_t *pThis)
1702
1707
/* remember: iQueueSize is the DA queue size, not the main queue! */
1703
1708
/* TODO: I think we need just a single function for DA and non-DA mode - but I leave it for now as is */
1704
return(queueGetOverallQueueSize(pThis) == 0 || (pThis->bRunsDA && queueGetOverallQueueSize(pThis) <= pThis->iLowWtrMrk));
1709
return(qqueueGetOverallQueueSize(pThis) == 0 || (pThis->bRunsDA && qqueueGetOverallQueueSize(pThis) <= pThis->iLowWtrMrk));
1706
1711
/* must only be called when the queue mutex is locked, else results
1707
1712
* are not stable! Regular queue version
1710
queueIsIdleReg(queue_t *pThis)
1715
qqueueIsIdleReg(qqueue_t *pThis)
1712
1717
#if 0 /* enable for performance testing */
1714
ret = queueGetOverallQueueSize(pThis) == 0 || (pThis->bRunsDA && queueGetOverallQueueSize(pThis) <= pThis->iLowWtrMrk);
1719
ret = qqueueGetOverallQueueSize(pThis) == 0 || (pThis->bRunsDA && qqueueGetOverallQueueSize(pThis) <= pThis->iLowWtrMrk);
1715
1720
if(ret) fprintf(stderr, "queue is idle\n");
1718
1723
/* regular code! */
1719
return(queueGetOverallQueueSize(pThis) == 0 || (pThis->bRunsDA && queueGetOverallQueueSize(pThis) <= pThis->iLowWtrMrk));
1724
return(qqueueGetOverallQueueSize(pThis) == 0 || (pThis->bRunsDA && qqueueGetOverallQueueSize(pThis) <= pThis->iLowWtrMrk));
1809
1814
/* call type-specific constructor */
1810
1815
CHKiRet(pThis->qConstruct(pThis)); /* this also sets bIsDA */
1812
dbgoprint((obj_t*) pThis, "type %d, enq-only %d, disk assisted %d, maxFileSz %lld, qsize %d, child %d starting\n",
1817
dbgoprint((obj_t*) pThis, "type %d, enq-only %d, disk assisted %d, maxFileSz %lld, qsize %d, child %d, "
1818
"full delay %d, light delay %d starting\n",
1813
1819
pThis->qType, pThis->bEnqOnly, pThis->bIsDA, pThis->iMaxFileSize,
1814
queueGetOverallQueueSize(pThis), pThis->pqParent == NULL ? 0 : 1);
1820
qqueueGetOverallQueueSize(pThis), pThis->pqParent == NULL ? 0 : 1,
1821
pThis->iFullDlyMrk, pThis->iLightDlyMrk);
1816
1823
if(pThis->qType == QUEUETYPE_DIRECT)
1817
1824
FINALIZE; /* with direct queues, we are already finished... */
1822
1829
lenBuf = snprintf((char*)pszBuf, sizeof(pszBuf), "%s:Reg", obj.GetName((obj_t*) pThis));
1823
1830
CHKiRet(wtpConstruct (&pThis->pWtpReg));
1824
1831
CHKiRet(wtpSetDbgHdr (pThis->pWtpReg, pszBuf, lenBuf));
1825
CHKiRet(wtpSetpfRateLimiter (pThis->pWtpReg, (rsRetVal (*)(void *pUsr)) queueRateLimiter));
1826
CHKiRet(wtpSetpfChkStopWrkr (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, int)) queueChkStopWrkrReg));
1827
CHKiRet(wtpSetpfIsIdle (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, int)) queueIsIdleReg));
1828
CHKiRet(wtpSetpfDoWork (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, void *pWti, int)) queueConsumerReg));
1829
CHKiRet(wtpSetpfOnWorkerCancel (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, void*pWti))queueConsumerCancelCleanup));
1830
CHKiRet(wtpSetpfOnWorkerStartup (pThis->pWtpReg, (rsRetVal (*)(void *pUsr)) queueRegOnWrkrStartup));
1831
CHKiRet(wtpSetpfOnWorkerShutdown(pThis->pWtpReg, (rsRetVal (*)(void *pUsr)) queueRegOnWrkrShutdown));
1832
CHKiRet(wtpSetpfRateLimiter (pThis->pWtpReg, (rsRetVal (*)(void *pUsr)) qqueueRateLimiter));
1833
CHKiRet(wtpSetpfChkStopWrkr (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, int)) qqueueChkStopWrkrReg));
1834
CHKiRet(wtpSetpfIsIdle (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, int)) qqueueIsIdleReg));
1835
CHKiRet(wtpSetpfDoWork (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, void *pWti, int)) qqueueConsumerReg));
1836
CHKiRet(wtpSetpfOnWorkerCancel (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, void*pWti))qqueueConsumerCancelCleanup));
1837
CHKiRet(wtpSetpfOnWorkerStartup (pThis->pWtpReg, (rsRetVal (*)(void *pUsr)) qqueueRegOnWrkrStartup));
1838
CHKiRet(wtpSetpfOnWorkerShutdown(pThis->pWtpReg, (rsRetVal (*)(void *pUsr)) qqueueRegOnWrkrShutdown));
1832
1839
CHKiRet(wtpSetpmutUsr (pThis->pWtpReg, pThis->mut));
1833
1840
CHKiRet(wtpSetpcondBusy (pThis->pWtpReg, &pThis->notEmpty));
1834
1841
CHKiRet(wtpSetiNumWorkerThreads (pThis->pWtpReg, pThis->iNumWorkerThreads));
1841
1848
/* If we are disk-assisted, we need to check if there is a QIF file
1842
1849
* which we need to load. -- rgerhards, 2008-01-15
1844
iRetLocal = queueHaveQIF(pThis);
1851
iRetLocal = qqueueHaveQIF(pThis);
1845
1852
if(iRetLocal == RS_RET_OK) {
1846
1853
dbgoprint((obj_t*) pThis, "on-disk queue present, needs to be reloaded\n");
1847
queueInitDA(pThis, QUEUE_MODE_ENQDEQ, LOCK_MUTEX); /* initiate DA mode */
1854
qqueueInitDA(pThis, QUEUE_MODE_ENQDEQ, LOCK_MUTEX); /* initiate DA mode */
1848
1855
bInitialized = 1; /* we are done */
1850
1857
/* TODO: use logerror? -- rgerhards, 2008-01-16 */
1898
1905
FINALIZE; /* if the queue is empty, we are happy and done... */
1901
dbgoprint((obj_t*) pThis, "persisting queue to disk, %d entries...\n", queueGetOverallQueueSize(pThis));
1908
dbgoprint((obj_t*) pThis, "persisting queue to disk, %d entries...\n", qqueueGetOverallQueueSize(pThis));
1903
1910
/* Construct file name */
1904
1911
lenQIFNam = snprintf((char*)pszQIFNam, sizeof(pszQIFNam) / sizeof(uchar), "%s/%s.qi",
1905
1912
(char*) glbl.GetWorkDir(), (char*)pThis->pszFilePrefix);
1907
if((bIsCheckpoint != QUEUE_CHECKPOINT) && (queueGetOverallQueueSize(pThis) == 0)) {
1914
if((bIsCheckpoint != QUEUE_CHECKPOINT) && (qqueueGetOverallQueueSize(pThis) == 0)) {
1908
1915
if(pThis->bNeedDelQIF) {
1909
1916
unlink((char*)pszQIFNam);
1910
1917
pThis->bNeedDelQIF = 0;
1972
1979
* abide to our regular call interface)...
1973
1980
* rgerhards, 2008-01-13
1975
rsRetVal queueChkPersist(queue_t *pThis)
1982
rsRetVal qqueueChkPersist(qqueue_t *pThis)
1979
ISOBJ_TYPE_assert(pThis, queue);
1986
ISOBJ_TYPE_assert(pThis, qqueue);
1981
1988
if(pThis->iPersistUpdCnt && ++pThis->iUpdsSincePersist >= pThis->iPersistUpdCnt) {
1982
queuePersist(pThis, QUEUE_CHECKPOINT);
1989
qqueuePersist(pThis, QUEUE_CHECKPOINT);
1983
1990
pThis->iUpdsSincePersist = 0;
2112
2119
* Enqueues the new element and awakes worker thread.
2115
queueEnqObj(queue_t *pThis, flowControl_t flowCtlType, void *pUsr)
2122
qqueueEnqObj(qqueue_t *pThis, flowControl_t flowCtlType, void *pUsr)
2118
2125
int iCancelStateSave;
2119
2126
struct timespec t;
2121
ISOBJ_TYPE_assert(pThis, queue);
2128
ISOBJ_TYPE_assert(pThis, qqueue);
2130
/* first check if we need to discard this message (which will cause CHKiRet() to exit)
2131
* rgerhards, 2008-10-07: It is OK to do this outside of mutex protection. The iQueueSize
2132
* and bRunsDA parameters may not reflect the correct settings here, but they are
2133
* "good enough" in the sense that they can be used to drive the decision. Valgrind's
2134
* threading tools may point this access to be an error, but this is done
2135
* intentionally. I do not see this causing problems for us.
2137
CHKiRet(qqueueChkDiscardMsg(pThis, pThis->iQueueSize, pThis->bRunsDA, pUsr));
2123
2139
/* Please note that this function is not cancel-safe and consequently
2124
2140
* sets the calling thread's cancelability state to PTHREAD_CANCEL_DISABLE
2161
2174
if(flowCtlType == eFLOWCTL_FULL_DELAY) {
2162
2175
while(pThis->iQueueSize >= pThis->iFullDlyMrk) {
2163
dbgoprint((obj_t*) pThis, "enqueueMsg: FullDelay mark reached for full delayble message - blocking.\n");
2176
dbgoprint((obj_t*) pThis, "enqueueMsg: FullDelay mark reached for full delayable message - blocking.\n");
2164
2177
pthread_cond_wait(&pThis->belowFullDlyWtrMrk, pThis->mut); /* TODO error check? But what do then? */
2166
2179
} else if(flowCtlType == eFLOWCTL_LIGHT_DELAY) {
2167
2180
if(pThis->iQueueSize >= pThis->iLightDlyMrk) {
2168
dbgoprint((obj_t*) pThis, "enqueueMsg: LightDelay mark reached for light delayble message - blocking a bit.\n");
2181
dbgoprint((obj_t*) pThis, "enqueueMsg: LightDelay mark reached for light delayable message - blocking a bit.\n");
2169
2182
timeoutComp(&t, 1000); /* 1000 millisconds = 1 second TODO: make configurable */
2170
2183
pthread_cond_timedwait(&pThis->belowLightDlyWtrMrk, pThis->mut, &t); /* TODO error check? But what do then? */
2191
2204
/* and finally enqueue the message */
2192
CHKiRet(queueAdd(pThis, pUsr));
2193
queueChkPersist(pThis);
2205
CHKiRet(qqueueAdd(pThis, pUsr));
2206
qqueueChkPersist(pThis);
2196
2209
if(pThis->qType != QUEUETYPE_DIRECT) {
2197
2210
/* make sure at least one worker is running. */
2198
queueAdviseMaxWorkers(pThis);
2199
dbgoprint((obj_t*) pThis, "EnqueueMsg advised worker start\n");
2211
qqueueAdviseMaxWorkers(pThis);
2200
2212
/* and release the mutex */
2201
2213
d_pthread_mutex_unlock(pThis->mut);
2202
2214
pthread_setcancelstate(iCancelStateSave, NULL);
2215
dbgoprint((obj_t*) pThis, "EnqueueMsg advised worker start\n");
2216
/* the following pthread_yield is experimental, but brought us performance
2217
* benefit. For details, please see http://kb.monitorware.com/post14216.html#p14216
2218
* rgerhards, 2008-10-09
2219
* but this is only true for uniprocessors, so we guard it with an optimize flag -- rgerhards, 2008-10-22
2221
if(pThis->bOptimizeUniProc)
2261
2281
/* some simple object access methods */
2262
DEFpropSetMeth(queue, iPersistUpdCnt, int)
2263
DEFpropSetMeth(queue, iDeqtWinFromHr, int)
2264
DEFpropSetMeth(queue, iDeqtWinToHr, int)
2265
DEFpropSetMeth(queue, toQShutdown, long)
2266
DEFpropSetMeth(queue, toActShutdown, long)
2267
DEFpropSetMeth(queue, toWrkShutdown, long)
2268
DEFpropSetMeth(queue, toEnq, long)
2269
DEFpropSetMeth(queue, iHighWtrMrk, int)
2270
DEFpropSetMeth(queue, iLowWtrMrk, int)
2271
DEFpropSetMeth(queue, iDiscardMrk, int)
2272
DEFpropSetMeth(queue, iFullDlyMrk, int)
2273
DEFpropSetMeth(queue, iDiscardSeverity, int)
2274
DEFpropSetMeth(queue, bIsDA, int)
2275
DEFpropSetMeth(queue, iMinMsgsPerWrkr, int)
2276
DEFpropSetMeth(queue, bSaveOnShutdown, int)
2277
DEFpropSetMeth(queue, pUsr, void*)
2278
DEFpropSetMeth(queue, iDeqSlowdown, int)
2279
DEFpropSetMeth(queue, sizeOnDiskMax, int64)
2282
DEFpropSetMeth(qqueue, iPersistUpdCnt, int)
2283
DEFpropSetMeth(qqueue, iDeqtWinFromHr, int)
2284
DEFpropSetMeth(qqueue, iDeqtWinToHr, int)
2285
DEFpropSetMeth(qqueue, toQShutdown, long)
2286
DEFpropSetMeth(qqueue, toActShutdown, long)
2287
DEFpropSetMeth(qqueue, toWrkShutdown, long)
2288
DEFpropSetMeth(qqueue, toEnq, long)
2289
DEFpropSetMeth(qqueue, iHighWtrMrk, int)
2290
DEFpropSetMeth(qqueue, iLowWtrMrk, int)
2291
DEFpropSetMeth(qqueue, iDiscardMrk, int)
2292
DEFpropSetMeth(qqueue, iFullDlyMrk, int)
2293
DEFpropSetMeth(qqueue, iDiscardSeverity, int)
2294
DEFpropSetMeth(qqueue, bIsDA, int)
2295
DEFpropSetMeth(qqueue, iMinMsgsPerWrkr, int)
2296
DEFpropSetMeth(qqueue, bSaveOnShutdown, int)
2297
DEFpropSetMeth(qqueue, pUsr, void*)
2298
DEFpropSetMeth(qqueue, iDeqSlowdown, int)
2299
DEFpropSetMeth(qqueue, sizeOnDiskMax, int64)
2282
2302
/* This function can be used as a generic way to set properties. Only the subset
2314
/* Interface query stub for the queue class: it takes no arguments and
 * unconditionally reports RS_RET_NOT_IMPLEMENTED to the caller.
 */
rsRetVal
queueQueryInterface(void)
{
	return RS_RET_NOT_IMPLEMENTED;
}
2334
/* Interface query stub for the qqueue class: it takes no arguments and
 * unconditionally reports RS_RET_NOT_IMPLEMENTED to the caller.
 */
rsRetVal
qqueueQueryInterface(void)
{
	return RS_RET_NOT_IMPLEMENTED;
}
2316
2336
/* Initialize the queue class. Must be called as the very first method
2317
2337
* before anything else is called inside this class.
2318
2338
* rgerhards, 2008-01-09
2320
BEGINObjClassInit(queue, 1, OBJ_IS_CORE_MODULE)
2340
BEGINObjClassInit(qqueue, 1, OBJ_IS_CORE_MODULE)
2321
2341
/* request objects we use */
2322
2342
CHKiRet(objUse(glbl, CORE_COMPONENT));
2324
2344
/* now set our own handlers */
2325
OBJSetMethodHandler(objMethod_SETPROPERTY, queueSetProperty);
2326
ENDObjClassInit(queue)
2345
OBJSetMethodHandler(objMethod_SETPROPERTY, qqueueSetProperty);
2346
ENDObjClassInit(qqueue)