~ubuntu-branches/ubuntu/trusty/mysql-5.6/trusty

« back to all changes in this revision

Viewing changes to sql/rpl_rli_pdb.h

  • Committer: Package Import Robot
  • Author(s): James Page
  • Date: 2014-02-12 11:54:27 UTC
  • Revision ID: package-import@ubuntu.com-20140212115427-oq6tfsqxl1wuwehi
Tags: upstream-5.6.15
ImportĀ upstreamĀ versionĀ 5.6.15

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
#ifndef RPL_RLI_PDB_H
 
2
 
 
3
#define RPL_RLI_PDB_H
 
4
 
 
5
#ifdef HAVE_REPLICATION
 
6
 
 
7
#include "sql_string.h"
 
8
#include "rpl_rli.h"
 
9
#include <my_sys.h>
 
10
#include <my_bitmap.h>
 
11
#include "rpl_slave.h"
 
12
 
 
13
/**
 
14
  Legends running throughout the module:
 
15
 
 
16
  C  - Coordinator
 
17
  CP - checkpoint
 
18
  W  - Worker
 
19
 
 
20
  B-event event that Begins a group (a transaction)
 
21
  T-event event that Terminates a group (a transaction)
 
22
*/
 
23
 
 
24
/* Assigned Partition Hash (APH) entry */
 
25
typedef struct st_db_worker_hash_entry
 
26
{
 
27
  uint  db_len;
 
28
  const char *db;
 
29
  Slave_worker *worker;
 
30
  /*
 
31
    The number of transaction pending on this database.
 
32
    This should only be modified under the lock slave_worker_hash_lock.
 
33
   */
 
34
  long usage;
 
35
  /*
 
36
    The list of temp tables belonging to @ db database is
 
37
    attached to an assigned @c worker to become its thd->temporary_tables.
 
38
    The list is updated with every ddl incl CREATE, DROP.
 
39
    It is removed from the entry and merged to the coordinator's
 
40
    thd->temporary_tables in case of events: slave stops, APH oversize.
 
41
  */
 
42
  TABLE* volatile temporary_tables;
 
43
 
 
44
  /* todo: relax concurrency to mimic record-level locking.
 
45
     That is to augmenting the entry with mutex/cond pair
 
46
     pthread_mutex_t
 
47
     pthread_cond_t
 
48
     timestamp updated_at; */
 
49
 
 
50
} db_worker_hash_entry;
 
51
 
 
52
bool init_hash_workers(ulong slave_parallel_workers);
 
53
void destroy_hash_workers(Relay_log_info*);
 
54
Slave_worker *map_db_to_worker(const char *dbname, Relay_log_info *rli,
 
55
                               db_worker_hash_entry **ptr_entry,
 
56
                               bool need_temp_tables, Slave_worker *w);
 
57
Slave_worker *get_least_occupied_worker(DYNAMIC_ARRAY *workers);
 
58
int wait_for_workers_to_finish(Relay_log_info const *rli,
 
59
                               Slave_worker *ignore= NULL);
 
60
 
 
61
#define SLAVE_INIT_DBS_IN_GROUP 4     // initial allocation for CGEP dynarray
 
62
 
 
63
#define NUMBER_OF_FIELDS_TO_IDENTIFY_WORKER 2
 
64
 
 
65
typedef struct slave_job_item
 
66
{
 
67
  void *data;
 
68
} Slave_job_item;
 
69
 
 
70
/**
 
71
   The class defines a type of queue with a predefined max size that is
 
72
   implemented using the circular memory buffer.
 
73
   That is items of the queue are accessed as indexed elements of
 
74
   the array buffer in a way that when the index value reaches
 
75
   a max value it wraps around to point to the first buffer element.
 
76
*/
 
77
class circular_buffer_queue
 
78
{
 
79
public:
 
80
 
 
81
  DYNAMIC_ARRAY Q;
 
82
  ulong size;           // the Size of the queue in terms of element
 
83
  ulong avail;          // first Available index to append at (next to tail)
 
84
  ulong entry;          // the head index or the entry point to the queue.
 
85
  volatile ulong len;   // actual length
 
86
  bool inited_queue;
 
87
 
 
88
  circular_buffer_queue(uint el_size, ulong max, uint alloc_inc= 0) :
 
89
    size(max), avail(0), entry(max), len(0), inited_queue(FALSE)
 
90
  {
 
91
    DBUG_ASSERT(size < (ulong) -1);
 
92
    if (!my_init_dynamic_array(&Q, el_size, size, alloc_inc))
 
93
      inited_queue= TRUE;
 
94
  }
 
95
  circular_buffer_queue () : inited_queue(FALSE) {}
 
96
  ~circular_buffer_queue ()
 
97
  {
 
98
    if (inited_queue)
 
99
      delete_dynamic(&Q);
 
100
  }
 
101
 
 
102
   /**
 
103
      Content of the being dequeued item is copied to the arg-pointer
 
104
      location.
 
105
      
 
106
      @return the queue's array index that the de-queued item
 
107
      located at, or
 
108
      an error encoded in beyond the index legacy range.
 
109
   */
 
110
  ulong de_queue(uchar *);
 
111
  /**
 
112
     Similar to de_queue but extracting happens from the tail side.
 
113
  */
 
114
  ulong de_tail(uchar *val);
 
115
 
 
116
  /**
 
117
    return the index where the arg item locates
 
118
           or an error encoded as a value in beyond of the legacy range
 
119
           [0, size) (value `size' is excluded).
 
120
  */
 
121
  ulong en_queue(void *item);
 
122
  /**
 
123
     return the value of @c data member of the head of the queue.
 
124
  */
 
125
  void* head_queue();
 
126
  bool   gt(ulong i, ulong k); // comparision of ordering of two entities
 
127
  /* index is within the valid range */
 
128
  bool in(ulong k) { return !empty() && 
 
129
      (entry > avail ? (k >= entry || k < avail) : (k >= entry && k < avail)); }
 
130
  bool empty() { return entry == size; }
 
131
  bool full() { return avail == size; }
 
132
};
 
133
 
 
134
typedef struct st_slave_job_group
 
135
{
 
136
  char *group_master_log_name;   // (actually redundant)
 
137
  /*
 
138
    T-event lop_pos filled by Worker for CheckPoint (CP)
 
139
  */
 
140
  my_off_t group_master_log_pos;
 
141
 
 
142
  /* 
 
143
     When relay-log name changes  allocates and fill in a new name of relay-log,
 
144
     otherwise it fills in NULL.
 
145
     Coordinator keeps track of each Worker has been notified on the updating
 
146
     to make sure the routine runs once per change.
 
147
 
 
148
     W checks the value at commit and memoriezes a not-NULL.
 
149
     Freeing unless NULL is left to Coordinator at CP.
 
150
  */
 
151
  char     *group_relay_log_name; // The value is last seen relay-log 
 
152
  my_off_t group_relay_log_pos;  // filled by W
 
153
  ulong worker_id;
 
154
  Slave_worker *worker;
 
155
  ulonglong total_seqno;
 
156
 
 
157
  my_off_t master_log_pos;       // B-event log_pos
 
158
  /* checkpoint coord are reset by periodical and special (Rotate event) CP:s */
 
159
  uint  checkpoint_seqno;
 
160
  my_off_t checkpoint_log_pos; // T-event lop_pos filled by W for CheckPoint
 
161
  char*    checkpoint_log_name;
 
162
  my_off_t checkpoint_relay_log_pos; // T-event lop_pos filled by W for CheckPoint
 
163
  char*    checkpoint_relay_log_name;
 
164
  volatile uchar done;  // Flag raised by W,  read and reset by Coordinator
 
165
  ulong    shifted;     // shift the last CP bitmap at receiving a new CP
 
166
  time_t   ts;          // Group's timestampt to update Seconds_behind_master
 
167
#ifndef DBUG_OFF
 
168
  bool     notified;    // to debug group_master_log_name change notification
 
169
#endif
 
170
  /*
 
171
    Coordinator fills the struct with defaults and options at starting of 
 
172
    a group distribution.
 
173
  */
 
174
  void reset(my_off_t master_pos, ulonglong seqno)
 
175
  {
 
176
    master_log_pos= master_pos;
 
177
    group_master_log_pos= group_relay_log_pos= 0;
 
178
    group_master_log_name= NULL; // todo: remove
 
179
    group_relay_log_name= NULL;
 
180
    worker_id= MTS_WORKER_UNDEF;
 
181
    total_seqno= seqno;
 
182
    checkpoint_log_name= NULL;
 
183
    checkpoint_log_pos= 0;
 
184
    checkpoint_relay_log_name= NULL;
 
185
    checkpoint_relay_log_pos= 0;
 
186
    checkpoint_seqno= (uint) -1;
 
187
    done= 0;
 
188
#ifndef DBUG_OFF
 
189
    notified= false;
 
190
#endif
 
191
  }
 
192
} Slave_job_group;
 
193
 
 
194
/**
 
195
  Group Assigned Queue whose first element identifies first gap
 
196
  in committed sequence. The head of the queue is therefore next to 
 
197
  the low-water-mark.
 
198
*/
 
199
class Slave_committed_queue : public circular_buffer_queue
 
200
{
 
201
public:
 
202
  
 
203
  bool inited;
 
204
 
 
205
  /* master's Rot-ev exec */
 
206
  void update_current_binlog(const char *post_rotate);
 
207
 
 
208
  /*
 
209
     The last checkpoint time Low-Water-Mark
 
210
  */
 
211
  Slave_job_group lwm;
 
212
  
 
213
  /* last time processed indexes for each worker */
 
214
  DYNAMIC_ARRAY last_done;
 
215
 
 
216
  /* the being assigned group index in GAQ */
 
217
  ulong assigned_group_index;
 
218
 
 
219
  Slave_committed_queue (const char *log, uint el_size, ulong max, uint n,
 
220
                         uint inc= 0)
 
221
    : circular_buffer_queue(el_size, max, inc), inited(FALSE)
 
222
  {
 
223
    uint k;
 
224
    ulonglong l= 0;
 
225
    
 
226
    if (max >= (ulong) -1 || !circular_buffer_queue::inited_queue)
 
227
      return;
 
228
    else
 
229
      inited= TRUE;
 
230
    my_init_dynamic_array(&last_done, sizeof(lwm.total_seqno), n, 0);
 
231
    for (k= 0; k < n; k++)
 
232
      insert_dynamic(&last_done, (uchar*) &l);  // empty for each Worker
 
233
    lwm.group_relay_log_name= (char *) my_malloc(FN_REFLEN + 1, MYF(0));
 
234
    lwm.group_relay_log_name[0]= 0;
 
235
  }
 
236
 
 
237
  ~Slave_committed_queue ()
 
238
  { 
 
239
    if (inited)
 
240
    {
 
241
      delete_dynamic(&last_done);
 
242
      my_free(lwm.group_relay_log_name);
 
243
      free_dynamic_items();  // free possibly left allocated strings in GAQ list
 
244
    }
 
245
  }
 
246
 
 
247
#ifndef DBUG_OFF
 
248
  bool count_done(Relay_log_info* rli);
 
249
#endif
 
250
 
 
251
  /* Checkpoint routine refreshes the queue */
 
252
  ulong move_queue_head(DYNAMIC_ARRAY *ws);
 
253
  /* Method is for slave shutdown time cleanup */
 
254
  void free_dynamic_items();
 
255
  /* 
 
256
     returns a pointer to Slave_job_group struct instance as indexed by arg
 
257
     in the circular buffer dyn-array 
 
258
  */
 
259
  Slave_job_group* get_job_group(ulong ind)
 
260
  {
 
261
    return (Slave_job_group*) dynamic_array_ptr(&Q, ind);
 
262
  }
 
263
 
 
264
  /**
 
265
     Assignes @c assigned_group_index to an index of enqueued item
 
266
     and returns it.
 
267
  */
 
268
  ulong en_queue(void *item)
 
269
  {
 
270
    return assigned_group_index= circular_buffer_queue::en_queue(item);
 
271
  }
 
272
 
 
273
};
 
274
 
 
275
class Slave_jobs_queue : public circular_buffer_queue
 
276
{
 
277
public:
 
278
 
 
279
  /* 
 
280
     Coordinator marks with true, Worker signals back at queue back to
 
281
     available
 
282
  */
 
283
  bool overfill;
 
284
  ulonglong waited_overfill;
 
285
};
 
286
 
 
287
class Slave_worker : public Relay_log_info
 
288
{
 
289
public:
 
290
  Slave_worker(Relay_log_info *rli
 
291
#ifdef HAVE_PSI_INTERFACE
 
292
               ,PSI_mutex_key *param_key_info_run_lock,
 
293
               PSI_mutex_key *param_key_info_data_lock,
 
294
               PSI_mutex_key *param_key_info_sleep_lock,
 
295
               PSI_mutex_key *param_key_info_data_cond,
 
296
               PSI_mutex_key *param_key_info_start_cond,
 
297
               PSI_mutex_key *param_key_info_stop_cond,
 
298
               PSI_mutex_key *param_key_info_sleep_cond
 
299
#endif
 
300
               , uint param_id
 
301
              );
 
302
  virtual ~Slave_worker();
 
303
 
 
304
  Slave_jobs_queue jobs;   // assignment queue containing events to execute
 
305
  mysql_mutex_t jobs_lock; // mutex for the jobs queue
 
306
  mysql_cond_t  jobs_cond; // condition variable for the jobs queue
 
307
  Relay_log_info *c_rli;   // pointer to Coordinator's rli
 
308
  DYNAMIC_ARRAY curr_group_exec_parts; // Current Group Executed Partitions
 
309
  bool curr_group_seen_begin; // is set to TRUE with explicit B-event
 
310
  ulong id;                 // numberic identifier of the Worker
 
311
 
 
312
  /*
 
313
    Worker runtime statictics
 
314
  */
 
315
  // the index in GAQ of the last processed group by this Worker
 
316
  volatile ulong last_group_done_index;
 
317
  ulong wq_empty_waits;  // how many times got idle
 
318
  ulong events_done;     // how many events (statements) processed
 
319
  ulong groups_done;     // how many groups (transactions) processed
 
320
  volatile int curr_jobs; // number of active  assignments
 
321
  // number of partitions allocated to the worker at point in time
 
322
  long usage_partition;
 
323
  // symmetric to rli->mts_end_group_sets_max_dbs
 
324
  bool end_group_sets_max_dbs;
 
325
 
 
326
  volatile bool relay_log_change_notified; // Coord sets and resets, W can read
 
327
  volatile bool checkpoint_notified; // Coord sets and resets, W can read
 
328
  volatile bool master_log_change_notified; // Coord sets and resets, W can read
 
329
  ulong bitmap_shifted;  // shift the last bitmap at receiving new CP
 
330
  // WQ current excess above the overrun level
 
331
  long wq_overrun_cnt;
 
332
  /*
 
333
    number of events starting from which Worker queue is regarded as
 
334
    close to full. The number of the excessive events yields a weight factor
 
335
    to compute Coordinator's nap.
 
336
  */
 
337
  ulong overrun_level;
 
338
  /*
 
339
     reverse to overrun: the number of events below which Worker is
 
340
     considered underruning
 
341
  */
 
342
  ulong underrun_level;
 
343
  /*
 
344
    Total of increments done to rli->mts_wq_excess_cnt on behalf of this worker.
 
345
    When WQ length is dropped below overrun the counter is reset.
 
346
  */
 
347
  ulong excess_cnt;
 
348
  /*
 
349
    Coordinates of the last CheckPoint (CP) this Worker has
 
350
    acknowledged; part of is persisent data
 
351
  */
 
352
  char checkpoint_relay_log_name[FN_REFLEN];
 
353
  ulonglong checkpoint_relay_log_pos;
 
354
  char checkpoint_master_log_name[FN_REFLEN];
 
355
  ulonglong checkpoint_master_log_pos;
 
356
  MY_BITMAP group_executed; // bitmap describes groups executed after last CP
 
357
  MY_BITMAP group_shifted;  // temporary bitmap to compute group_executed
 
358
  ulong checkpoint_seqno;   // the most significant ON bit in group_executed
 
359
  enum en_running_state
 
360
  {
 
361
    NOT_RUNNING= 0,
 
362
    RUNNING= 1,
 
363
    ERROR_LEAVING,         // is set by Worker
 
364
    KILLED                 // is set by Coordinator
 
365
  };
 
366
  /*
 
367
    The running status is guarded by jobs_lock mutex that a writer
 
368
    Coordinator or Worker itself needs to hold when write a new value.
 
369
  */
 
370
  en_running_state volatile running_status;
 
371
 
 
372
  int init_worker(Relay_log_info*, ulong);
 
373
  int rli_init_info(bool);
 
374
  int flush_info(bool force= FALSE);
 
375
  static size_t get_number_worker_fields();
 
376
  void slave_worker_ends_group(Log_event*, int);
 
377
  const char *get_master_log_name();
 
378
  ulonglong get_master_log_pos() { return master_log_pos; };
 
379
  ulonglong set_master_log_pos(ulong val) { return master_log_pos= val; };
 
380
  bool commit_positions(Log_event *evt, Slave_job_group *ptr_g, bool force);
 
381
  bool reset_recovery_info();
 
382
  /**
 
383
     Different from the parent method in that this does not delete
 
384
     rli_description_event.
 
385
     The method runs by Coordinator when Worker are synched or being
 
386
     destroyed.
 
387
  */
 
388
  void set_rli_description_event(Format_description_log_event *fdle)
 
389
  {
 
390
    DBUG_ASSERT(!fdle || (running_status == Slave_worker::RUNNING && info_thd));
 
391
#ifndef DBUG_OFF
 
392
    if (fdle)
 
393
      mysql_mutex_assert_owner(&jobs_lock);
 
394
#endif
 
395
 
 
396
    if (fdle)
 
397
      adapt_to_master_version(fdle);
 
398
    rli_description_event= fdle;
 
399
  }
 
400
 
 
401
  inline void reset_gaq_index() { gaq_index= c_rli->gaq->size; };
 
402
  inline void set_gaq_index(ulong val)
 
403
  { 
 
404
    if (gaq_index == c_rli->gaq->size)
 
405
      gaq_index= val;
 
406
  };
 
407
 
 
408
protected:
 
409
 
 
410
  virtual void do_report(loglevel level, int err_code,
 
411
                         const char *msg, va_list v_args) const;
 
412
 
 
413
private:
 
414
  ulong gaq_index;          // GAQ index of the current assignment 
 
415
  ulonglong master_log_pos; // event's cached log_pos for possibile error report
 
416
  void end_info();
 
417
  bool read_info(Rpl_info_handler *from);
 
418
  bool write_info(Rpl_info_handler *to);
 
419
  Slave_worker& operator=(const Slave_worker& info);
 
420
  Slave_worker(const Slave_worker& info);
 
421
};
 
422
 
 
423
TABLE* mts_move_temp_table_to_entry(TABLE*, THD*, db_worker_hash_entry*);
 
424
TABLE* mts_move_temp_tables_to_thd(THD*, TABLE*);
 
425
#endif // HAVE_REPLICATION
 
426
#endif