1
/* Copyright (c) 2008 PrimeBase Technologies GmbH, Germany
3
* PrimeBase Media Stream for MySQL
5
* This program is free software; you can redistribute it and/or modify
6
* it under the terms of the GNU General Public License as published by
7
* the Free Software Foundation; either version 2 of the License, or
8
* (at your option) any later version.
10
* This program is distributed in the hope that it will be useful,
11
* but WITHOUT ANY WARRANTY; without even the implied warranty of
12
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13
* GNU General Public License for more details.
15
* You should have received a copy of the GNU General Public License
16
* along with this program; if not, write to the Free Software
17
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19
* Original author: Paul McCullagh (H&G2JCtL)
20
* Continued development: Barry Leslie
25
* A independently running thread.
36
#include <sys/signal.h>
43
#include "CSException.h"
45
#include "CSStrUtil.h"
49
* ---------------------------------------------------------------
56
static void td_catch_signal(int sig)
60
if ((self = CSThread::getSelf())) {
62
/* The main thread will pass on a signal to all threads: */
63
if (self->myThreadList)
64
self->myThreadList->signalAllThreads(sig);
65
self->setSignalPending(sig);
71
static void td_throw_signal(int sig)
75
if ((self = CSThread::getSelf())) {
77
/* The main thread will pass on a signal to all threads: */
78
if (self->myThreadList)
79
self->myThreadList->signalAllThreads(sig);
81
self->setSignalPending(sig);
86
static bool td_setup_signals(CSThread *thread)
91
struct sigaction action;
93
sigemptyset(&action.sa_mask);
96
action.sa_handler = td_catch_signal;
98
if (sigaction(SIGUSR2, &action, NULL) == -1)
101
action.sa_handler = td_throw_signal;
108
thread->myException.initOSError(CS_CONTEXT, errno);
109
thread->myException.setStackTrace(thread);
112
CSException::throwOSError(CS_CONTEXT, errno);
120
* ---------------------------------------------------------------
124
void CSThreadList::signalAllThreads(int sig)
130
ptr = (CSThread *) getBack();
134
ptr = (CSThread *) ptr->getNextLink();
141
void CSThreadList::quitAllThreads()
148
ptr = (CSThread *) getBack();
151
ptr->myMustQuit = true;
152
ptr = (CSThread *) ptr->getNextLink();
159
void CSThreadList::stopAllThreads()
165
/* Get a thread that is not self! */
167
if ((thread = (CSThread *) getBack())) {
171
thread = (CSThread *) thread->getNextLink();
189
* ---------------------------------------------------------------
193
void CSThread::addToList()
197
ASSERT(self == this);
199
myThreadList->addFront(self);
201
unlock_(myThreadList);
208
void CSThread::removeFromList()
210
if (myThreadList && isRunning) {
212
/* Retain the thread in order to ensure
213
* that after it is removed from the list,
214
* that it is not freed! This would make the
215
* unlock_() call invalid, because it requires
220
myThreadList->remove(RETAIN(this));
221
unlock_(myThreadList);
228
void *CSThread::dispatch(void *arg)
231
void *return_data = NULL;
234
/* Get a reference to myself: */
235
self = reinterpret_cast<CSThread*>(arg);
238
/* Store my thread in the thread key: */
239
if ((err = pthread_setspecific(CSThread::sThreadKey, self))) {
240
CSException::logOSError(self, CS_CONTEXT, err);
245
* Make sure the thread is not freed while we
251
td_setup_signals(NULL);
253
/* Add the thread to the list: */
256
// Run the task from the correct context
257
return_data = self->run();
260
self->logException();
265
* Removing from the thread list will also release the thread.
267
self->removeFromList();
277
static void *dispatch_wrapper(void *arg)
279
return CSThread::dispatch(arg);
284
void *CSThread::run()
291
void CSThread::start()
295
err = pthread_create(&iThread, NULL, dispatch_wrapper, (void *) this);
297
CSException::throwOSError(CS_CONTEXT, err);
299
/* Check if the thread is still alive,
300
* so we don't hang forever.
302
if (pthread_kill(iThread, 0))
308
void CSThread::stop()
314
void *CSThread::join()
320
if ((err = pthread_join(iThread, &return_data)))
321
CSException::throwOSError(CS_CONTEXT, err);
322
return_(return_data);
325
void CSThread::setSignalPending(unsigned int sig)
328
/* The terminate signal takes priority: */
329
signalPending = SIGTERM;
330
else if (!signalPending)
331
/* Otherwise, first signal wins... */
335
void CSThread::signal(unsigned int sig)
339
setSignalPending(sig);
340
if ((err = pthread_kill(iThread, SIGUSR2)))
342
/* Ignore the error if the process does not exist! */
343
if (err != ESRCH) /* No such process */
344
CSException::throwOSError(CS_CONTEXT, err);
348
void CSThread::throwSignal()
352
if ((sig = signalPending) && !ignoreSignals) {
354
CSException::throwSignal(CS_CONTEXT, sig);
358
bool CSThread::isMain()
364
* -----------------------------------------------------------------------
365
* THROWING EXCEPTIONS
369
* When an exception is .
372
void CSThread::releaseObjects(CSReleasePtr top)
376
while (relTop > top) {
377
/* Remove and release or unlock the object on the top of the stack: */
379
switch(relTop->r_type) {
380
case CS_RELEASE_OBJECT:
381
if ((obj = relTop->x.r_object))
384
case CS_RELEASE_MUTEX:
385
if (relTop->x.r_mutex)
386
relTop->x.r_mutex->unlock();
388
case CS_RELEASE_POOLED:
389
if (relTop->x.r_pooled)
390
relTop->x.r_pooled->returnToPool();
396
/* Throw an already registered error: */
397
void CSThread::throwException()
399
/* Record the stack trace: */
400
if (this->jumpDepth > 0 && this->jumpDepth <= CS_JUMP_STACK_SIZE) {
402
* As recommended by Barry:
403
* release the objects before we jump!
404
* This has the advantage that the stack context is still
405
* valid when the resources are released.
407
releaseObjects(this->jumpEnv[this->jumpDepth-1].jb_res_top);
409
/* Then do the longjmp: */
410
longjmp(this->jumpEnv[this->jumpDepth-1].jb_buffer, 1);
414
void CSThread::logStack(int depth, const char *msg)
416
char buffer[CS_EXC_CONTEXT_SIZE +1];
418
CSL.log(this, CSLog::Trace, msg);
420
for (int i= callTop-1; i>=0 && depth; i--, depth--) {
421
cs_format_context(CS_EXC_CONTEXT_SIZE, buffer,
422
callStack[i].cs_func, callStack[i].cs_file, callStack[i].cs_line);
423
strcat(buffer, "\n");
424
CSL.log(this, CSLog::Trace, buffer);
429
void CSThread::logException()
431
myException.log(this);
435
* This function is called when an exception is caught.
436
* It restores the function call top and frees
437
* any resource allocated by lower levels.
439
void CSThread::caught()
441
/* Restore the call top: */
442
this->callTop = this->jumpEnv[this->jumpDepth].jb_call_top;
445
* Release all all objects that were pushed after
446
* this jump position was set:
448
releaseObjects(this->jumpEnv[this->jumpDepth].jb_res_top);
452
* ---------------------------------------------------------------
456
pthread_key_t CSThread::sThreadKey;
457
bool CSThread::isUp = false;
459
bool CSThread::startUp()
464
if ((err = pthread_key_create(&sThreadKey, NULL))) {
465
CSException::logOSError(CS_CONTEXT, errno);
473
void CSThread::shutDown()
478
bool CSThread::attach(CSThread *thread)
483
CSException::logOSError(CS_CONTEXT, ENOMEM);
487
if (!setSelf(thread))
490
/* Now we are ready to receive signals: */
491
if (!td_setup_signals(thread))
499
void CSThread::detach(CSThread *thread)
501
ASSERT(!getSelf() || getSelf() == thread);
502
thread->removeFromList();
504
pthread_setspecific(sThreadKey, NULL);
507
CSThread* CSThread::getSelf()
509
CSThread* self = NULL;
511
if ((!isUp) || !(self = (CSThread*) pthread_getspecific(sThreadKey)))
512
return (CSThread*) NULL;
515
if (self->iRefCount == 0) {
516
pthread_setspecific(sThreadKey, NULL);
517
CSException::throwAssertion(CS_CONTEXT, "Bad self pointer.");
524
bool CSThread::setSelf(CSThread *self)
529
self->iThread = pthread_self();
531
/* Store my thread in the thread key: */
532
if ((err = pthread_setspecific(sThreadKey, self))) {
533
self->myException.initOSError(CS_CONTEXT, err);
534
self->myException.setStackTrace(self);
539
pthread_setspecific(sThreadKey, NULL);
543
/* timeout is in milliseconds */
544
void CSThread::sleep(unsigned long timeout)
547
usleep(timeout * 1000);
553
int cs_assert(const char *func, const char *file, int line, const char *message)
555
CSException::throwAssertion(func, file, line, message);
559
int cs_hope(const char *func, const char *file, int line, const char *message)
563
e.initAssertion(func, file, line, message);
569
CSThread *CSThread::newThread(CSString *name, ThreadRunFunc run_func, CSThreadList *list)
574
if (!(thd = new CSThread(list))) {
576
CSException::throwOSError(CS_CONTEXT, ENOMEM);
578
thd->threadName = name;
579
thd->iRunFunc = run_func;
583
CSThread *CSThread::newCSThread()
585
CSThread *thd = NULL;
587
if (!(thd = new CSThread(NULL))) {
588
CSException::throwOSError(CS_CONTEXT, ENOMEM);
595
* ---------------------------------------------------------------
599
CSDaemon::CSDaemon(time_t wait_time, CSThreadList *list):
602
myWaitTime(wait_time),
608
CSDaemon::CSDaemon(CSThreadList *list):
617
void *CSDaemon::run()
619
bool must_sleep = false;
621
CLOBBER_PROTECT(must_sleep);
624
CLOBBER_PROTECT(self);
626
myMustQuit = !initializeWork();
630
while (!myMustQuit) {
634
suspendedWait(myWaitTime);
641
must_sleep = doWork();
645
if (!handleException())
654
/* Prevent signals from going off in completeWork! */
655
ignoreSignals = true;
657
return_(completeWork());
660
bool CSDaemon::doWork()
667
bool CSDaemon::handleException()
674
void CSDaemon::wakeup()
679
void CSDaemon::stop()
687
void CSDaemon::suspend()
692
while (!iSuspended && !myMustQuit)
700
void CSDaemon::resume()
704
if (iSuspendCount > 0)
711
void CSDaemon::suspended()
713
if (!iSuspendCount || myMustQuit) {
719
while (iSuspendCount && !myMustQuit) {
728
void CSDaemon::suspendedWait()
736
void CSDaemon::suspendedWait(time_t milli_sec)
747
* ---------------------------------------------------------------