/* -*- mode: c++; c-basic-offset: 2; indent-tabs-mode: nil; -*-
 * vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
 *
 * Copyright (C) 2006 MySQL AB
 * Copyright (C) 2009 Sun Microsystems
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; version 2 of the License.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
 */

#include <plugin/pool_of_threads/pool_of_threads.h>
#include "drizzled/pthread_globals.h"
#include "drizzled/internal/my_pthread.h"

#include <boost/program_options.hpp>
#include <drizzled/module/option_map.h>

#include <boost/thread/thread.hpp>
#include <boost/bind.hpp>

namespace po= boost::program_options;
using namespace drizzled;

/**
 * Set this to true to trigger killing of all threads in the pool.
 */
static volatile bool kill_pool_threads= false;

static volatile uint32_t created_threads= 0;

static struct event session_add_event;
static struct event session_kill_event;

static int session_add_pipe[2];  /* pipe to signal add a connection to libevent */
static int session_kill_pipe[2]; /* pipe to signal kill a connection in libevent */

static bool libevent_needs_immediate_processing(Session *session);
static void libevent_connection_close(Session *session);
void libevent_session_add(Session* session);
bool libevent_should_close_connection(Session* session);
void libevent_thread_proc(PoolOfThreadsScheduler *pot_scheduler);
void libevent_io_callback(int Fd, short Operation, void *ctx);
void libevent_add_session_callback(int Fd, short Operation, void *ctx);
void libevent_kill_session_callback(int Fd, short Operation, void *ctx);

static uint32_t pool_size;

/**
 * Create a pipe and set it to non-blocking.
 *
 * @retval true if there is an error.
 */
static bool init_pipe(int pipe_fds[])
{
  int flags;

  return pipe(pipe_fds) < 0 ||
         (flags= fcntl(pipe_fds[0], F_GETFL)) == -1 ||
         fcntl(pipe_fds[0], F_SETFL, flags | O_NONBLOCK) == -1 ||
         (flags= fcntl(pipe_fds[1], F_GETFL)) == -1 ||
         fcntl(pipe_fds[1], F_SETFL, flags | O_NONBLOCK) == -1;
}
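
/*
  A minimal sketch (not part of the plugin) of how these non-blocking pipes
  are used below as a wakeup channel, the classic "self-pipe" pattern:

  @code
    // Producer side: queue some work, then write one byte to wake the loop.
    char c= 0;
    size_t written= write(session_add_pipe[1], &c, sizeof(c));
    assert(written == sizeof(c));

    // Consumer side, inside the libevent callback: drain the pipe so the
    // persistent EV_READ event does not keep firing on stale bytes.
    char buf;
    while (read(session_add_pipe[0], &buf, sizeof(buf)) == sizeof(buf))
    {}
  @endcode

  O_NONBLOCK matters on the read side: draining has to stop as soon as the
  pipe is empty rather than block the event-loop thread.
*/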

/**
 * This is called when data is ready on the socket.
 *
 * This is only called by the thread that owns LOCK_event_loop.
 *
 * We add the session that got the data to sessions_need_processing, and
 * cause the libevent event_loop() to terminate. Then this same thread will
 * return from event_loop and pick the session value back up for processing.
 */
void libevent_io_callback(int, short, void *ctx)
{
  Session *session= reinterpret_cast<Session*>(ctx);
  session_scheduler *sched= static_cast<session_scheduler *>(session->scheduler_arg);
  assert(sched);
  PoolOfThreadsScheduler *pot_scheduler= static_cast<PoolOfThreadsScheduler *>(session->scheduler);
  pot_scheduler->doIO(sched);
}

void PoolOfThreadsScheduler::doIO(session_scheduler *sched)
{
  safe_mutex_assert_owner(&LOCK_event_loop);
  sessions_waiting_for_io.erase(sched->session);
  sessions_need_processing.push(sched->session);
}
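
/*
  For context: the io_event consumed by doIO() is presumably registered per
  session along these lines (a sketch; the actual event_set() call lives in
  session_scheduler's setup code, which is not part of this file):

  @code
    event_set(&sched->io_event, session->client->getFileDescriptor(),
              EV_READ, libevent_io_callback, session);
  @endcode

  so a readable client socket fires libevent_io_callback(), which moves the
  Session from sessions_waiting_for_io into sessions_need_processing.
*/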

/**
 * This is called when we have a thread we want to be killed.
 *
 * This is only called by the thread that owns LOCK_event_loop.
 */
void libevent_kill_session_callback(int Fd, short, void *ctx)
{
  PoolOfThreadsScheduler *pot_scheduler=
    reinterpret_cast<PoolOfThreadsScheduler *>(ctx);
  pot_scheduler->killSession(Fd);
}

void PoolOfThreadsScheduler::killSession(int Fd)
{
  safe_mutex_assert_owner(&LOCK_event_loop);
  /* For clearing the pending wakeup byte */
  char c;

  LOCK_session_kill.lock();
  while (! sessions_to_be_killed.empty())
  {
    /* Fetch a session from the queue */
    Session* session= sessions_to_be_killed.front();
    LOCK_session_kill.unlock();

    session_scheduler *sched= static_cast<session_scheduler *>(session->scheduler_arg);
    assert(sched);

    /* Delete from libevent and add to the processing queue. */
    event_del(&sched->io_event);

    /* Remove from the sessions_waiting_for_io set */
    sessions_waiting_for_io.erase(session);

    /*
      Push into sessions_need_processing; the kill action will be
      performed outside of the event loop.
    */
    sessions_need_processing.push(sched->session);

    LOCK_session_kill.lock();
    /* Pop the session now that it has been handed off */
    sessions_to_be_killed.pop();
  }

  /*
    Clear the pending event.
    One and only one character should be in the pipe.
  */
  while (read(Fd, &c, sizeof(c)) == sizeof(c))
  {}
  LOCK_session_kill.unlock();
}

/**
 * This is used to add connections to the pool. This callback is invoked
 * from the libevent event_loop() call whenever the session_add_pipe[1]
 * pipe has a byte written to it.
 *
 * This is only called by the thread that owns LOCK_event_loop.
 */
void libevent_add_session_callback(int Fd, short, void *ctx)
{
  PoolOfThreadsScheduler *pot_scheduler=
    reinterpret_cast<PoolOfThreadsScheduler *>(ctx);
  pot_scheduler->addSession(Fd);
}

void PoolOfThreadsScheduler::addSession(int Fd)
{
  safe_mutex_assert_owner(&LOCK_event_loop);
  /* For clearing the pending wakeup byte */
  char c;

  LOCK_session_add.lock();
  while (! sessions_need_adding.empty())
  {
    /* Pop the first session off the queue */
    Session* session= sessions_need_adding.front();
    LOCK_session_add.unlock();

    session_scheduler *sched= static_cast<session_scheduler *>(session->scheduler_arg);
    assert(sched);

    if (!sched->logged_in || libevent_should_close_connection(session))
    {
      /*
        Add the session to the sessions_need_processing queue. If it needs
        closing, we'll close it outside of event_loop().
      */
      sessions_need_processing.push(sched->session);
    }
    else
    {
      /* Add to libevent */
      if (event_add(&sched->io_event, NULL))
      {
        errmsg_printf(ERRMSG_LVL_ERROR, _("event_add error in libevent_add_session_callback\n"));
        libevent_connection_close(session);
      }
      else
      {
        sessions_waiting_for_io.insert(sched->session);
      }
    }

    LOCK_session_add.lock();
    /* Pop the session now that it has been handed off */
    sessions_need_adding.pop();
  }

  /*
    Clear the pending event.
    One and only one character should be in the pipe.
  */
  while (read(Fd, &c, sizeof(c)) == sizeof(c))
  {}
  LOCK_session_add.unlock();
}

/**
 * Close and delete a connection.
 */
static void libevent_connection_close(Session *session)
{
  session_scheduler *sched= static_cast<session_scheduler *>(session->scheduler_arg);
  assert(sched);
  session->killed= Session::KILL_CONNECTION;    /* Avoid error messages */

  if (session->client->getFileDescriptor() >= 0) /* not already closed */
  {
    session->disconnect(0, true);
  }
  sched->thread_detach();

  delete sched;
  session->scheduler_arg= NULL;

  Session::unlink(session);     /* locks LOCK_thread_count and deletes session */
}

/**
 * Checks if a session should be closed.
 *
 * @retval true this session should be closed.
 * @retval false it should not be closed.
 */
bool libevent_should_close_connection(Session* session)
{
  return session->client->haveError() ||
         session->killed == Session::KILL_CONNECTION;
}

/**
 * libevent_thread_proc is the outer loop of each thread in the thread pool.
 * These procs only return/terminate on shutdown (kill_pool_threads == true).
 */
void libevent_thread_proc(PoolOfThreadsScheduler *pot_scheduler)
{
  if (internal::my_thread_init())
  {
    internal::my_thread_global_end();
    errmsg_printf(ERRMSG_LVL_ERROR, _("libevent_thread_proc: internal::my_thread_init() failed\n"));
    exit(1);
  }

  (void)pot_scheduler->mainLoop();
}

void *PoolOfThreadsScheduler::mainLoop()
{
  /*
    Signal libevent_init() when all threads have been created and are ready
    to receive events.
  */
  (void) LOCK_thread_count.lock();
  created_threads++;
  if (created_threads == pool_size)
    COND_thread_count.notify_one();
  (void) LOCK_thread_count.unlock();

  for (;;)
  {
    Session *session= NULL;
    LOCK_event_loop.lock();

    /* get session(s) to process */
    while (sessions_need_processing.empty())
    {
      if (kill_pool_threads)
      {
        /* the flag that we should die has been set */
        LOCK_event_loop.unlock();
        goto thread_exit;
      }
      event_loop(EVLOOP_ONCE);
    }

    /* pop the first session off the queue */
    session= sessions_need_processing.front();
    sessions_need_processing.pop();
    session_scheduler *sched= (session_scheduler *)session->scheduler_arg;

    LOCK_event_loop.unlock();

    /* now we process the connection (session) */

    /* set up the session<->thread links. */
    session->thread_stack= (char*) &session;

    if (sched->thread_attach())
    {
      libevent_connection_close(session);
      continue;
    }

    /* is the connection logged in yet? */
    if (!sched->logged_in)
    {
      if (session->authenticate())
      {
        /* Failed to log in */
        libevent_connection_close(session);
        continue;
      }
      else
      {
        /* login successful */
        sched->logged_in= true;
        session->prepareForQueries();
        if (!libevent_needs_immediate_processing(session))
          continue; /* New connection is now waiting for data in libevent */
      }
    }

    do
    {
      /* Process a query */
      if (! session->executeStatement())
      {
        libevent_connection_close(session);
        break;
      }
    } while (libevent_needs_immediate_processing(session));

    if (kill_pool_threads) /* the flag that we should die has been set */
      goto thread_exit;
  }

thread_exit:
  (void) LOCK_thread_count.lock();
  created_threads--;
  COND_thread_count.notify_all();
  (void) LOCK_thread_count.unlock();
  internal::my_thread_end();

  return NULL;                               /* purify: deadcode */
}
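
/*
  Locking protocol of the loop above, condensed (inferred from the code, not
  from external documentation): exactly one worker at a time owns
  LOCK_event_loop and therefore drives event_loop(EVLOOP_ONCE); the rest
  block on the mutex. The worker that pops a session releases the lock
  before touching the session, so another worker can take over dispatching
  immediately:

  @code
    LOCK_event_loop.lock();
    while (sessions_need_processing.empty())
      event_loop(EVLOOP_ONCE);            // only the lock owner dispatches
    session= sessions_need_processing.front();
    sessions_need_processing.pop();
    LOCK_event_loop.unlock();             // hand the event loop to a peer
  @endcode
*/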

/**
 * Checks if a session needs immediate processing.
 *
 * @retval true the session needs immediate processing
 * @retval false if not; the session is then detached from the thread while
 * it waits to be added to libevent again. The naming of the function is
 * misleading in this case: it actually does more than just check whether
 * immediate processing is needed.
 */
static bool libevent_needs_immediate_processing(Session *session)
{
  session_scheduler *sched= (session_scheduler *)session->scheduler_arg;

  if (libevent_should_close_connection(session))
  {
    libevent_connection_close(session);
    return false;
  }

  /*
    If there is more data in the socket buffer, return true to process
    another command.

    Note: we cannot hand the session back to libevent for event processing,
    because the whole request might already be buffered and we would never
    receive an event for it. This is indeed the root cause of the low
    performance here; it needs to be changed when the nonblocking Protocol
    is finished.
  */
  if (session->client->haveMoreData())
    return true;

  sched->thread_detach();
  libevent_session_add(session);

  return false;
}
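
/*
  A worked example of why haveMoreData() forces another iteration: if a
  client pipelines two statements in a single TCP segment, both land in the
  connection's read buffer at once. After the first executes, no new bytes
  arrive on the socket, so handing the session back to libevent would wait
  for an EV_READ that never fires; the buffered second statement has to be
  executed now, on this thread.
*/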

/**
 * Adds a Session to the queue for libevent processing.
 *
 * This call does not actually register the event with libevent.
 * Instead, it places the Session onto a queue and signals libevent by writing
 * a byte into session_add_pipe, which will cause our libevent_add_session_callback to
 * be invoked which will find the Session on the queue and add it to libevent.
 */
void libevent_session_add(Session* session)
{
  session_scheduler *sched= (session_scheduler *)session->scheduler_arg;
  assert(sched);
  PoolOfThreadsScheduler *pot_scheduler=
    static_cast<PoolOfThreadsScheduler *>(session->scheduler);
  pot_scheduler->sessionAddToQueue(sched);
}

void PoolOfThreadsScheduler::sessionAddToQueue(session_scheduler *sched)
{
  char c= 0;

  boost::mutex::scoped_lock scopedLock(LOCK_session_add);
  if (sessions_need_adding.empty())
  {
    /* notify libevent */
    size_t written= write(session_add_pipe[1], &c, sizeof(c));
    assert(written == sizeof(c));
  }
  /* queue for libevent */
  sessions_need_adding.push(sched->session);
}
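
/*
  Note the batching: a wakeup byte is written only on the empty -> non-empty
  transition of sessions_need_adding, and addSession(int) drains exactly
  that one byte after it has emptied the whole queue. The round trip,
  roughly:

  @code
    // Worker thread, after detaching from the session:
    libevent_session_add(session);   // push + (at most) one wakeup byte
    // Event-loop thread, woken by EV_READ on session_add_pipe[0]:
    //   libevent_add_session_callback() -> addSession(Fd)
    //   -> event_add(&sched->io_event, NULL) for each queued session
  @endcode
*/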

PoolOfThreadsScheduler::PoolOfThreadsScheduler(const char *name_arg)
  : Scheduler(name_arg), sessions_need_adding(), sessions_to_be_killed(),
    sessions_need_processing(), sessions_waiting_for_io()
{
  /* Setup attribute parameter for session threads. */
  (void) pthread_attr_init(&attr);
  (void) pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
  pthread_attr_setscope(&attr, PTHREAD_SCOPE_SYSTEM);
}

PoolOfThreadsScheduler::~PoolOfThreadsScheduler()
{
  (void) LOCK_thread_count.lock();

  kill_pool_threads= true;
  while (created_threads)
  {
    /*
      Wake up the event loop
    */
    char c= 0;
    size_t written= write(session_add_pipe[1], &c, sizeof(c));
    assert(written == sizeof(c));

    pthread_cond_wait(COND_thread_count.native_handle(), LOCK_thread_count.native_handle());
  }
  (void) LOCK_thread_count.unlock();

  event_del(&session_add_event);
  close(session_add_pipe[0]);
  close(session_add_pipe[1]);
  event_del(&session_kill_event);
  close(session_kill_pipe[0]);
  close(session_kill_pipe[1]);

  (void) pthread_attr_destroy(&attr);
}

bool PoolOfThreadsScheduler::addSession(Session *session)
{
  assert(session->scheduler_arg == NULL);

  session_scheduler *sched= new session_scheduler(session);
  if (sched == NULL)
    return true;

  session->scheduler_arg= (void *)sched;

  libevent_session_add(session);

  return false;
}

void PoolOfThreadsScheduler::killSession(Session *session)
{
  char c= 0;

  boost::mutex::scoped_lock scopedLock(LOCK_session_kill);

  if (sessions_to_be_killed.empty())
  {
    /*
      Notify libevent with the kill event if this is the first kill
      notification of the batch
    */
    size_t written= write(session_kill_pipe[1], &c, sizeof(c));
    assert(written == sizeof(c));
  }

  /*
    Push into the sessions_to_be_killed queue
  */
  sessions_to_be_killed.push(session);
}
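
/*
  The two killSession() overloads form a producer/consumer pair: this
  Session* overload runs on whatever thread requests the kill and only
  queues and signals, while the int (fd) overload above runs on the
  event-loop thread and performs the actual event_del() and queue transfer:

  @code
    scheduler->killSession(session);  // any thread: push + one-byte signal
    // Event-loop thread, on EV_READ from session_kill_pipe[0]:
    //   libevent_kill_session_callback() -> killSession(Fd)
    //   -> event_del(&sched->io_event); sessions_need_processing.push(...)
  @endcode
*/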

bool PoolOfThreadsScheduler::libevent_init(void)
{
  event_init();

  /* Set up the pipe used to add new sessions to the event pool */
  if (init_pipe(session_add_pipe))
  {
    errmsg_printf(ERRMSG_LVL_ERROR,
                  _("init_pipe(session_add_pipe) error in libevent_init\n"));
    return true;
  }

  /* Set up the pipe used to kill sessions in the event queue */
  if (init_pipe(session_kill_pipe))
  {
    errmsg_printf(ERRMSG_LVL_ERROR,
                  _("init_pipe(session_kill_pipe) error in libevent_init\n"));
    close(session_add_pipe[0]);
    close(session_add_pipe[1]);
    return true;
  }

  event_set(&session_add_event, session_add_pipe[0], EV_READ|EV_PERSIST,
            libevent_add_session_callback, this);
  event_set(&session_kill_event, session_kill_pipe[0], EV_READ|EV_PERSIST,
            libevent_kill_session_callback, this);

  if (event_add(&session_add_event, NULL) || event_add(&session_kill_event, NULL))
  {
    errmsg_printf(ERRMSG_LVL_ERROR, _("session_add_event event_add error in libevent_init\n"));
    return true;
  }

  /* Set up the thread pool */
  boost::mutex::scoped_lock scopedLock(LOCK_thread_count);

  for (uint32_t x= 0; x < pool_size; x++)
  {
    if (not new boost::thread(boost::bind(libevent_thread_proc, this)))
    {
      errmsg_printf(ERRMSG_LVL_ERROR, _("Can't create completion port thread (error %d)"), 1);
      return true;
    }
  }

  /* Wait until all threads are created */
  while (created_threads != pool_size)
    COND_thread_count.wait(scopedLock);

  return false;
}
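
/*
  The initialization order above is deliberate: both pipes and their
  persistent events must exist before any worker thread can call
  event_loop(), and the final wait on COND_thread_count mirrors the
  created_threads++ handshake in mainLoop(), so libevent_init() does not
  return until the whole pool is ready to accept sessions.
*/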

/**
 * Called to initialize the pool of threads scheduler plugin.
 *
 * @param[in] context the module context holding the record of the plugins
 */
static int init(drizzled::module::Context &context)
{
  const module::option_map &vm= context.getOptions();

  if (vm.count("size"))
  {
    if (pool_size > 1024 || pool_size < 1)
    {
      errmsg_printf(ERRMSG_LVL_ERROR, _("Invalid value for size\n"));
      return 1;
    }
  }
  assert(pool_size != 0);

  context.add(new PoolOfThreadsScheduler("pool_of_threads"));

  return 0;
}

/*
  The defaults here were picked based on what I (aka Brian) have seen. They
  should be vetted across a larger audience.
*/
static DRIZZLE_SYSVAR_UINT(size, pool_size,
                           PLUGIN_VAR_RQCMDARG,
                           N_("Size of Pool."),
                           NULL, NULL, 8, 1, 1024, 0);

static void init_options(drizzled::module::option_context &context)
{
  context("size",
          po::value<uint32_t>(&pool_size)->default_value(8),
          N_("Size of Pool."));
}

static drizzle_sys_var* sys_variables[]= {
  DRIZZLE_SYSVAR(size),
  NULL
};
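
/*
  Presumably the option registered in init_options() surfaces through
  Drizzle's usual <module>.<option> naming, along the lines of:

  @code
    drizzled --pool-of-threads.size=16
  @endcode

  (an illustrative invocation, not taken from this file). The 1..1024
  bounds on the sysvar above are enforced separately in init() for the
  program-options path.
*/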

DRIZZLE_DECLARE_PLUGIN
{
  DRIZZLE_VERSION_ID,
  "pool_of_threads",
  "0.1",
  "Brian Aker",
  "Pool of Threads Scheduler",
  PLUGIN_LICENSE_GPL,
  init,            /* Plugin Init */
  sys_variables,   /* system variables */
  init_options     /* config options */
}
DRIZZLE_DECLARE_PLUGIN_END;