~jlukas79/+junk/mysql-server

« back to all changes in this revision

Viewing changes to sql/scheduler.cc

manual merge 6.0-main --> 6.0-bka-review

Show diffs side-by-side

added added

removed removed

Lines of Context:
31
31
 */
32
32
 
33
33
static bool init_dummy(void) {return 0;}
34
 
static void post_kill_dummy(THD *thd) {}  
 
34
static void post_kill_dummy(THD *thd) {}
35
35
static void end_dummy(void) {}
36
36
static bool end_thread_dummy(THD *thd, bool cache_thread) { return 0; }
37
37
 
94
94
 
95
95
#include "event.h"
96
96
 
 
97
static struct event_base *base;
 
98
 
97
99
static uint created_threads, killed_threads;
98
100
static bool kill_pool_threads;
99
101
 
135
137
  int flags;
136
138
  return pipe(pipe_fds) < 0 ||
137
139
          (flags= fcntl(pipe_fds[0], F_GETFL)) == -1 ||
138
 
          fcntl(pipe_fds[0], F_SETFL, flags | O_NONBLOCK) == -1;
 
140
          fcntl(pipe_fds[0], F_SETFL, flags | O_NONBLOCK) == -1 ||
139
141
          (flags= fcntl(pipe_fds[1], F_GETFL)) == -1 ||
140
142
          fcntl(pipe_fds[1], F_SETFL, flags | O_NONBLOCK) == -1;
141
143
}
148
150
 
149
151
thd_scheduler::thd_scheduler()
150
152
  : logged_in(FALSE), io_event(NULL), thread_attached(FALSE)
151
 
{  
 
153
{
152
154
#ifndef DBUG_OFF
153
 
  dbug_explain_buf[0]= 0;
 
155
  dbug_explain[0]= '\0';
 
156
  set_explain= FALSE;
154
157
#endif
155
158
}
156
159
 
165
168
{
166
169
  io_event=
167
170
    (struct event*)my_malloc(sizeof(*io_event),MYF(MY_ZEROFILL|MY_WME));
168
 
    
 
171
 
169
172
  if (!io_event)
170
173
  {
171
174
    sql_print_error("Memory allocation error in thd_scheduler::init\n");
172
175
    return TRUE;
173
176
  }
174
 
  
175
 
  event_set(io_event, parent_thd->net.vio->sd, EV_READ, 
 
177
 
 
178
  event_set(io_event, parent_thd->net.vio->sd, EV_READ,
176
179
            libevent_io_callback, (void*)parent_thd);
177
 
    
 
180
 
178
181
  list.data= parent_thd;
179
 
  
 
182
 
180
183
  return FALSE;
181
184
}
182
185
 
198
201
  thd->mysys_var->abort= 0;
199
202
  thread_attached= TRUE;
200
203
#ifndef DBUG_OFF
201
 
  swap_dbug_explain();
 
204
  /*
 
205
    When we attach the thread for a connection for the first time,
 
206
    we know that there is no session value set yet. Thus
 
207
    the initial setting of set_explain to FALSE is OK.
 
208
  */
 
209
  if (set_explain)
 
210
    DBUG_SET(dbug_explain);
202
211
#endif
203
212
  return FALSE;
204
213
}
213
222
  if (thread_attached)
214
223
  {
215
224
    THD* thd = (THD*)list.data;
 
225
    pthread_mutex_lock(&thd->LOCK_delete);
216
226
    thd->mysys_var= NULL;
 
227
    pthread_mutex_unlock(&thd->LOCK_delete);
217
228
    thread_attached= FALSE;
218
229
#ifndef DBUG_OFF
219
 
    swap_dbug_explain();
 
230
    /*
 
231
      If during the session @@session.dbug was assigned, the
 
232
      dbug options/state has been pushed. Check if this is the
 
233
      case, to be able to restore the state when we attach this
 
234
      logical connection to a physical thread.
 
235
    */
 
236
    if (_db_is_pushed_())
 
237
    {
 
238
      set_explain= TRUE;
 
239
      if (DBUG_EXPLAIN(dbug_explain, sizeof(dbug_explain)))
 
240
        sql_print_error("thd_scheduler: DBUG_EXPLAIN buffer is too small");
 
241
    }
 
242
    /* DBUG_POP() is a no-op in case there is no session state */
 
243
    DBUG_POP();
220
244
#endif
221
245
  }
222
246
}
223
247
 
224
 
 
225
 
/*
226
 
  Swap the THD's dbug explain_buffer with the OS thread's dbug explain buffer.
227
 
 
228
 
  This is used to preserve the SESSION DEBUG variable, which is mapped to the OS 
229
 
  thread during a command, but each command is handled by a different thread.
230
 
*/
231
 
 
232
 
#ifndef DBUG_OFF
233
 
void thd_scheduler::swap_dbug_explain()
234
 
{
235
 
  char buffer[sizeof(dbug_explain_buf)];
236
 
  if (DBUG_EXPLAIN(buffer, sizeof(buffer)))
237
 
    sql_print_error("DBUG_EXPLAIN buffer too small.\n");
238
 
  DBUG_POP();
239
 
  DBUG_PUSH(dbug_explain_buf);
240
 
  memcpy(dbug_explain_buf, buffer, sizeof(buffer));
241
 
}
242
 
#endif
243
 
 
244
248
/**
245
249
  Create all threads for the thread pool
246
250
 
259
263
  uint i;
260
264
  DBUG_ENTER("libevent_init");
261
265
 
262
 
  event_init();
263
 
  
 
266
  base= (struct event_base *) event_init();
 
267
 
264
268
  created_threads= 0;
265
269
  killed_threads= 0;
266
270
  kill_pool_threads= FALSE;
267
 
  
 
271
 
268
272
  pthread_mutex_init(&LOCK_event_loop, NULL);
269
273
  pthread_mutex_init(&LOCK_thd_add, NULL);
270
 
  
 
274
 
271
275
  /* set up the pipe used to add new thds to the event pool */
272
276
  if (init_pipe(thd_add_pipe))
273
277
  {
286
290
            libevent_add_thd_callback, NULL);
287
291
  event_set(&thd_kill_event, thd_kill_pipe[0], EV_READ|EV_PERSIST,
288
292
            libevent_kill_thd_callback, NULL);
289
 
 
290
 
 if (event_add(&thd_add_event, NULL) || event_add(&thd_kill_event, NULL))
291
 
 {
292
 
   sql_print_error("thd_add_event event_add error in libevent_init\n");
293
 
   libevent_end();
294
 
   DBUG_RETURN(1);
295
 
   
296
 
 }
 
293
 
 
294
  if (event_add(&thd_add_event, NULL) || event_add(&thd_kill_event, NULL))
 
295
  {
 
296
    sql_print_error("thd_add_event event_add error in libevent_init\n");
 
297
    libevent_end();
 
298
    DBUG_RETURN(1);
 
299
  }
297
300
  /* Set up the thread pool */
298
301
  created_threads= killed_threads= 0;
299
302
  pthread_mutex_lock(&LOCK_thread_count);
317
320
  while (created_threads != thread_pool_size)
318
321
    pthread_cond_wait(&COND_thread_count,&LOCK_thread_count);
319
322
  pthread_mutex_unlock(&LOCK_thread_count);
320
 
  
 
323
 
321
324
  DBUG_PRINT("info", ("%u threads created", (uint) thread_pool_size));
322
325
  DBUG_RETURN(FALSE);
323
326
}
325
328
 
326
329
/*
327
330
  This is called when data is ready on the socket.
328
 
  
 
331
 
329
332
  NOTES
330
333
    This is only called by the thread that owns LOCK_event_loop.
331
 
  
332
 
    We add the thd that got the data to thds_need_processing, and 
 
334
 
 
335
    We add the thd that got the data to thds_need_processing, and
333
336
    cause the libevent event_loop() to terminate. Then this same thread will
334
337
    return from event_loop and pick the thd value back up for processing.
335
338
*/
336
339
 
337
340
void libevent_io_callback(int, short, void *ctx)
338
 
{    
 
341
{
339
342
  safe_mutex_assert_owner(&LOCK_event_loop);
340
343
  THD *thd= (THD*)ctx;
341
344
  thds_waiting_for_io= list_delete(thds_waiting_for_io, &thd->scheduler.list);
344
347
 
345
348
/*
346
349
  This is called when we have a thread we want to be killed.
347
 
  
 
350
 
348
351
  NOTES
349
352
    This is only called by the thread that owns LOCK_event_loop.
350
353
*/
351
354
 
352
355
void libevent_kill_thd_callback(int Fd, short, void*)
353
 
{    
 
356
{
354
357
  safe_mutex_assert_owner(&LOCK_event_loop);
355
358
 
356
359
  /* clear the pending events */
382
385
  This is used to add connections to the pool. This callback is invoked from
383
386
  the libevent event_loop() call whenever the thd_add_pipe[1] pipe has a byte
384
387
  written to it.
385
 
  
 
388
 
386
389
  NOTES
387
390
    This is only called by the thread that owns LOCK_event_loop.
388
391
*/
389
392
 
390
393
void libevent_add_thd_callback(int Fd, short, void *)
391
 
 
394
{
392
395
  safe_mutex_assert_owner(&LOCK_event_loop);
393
396
 
394
397
  /* clear the pending events */
404
407
    thds_need_adding= list_delete(thds_need_adding, thds_need_adding);
405
408
 
406
409
    pthread_mutex_unlock(&LOCK_thd_add);
407
 
    
 
410
 
408
411
    if (!thd->scheduler.logged_in || libevent_should_close_connection(thd))
409
412
    {
410
413
      /*
421
424
      {
422
425
        sql_print_error("event_add error in libevent_add_thd_callback\n");
423
426
        libevent_connection_close(thd);
424
 
      } 
 
427
      }
425
428
      else
426
429
      {
427
430
        thds_waiting_for_io= list_add(thds_waiting_for_io,
444
447
static void libevent_add_connection(THD *thd)
445
448
{
446
449
  DBUG_ENTER("libevent_add_connection");
447
 
  DBUG_PRINT("enter", ("thd: 0x%lx  thread_id: %lu",
448
 
                       (long) thd, thd->thread_id));
449
 
  
 
450
  DBUG_PRINT("enter", ("thd: %p  thread_id: %lu",
 
451
                       thd, thd->thread_id));
 
452
 
450
453
  if (thd->scheduler.init(thd))
451
454
  {
452
455
    sql_print_error("Scheduler init error in libevent_add_new_connection\n");
456
459
  }
457
460
  threads.append(thd);
458
461
  libevent_thd_add(thd);
459
 
  
 
462
 
460
463
  pthread_mutex_unlock(&LOCK_thread_count);
461
464
  DBUG_VOID_RETURN;
462
465
}
464
467
 
465
468
/**
466
469
  @brief Signal a waiting connection it's time to die.
467
 
 
 
470
 
468
471
  @details This function will signal libevent the THD should be killed.
469
472
    Either the global LOCK_thd_count or the THD's LOCK_delete must be locked
470
473
    upon entry.
471
 
 
 
474
 
472
475
  @param[in]  thd The connection to kill
473
476
*/
474
477
 
478
481
    Note, we just wake up libevent with an event that a THD should be killed,
479
482
    It will search its list of thds for thd->killed ==  KILL_CONNECTION to
480
483
    find the THDs it should kill.
481
 
    
 
484
 
482
485
    So we don't actually tell it which one and we don't actually use the
483
486
    THD being passed to us, but that's just a design detail that could change
484
487
    later.
495
498
static void libevent_connection_close(THD *thd)
496
499
{
497
500
  DBUG_ENTER("libevent_connection_close");
498
 
  DBUG_PRINT("enter", ("thd: 0x%lx", (long) thd));
 
501
  DBUG_PRINT("enter", ("thd: %p", thd));
499
502
 
500
503
  thd->killed= THD::KILL_CONNECTION;          // Avoid error messages
501
504
 
548
551
  if (created_threads == thread_pool_size)
549
552
    (void) pthread_cond_signal(&COND_thread_count);
550
553
  (void) pthread_mutex_unlock(&LOCK_thread_count);
551
 
  
 
554
 
552
555
  for (;;)
553
556
  {
554
557
    THD *thd= NULL;
555
558
    (void) pthread_mutex_lock(&LOCK_event_loop);
556
 
    
 
559
 
557
560
    /* get thd(s) to process */
558
561
    while (!thds_need_processing)
559
562
    {
565
568
      }
566
569
      event_loop(EVLOOP_ONCE);
567
570
    }
568
 
    
 
571
 
569
572
    /* pop the first thd off the list */
570
573
    thd= (THD*)thds_need_processing->data;
571
574
    thds_need_processing= list_delete(thds_need_processing,
572
575
                                      thds_need_processing);
573
 
    
 
576
 
574
577
    (void) pthread_mutex_unlock(&LOCK_event_loop);
575
 
    
 
578
 
576
579
    /* now we process the connection (thd) */
577
 
    
 
580
 
578
581
    /* set up the thd<->thread links. */
579
582
    thd->thread_stack= (char*) &thd;
580
 
    
 
583
 
581
584
    if (thd->scheduler.thread_attach())
582
585
    {
583
586
      libevent_connection_close(thd);
615
618
      }
616
619
    } while (libevent_needs_immediate_processing(thd));
617
620
  }
618
 
  
 
621
 
619
622
thread_exit:
620
623
  DBUG_PRINT("exit", ("ending thread"));
621
624
  (void) pthread_mutex_lock(&LOCK_thread_count);
629
632
 
630
633
 
631
634
/*
632
 
  Returns TRUE if the connection needs immediate processing and FALSE if 
 
635
  Returns TRUE if the connection needs immediate processing and FALSE if
633
636
  instead it's queued for libevent processing or closed,
634
637
*/
635
638
 
646
649
    Note: we cannot add for event processing because the whole request might
647
650
    already be buffered and we wouldn't receive an event.
648
651
  */
649
 
  if (thd->net.vio == 0 || thd->net.vio->read_pos < thd->net.vio->read_end)
 
652
  if (vio_pending(thd->net.vio) > 0)
650
653
    return TRUE;
651
 
  
 
654
 
652
655
  thd->scheduler.thread_detach();
653
656
  libevent_thd_add(thd);
654
657
  return FALSE;
657
660
 
658
661
/*
659
662
  Adds a THD to queued for libevent processing.
660
 
  
 
663
 
661
664
  This call does not actually register the event with libevent.
662
665
  Instead, it places the THD onto a queue and signals libevent by writing
663
666
  a byte into thd_add_pipe, which will cause our libevent_add_thd_callback to
668
671
{
669
672
  char c=0;
670
673
  /* release any audit resources, this thd is going to sleep */
671
 
  mysql_audit_release(thd);  
 
674
  mysql_audit_release(thd);
672
675
  pthread_mutex_lock(&LOCK_thd_add);
673
676
  /* queue for libevent */
674
677
  thds_need_adding= list_add(thds_need_adding, &thd->scheduler.list);
687
690
  DBUG_ENTER("libevent_end");
688
691
  DBUG_PRINT("enter", ("created_threads: %d  killed_threads: %u",
689
692
                       created_threads, killed_threads));
690
 
  
691
 
  
 
693
 
692
694
  (void) pthread_mutex_lock(&LOCK_thread_count);
693
 
  
 
695
 
694
696
  kill_pool_threads= TRUE;
695
697
  while (killed_threads != created_threads)
696
698
  {
701
703
    pthread_cond_wait(&COND_thread_count, &LOCK_thread_count);
702
704
  }
703
705
  (void) pthread_mutex_unlock(&LOCK_thread_count);
704
 
  
 
706
 
705
707
  event_del(&thd_add_event);
706
708
  close(thd_add_pipe[0]);
707
709
  close(thd_add_pipe[1]);
708
710
  event_del(&thd_kill_event);
709
711
  close(thd_kill_pipe[0]);
710
712
  close(thd_kill_pipe[1]);
 
713
  event_base_free(base);
711
714
 
712
715
  (void) pthread_mutex_destroy(&LOCK_event_loop);
713
716
  (void) pthread_mutex_destroy(&LOCK_thd_add);