~ubuntu-branches/ubuntu/precise/mysql-5.1/precise

« back to all changes in this revision

Viewing changes to storage/ndb/src/ndbapi/NdbIndexStat.cpp

  • Committer: Bazaar Package Importer
  • Author(s): Norbert Tretkowski
  • Date: 2010-03-17 14:56:02 UTC
  • Revision ID: james.westby@ubuntu.com-20100317145602-x7e30l1b2sb5s6w6
Tags: upstream-5.1.45
ImportĀ upstreamĀ versionĀ 5.1.45

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/* Copyright (C) 2003 MySQL AB
 
2
 
 
3
   This program is free software; you can redistribute it and/or modify
 
4
   it under the terms of the GNU General Public License as published by
 
5
   the Free Software Foundation; version 2 of the License.
 
6
 
 
7
   This program is distributed in the hope that it will be useful,
 
8
   but WITHOUT ANY WARRANTY; without even the implied warranty of
 
9
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
10
   GNU General Public License for more details.
 
11
 
 
12
   You should have received a copy of the GNU General Public License
 
13
   along with this program; if not, write to the Free Software
 
14
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
 
15
 
 
16
#include <ndb_global.h>
 
17
#include <AttributeHeader.hpp>
 
18
#include <NdbSqlUtil.hpp>
 
19
#include <NdbIndexStat.hpp>
 
20
#include <NdbTransaction.hpp>
 
21
#include <NdbIndexScanOperation.hpp>
 
22
#include "NdbDictionaryImpl.hpp"
 
23
#include <my_sys.h>
 
24
 
 
25
NdbIndexStat::NdbIndexStat(const NdbDictionary::Index* index) :
 
26
  m_index(index->m_impl),
 
27
  m_cache(NULL)
 
28
{
 
29
}
 
30
 
 
31
NdbIndexStat::~NdbIndexStat()
 
32
{
 
33
  delete [] m_cache;
 
34
  m_cache = NULL;
 
35
}
 
36
 
 
37
int
 
38
NdbIndexStat::alloc_cache(Uint32 entries)
 
39
{
 
40
  delete [] m_cache;
 
41
  m_cache = NULL;
 
42
  if (entries == 0) {
 
43
    return 0;
 
44
  }
 
45
  Uint32 i;
 
46
  Uint32 keysize = 0;
 
47
  for (i = 0; i < m_index.m_columns.size(); i++) {
 
48
    NdbColumnImpl* c = m_index.m_columns[i];
 
49
    keysize += 2;       // counting extra headers
 
50
    keysize += (c->m_attrSize * c->m_arraySize + 3 ) / 4;
 
51
  }
 
52
  Uint32 areasize = entries * (PointerSize + EntrySize + keysize);
 
53
  if (areasize > (1 << 16))
 
54
    areasize = (1 << 16);
 
55
  Uint32 cachesize = 2 * areasize;
 
56
  m_cache = new Uint32 [cachesize];
 
57
  if (m_cache == NULL) {
 
58
    set_error(4000);
 
59
    return -1;
 
60
  }
 
61
  m_areasize = areasize;
 
62
  m_seq = 0;
 
63
  Uint32 idir;
 
64
  for (idir = 0; idir <= 1; idir++) {
 
65
    Area& a = m_area[idir];
 
66
    a.m_data = &m_cache[idir * areasize];
 
67
    a.m_offset = a.m_data - &m_cache[0];
 
68
    a.m_free = areasize;
 
69
    a.m_entries = 0;
 
70
    a.m_idir = idir;
 
71
    a.pad1 = 0;
 
72
  }
 
73
#ifdef VM_TRACE
 
74
  memset(&m_cache[0], 0x3f, cachesize << 2);
 
75
#endif
 
76
  return 0;
 
77
}
 
78
 
 
79
#ifndef VM_TRACE
 
80
#define stat_verify()
 
81
#else
 
82
void
 
83
NdbIndexStat::stat_verify()
 
84
{
 
85
  Uint32 idir;
 
86
  for (idir = 0; idir <= 1; idir++) {
 
87
    Uint32 i;
 
88
    const Area& a = m_area[idir];
 
89
    assert(a.m_offset == idir * m_areasize);
 
90
    assert(a.m_data == &m_cache[a.m_offset]);
 
91
    Uint32 pointerwords = PointerSize * a.m_entries;
 
92
    Uint32 entrywords = 0;
 
93
    for (i = 0; i < a.m_entries; i++) {
 
94
      const Pointer& p = a.get_pointer(i);
 
95
      const Entry& e = a.get_entry(i);
 
96
      assert(a.get_pos(e) == p.m_pos);
 
97
      entrywords += EntrySize + e.m_keylen;
 
98
    }
 
99
    assert(a.m_free <= m_areasize);
 
100
    assert(pointerwords + a.m_free + entrywords == m_areasize);
 
101
    Uint32 off = pointerwords + a.m_free;
 
102
    for (i = 0; i < a.m_entries; i++) {
 
103
      assert(off < m_areasize);
 
104
      const Entry& e = *(const Entry*)&a.m_data[off];
 
105
      off += EntrySize + e.m_keylen;
 
106
    }
 
107
    assert(off == m_areasize);
 
108
    for (i = 0; i < a.m_entries; i++) {
 
109
      const Entry& e = a.get_entry(i);
 
110
      const Uint32* entrykey = (const Uint32*)&e + EntrySize;
 
111
      Uint32 n = 0;
 
112
      while (n + 2 <= e.m_keylen) {
 
113
        Uint32 t = entrykey[n++];
 
114
        assert(t == 2 * idir || t == 2 * idir + 1 || t == 4);
 
115
        AttributeHeader ah = *(const AttributeHeader*)&entrykey[n++];
 
116
        n += ah.getDataSize();
 
117
      }
 
118
      assert(n == e.m_keylen);
 
119
    }
 
120
    for (i = 0; i + 1 < a.m_entries; i++) {
 
121
      const Entry& e1 = a.get_entry(i);
 
122
      const Entry& e2 = a.get_entry(i + 1);
 
123
      const Uint32* entrykey1 = (const Uint32*)&e1 + EntrySize;
 
124
      const Uint32* entrykey2 = (const Uint32*)&e2 + EntrySize;
 
125
      int ret = stat_cmpkey(a, entrykey1, e1.m_keylen, entrykey2, e2.m_keylen);
 
126
      assert(ret == -1);
 
127
    }
 
128
  }
 
129
}
 
130
#endif
 
131
 
 
132
// compare keys
 
133
int
 
134
NdbIndexStat::stat_cmpkey(const Area& a, const Uint32* key1, Uint32 keylen1, const Uint32* key2, Uint32 keylen2)
 
135
{
 
136
  const Uint32 idir = a.m_idir;
 
137
  const int jdir = 1 - 2 * int(idir);
 
138
  Uint32 i1 = 0, i2 = 0;
 
139
  Uint32 t1 = 4, t2 = 4; //BoundEQ
 
140
  int ret = 0;
 
141
  Uint32 k = 0;
 
142
  while (k < m_index.m_columns.size()) {
 
143
    NdbColumnImpl* c = m_index.m_columns[k];
 
144
    Uint32 n = c->m_attrSize * c->m_arraySize;
 
145
    // absence of keypart is treated specially
 
146
    bool havekp1 = (i1 + 2 <= keylen1);
 
147
    bool havekp2 = (i2 + 2 <= keylen2);
 
148
    AttributeHeader ah1;
 
149
    AttributeHeader ah2;
 
150
    if (havekp1) {
 
151
      t1 = key1[i1++];
 
152
      assert(t1 == 2 * idir || t1 == 2 * idir + 1 || t1 == 4);
 
153
      ah1 = *(const AttributeHeader*)&key1[i1++];
 
154
    }
 
155
    if (havekp2) {
 
156
      t2 = key2[i2++];
 
157
      assert(t2 == 2 * idir || t2 == 2 * idir + 1 || t2 == 4);
 
158
      ah2 = *(const AttributeHeader*)&key2[i2++];
 
159
    }
 
160
    if (havekp1) {
 
161
      if (havekp2) {
 
162
        if (! ah1.isNULL()) {
 
163
          if (! ah2.isNULL()) {
 
164
            const NdbSqlUtil::Type& sqlType = NdbSqlUtil::getType(c->m_type);
 
165
            ret = (*sqlType.m_cmp)(c->m_cs, &key1[i1], n, &key2[i2], n, true);
 
166
            if (ret != 0)
 
167
              break;
 
168
          } else {
 
169
            ret = +1;
 
170
            break;
 
171
          }
 
172
        } else if (! ah2.isNULL()) {
 
173
          ret = -1;
 
174
          break;
 
175
        }
 
176
      } else {
 
177
        ret = +jdir;
 
178
        break;
 
179
      }
 
180
    } else {
 
181
      if (havekp2) {
 
182
        ret = -jdir;
 
183
        break;
 
184
      } else {
 
185
        // no more keyparts on either side
 
186
        break;
 
187
      }
 
188
    }
 
189
    i1 += ah1.getDataSize();
 
190
    i2 += ah2.getDataSize();
 
191
    k++;
 
192
  }
 
193
  if (ret == 0) {
 
194
    // strict bound is greater as start key and less as end key
 
195
    int s1 = t1 & 1;
 
196
    int s2 = t2 & 1;
 
197
    ret = (s1 - s2) * jdir;
 
198
  }
 
199
  return ret;
 
200
}
 
201
 
 
202
// find first key >= given key
 
203
int
 
204
NdbIndexStat::stat_search(const Area& a, const Uint32* key, Uint32 keylen, Uint32* idx, bool* match)
 
205
{
 
206
  // points at minus/plus infinity
 
207
  int lo = -1;
 
208
  int hi = a.m_entries;
 
209
  // loop invariant: key(lo) < key < key(hi)
 
210
  while (hi - lo > 1) {
 
211
    // observe lo < j < hi
 
212
    int j = (hi + lo) / 2;
 
213
    Entry& e = a.get_entry(j);
 
214
    const Uint32* key2 = (Uint32*)&e + EntrySize;
 
215
    Uint32 keylen2 = e.m_keylen;
 
216
    int ret = stat_cmpkey(a, key, keylen, key2, keylen2);
 
217
    // observe the loop invariant if ret != 0
 
218
    if (ret < 0)
 
219
      hi = j;
 
220
    else if (ret > 0)
 
221
      lo = j;
 
222
    else {
 
223
      *idx = j;
 
224
      *match = true;
 
225
      return 0;
 
226
    }
 
227
  }
 
228
  // hi - lo == 1 and key(lo) < key < key(hi)
 
229
  *idx = hi;
 
230
  *match = false;
 
231
  return 0;
 
232
}
 
233
 
 
234
// find oldest entry
 
235
int
 
236
NdbIndexStat::stat_oldest(const Area& a)
 
237
{
 
238
  Uint32 i, k= 0, m;
 
239
  bool found = false;
 
240
  m = ~(Uint32)0;     // shut up incorrect CC warning
 
241
  for (i = 0; i < a.m_entries; i++) {
 
242
    Pointer& p = a.get_pointer(i);
 
243
    Uint32 m2 = m_seq >= p.m_seq ? m_seq - p.m_seq : p.m_seq - m_seq;
 
244
    if (! found || m < m2) {
 
245
      m = m2;
 
246
      k = i;
 
247
      found = true;
 
248
    }
 
249
  }
 
250
  assert(found);
 
251
  return k;
 
252
}
 
253
 
 
254
// delete entry
 
255
int
 
256
NdbIndexStat::stat_delete(Area& a, Uint32 k)
 
257
{
 
258
  Uint32 i;
 
259
  NdbIndexStat::Entry& e = a.get_entry(k);
 
260
  Uint32 entrylen = EntrySize + e.m_keylen;
 
261
  Uint32 pos = a.get_pos(e);
 
262
  // adjust pointers to entries after
 
263
  for (i = 0; i < a.m_entries; i++) {
 
264
    Pointer& p = a.get_pointer(i);
 
265
    if (p.m_pos < pos) {
 
266
      p.m_pos += entrylen;
 
267
    }
 
268
  }
 
269
  // compact entry area
 
270
  unsigned firstpos = a.get_firstpos();
 
271
  for (i = pos; i > firstpos; i--) {
 
272
    a.m_data[i + entrylen - 1] = a.m_data[i - 1];
 
273
  }
 
274
  // compact pointer area
 
275
  for (i = k; i + 1 < a.m_entries; i++) {
 
276
    NdbIndexStat::Pointer& p = a.get_pointer(i);
 
277
    NdbIndexStat::Pointer& q = a.get_pointer(i + 1);
 
278
    p = q;
 
279
  }
 
280
  a.m_free += PointerSize + entrylen;
 
281
  a.m_entries--;
 
282
  stat_verify();
 
283
  return 0;
 
284
}
 
285
 
 
286
// update or insert stat values
 
287
int
 
288
NdbIndexStat::stat_update(const Uint32* key1, Uint32 keylen1, const Uint32* key2, Uint32 keylen2, const float pct[2])
 
289
{
 
290
  const Uint32* const key[2] = { key1, key2 };
 
291
  const Uint32 keylen[2] = { keylen1, keylen2 };
 
292
  Uint32 idir;
 
293
  for (idir = 0; idir <= 1; idir++) {
 
294
    Area& a = m_area[idir];
 
295
    Uint32 k;
 
296
    bool match;
 
297
    stat_search(a, key[idir], keylen[idir], &k, &match);
 
298
    Uint16 seq = m_seq++;
 
299
    if (match) {
 
300
      // update old entry
 
301
      NdbIndexStat::Pointer& p = a.get_pointer(k);
 
302
      NdbIndexStat::Entry& e = a.get_entry(k);
 
303
      e.m_pct = pct[idir];
 
304
      p.m_seq = seq;
 
305
    } else {
 
306
      Uint32 entrylen = NdbIndexStat::EntrySize + keylen[idir];
 
307
      Uint32 need = NdbIndexStat::PointerSize + entrylen;
 
308
      while (need > a.m_free) {
 
309
        Uint32 j = stat_oldest(a);
 
310
        if (j < k)
 
311
          k--;
 
312
        stat_delete(a, j);
 
313
      }
 
314
      // insert pointer
 
315
      Uint32 i;
 
316
      for (i = a.m_entries; i > k; i--) {
 
317
        NdbIndexStat::Pointer& p1 = a.get_pointer(i);
 
318
        NdbIndexStat::Pointer& p2 = a.get_pointer(i - 1);
 
319
        p1 = p2;
 
320
      }
 
321
      NdbIndexStat::Pointer& p = a.get_pointer(k);
 
322
      // insert entry
 
323
      Uint32 firstpos = a.get_firstpos();
 
324
      p.m_pos = firstpos - entrylen;
 
325
      NdbIndexStat::Entry& e = a.get_entry(k);
 
326
      e.m_pct = pct[idir];
 
327
      e.m_keylen = keylen[idir];
 
328
      Uint32* entrykey = (Uint32*)&e + EntrySize;
 
329
      for (i = 0; i < keylen[idir]; i++) {
 
330
        entrykey[i] = key[idir][i];
 
331
      }
 
332
      p.m_seq = seq;
 
333
      // total
 
334
      a.m_free -= PointerSize + entrylen;
 
335
      a.m_entries++;
 
336
    }
 
337
  }
 
338
  stat_verify();
 
339
  return 0;
 
340
}
 
341
 
 
342
int
 
343
NdbIndexStat::stat_select(const Uint32* key1, Uint32 keylen1, const Uint32* key2, Uint32 keylen2, float pct[2])
 
344
{
 
345
  const Uint32* const key[2] = { key1, key2 };
 
346
  const Uint32 keylen[2] = { keylen1, keylen2 };
 
347
  Uint32 idir;
 
348
  for (idir = 0; idir <= 1; idir++) {
 
349
    Area& a = m_area[idir];
 
350
    Uint32 k;
 
351
    bool match;
 
352
    stat_search(a, key[idir], keylen[idir], &k, &match);
 
353
    if (match) {
 
354
      NdbIndexStat::Entry& e = a.get_entry(k);
 
355
      pct[idir] = e.m_pct;
 
356
    } else if (k == 0) {
 
357
      NdbIndexStat::Entry& e = a.get_entry(k);
 
358
      if (idir == 0)
 
359
        pct[idir] = e.m_pct / 2;
 
360
      else
 
361
        pct[idir] = e.m_pct + (1 - e.m_pct) / 2;
 
362
    } else if (k == a.m_entries) {
 
363
      NdbIndexStat::Entry& e = a.get_entry(k - 1);
 
364
      if (idir == 0)
 
365
        pct[idir] = e.m_pct + (1 - e.m_pct) / 2;
 
366
      else
 
367
        pct[idir] = e.m_pct / 2;
 
368
    } else {
 
369
      NdbIndexStat::Entry& e1 = a.get_entry(k - 1);
 
370
      NdbIndexStat::Entry& e2 = a.get_entry(k);
 
371
      pct[idir] = (e1.m_pct + e2.m_pct) / 2;
 
372
    }
 
373
  }
 
374
  return 0;
 
375
}
 
376
 
 
377
int
 
378
NdbIndexStat::records_in_range(const NdbDictionary::Index* index, NdbIndexScanOperation* op, Uint64 table_rows, Uint64* count, int flags)
 
379
{
 
380
  DBUG_ENTER("NdbIndexStat::records_in_range");
 
381
  Uint64 rows;
 
382
  Uint32 key1[1000], keylen1;
 
383
  Uint32 key2[1000], keylen2;
 
384
 
 
385
  if (m_cache == NULL)
 
386
    flags |= RR_UseDb | RR_NoUpdate;
 
387
  else if (m_area[0].m_entries == 0 || m_area[1].m_entries == 0)
 
388
    flags |= RR_UseDb;
 
389
 
 
390
  if ((flags & (RR_UseDb | RR_NoUpdate)) != RR_UseDb | RR_NoUpdate) {
 
391
    // get start and end key - assume bound is ordered, wellformed
 
392
    Uint32 bound[1000];
 
393
    Uint32 boundlen = op->getKeyFromSCANTABREQ(bound, 1000);
 
394
 
 
395
    keylen1 = keylen2 = 0;
 
396
    Uint32 n = 0;
 
397
    while (n < boundlen) {
 
398
      Uint32 t = bound[n];
 
399
      AttributeHeader ah(bound[n + 1]);
 
400
      Uint32 sz = 2 + ah.getDataSize();
 
401
      t &= 0xFFFF;      // may contain length
 
402
      assert(t <= 4);
 
403
      bound[n] = t;
 
404
      if (t == 0 || t == 1 || t == 4) {
 
405
        memcpy(&key1[keylen1], &bound[n], sz << 2);
 
406
        keylen1 += sz;
 
407
      }
 
408
      if (t == 2 || t == 3 || t == 4) {
 
409
        memcpy(&key2[keylen2], &bound[n], sz << 2);
 
410
        keylen2 += sz;
 
411
      }
 
412
      n += sz;
 
413
    }
 
414
  }
 
415
 
 
416
  if (flags & RR_UseDb) {
 
417
    Uint32 out[4] = { 0, 0, 0, 0 };  // rows, in, before, after
 
418
    float tot[4] = { 0, 0, 0, 0 };   // totals of above
 
419
    int cnt, ret;
 
420
    bool forceSend = true;
 
421
    NdbTransaction* trans = op->m_transConnection;
 
422
    if (op->interpret_exit_last_row() == -1 ||
 
423
        op->getValue(NdbDictionary::Column::RECORDS_IN_RANGE, (char*)out) == 0) {
 
424
      m_error = op->getNdbError();
 
425
      DBUG_PRINT("error", ("op:%d", op->getNdbError().code));
 
426
      DBUG_RETURN(-1);
 
427
    }
 
428
    if (trans->execute(NdbTransaction::NoCommit,
 
429
                       NdbOperation::AbortOnError, forceSend) == -1) {
 
430
      m_error = trans->getNdbError();
 
431
      DBUG_PRINT("error", ("trans:%d op:%d", trans->getNdbError().code,
 
432
                           op->getNdbError().code));
 
433
      DBUG_RETURN(-1);
 
434
    }
 
435
    cnt = 0;
 
436
    while ((ret = op->nextResult(true, forceSend)) == 0) {
 
437
      DBUG_PRINT("info", ("frag rows=%u in=%u before=%u after=%u [error=%d]",
 
438
                          out[0], out[1], out[2], out[3],
 
439
                          (int)(out[1] + out[2] + out[3]) - (int)out[0]));
 
440
      unsigned i;
 
441
      for (i = 0; i < 4; i++)
 
442
        tot[i] += (float)out[i];
 
443
      cnt++;
 
444
    }
 
445
    if (ret == -1) {
 
446
      m_error = op->getNdbError();
 
447
      DBUG_PRINT("error", ("trans:%d op:%d", trans->getNdbError().code,
 
448
                           op->getNdbError().code));
 
449
      DBUG_RETURN(-1);
 
450
    }
 
451
    op->close(forceSend);
 
452
    rows = (Uint64)tot[1];
 
453
    if (cnt != 0 && ! (flags & RR_NoUpdate)) {
 
454
      float pct[2];
 
455
      pct[0] = 100 * tot[2] / tot[0];
 
456
      pct[1] = 100 * tot[3] / tot[0];
 
457
      DBUG_PRINT("info", ("update stat pct"
 
458
                          " before=%.2f after=%.2f",
 
459
                          pct[0], pct[1]));
 
460
      stat_update(key1, keylen1, key2, keylen2, pct);
 
461
    }
 
462
  } else {
 
463
      float pct[2];
 
464
      stat_select(key1, keylen1, key2, keylen2, pct);
 
465
      float diff = 100.0 - (pct[0] + pct[1]);
 
466
      float trows = (float)table_rows;
 
467
      DBUG_PRINT("info", ("select stat pct"
 
468
                          " before=%.2f after=%.2f in=%.2f table_rows=%.2f",
 
469
                          pct[0], pct[1], diff, trows));
 
470
      rows = 0;
 
471
      if (diff >= 0)
 
472
        rows = (Uint64)(diff * trows / 100);
 
473
      if (rows == 0)
 
474
        rows = 1;
 
475
  }
 
476
 
 
477
  *count = rows;
 
478
  DBUG_PRINT("value", ("rows=%llu flags=%o", rows, flags));
 
479
  DBUG_RETURN(0);
 
480
}
 
481
 
 
482
void
 
483
NdbIndexStat::set_error(int code)
 
484
{
 
485
  m_error.code = code;
 
486
}
 
487
 
 
488
const NdbError&
 
489
NdbIndexStat::getNdbError() const
 
490
{
 
491
  return m_error;
 
492
}