1
#include "base/functional.h"
2
#include "base/logging.h"
3
#include "base/timeutil.h"
4
#include "thread/thread.h"
5
#include "thread/prioritizedworkqueue.h"
7
PrioritizedWorkQueue::~PrioritizedWorkQueue() {
9
ELOG("PrioritizedWorkQueue destroyed but not done!");
13
void PrioritizedWorkQueue::Add(PrioritizedWorkQueueItem *item) {
14
lock_guard guard(mutex_);
15
queue_.push_back(item);
16
notEmpty_.notify_one();
19
void PrioritizedWorkQueue::Stop() {
20
lock_guard guard(mutex_);
22
notEmpty_.notify_one();
25
void PrioritizedWorkQueue::Flush() {
26
lock_guard guard(mutex_);
28
for (auto iter = queue_.begin(); iter != queue_.end(); ++iter) {
33
ILOG("Flushed %d un-executed tasks", flush_count);
36
bool PrioritizedWorkQueue::WaitUntilDone(bool all) {
37
// We'll lock drain this entire time, so make sure you follow that lock ordering.
38
lock_guard guard(drainMutex_);
43
while (!AllItemsDone()) {
44
drain_.wait(drainMutex_);
46
// Return whether empty or not, something just drained.
47
return AllItemsDone();
54
void PrioritizedWorkQueue::NotifyDrain() {
55
lock_guard guard(drainMutex_);
59
bool PrioritizedWorkQueue::AllItemsDone() {
60
lock_guard guard(mutex_);
61
return queue_.empty() && !working_;
64
// The worker should simply call this in a loop. Will block when appropriate.
65
PrioritizedWorkQueueItem *PrioritizedWorkQueue::Pop() {
67
lock_guard guard(mutex_);
68
working_ = false; // The thread only calls Pop if it's done.
71
// Important: make sure mutex_ is not locked while draining.
74
lock_guard guard(mutex_);
79
while (queue_.empty()) {
80
notEmpty_.wait(mutex_);
86
// Find the top priority item (lowest value).
87
float best_prio = std::numeric_limits<float>::infinity();
88
std::vector<PrioritizedWorkQueueItem *>::iterator best = queue_.end();
89
for (auto iter = queue_.begin(); iter != queue_.end(); ++iter) {
90
if ((*iter)->priority() < best_prio) {
92
best_prio = (*iter)->priority();
96
if (best != queue_.end()) {
97
PrioritizedWorkQueueItem *poppedItem = *best;
99
working_ = true; // This will be worked on.
102
// Not really sure how this can happen, but let's be safe.
107
// TODO: This feels ugly. Revisit later.
109
static std::thread *workThread;
111
static void threadfunc(PrioritizedWorkQueue *wq) {
113
PrioritizedWorkQueueItem *item = wq->Pop();
124
void ProcessWorkQueueOnThreadWhile(PrioritizedWorkQueue *wq) {
125
workThread = new std::thread(std::bind(&threadfunc, wq));
128
void StopProcessingWorkQueue(PrioritizedWorkQueue *wq) {