~linuxjedi/drizzle/trunk-bug-667053

« back to all changes in this revision

Viewing changes to sql/scheduler.cc

  • Committer: brian
  • Date: 2008-06-25 05:29:13 UTC
  • Revision ID: brian@localhost.localdomain-20080625052913-6upwo0jsrl4lnapl
clean slate

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/* Copyright (C) 2007 MySQL AB
 
2
 
 
3
   This program is free software; you can redistribute it and/or modify
 
4
   it under the terms of the GNU General Public License as published by
 
5
   the Free Software Foundation; version 2 of the License.
 
6
 
 
7
   This program is distributed in the hope that it will be useful,
 
8
   but WITHOUT ANY WARRANTY; without even the implied warranty of
 
9
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
10
   GNU General Public License for more details.
 
11
 
 
12
   You should have received a copy of the GNU General Public License
 
13
   along with this program; if not, write to the Free Software
 
14
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
 
15
 
 
16
/*
 
17
  Implementation for the thread scheduler
 
18
*/
 
19
 
 
20
#ifdef USE_PRAGMA_INTERFACE
 
21
#pragma implementation
 
22
#endif
 
23
 
 
24
#include <mysql_priv.h>
 
25
#include "event.h"
 
26
 
 
27
 
 
28
/*
 
29
  'Dummy' functions to be used when we don't need any handling for a scheduler
 
30
  event
 
31
 */
 
32
 
 
33
static bool init_dummy(void) {return 0;}
 
34
static void post_kill_dummy(THD *thd) {}  
 
35
static void end_dummy(void) {}
 
36
static bool end_thread_dummy(THD *thd, bool cache_thread) { return 0; }
 
37
 
 
38
/*
 
39
  Initialize default scheduler with dummy functions so that setup functions
 
40
  only need to declare those that are relvant for their usage
 
41
*/
 
42
 
 
43
scheduler_functions::scheduler_functions()
 
44
  :init(init_dummy),
 
45
   init_new_connection_thread(init_new_connection_handler_thread),
 
46
   add_connection(0),                           // Must be defined
 
47
   post_kill_notification(post_kill_dummy),
 
48
   end_thread(end_thread_dummy), end(end_dummy)
 
49
{}
 
50
 
 
51
 
 
52
/*
 
53
  End connection, in case when we are using 'no-threads'
 
54
*/
 
55
 
 
56
static bool no_threads_end(THD *thd, bool put_in_cache)
 
57
{
 
58
  unlink_thd(thd);
 
59
  pthread_mutex_unlock(&LOCK_thread_count);
 
60
  return 1;                                     // Abort handle_one_connection
 
61
}
 
62
 
 
63
 
 
64
/*
 
65
  Initailize scheduler for --thread-handling=no-threads
 
66
*/
 
67
 
 
68
void one_thread_scheduler(scheduler_functions *func)
 
69
{
 
70
  func->max_threads= 1;
 
71
  func->add_connection= handle_connection_in_main_thread;
 
72
  func->init_new_connection_thread= init_dummy;
 
73
  func->end_thread= no_threads_end;
 
74
}
 
75
 
 
76
 
 
77
/*
 
78
  Initialize scheduler for --thread-handling=one-thread-per-connection
 
79
*/
 
80
 
 
81
void one_thread_per_connection_scheduler(scheduler_functions *func)
 
82
{
 
83
  func->max_threads= max_connections;
 
84
  func->add_connection= create_thread_to_handle_connection;
 
85
  func->end_thread= one_thread_per_connection_end;
 
86
}
 
87
 
 
88
static uint created_threads, killed_threads;
 
89
static bool kill_pool_threads;
 
90
 
 
91
static struct event thd_add_event;
 
92
static struct event thd_kill_event;
 
93
 
 
94
static pthread_mutex_t LOCK_thd_add;    /* protects thds_need_adding */
 
95
static LIST *thds_need_adding;    /* list of thds to add to libevent queue */
 
96
 
 
97
static int thd_add_pipe[2]; /* pipe to signal add a connection to libevent*/
 
98
static int thd_kill_pipe[2]; /* pipe to signal kill a connection in libevent */
 
99
 
 
100
/*
 
101
  LOCK_event_loop protects the non-thread safe libevent calls (event_add and 
 
102
  event_del) and thds_need_processing and thds_waiting_for_io.
 
103
*/
 
104
static pthread_mutex_t LOCK_event_loop;
 
105
static LIST *thds_need_processing; /* list of thds that needs some processing */
 
106
static LIST *thds_waiting_for_io; /* list of thds with added events */
 
107
 
 
108
pthread_handler_t libevent_thread_proc(void *arg);
 
109
static void libevent_end();
 
110
static bool libevent_needs_immediate_processing(THD *thd);
 
111
static void libevent_connection_close(THD *thd);
 
112
static bool libevent_should_close_connection(THD* thd);
 
113
static void libevent_thd_add(THD* thd);
 
114
void libevent_io_callback(int Fd, short Operation, void *ctx);
 
115
void libevent_add_thd_callback(int Fd, short Operation, void *ctx);
 
116
void libevent_kill_thd_callback(int Fd, short Operation, void *ctx);
 
117
 
 
118
 
 
119
/*
 
120
  Create a pipe and set to non-blocking.
 
121
  Returns TRUE if there is an error.
 
122
*/
 
123
 
 
124
static bool init_pipe(int pipe_fds[])
 
125
{
 
126
  int flags;
 
127
  return pipe(pipe_fds) < 0 ||
 
128
          (flags= fcntl(pipe_fds[0], F_GETFL)) == -1 ||
 
129
          fcntl(pipe_fds[0], F_SETFL, flags | O_NONBLOCK) == -1;
 
130
          (flags= fcntl(pipe_fds[1], F_GETFL)) == -1 ||
 
131
          fcntl(pipe_fds[1], F_SETFL, flags | O_NONBLOCK) == -1;
 
132
}
 
133
 
 
134
 
 
135
/*
 
136
  thd_scheduler keeps the link between THD and events.
 
137
  It's embedded in the THD class.
 
138
*/
 
139
 
 
140
thd_scheduler::thd_scheduler()
 
141
  : logged_in(FALSE), io_event(NULL), thread_attached(FALSE)
 
142
{  
 
143
#ifndef DBUG_OFF
 
144
  dbug_explain_buf[0]= 0;
 
145
#endif
 
146
}
 
147
 
 
148
 
 
149
thd_scheduler::~thd_scheduler()
 
150
{
 
151
  my_free(io_event, MYF(MY_ALLOW_ZERO_PTR));
 
152
}
 
153
 
 
154
 
 
155
bool thd_scheduler::init(THD *parent_thd)
 
156
{
 
157
  io_event=
 
158
    (struct event*)my_malloc(sizeof(*io_event),MYF(MY_ZEROFILL|MY_WME));
 
159
    
 
160
  if (!io_event)
 
161
  {
 
162
    sql_print_error("Memory allocation error in thd_scheduler::init\n");
 
163
    return TRUE;
 
164
  }
 
165
  
 
166
  event_set(io_event, parent_thd->net.vio->sd, EV_READ, 
 
167
            libevent_io_callback, (void*)parent_thd);
 
168
    
 
169
  list.data= parent_thd;
 
170
  
 
171
  return FALSE;
 
172
}
 
173
 
 
174
 
 
175
/*
 
176
  Attach/associate the connection with the OS thread, for command processing.
 
177
*/
 
178
 
 
179
bool thd_scheduler::thread_attach()
 
180
{
 
181
  DBUG_ASSERT(!thread_attached);
 
182
  THD* thd = (THD*)list.data;
 
183
  if (libevent_should_close_connection(thd) ||
 
184
      setup_connection_thread_globals(thd))
 
185
  {
 
186
    return TRUE;
 
187
  }
 
188
  my_errno= 0;
 
189
  thd->mysys_var->abort= 0;
 
190
  thread_attached= TRUE;
 
191
#ifndef DBUG_OFF
 
192
  swap_dbug_explain();
 
193
#endif
 
194
  return FALSE;
 
195
}
 
196
 
 
197
 
 
198
/*
 
199
  Detach/disassociate the connection with the OS thread.
 
200
*/
 
201
 
 
202
void thd_scheduler::thread_detach()
 
203
{
 
204
  if (thread_attached)
 
205
  {
 
206
    THD* thd = (THD*)list.data;
 
207
    thd->mysys_var= NULL;
 
208
    thread_attached= FALSE;
 
209
#ifndef DBUG_OFF
 
210
    swap_dbug_explain();
 
211
#endif
 
212
  }
 
213
}
 
214
 
 
215
 
 
216
/*
 
217
  Swap the THD's dbug explain_buffer with the OS thread's dbug explain buffer.
 
218
 
 
219
  This is used to preserve the SESSION DEBUG variable, which is mapped to the OS 
 
220
  thread during a command, but each command is handled by a different thread.
 
221
*/
 
222
 
 
223
#ifndef DBUG_OFF
 
224
void thd_scheduler::swap_dbug_explain()
 
225
{
 
226
  char buffer[sizeof(dbug_explain_buf)];
 
227
  if (DBUG_EXPLAIN(buffer, sizeof(buffer)))
 
228
    sql_print_error("DBUG_EXPLAIN buffer too small.\n");
 
229
  DBUG_POP();
 
230
  DBUG_PUSH(dbug_explain_buf);
 
231
  memcpy(dbug_explain_buf, buffer, sizeof(buffer));
 
232
}
 
233
#endif
 
234
 
 
235
/**
 
236
  Create all threads for the thread pool
 
237
 
 
238
  NOTES
 
239
    After threads are created we wait until all threads has signaled that
 
240
    they have started before we return
 
241
 
 
242
  RETURN
 
243
    0  ok
 
244
    1  We got an error creating the thread pool
 
245
       In this case we will abort all created threads
 
246
*/
 
247
 
 
248
static bool libevent_init(void)
 
249
{
 
250
  uint i;
 
251
  DBUG_ENTER("libevent_init");
 
252
 
 
253
  event_init();
 
254
  
 
255
  created_threads= 0;
 
256
  killed_threads= 0;
 
257
  kill_pool_threads= FALSE;
 
258
  
 
259
  pthread_mutex_init(&LOCK_event_loop, NULL);
 
260
  pthread_mutex_init(&LOCK_thd_add, NULL);
 
261
  
 
262
  /* set up the pipe used to add new thds to the event pool */
 
263
  if (init_pipe(thd_add_pipe))
 
264
  {
 
265
    sql_print_error("init_pipe(thd_add_pipe) error in libevent_init\n");
 
266
    DBUG_RETURN(1);
 
267
  }
 
268
  /* set up the pipe used to kill thds in the event queue */
 
269
  if (init_pipe(thd_kill_pipe))
 
270
  {
 
271
    sql_print_error("init_pipe(thd_kill_pipe) error in libevent_init\n");
 
272
    close(thd_add_pipe[0]);
 
273
    close(thd_add_pipe[1]);
 
274
    DBUG_RETURN(1);
 
275
  }
 
276
  event_set(&thd_add_event, thd_add_pipe[0], EV_READ|EV_PERSIST,
 
277
            libevent_add_thd_callback, NULL);
 
278
  event_set(&thd_kill_event, thd_kill_pipe[0], EV_READ|EV_PERSIST,
 
279
            libevent_kill_thd_callback, NULL);
 
280
 
 
281
 if (event_add(&thd_add_event, NULL) || event_add(&thd_kill_event, NULL))
 
282
 {
 
283
   sql_print_error("thd_add_event event_add error in libevent_init\n");
 
284
   libevent_end();
 
285
   DBUG_RETURN(1);
 
286
   
 
287
 }
 
288
  /* Set up the thread pool */
 
289
  created_threads= killed_threads= 0;
 
290
  pthread_mutex_lock(&LOCK_thread_count);
 
291
 
 
292
  for (i= 0; i < thread_pool_size; i++)
 
293
  {
 
294
    pthread_t thread;
 
295
    int error;
 
296
    if ((error= pthread_create(&thread, &connection_attrib,
 
297
                               libevent_thread_proc, 0)))
 
298
    {
 
299
      sql_print_error("Can't create completion port thread (error %d)",
 
300
                      error);
 
301
      pthread_mutex_unlock(&LOCK_thread_count);
 
302
      libevent_end();                      // Cleanup
 
303
      DBUG_RETURN(TRUE);
 
304
    }
 
305
  }
 
306
 
 
307
  /* Wait until all threads are created */
 
308
  while (created_threads != thread_pool_size)
 
309
    pthread_cond_wait(&COND_thread_count,&LOCK_thread_count);
 
310
  pthread_mutex_unlock(&LOCK_thread_count);
 
311
  
 
312
  DBUG_PRINT("info", ("%u threads created", (uint) thread_pool_size));
 
313
  DBUG_RETURN(FALSE);
 
314
}
 
315
 
 
316
 
 
317
/*
 
318
  This is called when data is ready on the socket.
 
319
  
 
320
  NOTES
 
321
    This is only called by the thread that owns LOCK_event_loop.
 
322
  
 
323
    We add the thd that got the data to thds_need_processing, and 
 
324
    cause the libevent event_loop() to terminate. Then this same thread will
 
325
    return from event_loop and pick the thd value back up for processing.
 
326
*/
 
327
 
 
328
void libevent_io_callback(int, short, void *ctx)
 
329
{    
 
330
  safe_mutex_assert_owner(&LOCK_event_loop);
 
331
  THD *thd= (THD*)ctx;
 
332
  thds_waiting_for_io= list_delete(thds_waiting_for_io, &thd->scheduler.list);
 
333
  thds_need_processing= list_add(thds_need_processing, &thd->scheduler.list);
 
334
}
 
335
 
 
336
/*
 
337
  This is called when we have a thread we want to be killed.
 
338
  
 
339
  NOTES
 
340
    This is only called by the thread that owns LOCK_event_loop.
 
341
*/
 
342
 
 
343
void libevent_kill_thd_callback(int Fd, short, void*)
 
344
{    
 
345
  safe_mutex_assert_owner(&LOCK_event_loop);
 
346
 
 
347
  /* clear the pending events */
 
348
  char c;
 
349
  while (read(Fd, &c, sizeof(c)) == sizeof(c))
 
350
  {}
 
351
 
 
352
  LIST* list= thds_waiting_for_io;
 
353
  while (list)
 
354
  {
 
355
    THD *thd= (THD*)list->data;
 
356
    list= list_rest(list);
 
357
    if (thd->killed == THD::KILL_CONNECTION)
 
358
    {
 
359
      /*
 
360
        Delete from libevent and add to the processing queue.
 
361
      */
 
362
      event_del(thd->scheduler.io_event);
 
363
      thds_waiting_for_io= list_delete(thds_waiting_for_io,
 
364
                                       &thd->scheduler.list);
 
365
      thds_need_processing= list_add(thds_need_processing,
 
366
                                     &thd->scheduler.list);
 
367
    }
 
368
  }
 
369
}
 
370
 
 
371
 
 
372
/*
 
373
  This is used to add connections to the pool. This callback is invoked from
 
374
  the libevent event_loop() call whenever the thd_add_pipe[1] pipe has a byte
 
375
  written to it.
 
376
  
 
377
  NOTES
 
378
    This is only called by the thread that owns LOCK_event_loop.
 
379
*/
 
380
 
 
381
void libevent_add_thd_callback(int Fd, short, void *)
 
382
 
383
  safe_mutex_assert_owner(&LOCK_event_loop);
 
384
 
 
385
  /* clear the pending events */
 
386
  char c;
 
387
  while (read(Fd, &c, sizeof(c)) == sizeof(c))
 
388
  {}
 
389
 
 
390
  pthread_mutex_lock(&LOCK_thd_add);
 
391
  while (thds_need_adding)
 
392
  {
 
393
    /* pop the first thd off the list */
 
394
    THD* thd= (THD*)thds_need_adding->data;
 
395
    thds_need_adding= list_delete(thds_need_adding, thds_need_adding);
 
396
 
 
397
    pthread_mutex_unlock(&LOCK_thd_add);
 
398
    
 
399
    if (!thd->scheduler.logged_in || libevent_should_close_connection(thd))
 
400
    {
 
401
      /*
 
402
        Add thd to thds_need_processing list. If it needs closing we'll close
 
403
        it outside of event_loop().
 
404
      */
 
405
      thds_need_processing= list_add(thds_need_processing,
 
406
                                     &thd->scheduler.list);
 
407
    }
 
408
    else
 
409
    {
 
410
      /* Add to libevent */
 
411
      if (event_add(thd->scheduler.io_event, NULL))
 
412
      {
 
413
        sql_print_error("event_add error in libevent_add_thd_callback\n");
 
414
        libevent_connection_close(thd);
 
415
      } 
 
416
      else
 
417
      {
 
418
        thds_waiting_for_io= list_add(thds_waiting_for_io,
 
419
                                      &thd->scheduler.list);
 
420
      }
 
421
    }
 
422
    pthread_mutex_lock(&LOCK_thd_add);
 
423
  }
 
424
  pthread_mutex_unlock(&LOCK_thd_add);
 
425
}
 
426
 
 
427
 
 
428
/**
 
429
  Notify the thread pool about a new connection
 
430
 
 
431
  NOTES
 
432
    LOCK_thread_count is locked on entry. This function MUST unlock it!
 
433
*/
 
434
 
 
435
static void libevent_add_connection(THD *thd)
 
436
{
 
437
  DBUG_ENTER("libevent_add_connection");
 
438
  DBUG_PRINT("enter", ("thd: 0x%lx  thread_id: %lu",
 
439
                       (long) thd, thd->thread_id));
 
440
  
 
441
  if (thd->scheduler.init(thd))
 
442
  {
 
443
    sql_print_error("Scheduler init error in libevent_add_new_connection\n");
 
444
    pthread_mutex_unlock(&LOCK_thread_count);
 
445
    libevent_connection_close(thd);
 
446
    DBUG_VOID_RETURN;
 
447
  }
 
448
  threads.append(thd);
 
449
  libevent_thd_add(thd);
 
450
  
 
451
  pthread_mutex_unlock(&LOCK_thread_count);
 
452
  DBUG_VOID_RETURN;
 
453
}
 
454
 
 
455
 
 
456
/**
 
457
  @brief Signal a waiting connection it's time to die.
 
458
 
 
459
  @details This function will signal libevent the THD should be killed.
 
460
    Either the global LOCK_thd_count or the THD's LOCK_delete must be locked
 
461
    upon entry.
 
462
 
 
463
  @param[in]  thd The connection to kill
 
464
*/
 
465
 
 
466
static void libevent_post_kill_notification(THD *)
 
467
{
 
468
  /*
 
469
    Note, we just wake up libevent with an event that a THD should be killed,
 
470
    It will search its list of thds for thd->killed ==  KILL_CONNECTION to
 
471
    find the THDs it should kill.
 
472
    
 
473
    So we don't actually tell it which one and we don't actually use the
 
474
    THD being passed to us, but that's just a design detail that could change
 
475
    later.
 
476
  */
 
477
  char c= 0;
 
478
  write(thd_kill_pipe[1], &c, sizeof(c));
 
479
}
 
480
 
 
481
 
 
482
/*
 
483
  Close and delete a connection.
 
484
*/
 
485
 
 
486
static void libevent_connection_close(THD *thd)
 
487
{
 
488
  DBUG_ENTER("libevent_connection_close");
 
489
  DBUG_PRINT("enter", ("thd: 0x%lx", (long) thd));
 
490
 
 
491
  thd->killed= THD::KILL_CONNECTION;          // Avoid error messages
 
492
 
 
493
  if (thd->net.vio->sd >= 0)                  // not already closed
 
494
  {
 
495
    end_connection(thd);
 
496
    close_connection(thd, 0, 1);
 
497
  }
 
498
  thd->scheduler.thread_detach();
 
499
  unlink_thd(thd);   /* locks LOCK_thread_count and deletes thd */
 
500
  pthread_mutex_unlock(&LOCK_thread_count);
 
501
 
 
502
  DBUG_VOID_RETURN;
 
503
}
 
504
 
 
505
 
 
506
/*
 
507
  Returns true if we should close and delete a THD connection.
 
508
*/
 
509
 
 
510
static bool libevent_should_close_connection(THD* thd)
 
511
{
 
512
  return thd->net.error ||
 
513
         thd->net.vio == 0 ||
 
514
         thd->killed == THD::KILL_CONNECTION;
 
515
}
 
516
 
 
517
 
 
518
/*
 
519
  libevent_thread_proc is the outer loop of each thread in the thread pool.
 
520
  These procs only return/terminate on shutdown (kill_pool_threads == true).
 
521
*/
 
522
 
 
523
pthread_handler_t libevent_thread_proc(void *arg)
 
524
{
 
525
  if (init_new_connection_handler_thread())
 
526
  {
 
527
    my_thread_global_end();
 
528
    sql_print_error("libevent_thread_proc: my_thread_init() failed\n");
 
529
    exit(1);
 
530
  }
 
531
  DBUG_ENTER("libevent_thread_proc");
 
532
 
 
533
  /*
 
534
    Signal libevent_init() when all threads has been created and are ready to
 
535
    receive events.
 
536
  */
 
537
  (void) pthread_mutex_lock(&LOCK_thread_count);
 
538
  created_threads++;
 
539
  if (created_threads == thread_pool_size)
 
540
    (void) pthread_cond_signal(&COND_thread_count);
 
541
  (void) pthread_mutex_unlock(&LOCK_thread_count);
 
542
  
 
543
  for (;;)
 
544
  {
 
545
    THD *thd= NULL;
 
546
    (void) pthread_mutex_lock(&LOCK_event_loop);
 
547
    
 
548
    /* get thd(s) to process */
 
549
    while (!thds_need_processing)
 
550
    {
 
551
      if (kill_pool_threads)
 
552
      {
 
553
        /* the flag that we should die has been set */
 
554
        (void) pthread_mutex_unlock(&LOCK_event_loop);
 
555
        goto thread_exit;
 
556
      }
 
557
      event_loop(EVLOOP_ONCE);
 
558
    }
 
559
    
 
560
    /* pop the first thd off the list */
 
561
    thd= (THD*)thds_need_processing->data;
 
562
    thds_need_processing= list_delete(thds_need_processing,
 
563
                                      thds_need_processing);
 
564
    
 
565
    (void) pthread_mutex_unlock(&LOCK_event_loop);
 
566
    
 
567
    /* now we process the connection (thd) */
 
568
    
 
569
    /* set up the thd<->thread links. */
 
570
    thd->thread_stack= (char*) &thd;
 
571
    
 
572
    if (thd->scheduler.thread_attach())
 
573
    {
 
574
      libevent_connection_close(thd);
 
575
      continue;
 
576
    }
 
577
 
 
578
    /* is the connection logged in yet? */
 
579
    if (!thd->scheduler.logged_in)
 
580
    {
 
581
      DBUG_PRINT("info", ("init new connection.  sd: %d",
 
582
                          thd->net.vio->sd));
 
583
      if (login_connection(thd))
 
584
      {
 
585
        /* Failed to log in */
 
586
        libevent_connection_close(thd);
 
587
        continue;
 
588
      }
 
589
      else
 
590
      {
 
591
        /* login successful */
 
592
        thd->scheduler.logged_in= TRUE;
 
593
        prepare_new_connection_state(thd);
 
594
        if (!libevent_needs_immediate_processing(thd))
 
595
          continue; /* New connection is now waiting for data in libevent*/
 
596
      }
 
597
    }
 
598
 
 
599
    do
 
600
    {
 
601
      /* Process a query */
 
602
      if (do_command(thd))
 
603
      {
 
604
        libevent_connection_close(thd);
 
605
        break;
 
606
      }
 
607
    } while (libevent_needs_immediate_processing(thd));
 
608
  }
 
609
  
 
610
thread_exit:
 
611
  DBUG_PRINT("exit", ("ending thread"));
 
612
  (void) pthread_mutex_lock(&LOCK_thread_count);
 
613
  killed_threads++;
 
614
  pthread_cond_broadcast(&COND_thread_count);
 
615
  (void) pthread_mutex_unlock(&LOCK_thread_count);
 
616
  my_thread_end();
 
617
  pthread_exit(0);
 
618
  DBUG_RETURN(0);                               /* purify: deadcode */
 
619
}
 
620
 
 
621
 
 
622
/*
 
623
  Returns TRUE if the connection needs immediate processing and FALSE if 
 
624
  instead it's queued for libevent processing or closed,
 
625
*/
 
626
 
 
627
static bool libevent_needs_immediate_processing(THD *thd)
 
628
{
 
629
  if (libevent_should_close_connection(thd))
 
630
  {
 
631
    libevent_connection_close(thd);
 
632
    return FALSE;
 
633
  }
 
634
  /*
 
635
    If more data in the socket buffer, return TRUE to process another command.
 
636
 
 
637
    Note: we cannot add for event processing because the whole request might
 
638
    already be buffered and we wouldn't receive an event.
 
639
  */
 
640
  if (thd->net.vio == 0 || thd->net.vio->read_pos < thd->net.vio->read_end)
 
641
    return TRUE;
 
642
  
 
643
  thd->scheduler.thread_detach();
 
644
  libevent_thd_add(thd);
 
645
  return FALSE;
 
646
}
 
647
 
 
648
 
 
649
/*
 
650
  Adds a THD to queued for libevent processing.
 
651
  
 
652
  This call does not actually register the event with libevent.
 
653
  Instead, it places the THD onto a queue and signals libevent by writing
 
654
  a byte into thd_add_pipe, which will cause our libevent_add_thd_callback to
 
655
  be invoked which will find the THD on the queue and add it to libevent.
 
656
*/
 
657
 
 
658
static void libevent_thd_add(THD* thd)
 
659
{
 
660
  char c=0;
 
661
  pthread_mutex_lock(&LOCK_thd_add);
 
662
  /* queue for libevent */
 
663
  thds_need_adding= list_add(thds_need_adding, &thd->scheduler.list);
 
664
  /* notify libevent */
 
665
  write(thd_add_pipe[1], &c, sizeof(c));
 
666
  pthread_mutex_unlock(&LOCK_thd_add);
 
667
}
 
668
 
 
669
 
 
670
/**
 
671
  Wait until all pool threads have been deleted for clean shutdown
 
672
*/
 
673
 
 
674
static void libevent_end()
 
675
{
 
676
  DBUG_ENTER("libevent_end");
 
677
  DBUG_PRINT("enter", ("created_threads: %d  killed_threads: %u",
 
678
                       created_threads, killed_threads));
 
679
  
 
680
  
 
681
  (void) pthread_mutex_lock(&LOCK_thread_count);
 
682
  
 
683
  kill_pool_threads= TRUE;
 
684
  while (killed_threads != created_threads)
 
685
  {
 
686
    /* wake up the event loop */
 
687
    char c= 0;
 
688
    write(thd_add_pipe[1], &c, sizeof(c));
 
689
 
 
690
    pthread_cond_wait(&COND_thread_count, &LOCK_thread_count);
 
691
  }
 
692
  (void) pthread_mutex_unlock(&LOCK_thread_count);
 
693
  
 
694
  event_del(&thd_add_event);
 
695
  close(thd_add_pipe[0]);
 
696
  close(thd_add_pipe[1]);
 
697
  event_del(&thd_kill_event);
 
698
  close(thd_kill_pipe[0]);
 
699
  close(thd_kill_pipe[1]);
 
700
 
 
701
  (void) pthread_mutex_destroy(&LOCK_event_loop);
 
702
  (void) pthread_mutex_destroy(&LOCK_thd_add);
 
703
  DBUG_VOID_RETURN;
 
704
}
 
705
 
 
706
 
 
707
void pool_of_threads_scheduler(scheduler_functions* func)
 
708
{
 
709
  func->max_threads= thread_pool_size;
 
710
  func->init= libevent_init;
 
711
  func->end=  libevent_end;
 
712
  func->post_kill_notification= libevent_post_kill_notification;
 
713
  func->add_connection= libevent_add_connection;
 
714
}