2
Licensed Materials - Property of IBM
3
DB2 Storage Engine Enablement
4
Copyright IBM Corporation 2007,2008
7
Redistribution and use in source and binary forms, with or without modification,
8
are permitted provided that the following conditions are met:
9
(a) Redistributions of source code must retain this list of conditions, the
10
copyright notice in section {d} below, and the disclaimer following this
12
(b) Redistributions in binary form must reproduce this list of conditions, the
13
copyright notice in section (d) below, and the disclaimer following this
14
list of conditions, in the documentation and/or other materials provided
15
with the distribution.
16
(c) The name of IBM may not be used to endorse or promote products derived from
17
this software without specific prior written permission.
18
(d) The text of the required copyright notice is:
19
Licensed Materials - Property of IBM
20
DB2 Storage Engine Enablement
21
Copyright IBM Corporation 2007,2008
24
THIS SOFTWARE IS PROVIDED BY IBM CORPORATION "AS IS" AND ANY EXPRESS OR IMPLIED
25
WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
26
MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT
27
SHALL IBM CORPORATION BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
28
EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT
29
OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
30
INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
31
CONTRACT, STRICT LIABILITY, OR TORT INCLUDING NEGLIGENCE OR OTHERWISE) ARISING
32
IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY
37
#include "db2i_ioBuffers.h"
40
Request another block of rows
42
Request the next set of rows from DB2. This must only be called after
45
@param orientation The direction to use when reading through the table.
47
void IOAsyncReadBuffer::loadNewRows(char orientation)
50
maxRows() = rowsToBlock;
52
DBUG_PRINT("db2i_ioBuffers::loadNewRows", ("Requesting %d rows, async = %d", rowsToBlock, readIsAsync));
54
rc = getBridge()->expectErrors(QMY_ERR_END_OF_BLOCK, QMY_ERR_LOB_SPACE_TOO_SMALL)
66
DBUG_PRINT("db2i_ioBuffers::loadNewRows", ("recordsRead: %d, rc: %d", (uint32)rowCount(), rc));
69
*releaseRowNeeded = true;
71
if (rc == QMY_ERR_END_OF_BLOCK)
73
// This is really just an informational error, so we ignore it.
75
DBUG_PRINT("db2i_ioBuffers::loadNewRows", ("End of block signalled"));
77
else if (rc == QMY_ERR_END_OF_FILE)
79
// If we reach EOF or end-of-key, DB2 guarantees that no rows will be locked.
80
rc = HA_ERR_END_OF_FILE;
81
*releaseRowNeeded = false;
83
else if (rc == QMY_ERR_KEY_NOT_FOUND)
85
rc = HA_ERR_KEY_NOT_FOUND;
86
*releaseRowNeeded = false;
94
Empty the message pipe to prepare for another read.
96
void IOAsyncReadBuffer::drainPipe()
98
DBUG_ASSERT(pipeState == PendingFullBufferMsg);
102
while ((bytes = read(msgPipe, msg, sizeof(msg))) > 0)
104
DBUG_PRINT("db2i_ioBuffers::drainPipe",("Pipe returned %d bytes", bytes));
105
lastMsg = &msg[bytes / (sizeof(msg[0]))-1];
106
if (lastMsg->CumRowCnt == maxRows() ||
107
lastMsg->RtnCod != 0)
109
pipeState = ConsumedFullBufferMsg;
114
DBUG_PRINT("db2i_ioBuffers::drainPipe",("rc = %d, rows = %d, max = %d", lastMsg->RtnCod, lastMsg->CumRowCnt, (uint32)maxRows()));
119
Poll the message pipe for async read messages
123
@param orientation The direction to use when reading through the table.
125
void IOAsyncReadBuffer::pollNextRow(char orientation)
127
DBUG_ASSERT(readIsAsync);
129
// Handle the case in which the buffer is full.
130
if (rowCount() == maxRows())
132
// If we haven't read to the end, exit here.
133
if (readCursor < rowCount())
136
if (pipeState == PendingFullBufferMsg)
138
if (pipeState == ConsumedFullBufferMsg)
139
loadNewRows(orientation);
144
PipeRpy_t* lastMsg = NULL;
148
int bytes = read(msgPipe, msg, sizeof(msg));
149
DBUG_PRINT("db2i_ioBuffers::pollNextRow",("Pipe returned %d bytes", bytes));
151
if (unlikely(bytes < 0))
153
DBUG_PRINT("db2i_ioBuffers::pollNextRow", ("Error"));
160
DBUG_ASSERT(bytes % sizeof(msg[0]) == 0);
161
lastMsg = &msg[bytes / (sizeof(msg[0]))-1];
163
if (lastMsg->RtnCod || (lastMsg->CumRowCnt == usedRows()))
165
rc = lastMsg->RtnCod;
170
*releaseRowNeeded = true;
172
if (rc == QMY_ERR_END_OF_BLOCK)
174
else if (rc == QMY_ERR_END_OF_FILE)
176
// If we reach EOF or end-of-key, DB2 guarantees that no rows will be locked.
177
rc = HA_ERR_END_OF_FILE;
178
*releaseRowNeeded = false;
180
else if (rc == QMY_ERR_KEY_NOT_FOUND)
182
rc = HA_ERR_KEY_NOT_FOUND;
183
*releaseRowNeeded = false;
187
DBUG_PRINT("db2i_ioBuffers::pollNextRow", ("Good data: rc=%d; rows=%d; usedRows=%d", lastMsg->RtnCod, lastMsg->CumRowCnt, (uint32)usedRows()));
188
if (lastMsg && likely(!rc))
190
if (lastMsg->CumRowCnt < maxRows())
191
pipeState = PendingFullBufferMsg;
193
pipeState = ConsumedFullBufferMsg;
195
DBUG_ASSERT(lastMsg->CumRowCnt <= usedRows());
198
DBUG_ASSERT(rowCount() <= getRowCapacity());
200
DBUG_PRINT("db2i_ioBuffers::pollNextRow", ("filledRows: %d, rc: %d", rowCount(), rc));
206
Prepare for the destruction of the row buffer storage.
208
void IOAsyncReadBuffer::prepForFree()
212
IORowBuffer::prepForFree();
217
Initialize the newly allocated storage.
219
@param sizeChanged Indicates whether the storage capacity is being changed.
221
void IOAsyncReadBuffer::initAfterAllocate(bool sizeChanged)
225
if (sizeChanged || ((void*)rrnList == NULL))
226
rrnList.realloc(getRowCapacity() * sizeof(uint32));
231
Send an initial read request
233
@param infile The file (table/index) being read from
234
@param orientation The orientation to use for this read request
235
@param rowsToBuffer The number of rows to request each time
236
@param useAsync Whether reads should be performed asynchronously.
237
@param key The key to use (if any)
238
@param keyLength The length of key (if any)
239
@param keyParts The number of columns in the key (if any)
242
void IOAsyncReadBuffer::newReadRequest(FILE_HANDLE infile,
250
DBUG_ENTER("db2i_ioBuffers::newReadRequest");
251
DBUG_ASSERT(rowsToBuffer <= getRowCapacity());
253
if (readCursor < rowCount())
254
DBUG_PRINT("PERF:",("Wasting %d buffered rows!\n", rowCount() - readCursor));
258
int ileDescriptor = QMY_REUSE;
262
if (likely(useAsync))
264
if (rowsToBuffer == 1)
266
// Async provides little or no benefit for single row reads, so we turn it off
267
DBUG_PRINT("db2i_ioBuffers::newReadRequest", ("Disabling async"));
273
if (rc) DBUG_VOID_RETURN;
275
// Translate the pipe write descriptor into the equivalent ILE descriptor
276
rc = fstatx(fildes[1], (struct stat*)&ileDescriptor, sizeof(ileDescriptor), STX_XPFFD_PASE);
283
pipeState = Untouched;
286
DBUG_PRINT("db2i_ioBuffers::newReadRequest", ("Opened pipe %d", fildes[0]));
291
readIsAsync = useAsync;
292
rowsToBlock = rowsToBuffer;
296
rc = getBridge()->expectErrors(QMY_ERR_END_OF_BLOCK, QMY_ERR_LOB_SPACE_TOO_SMALL)
309
// Having shared the pipe with ILE, we relinquish our claim on the write end
314
// If we reach EOF or end-of-key, DB2 guarantees that no rows will be locked.
315
if (rc == QMY_ERR_END_OF_FILE)
317
rc = HA_ERR_END_OF_FILE;
318
*releaseRowNeeded = false;
320
else if (rc == QMY_ERR_KEY_NOT_FOUND)
323
rc = HA_ERR_END_OF_FILE;
325
rc = HA_ERR_KEY_NOT_FOUND;
326
*releaseRowNeeded = false;
329
*releaseRowNeeded = true;