84
85
/* Not implemented dummy function for constructor */
85
static rsRetVal NotImplementedDummy() { return RS_RET_OK; }
86
static rsRetVal NotImplementedDummy() { return RS_RET_NOT_IMPLEMENTED; }
86
87
/* Standard-Constructor for the wtp object
88
89
BEGINobjConstruct(wtp) /* be sure to specify the object type also in END macro! */
89
pthread_mutex_init(&pThis->mut, NULL);
90
pthread_mutex_init(&pThis->mutThrdShutdwn, NULL);
90
pthread_mutex_init(&pThis->mutWtp, NULL);
91
91
pthread_cond_init(&pThis->condThrdTrm, NULL);
92
pthread_attr_init(&pThis->attrThrd);
93
/* Set thread scheduling policy to default */
94
#ifdef HAVE_PTHREAD_SETSCHEDPARAM
95
pthread_attr_setschedpolicy(&pThis->attrThrd, default_thr_sched_policy);
96
pthread_attr_setschedparam(&pThis->attrThrd, &default_sched_param);
97
pthread_attr_setinheritsched(&pThis->attrThrd, PTHREAD_EXPLICIT_SCHED);
99
pthread_attr_setdetachstate(&pThis->attrThrd, PTHREAD_CREATE_DETACHED);
92
100
/* set all function pointers to "not implemented" dummy so that we can safely call them */
93
101
pThis->pfChkStopWrkr = NotImplementedDummy;
94
pThis->pfIsIdle = NotImplementedDummy;
102
pThis->pfGetDeqBatchSize = NotImplementedDummy;
95
103
pThis->pfDoWork = NotImplementedDummy;
96
pThis->pfOnIdle = NotImplementedDummy;
97
pThis->pfOnWorkerCancel = NotImplementedDummy;
98
pThis->pfOnWorkerStartup = NotImplementedDummy;
99
pThis->pfOnWorkerShutdown = NotImplementedDummy;
104
pThis->pfObjProcessed = NotImplementedDummy;
105
INIT_ATOMIC_HELPER_MUT(pThis->mutCurNumWrkThrd);
106
INIT_ATOMIC_HELPER_MUT(pThis->mutWtpState);
100
107
ENDobjConstruct(wtp)
152
156
/* actual destruction */
153
157
pthread_cond_destroy(&pThis->condThrdTrm);
154
pthread_mutex_destroy(&pThis->mut);
155
pthread_mutex_destroy(&pThis->mutThrdShutdwn);
158
pthread_mutex_destroy(&pThis->mutWtp);
159
pthread_attr_destroy(&pThis->attrThrd);
160
DESTROY_ATOMIC_HELPER_MUT(pThis->mutCurNumWrkThrd);
161
DESTROY_ATOMIC_HELPER_MUT(pThis->mutWtpState);
157
163
free(pThis->pszDbgHdr);
158
164
ENDobjDestruct(wtp)
161
/* wake up at least one worker thread.
162
* rgerhards, 2008-01-20
165
wtpWakeupWrkr(wtp_t *pThis)
169
/* TODO; mutex? I think not needed, as we do not need predictable exec order -- rgerhards, 2008-01-28 */
170
ISOBJ_TYPE_assert(pThis, wtp);
171
pthread_cond_signal(pThis->pcondBusy);
175
/* wake up all worker threads.
176
* rgerhards, 2008-01-16
179
wtpWakeupAllWrkr(wtp_t *pThis)
183
ISOBJ_TYPE_assert(pThis, wtp);
184
pthread_cond_broadcast(pThis->pcondBusy);
189
/* check if we had any worker thread changes and, if so, act
190
* on them. At a minimum, terminated threads are harvested (joined).
191
* This function MUST NEVER block on the queue mutex!
194
wtpProcessThrdChanges(wtp_t *pThis)
199
ISOBJ_TYPE_assert(pThis, wtp);
201
if(pThis->bThrdStateChanged == 0)
204
if(d_pthread_mutex_trylock(&(pThis->mutThrdShutdwn)) != 0) {
205
/* another thread is already in the loop */
209
/* Note: there is a left-over potential race condition below:
210
* pThis->bThrdStateChanged may be re-set by another thread while
211
* we work on it and thus the loop may terminate too early. However,
212
* there are no really bad effects from that so I perfer - for this
213
* version - to live with the problem as is. Not a good idea to
214
* introduce that large change into the stable branch without very
215
* good reason. -- rgerhards, 2009-04-02
218
/* reset the change marker */
219
ATOMIC_STORE_0_TO_INT(pThis->bThrdStateChanged);
220
/* go through all threads */
221
for(i = 0 ; i < pThis->iNumWorkerThreads ; ++i) {
222
wtiProcessThrdChanges(pThis->pWrkr[i], LOCK_MUTEX);
224
/* restart if another change occured while we were processing the changes */
225
} while(pThis->bThrdStateChanged != 0);
227
d_pthread_mutex_unlock(&(pThis->mutThrdShutdwn));
234
/* Sent a specific state for the worker thread pool.
235
* rgerhards, 2008-01-21
167
/* Sent a specific state for the worker thread pool. -- rgerhards, 2008-01-21
168
* We do not need to do atomic instructions as set operations are only
169
* called when terminating the pool, and then in strict sequence. So we
170
* can never overwrite each other. On the other hand, it also doesn't
171
* matter if the read operation obtains an older value, as we then simply
172
* do one more iteration, what is perfectly legal (during shutdown
173
* they are awoken in any case). -- rgerhards, 2009-07-20
238
176
wtpSetState(wtp_t *pThis, wtpState_t iNewState)
242
178
ISOBJ_TYPE_assert(pThis, wtp);
243
pThis->wtpState = iNewState;
244
/* TODO: must wakeup workers? seen to be not needed -- rgerhards, 2008-01-28 */
179
pThis->wtpState = iNewState; // TODO: do we need a mutex here? 2010-04-26
250
184
/* check if the worker shall shutdown (1 = yes, 0 = no)
251
* TODO: check if we can use atomic operations to enhance performance
252
185
* Note: there may be two mutexes locked, the bLockUsrMutex is the one in our "user"
253
186
* (e.g. the queue clas)
254
187
* rgerhards, 2008-01-21
257
wtpChkStopWrkr(wtp_t *pThis, int bLockMutex, int bLockUsrMutex)
190
wtpChkStopWrkr(wtp_t *pThis, int bLockUsrMutex)
260
DEFVARS_mutexProtection;
262
195
ISOBJ_TYPE_assert(pThis, wtp);
196
/* we need a consistent value, but it doesn't really matter if it is changed
197
* right after the fetch - then we simply do one more iteration in the worker
199
wtpState = (wtpState_t) ATOMIC_FETCH_32BIT((int*)&pThis->wtpState, &pThis->mutWtpState);
264
BEGIN_MTX_PROTECTED_OPERATIONS(&pThis->mut, bLockMutex);
265
if( (pThis->wtpState == wtpState_SHUTDOWN_IMMEDIATE)
266
|| ((pThis->wtpState == wtpState_SHUTDOWN) && pThis->pfIsIdle(pThis->pUsr, bLockUsrMutex)))
267
iRet = RS_RET_TERMINATE_NOW;
268
END_MTX_PROTECTED_OPERATIONS(&pThis->mut);
201
if(wtpState == wtpState_SHUTDOWN_IMMEDIATE) {
202
ABORT_FINALIZE(RS_RET_TERMINATE_NOW);
203
} else if(wtpState == wtpState_SHUTDOWN) {
204
ABORT_FINALIZE(RS_RET_TERMINATE_WHEN_IDLE);
270
207
/* try customer handler if one was set and we do not yet have a definite result */
271
if(iRet == RS_RET_OK && pThis->pfChkStopWrkr != NULL) {
208
if(pThis->pfChkStopWrkr != NULL) {
272
209
iRet = pThis->pfChkStopWrkr(pThis->pUsr, bLockUsrMutex);
279
217
#pragma GCC diagnostic ignored "-Wempty-body"
280
218
/* Send a shutdown command to all workers and see if they terminate.
281
* A timeout may be specified.
219
* A timeout may be specified. This function may also be called with
220
* the current number of workers being 0, in which case it does not
221
* shut down any worker.
282
222
* rgerhards, 2008-01-14
289
int iCancelStateSave;
291
231
ISOBJ_TYPE_assert(pThis, wtp);
233
/* lock mutex to prevent races (may otherwise happen during idle processing and such...) */
234
d_pthread_mutex_lock(pThis->pmutUsr);
293
235
wtpSetState(pThis, tShutdownCmd);
294
wtpWakeupAllWrkr(pThis);
236
pthread_cond_broadcast(pThis->pcondBusy); /* wake up all workers */
237
/* awake workers in retry loop */
238
for(i = 0 ; i < pThis->iNumWorkerThreads ; ++i) {
239
wtiWakeupThrd(pThis->pWrkr[i]);
241
d_pthread_mutex_unlock(pThis->pmutUsr);
296
/* see if we need to harvest (join) any terminated threads (even in timeout case,
297
* some may have terminated...
299
wtpProcessThrdChanges(pThis);
301
/* and wait for their termination */
302
pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave);
303
d_pthread_mutex_lock(&pThis->mut);
304
pthread_cleanup_push(mutexCancelCleanup, &pThis->mut);
305
pthread_setcancelstate(iCancelStateSave, NULL);
243
/* wait for worker thread termination */
244
d_pthread_mutex_lock(&pThis->mutWtp);
245
pthread_cleanup_push(mutexCancelCleanup, &pThis->mutWtp);
307
247
while(pThis->iCurNumWrkThrd > 0 && !bTimedOut) {
308
dbgprintf("%s: waiting %ldms on worker thread termination, %d still running\n",
309
wtpGetDbgHdr(pThis), timeoutVal(ptTimeout), pThis->iCurNumWrkThrd);
248
DBGPRINTF("%s: waiting %ldms on worker thread termination, %d still running\n",
249
wtpGetDbgHdr(pThis), timeoutVal(ptTimeout),
250
ATOMIC_FETCH_32BIT(&pThis->iCurNumWrkThrd, &pThis->mutCurNumWrkThrd));
311
if(d_pthread_cond_timedwait(&pThis->condThrdTrm, &pThis->mut, ptTimeout) != 0) {
312
dbgprintf("%s: timeout waiting on worker thread termination\n", wtpGetDbgHdr(pThis));
252
if(d_pthread_cond_timedwait(&pThis->condThrdTrm, &pThis->mutWtp, ptTimeout) != 0) {
253
DBGPRINTF("%s: timeout waiting on worker thread termination\n", wtpGetDbgHdr(pThis));
313
254
bTimedOut = 1; /* we exit the loop on timeout */
257
/* awake workers in retry loop */
258
for(i = 0 ; i < pThis->iNumWorkerThreads ; ++i) {
259
wtiWakeupThrd(pThis->pWrkr[i]);
316
263
pthread_cleanup_pop(1);
319
266
iRet = RS_RET_TIMED_OUT;
321
/* see if we need to harvest (join) any terminated threads (even in timeout case,
322
* some may have terminated...
324
wtpProcessThrdChanges(pThis);
328
270
#pragma GCC diagnostic warning "-Wempty-body"
331
/* indicate that a thread has terminated and awake anyone waiting on it
332
* rgerhards, 2008-01-23
334
rsRetVal wtpSignalWrkrTermination(wtp_t *pThis)
337
/* I leave the mutex code here out as it gives us deadlocks. I think it is not really
338
* needed and we are on the safe side. I leave this comment in if practice proves us
339
* wrong. The whole thing should be removed after half a year or year if we see there
340
* actually is no issue (or revisit it from a theoretical POV).
341
* rgerhards, 2008-01-28
342
* revisited 2008-09-30, still a bit unclear, leave in
344
/*TODO: mutex or not mutex, that's the question ;)DEFVARS_mutexProtection;*/
346
ISOBJ_TYPE_assert(pThis, wtp);
348
/*BEGIN_MTX_PROTECTED_OPERATIONS(&pThis->mut, LOCK_MUTEX);*/
349
pthread_cond_signal(&pThis->condThrdTrm); /* activate anyone waiting on thread shutdown */
350
/*END_MTX_PROTECTED_OPERATIONS(&pThis->mut);*/
355
273
/* Unconditionally cancel all running worker threads.
356
274
* rgerhards, 2008-01-14
380
/* Set the Inactivity Guard
381
* rgerhards, 2008-01-21
293
/* this function contains shared code for both regular worker shutdown as
294
* well as shutdown via cancellation. We can not simply use pthread_cleanup_pop(1)
295
* as this introduces a race in the debug system (RETiRet system).
296
* rgerhards, 2009-10-26
384
wtpSetInactivityGuard(wtp_t *pThis, int bNewState, int bLockMutex)
299
wtpWrkrExecCleanup(wti_t *pWti)
387
DEFVARS_mutexProtection;
389
BEGIN_MTX_PROTECTED_OPERATIONS(&pThis->mut, bLockMutex);
390
pThis->bInactivityGuard = bNewState;
391
END_MTX_PROTECTED_OPERATIONS(&pThis->mut);
304
ISOBJ_TYPE_assert(pWti, wti);
306
ISOBJ_TYPE_assert(pThis, wtp);
308
/* the order of the next two statements is important! */
309
wtiSetState(pWti, WRKTHRD_STOPPED);
310
ATOMIC_DEC(&pThis->iCurNumWrkThrd, &pThis->mutCurNumWrkThrd);
312
DBGPRINTF("%s: Worker thread %lx, terminated, um workers now %d\n",
313
wtpGetDbgHdr(pThis), (unsigned long) pWti,
314
ATOMIC_FETCH_32BIT(&pThis->iCurNumWrkThrd, &pThis->mutCurNumWrkThrd));
397
/* cancellation cleanup handler for executing worker
398
* decrements the worker counter
399
* rgerhards, 2008-01-20
320
/* cancellation cleanup handler for executing worker decrements the worker counter.
321
* rgerhards, 2009-07-20
402
324
wtpWrkrExecCancelCleanup(void *arg)
404
wtp_t *pThis = (wtp_t*) arg;
326
wti_t *pWti = (wti_t*) arg;
330
ISOBJ_TYPE_assert(pWti, wti);
407
332
ISOBJ_TYPE_assert(pThis, wtp);
408
pThis->iCurNumWrkThrd--;
409
wtpSignalWrkrTermination(pThis);
411
dbgprintf("%s: thread CANCELED with %d workers running.\n", wtpGetDbgHdr(pThis), pThis->iCurNumWrkThrd);
333
DBGPRINTF("%s: Worker thread %lx requested to be cancelled.\n",
334
wtpGetDbgHdr(pThis), (unsigned long) pWti);
336
wtpWrkrExecCleanup(pWti);
339
/* NOTE: we must call ENDfunc FIRST, because otherwise the schedule may activate the main
340
* thread after the broadcast, which could destroy the debug class, resulting in a potential
341
* segfault. So we need to do the broadcast as actually the last action in our processing
343
pthread_cond_broadcast(&pThis->condThrdTrm); /* activate anyone waiting on thread shutdown */
422
353
wtpWorker(void *arg) /* the arg is actually a wti object, even though we are in wtp! */
355
wti_t *pWti = (wti_t*) arg;
358
# if HAVE_PRCTL && defined PR_SET_NAME
424
359
uchar *pszDbgHdr;
425
360
uchar thrdName[32] = "rs:";
427
DEFVARS_mutexProtection;
428
wti_t *pWti = (wti_t*) arg;
432
364
ISOBJ_TYPE_assert(pWti, wti);
433
365
pThis = pWti->pWtp;
434
366
ISOBJ_TYPE_assert(pThis, wtp);
368
/* block all signals */
436
369
sigfillset(&sigSet);
437
370
pthread_sigmask(SIG_BLOCK, &sigSet, NULL);
372
/* but ignore SIGTTN, which we (ab)use to signal the thread to shutdown -- rgerhards, 2009-07-20 */
373
sigemptyset(&sigSet);
374
sigaddset(&sigSet, SIGTTIN);
375
pthread_sigmask(SIG_UNBLOCK, &sigSet, NULL);
439
377
# if HAVE_PRCTL && defined PR_SET_NAME
440
378
/* set thread name - we ignore if the call fails, has no harsh consequences... */
441
379
pszDbgHdr = wtpGetDbgHdr(pThis);
448
BEGIN_MTX_PROTECTED_OPERATIONS(&pThis->mut, LOCK_MUTEX);
450
/* do some late initialization */
452
pthread_cleanup_push(wtpWrkrExecCancelCleanup, pThis);
454
/* finally change to RUNNING state. We need to check if we actually should still run,
455
* because someone may have requested us to shut down even before we got a chance to do
456
* our init. That would be a bad race... -- rgerhards, 2008-01-16
458
wtiSetState(pWti, eWRKTHRD_RUNNING, 0, MUTEX_ALREADY_LOCKED); /* we are running now! */
461
END_MTX_PROTECTED_OPERATIONS(&pThis->mut);
463
iRet = wtiWorker(pWti); /* just to make sure: this is NOT protected by the mutex! */
465
BEGIN_MTX_PROTECTED_OPERATIONS(&pThis->mut, LOCK_MUTEX);
466
} while(pThis->iCurNumWrkThrd == 1 && pThis->bInactivityGuard == 1);
467
/* inactivity guard prevents shutdown of all workers while one should be running due to race
468
* condition. It can lead to one more worker running than desired, but that is acceptable. After
469
* all, that worker will shutdown itself due to inactivity timeout. If, however, none were running
470
* when one was required, processing could come to a halt. -- rgerhards, 2008-01-21
386
pthread_cleanup_push(wtpWrkrExecCancelCleanup, pWti);
473
388
pthread_cleanup_pop(0);
474
pThis->iCurNumWrkThrd--;
475
wtpSignalWrkrTermination(pThis);
477
dbgprintf("%s: Worker thread %lx, terminated, num workers now %d\n",
478
wtpGetDbgHdr(pThis), (unsigned long) pWti, pThis->iCurNumWrkThrd);
480
END_MTX_PROTECTED_OPERATIONS(&pThis->mut);
389
wtpWrkrExecCleanup(pWti);
392
/* NOTE: we must call ENDfunc FIRST, because otherwise the schedule may activate the main
393
* thread after the broadcast, which could destroy the debug class, resulting in a potential
394
* segfault. So we need to do the broadcast as actually the last action in our processing
396
pthread_cond_broadcast(&pThis->condThrdTrm); /* activate anyone waiting on thread shutdown */
485
399
#pragma GCC diagnostic warning "-Wempty-body"
515
422
if(i == pThis->iNumWorkerThreads)
516
423
ABORT_FINALIZE(RS_RET_NO_MORE_THREADS);
425
if(i == 0 || pThis->toWrkShutdown == -1) {
426
wtiSetAlwaysRunning(pThis->pWrkr[i]);
518
429
pWti = pThis->pWrkr[i];
519
wtiSetState(pWti, eWRKTHRD_RUN_CREATED, 0, LOCK_MUTEX);
520
iState = pthread_create(&(pWti->thrdID), NULL, wtpWorker, (void*) pWti);
521
dbgprintf("%s: started with state %d, num workers now %d\n",
522
wtpGetDbgHdr(pThis), iState, pThis->iCurNumWrkThrd);
430
wtiSetState(pWti, WRKTHRD_RUNNING);
431
iState = pthread_create(&(pWti->thrdID), &pThis->attrThrd, wtpWorker, (void*) pWti);
432
ATOMIC_INC(&pThis->iCurNumWrkThrd, &pThis->mutCurNumWrkThrd); /* we got one more! */
524
/* indicate we just started a worker and would like to see it running */
525
wtpSetInactivityGuard(pThis, 1, MUTEX_ALREADY_LOCKED);
434
DBGPRINTF("%s: started with state %d, num workers now %d\n",
435
wtpGetDbgHdr(pThis), iState,
436
ATOMIC_FETCH_32BIT(&pThis->iCurNumWrkThrd, &pThis->mutCurNumWrkThrd));
528
END_MTX_PROTECTED_OPERATIONS(&pThis->mut);
439
d_pthread_mutex_unlock(&pThis->mutWtp);
551
461
if(nMaxWrkr == 0)
554
BEGIN_MTX_PROTECTED_OPERATIONS(&pThis->mut, LOCK_MUTEX);
556
464
if(nMaxWrkr > pThis->iNumWorkerThreads) /* limit to configured maximum */
557
465
nMaxWrkr = pThis->iNumWorkerThreads;
559
nMissing = nMaxWrkr - pThis->iCurNumWrkThrd;
467
nMissing = nMaxWrkr - ATOMIC_FETCH_32BIT(&pThis->iCurNumWrkThrd, &pThis->mutCurNumWrkThrd);
561
469
if(nMissing > 0) {
562
dbgprintf("%s: high activity - starting %d additional worker thread(s).\n", wtpGetDbgHdr(pThis), nMissing);
470
DBGPRINTF("%s: high activity - starting %d additional worker thread(s).\n",
471
wtpGetDbgHdr(pThis), nMissing);
563
472
/* start the rqtd nbr of workers */
564
473
for(i = 0 ; i < nMissing ; ++i) {
565
CHKiRet(wtpStartWrkr(pThis, MUTEX_ALREADY_LOCKED));
569
dbgprintf("wtpAdviseMaxWorkers signals busy\n");
570
wtpWakeupWrkr(pThis);
474
CHKiRet(wtpStartWrkr(pThis));
477
pthread_cond_signal(pThis->pcondBusy);
576
END_MTX_PROTECTED_OPERATIONS(&pThis->mut);
587
492
DEFpropSetMethPTR(wtp, pcondBusy, pthread_cond_t)
588
493
DEFpropSetMethFP(wtp, pfChkStopWrkr, rsRetVal(*pVal)(void*, int))
589
494
DEFpropSetMethFP(wtp, pfRateLimiter, rsRetVal(*pVal)(void*))
590
DEFpropSetMethFP(wtp, pfIsIdle, rsRetVal(*pVal)(void*, int))
591
DEFpropSetMethFP(wtp, pfDoWork, rsRetVal(*pVal)(void*, void*, int))
592
DEFpropSetMethFP(wtp, pfOnIdle, rsRetVal(*pVal)(void*, int))
593
DEFpropSetMethFP(wtp, pfOnWorkerCancel, rsRetVal(*pVal)(void*, void*))
594
DEFpropSetMethFP(wtp, pfOnWorkerStartup, rsRetVal(*pVal)(void*))
595
DEFpropSetMethFP(wtp, pfOnWorkerShutdown, rsRetVal(*pVal)(void*))
598
/* return the current number of worker threads.
599
* TODO: atomic operation would bring a nice performance
601
* rgerhards, 2008-01-27
604
wtpGetCurNumWrkr(wtp_t *pThis, int bLockMutex)
606
DEFVARS_mutexProtection;
610
ISOBJ_TYPE_assert(pThis, wtp);
612
BEGIN_MTX_PROTECTED_OPERATIONS(&pThis->mut, bLockMutex);
613
iNumWrkr = pThis->iCurNumWrkThrd;
614
END_MTX_PROTECTED_OPERATIONS(&pThis->mut);
495
DEFpropSetMethFP(wtp, pfGetDeqBatchSize, rsRetVal(*pVal)(void*, int*))
496
DEFpropSetMethFP(wtp, pfDoWork, rsRetVal(*pVal)(void*, void*))
497
DEFpropSetMethFP(wtp, pfObjProcessed, rsRetVal(*pVal)(void*, wti_t*))
621
500
/* set the debug header message