2
* Copyright (c) 2005, Eric Crahen
4
* Permission is hereby granted, free of charge, to any person obtaining a copy
5
* of this software and associated documentation files (the "Software"), to deal
6
* in the Software without restriction, including without limitation the rights
7
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
8
* copies of the Software, and to permit persons to whom the Software is furnished
9
* to do so, subject to the following conditions:
11
* The above copyright notice and this permission notice shall be included in all
12
* copies or substantial portions of the Software.
14
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
17
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
18
* WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
19
* CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
23
#include "zthread/ThreadedExecutor.h"
24
#include "zthread/Guard.h"
25
#include "zthread/FastMutex.h"
26
#include "zthread/Time.h"
28
#include "ThreadImpl.h"
37
typedef std::deque<ThreadImpl*> ThreadList;
39
typedef struct group_t {
43
group_t(size_t n) : id(n), count(0) {}
46
typedef std::deque<Group> GroupList;
48
//! Predicate to find a specific group
49
struct by_id : public std::unary_function<bool, Group> {
51
by_id(size_t n) : id(n) {}
52
bool operator()(const Group& grp) {
57
//! Functor to count groups
58
struct counter : public std::unary_function<void, Group> {
60
counter() : count(0) {}
61
void operator()(const Group& grp) { count += grp.count; }
62
operator size_t() { return count; }
72
WaiterQueue() : _id(0), _generation(0) {
73
// At least one empty-group exists
74
_list.push_back( Group(_id++) );
78
* Insert the current thread into the current waiter list
80
* @pre At least one empty group exists
81
* @post At least one empty group exists
83
bool wait(unsigned long timeout) {
85
ThreadImpl* self = ThreadImpl::current();
86
Monitor& m = self->getMonitor();
90
Guard<Lockable> g1(_lock);
92
// At least one empty-group exists
93
assert(!_list.empty());
95
// Return w/o waiting if there are no executing tasks
96
if((size_t)std::for_each(_list.begin(), _list.end(), counter()) < 1)
99
// Update the waiter list for the active group
100
_list.back().waiters.push_back(self);
101
size_t n = _list.back().id;
107
Guard<Lockable, UnlockedScope> g2(g1);
108
state = timeout == 0 ? m.wait() : m.wait(timeout);
114
// If awoke due to a reason other than the last task in the group 'n' completing,
115
// then then find the group 'self' is waiting in
116
GroupList::iterator i = std::find_if(_list.begin(), _list.end(), by_id(n));
117
if(i != _list.end()) {
119
// Remove 'self' from that list if it is still a member
120
ThreadList::iterator j = std::find(i->waiters.begin(), i->waiters.end(), self);
121
if(j != i->waiters.end())
126
// At least one empty-group exists
127
assert(!_list.empty());
130
case Monitor::SIGNALED:
132
case Monitor::TIMEDOUT:
134
case Monitor::INTERRUPTED:
135
throw Interrupted_Exception();
137
throw Synchronization_Exception();
145
* Increment the active group count
147
* @pre at least 1 empty group exists
148
* @post at least 1 non-empty group exists
150
std::pair<size_t, size_t> increment() {
152
Guard<FastMutex> g(_lock);
154
// At least one empty-group exists
155
assert(!_list.empty());
157
GroupList::iterator i = --_list.end();
160
if(i == _list.end()) {
162
// A group should never have been removed until
163
// the final task in that group completed
170
// When the active group is being incremented, insert a new active group
171
// to replace it if there were waiting threads
172
if(i == --_list.end() && !i->waiters.empty())
173
_list.push_back(Group(_id++));
175
// At least 1 non-empty group exists
176
assert((size_t)std::for_each(_list.begin(), _list.end(), counter()) > 0);
178
return std::make_pair(n, _generation);
184
* Decrease the count for the group with the given id.
188
* @pre At least 1 non-empty group exists
189
* @post At least 1 empty group exists
191
void decrement(size_t n) {
193
Guard<FastMutex> g1(_lock);
195
// At least 1 non-empty group exists
196
assert((size_t)std::for_each(_list.begin(), _list.end(), counter()) > 0);
198
// Find the requested group
199
GroupList::iterator i = std::find_if(_list.begin(), _list.end(), by_id(n));
200
if(i == _list.end()) {
202
// A group should never have been removed until
203
// the final task in that group completed
208
// Decrease the count for tasks in this group,
209
if(--i->count == 0 && i == _list.begin()) {
213
// When the first group completes, wake all waiters for every
214
// group, starting from the first until a group that is not
215
// complete is reached
218
// Don't remove the empty active group
219
if(i == --_list.end() && i->waiters.empty())
225
// If all waiters were awakened, remove the group
232
// Otherwise, unlock and yield allowing the waiter
233
// lists to be updated if other threads are busy
234
Guard<FastLock, UnlockedScope> g2(g1);
243
} while(i != _list.end() && i->count == 0);
245
// Ensure that an active group exists
247
_list.push_back( Group(++_id) );
251
// At least one group exists
252
assert(!_list.empty());
258
size_t generation(bool next = false) {
260
Guard<FastMutex> g(_lock);
261
return next ? _generation++ : _generation;
268
* Awaken all the waiters remaining in the given group
271
* - true if all the waiting threads were successfully awakened.
272
* - false if there were one or more threads that could not be awakened.
274
bool awaken(Group& grp) {
276
// Go through the waiter list in the given group;
277
for(ThreadList::iterator i = grp.waiters.begin(); i != grp.waiters.end();) {
279
ThreadImpl* impl = *i;
280
Monitor& m = impl->getMonitor();
282
// Try the monitor lock, if it cant be locked skip to the next waiter
285
// Notify the monitor & remove from the waiter list so time isn't
286
// wasted checking it again.
287
i = grp.waiters.erase(i);
289
// Try to wake the waiter, it doesn't matter if this is successful
290
// or not (only fails when the monitor is already going to stop waiting).
298
return grp.waiters.empty();
304
//! Synchronization point for the Executor
307
typedef std::deque<ThreadImpl*> ThreadList;
319
ExecutorImpl() : _canceled(false) {}
321
WaiterQueue& getWaiterQueue() {
325
void registerThread(size_t generation) {
327
// Interrupt slow starting threads
328
if(getWaiterQueue().generation() != generation)
329
ThreadImpl::current()->interrupt();
331
// Enqueue for possible future interrupt()
334
Guard<FastMutex> g(_lock);
335
_threads.push_back( ThreadImpl::current() );
341
void unregisterThread() {
343
Guard<FastMutex> g(_lock);
344
std::remove(_threads.begin(), _threads.end(), ThreadImpl::current() );
350
Guard<FastMutex> g(_lock);
360
Guard<FastMutex> g(_lock);
367
Guard<FastMutex> g(_lock);
369
// Interrupt all the registered threads
370
for(ThreadList::iterator i = _threads.begin(); i != _threads.end(); ++i)
373
// Bump the generation up, ensuring slow starting threads get this interrupt
374
getWaiterQueue().generation( true );
378
}; /* ExecutorImpl */
380
//! Wrap a generation and a group around a task
381
class Worker : public Runnable {
383
CountedPtr< ExecutorImpl > _impl;
391
Worker(const CountedPtr< ExecutorImpl >& impl, const Task& task)
392
: _impl(impl), _task(task) {
394
std::pair<size_t, size_t> pr( _impl->getWaiterQueue().increment() );
397
_generation = pr.second;
401
size_t group() const {
405
size_t generation() const {
411
// Register this thread once its begun; the generation is used to ensure
412
// threads that are slow starting are properly interrupted
414
_impl->registerThread( generation() );
419
/* consume the exceptions the work propogates */
422
_impl->getWaiterQueue().decrement( group() );
424
// Unregister this thread
426
_impl->unregisterThread();
434
ThreadedExecutor::ThreadedExecutor() : _impl(new ExecutorImpl) {}
436
ThreadedExecutor::~ThreadedExecutor() {}
438
void ThreadedExecutor::execute(const Task& task) {
440
Thread t( new Worker(_impl, task) );
444
void ThreadedExecutor::interrupt() {
448
void ThreadedExecutor::cancel() {
452
bool ThreadedExecutor::isCanceled() {
453
return _impl->isCanceled();
456
void ThreadedExecutor::wait() {
457
_impl->getWaiterQueue().wait(0);
460
bool ThreadedExecutor::wait(unsigned long timeout) {
461
return _impl->getWaiterQueue().wait(timeout == 0 ? 1 : timeout);