1
/* Copyright (C) 2004-2006 MySQL AB
3
This program is free software; you can redistribute it and/or modify
4
it under the terms of the GNU General Public License as published by
5
the Free Software Foundation; version 2 of the License.
7
This program is distributed in the hope that it will be useful,
8
but WITHOUT ANY WARRANTY; without even the implied warranty of
9
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10
GNU General Public License for more details.
12
You should have received a copy of the GNU General Public License
13
along with this program; if not, write to the Free Software
14
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
16
#include "mysql_priv.h"
17
#include "event_queue.h"
18
#include "event_data_objects.h"
21
@addtogroup Event_Scheduler
25
#define EVENT_QUEUE_INITIAL_SIZE 30
26
#define EVENT_QUEUE_EXTENT 30
30
#define SCHED_FUNC __FUNCTION__
33
#define SCHED_FUNC "<unknown>"
36
#define LOCK_QUEUE_DATA() lock_data(SCHED_FUNC, __LINE__)
37
#define UNLOCK_QUEUE_DATA() unlock_data(SCHED_FUNC, __LINE__)
40
Compares the execute_at members of two Event_queue_element instances.
41
Used as callback for the prioritized queue when shifting
45
event_queue_element_compare_q()
46
vptr Not used (set it to NULL)
47
a First Event_queue_element object
48
b Second Event_queue_element object
51
-1 a->execute_at < b->execute_at
52
0 a->execute_at == b->execute_at
53
1 a->execute_at > b->execute_at
56
execute_at.second_part is not considered during comparison
59
extern "C" int event_queue_element_compare_q(void *, uchar *, uchar *);
61
int event_queue_element_compare_q(void *vptr, uchar* a, uchar *b)
63
Event_queue_element *left = (Event_queue_element *)a;
64
Event_queue_element *right = (Event_queue_element *)b;
65
my_time_t lhs = left->execute_at;
66
my_time_t rhs = right->execute_at;
68
if (left->status == Event_parse_data::DISABLED)
69
return right->status != Event_parse_data::DISABLED;
71
if (right->status == Event_parse_data::DISABLED)
74
return (lhs < rhs ? -1 : (lhs > rhs ? 1 : 0));
79
Constructor of class Event_queue.
82
Event_queue::Event_queue()
85
Event_queue::Event_queue()
86
:next_activation_at(0),
87
mutex_last_locked_at_line(0),
88
mutex_last_unlocked_at_line(0),
89
mutex_last_attempted_lock_at_line(0),
90
mutex_last_locked_in_func("n/a"),
91
mutex_last_unlocked_in_func("n/a"),
92
mutex_last_attempted_lock_in_func("n/a"),
93
mutex_queue_data_locked(FALSE),
94
mutex_queue_data_attempting_lock(FALSE),
95
waiting_on_cond(FALSE)
97
pthread_mutex_init(&LOCK_event_queue, MY_MUTEX_INIT_FAST);
98
pthread_cond_init(&COND_queue_state, NULL);
102
Event_queue::~Event_queue()
105
pthread_mutex_destroy(&LOCK_event_queue);
106
pthread_cond_destroy(&COND_queue_state);
111
This is a queue's constructor. Until this method is called, the
112
queue is unusable. We don't use a C++ constructor instead in
113
order to be able to check the return value. The queue is
114
initialized once at server startup. Initialization can fail in
115
case of a failure reading events from the database or out of
127
Event_queue::init_queue(THD *thd)
129
DBUG_ENTER("Event_queue::init_queue");
130
DBUG_PRINT("enter", ("this: 0x%lx", (long) this));
134
if (init_queue_ex(&queue, EVENT_QUEUE_INITIAL_SIZE , 0 /*offset*/,
135
0 /*max_on_top*/, event_queue_element_compare_q,
136
NULL, EVENT_QUEUE_EXTENT))
138
sql_print_error("Event Scheduler: Can't initialize the execution queue");
152
Deinits the queue. Removes all elements from it and destroys them
156
Event_queue::deinit_queue()
160
Event_queue::deinit_queue()
162
DBUG_ENTER("Event_queue::deinit_queue");
166
delete_queue(&queue);
174
Adds an event to the queue.
176
Compute the next execution time for an event, and if it is still
177
active, add it to the queue. Otherwise delete it.
178
The object is left intact in case of an error. Otherwise
179
the queue container assumes ownership of it.
181
@param[in] thd thread handle
182
@param[in] new_element a new element to add to the queue
183
@param[out] created set to TRUE if no error and the element is
184
added to the queue, FALSE otherwise
186
@retval TRUE an error occurred. The value of created is undefined,
187
the element was not deleted.
188
@retval FALSE success
192
Event_queue::create_event(THD *thd, Event_queue_element *new_element,
195
DBUG_ENTER("Event_queue::create_event");
196
DBUG_PRINT("enter", ("thd: 0x%lx et=%s.%s", (long) thd,
197
new_element->dbname.str, new_element->name.str));
199
/* Will do nothing if the event is disabled */
200
new_element->compute_next_execution_time();
201
if (new_element->status != Event_parse_data::ENABLED)
208
DBUG_PRINT("info", ("new event in the queue: 0x%lx", (long) new_element));
211
*created= (queue_insert_safe(&queue, (uchar *) new_element) == FALSE);
212
dbug_dump_queue(thd->query_start());
213
pthread_cond_broadcast(&COND_queue_state);
216
DBUG_RETURN(!*created);
221
Updates an event from the scheduler queue
224
Event_queue::update_event()
226
dbname Schema of the event
227
name Name of the event
228
new_schema New schema, in case of RENAME TO, otherwise NULL
229
new_name New name, in case of RENAME TO, otherwise NULL
233
Event_queue::update_event(THD *thd, LEX_STRING dbname, LEX_STRING name,
234
Event_queue_element *new_element)
236
DBUG_ENTER("Event_queue::update_event");
237
DBUG_PRINT("enter", ("thd: 0x%lx et=[%s.%s]", (long) thd, dbname.str, name.str));
239
if ((new_element->status == Event_parse_data::DISABLED) ||
240
(new_element->status == Event_parse_data::SLAVESIDE_DISABLED))
242
DBUG_PRINT("info", ("The event is disabled."));
244
Destroy the object but don't skip to end: because we may have to remove
245
object from the cache.
251
new_element->compute_next_execution_time();
254
find_n_remove_event(dbname, name);
256
/* If not disabled event */
259
DBUG_PRINT("info", ("new event in the queue: 0x%lx", (long) new_element));
260
queue_insert_safe(&queue, (uchar *) new_element);
261
pthread_cond_broadcast(&COND_queue_state);
264
dbug_dump_queue(thd->query_start());
272
Drops an event from the queue
275
Event_queue::drop_event()
277
dbname Schema of the event to drop
278
name Name of the event to drop
282
Event_queue::drop_event(THD *thd, LEX_STRING dbname, LEX_STRING name)
284
DBUG_ENTER("Event_queue::drop_event");
285
DBUG_PRINT("enter", ("thd: 0x%lx db :%s name: %s", (long) thd,
286
dbname.str, name.str));
289
find_n_remove_event(dbname, name);
290
dbug_dump_queue(thd->query_start());
294
We don't signal here because the scheduler will catch the change
295
next time it wakes up.
303
Drops all events from the in-memory queue and disk that match
304
certain pattern evaluated by a comparator function
307
Event_queue::drop_matching_events()
309
pattern A pattern string
310
comparator The function to use for comparing
313
>=0 Number of dropped events
316
The caller is expected to acquire the lock on LOCK_event_queue
320
Event_queue::drop_matching_events(THD *thd, LEX_STRING pattern,
321
bool (*comparator)(LEX_STRING, Event_basic *))
324
DBUG_ENTER("Event_queue::drop_matching_events");
325
DBUG_PRINT("enter", ("pattern=%s", pattern.str));
327
while (i < queue.elements)
329
Event_queue_element *et= (Event_queue_element *) queue_element(&queue, i);
330
DBUG_PRINT("info", ("[%s.%s]?", et->dbname.str, et->name.str));
331
if (comparator(pattern, et))
334
The queue is ordered. If we remove an element, then all elements
335
after it will shift one position to the left, if we imagine it as
336
an array from left to the right. In this case we should not
337
increment the counter and the (i < queue.elements) condition is ok.
339
queue_remove(&queue, i);
346
We don't call pthread_cond_broadcast(&COND_queue_state);
347
If we remove the top event:
348
1. The queue is empty. The scheduler will wake up at some time and
349
realize that the queue is empty. If create_event() comes in between
350
it will signal the scheduler
351
2. The queue is not empty, but the next event after the previous top,
352
won't be executed any time sooner than the element we removed. Hence,
353
we may not notify the scheduler and it will realize the change when it
354
wakes up from timedwait.
362
Drops all events from the in-memory queue and disk that are from
366
Event_queue::drop_schema_events()
368
schema The schema name
372
Event_queue::drop_schema_events(THD *thd, LEX_STRING schema)
374
DBUG_ENTER("Event_queue::drop_schema_events");
376
drop_matching_events(thd, schema, event_basic_db_equal);
383
Searches for an event in the queue
386
Event_queue::find_n_remove_event()
387
db The schema of the event to find
388
name The event to find
391
The caller should do the locking; the caller is also responsible for
392
actual signalling in case an event is removed from the queue.
396
Event_queue::find_n_remove_event(LEX_STRING db, LEX_STRING name)
399
DBUG_ENTER("Event_queue::find_n_remove_event");
401
for (i= 0; i < queue.elements; ++i)
403
Event_queue_element *et= (Event_queue_element *) queue_element(&queue, i);
404
DBUG_PRINT("info", ("[%s.%s]==[%s.%s]?", db.str, name.str,
405
et->dbname.str, et->name.str));
406
if (event_basic_identifier_equal(db, name, et))
408
queue_remove(&queue, i);
419
Recalculates activation times in the queue. There is one reason for
420
that. Because the values (execute_at) by which the queue is ordered are
421
changed by calls to compute_next_execution_time() on a request from the
422
scheduler thread, if it is not running then the values won't be updated.
423
Once the scheduler is started again the values have to be recalculated
424
so they are right for the current time.
427
Event_queue::recalculate_activation_times()
432
Event_queue::recalculate_activation_times(THD *thd)
435
DBUG_ENTER("Event_queue::recalculate_activation_times");
438
DBUG_PRINT("info", ("%u loaded events to be recalculated", queue.elements));
439
for (i= 0; i < queue.elements; i++)
441
((Event_queue_element*)queue_element(&queue, i))->compute_next_execution_time();
442
((Event_queue_element*)queue_element(&queue, i))->update_timing_fields(thd);
446
The disabled elements are moved to the end during the `fix`.
447
Start from the end and remove all of the elements which are
448
disabled. When we find the first non-disabled one we break, as we
449
have removed all. The queue has been ordered in a way the disabled
450
events are at the end.
452
for (i= queue.elements; i > 0; i--)
454
Event_queue_element *element = (Event_queue_element*)queue_element(&queue, i - 1);
455
if (element->status != Event_parse_data::DISABLED)
458
This won't cause queue re-order, because we remove
459
always the last element.
461
queue_remove(&queue, i - 1);
467
XXX: The events are dropped only from memory and not from disk
468
even if `drop_list[j]->dropped` is TRUE. They will still be on the
469
disk till next server restart.
470
Please add code here to do it.
478
Empties the queue and destroys the Event_queue_element objects in the
482
Event_queue::empty_queue()
485
Should be called with LOCK_event_queue locked
489
Event_queue::empty_queue()
492
DBUG_ENTER("Event_queue::empty_queue");
493
DBUG_PRINT("enter", ("Purging the queue. %u element(s)", queue.elements));
494
sql_print_information("Event Scheduler: Purging the queue. %u events",
496
/* empty the queue */
497
for (i= 0; i < queue.elements; ++i)
499
Event_queue_element *et= (Event_queue_element *) queue_element(&queue, i);
502
resize_queue(&queue, 0);
508
Dumps the queue to the trace log.
511
Event_queue::dbug_dump_queue()
512
now Current timestamp
516
Event_queue::dbug_dump_queue(time_t now)
519
Event_queue_element *et;
521
DBUG_ENTER("Event_queue::dbug_dump_queue");
522
DBUG_PRINT("info", ("Dumping queue . Elements=%u", queue.elements));
523
for (i = 0; i < queue.elements; i++)
525
et= ((Event_queue_element*)queue_element(&queue, i));
526
DBUG_PRINT("info", ("et: 0x%lx name: %s.%s", (long) et,
527
et->dbname.str, et->name.str));
528
DBUG_PRINT("info", ("exec_at: %lu starts: %lu ends: %lu execs_so_far: %u "
529
"expr: %ld et.exec_at: %ld now: %ld "
530
"(et.exec_at - now): %d if: %d",
531
(long) et->execute_at, (long) et->starts,
532
(long) et->ends, et->execution_count,
533
(long) et->expression, (long) et->execute_at,
534
(long) now, (int) (et->execute_at - now),
535
et->execute_at <= now));
541
static const char *queue_empty_msg= "Waiting on empty queue";
542
static const char *queue_wait_msg= "Waiting for next activation";
545
Checks whether the top of the queue is eligible for execution and
546
returns an Event_job_data instance in case it should be executed.
547
`now` is compared against `execute_at` of the top element in the queue.
550
Event_queue::get_top_for_execution_if_time()
552
event_name [out] The object to execute
555
FALSE No error. event_name != NULL
560
Event_queue::get_top_for_execution_if_time(THD *thd,
561
Event_queue_element_for_exec **event_name)
565
DBUG_ENTER("Event_queue::get_top_for_execution_if_time");
570
Event_queue_element *top= NULL;
572
/* Break loop if thd has been killed */
575
DBUG_PRINT("info", ("thd->killed=%d", thd->killed));
581
/* There are no events in the queue */
582
next_activation_at= 0;
584
/* Wait on condition until signaled. Release LOCK_queue while waiting. */
585
cond_wait(thd, NULL, queue_empty_msg, SCHED_FUNC, __LINE__);
590
top= ((Event_queue_element*) queue_element(&queue, 0));
592
thd->set_current_time(); /* Get current time */
594
next_activation_at= top->execute_at;
595
if (next_activation_at > thd->query_start())
598
Not yet time for top event, wait on condition with
599
time or until signaled. Release LOCK_queue while waiting.
601
struct timespec top_time;
602
set_timespec(top_time, next_activation_at - thd->query_start());
603
cond_wait(thd, &top_time, queue_wait_msg, SCHED_FUNC, __LINE__);
608
if (!(*event_name= new Event_queue_element_for_exec()) ||
609
(*event_name)->init(top->dbname, top->name))
615
DBUG_PRINT("info", ("Ready for execution"));
616
top->mark_last_executed(thd);
617
if (top->compute_next_execution_time())
618
top->status= Event_parse_data::DISABLED;
619
DBUG_PRINT("info", ("event %s status is %d", top->name.str, top->status));
621
top->execution_count++;
622
(*event_name)->dropped= top->dropped;
624
top->update_timing_fields(thd);
625
if (top->status == Event_parse_data::DISABLED)
627
DBUG_PRINT("info", ("removing from the queue"));
628
sql_print_information("Event Scheduler: Last execution of %s.%s. %s",
629
top->dbname.str, top->name.str,
630
top->dropped? "Dropping.":"");
632
queue_remove(&queue, 0);
635
queue_replaced(&queue);
637
dbug_dump_queue(thd->query_start());
643
DBUG_PRINT("info", ("returning %d et_new: 0x%lx ",
644
ret, (long) *event_name));
647
DBUG_PRINT("info", ("db: %s name: %s",
648
(*event_name)->dbname.str, (*event_name)->name.str));
655
Auxiliary function for locking LOCK_event_queue. Used by the
656
LOCK_QUEUE_DATA macro
659
Event_queue::lock_data()
660
func Which function is requesting mutex lock
661
line On which line mutex lock is requested
665
Event_queue::lock_data(const char *func, uint line)
667
DBUG_ENTER("Event_queue::lock_data");
668
DBUG_PRINT("enter", ("func=%s line=%u", func, line));
669
mutex_last_attempted_lock_in_func= func;
670
mutex_last_attempted_lock_at_line= line;
671
mutex_queue_data_attempting_lock= TRUE;
672
pthread_mutex_lock(&LOCK_event_queue);
673
mutex_last_attempted_lock_in_func= "";
674
mutex_last_attempted_lock_at_line= 0;
675
mutex_queue_data_attempting_lock= FALSE;
677
mutex_last_locked_in_func= func;
678
mutex_last_locked_at_line= line;
679
mutex_queue_data_locked= TRUE;
686
Auxiliary function for unlocking LOCK_event_queue. Used by the
687
UNLOCK_QUEUE_DATA macro
690
Event_queue::unlock_data()
691
func Which function is requesting mutex unlock
692
line On which line mutex unlock is requested
696
Event_queue::unlock_data(const char *func, uint line)
698
DBUG_ENTER("Event_queue::unlock_data");
699
DBUG_PRINT("enter", ("func=%s line=%u", func, line));
700
mutex_last_unlocked_at_line= line;
701
mutex_queue_data_locked= FALSE;
702
mutex_last_unlocked_in_func= func;
703
pthread_mutex_unlock(&LOCK_event_queue);
709
Wrapper for pthread_cond_wait/timedwait
712
Event_queue::cond_wait()
713
thd Thread (Could be NULL during shutdown procedure)
714
msg Message for thd->proc_info
715
abstime If not null then call pthread_cond_timedwait()
716
func Which function is requesting cond_wait
717
line On which line cond_wait is requested
721
Event_queue::cond_wait(THD *thd, struct timespec *abstime, const char* msg,
722
const char *func, uint line)
724
DBUG_ENTER("Event_queue::cond_wait");
725
waiting_on_cond= TRUE;
726
mutex_last_unlocked_at_line= line;
727
mutex_queue_data_locked= FALSE;
728
mutex_last_unlocked_in_func= func;
730
thd->enter_cond(&COND_queue_state, &LOCK_event_queue, msg);
732
DBUG_PRINT("info", ("pthread_cond_%swait", abstime? "timed":""));
734
pthread_cond_wait(&COND_queue_state, &LOCK_event_queue);
736
pthread_cond_timedwait(&COND_queue_state, &LOCK_event_queue, abstime);
738
mutex_last_locked_in_func= func;
739
mutex_last_locked_at_line= line;
740
mutex_queue_data_locked= TRUE;
741
waiting_on_cond= FALSE;
744
This will free the lock so we need to relock. Not the best thing to
745
do but we need to obey cond_wait()
748
lock_data(func, line);
755
Dumps the internal status of the queue
758
Event_queue::dump_internal_status()
762
Event_queue::dump_internal_status()
764
DBUG_ENTER("Event_queue::dump_internal_status");
768
puts("Event queue status:");
769
printf("Element count : %u\n", queue.elements);
770
printf("Data locked : %s\n", mutex_queue_data_locked? "YES":"NO");
771
printf("Attempting lock : %s\n", mutex_queue_data_attempting_lock? "YES":"NO");
772
printf("LLA : %s:%u\n", mutex_last_locked_in_func,
773
mutex_last_locked_at_line);
774
printf("LUA : %s:%u\n", mutex_last_unlocked_in_func,
775
mutex_last_unlocked_at_line);
776
if (mutex_last_attempted_lock_at_line)
777
printf("Last lock attempt at: %s:%u\n", mutex_last_attempted_lock_in_func,
778
mutex_last_attempted_lock_at_line);
779
printf("WOC : %s\n", waiting_on_cond? "YES":"NO");
782
my_tz_OFFSET0->gmt_sec_to_TIME(&time, next_activation_at);
783
if (time.year != 1970)
784
printf("Next activation : %04d-%02d-%02d %02d:%02d:%02d\n",
785
time.year, time.month, time.day, time.hour, time.minute, time.second);
787
printf("Next activation : never");
793
@} (End of group Event_Scheduler)