~stewart/percona-playback/cassert-header

« back to all changes in this revision

Viewing changes to src/sheduler.cc

  • Committer: Oleg Tsarev
  • Date: 2011-05-04 21:38:59 UTC
  • mfrom: (108.1.18 sbt)
  • Revision ID: oleg.tsarev@percona.com-20110504213859-pw1mgjmuj9gz44vb
merge split_by_transaction

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
#include <set>
 
2
#include <map>
 
3
#include <queue>
 
4
#include "sheduler.h"
 
5
#include "worker.h"
 
6
#include "query.h"
 
7
#include "options.h"
 
8
 
 
9
typedef Query_Queue       Task;
 
10
class Task_Manager : None_Copyable
 
11
{
 
12
private:
 
13
  typedef Memory_Pool<Task> Pool;
 
14
  typedef std::deque<Task*> Queue;
 
15
public:
 
16
Task_Manager(std::size_t a_buffer_size) :
 
17
  m_buffer_size(a_buffer_size)
 
18
  {
 
19
  }
 
20
  ~Task_Manager()
 
21
  {
 
22
  }
 
23
  Task* create()
 
24
  {
 
25
    if (m_free.empty())
 
26
    {
 
27
      Task* result= new Task(m_buffer_size);
 
28
      m_pool.add(result);
 
29
      return result;
 
30
    }
 
31
    else
 
32
    {
 
33
      Task* result= m_free.front();
 
34
      m_free.pop_front();
 
35
      result->reset();
 
36
      return result;
 
37
    }
 
38
  }
 
39
  void complete(Task* a_task)
 
40
  {
 
41
    if (m_free.size() < max_idle_thread_count())
 
42
    {
 
43
      m_free.push_back(a_task);
 
44
    }
 
45
    else
 
46
    {
 
47
      m_pool.del(a_task);
 
48
    }
 
49
  }
 
50
  void flush()
 
51
  {
 
52
    for(Pool::Set::iterator i= m_pool.set().begin(), e= m_pool.set().end();
 
53
        e != i; ++i)
 
54
    {
 
55
      (*i)->flush();
 
56
    }
 
57
  }
 
58
  void stop()
 
59
  {
 
60
    for(Pool::Set::iterator i= m_pool.set().begin(), e= m_pool.set().end();
 
61
        e != i; ++i)
 
62
    {
 
63
      (*i)->stop();
 
64
    }
 
65
  }
 
66
private:
 
67
  std::size_t m_buffer_size;
 
68
  Queue m_free;
 
69
  Pool m_pool;
 
70
};
 
71
 
 
72
class Split_By_Connection : public Sheduler
 
73
{
 
74
private:
 
75
  typedef std::map<Thread_Id, Task*> Map;
 
76
  typedef std::map<Task*, Thread_Id> Id;
 
77
  typedef Memory_Pool<Task> Pool;
 
78
public:
 
79
Split_By_Connection(std::size_t a_buffer_size) :
 
80
  Sheduler(::max_idle_thread_count())
 
81
    , m_manager(a_buffer_size)
 
82
  {
 
83
  }
 
84
public:
 
85
  virtual void stop()
 
86
  {
 
87
    Mutex::Lock lk(m_mutex);
 
88
    m_manager.stop();
 
89
    do_stop();
 
90
  }
 
91
  virtual void flush()
 
92
  {
 
93
    Mutex::Lock lk(m_mutex);
 
94
    m_manager.flush();
 
95
    do_flush();
 
96
  }
 
97
  virtual bool task_complete(Worker *a_worker, Task *a_task)
 
98
  {
 
99
    Mutex::Lock lk(m_mutex);
 
100
    task_complete(a_task);
 
101
    return do_task_complete(a_worker);
 
102
  }
 
103
  virtual bool task_crash(Worker *a_worker, Task *a_task, Error const &a_error)
 
104
  {
 
105
    Mutex::Lock lk(m_mutex);
 
106
    task_complete(a_task);
 
107
    has_troubles(a_error);
 
108
    return do_task_crash(a_worker);
 
109
  }
 
110
private:
 
111
  Map::iterator task(Thread_Id a_thread_id)
 
112
  {
 
113
    Map::iterator result= m_task.find(a_thread_id);
 
114
    ASSERT((false == active(result)) ||
 
115
           (a_thread_id == m_id.find(task(result))->second));
 
116
    return result;
 
117
  }
 
118
  bool active(Map::iterator i)
 
119
  {
 
120
    return m_task.end() != i;
 
121
  }
 
122
  Task* task(Map::iterator i)
 
123
  {
 
124
    ASSERT(active(i) && (m_id.end() != m_id.find(i->second)));
 
125
    return i->second;
 
126
  }
 
127
  Map::iterator create_task(Thread_Id a_thread_id)
 
128
  {
 
129
    std::pair< Thread_Id, Task* > task;
 
130
    std::pair< Task*, Thread_Id > id;
 
131
 
 
132
    task.first=  id.second=   a_thread_id;
 
133
    id.first=    task.second= m_manager.create();
 
134
 
 
135
    std::pair< Map::iterator, bool > task_result;
 
136
    std::pair< Id::iterator, bool >  id_result;
 
137
 
 
138
    task_result= m_task.insert(task);
 
139
    ASSERT(true == task_result.second);
 
140
 
 
141
    id_result= m_id.insert(id);
 
142
    ASSERT(true == id_result.second);
 
143
 
 
144
    ASSERT(task_result.first->first == id_result.first->second);
 
145
 
 
146
    return task_result.first;
 
147
  }
 
148
  void task_complete(Task * a_task)
 
149
  {
 
150
    Id::iterator  id=   m_id.find(a_task);
 
151
    ASSERT(m_id.end() != id);
 
152
    Map::iterator task= m_task.find(id->second);
 
153
    ASSERT(m_task.end() != task);
 
154
    m_task.erase(task);
 
155
    m_id.erase(id);
 
156
    m_manager.complete(a_task);
 
157
  }
 
158
public:
 
159
  virtual bool run(Query const &a_query)
 
160
  {
 
161
    bool is_quit=        ::is_quit(a_query);
 
162
    Thread_Id thread_id= a_query.thread_id;
 
163
 
 
164
    Mutex::Lock lk(m_mutex);
 
165
 
 
166
    if (is_stop() || is_flush())
 
167
      return false;
 
168
 
 
169
    Map::iterator i= task(thread_id);
 
170
 
 
171
    bool is_active=  active(i);
 
172
 
 
173
    if (is_quit)
 
174
    {
 
175
      if (is_active)
 
176
      {
 
177
        // flush queue
 
178
        task(i)->flush();
 
179
      }
 
180
      else
 
181
      {
 
182
        // not start
 
183
      }
 
184
    }
 
185
    else
 
186
    {
 
187
      if (!is_active)
 
188
      {
 
189
        i= create_task(thread_id);
 
190
        task_start(task(i));
 
191
      }
 
192
      // add query to task
 
193
      task(i)->push(a_query);
 
194
      if (is_flush())
 
195
      {
 
196
        task(i)->flush();
 
197
      }
 
198
    }
 
199
    return true;
 
200
  }
 
201
private:
 
202
  Task_Manager m_manager;
 
203
  Map m_task;
 
204
  Id m_id;
 
205
};
 
206
 
 
207
class Transaction_Manager : public Task_Manager
 
208
{
 
209
private:
 
210
  typedef Task_Manager Pool;
 
211
  typedef Thread_Id Id;
 
212
  typedef std::map<Thread_Id, Id>  Thread_Id_Map;
 
213
  typedef std::map<Id, Task*>      Task_Map;
 
214
public:
 
215
Transaction_Manager(std::size_t a_buffer_size) :
 
216
  Task_Manager(a_buffer_size), m_id(0)
 
217
  {
 
218
  }
 
219
private:
 
220
  Task* add(Thread_Id_Map::iterator a_thread_id, Query const &a_query)
 
221
  {
 
222
    ASSERT(a_thread_id == m_thread_id.find(a_query.thread_id));
 
223
    ASSERT(a_thread_id != m_thread_id.end());
 
224
    ASSERT(a_thread_id->first == a_query.thread_id);
 
225
 
 
226
    Task_Map::iterator task= m_task.find(a_thread_id->second);
 
227
    ASSERT(m_task.end() != task);
 
228
    Task* result= task->second;
 
229
    result->push(a_query);
 
230
    if (::is_end(a_query) || ::is_quit(a_query))
 
231
    {
 
232
      result->flush();
 
233
      m_thread_id.erase(a_thread_id);
 
234
      m_task.erase(task);
 
235
      return result;
 
236
    }
 
237
    else
 
238
    {
 
239
      return 0;
 
240
    }
 
241
  }
 
242
  Task* add(Query const &a_query)
 
243
  {
 
244
    ASSERT(m_thread_id.find(a_query.thread_id) == m_thread_id.end());
 
245
 
 
246
    if (::is_begin(a_query))
 
247
    {
 
248
      std::pair< Thread_Id, Id > id;
 
249
      std::pair< Id, Task* > task;
 
250
      std::pair< Thread_Id_Map::iterator, bool > id_result;
 
251
      std::pair< Task_Map::iterator,bool > task_result;
 
252
 
 
253
      id.first=  a_query.thread_id;
 
254
      id.second= m_id++;
 
255
 
 
256
      id_result= m_thread_id.insert(id);
 
257
      ASSERT(id_result.second);
 
258
 
 
259
      task.first= (*(id_result.first)).second;
 
260
      task.second= Pool::create();
 
261
 
 
262
      task_result= m_task.insert(task);
 
263
      ASSERT(task_result.second);
 
264
 
 
265
      (*(task_result.first)).second->push(a_query);
 
266
 
 
267
      return 0;
 
268
    }
 
269
    else if (::is_quit(a_query) || ::is_end(a_query))
 
270
    {
 
271
      return 0;
 
272
    }
 
273
    else
 
274
    {
 
275
      Task* task= Pool::create();
 
276
      task->push(a_query);
 
277
      task->flush();
 
278
      return task;
 
279
    }
 
280
  }
 
281
public:
 
282
  Task* create(Query const& a_query)
 
283
  {
 
284
    Thread_Id_Map::iterator thread_id= m_thread_id.find(a_query.thread_id);
 
285
    if (m_thread_id.end() != thread_id)
 
286
    {
 
287
      return add(thread_id, a_query);
 
288
    }
 
289
    else
 
290
    {
 
291
      return add(a_query);
 
292
    }
 
293
  }
 
294
  void complete(Task *a_task)
 
295
  {
 
296
    Pool::complete(a_task);
 
297
  }
 
298
private:
 
299
  Id m_id;
 
300
  Thread_Id_Map m_thread_id;
 
301
  Task_Map      m_task;
 
302
};
 
303
 
 
304
class Split_By_Transaction : public Sheduler
 
305
{
 
306
private:
 
307
  //typedef Thread_Id                  Id;
 
308
  //typedef std::map<Thread_Id, Task*> Map;
 
309
  typedef std::deque<Task*> Queue;
 
310
public:
 
311
Split_By_Transaction(std::size_t a_buffer_size
 
312
                     , std::size_t a_worker_max_count) :
 
313
  Sheduler(a_worker_max_count)
 
314
    , m_manager(a_buffer_size)
 
315
    , m_worker_max_count(a_worker_max_count)
 
316
    , m_worker_count(0)
 
317
  {
 
318
  }
 
319
public:
 
320
  virtual void start()
 
321
  {
 
322
    Mutex::Lock lk(m_mutex);
 
323
    do_start();
 
324
    m_worker_count= 0;
 
325
  }
 
326
  virtual void stop()
 
327
  {
 
328
    Mutex::Lock lk(m_mutex);
 
329
    m_manager.stop();
 
330
    do_stop();
 
331
    m_event.notify_all();
 
332
  }
 
333
  virtual void flush()
 
334
  {
 
335
    Mutex::Lock lk(m_mutex);
 
336
    m_manager.flush();
 
337
    do_flush();
 
338
    m_event.notify_all();
 
339
  }
 
340
  virtual bool task_complete(Worker *a_worker, Task *a_task)
 
341
  {
 
342
    Mutex::Lock lk(m_mutex);
 
343
    --m_worker_count;
 
344
    m_manager.complete(a_task);
 
345
    process_task_from_queue();
 
346
    bool result= do_task_complete(a_worker);
 
347
    m_event.notify_all();
 
348
    return result;
 
349
  }
 
350
  virtual bool task_crash(Worker *a_worker, Task *a_task, Error const &a_error)
 
351
  {
 
352
    Mutex::Lock lk(m_mutex);
 
353
    has_troubles(a_error);
 
354
    --m_worker_count;
 
355
    m_manager.complete(a_task);
 
356
    process_task_from_queue();
 
357
    bool result= do_task_crash(a_worker);
 
358
    m_event.notify_all();
 
359
    return result;
 
360
  }
 
361
private:
 
362
  bool is_work()
 
363
  {
 
364
    return false == is_flush() && false == is_stop();
 
365
  }
 
366
  bool has_worker()
 
367
  {
 
368
    return m_worker_count < m_worker_max_count;
 
369
  }
 
370
  bool has_task()
 
371
  {
 
372
    return false == m_queue.empty();
 
373
  }
 
374
  bool has_free_task()
 
375
  {
 
376
    return m_worker_count * 2 > m_queue.size();
 
377
    }
 
378
  void process_task_from_queue()
 
379
  {
 
380
    while(has_worker() && has_task())
 
381
    {
 
382
      //std::cout << "ASSIGN TASK FROM QUEUE\n" << std::flush;
 
383
      Task *task= m_queue.front();
 
384
      m_queue.pop_front();
 
385
      task_start(task);
 
386
      ++m_worker_count;
 
387
    }
 
388
  }
 
389
  bool add_task(Mutex::Lock &a_lock, Task *a_task)
 
390
  {
 
391
    do
 
392
    {
 
393
      process_task_from_queue();
 
394
      if (has_worker())
 
395
      {
 
396
        //std::cout << "ASSIGN TASK TO FREE WORKER\n" << std::flush;
 
397
        task_start(a_task);
 
398
        ++m_worker_count;
 
399
        return true;
 
400
      }
 
401
      if (has_free_task())
 
402
      {
 
403
        //std::cout << "PUSH TASK TO QUEUE\n" << std::flush;
 
404
        m_queue.push_back(a_task);
 
405
        return true;
 
406
      }
 
407
      //std::cout << "WAIT FREE WORKER\n" << std::flush;
 
408
      m_event.wait(a_lock);
 
409
    }
 
410
    while(false == is_stop());
 
411
    return false;
 
412
  }
 
413
public:
 
414
  virtual bool run(Query const &a_query)
 
415
  {
 
416
    Mutex::Lock lk(m_mutex);
 
417
    if (false == is_work())
 
418
      return false;
 
419
    Task* task= m_manager.create(a_query);
 
420
    if (task)
 
421
      return add_task(lk, task);
 
422
    else
 
423
      return true;
 
424
  }
 
425
private:
 
426
  Condition_Variable m_event;
 
427
  Transaction_Manager m_manager;
 
428
  const std::size_t m_worker_max_count;
 
429
  //const std::size_t m_transaction_max_count;
 
430
  std::size_t m_worker_count;
 
431
  //Id    m_id;
 
432
  //Map   m_map;
 
433
  Queue m_queue;
 
434
};
 
435
 
 
436
Sheduler_Pointer create_split_by_connection (std::size_t a_buffer_size)
 
437
{
 
438
  return Sheduler_Pointer(new Split_By_Connection(a_buffer_size));
 
439
}
 
440
Sheduler_Pointer create_split_by_transaction(std::size_t a_buffer_size
 
441
                                             , std::size_t a_worker_count)
 
442
                                             //, std::size_t a_transaction_count)
 
443
{
 
444
  return Sheduler_Pointer(new Split_By_Transaction(a_buffer_size
 
445
                                                   , a_worker_count));
 
446
  //, a_transaction_count));
 
447
}