2
* Copyright 2011 Google Inc.
4
* Licensed under the Apache License, Version 2.0 (the "License");
5
* you may not use this file except in compliance with the License.
6
* You may obtain a copy of the License at
8
* http://www.apache.org/licenses/LICENSE-2.0
10
* Unless required by applicable law or agreed to in writing, software
11
* distributed under the License is distributed on an "AS IS" BASIS,
12
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
* See the License for the specific language governing permissions and
14
* limitations under the License.
17
// Author: jmarantz@google.com (Joshua Marantz)
19
// Implements a simple worker pool, allowing arbitrary functions to run
20
// using a pool of threads of predefined maximum size.
22
// This differs from QueuedWorker, which always uses exactly one thread.
23
// In this interface, any task can be assigned to any thread.
25
#ifndef PAGESPEED_KERNEL_THREAD_QUEUED_WORKER_POOL_H_
26
#define PAGESPEED_KERNEL_THREAD_QUEUED_WORKER_POOL_H_
28
#include <cstddef> // for size_t
33
#include "pagespeed/kernel/base/basictypes.h"
34
#include "pagespeed/kernel/base/function.h"
35
#include "pagespeed/kernel/base/scoped_ptr.h"
36
#include "pagespeed/kernel/base/string.h"
37
#include "pagespeed/kernel/base/string_util.h"
38
#include "pagespeed/kernel/base/thread_annotations.h"
39
#include "pagespeed/kernel/base/thread_system.h"
41
namespace net_instaweb {
47
// Maintains a predefined number of worker threads, and dispatches any
48
// number of groups of sequential tasks to those threads.
49
class QueuedWorkerPool {
51
// Sentinel for SetLoadSheddingThreshold(): passing this value disables
// load-shedding, which that method documents as the default.
static const int kNoLoadShedding = -1;
53
// Constructs a worker pool.
// NOTE(review): max_workers is presumably the predefined maximum number of
// worker threads described in the file comment; thread_name_base is
// presumably kept in thread_name_base_ and thread_system in thread_system_.
// None of this is visible in the header -- confirm against the
// implementation.
QueuedWorkerPool(int max_workers, StringPiece thread_name_base,
54
ThreadSystem* thread_system);
57
// Functions added to a Sequence will be run sequentially, though not
58
// necessarily always from the same worker thread. The scheduler will
59
// continue to schedule new work added to the sequence until
60
// FreeSequence is called.
63
// AddFunction is a callback that when invoked queues another callback on
64
// the given sequence, and when canceled queues a cancel call to the
65
// sequence instead. The cancellation behavior is what makes this different
66
// from a simple call to MakeFunction(sequence, &Sequence::Add, callback).
67
// Function adapter that forwards a callback onto a Sequence.
class AddFunction : public Function {
69
// Records the target sequence and the callback that will be forwarded to it.
AddFunction(Sequence* sequence, Function* callback)
70
: sequence_(sequence), callback_(callback) { }
71
virtual ~AddFunction();
75
// Queues callback_ on sequence_.
sequence_->Add(callback_);
77
// Rather than canceling callback_ directly, queues a new function on
// sequence_ that is built from callback_ and Function::CallCancel.
virtual void Cancel() {
78
sequence_->Add(MakeFunction(callback_, &Function::CallCancel));
84
// Copying and assignment are disallowed.
DISALLOW_COPY_AND_ASSIGN(AddFunction);
87
// Adds 'function' to a sequence. Note that this can occur at any time
88
// the sequence is live -- you can add functions to a sequence that has
89
// already started processing.
91
// 'function' can be called any time after Add(), and may in fact be
92
// called before Add() returns.
94
// Ownership of 'function' is transferred to the Sequence, which deletes
95
// it after execution or upon cancellation due to shutdown.
97
// If the pool is being shut down at the time Add is being called,
98
// this method will call function->Cancel().
99
// Callers must not already hold sequence_mutex_ (LOCKS_EXCLUDED).
void Add(Function* function) LOCKS_EXCLUDED(sequence_mutex_);
101
// Stores x in queue_size_; no locking is performed by this setter.
// NOTE(review): the Waveform presumably tracks this sequence's queue depth,
// as described for the pool-level setter of the same name -- confirm.
void set_queue_size_stat(Waveform* x) { queue_size_ = x; }
103
// Sets the maximum number of functions that can be enqueued to a sequence.
104
// By default, sequences are unbounded. When a bound is reached, the oldest
105
// functions are retired by calling Cancel() on them.
106
// Stores x in max_queue_size_; no locking is performed by this setter.
void set_max_queue_size(size_t x) { max_queue_size_ = x; }
108
// Calls Cancel on all pending functions in the queue.
109
// Callers must not already hold sequence_mutex_ (LOCKS_EXCLUDED).
void CancelPendingFunctions() LOCKS_EXCLUDED(sequence_mutex_);
112
// Construct using QueuedWorkerPool::NewSequence().
113
// NOTE(review): pool is presumably the QueuedWorkerPool that owns this
// sequence (see pool_), and thread_system presumably creates
// sequence_mutex_ and termination_condvar_ -- confirm against the
// implementation.
Sequence(ThreadSystem* thread_system, QueuedWorkerPool* pool);
115
// Free by calling QueuedWorkerPool::FreeSequence().
118
// Resets a new or recycled Sequence to its original state.
121
// Waits for any currently active function to complete, deletes
122
// any other outstanding functions. During the shutdown process,
123
// the Sequence will simply delete, without running, any function
124
// added to it from another thread.
126
// This function blocks until shutdown is complete.
127
// Callers must not already hold sequence_mutex_ (LOCKS_EXCLUDED).
void WaitForShutDown() LOCKS_EXCLUDED(sequence_mutex_);
129
// Puts the Sequence in shutdown mode, but does not block until shutdown
130
// is complete. Return 'true' if the sequence is inactive and thus can
131
// be immediately recycled.
132
// Callers must not already hold sequence_mutex_ (LOCKS_EXCLUDED).
bool InitiateShutDown() LOCKS_EXCLUDED(sequence_mutex_);
134
// Gets the next function in the sequence, and transfers ownership of it.
// NOTE(review): the recipient of ownership is presumably the caller, since
// the function is handed back as a raw pointer -- confirm.
// Callers must not already hold sequence_mutex_ (LOCKS_EXCLUDED).
136
Function* NextFunction() LOCKS_EXCLUDED(sequence_mutex_);
138
// Caller must hold sequence_mutex_ (EXCLUSIVE_LOCKS_REQUIRED).
// NOTE(review): presumably reports whether this sequence has running or
// queued work, as needed by QueuedWorkerPool::AreBusy() -- confirm.
bool IsBusy() EXCLUSIVE_LOCKS_REQUIRED(sequence_mutex_);
140
// Returns number of tasks that were canceled.
141
// Caller must hold sequence_mutex_ (EXCLUSIVE_LOCKS_REQUIRED).
int CancelTasksOnWorkQueue() EXCLUSIVE_LOCKS_REQUIRED(sequence_mutex_);
143
// Cancels all pending tasks (and updates stats appropriately).
144
// Callers must not already hold sequence_mutex_ (LOCKS_EXCLUDED).
void Cancel() LOCKS_EXCLUDED(sequence_mutex_);
146
// Grants QueuedWorkerPool access to this class's non-public members.
friend class QueuedWorkerPool;
147
// NOTE(review): presumably the functions waiting to run, in queue order --
// confirm against the implementation.
std::deque<Function*> work_queue_;
148
// The mutex named by the lock annotations on the methods above.
scoped_ptr<ThreadSystem::CondvarCapableMutex> sequence_mutex_;
149
// NOTE(review): presumably the pool passed to the constructor -- confirm.
QueuedWorkerPool* pool_;
152
// NOTE(review): presumably what WaitForShutDown() blocks on -- confirm.
scoped_ptr<ThreadSystem::Condvar> termination_condvar_;
153
// Assigned by set_queue_size_stat().
Waveform* queue_size_;
154
// Assigned by set_max_queue_size(), which documents it as the bound on the
// number of functions that can be enqueued.
size_t max_queue_size_;
156
// Copying and assignment are disallowed.
DISALLOW_COPY_AND_ASSIGN(Sequence);
159
// Set of Sequence pointers; this is the argument type taken by AreBusy().
typedef std::set<Sequence*> SequenceSet;
161
// Sequence is owned by the pool, and will be automatically freed when
162
// the pool is finally freed (e.g. on server shutdown). But the sequence
163
// does *not* auto-destruct when complete; it must be explicitly freed
164
// using FreeSequence().
165
// The result points at a pool-owned Sequence: release it with FreeSequence()
// rather than delete, and check it for NULL before use.
Sequence* NewSequence(); // Returns NULL if shutting down.
167
// Shuts down a sequence and frees it. This does *not* block waiting
168
// for the Sequence to finish.
169
// NOTE(review): 'sequence' is presumably expected to have come from
// NewSequence() on this same pool -- confirm against the implementation.
void FreeSequence(Sequence* sequence);
171
// Shuts down all Sequences and Worker threads, but does not delete the
172
// sequences. The sequences will be deleted when the pool is destructed.
174
// Equivalent to "InitiateShutDown(); WaitForShutDownComplete();"
177
// Starts the shutdown process, preventing further tasks from being queued.
178
// Does not wait for any active tasks to be completed. This must be followed
179
// by WaitForShutDownComplete. It is invalid to call InitiateShutDown() twice
// NOTE(review): the sentence above appears to be cut off (presumably it
// continues "in a row") -- confirm the intended restriction.
181
void InitiateShutDown();
183
// Blocks waiting for all outstanding tasks to be completed. Must be preceded
184
// by InitiateShutDown().
185
// Returns no status; completion is indicated only by this call returning.
void WaitForShutDownComplete();
187
// Returns true if any of the given sequences is busy. Note that multiple
188
// sequences are checked atomically; otherwise we could end up missing
189
// work. For example, consider if we had a sequence for main rewrite work,
190
// and another one for expensive work.
191
// In this case, if we tried to check their busyness independently, the
192
// following could happen:
193
// 1) First portion of inexpensive work is done, so we queue up
194
// some on the expensive work thread.
195
// 2) We check whether the inexpensive work sequence is busy. It's not.
196
// 3) The expensive work runs, finishes, and queues up more inexpensive
// work.
198
// 4) We check whether the expensive sequence is busy. It's not, so we would
199
// conclude we quiesced --- while there was still work in the inexpensive
// sequence.
201
static bool AreBusy(const SequenceSet& sequences);
203
// Configures load-shedding. If x == kNoLoadShedding (the default),
// load-shedding is disabled.
204
// Otherwise, if more than x sequences are queued waiting to run,
205
// sequences will start getting dropped and canceled, with oldest
206
// sequences canceled first.
208
// Precondition: x > 0 || x == kNoLoadShedding.
209
// NOTE(review): x is presumably recorded in load_shedding_threshold_ --
// confirm against the implementation.
211
// Should be called before starting any work.
212
void SetLoadSheddingThreshold(int x);
214
// Sets up a timed-variable statistic indicating the current queue depth.
216
// This must be called prior to creating sequences.
217
// Stores x in the pool's queue_size_ member; no locking is performed here.
void set_queue_size_stat(Waveform* x) { queue_size_ = x; }
220
// Grants Sequence access to the pool's non-public members.
friend class Sequence;
221
// NOTE(review): the four helpers below carry no documentation in this
// header; the summaries are inferred from their names and signatures only
// and should be confirmed against the implementation.
// Presumably executes work from 'sequence' on 'worker'.
void Run(Sequence* sequence, QueuedWorker* worker);
222
// Presumably places 'sequence' on queued_sequences_ to await a worker.
void QueueSequence(Sequence* sequence);
223
// Presumably selects the next queued sequence for 'worker' and returns it.
Sequence* AssignWorkerToNextSequence(QueuedWorker* worker);
224
// Presumably tells the pool that 'sequence' has stopped running.
void SequenceNoLongerActive(Sequence* sequence);
226
// NOTE(review): presumably the ThreadSystem passed to the constructor --
// confirm.
ThreadSystem* thread_system_;
227
// NOTE(review): presumably guards the worker and sequence containers below
// -- confirm against the implementation.
scoped_ptr<AbstractMutex> mutex_;
229
// active_workers_ and available_workers_ are mutually exclusive.
230
std::set<QueuedWorker*> active_workers_;
231
std::vector<QueuedWorker*> available_workers_;
233
// queued_sequences_ and free_sequences_ are mutually exclusive, but
234
// all_sequences_ contains all of them.
235
std::vector<Sequence*> all_sequences_;
236
std::deque<Sequence*> queued_sequences_;
237
std::vector<Sequence*> free_sequences_;
239
// NOTE(review): presumably the thread_name_base constructor argument --
// confirm.
GoogleString thread_name_base_;
244
// Assigned by set_queue_size_stat().
Waveform* queue_size_;
245
// NOTE(review): presumably assigned by SetLoadSheddingThreshold(), with
// kNoLoadShedding meaning "disabled" -- confirm.
int load_shedding_threshold_;
247
// Copying and assignment are disallowed.
DISALLOW_COPY_AND_ASSIGN(QueuedWorkerPool);
250
} // namespace net_instaweb
252
#endif // PAGESPEED_KERNEL_THREAD_QUEUED_WORKER_POOL_H_