1
/* Copyright (C) 2003 MySQL AB
3
This program is free software; you can redistribute it and/or modify
4
it under the terms of the GNU General Public License as published by
5
the Free Software Foundation; version 2 of the License.
7
This program is distributed in the hope that it will be useful,
8
but WITHOUT ANY WARRANTY; without even the implied warranty of
9
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10
GNU General Public License for more details.
12
You should have received a copy of the GNU General Public License
13
along with this program; if not, write to the Free Software
14
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
16
#ifndef TransporterFacade_H
17
#define TransporterFacade_H
19
#include <kernel_types.h>
20
#include <ndb_limits.h>
21
#include <NdbThread.h>
22
#include <TransporterRegistry.hpp>
24
#include "DictCache.hpp"
25
#include <BlockNumbers.h>
31
struct ndb_mgm_configuration;
32
class ConfigRetriever;
38
typedef void (* ExecuteFunction)(void *, NdbApiSignal *, LinearSectionPtr ptr[3]);
39
typedef void (* NodeStatusFunction)(void *, Uint32, bool nodeAlive, bool nfComplete);
42
void* runSendRequest_C(void*);
43
void* runReceiveResponse_C(void*);
44
void atexit_stop_instance();
47
class TransporterFacade
51
* Max number of Ndb objects.
52
* (Ndb objects should not be shared by different threads.)
54
STATIC_CONST( MAX_NO_THREADS = 4711 );
56
virtual ~TransporterFacade();
57
bool init(Uint32, const ndb_mgm_configuration *);
59
int start_instance(int, const ndb_mgm_configuration*);
63
* Register this block for sending/receiving signals
64
* @return BlockNumber or -1 for failure
66
int open(void* objRef, ExecuteFunction, NodeStatusFunction);
68
// Close this block number
69
int close(BlockNumber blockNumber, Uint64 trans_id);
70
Uint32 get_active_ndb_objects() const;
72
// Only sends to nodes which are alive
73
int sendSignal(NdbApiSignal * signal, NodeId nodeId);
74
int sendSignal(NdbApiSignal*, NodeId,
75
LinearSectionPtr ptr[3], Uint32 secs);
76
int sendFragmentedSignal(NdbApiSignal*, NodeId,
77
LinearSectionPtr ptr[3], Uint32 secs);
79
// Is node available for running transactions
80
bool get_node_alive(NodeId nodeId) const;
81
bool get_node_stopping(NodeId nodeId) const;
82
bool getIsDbNode(NodeId nodeId) const;
83
bool getIsNodeSendable(NodeId nodeId) const;
84
Uint32 getNodeGrp(NodeId nodeId) const;
85
Uint32 getNodeSequence(NodeId nodeId) const;
87
// Is there space in sendBuffer to send messages
88
bool check_send_size(Uint32 node_id, Uint32 send_size);
90
// My own processor id
95
void doConnect(int NodeId);
96
void reportConnected(int NodeId);
97
void doDisconnect(int NodeId);
98
void reportDisconnected(int NodeId);
100
NodeId get_an_alive_node();
101
void ReportNodeAlive(NodeId nodeId);
102
void ReportNodeDead(NodeId nodeId);
103
void ReportNodeFailureComplete(NodeId nodeId);
106
* Send signal to each registered object
108
void for_each(NdbApiSignal* aSignal, LinearSectionPtr ptr[3]);
113
// Improving the API performance
114
void forceSend(Uint32 block_number);
115
void checkForceSend(Uint32 block_number);
117
// Close this block number
118
int close_local(BlockNumber blockNumber);
120
// Scan batch configuration parameters
121
Uint32 get_scan_batch_size();
122
Uint32 get_batch_byte_size();
123
Uint32 get_batch_size();
124
Uint32 m_waitfor_timeout; // in milli seconds...
126
TransporterRegistry* get_registry() { return theTransporterRegistry;};
129
When a thread has sent its signals and is ready to wait for reception
130
of these it does normally always wait on a conditional mutex and
131
the actual reception is handled by the receiver thread in the NDB API.
132
With the below new methods and variables each thread has the possibility
133
of becoming owner of the "right" to poll for signals. Effectually this
134
means that the thread acts temporarily as a receiver thread.
135
For the thread that succeeds in grabbing this "ownership" it will avoid
136
a number of expensive calls to conditional mutex and even more expensive
137
context switches to wake up.
138
When an owner of the poll "right" has completed its own task it is likely
139
that there are others still waiting. In this case we pick one of the
140
threads as new owner of the poll "right". Since we want to switch owner
141
as seldom as possible we always pick the last thread which is likely to
142
be the last to complete its reception.
144
void external_poll(Uint32 wait_time);
145
NdbWaiter* get_poll_owner(void) const { return poll_owner; }
146
void set_poll_owner(NdbWaiter* new_owner) { poll_owner= new_owner; }
147
Uint32 put_in_cond_wait_queue(NdbWaiter *aWaiter);
148
void remove_from_cond_wait_queue(NdbWaiter *aWaiter);
149
NdbWaiter* rem_last_from_cond_wait_queue();
150
// heart beat received from a node (e.g. a signal came)
151
void hb_received(NodeId n);
154
void init_cond_wait_queue();
155
struct CondWaitQueueElement {
156
NdbWaiter *cond_wait_object;
157
Uint32 next_cond_wait;
158
Uint32 prev_cond_wait;
160
NdbWaiter *poll_owner;
161
CondWaitQueueElement cond_wait_array[MAX_NO_THREADS];
162
Uint32 first_in_cond_wait;
163
Uint32 first_free_cond_wait;
164
Uint32 last_in_cond_wait;
165
/* End poll owner stuff */
167
* Send a signal unconditional of node status (used by ClusterMgr)
169
friend class ClusterMgr;
170
friend class ArbitMgr;
171
friend class MgmtSrvr;
172
friend class SignalSender;
174
friend class ExtSender; ///< @todo Hack to be able to sendSignalUnCond
177
friend class Ndb_cluster_connection_impl;
178
friend class NdbTransaction;
180
int sendSignalUnCond(NdbApiSignal *, NodeId nodeId);
182
bool isConnected(NodeId aNodeId);
185
TransporterRegistry* theTransporterRegistry;
186
SocketServer m_socket_server;
187
int sendPerformedLastInterval;
190
NodeId theStartNodeId;
192
ClusterMgr* theClusterMgr;
193
ArbitMgr* theArbitMgr;
195
// Improving the API response time
197
Uint32 currentSendLimit;
199
void calculateSendLimit();
201
// Scan batch configuration parameters
202
Uint32 m_scan_batch_size;
203
Uint32 m_batch_byte_size;
206
// Declarations for the receive and send thread
209
void threadMainSend(void);
210
NdbThread* theSendThread;
211
void threadMainReceive(void);
212
NdbThread* theReceiveThread;
214
friend void* runSendRequest_C(void*);
215
friend void* runReceiveResponse_C(void*);
216
friend void atexit_stop_instance();
219
* Block number handling
224
STATIC_CONST( ACTIVE = (1 << 16) | 1 );
225
STATIC_CONST( INACTIVE = (1 << 16) );
226
STATIC_CONST( END_OF_LIST = MAX_NO_THREADS + 1 );
228
ThreadData(Uint32 initialSize = 32);
231
* Split "object" into 3 list
232
* This to improve locality
233
* when iterating over lists
235
struct Object_Execute {
237
ExecuteFunction m_executeFunction;
239
struct NodeStatus_NextFree {
240
NodeStatusFunction m_statusFunction;
245
Vector<Uint32> m_statusNext;
246
Vector<Object_Execute> m_objectExecute;
247
Vector<NodeStatusFunction> m_statusFunction;
249
int open(void* objRef, ExecuteFunction, NodeStatusFunction);
250
int close(int number);
251
void expand(Uint32 size);
253
inline Object_Execute get(Uint16 blockNo) const {
254
blockNo -= MIN_API_BLOCK_NO;
255
if(likely (blockNo < m_objectExecute.size())){
256
return m_objectExecute[blockNo];
258
Object_Execute oe = { 0, 0 };
263
* Is the block number used currently
265
inline bool getInUse(Uint16 index) const {
266
return (m_statusNext[index] & (1 << 16)) != 0;
270
Uint32 m_max_trans_id;
271
Uint32 m_fragmented_signal_id;
276
friend void execute(void * callbackObj, SignalHeader * const header,
278
Uint32 * const theData, LinearSectionPtr ptr[3]);
281
NdbMutex* theMutexPtr;
284
GlobalDictCache m_globalDictCache;
290
PollGuard(TransporterFacade *tp, NdbWaiter *aWaiter, Uint32 block_no);
291
~PollGuard() { unlock_and_signal(); }
292
int wait_n_unlock(int wait_time, NodeId nodeId, Uint32 state,
293
bool forceSend= false);
294
int wait_for_input_in_loop(int wait_time, bool forceSend);
295
void wait_for_input(int wait_time);
296
int wait_scan(int wait_time, NodeId nodeId, bool forceSend);
297
void unlock_and_signal();
299
TransporterFacade *m_tp;
308
TransporterFacade::lock_mutex()
310
NdbMutex_Lock(theMutexPtr);
315
TransporterFacade::unlock_mutex()
317
NdbMutex_Unlock(theMutexPtr);
320
#include "ClusterMgr.hpp"
323
unsigned Ndb_cluster_connection_impl::get_connect_count() const
325
return m_transporter_facade->theClusterMgr->m_connect_count;
330
TransporterFacade::check_send_size(Uint32 node_id, Uint32 send_size)
337
TransporterFacade::getIsDbNode(NodeId n) const {
339
theClusterMgr->getNodeInfo(n).defined &&
340
theClusterMgr->getNodeInfo(n).m_info.m_type == NodeInfo::DB;
345
TransporterFacade::getNodeGrp(NodeId n) const {
346
return theClusterMgr->getNodeInfo(n).m_state.nodeGroup;
352
TransporterFacade::get_node_alive(NodeId n) const {
354
const ClusterMgr::Node & node = theClusterMgr->getNodeInfo(n);
360
TransporterFacade::hb_received(NodeId n) {
361
theClusterMgr->hb_received(n);
366
TransporterFacade::get_node_stopping(NodeId n) const {
367
const ClusterMgr::Node & node = theClusterMgr->getNodeInfo(n);
368
return (!node.m_state.getSingleUserMode() &&
369
(node.m_state.startLevel == NodeState::SL_STOPPING_1) ||
370
(node.m_state.startLevel == NodeState::SL_STOPPING_2));
375
TransporterFacade::getIsNodeSendable(NodeId n) const {
376
const ClusterMgr::Node & node = theClusterMgr->getNodeInfo(n);
377
const Uint32 startLevel = node.m_state.startLevel;
379
if (node.m_info.m_type == NodeInfo::DB) {
380
return node.compatible && (startLevel == NodeState::SL_STARTED ||
381
startLevel == NodeState::SL_STOPPING_1 ||
382
node.m_state.getSingleUserMode());
384
ndbout_c("TransporterFacade::getIsNodeSendable: Illegal node type: "
386
node.m_info.m_type, n);
388
return false; // to remove compiler warning
394
TransporterFacade::getNodeSequence(NodeId n) const {
395
return theClusterMgr->getNodeInfo(n).m_info.m_connectCount;
400
TransporterFacade::get_scan_batch_size() {
401
return m_scan_batch_size;
406
TransporterFacade::get_batch_byte_size() {
407
return m_batch_byte_size;
412
TransporterFacade::get_batch_size() {
418
#endif // TransporterFacade_H