9
// Task is an alias for Query_Queue. The classes below create, queue and
// recycle work in units of Task*.
typedef Query_Queue Task;
10
// Task_Manager builds Task objects sized by m_buffer_size and keeps finished
// ones in m_free so they can be handed out again.
// NOTE(review): only selected lines of this class are visible in this listing
// (braces, access specifiers and several statements are missing), so the
// comments below describe just the statements that can be seen.
class Task_Manager : None_Copyable
13
// Memory pool of Task objects; the two loops near the end of the class walk
// m_pool.set().
typedef Memory_Pool<Task> Pool;
14
// Double-ended queue of Task pointers (presumably the type of m_free, whose
// declaration is not visible here - TODO confirm).
typedef std::deque<Task*> Queue;
16
// a_buffer_size is remembered and later passed to every Task constructor.
Task_Manager(std::size_t a_buffer_size) :
17
m_buffer_size(a_buffer_size)
27
// One path constructs a fresh Task with the configured buffer size ...
Task* result= new Task(m_buffer_size);
33
// ... another path takes the task at the front of m_free. The condition that
// selects between the two paths is not visible here - TODO confirm.
Task* result= m_free.front();
39
// Hands a finished task back to the manager.
void complete(Task* a_task)
41
// The task is appended to m_free only while m_free holds fewer than
// max_idle_thread_count() entries; what happens otherwise is not visible.
if (m_free.size() < max_idle_thread_count())
43
m_free.push_back(a_task);
52
// Two separate iterations over every element of m_pool.set(); the loop
// bodies and the enclosing functions are not visible in this listing.
for(Pool::Set::iterator i= m_pool.set().begin(), e= m_pool.set().end();
60
for(Pool::Set::iterator i= m_pool.set().begin(), e= m_pool.set().end();
67
// Buffer size given to each Task constructed by this manager.
std::size_t m_buffer_size;
72
// Split_By_Connection is a Sheduler that keeps at most one Task per
// Thread_Id: run() looks the task up by a_query.thread_id, creates it when
// needed and pushes the query into it.
// NOTE(review): only selected lines of the class are visible in this listing.
class Split_By_Connection : public Sheduler
75
// Thread id -> task currently associated with that thread id.
typedef std::map<Thread_Id, Task*> Map;
76
// Reverse lookup: task -> thread id it belongs to.
typedef std::map<Task*, Thread_Id> Id;
77
typedef Memory_Pool<Task> Pool;
79
// The base Sheduler is initialised with the global max_idle_thread_count();
// a_buffer_size is forwarded to the embedded Task_Manager.
Split_By_Connection(std::size_t a_buffer_size) :
80
Sheduler(::max_idle_thread_count())
81
, m_manager(a_buffer_size)
87
// The two lock statements below belong to member functions whose headers are
// not visible in this listing; each one takes m_mutex for its scope.
Mutex::Lock lk(m_mutex);
93
Mutex::Lock lk(m_mutex);
97
// Completion callback for a_task run by a_worker. While holding m_mutex it
// calls the one-argument task_complete(Task*) overload for a_task and then
// returns whatever do_task_complete(a_worker) returns.
virtual bool task_complete(Worker *a_worker, Task *a_task)
99
Mutex::Lock lk(m_mutex);
100
task_complete(a_task);
101
return do_task_complete(a_worker);
103
// Failure callback for a_task run by a_worker. While holding m_mutex it
// calls the one-argument task_complete(Task*) overload for a_task, reports
// a_error through has_troubles(), and returns the result of
// do_task_crash(a_worker).
virtual bool task_crash(Worker *a_worker, Task *a_task, Error const &a_error)
105
Mutex::Lock lk(m_mutex);
106
task_complete(a_task);
107
has_troubles(a_error);
108
return do_task_crash(a_worker);
111
// Looks a_thread_id up in m_task.
// The return statement is not visible here; presumably the found iterator
// (possibly m_task.end()) is returned - TODO confirm.
Map::iterator task(Thread_Id a_thread_id)
113
Map::iterator result= m_task.find(a_thread_id);
// Consistency check: when an entry was found, the reverse map m_id must map
// its task back to the same thread id.
114
ASSERT((false == active(result)) ||
115
(a_thread_id == m_id.find(task(result))->second));
118
// Returns true when i refers to an entry of m_task, i.e. it is not
// m_task.end().
bool active(Map::iterator i)
120
return m_task.end() != i;
122
// Accessor for the task behind a map iterator. Asserts that i is active and
// that its task is registered in the reverse map m_id.
// The return statement is not visible here; presumably i->second is
// returned - TODO confirm.
Task* task(Map::iterator i)
124
ASSERT(active(i) && (m_id.end() != m_id.find(i->second)));
127
// Obtains a task from m_manager.create() and registers it for a_thread_id in
// both maps (m_task: thread id -> task, m_id: task -> thread id).
// Returns the iterator to the new m_task entry.
Map::iterator create_task(Thread_Id a_thread_id)
129
// Entries to insert into m_task and m_id respectively.
std::pair< Thread_Id, Task* > task;
130
std::pair< Task*, Thread_Id > id;
132
// Both entries share the same thread id and the same task pointer.
task.first= id.second= a_thread_id;
133
id.first= task.second= m_manager.create();
135
std::pair< Map::iterator, bool > task_result;
136
std::pair< Id::iterator, bool > id_result;
138
// The thread id must not have been present in m_task already.
task_result= m_task.insert(task);
139
ASSERT(true == task_result.second);
141
// The task pointer must not have been present in m_id already.
id_result= m_id.insert(id);
142
ASSERT(true == id_result.second);
144
// The two inserted entries must agree on the thread id.
ASSERT(task_result.first->first == id_result.first->second);
146
return task_result.first;
148
// Internal completion step for a_task: finds its thread id in m_id and the
// matching entry in m_task (both must exist), then returns the task to
// m_manager.
// NOTE(review): the statements between the second assertion and the
// m_manager.complete() call are not visible in this listing; presumably they
// remove the entries from m_id and m_task - TODO confirm.
void task_complete(Task * a_task)
150
Id::iterator id= m_id.find(a_task);
151
ASSERT(m_id.end() != id);
152
Map::iterator task= m_task.find(id->second);
153
ASSERT(m_task.end() != task);
156
m_manager.complete(a_task);
159
// Entry point for scheduling a_query: the query ends up pushed into the task
// associated with a_query.thread_id.
// NOTE(review): several branches (including every return statement) are not
// visible in this listing, so only the visible steps are described.
virtual bool run(Query const &a_query)
161
// The local is_quit shadows the global function of the same name, hence the
// '::' qualification on the call.
bool is_quit= ::is_quit(a_query);
162
Thread_Id thread_id= a_query.thread_id;
164
// Everything below runs while holding m_mutex.
Mutex::Lock lk(m_mutex);
166
// Special handling when the scheduler is stopping or flushing; the body of
// this branch is not visible.
if (is_stop() || is_flush())
169
// Look for a task already associated with this thread id.
Map::iterator i= task(thread_id);
171
bool is_active= active(i);
189
// A new task is created for the thread id (presumably when none is active -
// the guarding condition is not visible, TODO confirm).
i= create_task(thread_id);
193
task(i)->push(a_query);
202
// Source of Task objects: create_task() calls m_manager.create() and
// task_complete(Task*) calls m_manager.complete().
Task_Manager m_manager;
207
// Transaction_Manager extends Task_Manager with two lookup tables:
// m_thread_id (thread id -> Id) and m_task (Id -> Task*).
// NOTE(review): only selected lines of the class are visible in this listing.
class Transaction_Manager : public Task_Manager
210
// Pool names the base class, so Pool::create() / Pool::complete() below call
// the Task_Manager implementations.
typedef Task_Manager Pool;
211
// Id type used as the key of Task_Map; it is the same type as Thread_Id.
typedef Thread_Id Id;
212
typedef std::map<Thread_Id, Id> Thread_Id_Map;
213
typedef std::map<Id, Task*> Task_Map;
215
// Forwards a_buffer_size to Task_Manager and starts m_id at 0.
Transaction_Manager(std::size_t a_buffer_size) :
216
Task_Manager(a_buffer_size), m_id(0)
220
// Appends a_query to the task already registered for its thread id.
// a_thread_id must be the valid m_thread_id entry for a_query.thread_id
// (checked by the three assertions).
// The return statement is not visible in this listing.
Task* add(Thread_Id_Map::iterator a_thread_id, Query const &a_query)
222
ASSERT(a_thread_id == m_thread_id.find(a_query.thread_id));
223
ASSERT(a_thread_id != m_thread_id.end());
224
ASSERT(a_thread_id->first == a_query.thread_id);
226
// Resolve the Id stored for this thread id to its task; it must exist.
Task_Map::iterator task= m_task.find(a_thread_id->second);
227
ASSERT(m_task.end() != task);
228
Task* result= task->second;
229
result->push(a_query);
230
// An "end" or "quit" query removes the thread id mapping. Other statements
// of this branch are not visible here.
if (::is_end(a_query) || ::is_quit(a_query))
233
m_thread_id.erase(a_thread_id);
242
// Handles a query whose thread id has no entry in m_thread_id yet (asserted
// on entry).
// NOTE(review): parts of every branch, including the return statements, are
// not visible in this listing.
Task* add(Query const &a_query)
244
ASSERT(m_thread_id.find(a_query.thread_id) == m_thread_id.end());
246
// A "begin" query registers the thread id and creates a task for it.
if (::is_begin(a_query))
248
std::pair< Thread_Id, Id > id;
249
std::pair< Id, Task* > task;
250
std::pair< Thread_Id_Map::iterator, bool > id_result;
251
std::pair< Task_Map::iterator,bool > task_result;
253
// The assignment of id.second is not visible here (presumably derived from
// m_id - TODO confirm).
id.first= a_query.thread_id;
256
id_result= m_thread_id.insert(id);
257
ASSERT(id_result.second);
259
// Key the new task by the Id that was just stored for the thread id.
task.first= (*(id_result.first)).second;
260
task.second= Pool::create();
262
task_result= m_task.insert(task);
263
ASSERT(task_result.second);
265
(*(task_result.first)).second->push(a_query);
269
// "quit"/"end" with no registered thread id; the body of this branch is not
// visible.
else if (::is_quit(a_query) || ::is_end(a_query))
275
// A task is created without touching the lookup tables on this line; which
// branch this statement belongs to is not visible - TODO confirm.
Task* task= Pool::create();
282
// Returns the task that should receive a_query. When a_query.thread_id is
// already registered in m_thread_id, the work is delegated to
// add(thread_id, a_query).
// The path for an unregistered thread id is not visible here (presumably
// add(a_query) - TODO confirm).
Task* create(Query const& a_query)
284
Thread_Id_Map::iterator thread_id= m_thread_id.find(a_query.thread_id);
285
if (m_thread_id.end() != thread_id)
287
return add(thread_id, a_query);
294
// Hands a_task to the base-class Task_Manager::complete() (Pool is a typedef
// for Task_Manager). Any additional statements of this function are not
// visible in this listing.
void complete(Task *a_task)
296
Pool::complete(a_task);
300
// Thread id -> Id table; create() and both add() overloads consult it to
// decide whether a thread id is already registered.
Thread_Id_Map m_thread_id;
304
// Split_By_Transaction is a Sheduler that gets its tasks from a
// Transaction_Manager and keeps tasks that cannot be assigned yet in
// m_queue.
// NOTE(review): only selected lines of the class are visible in this listing.
class Split_By_Transaction : public Sheduler
307
//typedef Thread_Id Id;
308
//typedef std::map<Thread_Id, Task*> Map;
309
typedef std::deque<Task*> Queue;
311
// a_buffer_size is forwarded to the Transaction_Manager; a_worker_max_count
// is passed to the base Sheduler and also stored in m_worker_max_count.
// NOTE(review): m_worker_count does not appear in the visible part of the
// initializer list - confirm it is initialised in the lines not shown.
Split_By_Transaction(std::size_t a_buffer_size
312
, std::size_t a_worker_max_count) :
313
Sheduler(a_worker_max_count)
314
, m_manager(a_buffer_size)
315
, m_worker_max_count(a_worker_max_count)
322
// The statements below belong to member functions whose headers are not
// visible in this listing: each takes m_mutex, and two of them wake every
// thread waiting on m_event.
Mutex::Lock lk(m_mutex);
328
Mutex::Lock lk(m_mutex);
331
m_event.notify_all();
335
Mutex::Lock lk(m_mutex);
338
m_event.notify_all();
340
// Completion callback for a_task run by a_worker. While holding m_mutex it
// returns the task to m_manager, calls process_task_from_queue(), records
// the result of do_task_complete(a_worker) and wakes every waiter on
// m_event.
// The return statement is not visible here (presumably returns result).
virtual bool task_complete(Worker *a_worker, Task *a_task)
342
Mutex::Lock lk(m_mutex);
344
m_manager.complete(a_task);
345
process_task_from_queue();
346
bool result= do_task_complete(a_worker);
347
m_event.notify_all();
350
// Failure callback for a_task run by a_worker. While holding m_mutex it
// reports a_error through has_troubles(), returns the task to m_manager,
// calls process_task_from_queue(), records the result of
// do_task_crash(a_worker) and wakes every waiter on m_event.
// The return statement is not visible here (presumably returns result).
virtual bool task_crash(Worker *a_worker, Task *a_task, Error const &a_error)
352
Mutex::Lock lk(m_mutex);
353
has_troubles(a_error);
355
m_manager.complete(a_task);
356
process_task_from_queue();
357
bool result= do_task_crash(a_worker);
358
m_event.notify_all();
364
// Bodies of four small predicates whose function headers are not visible in
// this listing. The names used in the notes below are guesses taken from the
// calls is_work(), has_worker() and has_task() seen elsewhere in the class -
// TODO confirm.
// True while the scheduler is neither flushing nor stopping (presumably
// is_work()).
return false == is_flush() && false == is_stop();
368
// True while fewer than m_worker_max_count workers are counted in
// m_worker_count (presumably has_worker()).
return m_worker_count < m_worker_max_count;
372
// True when m_queue holds at least one task (presumably has_task()).
return false == m_queue.empty();
376
// True while m_queue holds fewer than twice m_worker_count tasks; the name
// of this predicate is not visible.
return m_worker_count * 2 > m_queue.size();
378
// Repeats while has_worker() and has_task() are both true, each time reading
// the task at the front of m_queue.
// NOTE(review): the rest of the loop body (removing the task from the queue
// and handing it to a worker, presumably) is not visible in this listing.
void process_task_from_queue()
380
while(has_worker() && has_task())
382
//std::cout << "ASSIGN TASK FROM QUEUE\n" << std::flush;
383
Task *task= m_queue.front();
389
// Schedules a_task; a_lock is the caller's lock (run() passes its m_mutex
// lock) and is the lock handed to m_event.wait().
// NOTE(review): the conditions, loop head and return statements of this
// function are not visible in this listing; only the visible steps are
// described.
bool add_task(Mutex::Lock &a_lock, Task *a_task)
393
// Process tasks that are already queued before dealing with a_task.
process_task_from_queue();
396
//std::cout << "ASSIGN TASK TO FREE WORKER\n" << std::flush;
403
//std::cout << "PUSH TASK TO QUEUE\n" << std::flush;
404
// a_task is appended to the back of m_queue.
m_queue.push_back(a_task);
407
//std::cout << "WAIT FREE WORKER\n" << std::flush;
408
// Blocks on m_event; task_complete() and task_crash() call
// m_event.notify_all().
m_event.wait(a_lock);
410
// Loop condition: continue while the scheduler has not been stopped
// (presumably the tail of a do/while whose head is not visible - TODO
// confirm).
while(false == is_stop());
414
// Entry point for scheduling a_query: takes m_mutex, asks m_manager for the
// task that should receive the query and passes that task to add_task()
// together with the held lock.
virtual bool run(Query const &a_query)
416
Mutex::Lock lk(m_mutex);
417
// Special handling when is_work() is false; the body of this branch is not
// visible in this listing.
if (false == is_work())
419
Task* task= m_manager.create(a_query);
421
return add_task(lk, task);
426
// Waited on in add_task(); notified in task_complete() and task_crash().
Condition_Variable m_event;
427
// Creates tasks for queries (run) and takes finished tasks back
// (task_complete / task_crash).
Transaction_Manager m_manager;
428
// Worker limit received by the constructor.
const std::size_t m_worker_max_count;
429
//const std::size_t m_transaction_max_count;
430
// Compared against m_worker_max_count and the queue size by the predicates
// above; the statements that modify it are not visible in this listing.
std::size_t m_worker_count;
436
// Factory: builds a Split_By_Connection scheduler with the given buffer size
// and returns it wrapped in a Sheduler_Pointer.
Sheduler_Pointer create_split_by_connection (std::size_t a_buffer_size)
438
return Sheduler_Pointer(new Split_By_Connection(a_buffer_size));
440
// Factory: builds a Split_By_Transaction scheduler and returns it wrapped in
// a Sheduler_Pointer.
// NOTE(review): the line that passes the remaining constructor argument
// (presumably a_worker_count) is not visible in this listing. The
// a_transaction_count parameter is commented out in both the signature and
// the call.
Sheduler_Pointer create_split_by_transaction(std::size_t a_buffer_size
441
, std::size_t a_worker_count)
442
//, std::size_t a_transaction_count)
444
return Sheduler_Pointer(new Split_By_Transaction(a_buffer_size
446
//, a_transaction_count));