~ubuntu-branches/ubuntu/precise/mysql-5.1/precise

« back to all changes in this revision

Viewing changes to storage/ndb/src/ndbapi/TransporterFacade.hpp

  • Committer: Bazaar Package Importer
  • Author(s): Norbert Tretkowski
  • Date: 2010-03-17 14:56:02 UTC
  • Revision ID: james.westby@ubuntu.com-20100317145602-x7e30l1b2sb5s6w6
Tags: upstream-5.1.45
ImportĀ upstreamĀ versionĀ 5.1.45

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/* Copyright (C) 2003 MySQL AB
 
2
 
 
3
   This program is free software; you can redistribute it and/or modify
 
4
   it under the terms of the GNU General Public License as published by
 
5
   the Free Software Foundation; version 2 of the License.
 
6
 
 
7
   This program is distributed in the hope that it will be useful,
 
8
   but WITHOUT ANY WARRANTY; without even the implied warranty of
 
9
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
10
   GNU General Public License for more details.
 
11
 
 
12
   You should have received a copy of the GNU General Public License
 
13
   along with this program; if not, write to the Free Software
 
14
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
 
15
 
 
16
#ifndef TransporterFacade_H
 
17
#define TransporterFacade_H
 
18
 
 
19
#include <kernel_types.h>
 
20
#include <ndb_limits.h>
 
21
#include <NdbThread.h>
 
22
#include <TransporterRegistry.hpp>
 
23
#include <NdbMutex.h>
 
24
#include "DictCache.hpp"
 
25
#include <BlockNumbers.h>
 
26
#include <mgmapi.h>
 
27
 
 
28
class ClusterMgr;
 
29
class ArbitMgr;
 
30
class IPCConfig;
 
31
struct ndb_mgm_configuration;
 
32
class ConfigRetriever;
 
33
 
 
34
class Ndb;
 
35
class NdbApiSignal;
 
36
class NdbWaiter;
 
37
 
 
38
typedef void (* ExecuteFunction)(void *, NdbApiSignal *, LinearSectionPtr ptr[3]);
 
39
typedef void (* NodeStatusFunction)(void *, Uint32, bool nodeAlive, bool nfComplete);
 
40
 
 
41
extern "C" {
 
42
  void* runSendRequest_C(void*);
 
43
  void* runReceiveResponse_C(void*);
 
44
  void atexit_stop_instance();
 
45
}
 
46
 
 
47
class TransporterFacade
 
48
{
 
49
public:
 
50
  /**
 
51
   * Max number of Ndb objects.  
 
52
   * (Ndb objects should not be shared by different threads.)
 
53
   */
 
54
  STATIC_CONST( MAX_NO_THREADS = 4711 );
 
55
  TransporterFacade();
 
56
  virtual ~TransporterFacade();
 
57
  bool init(Uint32, const ndb_mgm_configuration *);
 
58
 
 
59
  int start_instance(int, const ndb_mgm_configuration*);
 
60
  void stop_instance();
 
61
  
 
62
  /**
 
63
   * Register this block for sending/receiving signals
 
64
   * @return BlockNumber or -1 for failure
 
65
   */
 
66
  int open(void* objRef, ExecuteFunction, NodeStatusFunction);
 
67
  
 
68
  // Close this block number
 
69
  int close(BlockNumber blockNumber, Uint64 trans_id);
 
70
  Uint32 get_active_ndb_objects() const;
 
71
 
 
72
  // Only sends to nodes which are alive
 
73
  int sendSignal(NdbApiSignal * signal, NodeId nodeId);
 
74
  int sendSignal(NdbApiSignal*, NodeId, 
 
75
                 LinearSectionPtr ptr[3], Uint32 secs);
 
76
  int sendFragmentedSignal(NdbApiSignal*, NodeId, 
 
77
                           LinearSectionPtr ptr[3], Uint32 secs);
 
78
 
 
79
  // Is node available for running transactions
 
80
  bool   get_node_alive(NodeId nodeId) const;
 
81
  bool   get_node_stopping(NodeId nodeId) const;
 
82
  bool   getIsDbNode(NodeId nodeId) const;
 
83
  bool   getIsNodeSendable(NodeId nodeId) const;
 
84
  Uint32 getNodeGrp(NodeId nodeId) const;
 
85
  Uint32 getNodeSequence(NodeId nodeId) const;
 
86
 
 
87
  // Is there space in sendBuffer to send messages
 
88
  bool   check_send_size(Uint32 node_id, Uint32 send_size);
 
89
 
 
90
  // My own processor id
 
91
  NodeId ownId() const;
 
92
 
 
93
  void connected();
 
94
 
 
95
  void doConnect(int NodeId);
 
96
  void reportConnected(int NodeId);
 
97
  void doDisconnect(int NodeId);
 
98
  void reportDisconnected(int NodeId);
 
99
 
 
100
  NodeId get_an_alive_node();
 
101
  void ReportNodeAlive(NodeId nodeId);
 
102
  void ReportNodeDead(NodeId nodeId);
 
103
  void ReportNodeFailureComplete(NodeId nodeId);
 
104
 
 
105
  /**
 
106
   * Send signal to each registered object
 
107
   */
 
108
  void for_each(NdbApiSignal* aSignal, LinearSectionPtr ptr[3]);
 
109
  
 
110
  void lock_mutex();
 
111
  void unlock_mutex();
 
112
 
 
113
  // Improving the API performance
 
114
  void forceSend(Uint32 block_number);
 
115
  void checkForceSend(Uint32 block_number);
 
116
 
 
117
  // Close this block number
 
118
  int close_local(BlockNumber blockNumber);
 
119
 
 
120
  // Scan batch configuration parameters
 
121
  Uint32 get_scan_batch_size();
 
122
  Uint32 get_batch_byte_size();
 
123
  Uint32 get_batch_size();
 
124
  Uint32 m_waitfor_timeout; // in milli seconds...
 
125
 
 
126
  TransporterRegistry* get_registry() { return theTransporterRegistry;};
 
127
 
 
128
/*
 
129
  When a thread has sent its signals and is ready to wait for reception
 
130
  of these it does normally always wait on a conditional mutex and
 
131
  the actual reception is handled by the receiver thread in the NDB API.
 
132
  With the below new methods and variables each thread has the possibility
 
133
  of becoming owner of the "right" to poll for signals. Effectually this
 
134
  means that the thread acts temporarily as a receiver thread.
 
135
  For the thread that succeeds in grabbing this "ownership" it will avoid
 
136
  a number of expensive calls to conditional mutex and even more expensive
 
137
  context switches to wake up.
 
138
  When an owner of the poll "right" has completed its own task it is likely
 
139
  that there are others still waiting. In this case we pick one of the
 
140
  threads as new owner of the poll "right". Since we want to switch owner
 
141
  as seldom as possible we always pick the last thread which is likely to
 
142
  be the last to complete its reception.
 
143
*/
 
144
  void external_poll(Uint32 wait_time);
 
145
  NdbWaiter* get_poll_owner(void) const { return poll_owner; }
 
146
  void set_poll_owner(NdbWaiter* new_owner) { poll_owner= new_owner; }
 
147
  Uint32 put_in_cond_wait_queue(NdbWaiter *aWaiter);
 
148
  void remove_from_cond_wait_queue(NdbWaiter *aWaiter);
 
149
  NdbWaiter* rem_last_from_cond_wait_queue();
 
150
  // heart beat received from a node (e.g. a signal came)
 
151
  void hb_received(NodeId n);
 
152
 
 
153
private:
 
154
  void init_cond_wait_queue();
 
155
  struct CondWaitQueueElement {
 
156
    NdbWaiter *cond_wait_object;
 
157
    Uint32 next_cond_wait;
 
158
    Uint32 prev_cond_wait;
 
159
  };
 
160
  NdbWaiter *poll_owner;
 
161
  CondWaitQueueElement cond_wait_array[MAX_NO_THREADS];
 
162
  Uint32 first_in_cond_wait;
 
163
  Uint32 first_free_cond_wait;
 
164
  Uint32 last_in_cond_wait;
 
165
  /* End poll owner stuff */
 
166
  /**
 
167
   * Send a signal unconditional of node status (used by ClusterMgr)
 
168
   */
 
169
  friend class ClusterMgr;
 
170
  friend class ArbitMgr;
 
171
  friend class MgmtSrvr;
 
172
  friend class SignalSender;
 
173
  friend class GrepPS;
 
174
  friend class ExtSender; ///< @todo Hack to be able to sendSignalUnCond
 
175
  friend class GrepSS;
 
176
  friend class Ndb;
 
177
  friend class Ndb_cluster_connection_impl;
 
178
  friend class NdbTransaction;
 
179
  
 
180
  int sendSignalUnCond(NdbApiSignal *, NodeId nodeId);
 
181
 
 
182
  bool isConnected(NodeId aNodeId);
 
183
  void doStop();
 
184
 
 
185
  TransporterRegistry* theTransporterRegistry;
 
186
  SocketServer m_socket_server;
 
187
  int sendPerformedLastInterval;
 
188
  int theOwnId;
 
189
 
 
190
  NodeId theStartNodeId;
 
191
 
 
192
  ClusterMgr* theClusterMgr;
 
193
  ArbitMgr* theArbitMgr;
 
194
  
 
195
  // Improving the API response time
 
196
  int checkCounter;
 
197
  Uint32 currentSendLimit;
 
198
  
 
199
  void calculateSendLimit();
 
200
 
 
201
  // Scan batch configuration parameters
 
202
  Uint32 m_scan_batch_size;
 
203
  Uint32 m_batch_byte_size;
 
204
  Uint32 m_batch_size;
 
205
 
 
206
  // Declarations for the receive and send thread
 
207
  int  theStopReceive;
 
208
 
 
209
  void threadMainSend(void);
 
210
  NdbThread* theSendThread;
 
211
  void threadMainReceive(void);
 
212
  NdbThread* theReceiveThread;
 
213
 
 
214
  friend void* runSendRequest_C(void*);
 
215
  friend void* runReceiveResponse_C(void*);
 
216
  friend void atexit_stop_instance();
 
217
 
 
218
  /**
 
219
   * Block number handling
 
220
   */
 
221
private:
 
222
 
 
223
  struct ThreadData {
 
224
    STATIC_CONST( ACTIVE = (1 << 16) | 1 );
 
225
    STATIC_CONST( INACTIVE = (1 << 16) );
 
226
    STATIC_CONST( END_OF_LIST = MAX_NO_THREADS + 1 );
 
227
    
 
228
    ThreadData(Uint32 initialSize = 32);
 
229
    
 
230
    /**
 
231
     * Split "object" into 3 list
 
232
     *   This to improve locality
 
233
     *   when iterating over lists
 
234
     */
 
235
    struct Object_Execute {
 
236
      void * m_object;
 
237
      ExecuteFunction m_executeFunction;
 
238
    };
 
239
    struct NodeStatus_NextFree {
 
240
      NodeStatusFunction m_statusFunction;
 
241
    };
 
242
 
 
243
    Uint32 m_use_cnt;
 
244
    Uint32 m_firstFree;
 
245
    Vector<Uint32> m_statusNext;
 
246
    Vector<Object_Execute> m_objectExecute;
 
247
    Vector<NodeStatusFunction> m_statusFunction;
 
248
    
 
249
    int open(void* objRef, ExecuteFunction, NodeStatusFunction);
 
250
    int close(int number);
 
251
    void expand(Uint32 size);
 
252
 
 
253
    inline Object_Execute get(Uint16 blockNo) const {
 
254
      blockNo -= MIN_API_BLOCK_NO;
 
255
      if(likely (blockNo < m_objectExecute.size())){
 
256
        return m_objectExecute[blockNo];
 
257
      }
 
258
      Object_Execute oe = { 0, 0 };
 
259
      return oe;
 
260
    }
 
261
 
 
262
    /**
 
263
     * Is the block number used currently
 
264
     */
 
265
    inline bool getInUse(Uint16 index) const {
 
266
      return (m_statusNext[index] & (1 << 16)) != 0;
 
267
    }
 
268
  } m_threads;
 
269
  
 
270
  Uint32 m_max_trans_id;
 
271
  Uint32 m_fragmented_signal_id;
 
272
 
 
273
  /**
 
274
   * execute function
 
275
   */
 
276
  friend void execute(void * callbackObj, SignalHeader * const header, 
 
277
                      Uint8 prio, 
 
278
                      Uint32 * const theData, LinearSectionPtr ptr[3]);
 
279
  
 
280
public:
 
281
  NdbMutex* theMutexPtr;
 
282
 
 
283
public:
 
284
  GlobalDictCache m_globalDictCache;
 
285
};
 
286
 
 
287
class PollGuard
 
288
{
 
289
  public:
 
290
  PollGuard(TransporterFacade *tp, NdbWaiter *aWaiter, Uint32 block_no);
 
291
  ~PollGuard() { unlock_and_signal(); }
 
292
  int wait_n_unlock(int wait_time, NodeId nodeId, Uint32 state,
 
293
                    bool forceSend= false);
 
294
  int wait_for_input_in_loop(int wait_time, bool forceSend);
 
295
  void wait_for_input(int wait_time);
 
296
  int wait_scan(int wait_time, NodeId nodeId, bool forceSend);
 
297
  void unlock_and_signal();
 
298
  private:
 
299
  TransporterFacade *m_tp;
 
300
  NdbWaiter *m_waiter;
 
301
  Uint32 m_block_no;
 
302
  bool m_locked;
 
303
};
 
304
 
 
305
 
 
306
inline
 
307
void 
 
308
TransporterFacade::lock_mutex()
 
309
{
 
310
  NdbMutex_Lock(theMutexPtr);
 
311
}
 
312
 
 
313
inline
 
314
void 
 
315
TransporterFacade::unlock_mutex()
 
316
{
 
317
  NdbMutex_Unlock(theMutexPtr);
 
318
}
 
319
 
 
320
#include "ClusterMgr.hpp"
 
321
 
 
322
inline
 
323
unsigned Ndb_cluster_connection_impl::get_connect_count() const
 
324
{
 
325
  return m_transporter_facade->theClusterMgr->m_connect_count;
 
326
}
 
327
 
 
328
inline
 
329
bool
 
330
TransporterFacade::check_send_size(Uint32 node_id, Uint32 send_size)
 
331
{
 
332
  return true;
 
333
}
 
334
 
 
335
inline
 
336
bool
 
337
TransporterFacade::getIsDbNode(NodeId n) const {
 
338
  return 
 
339
    theClusterMgr->getNodeInfo(n).defined && 
 
340
    theClusterMgr->getNodeInfo(n).m_info.m_type == NodeInfo::DB;
 
341
}
 
342
 
 
343
inline
 
344
Uint32
 
345
TransporterFacade::getNodeGrp(NodeId n) const {
 
346
  return theClusterMgr->getNodeInfo(n).m_state.nodeGroup;
 
347
}
 
348
 
 
349
 
 
350
inline
 
351
bool
 
352
TransporterFacade::get_node_alive(NodeId n) const {
 
353
 
 
354
  const ClusterMgr::Node & node = theClusterMgr->getNodeInfo(n);
 
355
  return node.m_alive;
 
356
}
 
357
 
 
358
inline
 
359
void
 
360
TransporterFacade::hb_received(NodeId n) {
 
361
  theClusterMgr->hb_received(n);
 
362
}
 
363
 
 
364
inline
 
365
bool
 
366
TransporterFacade::get_node_stopping(NodeId n) const {
 
367
  const ClusterMgr::Node & node = theClusterMgr->getNodeInfo(n);
 
368
  return (!node.m_state.getSingleUserMode() &&
 
369
          (node.m_state.startLevel == NodeState::SL_STOPPING_1) ||
 
370
          (node.m_state.startLevel == NodeState::SL_STOPPING_2));
 
371
}
 
372
 
 
373
inline
 
374
bool
 
375
TransporterFacade::getIsNodeSendable(NodeId n) const {
 
376
  const ClusterMgr::Node & node = theClusterMgr->getNodeInfo(n);
 
377
  const Uint32 startLevel = node.m_state.startLevel;
 
378
 
 
379
  if (node.m_info.m_type == NodeInfo::DB) {
 
380
    return node.compatible && (startLevel == NodeState::SL_STARTED ||
 
381
                               startLevel == NodeState::SL_STOPPING_1 ||
 
382
                               node.m_state.getSingleUserMode());
 
383
  } else {
 
384
    ndbout_c("TransporterFacade::getIsNodeSendable: Illegal node type: "
 
385
             "%d of node: %d", 
 
386
             node.m_info.m_type, n);
 
387
    abort();
 
388
    return false; // to remove compiler warning
 
389
  }
 
390
}
 
391
 
 
392
inline
 
393
Uint32
 
394
TransporterFacade::getNodeSequence(NodeId n) const {
 
395
  return theClusterMgr->getNodeInfo(n).m_info.m_connectCount;
 
396
}
 
397
 
 
398
inline
 
399
Uint32
 
400
TransporterFacade::get_scan_batch_size() {
 
401
  return m_scan_batch_size;
 
402
}
 
403
 
 
404
inline
 
405
Uint32
 
406
TransporterFacade::get_batch_byte_size() {
 
407
  return m_batch_byte_size;
 
408
}
 
409
 
 
410
inline
 
411
Uint32
 
412
TransporterFacade::get_batch_size() {
 
413
  return m_batch_size;
 
414
}
 
415
 
 
416
 
 
417
 
 
418
#endif // TransporterFacade_H