/*
 * This file implements the worker thread instance (wti) class.
 *
 * File begun on 2008-01-20 by RGerhards based on functions from the
 * previous queue object class (the wti functions have been extracted)
 *
 * There is some in-depth documentation available in doc/dev_queue.html
 * (and in the web doc set on http://www.rsyslog.com/doc). Be sure to read it
 * if you are getting acquainted with the object.
 *
 * Copyright 2008 Rainer Gerhards and Adiscon GmbH.
 *
 * This file is part of the rsyslog runtime library.
 *
 * The rsyslog runtime library is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Lesser General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * The rsyslog runtime library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public License
 * along with the rsyslog runtime library. If not, see <http://www.gnu.org/licenses/>.
 *
 * A copy of the GPL can be found in the file "COPYING" in this distribution.
 * A copy of the LGPL can be found in the file "COPYING.LESSER" in this distribution.
 */
43
#include "stringbuf.h"
52
/* forward-definitions */
56
/* get the header for debug messages
57
* The caller must NOT free or otherwise modify the returned string!
* Returns pThis->pszDbgHdr when it is set; otherwise the string literal
* "wti" is returned as a fallback, so the result is never NULL.
60
wtiGetDbgHdr(wti_t *pThis)
62
ISOBJ_TYPE_assert(pThis, wti);
64
if(pThis->pszDbgHdr == NULL)
65
return (uchar*) "wti"; /* should not normally happen */
67
return pThis->pszDbgHdr;
71
/* get the current worker state. For simplicity and speed, we have
72
* NOT used our regular calling interface this time. I hope that won't
73
* bite in the long term... -- rgerhards, 2008-01-17
74
* TODO: may be performance optimized by atomic operations
* pThis->tCurrCmd is read between BEGIN_MTX_PROTECTED_OPERATIONS and
* END_MTX_PROTECTED_OPERATIONS on pThis->mut; bLockMutex is handed to the
* BEGIN macro unchanged (the destructor in this file passes
* MUTEX_ALREADY_LOCKED because it holds pThis->mut itself at that point).
77
wtiGetState(wti_t *pThis, int bLockMutex)
79
DEFVARS_mutexProtection;
83
ISOBJ_TYPE_assert(pThis, wti);
85
BEGIN_MTX_PROTECTED_OPERATIONS(&pThis->mut, bLockMutex);
86
tCmd = pThis->tCurrCmd;
87
END_MTX_PROTECTED_OPERATIONS(&pThis->mut);
94
/* send a command to a specific thread
95
* bActiveOnly specifies if the command should be sent only when the worker is
96
* in an active state. -- rgerhards, 2008-01-20
* tCmd is the requested new state; it is asserted to be no greater than
* eWRKTHRD_SHUTDOWN_IMMEDIATE. The request is refused (only a debug message
* is written) when bActiveOnly is set and the current state is below
* eWRKTHRD_RUN_CREATED, or when tCmd is lower than the current state - except
* for eWRKTHRD_TERMINATING and eWRKTHRD_STOPPED, which are not subject to
* that ordering check.
* An accepted eWRKTHRD_TERMINATING signals condExitDone, an accepted
* eWRKTHRD_RUNNING signals condInitDone; the other states have no side effect
* besides being stored in tCurrCmd.
* bLockMutex is handed to BEGIN_MTX_PROTECTED_OPERATIONS for pThis->mut.
99
wtiSetState(wti_t *pThis, qWrkCmd_t tCmd, int bActiveOnly, int bLockMutex)
102
DEFVARS_mutexProtection;
104
ISOBJ_TYPE_assert(pThis, wti);
105
assert(tCmd <= eWRKTHRD_SHUTDOWN_IMMEDIATE);
107
BEGIN_MTX_PROTECTED_OPERATIONS(&pThis->mut, bLockMutex);
109
/* all worker states must be followed sequentially, only termination can be set in any state */
110
if( (bActiveOnly && (pThis->tCurrCmd < eWRKTHRD_RUN_CREATED))
111
|| (pThis->tCurrCmd > tCmd && !(tCmd == eWRKTHRD_TERMINATING || tCmd == eWRKTHRD_STOPPED))) {
112
dbgprintf("%s: command %d can not be accepted in current %d processing state - ignored\n",
113
wtiGetDbgHdr(pThis), tCmd, pThis->tCurrCmd);
115
dbgprintf("%s: receiving command %d\n", wtiGetDbgHdr(pThis), tCmd);
117
case eWRKTHRD_TERMINATING:
118
/* TODO: re-enable meaningful debug msg! (via function callback?)
119
dbgprintf("%s: thread terminating with %d entries left in queue, %d workers running.\n",
120
wtiGetDbgHdr(pThis->pQueue), pThis->pQueue->iQueueSize,
121
pThis->pQueue->iCurNumWrkThrd);
123
pthread_cond_signal(&pThis->condExitDone);
124
dbgprintf("%s: worker terminating\n", wtiGetDbgHdr(pThis));
126
case eWRKTHRD_RUNNING:
127
pthread_cond_signal(&pThis->condInitDone);
129
/* these cases are listed just to satisfy the compiler, we do not (yet) act on them: */
130
case eWRKTHRD_STOPPED:
131
case eWRKTHRD_RUN_CREATED:
132
case eWRKTHRD_RUN_INIT:
133
case eWRKTHRD_SHUTDOWN:
134
case eWRKTHRD_SHUTDOWN_IMMEDIATE:
138
pThis->tCurrCmd = tCmd; /* apply the new state */
141
END_MTX_PROTECTED_OPERATIONS(&pThis->mut);
146
/* Cancel the thread. If the thread is already cancelled or terminating,
147
* we do not again cancel it. But it is safe and legal to call wtiCancelThrd() in
* such a state.
* The whole operation runs with pThis->mut held. When the guard below is
* true, the thread is cancelled via pthread_cancel(), the state is set to
* eWRKTHRD_TERMINATING and the pool's bThrdStateChanged flag is raised.
* NOTE(review): the guard is tCurrCmd >= eWRKTHRD_TERMINATING, i.e. it is
* also true while the worker is already in the terminating state; that does
* not obviously match the first sentence above - confirm against the value
* order of qWrkCmd_t.
149
* rgerhards, 2008-02-26
152
wtiCancelThrd(wti_t *pThis)
156
ISOBJ_TYPE_assert(pThis, wti);
158
d_pthread_mutex_lock(&pThis->mut);
160
if(pThis->tCurrCmd >= eWRKTHRD_TERMINATING) {
161
dbgoprint((obj_t*) pThis, "canceling worker thread\n");
162
pthread_cancel(pThis->thrdID);
163
wtiSetState(pThis, eWRKTHRD_TERMINATING, 0, MUTEX_ALREADY_LOCKED);
164
pThis->pWtp->bThrdStateChanged = 1; /* indicate change, so harvester will be called */
167
d_pthread_mutex_unlock(&pThis->mut);
174
BEGINobjDestruct(wti) /* be sure to specify the object type also in END and CODESTART macros!
* Destructor: processes pending thread changes one more time, waits on
* condExitDone if the worker is not yet in state eWRKTHRD_STOPPED, then
* destroys both condition variables and the mutex and frees pszDbgHdr. */
175
CODESTARTobjDestruct(wti)
176
/* if we reach this point, we must make sure the associated worker has terminated. It is
177
* the caller's duty to make sure the worker already knows it shall terminate.
178
* TODO: is it *really* the caller's duty? ...mmmhhhh.... smells bad... rgerhards, 2008-01-25
180
wtiProcessThrdChanges(pThis, LOCK_MUTEX); /* process state change one last time */
182
d_pthread_mutex_lock(&pThis->mut);
183
if(wtiGetState(pThis, MUTEX_ALREADY_LOCKED) != eWRKTHRD_STOPPED) {
184
dbgprintf("%s: WARNING: worker %p shall be destructed but is still running (might be OK) - joining it\n",
185
wtiGetDbgHdr(pThis), pThis);
186
/* let's hope the caller actually instructed it to shutdown...
* NOTE(review): this is a single pthread_cond_wait() without a predicate
* loop; POSIX permits spurious wakeups - confirm this is acceptable here. */
187
pthread_cond_wait(&pThis->condExitDone, &pThis->mut);
190
d_pthread_mutex_unlock(&pThis->mut);
192
/* actual destruction */
193
pthread_cond_destroy(&pThis->condInitDone);
194
pthread_cond_destroy(&pThis->condExitDone);
195
pthread_mutex_destroy(&pThis->mut);
197
if(pThis->pszDbgHdr != NULL)
198
free(pThis->pszDbgHdr);
202
/* Standard-Constructor for the wti object
* Initializes the two condition variables (condInitDone, condExitDone) and
* the mutex of the instance, each with default attributes (NULL is passed
* as attribute argument). The return values of the init calls are not checked.
204
BEGINobjConstruct(wti) /* be sure to specify the object type also in END macro! */
205
pthread_cond_init(&pThis->condInitDone, NULL);
206
pthread_cond_init(&pThis->condExitDone, NULL);
207
pthread_mutex_init(&pThis->mut, NULL);
211
/* Construction finalizer
* Writes a debug message and puts the instance into its initial state by
* setting tCurrCmd to eWRKTHRD_STOPPED.
212
* rgerhards, 2008-01-17
215
wtiConstructFinalize(wti_t *pThis)
219
ISOBJ_TYPE_assert(pThis, wti);
221
dbgprintf("%s: finalizing construction of worker instance data\n", wtiGetDbgHdr(pThis));
223
/* initialize our thread instance descriptor */
225
pThis->tCurrCmd = eWRKTHRD_STOPPED;
231
/* join a specific worker thread
232
* we do not lock the mutex, because join will sync anyways...
* After pthread_join() has returned, the state is set to eWRKTHRD_STOPPED
* and thrdID is reset to 0.
* NOTE(review): wtiSetState() is called with MUTEX_ALREADY_LOCKED although
* this function does not take pThis->mut itself, and wtiProcessThrdChanges()
* releases the mutex before calling it. Presumably the flag is only used to
* skip locking - confirm that the unlocked state update is intended.
235
wtiJoinThrd(wti_t *pThis)
239
ISOBJ_TYPE_assert(pThis, wti);
240
dbgprintf("waiting for worker %s termination, current state %d\n", wtiGetDbgHdr(pThis), pThis->tCurrCmd);
241
pthread_join(pThis->thrdID, NULL);
242
wtiSetState(pThis, eWRKTHRD_STOPPED, 0, MUTEX_ALREADY_LOCKED); /* back to virgin... */
243
pThis->thrdID = 0; /* invalidate the thread ID so that we do not accidentally find reused ones */
244
dbgprintf("worker %s has stopped\n", wtiGetDbgHdr(pThis));
249
/* check if we had any worker thread changes and, if so, act
250
* on them. At a minimum, terminated threads are harvested (joined).
* Only the state eWRKTHRD_TERMINATING triggers an action: the thread is
* joined via wtiJoinThrd() and that call's result is stored in iRet. All
* other states are listed without any handling.
* bLockMutex is handed to BEGIN_MTX_PROTECTED_OPERATIONS for pThis->mut.
253
wtiProcessThrdChanges(wti_t *pThis, int bLockMutex)
256
DEFVARS_mutexProtection;
258
ISOBJ_TYPE_assert(pThis, wti);
260
BEGIN_MTX_PROTECTED_OPERATIONS(&pThis->mut, bLockMutex);
261
switch(pThis->tCurrCmd) {
262
case eWRKTHRD_TERMINATING:
263
/* we need to at least temporarily release the mutex, because otherwise
264
* we may deadlock with the thread we intend to join (it acquires the mutex
265
* during termination processing). -- rgerhards, 2008-02-26
267
END_MTX_PROTECTED_OPERATIONS(&pThis->mut);
268
iRet = wtiJoinThrd(pThis);
269
BEGIN_MTX_PROTECTED_OPERATIONS(&pThis->mut, bLockMutex);
271
/* these cases are listed just to satisfy the compiler, we do not act on them: */
272
case eWRKTHRD_STOPPED:
273
case eWRKTHRD_RUN_CREATED:
274
case eWRKTHRD_RUN_INIT:
275
case eWRKTHRD_RUNNING:
276
case eWRKTHRD_SHUTDOWN:
277
case eWRKTHRD_SHUTDOWN_IMMEDIATE:
281
END_MTX_PROTECTED_OPERATIONS(&pThis->mut);
287
/* cancellation cleanup handler for queueWorker ()
288
* Updates admin structure and frees resources.
* arg is the wti_t instance of the cancelled worker (wtiWorker() registers
* this handler with pThis as argument via pthread_cleanup_push()).
* The pool's pfOnWorkerCancel callback is invoked first; then, with thread
* cancellation disabled, the worker is moved to eWRKTHRD_TERMINATING and
* the pool's bThrdStateChanged flag is set. The previous cancel state is
* restored at the end.
* NOTE(review): the state change here is guarded by pWtp->mut, whereas the
* regular termination path at the end of wtiWorker() holds pThis->mut for
* the same wtiSetState(..., MUTEX_ALREADY_LOCKED) call - confirm which
* mutex is intended.
289
* rgerhards, 2008-01-16
292
wtiWorkerCancelCleanup(void *arg)
294
wti_t *pThis = (wti_t*) arg;
296
int iCancelStateSave;
299
ISOBJ_TYPE_assert(pThis, wti);
301
ISOBJ_TYPE_assert(pWtp, wtp);
303
dbgprintf("%s: cancelation cleanup handler called.\n", wtiGetDbgHdr(pThis));
305
/* call user supplied handler (that one e.g. requeues the element) */
306
pWtp->pfOnWorkerCancel(pThis->pWtp->pUsr, pThis->pUsrp);
308
pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave);
309
d_pthread_mutex_lock(&pWtp->mut);
310
wtiSetState(pThis, eWRKTHRD_TERMINATING, 0, MUTEX_ALREADY_LOCKED);
311
/* TODO: sync access? I currently think it is NOT needed -- rgerhards, 2008-01-28 */
312
pWtp->bThrdStateChanged = 1; /* indicate change, so harvester will be called */
314
d_pthread_mutex_unlock(&pWtp->mut);
315
pthread_setcancelstate(iCancelStateSave, NULL);
320
/* generic worker thread framework
* pThis is the worker instance to run. The pool it belongs to (pThis->pWtp)
* supplies the callbacks invoked here: pfOnWorkerStartup, pfRateLimiter
* (optional, only called when not NULL), pfIsIdle, pfOnIdle, pfDoWork and
* pfOnWorkerShutdown. wtiWorkerCancelCleanup() is registered as cancellation
* cleanup handler before the loop and removed (without being run) after it.
* The loop is left when wtpChkStopWrkr() returns non-zero, or when the
* previous idle wait timed out and pfIsIdle() still reports idle. While
* idle, the worker waits on pcondBusy - without a timeout if toWrkShutdown
* is -1, otherwise with a timeout computed from toWrkShutdown.
* After the loop, with cancellation disabled and pThis->mut held, the worker
* calls pfOnWorkerShutdown(), switches itself to eWRKTHRD_TERMINATING and
* sets the pool's bThrdStateChanged flag.
322
* Some special comments below, so that they do not clutter the main function code:
324
* On the use of pthread_testcancel():
325
* Now make sure we can get canceled - it is not specified if pthread_setcancelstate() is
326
* a cancellation point in itself. As we run most of the time without cancel enabled, I fear
327
* we may never get cancelled if we do not create a cancellation point ourselves.
329
* On the use of pthread_yield():
330
* We yield to give the other threads a chance to obtain the mutex. If we do not
331
* do that, this thread may very well acquire the mutex again before another thread
332
* has even a chance to run. The reason is that mutex operations are free to be
333
* implemented in the quickest possible way (and they typically are!). That is, the
334
* mutex lock/unlock most probably just does an atomic memory swap and does not necessarily
335
* schedule other threads waiting on the same mutex. That can lead to the same thread
336
* acquiring the mutex over and over again while all others are starving for it. We
337
* have exactly seen this behaviour when we deliberately introduced a long-running
338
* test action which basically did a sleep. I understand that with real actions the
339
* likelihood of this starvation condition is very low - but it could still happen
340
* and would be very hard to debug. The yield() is a sure fix, its performance overhead
341
* should be well accepted given the above facts. -- rgerhards, 2008-01-10
343
#pragma GCC diagnostic ignored "-Wempty-body"
345
wtiWorker(wti_t *pThis)
348
DEFVARS_mutexProtection;
350
wtp_t *pWtp; /* our worker thread pool */
351
int bInactivityTOOccured = 0;
353
ISOBJ_TYPE_assert(pThis, wti);
354
pWtp = pThis->pWtp; /* shortcut */
355
ISOBJ_TYPE_assert(pWtp, wtp);
357
dbgSetThrdName(pThis->pszDbgHdr);
359
pthread_cleanup_push(wtiWorkerCancelCleanup, pThis);
361
BEGIN_MTX_PROTECTED_OPERATIONS(pWtp->pmutUsr, LOCK_MUTEX);
362
pWtp->pfOnWorkerStartup(pWtp->pUsr);
363
END_MTX_PROTECTED_OPERATIONS(pWtp->pmutUsr);
365
/* now we have our identity, on to real processing */
366
while(1) { /* loop will be broken below - need to do mutex locks */
367
/* process any pending thread requests */
368
wtpProcessThrdChanges(pWtp);
369
pthread_testcancel(); /* see big comment in function header */
370
# if !defined(__hpux) /* pthread_yield is missing there! */
371
pthread_yield(); /* see big comment in function header */
374
/* if we have a rate-limiter set for this worker pool, let's call it. Please
375
* keep in mind that the rate-limiter may hold us for an extended period
376
* of time. -- rgerhards, 2008-04-02
378
if(pWtp->pfRateLimiter != NULL) {
379
pWtp->pfRateLimiter(pWtp->pUsr);
382
wtpSetInactivityGuard(pThis->pWtp, 0, LOCK_MUTEX); /* must be set before usr mutex is locked! */
383
BEGIN_MTX_PROTECTED_OPERATIONS(pWtp->pmutUsr, LOCK_MUTEX);
385
if( (bInactivityTOOccured && pWtp->pfIsIdle(pWtp->pUsr, MUTEX_ALREADY_LOCKED))
386
|| wtpChkStopWrkr(pWtp, LOCK_MUTEX, MUTEX_ALREADY_LOCKED)) {
387
END_MTX_PROTECTED_OPERATIONS(pWtp->pmutUsr);
388
break; /* end worker thread run */
390
bInactivityTOOccured = 0; /* reset for next run */
392
/* if we reach this point, we are still protected by the mutex */
394
if(pWtp->pfIsIdle(pWtp->pUsr, MUTEX_ALREADY_LOCKED)) {
395
dbgprintf("%s: worker IDLE, waiting for work.\n", wtiGetDbgHdr(pThis));
396
pWtp->pfOnIdle(pWtp->pUsr, MUTEX_ALREADY_LOCKED);
398
if(pWtp->toWrkShutdown == -1) {
399
/* never shut down any started worker */
400
d_pthread_cond_wait(pWtp->pcondBusy, pWtp->pmutUsr);
402
timeoutComp(&t, pWtp->toWrkShutdown);/* get absolute timeout */
403
if(d_pthread_cond_timedwait(pWtp->pcondBusy, pWtp->pmutUsr, &t) != 0) {
404
dbgprintf("%s: inactivity timeout, worker terminating...\n", wtiGetDbgHdr(pThis));
405
bInactivityTOOccured = 1; /* indicate we had a timeout */
408
END_MTX_PROTECTED_OPERATIONS(pWtp->pmutUsr);
409
continue; /* request next iteration */
412
/* if we reach this point, we have a non-empty queue (and are still protected by mutex) */
413
pWtp->pfDoWork(pWtp->pUsr, pThis, iCancelStateSave);
416
/* indicate termination */
417
pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave);
418
d_pthread_mutex_lock(&pThis->mut);
419
pthread_cleanup_pop(0); /* remove cleanup handler */
421
pWtp->pfOnWorkerShutdown(pWtp->pUsr);
423
wtiSetState(pThis, eWRKTHRD_TERMINATING, 0, MUTEX_ALREADY_LOCKED);
424
pWtp->bThrdStateChanged = 1; /* indicate change, so harvester will be called */
425
d_pthread_mutex_unlock(&pThis->mut);
426
pthread_setcancelstate(iCancelStateSave, NULL);
430
#pragma GCC diagnostic warning "-Wempty-body"
433
/* some simple object access methods: the DEFpropSetMeth macro below generates
* the setter for the pWtp property of wti (type wtp_t*).
* NOTE(review): the generated function is presumably named wtiSetpWtp -
* confirm against the macro definition. */
434
DEFpropSetMeth(wti, pWtp, wtp_t*)
436
/* set the debug header message
437
* The passed-in string is duplicated. So if the caller does not need
438
* it any longer, it must free it. Must be called only before object is finalized.
* lenMsg is the length of pszMsg without the terminating \0: lenMsg + 1
* bytes are allocated and lenMsg + 1 bytes are copied, so pszMsg must be
* \0-terminated at index lenMsg. A previously set header is freed first.
* Fails with RS_RET_OUT_OF_MEMORY if malloc() returns NULL; in that case
* pszDbgHdr is left NULL.
* NOTE(review): RS_RET_PARAM_ERROR is raised by a parameter check - confirm
* the exact condition guarding it.
439
* rgerhards, 2008-01-09
442
wtiSetDbgHdr(wti_t *pThis, uchar *pszMsg, size_t lenMsg)
446
ISOBJ_TYPE_assert(pThis, wti);
447
assert(pszMsg != NULL);
450
ABORT_FINALIZE(RS_RET_PARAM_ERROR);
452
if(pThis->pszDbgHdr != NULL) {
453
free(pThis->pszDbgHdr);
454
pThis->pszDbgHdr = NULL;
457
if((pThis->pszDbgHdr = malloc(sizeof(uchar) * lenMsg + 1)) == NULL)
458
ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY);
460
memcpy(pThis->pszDbgHdr, pszMsg, lenMsg + 1); /* always think about the \0! */
468
/* Interface query entry point of the wti class.
 * Takes no arguments and unconditionally answers with
 * RS_RET_NOT_IMPLEMENTED.
 */
rsRetVal
wtiQueryInterface(void)
{
	return RS_RET_NOT_IMPLEMENTED;
}
471
/* Initialize the wti class. Must be called as the very first method
472
* before anything else is called inside this class.
473
* rgerhards, 2008-01-09
475
BEGINObjClassInit(wti, 1, OBJ_IS_CORE_MODULE) /* one is the object version (most important for persisting) */
476
/* request objects we use */