~ubuntu-branches/ubuntu/precise/mysql-5.1/precise

« back to all changes in this revision

Viewing changes to storage/ndb/src/ndbapi/NdbPoolImpl.cpp

  • Committer: Bazaar Package Importer
  • Author(s): Norbert Tretkowski
  • Date: 2010-03-17 14:56:02 UTC
  • Revision ID: james.westby@ubuntu.com-20100317145602-x7e30l1b2sb5s6w6
Tags: upstream-5.1.45
ImportĀ upstreamĀ versionĀ 5.1.45

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/* Copyright (C) 2003 MySQL AB
 
2
 
 
3
   This program is free software; you can redistribute it and/or modify
 
4
   it under the terms of the GNU General Public License as published by
 
5
   the Free Software Foundation; version 2 of the License.
 
6
 
 
7
   This program is distributed in the hope that it will be useful,
 
8
   but WITHOUT ANY WARRANTY; without even the implied warranty of
 
9
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
10
   GNU General Public License for more details.
 
11
 
 
12
   You should have received a copy of the GNU General Public License
 
13
   along with this program; if not, write to the Free Software
 
14
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
 
15
 
 
16
#include "NdbPoolImpl.hpp"
 
17
 
 
18
NdbMutex *NdbPool::pool_mutex = NULL;
 
19
NdbPool *the_pool = NULL;
 
20
 
 
21
NdbPool*
 
22
NdbPool::create_instance(Ndb_cluster_connection* cc,
 
23
                         Uint32 max_ndb_obj,
 
24
                         Uint32 no_conn_obj,
 
25
                         Uint32 init_no_ndb_objects)
 
26
{
 
27
  if (!initPoolMutex()) {
 
28
    return NULL;
 
29
  }
 
30
  NdbMutex_Lock(pool_mutex);
 
31
  NdbPool* a_pool;
 
32
  if (the_pool != NULL) {
 
33
    a_pool = NULL;
 
34
  } else {
 
35
    the_pool = new NdbPool(cc, max_ndb_obj, no_conn_obj);
 
36
    if (!the_pool->init(init_no_ndb_objects)) {
 
37
      delete the_pool;
 
38
      the_pool = NULL;
 
39
    }
 
40
    a_pool = the_pool;
 
41
  }
 
42
  NdbMutex* temp = pool_mutex;
 
43
  if (a_pool == NULL) {
 
44
    pool_mutex = NULL;
 
45
  }
 
46
  NdbMutex_Unlock(pool_mutex);
 
47
  if (a_pool == NULL) {
 
48
    NdbMutex_Destroy(temp);
 
49
  }
 
50
  return a_pool;
 
51
}
 
52
 
 
53
void
 
54
NdbPool::drop_instance()
 
55
{
 
56
  if (pool_mutex == NULL) {
 
57
    return;
 
58
  }
 
59
  NdbMutex_Lock(pool_mutex);
 
60
  the_pool->release_all();
 
61
  delete the_pool;
 
62
  the_pool = NULL;
 
63
  NdbMutex* temp = pool_mutex;
 
64
  NdbMutex_Unlock(temp);
 
65
  NdbMutex_Destroy(temp);
 
66
}
 
67
 
 
68
bool
 
69
NdbPool::initPoolMutex()
 
70
{
 
71
  bool ret_result = false;
 
72
  if (pool_mutex == NULL) {
 
73
    pool_mutex = NdbMutex_Create();
 
74
    ret_result = ((pool_mutex == NULL) ? false : true);
 
75
  }
 
76
  return ret_result;
 
77
}
 
78
 
 
79
NdbPool::NdbPool(Ndb_cluster_connection* cc,
 
80
                 Uint32 max_no_objects,
 
81
                 Uint32 no_conn_objects)
 
82
{
 
83
  if (no_conn_objects > 1024) {
 
84
    no_conn_objects = 1024;
 
85
  }
 
86
  if (max_no_objects > MAX_NDB_OBJECTS) {
 
87
    max_no_objects = MAX_NDB_OBJECTS;
 
88
  } else if (max_no_objects == 0) {
 
89
    max_no_objects = 1;
 
90
  }
 
91
  m_max_ndb_objects = max_no_objects;
 
92
  m_no_of_conn_objects = no_conn_objects;
 
93
  m_no_of_objects = 0;
 
94
  m_waiting = 0;
 
95
  m_pool_reference = NULL;
 
96
  m_hash_entry = NULL;
 
97
  m_first_free = NULL_POOL;
 
98
  m_first_not_in_use = NULL_POOL;
 
99
  m_last_free = NULL_POOL;
 
100
  input_pool_cond = NULL;
 
101
  output_pool_cond = NULL;
 
102
  m_output_queue = 0;
 
103
  m_input_queue = 0;
 
104
  m_signal_count = 0;
 
105
  m_cluster_connection = cc;
 
106
}
 
107
 
 
108
NdbPool::~NdbPool()
 
109
{
 
110
  NdbCondition_Destroy(input_pool_cond);
 
111
  NdbCondition_Destroy(output_pool_cond);
 
112
}
 
113
 
 
114
void
 
115
NdbPool::release_all()
 
116
{
 
117
  int i;
 
118
  for (i = 0; i < m_max_ndb_objects + 1; i++) {
 
119
    if (m_pool_reference[i].ndb_reference != NULL) {
 
120
      assert(m_pool_reference[i].in_use);
 
121
      assert(m_pool_reference[i].free_entry);
 
122
      delete m_pool_reference[i].ndb_reference;
 
123
    }
 
124
  }
 
125
  delete [] m_pool_reference;
 
126
  delete [] m_hash_entry;
 
127
  m_pool_reference = NULL;
 
128
  m_hash_entry = NULL;
 
129
}
 
130
 
 
131
bool
 
132
NdbPool::init(Uint32 init_no_objects)
 
133
{
 
134
  bool ret_result = false;
 
135
  int i;
 
136
  do {
 
137
    input_pool_cond = NdbCondition_Create();
 
138
    output_pool_cond = NdbCondition_Create();
 
139
    if (input_pool_cond == NULL || output_pool_cond == NULL) {
 
140
      break;
 
141
    }
 
142
    if (init_no_objects > m_max_ndb_objects) {
 
143
      init_no_objects = m_max_ndb_objects;
 
144
    }
 
145
    if (init_no_objects == 0) {
 
146
      init_no_objects = 1;
 
147
    }
 
148
    m_pool_reference = new NdbPool::POOL_STRUCT[m_max_ndb_objects + 1];
 
149
    m_hash_entry     = new Uint8[POOL_HASH_TABLE_SIZE];
 
150
 
 
151
    if ((m_pool_reference == NULL) || (m_hash_entry == NULL)) {
 
152
      delete [] m_pool_reference;
 
153
      delete [] m_hash_entry;
 
154
      break;
 
155
    }
 
156
    for (i = 0; i < m_max_ndb_objects + 1; i++) {
 
157
      m_pool_reference[i].ndb_reference = NULL;
 
158
      m_pool_reference[i].in_use = false;
 
159
      m_pool_reference[i].next_free_object = i+1;
 
160
      m_pool_reference[i].prev_free_object = i-1;
 
161
      m_pool_reference[i].next_db_object = NULL_POOL;
 
162
      m_pool_reference[i].prev_db_object = NULL_POOL;
 
163
    }
 
164
    for (i = 0; i < POOL_HASH_TABLE_SIZE; i++) {
 
165
      m_hash_entry[i] = NULL_HASH;
 
166
    }
 
167
    m_pool_reference[m_max_ndb_objects].next_free_object = NULL_POOL;
 
168
    m_pool_reference[1].prev_free_object = NULL_POOL;
 
169
    m_first_not_in_use = 1;
 
170
    m_no_of_objects = init_no_objects;
 
171
    for (i = init_no_objects; i > 0 ; i--) {
 
172
      Uint32 fake_id;
 
173
      if (!allocate_ndb(fake_id, (const char*)NULL, (const char*)NULL)) {
 
174
        release_all();
 
175
        break;
 
176
      }
 
177
    }
 
178
    ret_result = true;
 
179
    break;
 
180
  } while (1);
 
181
  return ret_result;
 
182
}
 
183
 
 
184
/*
 
185
Get an Ndb object.
 
186
Input:
 
187
hint_id: 0 = no hint, otherwise a hint of which Ndb object the thread
 
188
         used the last time.
 
189
a_db_name: NULL = don't check for database specific Ndb  object, otherwise
 
190
           a hint of which database is preferred.
 
191
Output:
 
192
hint_id: Returns id of Ndb object returned
 
193
Return value: Ndb object pointer
 
194
*/
 
195
Ndb*
 
196
NdbPool::get_ndb_object(Uint32 &hint_id,
 
197
                        const char* a_catalog_name,
 
198
                        const char* a_schema_name)
 
199
{
 
200
  Ndb* ret_ndb = NULL;
 
201
  Uint32 hash_entry = compute_hash(a_schema_name);
 
202
  NdbMutex_Lock(pool_mutex);
 
203
  while (1) {
 
204
    /*
 
205
    We start by checking if we can use the hinted Ndb object.
 
206
    */
 
207
    if ((ret_ndb = get_hint_ndb(hint_id, hash_entry)) != NULL) {
 
208
      break;
 
209
    }
 
210
    /*
 
211
    The hinted Ndb object was not free. We need to allocate another object.
 
212
    We start by checking for a free Ndb object connected to the same database.
 
213
    */
 
214
    if (a_schema_name && (ret_ndb = get_db_hash(hint_id,
 
215
                                                hash_entry,
 
216
                                                a_catalog_name,
 
217
                                                a_schema_name))) {
 
218
      break;
 
219
    }
 
220
    /*
 
221
    No Ndb object connected to the preferred database was found.
 
222
    We look for a free Ndb object in general.
 
223
    */
 
224
    if ((ret_ndb = get_free_list(hint_id, hash_entry)) != NULL) {
 
225
      break;
 
226
    }
 
227
    /*
 
228
    No free Ndb object was found. If we haven't allocated objects up until the
 
229
    maximum number yet then we can allocate a new Ndb object here.
 
230
    */
 
231
    if (m_no_of_objects < m_max_ndb_objects) {
 
232
      if (allocate_ndb(hint_id, a_catalog_name, a_schema_name)) {
 
233
        assert((ret_ndb = get_hint_ndb(hint_id, hash_entry)) != NULL);
 
234
        break;
 
235
      }
 
236
    }
 
237
    /*
 
238
    We need to wait until an Ndb object becomes
 
239
    available.
 
240
    */
 
241
    if ((ret_ndb = wait_free_ndb(hint_id)) != NULL) {
 
242
      break;
 
243
    }
 
244
    /*
 
245
    Not even after waiting were we able to get hold of an Ndb object. We 
 
246
    return NULL to indicate this problem.
 
247
    */
 
248
    ret_ndb = NULL;
 
249
    break;
 
250
  }
 
251
  NdbMutex_Unlock(pool_mutex);
 
252
  if (ret_ndb != NULL) {
 
253
    /*
 
254
    We need to set the catalog and schema name of the Ndb object before
 
255
    returning it to the caller.
 
256
    */
 
257
    ret_ndb->setCatalogName(a_catalog_name);
 
258
    ret_ndb->setSchemaName(a_schema_name);
 
259
  }
 
260
  return ret_ndb;
 
261
}
 
262
 
 
263
void
 
264
NdbPool::return_ndb_object(Ndb* returned_ndb, Uint32 id)
 
265
{
 
266
  NdbMutex_Lock(pool_mutex);
 
267
  assert(id <= m_max_ndb_objects);
 
268
  assert(id != 0);
 
269
  assert(returned_ndb == m_pool_reference[id].ndb_reference);
 
270
  bool wait_cond = m_waiting;
 
271
  if (wait_cond) {
 
272
    NdbCondition* pool_cond;
 
273
    if (m_signal_count > 0) {
 
274
      pool_cond = output_pool_cond;
 
275
      m_signal_count--;
 
276
    } else {
 
277
      pool_cond = input_pool_cond;
 
278
    }
 
279
    add_wait_list(id);
 
280
    NdbMutex_Unlock(pool_mutex);
 
281
    NdbCondition_Signal(pool_cond);
 
282
  } else {
 
283
    add_free_list(id);
 
284
    add_db_hash(id);
 
285
    NdbMutex_Unlock(pool_mutex);
 
286
  }
 
287
}
 
288
 
 
289
bool
 
290
NdbPool::allocate_ndb(Uint32 &id,
 
291
                      const char* a_catalog_name,
 
292
                      const char* a_schema_name)
 
293
{
 
294
  Ndb* a_ndb;
 
295
  if (m_first_not_in_use == NULL_POOL) {
 
296
    return false;
 
297
  }
 
298
  if (a_schema_name) {
 
299
    a_ndb = new Ndb(m_cluster_connection, a_schema_name, a_catalog_name);
 
300
  } else {
 
301
    a_ndb = new Ndb(m_cluster_connection, "");
 
302
  }
 
303
  if (a_ndb == NULL) {
 
304
    return false;
 
305
  }
 
306
  a_ndb->init(m_no_of_conn_objects);
 
307
  m_no_of_objects++;
 
308
 
 
309
  id = m_first_not_in_use;
 
310
  Uint32 allocated_id = m_first_not_in_use;
 
311
  m_first_not_in_use = m_pool_reference[allocated_id].next_free_object;
 
312
 
 
313
  m_pool_reference[allocated_id].ndb_reference = a_ndb;
 
314
  m_pool_reference[allocated_id].in_use = true;
 
315
  m_pool_reference[allocated_id].free_entry = false;
 
316
 
 
317
  add_free_list(allocated_id);
 
318
  add_db_hash(allocated_id);
 
319
  return true;
 
320
}
 
321
 
 
322
void
 
323
NdbPool::add_free_list(Uint32 id)
 
324
{
 
325
  assert(!m_pool_reference[id].free_entry);
 
326
  assert(m_pool_reference[id].in_use);
 
327
  m_pool_reference[id].free_entry = true;
 
328
  m_pool_reference[id].next_free_object = m_first_free;
 
329
  m_pool_reference[id].prev_free_object = (Uint8)NULL_POOL;
 
330
  m_first_free = (Uint8)id;
 
331
  if (m_last_free == (Uint8)NULL_POOL) {
 
332
    m_last_free = (Uint8)id;
 
333
  }
 
334
}
 
335
 
 
336
void
 
337
NdbPool::add_db_hash(Uint32 id)
 
338
{
 
339
  Ndb* t_ndb = m_pool_reference[id].ndb_reference;
 
340
  const char* schema_name = t_ndb->getSchemaName();
 
341
  Uint32 hash_entry = compute_hash(schema_name);
 
342
  Uint8 next_db_entry = m_hash_entry[hash_entry];
 
343
  m_pool_reference[id].next_db_object = next_db_entry;
 
344
  m_pool_reference[id].prev_db_object = (Uint8)NULL_HASH;
 
345
  m_hash_entry[hash_entry] = (Uint8)id;
 
346
}
 
347
 
 
348
Ndb*
 
349
NdbPool::get_free_list(Uint32 &id, Uint32 hash_entry)
 
350
{
 
351
  if (m_first_free == NULL_POOL) {
 
352
    return NULL;
 
353
  }
 
354
  id = m_first_free;
 
355
  Ndb* ret_ndb = get_hint_ndb(m_first_free, hash_entry);
 
356
  assert(ret_ndb != NULL);
 
357
  return ret_ndb;
 
358
}
 
359
 
 
360
Ndb*
 
361
NdbPool::get_db_hash(Uint32 &id,
 
362
                     Uint32 hash_entry,
 
363
                     const char *a_catalog_name,
 
364
                     const char *a_schema_name)
 
365
{
 
366
  Uint32 entry_id = m_hash_entry[hash_entry];
 
367
  bool found = false;
 
368
  while (entry_id != NULL_HASH) {
 
369
    Ndb* t_ndb = m_pool_reference[entry_id].ndb_reference;
 
370
    const char *a_ndb_catalog_name = t_ndb->getCatalogName();
 
371
    if (strcmp(a_catalog_name, a_ndb_catalog_name) == 0) {
 
372
      const char *a_ndb_schema_name = t_ndb->getSchemaName();
 
373
      if (strcmp(a_schema_name, a_ndb_schema_name) == 0) {
 
374
        found = true;
 
375
        break;
 
376
      }
 
377
    }
 
378
    entry_id = m_pool_reference[entry_id].next_db_object;
 
379
  }
 
380
  if (found) {
 
381
    id = entry_id;
 
382
    Ndb* ret_ndb = get_hint_ndb(entry_id, hash_entry);
 
383
    assert(ret_ndb != NULL);
 
384
    return ret_ndb;
 
385
  }
 
386
  return NULL;
 
387
}
 
388
 
 
389
Ndb*
 
390
NdbPool::get_hint_ndb(Uint32 hint_id, Uint32 hash_entry)
 
391
{
 
392
  Ndb* ret_ndb = NULL;
 
393
  do {
 
394
    if ((hint_id != 0) &&
 
395
        (hint_id <= m_max_ndb_objects) &&
 
396
        (m_pool_reference[hint_id].in_use) &&
 
397
        (m_pool_reference[hint_id].free_entry)) {
 
398
      ret_ndb = m_pool_reference[hint_id].ndb_reference;
 
399
      if (ret_ndb != NULL) {
 
400
        break;
 
401
      } else {
 
402
        assert(false);
 
403
      }
 
404
    }
 
405
    return NULL;
 
406
  } while (1);
 
407
  /*
 
408
  This is where we remove the entry from the free list and from the db hash
 
409
  table.
 
410
  */
 
411
  remove_free_list(hint_id);
 
412
  remove_db_hash(hint_id, hash_entry);
 
413
  return ret_ndb;
 
414
}
 
415
 
 
416
void
 
417
NdbPool::remove_free_list(Uint32 id)
 
418
{
 
419
  Uint8 next_free_entry = m_pool_reference[id].next_free_object;
 
420
  Uint8 prev_free_entry = m_pool_reference[id].prev_free_object;
 
421
  if (prev_free_entry == (Uint8)NULL_POOL) {
 
422
    m_first_free = next_free_entry;
 
423
  } else {
 
424
    m_pool_reference[prev_free_entry].next_free_object = next_free_entry;
 
425
  }
 
426
  if (next_free_entry == (Uint8)NULL_POOL) {
 
427
    m_last_free = prev_free_entry;
 
428
  } else {
 
429
    m_pool_reference[next_free_entry].prev_free_object = prev_free_entry;
 
430
  }
 
431
  m_pool_reference[id].next_free_object = NULL_POOL;
 
432
  m_pool_reference[id].prev_free_object = NULL_POOL;
 
433
  m_pool_reference[id].free_entry = false;
 
434
}
 
435
 
 
436
void
 
437
NdbPool::remove_db_hash(Uint32 id, Uint32 hash_entry)
 
438
{
 
439
  Uint8 next_free_entry = m_pool_reference[id].next_db_object;
 
440
  Uint8 prev_free_entry = m_pool_reference[id].prev_db_object;
 
441
  if (prev_free_entry == (Uint8)NULL_HASH) {
 
442
    m_hash_entry[hash_entry] = next_free_entry;
 
443
  } else {
 
444
    m_pool_reference[prev_free_entry].next_db_object = next_free_entry;
 
445
  }
 
446
  if (next_free_entry == (Uint8)NULL_HASH) {
 
447
    ;
 
448
  } else {
 
449
    m_pool_reference[next_free_entry].prev_db_object = prev_free_entry;
 
450
  }
 
451
  m_pool_reference[id].next_db_object = NULL_HASH;
 
452
  m_pool_reference[id].prev_db_object = NULL_HASH;
 
453
}
 
454
 
 
455
Uint32
 
456
NdbPool::compute_hash(const char *a_schema_name)
 
457
{
 
458
  Uint32 len = strlen(a_schema_name);
 
459
  Uint32 h = 147;
 
460
  for (Uint32 i = 0; i < len; i++) {
 
461
    Uint32 c = a_schema_name[i];
 
462
    h = (h << 5) + h + c;
 
463
  }
 
464
  h &= (POOL_HASH_TABLE_SIZE - 1);
 
465
  return h;
 
466
}
 
467
 
 
468
Ndb*
 
469
NdbPool::wait_free_ndb(Uint32 &id)
 
470
{
 
471
  int res;
 
472
  int time_out = 3500;
 
473
  do {
 
474
    NdbCondition* tmp = input_pool_cond;
 
475
    m_waiting++;
 
476
    m_input_queue++;
 
477
    time_out -= 500;
 
478
    res = NdbCondition_WaitTimeout(input_pool_cond, pool_mutex, time_out);
 
479
    if (tmp == input_pool_cond) {
 
480
      m_input_queue--;
 
481
    } else {
 
482
      m_output_queue--;
 
483
      if (m_output_queue == 0) {
 
484
        switch_condition_queue();
 
485
      }
 
486
    }
 
487
    m_waiting--;
 
488
  } while (res == 0 && m_first_wait == NULL_POOL);
 
489
  if (res != 0 && m_first_wait == NULL_POOL) {
 
490
    return NULL;
 
491
  }
 
492
  id = m_first_wait;
 
493
  remove_wait_list();
 
494
  assert(m_waiting != 0 || m_first_wait == NULL_POOL);
 
495
  return m_pool_reference[id].ndb_reference;
 
496
}
 
497
 
 
498
void
 
499
NdbPool::remove_wait_list()
 
500
{
 
501
  Uint32 id = m_first_wait;
 
502
  m_first_wait = m_pool_reference[id].next_free_object;
 
503
  m_pool_reference[id].next_free_object = NULL_POOL;
 
504
  m_pool_reference[id].prev_free_object = NULL_POOL;
 
505
  m_pool_reference[id].free_entry = false;
 
506
}
 
507
 
 
508
void
 
509
NdbPool::add_wait_list(Uint32 id)
 
510
{
 
511
  m_pool_reference[id].next_free_object = m_first_wait;
 
512
  m_first_wait = id;
 
513
}
 
514
 
 
515
void
 
516
NdbPool::switch_condition_queue()
 
517
{
 
518
  m_signal_count = m_input_queue;
 
519
  Uint8 move_queue = m_input_queue;
 
520
  m_input_queue = m_output_queue;
 
521
  m_output_queue = move_queue;
 
522
 
 
523
  NdbCondition* move_cond = input_pool_cond;
 
524
  input_pool_cond = output_pool_cond;
 
525
  output_pool_cond = move_cond;
 
526
}
 
527