~ubuntu-branches/ubuntu/trusty/hugin/trusty-proposed

« back to all changes in this revision

Viewing changes to src/foreign/zthread/src/ThreadedExecutor.cxx

  • Committer: Bazaar Package Importer
  • Author(s): Andreas Metzler
  • Date: 2011-01-06 14:28:24 UTC
  • mfrom: (1.1.9 upstream) (0.1.21 experimental)
  • Revision ID: james.westby@ubuntu.com-20110106142824-zn9lxylg5z44dynn
* Drop Cyril Brulebois from Uploaders. Thank you very much for your work.
* Bump package version. (rc3 was re-released as 2010.4.0).

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/*
 
2
 * Copyright (c) 2005, Eric Crahen
 
3
 *
 
4
 * Permission is hereby granted, free of charge, to any person obtaining a copy
 
5
 * of this software and associated documentation files (the "Software"), to deal
 
6
 * in the Software without restriction, including without limitation the rights
 
7
 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
 
8
 * copies of the Software, and to permit persons to whom the Software is furnished
 
9
 * to do so, subject to the following conditions:
 
10
 *
 
11
 * The above copyright notice and this permission notice shall be included in all
 
12
 * copies or substantial portions of the Software.
 
13
 *
 
14
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 
15
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 
16
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
 
17
 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
 
18
 * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
 
19
 * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
 
20
 *
 
21
 */
 
22
 
 
23
#include "zthread/ThreadedExecutor.h"
 
24
#include "zthread/Guard.h"
 
25
#include "zthread/FastMutex.h"
 
26
#include "zthread/Time.h"
 
27
 
 
28
#include "ThreadImpl.h"
 
29
 
 
30
namespace ZThread {
 
31
 
 
32
  namespace {
 
33
 
 
34
    //! 
 
35
    class WaiterQueue {
 
36
 
 
37
      typedef std::deque<ThreadImpl*>  ThreadList;
 
38
      
 
39
      typedef struct group_t {
 
40
        size_t     id;
 
41
        size_t     count;
 
42
        ThreadList waiters;
 
43
        group_t(size_t n) : id(n), count(0) {}
 
44
      } Group;
 
45
 
 
46
      typedef std::deque<Group>  GroupList;
 
47
 
 
48
      //! Predicate to find a specific group
 
49
      struct by_id : public std::unary_function<bool, Group> {
 
50
        size_t id;
 
51
        by_id(size_t n) : id(n) {}
 
52
        bool operator()(const Group& grp) {
 
53
          return grp.id == id;
 
54
        }
 
55
      };
 
56
 
 
57
      //! Functor to count groups
 
58
      struct counter : public std::unary_function<void, Group> {
 
59
        size_t count;
 
60
        counter() : count(0) {}
 
61
        void operator()(const Group& grp) { count += grp.count; }
 
62
        operator size_t() { return count; }
 
63
      };
 
64
      
 
65
      FastMutex     _lock;
 
66
      GroupList _list;
 
67
      size_t    _id;
 
68
      size_t    _generation;
 
69
 
 
70
    public:
 
71
      
 
72
      WaiterQueue() : _id(0), _generation(0) {
 
73
        // At least one empty-group exists
 
74
        _list.push_back( Group(_id++) );
 
75
      }
 
76
 
 
77
      /**
 
78
       * Insert the current thread into the current waiter list
 
79
       *
 
80
       * @pre  At least one empty group exists
 
81
       * @post At least one empty group exists
 
82
       */
 
83
      bool wait(unsigned long timeout) {
 
84
 
 
85
        ThreadImpl* self = ThreadImpl::current();
 
86
        Monitor& m = self->getMonitor();
 
87
 
 
88
        Monitor::STATE state;
 
89
 
 
90
        Guard<Lockable> g1(_lock);
 
91
 
 
92
        // At least one empty-group exists
 
93
        assert(!_list.empty());
 
94
 
 
95
        // Return w/o waiting if there are no executing tasks
 
96
        if((size_t)std::for_each(_list.begin(), _list.end(), counter()) < 1)
 
97
          return true;
 
98
 
 
99
        // Update the waiter list for the active group 
 
100
        _list.back().waiters.push_back(self);
 
101
        size_t n = _list.back().id;
 
102
 
 
103
        m.acquire();
 
104
        
 
105
        {
 
106
          
 
107
          Guard<Lockable, UnlockedScope> g2(g1);          
 
108
          state = timeout == 0 ? m.wait() : m.wait(timeout);
 
109
 
 
110
        }
 
111
 
 
112
        m.release();
 
113
 
 
114
        // If awoke due to a reason other than the last task in the group 'n' completing,
 
115
        // then then find the group 'self' is waiting in
 
116
        GroupList::iterator i = std::find_if(_list.begin(), _list.end(), by_id(n));
 
117
        if(i != _list.end()) {
 
118
 
 
119
          // Remove 'self' from that list if it is still a member
 
120
          ThreadList::iterator j = std::find(i->waiters.begin(), i->waiters.end(), self);
 
121
          if(j != i->waiters.end())
 
122
            i->waiters.erase(j);
 
123
 
 
124
        }
 
125
 
 
126
        // At least one empty-group exists
 
127
        assert(!_list.empty());
 
128
 
 
129
        switch(state) {
 
130
          case Monitor::SIGNALED:
 
131
            break;
 
132
          case Monitor::TIMEDOUT:
 
133
            return false;
 
134
          case Monitor::INTERRUPTED:
 
135
            throw Interrupted_Exception();
 
136
          default:
 
137
            throw Synchronization_Exception();
 
138
        } 
 
139
       
 
140
        return true;
 
141
 
 
142
      }
 
143
      
 
144
      /**
 
145
       * Increment the active group count
 
146
       *
 
147
       * @pre at least 1 empty group exists
 
148
       * @post at least 1 non-empty group exists 
 
149
       */
 
150
      std::pair<size_t, size_t> increment() {
 
151
        
 
152
        Guard<FastMutex> g(_lock);
 
153
        
 
154
        // At least one empty-group exists
 
155
        assert(!_list.empty());
 
156
 
 
157
        GroupList::iterator i = --_list.end();
 
158
        size_t n = i->id;
 
159
 
 
160
        if(i == _list.end()) {
 
161
 
 
162
          // A group should never have been removed until
 
163
          // the final task in that group completed
 
164
          assert(0);
 
165
 
 
166
        }
 
167
 
 
168
        i->count++;
 
169
 
 
170
        // When the active group is being incremented, insert a new active group
 
171
        // to replace it if there were waiting threads
 
172
        if(i == --_list.end() && !i->waiters.empty()) 
 
173
          _list.push_back(Group(_id++));
 
174
 
 
175
        // At least 1 non-empty group exists
 
176
        assert((size_t)std::for_each(_list.begin(), _list.end(), counter()) > 0);
 
177
 
 
178
        return std::make_pair(n, _generation);
 
179
 
 
180
      }
 
181
      
 
182
 
 
183
      /**
 
184
       * Decrease the count for the group with the given id.
 
185
       *
 
186
       * @param n group id
 
187
       * 
 
188
       * @pre  At least 1 non-empty group exists
 
189
       * @post At least 1 empty group exists
 
190
       */
 
191
      void decrement(size_t n) {
 
192
 
 
193
        Guard<FastMutex> g1(_lock);
 
194
 
 
195
        // At least 1 non-empty group exists
 
196
        assert((size_t)std::for_each(_list.begin(), _list.end(), counter()) > 0);
 
197
 
 
198
        // Find the requested group
 
199
        GroupList::iterator i = std::find_if(_list.begin(), _list.end(), by_id(n));
 
200
        if(i == _list.end()) {
 
201
          
 
202
          // A group should never have been removed until
 
203
          // the final task in that group completed
 
204
          assert(0);
 
205
 
 
206
        }
 
207
 
 
208
        // Decrease the count for tasks in this group,
 
209
        if(--i->count == 0 && i == _list.begin()) {
 
210
          
 
211
          do { 
 
212
 
 
213
            // When the first group completes, wake all waiters for every
 
214
            // group, starting from the first until a group that is not 
 
215
            // complete is reached
 
216
 
 
217
            /*
 
218
            // Don't remove the empty active group
 
219
            if(i == --_list.end() && i->waiters.empty())
 
220
              break;
 
221
            */
 
222
            
 
223
            if( awaken(*i) ) {
 
224
            
 
225
              // If all waiters were awakened, remove the group  
 
226
              i = _list.erase(i);
 
227
              
 
228
            } else {
 
229
              
 
230
              {
 
231
 
 
232
                // Otherwise, unlock and yield allowing the waiter
 
233
                // lists to be updated if other threads are busy
 
234
                Guard<FastLock, UnlockedScope> g2(g1);
 
235
                ThreadImpl::yield();
 
236
              
 
237
              }
 
238
 
 
239
              i = _list.begin();
 
240
 
 
241
            }
 
242
              
 
243
          } while(i != _list.end() && i->count == 0); 
 
244
          
 
245
          // Ensure that an active group exists
 
246
          if(_list.empty())
 
247
            _list.push_back( Group(++_id) );
 
248
 
 
249
        }
 
250
 
 
251
        // At least one group exists
 
252
        assert(!_list.empty());
 
253
        
 
254
      }
 
255
 
 
256
      /**
 
257
       */
 
258
      size_t generation(bool next = false) {
 
259
 
 
260
        Guard<FastMutex> g(_lock);
 
261
        return next ? _generation++ : _generation;
 
262
 
 
263
      }
 
264
      
 
265
    private:
 
266
      
 
267
      /**
 
268
       * Awaken all the waiters remaining in the given group
 
269
       * 
 
270
       * @return
 
271
       *   - true if all the waiting threads were successfully awakened.
 
272
       *   - false if there were one or more threads that could not be awakened.
 
273
       */
 
274
      bool awaken(Group& grp) {
 
275
 
 
276
        // Go through the waiter list in the given group; 
 
277
        for(ThreadList::iterator i = grp.waiters.begin(); i != grp.waiters.end();) {
 
278
          
 
279
          ThreadImpl* impl = *i;
 
280
          Monitor& m = impl->getMonitor();
 
281
          
 
282
          // Try the monitor lock, if it cant be locked skip to the next waiter
 
283
          if(m.tryAcquire()) {
 
284
            
 
285
            // Notify the monitor & remove from the waiter list so time isn't
 
286
            // wasted checking it again.
 
287
            i = grp.waiters.erase(i);
 
288
            
 
289
            // Try to wake the waiter, it doesn't matter if this is successful
 
290
            // or not (only fails when the monitor is already going to stop waiting). 
 
291
            m.notify();
 
292
            m.release();
 
293
            
 
294
          } else ++i;
 
295
          
 
296
        }
 
297
        
 
298
        return grp.waiters.empty();
 
299
 
 
300
      }
 
301
 
 
302
    };
 
303
 
 
304
    //! Synchronization point for the Executor 
 
305
    class ExecutorImpl {
 
306
 
 
307
      typedef std::deque<ThreadImpl*> ThreadList;
 
308
 
 
309
      bool _canceled;
 
310
      FastMutex _lock;      
 
311
 
 
312
      //! Worker threads
 
313
      ThreadList _threads;
 
314
      
 
315
      WaiterQueue _queue;
 
316
 
 
317
    public:
 
318
 
 
319
      ExecutorImpl() : _canceled(false) {}
 
320
 
 
321
      WaiterQueue& getWaiterQueue() { 
 
322
        return _queue;
 
323
      }
 
324
 
 
325
      void registerThread(size_t generation) {
 
326
               
 
327
        // Interrupt slow starting threads
 
328
        if(getWaiterQueue().generation() != generation)
 
329
          ThreadImpl::current()->interrupt();
 
330
 
 
331
        // Enqueue for possible future interrupt() 
 
332
        else {
 
333
 
 
334
          Guard<FastMutex> g(_lock);
 
335
          _threads.push_back( ThreadImpl::current() );
 
336
 
 
337
        }
 
338
 
 
339
      }
 
340
 
 
341
      void unregisterThread() {
 
342
        
 
343
        Guard<FastMutex> g(_lock);
 
344
        std::remove(_threads.begin(), _threads.end(), ThreadImpl::current() );
 
345
 
 
346
      }
 
347
 
 
348
      void cancel() {
 
349
 
 
350
        Guard<FastMutex> g(_lock);
 
351
        _canceled = true;
 
352
 
 
353
      }
 
354
 
 
355
      bool isCanceled() {
 
356
 
 
357
        if(_canceled)
 
358
          return true;
 
359
 
 
360
        Guard<FastMutex> g(_lock);
 
361
        return _canceled;
 
362
 
 
363
      }
 
364
 
 
365
      void interrupt() {
 
366
 
 
367
        Guard<FastMutex> g(_lock);
 
368
 
 
369
        // Interrupt all the registered threads
 
370
        for(ThreadList::iterator i = _threads.begin(); i != _threads.end(); ++i)
 
371
          (*i)->interrupt();
 
372
        
 
373
        // Bump the generation up, ensuring slow starting threads get this interrupt
 
374
        getWaiterQueue().generation( true );
 
375
 
 
376
      }      
 
377
 
 
378
    }; /* ExecutorImpl */
 
379
 
 
380
    //! Wrap a generation and a group around a task
 
381
    class Worker : public Runnable {
 
382
 
 
383
      CountedPtr< ExecutorImpl > _impl;
 
384
      Task _task;
 
385
 
 
386
      size_t _generation;
 
387
      size_t _group;
 
388
 
 
389
    public:
 
390
 
 
391
      Worker(const CountedPtr< ExecutorImpl >& impl, const Task& task)
 
392
        : _impl(impl), _task(task) {
 
393
 
 
394
        std::pair<size_t, size_t> pr( _impl->getWaiterQueue().increment() );
 
395
    
 
396
        _group      = pr.first;
 
397
        _generation = pr.second;
 
398
 
 
399
      }
 
400
 
 
401
      size_t group() const {
 
402
        return _group;
 
403
      }
 
404
 
 
405
      size_t generation() const {
 
406
        return _generation;
 
407
      }
 
408
      
 
409
      void run() {
 
410
        
 
411
        // Register this thread once its begun; the generation is used to ensure
 
412
        // threads that are slow starting are properly interrupted
 
413
 
 
414
        _impl->registerThread( generation() );
 
415
        
 
416
        try {
 
417
          _task->run();          
 
418
        } catch(...) {
 
419
          /* consume the exceptions the work propogates */
 
420
        }
 
421
        
 
422
        _impl->getWaiterQueue().decrement( group() );
 
423
 
 
424
        // Unregister this thread
 
425
 
 
426
        _impl->unregisterThread();
 
427
 
 
428
      }
 
429
 
 
430
    }; /* Worker */
 
431
 
 
432
  }
 
433
 
 
434
  ThreadedExecutor::ThreadedExecutor() : _impl(new ExecutorImpl) {}
 
435
 
 
436
  ThreadedExecutor::~ThreadedExecutor() {}
 
437
  
 
438
  void ThreadedExecutor::execute(const Task& task) {
 
439
     
 
440
    Thread t( new Worker(_impl, task) );
 
441
 
 
442
  }  
 
443
 
 
444
  void ThreadedExecutor::interrupt() {
 
445
    _impl->interrupt();
 
446
  }
 
447
 
 
448
  void ThreadedExecutor::cancel() {
 
449
    _impl->cancel();    
 
450
  }
 
451
  
 
452
  bool ThreadedExecutor::isCanceled() {
 
453
    return _impl->isCanceled();
 
454
  }
 
455
 
 
456
  void ThreadedExecutor::wait() {
 
457
    _impl->getWaiterQueue().wait(0);
 
458
  }
 
459
 
 
460
  bool ThreadedExecutor::wait(unsigned long timeout) { 
 
461
    return _impl->getWaiterQueue().wait(timeout == 0 ? 1 : timeout);
 
462
  }
 
463
 
 
464
}