~ubuntu-branches/ubuntu/lucid/rsyslog/lucid-updates

1.1.2 by Michael Biebl
Import upstream version 3.14.2
1
/* wti.c
2
 *
3
 * This file implements the worker thread instance (wti) class.
4
 * 
5
 * File begun on 2008-01-20 by RGerhards based on functions from the 
6
 * previous queue object class (the wti functions have been extracted)
7
 *
8
 * There is some in-depth documentation available in doc/dev_queue.html
9
 * (and in the web doc set on http://www.rsyslog.com/doc). Be sure to read it
10
 * if you are getting acquainted with the object.
11
 *
12
 * Copyright 2008 Rainer Gerhards and Adiscon GmbH.
13
 *
1.1.3 by Michael Biebl
Import upstream version 3.16.1
14
 * This file is part of the rsyslog runtime library.
1.1.2 by Michael Biebl
Import upstream version 3.14.2
15
 *
1.1.3 by Michael Biebl
Import upstream version 3.16.1
16
 * The rsyslog runtime library is free software: you can redistribute it and/or modify
17
 * it under the terms of the GNU Lesser General Public License as published by
1.1.2 by Michael Biebl
Import upstream version 3.14.2
18
 * the Free Software Foundation, either version 3 of the License, or
19
 * (at your option) any later version.
20
 *
1.1.3 by Michael Biebl
Import upstream version 3.16.1
21
 * The rsyslog runtime library is distributed in the hope that it will be useful,
1.1.2 by Michael Biebl
Import upstream version 3.14.2
22
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
23
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
1.1.3 by Michael Biebl
Import upstream version 3.16.1
24
 * GNU Lesser General Public License for more details.
1.1.2 by Michael Biebl
Import upstream version 3.14.2
25
 *
1.1.3 by Michael Biebl
Import upstream version 3.16.1
26
 * You should have received a copy of the GNU Lesser General Public License
27
 * along with the rsyslog runtime library.  If not, see <http://www.gnu.org/licenses/>.
1.1.2 by Michael Biebl
Import upstream version 3.14.2
28
 *
29
 * A copy of the GPL can be found in the file "COPYING" in this distribution.
1.1.3 by Michael Biebl
Import upstream version 3.16.1
30
 * A copy of the LGPL can be found in the file "COPYING.LESSER" in this distribution.
1.1.2 by Michael Biebl
Import upstream version 3.14.2
31
 */
32
#include "config.h"
33
34
#include <stdio.h>
35
#include <stdlib.h>
36
#include <string.h>
37
#include <assert.h>
38
#include <signal.h>
39
#include <pthread.h>
40
#include <errno.h>
41
42
#include "rsyslog.h"
43
#include "syslogd.h"
44
#include "stringbuf.h"
45
#include "srUtils.h"
46
#include "wtp.h"
47
#include "wti.h"
48
#include "obj.h"
49
50
/* static data */
51
/* NOTE(review): macro invocation without trailing semicolon; presumably it
 * declares the file-static helper data the object framework needs for this
 * class - confirm against the macro definition in the obj headers.
 */
DEFobjStaticHelpers
52
53
/* forward-definitions */
54
55
/* methods */
56
57
/* get the header for debug messages
58
 * The caller must NOT free or otherwise modify the returned string!
59
 */
60
static inline uchar *
61
wtiGetDbgHdr(wti_t *pThis)
62
{
63
	ISOBJ_TYPE_assert(pThis, wti);
64
65
	if(pThis->pszDbgHdr == NULL)
66
		return (uchar*) "wti"; /* should not normally happen */
67
	else
68
		return pThis->pszDbgHdr;
69
}
70
71
72
/* get the current worker state. For simplicity and speed, we have
73
 * NOT used our regular calling interface this time. I hope that won't
74
 * bite in the long term... -- rgerhards, 2008-01-17
75
 * TODO: may be performance optimized by atomic operations
76
 */
77
qWrkCmd_t
78
wtiGetState(wti_t *pThis, int bLockMutex)
79
{
80
	DEFVARS_mutexProtection;
81
	qWrkCmd_t tCmd;
82
83
	BEGINfunc
84
	ISOBJ_TYPE_assert(pThis, wti);
85
86
	BEGIN_MTX_PROTECTED_OPERATIONS(&pThis->mut, bLockMutex);
87
	tCmd = pThis->tCurrCmd;
88
	END_MTX_PROTECTED_OPERATIONS(&pThis->mut);
89
90
	ENDfunc
91
	return tCmd;
92
}
93
94
95
/* send a command to a specific thread
96
 * bActiveOnly specifies if the command should be sent only when the worker is
97
 * in an active state. -- rgerhards, 2008-01-20
98
 */
99
rsRetVal
100
wtiSetState(wti_t *pThis, qWrkCmd_t tCmd, int bActiveOnly, int bLockMutex)
101
{
102
	DEFiRet;
103
	DEFVARS_mutexProtection;
104
105
	ISOBJ_TYPE_assert(pThis, wti);
106
	assert(tCmd <= eWRKTHRD_SHUTDOWN_IMMEDIATE);
107
108
	BEGIN_MTX_PROTECTED_OPERATIONS(&pThis->mut, bLockMutex);
109
110
	/* all worker states must be followed sequentially, only termination can be set in any state */
111
	if(   (bActiveOnly && (pThis->tCurrCmd < eWRKTHRD_RUN_CREATED))
112
	   || (pThis->tCurrCmd > tCmd && !(tCmd == eWRKTHRD_TERMINATING || tCmd == eWRKTHRD_STOPPED))) {
113
		dbgprintf("%s: command %d can not be accepted in current %d processing state - ignored\n",
114
			  wtiGetDbgHdr(pThis), tCmd, pThis->tCurrCmd);
115
	} else {
116
		dbgprintf("%s: receiving command %d\n", wtiGetDbgHdr(pThis), tCmd);
117
		switch(tCmd) {
118
			case eWRKTHRD_TERMINATING:
119
				/* TODO: re-enable meaningful debug msg! (via function callback?)
120
				dbgprintf("%s: thread terminating with %d entries left in queue, %d workers running.\n",
121
					  wtiGetDbgHdr(pThis->pQueue), pThis->pQueue->iQueueSize,
122
					  pThis->pQueue->iCurNumWrkThrd);
123
				*/
124
				pthread_cond_signal(&pThis->condExitDone);
125
				dbgprintf("%s: worker terminating\n", wtiGetDbgHdr(pThis));
126
				break;
127
			case eWRKTHRD_RUNNING:
128
				pthread_cond_signal(&pThis->condInitDone);
129
				break;
130
			/* these cases just to satisfy the compiler, we do (yet) not act an them: */
131
			case eWRKTHRD_STOPPED:
132
			case eWRKTHRD_RUN_CREATED:
133
			case eWRKTHRD_RUN_INIT:
134
			case eWRKTHRD_SHUTDOWN:
135
			case eWRKTHRD_SHUTDOWN_IMMEDIATE:
136
				/* DO NOTHING */
137
				break;
138
		}
139
		pThis->tCurrCmd = tCmd; /* apply the new state */
140
	}
141
142
	END_MTX_PROTECTED_OPERATIONS(&pThis->mut);
143
	RETiRet;
144
}
145
146
147
/* Cancel the thread. If the thread is already cancelled or termination,
148
 * we do not again cancel it. But it is save and legal to call wtiCancelThrd() in
149
 * such situations.
150
 * rgerhards, 2008-02-26
151
 */
152
rsRetVal
153
wtiCancelThrd(wti_t *pThis)
154
{
155
	DEFiRet;
156
157
	ISOBJ_TYPE_assert(pThis, wti);
158
159
	d_pthread_mutex_lock(&pThis->mut);
160
161
	if(pThis->tCurrCmd >= eWRKTHRD_TERMINATING) {
162
		dbgoprint((obj_t*) pThis, "canceling worker thread\n");
163
		pthread_cancel(pThis->thrdID);
164
		wtiSetState(pThis, eWRKTHRD_TERMINATING, 0, MUTEX_ALREADY_LOCKED);
165
		pThis->pWtp->bThrdStateChanged = 1; /* indicate change, so harverster will be called */
166
	}
167
168
	d_pthread_mutex_unlock(&pThis->mut);
169
170
	RETiRet;
171
}
172
173
174
/* Destructor */
175
BEGINobjDestruct(wti) /* be sure to specify the object type also in END and CODESTART macros! */
176
CODESTARTobjDestruct(wti)
177
	/* if we reach this point, we must make sure the associated worker has terminated. It is
178
	 * the callers duty to make sure the worker already knows it shall terminate.
179
	 * TODO: is it *really* the caller's duty? ...mmmhhhh.... smells bad... rgerhards, 2008-01-25
180
	 */
181
	wtiProcessThrdChanges(pThis, LOCK_MUTEX); /* process state change one last time */
182
183
	d_pthread_mutex_lock(&pThis->mut);
184
	if(wtiGetState(pThis, MUTEX_ALREADY_LOCKED) != eWRKTHRD_STOPPED) {
185
		dbgprintf("%s: WARNING: worker %p shall be destructed but is still running (might be OK) - joining it\n",
186
			  wtiGetDbgHdr(pThis), pThis);
187
		/* let's hope the caller actually instructed it to shutdown... */
188
		pthread_cond_wait(&pThis->condExitDone, &pThis->mut);
189
		wtiJoinThrd(pThis);
190
	}
191
	d_pthread_mutex_unlock(&pThis->mut);
192
193
	/* actual destruction */
194
	pthread_cond_destroy(&pThis->condInitDone);
195
	pthread_cond_destroy(&pThis->condExitDone);
196
	pthread_mutex_destroy(&pThis->mut);
197
198
	if(pThis->pszDbgHdr != NULL)
199
		free(pThis->pszDbgHdr);
200
ENDobjDestruct(wti)
201
202
203
/* Standard-Constructor for the wti object
204
 */
205
BEGINobjConstruct(wti) /* be sure to specify the object type also in END macro! */
206
	pthread_cond_init(&pThis->condInitDone, NULL);
207
	pthread_cond_init(&pThis->condExitDone, NULL);
208
	pthread_mutex_init(&pThis->mut, NULL);
209
ENDobjConstruct(wti)
210
211
212
/* Construction finalizer
213
 * rgerhards, 2008-01-17
214
 */
215
rsRetVal
216
wtiConstructFinalize(wti_t *pThis)
217
{
218
	DEFiRet;
219
220
	ISOBJ_TYPE_assert(pThis, wti);
221
222
	dbgprintf("%s: finalizing construction of worker instance data\n", wtiGetDbgHdr(pThis));
223
224
	/* initialize our thread instance descriptor */
225
	pThis->pUsrp = NULL;
226
	pThis->tCurrCmd = eWRKTHRD_STOPPED;
227
228
	RETiRet;
229
}
230
231
232
/* join a specific worker thread
233
 * we do not lock the mutex, because join will sync anyways...
234
 */
235
rsRetVal
236
wtiJoinThrd(wti_t *pThis)
237
{
238
	DEFiRet;
239
240
	ISOBJ_TYPE_assert(pThis, wti);
241
	dbgprintf("waiting for worker %s termination, current state %d\n", wtiGetDbgHdr(pThis), pThis->tCurrCmd);
242
	pthread_join(pThis->thrdID, NULL);
243
	wtiSetState(pThis, eWRKTHRD_STOPPED, 0, MUTEX_ALREADY_LOCKED); /* back to virgin... */
244
	pThis->thrdID = 0; /* invalidate the thread ID so that we do not accidently find reused ones */
245
	dbgprintf("worker %s has stopped\n", wtiGetDbgHdr(pThis));
246
247
	RETiRet;
248
}
249
250
/* check if we had a worker thread changes and, if so, act
251
 * on it. At a minimum, terminated threads are harvested (joined).
252
 */
253
rsRetVal
254
wtiProcessThrdChanges(wti_t *pThis, int bLockMutex)
255
{
256
	DEFiRet;
257
	DEFVARS_mutexProtection;
258
259
	ISOBJ_TYPE_assert(pThis, wti);
260
261
	BEGIN_MTX_PROTECTED_OPERATIONS(&pThis->mut, bLockMutex);
262
	switch(pThis->tCurrCmd) {
263
		case eWRKTHRD_TERMINATING:
264
			/* we need to at least temporarily release the mutex, because otherwise
265
			 * we may deadlock with the thread we intend to join (it aquires the mutex
266
			 * during termination processing). -- rgerhards, 2008-02-26
267
			 */
268
			END_MTX_PROTECTED_OPERATIONS(&pThis->mut);
269
			iRet = wtiJoinThrd(pThis);
270
			BEGIN_MTX_PROTECTED_OPERATIONS(&pThis->mut, bLockMutex);
271
			break;
272
		/* these cases just to satisfy the compiler, we do not act an them: */
273
		case eWRKTHRD_STOPPED:
274
		case eWRKTHRD_RUN_CREATED:
275
		case eWRKTHRD_RUN_INIT:
276
		case eWRKTHRD_RUNNING:
277
		case eWRKTHRD_SHUTDOWN:
278
		case eWRKTHRD_SHUTDOWN_IMMEDIATE:
279
			/* DO NOTHING */
280
			break;
281
	}
282
	END_MTX_PROTECTED_OPERATIONS(&pThis->mut);
283
284
	RETiRet;
285
}
286
287
288
/* cancellation cleanup handler for queueWorker ()
289
 * Updates admin structure and frees ressources.
290
 * rgerhards, 2008-01-16
291
 */
292
static void
293
wtiWorkerCancelCleanup(void *arg)
294
{
295
	wti_t *pThis = (wti_t*) arg;
296
	wtp_t *pWtp;
297
	int iCancelStateSave;
298
299
	BEGINfunc
300
	ISOBJ_TYPE_assert(pThis, wti);
301
	pWtp = pThis->pWtp;
302
	ISOBJ_TYPE_assert(pWtp, wtp);
303
304
	dbgprintf("%s: cancelation cleanup handler called.\n", wtiGetDbgHdr(pThis));
305
	
306
	/* call user supplied handler (that one e.g. requeues the element) */
307
	pWtp->pfOnWorkerCancel(pThis->pWtp->pUsr, pThis->pUsrp);
308
309
	pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave);
310
	d_pthread_mutex_lock(&pWtp->mut);
311
	wtiSetState(pThis, eWRKTHRD_TERMINATING, 0, MUTEX_ALREADY_LOCKED);
312
	/* TODO: sync access? I currently think it is NOT needed -- rgerhards, 2008-01-28 */
313
	pWtp->bThrdStateChanged = 1; /* indicate change, so harverster will be called */
314
315
	d_pthread_mutex_unlock(&pWtp->mut);
316
	pthread_setcancelstate(iCancelStateSave, NULL);
317
	ENDfunc
318
}
319
320
321
/* generic worker thread framework
322
 *
323
 * Some special comments below, so that they do not clutter the main function code:
324
 *
325
 * On the use of pthread_testcancel():
326
 * Now make sure we can get canceled - it is not specified if pthread_setcancelstate() is
327
 * a cancellation point in itself. As we run most of the time without cancel enabled, I fear
328
 * we may never get cancelled if we do not create a cancellation point ourselfs.
329
 *
330
 * On the use of pthread_yield():
331
 * We yield to give the other threads a chance to obtain the mutex. If we do not
332
 * do that, this thread may very well aquire the mutex again before another thread
333
 * has even a chance to run. The reason is that mutex operations are free to be
334
 * implemented in the quickest possible way (and they typically are!). That is, the
335
 * mutex lock/unlock most probably just does an atomic memory swap and does not necessarily
336
 * schedule other threads waiting on the same mutex. That can lead to the same thread
337
 * aquiring the mutex ever and ever again while all others are starving for it. We
338
 * have exactly seen this behaviour when we deliberately introduced a long-running
339
 * test action which basically did a sleep. I understand that with real actions the
340
 * likelihood of this starvation condition is very low - but it could still happen
341
 * and would be very hard to debug. The yield() is a sure fix, its performance overhead
342
 * should be well accepted given the above facts. -- rgerhards, 2008-01-10
343
 */
344
rsRetVal
345
wtiWorker(wti_t *pThis)
346
{
347
	DEFiRet;
348
	DEFVARS_mutexProtection;
349
	struct timespec t;
350
	wtp_t *pWtp;		/* our worker thread pool */
351
	int bInactivityTOOccured = 0;
352
353
	ISOBJ_TYPE_assert(pThis, wti);
354
	pWtp = pThis->pWtp; /* shortcut */
355
	ISOBJ_TYPE_assert(pWtp, wtp);
356
357
	dbgSetThrdName(pThis->pszDbgHdr);
358
	pThis->pUsrp = NULL;
359
	pthread_cleanup_push(wtiWorkerCancelCleanup, pThis);
360
361
	BEGIN_MTX_PROTECTED_OPERATIONS(pWtp->pmutUsr, LOCK_MUTEX);
362
	pWtp->pfOnWorkerStartup(pWtp->pUsr);
363
	END_MTX_PROTECTED_OPERATIONS(pWtp->pmutUsr);
364
365
	/* now we have our identity, on to real processing */
366
	while(1) { /* loop will be broken below - need to do mutex locks */
367
		/* process any pending thread requests */
368
		wtpProcessThrdChanges(pWtp);
369
		pthread_testcancel(); /* see big comment in function header */
370
#		if !defined(__hpux) /* pthread_yield is missing there! */
371
		pthread_yield(); /* see big comment in function header */
372
#		endif
373
1.1.5 by Michael Biebl
Import upstream version 3.18.1
374
		/* if we have a rate-limiter set for this worker pool, let's call it. Please
375
		 * keep in mind that the rate-limiter may hold us for an extended period
376
		 * of time. -- rgerhards, 2008-04-02
377
		 */
378
		if(pWtp->pfRateLimiter != NULL) {
379
			pWtp->pfRateLimiter(pWtp->pUsr);
380
		}
381
		
1.1.2 by Michael Biebl
Import upstream version 3.14.2
382
		wtpSetInactivityGuard(pThis->pWtp, 0, LOCK_MUTEX); /* must be set before usr mutex is locked! */
383
		BEGIN_MTX_PROTECTED_OPERATIONS(pWtp->pmutUsr, LOCK_MUTEX);
384
385
		if(  (bInactivityTOOccured && pWtp->pfIsIdle(pWtp->pUsr, MUTEX_ALREADY_LOCKED))
386
		   || wtpChkStopWrkr(pWtp, LOCK_MUTEX, MUTEX_ALREADY_LOCKED)) {
387
			END_MTX_PROTECTED_OPERATIONS(pWtp->pmutUsr);
388
			break; /* end worker thread run */
389
		}
390
		bInactivityTOOccured = 0; /* reset for next run */
391
392
		/* if we reach this point, we are still protected by the mutex */
393
394
		if(pWtp->pfIsIdle(pWtp->pUsr, MUTEX_ALREADY_LOCKED)) {
395
			dbgprintf("%s: worker IDLE, waiting for work.\n", wtiGetDbgHdr(pThis));
396
			pWtp->pfOnIdle(pWtp->pUsr, MUTEX_ALREADY_LOCKED);
397
398
			if(pWtp->toWrkShutdown == -1) {
399
				/* never shut down any started worker */
400
				d_pthread_cond_wait(pWtp->pcondBusy, pWtp->pmutUsr);
401
			} else {
402
				timeoutComp(&t, pWtp->toWrkShutdown);/* get absolute timeout */
403
				if(d_pthread_cond_timedwait(pWtp->pcondBusy, pWtp->pmutUsr, &t) != 0) {
404
					dbgprintf("%s: inactivity timeout, worker terminating...\n", wtiGetDbgHdr(pThis));
405
					bInactivityTOOccured = 1; /* indicate we had a timeout */
406
				}
407
			}
408
			END_MTX_PROTECTED_OPERATIONS(pWtp->pmutUsr);
409
			continue; /* request next iteration */
410
		}
411
412
		/* if we reach this point, we have a non-empty queue (and are still protected by mutex) */
413
		pWtp->pfDoWork(pWtp->pUsr, pThis, iCancelStateSave);
414
	}
415
416
	/* indicate termination */
417
	pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave);
418
	d_pthread_mutex_lock(&pThis->mut);
419
	pthread_cleanup_pop(0); /* remove cleanup handler */
420
421
	pWtp->pfOnWorkerShutdown(pWtp->pUsr);
422
423
	wtiSetState(pThis, eWRKTHRD_TERMINATING, 0, MUTEX_ALREADY_LOCKED);
424
	pWtp->bThrdStateChanged = 1; /* indicate change, so harverster will be called */
425
	d_pthread_mutex_unlock(&pThis->mut);
426
	pthread_setcancelstate(iCancelStateSave, NULL);
427
428
	RETiRet;
429
}
430
431
432
/* some simple object access methods */
433
/* NOTE(review): presumably generates the setter for the wtp_t* property pWtp
 * of the wti object - confirm against the DEFpropSetMeth macro definition.
 */
DEFpropSetMeth(wti, pWtp, wtp_t*);
434
435
/* set the debug header message
436
 * The passed-in string is duplicated. So if the caller does not need
437
 * it any longer, it must free it. Must be called only before object is finalized.
438
 * rgerhards, 2008-01-09
439
 */
440
rsRetVal
441
wtiSetDbgHdr(wti_t *pThis, uchar *pszMsg, size_t lenMsg)
442
{
443
	DEFiRet;
444
445
	ISOBJ_TYPE_assert(pThis, wti);
446
	assert(pszMsg != NULL);
447
	
448
	if(lenMsg < 1)
449
		ABORT_FINALIZE(RS_RET_PARAM_ERROR);
450
451
	if(pThis->pszDbgHdr != NULL) {
452
		free(pThis->pszDbgHdr);
453
		pThis->pszDbgHdr = NULL;
454
	}
455
456
	if((pThis->pszDbgHdr = malloc(sizeof(uchar) * lenMsg + 1)) == NULL)
457
		ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY);
458
459
	memcpy(pThis->pszDbgHdr, pszMsg, lenMsg + 1); /* always think about the \0! */
460
461
finalize_it:
462
	RETiRet;
463
}
464
465
466
/* dummy */
467
rsRetVal wtiQueryInterface(void) { return RS_RET_NOT_IMPLEMENTED; }
468
469
470
/* Initialize the wti class. Must be called as the very first method
471
 * before anything else is called inside this class.
472
 * rgerhards, 2008-01-09
473
 */
474
BEGINObjClassInit(wti, 1, OBJ_IS_CORE_MODULE) /* one is the object version (most important for persisting) */
475
	/* request objects we use */
476
ENDObjClassInit(wti)
477
478
/*
479
 * vi:set ai:
480
 */