~ubuntu-branches/ubuntu/trusty/drizzle/trusty

« back to all changes in this revision

Viewing changes to plugin/pool_of_threads/pool_of_threads.cc

  • Committer: Bazaar Package Importer
  • Author(s): Monty Taylor
  • Date: 2010-03-18 12:12:31 UTC
  • Revision ID: james.westby@ubuntu.com-20100318121231-k6g1xe6cshbwa0f8
Tags: upstream-2010.03.1347
ImportĀ upstreamĀ versionĀ 2010.03.1347

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/* -*- mode: c++; c-basic-offset: 2; indent-tabs-mode: nil; -*-
 
2
 * vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
 
3
 *
 
4
 * Copyright (C) 2006 MySQL AB
 
5
 * Copyright (C) 2009 Sun Microsystems
 
6
 *
 
7
 * This program is free software; you can redistribute it and/or modify
 
8
 * it under the terms of the GNU General Public License as published by
 
9
 * the Free Software Foundation; version 2 of the License.
 
10
 *
 
11
 * This program is distributed in the hope that it will be useful,
 
12
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 
13
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
14
 * GNU General Public License for more details.
 
15
 *
 
16
 * You should have received a copy of the GNU General Public License
 
17
 * along with this program; if not, write to the Free Software
 
18
 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
 
19
 */
 
20
 
 
21
#include "config.h"
 
22
#include <fcntl.h>
 
23
#include <plugin/pool_of_threads/pool_of_threads.h>
 
24
#include "drizzled/pthread_globals.h"
 
25
#include "drizzled/internal/my_pthread.h"
 
26
 
 
27
using namespace std;
 
28
using namespace drizzled;
 
29
 
 
30
/* Global's (TBR) */
 
31
static PoolOfThreadsScheduler *scheduler= NULL;
 
32
 
 
33
/**
 
34
 * Set this to true to trigger killing of all threads in the pool
 
35
 */
 
36
static volatile bool kill_pool_threads= false;
 
37
 
 
38
static volatile uint32_t created_threads= 0;
 
39
static int deinit(drizzled::plugin::Registry &registry);
 
40
 
 
41
static struct event session_add_event;
 
42
static struct event session_kill_event;
 
43
 
 
44
 
 
45
static int session_add_pipe[2]; /* pipe to signal add a connection to libevent*/
 
46
static int session_kill_pipe[2]; /* pipe to signal kill a connection in libevent */
 
47
 
 
48
 
 
49
static bool libevent_needs_immediate_processing(Session *session);
 
50
static void libevent_connection_close(Session *session);
 
51
void libevent_session_add(Session* session);
 
52
bool libevent_should_close_connection(Session* session);
 
53
extern "C" {
 
54
  void *libevent_thread_proc(void *arg);
 
55
  void libevent_io_callback(int Fd, short Operation, void *ctx);
 
56
  void libevent_add_session_callback(int Fd, short Operation, void *ctx);
 
57
  void libevent_kill_session_callback(int Fd, short Operation, void *ctx);
 
58
}
 
59
 
 
60
static uint32_t size= 0;
 
61
 
 
62
/**
 
63
 * @brief 
 
64
 *  Create a pipe and set to non-blocking. 
 
65
 * @return 
 
66
 *  True if there is an error.
 
67
 */
 
68
static bool init_pipe(int pipe_fds[])
 
69
{
 
70
  int flags;
 
71
  return pipe(pipe_fds) < 0 ||
 
72
          (flags= fcntl(pipe_fds[0], F_GETFL)) == -1 ||
 
73
          fcntl(pipe_fds[0], F_SETFL, flags | O_NONBLOCK) == -1 ||
 
74
          (flags= fcntl(pipe_fds[1], F_GETFL)) == -1 ||
 
75
          fcntl(pipe_fds[1], F_SETFL, flags | O_NONBLOCK) == -1;
 
76
}
 
77
 
 
78
 
 
79
 
 
80
 
 
81
 
 
82
/**
 
83
 * @brief
 
84
 *  This is called when data is ready on the socket.
 
85
 *
 
86
 * @details
 
87
 *  This is only called by the thread that owns LOCK_event_loop.
 
88
 *
 
89
 *  We add the session that got the data to sessions_need_processing, and
 
90
 *  cause the libevent event_loop() to terminate. Then this same thread will
 
91
 *  return from event_loop and pick the session value back up for
 
92
 *  processing.
 
93
 */
 
94
void libevent_io_callback(int, short, void *ctx)
 
95
{
 
96
  Session *session= reinterpret_cast<Session*>(ctx);
 
97
  session_scheduler *sched= static_cast<session_scheduler *>(session->scheduler_arg);
 
98
  assert(sched);
 
99
  PoolOfThreadsScheduler *pot_scheduler= static_cast<PoolOfThreadsScheduler *>(session->scheduler);
 
100
  pot_scheduler->doIO(sched);
 
101
}
 
102
 
 
103
void PoolOfThreadsScheduler::doIO(session_scheduler *sched)
 
104
{
 
105
  safe_mutex_assert_owner(&LOCK_event_loop);
 
106
  sessions_waiting_for_io.erase(sched->session);
 
107
  sessions_need_processing.push(sched->session);
 
108
}
 
109
/**
 
110
 * @brief
 
111
 *  This is called when we have a thread we want to be killed.
 
112
 *
 
113
 * @details
 
114
 *  This is only called by the thread that owns LOCK_event_loop.
 
115
 */
 
116
void libevent_kill_session_callback(int Fd, short, void *ctx)
 
117
{
 
118
  PoolOfThreadsScheduler *pot_scheduler=
 
119
    reinterpret_cast<PoolOfThreadsScheduler *>(ctx);
 
120
 
 
121
  pot_scheduler->killSession(Fd);
 
122
}
 
123
 
 
124
void PoolOfThreadsScheduler::killSession(int Fd)
 
125
{
 
126
  safe_mutex_assert_owner(&LOCK_event_loop);
 
127
  /*
 
128
   For pending events clearing
 
129
  */
 
130
  char c;
 
131
  int count= 0;
 
132
 
 
133
  pthread_mutex_lock(&LOCK_session_kill);
 
134
  while (! sessions_to_be_killed.empty())
 
135
  {
 
136
 
 
137
    /*
 
138
     Fetch a session from the queue
 
139
    */
 
140
    Session* session= sessions_to_be_killed.front();
 
141
    pthread_mutex_unlock(&LOCK_session_kill);
 
142
 
 
143
    session_scheduler *sched= static_cast<session_scheduler *>(session->scheduler_arg);
 
144
    assert(sched);
 
145
 
 
146
    /*
 
147
     Delete from libevent and add to the processing queue.
 
148
    */
 
149
    event_del(&sched->io_event);
 
150
    /*
 
151
     Remove from the sessions_waiting_for_io set
 
152
    */
 
153
    sessions_waiting_for_io.erase(session);
 
154
    /*
 
155
     Push into the sessions_need_processing; the kill action will be
 
156
     performed out of the event loop
 
157
    */
 
158
    sessions_need_processing.push(sched->session);
 
159
 
 
160
    pthread_mutex_lock(&LOCK_session_kill);
 
161
    /*
 
162
     Pop until this session is already processed
 
163
    */
 
164
    sessions_to_be_killed.pop();
 
165
  }
 
166
  
 
167
  /*
 
168
   Clear the pending events 
 
169
   One and only one charactor should be in the pipe
 
170
  */
 
171
  while (read(Fd, &c, sizeof(c)) == sizeof(c))
 
172
  {
 
173
    count++;
 
174
  }
 
175
  assert(count == 1);
 
176
  pthread_mutex_unlock(&LOCK_session_kill);
 
177
}
 
178
 
 
179
 
 
180
/**
 
181
 * @brief
 
182
 *  This is used to add connections to the pool. This callback is invoked
 
183
 *  from the libevent event_loop() call whenever the session_add_pipe[1]
 
184
 *  pipe has a byte written to it.
 
185
 *
 
186
 * @details
 
187
 *  This is only called by the thread that owns LOCK_event_loop.
 
188
 */
 
189
void libevent_add_session_callback(int Fd, short, void *ctx)
 
190
{
 
191
  PoolOfThreadsScheduler *pot_scheduler=
 
192
    reinterpret_cast<PoolOfThreadsScheduler *>(ctx);
 
193
  pot_scheduler->addSession(Fd);
 
194
}
 
195
 
 
196
void PoolOfThreadsScheduler::addSession(int Fd)
 
197
{
 
198
  safe_mutex_assert_owner(&LOCK_event_loop);
 
199
  /*
 
200
   For pending events clearing
 
201
  */
 
202
  char c;
 
203
  int count= 0;
 
204
 
 
205
  pthread_mutex_lock(&LOCK_session_add);
 
206
  while (! sessions_need_adding.empty())
 
207
  {
 
208
    /*
 
209
     Pop the first session off the queue 
 
210
    */
 
211
    Session* session= sessions_need_adding.front();
 
212
    pthread_mutex_unlock(&LOCK_session_add);
 
213
 
 
214
    session_scheduler *sched= static_cast<session_scheduler *>(session->scheduler_arg);
 
215
    assert(sched);
 
216
 
 
217
 
 
218
    if (!sched->logged_in || libevent_should_close_connection(session))
 
219
    {
 
220
      /*
 
221
       Add session to sessions_need_processing queue. If it needs closing
 
222
       we'll close it outside of event_loop().
 
223
      */
 
224
      sessions_need_processing.push(sched->session);
 
225
    }
 
226
    else
 
227
    {
 
228
      /* Add to libevent */
 
229
      if (event_add(&sched->io_event, NULL))
 
230
      {
 
231
        errmsg_printf(ERRMSG_LVL_ERROR, _("event_add error in libevent_add_session_callback\n"));
 
232
        libevent_connection_close(session);
 
233
      }
 
234
      else
 
235
      {
 
236
        sessions_waiting_for_io.insert(sched->session);
 
237
      }
 
238
    }
 
239
 
 
240
    pthread_mutex_lock(&LOCK_session_add);
 
241
    /*
 
242
     Pop until this session is already processed
 
243
    */
 
244
    sessions_need_adding.pop();
 
245
  }
 
246
 
 
247
  /*
 
248
   Clear the pending events 
 
249
   One and only one charactor should be in the pipe
 
250
  */
 
251
  while (read(Fd, &c, sizeof(c)) == sizeof(c))
 
252
  {
 
253
    count++;
 
254
  }
 
255
  assert(count == 1);
 
256
  pthread_mutex_unlock(&LOCK_session_add);
 
257
}
 
258
 
 
259
/**
 
260
 * @brief 
 
261
 *  Close and delete a connection.
 
262
 */
 
263
static void libevent_connection_close(Session *session)
 
264
{
 
265
  session_scheduler *sched= (session_scheduler *)session->scheduler_arg;
 
266
  assert(sched);
 
267
  session->killed= Session::KILL_CONNECTION;    /* Avoid error messages */
 
268
 
 
269
  if (session->client->getFileDescriptor() >= 0) /* not already closed */
 
270
  {
 
271
    session->disconnect(0, true);
 
272
  }
 
273
  sched->thread_detach();
 
274
  
 
275
  delete sched;
 
276
  session->scheduler_arg= NULL;
 
277
 
 
278
  Session::unlink(session);   /* locks LOCK_thread_count and deletes session */
 
279
 
 
280
  return;
 
281
}
 
282
 
 
283
 
 
284
/**
 
285
 * @brief 
 
286
 *  Checks if a session should be closed.
 
287
 *  
 
288
 * @retval true this session should be closed.  
 
289
 * @retval false not to be closed.
 
290
 */
 
291
bool libevent_should_close_connection(Session* session)
 
292
{
 
293
  return session->client->haveError() ||
 
294
         session->killed == Session::KILL_CONNECTION;
 
295
}
 
296
 
 
297
 
 
298
/**
 
299
 * @brief
 
300
 *  libevent_thread_proc is the outer loop of each thread in the thread pool.
 
301
 *  These procs only return/terminate on shutdown (kill_pool_threads ==
 
302
 *  true).
 
303
 */
 
304
void *libevent_thread_proc(void *ctx)
 
305
{
 
306
  if (internal::my_thread_init())
 
307
  {
 
308
    internal::my_thread_global_end();
 
309
    errmsg_printf(ERRMSG_LVL_ERROR, _("libevent_thread_proc: internal::my_thread_init() failed\n"));
 
310
    exit(1);
 
311
  }
 
312
 
 
313
  PoolOfThreadsScheduler *pot_scheduler=
 
314
    reinterpret_cast<PoolOfThreadsScheduler *>(ctx);
 
315
  return pot_scheduler->mainLoop();
 
316
}
 
317
 
 
318
void *PoolOfThreadsScheduler::mainLoop()
 
319
{
 
320
  /*
 
321
   Signal libevent_init() when all threads has been created and are ready
 
322
   to receive events.
 
323
  */
 
324
  (void) pthread_mutex_lock(&LOCK_thread_count);
 
325
  created_threads++;
 
326
  if (created_threads == size)
 
327
    (void) pthread_cond_signal(&COND_thread_count);
 
328
  (void) pthread_mutex_unlock(&LOCK_thread_count);
 
329
 
 
330
  for (;;)
 
331
  {
 
332
    Session *session= NULL;
 
333
    (void) pthread_mutex_lock(&LOCK_event_loop);
 
334
 
 
335
    /* get session(s) to process */
 
336
    while (sessions_need_processing.empty())
 
337
    {
 
338
      if (kill_pool_threads)
 
339
      {
 
340
        /* the flag that we should die has been set */
 
341
        (void) pthread_mutex_unlock(&LOCK_event_loop);
 
342
        goto thread_exit;
 
343
      }
 
344
      event_loop(EVLOOP_ONCE);
 
345
    }
 
346
 
 
347
    /* pop the first session off the queue */
 
348
    session= sessions_need_processing.front();
 
349
    sessions_need_processing.pop();
 
350
    session_scheduler *sched= (session_scheduler *)session->scheduler_arg;
 
351
 
 
352
    (void) pthread_mutex_unlock(&LOCK_event_loop);
 
353
 
 
354
    /* now we process the connection (session) */
 
355
 
 
356
    /* set up the session<->thread links. */
 
357
    session->thread_stack= (char*) &session;
 
358
 
 
359
    if (sched->thread_attach())
 
360
    {
 
361
      libevent_connection_close(session);
 
362
      continue;
 
363
    }
 
364
 
 
365
    /* is the connection logged in yet? */
 
366
    if (!sched->logged_in)
 
367
    {
 
368
      if (session->authenticate())
 
369
      {
 
370
        /* Failed to log in */
 
371
        libevent_connection_close(session);
 
372
        continue;
 
373
      }
 
374
      else
 
375
      {
 
376
        /* login successful */
 
377
        sched->logged_in= true;
 
378
        session->prepareForQueries();
 
379
        if (!libevent_needs_immediate_processing(session))
 
380
          continue; /* New connection is now waiting for data in libevent*/
 
381
      }
 
382
    }
 
383
 
 
384
    do
 
385
    {
 
386
      /* Process a query */
 
387
      if (! session->executeStatement())
 
388
      {
 
389
        libevent_connection_close(session);
 
390
        break;
 
391
      }
 
392
    } while (libevent_needs_immediate_processing(session));
 
393
 
 
394
    if (kill_pool_threads) /* the flag that we should die has been set */
 
395
      goto thread_exit;
 
396
  }
 
397
 
 
398
thread_exit:
 
399
  (void) pthread_mutex_lock(&LOCK_thread_count);
 
400
  created_threads--;
 
401
  pthread_cond_broadcast(&COND_thread_count);
 
402
  (void) pthread_mutex_unlock(&LOCK_thread_count);
 
403
  internal::my_thread_end();
 
404
  pthread_exit(0);
 
405
 
 
406
  return NULL;                               /* purify: deadcode */
 
407
}
 
408
 
 
409
 
 
410
/**
 
411
 * @brief
 
412
 *  Checks if a session needs immediate processing
 
413
 *
 
414
 * @retval true the session needs immediate processing 
 
415
 * @retval false if not, and is detached from the thread waiting for another
 
416
 * adding. The naming of the function is misleading in this case; it
 
417
 * actually does more than just checking if immediate processing is needed.
 
418
 */
 
419
static bool libevent_needs_immediate_processing(Session *session)
 
420
{
 
421
  session_scheduler *sched= (session_scheduler *)session->scheduler_arg;
 
422
 
 
423
  if (libevent_should_close_connection(session))
 
424
  {
 
425
    libevent_connection_close(session);
 
426
    return false;
 
427
  }
 
428
  /*
 
429
   If more data in the socket buffer, return true to process another command.
 
430
  
 
431
   Note: we cannot add for event processing because the whole request
 
432
   might already be buffered and we wouldn't receive an event. This is
 
433
   indeed the root of the reason of low performace. Need to be changed
 
434
   when nonblocking Protocol is finished.
 
435
  */
 
436
  if (session->client->haveMoreData())
 
437
    return true;
 
438
 
 
439
  sched->thread_detach();
 
440
  libevent_session_add(session);
 
441
 
 
442
  return false;
 
443
}
 
444
 
 
445
 
 
446
/**
 
447
 * @brief 
 
448
 *  Adds a Session to queued for libevent processing.
 
449
 * 
 
450
 * @details
 
451
 *  This call does not actually register the event with libevent.
 
452
 *  Instead, it places the Session onto a queue and signals libevent by writing
 
453
 *  a byte into session_add_pipe, which will cause our libevent_add_session_callback to
 
454
 *  be invoked which will find the Session on the queue and add it to libevent.
 
455
 */
 
456
void libevent_session_add(Session* session)
 
457
{
 
458
  session_scheduler *sched= (session_scheduler *)session->scheduler_arg;
 
459
  assert(sched);
 
460
  PoolOfThreadsScheduler *pot_scheduler=
 
461
    static_cast<PoolOfThreadsScheduler *>(session->scheduler);
 
462
  pot_scheduler->sessionAddToQueue(sched);
 
463
}
 
464
 
 
465
void PoolOfThreadsScheduler::sessionAddToQueue(session_scheduler *sched)
 
466
{
 
467
  char c= 0;
 
468
  pthread_mutex_lock(&LOCK_session_add);
 
469
  if (sessions_need_adding.empty())
 
470
  {
 
471
    /* notify libevent */
 
472
    size_t written= write(session_add_pipe[1], &c, sizeof(c));
 
473
    assert(written == sizeof(c));
 
474
  }
 
475
  /* queue for libevent */
 
476
  sessions_need_adding.push(sched->session);
 
477
  pthread_mutex_unlock(&LOCK_session_add);
 
478
}
 
479
 
 
480
 
 
481
PoolOfThreadsScheduler::PoolOfThreadsScheduler(const char *name_arg)
 
482
  : Scheduler(name_arg), sessions_need_adding(), sessions_to_be_killed(),
 
483
    sessions_need_processing(), sessions_waiting_for_io()
 
484
{
 
485
  struct sched_param tmp_sched_param;
 
486
 
 
487
  memset(&tmp_sched_param, 0, sizeof(struct sched_param));
 
488
  /* Setup attribute parameter for session threads. */
 
489
  (void) pthread_attr_init(&attr);
 
490
  (void) pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
 
491
  pthread_attr_setscope(&attr, PTHREAD_SCOPE_SYSTEM);
 
492
 
 
493
  tmp_sched_param.sched_priority= WAIT_PRIOR;
 
494
  (void) pthread_attr_setschedparam(&attr, &tmp_sched_param);
 
495
 
 
496
  pthread_mutex_init(&LOCK_session_add, NULL);
 
497
  pthread_mutex_init(&LOCK_session_kill, NULL);
 
498
  pthread_mutex_init(&LOCK_event_loop, NULL);
 
499
 
 
500
}
 
501
 
 
502
 
 
503
PoolOfThreadsScheduler::~PoolOfThreadsScheduler()
 
504
{
 
505
  (void) pthread_mutex_lock(&LOCK_thread_count);
 
506
 
 
507
  kill_pool_threads= true;
 
508
  while (created_threads)
 
509
  {
 
510
    /*
 
511
     * Wake up the event loop
 
512
     */
 
513
    char c= 0;
 
514
    size_t written= write(session_add_pipe[1], &c, sizeof(c));
 
515
    assert(written == sizeof(c));
 
516
 
 
517
    pthread_cond_wait(&COND_thread_count, &LOCK_thread_count);
 
518
  }
 
519
  (void) pthread_mutex_unlock(&LOCK_thread_count);
 
520
 
 
521
  event_del(&session_add_event);
 
522
  close(session_add_pipe[0]);
 
523
  close(session_add_pipe[1]);
 
524
  event_del(&session_kill_event);
 
525
  close(session_kill_pipe[0]);
 
526
  close(session_kill_pipe[1]);
 
527
 
 
528
  (void) pthread_mutex_destroy(&LOCK_event_loop);
 
529
  (void) pthread_mutex_destroy(&LOCK_session_add);
 
530
  (void) pthread_mutex_destroy(&LOCK_session_kill);
 
531
  (void) pthread_attr_destroy(&attr);
 
532
}
 
533
 
 
534
 
 
535
bool PoolOfThreadsScheduler::addSession(Session *session)
 
536
{
 
537
  assert(session->scheduler_arg == NULL);
 
538
  session_scheduler *sched= new session_scheduler(session);
 
539
 
 
540
  if (sched == NULL)
 
541
    return true;
 
542
 
 
543
  session->scheduler_arg= (void *)sched;
 
544
 
 
545
  libevent_session_add(session);
 
546
 
 
547
  return false;
 
548
}
 
549
 
 
550
 
 
551
void PoolOfThreadsScheduler::killSession(Session *session)
 
552
{
 
553
  char c= 0;
 
554
 
 
555
  pthread_mutex_lock(&LOCK_session_kill);
 
556
 
 
557
  if (sessions_to_be_killed.empty())
 
558
  {
 
559
    /* 
 
560
      Notify libevent with the killing event if this's the first killing
 
561
      notification of the batch
 
562
    */
 
563
    size_t written= write(session_kill_pipe[1], &c, sizeof(c));
 
564
    assert(written == sizeof(c));
 
565
  }
 
566
 
 
567
  /*
 
568
    Push into the sessions_to_be_killed queue
 
569
  */
 
570
  sessions_to_be_killed.push(session);
 
571
  pthread_mutex_unlock(&LOCK_session_kill);
 
572
}
 
573
 
 
574
 
 
575
bool PoolOfThreadsScheduler::libevent_init(void)
 
576
{
 
577
  uint32_t x;
 
578
 
 
579
  event_init();
 
580
 
 
581
 
 
582
  /* Set up the pipe used to add new sessions to the event pool */
 
583
  if (init_pipe(session_add_pipe))
 
584
  {
 
585
    errmsg_printf(ERRMSG_LVL_ERROR,
 
586
                  _("init_pipe(session_add_pipe) error in libevent_init\n"));
 
587
    return true;
 
588
  }
 
589
  /* Set up the pipe used to kill sessions in the event queue */
 
590
  if (init_pipe(session_kill_pipe))
 
591
  {
 
592
    errmsg_printf(ERRMSG_LVL_ERROR,
 
593
                  _("init_pipe(session_kill_pipe) error in libevent_init\n"));
 
594
    close(session_add_pipe[0]);
 
595
    close(session_add_pipe[1]);
 
596
    return true;
 
597
  }
 
598
  event_set(&session_add_event, session_add_pipe[0], EV_READ|EV_PERSIST,
 
599
            libevent_add_session_callback, this);
 
600
  event_set(&session_kill_event, session_kill_pipe[0], EV_READ|EV_PERSIST,
 
601
            libevent_kill_session_callback, this);
 
602
 
 
603
  if (event_add(&session_add_event, NULL) || event_add(&session_kill_event, NULL))
 
604
  {
 
605
    errmsg_printf(ERRMSG_LVL_ERROR, _("session_add_event event_add error in libevent_init\n"));
 
606
    return true;
 
607
 
 
608
  }
 
609
  /* Set up the thread pool */
 
610
  pthread_mutex_lock(&LOCK_thread_count);
 
611
 
 
612
  for (x= 0; x < size; x++)
 
613
  {
 
614
    pthread_t thread;
 
615
    int error;
 
616
    if ((error= pthread_create(&thread, &attr, libevent_thread_proc, this)))
 
617
    {
 
618
      errmsg_printf(ERRMSG_LVL_ERROR, _("Can't create completion port thread (error %d)"),
 
619
                    error);
 
620
      pthread_mutex_unlock(&LOCK_thread_count);
 
621
      return true;
 
622
    }
 
623
  }
 
624
 
 
625
  /* Wait until all threads are created */
 
626
  while (created_threads != size)
 
627
    pthread_cond_wait(&COND_thread_count,&LOCK_thread_count);
 
628
  pthread_mutex_unlock(&LOCK_thread_count);
 
629
 
 
630
  return false;
 
631
}
 
632
 
 
633
 
 
634
/**
 
635
 * @brief
 
636
 *  Called to initialize the pool of threads scheduler plugin
 
637
 * 
 
638
 * @param[in] registry holding the record of the plugins
 
639
 */
 
640
static int init(drizzled::plugin::Registry &registry)
 
641
{
 
642
  assert(size != 0);
 
643
 
 
644
  scheduler= new PoolOfThreadsScheduler("pool_of_threads");
 
645
  registry.add(scheduler);
 
646
 
 
647
  return 0;
 
648
}
 
649
 
 
650
/**
 
651
 * @brief
 
652
 *  Waits until all pool threads have been deleted for clean shutdown
 
653
 */
 
654
static int deinit(drizzled::plugin::Registry &registry)
 
655
{
 
656
  registry.remove(scheduler);
 
657
  delete scheduler;
 
658
 
 
659
  return 0;
 
660
}
 
661
 
 
662
/*
 
663
 The defaults here were picked based on what I see (aka Brian). They should
 
664
 be vetted across a larger audience.
 
665
*/
 
666
static DRIZZLE_SYSVAR_UINT(size, size,
 
667
                           PLUGIN_VAR_RQCMDARG,
 
668
                           N_("Size of Pool."),
 
669
                           NULL, NULL, 8, 1, 1024, 0);
 
670
 
 
671
static drizzle_sys_var* sys_variables[]= {
 
672
  DRIZZLE_SYSVAR(size),
 
673
  NULL,
 
674
};
 
675
 
 
676
DRIZZLE_DECLARE_PLUGIN
 
677
{
 
678
  DRIZZLE_VERSION_ID,
 
679
  "pool_of_threads",
 
680
  "0.1",
 
681
  "Brian Aker",
 
682
  "Pool of Threads Scheduler",
 
683
  PLUGIN_LICENSE_GPL,
 
684
  init, /* Plugin Init */
 
685
  deinit, /* Plugin Deinit */
 
686
  sys_variables,   /* system variables */
 
687
  NULL    /* config options */
 
688
}
 
689
DRIZZLE_DECLARE_PLUGIN_END;