~ubuntu-branches/ubuntu/precise/mysql-5.1/precise

« back to all changes in this revision

Viewing changes to sql/event_scheduler.cc

  • Committer: Bazaar Package Importer
  • Author(s): Norbert Tretkowski
  • Date: 2010-03-17 14:56:02 UTC
  • Revision ID: james.westby@ubuntu.com-20100317145602-x7e30l1b2sb5s6w6
Tags: upstream-5.1.45
ImportĀ upstreamĀ versionĀ 5.1.45

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/* Copyright (C) 2004-2006 MySQL AB
 
2
 
 
3
   This program is free software; you can redistribute it and/or modify
 
4
   it under the terms of the GNU General Public License as published by
 
5
   the Free Software Foundation; version 2 of the License.
 
6
 
 
7
   This program is distributed in the hope that it will be useful,
 
8
   but WITHOUT ANY WARRANTY; without even the implied warranty of
 
9
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
10
   GNU General Public License for more details.
 
11
 
 
12
   You should have received a copy of the GNU General Public License
 
13
   along with this program; if not, write to the Free Software
 
14
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
 
15
 
 
16
#include "mysql_priv.h"
 
17
#include "events.h"
 
18
#include "event_data_objects.h"
 
19
#include "event_scheduler.h"
 
20
#include "event_queue.h"
 
21
#include "event_db_repository.h"
 
22
 
 
23
/**
 
24
  @addtogroup Event_Scheduler
 
25
  @{
 
26
*/
 
27
 
 
28
#ifdef __GNUC__
 
29
#if __GNUC__ >= 2
 
30
#define SCHED_FUNC __FUNCTION__
 
31
#endif
 
32
#else
 
33
#define SCHED_FUNC "<unknown>"
 
34
#endif
 
35
 
 
36
#define LOCK_DATA()       lock_data(SCHED_FUNC, __LINE__)
 
37
#define UNLOCK_DATA()     unlock_data(SCHED_FUNC, __LINE__)
 
38
#define COND_STATE_WAIT(mythd, abstime, msg) \
 
39
        cond_wait(mythd, abstime, msg, SCHED_FUNC, __LINE__)
 
40
 
 
41
extern pthread_attr_t connection_attrib;
 
42
 
 
43
 
 
44
Event_db_repository *Event_worker_thread::db_repository;
 
45
 
 
46
 
 
47
static
 
48
const LEX_STRING scheduler_states_names[] =
 
49
{
 
50
  { C_STRING_WITH_LEN("INITIALIZED") },
 
51
  { C_STRING_WITH_LEN("RUNNING") },
 
52
  { C_STRING_WITH_LEN("STOPPING") }
 
53
};
 
54
 
 
55
struct scheduler_param {
 
56
  THD *thd;
 
57
  Event_scheduler *scheduler;
 
58
};
 
59
 
 
60
 
 
61
/*
 
62
  Prints the stack of infos, warnings, errors from thd to
 
63
  the console so it can be fetched by the logs-into-tables and
 
64
  checked later.
 
65
 
 
66
  SYNOPSIS
 
67
    evex_print_warnings
 
68
      thd  Thread used during the execution of the event
 
69
      et   The event itself
 
70
*/
 
71
 
 
72
void
 
73
Event_worker_thread::print_warnings(THD *thd, Event_job_data *et)
 
74
{
 
75
  MYSQL_ERROR *err;
 
76
  DBUG_ENTER("evex_print_warnings");
 
77
  if (!thd->warn_list.elements)
 
78
    DBUG_VOID_RETURN;
 
79
 
 
80
  char msg_buf[10 * STRING_BUFFER_USUAL_SIZE];
 
81
  char prefix_buf[5 * STRING_BUFFER_USUAL_SIZE];
 
82
  String prefix(prefix_buf, sizeof(prefix_buf), system_charset_info);
 
83
  prefix.length(0);
 
84
  prefix.append("Event Scheduler: [");
 
85
 
 
86
  prefix.append(et->definer.str, et->definer.length, system_charset_info);
 
87
  prefix.append("][", 2);
 
88
  prefix.append(et->dbname.str, et->dbname.length, system_charset_info);
 
89
  prefix.append('.');
 
90
  prefix.append(et->name.str, et->name.length, system_charset_info);
 
91
  prefix.append("] ", 2);
 
92
 
 
93
  List_iterator_fast<MYSQL_ERROR> it(thd->warn_list);
 
94
  while ((err= it++))
 
95
  {
 
96
    String err_msg(msg_buf, sizeof(msg_buf), system_charset_info);
 
97
    /* set it to 0 or we start adding at the end. That's the trick ;) */
 
98
    err_msg.length(0);
 
99
    err_msg.append(prefix);
 
100
    err_msg.append(err->msg, strlen(err->msg), system_charset_info);
 
101
    DBUG_ASSERT(err->level < 3);
 
102
    (sql_print_message_handlers[err->level])("%*s", err_msg.length(),
 
103
                                              err_msg.c_ptr());
 
104
  }
 
105
  DBUG_VOID_RETURN;
 
106
}
 
107
 
 
108
 
 
109
/*
 
110
  Performs post initialization of structures in a new thread.
 
111
 
 
112
  SYNOPSIS
 
113
    post_init_event_thread()
 
114
      thd  Thread
 
115
 
 
116
  NOTES
 
117
      Before this is called, one should not do any DBUG_XXX() calls.
 
118
 
 
119
*/
 
120
 
 
121
bool
 
122
post_init_event_thread(THD *thd)
 
123
{
 
124
  (void) init_new_connection_handler_thread();
 
125
  if (init_thr_lock() || thd->store_globals())
 
126
  {
 
127
    thd->cleanup();
 
128
    return TRUE;
 
129
  }
 
130
  lex_start(thd);
 
131
 
 
132
  pthread_mutex_lock(&LOCK_thread_count);
 
133
  threads.append(thd);
 
134
  thread_count++;
 
135
  thread_running++;
 
136
  pthread_mutex_unlock(&LOCK_thread_count);
 
137
 
 
138
  return FALSE;
 
139
}
 
140
 
 
141
 
 
142
/*
 
143
  Cleans up the THD and the threaded environment of the thread.
 
144
 
 
145
  SYNOPSIS
 
146
    deinit_event_thread()
 
147
      thd  Thread
 
148
*/
 
149
 
 
150
void
 
151
deinit_event_thread(THD *thd)
 
152
{
 
153
  thd->proc_info= "Clearing";
 
154
  DBUG_ASSERT(thd->net.buff != 0);
 
155
  net_end(&thd->net);
 
156
  DBUG_PRINT("exit", ("Event thread finishing"));
 
157
  pthread_mutex_lock(&LOCK_thread_count);
 
158
  thread_count--;
 
159
  thread_running--;
 
160
  delete thd;
 
161
  pthread_cond_broadcast(&COND_thread_count);
 
162
  pthread_mutex_unlock(&LOCK_thread_count);
 
163
}
 
164
 
 
165
 
 
166
/*
 
167
  Performs pre- pthread_create() initialisation of THD. Do this
 
168
  in the thread that will pass THD to the child thread. In the
 
169
  child thread call post_init_event_thread().
 
170
 
 
171
  SYNOPSIS
 
172
    pre_init_event_thread()
 
173
      thd  The THD of the thread. Has to be allocated by the caller.
 
174
 
 
175
  NOTES
 
176
    1. The host of the thead is my_localhost
 
177
    2. thd->net is initted with NULL - no communication.
 
178
*/
 
179
 
 
180
void
 
181
pre_init_event_thread(THD* thd)
 
182
{
 
183
  DBUG_ENTER("pre_init_event_thread");
 
184
  thd->client_capabilities= 0;
 
185
  thd->security_ctx->master_access= 0;
 
186
  thd->security_ctx->db_access= 0;
 
187
  thd->security_ctx->host_or_ip= (char*)my_localhost;
 
188
  my_net_init(&thd->net, NULL);
 
189
  thd->security_ctx->set_user((char*)"event_scheduler");
 
190
  thd->net.read_timeout= slave_net_timeout;
 
191
  thd->slave_thread= 0;
 
192
  thd->options|= OPTION_AUTO_IS_NULL;
 
193
  thd->client_capabilities|= CLIENT_MULTI_RESULTS;
 
194
  pthread_mutex_lock(&LOCK_thread_count);
 
195
  thd->thread_id= thd->variables.pseudo_thread_id= thread_id++;
 
196
  pthread_mutex_unlock(&LOCK_thread_count);
 
197
 
 
198
  /*
 
199
    Guarantees that we will see the thread in SHOW PROCESSLIST though its
 
200
    vio is NULL.
 
201
  */
 
202
 
 
203
  thd->proc_info= "Initialized";
 
204
  thd->version= refresh_version;
 
205
  thd->set_time();
 
206
 
 
207
  DBUG_VOID_RETURN;
 
208
}
 
209
 
 
210
 
 
211
/*
 
212
  Function that executes the scheduler,
 
213
 
 
214
  SYNOPSIS
 
215
    event_scheduler_thread()
 
216
      arg  Pointer to `struct scheduler_param`
 
217
 
 
218
  RETURN VALUE
 
219
    0  OK
 
220
*/
 
221
 
 
222
pthread_handler_t
 
223
event_scheduler_thread(void *arg)
 
224
{
 
225
  /* needs to be first for thread_stack */
 
226
  THD *thd= (THD *) ((struct scheduler_param *) arg)->thd;
 
227
  Event_scheduler *scheduler= ((struct scheduler_param *) arg)->scheduler;
 
228
  bool res;
 
229
 
 
230
  thd->thread_stack= (char *)&thd;              // remember where our stack is
 
231
  res= post_init_event_thread(thd);
 
232
 
 
233
  DBUG_ENTER("event_scheduler_thread");
 
234
  my_free((char*)arg, MYF(0));
 
235
  if (!res)
 
236
    scheduler->run(thd);
 
237
 
 
238
  DBUG_LEAVE;                               // Against gcc warnings
 
239
  my_thread_end();
 
240
  return 0;
 
241
}
 
242
 
 
243
 
 
244
/**
 
245
  Function that executes an event in a child thread. Setups the
 
246
  environment for the event execution and cleans after that.
 
247
 
 
248
  SYNOPSIS
 
249
    event_worker_thread()
 
250
      arg  The Event_job_data object to be processed
 
251
 
 
252
  RETURN VALUE
 
253
    0  OK
 
254
*/
 
255
 
 
256
pthread_handler_t
 
257
event_worker_thread(void *arg)
 
258
{
 
259
  THD *thd;
 
260
  Event_queue_element_for_exec *event= (Event_queue_element_for_exec *)arg;
 
261
 
 
262
  thd= event->thd;
 
263
 
 
264
  Event_worker_thread worker_thread;
 
265
  worker_thread.run(thd, event);
 
266
 
 
267
  my_thread_end();
 
268
  return 0;                                     // Can't return anything here
 
269
}
 
270
 
 
271
 
 
272
/**
 
273
  Function that executes an event in a child thread. Setups the
 
274
  environment for the event execution and cleans after that.
 
275
 
 
276
  SYNOPSIS
 
277
    Event_worker_thread::run()
 
278
      thd    Thread context
 
279
      event  The Event_queue_element_for_exec object to be processed
 
280
*/
 
281
 
 
282
void
 
283
Event_worker_thread::run(THD *thd, Event_queue_element_for_exec *event)
 
284
{
 
285
  /* needs to be first for thread_stack */
 
286
  char my_stack;
 
287
  Event_job_data job_data;
 
288
  bool res;
 
289
 
 
290
  thd->thread_stack= &my_stack;                // remember where our stack is
 
291
  res= post_init_event_thread(thd);
 
292
 
 
293
  DBUG_ENTER("Event_worker_thread::run");
 
294
  DBUG_PRINT("info", ("Time is %ld, THD: 0x%lx", (long) my_time(0), (long) thd));
 
295
 
 
296
  if (res)
 
297
    goto end;
 
298
 
 
299
  if ((res= db_repository->load_named_event(thd, event->dbname, event->name,
 
300
                                            &job_data)))
 
301
  {
 
302
    DBUG_PRINT("error", ("Got error from load_named_event"));
 
303
    goto end;
 
304
  }
 
305
 
 
306
  thd->enable_slow_log= TRUE;
 
307
 
 
308
  res= job_data.execute(thd, event->dropped);
 
309
 
 
310
  print_warnings(thd, &job_data);
 
311
 
 
312
  if (res)
 
313
    sql_print_information("Event Scheduler: "
 
314
                          "[%s].[%s.%s] event execution failed.",
 
315
                          job_data.definer.str,
 
316
                          job_data.dbname.str, job_data.name.str);
 
317
end:
 
318
  DBUG_PRINT("info", ("Done with Event %s.%s", event->dbname.str,
 
319
             event->name.str));
 
320
 
 
321
  delete event;
 
322
  deinit_event_thread(thd);
 
323
 
 
324
  DBUG_VOID_RETURN;
 
325
}
 
326
 
 
327
 
 
328
Event_scheduler::Event_scheduler(Event_queue *queue_arg)
 
329
  :state(INITIALIZED),
 
330
  scheduler_thd(NULL),
 
331
  queue(queue_arg),
 
332
  mutex_last_locked_at_line(0),
 
333
  mutex_last_unlocked_at_line(0),
 
334
  mutex_last_locked_in_func("n/a"),
 
335
  mutex_last_unlocked_in_func("n/a"),
 
336
  mutex_scheduler_data_locked(FALSE),
 
337
  waiting_on_cond(FALSE),
 
338
  started_events(0)
 
339
{
 
340
  pthread_mutex_init(&LOCK_scheduler_state, MY_MUTEX_INIT_FAST);
 
341
  pthread_cond_init(&COND_state, NULL);
 
342
}
 
343
 
 
344
 
 
345
Event_scheduler::~Event_scheduler()
 
346
{
 
347
  stop();                                    /* does nothing if not running */
 
348
  pthread_mutex_destroy(&LOCK_scheduler_state);
 
349
  pthread_cond_destroy(&COND_state);
 
350
}
 
351
 
 
352
 
 
353
/*
 
354
  Starts the scheduler (again). Creates a new THD and passes it to
 
355
  a forked thread. Does not wait for acknowledgement from the new
 
356
  thread that it has started. Asynchronous starting. Most of the
 
357
  needed initializations are done in the current thread to minimize
 
358
  the chance of failure in the spawned thread.
 
359
 
 
360
  SYNOPSIS
 
361
    Event_scheduler::start()
 
362
 
 
363
  RETURN VALUE
 
364
    FALSE  OK
 
365
    TRUE   Error (not reported)
 
366
*/
 
367
 
 
368
bool
 
369
Event_scheduler::start()
 
370
{
 
371
  THD *new_thd= NULL;
 
372
  bool ret= FALSE;
 
373
  pthread_t th;
 
374
  struct scheduler_param *scheduler_param_value;
 
375
  DBUG_ENTER("Event_scheduler::start");
 
376
 
 
377
  LOCK_DATA();
 
378
  DBUG_PRINT("info", ("state before action %s", scheduler_states_names[state].str));
 
379
  if (state > INITIALIZED)
 
380
    goto end;
 
381
 
 
382
  if (!(new_thd= new THD))
 
383
  {
 
384
    sql_print_error("Event Scheduler: Cannot initialize the scheduler thread");
 
385
    ret= TRUE;
 
386
    goto end;
 
387
  }
 
388
  pre_init_event_thread(new_thd);
 
389
  new_thd->system_thread= SYSTEM_THREAD_EVENT_SCHEDULER;
 
390
  new_thd->command= COM_DAEMON;
 
391
 
 
392
  /*
 
393
    We should run the event scheduler thread under the super-user privileges.
 
394
    In particular, this is needed to be able to lock the mysql.event table
 
395
    for writing when the server is running in the read-only mode.
 
396
  */
 
397
  new_thd->security_ctx->master_access |= SUPER_ACL;
 
398
 
 
399
  scheduler_param_value=
 
400
    (struct scheduler_param *)my_malloc(sizeof(struct scheduler_param), MYF(0));
 
401
  scheduler_param_value->thd= new_thd;
 
402
  scheduler_param_value->scheduler= this;
 
403
 
 
404
  scheduler_thd= new_thd;
 
405
  DBUG_PRINT("info", ("Setting state go RUNNING"));
 
406
  state= RUNNING;
 
407
  DBUG_PRINT("info", ("Forking new thread for scheduler. THD: 0x%lx", (long) new_thd));
 
408
  if (pthread_create(&th, &connection_attrib, event_scheduler_thread,
 
409
                    (void*)scheduler_param_value))
 
410
  {
 
411
    DBUG_PRINT("error", ("cannot create a new thread"));
 
412
    state= INITIALIZED;
 
413
    scheduler_thd= NULL;
 
414
    ret= TRUE;
 
415
 
 
416
    new_thd->proc_info= "Clearing";
 
417
    DBUG_ASSERT(new_thd->net.buff != 0);
 
418
    net_end(&new_thd->net);
 
419
    pthread_mutex_lock(&LOCK_thread_count);
 
420
    thread_count--;
 
421
    thread_running--;
 
422
    delete new_thd;
 
423
    pthread_cond_broadcast(&COND_thread_count);
 
424
    pthread_mutex_unlock(&LOCK_thread_count);
 
425
  }
 
426
end:
 
427
  UNLOCK_DATA();
 
428
 
 
429
  DBUG_RETURN(ret);
 
430
}
 
431
 
 
432
 
 
433
/*
 
434
  The main loop of the scheduler.
 
435
 
 
436
  SYNOPSIS
 
437
    Event_scheduler::run()
 
438
      thd  Thread
 
439
 
 
440
  RETURN VALUE
 
441
    FALSE  OK
 
442
    TRUE   Error (Serious error)
 
443
*/
 
444
 
 
445
bool
 
446
Event_scheduler::run(THD *thd)
 
447
{
 
448
  int res= FALSE;
 
449
  DBUG_ENTER("Event_scheduler::run");
 
450
 
 
451
  sql_print_information("Event Scheduler: scheduler thread started with id %lu",
 
452
                        thd->thread_id);
 
453
  /*
 
454
    Recalculate the values in the queue because there could have been stops
 
455
    in executions of the scheduler and some times could have passed by.
 
456
  */
 
457
  queue->recalculate_activation_times(thd);
 
458
 
 
459
  while (is_running())
 
460
  {
 
461
    Event_queue_element_for_exec *event_name;
 
462
 
 
463
    /* Gets a minimized version */
 
464
    if (queue->get_top_for_execution_if_time(thd, &event_name))
 
465
    {
 
466
      sql_print_information("Event Scheduler: "
 
467
                            "Serious error during getting next "
 
468
                            "event to execute. Stopping");
 
469
      break;
 
470
    }
 
471
 
 
472
    DBUG_PRINT("info", ("get_top_for_execution_if_time returned "
 
473
                        "event_name=0x%lx", (long) event_name));
 
474
    if (event_name)
 
475
    {
 
476
      if ((res= execute_top(event_name)))
 
477
        break;
 
478
    }
 
479
    else
 
480
    {
 
481
      DBUG_ASSERT(thd->killed);
 
482
      DBUG_PRINT("info", ("job_data is NULL, the thread was killed"));
 
483
    }
 
484
    DBUG_PRINT("info", ("state=%s", scheduler_states_names[state].str));
 
485
  }
 
486
 
 
487
  LOCK_DATA();
 
488
  deinit_event_thread(thd);
 
489
  scheduler_thd= NULL;
 
490
  state= INITIALIZED;
 
491
  DBUG_PRINT("info", ("Signalling back to the stopper COND_state"));
 
492
  pthread_cond_signal(&COND_state);
 
493
  UNLOCK_DATA();
 
494
 
 
495
  DBUG_RETURN(res);
 
496
}
 
497
 
 
498
 
 
499
/*
 
500
  Creates a new THD instance and then forks a new thread, while passing
 
501
  the THD pointer and job_data to it.
 
502
 
 
503
  SYNOPSIS
 
504
    Event_scheduler::execute_top()
 
505
 
 
506
  RETURN VALUE
 
507
    FALSE  OK
 
508
    TRUE   Error (Serious error)
 
509
*/
 
510
 
 
511
bool
 
512
Event_scheduler::execute_top(Event_queue_element_for_exec *event_name)
 
513
{
 
514
  THD *new_thd;
 
515
  pthread_t th;
 
516
  int res= 0;
 
517
  DBUG_ENTER("Event_scheduler::execute_top");
 
518
  if (!(new_thd= new THD()))
 
519
    goto error;
 
520
 
 
521
  pre_init_event_thread(new_thd);
 
522
  new_thd->system_thread= SYSTEM_THREAD_EVENT_WORKER;
 
523
  event_name->thd= new_thd;
 
524
  DBUG_PRINT("info", ("Event %s@%s ready for start",
 
525
             event_name->dbname.str, event_name->name.str));
 
526
 
 
527
  /*
 
528
    TODO: should use thread pool here, preferably with an upper limit
 
529
    on number of threads: if too many events are scheduled for the
 
530
    same time, starting all of them at once won't help them run truly
 
531
    in parallel (because of the great amount of synchronization), so
 
532
    we may as well execute them in sequence, keeping concurrency at a
 
533
    reasonable level.
 
534
  */
 
535
  /* Major failure */
 
536
  if ((res= pthread_create(&th, &connection_attrib, event_worker_thread,
 
537
                           event_name)))
 
538
    goto error;
 
539
 
 
540
  ++started_events;
 
541
 
 
542
  DBUG_PRINT("info", ("Event is in THD: 0x%lx", (long) new_thd));
 
543
  DBUG_RETURN(FALSE);
 
544
 
 
545
error:
 
546
  DBUG_PRINT("error", ("Event_scheduler::execute_top() res: %d", res));
 
547
  if (new_thd)
 
548
  {
 
549
    new_thd->proc_info= "Clearing";
 
550
    DBUG_ASSERT(new_thd->net.buff != 0);
 
551
    net_end(&new_thd->net);
 
552
    pthread_mutex_lock(&LOCK_thread_count);
 
553
    thread_count--;
 
554
    thread_running--;
 
555
    delete new_thd;
 
556
    pthread_cond_broadcast(&COND_thread_count);
 
557
    pthread_mutex_unlock(&LOCK_thread_count);
 
558
  }
 
559
  delete event_name;
 
560
  DBUG_RETURN(TRUE);
 
561
}
 
562
 
 
563
 
 
564
/*
 
565
  Checks whether the state of the scheduler is RUNNING
 
566
 
 
567
  SYNOPSIS
 
568
    Event_scheduler::is_running()
 
569
 
 
570
  RETURN VALUE
 
571
    TRUE   RUNNING
 
572
    FALSE  Not RUNNING
 
573
*/
 
574
 
 
575
bool
 
576
Event_scheduler::is_running()
 
577
{
 
578
  LOCK_DATA();
 
579
  bool ret= (state == RUNNING);
 
580
  UNLOCK_DATA();
 
581
  return ret;
 
582
}
 
583
 
 
584
 
 
585
/**
 
586
  Stops the scheduler (again). Waits for acknowledgement from the
 
587
  scheduler that it has stopped - synchronous stopping.
 
588
 
 
589
  Already running events will not be stopped. If the user needs
 
590
  them stopped manual intervention is needed.
 
591
 
 
592
  SYNOPSIS
 
593
    Event_scheduler::stop()
 
594
 
 
595
  RETURN VALUE
 
596
    FALSE  OK
 
597
    TRUE   Error (not reported)
 
598
*/
 
599
 
 
600
bool
 
601
Event_scheduler::stop()
 
602
{
 
603
  THD *thd= current_thd;
 
604
  DBUG_ENTER("Event_scheduler::stop");
 
605
  DBUG_PRINT("enter", ("thd: 0x%lx", (long) thd));
 
606
 
 
607
  LOCK_DATA();
 
608
  DBUG_PRINT("info", ("state before action %s", scheduler_states_names[state].str));
 
609
  if (state != RUNNING)
 
610
    goto end;
 
611
 
 
612
  /* Guarantee we don't catch spurious signals */
 
613
  do {
 
614
    DBUG_PRINT("info", ("Waiting for COND_started_or_stopped from "
 
615
                        "the scheduler thread.  Current value of state is %s . "
 
616
                        "workers count=%d", scheduler_states_names[state].str,
 
617
                        workers_count()));
 
618
    /*
 
619
      NOTE: We don't use kill_one_thread() because it can't kill COM_DEAMON
 
620
      threads. In addition, kill_one_thread() requires THD but during shutdown
 
621
      current_thd is NULL. Hence, if kill_one_thread should be used it has to
 
622
      be modified to kill also daemons, by adding a flag, and also we have to
 
623
      create artificial THD here. To save all this work, we just do what
 
624
      kill_one_thread() does to kill a thread. See also sql_repl.cc for similar
 
625
      usage.
 
626
    */
 
627
 
 
628
    state= STOPPING;
 
629
    DBUG_PRINT("info", ("Scheduler thread has id %lu",
 
630
                        scheduler_thd->thread_id));
 
631
    /* Lock from delete */
 
632
    pthread_mutex_lock(&scheduler_thd->LOCK_thd_data);
 
633
    /* This will wake up the thread if it waits on Queue's conditional */
 
634
    sql_print_information("Event Scheduler: Killing the scheduler thread, "
 
635
                          "thread id %lu",
 
636
                          scheduler_thd->thread_id);
 
637
    scheduler_thd->awake(THD::KILL_CONNECTION);
 
638
    pthread_mutex_unlock(&scheduler_thd->LOCK_thd_data);
 
639
 
 
640
    /* thd could be 0x0, when shutting down */
 
641
    sql_print_information("Event Scheduler: "
 
642
                          "Waiting for the scheduler thread to reply");
 
643
    COND_STATE_WAIT(thd, NULL, "Waiting scheduler to stop");
 
644
  } while (state == STOPPING);
 
645
  DBUG_PRINT("info", ("Scheduler thread has cleaned up. Set state to INIT"));
 
646
  sql_print_information("Event Scheduler: Stopped");
 
647
end:
 
648
  UNLOCK_DATA();
 
649
  DBUG_RETURN(FALSE);
 
650
}
 
651
 
 
652
 
 
653
/*
 
654
  Returns the number of living event worker threads.
 
655
 
 
656
  SYNOPSIS
 
657
    Event_scheduler::workers_count()
 
658
*/
 
659
 
 
660
uint
 
661
Event_scheduler::workers_count()
 
662
{
 
663
  THD *tmp;
 
664
  uint count= 0;
 
665
 
 
666
  DBUG_ENTER("Event_scheduler::workers_count");
 
667
  pthread_mutex_lock(&LOCK_thread_count);       // For unlink from list
 
668
  I_List_iterator<THD> it(threads);
 
669
  while ((tmp=it++))
 
670
    if (tmp->system_thread == SYSTEM_THREAD_EVENT_WORKER)
 
671
      ++count;
 
672
  pthread_mutex_unlock(&LOCK_thread_count);
 
673
  DBUG_PRINT("exit", ("%d", count));
 
674
  DBUG_RETURN(count);
 
675
}
 
676
 
 
677
 
 
678
/*
 
679
  Auxiliary function for locking LOCK_scheduler_state. Used
 
680
  by the LOCK_DATA macro.
 
681
 
 
682
  SYNOPSIS
 
683
    Event_scheduler::lock_data()
 
684
      func  Which function is requesting mutex lock
 
685
      line  On which line mutex lock is requested
 
686
*/
 
687
 
 
688
void
 
689
Event_scheduler::lock_data(const char *func, uint line)
 
690
{
 
691
  DBUG_ENTER("Event_scheduler::lock_data");
 
692
  DBUG_PRINT("enter", ("func=%s line=%u", func, line));
 
693
  pthread_mutex_lock(&LOCK_scheduler_state);
 
694
  mutex_last_locked_in_func= func;
 
695
  mutex_last_locked_at_line= line;
 
696
  mutex_scheduler_data_locked= TRUE;
 
697
  DBUG_VOID_RETURN;
 
698
}
 
699
 
 
700
 
 
701
/*
 
702
  Auxiliary function for unlocking LOCK_scheduler_state. Used
 
703
  by the UNLOCK_DATA macro.
 
704
 
 
705
  SYNOPSIS
 
706
    Event_scheduler::unlock_data()
 
707
      func  Which function is requesting mutex unlock
 
708
      line  On which line mutex unlock is requested
 
709
*/
 
710
 
 
711
void
 
712
Event_scheduler::unlock_data(const char *func, uint line)
 
713
{
 
714
  DBUG_ENTER("Event_scheduler::unlock_data");
 
715
  DBUG_PRINT("enter", ("func=%s line=%u", func, line));
 
716
  mutex_last_unlocked_at_line= line;
 
717
  mutex_scheduler_data_locked= FALSE;
 
718
  mutex_last_unlocked_in_func= func;
 
719
  pthread_mutex_unlock(&LOCK_scheduler_state);
 
720
  DBUG_VOID_RETURN;
 
721
}
 
722
 
 
723
 
 
724
/*
 
725
  Wrapper for pthread_cond_wait/timedwait
 
726
 
 
727
  SYNOPSIS
 
728
    Event_scheduler::cond_wait()
 
729
      thd     Thread (Could be NULL during shutdown procedure)
 
730
      abstime If not null then call pthread_cond_timedwait()
 
731
      msg     Message for thd->proc_info
 
732
      func    Which function is requesting cond_wait
 
733
      line    On which line cond_wait is requested
 
734
*/
 
735
 
 
736
void
 
737
Event_scheduler::cond_wait(THD *thd, struct timespec *abstime, const char* msg,
 
738
                           const char *func, uint line)
 
739
{
 
740
  DBUG_ENTER("Event_scheduler::cond_wait");
 
741
  waiting_on_cond= TRUE;
 
742
  mutex_last_unlocked_at_line= line;
 
743
  mutex_scheduler_data_locked= FALSE;
 
744
  mutex_last_unlocked_in_func= func;
 
745
  if (thd)
 
746
    thd->enter_cond(&COND_state, &LOCK_scheduler_state, msg);
 
747
 
 
748
  DBUG_PRINT("info", ("pthread_cond_%swait", abstime? "timed":""));
 
749
  if (!abstime)
 
750
    pthread_cond_wait(&COND_state, &LOCK_scheduler_state);
 
751
  else
 
752
    pthread_cond_timedwait(&COND_state, &LOCK_scheduler_state, abstime);
 
753
  if (thd)
 
754
  {
 
755
    /*
 
756
      This will free the lock so we need to relock. Not the best thing to
 
757
      do but we need to obey cond_wait()
 
758
    */
 
759
    thd->exit_cond("");
 
760
    LOCK_DATA();
 
761
  }
 
762
  mutex_last_locked_in_func= func;
 
763
  mutex_last_locked_at_line= line;
 
764
  mutex_scheduler_data_locked= TRUE;
 
765
  waiting_on_cond= FALSE;
 
766
  DBUG_VOID_RETURN;
 
767
}
 
768
 
 
769
 
 
770
/*
 
771
  Dumps the internal status of the scheduler
 
772
 
 
773
  SYNOPSIS
 
774
    Event_scheduler::dump_internal_status()
 
775
*/
 
776
 
 
777
void
 
778
Event_scheduler::dump_internal_status()
 
779
{
 
780
  DBUG_ENTER("Event_scheduler::dump_internal_status");
 
781
 
 
782
  puts("");
 
783
  puts("Event scheduler status:");
 
784
  printf("State      : %s\n", scheduler_states_names[state].str);
 
785
  printf("Thread id  : %lu\n", scheduler_thd? scheduler_thd->thread_id : 0);
 
786
  printf("LLA        : %s:%u\n", mutex_last_locked_in_func,
 
787
                                 mutex_last_locked_at_line);
 
788
  printf("LUA        : %s:%u\n", mutex_last_unlocked_in_func,
 
789
                                 mutex_last_unlocked_at_line);
 
790
  printf("WOC        : %s\n", waiting_on_cond? "YES":"NO");
 
791
  printf("Workers    : %u\n", workers_count());
 
792
  printf("Executed   : %lu\n", (ulong) started_events);
 
793
  printf("Data locked: %s\n", mutex_scheduler_data_locked ? "YES":"NO");
 
794
 
 
795
  DBUG_VOID_RETURN;
 
796
}
 
797
 
 
798
/**
 
799
  @} (End of group Event_Scheduler)
 
800
*/