~ubuntu-branches/ubuntu/precise/mysql-5.1/precise

« back to all changes in this revision

Viewing changes to storage/ndb/src/ndbapi/NdbReceiver.cpp

  • Committer: Bazaar Package Importer
  • Author(s): Norbert Tretkowski
  • Date: 2010-03-17 14:56:02 UTC
  • Revision ID: james.westby@ubuntu.com-20100317145602-x7e30l1b2sb5s6w6
Tags: upstream-5.1.45
ImportĀ upstreamĀ versionĀ 5.1.45

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/* Copyright (C) 2003 MySQL AB
 
2
 
 
3
   This program is free software; you can redistribute it and/or modify
 
4
   it under the terms of the GNU General Public License as published by
 
5
   the Free Software Foundation; version 2 of the License.
 
6
 
 
7
   This program is distributed in the hope that it will be useful,
 
8
   but WITHOUT ANY WARRANTY; without even the implied warranty of
 
9
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
10
   GNU General Public License for more details.
 
11
 
 
12
   You should have received a copy of the GNU General Public License
 
13
   along with this program; if not, write to the Free Software
 
14
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
 
15
 
 
16
#include <ndb_global.h>
 
17
#include "NdbImpl.hpp"
 
18
#include <NdbReceiver.hpp>
 
19
#include "NdbDictionaryImpl.hpp"
 
20
#include <NdbRecAttr.hpp>
 
21
#include <AttributeHeader.hpp>
 
22
#include <NdbTransaction.hpp>
 
23
#include <TransporterFacade.hpp>
 
24
#include <signaldata/TcKeyConf.hpp>
 
25
 
 
26
NdbReceiver::NdbReceiver(Ndb *aNdb) :
 
27
  theMagicNumber(0),
 
28
  m_ndb(aNdb),
 
29
  m_id(NdbObjectIdMap::InvalidId),
 
30
  m_type(NDB_UNINITIALIZED),
 
31
  m_owner(0)
 
32
{
 
33
  theCurrentRecAttr = theFirstRecAttr = 0;
 
34
  m_defined_rows = 0;
 
35
  m_rows = NULL;
 
36
}
 
37
 
 
38
NdbReceiver::~NdbReceiver()
 
39
{
 
40
  DBUG_ENTER("NdbReceiver::~NdbReceiver");
 
41
  if (m_id != NdbObjectIdMap::InvalidId) {
 
42
    m_ndb->theImpl->theNdbObjectIdMap.unmap(m_id, this);
 
43
  }
 
44
  delete[] m_rows;
 
45
  DBUG_VOID_RETURN;
 
46
}
 
47
 
 
48
int
 
49
NdbReceiver::init(ReceiverType type, void* owner)
 
50
{
 
51
  theMagicNumber = 0x11223344;
 
52
  m_type = type;
 
53
  m_owner = owner;
 
54
  theFirstRecAttr = NULL;
 
55
  theCurrentRecAttr = NULL;
 
56
  if (m_id == NdbObjectIdMap::InvalidId) {
 
57
    if (m_ndb)
 
58
    {
 
59
      m_id = m_ndb->theImpl->theNdbObjectIdMap.map(this);
 
60
      if (m_id == NdbObjectIdMap::InvalidId)
 
61
      {
 
62
        setErrorCode(4000);
 
63
        return -1;
 
64
      }
 
65
    }
 
66
  }
 
67
  return 0;
 
68
}
 
69
 
 
70
void
 
71
NdbReceiver::release(){
 
72
  NdbRecAttr* tRecAttr = theFirstRecAttr;
 
73
  while (tRecAttr != NULL)
 
74
  {
 
75
    NdbRecAttr* tSaveRecAttr = tRecAttr;
 
76
    tRecAttr = tRecAttr->next();
 
77
    m_ndb->releaseRecAttr(tSaveRecAttr);
 
78
  }
 
79
  theFirstRecAttr = NULL;
 
80
  theCurrentRecAttr = NULL;
 
81
}
 
82
  
 
83
NdbRecAttr *
 
84
NdbReceiver::getValue(const NdbColumnImpl* tAttrInfo, char * user_dst_ptr){
 
85
  NdbRecAttr* tRecAttr = m_ndb->getRecAttr();
 
86
  if(tRecAttr && !tRecAttr->setup(tAttrInfo, user_dst_ptr)){
 
87
    if (theFirstRecAttr == NULL)
 
88
      theFirstRecAttr = tRecAttr;
 
89
    else
 
90
      theCurrentRecAttr->next(tRecAttr);
 
91
    theCurrentRecAttr = tRecAttr;
 
92
    tRecAttr->next(NULL);
 
93
    return tRecAttr;
 
94
  }
 
95
  if(tRecAttr){
 
96
    m_ndb->releaseRecAttr(tRecAttr);
 
97
  }    
 
98
  return 0;
 
99
}
 
100
 
 
101
#define KEY_ATTR_ID (~(Uint32)0)
 
102
 
 
103
void
 
104
NdbReceiver::calculate_batch_size(Uint32 key_size,
 
105
                                  Uint32 parallelism,
 
106
                                  Uint32& batch_size,
 
107
                                  Uint32& batch_byte_size,
 
108
                                  Uint32& first_batch_size)
 
109
{
 
110
  TransporterFacade *tp= m_ndb->theImpl->m_transporter_facade;
 
111
  Uint32 max_scan_batch_size= tp->get_scan_batch_size();
 
112
  Uint32 max_batch_byte_size= tp->get_batch_byte_size();
 
113
  Uint32 max_batch_size= tp->get_batch_size();
 
114
  Uint32 tot_size= (key_size ? (key_size + 32) : 0); //key + signal overhead
 
115
  NdbRecAttr *rec_attr= theFirstRecAttr;
 
116
  while (rec_attr != NULL) {
 
117
    Uint32 attr_size= rec_attr->getColumn()->getSizeInBytes();
 
118
    attr_size= ((attr_size + 7) >> 2) << 2; //Even to word + overhead
 
119
    tot_size+= attr_size;
 
120
    rec_attr= rec_attr->next();
 
121
  }
 
122
  tot_size+= 32; //include signal overhead
 
123
 
 
124
  /**
 
125
   * Now we calculate the batch size by trying to get upto SCAN_BATCH_SIZE
 
126
   * bytes sent for each batch from each node. We do however ensure that
 
127
   * no more than MAX_SCAN_BATCH_SIZE is sent from all nodes in total per
 
128
   * batch.
 
129
   */
 
130
  if (batch_size == 0)
 
131
  {
 
132
    batch_byte_size= max_batch_byte_size;
 
133
  }
 
134
  else
 
135
  {
 
136
    batch_byte_size= batch_size * tot_size;
 
137
  }
 
138
  
 
139
  if (batch_byte_size * parallelism > max_scan_batch_size) {
 
140
    batch_byte_size= max_scan_batch_size / parallelism;
 
141
  }
 
142
  batch_size= batch_byte_size / tot_size;
 
143
  if (batch_size == 0) {
 
144
    batch_size= 1;
 
145
  } else {
 
146
    if (batch_size > max_batch_size) {
 
147
      batch_size= max_batch_size;
 
148
    } else if (batch_size > MAX_PARALLEL_OP_PER_SCAN) {
 
149
      batch_size= MAX_PARALLEL_OP_PER_SCAN;
 
150
    }
 
151
  }
 
152
  first_batch_size= batch_size;
 
153
  return;
 
154
}
 
155
 
 
156
int
 
157
NdbReceiver::do_get_value(NdbReceiver * org, 
 
158
                          Uint32 rows, 
 
159
                          Uint32 key_size,
 
160
                          Uint32 range_no){
 
161
  if(rows > m_defined_rows){
 
162
    delete[] m_rows;
 
163
    m_defined_rows = rows;
 
164
    if ((m_rows = new NdbRecAttr*[rows + 1]) == NULL)
 
165
    {
 
166
      setErrorCode(4000);
 
167
      return -1;
 
168
    }
 
169
  }
 
170
  m_rows[rows] = 0;
 
171
  
 
172
  NdbColumnImpl key;
 
173
  if(key_size){
 
174
    key.m_attrId = KEY_ATTR_ID;
 
175
    key.m_arraySize = key_size+1;
 
176
    key.m_attrSize = 4;
 
177
    key.m_nullable = true; // So that receive works w.r.t KEYINFO20
 
178
  }
 
179
  m_hidden_count = (key_size ? 1 : 0) + range_no ;
 
180
  
 
181
  for(Uint32 i = 0; i<rows; i++){
 
182
    NdbRecAttr * prev = theCurrentRecAttr;
 
183
    assert(prev == 0 || i > 0);
 
184
    
 
185
    // Put key-recAttr fir on each row
 
186
    if(key_size && !getValue(&key, (char*)0)){
 
187
      abort();
 
188
      return -1;
 
189
    }
 
190
    
 
191
    if(range_no && 
 
192
       !getValue(&NdbColumnImpl::getImpl(* NdbDictionary::Column::RANGE_NO),0))
 
193
    {
 
194
      abort();
 
195
    }
 
196
 
 
197
    NdbRecAttr* tRecAttr = org->theFirstRecAttr;
 
198
    while(tRecAttr != 0){
 
199
      if(getValue(&NdbColumnImpl::getImpl(*tRecAttr->m_column), (char*)0) != 0)
 
200
        tRecAttr = tRecAttr->next();
 
201
      else
 
202
        break;
 
203
    }
 
204
    
 
205
    if(tRecAttr){
 
206
      abort();
 
207
      return -1;
 
208
    }
 
209
 
 
210
    // Store first recAttr for each row in m_rows[i]
 
211
    if(prev){
 
212
      m_rows[i] = prev->next();
 
213
    } else {
 
214
      m_rows[i] = theFirstRecAttr;
 
215
    }
 
216
  } 
 
217
 
 
218
  prepareSend();
 
219
  return 0;
 
220
}
 
221
 
 
222
NdbRecAttr*
 
223
NdbReceiver::copyout(NdbReceiver & dstRec){
 
224
  NdbRecAttr *src = m_rows[m_current_row++];
 
225
  NdbRecAttr *dst = dstRec.theFirstRecAttr;
 
226
  NdbRecAttr *start = src;
 
227
  Uint32 tmp = m_hidden_count;
 
228
  while(tmp--)
 
229
    src = src->next();
 
230
  
 
231
  while(dst){
 
232
    Uint32 len = src->get_size_in_bytes();
 
233
    dst->receive_data((Uint32*)src->aRef(), len);
 
234
    src = src->next();
 
235
    dst = dst->next();
 
236
  }
 
237
 
 
238
  return start;
 
239
}
 
240
 
 
241
int
 
242
NdbReceiver::execTRANSID_AI(const Uint32* aDataPtr, Uint32 aLength)
 
243
{
 
244
  NdbRecAttr* currRecAttr = theCurrentRecAttr;
 
245
  
 
246
  for (Uint32 used = 0; used < aLength ; used++){
 
247
    AttributeHeader ah(* aDataPtr++);
 
248
    const Uint32 tAttrId = ah.getAttributeId();
 
249
    const Uint32 tAttrSize = ah.getByteSize();
 
250
 
 
251
    /**
 
252
     * Set all results to NULL if  not found...
 
253
     */
 
254
    while(currRecAttr && currRecAttr->attrId() != tAttrId){
 
255
      currRecAttr = currRecAttr->next();
 
256
    }
 
257
    
 
258
    if(currRecAttr && currRecAttr->receive_data(aDataPtr, tAttrSize)){
 
259
      Uint32 add= (tAttrSize + 3) >> 2;
 
260
      used += add;
 
261
      aDataPtr += add;
 
262
      currRecAttr = currRecAttr->next();
 
263
    } else {
 
264
      ndbout_c("%p: tAttrId: %d currRecAttr: %p tAttrSize: %d %d", this,
 
265
               tAttrId, currRecAttr, 
 
266
               tAttrSize, currRecAttr->get_size_in_bytes());
 
267
      currRecAttr = theCurrentRecAttr;
 
268
      while(currRecAttr != 0){
 
269
        ndbout_c("%d ", currRecAttr->attrId());
 
270
        currRecAttr = currRecAttr->next();
 
271
      }
 
272
      abort();
 
273
      return -1;
 
274
    }
 
275
  }
 
276
 
 
277
  theCurrentRecAttr = currRecAttr;
 
278
  
 
279
  /**
 
280
   * Update m_received_result_length
 
281
   */
 
282
  Uint32 exp = m_expected_result_length; 
 
283
  Uint32 tmp = m_received_result_length + aLength;
 
284
  m_received_result_length = tmp;
 
285
 
 
286
  return (tmp == exp || (exp > TcKeyConf::DirtyReadBit) ? 1 : 0);
 
287
}
 
288
 
 
289
int
 
290
NdbReceiver::execKEYINFO20(Uint32 info, const Uint32* aDataPtr, Uint32 aLength)
 
291
{
 
292
  NdbRecAttr* currRecAttr = m_rows[m_current_row++];
 
293
  assert(currRecAttr->attrId() == KEY_ATTR_ID);
 
294
  currRecAttr->receive_data(aDataPtr, 4*(aLength + 1));
 
295
  
 
296
  /**
 
297
   * Save scanInfo in the end of keyinfo
 
298
   */
 
299
  ((Uint32*)currRecAttr->aRef())[aLength] = info;
 
300
  
 
301
  Uint32 tmp = m_received_result_length + aLength;
 
302
  m_received_result_length = tmp;
 
303
  
 
304
  return (tmp == m_expected_result_length ? 1 : 0);
 
305
}
 
306
 
 
307
void
 
308
NdbReceiver::setErrorCode(int code)
 
309
{
 
310
  theMagicNumber = 0;
 
311
  NdbOperation* op = (NdbOperation*)getOwner();
 
312
  op->setErrorCode(code);
 
313
}