1
/* -*- mode: c++; c-basic-offset: 2; indent-tabs-mode: nil; -*-
2
* vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
4
* Copyright (C) 2006 MySQL AB
5
* Copyright (C) 2009 Sun Microsystems
7
* This program is free software; you can redistribute it and/or modify
8
* it under the terms of the GNU General Public License as published by
9
* the Free Software Foundation; version 2 of the License.
11
* This program is distributed in the hope that it will be useful,
12
* but WITHOUT ANY WARRANTY; without even the implied warranty of
13
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14
* GNU General Public License for more details.
16
* You should have received a copy of the GNU General Public License
17
* along with this program; if not, write to the Free Software
18
* Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
23
#include <plugin/pool_of_threads/pool_of_threads.h>
24
#include "drizzled/pthread_globals.h"
25
#include "drizzled/internal/my_pthread.h"
28
using namespace drizzled;
31
static PoolOfThreadsScheduler *scheduler= NULL;
34
* Set this to true to trigger killing of all threads in the pool
36
static volatile bool kill_pool_threads= false;
38
static volatile uint32_t created_threads= 0;
39
static int deinit(drizzled::plugin::Registry ®istry);
41
static struct event session_add_event;
42
static struct event session_kill_event;
45
static int session_add_pipe[2]; /* pipe to signal add a connection to libevent*/
46
static int session_kill_pipe[2]; /* pipe to signal kill a connection in libevent */
49
static bool libevent_needs_immediate_processing(Session *session);
50
static void libevent_connection_close(Session *session);
51
void libevent_session_add(Session* session);
52
bool libevent_should_close_connection(Session* session);
54
void *libevent_thread_proc(void *arg);
55
void libevent_io_callback(int Fd, short Operation, void *ctx);
56
void libevent_add_session_callback(int Fd, short Operation, void *ctx);
57
void libevent_kill_session_callback(int Fd, short Operation, void *ctx);
60
static uint32_t size= 0;
64
* Create a pipe and set to non-blocking.
66
* True if there is an error.
68
static bool init_pipe(int pipe_fds[])
71
return pipe(pipe_fds) < 0 ||
72
(flags= fcntl(pipe_fds[0], F_GETFL)) == -1 ||
73
fcntl(pipe_fds[0], F_SETFL, flags | O_NONBLOCK) == -1 ||
74
(flags= fcntl(pipe_fds[1], F_GETFL)) == -1 ||
75
fcntl(pipe_fds[1], F_SETFL, flags | O_NONBLOCK) == -1;
84
* This is called when data is ready on the socket.
87
* This is only called by the thread that owns LOCK_event_loop.
89
* We add the session that got the data to sessions_need_processing, and
90
* cause the libevent event_loop() to terminate. Then this same thread will
91
* return from event_loop and pick the session value back up for
94
void libevent_io_callback(int, short, void *ctx)
96
Session *session= reinterpret_cast<Session*>(ctx);
97
session_scheduler *sched= static_cast<session_scheduler *>(session->scheduler_arg);
99
PoolOfThreadsScheduler *pot_scheduler= static_cast<PoolOfThreadsScheduler *>(session->scheduler);
100
pot_scheduler->doIO(sched);
103
void PoolOfThreadsScheduler::doIO(session_scheduler *sched)
105
safe_mutex_assert_owner(&LOCK_event_loop);
106
sessions_waiting_for_io.erase(sched->session);
107
sessions_need_processing.push(sched->session);
111
* This is called when we have a thread we want to be killed.
114
* This is only called by the thread that owns LOCK_event_loop.
116
void libevent_kill_session_callback(int Fd, short, void *ctx)
118
PoolOfThreadsScheduler *pot_scheduler=
119
reinterpret_cast<PoolOfThreadsScheduler *>(ctx);
121
pot_scheduler->killSession(Fd);
124
void PoolOfThreadsScheduler::killSession(int Fd)
126
safe_mutex_assert_owner(&LOCK_event_loop);
128
For pending events clearing
133
pthread_mutex_lock(&LOCK_session_kill);
134
while (! sessions_to_be_killed.empty())
138
Fetch a session from the queue
140
Session* session= sessions_to_be_killed.front();
141
pthread_mutex_unlock(&LOCK_session_kill);
143
session_scheduler *sched= static_cast<session_scheduler *>(session->scheduler_arg);
147
Delete from libevent and add to the processing queue.
149
event_del(&sched->io_event);
151
Remove from the sessions_waiting_for_io set
153
sessions_waiting_for_io.erase(session);
155
Push into the sessions_need_processing; the kill action will be
156
performed out of the event loop
158
sessions_need_processing.push(sched->session);
160
pthread_mutex_lock(&LOCK_session_kill);
162
Pop until this session is already processed
164
sessions_to_be_killed.pop();
168
Clear the pending events
169
One and only one charactor should be in the pipe
171
while (read(Fd, &c, sizeof(c)) == sizeof(c))
176
pthread_mutex_unlock(&LOCK_session_kill);
182
* This is used to add connections to the pool. This callback is invoked
183
* from the libevent event_loop() call whenever the session_add_pipe[1]
184
* pipe has a byte written to it.
187
* This is only called by the thread that owns LOCK_event_loop.
189
void libevent_add_session_callback(int Fd, short, void *ctx)
191
PoolOfThreadsScheduler *pot_scheduler=
192
reinterpret_cast<PoolOfThreadsScheduler *>(ctx);
193
pot_scheduler->addSession(Fd);
196
void PoolOfThreadsScheduler::addSession(int Fd)
198
safe_mutex_assert_owner(&LOCK_event_loop);
200
For pending events clearing
205
pthread_mutex_lock(&LOCK_session_add);
206
while (! sessions_need_adding.empty())
209
Pop the first session off the queue
211
Session* session= sessions_need_adding.front();
212
pthread_mutex_unlock(&LOCK_session_add);
214
session_scheduler *sched= static_cast<session_scheduler *>(session->scheduler_arg);
218
if (!sched->logged_in || libevent_should_close_connection(session))
221
Add session to sessions_need_processing queue. If it needs closing
222
we'll close it outside of event_loop().
224
sessions_need_processing.push(sched->session);
228
/* Add to libevent */
229
if (event_add(&sched->io_event, NULL))
231
errmsg_printf(ERRMSG_LVL_ERROR, _("event_add error in libevent_add_session_callback\n"));
232
libevent_connection_close(session);
236
sessions_waiting_for_io.insert(sched->session);
240
pthread_mutex_lock(&LOCK_session_add);
242
Pop until this session is already processed
244
sessions_need_adding.pop();
248
Clear the pending events
249
One and only one charactor should be in the pipe
251
while (read(Fd, &c, sizeof(c)) == sizeof(c))
256
pthread_mutex_unlock(&LOCK_session_add);
261
* Close and delete a connection.
263
static void libevent_connection_close(Session *session)
265
session_scheduler *sched= (session_scheduler *)session->scheduler_arg;
267
session->killed= Session::KILL_CONNECTION; /* Avoid error messages */
269
if (session->client->getFileDescriptor() >= 0) /* not already closed */
271
session->disconnect(0, true);
273
sched->thread_detach();
276
session->scheduler_arg= NULL;
278
Session::unlink(session); /* locks LOCK_thread_count and deletes session */
286
* Checks if a session should be closed.
288
* @retval true this session should be closed.
289
* @retval false not to be closed.
291
bool libevent_should_close_connection(Session* session)
293
return session->client->haveError() ||
294
session->killed == Session::KILL_CONNECTION;
300
* libevent_thread_proc is the outer loop of each thread in the thread pool.
301
* These procs only return/terminate on shutdown (kill_pool_threads ==
304
void *libevent_thread_proc(void *ctx)
306
if (internal::my_thread_init())
308
internal::my_thread_global_end();
309
errmsg_printf(ERRMSG_LVL_ERROR, _("libevent_thread_proc: internal::my_thread_init() failed\n"));
313
PoolOfThreadsScheduler *pot_scheduler=
314
reinterpret_cast<PoolOfThreadsScheduler *>(ctx);
315
return pot_scheduler->mainLoop();
318
void *PoolOfThreadsScheduler::mainLoop()
321
Signal libevent_init() when all threads has been created and are ready
324
(void) pthread_mutex_lock(&LOCK_thread_count);
326
if (created_threads == size)
327
(void) pthread_cond_signal(&COND_thread_count);
328
(void) pthread_mutex_unlock(&LOCK_thread_count);
332
Session *session= NULL;
333
(void) pthread_mutex_lock(&LOCK_event_loop);
335
/* get session(s) to process */
336
while (sessions_need_processing.empty())
338
if (kill_pool_threads)
340
/* the flag that we should die has been set */
341
(void) pthread_mutex_unlock(&LOCK_event_loop);
344
event_loop(EVLOOP_ONCE);
347
/* pop the first session off the queue */
348
session= sessions_need_processing.front();
349
sessions_need_processing.pop();
350
session_scheduler *sched= (session_scheduler *)session->scheduler_arg;
352
(void) pthread_mutex_unlock(&LOCK_event_loop);
354
/* now we process the connection (session) */
356
/* set up the session<->thread links. */
357
session->thread_stack= (char*) &session;
359
if (sched->thread_attach())
361
libevent_connection_close(session);
365
/* is the connection logged in yet? */
366
if (!sched->logged_in)
368
if (session->authenticate())
370
/* Failed to log in */
371
libevent_connection_close(session);
376
/* login successful */
377
sched->logged_in= true;
378
session->prepareForQueries();
379
if (!libevent_needs_immediate_processing(session))
380
continue; /* New connection is now waiting for data in libevent*/
386
/* Process a query */
387
if (! session->executeStatement())
389
libevent_connection_close(session);
392
} while (libevent_needs_immediate_processing(session));
394
if (kill_pool_threads) /* the flag that we should die has been set */
399
(void) pthread_mutex_lock(&LOCK_thread_count);
401
pthread_cond_broadcast(&COND_thread_count);
402
(void) pthread_mutex_unlock(&LOCK_thread_count);
403
internal::my_thread_end();
406
return NULL; /* purify: deadcode */
412
* Checks if a session needs immediate processing
414
* @retval true the session needs immediate processing
415
* @retval false if not, and is detached from the thread waiting for another
416
* adding. The naming of the function is misleading in this case; it
417
* actually does more than just checking if immediate processing is needed.
419
static bool libevent_needs_immediate_processing(Session *session)
421
session_scheduler *sched= (session_scheduler *)session->scheduler_arg;
423
if (libevent_should_close_connection(session))
425
libevent_connection_close(session);
429
If more data in the socket buffer, return true to process another command.
431
Note: we cannot add for event processing because the whole request
432
might already be buffered and we wouldn't receive an event. This is
433
indeed the root of the reason of low performace. Need to be changed
434
when nonblocking Protocol is finished.
436
if (session->client->haveMoreData())
439
sched->thread_detach();
440
libevent_session_add(session);
448
* Adds a Session to queued for libevent processing.
451
* This call does not actually register the event with libevent.
452
* Instead, it places the Session onto a queue and signals libevent by writing
453
* a byte into session_add_pipe, which will cause our libevent_add_session_callback to
454
* be invoked which will find the Session on the queue and add it to libevent.
456
void libevent_session_add(Session* session)
458
session_scheduler *sched= (session_scheduler *)session->scheduler_arg;
460
PoolOfThreadsScheduler *pot_scheduler=
461
static_cast<PoolOfThreadsScheduler *>(session->scheduler);
462
pot_scheduler->sessionAddToQueue(sched);
465
void PoolOfThreadsScheduler::sessionAddToQueue(session_scheduler *sched)
468
pthread_mutex_lock(&LOCK_session_add);
469
if (sessions_need_adding.empty())
471
/* notify libevent */
472
size_t written= write(session_add_pipe[1], &c, sizeof(c));
473
assert(written == sizeof(c));
475
/* queue for libevent */
476
sessions_need_adding.push(sched->session);
477
pthread_mutex_unlock(&LOCK_session_add);
481
PoolOfThreadsScheduler::PoolOfThreadsScheduler(const char *name_arg)
482
: Scheduler(name_arg), sessions_need_adding(), sessions_to_be_killed(),
483
sessions_need_processing(), sessions_waiting_for_io()
485
struct sched_param tmp_sched_param;
487
memset(&tmp_sched_param, 0, sizeof(struct sched_param));
488
/* Setup attribute parameter for session threads. */
489
(void) pthread_attr_init(&attr);
490
(void) pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
491
pthread_attr_setscope(&attr, PTHREAD_SCOPE_SYSTEM);
493
tmp_sched_param.sched_priority= WAIT_PRIOR;
494
(void) pthread_attr_setschedparam(&attr, &tmp_sched_param);
496
pthread_mutex_init(&LOCK_session_add, NULL);
497
pthread_mutex_init(&LOCK_session_kill, NULL);
498
pthread_mutex_init(&LOCK_event_loop, NULL);
503
PoolOfThreadsScheduler::~PoolOfThreadsScheduler()
505
(void) pthread_mutex_lock(&LOCK_thread_count);
507
kill_pool_threads= true;
508
while (created_threads)
511
* Wake up the event loop
514
size_t written= write(session_add_pipe[1], &c, sizeof(c));
515
assert(written == sizeof(c));
517
pthread_cond_wait(&COND_thread_count, &LOCK_thread_count);
519
(void) pthread_mutex_unlock(&LOCK_thread_count);
521
event_del(&session_add_event);
522
close(session_add_pipe[0]);
523
close(session_add_pipe[1]);
524
event_del(&session_kill_event);
525
close(session_kill_pipe[0]);
526
close(session_kill_pipe[1]);
528
(void) pthread_mutex_destroy(&LOCK_event_loop);
529
(void) pthread_mutex_destroy(&LOCK_session_add);
530
(void) pthread_mutex_destroy(&LOCK_session_kill);
531
(void) pthread_attr_destroy(&attr);
535
bool PoolOfThreadsScheduler::addSession(Session *session)
537
assert(session->scheduler_arg == NULL);
538
session_scheduler *sched= new session_scheduler(session);
543
session->scheduler_arg= (void *)sched;
545
libevent_session_add(session);
551
void PoolOfThreadsScheduler::killSession(Session *session)
555
pthread_mutex_lock(&LOCK_session_kill);
557
if (sessions_to_be_killed.empty())
560
Notify libevent with the killing event if this's the first killing
561
notification of the batch
563
size_t written= write(session_kill_pipe[1], &c, sizeof(c));
564
assert(written == sizeof(c));
568
Push into the sessions_to_be_killed queue
570
sessions_to_be_killed.push(session);
571
pthread_mutex_unlock(&LOCK_session_kill);
575
bool PoolOfThreadsScheduler::libevent_init(void)
582
/* Set up the pipe used to add new sessions to the event pool */
583
if (init_pipe(session_add_pipe))
585
errmsg_printf(ERRMSG_LVL_ERROR,
586
_("init_pipe(session_add_pipe) error in libevent_init\n"));
589
/* Set up the pipe used to kill sessions in the event queue */
590
if (init_pipe(session_kill_pipe))
592
errmsg_printf(ERRMSG_LVL_ERROR,
593
_("init_pipe(session_kill_pipe) error in libevent_init\n"));
594
close(session_add_pipe[0]);
595
close(session_add_pipe[1]);
598
event_set(&session_add_event, session_add_pipe[0], EV_READ|EV_PERSIST,
599
libevent_add_session_callback, this);
600
event_set(&session_kill_event, session_kill_pipe[0], EV_READ|EV_PERSIST,
601
libevent_kill_session_callback, this);
603
if (event_add(&session_add_event, NULL) || event_add(&session_kill_event, NULL))
605
errmsg_printf(ERRMSG_LVL_ERROR, _("session_add_event event_add error in libevent_init\n"));
609
/* Set up the thread pool */
610
pthread_mutex_lock(&LOCK_thread_count);
612
for (x= 0; x < size; x++)
616
if ((error= pthread_create(&thread, &attr, libevent_thread_proc, this)))
618
errmsg_printf(ERRMSG_LVL_ERROR, _("Can't create completion port thread (error %d)"),
620
pthread_mutex_unlock(&LOCK_thread_count);
625
/* Wait until all threads are created */
626
while (created_threads != size)
627
pthread_cond_wait(&COND_thread_count,&LOCK_thread_count);
628
pthread_mutex_unlock(&LOCK_thread_count);
636
* Called to initialize the pool of threads scheduler plugin
638
* @param[in] registry holding the record of the plugins
640
static int init(drizzled::plugin::Registry ®istry)
644
scheduler= new PoolOfThreadsScheduler("pool_of_threads");
645
registry.add(scheduler);
652
* Waits until all pool threads have been deleted for clean shutdown
654
static int deinit(drizzled::plugin::Registry ®istry)
656
registry.remove(scheduler);
663
The defaults here were picked based on what I see (aka Brian). They should
664
be vetted across a larger audience.
666
static DRIZZLE_SYSVAR_UINT(size, size,
669
NULL, NULL, 8, 1, 1024, 0);
671
static drizzle_sys_var* sys_variables[]= {
672
DRIZZLE_SYSVAR(size),
676
DRIZZLE_DECLARE_PLUGIN
682
"Pool of Threads Scheduler",
684
init, /* Plugin Init */
685
deinit, /* Plugin Deinit */
686
sys_variables, /* system variables */
687
NULL /* config options */
689
DRIZZLE_DECLARE_PLUGIN_END;