~vadim-tk/percona-server/flushing-algo

« back to all changes in this revision

Viewing changes to sql/rpl_handler.h

  • Committer: root
  • Date: 2011-10-29 01:34:40 UTC
  • Revision ID: root@hppro1.office.percona.com-20111029013440-qhnf4jk8kdjcf4e0
Initial import

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/* Copyright (c) 2008, 2010, Oracle and/or its affiliates. All rights reserved.
 
2
 
 
3
   This program is free software; you can redistribute it and/or modify
 
4
   it under the terms of the GNU General Public License as published by
 
5
   the Free Software Foundation; version 2 of the License.
 
6
 
 
7
   This program is distributed in the hope that it will be useful,
 
8
   but WITHOUT ANY WARRANTY; without even the implied warranty of
 
9
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
10
   GNU General Public License for more details.
 
11
 
 
12
   You should have received a copy of the GNU General Public License
 
13
   along with this program; if not, write to the Free Software
 
14
   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA */
 
15
 
 
16
#ifndef RPL_HANDLER_H
 
17
#define RPL_HANDLER_H
 
18
 
 
19
#include "sql_priv.h"
 
20
#include "rpl_mi.h"
 
21
#include "rpl_rli.h"
 
22
#include "sql_plugin.h"
 
23
#include "replication.h"
 
24
 
 
25
class Observer_info {
 
26
public:
 
27
  void *observer;
 
28
  st_plugin_int *plugin_int;
 
29
  plugin_ref plugin;
 
30
 
 
31
  Observer_info(void *ob, st_plugin_int *p)
 
32
    :observer(ob), plugin_int(p)
 
33
  {
 
34
    plugin= plugin_int_to_ref(plugin_int);
 
35
  }
 
36
};
 
37
 
 
38
class Delegate {
 
39
public:
 
40
  typedef List<Observer_info> Observer_info_list;
 
41
  typedef List_iterator<Observer_info> Observer_info_iterator;
 
42
  
 
43
  int add_observer(void *observer, st_plugin_int *plugin)
 
44
  {
 
45
    int ret= FALSE;
 
46
    if (!inited)
 
47
      return TRUE;
 
48
    write_lock();
 
49
    Observer_info_iterator iter(observer_info_list);
 
50
    Observer_info *info= iter++;
 
51
    while (info && info->observer != observer)
 
52
      info= iter++;
 
53
    if (!info)
 
54
    {
 
55
      info= new Observer_info(observer, plugin);
 
56
      if (!info || observer_info_list.push_back(info, &memroot))
 
57
        ret= TRUE;
 
58
    }
 
59
    else
 
60
      ret= TRUE;
 
61
    unlock();
 
62
    return ret;
 
63
  }
 
64
  
 
65
  int remove_observer(void *observer, st_plugin_int *plugin)
 
66
  {
 
67
    int ret= FALSE;
 
68
    if (!inited)
 
69
      return TRUE;
 
70
    write_lock();
 
71
    Observer_info_iterator iter(observer_info_list);
 
72
    Observer_info *info= iter++;
 
73
    while (info && info->observer != observer)
 
74
      info= iter++;
 
75
    if (info)
 
76
    {
 
77
      iter.remove();
 
78
      delete info;
 
79
    }
 
80
    else
 
81
      ret= TRUE;
 
82
    unlock();
 
83
    return ret;
 
84
  }
 
85
 
 
86
  inline Observer_info_iterator observer_info_iter()
 
87
  {
 
88
    return Observer_info_iterator(observer_info_list);
 
89
  }
 
90
 
 
91
  inline bool is_empty()
 
92
  {
 
93
    return observer_info_list.is_empty();
 
94
  }
 
95
 
 
96
  inline int read_lock()
 
97
  {
 
98
    if (!inited)
 
99
      return TRUE;
 
100
    return rw_rdlock(&lock);
 
101
  }
 
102
 
 
103
  inline int write_lock()
 
104
  {
 
105
    if (!inited)
 
106
      return TRUE;
 
107
    return rw_wrlock(&lock);
 
108
  }
 
109
 
 
110
  inline int unlock()
 
111
  {
 
112
    if (!inited)
 
113
      return TRUE;
 
114
    return rw_unlock(&lock);
 
115
  }
 
116
 
 
117
  inline bool is_inited()
 
118
  {
 
119
    return inited;
 
120
  }
 
121
  
 
122
  Delegate()
 
123
  {
 
124
    inited= FALSE;
 
125
    if (my_rwlock_init(&lock, NULL))
 
126
      return;
 
127
    init_sql_alloc(&memroot, 1024, 0);
 
128
    inited= TRUE;
 
129
  }
 
130
  ~Delegate()
 
131
  {
 
132
    inited= FALSE;
 
133
    rwlock_destroy(&lock);
 
134
    free_root(&memroot, MYF(0));
 
135
  }
 
136
 
 
137
private:
 
138
  Observer_info_list observer_info_list;
 
139
  rw_lock_t lock;
 
140
  MEM_ROOT memroot;
 
141
  bool inited;
 
142
};
 
143
 
 
144
class Trans_delegate
 
145
  :public Delegate {
 
146
public:
 
147
  typedef Trans_observer Observer;
 
148
  int before_commit(THD *thd, bool all);
 
149
  int before_rollback(THD *thd, bool all);
 
150
  int after_commit(THD *thd, bool all);
 
151
  int after_rollback(THD *thd, bool all);
 
152
};
 
153
 
 
154
class Binlog_storage_delegate
 
155
  :public Delegate {
 
156
public:
 
157
  typedef Binlog_storage_observer Observer;
 
158
  int after_flush(THD *thd, const char *log_file,
 
159
                  my_off_t log_pos, bool synced);
 
160
};
 
161
 
 
162
#ifdef HAVE_REPLICATION
 
163
class Binlog_transmit_delegate
 
164
  :public Delegate {
 
165
public:
 
166
  typedef Binlog_transmit_observer Observer;
 
167
  int transmit_start(THD *thd, ushort flags,
 
168
                     const char *log_file, my_off_t log_pos);
 
169
  int transmit_stop(THD *thd, ushort flags);
 
170
  int reserve_header(THD *thd, ushort flags, String *packet);
 
171
  int before_send_event(THD *thd, ushort flags,
 
172
                        String *packet, const
 
173
                        char *log_file, my_off_t log_pos );
 
174
  int after_send_event(THD *thd, ushort flags,
 
175
                       String *packet);
 
176
  int after_reset_master(THD *thd, ushort flags);
 
177
};
 
178
 
 
179
class Binlog_relay_IO_delegate
 
180
  :public Delegate {
 
181
public:
 
182
  typedef Binlog_relay_IO_observer Observer;
 
183
  int thread_start(THD *thd, Master_info *mi);
 
184
  int thread_stop(THD *thd, Master_info *mi);
 
185
  int before_request_transmit(THD *thd, Master_info *mi, ushort flags);
 
186
  int after_read_event(THD *thd, Master_info *mi,
 
187
                       const char *packet, ulong len,
 
188
                       const char **event_buf, ulong *event_len);
 
189
  int after_queue_event(THD *thd, Master_info *mi,
 
190
                        const char *event_buf, ulong event_len,
 
191
                        bool synced);
 
192
  int after_reset_slave(THD *thd, Master_info *mi);
 
193
private:
 
194
  void init_param(Binlog_relay_IO_param *param, Master_info *mi);
 
195
};
 
196
#endif /* HAVE_REPLICATION */
 
197
 
 
198
int delegates_init();
 
199
void delegates_destroy();
 
200
 
 
201
extern Trans_delegate *transaction_delegate;
 
202
extern Binlog_storage_delegate *binlog_storage_delegate;
 
203
#ifdef HAVE_REPLICATION
 
204
extern Binlog_transmit_delegate *binlog_transmit_delegate;
 
205
extern Binlog_relay_IO_delegate *binlog_relay_io_delegate;
 
206
#endif /* HAVE_REPLICATION */
 
207
 
 
208
/*
 
209
  if there is no observers in the delegate, we can return 0
 
210
  immediately.
 
211
*/
 
212
#define RUN_HOOK(group, hook, args)             \
 
213
  (group ##_delegate->is_empty() ?              \
 
214
   0 : group ##_delegate->hook args)
 
215
 
 
216
#endif /* RPL_HANDLER_H */