~ubuntu-branches/ubuntu/precise/mysql-5.5/precise-201203300109

« back to all changes in this revision

Viewing changes to storage/ndb/src/ndbapi/signal-sender/SignalSender.cpp

  • Committer: Package Import Robot
  • Author(s): Clint Byrum
  • Date: 2011-11-08 11:31:13 UTC
  • Revision ID: package-import@ubuntu.com-20111108113113-3ulw01fvi4vn8m25
Tags: upstream-5.5.17
ImportĀ upstreamĀ versionĀ 5.5.17

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/* Copyright (C) 2003 MySQL AB
 
2
 
 
3
   This program is free software; you can redistribute it and/or modify
 
4
   it under the terms of the GNU General Public License as published by
 
5
   the Free Software Foundation; version 2 of the License.
 
6
 
 
7
   This program is distributed in the hope that it will be useful,
 
8
   but WITHOUT ANY WARRANTY; without even the implied warranty of
 
9
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
10
   GNU General Public License for more details.
 
11
 
 
12
   You should have received a copy of the GNU General Public License
 
13
   along with this program; if not, write to the Free Software
 
14
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
 
15
 
 
16
#include "SignalSender.hpp"
 
17
#include "ConfigRetriever.hpp"
 
18
#include <NdbSleep.h>
 
19
#include <SignalLoggerManager.hpp>
 
20
 
 
21
SimpleSignal::SimpleSignal(bool dealloc){
 
22
  memset(this, 0, sizeof(* this));
 
23
  deallocSections = dealloc;
 
24
}
 
25
 
 
26
SimpleSignal::~SimpleSignal(){
 
27
  if(!deallocSections)
 
28
    return;
 
29
  if(ptr[0].p != 0) delete []ptr[0].p;
 
30
  if(ptr[1].p != 0) delete []ptr[1].p;
 
31
  if(ptr[2].p != 0) delete []ptr[2].p;
 
32
}
 
33
 
 
34
void 
 
35
SimpleSignal::set(class SignalSender& ss,
 
36
                  Uint8  trace, Uint16 recBlock, Uint16 gsn, Uint32 len){
 
37
  
 
38
  header.theTrace                = trace;
 
39
  header.theReceiversBlockNumber = recBlock;
 
40
  header.theVerId_signalNumber   = gsn;
 
41
  header.theLength               = len;
 
42
  header.theSendersBlockRef      = refToBlock(ss.getOwnRef());
 
43
}
 
44
 
 
45
void
 
46
SimpleSignal::print(FILE * out){
 
47
  fprintf(out, "---- Signal ----------------\n");
 
48
  SignalLoggerManager::printSignalHeader(out, header, 0, 0, false);
 
49
  SignalLoggerManager::printSignalData(out, header, theData);
 
50
  for(Uint32 i = 0; i<header.m_noOfSections; i++){
 
51
    Uint32 len = ptr[i].sz;
 
52
    fprintf(out, " --- Section %d size=%d ---\n", i, len);
 
53
    Uint32 * signalData = ptr[i].p;
 
54
    while(len >= 7){
 
55
      fprintf(out, 
 
56
              " H\'%.8x H\'%.8x H\'%.8x H\'%.8x H\'%.8x H\'%.8x H\'%.8x\n",
 
57
              signalData[0], signalData[1], signalData[2], signalData[3], 
 
58
              signalData[4], signalData[5], signalData[6]);
 
59
      len -= 7;
 
60
      signalData += 7;
 
61
    }
 
62
    if(len > 0){
 
63
      fprintf(out, " H\'%.8x", signalData[0]);
 
64
      for(Uint32 i = 1; i<len; i++)
 
65
        fprintf(out, " H\'%.8x", signalData[i]);
 
66
      fprintf(out, "\n");
 
67
    }
 
68
  }
 
69
}
 
70
 
 
71
SignalSender::SignalSender(const char * connectString){
 
72
  m_cond = NdbCondition_Create();
 
73
  theFacade = TransporterFacade::start_instance(connectString);
 
74
  m_blockNo = theFacade->open(this, execSignal, execNodeStatus);
 
75
  assert(m_blockNo > 0);
 
76
}
 
77
 
 
78
SignalSender::~SignalSender(){
 
79
  theFacade->close(m_blockNo);
 
80
  theFacade->stop_instance();
 
81
  NdbCondition_Destroy(m_cond);
 
82
}
 
83
 
 
84
Uint32
 
85
SignalSender::getOwnRef() const {
 
86
  return numberToRef(m_blockNo, theFacade->ownId());
 
87
}
 
88
 
 
89
bool
 
90
SignalSender::connectOne(Uint32 timeOutMillis){
 
91
  NDB_TICKS start = NdbTick_CurrentMillisecond();
 
92
  NDB_TICKS now = start;
 
93
  while(theFacade->theClusterMgr->getNoOfConnectedNodes() == 0 &&
 
94
        (timeOutMillis == 0 || (now - start) < timeOutMillis)){
 
95
    NdbSleep_MilliSleep(100);
 
96
  }
 
97
  return theFacade->theClusterMgr->getNoOfConnectedNodes() > 0;
 
98
}
 
99
 
 
100
bool
 
101
SignalSender::connectAll(Uint32 timeOutMillis){
 
102
  NDB_TICKS start = NdbTick_CurrentMillisecond();
 
103
  NDB_TICKS now = start;
 
104
  while(theFacade->theClusterMgr->getNoOfConnectedNodes() < 1 &&
 
105
        (timeOutMillis == 0 || (now - start) < timeOutMillis)){ 
 
106
    NdbSleep_MilliSleep(100);
 
107
  }
 
108
  return theFacade->theClusterMgr->getNoOfConnectedNodes() >= 1;
 
109
}
 
110
 
 
111
 
 
112
Uint32
 
113
SignalSender::getAliveNode(){
 
114
  return theFacade->get_an_alive_node();
 
115
}
 
116
 
 
117
const ClusterMgr::Node & 
 
118
SignalSender::getNodeInfo(Uint16 nodeId) const {
 
119
  return theFacade->theClusterMgr->getNodeInfo(nodeId);
 
120
}
 
121
 
 
122
Uint32
 
123
SignalSender::getNoOfConnectedNodes() const {
 
124
  return theFacade->theClusterMgr->getNoOfConnectedNodes();
 
125
}
 
126
 
 
127
SendStatus
 
128
SignalSender::sendSignal(Uint16 nodeId, const SimpleSignal * s){
 
129
  return theFacade->theTransporterRegistry->prepareSend(&s->header,
 
130
                                                        1, // JBB
 
131
                                                        &s->theData[0],
 
132
                                                        nodeId, 
 
133
                                                        &s->ptr[0]);
 
134
}
 
135
 
 
136
template<class T>
 
137
SimpleSignal *
 
138
SignalSender::waitFor(Uint32 timeOutMillis, T & t){
 
139
  
 
140
  Guard g(theFacade->theMutexPtr);
 
141
  
 
142
  SimpleSignal * s = t.check(m_jobBuffer);
 
143
  if(s != 0){
 
144
    return s;
 
145
  }
 
146
  
 
147
  NDB_TICKS now = NdbTick_CurrentMillisecond();
 
148
  NDB_TICKS stop = now + timeOutMillis;
 
149
  Uint32 wait = (timeOutMillis == 0 ? 10 : timeOutMillis);
 
150
  do {
 
151
    NdbCondition_WaitTimeout(m_cond,
 
152
                             theFacade->theMutexPtr, 
 
153
                             wait);
 
154
    
 
155
    
 
156
    SimpleSignal * s = t.check(m_jobBuffer);
 
157
    if(s != 0){
 
158
      return s;
 
159
    }
 
160
    
 
161
    now = NdbTick_CurrentMillisecond();
 
162
    wait = (timeOutMillis == 0 ? 10 : stop - now);
 
163
  } while(stop > now || timeOutMillis == 0);
 
164
  
 
165
  return 0;
 
166
 
167
 
 
168
class WaitForAny {
 
169
public:
 
170
  SimpleSignal * check(Vector<SimpleSignal*> & m_jobBuffer){
 
171
    if(m_jobBuffer.size() > 0){
 
172
      SimpleSignal * s = m_jobBuffer[0];
 
173
      m_jobBuffer.erase(0);
 
174
        return s;
 
175
    }
 
176
      return 0;
 
177
  }
 
178
};
 
179
  
 
180
SimpleSignal *
 
181
SignalSender::waitFor(Uint32 timeOutMillis){
 
182
  
 
183
  WaitForAny w;
 
184
  return waitFor(timeOutMillis, w);
 
185
}
 
186
 
 
187
class WaitForNode {
 
188
public:
 
189
  Uint32 m_nodeId;
 
190
  SimpleSignal * check(Vector<SimpleSignal*> & m_jobBuffer){
 
191
    Uint32 len = m_jobBuffer.size();
 
192
    for(Uint32 i = 0; i<len; i++){
 
193
      if(refToNode(m_jobBuffer[i]->header.theSendersBlockRef) == m_nodeId){
 
194
        SimpleSignal * s = m_jobBuffer[i];
 
195
        m_jobBuffer.erase(i);
 
196
        return s;
 
197
      }
 
198
    }
 
199
    return 0;
 
200
  }
 
201
};
 
202
 
 
203
SimpleSignal *
 
204
SignalSender::waitFor(Uint16 nodeId, Uint32 timeOutMillis){
 
205
  
 
206
  WaitForNode w;
 
207
  w.m_nodeId = nodeId;
 
208
  return waitFor(timeOutMillis, w);
 
209
}
 
210
 
 
211
#include <NdbApiSignal.hpp>
 
212
 
 
213
void
 
214
SignalSender::execSignal(void* signalSender, 
 
215
                         NdbApiSignal* signal, 
 
216
                         class LinearSectionPtr ptr[3]){
 
217
  SimpleSignal * s = new SimpleSignal(true);
 
218
  s->header = * signal;
 
219
  memcpy(&s->theData[0], signal->getDataPtr(), 4 * s->header.theLength);
 
220
  for(Uint32 i = 0; i<s->header.m_noOfSections; i++){
 
221
    s->ptr[i].p = new Uint32[ptr[i].sz];
 
222
    s->ptr[i].sz = ptr[i].sz;
 
223
    memcpy(s->ptr[i].p, ptr[i].p, 4 * ptr[i].sz);
 
224
  }
 
225
  SignalSender * ss = (SignalSender*)signalSender;
 
226
  ss->m_jobBuffer.push_back(s);
 
227
  NdbCondition_Signal(ss->m_cond);
 
228
}
 
229
  
 
230
void 
 
231
SignalSender::execNodeStatus(void* signalSender, 
 
232
                             Uint16 NodeId, 
 
233
                             bool alive, 
 
234
                             bool nfCompleted){
 
235
}
 
236