1
/* Copyright (C) 2003 MySQL AB
3
This program is free software; you can redistribute it and/or modify
4
it under the terms of the GNU General Public License as published by
5
the Free Software Foundation; version 2 of the License.
7
This program is distributed in the hope that it will be useful,
8
but WITHOUT ANY WARRANTY; without even the implied warranty of
9
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10
GNU General Public License for more details.
12
You should have received a copy of the GNU General Public License
13
along with this program; if not, write to the Free Software
14
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
16
#include <ndb_global.h>
17
#include "NdbImpl.hpp"
18
#include <NdbReceiver.hpp>
19
#include "NdbDictionaryImpl.hpp"
20
#include <NdbRecAttr.hpp>
21
#include <AttributeHeader.hpp>
22
#include <NdbTransaction.hpp>
23
#include <TransporterFacade.hpp>
24
#include <signaldata/TcKeyConf.hpp>
26
NdbReceiver::NdbReceiver(Ndb *aNdb) :
29
m_id(NdbObjectIdMap::InvalidId),
30
m_type(NDB_UNINITIALIZED),
33
theCurrentRecAttr = theFirstRecAttr = 0;
38
NdbReceiver::~NdbReceiver()
40
DBUG_ENTER("NdbReceiver::~NdbReceiver");
41
if (m_id != NdbObjectIdMap::InvalidId) {
42
m_ndb->theImpl->theNdbObjectIdMap.unmap(m_id, this);
49
NdbReceiver::init(ReceiverType type, void* owner)
51
theMagicNumber = 0x11223344;
54
theFirstRecAttr = NULL;
55
theCurrentRecAttr = NULL;
56
if (m_id == NdbObjectIdMap::InvalidId) {
59
m_id = m_ndb->theImpl->theNdbObjectIdMap.map(this);
60
if (m_id == NdbObjectIdMap::InvalidId)
71
NdbReceiver::release(){
72
NdbRecAttr* tRecAttr = theFirstRecAttr;
73
while (tRecAttr != NULL)
75
NdbRecAttr* tSaveRecAttr = tRecAttr;
76
tRecAttr = tRecAttr->next();
77
m_ndb->releaseRecAttr(tSaveRecAttr);
79
theFirstRecAttr = NULL;
80
theCurrentRecAttr = NULL;
84
NdbReceiver::getValue(const NdbColumnImpl* tAttrInfo, char * user_dst_ptr){
85
NdbRecAttr* tRecAttr = m_ndb->getRecAttr();
86
if(tRecAttr && !tRecAttr->setup(tAttrInfo, user_dst_ptr)){
87
if (theFirstRecAttr == NULL)
88
theFirstRecAttr = tRecAttr;
90
theCurrentRecAttr->next(tRecAttr);
91
theCurrentRecAttr = tRecAttr;
96
m_ndb->releaseRecAttr(tRecAttr);
101
#define KEY_ATTR_ID (~(Uint32)0)
104
NdbReceiver::calculate_batch_size(Uint32 key_size,
107
Uint32& batch_byte_size,
108
Uint32& first_batch_size)
110
TransporterFacade *tp= m_ndb->theImpl->m_transporter_facade;
111
Uint32 max_scan_batch_size= tp->get_scan_batch_size();
112
Uint32 max_batch_byte_size= tp->get_batch_byte_size();
113
Uint32 max_batch_size= tp->get_batch_size();
114
Uint32 tot_size= (key_size ? (key_size + 32) : 0); //key + signal overhead
115
NdbRecAttr *rec_attr= theFirstRecAttr;
116
while (rec_attr != NULL) {
117
Uint32 attr_size= rec_attr->getColumn()->getSizeInBytes();
118
attr_size= ((attr_size + 7) >> 2) << 2; //Even to word + overhead
119
tot_size+= attr_size;
120
rec_attr= rec_attr->next();
122
tot_size+= 32; //include signal overhead
125
* Now we calculate the batch size by trying to get upto SCAN_BATCH_SIZE
126
* bytes sent for each batch from each node. We do however ensure that
127
* no more than MAX_SCAN_BATCH_SIZE is sent from all nodes in total per
132
batch_byte_size= max_batch_byte_size;
136
batch_byte_size= batch_size * tot_size;
139
if (batch_byte_size * parallelism > max_scan_batch_size) {
140
batch_byte_size= max_scan_batch_size / parallelism;
142
batch_size= batch_byte_size / tot_size;
143
if (batch_size == 0) {
146
if (batch_size > max_batch_size) {
147
batch_size= max_batch_size;
148
} else if (batch_size > MAX_PARALLEL_OP_PER_SCAN) {
149
batch_size= MAX_PARALLEL_OP_PER_SCAN;
152
first_batch_size= batch_size;
157
NdbReceiver::do_get_value(NdbReceiver * org,
161
if(rows > m_defined_rows){
163
m_defined_rows = rows;
164
if ((m_rows = new NdbRecAttr*[rows + 1]) == NULL)
174
key.m_attrId = KEY_ATTR_ID;
175
key.m_arraySize = key_size+1;
177
key.m_nullable = true; // So that receive works w.r.t KEYINFO20
179
m_hidden_count = (key_size ? 1 : 0) + range_no ;
181
for(Uint32 i = 0; i<rows; i++){
182
NdbRecAttr * prev = theCurrentRecAttr;
183
assert(prev == 0 || i > 0);
185
// Put key-recAttr fir on each row
186
if(key_size && !getValue(&key, (char*)0)){
192
!getValue(&NdbColumnImpl::getImpl(* NdbDictionary::Column::RANGE_NO),0))
197
NdbRecAttr* tRecAttr = org->theFirstRecAttr;
198
while(tRecAttr != 0){
199
if(getValue(&NdbColumnImpl::getImpl(*tRecAttr->m_column), (char*)0) != 0)
200
tRecAttr = tRecAttr->next();
210
// Store first recAttr for each row in m_rows[i]
212
m_rows[i] = prev->next();
214
m_rows[i] = theFirstRecAttr;
223
NdbReceiver::copyout(NdbReceiver & dstRec){
224
NdbRecAttr *src = m_rows[m_current_row++];
225
NdbRecAttr *dst = dstRec.theFirstRecAttr;
226
NdbRecAttr *start = src;
227
Uint32 tmp = m_hidden_count;
232
Uint32 len = src->get_size_in_bytes();
233
dst->receive_data((Uint32*)src->aRef(), len);
242
NdbReceiver::execTRANSID_AI(const Uint32* aDataPtr, Uint32 aLength)
244
NdbRecAttr* currRecAttr = theCurrentRecAttr;
246
for (Uint32 used = 0; used < aLength ; used++){
247
AttributeHeader ah(* aDataPtr++);
248
const Uint32 tAttrId = ah.getAttributeId();
249
const Uint32 tAttrSize = ah.getByteSize();
252
* Set all results to NULL if not found...
254
while(currRecAttr && currRecAttr->attrId() != tAttrId){
255
currRecAttr = currRecAttr->next();
258
if(currRecAttr && currRecAttr->receive_data(aDataPtr, tAttrSize)){
259
Uint32 add= (tAttrSize + 3) >> 2;
262
currRecAttr = currRecAttr->next();
264
ndbout_c("%p: tAttrId: %d currRecAttr: %p tAttrSize: %d %d", this,
265
tAttrId, currRecAttr,
266
tAttrSize, currRecAttr->get_size_in_bytes());
267
currRecAttr = theCurrentRecAttr;
268
while(currRecAttr != 0){
269
ndbout_c("%d ", currRecAttr->attrId());
270
currRecAttr = currRecAttr->next();
277
theCurrentRecAttr = currRecAttr;
280
* Update m_received_result_length
282
Uint32 exp = m_expected_result_length;
283
Uint32 tmp = m_received_result_length + aLength;
284
m_received_result_length = tmp;
286
return (tmp == exp || (exp > TcKeyConf::DirtyReadBit) ? 1 : 0);
290
NdbReceiver::execKEYINFO20(Uint32 info, const Uint32* aDataPtr, Uint32 aLength)
292
NdbRecAttr* currRecAttr = m_rows[m_current_row++];
293
assert(currRecAttr->attrId() == KEY_ATTR_ID);
294
currRecAttr->receive_data(aDataPtr, 4*(aLength + 1));
297
* Save scanInfo in the end of keyinfo
299
((Uint32*)currRecAttr->aRef())[aLength] = info;
301
Uint32 tmp = m_received_result_length + aLength;
302
m_received_result_length = tmp;
304
return (tmp == m_expected_result_length ? 1 : 0);
308
NdbReceiver::setErrorCode(int code)
311
NdbOperation* op = (NdbOperation*)getOwner();
312
op->setErrorCode(code);