1
/* Copyright (C) 2003 MySQL AB
3
This program is free software; you can redistribute it and/or modify
4
it under the terms of the GNU General Public License as published by
5
the Free Software Foundation; version 2 of the License.
7
This program is distributed in the hope that it will be useful,
8
but WITHOUT ANY WARRANTY; without even the implied warranty of
9
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10
GNU General Public License for more details.
12
You should have received a copy of the GNU General Public License
13
along with this program; if not, write to the Free Software
14
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
16
#include "SignalSender.hpp"
17
#include "ConfigRetriever.hpp"
19
#include <SignalLoggerManager.hpp>
21
SimpleSignal::SimpleSignal(bool dealloc){
22
memset(this, 0, sizeof(* this));
23
deallocSections = dealloc;
26
SimpleSignal::~SimpleSignal(){
29
if(ptr[0].p != 0) delete []ptr[0].p;
30
if(ptr[1].p != 0) delete []ptr[1].p;
31
if(ptr[2].p != 0) delete []ptr[2].p;
35
SimpleSignal::set(class SignalSender& ss,
36
Uint8 trace, Uint16 recBlock, Uint16 gsn, Uint32 len){
38
header.theTrace = trace;
39
header.theReceiversBlockNumber = recBlock;
40
header.theVerId_signalNumber = gsn;
41
header.theLength = len;
42
header.theSendersBlockRef = refToBlock(ss.getOwnRef());
46
SimpleSignal::print(FILE * out){
47
fprintf(out, "---- Signal ----------------\n");
48
SignalLoggerManager::printSignalHeader(out, header, 0, 0, false);
49
SignalLoggerManager::printSignalData(out, header, theData);
50
for(Uint32 i = 0; i<header.m_noOfSections; i++){
51
Uint32 len = ptr[i].sz;
52
fprintf(out, " --- Section %d size=%d ---\n", i, len);
53
Uint32 * signalData = ptr[i].p;
56
" H\'%.8x H\'%.8x H\'%.8x H\'%.8x H\'%.8x H\'%.8x H\'%.8x\n",
57
signalData[0], signalData[1], signalData[2], signalData[3],
58
signalData[4], signalData[5], signalData[6]);
63
fprintf(out, " H\'%.8x", signalData[0]);
64
for(Uint32 i = 1; i<len; i++)
65
fprintf(out, " H\'%.8x", signalData[i]);
71
SignalSender::SignalSender(const char * connectString){
72
m_cond = NdbCondition_Create();
73
theFacade = TransporterFacade::start_instance(connectString);
74
m_blockNo = theFacade->open(this, execSignal, execNodeStatus);
75
assert(m_blockNo > 0);
78
SignalSender::~SignalSender(){
79
theFacade->close(m_blockNo);
80
theFacade->stop_instance();
81
NdbCondition_Destroy(m_cond);
85
SignalSender::getOwnRef() const {
86
return numberToRef(m_blockNo, theFacade->ownId());
90
SignalSender::connectOne(Uint32 timeOutMillis){
91
NDB_TICKS start = NdbTick_CurrentMillisecond();
92
NDB_TICKS now = start;
93
while(theFacade->theClusterMgr->getNoOfConnectedNodes() == 0 &&
94
(timeOutMillis == 0 || (now - start) < timeOutMillis)){
95
NdbSleep_MilliSleep(100);
97
return theFacade->theClusterMgr->getNoOfConnectedNodes() > 0;
101
SignalSender::connectAll(Uint32 timeOutMillis){
102
NDB_TICKS start = NdbTick_CurrentMillisecond();
103
NDB_TICKS now = start;
104
while(theFacade->theClusterMgr->getNoOfConnectedNodes() < 1 &&
105
(timeOutMillis == 0 || (now - start) < timeOutMillis)){
106
NdbSleep_MilliSleep(100);
108
return theFacade->theClusterMgr->getNoOfConnectedNodes() >= 1;
113
SignalSender::getAliveNode(){
114
return theFacade->get_an_alive_node();
117
const ClusterMgr::Node &
118
SignalSender::getNodeInfo(Uint16 nodeId) const {
119
return theFacade->theClusterMgr->getNodeInfo(nodeId);
123
SignalSender::getNoOfConnectedNodes() const {
124
return theFacade->theClusterMgr->getNoOfConnectedNodes();
128
SignalSender::sendSignal(Uint16 nodeId, const SimpleSignal * s){
129
return theFacade->theTransporterRegistry->prepareSend(&s->header,
138
SignalSender::waitFor(Uint32 timeOutMillis, T & t){
140
Guard g(theFacade->theMutexPtr);
142
SimpleSignal * s = t.check(m_jobBuffer);
147
NDB_TICKS now = NdbTick_CurrentMillisecond();
148
NDB_TICKS stop = now + timeOutMillis;
149
Uint32 wait = (timeOutMillis == 0 ? 10 : timeOutMillis);
151
NdbCondition_WaitTimeout(m_cond,
152
theFacade->theMutexPtr,
156
SimpleSignal * s = t.check(m_jobBuffer);
161
now = NdbTick_CurrentMillisecond();
162
wait = (timeOutMillis == 0 ? 10 : stop - now);
163
} while(stop > now || timeOutMillis == 0);
170
SimpleSignal * check(Vector<SimpleSignal*> & m_jobBuffer){
171
if(m_jobBuffer.size() > 0){
172
SimpleSignal * s = m_jobBuffer[0];
173
m_jobBuffer.erase(0);
181
SignalSender::waitFor(Uint32 timeOutMillis){
184
return waitFor(timeOutMillis, w);
190
SimpleSignal * check(Vector<SimpleSignal*> & m_jobBuffer){
191
Uint32 len = m_jobBuffer.size();
192
for(Uint32 i = 0; i<len; i++){
193
if(refToNode(m_jobBuffer[i]->header.theSendersBlockRef) == m_nodeId){
194
SimpleSignal * s = m_jobBuffer[i];
195
m_jobBuffer.erase(i);
204
SignalSender::waitFor(Uint16 nodeId, Uint32 timeOutMillis){
208
return waitFor(timeOutMillis, w);
211
#include <NdbApiSignal.hpp>
214
SignalSender::execSignal(void* signalSender,
215
NdbApiSignal* signal,
216
class LinearSectionPtr ptr[3]){
217
SimpleSignal * s = new SimpleSignal(true);
218
s->header = * signal;
219
memcpy(&s->theData[0], signal->getDataPtr(), 4 * s->header.theLength);
220
for(Uint32 i = 0; i<s->header.m_noOfSections; i++){
221
s->ptr[i].p = new Uint32[ptr[i].sz];
222
s->ptr[i].sz = ptr[i].sz;
223
memcpy(s->ptr[i].p, ptr[i].p, 4 * ptr[i].sz);
225
SignalSender * ss = (SignalSender*)signalSender;
226
ss->m_jobBuffer.push_back(s);
227
NdbCondition_Signal(ss->m_cond);
231
SignalSender::execNodeStatus(void* signalSender,