~ubuntu-branches/ubuntu/trusty/drizzle/trusty

« back to all changes in this revision

Viewing changes to plugin/pbms/src/cslib/CSThread.cc

  • Committer: Bazaar Package Importer
  • Author(s): Monty Taylor
  • Date: 2010-10-02 14:17:48 UTC
  • mfrom: (1.1.1 upstream)
  • mto: (2.1.17 sid)
  • mto: This revision was merged to the branch mainline in revision 3.
  • Revision ID: james.westby@ubuntu.com-20101002141748-m6vbfbfjhrw1153e
Tags: 2010.09.1802-1
* New upstream release.
* Removed pid-file argument hack.
* Updated GPL-2 address to be new address.
* Directly copy in drizzledump.1 since debian doesn't have sphinx 1.0 yet.
* Link to jquery from libjs-jquery. Add it as a depend.
* Add drizzled.8 symlink to the install files.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/* Copyright (c) 2008 PrimeBase Technologies GmbH, Germany
 
2
 *
 
3
 * PrimeBase Media Stream for MySQL
 
4
 *
 
5
 * This program is free software; you can redistribute it and/or modify
 
6
 * it under the terms of the GNU General Public License as published by
 
7
 * the Free Software Foundation; either version 2 of the License, or
 
8
 * (at your option) any later version.
 
9
 *
 
10
 * This program is distributed in the hope that it will be useful,
 
11
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 
12
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
13
 * GNU General Public License for more details.
 
14
 *
 
15
 * You should have received a copy of the GNU General Public License
 
16
 * along with this program; if not, write to the Free Software
 
17
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
 
18
 *
 
19
 * Original author: Paul McCullagh (H&G2JCtL)
 
20
 * Continued development: Barry Leslie
 
21
 *
 
22
 * 2007-05-20
 
23
 *
 
24
 * CORE SYSTEM:
 
25
 * A independently running thread.
 
26
 *
 
27
 */
 
28
 
 
29
#include "CSConfig.h"
 
30
 
 
31
#ifdef OS_WINDOWS
 
32
#include <signal.h>
 
33
#include "uniwin.h"
 
34
#else
 
35
#include <signal.h>
 
36
#include <sys/signal.h>
 
37
#endif
 
38
#include <unistd.h>
 
39
#include <errno.h>
 
40
 
 
41
#include "CSGlobal.h"
 
42
#include "CSLog.h"
 
43
#include "CSException.h"
 
44
#include "CSThread.h"
 
45
#include "CSStrUtil.h"
 
46
#include "CSMemory.h"
 
47
 
 
48
/*
 
49
 * ---------------------------------------------------------------
 
50
 * SIGNAL HANDLERS
 
51
 */
 
52
 
 
53
extern "C" {
 
54
 
 
55
 
 
56
static void td_catch_signal(int sig)
 
57
{
 
58
        CSThread *self;
 
59
 
 
60
        if ((self = CSThread::getSelf())) {
 
61
                if (self->isMain()) {
 
62
                        /* The main thread will pass on a signal to all threads: */
 
63
                        if (self->myThreadList)
 
64
                                self->myThreadList->signalAllThreads(sig);
 
65
                        self->setSignalPending(sig);
 
66
                }
 
67
        }
 
68
        
 
69
}
 
70
 
 
71
static  void td_throw_signal(int sig)
 
72
{
 
73
        CSThread *self;
 
74
 
 
75
        if ((self = CSThread::getSelf())) {
 
76
                if (self->isMain()) {
 
77
                        /* The main thread will pass on a signal to all threads: */
 
78
                        if (self->myThreadList)
 
79
                                self->myThreadList->signalAllThreads(sig);
 
80
                }
 
81
                self->setSignalPending(sig);
 
82
                self->interrupted();
 
83
        }
 
84
}
 
85
 
 
86
static bool td_setup_signals(CSThread *thread)
 
87
{
 
88
#ifdef OS_WINDOWS
 
89
        return true;
 
90
#else
 
91
        struct sigaction action;
 
92
 
 
93
    sigemptyset(&action.sa_mask);
 
94
    action.sa_flags = 0;
 
95
 
 
96
    action.sa_handler = td_catch_signal;
 
97
 
 
98
        if (sigaction(SIGUSR2, &action, NULL) == -1)
 
99
                goto error_occurred;
 
100
 
 
101
    action.sa_handler = td_throw_signal;
 
102
 
 
103
        return true;
 
104
 
 
105
        error_occurred:
 
106
 
 
107
        if (thread) {
 
108
                thread->myException.initOSError(CS_CONTEXT, errno);
 
109
                thread->myException.setStackTrace(thread);
 
110
        }
 
111
        else
 
112
                CSException::throwOSError(CS_CONTEXT, errno);
 
113
        return false;
 
114
#endif
 
115
}
 
116
 
 
117
}
 
118
 
 
119
/*
 
120
 * ---------------------------------------------------------------
 
121
 * THREAD LISTS
 
122
 */
 
123
 
 
124
void CSThreadList::signalAllThreads(int sig)
 
125
{
 
126
        CSThread *ptr;
 
127
 
 
128
        enter_();
 
129
        lock_(this);
 
130
        ptr = (CSThread *) getBack();
 
131
        while (ptr) {
 
132
                if (ptr != self)
 
133
                        ptr->signal(sig);
 
134
                ptr = (CSThread *) ptr->getNextLink();
 
135
        }
 
136
        unlock_(this);
 
137
 
 
138
        exit_();
 
139
}
 
140
 
 
141
void CSThreadList::quitAllThreads()
 
142
{
 
143
        CSThread *ptr;
 
144
 
 
145
        enter_();
 
146
        lock_(this);
 
147
        
 
148
        ptr = (CSThread *) getBack();
 
149
        while (ptr) {
 
150
                if (ptr != self)
 
151
                        ptr->myMustQuit = true;
 
152
                ptr = (CSThread *) ptr->getNextLink();
 
153
        }
 
154
        
 
155
        unlock_(this);
 
156
        exit_();
 
157
}
 
158
 
 
159
void CSThreadList::stopAllThreads()
 
160
{
 
161
        CSThread *thread;
 
162
 
 
163
        enter_();
 
164
        for (;;) {
 
165
                /* Get a thread that is not self! */
 
166
                lock_(this);
 
167
                if ((thread = (CSThread *) getBack())) {
 
168
                        while (thread) {
 
169
                                if (thread != self)
 
170
                                        break;
 
171
                                thread = (CSThread *) thread->getNextLink();
 
172
                        }
 
173
                }
 
174
                if (thread)
 
175
                        thread->retain();
 
176
                unlock_(this);
 
177
                
 
178
                if (!thread)
 
179
                        break;
 
180
                        
 
181
                push_(thread);
 
182
                thread->stop();
 
183
                release_(thread);
 
184
        }
 
185
        exit_();
 
186
}
 
187
 
 
188
/*
 
189
 * ---------------------------------------------------------------
 
190
 * CSTHREAD
 
191
 */
 
192
 
 
193
void CSThread::addToList()
 
194
{
 
195
        if (myThreadList) {
 
196
                enter_();
 
197
                ASSERT(self == this);
 
198
                lock_(myThreadList);
 
199
                myThreadList->addFront(self);
 
200
                isRunning = true;
 
201
                unlock_(myThreadList);
 
202
                exit_();
 
203
        }
 
204
        else
 
205
                isRunning = true;
 
206
}
 
207
        
 
208
void CSThread::removeFromList()
 
209
{
 
210
        if (myThreadList && isRunning) {
 
211
                enter_();
 
212
                /* Retain the thread in order to ensure
 
213
                 * that after it is removed from the list,
 
214
                 * that it is not freed! This would make the
 
215
                 * unlock_() call invalid, because it requires
 
216
                 * on the thread.
 
217
                 */
 
218
                push_(this);
 
219
                lock_(myThreadList);
 
220
                myThreadList->remove(RETAIN(this));
 
221
                unlock_(myThreadList);
 
222
                pop_(this);
 
223
                outer_();
 
224
        }
 
225
        this->release();
 
226
}
 
227
 
 
228
void *CSThread::dispatch(void *arg)
 
229
{
 
230
        CSThread                *self;
 
231
        void                    *return_data = NULL;
 
232
        int                             err;
 
233
 
 
234
        /* Get a reference to myself: */
 
235
        self = reinterpret_cast<CSThread*>(arg);
 
236
        ASSERT(self);
 
237
 
 
238
        /* Store my thread in the thread key: */
 
239
        if ((err = pthread_setspecific(CSThread::sThreadKey, self))) {
 
240
                CSException::logOSError(self, CS_CONTEXT, err);
 
241
                return NULL;
 
242
        }
 
243
 
 
244
        /*
 
245
         * Make sure the thread is not freed while we
 
246
         * are running:
 
247
         */
 
248
        self->retain();
 
249
 
 
250
        try_(a) {
 
251
                td_setup_signals(NULL);
 
252
 
 
253
                /* Add the thread to the list: */
 
254
                self->addToList();
 
255
 
 
256
                // Run the task from the correct context
 
257
                return_data = self->run();
 
258
        }
 
259
        catch_(a) {
 
260
                self->logException();
 
261
        }
 
262
        cont_(a);
 
263
 
 
264
        /*
 
265
         * Removing from the thread list will also release the thread.
 
266
         */
 
267
        self->removeFromList();
 
268
 
 
269
        // Exit the thread
 
270
        return return_data;
 
271
}
 
272
 
 
273
 
 
274
extern "C"
 
275
{
 
276
 
 
277
static void *dispatch_wrapper(void *arg)
 
278
{
 
279
        return CSThread::dispatch(arg);
 
280
}
 
281
 
 
282
}
 
283
 
 
284
void *CSThread::run()
 
285
{
 
286
        if (iRunFunc)
 
287
                return iRunFunc();
 
288
        return NULL;
 
289
}
 
290
 
 
291
void CSThread::start()
 
292
{
 
293
        int err;
 
294
 
 
295
        err = pthread_create(&iThread, NULL, dispatch_wrapper, (void *) this);
 
296
        if (err)
 
297
                CSException::throwOSError(CS_CONTEXT, err);
 
298
        while (!isRunning) {
 
299
                /* Check if the thread is still alive,
 
300
                 * so we don't hang forever.
 
301
                 */
 
302
                if (pthread_kill(iThread, 0))
 
303
                        break;
 
304
                usleep(10);
 
305
        }
 
306
}
 
307
 
 
308
void CSThread::stop()
 
309
{
 
310
        signal(SIGTERM);
 
311
        join();
 
312
}
 
313
 
 
314
void *CSThread::join()
 
315
{
 
316
        void    *return_data;
 
317
        int             err;
 
318
 
 
319
        enter_();
 
320
        if ((err = pthread_join(iThread, &return_data)))
 
321
                CSException::throwOSError(CS_CONTEXT, err);
 
322
        return_(return_data);
 
323
}
 
324
 
 
325
void CSThread::setSignalPending(unsigned int sig)
 
326
{
 
327
        if (sig == SIGTERM)
 
328
                /* The terminate signal takes priority: */
 
329
                signalPending = SIGTERM;
 
330
        else if (!signalPending)
 
331
                /* Otherwise, first signal wins... */
 
332
                signalPending = sig;
 
333
}
 
334
 
 
335
void CSThread::signal(unsigned int sig)
 
336
{
 
337
        int err;
 
338
 
 
339
        setSignalPending(sig);
 
340
        if ((err = pthread_kill(iThread, SIGUSR2)))
 
341
        {
 
342
                /* Ignore the error if the process does not exist! */
 
343
                if (err != ESRCH) /* No such process */
 
344
                        CSException::throwOSError(CS_CONTEXT, err);
 
345
        }
 
346
}
 
347
 
 
348
void CSThread::throwSignal()
 
349
{
 
350
        int sig;
 
351
 
 
352
        if ((sig = signalPending) && !ignoreSignals) {
 
353
                signalPending = 0;
 
354
                CSException::throwSignal(CS_CONTEXT, sig);
 
355
        }
 
356
}
 
357
 
 
358
bool CSThread::isMain()
 
359
{
 
360
        return iIsMain;
 
361
}
 
362
 
 
363
/*
 
364
 * -----------------------------------------------------------------------
 
365
 * THROWING EXCEPTIONS
 
366
 */
 
367
 
 
368
/* 
 
369
 * When an exception is .
 
370
 */
 
371
 
 
372
void CSThread::releaseObjects(CSReleasePtr top)
 
373
{
 
374
        CSObject *obj;
 
375
 
 
376
        while (relTop > top) {
 
377
                /* Remove and release or unlock the object on the top of the stack: */
 
378
                relTop--;
 
379
                switch(relTop->r_type) {
 
380
                        case CS_RELEASE_OBJECT:
 
381
                                if ((obj = relTop->x.r_object))
 
382
                                        obj->release();
 
383
                                break;
 
384
                        case CS_RELEASE_MUTEX:
 
385
                                if (relTop->x.r_mutex)
 
386
                                        relTop->x.r_mutex->unlock();
 
387
                                break;
 
388
                        case CS_RELEASE_POOLED:
 
389
                                if (relTop->x.r_pooled)
 
390
                                        relTop->x.r_pooled->returnToPool();
 
391
                                break;
 
392
                }
 
393
        }
 
394
}
 
395
 
 
396
/* Throw an already registered error: */
 
397
void CSThread::throwException()
 
398
{
 
399
        /* Record the stack trace: */
 
400
        if (this->jumpDepth > 0 && this->jumpDepth <= CS_JUMP_STACK_SIZE) {
 
401
                /*
 
402
                 * As recommended by Barry:
 
403
                 * release the objects before we jump!
 
404
                 * This has the advantage that the stack context is still
 
405
                 * valid when the resources are released.
 
406
                 */
 
407
                releaseObjects(this->jumpEnv[this->jumpDepth-1].jb_res_top);
 
408
 
 
409
                /* Then do the longjmp: */
 
410
                longjmp(this->jumpEnv[this->jumpDepth-1].jb_buffer, 1);
 
411
        }
 
412
}
 
413
 
 
414
void CSThread::logStack(int depth, const char *msg)
 
415
{
 
416
        char buffer[CS_EXC_CONTEXT_SIZE +1];
 
417
        CSL.lock();
 
418
        CSL.log(this, CSLog::Trace, msg);
 
419
        
 
420
        for (int i= callTop-1; i>=0 && depth; i--, depth--) {
 
421
                cs_format_context(CS_EXC_CONTEXT_SIZE, buffer,
 
422
                        callStack[i].cs_func, callStack[i].cs_file, callStack[i].cs_line);
 
423
                strcat(buffer, "\n");
 
424
                CSL.log(this, CSLog::Trace, buffer);
 
425
        }
 
426
        CSL.unlock();
 
427
}
 
428
 
 
429
void CSThread::logException()
 
430
{
 
431
        myException.log(this);
 
432
}
 
433
 
 
434
/*
 
435
 * This function is called when an exception is caught.
 
436
 * It restores the function call top and frees
 
437
 * any resource allocated by lower levels.
 
438
 */
 
439
void CSThread::caught()
 
440
{
 
441
        /* Restore the call top: */
 
442
        this->callTop = this->jumpEnv[this->jumpDepth].jb_call_top;
 
443
 
 
444
        /* 
 
445
         * Release all all objects that were pushed after
 
446
         * this jump position was set:
 
447
         */
 
448
        releaseObjects(this->jumpEnv[this->jumpDepth].jb_res_top);
 
449
}
 
450
 
 
451
/*
 
452
 * ---------------------------------------------------------------
 
453
 * STATIC METHODS
 
454
 */
 
455
 
 
456
pthread_key_t   CSThread::sThreadKey;
 
457
bool                    CSThread::isUp = false;
 
458
 
 
459
bool CSThread::startUp()
 
460
{
 
461
        int err;
 
462
 
 
463
        isUp = false;
 
464
        if ((err = pthread_key_create(&sThreadKey, NULL))) {
 
465
                CSException::logOSError(CS_CONTEXT, errno);
 
466
                return false;
 
467
        } else
 
468
                isUp = true;
 
469
                
 
470
        return isUp;
 
471
}
 
472
 
 
473
void CSThread::shutDown()
 
474
{
 
475
        isUp = false;
 
476
}
 
477
 
 
478
bool CSThread::attach(CSThread *thread)
 
479
{
 
480
        ASSERT(!getSelf());
 
481
        
 
482
        if (!thread) {
 
483
                CSException::logOSError(CS_CONTEXT, ENOMEM);
 
484
                return false;
 
485
        }
 
486
 
 
487
        if (!setSelf(thread))
 
488
                return false;
 
489
 
 
490
        /* Now we are ready to receive signals: */
 
491
        if (!td_setup_signals(thread))
 
492
                return false;
 
493
 
 
494
        thread->addToList();
 
495
        thread->retain();
 
496
        return true;
 
497
}
 
498
 
 
499
void CSThread::detach(CSThread *thread)
 
500
{
 
501
        ASSERT(!getSelf() || getSelf() == thread);
 
502
        thread->removeFromList();
 
503
        thread->release();
 
504
        pthread_setspecific(sThreadKey, NULL);
 
505
}
 
506
 
 
507
CSThread* CSThread::getSelf()
 
508
{
 
509
        CSThread* self = NULL;
 
510
        
 
511
        if ((!isUp) || !(self = (CSThread*) pthread_getspecific(sThreadKey)))
 
512
                return (CSThread*) NULL;
 
513
                
 
514
#ifdef DEBUG
 
515
        if (self->iRefCount == 0) {
 
516
                pthread_setspecific(sThreadKey, NULL);
 
517
                CSException::throwAssertion(CS_CONTEXT, "Bad self pointer.");
 
518
        }       
 
519
#endif
 
520
 
 
521
        return self;
 
522
}
 
523
 
 
524
bool CSThread::setSelf(CSThread *self)
 
525
{
 
526
        int err;
 
527
 
 
528
        if (self) {
 
529
                self->iThread = pthread_self();
 
530
 
 
531
                /* Store my thread in the thread key: */
 
532
                if ((err = pthread_setspecific(sThreadKey, self))) {
 
533
                        self->myException.initOSError(CS_CONTEXT, err);
 
534
                        self->myException.setStackTrace(self);
 
535
                        return false;
 
536
                }
 
537
        }
 
538
        else
 
539
                pthread_setspecific(sThreadKey, NULL);
 
540
        return true;
 
541
}
 
542
 
 
543
/* timeout is in milliseconds */
 
544
void CSThread::sleep(unsigned long timeout)
 
545
{
 
546
        enter_();
 
547
        usleep(timeout * 1000);
 
548
        self->interrupted();
 
549
        exit_();
 
550
}
 
551
 
 
552
#ifdef DEBUG
 
553
int cs_assert(const char *func, const char *file, int line, const char *message)
 
554
{
 
555
        CSException::throwAssertion(func, file, line, message);
 
556
        return 0;
 
557
}
 
558
 
 
559
int cs_hope(const char *func, const char *file, int line, const char *message)
 
560
{
 
561
        CSException e;
 
562
                
 
563
        e.initAssertion(func, file, line, message);
 
564
        e.log(NULL);
 
565
        return 0;
 
566
}
 
567
#endif
 
568
 
 
569
CSThread *CSThread::newThread(CSString *name, ThreadRunFunc run_func, CSThreadList *list)
 
570
{
 
571
        CSThread *thd;
 
572
 
 
573
        enter_();
 
574
        if (!(thd = new CSThread(list))) {
 
575
                name->release();
 
576
                CSException::throwOSError(CS_CONTEXT, ENOMEM);
 
577
        }
 
578
        thd->threadName = name;
 
579
        thd->iRunFunc = run_func;
 
580
        return_(thd);
 
581
}
 
582
 
 
583
CSThread *CSThread::newCSThread()
 
584
{
 
585
        CSThread *thd = NULL;
 
586
 
 
587
        if (!(thd = new CSThread(NULL))) {
 
588
                CSException::throwOSError(CS_CONTEXT, ENOMEM);
 
589
        }
 
590
        
 
591
        return thd;
 
592
}
 
593
 
 
594
/*
 
595
 * ---------------------------------------------------------------
 
596
 * DAEMON THREADS
 
597
 */
 
598
 
 
599
CSDaemon::CSDaemon(time_t wait_time, CSThreadList *list):
 
600
CSThread(list),
 
601
CSSync(),
 
602
myWaitTime(wait_time),
 
603
iSuspended(false),
 
604
iSuspendCount(0)
 
605
{
 
606
}
 
607
 
 
608
CSDaemon::CSDaemon(CSThreadList *list):
 
609
CSThread(list),
 
610
CSSync(),
 
611
myWaitTime(0),
 
612
iSuspended(false),
 
613
iSuspendCount(0)
 
614
{
 
615
}
 
616
 
 
617
void *CSDaemon::run()
 
618
{
 
619
        bool must_sleep = false;
 
620
 
 
621
        CLOBBER_PROTECT(must_sleep);
 
622
 
 
623
        enter_();
 
624
        CLOBBER_PROTECT(self);
 
625
 
 
626
        myMustQuit = !initializeWork();
 
627
 
 
628
        restart:
 
629
        try_(a) {
 
630
                while (!myMustQuit) {
 
631
                        if (must_sleep) {
 
632
                                lock_(this);
 
633
                                if (myWaitTime)
 
634
                                        suspendedWait(myWaitTime);
 
635
                                else
 
636
                                        suspendedWait();
 
637
                                unlock_(this);
 
638
                                if (myMustQuit)
 
639
                                        break;
 
640
                        }
 
641
                        must_sleep = doWork();
 
642
                }
 
643
        }
 
644
        catch_(a) {
 
645
                if (!handleException())
 
646
                        myMustQuit = true;
 
647
        }
 
648
        cont_(a);
 
649
        if (!myMustQuit) {
 
650
                must_sleep = true;
 
651
                goto restart;
 
652
        }
 
653
 
 
654
        /* Prevent signals from going off in completeWork! */
 
655
        ignoreSignals = true;
 
656
 
 
657
        return_(completeWork());
 
658
}
 
659
 
 
660
bool CSDaemon::doWork()
 
661
{
 
662
        if (iRunFunc)
 
663
                (void) iRunFunc();
 
664
        return true;
 
665
}
 
666
 
 
667
bool CSDaemon::handleException()
 
668
{
 
669
        if (!myMustQuit)
 
670
                logException();
 
671
        return true;
 
672
}
 
673
 
 
674
void CSDaemon::wakeup()
 
675
{
 
676
        CSSync::wakeup();
 
677
}
 
678
 
 
679
void CSDaemon::stop()
 
680
{
 
681
        myMustQuit = true;
 
682
        wakeup();
 
683
        signal(SIGTERM);
 
684
        join();
 
685
}
 
686
 
 
687
void CSDaemon::suspend()
 
688
{
 
689
        enter_();
 
690
        lock_(this);
 
691
        iSuspendCount++;
 
692
        while (!iSuspended && !myMustQuit)
 
693
                wait(500);
 
694
        if (!iSuspended)
 
695
                iSuspendCount--;
 
696
        unlock_(this);
 
697
        exit_();
 
698
}
 
699
 
 
700
void CSDaemon::resume()
 
701
{
 
702
        enter_();
 
703
        lock_(this);
 
704
        if (iSuspendCount > 0)
 
705
                iSuspendCount--;
 
706
        wakeup();
 
707
        unlock_(this);
 
708
        exit_();
 
709
}
 
710
 
 
711
void CSDaemon::suspended()
 
712
{
 
713
        if (!iSuspendCount || myMustQuit) {
 
714
                iSuspended = false;
 
715
                return;
 
716
        }
 
717
        enter_();
 
718
        lock_(this);
 
719
        while (iSuspendCount && !myMustQuit) {
 
720
                iSuspended = true;
 
721
                wait(500);
 
722
        }
 
723
        iSuspended = false;
 
724
        unlock_(this);
 
725
        exit_();
 
726
}
 
727
 
 
728
void CSDaemon::suspendedWait()
 
729
{
 
730
        iSuspended = true;
 
731
        wait();
 
732
        if (iSuspendCount)
 
733
                suspended();
 
734
}
 
735
 
 
736
void CSDaemon::suspendedWait(time_t milli_sec)
 
737
{
 
738
        iSuspended = true;
 
739
        wait(milli_sec);
 
740
        if (iSuspendCount)
 
741
                suspended();
 
742
        else
 
743
                iSuspended = false;
 
744
}
 
745
 
 
746
/*
 
747
 * ---------------------------------------------------------------
 
748
 * THREAD POOLS
 
749
 */
 
750
 
 
751