1
/* Copyright (C) 2003 MySQL AB
3
This program is free software; you can redistribute it and/or modify
4
it under the terms of the GNU General Public License as published by
5
the Free Software Foundation; version 2 of the License.
7
This program is distributed in the hope that it will be useful,
8
but WITHOUT ANY WARRANTY; without even the implied warranty of
9
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10
GNU General Public License for more details.
12
You should have received a copy of the GNU General Public License
13
along with this program; if not, write to the Free Software
14
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
16
#include "NdbPoolImpl.hpp"
18
NdbMutex *NdbPool::pool_mutex = NULL;
19
NdbPool *the_pool = NULL;
22
NdbPool::create_instance(Ndb_cluster_connection* cc,
25
Uint32 init_no_ndb_objects)
27
if (!initPoolMutex()) {
30
NdbMutex_Lock(pool_mutex);
32
if (the_pool != NULL) {
35
the_pool = new NdbPool(cc, max_ndb_obj, no_conn_obj);
36
if (!the_pool->init(init_no_ndb_objects)) {
42
NdbMutex* temp = pool_mutex;
46
NdbMutex_Unlock(pool_mutex);
48
NdbMutex_Destroy(temp);
54
NdbPool::drop_instance()
56
if (pool_mutex == NULL) {
59
NdbMutex_Lock(pool_mutex);
60
the_pool->release_all();
63
NdbMutex* temp = pool_mutex;
64
NdbMutex_Unlock(temp);
65
NdbMutex_Destroy(temp);
69
NdbPool::initPoolMutex()
71
bool ret_result = false;
72
if (pool_mutex == NULL) {
73
pool_mutex = NdbMutex_Create();
74
ret_result = ((pool_mutex == NULL) ? false : true);
79
NdbPool::NdbPool(Ndb_cluster_connection* cc,
80
Uint32 max_no_objects,
81
Uint32 no_conn_objects)
83
if (no_conn_objects > 1024) {
84
no_conn_objects = 1024;
86
if (max_no_objects > MAX_NDB_OBJECTS) {
87
max_no_objects = MAX_NDB_OBJECTS;
88
} else if (max_no_objects == 0) {
91
m_max_ndb_objects = max_no_objects;
92
m_no_of_conn_objects = no_conn_objects;
95
m_pool_reference = NULL;
97
m_first_free = NULL_POOL;
98
m_first_not_in_use = NULL_POOL;
99
m_last_free = NULL_POOL;
100
input_pool_cond = NULL;
101
output_pool_cond = NULL;
105
m_cluster_connection = cc;
110
NdbCondition_Destroy(input_pool_cond);
111
NdbCondition_Destroy(output_pool_cond);
115
NdbPool::release_all()
118
for (i = 0; i < m_max_ndb_objects + 1; i++) {
119
if (m_pool_reference[i].ndb_reference != NULL) {
120
assert(m_pool_reference[i].in_use);
121
assert(m_pool_reference[i].free_entry);
122
delete m_pool_reference[i].ndb_reference;
125
delete [] m_pool_reference;
126
delete [] m_hash_entry;
127
m_pool_reference = NULL;
132
NdbPool::init(Uint32 init_no_objects)
134
bool ret_result = false;
137
input_pool_cond = NdbCondition_Create();
138
output_pool_cond = NdbCondition_Create();
139
if (input_pool_cond == NULL || output_pool_cond == NULL) {
142
if (init_no_objects > m_max_ndb_objects) {
143
init_no_objects = m_max_ndb_objects;
145
if (init_no_objects == 0) {
148
m_pool_reference = new NdbPool::POOL_STRUCT[m_max_ndb_objects + 1];
149
m_hash_entry = new Uint8[POOL_HASH_TABLE_SIZE];
151
if ((m_pool_reference == NULL) || (m_hash_entry == NULL)) {
152
delete [] m_pool_reference;
153
delete [] m_hash_entry;
156
for (i = 0; i < m_max_ndb_objects + 1; i++) {
157
m_pool_reference[i].ndb_reference = NULL;
158
m_pool_reference[i].in_use = false;
159
m_pool_reference[i].next_free_object = i+1;
160
m_pool_reference[i].prev_free_object = i-1;
161
m_pool_reference[i].next_db_object = NULL_POOL;
162
m_pool_reference[i].prev_db_object = NULL_POOL;
164
for (i = 0; i < POOL_HASH_TABLE_SIZE; i++) {
165
m_hash_entry[i] = NULL_HASH;
167
m_pool_reference[m_max_ndb_objects].next_free_object = NULL_POOL;
168
m_pool_reference[1].prev_free_object = NULL_POOL;
169
m_first_not_in_use = 1;
170
m_no_of_objects = init_no_objects;
171
for (i = init_no_objects; i > 0 ; i--) {
173
if (!allocate_ndb(fake_id, (const char*)NULL, (const char*)NULL)) {
187
hint_id: 0 = no hint, otherwise a hint of which Ndb object the thread
189
a_db_name: NULL = don't check for database specific Ndb object, otherwise
190
a hint of which database is preferred.
192
hint_id: Returns id of Ndb object returned
193
Return value: Ndb object pointer
196
NdbPool::get_ndb_object(Uint32 &hint_id,
197
const char* a_catalog_name,
198
const char* a_schema_name)
201
Uint32 hash_entry = compute_hash(a_schema_name);
202
NdbMutex_Lock(pool_mutex);
205
We start by checking if we can use the hinted Ndb object.
207
if ((ret_ndb = get_hint_ndb(hint_id, hash_entry)) != NULL) {
211
The hinted Ndb object was not free. We need to allocate another object.
212
We start by checking for a free Ndb object connected to the same database.
214
if (a_schema_name && (ret_ndb = get_db_hash(hint_id,
221
No Ndb object connected to the preferred database was found.
222
We look for a free Ndb object in general.
224
if ((ret_ndb = get_free_list(hint_id, hash_entry)) != NULL) {
228
No free Ndb object was found. If we haven't allocated objects up until the
229
maximum number yet then we can allocate a new Ndb object here.
231
if (m_no_of_objects < m_max_ndb_objects) {
232
if (allocate_ndb(hint_id, a_catalog_name, a_schema_name)) {
233
assert((ret_ndb = get_hint_ndb(hint_id, hash_entry)) != NULL);
238
We need to wait until an Ndb object becomes
241
if ((ret_ndb = wait_free_ndb(hint_id)) != NULL) {
245
Not even after waiting were we able to get hold of an Ndb object. We
246
return NULL to indicate this problem.
251
NdbMutex_Unlock(pool_mutex);
252
if (ret_ndb != NULL) {
254
We need to set the catalog and schema name of the Ndb object before
255
returning it to the caller.
257
ret_ndb->setCatalogName(a_catalog_name);
258
ret_ndb->setSchemaName(a_schema_name);
264
NdbPool::return_ndb_object(Ndb* returned_ndb, Uint32 id)
266
NdbMutex_Lock(pool_mutex);
267
assert(id <= m_max_ndb_objects);
269
assert(returned_ndb == m_pool_reference[id].ndb_reference);
270
bool wait_cond = m_waiting;
272
NdbCondition* pool_cond;
273
if (m_signal_count > 0) {
274
pool_cond = output_pool_cond;
277
pool_cond = input_pool_cond;
280
NdbMutex_Unlock(pool_mutex);
281
NdbCondition_Signal(pool_cond);
285
NdbMutex_Unlock(pool_mutex);
290
NdbPool::allocate_ndb(Uint32 &id,
291
const char* a_catalog_name,
292
const char* a_schema_name)
295
if (m_first_not_in_use == NULL_POOL) {
299
a_ndb = new Ndb(m_cluster_connection, a_schema_name, a_catalog_name);
301
a_ndb = new Ndb(m_cluster_connection, "");
306
a_ndb->init(m_no_of_conn_objects);
309
id = m_first_not_in_use;
310
Uint32 allocated_id = m_first_not_in_use;
311
m_first_not_in_use = m_pool_reference[allocated_id].next_free_object;
313
m_pool_reference[allocated_id].ndb_reference = a_ndb;
314
m_pool_reference[allocated_id].in_use = true;
315
m_pool_reference[allocated_id].free_entry = false;
317
add_free_list(allocated_id);
318
add_db_hash(allocated_id);
323
NdbPool::add_free_list(Uint32 id)
325
assert(!m_pool_reference[id].free_entry);
326
assert(m_pool_reference[id].in_use);
327
m_pool_reference[id].free_entry = true;
328
m_pool_reference[id].next_free_object = m_first_free;
329
m_pool_reference[id].prev_free_object = (Uint8)NULL_POOL;
330
m_first_free = (Uint8)id;
331
if (m_last_free == (Uint8)NULL_POOL) {
332
m_last_free = (Uint8)id;
337
NdbPool::add_db_hash(Uint32 id)
339
Ndb* t_ndb = m_pool_reference[id].ndb_reference;
340
const char* schema_name = t_ndb->getSchemaName();
341
Uint32 hash_entry = compute_hash(schema_name);
342
Uint8 next_db_entry = m_hash_entry[hash_entry];
343
m_pool_reference[id].next_db_object = next_db_entry;
344
m_pool_reference[id].prev_db_object = (Uint8)NULL_HASH;
345
m_hash_entry[hash_entry] = (Uint8)id;
349
NdbPool::get_free_list(Uint32 &id, Uint32 hash_entry)
351
if (m_first_free == NULL_POOL) {
355
Ndb* ret_ndb = get_hint_ndb(m_first_free, hash_entry);
356
assert(ret_ndb != NULL);
361
NdbPool::get_db_hash(Uint32 &id,
363
const char *a_catalog_name,
364
const char *a_schema_name)
366
Uint32 entry_id = m_hash_entry[hash_entry];
368
while (entry_id != NULL_HASH) {
369
Ndb* t_ndb = m_pool_reference[entry_id].ndb_reference;
370
const char *a_ndb_catalog_name = t_ndb->getCatalogName();
371
if (strcmp(a_catalog_name, a_ndb_catalog_name) == 0) {
372
const char *a_ndb_schema_name = t_ndb->getSchemaName();
373
if (strcmp(a_schema_name, a_ndb_schema_name) == 0) {
378
entry_id = m_pool_reference[entry_id].next_db_object;
382
Ndb* ret_ndb = get_hint_ndb(entry_id, hash_entry);
383
assert(ret_ndb != NULL);
390
NdbPool::get_hint_ndb(Uint32 hint_id, Uint32 hash_entry)
394
if ((hint_id != 0) &&
395
(hint_id <= m_max_ndb_objects) &&
396
(m_pool_reference[hint_id].in_use) &&
397
(m_pool_reference[hint_id].free_entry)) {
398
ret_ndb = m_pool_reference[hint_id].ndb_reference;
399
if (ret_ndb != NULL) {
408
This is where we remove the entry from the free list and from the db hash
411
remove_free_list(hint_id);
412
remove_db_hash(hint_id, hash_entry);
417
NdbPool::remove_free_list(Uint32 id)
419
Uint8 next_free_entry = m_pool_reference[id].next_free_object;
420
Uint8 prev_free_entry = m_pool_reference[id].prev_free_object;
421
if (prev_free_entry == (Uint8)NULL_POOL) {
422
m_first_free = next_free_entry;
424
m_pool_reference[prev_free_entry].next_free_object = next_free_entry;
426
if (next_free_entry == (Uint8)NULL_POOL) {
427
m_last_free = prev_free_entry;
429
m_pool_reference[next_free_entry].prev_free_object = prev_free_entry;
431
m_pool_reference[id].next_free_object = NULL_POOL;
432
m_pool_reference[id].prev_free_object = NULL_POOL;
433
m_pool_reference[id].free_entry = false;
437
NdbPool::remove_db_hash(Uint32 id, Uint32 hash_entry)
439
Uint8 next_free_entry = m_pool_reference[id].next_db_object;
440
Uint8 prev_free_entry = m_pool_reference[id].prev_db_object;
441
if (prev_free_entry == (Uint8)NULL_HASH) {
442
m_hash_entry[hash_entry] = next_free_entry;
444
m_pool_reference[prev_free_entry].next_db_object = next_free_entry;
446
if (next_free_entry == (Uint8)NULL_HASH) {
449
m_pool_reference[next_free_entry].prev_db_object = prev_free_entry;
451
m_pool_reference[id].next_db_object = NULL_HASH;
452
m_pool_reference[id].prev_db_object = NULL_HASH;
456
NdbPool::compute_hash(const char *a_schema_name)
458
Uint32 len = strlen(a_schema_name);
460
for (Uint32 i = 0; i < len; i++) {
461
Uint32 c = a_schema_name[i];
462
h = (h << 5) + h + c;
464
h &= (POOL_HASH_TABLE_SIZE - 1);
469
NdbPool::wait_free_ndb(Uint32 &id)
474
NdbCondition* tmp = input_pool_cond;
478
res = NdbCondition_WaitTimeout(input_pool_cond, pool_mutex, time_out);
479
if (tmp == input_pool_cond) {
483
if (m_output_queue == 0) {
484
switch_condition_queue();
488
} while (res == 0 && m_first_wait == NULL_POOL);
489
if (res != 0 && m_first_wait == NULL_POOL) {
494
assert(m_waiting != 0 || m_first_wait == NULL_POOL);
495
return m_pool_reference[id].ndb_reference;
499
NdbPool::remove_wait_list()
501
Uint32 id = m_first_wait;
502
m_first_wait = m_pool_reference[id].next_free_object;
503
m_pool_reference[id].next_free_object = NULL_POOL;
504
m_pool_reference[id].prev_free_object = NULL_POOL;
505
m_pool_reference[id].free_entry = false;
509
NdbPool::add_wait_list(Uint32 id)
511
m_pool_reference[id].next_free_object = m_first_wait;
516
NdbPool::switch_condition_queue()
518
m_signal_count = m_input_queue;
519
Uint8 move_queue = m_input_queue;
520
m_input_queue = m_output_queue;
521
m_output_queue = move_queue;
523
NdbCondition* move_cond = input_pool_cond;
524
input_pool_cond = output_pool_cond;
525
output_pool_cond = move_cond;