1
/* This file is part of Strigi Desktop Search
3
* Copyright (C) 2007 Jos van den Oever <jos@vandenoever.info>
5
* This library is free software; you can redistribute it and/or
6
* modify it under the terms of the GNU Library General Public
7
* License as published by the Free Software Foundation; either
8
* version 2 of the License, or (at your option) any later version.
10
* This library is distributed in the hope that it will be useful,
11
* but WITHOUT ANY WARRANTY; without even the implied warranty of
12
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13
* Library General Public License for more details.
15
* You should have received a copy of the GNU Library General Public License
16
* along with this library; see the file COPYING.LIB. If not, write to
17
* the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18
* Boston, MA 02110-1301, USA.
22
#include "strigi_thread.h"
32
STRIGI_THREAD_DEFINE(thread);
33
STRIGI_MUTEX_DEFINE(mutex);
34
JobQueue::Private* const queue;
39
static void* start(void* vt) {
40
static_cast<JobThread*>(vt)->workloop();
44
JobThread(JobQueue::Private* q) :queue(q), job(0), keeprunning(true) {
45
STRIGI_MUTEX_INIT(&mutex);
46
STRIGI_THREAD_CREATE(&thread, JobThread::start, this);
49
STRIGI_MUTEX_DESTROY(&mutex);
52
void waitTillFinished();
55
class JobQueue::Private {
58
std::list<JobThread*> threads;
60
STRIGI_MUTEX_DEFINE(mutex);
66
bool addJob(Job* job);
68
* Function for JobThreads to call to wait until a new job is available.
69
* This function may return 0, but that does not mean the thread is ready.
75
JobThread::workloop() {
77
STRIGI_MUTEX_LOCK(&mutex);
79
STRIGI_MUTEX_UNLOCK(&mutex);
81
Job* j = queue->getNextJob();
82
STRIGI_MUTEX_LOCK(&mutex);
85
STRIGI_MUTEX_UNLOCK(&mutex);
87
STRIGI_MUTEX_LOCK(&mutex);
92
STRIGI_MUTEX_UNLOCK(&mutex);
94
cerr << "stopping" << endl;
95
STRIGI_THREAD_EXIT(&thread);
99
STRIGI_MUTEX_LOCK(&mutex);
104
STRIGI_MUTEX_UNLOCK(&mutex);
107
JobThread::waitTillFinished() {
108
STRIGI_THREAD_JOIN(thread);
110
/**
 * Construct a job queue.
 *
 * The queue itself only holds a pointer to its private implementation;
 * a new Private object is allocated here and receives the thread count.
 *
 * @param nthreads thread count handed to the Private constructor
 */
JobQueue::JobQueue(uint nthreads)
    : p(new Private(nthreads))
{
}
111
JobQueue::~JobQueue() {
115
JobQueue::addJob(Job* job) {
116
return (p) ?p->addJob(job) :false;
125
JobQueue::Private::Private(uint nthreads) :keeprunning(true) {
126
pthread_cond_init(&cond, 0);
127
STRIGI_MUTEX_INIT(&mutex);
128
while (threads.size() < nthreads) {
129
threads.push_back(new JobThread(this));
132
JobQueue::Private::~Private() {
133
STRIGI_MUTEX_LOCK(&mutex);
135
STRIGI_MUTEX_UNLOCK(&mutex);
137
// tell all threads to stop
138
for (list<JobThread*>::const_iterator i = threads.begin();
139
i != threads.end(); ++i) {
142
// wake up all threads
143
STRIGI_MUTEX_LOCK(&mutex);
144
pthread_cond_broadcast(&cond);
146
// remove all remaining jobs
147
for (list<Job*>::const_iterator i = jobs.begin(); i != jobs.end(); ++i) {
151
STRIGI_MUTEX_UNLOCK(&mutex);
153
// wait for the threads to finish
154
for (list<JobThread*>::const_iterator i = threads.begin();
155
i != threads.end(); ++i) {
156
(*i)->waitTillFinished();
159
STRIGI_MUTEX_DESTROY(&mutex);
160
pthread_cond_destroy(&cond);
163
JobQueue::Private::addJob(Job* job) {
164
STRIGI_MUTEX_LOCK(&mutex);
166
// check if we can merge this job with a job from the waiting queue
167
list<Job*>::iterator i, end = jobs.end();
168
for (i = jobs.begin(); i != end; ++i) {
169
if ((*i)->merge(job)) {
170
STRIGI_MUTEX_UNLOCK(&mutex);
177
// insert in front of position i
183
// signal the condition once so that a waiting worker thread wakes up
184
pthread_cond_signal(&cond);
185
STRIGI_MUTEX_UNLOCK(&mutex);
193
JobQueue::Private::nudge() {
194
STRIGI_MUTEX_LOCK(&mutex);
195
pthread_cond_signal(&cond);
196
STRIGI_MUTEX_UNLOCK(&mutex);
199
JobQueue::Private::getNextJob() {
201
STRIGI_MUTEX_LOCK(&mutex);
202
if (keeprunning && jobs.size() == 0) {
203
if (pthread_cond_wait(&cond, &mutex)) {
204
cerr << "Error in cond_wait: " << strerror(errno) << endl;
209
jobs.erase(jobs.begin());
211
STRIGI_MUTEX_UNLOCK(&mutex);