~alinuxninja/nginx-edge/trunk

« back to all changes in this revision

Viewing changes to debian/modules/ngx_pagespeed/psol/include/pagespeed/kernel/thread/queued_worker_pool.h

  • Committer: Vivian
  • Date: 2015-12-04 18:20:11 UTC
  • Revision ID: git-v1:a36f2bc32e884f7473b3a47040e5411306144d7d
* Do not extract psol.tar.gz

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
/*
2
 
 * Copyright 2011 Google Inc.
3
 
 *
4
 
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 
 * you may not use this file except in compliance with the License.
6
 
 * You may obtain a copy of the License at
7
 
 *
8
 
 *      http://www.apache.org/licenses/LICENSE-2.0
9
 
 *
10
 
 * Unless required by applicable law or agreed to in writing, software
11
 
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 
 * See the License for the specific language governing permissions and
14
 
 * limitations under the License.
15
 
 */
16
 
 
17
 
// Author: jmarantz@google.com (Joshua Marantz)
18
 
//
19
 
// implements a simple worker pool, allowing arbitrary functions to run
20
 
// using a pool of threads of predefined maximum size.
21
 
//
22
 
// This differs from QueuedWorker, which always uses exactly one thread.
23
 
// In this interface, any task can be assigned to any thread.
24
 
 
25
 
#ifndef PAGESPEED_KERNEL_THREAD_QUEUED_WORKER_POOL_H_
26
 
#define PAGESPEED_KERNEL_THREAD_QUEUED_WORKER_POOL_H_
27
 
 
28
 
#include <cstddef>  // for size_t
29
 
#include <deque>
30
 
#include <set>
31
 
#include <vector>
32
 
 
33
 
#include "pagespeed/kernel/base/basictypes.h"
34
 
#include "pagespeed/kernel/base/function.h"
35
 
#include "pagespeed/kernel/base/scoped_ptr.h"
36
 
#include "pagespeed/kernel/base/string.h"
37
 
#include "pagespeed/kernel/base/string_util.h"
38
 
#include "pagespeed/kernel/base/thread_annotations.h"
39
 
#include "pagespeed/kernel/base/thread_system.h"
40
 
 
41
 
namespace net_instaweb {
42
 
 
43
 
class AbstractMutex;
44
 
class QueuedWorker;
45
 
class Waveform;
46
 
 
47
 
// Maintains a predefined number of worker threads, and dispatches any
48
 
// number of groups of sequential tasks to those threads.
49
 
class QueuedWorkerPool {
50
 
 public:
51
 
  static const int kNoLoadShedding = -1;
52
 
 
53
 
  QueuedWorkerPool(int max_workers, StringPiece thread_name_base,
54
 
                   ThreadSystem* thread_system);
55
 
  ~QueuedWorkerPool();
56
 
 
57
 
  // Functions added to a Sequence will be run sequentially, though not
58
 
  // necessarily always from the same worker thread.  The scheduler will
59
 
  // continue to schedule new work added to the sequence until
60
 
  // FreeSequence is called.
61
 
  class Sequence {
62
 
   public:
63
 
    // AddFunction is a callback that when invoked queues another callback on
64
 
    // the given sequence, and when canceled queues a cancel call to the
65
 
    // sequence instead.  The cancellation behavior is what makes this different
66
 
    // from a simple call to MakeFunction(sequence, &Sequence::Add, callback).
67
 
    class AddFunction : public Function {
68
 
     public:
69
 
      AddFunction(Sequence* sequence, Function* callback)
70
 
          : sequence_(sequence), callback_(callback) { }
71
 
      virtual ~AddFunction();
72
 
 
73
 
     protected:
74
 
      virtual void Run() {
75
 
        sequence_->Add(callback_);
76
 
      }
77
 
      virtual void Cancel() {
78
 
        sequence_->Add(MakeFunction(callback_, &Function::CallCancel));
79
 
      }
80
 
 
81
 
     private:
82
 
      Sequence* sequence_;
83
 
      Function* callback_;
84
 
      DISALLOW_COPY_AND_ASSIGN(AddFunction);
85
 
    };
86
 
 
87
 
    // Adds 'function' to a sequence.  Note that this can occur at any time
88
 
    // the sequence is live -- you can add functions to a sequence that has
89
 
    // already started processing.
90
 
    //
91
 
    // 'function' can be called any time after Add(), and may in fact be
92
 
    // called before Add() returns.
93
 
    //
94
 
    // Ownership of 'function' is transferred to the Sequence, which deletes
95
 
    // it after execution or upon cancellation due to shutdown.
96
 
    //
97
 
    // If the pool is being shut down at the time Add is being called,
98
 
    // this method will call function->Cancel().
99
 
    void Add(Function* function) LOCKS_EXCLUDED(sequence_mutex_);
100
 
 
101
 
    void set_queue_size_stat(Waveform* x) { queue_size_ = x; }
102
 
 
103
 
    // Sets the maximum number of functions that can be enqueued to a sequence.
104
 
    // By default, sequences are unbounded.  When a bound is reached, the oldest
105
 
    // functions are retired by calling Cancel() on them.
106
 
    void set_max_queue_size(size_t x) { max_queue_size_ = x; }
107
 
 
108
 
    // Calls Cancel on all pending functions in the queue.
109
 
    void CancelPendingFunctions() LOCKS_EXCLUDED(sequence_mutex_);
110
 
 
111
 
   private:
112
 
    // Construct using QueuedWorkerPool::NewSequence().
113
 
    Sequence(ThreadSystem* thread_system, QueuedWorkerPool* pool);
114
 
 
115
 
    // Free by calling QueuedWorkerPool::FreeSequence().
116
 
    ~Sequence();
117
 
 
118
 
    // Resets a new or recycled Sequence to its original state.
119
 
    void Reset();
120
 
 
121
 
    // Waits for any currently active function to complete, deletes
122
 
    // any other outstanding functions.  During the shutdown process,
123
 
    // the Sequence will simply delete, without running, any function
124
 
    // added to it from another thread.
125
 
    //
126
 
    // This function blocks until shutdown is complete.
127
 
    void WaitForShutDown() LOCKS_EXCLUDED(sequence_mutex_);
128
 
 
129
 
    // Puts the Sequence in shutdown mode, but does not block until shutdown
130
 
    // is complete.  Return 'true' if the sequence is inactive and thus can
131
 
    // be immediately recycled.
132
 
    bool InitiateShutDown() LOCKS_EXCLUDED(sequence_mutex_);
133
 
 
134
 
    // Gets the next function in the sequence, and transfers ownership
135
 
    // the the caller.
136
 
    Function* NextFunction() LOCKS_EXCLUDED(sequence_mutex_);
137
 
 
138
 
    bool IsBusy() EXCLUSIVE_LOCKS_REQUIRED(sequence_mutex_);
139
 
 
140
 
    // Returns number of tasks that were canceled.
141
 
    int CancelTasksOnWorkQueue() EXCLUSIVE_LOCKS_REQUIRED(sequence_mutex_);
142
 
 
143
 
    // Cancels all pending tasks (and updates stats appropriately).
144
 
    void Cancel() LOCKS_EXCLUDED(sequence_mutex_);
145
 
 
146
 
    friend class QueuedWorkerPool;
147
 
    std::deque<Function*> work_queue_;
148
 
    scoped_ptr<ThreadSystem::CondvarCapableMutex> sequence_mutex_;
149
 
    QueuedWorkerPool* pool_;
150
 
    bool shutdown_;
151
 
    bool active_;
152
 
    scoped_ptr<ThreadSystem::Condvar> termination_condvar_;
153
 
    Waveform* queue_size_;
154
 
    size_t max_queue_size_;
155
 
 
156
 
    DISALLOW_COPY_AND_ASSIGN(Sequence);
157
 
  };
158
 
 
159
 
  typedef std::set<Sequence*> SequenceSet;
160
 
 
161
 
  // Sequence is owned by the pool, and will be automatically freed when
162
 
  // the pool is finally freed (e.g. on server shutdown).  But the sequence
163
 
  // does *not* auto-destruct when complete; it must be explicitly freed
164
 
  // using FreeSequence().
165
 
  Sequence* NewSequence();  // Returns NULL if shutting down.
166
 
 
167
 
  // Shuts down a sequence and frees it.  This does *not* block waiting
168
 
  // for the Sequence to finish.
169
 
  void FreeSequence(Sequence* sequence);
170
 
 
171
 
  // Shuts down all Sequences and Worker threads, but does not delete the
172
 
  // sequences.  The sequences will be deleted when the pool is destructed.
173
 
  //
174
 
  // Equivalent to "InitiateShutDown(); WaitForShutDownComplete();"
175
 
  void ShutDown();
176
 
 
177
 
  // Starts the shutdown process, preventing further tasks from being queued.
178
 
  // Does not wait for any active tasks to be completed.  This must be followed
179
 
  // by WaitForShutDownComplete.  It is invalid to call InitiateShutDown() twice
180
 
  // in a row.
181
 
  void InitiateShutDown();
182
 
 
183
 
  // Blocks waiting for all outstanding tasks to be completed.  Must be preceded
184
 
  // by InitiateShutDown().
185
 
  void WaitForShutDownComplete();
186
 
 
187
 
  // Returns true if any of the given sequences is busy. Note that multiple
188
 
  // sequences are checked atomically; otherwise we could end up missing
189
 
  // work. For example, consider if we had a sequence for main rewrite work,
190
 
  // and an another one for expensive work.
191
 
  // In this case, if we tried to check their busyness independently, the
192
 
  // following could happen:
193
 
  // 1) First portion of inexpensive work is done, so we queue up
194
 
  //    some on expensive work thread.
195
 
  // 2) We check whether inexpensive work sequence is busy. It's not.
196
 
  // 3) The expensive work runs, finishes, and queues up more inexpensive
197
 
  //    work.
198
 
  // 4) We check whether expensive sequence is busy. It's not, so we would
199
 
  //    conclude we quiesced --- while there was still work in the inexpensive
200
 
  //    queue.
201
 
  static bool AreBusy(const SequenceSet& sequences);
202
 
 
203
 
  // If x == kNoLoadShedding disables load-shedding.
204
 
  // Otherwise, if more than x sequences are queued waiting to run,
205
 
  // sequences will start getting dropped and canceled, with oldest
206
 
  // sequences canceled first.
207
 
  //
208
 
  // Precondition: x > 0 || x == kNoLoadShedding
209
 
  // x = kNoLoadShedding (the default) disables the limit.
210
 
  //
211
 
  // Should be called before starting any work.
212
 
  void SetLoadSheddingThreshold(int x);
213
 
 
214
 
  // Sets up a timed-variable statistic indicating the current queue depth.
215
 
  //
216
 
  // This must be called prior to creating sequences.
217
 
  void set_queue_size_stat(Waveform* x) { queue_size_ = x; }
218
 
 
219
 
 private:
220
 
  friend class Sequence;
221
 
  void Run(Sequence* sequence, QueuedWorker* worker);
222
 
  void QueueSequence(Sequence* sequence);
223
 
  Sequence* AssignWorkerToNextSequence(QueuedWorker* worker);
224
 
  void SequenceNoLongerActive(Sequence* sequence);
225
 
 
226
 
  ThreadSystem* thread_system_;
227
 
  scoped_ptr<AbstractMutex> mutex_;
228
 
 
229
 
  // active_workers_ and available_workers_ are mutually exclusive.
230
 
  std::set<QueuedWorker*> active_workers_;
231
 
  std::vector<QueuedWorker*> available_workers_;
232
 
 
233
 
  // queued_sequences_ and free_sequences_ are mutually exclusive, but
234
 
  // all_sequences contains all of them.
235
 
  std::vector<Sequence*> all_sequences_;
236
 
  std::deque<Sequence*> queued_sequences_;
237
 
  std::vector<Sequence*> free_sequences_;
238
 
 
239
 
  GoogleString thread_name_base_;
240
 
 
241
 
  size_t max_workers_;
242
 
  bool shutdown_;
243
 
 
244
 
  Waveform* queue_size_;
245
 
  int load_shedding_threshold_;
246
 
 
247
 
  DISALLOW_COPY_AND_ASSIGN(QueuedWorkerPool);
248
 
};
249
 
 
250
 
}  // namespace net_instaweb
251
 
 
252
 
#endif  // PAGESPEED_KERNEL_THREAD_QUEUED_WORKER_POOL_H_