~ubuntu-branches/ubuntu/trusty/drizzle/trusty

« back to all changes in this revision

Viewing changes to plugin/pool_of_threads/pool_of_threads.cc

  • Committer: Bazaar Package Importer
  • Author(s): Monty Taylor
  • Date: 2010-10-02 14:17:48 UTC
  • mfrom: (1.1.1 upstream)
  • mto: (2.1.17 sid)
  • mto: This revision was merged to the branch mainline in revision 3.
  • Revision ID: james.westby@ubuntu.com-20101002141748-m6vbfbfjhrw1153e
Tags: 2010.09.1802-1
* New upstream release.
* Removed pid-file argument hack.
* Updated GPL-2 address to be new address.
* Directly copy in drizzledump.1 since debian doesn't have sphinx 1.0 yet.
* Link to jquery from libjs-jquery. Add it as a depend.
* Add drizzled.8 symlink to the install files.

Show diffs side-by-side

added added

removed removed

Lines of Context:
23
23
#include <plugin/pool_of_threads/pool_of_threads.h>
24
24
#include "drizzled/pthread_globals.h"
25
25
#include "drizzled/internal/my_pthread.h"
26
 
 
 
26
#include <boost/program_options.hpp>
 
27
#include <drizzled/module/option_map.h>
 
28
 
 
29
#include <boost/thread/thread.hpp>
 
30
#include <boost/bind.hpp>
 
31
 
 
32
namespace po= boost::program_options;
27
33
using namespace std;
28
34
using namespace drizzled;
29
35
 
30
36
/* Global's (TBR) */
31
 
static PoolOfThreadsScheduler *scheduler= NULL;
32
37
 
33
38
/**
34
39
 * Set this to true to trigger killing of all threads in the pool
36
41
static volatile bool kill_pool_threads= false;
37
42
 
38
43
static volatile uint32_t created_threads= 0;
39
 
static int deinit(drizzled::plugin::Registry &registry);
40
44
 
41
45
static struct event session_add_event;
42
46
static struct event session_kill_event;
50
54
static void libevent_connection_close(Session *session);
51
55
void libevent_session_add(Session* session);
52
56
bool libevent_should_close_connection(Session* session);
 
57
void libevent_thread_proc(PoolOfThreadsScheduler *pot_scheduler);
53
58
extern "C" {
 
59
#if 0
54
60
  void *libevent_thread_proc(void *arg);
 
61
#endif
55
62
  void libevent_io_callback(int Fd, short Operation, void *ctx);
56
63
  void libevent_add_session_callback(int Fd, short Operation, void *ctx);
57
64
  void libevent_kill_session_callback(int Fd, short Operation, void *ctx);
58
65
}
59
66
 
60
 
static uint32_t size= 0;
 
67
static uint32_t pool_size;
61
68
 
62
69
/**
63
70
 * @brief 
130
137
  char c;
131
138
  int count= 0;
132
139
 
133
 
  pthread_mutex_lock(&LOCK_session_kill);
 
140
  LOCK_session_kill.lock();
134
141
  while (! sessions_to_be_killed.empty())
135
142
  {
136
143
 
138
145
     Fetch a session from the queue
139
146
    */
140
147
    Session* session= sessions_to_be_killed.front();
141
 
    pthread_mutex_unlock(&LOCK_session_kill);
 
148
    LOCK_session_kill.unlock();
142
149
 
143
150
    session_scheduler *sched= static_cast<session_scheduler *>(session->scheduler_arg);
144
151
    assert(sched);
157
164
    */
158
165
    sessions_need_processing.push(sched->session);
159
166
 
160
 
    pthread_mutex_lock(&LOCK_session_kill);
 
167
    LOCK_session_kill.lock();
161
168
    /*
162
169
     Pop until this session is already processed
163
170
    */
173
180
    count++;
174
181
  }
175
182
  assert(count == 1);
176
 
  pthread_mutex_unlock(&LOCK_session_kill);
 
183
  LOCK_session_kill.unlock();
177
184
}
178
185
 
179
186
 
202
209
  char c;
203
210
  int count= 0;
204
211
 
205
 
  pthread_mutex_lock(&LOCK_session_add);
 
212
  LOCK_session_add.lock();
206
213
  while (! sessions_need_adding.empty())
207
214
  {
208
215
    /*
209
216
     Pop the first session off the queue 
210
217
    */
211
218
    Session* session= sessions_need_adding.front();
212
 
    pthread_mutex_unlock(&LOCK_session_add);
 
219
    LOCK_session_add.unlock();
213
220
 
214
221
    session_scheduler *sched= static_cast<session_scheduler *>(session->scheduler_arg);
215
222
    assert(sched);
237
244
      }
238
245
    }
239
246
 
240
 
    pthread_mutex_lock(&LOCK_session_add);
 
247
    LOCK_session_add.lock();
241
248
    /*
242
249
     Pop until this session is already processed
243
250
    */
253
260
    count++;
254
261
  }
255
262
  assert(count == 1);
256
 
  pthread_mutex_unlock(&LOCK_session_add);
 
263
  LOCK_session_add.unlock();
257
264
}
258
265
 
259
266
/**
301
308
 *  These procs only return/terminate on shutdown (kill_pool_threads ==
302
309
 *  true).
303
310
 */
304
 
void *libevent_thread_proc(void *ctx)
 
311
void libevent_thread_proc(PoolOfThreadsScheduler *pot_scheduler)
305
312
{
306
313
  if (internal::my_thread_init())
307
314
  {
310
317
    exit(1);
311
318
  }
312
319
 
313
 
  PoolOfThreadsScheduler *pot_scheduler=
314
 
    reinterpret_cast<PoolOfThreadsScheduler *>(ctx);
315
 
  return pot_scheduler->mainLoop();
 
320
  (void)pot_scheduler->mainLoop();
316
321
}
317
322
 
318
323
void *PoolOfThreadsScheduler::mainLoop()
321
326
   Signal libevent_init() when all threads has been created and are ready
322
327
   to receive events.
323
328
  */
324
 
  (void) pthread_mutex_lock(&LOCK_thread_count);
 
329
  (void) LOCK_thread_count.lock();
325
330
  created_threads++;
326
 
  if (created_threads == size)
327
 
    (void) pthread_cond_signal(&COND_thread_count);
328
 
  (void) pthread_mutex_unlock(&LOCK_thread_count);
 
331
  if (created_threads == pool_size)
 
332
    COND_thread_count.notify_one();
 
333
 
 
334
  (void) LOCK_thread_count.unlock();
329
335
 
330
336
  for (;;)
331
337
  {
332
338
    Session *session= NULL;
333
 
    (void) pthread_mutex_lock(&LOCK_event_loop);
 
339
    LOCK_event_loop.lock();
334
340
 
335
341
    /* get session(s) to process */
336
342
    while (sessions_need_processing.empty())
338
344
      if (kill_pool_threads)
339
345
      {
340
346
        /* the flag that we should die has been set */
341
 
        (void) pthread_mutex_unlock(&LOCK_event_loop);
 
347
        LOCK_event_loop.unlock();
342
348
        goto thread_exit;
343
349
      }
344
350
      event_loop(EVLOOP_ONCE);
349
355
    sessions_need_processing.pop();
350
356
    session_scheduler *sched= (session_scheduler *)session->scheduler_arg;
351
357
 
352
 
    (void) pthread_mutex_unlock(&LOCK_event_loop);
 
358
    LOCK_event_loop.lock();
353
359
 
354
360
    /* now we process the connection (session) */
355
361
 
396
402
  }
397
403
 
398
404
thread_exit:
399
 
  (void) pthread_mutex_lock(&LOCK_thread_count);
 
405
  (void) LOCK_thread_count.lock();
400
406
  created_threads--;
401
 
  pthread_cond_broadcast(&COND_thread_count);
402
 
  (void) pthread_mutex_unlock(&LOCK_thread_count);
 
407
  COND_thread_count.notify_all();
 
408
  (void) LOCK_thread_count.unlock();
403
409
  internal::my_thread_end();
404
410
  pthread_exit(0);
405
411
 
465
471
void PoolOfThreadsScheduler::sessionAddToQueue(session_scheduler *sched)
466
472
{
467
473
  char c= 0;
468
 
  pthread_mutex_lock(&LOCK_session_add);
 
474
  boost::mutex::scoped_lock scopedLock(LOCK_session_add);
469
475
  if (sessions_need_adding.empty())
470
476
  {
471
477
    /* notify libevent */
474
480
  }
475
481
  /* queue for libevent */
476
482
  sessions_need_adding.push(sched->session);
477
 
  pthread_mutex_unlock(&LOCK_session_add);
478
483
}
479
484
 
480
485
 
482
487
  : Scheduler(name_arg), sessions_need_adding(), sessions_to_be_killed(),
483
488
    sessions_need_processing(), sessions_waiting_for_io()
484
489
{
485
 
  struct sched_param tmp_sched_param;
486
 
 
487
 
  memset(&tmp_sched_param, 0, sizeof(struct sched_param));
488
490
  /* Setup attribute parameter for session threads. */
489
491
  (void) pthread_attr_init(&attr);
490
492
  (void) pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
491
493
  pthread_attr_setscope(&attr, PTHREAD_SCOPE_SYSTEM);
492
 
 
493
 
  tmp_sched_param.sched_priority= WAIT_PRIOR;
494
 
  (void) pthread_attr_setschedparam(&attr, &tmp_sched_param);
495
 
 
496
 
  pthread_mutex_init(&LOCK_session_add, NULL);
497
 
  pthread_mutex_init(&LOCK_session_kill, NULL);
498
 
  pthread_mutex_init(&LOCK_event_loop, NULL);
499
 
 
500
494
}
501
495
 
502
496
 
503
497
PoolOfThreadsScheduler::~PoolOfThreadsScheduler()
504
498
{
505
 
  (void) pthread_mutex_lock(&LOCK_thread_count);
 
499
  (void) LOCK_thread_count.lock();
506
500
 
507
501
  kill_pool_threads= true;
508
502
  while (created_threads)
514
508
    size_t written= write(session_add_pipe[1], &c, sizeof(c));
515
509
    assert(written == sizeof(c));
516
510
 
517
 
    pthread_cond_wait(&COND_thread_count, &LOCK_thread_count);
 
511
    pthread_cond_wait(COND_thread_count.native_handle(), LOCK_thread_count.native_handle());
518
512
  }
519
 
  (void) pthread_mutex_unlock(&LOCK_thread_count);
 
513
  (void) LOCK_thread_count.unlock();
520
514
 
521
515
  event_del(&session_add_event);
522
516
  close(session_add_pipe[0]);
525
519
  close(session_kill_pipe[0]);
526
520
  close(session_kill_pipe[1]);
527
521
 
528
 
  (void) pthread_mutex_destroy(&LOCK_event_loop);
529
 
  (void) pthread_mutex_destroy(&LOCK_session_add);
530
 
  (void) pthread_mutex_destroy(&LOCK_session_kill);
531
522
  (void) pthread_attr_destroy(&attr);
532
523
}
533
524
 
552
543
{
553
544
  char c= 0;
554
545
 
555
 
  pthread_mutex_lock(&LOCK_session_kill);
 
546
  boost::mutex::scoped_lock scopedLock(LOCK_session_kill);
556
547
 
557
548
  if (sessions_to_be_killed.empty())
558
549
  {
568
559
    Push into the sessions_to_be_killed queue
569
560
  */
570
561
  sessions_to_be_killed.push(session);
571
 
  pthread_mutex_unlock(&LOCK_session_kill);
572
562
}
573
563
 
574
564
 
575
565
bool PoolOfThreadsScheduler::libevent_init(void)
576
566
{
577
 
  uint32_t x;
578
 
 
579
567
  event_init();
580
568
 
581
569
 
607
595
 
608
596
  }
609
597
  /* Set up the thread pool */
610
 
  pthread_mutex_lock(&LOCK_thread_count);
 
598
  boost::mutex::scoped_lock scopedLock(LOCK_thread_count);
611
599
 
612
 
  for (x= 0; x < size; x++)
 
600
  for (uint32_t x= 0; x < pool_size; x++)
613
601
  {
614
 
    pthread_t thread;
615
 
    int error;
616
 
    if ((error= pthread_create(&thread, &attr, libevent_thread_proc, this)))
 
602
    if (not new boost::thread(boost::bind(libevent_thread_proc, this)))
617
603
    {
618
 
      errmsg_printf(ERRMSG_LVL_ERROR, _("Can't create completion port thread (error %d)"),
619
 
                    error);
620
 
      pthread_mutex_unlock(&LOCK_thread_count);
 
604
      errmsg_printf(ERRMSG_LVL_ERROR, _("Can't create completion port thread (error %d)"), 1);
621
605
      return true;
622
606
    }
623
607
  }
624
608
 
625
609
  /* Wait until all threads are created */
626
 
  while (created_threads != size)
627
 
    pthread_cond_wait(&COND_thread_count,&LOCK_thread_count);
628
 
  pthread_mutex_unlock(&LOCK_thread_count);
 
610
  while (created_threads != pool_size)
 
611
  {
 
612
    COND_thread_count.wait(scopedLock);
 
613
  }
629
614
 
630
615
  return false;
631
616
}
637
622
 * 
638
623
 * @param[in] registry holding the record of the plugins
639
624
 */
640
 
static int init(drizzled::plugin::Registry &registry)
641
 
{
642
 
  assert(size != 0);
643
 
 
644
 
  scheduler= new PoolOfThreadsScheduler("pool_of_threads");
645
 
  registry.add(scheduler);
646
 
 
647
 
  return 0;
648
 
}
649
 
 
650
 
/**
651
 
 * @brief
652
 
 *  Waits until all pool threads have been deleted for clean shutdown
653
 
 */
654
 
static int deinit(drizzled::plugin::Registry &registry)
655
 
{
656
 
  registry.remove(scheduler);
657
 
  delete scheduler;
 
625
static int init(drizzled::module::Context &context)
 
626
{
 
627
  const module::option_map &vm= context.getOptions();
 
628
 
 
629
  if (vm.count("size"))
 
630
  {
 
631
    if (pool_size > 1024 || pool_size < 1)
 
632
    {
 
633
      errmsg_printf(ERRMSG_LVL_ERROR, _("Invalid value for size\n"));
 
634
      exit(-1);
 
635
    }
 
636
  }
 
637
 
 
638
  assert(pool_size != 0);
 
639
 
 
640
  context.add(new PoolOfThreadsScheduler("pool_of_threads"));
658
641
 
659
642
  return 0;
660
643
}
663
646
 The defaults here were picked based on what I see (aka Brian). They should
664
647
 be vetted across a larger audience.
665
648
*/
666
 
static DRIZZLE_SYSVAR_UINT(size, size,
 
649
static DRIZZLE_SYSVAR_UINT(size, pool_size,
667
650
                           PLUGIN_VAR_RQCMDARG,
668
651
                           N_("Size of Pool."),
669
652
                           NULL, NULL, 8, 1, 1024, 0);
670
653
 
 
654
static void init_options(drizzled::module::option_context &context)
 
655
{
 
656
  context("size",
 
657
          po::value<uint32_t>(&pool_size)->default_value(8),
 
658
          N_("Size of Pool."));
 
659
}
 
660
 
671
661
static drizzle_sys_var* sys_variables[]= {
672
662
  DRIZZLE_SYSVAR(size),
673
663
  NULL,
682
672
  "Pool of Threads Scheduler",
683
673
  PLUGIN_LICENSE_GPL,
684
674
  init, /* Plugin Init */
685
 
  deinit, /* Plugin Deinit */
686
675
  sys_variables,   /* system variables */
687
 
  NULL    /* config options */
 
676
  init_options    /* config options */
688
677
}
689
678
DRIZZLE_DECLARE_PLUGIN_END;