~ubuntu-branches/ubuntu/lucid/rsyslog/lucid-updates

« back to all changes in this revision

Viewing changes to runtime/queue.c

  • Committer: Bazaar Package Importer
  • Author(s): Michael Biebl
  • Date: 2009-06-23 12:12:43 UTC
  • mfrom: (1.1.11 upstream) (3.2.8 sid)
  • Revision ID: james.westby@ubuntu.com-20090623121243-d2fejarzidywnn17
Tags: 4.2.0-1
* New upstream release of the now stable v4 branch.
  - Fix warnings when /etc/rsyslog.d/ is empty. Closes: #530228
* debian/patches/imudp_multiple_udp_sockets.patch
  - Removed, merged upstream.
* debian/rsyslog.default
  - Set default compat mode to '4'.
* debian/rsyslog.logcheck.ignore.server
  - Update logcheck rules files to also ignore rsyslogd and imklog stop
    messages.
* debian/control
  - Bump Standards-Version to 3.8.2. No further changes.

Show diffs side-by-side

added added

removed removed

Lines of Context:
49
49
#include "obj.h"
50
50
#include "wtp.h"
51
51
#include "wti.h"
 
52
#include "atomic.h"
 
53
 
 
54
#ifdef OS_SOLARIS
 
55
#       include <sched.h>
 
56
#       define pthread_yield() sched_yield()
 
57
#endif
52
58
 
53
59
/* static data */
54
60
DEFobjStaticHelpers
55
61
DEFobjCurrIf(glbl)
56
62
 
57
63
/* forward-definitions */
58
 
rsRetVal queueChkPersist(queue_t *pThis);
59
 
static rsRetVal queueSetEnqOnly(queue_t *pThis, int bEnqOnly, int bLockMutex);
60
 
static rsRetVal queueRateLimiter(queue_t *pThis);
61
 
static int queueChkStopWrkrDA(queue_t *pThis);
62
 
static int queueIsIdleDA(queue_t *pThis);
63
 
static rsRetVal queueConsumerDA(queue_t *pThis, wti_t *pWti, int iCancelStateSave);
64
 
static rsRetVal queueConsumerCancelCleanup(void *arg1, void *arg2);
65
 
static rsRetVal queueUngetObj(queue_t *pThis, obj_t *pUsr, int bLockMutex);
 
64
rsRetVal qqueueChkPersist(qqueue_t *pThis);
 
65
static rsRetVal qqueueSetEnqOnly(qqueue_t *pThis, int bEnqOnly, int bLockMutex);
 
66
static rsRetVal qqueueRateLimiter(qqueue_t *pThis);
 
67
static int qqueueChkStopWrkrDA(qqueue_t *pThis);
 
68
static int qqueueIsIdleDA(qqueue_t *pThis);
 
69
static rsRetVal qqueueConsumerDA(qqueue_t *pThis, wti_t *pWti, int iCancelStateSave);
 
70
static rsRetVal qqueueConsumerCancelCleanup(void *arg1, void *arg2);
 
71
static rsRetVal qqueueUngetObj(qqueue_t *pThis, obj_t *pUsr, int bLockMutex);
66
72
 
67
73
/* some constants for queuePersist () */
68
74
#define QUEUE_CHECKPOINT        1
76
82
 * rgerhards, 2008-01-29
77
83
 */
78
84
static inline int
79
 
queueGetOverallQueueSize(queue_t *pThis)
 
85
qqueueGetOverallQueueSize(qqueue_t *pThis)
80
86
{
81
87
#if 0 /* leave a bit in for debugging -- rgerhards, 2008-01-30 */
82
88
BEGINfunc
95
101
 * This function returns void, as it makes no sense to communicate an error back, even if
96
102
 * it happens.
97
103
 */
98
 
static inline void queueDrain(queue_t *pThis)
 
104
static inline void queueDrain(qqueue_t *pThis)
99
105
{
100
106
        void *pUsr;
101
107
        
118
124
 * this point in time. The mutex must be locked when
119
125
 * ths function is called. -- rgerhards, 2008-01-25
120
126
 */
121
 
static inline rsRetVal queueAdviseMaxWorkers(queue_t *pThis)
 
127
static inline rsRetVal qqueueAdviseMaxWorkers(qqueue_t *pThis)
122
128
{
123
129
        DEFiRet;
124
130
        int iMaxWorkers;
125
131
 
126
 
        ISOBJ_TYPE_assert(pThis, queue);
 
132
        ISOBJ_TYPE_assert(pThis, qqueue);
127
133
 
128
134
        if(!pThis->bEnqOnly) {
129
135
                if(pThis->bRunsDA) {
130
136
                        /* if we have not yet reached the high water mark, there is no need to start a
131
137
                         * worker. -- rgerhards, 2008-01-26
132
138
                         */
133
 
                        if(queueGetOverallQueueSize(pThis) >= pThis->iHighWtrMrk || pThis->bQueueStarted == 0) {
 
139
                        if(qqueueGetOverallQueueSize(pThis) >= pThis->iHighWtrMrk || pThis->bQueueStarted == 0) {
134
140
                                wtpAdviseMaxWorkers(pThis->pWtpDA, 1); /* disk queues have always one worker */
135
141
                        }
136
142
                } else {
137
143
                        if(pThis->qType == QUEUETYPE_DISK || pThis->iMinMsgsPerWrkr == 0) {
138
144
                                iMaxWorkers = 1;
139
145
                        } else {
140
 
                                iMaxWorkers = queueGetOverallQueueSize(pThis) / pThis->iMinMsgsPerWrkr + 1;
 
146
                                iMaxWorkers = qqueueGetOverallQueueSize(pThis) / pThis->iMinMsgsPerWrkr + 1;
141
147
                        }
142
148
                        wtpAdviseMaxWorkers(pThis->pWtpReg, iMaxWorkers); /* disk queues have always one worker */
143
149
                }
152
158
 * rgerhards, 2008-02-27
153
159
 */
154
160
static rsRetVal
155
 
queueWaitDAModeInitialized(queue_t *pThis)
 
161
qqueueWaitDAModeInitialized(qqueue_t *pThis)
156
162
{
157
163
        DEFiRet;
158
164
 
159
 
        ISOBJ_TYPE_assert(pThis, queue);
 
165
        ISOBJ_TYPE_assert(pThis, qqueue);
160
166
        ASSERT(pThis->bRunsDA);
161
167
 
162
168
        while(pThis->bRunsDA != 2) {
178
184
 * rgerhards, 2008-01-15
179
185
 */
180
186
static rsRetVal
181
 
queueTurnOffDAMode(queue_t *pThis)
 
187
qqueueTurnOffDAMode(qqueue_t *pThis)
182
188
{
183
189
        DEFiRet;
184
190
 
185
 
        ISOBJ_TYPE_assert(pThis, queue);
 
191
        ISOBJ_TYPE_assert(pThis, qqueue);
186
192
        ASSERT(pThis->bRunsDA);
187
193
 
188
194
        /* at this point, we need a fully initialized DA queue. So if it isn't, we finally need
189
195
         * to wait for its startup... -- rgerhards, 2008-01-25
190
196
         */
191
 
        queueWaitDAModeInitialized(pThis);
 
197
        qqueueWaitDAModeInitialized(pThis);
192
198
 
193
199
        /* if we need to pull any data that we still need from the (child) disk queue,
194
200
         * now would be the time to do so. At present, we do not need this, but I'd like to
207
213
                /* we destruct the queue object, which will also shutdown the queue worker. As the queue is empty,
208
214
                 * this will be quick.
209
215
                 */
210
 
                queueDestruct(&pThis->pqDA); /* and now we are ready to destruct the DA queue */
 
216
                qqueueDestruct(&pThis->pqDA); /* and now we are ready to destruct the DA queue */
211
217
                dbgoprint((obj_t*) pThis, "disk-assistance has been turned off, disk queue was empty (iRet %d)\n",
212
218
                          iRet);
213
219
                /* now we need to check if the regular queue has some messages. This may be the case
214
220
                 * when it is waiting that the high water mark is reached again. If so, we need to start up
215
221
                 * a regular worker. -- rgerhards, 2008-01-26
216
222
                 */
217
 
                if(queueGetOverallQueueSize(pThis) > 0) {
218
 
                        queueAdviseMaxWorkers(pThis);
 
223
                if(qqueueGetOverallQueueSize(pThis) > 0) {
 
224
                        qqueueAdviseMaxWorkers(pThis);
219
225
                }
220
226
        }
221
227
 
231
237
 * rgerhards, 2008-01-14
232
238
 */
233
239
static rsRetVal
234
 
queueChkIsDA(queue_t *pThis)
 
240
qqueueChkIsDA(qqueue_t *pThis)
235
241
{
236
242
        DEFiRet;
237
243
 
238
 
        ISOBJ_TYPE_assert(pThis, queue);
 
244
        ISOBJ_TYPE_assert(pThis, qqueue);
239
245
        if(pThis->pszFilePrefix != NULL) {
240
246
                pThis->bIsDA = 1;
241
247
                dbgoprint((obj_t*) pThis, "is disk-assisted, disk will be used on demand\n");
259
265
 * rgerhards, 2008-01-15
260
266
 */
261
267
static rsRetVal
262
 
queueStartDA(queue_t *pThis)
 
268
qqueueStartDA(qqueue_t *pThis)
263
269
{
264
270
        DEFiRet;
265
271
        uchar pszDAQName[128];
266
272
 
267
 
        ISOBJ_TYPE_assert(pThis, queue);
 
273
        ISOBJ_TYPE_assert(pThis, qqueue);
268
274
 
269
275
        if(pThis->bRunsDA == 2) /* check if already in (fully initialized) DA mode... */
270
276
                FINALIZE;       /* ... then we are already done! */
271
277
 
272
278
        /* create message queue */
273
 
        CHKiRet(queueConstruct(&pThis->pqDA, QUEUETYPE_DISK , 1, 0, pThis->pConsumer));
 
279
        CHKiRet(qqueueConstruct(&pThis->pqDA, QUEUETYPE_DISK , 1, 0, pThis->pConsumer));
274
280
 
275
281
        /* give it a name */
276
282
        snprintf((char*) pszDAQName, sizeof(pszDAQName)/sizeof(uchar), "%s[DA]", obj.GetName((obj_t*) pThis));
281
287
         */
282
288
        pThis->pqDA->pqParent = pThis;
283
289
 
284
 
        CHKiRet(queueSetpUsr(pThis->pqDA, pThis->pUsr));
285
 
        CHKiRet(queueSetsizeOnDiskMax(pThis->pqDA, pThis->sizeOnDiskMax));
286
 
        CHKiRet(queueSetiDeqSlowdown(pThis->pqDA, pThis->iDeqSlowdown));
287
 
        CHKiRet(queueSetMaxFileSize(pThis->pqDA, pThis->iMaxFileSize));
288
 
        CHKiRet(queueSetFilePrefix(pThis->pqDA, pThis->pszFilePrefix, pThis->lenFilePrefix));
289
 
        CHKiRet(queueSetiPersistUpdCnt(pThis->pqDA, pThis->iPersistUpdCnt));
290
 
        CHKiRet(queueSettoActShutdown(pThis->pqDA, pThis->toActShutdown));
291
 
        CHKiRet(queueSettoEnq(pThis->pqDA, pThis->toEnq));
292
 
        CHKiRet(queueSetEnqOnly(pThis->pqDA, pThis->bDAEnqOnly, MUTEX_ALREADY_LOCKED));
293
 
        CHKiRet(queueSetiDeqtWinFromHr(pThis->pqDA, pThis->iDeqtWinFromHr));
294
 
        CHKiRet(queueSetiDeqtWinToHr(pThis->pqDA, pThis->iDeqtWinToHr));
295
 
        CHKiRet(queueSetiHighWtrMrk(pThis->pqDA, 0));
296
 
        CHKiRet(queueSetiDiscardMrk(pThis->pqDA, 0));
 
290
        CHKiRet(qqueueSetpUsr(pThis->pqDA, pThis->pUsr));
 
291
        CHKiRet(qqueueSetsizeOnDiskMax(pThis->pqDA, pThis->sizeOnDiskMax));
 
292
        CHKiRet(qqueueSetiDeqSlowdown(pThis->pqDA, pThis->iDeqSlowdown));
 
293
        CHKiRet(qqueueSetMaxFileSize(pThis->pqDA, pThis->iMaxFileSize));
 
294
        CHKiRet(qqueueSetFilePrefix(pThis->pqDA, pThis->pszFilePrefix, pThis->lenFilePrefix));
 
295
        CHKiRet(qqueueSetiPersistUpdCnt(pThis->pqDA, pThis->iPersistUpdCnt));
 
296
        CHKiRet(qqueueSettoActShutdown(pThis->pqDA, pThis->toActShutdown));
 
297
        CHKiRet(qqueueSettoEnq(pThis->pqDA, pThis->toEnq));
 
298
        CHKiRet(qqueueSetEnqOnly(pThis->pqDA, pThis->bDAEnqOnly, MUTEX_ALREADY_LOCKED));
 
299
        CHKiRet(qqueueSetiDeqtWinFromHr(pThis->pqDA, pThis->iDeqtWinFromHr));
 
300
        CHKiRet(qqueueSetiDeqtWinToHr(pThis->pqDA, pThis->iDeqtWinToHr));
 
301
        CHKiRet(qqueueSetiHighWtrMrk(pThis->pqDA, 0));
 
302
        CHKiRet(qqueueSetiDiscardMrk(pThis->pqDA, 0));
297
303
        if(pThis->toQShutdown == 0) {
298
 
                CHKiRet(queueSettoQShutdown(pThis->pqDA, 0)); /* if the user really wants... */
 
304
                CHKiRet(qqueueSettoQShutdown(pThis->pqDA, 0)); /* if the user really wants... */
299
305
        } else {
300
306
                /* we use the shortest possible shutdown (0 is endless!) because when we run on disk AND
301
307
                 * have an obviously large backlog, we can't finish it in any case. So there is no point
302
308
                 * in holding shutdown longer than necessary. -- rgerhards, 2008-01-15
303
309
                 */
304
 
                CHKiRet(queueSettoQShutdown(pThis->pqDA, 1));
 
310
                CHKiRet(qqueueSettoQShutdown(pThis->pqDA, 1));
305
311
        }
306
312
 
307
 
        iRet = queueStart(pThis->pqDA);
 
313
        iRet = qqueueStart(pThis->pqDA);
308
314
        /* file not found is expected, that means it is no previous QIF available */
309
315
        if(iRet != RS_RET_OK && iRet != RS_RET_FILE_NOT_FOUND)
310
316
                FINALIZE; /* something is wrong */
322
328
        pthread_cond_broadcast(&pThis->condDAReady); /* signal we are now initialized and ready to go ;) */
323
329
 
324
330
        dbgoprint((obj_t*) pThis, "is now running in disk assisted mode, disk queue 0x%lx\n",
325
 
                  queueGetID(pThis->pqDA));
 
331
                  qqueueGetID(pThis->pqDA));
326
332
 
327
333
finalize_it:
328
334
        if(iRet != RS_RET_OK) {
329
335
                if(pThis->pqDA != NULL) {
330
 
                        queueDestruct(&pThis->pqDA);
 
336
                        qqueueDestruct(&pThis->pqDA);
331
337
                }
332
338
                dbgoprint((obj_t*) pThis, "error %d creating disk queue - giving up.\n", iRet);
333
339
                pThis->bIsDA = 0;
344
350
 * rgerhards, 2008-01-16
345
351
 */
346
352
static inline rsRetVal
347
 
queueInitDA(queue_t *pThis, int bEnqOnly, int bLockMutex)
 
353
qqueueInitDA(qqueue_t *pThis, int bEnqOnly, int bLockMutex)
348
354
{
349
355
        DEFiRet;
350
356
        DEFVARS_mutexProtection;
362
368
                lenBuf = snprintf((char*)pszBuf, sizeof(pszBuf), "%s:DA", obj.GetName((obj_t*) pThis));
363
369
                CHKiRet(wtpConstruct            (&pThis->pWtpDA));
364
370
                CHKiRet(wtpSetDbgHdr            (pThis->pWtpDA, pszBuf, lenBuf));
365
 
                CHKiRet(wtpSetpfChkStopWrkr     (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, int)) queueChkStopWrkrDA));
366
 
                CHKiRet(wtpSetpfIsIdle          (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, int)) queueIsIdleDA));
367
 
                CHKiRet(wtpSetpfDoWork          (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, void *pWti, int)) queueConsumerDA));
368
 
                CHKiRet(wtpSetpfOnWorkerCancel  (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, void*pWti)) queueConsumerCancelCleanup));
369
 
                CHKiRet(wtpSetpfOnWorkerStartup (pThis->pWtpDA, (rsRetVal (*)(void *pUsr)) queueStartDA));
370
 
                CHKiRet(wtpSetpfOnWorkerShutdown(pThis->pWtpDA, (rsRetVal (*)(void *pUsr)) queueTurnOffDAMode));
 
371
                CHKiRet(wtpSetpfChkStopWrkr     (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, int)) qqueueChkStopWrkrDA));
 
372
                CHKiRet(wtpSetpfIsIdle          (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, int)) qqueueIsIdleDA));
 
373
                CHKiRet(wtpSetpfDoWork          (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, void *pWti, int)) qqueueConsumerDA));
 
374
                CHKiRet(wtpSetpfOnWorkerCancel  (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, void*pWti)) qqueueConsumerCancelCleanup));
 
375
                CHKiRet(wtpSetpfOnWorkerStartup (pThis->pWtpDA, (rsRetVal (*)(void *pUsr)) qqueueStartDA));
 
376
                CHKiRet(wtpSetpfOnWorkerShutdown(pThis->pWtpDA, (rsRetVal (*)(void *pUsr)) qqueueTurnOffDAMode));
371
377
                CHKiRet(wtpSetpmutUsr           (pThis->pWtpDA, pThis->mut));
372
378
                CHKiRet(wtpSetpcondBusy         (pThis->pWtpDA, &pThis->notEmpty));
373
379
                CHKiRet(wtpSetiNumWorkerThreads (pThis->pWtpDA, 1));
400
406
 * rgerhards, 2008-01-14
401
407
 */
402
408
static inline rsRetVal
403
 
queueChkStrtDA(queue_t *pThis)
 
409
qqueueChkStrtDA(qqueue_t *pThis)
404
410
{
405
411
        DEFiRet;
406
412
 
407
 
        ISOBJ_TYPE_assert(pThis, queue);
 
413
        ISOBJ_TYPE_assert(pThis, qqueue);
408
414
 
409
415
        /* if we do not hit the high water mark, we have nothing to do */
410
 
        if(queueGetOverallQueueSize(pThis) != pThis->iHighWtrMrk)
 
416
        if(qqueueGetOverallQueueSize(pThis) != pThis->iHighWtrMrk)
411
417
                ABORT_FINALIZE(RS_RET_OK);
412
418
 
413
419
        if(pThis->bRunsDA) {
421
427
                 * we need at least one).
422
428
                 */
423
429
                dbgoprint((obj_t*) pThis, "%d entries - passed high water mark in DA mode, send notify\n",
424
 
                          queueGetOverallQueueSize(pThis));
425
 
                queueAdviseMaxWorkers(pThis);
 
430
                          qqueueGetOverallQueueSize(pThis));
 
431
                qqueueAdviseMaxWorkers(pThis);
426
432
        } else {
427
433
                /* this is the case when we are currently not running in DA mode. So it is time
428
434
                 * to turn it back on.
429
435
                 */
430
436
                dbgoprint((obj_t*) pThis, "%d entries - passed high water mark for disk-assisted mode, initiating...\n",
431
 
                          queueGetOverallQueueSize(pThis));
432
 
                queueInitDA(pThis, QUEUE_MODE_ENQDEQ, MUTEX_ALREADY_LOCKED); /* initiate DA mode */
 
437
                          qqueueGetOverallQueueSize(pThis));
 
438
                qqueueInitDA(pThis, QUEUE_MODE_ENQDEQ, MUTEX_ALREADY_LOCKED); /* initiate DA mode */
433
439
        }
434
440
 
435
441
finalize_it:
447
453
 */
448
454
 
449
455
/* -------------------- fixed array -------------------- */
450
 
static rsRetVal qConstructFixedArray(queue_t *pThis)
 
456
static rsRetVal qConstructFixedArray(qqueue_t *pThis)
451
457
{
452
458
        DEFiRet;
453
459
 
463
469
        pThis->tVars.farray.head = 0;
464
470
        pThis->tVars.farray.tail = 0;
465
471
 
466
 
        queueChkIsDA(pThis);
 
472
        qqueueChkIsDA(pThis);
467
473
 
468
474
finalize_it:
469
475
        RETiRet;
470
476
}
471
477
 
472
478
 
473
 
static rsRetVal qDestructFixedArray(queue_t *pThis)
 
479
static rsRetVal qDestructFixedArray(qqueue_t *pThis)
474
480
{
475
481
        DEFiRet;
476
482
        
485
491
}
486
492
 
487
493
 
488
 
static rsRetVal qAddFixedArray(queue_t *pThis, void* in)
 
494
static rsRetVal qAddFixedArray(qqueue_t *pThis, void* in)
489
495
{
490
496
        DEFiRet;
491
497
 
498
504
        RETiRet;
499
505
}
500
506
 
501
 
static rsRetVal qDelFixedArray(queue_t *pThis, void **out)
 
507
static rsRetVal qDelFixedArray(qqueue_t *pThis, void **out)
502
508
{
503
509
        DEFiRet;
504
510
 
517
523
 
518
524
/* first some generic functions which are also used for the unget linked list */
519
525
 
520
 
static inline rsRetVal queueAddLinkedList(qLinkedList_t **ppRoot, qLinkedList_t **ppLast, void* pUsr)
 
526
static inline rsRetVal qqueueAddLinkedList(qLinkedList_t **ppRoot, qLinkedList_t **ppLast, void* pUsr)
521
527
{
522
528
        DEFiRet;
523
529
        qLinkedList_t *pEntry;
543
549
        RETiRet;
544
550
}
545
551
 
546
 
static inline rsRetVal queueDelLinkedList(qLinkedList_t **ppRoot, qLinkedList_t **ppLast, obj_t **ppUsr)
 
552
static inline rsRetVal qqueueDelLinkedList(qLinkedList_t **ppRoot, qLinkedList_t **ppLast, obj_t **ppUsr)
547
553
{
548
554
        DEFiRet;
549
555
        qLinkedList_t *pEntry;
570
576
/* end generic functions which are also used for the unget linked list */
571
577
 
572
578
 
573
 
static rsRetVal qConstructLinkedList(queue_t *pThis)
 
579
static rsRetVal qConstructLinkedList(qqueue_t *pThis)
574
580
{
575
581
        DEFiRet;
576
582
 
579
585
        pThis->tVars.linklist.pRoot = 0;
580
586
        pThis->tVars.linklist.pLast = 0;
581
587
 
582
 
        queueChkIsDA(pThis);
 
588
        qqueueChkIsDA(pThis);
583
589
 
584
590
        RETiRet;
585
591
}
586
592
 
587
593
 
588
 
static rsRetVal qDestructLinkedList(queue_t __attribute__((unused)) *pThis)
 
594
static rsRetVal qDestructLinkedList(qqueue_t __attribute__((unused)) *pThis)
589
595
{
590
596
        DEFiRet;
591
597
 
598
604
        RETiRet;
599
605
}
600
606
 
601
 
static rsRetVal qAddLinkedList(queue_t *pThis, void* pUsr)
 
607
static rsRetVal qAddLinkedList(qqueue_t *pThis, void* pUsr)
602
608
{
603
609
        DEFiRet;
604
610
 
605
 
        iRet = queueAddLinkedList(&pThis->tVars.linklist.pRoot, &pThis->tVars.linklist.pLast, pUsr);
 
611
        iRet = qqueueAddLinkedList(&pThis->tVars.linklist.pRoot, &pThis->tVars.linklist.pLast, pUsr);
606
612
#if 0
607
613
        qLinkedList_t *pEntry;
608
614
 
626
632
        RETiRet;
627
633
}
628
634
 
629
 
static rsRetVal qDelLinkedList(queue_t *pThis, obj_t **ppUsr)
 
635
static rsRetVal qDelLinkedList(qqueue_t *pThis, obj_t **ppUsr)
630
636
{
631
637
        DEFiRet;
632
 
        iRet = queueDelLinkedList(&pThis->tVars.linklist.pRoot, &pThis->tVars.linklist.pLast, ppUsr);
 
638
        iRet = qqueueDelLinkedList(&pThis->tVars.linklist.pRoot, &pThis->tVars.linklist.pLast, ppUsr);
633
639
#if 0
634
640
        qLinkedList_t *pEntry;
635
641
 
656
662
 
657
663
 
658
664
static rsRetVal
659
 
queueLoadPersStrmInfoFixup(strm_t *pStrm, queue_t __attribute__((unused)) *pThis)
 
665
qqueueLoadPersStrmInfoFixup(strm_t *pStrm, qqueue_t __attribute__((unused)) *pThis)
660
666
{
661
667
        DEFiRet;
662
668
        ISOBJ_TYPE_assert(pStrm, strm);
663
 
        ISOBJ_TYPE_assert(pThis, queue);
 
669
        ISOBJ_TYPE_assert(pThis, qqueue);
664
670
        CHKiRet(strmSetDir(pStrm, glbl.GetWorkDir(), strlen((char*)glbl.GetWorkDir())));
665
671
finalize_it:
666
672
        RETiRet;
672
678
 * rgerhards, 2008-01-15
673
679
 */
674
680
static rsRetVal 
675
 
queueHaveQIF(queue_t *pThis)
 
681
qqueueHaveQIF(qqueue_t *pThis)
676
682
{
677
683
        DEFiRet;
678
684
        uchar pszQIFNam[MAXFNAME];
679
685
        size_t lenQIFNam;
680
686
        struct stat stat_buf;
681
687
 
682
 
        ISOBJ_TYPE_assert(pThis, queue);
 
688
        ISOBJ_TYPE_assert(pThis, qqueue);
683
689
 
684
690
        if(pThis->pszFilePrefix == NULL)
685
691
                ABORT_FINALIZE(RS_RET_NO_FILEPREFIX);
709
715
 * rgerhards, 2008-01-11
710
716
 */
711
717
static rsRetVal 
712
 
queueTryLoadPersistedInfo(queue_t *pThis)
 
718
qqueueTryLoadPersistedInfo(qqueue_t *pThis)
713
719
{
714
720
        DEFiRet;
715
721
        strm_t *psQIF = NULL;
719
725
        int iUngottenObjs;
720
726
        obj_t *pUsr;
721
727
 
722
 
        ISOBJ_TYPE_assert(pThis, queue);
 
728
        ISOBJ_TYPE_assert(pThis, qqueue);
723
729
 
724
730
        /* Construct file name */
725
731
        lenQIFNam = snprintf((char*)pszQIFNam, sizeof(pszQIFNam) / sizeof(uchar), "%s/%s.qi",
754
760
        while(iUngottenObjs > 0) {
755
761
                /* fill the queue from disk */
756
762
                CHKiRet(obj.Deserialize((void*) &pUsr, (uchar*)"msg", psQIF, NULL, NULL));
757
 
                queueUngetObj(pThis, pUsr, MUTEX_ALREADY_LOCKED);
 
763
                qqueueUngetObj(pThis, pUsr, MUTEX_ALREADY_LOCKED);
758
764
                --iUngottenObjs; /* one less */
759
765
        }
760
766
 
761
767
        /* and now the stream objects (some order as when persisted!) */
762
768
        CHKiRet(obj.Deserialize(&pThis->tVars.disk.pWrite, (uchar*) "strm", psQIF,
763
 
                               (rsRetVal(*)(obj_t*,void*))queueLoadPersStrmInfoFixup, pThis));
 
769
                               (rsRetVal(*)(obj_t*,void*))qqueueLoadPersStrmInfoFixup, pThis));
764
770
        CHKiRet(obj.Deserialize(&pThis->tVars.disk.pRead, (uchar*) "strm", psQIF,
765
 
                               (rsRetVal(*)(obj_t*,void*))queueLoadPersStrmInfoFixup, pThis));
 
771
                               (rsRetVal(*)(obj_t*,void*))qqueueLoadPersStrmInfoFixup, pThis));
766
772
 
767
773
        CHKiRet(strmSeekCurrOffs(pThis->tVars.disk.pWrite));
768
774
        CHKiRet(strmSeekCurrOffs(pThis->tVars.disk.pRead));
792
798
 * allowed file size at this point - that should be a config setting...
793
799
 * rgerhards, 2008-01-10
794
800
 */
795
 
static rsRetVal qConstructDisk(queue_t *pThis)
 
801
static rsRetVal qConstructDisk(qqueue_t *pThis)
796
802
{
797
803
        DEFiRet;
798
804
        int bRestarted = 0;
800
806
        ASSERT(pThis != NULL);
801
807
 
802
808
        /* and now check if there is some persistent information that needs to be read in */
803
 
        iRet = queueTryLoadPersistedInfo(pThis);
 
809
        iRet = qqueueTryLoadPersistedInfo(pThis);
804
810
        if(iRet == RS_RET_OK)
805
811
                bRestarted = 1;
806
812
        else if(iRet != RS_RET_FILE_NOT_FOUND)
842
848
}
843
849
 
844
850
 
845
 
static rsRetVal qDestructDisk(queue_t *pThis)
 
851
static rsRetVal qDestructDisk(qqueue_t *pThis)
846
852
{
847
853
        DEFiRet;
848
854
        
854
860
        RETiRet;
855
861
}
856
862
 
857
 
static rsRetVal qAddDisk(queue_t *pThis, void* pUsr)
 
863
static rsRetVal qAddDisk(qqueue_t *pThis, void* pUsr)
858
864
{
859
865
        DEFiRet;
860
866
        number_t nWriteCount;
881
887
        RETiRet;
882
888
}
883
889
 
884
 
static rsRetVal qDelDisk(queue_t *pThis, void **ppUsr)
 
890
static rsRetVal qDelDisk(qqueue_t *pThis, void **ppUsr)
885
891
{
886
892
        DEFiRet;
887
893
 
912
918
}
913
919
 
914
920
/* -------------------- direct (no queueing) -------------------- */
915
 
static rsRetVal qConstructDirect(queue_t __attribute__((unused)) *pThis)
916
 
{
917
 
        return RS_RET_OK;
918
 
}
919
 
 
920
 
 
921
 
static rsRetVal qDestructDirect(queue_t __attribute__((unused)) *pThis)
922
 
{
923
 
        return RS_RET_OK;
924
 
}
925
 
 
926
 
static rsRetVal qAddDirect(queue_t *pThis, void* pUsr)
 
921
static rsRetVal qConstructDirect(qqueue_t __attribute__((unused)) *pThis)
 
922
{
 
923
        return RS_RET_OK;
 
924
}
 
925
 
 
926
 
 
927
static rsRetVal qDestructDirect(qqueue_t __attribute__((unused)) *pThis)
 
928
{
 
929
        return RS_RET_OK;
 
930
}
 
931
 
 
932
static rsRetVal qAddDirect(qqueue_t *pThis, void* pUsr)
927
933
{
928
934
        DEFiRet;
929
935
 
940
946
        RETiRet;
941
947
}
942
948
 
943
 
static rsRetVal qDelDirect(queue_t __attribute__((unused)) *pThis, __attribute__((unused)) void **out)
 
949
static rsRetVal qDelDirect(qqueue_t __attribute__((unused)) *pThis, __attribute__((unused)) void **out)
944
950
{
945
951
        return RS_RET_OK;
946
952
}
955
961
 * rgerhards, 2008-01-20
956
962
 */
957
963
static rsRetVal
958
 
queueUngetObj(queue_t *pThis, obj_t *pUsr, int bLockMutex)
 
964
qqueueUngetObj(qqueue_t *pThis, obj_t *pUsr, int bLockMutex)
959
965
{
960
966
        DEFiRet;
961
967
        DEFVARS_mutexProtection;
962
968
 
963
 
        ISOBJ_TYPE_assert(pThis, queue);
 
969
        ISOBJ_TYPE_assert(pThis, qqueue);
964
970
        ISOBJ_assert(pUsr); /* TODO: we aborted right at this place at least 3 times -- race? 2008-02-28, -03-10, -03-15
965
971
                               The second time I noticed it the queue was in destruction with NO worker threads
966
972
                               running. The pUsr ptr was totally off and provided no clue what it may be pointing
969
975
 
970
976
        dbgoprint((obj_t*) pThis, "ungetting user object %s\n", obj.GetName(pUsr));
971
977
        BEGIN_MTX_PROTECTED_OPERATIONS(pThis->mut, bLockMutex);
972
 
        iRet = queueAddLinkedList(&pThis->pUngetRoot, &pThis->pUngetLast, pUsr);
 
978
        iRet = qqueueAddLinkedList(&pThis->pUngetRoot, &pThis->pUngetLast, pUsr);
973
979
        ++pThis->iUngottenObjs; /* indicate one more */
974
980
        END_MTX_PROTECTED_OPERATIONS(pThis->mut);
975
981
 
985
991
 * rgerhards, 2008-01-29
986
992
 */
987
993
static rsRetVal
988
 
queueGetUngottenObj(queue_t *pThis, obj_t **ppUsr)
 
994
qqueueGetUngottenObj(qqueue_t *pThis, obj_t **ppUsr)
989
995
{
990
996
        DEFiRet;
991
997
 
992
 
        ISOBJ_TYPE_assert(pThis, queue);
 
998
        ISOBJ_TYPE_assert(pThis, qqueue);
993
999
        ASSERT(ppUsr != NULL);
994
1000
 
995
 
        iRet = queueDelLinkedList(&pThis->pUngetRoot, &pThis->pUngetLast, ppUsr);
 
1001
        iRet = qqueueDelLinkedList(&pThis->pUngetRoot, &pThis->pUngetLast, ppUsr);
996
1002
        --pThis->iUngottenObjs; /* indicate one less */
997
1003
        dbgoprint((obj_t*) pThis, "dequeued ungotten user object %s\n", obj.GetName(*ppUsr));
998
1004
 
1006
1012
 * things truely different. -- rgerhards, 2008-02-12
1007
1013
 */
1008
1014
static rsRetVal
1009
 
queueAdd(queue_t *pThis, void *pUsr)
 
1015
qqueueAdd(qqueue_t *pThis, void *pUsr)
1010
1016
{
1011
1017
        DEFiRet;
1012
1018
 
1015
1021
        CHKiRet(pThis->qAdd(pThis, pUsr));
1016
1022
 
1017
1023
        if(pThis->qType != QUEUETYPE_DIRECT) {
1018
 
                ++pThis->iQueueSize;
 
1024
                ATOMIC_INC(pThis->iQueueSize);
1019
1025
                dbgoprint((obj_t*) pThis, "entry added, size now %d entries\n", pThis->iQueueSize);
1020
1026
        }
1021
1027
 
1029
1035
 * ungotten list and, if so, dequeue it first.
1030
1036
 */
1031
1037
static rsRetVal
1032
 
queueDel(queue_t *pThis, void *pUsr)
 
1038
qqueueDel(qqueue_t *pThis, void *pUsr)
1033
1039
{
1034
1040
        DEFiRet;
1035
1041
 
1041
1047
         * losing the whole process because it loops... -- rgerhards, 2008-01-03
1042
1048
         */
1043
1049
        if(pThis->iUngottenObjs > 0) {
1044
 
                iRet = queueGetUngottenObj(pThis, (obj_t**) pUsr);
 
1050
                iRet = qqueueGetUngottenObj(pThis, (obj_t**) pUsr);
1045
1051
        } else {
1046
1052
                iRet = pThis->qDel(pThis, pUsr);
1047
 
                --pThis->iQueueSize;
 
1053
                ATOMIC_DEC(pThis->iQueueSize);
1048
1054
        }
1049
1055
 
1050
1056
        dbgoprint((obj_t*) pThis, "entry deleted, state %d, size now %d entries\n",
1065
1071
 * complex) if each would have its own shutdown. The function does not self check
1066
1072
 * this condition - the caller must make sure it is not called with a parent.
1067
1073
 */
1068
 
static rsRetVal queueShutdownWorkers(queue_t *pThis)
 
1074
static rsRetVal qqueueShutdownWorkers(qqueue_t *pThis)
1069
1075
{
1070
1076
        DEFiRet;
1071
1077
        DEFVARS_mutexProtection;
1072
1078
        struct timespec tTimeout;
1073
1079
        rsRetVal iRetLocal;
1074
1080
 
1075
 
        ISOBJ_TYPE_assert(pThis, queue);
 
1081
        ISOBJ_TYPE_assert(pThis, qqueue);
1076
1082
        ASSERT(pThis->pqParent == NULL); /* detect invalid calling sequence */
1077
1083
 
1078
1084
        dbgoprint((obj_t*) pThis, "initiating worker thread shutdown sequence\n");
1086
1092
 
1087
1093
        /* first try to shutdown the queue within the regular shutdown period */
1088
1094
        BEGIN_MTX_PROTECTED_OPERATIONS(pThis->mut, LOCK_MUTEX); /* some workers may be running in parallel! */
1089
 
        if(queueGetOverallQueueSize(pThis) > 0) {
 
1095
        if(qqueueGetOverallQueueSize(pThis) > 0) {
1090
1096
                if(pThis->bRunsDA) {
1091
1097
                        /* We may have waited on the low water mark. As it may have changed, we
1092
1098
                         * see if we reactivate the worker.
1124
1130
                if(pThis->bRunsDA) {
1125
1131
                        END_MTX_PROTECTED_OPERATIONS(pThis->mut);
1126
1132
                        dbgoprint((obj_t*) pThis, "we have a DA queue (0x%lx), requesting its shutdown.\n",
1127
 
                                 queueGetID(pThis->pqDA));
 
1133
                                 qqueueGetID(pThis->pqDA));
1128
1134
                        /* we use the same absolute timeout as above, so we do not use more than the configured
1129
1135
                         * timeout interval!
1130
1136
                         */
1153
1159
 
1154
1160
        /* at this stage, we need to have the DA worker properly initialized and running (if there is one) */
1155
1161
        if(pThis->bRunsDA)
1156
 
                queueWaitDAModeInitialized(pThis);
 
1162
                qqueueWaitDAModeInitialized(pThis);
1157
1163
 
1158
1164
        BEGIN_MTX_PROTECTED_OPERATIONS(pThis->mut, LOCK_MUTEX); /* some workers may be running in parallel! */
1159
1165
        /* optimize parameters for shutdown of DA-enabled queues */
1160
 
        if(pThis->bIsDA && queueGetOverallQueueSize(pThis) > 0 && pThis->bSaveOnShutdown) {
 
1166
        if(pThis->bIsDA && qqueueGetOverallQueueSize(pThis) > 0 && pThis->bSaveOnShutdown) {
1161
1167
                /* switch to enqueue-only mode so that no more actions happen */
1162
1168
                if(pThis->bRunsDA == 0) {
1163
 
                        queueInitDA(pThis, QUEUE_MODE_ENQONLY, MUTEX_ALREADY_LOCKED); /* switch to DA mode */
 
1169
                        qqueueInitDA(pThis, QUEUE_MODE_ENQONLY, MUTEX_ALREADY_LOCKED); /* switch to DA mode */
1164
1170
                } else {
1165
1171
                        /* TODO: RACE: we may reach this point when the DA worker has been initialized (state 1)
1166
1172
                         * but is not yet running (state 2). In this case, pThis->pqDA is NULL! rgerhards, 2008-02-27
1167
1173
                         */
1168
 
                        queueSetEnqOnly(pThis->pqDA, QUEUE_MODE_ENQONLY, MUTEX_ALREADY_LOCKED); /* switch to enqueue-only mode */
 
1174
                        qqueueSetEnqOnly(pThis->pqDA, QUEUE_MODE_ENQONLY, MUTEX_ALREADY_LOCKED); /* switch to enqueue-only mode */
1169
1175
                }
1170
1176
                END_MTX_PROTECTED_OPERATIONS(pThis->mut);
1171
1177
                /* make sure we do not timeout before we are done */
1187
1193
         * they will automatically terminate as there no longer is any message left to process.
1188
1194
         */
1189
1195
        BEGIN_MTX_PROTECTED_OPERATIONS(pThis->mut, LOCK_MUTEX); /* some workers may be running in parallel! */
1190
 
        if(queueGetOverallQueueSize(pThis) > 0) {
 
1196
        if(qqueueGetOverallQueueSize(pThis) > 0) {
1191
1197
                timeoutComp(&tTimeout, pThis->toActShutdown);
1192
1198
                if(wtpGetCurNumWrkr(pThis->pWtpReg, LOCK_MUTEX) > 0) {
1193
1199
                        END_MTX_PROTECTED_OPERATIONS(pThis->mut);
1256
1262
         * Well, more precisely, they *are in termination*. Some cancel cleanup handlers
1257
1263
         * may still be running. 
1258
1264
         */
1259
 
        dbgoprint((obj_t*) pThis, "worker threads terminated, remaining queue size %d.\n", queueGetOverallQueueSize(pThis));
 
1265
        dbgoprint((obj_t*) pThis, "worker threads terminated, remaining queue size %d.\n", qqueueGetOverallQueueSize(pThis));
1260
1266
 
1261
1267
        RETiRet;
1262
1268
}
1268
1274
 * is done by queueStart(). The reason is that we want to give the caller a chance
1269
1275
 * to modify some parameters before the queue is actually started.
1270
1276
 */
1271
 
rsRetVal queueConstruct(queue_t **ppThis, queueType_t qType, int iWorkerThreads,
 
1277
rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThreads,
1272
1278
                        int iMaxQueueSize, rsRetVal (*pConsumer)(void*,void*))
1273
1279
{
1274
1280
        DEFiRet;
1275
 
        queue_t *pThis;
 
1281
        qqueue_t *pThis;
1276
1282
 
1277
1283
        ASSERT(ppThis != NULL);
1278
1284
        ASSERT(pConsumer != NULL);
1279
1285
        ASSERT(iWorkerThreads >= 0);
1280
1286
 
1281
 
        if((pThis = (queue_t *)calloc(1, sizeof(queue_t))) == NULL) {
 
1287
        if((pThis = (qqueue_t *)calloc(1, sizeof(qqueue_t))) == NULL) {
1282
1288
                ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY);
1283
1289
        }
1284
1290
 
1285
1291
        /* we have an object, so let's fill the properties */
1286
1292
        objConstructSetObjInfo(pThis);
 
1293
        pThis->bOptimizeUniProc = glbl.GetOptimizeUniProc();
1287
1294
        if((pThis->pszSpoolDir = (uchar*) strdup((char*)glbl.GetWorkDir())) == NULL)
1288
1295
                ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY);
1289
1296
 
1290
1297
        /* set some water marks so that we have useful defaults if none are set specifically */
1291
 
        pThis->iFullDlyMrk = (iMaxQueueSize < 100) ? iMaxQueueSize : 100; /* 100 should be far sufficient */
1292
 
        pThis->iLightDlyMrk = iMaxQueueSize - (iMaxQueueSize / 100) * 70; /* default 70% */
 
1298
        pThis->iFullDlyMrk  = iMaxQueueSize - (iMaxQueueSize / 100) *  3; /* default 97% */
 
1299
        pThis->iLightDlyMrk = iMaxQueueSize - (iMaxQueueSize / 100) * 30; /* default 70% */
1293
1300
 
1294
1301
        pThis->lenSpoolDir = strlen((char*)pThis->pszSpoolDir);
1295
1302
        pThis->iMaxFileSize = 1024 * 1024; /* default is 1 MiB */
1314
1321
                        pThis->qConstruct = qConstructLinkedList;
1315
1322
                        pThis->qDestruct = qDestructLinkedList;
1316
1323
                        pThis->qAdd = qAddLinkedList;
1317
 
                        pThis->qDel = (rsRetVal (*)(queue_t*,void**)) qDelLinkedList;
 
1324
                        pThis->qDel = (rsRetVal (*)(qqueue_t*,void**)) qDelLinkedList;
1318
1325
                        break;
1319
1326
                case QUEUETYPE_DISK:
1320
1327
                        pThis->qConstruct = qConstructDisk;
1341
1348
/* cancellation cleanup handler for queueWorker ()
1342
1349
 * Updates admin structure and frees ressources.
1343
1350
 * Params:
1344
 
 * arg1 - user pointer (in this case a queue_t)
 
1351
 * arg1 - user pointer (in this case a qqueue_t)
1345
1352
 * arg2 - user data pointer (in this case a queue data element, any object [queue's pUsr ptr!])
1346
1353
 * Note that arg2 may be NULL, in which case no dequeued but unprocessed pUsr exists!
1347
1354
 * rgerhards, 2008-01-16
1348
1355
 */
1349
1356
static rsRetVal
1350
 
queueConsumerCancelCleanup(void *arg1, void *arg2)
 
1357
qqueueConsumerCancelCleanup(void *arg1, void *arg2)
1351
1358
{
1352
1359
        DEFiRet;
1353
1360
 
1354
 
        queue_t *pThis = (queue_t*) arg1;
 
1361
        qqueue_t *pThis = (qqueue_t*) arg1;
1355
1362
        obj_t *pUsr = (obj_t*) arg2;
1356
1363
 
1357
 
        ISOBJ_TYPE_assert(pThis, queue);
 
1364
        ISOBJ_TYPE_assert(pThis, qqueue);
1358
1365
 
1359
1366
        if(pUsr != NULL) {
1360
1367
                /* make sure the data element is not lost */
1361
1368
                dbgoprint((obj_t*) pThis, "cancelation cleanup handler consumer called, we need to unget one user data element\n");
1362
 
                CHKiRet(queueUngetObj(pThis, pUsr, LOCK_MUTEX));
 
1369
                CHKiRet(qqueueUngetObj(pThis, pUsr, LOCK_MUTEX));
1363
1370
        }
1364
1371
        
1365
1372
finalize_it:
1381
1388
 * the return state!
1382
1389
 * rgerhards, 2008-01-24
1383
1390
 */
1384
 
static int queueChkDiscardMsg(queue_t *pThis, int iQueueSize, int bRunsDA, void *pUsr)
 
1391
static int qqueueChkDiscardMsg(qqueue_t *pThis, int iQueueSize, int bRunsDA, void *pUsr)
1385
1392
{
1386
1393
        DEFiRet;
1387
1394
        rsRetVal iRetLocal;
1388
1395
        int iSeverity;
1389
1396
 
1390
 
        ISOBJ_TYPE_assert(pThis, queue);
 
1397
        ISOBJ_TYPE_assert(pThis, qqueue);
1391
1398
        ISOBJ_assert(pUsr);
1392
1399
 
1393
1400
        if(pThis->iDiscardMrk > 0 && iQueueSize >= pThis->iDiscardMrk && bRunsDA == 0) {
1412
1419
 * rgerhards, 2008-10-21
1413
1420
 */
1414
1421
static rsRetVal
1415
 
queueDequeueConsumable(queue_t *pThis, wti_t *pWti, int iCancelStateSave)
 
1422
qqueueDequeueConsumable(qqueue_t *pThis, wti_t *pWti, int iCancelStateSave)
1416
1423
{
1417
1424
        DEFiRet;
1418
1425
        void *pUsr;
1420
1427
        int bRunsDA;     /* cache for early mutex release */
1421
1428
 
1422
1429
        /* dequeue element (still protected from mutex) */
1423
 
        iRet = queueDel(pThis, &pUsr);
1424
 
        queueChkPersist(pThis);
1425
 
        iQueueSize = queueGetOverallQueueSize(pThis); /* cache this for after mutex release */
 
1430
        iRet = qqueueDel(pThis, &pUsr);
 
1431
        qqueueChkPersist(pThis);
 
1432
        iQueueSize = qqueueGetOverallQueueSize(pThis); /* cache this for after mutex release */
1426
1433
        bRunsDA = pThis->bRunsDA; /* cache this for after mutex release */
1427
1434
 
1428
1435
        /* We now need to save the user pointer for the cancel cleanup handler, BUT ONLY
1473
1480
         * provide real-time creation of spool files.
1474
1481
         * Note: It is OK to use the cached iQueueSize here, because it does not hurt if it is slightly wrong.
1475
1482
         */
1476
 
        CHKiRet(queueChkDiscardMsg(pThis, iQueueSize, bRunsDA, pUsr));
 
1483
        CHKiRet(qqueueChkDiscardMsg(pThis, iQueueSize, bRunsDA, pUsr));
1477
1484
 
1478
1485
finalize_it:
1479
1486
        if(iRet != RS_RET_OK && iRet != RS_RET_DISCARDMSG) {
1522
1529
 * but you get the idea from the code above.
1523
1530
 */
1524
1531
static rsRetVal
1525
 
queueRateLimiter(queue_t *pThis)
 
1532
qqueueRateLimiter(qqueue_t *pThis)
1526
1533
{
1527
1534
        DEFiRet;
1528
1535
        int iDelay;
1530
1537
        time_t tCurr;
1531
1538
        struct tm m;
1532
1539
 
1533
 
        ISOBJ_TYPE_assert(pThis, queue);
1534
 
 
1535
 
        dbgoprint((obj_t*) pThis, "entering rate limiter\n");
 
1540
        ISOBJ_TYPE_assert(pThis, qqueue);
1536
1541
 
1537
1542
        iDelay = 0;
1538
1543
        if(pThis->iDeqtWinToHr != 25) { /* 25 means disabled */
1587
1592
 * rgerhards, 2008-01-21
1588
1593
 */
1589
1594
static rsRetVal
1590
 
queueConsumerReg(queue_t *pThis, wti_t *pWti, int iCancelStateSave)
 
1595
qqueueConsumerReg(qqueue_t *pThis, wti_t *pWti, int iCancelStateSave)
1591
1596
{
1592
1597
        DEFiRet;
1593
1598
 
1594
 
        ISOBJ_TYPE_assert(pThis, queue);
 
1599
        ISOBJ_TYPE_assert(pThis, qqueue);
1595
1600
        ISOBJ_TYPE_assert(pWti, wti);
1596
1601
 
1597
 
        CHKiRet(queueDequeueConsumable(pThis, pWti, iCancelStateSave));
 
1602
        CHKiRet(qqueueDequeueConsumable(pThis, pWti, iCancelStateSave));
1598
1603
        CHKiRet(pThis->pConsumer(pThis->pUsr, pWti->pUsrp));
1599
1604
 
1600
1605
        /* we now need to check if we should deliberately delay processing a bit
1621
1626
 * rgerhards, 2008-01-14
1622
1627
 */
1623
1628
static rsRetVal
1624
 
queueConsumerDA(queue_t *pThis, wti_t *pWti, int iCancelStateSave)
 
1629
qqueueConsumerDA(qqueue_t *pThis, wti_t *pWti, int iCancelStateSave)
1625
1630
{
1626
1631
        DEFiRet;
1627
1632
 
1628
 
        ISOBJ_TYPE_assert(pThis, queue);
 
1633
        ISOBJ_TYPE_assert(pThis, qqueue);
1629
1634
        ISOBJ_TYPE_assert(pWti, wti);
1630
1635
 
1631
 
        CHKiRet(queueDequeueConsumable(pThis, pWti, iCancelStateSave));
1632
 
        CHKiRet(queueEnqObj(pThis->pqDA, eFLOWCTL_NO_DELAY, pWti->pUsrp));
 
1636
        CHKiRet(qqueueDequeueConsumable(pThis, pWti, iCancelStateSave));
 
1637
        CHKiRet(qqueueEnqObj(pThis->pqDA, eFLOWCTL_NO_DELAY, pWti->pUsrp));
1633
1638
 
1634
1639
finalize_it:
1635
1640
        dbgoprint((obj_t*) pThis, "DAConsumer returns with iRet %d\n", iRet);
1645
1650
 * the DA queue
1646
1651
 */
1647
1652
static int
1648
 
queueChkStopWrkrDA(queue_t *pThis)
 
1653
qqueueChkStopWrkrDA(qqueue_t *pThis)
1649
1654
{
1650
1655
        /* if our queue is in destruction, we drain to the DA queue and so we shall not terminate
1651
1656
         * until we have done so.
1664
1669
                           && pThis->pqDA->tVars.disk.sizeOnDisk > pThis->pqDA->sizeOnDiskMax) {
1665
1670
                                /* this queue can never grow, so we can give up... */
1666
1671
                                bStopWrkr = 1;
1667
 
                        } else if(queueGetOverallQueueSize(pThis) < pThis->iHighWtrMrk && pThis->bQueueStarted == 1) {
 
1672
                        } else if(qqueueGetOverallQueueSize(pThis) < pThis->iHighWtrMrk && pThis->bQueueStarted == 1) {
1668
1673
                                bStopWrkr = 1;
1669
1674
                        } else {
1670
1675
                                bStopWrkr = 0;
1687
1692
 * the DA queue
1688
1693
 */
1689
1694
static int
1690
 
queueChkStopWrkrReg(queue_t *pThis)
 
1695
qqueueChkStopWrkrReg(qqueue_t *pThis)
1691
1696
{
1692
 
        return pThis->bEnqOnly || pThis->bRunsDA || (pThis->pqParent != NULL && queueGetOverallQueueSize(pThis) == 0);
 
1697
        return pThis->bEnqOnly || pThis->bRunsDA || (pThis->pqParent != NULL && qqueueGetOverallQueueSize(pThis) == 0);
1693
1698
}
1694
1699
 
1695
1700
 
1697
1702
 * are not stable! DA queue version
1698
1703
 */
1699
1704
static int
1700
 
queueIsIdleDA(queue_t *pThis)
 
1705
qqueueIsIdleDA(qqueue_t *pThis)
1701
1706
{
1702
1707
        /* remember: iQueueSize is the DA queue size, not the main queue! */
1703
1708
        /* TODO: I think we need just a single function for DA and non-DA mode - but I leave it for now as is */
1704
 
        return(queueGetOverallQueueSize(pThis) == 0 || (pThis->bRunsDA && queueGetOverallQueueSize(pThis) <= pThis->iLowWtrMrk));
 
1709
        return(qqueueGetOverallQueueSize(pThis) == 0 || (pThis->bRunsDA && qqueueGetOverallQueueSize(pThis) <= pThis->iLowWtrMrk));
1705
1710
}
1706
1711
/* must only be called when the queue mutex is locked, else results
1707
1712
 * are not stable! Regular queue version
1708
1713
 */
1709
1714
static int
1710
 
queueIsIdleReg(queue_t *pThis)
 
1715
qqueueIsIdleReg(qqueue_t *pThis)
1711
1716
{
1712
1717
#if 0 /* enable for performance testing */
1713
1718
        int ret;
1714
 
        ret = queueGetOverallQueueSize(pThis) == 0 || (pThis->bRunsDA && queueGetOverallQueueSize(pThis) <= pThis->iLowWtrMrk);
 
1719
        ret = qqueueGetOverallQueueSize(pThis) == 0 || (pThis->bRunsDA && qqueueGetOverallQueueSize(pThis) <= pThis->iLowWtrMrk);
1715
1720
        if(ret) fprintf(stderr, "queue is idle\n");
1716
1721
        return ret;
1717
1722
#else 
1718
1723
        /* regular code! */
1719
 
        return(queueGetOverallQueueSize(pThis) == 0 || (pThis->bRunsDA && queueGetOverallQueueSize(pThis) <= pThis->iLowWtrMrk));
 
1724
        return(qqueueGetOverallQueueSize(pThis) == 0 || (pThis->bRunsDA && qqueueGetOverallQueueSize(pThis) <= pThis->iLowWtrMrk));
1720
1725
#endif
1721
1726
}
1722
1727
 
1735
1740
 * I am telling this, because I, too, always get confused by those...
1736
1741
 */
1737
1742
static rsRetVal
1738
 
queueRegOnWrkrShutdown(queue_t *pThis)
 
1743
qqueueRegOnWrkrShutdown(qqueue_t *pThis)
1739
1744
{
1740
1745
        DEFiRet;
1741
1746
 
1742
 
        ISOBJ_TYPE_assert(pThis, queue);
 
1747
        ISOBJ_TYPE_assert(pThis, qqueue);
1743
1748
 
1744
1749
        if(pThis->pqParent != NULL) {
1745
1750
                pThis->pqParent->bChildIsDone = 1; /* indicate we are done */
1756
1761
 * hook to indicate in the parent queue (if we are a child) that we are not done yet.
1757
1762
 */
1758
1763
static rsRetVal
1759
 
queueRegOnWrkrStartup(queue_t *pThis)
 
1764
qqueueRegOnWrkrStartup(qqueue_t *pThis)
1760
1765
{
1761
1766
        DEFiRet;
1762
1767
 
1763
 
        ISOBJ_TYPE_assert(pThis, queue);
 
1768
        ISOBJ_TYPE_assert(pThis, qqueue);
1764
1769
 
1765
1770
        if(pThis->pqParent != NULL) {
1766
1771
                pThis->pqParent->bChildIsDone = 0;
1773
1778
/* start up the queue - it must have been constructed and parameters defined
1774
1779
 * before.
1775
1780
 */
1776
 
rsRetVal queueStart(queue_t *pThis) /* this is the ConstructionFinalizer */
 
1781
rsRetVal qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */
1777
1782
{
1778
1783
        DEFiRet;
1779
1784
        rsRetVal iRetLocal;
1809
1814
        /* call type-specific constructor */
1810
1815
        CHKiRet(pThis->qConstruct(pThis)); /* this also sets bIsDA */
1811
1816
 
1812
 
        dbgoprint((obj_t*) pThis, "type %d, enq-only %d, disk assisted %d, maxFileSz %lld, qsize %d, child %d starting\n",
 
1817
        dbgoprint((obj_t*) pThis, "type %d, enq-only %d, disk assisted %d, maxFileSz %lld, qsize %d, child %d, "
 
1818
                                  "full delay %d, light delay %d starting\n",
1813
1819
                  pThis->qType, pThis->bEnqOnly, pThis->bIsDA, pThis->iMaxFileSize,
1814
 
                  queueGetOverallQueueSize(pThis), pThis->pqParent == NULL ? 0 : 1);
 
1820
                  qqueueGetOverallQueueSize(pThis), pThis->pqParent == NULL ? 0 : 1,
 
1821
                  pThis->iFullDlyMrk, pThis->iLightDlyMrk);
1815
1822
 
1816
1823
        if(pThis->qType == QUEUETYPE_DIRECT)
1817
1824
                FINALIZE;       /* with direct queues, we are already finished... */
1822
1829
        lenBuf = snprintf((char*)pszBuf, sizeof(pszBuf), "%s:Reg", obj.GetName((obj_t*) pThis));
1823
1830
        CHKiRet(wtpConstruct            (&pThis->pWtpReg));
1824
1831
        CHKiRet(wtpSetDbgHdr            (pThis->pWtpReg, pszBuf, lenBuf));
1825
 
        CHKiRet(wtpSetpfRateLimiter     (pThis->pWtpReg, (rsRetVal (*)(void *pUsr)) queueRateLimiter));
1826
 
        CHKiRet(wtpSetpfChkStopWrkr     (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, int)) queueChkStopWrkrReg));
1827
 
        CHKiRet(wtpSetpfIsIdle          (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, int)) queueIsIdleReg));
1828
 
        CHKiRet(wtpSetpfDoWork          (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, void *pWti, int)) queueConsumerReg));
1829
 
        CHKiRet(wtpSetpfOnWorkerCancel  (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, void*pWti))queueConsumerCancelCleanup));
1830
 
        CHKiRet(wtpSetpfOnWorkerStartup (pThis->pWtpReg, (rsRetVal (*)(void *pUsr)) queueRegOnWrkrStartup));
1831
 
        CHKiRet(wtpSetpfOnWorkerShutdown(pThis->pWtpReg, (rsRetVal (*)(void *pUsr)) queueRegOnWrkrShutdown));
 
1832
        CHKiRet(wtpSetpfRateLimiter     (pThis->pWtpReg, (rsRetVal (*)(void *pUsr)) qqueueRateLimiter));
 
1833
        CHKiRet(wtpSetpfChkStopWrkr     (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, int)) qqueueChkStopWrkrReg));
 
1834
        CHKiRet(wtpSetpfIsIdle          (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, int)) qqueueIsIdleReg));
 
1835
        CHKiRet(wtpSetpfDoWork          (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, void *pWti, int)) qqueueConsumerReg));
 
1836
        CHKiRet(wtpSetpfOnWorkerCancel  (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, void*pWti))qqueueConsumerCancelCleanup));
 
1837
        CHKiRet(wtpSetpfOnWorkerStartup (pThis->pWtpReg, (rsRetVal (*)(void *pUsr)) qqueueRegOnWrkrStartup));
 
1838
        CHKiRet(wtpSetpfOnWorkerShutdown(pThis->pWtpReg, (rsRetVal (*)(void *pUsr)) qqueueRegOnWrkrShutdown));
1832
1839
        CHKiRet(wtpSetpmutUsr           (pThis->pWtpReg, pThis->mut));
1833
1840
        CHKiRet(wtpSetpcondBusy         (pThis->pWtpReg, &pThis->notEmpty));
1834
1841
        CHKiRet(wtpSetiNumWorkerThreads (pThis->pWtpReg, pThis->iNumWorkerThreads));
1841
1848
                /* If we are disk-assisted, we need to check if there is a QIF file
1842
1849
                 * which we need to load. -- rgerhards, 2008-01-15
1843
1850
                 */
1844
 
                iRetLocal = queueHaveQIF(pThis);
 
1851
                iRetLocal = qqueueHaveQIF(pThis);
1845
1852
                if(iRetLocal == RS_RET_OK) {
1846
1853
                        dbgoprint((obj_t*) pThis, "on-disk queue present, needs to be reloaded\n");
1847
 
                        queueInitDA(pThis, QUEUE_MODE_ENQDEQ, LOCK_MUTEX); /* initiate DA mode */
 
1854
                        qqueueInitDA(pThis, QUEUE_MODE_ENQDEQ, LOCK_MUTEX); /* initiate DA mode */
1848
1855
                        bInitialized = 1; /* we are done */
1849
1856
                } else {
1850
1857
                        /* TODO: use logerror? -- rgerhards, 2008-01-16 */
1861
1868
        /* if the queue already contains data, we need to start the correct number of worker threads. This can be
1862
1869
         * the case when a disk queue has been loaded. If we did not start it here, it would never start.
1863
1870
         */
1864
 
        queueAdviseMaxWorkers(pThis);
 
1871
        qqueueAdviseMaxWorkers(pThis);
1865
1872
        pThis->bQueueStarted = 1;
1866
1873
 
1867
1874
finalize_it:
1876
1883
 * and 0 otherwise.
1877
1884
 * rgerhards, 2008-01-10
1878
1885
 */
1879
 
static rsRetVal queuePersist(queue_t *pThis, int bIsCheckpoint)
 
1886
static rsRetVal qqueuePersist(qqueue_t *pThis, int bIsCheckpoint)
1880
1887
{
1881
1888
        DEFiRet;
1882
1889
        strm_t *psQIF = NULL; /* Queue Info File */
1887
1894
        ASSERT(pThis != NULL);
1888
1895
 
1889
1896
        if(pThis->qType != QUEUETYPE_DISK) {
1890
 
                if(queueGetOverallQueueSize(pThis) > 0) {
 
1897
                if(qqueueGetOverallQueueSize(pThis) > 0) {
1891
1898
                        /* This error code is OK, but we will probably not implement this any time
1892
1899
                         * The reason is that persistence happens via DA queues. But I would like to
1893
1900
                         * leave the code as is, as we so have a hook in case we need one.
1898
1905
                        FINALIZE; /* if the queue is empty, we are happy and done... */
1899
1906
        }
1900
1907
 
1901
 
        dbgoprint((obj_t*) pThis, "persisting queue to disk, %d entries...\n", queueGetOverallQueueSize(pThis));
 
1908
        dbgoprint((obj_t*) pThis, "persisting queue to disk, %d entries...\n", qqueueGetOverallQueueSize(pThis));
1902
1909
 
1903
1910
        /* Construct file name */
1904
1911
        lenQIFNam = snprintf((char*)pszQIFNam, sizeof(pszQIFNam) / sizeof(uchar), "%s/%s.qi",
1905
1912
                             (char*) glbl.GetWorkDir(), (char*)pThis->pszFilePrefix);
1906
1913
 
1907
 
        if((bIsCheckpoint != QUEUE_CHECKPOINT) && (queueGetOverallQueueSize(pThis) == 0)) {
 
1914
        if((bIsCheckpoint != QUEUE_CHECKPOINT) && (qqueueGetOverallQueueSize(pThis) == 0)) {
1908
1915
                if(pThis->bNeedDelQIF) {
1909
1916
                        unlink((char*)pszQIFNam);
1910
1917
                        pThis->bNeedDelQIF = 0;
1938
1945
         * to the regular files. -- rgerhards, 2008-01-29
1939
1946
         */
1940
1947
        while(pThis->iUngottenObjs > 0) {
1941
 
                CHKiRet(queueGetUngottenObj(pThis, &pUsr));
 
1948
                CHKiRet(qqueueGetUngottenObj(pThis, &pUsr));
1942
1949
                CHKiRet((objSerialize(pUsr))(pUsr, psQIF));
1943
1950
                objDestruct(pUsr);
1944
1951
        }
1972
1979
 * abide to our regular call interface)...
1973
1980
 * rgerhards, 2008-01-13
1974
1981
 */
1975
 
rsRetVal queueChkPersist(queue_t *pThis)
 
1982
rsRetVal qqueueChkPersist(qqueue_t *pThis)
1976
1983
{
1977
1984
        DEFiRet;
1978
1985
 
1979
 
        ISOBJ_TYPE_assert(pThis, queue);
 
1986
        ISOBJ_TYPE_assert(pThis, qqueue);
1980
1987
 
1981
1988
        if(pThis->iPersistUpdCnt && ++pThis->iUpdsSincePersist >= pThis->iPersistUpdCnt) {
1982
 
                queuePersist(pThis, QUEUE_CHECKPOINT);
 
1989
                qqueuePersist(pThis, QUEUE_CHECKPOINT);
1983
1990
                pThis->iUpdsSincePersist = 0;
1984
1991
        }
1985
1992
 
1988
1995
 
1989
1996
 
1990
1997
/* destructor for the queue object */
1991
 
BEGINobjDestruct(queue) /* be sure to specify the object type also in END and CODESTART macros! */
1992
 
CODESTARTobjDestruct(queue)
 
1998
BEGINobjDestruct(qqueue) /* be sure to specify the object type also in END and CODESTART macros! */
 
1999
CODESTARTobjDestruct(qqueue)
1993
2000
        pThis->bQueueInDestruction = 1; /* indicate we are in destruction (modifies some behaviour) */
1994
2001
 
1995
2002
        /* shut down all workers (handles *all* of the persistence logic)
1999
2006
         * with a child! -- rgerhards, 2008-01-28
2000
2007
         */
2001
2008
        if(pThis->qType != QUEUETYPE_DIRECT && !pThis->bEnqOnly && pThis->pqParent == NULL)
2002
 
                queueShutdownWorkers(pThis);
 
2009
                qqueueShutdownWorkers(pThis);
2003
2010
 
2004
2011
        /* finally destruct our (regular) worker thread pool
2005
2012
         * Note: currently pWtpReg is never NULL, but if we optimize our logic, this may happen,
2024
2031
                wtpDestruct(&pThis->pWtpDA);
2025
2032
        }
2026
2033
        if(pThis->pqDA != NULL) {
2027
 
                queueDestruct(&pThis->pqDA);
 
2034
                qqueueDestruct(&pThis->pqDA);
2028
2035
        }
2029
2036
 
2030
2037
        /* persist the queue (we always do that - queuePersits() does cleanup if the queue is empty)
2034
2041
         * disk queues and DA mode. Anyhow, it doesn't hurt to know that we could extend it here
2035
2042
         * if need arises (what I doubt...) -- rgerhards, 2008-01-25
2036
2043
         */
2037
 
        CHKiRet_Hdlr(queuePersist(pThis, QUEUE_NO_CHECKPOINT)) {
 
2044
        CHKiRet_Hdlr(qqueuePersist(pThis, QUEUE_NO_CHECKPOINT)) {
2038
2045
                dbgoprint((obj_t*) pThis, "error %d persisting queue - data lost!\n", iRet);
2039
2046
        }
2040
2047
 
2059
2066
 
2060
2067
        if(pThis->pszSpoolDir != NULL)
2061
2068
                free(pThis->pszSpoolDir);
2062
 
ENDobjDestruct(queue)
 
2069
ENDobjDestruct(qqueue)
2063
2070
 
2064
2071
 
2065
2072
/* set the queue's file prefix
2068
2075
 * rgerhards, 2008-01-09
2069
2076
 */
2070
2077
rsRetVal
2071
 
queueSetFilePrefix(queue_t *pThis, uchar *pszPrefix, size_t iLenPrefix)
 
2078
qqueueSetFilePrefix(qqueue_t *pThis, uchar *pszPrefix, size_t iLenPrefix)
2072
2079
{
2073
2080
        DEFiRet;
2074
2081
 
2091
2098
 * rgerhards, 2008-01-09
2092
2099
 */
2093
2100
rsRetVal
2094
 
queueSetMaxFileSize(queue_t *pThis, size_t iMaxFileSize)
 
2101
qqueueSetMaxFileSize(qqueue_t *pThis, size_t iMaxFileSize)
2095
2102
{
2096
2103
        DEFiRet;
2097
2104
 
2098
 
        ISOBJ_TYPE_assert(pThis, queue);
 
2105
        ISOBJ_TYPE_assert(pThis, qqueue);
2099
2106
        
2100
2107
        if(iMaxFileSize < 1024) {
2101
2108
                ABORT_FINALIZE(RS_RET_VALUE_TOO_LOW);
2112
2119
 * Enqueues the new element and awakes worker thread.
2113
2120
 */
2114
2121
rsRetVal
2115
 
queueEnqObj(queue_t *pThis, flowControl_t flowCtlType, void *pUsr)
 
2122
qqueueEnqObj(qqueue_t *pThis, flowControl_t flowCtlType, void *pUsr)
2116
2123
{
2117
2124
        DEFiRet;
2118
2125
        int iCancelStateSave;
2119
2126
        struct timespec t;
2120
2127
 
2121
 
        ISOBJ_TYPE_assert(pThis, queue);
 
2128
        ISOBJ_TYPE_assert(pThis, qqueue);
 
2129
 
 
2130
        /* first check if we need to discard this message (which will cause CHKiRet() to exit)
 
2131
         * rgerhards, 2008-10-07: It is OK to do this outside of mutex protection. The iQueueSize
 
2132
         * and bRunsDA parameters may not reflect the correct settings here, but they are
 
2133
         * "good enough" in the sense that they can be used to drive the decision. Valgrind's
 
2134
         * threading tools may point this access to be an error, but this is done
 
2135
         * intentional. I do not see this causes problems to us.
 
2136
         */
 
2137
        CHKiRet(qqueueChkDiscardMsg(pThis, pThis->iQueueSize, pThis->bRunsDA, pUsr));
2122
2138
 
2123
2139
        /* Please note that this function is not cancel-safe and consequently
2124
2140
         * sets the calling thread's cancelibility state to PTHREAD_CANCEL_DISABLE
2131
2147
                d_pthread_mutex_lock(pThis->mut);
2132
2148
        }
2133
2149
 
2134
 
        /* first check if we need to discard this message (which will cause CHKiRet() to exit) */
2135
 
        CHKiRet(queueChkDiscardMsg(pThis, pThis->iQueueSize, pThis->bRunsDA, pUsr));
2136
 
 
2137
2150
        /* then check if we need to add an assistance disk queue */
2138
2151
        if(pThis->bIsDA)
2139
 
                CHKiRet(queueChkStrtDA(pThis));
 
2152
                CHKiRet(qqueueChkStrtDA(pThis));
2140
2153
        
2141
2154
        /* handle flow control
2142
2155
         * There are two different flow control mechanisms: basic and advanced flow control.
2160
2173
         */
2161
2174
        if(flowCtlType == eFLOWCTL_FULL_DELAY) {
2162
2175
                while(pThis->iQueueSize >= pThis->iFullDlyMrk) {
2163
 
                        dbgoprint((obj_t*) pThis, "enqueueMsg: FullDelay mark reached for full delayble message - blocking.\n");
 
2176
                        dbgoprint((obj_t*) pThis, "enqueueMsg: FullDelay mark reached for full delayable message - blocking.\n");
2164
2177
                        pthread_cond_wait(&pThis->belowFullDlyWtrMrk, pThis->mut); /* TODO error check? But what do then? */
2165
2178
                }
2166
2179
        } else if(flowCtlType == eFLOWCTL_LIGHT_DELAY) {
2167
2180
                if(pThis->iQueueSize >= pThis->iLightDlyMrk) {
2168
 
                        dbgoprint((obj_t*) pThis, "enqueueMsg: LightDelay mark reached for light delayble message - blocking a bit.\n");
 
2181
                        dbgoprint((obj_t*) pThis, "enqueueMsg: LightDelay mark reached for light delayable message - blocking a bit.\n");
2169
2182
                        timeoutComp(&t, 1000); /* 1000 millisconds = 1 second TODO: make configurable */
2170
2183
                        pthread_cond_timedwait(&pThis->belowLightDlyWtrMrk, pThis->mut, &t); /* TODO error check? But what do then? */
2171
2184
                }
2189
2202
        }
2190
2203
 
2191
2204
        /* and finally enqueue the message */
2192
 
        CHKiRet(queueAdd(pThis, pUsr));
2193
 
        queueChkPersist(pThis);
 
2205
        CHKiRet(qqueueAdd(pThis, pUsr));
 
2206
        qqueueChkPersist(pThis);
2194
2207
 
2195
2208
finalize_it:
2196
2209
        if(pThis->qType != QUEUETYPE_DIRECT) {
2197
2210
                /* make sure at least one worker is running. */
2198
 
                queueAdviseMaxWorkers(pThis);
2199
 
                dbgoprint((obj_t*) pThis, "EnqueueMsg advised worker start\n");
 
2211
                qqueueAdviseMaxWorkers(pThis);
2200
2212
                /* and release the mutex */
2201
2213
                d_pthread_mutex_unlock(pThis->mut);
2202
2214
                pthread_setcancelstate(iCancelStateSave, NULL);
 
2215
                dbgoprint((obj_t*) pThis, "EnqueueMsg advised worker start\n");
 
2216
                /* the following pthread_yield is experimental, but brought us performance
 
2217
                 * benefit. For details, please see http://kb.monitorware.com/post14216.html#p14216
 
2218
                 * rgerhards, 2008-10-09
 
2219
                 * but this is only true for uniprocessors, so we guard it with an optimize flag -- rgerhards, 2008-10-22
 
2220
                 */
 
2221
                if(pThis->bOptimizeUniProc)
 
2222
                        pthread_yield();
2203
2223
        }
2204
2224
 
2205
2225
        RETiRet;
2215
2235
 * rgerhards, 2008-01-16
2216
2236
 */
2217
2237
static rsRetVal
2218
 
queueSetEnqOnly(queue_t *pThis, int bEnqOnly, int bLockMutex)
 
2238
qqueueSetEnqOnly(qqueue_t *pThis, int bEnqOnly, int bLockMutex)
2219
2239
{
2220
2240
        DEFiRet;
2221
2241
        DEFVARS_mutexProtection;
2222
2242
 
2223
 
        ISOBJ_TYPE_assert(pThis, queue);
 
2243
        ISOBJ_TYPE_assert(pThis, qqueue);
2224
2244
 
2225
2245
        /* for simplicity, we do one big mutex lock. This method is extremely seldom
2226
2246
         * called, so that doesn't matter... -- rgerhards, 2008-01-16
2259
2279
 
2260
2280
 
2261
2281
/* some simple object access methods */
2262
 
DEFpropSetMeth(queue, iPersistUpdCnt, int)
2263
 
DEFpropSetMeth(queue, iDeqtWinFromHr, int)
2264
 
DEFpropSetMeth(queue, iDeqtWinToHr, int)
2265
 
DEFpropSetMeth(queue, toQShutdown, long)
2266
 
DEFpropSetMeth(queue, toActShutdown, long)
2267
 
DEFpropSetMeth(queue, toWrkShutdown, long)
2268
 
DEFpropSetMeth(queue, toEnq, long)
2269
 
DEFpropSetMeth(queue, iHighWtrMrk, int)
2270
 
DEFpropSetMeth(queue, iLowWtrMrk, int)
2271
 
DEFpropSetMeth(queue, iDiscardMrk, int)
2272
 
DEFpropSetMeth(queue, iFullDlyMrk, int)
2273
 
DEFpropSetMeth(queue, iDiscardSeverity, int)
2274
 
DEFpropSetMeth(queue, bIsDA, int)
2275
 
DEFpropSetMeth(queue, iMinMsgsPerWrkr, int)
2276
 
DEFpropSetMeth(queue, bSaveOnShutdown, int)
2277
 
DEFpropSetMeth(queue, pUsr, void*)
2278
 
DEFpropSetMeth(queue, iDeqSlowdown, int)
2279
 
DEFpropSetMeth(queue, sizeOnDiskMax, int64)
 
2282
DEFpropSetMeth(qqueue, iPersistUpdCnt, int)
 
2283
DEFpropSetMeth(qqueue, iDeqtWinFromHr, int)
 
2284
DEFpropSetMeth(qqueue, iDeqtWinToHr, int)
 
2285
DEFpropSetMeth(qqueue, toQShutdown, long)
 
2286
DEFpropSetMeth(qqueue, toActShutdown, long)
 
2287
DEFpropSetMeth(qqueue, toWrkShutdown, long)
 
2288
DEFpropSetMeth(qqueue, toEnq, long)
 
2289
DEFpropSetMeth(qqueue, iHighWtrMrk, int)
 
2290
DEFpropSetMeth(qqueue, iLowWtrMrk, int)
 
2291
DEFpropSetMeth(qqueue, iDiscardMrk, int)
 
2292
DEFpropSetMeth(qqueue, iFullDlyMrk, int)
 
2293
DEFpropSetMeth(qqueue, iDiscardSeverity, int)
 
2294
DEFpropSetMeth(qqueue, bIsDA, int)
 
2295
DEFpropSetMeth(qqueue, iMinMsgsPerWrkr, int)
 
2296
DEFpropSetMeth(qqueue, bSaveOnShutdown, int)
 
2297
DEFpropSetMeth(qqueue, pUsr, void*)
 
2298
DEFpropSetMeth(qqueue, iDeqSlowdown, int)
 
2299
DEFpropSetMeth(qqueue, sizeOnDiskMax, int64)
2280
2300
 
2281
2301
 
2282
2302
/* This function can be used as a generic way to set properties. Only the subset
2285
2305
 * rgerhards, 2008-01-11
2286
2306
 */
2287
2307
#define isProp(name) !rsCStrSzStrCmp(pProp->pcsName, (uchar*) name, sizeof(name) - 1)
2288
 
static rsRetVal queueSetProperty(queue_t *pThis, var_t *pProp)
 
2308
static rsRetVal qqueueSetProperty(qqueue_t *pThis, var_t *pProp)
2289
2309
{
2290
2310
        DEFiRet;
2291
2311
 
2292
 
        ISOBJ_TYPE_assert(pThis, queue);
 
2312
        ISOBJ_TYPE_assert(pThis, qqueue);
2293
2313
        ASSERT(pProp != NULL);
2294
2314
 
2295
2315
        if(isProp("iQueueSize")) {
2311
2331
#undef  isProp
2312
2332
 
2313
2333
/* dummy */
2314
 
rsRetVal queueQueryInterface(void) { return RS_RET_NOT_IMPLEMENTED; }
 
2334
rsRetVal qqueueQueryInterface(void) { return RS_RET_NOT_IMPLEMENTED; }
2315
2335
 
2316
2336
/* Initialize the stream class. Must be called as the very first method
2317
2337
 * before anything else is called inside this class.
2318
2338
 * rgerhards, 2008-01-09
2319
2339
 */
2320
 
BEGINObjClassInit(queue, 1, OBJ_IS_CORE_MODULE)
 
2340
BEGINObjClassInit(qqueue, 1, OBJ_IS_CORE_MODULE)
2321
2341
        /* request objects we use */
2322
2342
        CHKiRet(objUse(glbl, CORE_COMPONENT));
2323
2343
 
2324
2344
        /* now set our own handlers */
2325
 
        OBJSetMethodHandler(objMethod_SETPROPERTY, queueSetProperty);
2326
 
ENDObjClassInit(queue)
 
2345
        OBJSetMethodHandler(objMethod_SETPROPERTY, qqueueSetProperty);
 
2346
ENDObjClassInit(qqueue)
2327
2347
 
2328
2348
/* vi:set ai:
2329
2349
 */