~ubuntu-branches/ubuntu/oneiric/strigi/oneiric

« back to all changes in this revision

Viewing changes to src/daemon/queue/jobqueue.cpp

  • Committer: Package Import Robot
  • Author(s): Felix Geyer
  • Date: 2011-09-24 17:12:15 UTC
  • mfrom: (1.2.6 upstream)
  • mto: This revision was merged to the branch mainline in revision 44.
  • Revision ID: package-import@ubuntu.com-20110924171215-zmbi1f77jntvz65h
Tags: upstream-0.7.6
ImportĀ upstreamĀ versionĀ 0.7.6

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
/* This file is part of Strigi Desktop Search
2
 
 *
3
 
 * Copyright (C) 2007 Jos van den Oever <jos@vandenoever.info>
4
 
 *
5
 
 * This library is free software; you can redistribute it and/or
6
 
 * modify it under the terms of the GNU Library General Public
7
 
 * License as published by the Free Software Foundation; either
8
 
 * version 2 of the License, or (at your option) any later version.
9
 
 *
10
 
 * This library is distributed in the hope that it will be useful,
11
 
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12
 
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13
 
 * Library General Public License for more details.
14
 
 *
15
 
 * You should have received a copy of the GNU Library General Public License
16
 
 * along with this library; see the file COPYING.LIB.  If not, write to
17
 
 * the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18
 
 * Boston, MA 02110-1301, USA.
19
 
 */
20
 
#include "jobqueue.h"
21
 
#include "job.h"
22
 
#include "strigi_thread.h"
23
 
#include <iostream>
24
 
#include <list>
25
 
#include <errno.h>
26
 
#include <string.h>
27
 
 
28
 
using namespace std;
29
 
 
30
 
class JobThread {
31
 
private:
32
 
    STRIGI_THREAD_DEFINE(thread);
33
 
    STRIGI_MUTEX_DEFINE(mutex);
34
 
    JobQueue::Private* const queue;
35
 
    Job* job;
36
 
    bool keeprunning;
37
 
 
38
 
    void workloop();
39
 
    static void* start(void* vt) {
40
 
        static_cast<JobThread*>(vt)->workloop();
41
 
        return 0;
42
 
    }
43
 
public:
44
 
    JobThread(JobQueue::Private* q) :queue(q), job(0), keeprunning(true) {
45
 
        STRIGI_MUTEX_INIT(&mutex);
46
 
        STRIGI_THREAD_CREATE(&thread, JobThread::start, this);
47
 
    }
48
 
    ~JobThread() {
49
 
        STRIGI_MUTEX_DESTROY(&mutex);
50
 
    }
51
 
    void stop();
52
 
    void waitTillFinished();
53
 
};
54
 
 
55
 
class JobQueue::Private {
56
 
private:
57
 
   std::list<Job*> jobs;
58
 
   std::list<JobThread*> threads;
59
 
   pthread_cond_t cond;
60
 
   STRIGI_MUTEX_DEFINE(mutex);
61
 
   bool keeprunning;
62
 
 
63
 
public:
64
 
   Private(uint n);
65
 
   ~Private();
66
 
   bool addJob(Job* job);
67
 
   /**
68
 
    * Function for JobThreads to call to wait until a new job is available.
69
 
    * This function may return 0, but that does not mean the thread is ready.
70
 
    **/
71
 
   Job* getNextJob();
72
 
   void nudge();
73
 
};
74
 
void
75
 
JobThread::workloop() {
76
 
    bool run;
77
 
    STRIGI_MUTEX_LOCK(&mutex);
78
 
    run = keeprunning;
79
 
    STRIGI_MUTEX_UNLOCK(&mutex);
80
 
    while (run) {
81
 
        Job* j = queue->getNextJob();
82
 
        STRIGI_MUTEX_LOCK(&mutex);
83
 
        if (j) {
84
 
            job = j;
85
 
            STRIGI_MUTEX_UNLOCK(&mutex);
86
 
            j->run();
87
 
            STRIGI_MUTEX_LOCK(&mutex);
88
 
            delete job;
89
 
            job = 0;
90
 
        }
91
 
        run = keeprunning;
92
 
        STRIGI_MUTEX_UNLOCK(&mutex);
93
 
    }
94
 
    cerr << "stopping" << endl;
95
 
    STRIGI_THREAD_EXIT(&thread);
96
 
}
97
 
void
98
 
JobThread::stop() {
99
 
    STRIGI_MUTEX_LOCK(&mutex);
100
 
    keeprunning = false;
101
 
    if (job) {
102
 
        job->stop();
103
 
    }
104
 
    STRIGI_MUTEX_UNLOCK(&mutex);
105
 
}
106
 
void
107
 
JobThread::waitTillFinished() {
108
 
    STRIGI_THREAD_JOIN(thread);
109
 
}
110
 
JobQueue::JobQueue(uint nthreads) :p(new Private(nthreads)) {}
111
 
JobQueue::~JobQueue() {
112
 
    stop();
113
 
}
114
 
bool
115
 
JobQueue::addJob(Job* job) {
116
 
    return (p) ?p->addJob(job) :false;
117
 
}
118
 
void
119
 
JobQueue::stop() {
120
 
    if (p) {
121
 
       delete p;
122
 
       p = 0;
123
 
    }
124
 
}
125
 
JobQueue::Private::Private(uint nthreads) :keeprunning(true) {
126
 
    pthread_cond_init(&cond, 0);
127
 
    STRIGI_MUTEX_INIT(&mutex);
128
 
    while (threads.size() < nthreads) {
129
 
        threads.push_back(new JobThread(this));
130
 
    }
131
 
}
132
 
JobQueue::Private::~Private() {
133
 
    STRIGI_MUTEX_LOCK(&mutex);
134
 
    keeprunning = false;
135
 
    STRIGI_MUTEX_UNLOCK(&mutex);
136
 
 
137
 
    // tell all threads to stop
138
 
    for (list<JobThread*>::const_iterator i = threads.begin();
139
 
            i != threads.end(); ++i) {
140
 
        (*i)->stop();
141
 
    }
142
 
    // wake up all threads
143
 
    STRIGI_MUTEX_LOCK(&mutex);
144
 
    pthread_cond_broadcast(&cond);
145
 
 
146
 
    // remove all remaining jobs
147
 
    for (list<Job*>::const_iterator i = jobs.begin(); i != jobs.end(); ++i) {
148
 
        delete *i;
149
 
    }
150
 
    jobs.clear();
151
 
    STRIGI_MUTEX_UNLOCK(&mutex);
152
 
 
153
 
    // wait for the threads to finish
154
 
    for (list<JobThread*>::const_iterator i = threads.begin();
155
 
            i != threads.end(); ++i) {
156
 
        (*i)->waitTillFinished();
157
 
        delete *i;
158
 
    }
159
 
    STRIGI_MUTEX_DESTROY(&mutex);
160
 
    pthread_cond_destroy(&cond);
161
 
}
162
 
bool
163
 
JobQueue::Private::addJob(Job* job) {
164
 
    STRIGI_MUTEX_LOCK(&mutex);
165
 
 
166
 
    // check if we can merge this job with a job from the waiting queue
167
 
    list<Job*>::iterator i, end = jobs.end();
168
 
    for (i = jobs.begin(); i != end; ++i) {
169
 
        if ((*i)->merge(job)) {
170
 
            STRIGI_MUTEX_UNLOCK(&mutex);
171
 
            return true;
172
 
        }
173
 
        if (job > *i) {
174
 
            break;
175
 
        }
176
 
    }
177
 
    // insert in front of position i
178
 
    if (i == end) {
179
 
        jobs.push_back(job);
180
 
    } else {
181
 
        jobs.insert(i, job);
182
 
    }
183
 
    // signal a couple of times to make sure
184
 
    pthread_cond_signal(&cond);
185
 
    STRIGI_MUTEX_UNLOCK(&mutex);
186
 
    return true;
187
 
}
188
 
void
189
 
JobQueue::nudge() {
190
 
    if (p) p->nudge();
191
 
}
192
 
void
193
 
JobQueue::Private::nudge() { 
194
 
    STRIGI_MUTEX_LOCK(&mutex);
195
 
    pthread_cond_signal(&cond);
196
 
    STRIGI_MUTEX_UNLOCK(&mutex);
197
 
}
198
 
Job*
199
 
JobQueue::Private::getNextJob() {
200
 
    Job* j = 0;
201
 
    STRIGI_MUTEX_LOCK(&mutex);
202
 
    if (keeprunning && jobs.size() == 0) {
203
 
        if (pthread_cond_wait(&cond, &mutex)) {
204
 
            cerr <<  "Error in cond_wait: " << strerror(errno) << endl;
205
 
        }
206
 
    }
207
 
    if (jobs.size()) {
208
 
        j = jobs.front();
209
 
        jobs.erase(jobs.begin());
210
 
    }
211
 
    STRIGI_MUTEX_UNLOCK(&mutex);
212
 
    return j;
213
 
}