~ubuntu-branches/ubuntu/precise/mysql-5.1/precise

« back to all changes in this revision

Viewing changes to storage/ndb/src/ndbapi/SignalSender.cpp

  • Committer: Bazaar Package Importer
  • Author(s): Norbert Tretkowski
  • Date: 2010-03-17 14:56:02 UTC
  • Revision ID: james.westby@ubuntu.com-20100317145602-x7e30l1b2sb5s6w6
Tags: upstream-5.1.45
ImportĀ upstreamĀ versionĀ 5.1.45

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/* Copyright (C) 2003 MySQL AB
 
2
 
 
3
   This program is free software; you can redistribute it and/or modify
 
4
   it under the terms of the GNU General Public License as published by
 
5
   the Free Software Foundation; version 2 of the License.
 
6
 
 
7
   This program is distributed in the hope that it will be useful,
 
8
   but WITHOUT ANY WARRANTY; without even the implied warranty of
 
9
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
10
   GNU General Public License for more details.
 
11
 
 
12
   You should have received a copy of the GNU General Public License
 
13
   along with this program; if not, write to the Free Software
 
14
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
 
15
 
 
16
#include "SignalSender.hpp"
 
17
#include <NdbSleep.h>
 
18
#include <SignalLoggerManager.hpp>
 
19
#include <signaldata/NFCompleteRep.hpp>
 
20
#include <signaldata/NodeFailRep.hpp>
 
21
 
 
22
 
 
23
SimpleSignal::SimpleSignal(bool dealloc){
 
24
  memset(this, 0, sizeof(* this));
 
25
  deallocSections = dealloc;
 
26
}
 
27
 
 
28
SimpleSignal::~SimpleSignal(){
 
29
  if(!deallocSections)
 
30
    return;
 
31
  if(ptr[0].p != 0) delete []ptr[0].p;
 
32
  if(ptr[1].p != 0) delete []ptr[1].p;
 
33
  if(ptr[2].p != 0) delete []ptr[2].p;
 
34
}
 
35
 
 
36
void 
 
37
SimpleSignal::set(class SignalSender& ss,
 
38
                  Uint8  trace, Uint16 recBlock, Uint16 gsn, Uint32 len){
 
39
  
 
40
  header.theTrace                = trace;
 
41
  header.theReceiversBlockNumber = recBlock;
 
42
  header.theVerId_signalNumber   = gsn;
 
43
  header.theLength               = len;
 
44
  header.theSendersBlockRef      = refToBlock(ss.getOwnRef());
 
45
}
 
46
 
 
47
void
 
48
SimpleSignal::print(FILE * out){
 
49
  fprintf(out, "---- Signal ----------------\n");
 
50
  SignalLoggerManager::printSignalHeader(out, header, 0, 0, false);
 
51
  SignalLoggerManager::printSignalData(out, header, theData);
 
52
  for(Uint32 i = 0; i<header.m_noOfSections; i++){
 
53
    Uint32 len = ptr[i].sz;
 
54
    fprintf(out, " --- Section %d size=%d ---\n", i, len);
 
55
    Uint32 * signalData = ptr[i].p;
 
56
    while(len >= 7){
 
57
      fprintf(out, 
 
58
              " H\'%.8x H\'%.8x H\'%.8x H\'%.8x H\'%.8x H\'%.8x H\'%.8x\n",
 
59
              signalData[0], signalData[1], signalData[2], signalData[3], 
 
60
              signalData[4], signalData[5], signalData[6]);
 
61
      len -= 7;
 
62
      signalData += 7;
 
63
    }
 
64
    if(len > 0){
 
65
      fprintf(out, " H\'%.8x", signalData[0]);
 
66
      for(Uint32 i = 1; i<len; i++)
 
67
        fprintf(out, " H\'%.8x", signalData[i]);
 
68
      fprintf(out, "\n");
 
69
    }
 
70
  }
 
71
}
 
72
 
 
73
SignalSender::SignalSender(TransporterFacade *facade)
 
74
  : m_lock(0)
 
75
{
 
76
  m_cond = NdbCondition_Create();
 
77
  theFacade = facade;
 
78
  lock();
 
79
  m_blockNo = theFacade->open(this, execSignal, execNodeStatus);
 
80
  unlock();
 
81
  assert(m_blockNo > 0);
 
82
}
 
83
 
 
84
SignalSender::~SignalSender(){
 
85
  int i;
 
86
  if (m_lock)
 
87
    unlock();
 
88
  theFacade->close(m_blockNo,0);
 
89
  // free these _after_ closing theFacade to ensure that
 
90
  // we delete all signals
 
91
  for (i= m_jobBuffer.size()-1; i>= 0; i--)
 
92
    delete m_jobBuffer[i];
 
93
  for (i= m_usedBuffer.size()-1; i>= 0; i--)
 
94
    delete m_usedBuffer[i];
 
95
  NdbCondition_Destroy(m_cond);
 
96
}
 
97
 
 
98
int SignalSender::lock()
 
99
{
 
100
  if (NdbMutex_Lock(theFacade->theMutexPtr))
 
101
    return -1;
 
102
  m_lock= 1;
 
103
  return 0;
 
104
}
 
105
 
 
106
int SignalSender::unlock()
 
107
{
 
108
  if (NdbMutex_Unlock(theFacade->theMutexPtr))
 
109
    return -1;
 
110
  m_lock= 0;
 
111
  return 0;
 
112
}
 
113
 
 
114
Uint32
 
115
SignalSender::getOwnRef() const {
 
116
  return numberToRef(m_blockNo, theFacade->ownId());
 
117
}
 
118
 
 
119
Uint32
 
120
SignalSender::getAliveNode() const{
 
121
  return theFacade->get_an_alive_node();
 
122
}
 
123
 
 
124
const ClusterMgr::Node & 
 
125
SignalSender::getNodeInfo(Uint16 nodeId) const {
 
126
  return theFacade->theClusterMgr->getNodeInfo(nodeId);
 
127
}
 
128
 
 
129
Uint32
 
130
SignalSender::getNoOfConnectedNodes() const {
 
131
  return theFacade->theClusterMgr->getNoOfConnectedNodes();
 
132
}
 
133
 
 
134
template<class T>
 
135
SimpleSignal *
 
136
SignalSender::waitFor(Uint32 timeOutMillis, T & t)
 
137
{
 
138
  SimpleSignal * s = t.check(m_jobBuffer);
 
139
  if(s != 0){
 
140
    if (m_usedBuffer.push_back(s))
 
141
    {
 
142
      return 0;
 
143
    }
 
144
    return s;
 
145
  }
 
146
  
 
147
  NDB_TICKS now = NdbTick_CurrentMillisecond();
 
148
  NDB_TICKS stop = now + timeOutMillis;
 
149
  Uint32 wait = (timeOutMillis == 0 ? 10 : timeOutMillis);
 
150
  do {
 
151
    NdbCondition_WaitTimeout(m_cond,
 
152
                             theFacade->theMutexPtr, 
 
153
                             wait);
 
154
    
 
155
    
 
156
    SimpleSignal * s = t.check(m_jobBuffer);
 
157
    if(s != 0){
 
158
      if (m_usedBuffer.push_back(s))
 
159
      {
 
160
        return 0;
 
161
      }
 
162
      return s;
 
163
    }
 
164
    
 
165
    now = NdbTick_CurrentMillisecond();
 
166
    wait = (timeOutMillis == 0 ? 10 : stop - now);
 
167
  } while(stop > now || timeOutMillis == 0);
 
168
  
 
169
  return 0;
 
170
 
171
 
 
172
class WaitForAny {
 
173
public:
 
174
  WaitForAny() {}
 
175
  SimpleSignal * check(Vector<SimpleSignal*> & m_jobBuffer){
 
176
    if(m_jobBuffer.size() > 0){
 
177
      SimpleSignal * s = m_jobBuffer[0];
 
178
      m_jobBuffer.erase(0);
 
179
      return s;
 
180
    }
 
181
    return 0;
 
182
  }
 
183
};
 
184
  
 
185
SimpleSignal *
 
186
SignalSender::waitFor(Uint32 timeOutMillis){
 
187
  
 
188
  WaitForAny w;
 
189
  return waitFor(timeOutMillis, w);
 
190
}
 
191
 
 
192
class WaitForNode {
 
193
public:
 
194
  WaitForNode() {}
 
195
  Uint32 m_nodeId;
 
196
  SimpleSignal * check(Vector<SimpleSignal*> & m_jobBuffer){
 
197
    Uint32 len = m_jobBuffer.size();
 
198
    for(Uint32 i = 0; i<len; i++){
 
199
      if(refToNode(m_jobBuffer[i]->header.theSendersBlockRef) == m_nodeId){
 
200
        SimpleSignal * s = m_jobBuffer[i];
 
201
        m_jobBuffer.erase(i);
 
202
        return s;
 
203
      }
 
204
    }
 
205
    return 0;
 
206
  }
 
207
};
 
208
 
 
209
SimpleSignal *
 
210
SignalSender::waitFor(Uint16 nodeId, Uint32 timeOutMillis){
 
211
  
 
212
  WaitForNode w;
 
213
  w.m_nodeId = nodeId;
 
214
  return waitFor(timeOutMillis, w);
 
215
}
 
216
 
 
217
#include <NdbApiSignal.hpp>
 
218
 
 
219
void
 
220
SignalSender::execSignal(void* signalSender, 
 
221
                         NdbApiSignal* signal, 
 
222
                         class LinearSectionPtr ptr[3]){
 
223
  SimpleSignal * s = new SimpleSignal(true);
 
224
  s->header = * signal;
 
225
  memcpy(&s->theData[0], signal->getDataPtr(), 4 * s->header.theLength);
 
226
  for(Uint32 i = 0; i<s->header.m_noOfSections; i++){
 
227
    s->ptr[i].p = new Uint32[ptr[i].sz];
 
228
    s->ptr[i].sz = ptr[i].sz;
 
229
    memcpy(s->ptr[i].p, ptr[i].p, 4 * ptr[i].sz);
 
230
  }
 
231
  SignalSender * ss = (SignalSender*)signalSender;
 
232
  ss->m_jobBuffer.push_back(s);
 
233
  NdbCondition_Signal(ss->m_cond);
 
234
}
 
235
  
 
236
void 
 
237
SignalSender::execNodeStatus(void* signalSender, 
 
238
                             Uint32 nodeId, 
 
239
                             bool alive, 
 
240
                             bool nfCompleted){
 
241
  if (alive) {
 
242
    // node connected
 
243
    return;
 
244
  }
 
245
 
 
246
  SimpleSignal * s = new SimpleSignal(true);
 
247
  SignalSender * ss = (SignalSender*)signalSender;
 
248
 
 
249
  // node disconnected
 
250
  if(nfCompleted)
 
251
  {
 
252
    // node shutdown complete
 
253
    s->header.theVerId_signalNumber = GSN_NF_COMPLETEREP;
 
254
    NFCompleteRep *rep = (NFCompleteRep *)s->getDataPtrSend();
 
255
    rep->blockNo = 0;
 
256
    rep->nodeId = 0;
 
257
    rep->failedNodeId = nodeId;
 
258
    rep->unused = 0;
 
259
    rep->from = 0;
 
260
  }
 
261
  else
 
262
  {
 
263
    // node failure
 
264
    s->header.theVerId_signalNumber = GSN_NODE_FAILREP;
 
265
    NodeFailRep *rep = (NodeFailRep *)s->getDataPtrSend();
 
266
    rep->failNo = 0;
 
267
    rep->masterNodeId = 0;
 
268
    rep->noOfNodes = 1;
 
269
    NodeBitmask::clear(rep->theNodes);
 
270
    NodeBitmask::set(rep->theNodes,nodeId);
 
271
  }
 
272
 
 
273
  ss->m_jobBuffer.push_back(s);
 
274
  NdbCondition_Signal(ss->m_cond);
 
275
}
 
276
 
 
277
#if __SUNPRO_CC != 0x560
 
278
template SimpleSignal* SignalSender::waitFor<WaitForNode>(unsigned, WaitForNode&);
 
279
template SimpleSignal* SignalSender::waitFor<WaitForAny>(unsigned, WaitForAny&);
 
280
#endif
 
281
template class Vector<SimpleSignal*>;
 
282