5
#ifdef HAVE_REPLICATION
7
#include "sql_string.h"
10
#include <my_bitmap.h>
11
#include "rpl_slave.h"
14
Legends running throughout the module:
20
B-event event that Begins a group (a transaction)
21
T-event event that Terminates a group (a transaction)
24
/* Assigned Partition Hash (APH) entry */
25
typedef struct st_db_worker_hash_entry
31
The number of transaction pending on this database.
32
This should only be modified under the lock slave_worker_hash_lock.
36
The list of temp tables belonging to @c db database is
37
attached to an assigned @c worker to become its thd->temporary_tables.
38
The list is updated with every ddl incl CREATE, DROP.
39
It is removed from the entry and merged to the coordinator's
40
thd->temporary_tables in case of events: slave stops, APH oversize.
42
TABLE* volatile temporary_tables;
44
/* todo: relax concurrency to mimic record-level locking.
45
That is, to augment the entry with a mutex/cond pair
48
timestamp updated_at; */
50
} db_worker_hash_entry;
52
/* Creates the assigned-partition hash (APH) sized for the given worker count. */
bool init_hash_workers(ulong slave_parallel_workers);
53
/* Tears the APH down; takes the Coordinator's Relay_log_info. */
void destroy_hash_workers(Relay_log_info*);
54
/*
  Maps a database name to a Slave_worker, returning the chosen worker and,
  via @c ptr_entry, its APH entry.  Exact semantics of @c need_temp_tables
  and @c w live in the definition, which is not visible in this chunk —
  TODO(review): confirm against the .cc file.
*/
Slave_worker *map_db_to_worker(const char *dbname, Relay_log_info *rli,
55
db_worker_hash_entry **ptr_entry,
56
bool need_temp_tables, Slave_worker *w);
57
/* Picks a worker from the array — presumably the least loaded one; see impl. */
Slave_worker *get_least_occupied_worker(DYNAMIC_ARRAY *workers);
58
/* Waits for workers (except @c ignore, when non-NULL) to drain their queues. */
int wait_for_workers_to_finish(Relay_log_info const *rli,
59
Slave_worker *ignore= NULL);
61
// CGEP = Current Group Executed Partitions (see Slave_worker::curr_group_exec_parts).
#define SLAVE_INIT_DBS_IN_GROUP 4 // initial allocation for CGEP dynarray
63
// Number of leading fields of the worker info record that identify a Worker
// — presumably its key; TODO(review): confirm against the info read/write code.
#define NUMBER_OF_FIELDS_TO_IDENTIFY_WORKER 2
65
typedef struct slave_job_item
71
The class defines a type of queue with a predefined max size that is
72
implemented using the circular memory buffer.
73
That is items of the queue are accessed as indexed elements of
74
the array buffer in a way that when the index value reaches
75
a max value it wraps around to point to the first buffer element.
77
class circular_buffer_queue
82
ulong size; // the Size of the queue in terms of element
83
ulong avail; // first Available index to append at (next to tail)
84
ulong entry; // the head index or the entry point to the queue.
85
volatile ulong len; // actual length
88
circular_buffer_queue(uint el_size, ulong max, uint alloc_inc= 0) :
89
size(max), avail(0), entry(max), len(0), inited_queue(FALSE)
91
DBUG_ASSERT(size < (ulong) -1);
92
if (!my_init_dynamic_array(&Q, el_size, size, alloc_inc))
95
circular_buffer_queue () : inited_queue(FALSE) {}
96
~circular_buffer_queue ()
103
Content of the being dequeued item is copied to the arg-pointer
106
@return the queue's array index that the de-queued item
108
an error encoded beyond the legacy index range.
110
ulong de_queue(uchar *);
112
Similar to de_queue but extracting happens from the tail side.
114
ulong de_tail(uchar *val);
117
return the index where the arg item locates
118
or an error encoded as a value beyond the legacy range
119
[0, size) (value `size' is excluded).
121
ulong en_queue(void *item);
123
return the value of @c data member of the head of the queue.
126
bool gt(ulong i, ulong k); // comparison of ordering of two entities
127
/* index is within the valid range */
128
bool in(ulong k) { return !empty() &&
129
(entry > avail ? (k >= entry || k < avail) : (k >= entry && k < avail)); }
130
bool empty() { return entry == size; }
131
bool full() { return avail == size; }
134
typedef struct st_slave_job_group
136
char *group_master_log_name; // (actually redundant)
138
T-event lop_pos filled by Worker for CheckPoint (CP)
140
my_off_t group_master_log_pos;
143
When relay-log name changes allocates and fill in a new name of relay-log,
144
otherwise it fills in NULL.
145
Coordinator keeps track of whether each Worker has been notified of the update
146
to make sure the routine runs once per change.
148
W checks the value at commit and memorizes a non-NULL value.
149
Freeing unless NULL is left to Coordinator at CP.
151
char *group_relay_log_name; // The value is last seen relay-log
152
my_off_t group_relay_log_pos; // filled by W
154
Slave_worker *worker;
155
ulonglong total_seqno;
157
my_off_t master_log_pos; // B-event log_pos
158
/* checkpoint coord are reset by periodical and special (Rotate event) CP:s */
159
uint checkpoint_seqno;
160
my_off_t checkpoint_log_pos; // T-event lop_pos filled by W for CheckPoint
161
char* checkpoint_log_name;
162
my_off_t checkpoint_relay_log_pos; // T-event lop_pos filled by W for CheckPoint
163
char* checkpoint_relay_log_name;
164
volatile uchar done; // Flag raised by W, read and reset by Coordinator
165
ulong shifted; // shift the last CP bitmap at receiving a new CP
166
time_t ts; // Group's timestamp to update Seconds_behind_master
168
bool notified; // to debug group_master_log_name change notification
171
Coordinator fills the struct with defaults and options at starting of
172
a group distribution.
174
void reset(my_off_t master_pos, ulonglong seqno)
176
master_log_pos= master_pos;
177
group_master_log_pos= group_relay_log_pos= 0;
178
group_master_log_name= NULL; // todo: remove
179
group_relay_log_name= NULL;
180
worker_id= MTS_WORKER_UNDEF;
182
checkpoint_log_name= NULL;
183
checkpoint_log_pos= 0;
184
checkpoint_relay_log_name= NULL;
185
checkpoint_relay_log_pos= 0;
186
checkpoint_seqno= (uint) -1;
195
Group Assigned Queue whose first element identifies first gap
196
in committed sequence. The head of the queue is therefore next to
199
class Slave_committed_queue : public circular_buffer_queue
205
/* master's Rot-ev exec */
206
void update_current_binlog(const char *post_rotate);
209
The last checkpoint time Low-Water-Mark
213
/* last time processed indexes for each worker */
214
DYNAMIC_ARRAY last_done;
216
/* the being assigned group index in GAQ */
217
ulong assigned_group_index;
219
Slave_committed_queue (const char *log, uint el_size, ulong max, uint n,
221
: circular_buffer_queue(el_size, max, inc), inited(FALSE)
226
if (max >= (ulong) -1 || !circular_buffer_queue::inited_queue)
230
my_init_dynamic_array(&last_done, sizeof(lwm.total_seqno), n, 0);
231
for (k= 0; k < n; k++)
232
insert_dynamic(&last_done, (uchar*) &l); // empty for each Worker
233
lwm.group_relay_log_name= (char *) my_malloc(FN_REFLEN + 1, MYF(0));
234
lwm.group_relay_log_name[0]= 0;
237
~Slave_committed_queue ()
241
delete_dynamic(&last_done);
242
my_free(lwm.group_relay_log_name);
243
free_dynamic_items(); // free possibly left allocated strings in GAQ list
248
bool count_done(Relay_log_info* rli);
251
/* Checkpoint routine refreshes the queue */
252
ulong move_queue_head(DYNAMIC_ARRAY *ws);
253
/* Method is for slave shutdown time cleanup */
254
void free_dynamic_items();
256
returns a pointer to Slave_job_group struct instance as indexed by arg
257
in the circular buffer dyn-array
259
Slave_job_group* get_job_group(ulong ind)
261
return (Slave_job_group*) dynamic_array_ptr(&Q, ind);
265
Assigns @c assigned_group_index to the index of the enqueued item
268
ulong en_queue(void *item)
270
return assigned_group_index= circular_buffer_queue::en_queue(item);
275
class Slave_jobs_queue : public circular_buffer_queue
280
Coordinator marks with true, Worker signals back at queue back to
284
ulonglong waited_overfill;
287
class Slave_worker : public Relay_log_info
290
Slave_worker(Relay_log_info *rli
291
#ifdef HAVE_PSI_INTERFACE
292
,PSI_mutex_key *param_key_info_run_lock,
293
PSI_mutex_key *param_key_info_data_lock,
294
PSI_mutex_key *param_key_info_sleep_lock,
295
PSI_mutex_key *param_key_info_data_cond,
296
PSI_mutex_key *param_key_info_start_cond,
297
PSI_mutex_key *param_key_info_stop_cond,
298
PSI_mutex_key *param_key_info_sleep_cond
302
virtual ~Slave_worker();
304
Slave_jobs_queue jobs; // assignment queue containing events to execute
305
mysql_mutex_t jobs_lock; // mutex for the jobs queue
306
mysql_cond_t jobs_cond; // condition variable for the jobs queue
307
Relay_log_info *c_rli; // pointer to Coordinator's rli
308
DYNAMIC_ARRAY curr_group_exec_parts; // Current Group Executed Partitions
309
bool curr_group_seen_begin; // is set to TRUE with explicit B-event
310
ulong id; // numeric identifier of the Worker
313
Worker runtime statistics
315
// the index in GAQ of the last processed group by this Worker
316
volatile ulong last_group_done_index;
317
ulong wq_empty_waits; // how many times got idle
318
ulong events_done; // how many events (statements) processed
319
ulong groups_done; // how many groups (transactions) processed
320
volatile int curr_jobs; // number of active assignments
321
// number of partitions allocated to the worker at point in time
322
long usage_partition;
323
// symmetric to rli->mts_end_group_sets_max_dbs
324
bool end_group_sets_max_dbs;
326
volatile bool relay_log_change_notified; // Coord sets and resets, W can read
327
volatile bool checkpoint_notified; // Coord sets and resets, W can read
328
volatile bool master_log_change_notified; // Coord sets and resets, W can read
329
ulong bitmap_shifted; // shift the last bitmap at receiving new CP
330
// WQ current excess above the overrun level
333
number of events starting from which Worker queue is regarded as
334
close to full. The number of the excessive events yields a weight factor
335
to compute Coordinator's nap.
339
reverse to overrun: the number of events below which Worker is
340
considered underrunning
342
ulong underrun_level;
344
Total of increments done to rli->mts_wq_excess_cnt on behalf of this worker.
345
When WQ length is dropped below overrun the counter is reset.
349
Coordinates of the last CheckPoint (CP) this Worker has
350
acknowledged; part of it is persistent data
352
char checkpoint_relay_log_name[FN_REFLEN];
353
ulonglong checkpoint_relay_log_pos;
354
char checkpoint_master_log_name[FN_REFLEN];
355
ulonglong checkpoint_master_log_pos;
356
MY_BITMAP group_executed; // bitmap describes groups executed after last CP
357
MY_BITMAP group_shifted; // temporary bitmap to compute group_executed
358
ulong checkpoint_seqno; // the most significant ON bit in group_executed
359
enum en_running_state
363
ERROR_LEAVING, // is set by Worker
364
KILLED // is set by Coordinator
367
The running status is guarded by jobs_lock mutex that a writer
368
Coordinator or Worker itself needs to hold when write a new value.
370
en_running_state volatile running_status;
372
int init_worker(Relay_log_info*, ulong);
373
int rli_init_info(bool);
374
int flush_info(bool force= FALSE);
375
static size_t get_number_worker_fields();
376
void slave_worker_ends_group(Log_event*, int);
377
const char *get_master_log_name();
378
ulonglong get_master_log_pos() { return master_log_pos; };
379
ulonglong set_master_log_pos(ulong val) { return master_log_pos= val; };
380
bool commit_positions(Log_event *evt, Slave_job_group *ptr_g, bool force);
381
bool reset_recovery_info();
383
Different from the parent method in that this does not delete
384
rli_description_event.
385
The method runs by Coordinator when Worker are synched or being
388
void set_rli_description_event(Format_description_log_event *fdle)
390
DBUG_ASSERT(!fdle || (running_status == Slave_worker::RUNNING && info_thd));
393
mysql_mutex_assert_owner(&jobs_lock);
397
adapt_to_master_version(fdle);
398
rli_description_event= fdle;
401
inline void reset_gaq_index() { gaq_index= c_rli->gaq->size; };
402
inline void set_gaq_index(ulong val)
404
if (gaq_index == c_rli->gaq->size)
410
virtual void do_report(loglevel level, int err_code,
411
const char *msg, va_list v_args) const;
414
ulong gaq_index; // GAQ index of the current assignment
415
ulonglong master_log_pos; // event's cached log_pos for possible error report
417
bool read_info(Rpl_info_handler *from);
418
bool write_info(Rpl_info_handler *to);
419
Slave_worker& operator=(const Slave_worker& info);
420
Slave_worker(const Slave_worker& info);
423
TABLE* mts_move_temp_table_to_entry(TABLE*, THD*, db_worker_hash_entry*);
424
TABLE* mts_move_temp_tables_to_thd(THD*, TABLE*);
425
#endif // HAVE_REPLICATION