~ubuntu-branches/ubuntu/trusty/rsyslog/trusty

« back to all changes in this revision

Viewing changes to runtime/wtp.c

  • Committer: Dave Walker (Daviey)
  • Author(s): Scott Moser
  • Date: 2011-06-17 20:59:38 UTC
  • mfrom: (36.1.8 oneiric.merge)
  • Revision ID: davewalker@ubuntu.com-20110617205938-pfkizxz2wsgzi2ot
Tags: 5.8.1-1ubuntu1
* Resynchronise with Debian (LP: #794230).  Remaining changes:
  - Run as rsyslog:rsyslog, set $FileOwner to syslog
  - Replace init script with debian/rsyslog.upstart.
  - debian/rsyslog.logrotate: Use reload command to restart rsyslog
  - debian/rsyslog.conf: enable $RepeatedMsgReduction 
    to avoid bloating the syslog file (LP #453444)
  - Add debian/rsyslog.dmesg.upstart to save initial dmesg into a file.
    Install it in debian/rules.
  - debian/50-default.conf: set of default rules for syslog (forwarded to
    Debian #603160). remove file in postrm on purge. manage with ucf.
  - debian/rules: build with LDFLAGS=""
* Dropped:
  - debian/control: Bump build-dependency on debhelper
    debian now depends on dh >= 8
* New upstream release.
* Bump Standards-Version to 3.9.2. No further changes.
* Enable and install impstats module. (Closes: #620114)
* Update logcheck rule. (Closes: #616659)
* debian/rsyslog.init: Set correct compat level (5).
* The way rsyslog processes SIGHUP has changed. It no longer does a reload
  of its configuration, but simply closes all open files. To apply a changed
  configuration, rsyslogd needs to be restarted now.
  - Drop "reload" action from debian/rsyslog.init, map "force-reload" to
    "restart". (Closes: #580897)
  - Add "rotate" action to debian/rsyslog.init which sends SIGHUP to
    rsyslogd. Use that in debian/rsyslog.logrotate. (Closes: #626365)
  - Update debian/rsyslog-mysql.postinst and rsyslog-pgsql.postinst to use
    restart instead of reload.
  - Add a NEWS file explaining the changed SIGHUP handling.
* New upstream stable release.
* New upstream release.
  - Properly handle ANSI SQL strings in ompgsql. (Closes: #600479)
* New upstream release.
* debian/patches/02-pmaixforwardedfrom_type_nokeep.patch
  - Remove, merged upstream.
* debian/patches/03-epoll_create1-fallback.patch
  - Remove, merged upstream.
* debian/patches/03-epoll_create1-fallback.patch
  - If epoll_create1() is not available during runtime, fall back to
    epoll_create(). This fixes remote syslog when runnig rsyslog on a
    lenny kernel. (Closes: #617996)
* New upstream release.
* debian/rsyslog.links
  - Create symlink for rsyslog.service in multi-user.target.wants so rsyslog
    is enabled by default when using systemd.
* debian/patches/02-pmaixforwardedfrom_type_nokeep.patch
  - Fix build failure in aixforwardedfrom parser module by setting the
    module type to NOKEEP.
* debian/rsyslog.preinst
  - Remove old rsyslog.socket symlink from sockets.target.wants on upgrades
    as rsyslog uses syslog.socket now which is provided by systemd.
* debian/rsyslog.install
  - Stop installing rsyslog.socket.
* New upstream release.
* New upstream release.
  - Fix regression in imuxsock plugin which did no longer sanitize received
    messages. This makes 02-cleanup-trailing-lf.patch obsolete and also
    fixes the SQL syntax errors in the mysql output if the input contained
    NUL bytes. Closes: #614061
* Enable and install omprog output plugin. Closes: #552095
* Improve package description. Closes: #612948
  Thanks to Justin B Rye for the patch.
* debian/patches/02-cleanup-trailing-lf.patch
  - Fix regression in imuxsock plugin which did not remove a trailing LF
    anymore. Patch cherry-picked from upstream Git. Closes: #612829
* New upstream release.
* Enable and install parser modules.
* New upstream release.
* Upload to unstable.
* debian/patches/02-typo_fix_equation_sign.patch
  - Removed, merged upstream.
* debian/patches/03-atomic_operations.patch
  - Removed, merged upstream.
* debian/patches/03-atomic_operations.patch
  - Fix build failures on platforms which don't have 64 bit atomic
    operations. Patch cherry-picked from upstream Git. Closes: #600930
* New upstream development release.
* Remove patches, merged upstream
  - debian/patches/02-install_also_rsyslog_socket.patch
  - debian/patches/02-tls_loop_fix.patch
* debian/patches/02-typo_fix_equation_sign.patch
  - Fix small typo ("equation sign"). Closes: #575589
* debian/rsyslog.postinst
  - Remove pre-lenny migration code to rotate old log files from sysklogd.
* New upstream development release.
* debian/rsyslog.install
  - Install omruleset.so plugin: http://www.rsyslog.com/doc/omruleset.html
* debian/rsyslog.default
  - Start rsyslogd with native -c5 mode.
* Install systemd unit files which allow to run rsyslog in socket activation
  mode when systemd is used.
* debian/patches/02-install_also_rsyslog_socket.patch
  - When enabling rsyslog.service also enable rsyslog.socket. Patch
    cherry-picked from upstream Git.
* Bump debhelper compatibility level to 8. Update Build-Depends accordingly.

Show diffs side-by-side

added added

removed removed

Lines of Context:
8
8
 * (and in the web doc set on http://www.rsyslog.com/doc). Be sure to read it
9
9
 * if you are getting aquainted to the object.
10
10
 *
11
 
 * Copyright 2008 Rainer Gerhards and Adiscon GmbH.
 
11
 * Copyright 2008,2009 Rainer Gerhards and Adiscon GmbH.
12
12
 *
13
13
 * This file is part of the rsyslog runtime library.
14
14
 *
44
44
#  include <sys/prctl.h>
45
45
#endif
46
46
 
47
 
#ifdef OS_SOLARIS
48
 
#       include <sched.h>
49
 
#endif
 
47
/// TODO: check on solaris if this is any longer needed - I don't think so - rgerhards, 2009-09-20
 
48
//#ifdef OS_SOLARIS
 
49
//#     include <sched.h>
 
50
//#endif
50
51
 
51
52
#include "rsyslog.h"
52
53
#include "stringbuf.h"
82
83
 
83
84
 
84
85
/* Not implemented dummy function for constructor */
85
 
static rsRetVal NotImplementedDummy() { return RS_RET_OK; }
 
86
static rsRetVal NotImplementedDummy() { return RS_RET_NOT_IMPLEMENTED; }
86
87
/* Standard-Constructor for the wtp object
87
88
 */
88
89
BEGINobjConstruct(wtp) /* be sure to specify the object type also in END macro! */
89
 
        pthread_mutex_init(&pThis->mut, NULL);
90
 
        pthread_mutex_init(&pThis->mutThrdShutdwn, NULL);
 
90
        pthread_mutex_init(&pThis->mutWtp, NULL);
91
91
        pthread_cond_init(&pThis->condThrdTrm, NULL);
 
92
        pthread_attr_init(&pThis->attrThrd);
 
93
        /* Set thread scheduling policy to default */
 
94
#ifdef HAVE_PTHREAD_SETSCHEDPARAM
 
95
        pthread_attr_setschedpolicy(&pThis->attrThrd, default_thr_sched_policy);
 
96
        pthread_attr_setschedparam(&pThis->attrThrd, &default_sched_param);
 
97
        pthread_attr_setinheritsched(&pThis->attrThrd, PTHREAD_EXPLICIT_SCHED);
 
98
#endif
 
99
        pthread_attr_setdetachstate(&pThis->attrThrd, PTHREAD_CREATE_DETACHED);
92
100
        /* set all function pointers to "not implemented" dummy so that we can safely call them */
93
101
        pThis->pfChkStopWrkr = NotImplementedDummy;
94
 
        pThis->pfIsIdle = NotImplementedDummy;
 
102
        pThis->pfGetDeqBatchSize = NotImplementedDummy;
95
103
        pThis->pfDoWork = NotImplementedDummy;
96
 
        pThis->pfOnIdle = NotImplementedDummy;
97
 
        pThis->pfOnWorkerCancel = NotImplementedDummy;
98
 
        pThis->pfOnWorkerStartup = NotImplementedDummy;
99
 
        pThis->pfOnWorkerShutdown = NotImplementedDummy;
 
104
        pThis->pfObjProcessed = NotImplementedDummy;
 
105
        INIT_ATOMIC_HELPER_MUT(pThis->mutCurNumWrkThrd);
 
106
        INIT_ATOMIC_HELPER_MUT(pThis->mutWtpState);
100
107
ENDobjConstruct(wtp)
101
108
 
102
109
 
114
121
 
115
122
        ISOBJ_TYPE_assert(pThis, wtp);
116
123
 
117
 
        dbgprintf("%s: finalizing construction of worker thread pool\n", wtpGetDbgHdr(pThis));
 
124
        DBGPRINTF("%s: finalizing construction of worker thread pool\n", wtpGetDbgHdr(pThis));
118
125
        /* alloc and construct workers - this can only be done in finalizer as we previously do
119
126
         * not know the max number of workers
120
127
         */
121
 
        if((pThis->pWrkr = malloc(sizeof(wti_t*) * pThis->iNumWorkerThreads)) == NULL)
122
 
                ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY);
123
 
 
 
128
        CHKmalloc(pThis->pWrkr = MALLOC(sizeof(wti_t*) * pThis->iNumWorkerThreads));
 
129
        
124
130
        for(i = 0 ; i < pThis->iNumWorkerThreads ; ++i) {
125
131
                CHKiRet(wtiConstruct(&pThis->pWrkr[i]));
126
132
                pWti = pThis->pWrkr[i];
140
146
BEGINobjDestruct(wtp) /* be sure to specify the object type also in END and CODESTART macros! */
141
147
        int i;
142
148
CODESTARTobjDestruct(wtp)
143
 
        wtpProcessThrdChanges(pThis); /* process thread changes one last time */
144
 
 
145
149
        /* destruct workers */
146
150
        for(i = 0 ; i < pThis->iNumWorkerThreads ; ++i)
147
151
                wtiDestruct(&pThis->pWrkr[i]);
151
155
 
152
156
        /* actual destruction */
153
157
        pthread_cond_destroy(&pThis->condThrdTrm);
154
 
        pthread_mutex_destroy(&pThis->mut);
155
 
        pthread_mutex_destroy(&pThis->mutThrdShutdwn);
 
158
        pthread_mutex_destroy(&pThis->mutWtp);
 
159
        pthread_attr_destroy(&pThis->attrThrd);
 
160
        DESTROY_ATOMIC_HELPER_MUT(pThis->mutCurNumWrkThrd);
 
161
        DESTROY_ATOMIC_HELPER_MUT(pThis->mutWtpState);
156
162
 
157
163
        free(pThis->pszDbgHdr);
158
164
ENDobjDestruct(wtp)
159
165
 
160
166
 
161
 
/* wake up at least one worker thread.
162
 
 * rgerhards, 2008-01-20
163
 
 */
164
 
rsRetVal
165
 
wtpWakeupWrkr(wtp_t *pThis)
166
 
{
167
 
        DEFiRet;
168
 
 
169
 
        /* TODO; mutex? I think not needed, as we do not need predictable exec order -- rgerhards, 2008-01-28 */
170
 
        ISOBJ_TYPE_assert(pThis, wtp);
171
 
        pthread_cond_signal(pThis->pcondBusy);
172
 
        RETiRet;
173
 
}
174
 
 
175
 
/* wake up all worker threads.
176
 
 * rgerhards, 2008-01-16
177
 
 */
178
 
rsRetVal
179
 
wtpWakeupAllWrkr(wtp_t *pThis)
180
 
{
181
 
        DEFiRet;
182
 
 
183
 
        ISOBJ_TYPE_assert(pThis, wtp);
184
 
        pthread_cond_broadcast(pThis->pcondBusy);
185
 
        RETiRet;
186
 
}
187
 
 
188
 
 
189
 
/* check if we had any worker thread changes and, if so, act
190
 
 * on them. At a minimum, terminated threads are harvested (joined).
191
 
 * This function MUST NEVER block on the queue mutex!
192
 
 */
193
 
rsRetVal
194
 
wtpProcessThrdChanges(wtp_t *pThis)
195
 
{
196
 
        DEFiRet;
197
 
        int i;
198
 
 
199
 
        ISOBJ_TYPE_assert(pThis, wtp);
200
 
 
201
 
        if(pThis->bThrdStateChanged == 0)
202
 
                FINALIZE;
203
 
 
204
 
        if(d_pthread_mutex_trylock(&(pThis->mutThrdShutdwn)) != 0) {
205
 
                /* another thread is already in the loop */
206
 
                FINALIZE;
207
 
        }
208
 
 
209
 
        /* Note: there is a left-over potential race condition below:
210
 
         * pThis->bThrdStateChanged may be re-set by another thread while
211
 
         * we work on it and thus the loop may terminate too early. However,
212
 
         * there are no really bad effects from that so I perfer - for this
213
 
         * version - to live with the problem as is. Not a good idea to 
214
 
         * introduce that large change into the stable branch without very
215
 
         * good reason. -- rgerhards, 2009-04-02
216
 
         */
217
 
        do {
218
 
                /* reset the change marker */
219
 
                ATOMIC_STORE_0_TO_INT(pThis->bThrdStateChanged);
220
 
                /* go through all threads */
221
 
                for(i = 0 ; i < pThis->iNumWorkerThreads ; ++i) {
222
 
                        wtiProcessThrdChanges(pThis->pWrkr[i], LOCK_MUTEX);
223
 
                }
224
 
        /* restart if another change occured while we were processing the changes */
225
 
        } while(pThis->bThrdStateChanged != 0);
226
 
 
227
 
        d_pthread_mutex_unlock(&(pThis->mutThrdShutdwn));
228
 
 
229
 
finalize_it:
230
 
        RETiRet;
231
 
}
232
 
 
233
 
 
234
 
/* Sent a specific state for the worker thread pool.
235
 
 * rgerhards, 2008-01-21
 
167
/* Sent a specific state for the worker thread pool. -- rgerhards, 2008-01-21
 
168
 * We do not need to do atomic instructions as set operations are only 
 
169
 * called when terminating the pool, and then in strict sequence. So we
 
170
 * can never overwrite each other. On the other hand, it also doesn't
 
171
 * matter if the read operation obtains an older value, as we then simply
 
172
 * do one more iteration, what is perfectly legal (during shutdown
 
173
 * they are awoken in any case). -- rgerhards, 2009-07-20
236
174
 */
237
175
rsRetVal
238
176
wtpSetState(wtp_t *pThis, wtpState_t iNewState)
239
177
{
240
 
        DEFiRet;
241
 
 
242
178
        ISOBJ_TYPE_assert(pThis, wtp);
243
 
        pThis->wtpState = iNewState;
244
 
        /* TODO: must wakeup workers? seen to be not needed -- rgerhards, 2008-01-28 */
245
 
 
246
 
        RETiRet;
 
179
        pThis->wtpState = iNewState; // TODO: do we need a mutex here? 2010-04-26
 
180
        return RS_RET_OK;
247
181
}
248
182
 
249
183
 
250
184
/* check if the worker shall shutdown (1 = yes, 0 = no)
251
 
 * TODO: check if we can use atomic operations to enhance performance
252
185
 * Note: there may be two mutexes locked, the bLockUsrMutex is the one in our "user"
253
186
 * (e.g. the queue clas)
254
187
 * rgerhards, 2008-01-21
255
188
 */
256
189
rsRetVal
257
 
wtpChkStopWrkr(wtp_t *pThis, int bLockMutex, int bLockUsrMutex)
 
190
wtpChkStopWrkr(wtp_t *pThis, int bLockUsrMutex)
258
191
{
259
192
        DEFiRet;
260
 
        DEFVARS_mutexProtection;
 
193
        wtpState_t wtpState;
261
194
 
262
195
        ISOBJ_TYPE_assert(pThis, wtp);
 
196
        /* we need a consistent value, but it doesn't really matter if it is changed
 
197
         * right after the fetch - then we simply do one more iteration in the worker
 
198
         */
 
199
        wtpState = (wtpState_t) ATOMIC_FETCH_32BIT((int*)&pThis->wtpState, &pThis->mutWtpState);
263
200
 
264
 
        BEGIN_MTX_PROTECTED_OPERATIONS(&pThis->mut, bLockMutex);
265
 
        if(   (pThis->wtpState == wtpState_SHUTDOWN_IMMEDIATE)
266
 
           || ((pThis->wtpState == wtpState_SHUTDOWN) && pThis->pfIsIdle(pThis->pUsr, bLockUsrMutex)))
267
 
                iRet = RS_RET_TERMINATE_NOW;
268
 
        END_MTX_PROTECTED_OPERATIONS(&pThis->mut);
 
201
        if(wtpState == wtpState_SHUTDOWN_IMMEDIATE) {
 
202
                ABORT_FINALIZE(RS_RET_TERMINATE_NOW);
 
203
        } else if(wtpState == wtpState_SHUTDOWN) {
 
204
                ABORT_FINALIZE(RS_RET_TERMINATE_WHEN_IDLE);
 
205
        }
269
206
 
270
207
        /* try customer handler if one was set and we do not yet have a definite result */
271
 
        if(iRet == RS_RET_OK && pThis->pfChkStopWrkr != NULL) {
 
208
        if(pThis->pfChkStopWrkr != NULL) {
272
209
                iRet = pThis->pfChkStopWrkr(pThis->pUsr, bLockUsrMutex);
273
210
        }
274
211
 
 
212
finalize_it:
275
213
        RETiRet;
276
214
}
277
215
 
278
216
 
279
217
#pragma GCC diagnostic ignored "-Wempty-body"
280
218
/* Send a shutdown command to all workers and see if they terminate.
281
 
 * A timeout may be specified.
 
219
 * A timeout may be specified. This function may also be called with
 
220
 * the current number of workers being 0, in which case it does not
 
221
 * shut down any worker.
282
222
 * rgerhards, 2008-01-14
283
223
 */
284
224
rsRetVal
286
226
{
287
227
        DEFiRet;
288
228
        int bTimedOut;
289
 
        int iCancelStateSave;
 
229
        int i;
290
230
 
291
231
        ISOBJ_TYPE_assert(pThis, wtp);
292
232
 
 
233
        /* lock mutex to prevent races (may otherwise happen during idle processing and such...) */
 
234
        d_pthread_mutex_lock(pThis->pmutUsr);
293
235
        wtpSetState(pThis, tShutdownCmd);
294
 
        wtpWakeupAllWrkr(pThis);
 
236
        pthread_cond_broadcast(pThis->pcondBusy); /* wake up all workers */
 
237
        /* awake workers in retry loop */
 
238
        for(i = 0 ; i < pThis->iNumWorkerThreads ; ++i) {
 
239
                wtiWakeupThrd(pThis->pWrkr[i]);
 
240
        }
 
241
        d_pthread_mutex_unlock(pThis->pmutUsr);
295
242
 
296
 
        /* see if we need to harvest (join) any terminated threads (even in timeout case,
297
 
         * some may have terminated...
298
 
         */
299
 
        wtpProcessThrdChanges(pThis);
300
 
                
301
 
        /* and wait for their termination */
302
 
        pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave);
303
 
        d_pthread_mutex_lock(&pThis->mut);
304
 
        pthread_cleanup_push(mutexCancelCleanup, &pThis->mut);
305
 
        pthread_setcancelstate(iCancelStateSave, NULL);
 
243
        /* wait for worker thread termination */
 
244
        d_pthread_mutex_lock(&pThis->mutWtp);
 
245
        pthread_cleanup_push(mutexCancelCleanup, &pThis->mutWtp);
306
246
        bTimedOut = 0;
307
247
        while(pThis->iCurNumWrkThrd > 0 && !bTimedOut) {
308
 
                dbgprintf("%s: waiting %ldms on worker thread termination, %d still running\n",
309
 
                           wtpGetDbgHdr(pThis), timeoutVal(ptTimeout), pThis->iCurNumWrkThrd);
 
248
                DBGPRINTF("%s: waiting %ldms on worker thread termination, %d still running\n",
 
249
                           wtpGetDbgHdr(pThis), timeoutVal(ptTimeout),
 
250
                           ATOMIC_FETCH_32BIT(&pThis->iCurNumWrkThrd, &pThis->mutCurNumWrkThrd));
310
251
 
311
 
                if(d_pthread_cond_timedwait(&pThis->condThrdTrm, &pThis->mut, ptTimeout) != 0) {
312
 
                        dbgprintf("%s: timeout waiting on worker thread termination\n", wtpGetDbgHdr(pThis));
 
252
                if(d_pthread_cond_timedwait(&pThis->condThrdTrm, &pThis->mutWtp, ptTimeout) != 0) {
 
253
                        DBGPRINTF("%s: timeout waiting on worker thread termination\n", wtpGetDbgHdr(pThis));
313
254
                        bTimedOut = 1;  /* we exit the loop on timeout */
314
255
                }
 
256
 
 
257
                /* awake workers in retry loop */
 
258
                for(i = 0 ; i < pThis->iNumWorkerThreads ; ++i) {
 
259
                        wtiWakeupThrd(pThis->pWrkr[i]);
 
260
                }
 
261
 
315
262
        }
316
263
        pthread_cleanup_pop(1);
317
264
 
318
265
        if(bTimedOut)
319
266
                iRet = RS_RET_TIMED_OUT;
320
267
        
321
 
        /* see if we need to harvest (join) any terminated threads (even in timeout case,
322
 
         * some may have terminated...
323
 
         */
324
 
        wtpProcessThrdChanges(pThis);
325
 
 
326
268
        RETiRet;
327
269
}
328
270
#pragma GCC diagnostic warning "-Wempty-body"
329
271
 
330
272
 
331
 
/* indicate that a thread has terminated and awake anyone waiting on it
332
 
 * rgerhards, 2008-01-23
333
 
 */
334
 
rsRetVal wtpSignalWrkrTermination(wtp_t *pThis)
335
 
{
336
 
        DEFiRet;
337
 
        /* I leave the mutex code here out as it gives us deadlocks. I think it is not really
338
 
         * needed and we are on the safe side. I leave this comment in if practice proves us
339
 
         * wrong. The whole thing should be removed after half a year or year if we see there
340
 
         * actually is no issue (or revisit it from a theoretical POV).
341
 
         * rgerhards, 2008-01-28
342
 
         * revisited 2008-09-30, still a bit unclear, leave in
343
 
         */
344
 
        /*TODO: mutex or not mutex, that's the question ;)DEFVARS_mutexProtection;*/
345
 
 
346
 
        ISOBJ_TYPE_assert(pThis, wtp);
347
 
 
348
 
        /*BEGIN_MTX_PROTECTED_OPERATIONS(&pThis->mut, LOCK_MUTEX);*/
349
 
        pthread_cond_signal(&pThis->condThrdTrm); /* activate anyone waiting on thread shutdown */
350
 
        /*END_MTX_PROTECTED_OPERATIONS(&pThis->mut);*/
351
 
        RETiRet;
352
 
}
353
 
 
354
 
 
355
273
/* Unconditionally cancel all running worker threads.
356
274
 * rgerhards, 2008-01-14
357
275
 */
363
281
 
364
282
        ISOBJ_TYPE_assert(pThis, wtp);
365
283
 
366
 
        /* process any pending thread requests so that we know who actually is still running */
367
 
        wtpProcessThrdChanges(pThis);
368
 
 
369
284
        /* go through all workers and cancel those that are active */
370
285
        for(i = 0 ; i < pThis->iNumWorkerThreads ; ++i) {
371
 
                dbgprintf("%s: try canceling worker thread %d\n", wtpGetDbgHdr(pThis), i);
372
286
                wtiCancelThrd(pThis->pWrkr[i]);
373
287
        }
374
288
 
376
290
}
377
291
 
378
292
 
379
 
 
380
 
/* Set the Inactivity Guard
381
 
 * rgerhards, 2008-01-21
 
293
/* this function contains shared code for both regular worker shutdown as
 
294
 * well as shutdown via cancellation. We can not simply use pthread_cleanup_pop(1)
 
295
 * as this introduces a race in the debug system (RETiRet system).
 
296
 * rgerhards, 2009-10-26
382
297
 */
383
 
rsRetVal
384
 
wtpSetInactivityGuard(wtp_t *pThis, int bNewState, int bLockMutex)
 
298
static inline void
 
299
wtpWrkrExecCleanup(wti_t *pWti)
385
300
{
386
 
        DEFiRet;
387
 
        DEFVARS_mutexProtection;
388
 
 
389
 
        BEGIN_MTX_PROTECTED_OPERATIONS(&pThis->mut, bLockMutex);
390
 
        pThis->bInactivityGuard = bNewState;
391
 
        END_MTX_PROTECTED_OPERATIONS(&pThis->mut);
392
 
 
393
 
        RETiRet;
 
301
        wtp_t *pThis;
 
302
 
 
303
        BEGINfunc
 
304
        ISOBJ_TYPE_assert(pWti, wti);
 
305
        pThis = pWti->pWtp;
 
306
        ISOBJ_TYPE_assert(pThis, wtp);
 
307
 
 
308
        /* the order of the next two statements is important! */
 
309
        wtiSetState(pWti, WRKTHRD_STOPPED);
 
310
        ATOMIC_DEC(&pThis->iCurNumWrkThrd, &pThis->mutCurNumWrkThrd);
 
311
 
 
312
        DBGPRINTF("%s: Worker thread %lx, terminated, um workers now %d\n",
 
313
                  wtpGetDbgHdr(pThis), (unsigned long) pWti,
 
314
                  ATOMIC_FETCH_32BIT(&pThis->iCurNumWrkThrd, &pThis->mutCurNumWrkThrd));
 
315
 
 
316
        ENDfunc
394
317
}
395
318
 
396
319
 
397
 
/* cancellation cleanup handler for executing worker
398
 
 * decrements the worker counter
399
 
 * rgerhards, 2008-01-20
 
320
/* cancellation cleanup handler for executing worker decrements the worker counter.
 
321
 * rgerhards, 2009-07-20
400
322
 */
401
 
void
 
323
static void
402
324
wtpWrkrExecCancelCleanup(void *arg)
403
325
{
404
 
        wtp_t *pThis = (wtp_t*) arg;
 
326
        wti_t *pWti = (wti_t*) arg;
 
327
        wtp_t *pThis;
405
328
 
406
329
        BEGINfunc
 
330
        ISOBJ_TYPE_assert(pWti, wti);
 
331
        pThis = pWti->pWtp;
407
332
        ISOBJ_TYPE_assert(pThis, wtp);
408
 
        pThis->iCurNumWrkThrd--;
409
 
        wtpSignalWrkrTermination(pThis);
410
 
 
411
 
        dbgprintf("%s: thread CANCELED with %d workers running.\n", wtpGetDbgHdr(pThis), pThis->iCurNumWrkThrd);
 
333
        DBGPRINTF("%s: Worker thread %lx requested to be cancelled.\n",
 
334
                  wtpGetDbgHdr(pThis), (unsigned long) pWti);
 
335
 
 
336
        wtpWrkrExecCleanup(pWti);
 
337
 
412
338
        ENDfunc
 
339
        /* NOTE: we must call ENDfunc FIRST, because otherwise the schedule may activate the main
 
340
         * thread after the broadcast, which could destroy the debug class, resulting in a potential
 
341
         * segfault. So we need to do the broadcast as actually the last action in our processing
 
342
         */
 
343
        pthread_cond_broadcast(&pThis->condThrdTrm); /* activate anyone waiting on thread shutdown */
413
344
}
414
345
 
415
346
 
421
352
static void *
422
353
wtpWorker(void *arg) /* the arg is actually a wti object, even though we are in wtp! */
423
354
{
 
355
        wti_t *pWti = (wti_t*) arg;
 
356
        wtp_t *pThis;
 
357
        sigset_t sigSet;
 
358
#       if HAVE_PRCTL && defined PR_SET_NAME
424
359
        uchar *pszDbgHdr;
425
360
        uchar thrdName[32] = "rs:";
426
 
        DEFiRet;
427
 
        DEFVARS_mutexProtection;
428
 
        wti_t *pWti = (wti_t*) arg;
429
 
        wtp_t *pThis;
430
 
        sigset_t sigSet;
 
361
#       endif
431
362
 
 
363
        BEGINfunc
432
364
        ISOBJ_TYPE_assert(pWti, wti);
433
365
        pThis = pWti->pWtp;
434
366
        ISOBJ_TYPE_assert(pThis, wtp);
435
367
 
 
368
        /* block all signals */
436
369
        sigfillset(&sigSet);
437
370
        pthread_sigmask(SIG_BLOCK, &sigSet, NULL);
438
371
 
 
372
        /* but ignore SIGTTN, which we (ab)use to signal the thread to shutdown -- rgerhards, 2009-07-20 */
 
373
        sigemptyset(&sigSet);
 
374
        sigaddset(&sigSet, SIGTTIN);
 
375
        pthread_sigmask(SIG_UNBLOCK, &sigSet, NULL);
 
376
 
439
377
#       if HAVE_PRCTL && defined PR_SET_NAME
440
378
        /* set thread name - we ignore if the call fails, has no harsh consequences... */
441
379
        pszDbgHdr = wtpGetDbgHdr(pThis);
445
383
        }
446
384
#       endif
447
385
 
448
 
        BEGIN_MTX_PROTECTED_OPERATIONS(&pThis->mut, LOCK_MUTEX);
449
 
 
450
 
        /* do some late initialization */
451
 
 
452
 
        pthread_cleanup_push(wtpWrkrExecCancelCleanup, pThis);
453
 
 
454
 
        /* finally change to RUNNING state. We need to check if we actually should still run,
455
 
         * because someone may have requested us to shut down even before we got a chance to do
456
 
         * our init. That would be a bad race... -- rgerhards, 2008-01-16
457
 
         */
458
 
        wtiSetState(pWti, eWRKTHRD_RUNNING, 0, MUTEX_ALREADY_LOCKED); /* we are running now! */
459
 
 
460
 
        do {
461
 
                END_MTX_PROTECTED_OPERATIONS(&pThis->mut);
462
 
 
463
 
                iRet = wtiWorker(pWti); /* just to make sure: this is NOT protected by the mutex! */
464
 
 
465
 
                BEGIN_MTX_PROTECTED_OPERATIONS(&pThis->mut, LOCK_MUTEX);
466
 
        } while(pThis->iCurNumWrkThrd == 1 && pThis->bInactivityGuard == 1);
467
 
        /* inactivity guard prevents shutdown of all workers while one should be running due to race
468
 
         * condition. It can lead to one more worker running than desired, but that is acceptable. After
469
 
         * all, that worker will shutdown itself due to inactivity timeout. If, however, none were running
470
 
         * when one was required, processing could come to a halt. -- rgerhards, 2008-01-21
471
 
         */
472
 
 
 
386
        pthread_cleanup_push(wtpWrkrExecCancelCleanup, pWti);
 
387
        wtiWorker(pWti);
473
388
        pthread_cleanup_pop(0);
474
 
        pThis->iCurNumWrkThrd--;
475
 
        wtpSignalWrkrTermination(pThis);
476
 
 
477
 
        dbgprintf("%s: Worker thread %lx, terminated, num workers now %d\n",
478
 
                  wtpGetDbgHdr(pThis), (unsigned long) pWti, pThis->iCurNumWrkThrd);
479
 
 
480
 
        END_MTX_PROTECTED_OPERATIONS(&pThis->mut);
 
389
        wtpWrkrExecCleanup(pWti);
481
390
 
482
391
        ENDfunc
 
392
        /* NOTE: we must call ENDfunc FIRST, because otherwise the schedule may activate the main
 
393
         * thread after the broadcast, which could destroy the debug class, resulting in a potential
 
394
         * segfault. So we need to do the broadcast as actually the last action in our processing
 
395
         */
 
396
        pthread_cond_broadcast(&pThis->condThrdTrm); /* activate anyone waiting on thread shutdown */
483
397
        pthread_exit(0);
484
398
}
485
399
#pragma GCC diagnostic warning "-Wempty-body"
487
401
 
488
402
/* start a new worker */
489
403
static rsRetVal
490
 
wtpStartWrkr(wtp_t *pThis, int bLockMutex)
 
404
wtpStartWrkr(wtp_t *pThis)
491
405
{
492
 
        DEFiRet;
493
 
        DEFVARS_mutexProtection;
494
406
        wti_t *pWti;
495
407
        int i;
496
408
        int iState;
 
409
        DEFiRet;
497
410
 
498
411
        ISOBJ_TYPE_assert(pThis, wtp);
499
412
 
500
 
        wtpProcessThrdChanges(pThis);   // TODO: Performance: this causes a lot of FUTEX calls
501
 
 
502
 
        BEGIN_MTX_PROTECTED_OPERATIONS(&pThis->mut, bLockMutex);
503
 
 
504
 
        pThis->iCurNumWrkThrd++;
505
 
 
506
 
        /* find free spot in thread table. If we find at least one worker that is in initialization,
507
 
         * we do NOT start a new one. Let's give the other one a chance, first.
508
 
         */
 
413
        d_pthread_mutex_lock(&pThis->mutWtp);
 
414
 
 
415
        /* find free spot in thread table. */
509
416
        for(i = 0 ; i < pThis->iNumWorkerThreads ; ++i) {
510
 
                if(wtiGetState(pThis->pWrkr[i], LOCK_MUTEX) == eWRKTHRD_STOPPED) {
 
417
                if(wtiGetState(pThis->pWrkr[i]) == WRKTHRD_STOPPED) {
511
418
                        break;
512
419
                }
513
420
        }
515
422
        if(i == pThis->iNumWorkerThreads)
516
423
                ABORT_FINALIZE(RS_RET_NO_MORE_THREADS);
517
424
 
 
425
        if(i == 0 || pThis->toWrkShutdown == -1) {
 
426
                wtiSetAlwaysRunning(pThis->pWrkr[i]);
 
427
        }
 
428
 
518
429
        pWti = pThis->pWrkr[i];
519
 
        wtiSetState(pWti, eWRKTHRD_RUN_CREATED, 0, LOCK_MUTEX);
520
 
        iState = pthread_create(&(pWti->thrdID), NULL, wtpWorker, (void*) pWti);
521
 
        dbgprintf("%s: started with state %d, num workers now %d\n",
522
 
                  wtpGetDbgHdr(pThis), iState, pThis->iCurNumWrkThrd);
 
430
        wtiSetState(pWti, WRKTHRD_RUNNING);
 
431
        iState = pthread_create(&(pWti->thrdID), &pThis->attrThrd, wtpWorker, (void*) pWti);
 
432
        ATOMIC_INC(&pThis->iCurNumWrkThrd, &pThis->mutCurNumWrkThrd); /* we got one more! */
523
433
 
524
 
        /* indicate we just started a worker and would like to see it running */
525
 
        wtpSetInactivityGuard(pThis, 1, MUTEX_ALREADY_LOCKED);
 
434
        DBGPRINTF("%s: started with state %d, num workers now %d\n",
 
435
                  wtpGetDbgHdr(pThis), iState,
 
436
                  ATOMIC_FETCH_32BIT(&pThis->iCurNumWrkThrd, &pThis->mutCurNumWrkThrd));
526
437
 
527
438
finalize_it:
528
 
        END_MTX_PROTECTED_OPERATIONS(&pThis->mut);
 
439
        d_pthread_mutex_unlock(&pThis->mutWtp);
529
440
        RETiRet;
530
441
}
531
442
 
542
453
wtpAdviseMaxWorkers(wtp_t *pThis, int nMaxWrkr)
543
454
{
544
455
        DEFiRet;
545
 
        DEFVARS_mutexProtection;
546
456
        int nMissing; /* number workers missing to run */
547
457
        int i;
548
458
 
551
461
        if(nMaxWrkr == 0)
552
462
                FINALIZE;
553
463
 
554
 
        BEGIN_MTX_PROTECTED_OPERATIONS(&pThis->mut, LOCK_MUTEX);
555
 
 
556
464
        if(nMaxWrkr > pThis->iNumWorkerThreads) /* limit to configured maximum */
557
465
                nMaxWrkr = pThis->iNumWorkerThreads;
558
466
 
559
 
        nMissing = nMaxWrkr - pThis->iCurNumWrkThrd;
 
467
        nMissing = nMaxWrkr - ATOMIC_FETCH_32BIT(&pThis->iCurNumWrkThrd, &pThis->mutCurNumWrkThrd);
560
468
 
561
469
        if(nMissing > 0) {
562
 
                dbgprintf("%s: high activity - starting %d additional worker thread(s).\n", wtpGetDbgHdr(pThis), nMissing);
 
470
                DBGPRINTF("%s: high activity - starting %d additional worker thread(s).\n",
 
471
                          wtpGetDbgHdr(pThis), nMissing);
563
472
                /* start the rqtd nbr of workers */
564
473
                for(i = 0 ; i < nMissing ; ++i) {
565
 
                        CHKiRet(wtpStartWrkr(pThis, MUTEX_ALREADY_LOCKED));
566
 
                }
567
 
        } else  {
568
 
                if(nMaxWrkr > 0) {
569
 
        dbgprintf("wtpAdviseMaxWorkers signals busy\n");
570
 
                        wtpWakeupWrkr(pThis);
571
 
                }
 
474
                        CHKiRet(wtpStartWrkr(pThis));
 
475
                }
 
476
        } else {
 
477
                pthread_cond_signal(pThis->pcondBusy);
572
478
        }
573
479
 
574
480
        
575
481
finalize_it:
576
 
        END_MTX_PROTECTED_OPERATIONS(&pThis->mut);
577
482
        RETiRet;
578
483
}
579
484
 
587
492
DEFpropSetMethPTR(wtp, pcondBusy, pthread_cond_t)
588
493
DEFpropSetMethFP(wtp, pfChkStopWrkr, rsRetVal(*pVal)(void*, int))
589
494
DEFpropSetMethFP(wtp, pfRateLimiter, rsRetVal(*pVal)(void*))
590
 
DEFpropSetMethFP(wtp, pfIsIdle, rsRetVal(*pVal)(void*, int))
591
 
DEFpropSetMethFP(wtp, pfDoWork, rsRetVal(*pVal)(void*, void*, int))
592
 
DEFpropSetMethFP(wtp, pfOnIdle, rsRetVal(*pVal)(void*, int))
593
 
DEFpropSetMethFP(wtp, pfOnWorkerCancel, rsRetVal(*pVal)(void*, void*))
594
 
DEFpropSetMethFP(wtp, pfOnWorkerStartup, rsRetVal(*pVal)(void*))
595
 
DEFpropSetMethFP(wtp, pfOnWorkerShutdown, rsRetVal(*pVal)(void*))
596
 
 
597
 
 
598
 
/* return the current number of worker threads.
599
 
 * TODO: atomic operation would bring a nice performance
600
 
 * enhancemcent
601
 
 * rgerhards, 2008-01-27
602
 
 */
603
 
int
604
 
wtpGetCurNumWrkr(wtp_t *pThis, int bLockMutex)
605
 
{
606
 
        DEFVARS_mutexProtection;
607
 
        int iNumWrkr;
608
 
 
609
 
        BEGINfunc
610
 
        ISOBJ_TYPE_assert(pThis, wtp);
611
 
 
612
 
        BEGIN_MTX_PROTECTED_OPERATIONS(&pThis->mut, bLockMutex);
613
 
        iNumWrkr = pThis->iCurNumWrkThrd;
614
 
        END_MTX_PROTECTED_OPERATIONS(&pThis->mut);
615
 
 
616
 
        ENDfunc
617
 
        return iNumWrkr;
618
 
}
 
495
DEFpropSetMethFP(wtp, pfGetDeqBatchSize, rsRetVal(*pVal)(void*, int*))
 
496
DEFpropSetMethFP(wtp, pfDoWork, rsRetVal(*pVal)(void*, void*))
 
497
DEFpropSetMethFP(wtp, pfObjProcessed, rsRetVal(*pVal)(void*, wti_t*))
619
498
 
620
499
 
621
500
/* set the debug header message
639
518
                pThis->pszDbgHdr = NULL;
640
519
        }
641
520
 
642
 
        if((pThis->pszDbgHdr = malloc(sizeof(uchar) * lenMsg + 1)) == NULL)
 
521
        if((pThis->pszDbgHdr = MALLOC(sizeof(uchar) * lenMsg + 1)) == NULL)
643
522
                ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY);
644
523
 
645
524
        memcpy(pThis->pszDbgHdr, pszMsg, lenMsg + 1); /* always think about the \0! */
669
548
        CHKiRet(objUse(glbl, CORE_COMPONENT));
670
549
ENDObjClassInit(wtp)
671
550
 
672
 
/*
673
 
 * vi:set ai:
 
551
/* vi:set ai:
674
552
 */