2
Copyright (c) 2003, 2011, Oracle and/or its affiliates. All rights reserved.
4
This program is free software; you can redistribute it and/or modify
5
it under the terms of the GNU General Public License as published by
6
the Free Software Foundation; version 2 of the License.
8
This program is distributed in the hope that it will be useful,
9
but WITHOUT ANY WARRANTY; without even the implied warranty of
10
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
GNU General Public License for more details.
13
You should have received a copy of the GNU General Public License
14
along with this program; if not, write to the Free Software
15
Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
18
#ifndef TransporterFacade_H
19
#define TransporterFacade_H
21
#include <kernel_types.h>
22
#include <ndb_limits.h>
23
#include <NdbThread.h>
24
#include <TransporterRegistry.hpp>
26
#include "DictCache.hpp"
27
#include <BlockNumbers.h>
32
struct ndb_mgm_configuration;
40
// File-scope functions taking and returning void*. TransporterFacade names
// both of them as friends further down, next to its threadMainSend() and
// threadMainReceive() members.
void* runSendRequest_C(void*);
41
void* runReceiveResponse_C(void*);
44
// TransporterFacade derives from TransporterCallback and declares the
// signal-sending, node-status and polling entry points that follow.
class TransporterFacade : public TransporterCallback
48
* Max number of Ndb objects.
49
* (Ndb objects should not be shared by different threads.)
51
// The END_OF_LIST constant further down is defined as MAX_NO_THREADS + 1.
STATIC_CONST( MAX_NO_THREADS = 4711 );
52
// NOTE(review): the cache argument is presumably kept in the
// m_globalDictCache member declared near the end of the class -- confirm
// in the implementation file.
TransporterFacade(GlobalDictCache *cache);
53
virtual ~TransporterFacade();
55
// Takes the same (node id, configuration) argument pair as configure().
int start_instance(NodeId, const ndb_mgm_configuration*);
59
(Re)configure the TransporterFacade
60
to a specific configuration
62
bool configure(NodeId, const ndb_mgm_configuration *);
65
* Register this block for sending/receiving signals
66
* @blockNo block number to use, -1 => any blockNumber
67
* @return BlockNumber or -1 for failure
69
// blockNo defaults to -1, which the comment above documents as "any
// block number".
// NOTE(review): the documented failure value is -1 but the return type is
// Uint32, so a failure would reach the caller as an unsigned value --
// confirm what callers compare against.
Uint32 open_clnt(trp_client*, int blockNo = -1);
70
// Counterpart of open_clnt() for the same trp_client pointer type.
int close_clnt(trp_client*);
72
Uint32 get_active_ndb_objects() const;
74
// Signal-sending overloads. Each takes the signal and the destination node.
// The sectioned variants also take a section array (declared as ptr[3]) of
// either LinearSectionPtr or GenericSectionPtr, plus a count named secs.
// NOTE(review): secs is presumably the number of ptr[] entries in use --
// confirm against the implementation.
// Only sends to nodes which are alive
76
int sendSignal(const NdbApiSignal * signal, NodeId nodeId);
77
int sendSignal(const NdbApiSignal*, NodeId,
78
const LinearSectionPtr ptr[3], Uint32 secs);
79
int sendSignal(const NdbApiSignal*, NodeId,
80
const GenericSectionPtr ptr[3], Uint32 secs);
81
// Fragmented variants: same parameter lists as the sectioned sendSignal
// overloads above.
int sendFragmentedSignal(const NdbApiSignal*, NodeId,
82
const LinearSectionPtr ptr[3], Uint32 secs);
83
int sendFragmentedSignal(const NdbApiSignal*, NodeId,
84
const GenericSectionPtr ptr[3], Uint32 secs);
88
* These are functions used by ndb_mgmd
90
// The ext_ prefixed functions are, per the comment above, the ones used by
// ndb_mgmd. The interval argument is named ms.
void ext_set_max_api_reg_req_interval(Uint32 ms);
91
void ext_update_connections();
92
struct in_addr ext_get_connect_address(Uint32 nodeId);
94
// Note the differing node-id parameter types: NodeId here, plain int in
// ext_doConnect(), and Uint32 in ext_get_connect_address().
bool ext_isConnected(NodeId aNodeId);
95
void ext_doConnect(int aNodeId);
97
// Is node available for running transactions
99
bool get_node_alive(NodeId nodeId) const;
100
bool getIsNodeSendable(NodeId nodeId) const;
103
// Defined inline later in this header; reads theClusterMgr->minDbVersion.
Uint32 getMinDbNodeVersion() const;
105
// My own processor id
106
NodeId ownId() const;
110
// In the four declarations below the parameter is an int that is *named*
// NodeId, i.e. the parameter name is the same identifier as the NodeId type
// used elsewhere in this class.
void doConnect(int NodeId);
111
void reportConnected(int NodeId);
112
void doDisconnect(int NodeId);
113
void reportDisconnected(int NodeId);
115
NodeId get_an_alive_node();
116
void trp_node_status(NodeId, Uint32 event);
119
* Send signal to each registered object
121
// Per the comment above: sends the signal to each registered object.
// NOTE(review): the role of the clnt argument (for example, a client to
// skip) is not visible here -- confirm in the implementation.
void for_each(trp_client* clnt,
122
const NdbApiSignal* aSignal, const LinearSectionPtr ptr[3]);
127
// Improving the API performance
128
// Both functions below are keyed by a block number.
void forceSend(Uint32 block_number);
129
void checkForceSend(Uint32 block_number);
131
// Accessor for the TransporterRegistry pointer held by this facade.
// Returns theTransporterRegistry exactly as stored; no null check is done
// here, so callers receive whatever the member currently holds.
TransporterRegistry* get_registry()
{
  return theTransporterRegistry;
}
134
When a thread has sent its signals and is ready to wait for reception
135
of these it does normally always wait on a conditional mutex and
136
the actual reception is handled by the receiver thread in the NDB API.
137
With the below new methods and variables each thread has the possibility
138
of becoming owner of the "right" to poll for signals. Effectively this
139
means that the thread acts temporarily as a receiver thread.
140
For the thread that succeeds in grabbing this "ownership" it will avoid
141
a number of expensive calls to conditional mutex and even more expensive
142
context switches to wake up.
143
When an owner of the poll "right" has completed its own task it is likely
144
that there are others still waiting. In this case we pick one of the
145
threads as new owner of the poll "right". Since we want to switch owner
146
as seldom as possible we always pick the last thread which is likely to
147
be the last to complete its reception.
149
// Poll-ownership entry points for the scheme described in the note above.
// Each of the first four takes the trp_client concerned.
void start_poll(trp_client*);
150
// NOTE(review): the unit of wait_time is not stated in this header --
// confirm (milliseconds would be the usual guess).
void do_poll(trp_client* clnt, Uint32 wait_time);
151
void complete_poll(trp_client*);
152
void wakeup(trp_client*);
154
// Variant that takes only a wait time and no client.
void external_poll(Uint32 wait_time);
156
// Returns the client recorded in m_poll_owner, i.e. the current holder of
// the poll "right". The bool parameter is unnamed and is not consulted.
trp_client* get_poll_owner(bool) const
{
  return m_poll_owner;
}
157
// Poll queue maintenance. The design note above states that the last
// waiting thread is picked as the new poll owner, which is what
// remove_last_from_poll_queue() is named for.
trp_client* remove_last_from_poll_queue();
158
void add_to_poll_queue(trp_client* clnt);
159
void remove_from_poll_queue(trp_client* clnt);
161
// Current poll owner; returned unchanged by get_poll_owner().
trp_client * m_poll_owner;
162
trp_client * m_poll_queue_head; // First in queue
163
trp_client * m_poll_queue_tail; // Last in queue
164
/* End poll owner stuff */
166
// heart beat received from a node (e.g. a signal came)
167
// Defined inline later in this header; forwards to theClusterMgr.
void hb_received(NodeId n);
168
// Setter and getter pair for an int-valued auto-reconnect setting.
void set_auto_reconnect(int val);
169
int get_auto_reconnect() const;
171
/* TransporterCallback interface. */
172
void deliver_signal(SignalHeader * const header,
174
Uint32 * const signalData,
175
LinearSectionPtr ptr[3]);
176
int checkJobBuffer();
177
// Send and receive accounting: a signal count and a 64-bit byte count
// per node.
void reportSendLen(NodeId nodeId, Uint32 count, Uint64 bytes);
178
void reportReceiveLen(NodeId nodeId, Uint32 count, Uint64 bytes);
179
void reportConnect(NodeId nodeId);
180
void reportDisconnect(NodeId nodeId, Uint32 errNo);
181
// info is optional and defaults to 0 (a null pointer).
void reportError(NodeId nodeId, TransporterError errorCode,
182
const char *info = 0);
183
void transporter_recv_from(NodeId node);
184
// The four inline members below forward their arguments unchanged to the
// member of the same name on theTransporterRegistry, and return its result
// where there is one. None of them checks theTransporterRegistry for null.
// NOTE(review): the braces around these bodies are not visible in this
// view of the file.
Uint32 get_bytes_to_send_iovec(NodeId node, struct iovec *dst, Uint32 max)
186
return theTransporterRegistry->get_bytes_to_send_iovec(node, dst, max);
188
Uint32 bytes_sent(NodeId node, Uint32 bytes)
190
return theTransporterRegistry->bytes_sent(node, bytes);
192
bool has_data_to_send(NodeId node)
194
return theTransporterRegistry->has_data_to_send(node);
196
void reset_send_buffer(NodeId node, bool should_be_empty)
198
theTransporterRegistry->reset_send_buffer(node, should_be_empty);
203
// Classes granted access to the non-public members of TransporterFacade.
// The inline functions at the end of this header use that access to reach
// theClusterMgr from trp_client and Ndb_cluster_connection_impl.
friend class trp_client;
204
friend class ClusterMgr;
205
friend class ArbitMgr;
206
friend class Ndb_cluster_connection;
207
friend class Ndb_cluster_connection_impl;
209
bool isConnected(NodeId aNodeId);
212
// Target of get_registry() and of the inline forwarding members
// (get_bytes_to_send_iovec, bytes_sent, has_data_to_send,
// reset_send_buffer).
TransporterRegistry* theTransporterRegistry;
213
SocketServer m_socket_server;
214
int sendPerformedLastInterval;
216
NodeId theStartNodeId;
218
// Dereferenced by the inline get_node_alive(), hb_received() and
// getMinDbNodeVersion() definitions later in this header.
ClusterMgr* theClusterMgr;
220
// Improving the API response time
222
Uint32 currentSendLimit;
224
void calculateSendLimit();
226
// Declarations for the receive and send thread
229
void threadMainSend(void);
230
NdbThread* theSendThread;
231
void threadMainReceive(void);
232
NdbThread* theReceiveThread;
234
// The two file-scope functions declared before the class are friends.
// NOTE(review): presumably so they can call threadMainSend() and
// threadMainReceive() -- confirm in the implementation file.
friend void* runSendRequest_C(void*);
235
friend void* runReceiveResponse_C(void*);
237
bool do_connect_mgm(NodeId, const ndb_mgm_configuration*);
240
* Block number handling
245
// ACTIVE and INACTIVE both have bit 16 set and differ only in bit 0.
STATIC_CONST( ACTIVE = (1 << 16) | 1 );
246
STATIC_CONST( INACTIVE = (1 << 16) );
247
// One past MAX_NO_THREADS.
STATIC_CONST( END_OF_LIST = MAX_NO_THREADS + 1 );
249
// NOTE(review): the struct head for ThreadData is not visible in this view
// of the file. initialSize defaults to 32.
ThreadData(Uint32 initialSize = 32);
253
Vector<Uint32> m_statusNext;
254
// Table indexed by (block number - MIN_API_BLOCK_NO); see get() below.
Vector<trp_client*> m_objectExecute;
256
int open(trp_client*);
257
int close(int number);
258
void expand(Uint32 size);
260
// Looks up the client registered for blockNo. The block number is first
// rebased by MIN_API_BLOCK_NO and then bounds-checked against the size of
// m_objectExecute. Because blockNo is a Uint16, a value below
// MIN_API_BLOCK_NO is expected to wrap to a large index and fail that check
// rather than index out of bounds.
// NOTE(review): the return for the out-of-range case is not visible in
// this view of the file.
inline trp_client* get(Uint16 blockNo) const {
261
blockNo -= MIN_API_BLOCK_NO;
262
if(likely (blockNo < m_objectExecute.size()))
264
return m_objectExecute.getBase()[blockNo];
270
Uint32 m_fixed2dynamic[NO_API_FIXED_BLOCKS];
271
Uint32 m_fragmented_signal_id;
274
// Mutex taken by lock_mutex() and released by unlock_mutex().
NdbMutex* theMutexPtr;
277
GlobalDictCache *m_globalDictCache;
282
// Acquires the facade mutex by calling NdbMutex_Lock on theMutexPtr.
// NOTE(review): the return-type line and the braces of this definition are
// not visible in this view of the file.
TransporterFacade::lock_mutex()
284
NdbMutex_Lock(theMutexPtr);
289
// Releases the facade mutex by calling NdbMutex_Unlock on theMutexPtr;
// counterpart of lock_mutex().
// NOTE(review): the return-type line and the braces of this definition are
// not visible in this view of the file.
TransporterFacade::unlock_mutex()
291
NdbMutex_Unlock(theMutexPtr);
294
#include "ClusterMgr.hpp"
295
#include "ndb_cluster_connection_impl.hpp"
298
// Returns the m_connect_count field of the facade's ClusterMgr when the
// facade has one (theClusterMgr is non-null).
// NOTE(review): the value returned when theClusterMgr is null is not
// visible in this view of the file.
unsigned Ndb_cluster_connection_impl::get_connect_count() const
300
if (m_transporter_facade->theClusterMgr)
301
return m_transporter_facade->theClusterMgr->m_connect_count;
306
// Thin forwarder: returns whatever the facade's getMinDbNodeVersion()
// reports.
unsigned Ndb_cluster_connection_impl::get_min_db_version() const
308
return m_transporter_facade->getMinDbNodeVersion();
313
// Returns the m_alive field of the node-info record that theClusterMgr
// holds for node n.
// NOTE(review): lines between the signature and the return statement are
// missing from this view; a guard on theClusterMgr (as in
// Ndb_cluster_connection_impl::get_connect_count) may exist -- confirm.
TransporterFacade::get_node_alive(NodeId n) const {
316
return theClusterMgr->getNodeInfo(n).m_alive;
323
// Forwards the heartbeat notification for node n to theClusterMgr.
// theClusterMgr is dereferenced without a null check in the visible lines.
TransporterFacade::hb_received(NodeId n) {
324
theClusterMgr->hb_received(n);
329
// Returns the minDbVersion field of theClusterMgr.
// NOTE(review): lines between the signature and the return statement are
// missing from this view; a guard on theClusterMgr and a fallback return
// may exist -- confirm.
TransporterFacade::getMinDbNodeVersion() const
332
return theClusterMgr->minDbVersion;
339
// Returns the ClusterMgr node-info record for nodeId, reached through the
// client's facade (m_facade->theClusterMgr). trp_client is a friend of
// TransporterFacade, which is what permits this access.
// NOTE(review): the return-type line of this definition is not visible in
// this view of the file.
trp_client::getNodeInfo(Uint32 nodeId) const
341
return m_facade->theClusterMgr->getNodeInfo(nodeId);
345
* LinearSectionIterator
347
* This is an implementation of GenericSectionIterator
348
* that iterates over one linear section of memory.
349
* The iterator is used by the transporter at signal
350
* send time to obtain all of the relevant words for the
353
// GenericSectionIterator implementation over one linear run of Uint32 words
// (see the description above).
// NOTE(review): member declarations, braces and method bodies are only
// partly visible in this view of the file.
class LinearSectionIterator: public GenericSectionIterator
360
// A zero _len forces the stored data pointer to NULL, whatever pointer the
// caller passed in.
LinearSectionIterator(const Uint32* _data, Uint32 _len)
362
data= (_len == 0)? NULL:_data;
367
~LinearSectionIterator()
376
// Returns a pointer to the next run of words and reports its length
// through sz.
const Uint32* getNextWords(Uint32& sz)
391
* SignalSectionIterator
393
* This is an implementation of GenericSectionIterator
394
* that uses chained NdbApiSignal objects to store a
396
* The iterator is used by the transporter at signal
397
* send time to obtain all of the relevant words for the
400
// GenericSectionIterator implementation over a chain of NdbApiSignal
// objects (see the description above).
// NOTE(review): braces and some member lines are not visible in this view
// of the file.
class SignalSectionIterator: public GenericSectionIterator
403
// Head of the chain, kept so the iterator can be rewound.
NdbApiSignal* firstSignal;
404
// Current position in the chain.
NdbApiSignal* currentSignal;
406
// Starts with both the head and the current position at signal.
SignalSectionIterator(NdbApiSignal* signal)
408
firstSignal= currentSignal= signal;
411
~SignalSectionIterator()
417
// Rewinds the current position to the head of the chain.
// NOTE(review): the header of the method containing this statement is not
// visible here (presumably a reset method) -- confirm.
currentSignal= firstSignal;
420
// Declared only; the definition is not in this header view.
const Uint32* getNextWords(Uint32& sz);
424
* GenericSectionIteratorReader
425
* Helper class to simplify reading data from
426
* GenericSectionIterator implementations
432
// State of the reader helper described above: the iterator being read and
// a cursor into the chunk most recently obtained from it.
// NOTE(review): the class head, the chunkRemain declaration, the
// constructor body and several control-flow lines are not visible in this
// view of the file.
GenericSectionIterator* gsi;
433
const Uint32* chunkPtr;
436
GSIReader(GenericSectionIterator* _gsi)
443
// Copies words from the iterator into dest. When the current chunk is
// exhausted (chunkRemain == 0) the next chunk is fetched with
// getNextWords(). Otherwise min(chunkRemain, n) words are copied with
// memcpy and the chunk cursor is advanced. The byte count passed to memcpy
// is wordsToCopy << 2, i.e. four bytes per Uint32 word. abort() is called
// on the path where the caller's request cannot be satisfied.
// NOTE(review): the condition guarding abort(), the enclosing loop and the
// updates of dest and n are not visible in this view -- confirm.
void copyNWords(Uint32* dest, Uint32 n)
447
if (chunkRemain == 0)
449
/* Get next contiguous stretch of words from
452
chunkPtr = gsi->getNextWords(chunkRemain);
454
abort(); // Must have the words the caller asks for
458
/* Have some words from the iterator, copy some/
461
Uint32 wordsToCopy = MIN(chunkRemain, n);
462
memcpy(dest, chunkPtr, wordsToCopy << 2);
463
chunkPtr += wordsToCopy;
464
chunkRemain -= wordsToCopy;
476
#endif // TransporterFacade_H