1
/* Copyright (C) 2004-2006 MySQL AB
3
This program is free software; you can redistribute it and/or modify
4
it under the terms of the GNU General Public License as published by
5
the Free Software Foundation; version 2 of the License.
7
This program is distributed in the hope that it will be useful,
8
but WITHOUT ANY WARRANTY; without even the implied warranty of
9
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10
GNU General Public License for more details.
12
You should have received a copy of the GNU General Public License
13
along with this program; if not, write to the Free Software
14
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
16
#include "mysql_priv.h"
18
#include "event_data_objects.h"
19
#include "event_scheduler.h"
20
#include "event_queue.h"
21
#include "event_db_repository.h"
24
@addtogroup Event_Scheduler
30
#define SCHED_FUNC __FUNCTION__
33
#define SCHED_FUNC "<unknown>"
36
#define LOCK_DATA() lock_data(SCHED_FUNC, __LINE__)
37
#define UNLOCK_DATA() unlock_data(SCHED_FUNC, __LINE__)
38
#define COND_STATE_WAIT(mythd, abstime, msg) \
39
cond_wait(mythd, abstime, msg, SCHED_FUNC, __LINE__)
41
extern pthread_attr_t connection_attrib;
44
Event_db_repository *Event_worker_thread::db_repository;
48
const LEX_STRING scheduler_states_names[] =
50
{ C_STRING_WITH_LEN("INITIALIZED") },
51
{ C_STRING_WITH_LEN("RUNNING") },
52
{ C_STRING_WITH_LEN("STOPPING") }
55
struct scheduler_param {
57
Event_scheduler *scheduler;
62
Prints the stack of infos, warnings, errors from thd to
63
the console so it can be fetched by the logs-into-tables and
68
thd Thread used during the execution of the event
73
Event_worker_thread::print_warnings(THD *thd, Event_job_data *et)
76
DBUG_ENTER("evex_print_warnings");
77
if (!thd->warn_list.elements)
80
char msg_buf[10 * STRING_BUFFER_USUAL_SIZE];
81
char prefix_buf[5 * STRING_BUFFER_USUAL_SIZE];
82
String prefix(prefix_buf, sizeof(prefix_buf), system_charset_info);
84
prefix.append("Event Scheduler: [");
86
prefix.append(et->definer.str, et->definer.length, system_charset_info);
87
prefix.append("][", 2);
88
prefix.append(et->dbname.str, et->dbname.length, system_charset_info);
90
prefix.append(et->name.str, et->name.length, system_charset_info);
91
prefix.append("] ", 2);
93
List_iterator_fast<MYSQL_ERROR> it(thd->warn_list);
96
String err_msg(msg_buf, sizeof(msg_buf), system_charset_info);
97
/* set it to 0 or we start adding at the end. That's the trick ;) */
99
err_msg.append(prefix);
100
err_msg.append(err->msg, strlen(err->msg), system_charset_info);
101
DBUG_ASSERT(err->level < 3);
102
(sql_print_message_handlers[err->level])("%*s", err_msg.length(),
110
Performs post initialization of structures in a new thread.
113
post_init_event_thread()
117
Before this is called, one should not do any DBUG_XXX() calls.
122
post_init_event_thread(THD *thd)
124
(void) init_new_connection_handler_thread();
125
if (init_thr_lock() || thd->store_globals())
132
pthread_mutex_lock(&LOCK_thread_count);
136
pthread_mutex_unlock(&LOCK_thread_count);
143
Cleans up the THD and the threaded environment of the thread.
146
deinit_event_thread()
151
deinit_event_thread(THD *thd)
153
thd->proc_info= "Clearing";
154
DBUG_ASSERT(thd->net.buff != 0);
156
DBUG_PRINT("exit", ("Event thread finishing"));
157
pthread_mutex_lock(&LOCK_thread_count);
161
pthread_cond_broadcast(&COND_thread_count);
162
pthread_mutex_unlock(&LOCK_thread_count);
167
Performs pre- pthread_create() initialisation of THD. Do this
168
in the thread that will pass THD to the child thread. In the
169
child thread call post_init_event_thread().
172
pre_init_event_thread()
173
thd The THD of the thread. Has to be allocated by the caller.
176
1. The host of the thread is my_localhost
177
2. thd->net is initialized with NULL - no communication.
181
pre_init_event_thread(THD* thd)
183
DBUG_ENTER("pre_init_event_thread");
184
thd->client_capabilities= 0;
185
thd->security_ctx->master_access= 0;
186
thd->security_ctx->db_access= 0;
187
thd->security_ctx->host_or_ip= (char*)my_localhost;
188
my_net_init(&thd->net, NULL);
189
thd->security_ctx->set_user((char*)"event_scheduler");
190
thd->net.read_timeout= slave_net_timeout;
191
thd->slave_thread= 0;
192
thd->options|= OPTION_AUTO_IS_NULL;
193
thd->client_capabilities|= CLIENT_MULTI_RESULTS;
194
pthread_mutex_lock(&LOCK_thread_count);
195
thd->thread_id= thd->variables.pseudo_thread_id= thread_id++;
196
pthread_mutex_unlock(&LOCK_thread_count);
199
Guarantees that we will see the thread in SHOW PROCESSLIST though its
203
thd->proc_info= "Initialized";
204
thd->version= refresh_version;
212
Function that executes the scheduler,
215
event_scheduler_thread()
216
arg Pointer to `struct scheduler_param`
223
event_scheduler_thread(void *arg)
225
/* needs to be first for thread_stack */
226
THD *thd= (THD *) ((struct scheduler_param *) arg)->thd;
227
Event_scheduler *scheduler= ((struct scheduler_param *) arg)->scheduler;
230
thd->thread_stack= (char *)&thd; // remember where our stack is
231
res= post_init_event_thread(thd);
233
DBUG_ENTER("event_scheduler_thread");
234
my_free((char*)arg, MYF(0));
238
DBUG_LEAVE; // Against gcc warnings
245
Function that executes an event in a child thread. Sets up the
246
environment for the event execution and cleans after that.
249
event_worker_thread()
250
arg The Event_queue_element_for_exec object to be processed
257
event_worker_thread(void *arg)
260
Event_queue_element_for_exec *event= (Event_queue_element_for_exec *)arg;
264
Event_worker_thread worker_thread;
265
worker_thread.run(thd, event);
268
return 0; // Can't return anything here
273
Function that executes an event in a child thread. Sets up the
274
environment for the event execution and cleans after that.
277
Event_worker_thread::run()
279
event The Event_queue_element_for_exec object to be processed
283
Event_worker_thread::run(THD *thd, Event_queue_element_for_exec *event)
285
/* needs to be first for thread_stack */
287
Event_job_data job_data;
290
thd->thread_stack= &my_stack; // remember where our stack is
291
res= post_init_event_thread(thd);
293
DBUG_ENTER("Event_worker_thread::run");
294
DBUG_PRINT("info", ("Time is %ld, THD: 0x%lx", (long) my_time(0), (long) thd));
299
if ((res= db_repository->load_named_event(thd, event->dbname, event->name,
302
DBUG_PRINT("error", ("Got error from load_named_event"));
306
thd->enable_slow_log= TRUE;
308
res= job_data.execute(thd, event->dropped);
310
print_warnings(thd, &job_data);
313
sql_print_information("Event Scheduler: "
314
"[%s].[%s.%s] event execution failed.",
315
job_data.definer.str,
316
job_data.dbname.str, job_data.name.str);
318
DBUG_PRINT("info", ("Done with Event %s.%s", event->dbname.str,
322
deinit_event_thread(thd);
328
Event_scheduler::Event_scheduler(Event_queue *queue_arg)
332
mutex_last_locked_at_line(0),
333
mutex_last_unlocked_at_line(0),
334
mutex_last_locked_in_func("n/a"),
335
mutex_last_unlocked_in_func("n/a"),
336
mutex_scheduler_data_locked(FALSE),
337
waiting_on_cond(FALSE),
340
pthread_mutex_init(&LOCK_scheduler_state, MY_MUTEX_INIT_FAST);
341
pthread_cond_init(&COND_state, NULL);
345
Event_scheduler::~Event_scheduler()
347
stop(); /* does nothing if not running */
348
pthread_mutex_destroy(&LOCK_scheduler_state);
349
pthread_cond_destroy(&COND_state);
354
Starts the scheduler (again). Creates a new THD and passes it to
355
a forked thread. Does not wait for acknowledgement from the new
356
thread that it has started. Asynchronous starting. Most of the
357
needed initializations are done in the current thread to minimize
358
the chance of failure in the spawned thread.
361
Event_scheduler::start()
365
TRUE Error (not reported)
369
Event_scheduler::start()
374
struct scheduler_param *scheduler_param_value;
375
DBUG_ENTER("Event_scheduler::start");
378
DBUG_PRINT("info", ("state before action %s", scheduler_states_names[state].str));
379
if (state > INITIALIZED)
382
if (!(new_thd= new THD))
384
sql_print_error("Event Scheduler: Cannot initialize the scheduler thread");
388
pre_init_event_thread(new_thd);
389
new_thd->system_thread= SYSTEM_THREAD_EVENT_SCHEDULER;
390
new_thd->command= COM_DAEMON;
393
We should run the event scheduler thread under the super-user privileges.
394
In particular, this is needed to be able to lock the mysql.event table
395
for writing when the server is running in the read-only mode.
397
new_thd->security_ctx->master_access |= SUPER_ACL;
399
scheduler_param_value=
400
(struct scheduler_param *)my_malloc(sizeof(struct scheduler_param), MYF(0));
401
scheduler_param_value->thd= new_thd;
402
scheduler_param_value->scheduler= this;
404
scheduler_thd= new_thd;
405
DBUG_PRINT("info", ("Setting state go RUNNING"));
407
DBUG_PRINT("info", ("Forking new thread for scheduler. THD: 0x%lx", (long) new_thd));
408
if (pthread_create(&th, &connection_attrib, event_scheduler_thread,
409
(void*)scheduler_param_value))
411
DBUG_PRINT("error", ("cannot create a new thread"));
416
new_thd->proc_info= "Clearing";
417
DBUG_ASSERT(new_thd->net.buff != 0);
418
net_end(&new_thd->net);
419
pthread_mutex_lock(&LOCK_thread_count);
423
pthread_cond_broadcast(&COND_thread_count);
424
pthread_mutex_unlock(&LOCK_thread_count);
434
The main loop of the scheduler.
437
Event_scheduler::run()
442
TRUE Error (Serious error)
446
Event_scheduler::run(THD *thd)
449
DBUG_ENTER("Event_scheduler::run");
451
sql_print_information("Event Scheduler: scheduler thread started with id %lu",
454
Recalculate the values in the queue because there could have been stops
455
in executions of the scheduler and some time could have passed.
457
queue->recalculate_activation_times(thd);
461
Event_queue_element_for_exec *event_name;
463
/* Gets a minimized version */
464
if (queue->get_top_for_execution_if_time(thd, &event_name))
466
sql_print_information("Event Scheduler: "
467
"Serious error during getting next "
468
"event to execute. Stopping");
472
DBUG_PRINT("info", ("get_top_for_execution_if_time returned "
473
"event_name=0x%lx", (long) event_name));
476
if ((res= execute_top(event_name)))
481
DBUG_ASSERT(thd->killed);
482
DBUG_PRINT("info", ("job_data is NULL, the thread was killed"));
484
DBUG_PRINT("info", ("state=%s", scheduler_states_names[state].str));
488
deinit_event_thread(thd);
491
DBUG_PRINT("info", ("Signalling back to the stopper COND_state"));
492
pthread_cond_signal(&COND_state);
500
Creates a new THD instance and then forks a new thread, while passing
501
the THD pointer and job_data to it.
504
Event_scheduler::execute_top()
508
TRUE Error (Serious error)
512
Event_scheduler::execute_top(Event_queue_element_for_exec *event_name)
517
DBUG_ENTER("Event_scheduler::execute_top");
518
if (!(new_thd= new THD()))
521
pre_init_event_thread(new_thd);
522
new_thd->system_thread= SYSTEM_THREAD_EVENT_WORKER;
523
event_name->thd= new_thd;
524
DBUG_PRINT("info", ("Event %s@%s ready for start",
525
event_name->dbname.str, event_name->name.str));
528
TODO: should use thread pool here, preferably with an upper limit
529
on number of threads: if too many events are scheduled for the
530
same time, starting all of them at once won't help them run truly
531
in parallel (because of the great amount of synchronization), so
532
we may as well execute them in sequence, keeping concurrency at a
536
if ((res= pthread_create(&th, &connection_attrib, event_worker_thread,
542
DBUG_PRINT("info", ("Event is in THD: 0x%lx", (long) new_thd));
546
DBUG_PRINT("error", ("Event_scheduler::execute_top() res: %d", res));
549
new_thd->proc_info= "Clearing";
550
DBUG_ASSERT(new_thd->net.buff != 0);
551
net_end(&new_thd->net);
552
pthread_mutex_lock(&LOCK_thread_count);
556
pthread_cond_broadcast(&COND_thread_count);
557
pthread_mutex_unlock(&LOCK_thread_count);
565
Checks whether the state of the scheduler is RUNNING
568
Event_scheduler::is_running()
576
Event_scheduler::is_running()
579
bool ret= (state == RUNNING);
586
Stops the scheduler (again). Waits for acknowledgement from the
587
scheduler that it has stopped - synchronous stopping.
589
Already running events will not be stopped. If the user needs
590
them stopped, manual intervention is needed.
593
Event_scheduler::stop()
597
TRUE Error (not reported)
601
Event_scheduler::stop()
603
THD *thd= current_thd;
604
DBUG_ENTER("Event_scheduler::stop");
605
DBUG_PRINT("enter", ("thd: 0x%lx", (long) thd));
608
DBUG_PRINT("info", ("state before action %s", scheduler_states_names[state].str));
609
if (state != RUNNING)
612
/* Guarantee we don't catch spurious signals */
614
DBUG_PRINT("info", ("Waiting for COND_started_or_stopped from "
615
"the scheduler thread. Current value of state is %s . "
616
"workers count=%d", scheduler_states_names[state].str,
619
NOTE: We don't use kill_one_thread() because it can't kill COM_DAEMON
620
threads. In addition, kill_one_thread() requires THD but during shutdown
621
current_thd is NULL. Hence, if kill_one_thread should be used it has to
622
be modified to kill also daemons, by adding a flag, and also we have to
623
create artificial THD here. To save all this work, we just do what
624
kill_one_thread() does to kill a thread. See also sql_repl.cc for similar
629
DBUG_PRINT("info", ("Scheduler thread has id %lu",
630
scheduler_thd->thread_id));
631
/* Lock from delete */
632
pthread_mutex_lock(&scheduler_thd->LOCK_thd_data);
633
/* This will wake up the thread if it waits on Queue's conditional */
634
sql_print_information("Event Scheduler: Killing the scheduler thread, "
636
scheduler_thd->thread_id);
637
scheduler_thd->awake(THD::KILL_CONNECTION);
638
pthread_mutex_unlock(&scheduler_thd->LOCK_thd_data);
640
/* thd could be 0x0, when shutting down */
641
sql_print_information("Event Scheduler: "
642
"Waiting for the scheduler thread to reply");
643
COND_STATE_WAIT(thd, NULL, "Waiting scheduler to stop");
644
} while (state == STOPPING);
645
DBUG_PRINT("info", ("Scheduler thread has cleaned up. Set state to INIT"));
646
sql_print_information("Event Scheduler: Stopped");
654
Returns the number of living event worker threads.
657
Event_scheduler::workers_count()
661
Event_scheduler::workers_count()
666
DBUG_ENTER("Event_scheduler::workers_count");
667
pthread_mutex_lock(&LOCK_thread_count); // For unlink from list
668
I_List_iterator<THD> it(threads);
670
if (tmp->system_thread == SYSTEM_THREAD_EVENT_WORKER)
672
pthread_mutex_unlock(&LOCK_thread_count);
673
DBUG_PRINT("exit", ("%d", count));
679
Auxiliary function for locking LOCK_scheduler_state. Used
680
by the LOCK_DATA macro.
683
Event_scheduler::lock_data()
684
func Which function is requesting mutex lock
685
line On which line mutex lock is requested
689
Event_scheduler::lock_data(const char *func, uint line)
691
DBUG_ENTER("Event_scheduler::lock_data");
692
DBUG_PRINT("enter", ("func=%s line=%u", func, line));
693
pthread_mutex_lock(&LOCK_scheduler_state);
694
mutex_last_locked_in_func= func;
695
mutex_last_locked_at_line= line;
696
mutex_scheduler_data_locked= TRUE;
702
Auxiliary function for unlocking LOCK_scheduler_state. Used
703
by the UNLOCK_DATA macro.
706
Event_scheduler::unlock_data()
707
func Which function is requesting mutex unlock
708
line On which line mutex unlock is requested
712
Event_scheduler::unlock_data(const char *func, uint line)
714
DBUG_ENTER("Event_scheduler::unlock_data");
715
DBUG_PRINT("enter", ("func=%s line=%u", func, line));
716
mutex_last_unlocked_at_line= line;
717
mutex_scheduler_data_locked= FALSE;
718
mutex_last_unlocked_in_func= func;
719
pthread_mutex_unlock(&LOCK_scheduler_state);
725
Wrapper for pthread_cond_wait/timedwait
728
Event_scheduler::cond_wait()
729
thd Thread (Could be NULL during shutdown procedure)
730
abstime If not null then call pthread_cond_timedwait()
731
msg Message for thd->proc_info
732
func Which function is requesting cond_wait
733
line On which line cond_wait is requested
737
Event_scheduler::cond_wait(THD *thd, struct timespec *abstime, const char* msg,
738
const char *func, uint line)
740
DBUG_ENTER("Event_scheduler::cond_wait");
741
waiting_on_cond= TRUE;
742
mutex_last_unlocked_at_line= line;
743
mutex_scheduler_data_locked= FALSE;
744
mutex_last_unlocked_in_func= func;
746
thd->enter_cond(&COND_state, &LOCK_scheduler_state, msg);
748
DBUG_PRINT("info", ("pthread_cond_%swait", abstime? "timed":""));
750
pthread_cond_wait(&COND_state, &LOCK_scheduler_state);
752
pthread_cond_timedwait(&COND_state, &LOCK_scheduler_state, abstime);
756
This will free the lock so we need to relock. Not the best thing to
757
do but we need to obey cond_wait()
762
mutex_last_locked_in_func= func;
763
mutex_last_locked_at_line= line;
764
mutex_scheduler_data_locked= TRUE;
765
waiting_on_cond= FALSE;
771
Dumps the internal status of the scheduler
774
Event_scheduler::dump_internal_status()
778
Event_scheduler::dump_internal_status()
780
DBUG_ENTER("Event_scheduler::dump_internal_status");
783
puts("Event scheduler status:");
784
printf("State : %s\n", scheduler_states_names[state].str);
785
printf("Thread id : %lu\n", scheduler_thd? scheduler_thd->thread_id : 0);
786
printf("LLA : %s:%u\n", mutex_last_locked_in_func,
787
mutex_last_locked_at_line);
788
printf("LUA : %s:%u\n", mutex_last_unlocked_in_func,
789
mutex_last_unlocked_at_line);
790
printf("WOC : %s\n", waiting_on_cond? "YES":"NO");
791
printf("Workers : %u\n", workers_count());
792
printf("Executed : %lu\n", (ulong) started_events);
793
printf("Data locked: %s\n", mutex_scheduler_data_locked ? "YES":"NO");
799
@} (End of group Event_Scheduler)