~ppsspp/ppsspp/ppsspp-1.2.2

« back to all changes in this revision

Viewing changes to ext/native/thread/prioritizedworkqueue.cpp

  • Committer: Sérgio Benjamim
  • Date: 2016-04-25 02:30:18 UTC
  • Revision ID: sergio_br2@yahoo.com.br-20160425023018-wk3rd7nu30fejjzz
1.2.2 source.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
#include "base/functional.h"
 
2
#include "base/logging.h"
 
3
#include "base/timeutil.h"
 
4
#include "thread/thread.h"
 
5
#include "thread/prioritizedworkqueue.h"
 
6
 
 
7
// Destructor. Performs no cleanup itself: queued items are not freed here.
// It only logs an error if the queue is torn down before done_ was set.
PrioritizedWorkQueue::~PrioritizedWorkQueue() {
        if (done_)
                return;

        ELOG("PrioritizedWorkQueue destroyed but not done!");
}
 
12
 
 
13
void PrioritizedWorkQueue::Add(PrioritizedWorkQueueItem *item) {
 
14
        lock_guard guard(mutex_);
 
15
        queue_.push_back(item);
 
16
        notEmpty_.notify_one();
 
17
}
 
18
 
 
19
void PrioritizedWorkQueue::Stop() {
 
20
        lock_guard guard(mutex_);
 
21
        done_ = true;
 
22
        notEmpty_.notify_one();
 
23
}
 
24
 
 
25
void PrioritizedWorkQueue::Flush() {
 
26
        lock_guard guard(mutex_);
 
27
        int flush_count = 0;
 
28
        for (auto iter = queue_.begin(); iter != queue_.end(); ++iter) {
 
29
                delete *iter;
 
30
                flush_count++;
 
31
        }
 
32
        queue_.clear();
 
33
        ILOG("Flushed %d un-executed tasks", flush_count);
 
34
}
 
35
 
 
36
bool PrioritizedWorkQueue::WaitUntilDone(bool all) {
 
37
        // We'll lock drain this entire time, so make sure you follow that lock ordering.
 
38
        lock_guard guard(drainMutex_);
 
39
        if (AllItemsDone()) {
 
40
                return true;
 
41
        }
 
42
 
 
43
        while (!AllItemsDone()) {
 
44
                drain_.wait(drainMutex_);
 
45
                if (!all) {
 
46
                        // Return whether empty or not, something just drained.
 
47
                        return AllItemsDone();
 
48
                }
 
49
        }
 
50
 
 
51
        return true;
 
52
}
 
53
 
 
54
void PrioritizedWorkQueue::NotifyDrain() {
 
55
        lock_guard guard(drainMutex_);
 
56
        drain_.notify_one();
 
57
}
 
58
 
 
59
bool PrioritizedWorkQueue::AllItemsDone() {
 
60
        lock_guard guard(mutex_);
 
61
        return queue_.empty() && !working_;
 
62
}
 
63
 
 
64
// The worker should simply call this in a loop. Will block when appropriate.
 
65
PrioritizedWorkQueueItem *PrioritizedWorkQueue::Pop() {
 
66
        {
 
67
                lock_guard guard(mutex_);
 
68
                working_ = false;  // The thread only calls Pop if it's done.
 
69
        }
 
70
 
 
71
        // Important: make sure mutex_ is not locked while draining.
 
72
        NotifyDrain();
 
73
 
 
74
        lock_guard guard(mutex_);
 
75
        if (done_) {
 
76
                return 0;
 
77
        }
 
78
 
 
79
        while (queue_.empty()) {
 
80
                notEmpty_.wait(mutex_);
 
81
                if (done_) {
 
82
                        return 0;
 
83
                }
 
84
        }
 
85
 
 
86
        // Find the top priority item (lowest value).
 
87
        float best_prio = std::numeric_limits<float>::infinity();
 
88
        std::vector<PrioritizedWorkQueueItem *>::iterator best = queue_.end();
 
89
        for (auto iter = queue_.begin(); iter != queue_.end(); ++iter) {
 
90
                if ((*iter)->priority() < best_prio) {
 
91
                        best = iter;
 
92
                        best_prio = (*iter)->priority();
 
93
                }
 
94
        }
 
95
 
 
96
        if (best != queue_.end()) {
 
97
                PrioritizedWorkQueueItem *poppedItem = *best;
 
98
                queue_.erase(best);
 
99
                working_ = true;  // This will be worked on.
 
100
                return poppedItem;
 
101
        } else {
 
102
                // Not really sure how this can happen, but let's be safe.
 
103
                return 0;
 
104
        }
 
105
}
 
106
 
 
107
// TODO: This feels ugly. Revisit later.
 
108
 
 
109
// The single background worker thread created by ProcessWorkQueueOnThreadWhile
// and joined/deleted by StopProcessingWorkQueue. Null while no worker exists.
// There is only this one slot, so starting a second worker overwrites it.
static std::thread *workThread = nullptr;
 
110
 
 
111
static void threadfunc(PrioritizedWorkQueue *wq) {
 
112
        while (true) {
 
113
                PrioritizedWorkQueueItem *item = wq->Pop();
 
114
                if (!item) {
 
115
                        if (wq->Done())
 
116
                                break;
 
117
                } else {
 
118
                        item->run();
 
119
                        delete item;
 
120
                }
 
121
        }
 
122
}
 
123
 
 
124
void ProcessWorkQueueOnThreadWhile(PrioritizedWorkQueue *wq) {
 
125
        workThread = new std::thread(std::bind(&threadfunc, wq));
 
126
}
 
127
 
 
128
void StopProcessingWorkQueue(PrioritizedWorkQueue *wq) {
 
129
        wq->Stop();
 
130
        if (workThread) {
 
131
                workThread->join();
 
132
                delete workThread;
 
133
        }
 
134
        workThread = 0;
 
135
}