~ubuntu-branches/ubuntu/trusty/drizzle/trusty

« back to all changes in this revision

Viewing changes to plugin/pbxt/src/database_xt.cc

  • Committer: Bazaar Package Importer
  • Author(s): Monty Taylor
  • Date: 2010-10-02 14:17:48 UTC
  • mfrom: (1.1.1 upstream)
  • mto: (2.1.17 sid)
  • mto: This revision was merged to the branch mainline in revision 3.
  • Revision ID: james.westby@ubuntu.com-20101002141748-m6vbfbfjhrw1153e
Tags: 2010.09.1802-1
* New upstream release.
* Removed pid-file argument hack.
* Updated GPL-2 address to be new address.
* Directly copy in drizzledump.1 since debian doesn't have sphinx 1.0 yet.
* Link to jquery from libjs-jquery. Add it as a depend.
* Add drizzled.8 symlink to the install files.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/* Copyright (c) 2005 PrimeBase Technologies GmbH
 
2
 *
 
3
 * PrimeBase XT
 
4
 *
 
5
 * This program is free software; you can redistribute it and/or modify
 
6
 * it under the terms of the GNU General Public License as published by
 
7
 * the Free Software Foundation; either version 2 of the License, or
 
8
 * (at your option) any later version.
 
9
 *
 
10
 * This program is distributed in the hope that it will be useful,
 
11
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 
12
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
13
 * GNU General Public License for more details.
 
14
 *
 
15
 * You should have received a copy of the GNU General Public License
 
16
 * along with this program; if not, write to the Free Software
 
17
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
 
18
 *
 
19
 * 2005-01-15   Paul McCullagh
 
20
 *
 
21
 * H&G2JCtL
 
22
 */
 
23
 
 
24
#include "xt_config.h"
 
25
 
 
26
#ifdef DRIZZLED
 
27
#include <bitset>
 
28
#endif
 
29
 
 
30
#include <string.h>
 
31
#include <stdio.h>
 
32
#include <signal.h>
 
33
 
 
34
#include "pthread_xt.h"
 
35
#include "hashtab_xt.h"
 
36
#include "filesys_xt.h"
 
37
#include "database_xt.h"
 
38
#include "memory_xt.h"
 
39
#include "heap_xt.h"
 
40
#include "datalog_xt.h"
 
41
#include "strutil_xt.h"
 
42
#include "util_xt.h"
 
43
#include "trace_xt.h"
 
44
#include "myxt_xt.h"
 
45
 
 
46
#ifdef DEBUG
 
47
//#define XT_TEST_XACT_OVERFLOW
 
48
#endif
 
49
 
 
50
#ifndef NAME_MAX
 
51
#define NAME_MAX 128
 
52
#endif
 
53
 
 
54
/*
 
55
 * -----------------------------------------------------------------------
 
56
 * GLOBALS
 
57
 */
 
58
 
 
59
xtPublic XTDatabaseHPtr         pbxt_database = NULL;           // The global open database
 
60
 
 
61
xtPublic xtLogOffset            xt_db_log_file_threshold;
 
62
xtPublic size_t                         xt_db_log_buffer_size;
 
63
xtPublic size_t                         xt_db_transaction_buffer_size;
 
64
xtPublic size_t                         xt_db_checkpoint_frequency;
 
65
xtPublic off_t                          xt_db_data_log_threshold;
 
66
xtPublic size_t                         xt_db_data_file_grow_size;
 
67
xtPublic size_t                         xt_db_row_file_grow_size;
 
68
xtPublic size_t                         xt_db_record_write_threshold;
 
69
xtPublic int                            xt_db_garbage_threshold;
 
70
xtPublic int                            xt_db_log_file_count;
 
71
xtPublic int                            xt_db_auto_increment_mode;              /* 0 = MySQL compatible, 1 = PrimeBase Compatible. */
 
72
xtPublic int                            xt_db_offline_log_function;             /* 0 = recycle logs, 1 = delete logs, 2 = keep logs */
 
73
xtPublic int                            xt_db_sweeper_priority;                 /* 0 = low (default), 1 = normal, 2 = high */
 
74
/* Buggy at the moment.
 
75
 * For example, a large alter table hangs.
 
76
 */
 
77
xtPublic int                            xt_db_rewrite_flushing = 0;             /* 0 = Normal fsync, 1 = Re-write flushing. */
 
78
xtPublic int                            xt_db_index_dirty_threshold;
 
79
xtPublic int                            xt_db_flush_log_at_trx_commit;  /* 0 = no-write/no-flush, 1 = yes, 2 = write/no-flush */
 
80
 
 
81
xtPublic XTSortedListPtr        xt_db_open_db_by_id = NULL;
 
82
xtPublic XTHashTabPtr           xt_db_open_databases = NULL;
 
83
xtPublic time_t                         xt_db_approximate_time = 0;             /* A "fast" alternative timer (not too accurate). */
 
84
 
 
85
static xtDatabaseID                             db_next_id = 1;
 
86
static volatile XTOpenFilePtr   db_lock_file = NULL;
 
87
 
 
88
/*
 
89
 * -----------------------------------------------------------------------
 
90
 * LOCK/UNLOCK INSTALLATION
 
91
 */
 
92
 
 
93
xtPublic void xt_lock_installation(XTThreadPtr self, const char *installation_path)
 
94
{
 
95
        char                    file_path[PATH_MAX];
 
96
        char                    buffer[101];
 
97
        size_t                  red_size;
 
98
        llong                   pid;
 
99
        xtBool                  cd = pbxt_crash_debug;
 
100
 
 
101
        xt_strcpy(PATH_MAX, file_path, installation_path);
 
102
        xt_add_pbxt_file(PATH_MAX, file_path, "no-debug");
 
103
        if (xt_fs_exists(file_path))
 
104
                pbxt_crash_debug = FALSE;
 
105
        xt_strcpy(PATH_MAX, file_path, installation_path);
 
106
        xt_add_pbxt_file(PATH_MAX, file_path, "crash-debug");
 
107
        if (xt_fs_exists(file_path))
 
108
                pbxt_crash_debug = TRUE;
 
109
 
 
110
        if (pbxt_crash_debug != cd) {
 
111
                if (pbxt_crash_debug)
 
112
                        xt_logf(XT_NT_WARNING, "Crash debugging has been turned on ('crash-debug' file exists)\n");
 
113
                else
 
114
                        xt_logf(XT_NT_WARNING, "Crash debugging has been turned off ('no-debug' file exists)\n");
 
115
        }
 
116
        else if (pbxt_crash_debug)
 
117
                xt_logf(XT_NT_WARNING, "Crash debugging is enabled\n");
 
118
 
 
119
        /* Moved the lock file out of the pbxt directory so that
 
120
         * it is possible to drop the pbxt database!
 
121
         */
 
122
        xt_strcpy(PATH_MAX, file_path, installation_path);
 
123
        xt_add_dir_char(PATH_MAX, file_path);
 
124
        xt_strcat(PATH_MAX, file_path, "pbxt-lock");
 
125
        db_lock_file = xt_open_file(self, file_path, XT_FT_STANDARD, XT_FS_CREATE | XT_FS_MAKE_PATH, 0);
 
126
 
 
127
        try_(a) {
 
128
                if (!xt_lock_file(self, db_lock_file)) {
 
129
                        xt_logf(XT_NT_ERROR, "A server appears to already be running\n");
 
130
                        xt_logf(XT_NT_ERROR, "The file: %s, is locked\n", file_path);
 
131
                        xt_throw_xterr(XT_CONTEXT, XT_ERR_SERVER_RUNNING);
 
132
                }
 
133
                if (!xt_pread_file(db_lock_file, 0, 100, 0, buffer, &red_size, &self->st_statistics.st_rec, self))
 
134
                        xt_throw(self);
 
135
                if (red_size > 0) {
 
136
                        buffer[red_size] = 0;
 
137
#ifdef XT_WIN
 
138
                        pid = (llong) _atoi64(buffer);
 
139
#else
 
140
                        pid = atoll(buffer);
 
141
#endif
 
142
                        /* Problem with this code is, after a restart
 
143
                         * the process ID's are reused.
 
144
                         * If some system process grabs the proc id that
 
145
                         * the server had on the last run, then
 
146
                         * the database will not start.
 
147
                        if (xt_process_exists((xtProcID) pid)) {
 
148
                                xt_logf(XT_NT_ERROR, "A server appears to already be running, process ID: %lld\n", pid);
 
149
                                xt_logf(XT_NT_ERROR, "Remove the file: %s, if this is not the case\n", file_path);
 
150
                                xt_throw_xterr(XT_CONTEXT, XT_ERR_SERVER_RUNNING);
 
151
                        }
 
152
                        */
 
153
                        xt_logf(XT_NT_INFO, "The server was not shutdown correctly, recovery required\n");
 
154
#ifdef XT_BACKUP_BEFORE_RECOVERY
 
155
                        if (pbxt_crash_debug) {
 
156
                                /* The server was not shut down correctly. Make a backup before
 
157
                                 * we start recovery.
 
158
                                 */
 
159
                                char extension[100];
 
160
 
 
161
                                for (int i=1;;i++) {
 
162
                                        xt_strcpy(PATH_MAX, file_path, installation_path);
 
163
                                        xt_remove_dir_char(file_path);
 
164
                                        sprintf(extension, "-recovery-%d", i);
 
165
                                        xt_strcat(PATH_MAX, file_path, extension);
 
166
                                        if (!xt_fs_exists(file_path))
 
167
                                                break;
 
168
                                }
 
169
                                xt_logf(XT_NT_INFO, "In order to reproduce recovery errors a backup of the installation\n");
 
170
                                xt_logf(XT_NT_INFO, "will be made to:\n");
 
171
                                xt_logf(XT_NT_INFO, "%s\n", file_path);
 
172
                                xt_logf(XT_NT_INFO, "Copy in progress...\n");
 
173
                                xt_fs_copy_dir(self, installation_path, file_path);
 
174
                                xt_logf(XT_NT_INFO, "Copy OK\n");
 
175
                        }
 
176
#endif
 
177
                }
 
178
 
 
179
                sprintf(buffer, "%lld", (llong) xt_getpid());
 
180
                xt_set_eof_file(self, db_lock_file, 0);
 
181
                if (!xt_pwrite_file(db_lock_file, 0, strlen(buffer), buffer, &self->st_statistics.st_rec, self))
 
182
                        xt_throw(self);
 
183
        }
 
184
        catch_(a) {
 
185
                xt_close_file_ns(db_lock_file);
 
186
                db_lock_file = NULL;
 
187
                xt_throw(self);
 
188
        }
 
189
        cont_(a);
 
190
}
 
191
 
 
192
xtPublic void xt_unlock_installation(XTThreadPtr self, const char *installation_path)
 
193
{
 
194
        if (db_lock_file) {
 
195
                char lock_file[PATH_MAX];
 
196
 
 
197
                xt_unlock_file(NULL, db_lock_file);
 
198
                xt_close_file_ns(db_lock_file);
 
199
                db_lock_file = NULL;
 
200
 
 
201
                xt_strcpy(PATH_MAX, lock_file, installation_path);
 
202
                xt_add_dir_char(PATH_MAX, lock_file);
 
203
                xt_strcat(PATH_MAX, lock_file, "pbxt-lock");
 
204
                xt_fs_delete(self, lock_file);
 
205
        }
 
206
}
 
207
 
 
208
int *xt_bad_pointer = 0;
 
209
 
 
210
void xt_crash_me(void)
 
211
{
 
212
        if (pbxt_crash_debug)
 
213
                *xt_bad_pointer = 123;
 
214
}
 
215
 
 
216
/*
 
217
 * -----------------------------------------------------------------------
 
218
 * INIT/EXIT DATABASE
 
219
 */
 
220
 
 
221
static xtBool db_hash_comp(void *key, void *data)
 
222
{
 
223
        XTDatabaseHPtr  db = (XTDatabaseHPtr) data;
 
224
 
 
225
        return strcmp((char *) key, db->db_name) == 0;
 
226
}
 
227
 
 
228
static xtHashValue db_hash(xtBool is_key, void *key_data)
 
229
{
 
230
        XTDatabaseHPtr  db = (XTDatabaseHPtr) key_data;
 
231
 
 
232
        if (is_key)
 
233
                return xt_ht_hash((char *) key_data);
 
234
        return xt_ht_hash(db->db_name);
 
235
}
 
236
 
 
237
static xtBool db_hash_comp_ci(void *key, void *data)
 
238
{
 
239
        XTDatabaseHPtr  db = (XTDatabaseHPtr) data;
 
240
 
 
241
        return strcasecmp((char *) key, db->db_name) == 0;
 
242
}
 
243
 
 
244
static xtHashValue db_hash_ci(xtBool is_key, void *key_data)
 
245
{
 
246
        XTDatabaseHPtr  db = (XTDatabaseHPtr) key_data;
 
247
 
 
248
        if (is_key)
 
249
                return xt_ht_casehash((char *) key_data);
 
250
        return xt_ht_casehash(db->db_name);
 
251
}
 
252
 
 
253
static void db_hash_free(XTThreadPtr self, void *data)
 
254
{
 
255
        xt_heap_release(self, (XTDatabaseHPtr) data);
 
256
}
 
257
 
 
258
static int db_cmp_db_id(struct XTThread *XT_UNUSED(self), register const void *XT_UNUSED(thunk), register const void *a, register const void *b)
 
259
{
 
260
        xtDatabaseID    db_id = *((xtDatabaseID *) a);
 
261
        XTDatabaseHPtr  *db_ptr = (XTDatabaseHPtr *) b;
 
262
 
 
263
        if (db_id == (*db_ptr)->db_id)
 
264
                return 0;
 
265
        if (db_id < (*db_ptr)->db_id)
 
266
                return -1;
 
267
        return 1;
 
268
}
 
269
 
 
270
xtPublic void xt_init_databases(XTThreadPtr self)
 
271
{
 
272
        if (pbxt_ignore_case)
 
273
                xt_db_open_databases = xt_new_hashtable(self, db_hash_comp_ci, db_hash_ci, db_hash_free, TRUE, TRUE);
 
274
        else
 
275
                xt_db_open_databases = xt_new_hashtable(self, db_hash_comp, db_hash, db_hash_free, TRUE, TRUE);
 
276
        xt_db_open_db_by_id = xt_new_sortedlist(self, sizeof(XTDatabaseHPtr), 20, 10, db_cmp_db_id, NULL, NULL, FALSE, FALSE);
 
277
}
 
278
 
 
279
xtPublic void xt_stop_database_threads(XTThreadPtr self, xtBool sync)
 
280
{
 
281
        u_int                   len = 0;
 
282
        XTDatabaseHPtr  *dbptr;
 
283
        XTDatabaseHPtr  db = NULL;
 
284
        
 
285
        if (xt_db_open_db_by_id)
 
286
                len = xt_sl_get_size(xt_db_open_db_by_id);
 
287
        for (u_int i=0; i<len; i++) {
 
288
                if ((dbptr = (XTDatabaseHPtr *) xt_sl_item_at(xt_db_open_db_by_id, i))) {
 
289
                        db = *dbptr;
 
290
 
 
291
                        if (sync) {
 
292
                                /* Wait for the sweeper: */
 
293
                                xt_wait_for_sweeper(self, db, 16);
 
294
                                
 
295
                                /* Wait for the writer: */
 
296
                                xt_wait_for_writer(self, db);
 
297
 
 
298
                                /* Wait for the checkpointer: */
 
299
                                xt_wait_for_checkpointer(self, db);
 
300
                        }
 
301
                        xt_stop_flusher(self, db);
 
302
                        xt_stop_checkpointer(self, db);
 
303
                        xt_stop_writer(self, db);
 
304
                        xt_stop_sweeper(self, db);
 
305
                        xt_stop_compactor(self, db);
 
306
                        xt_db_stop_pool_threads(self, db);
 
307
                }
 
308
        }
 
309
}
 
310
 
 
311
xtPublic void xt_exit_databases(XTThreadPtr self)
 
312
{
 
313
        if (xt_db_open_databases) {
 
314
                xt_free_hashtable(self, xt_db_open_databases);
 
315
                xt_db_open_databases = NULL;
 
316
        }
 
317
        if (xt_db_open_db_by_id) {
 
318
                xt_free_sortedlist(self, xt_db_open_db_by_id);
 
319
                xt_db_open_db_by_id = NULL;
 
320
        }
 
321
}
 
322
 
 
323
xtPublic void xt_create_database(XTThreadPtr self, char *path)
 
324
{
 
325
        xt_fs_mkdir(self, path);
 
326
}
 
327
 
 
328
static void db_finalize(XTThreadPtr self, void *x)
 
329
{
 
330
        XTDatabaseHPtr  db = (XTDatabaseHPtr) x;
 
331
 
 
332
        xt_stop_flusher(self, db);
 
333
        xt_stop_checkpointer(self, db);
 
334
        xt_stop_compactor(self, db);
 
335
        xt_stop_sweeper(self, db);
 
336
        xt_stop_writer(self, db);
 
337
        xt_db_thread_pool_exit(self, db);
 
338
 
 
339
        xt_sl_delete(self, xt_db_open_db_by_id, &db->db_id);
 
340
        /* 
 
341
         * Important is that xt_db_pool_exit() is called
 
342
         * before xt_xn_exit_db() because xt_xn_exit_db()
 
343
         * frees the checkpoint information which
 
344
         * may be required to shutdown the tables, which
 
345
         * flushes tables, and therefore does a checkpoint.
 
346
         */
 
347
        /* This was the previous order of shutdown:
 
348
        xt_xn_exit_db(self, db);
 
349
        xt_dl_exit_db(self, db);
 
350
        xt_db_pool_exit(self, db);
 
351
        db->db_indlogs.ilp_exit(self);
 
352
        */
 
353
 
 
354
        xt_db_pool_exit(self, db);
 
355
        db->db_indlogs.ilp_exit(self); 
 
356
        xt_dl_exit_db(self, db);
 
357
        xt_xn_exit_db(self, db);
 
358
        xt_tab_exit_db(self, db);
 
359
        if (db->db_name) {
 
360
                xt_free(self, db->db_name);
 
361
                db->db_name = NULL;
 
362
        }
 
363
        if (db->db_main_path) {
 
364
                xt_free(self, db->db_main_path);
 
365
                db->db_main_path = NULL;
 
366
        }
 
367
        xt_free_mutex(&db->db_init_sweep_lock);
 
368
}
 
369
 
 
370
static void db_onrelease(void *XT_UNUSED(x))
 
371
{
 
372
        /* Signal threads waiting for exclusive use of the database: */
 
373
        if (xt_db_open_databases)       // The database may already be closed.
 
374
                xt_ht_signal(NULL, xt_db_open_databases);
 
375
}
 
376
 
 
377
xtPublic void xt_add_pbxt_file(size_t size, char *path, const char *file)
 
378
{
 
379
        xt_add_dir_char(size, path);
 
380
        xt_strcat(size, path, "pbxt");
 
381
        xt_add_dir_char(size, path);
 
382
        xt_strcat(size, path, file);
 
383
}
 
384
 
 
385
xtPublic void xt_add_location_file(size_t size, char *path)
 
386
{
 
387
        xt_add_dir_char(size, path);
 
388
        xt_strcat(size, path, "pbxt");
 
389
        xt_add_dir_char(size, path);
 
390
        xt_strcat(size, path, "location");
 
391
}
 
392
 
 
393
xtPublic void xt_add_tables_file(size_t size, char *path)
 
394
{
 
395
        xt_add_dir_char(size, path);
 
396
        xt_strcat(size, path, "pbxt");
 
397
        xt_add_dir_char(size, path);
 
398
        xt_strcat(size, path, "tables");
 
399
}
 
400
 
 
401
xtPublic void xt_add_pbxt_dir(size_t size, char *path)
 
402
{
 
403
        xt_add_dir_char(size, path);
 
404
        xt_strcat(size, path, "pbxt");
 
405
}
 
406
 
 
407
xtPublic void xt_add_system_dir(size_t size, char *path)
 
408
{
 
409
        xt_add_dir_char(size, path);
 
410
        xt_strcat(size, path, "pbxt");
 
411
        xt_add_dir_char(size, path);
 
412
        xt_strcat(size, path, "system");
 
413
}
 
414
 
 
415
xtPublic void xt_add_data_dir(size_t size, char *path)
 
416
{
 
417
        xt_add_dir_char(size, path);
 
418
        xt_strcat(size, path, "pbxt");
 
419
        xt_add_dir_char(size, path);
 
420
        xt_strcat(size, path, "data");
 
421
}
 
422
 
 
423
/*
 
424
 * I have a problem here. I cannot rely on the path given to xt_get_database() to be
 
425
 * consistant. When called from ha_create_table() the path is not modified.
 
426
 * However when called from ha_open() the path is first transformed by a call to
 
427
 * fn_format(). I have given an example from a stack trace below.
 
428
 *
 
429
 * In this case the odd path comes from the option:
 
430
 * --tmpdir=/Users/build/Development/mysql/debug-mysql/mysql-test/var//tmp
 
431
 *
 
432
 * #3  0x001a3818 in ha_pbxt::create(char const*, st_table*, st_ha_create_information*) 
 
433
 *     (this=0x2036898, table_path=0xf0060bd0 "/users/build/development/mysql/debug-my
 
434
 *     sql/mysql-test/var//tmp/#sql5718_1_0.frm", table_arg=0xf00601c0,
 
435
 *     create_info=0x2017410) at ha_pbxt.cc:2323
 
436
 * #4  0x00140d74 in ha_create_table(char const*, st_ha_create_information*, bool) 
 
437
 *     (name=0xf0060bd0 "/users/build/development/mysql/debug-mysql/mysql-te
 
438
 *     st/var//tmp/#sql5718_1_0.frm", create_info=0x2017410, 
 
439
 *     update_create_info=false) at handler.cc:1387
 
440
 *
 
441
 * #4  0x0013f7a4 in handler::ha_open(char const*, int, int) (this=0x203ba98, 
 
442
 *     name=0xf005eb70 "/users/build/development/mysql/debug-mysql/mysql-te
 
443
 *     st/var/tmp/#sql5718_1_1", mode=2, test_if_locked=2) at handler.cc:993
 
444
 * #5  0x000cd900 in openfrm(char const*, char const*, unsigned, unsigned, 
 
445
 *     unsigned, st_table*) (name=0xf005f260 "/users/build/development/mys
 
446
 *     ql/debug-mysql/mysql-test/var//tmp/#sql5718_1_1.frm", 
 
447
 *     alias=0xf005fb90 "#sql-5718_1", db_stat=7, prgflag=44, 
 
448
 *     ha_open_flags=0, outparam=0x2039e18) at table.cc:771
 
449
 *
 
450
 * As a result, I no longer use the entire path as the key to find a database.
 
451
 * Just the last component of the path (i.e. the database name) should be
 
452
 * sufficient!?
 
453
 */
 
454
xtPublic XTDatabaseHPtr xt_get_database(XTThreadPtr self, const char *path, xtBool multi_path)
 
455
{
 
456
        XTDatabaseHPtr  db = NULL;
 
457
        char                    db_path[PATH_MAX];
 
458
        char                    db_name[NAME_MAX];
 
459
        xtBool                  multi_path_db = FALSE;
 
460
 
 
461
        /* A database may not be in use when this is called. */
 
462
        ASSERT(!self->st_database);
 
463
        xt_ht_lock(self, xt_db_open_databases);
 
464
        pushr_(xt_ht_unlock, xt_db_open_databases);
 
465
 
 
466
        xt_strcpy(PATH_MAX, db_path, path);
 
467
        xt_add_location_file(PATH_MAX, db_path);
 
468
        if (multi_path || xt_fs_exists(db_path))
 
469
                multi_path_db = TRUE;
 
470
 
 
471
        xt_strcpy(PATH_MAX, db_path, path);
 
472
        xt_remove_dir_char(db_path);
 
473
        xt_strcpy(NAME_MAX, db_name, xt_last_directory_of_path(db_path));
 
474
 
 
475
        db = (XTDatabaseHPtr) xt_ht_get(self, xt_db_open_databases, db_name);
 
476
        if (!db) {
 
477
                pushsr_(db, xt_heap_release, (XTDatabaseHPtr) xt_heap_new(self, sizeof(XTDatabaseRec), db_finalize));
 
478
                xt_heap_set_release_callback(db, db_onrelease);
 
479
                xt_init_mutex_with_autoname(self, &db->db_init_sweep_lock);
 
480
                db->db_id = db_next_id++;
 
481
                db->db_name = xt_dup_string(self, db_name);
 
482
                db->db_main_path = xt_dup_string(self, db_path);
 
483
                db->db_multi_path = multi_path_db;
 
484
#ifdef XT_TEST_XACT_OVERFLOW
 
485
                /* Test transaction ID overflow: */
 
486
                db->db_xn_curr_id = 0xFFFFFFFF - 30;
 
487
#endif
 
488
                xt_db_pool_init(self, db);
 
489
                xt_tab_init_db(self, db);
 
490
                xt_dl_init_db(self, db);
 
491
 
 
492
                /* Initialize the index logs: */
 
493
                db->db_indlogs.ilp_init(self, db, XT_INDEX_WRITE_BUFFER_SIZE); 
 
494
 
 
495
                /* Recover in xt_xn_init_db() may use background threads!: */
 
496
                xt_db_thread_pool_init(self, db);
 
497
 
 
498
                xt_xn_init_db(self, db);
 
499
                xt_sl_insert(self, xt_db_open_db_by_id, &db->db_id, &db);
 
500
 
 
501
                xt_start_sweeper(self, db);
 
502
                xt_start_compactor(self, db);
 
503
                xt_start_writer(self, db);
 
504
                xt_start_checkpointer(self, db);
 
505
                if (xt_db_flush_log_at_trx_commit == 0 || xt_db_flush_log_at_trx_commit == 2)
 
506
                        xt_start_flusher(self, db);
 
507
 
 
508
                popr_();
 
509
                xt_ht_put(self, xt_db_open_databases, db);
 
510
 
 
511
                /* The recovery process could attach parts of the open
 
512
                 * database to the thread!
 
513
                 */
 
514
                xt_unuse_database(self, self);
 
515
        }
 
516
        xt_heap_reference(self, db);
 
517
        freer_();
 
518
 
 
519
        /* {INDEX-RECOV_ROWID}
 
520
         * Wait for sweeper to finish processing possibly
 
521
         * unswept transactions after recovery.
 
522
         * This is required because during recovery for
 
523
         * all index entries written the row_id is set.
 
524
         *
 
525
         * When the row ID is set, this means that the row
 
526
         * is "clean". i.e. visible to all transactions.
 
527
         *
 
528
         * Obviously this is not necessary the case for all
 
529
         * index entries recovered. For example, 
 
530
         * transactions that still need to be swept may be
 
531
         * rolled back.
 
532
         *
 
533
         * As a result, we have to wait the the sweeper
 
534
         * to complete. Only then can we be sure that
 
535
         * all index entries that are not visible have
 
536
         * been removed.
 
537
         *
 
538
         * REASON WHY WE SET ROWID ON RECOVERY:
 
539
         * The row ID is set on recovery because the
 
540
         * change to the index may be lost after a crash.
 
541
         * The change to the index is done by the sweeper, and
 
542
         * there is no record of this change in the log.
 
543
         * The sweeper will not "re-sweep" all transations
 
544
         * that are recovered. As a result, this upadte
 
545
         * of the index by the sweeper may be lost.
 
546
         *
 
547
         * {OPEN-DB-SWEEPER-WAIT}
 
548
         * This has been moved to after the release of the open
 
549
         * database lock because:
 
550
         *
 
551
         * - We are waiting for the sweeper which may run out of
 
552
         * record cache.
 
553
         * - If it runs out of cache it well wait
 
554
         * for the freeer thread.
 
555
         * - For the freeer thread to be able to work it needs
 
556
         * to open the database.
 
557
         * - To open the database it needs the open database
 
558
         * lock.
 
559
         */
 
560
        /*
 
561
         * This has been moved, see: {WAIT-FOR-SW-AFTER-RECOV}
 
562
        pushr_(xt_heap_release, db);
 
563
        xt_wait_for_sweeper(self, db, 0);
 
564
        popr_();
 
565
        */
 
566
 
 
567
        return db;
 
568
}
 
569
 
 
570
xtPublic XTDatabaseHPtr xt_get_database_by_id(XTThreadPtr self, xtDatabaseID db_id)
 
571
{
 
572
        XTDatabaseHPtr  *dbptr;
 
573
        XTDatabaseHPtr  db = NULL;
 
574
 
 
575
        xt_ht_lock(self, xt_db_open_databases);
 
576
        pushr_(xt_ht_unlock, xt_db_open_databases);
 
577
        if ((dbptr = (XTDatabaseHPtr *) xt_sl_find(self, xt_db_open_db_by_id, &db_id))) {
 
578
                db = *dbptr;
 
579
                xt_heap_reference(self, db);
 
580
        }
 
581
        freer_(); // xt_ht_unlock(xt_db_open_databases)
 
582
        return db;
 
583
}
 
584
 
 
585
xtPublic void xt_drop_database(XTThreadPtr self, XTDatabaseHPtr db)
 
586
{
 
587
        char                    path[PATH_MAX];
 
588
        char                    db_name[NAME_MAX];
 
589
        XTOpenDirPtr    od;
 
590
        char                    *file;
 
591
        XTTablePathPtr  *tp_ptr;
 
592
 
 
593
        xt_ht_lock(self, xt_db_open_databases);
 
594
        pushr_(xt_ht_unlock, xt_db_open_databases);
 
595
 
 
596
        /* Shutdown the database daemons: */
 
597
        xt_stop_flusher(self, db);
 
598
        xt_stop_checkpointer(self, db);
 
599
        xt_stop_sweeper(self, db);
 
600
        xt_stop_compactor(self, db);
 
601
        xt_stop_writer(self, db);
 
602
        xt_db_thread_pool_exit(self, db);
 
603
 
 
604
        /* Remove the database from the directory: */
 
605
        xt_strcpy(NAME_MAX, db_name, db->db_name);
 
606
        xt_ht_del(self, xt_db_open_databases, db_name);
 
607
 
 
608
        /* Release the lock on the database directory: */
 
609
        freer_(); // xt_ht_unlock(xt_db_open_databases)
 
610
 
 
611
        /* Delete the transaction logs: */
 
612
        xt_xlog_delete_logs(self, db);
 
613
 
 
614
        /* Delete the data logs: */
 
615
        xt_dl_delete_logs(self, db);
 
616
 
 
617
        for (u_int i=0; i<xt_sl_get_size(db->db_table_paths); i++) {
 
618
 
 
619
                tp_ptr = (XTTablePathPtr *) xt_sl_item_at(db->db_table_paths, i);
 
620
 
 
621
                xt_strcpy(PATH_MAX, path, (*tp_ptr)->tp_path);
 
622
 
 
623
                /* Delete all files in the database: */
 
624
                pushsr_(od, xt_dir_close, xt_dir_open(self, path, NULL));
 
625
                while (xt_dir_next(self, od)) {
 
626
                        file = xt_dir_name(self, od);
 
627
                        if (xt_ends_with(file, ".xtr") ||
 
628
                                xt_ends_with(file, ".xtd") ||
 
629
                                xt_ends_with(file, ".xti") ||
 
630
                                xt_ends_with(file, ".xt"))
 
631
                        {
 
632
                                xt_add_dir_char(PATH_MAX, path);
 
633
                                xt_strcat(PATH_MAX, path, file);
 
634
                                xt_fs_delete(self, path);
 
635
                                xt_remove_last_name_of_path(path);
 
636
                        }
 
637
                }
 
638
                freer_(); // xt_dir_close(od)
 
639
                
 
640
        }
 
641
        if (!db->db_multi_path) {
 
642
                xt_strcpy(PATH_MAX, path, db->db_main_path);
 
643
                xt_add_pbxt_dir(PATH_MAX, path);
 
644
                if (!xt_fs_rmdir(NULL, path))
 
645
                        xt_log_and_clear_exception(self);
 
646
        }
 
647
}
 
648
 
 
649
/*
 
650
 * Open/use a database.
 
651
 */
 
652
xtPublic void xt_open_database(XTThreadPtr self, const char *path, xtBool multi_path)
 
653
{
 
654
        XTDatabaseHPtr db;
 
655
 
 
656
        /* We cannot get a database, without unusing the current
 
657
         * first. The reason is that the restart process will
 
658
         * partially set the current database!
 
659
         */
 
660
        xt_unuse_database(self, self);
 
661
        db = xt_get_database(self, path, multi_path);
 
662
        pushr_(xt_heap_release, db);
 
663
        xt_use_database(self, db, XT_FOR_USER);
 
664
        freer_();       // xt_heap_release(self, db);   
 
665
}
 
666
 
 
667
/* This function can only be called if you do not already have a database in
 
668
 * use. This is because to get a database pointer you are not allowed
 
669
 * to have a database in use!
 
670
 */
 
671
xtPublic void xt_use_database(XTThreadPtr self, XTDatabaseHPtr db, int what_for)
 
672
{
 
673
        /* Check if a transaction is in progress. If so,
 
674
         * we cannot change the database!
 
675
         */
 
676
        if (self->st_xact_data || self->st_database)
 
677
                xt_throw_xterr(XT_CONTEXT, XT_ERR_CANNOT_CHANGE_DB);
 
678
 
 
679
        xt_heap_reference(self, db);
 
680
        self->st_database = db;
 
681
#ifdef XT_WAIT_FOR_CLEANUP
 
682
        self->st_last_xact = 0;
 
683
        for (int i=0; i<XT_MAX_XACT_BEHIND; i++) {
 
684
                self->st_prev_xact[i] = db->db_xn_curr_id;
 
685
        }
 
686
#endif
 
687
        xt_xn_init_thread(self, what_for);
 
688
}
 
689
 
 
690
xtPublic void xt_unuse_database(XTThreadPtr self, XTThreadPtr other_thr)
 
691
{
 
692
        XTTask *tk;
 
693
 
 
694
        /* Wait for any asynchronous tasks to complete: */
 
695
        xt_wait_for_async_tasks(self);
 
696
 
 
697
        /* Free the results, if any: */
 
698
        while ((tk = (XTTask *) xt_get_task_result(self)))
 
699
                tk->tk_release();
 
700
 
 
701
        /* Abort the transacion if it belongs exclusively to this thread. */
 
702
        xt_lock_mutex(self, &other_thr->t_lock);
 
703
        pushr_(xt_unlock_mutex, &other_thr->t_lock);
 
704
 
 
705
        xt_xn_exit_thread(other_thr);
 
706
        if (other_thr->st_database) {
 
707
                xt_heap_release(self, other_thr->st_database);
 
708
                other_thr->st_database = NULL;
 
709
        }
 
710
        
 
711
        freer_();
 
712
}
 
713
 
 
714
xtPublic void xt_db_init_thread_ns(XTThreadPtr XT_UNUSED(new_thread))
 
715
{
 
716
#ifdef XT_IMPLEMENT_NO_ACTION
 
717
        memset(&new_thread->st_restrict_list, 0, sizeof(XTBasicListRec));
 
718
        new_thread->st_restrict_list.bl_item_size = sizeof(XTRestrictItemRec);
 
719
#endif
 
720
}
 
721
 
 
722
xtPublic void xt_db_exit_thread(XTThreadPtr self)
 
723
{
 
724
#ifdef XT_IMPLEMENT_NO_ACTION
 
725
        xt_bl_free(NULL, &self->st_restrict_list);
 
726
#endif
 
727
        xt_unuse_database(self, self);
 
728
}
 
729
 
 
730
/*
 
731
 * -----------------------------------------------------------------------
 
732
 * OPEN TABLE POOL
 
733
 */
 
734
 
 
735
#ifdef UNUSED_CODE
 
736
static void check_free_list(XTDatabaseHPtr db)
 
737
{
 
738
        XTOpenTablePtr  ot;
 
739
        u_int                   cnt = 0;
 
740
 
 
741
        ot = db->db_ot_pool.otp_mr_used;
 
742
        if (ot)
 
743
                ASSERT_NS(!ot->ot_otp_mr_used);
 
744
        ot = db->db_ot_pool.otp_lr_used;
 
745
        if (ot)
 
746
                ASSERT_NS(!ot->ot_otp_lr_used);
 
747
        while (ot) {
 
748
                cnt++;
 
749
                ot = ot->ot_otp_mr_used;
 
750
        }
 
751
        ASSERT_NS(cnt == db->db_ot_pool.otp_total_free);
 
752
}
 
753
#endif
 
754
 
 
755
xtPublic void xt_db_pool_init(XTThreadPtr self, XTDatabaseHPtr db)
 
756
{
 
757
        memset(&db->db_ot_pool, 0, sizeof(XTAllTablePoolsRec));
 
758
        xt_init_mutex_with_autoname(self, &db->db_ot_pool.opt_lock);
 
759
        xt_init_cond(self, &db->db_ot_pool.opt_cond);
 
760
}
 
761
 
 
762
xtPublic void xt_db_pool_exit(XTThreadPtr self, XTDatabaseHPtr db)
 
763
{
 
764
        XTOpenTablePoolPtr      table_pool, tmp;
 
765
        XTOpenTablePtr          ot, tmp_ot;
 
766
 
 
767
        xt_free_mutex(&db->db_ot_pool.opt_lock);
 
768
        xt_free_cond(&db->db_ot_pool.opt_cond);
 
769
        
 
770
        for (u_int i=0; i<XT_OPEN_TABLE_POOL_HASH_SIZE; i++) {
 
771
                table_pool = db->db_ot_pool.otp_hash[i];
 
772
                while (table_pool) {
 
773
                        tmp = table_pool->opt_next_hash;
 
774
                        ot = table_pool->opt_free_list;
 
775
                        while (ot) {
 
776
                                tmp_ot = ot->ot_otp_next_free;
 
777
                                ot->ot_thread = self;
 
778
                                xt_close_table(ot, TRUE, FALSE);
 
779
                                ot = tmp_ot;
 
780
                        }
 
781
                        xt_free(self, table_pool);
 
782
                        table_pool = tmp;
 
783
                }
 
784
        }
 
785
}
 
786
 
 
787
static XTOpenTablePoolPtr db_get_open_table_pool(XTDatabaseHPtr db, xtTableID tab_id)
 
788
{
 
789
        XTOpenTablePoolPtr      table_pool;
 
790
        u_int                           hash;
 
791
 
 
792
        hash = tab_id % XT_OPEN_TABLE_POOL_HASH_SIZE;
 
793
        table_pool = db->db_ot_pool.otp_hash[hash];
 
794
        while (table_pool) {
 
795
                if (table_pool->opt_tab_id == tab_id)
 
796
                        return table_pool;
 
797
                table_pool = table_pool->opt_next_hash;
 
798
        }
 
799
        
 
800
        if (!(table_pool = (XTOpenTablePoolPtr) xt_malloc_ns(sizeof(XTOpenTablePoolRec))))
 
801
                return NULL;
 
802
 
 
803
        table_pool->opt_db = db;
 
804
        table_pool->opt_tab_id = tab_id;
 
805
        table_pool->opt_total_open = 0;
 
806
        table_pool->opt_locked = XT_TABLE_NOT_LOCKED;
 
807
        table_pool->opt_free_list = NULL;
 
808
        table_pool->opt_next_hash = db->db_ot_pool.otp_hash[hash];
 
809
        db->db_ot_pool.otp_hash[hash] = table_pool;
 
810
        
 
811
        return table_pool;
 
812
}
 
813
 
 
814
static void db_free_open_table_pool(XTThreadPtr self, XTOpenTablePoolPtr table_pool)
 
815
{
 
816
        if (!table_pool->opt_locked && !table_pool->opt_total_open) {
 
817
                XTOpenTablePoolPtr      ptr, pptr = NULL;
 
818
                u_int                           hash;
 
819
 
 
820
                hash = table_pool->opt_tab_id % XT_OPEN_TABLE_POOL_HASH_SIZE;
 
821
                ptr = table_pool->opt_db->db_ot_pool.otp_hash[hash];
 
822
                while (ptr) {
 
823
                        if (ptr == table_pool)
 
824
                                break;
 
825
                        pptr = ptr;
 
826
                        ptr = ptr->opt_next_hash;
 
827
                }
 
828
                
 
829
                if (ptr == table_pool) {
 
830
                        if (pptr)
 
831
                                pptr->opt_next_hash = table_pool->opt_next_hash;
 
832
                        else
 
833
                                table_pool->opt_db->db_ot_pool.otp_hash[hash] = table_pool->opt_next_hash;
 
834
                }
 
835
 
 
836
                xt_free(self, table_pool);
 
837
        }
 
838
}
 
839
 
 
840
static XTOpenTablePoolPtr db_lock_table_pool(XTThreadPtr self, XTDatabaseHPtr db, xtTableID tab_id, xtBool flush_table)
 
841
{
 
842
        XTOpenTablePoolPtr      table_pool;
 
843
        XTOpenTablePtr          ot, tmp_ot;
 
844
 
 
845
        xt_lock_mutex(self, &db->db_ot_pool.opt_lock);
 
846
        pushr_(xt_unlock_mutex, &db->db_ot_pool.opt_lock);
 
847
 
 
848
        if (!(table_pool = db_get_open_table_pool(db, tab_id)))
 
849
                xt_throw(self);
 
850
 
 
851
        /* Wait for the lock: */
 
852
        while (table_pool->opt_locked) {
 
853
                xt_timed_wait_cond(self, &db->db_ot_pool.opt_cond, &db->db_ot_pool.opt_lock, 2000);
 
854
                if (!(table_pool = db_get_open_table_pool(db, tab_id)))
 
855
                        xt_throw(self);
 
856
        }
 
857
 
 
858
        /* Enter locking phase 1: */
 
859
        table_pool->opt_locked = XT_TABLE_LOCK_WAITING;
 
860
 
 
861
        if (flush_table) {
 
862
                /* Don't know if this is interesting as a phase, but anyway... */
 
863
                table_pool->opt_locked = XT_TABLE_LOCK_FLUSHING;
 
864
                freer_(); // xt_unlock_mutex(db_ot_pool.opt_lock)
 
865
 
 
866
                pushr_(xt_db_unlock_table_pool, table_pool);
 
867
                /* During this time, background processes can use the
 
868
                 * pool!
 
869
                 *
 
870
                 * May also do a flush, but this is now taken care
 
871
                 * of here {FLUSH-BUG}
 
872
                 */
 
873
                if ((ot = xt_db_open_pool_table(self, db, tab_id, NULL, TRUE))) {
 
874
                        pushr_(xt_db_return_table_to_pool, ot);
 
875
                        xt_sync_flush_table(self, ot, 0);
 
876
                        freer_(); //xt_db_return_table_to_pool_foreground(ot);
 
877
                }
 
878
 
 
879
                popr_(); // Discard xt_db_unlock_table_pool_no_lock(table_pool)
 
880
 
 
881
                xt_lock_mutex(self, &db->db_ot_pool.opt_lock);
 
882
                pushr_(xt_unlock_mutex, &db->db_ot_pool.opt_lock);
 
883
        }
 
884
        
 
885
        /* Free all open tables not in use: */
 
886
        ot = table_pool->opt_free_list;
 
887
        table_pool->opt_free_list = NULL;
 
888
        while (ot) {
 
889
                tmp_ot = ot->ot_otp_next_free;
 
890
 
 
891
                /* Remove from MRU list: */
 
892
                if (db->db_ot_pool.otp_lr_used == ot)
 
893
                        db->db_ot_pool.otp_lr_used = ot->ot_otp_mr_used;
 
894
                if (db->db_ot_pool.otp_mr_used == ot)
 
895
                        db->db_ot_pool.otp_mr_used = ot->ot_otp_lr_used;
 
896
                if (ot->ot_otp_lr_used)
 
897
                        ot->ot_otp_lr_used->ot_otp_mr_used = ot->ot_otp_mr_used;
 
898
                if (ot->ot_otp_mr_used)
 
899
                        ot->ot_otp_mr_used->ot_otp_lr_used = ot->ot_otp_lr_used;
 
900
 
 
901
                if (db->db_ot_pool.otp_lr_used)
 
902
                        db->db_ot_pool.otp_free_time = db->db_ot_pool.otp_lr_used->ot_otp_free_time;
 
903
                
 
904
                ASSERT_NS(db->db_ot_pool.otp_total_free > 0);
 
905
                db->db_ot_pool.otp_total_free--;
 
906
 
 
907
                /* Close the table: */
 
908
                ASSERT(table_pool->opt_total_open > 0);
 
909
                table_pool->opt_total_open--;
 
910
 
 
911
                ot->ot_thread = self;
 
912
                xt_close_table(ot, table_pool->opt_total_open == 0, FALSE);
 
913
 
 
914
                /* Go to the next: */
 
915
                ot = tmp_ot;
 
916
        }
 
917
 
 
918
        /* Wait for other to close: */
 
919
        while (table_pool->opt_total_open > 0) {
 
920
                xt_timed_wait_cond_ns(&db->db_ot_pool.opt_cond, &db->db_ot_pool.opt_lock, 2000);
 
921
        }
 
922
 
 
923
        /* 2nd phase, now the table is really locked: */
 
924
        table_pool->opt_locked = XT_TABLE_LOCKED;
 
925
 
 
926
        freer_(); // xt_unlock_mutex(db_ot_pool.opt_lock)
 
927
        return table_pool;
 
928
}
 
929
 
 
930
/*
 
931
 * This function locks a particular table by locking the table directory
 
932
 * and waiting for all open tables handles to close.
 
933
 *
 
934
 * Things are a bit complicated because the sweeper must be turned off before
 
935
 * the table directory is locked.
 
936
 */
 
937
xtPublic XTOpenTablePoolPtr xt_db_lock_table_pool_by_name(XTThreadPtr self, XTDatabaseHPtr db, XTPathStrPtr tab_name, xtBool no_load, xtBool flush_table, xtBool missing_ok, XTTableHPtr *ret_tab)
 
938
{
 
939
        XTOpenTablePoolPtr      table_pool;
 
940
        XTTableHPtr                     tab;
 
941
        xtTableID                       tab_id;
 
942
 
 
943
        pushsr_(tab, xt_heap_release, xt_use_table(self, tab_name, no_load, missing_ok));
 
944
        if (!tab) {
 
945
                freer_(); // xt_heap_release(tab)
 
946
                return NULL;
 
947
        }
 
948
 
 
949
        tab_id = tab->tab_id;
 
950
 
 
951
        if (ret_tab) {
 
952
                *ret_tab = tab;
 
953
                table_pool = db_lock_table_pool(self, db, tab_id, flush_table);
 
954
                popr_(); // Discard xt_heap_release(tab)
 
955
                return table_pool;
 
956
        }
 
957
 
 
958
        freer_(); // xt_heap_release(tab)
 
959
        return db_lock_table_pool(self, db, tab_id, flush_table);
 
960
}
 
961
 
 
962
xtPublic void xt_db_unlock_table_pool(XTThreadPtr self, XTOpenTablePoolPtr table_pool)
 
963
{
 
964
        XTDatabaseHPtr db;
 
965
 
 
966
        if (!table_pool)
 
967
                return;
 
968
 
 
969
        db = table_pool->opt_db;
 
970
        xt_lock_mutex(self, &db->db_ot_pool.opt_lock);
 
971
        pushr_(xt_unlock_mutex, &db->db_ot_pool.opt_lock);
 
972
 
 
973
        table_pool->opt_locked = XT_TABLE_NOT_LOCKED;
 
974
        xt_broadcast_cond(self, &db->db_ot_pool.opt_cond);
 
975
        db_free_open_table_pool(NULL, table_pool);
 
976
 
 
977
        freer_(); // xt_unlock_mutex(db_ot_pool.opt_lock)
 
978
}
 
979
 
 
980
xtPublic XTOpenTablePtr xt_db_open_table_using_tab(XTTableHPtr tab, XTThreadPtr thread)
 
981
{
 
982
        XTDatabaseHPtr          db = tab->tab_db;
 
983
        XTOpenTablePoolPtr      table_pool;
 
984
        XTOpenTablePtr          ot;
 
985
 
 
986
        xt_lock_mutex_ns(&db->db_ot_pool.opt_lock);
 
987
 
 
988
        if (!(table_pool = db_get_open_table_pool(db, tab->tab_id)))
 
989
                goto failed;
 
990
 
 
991
        while (table_pool->opt_locked) {
 
992
                if (!xt_timed_wait_cond_ns(&db->db_ot_pool.opt_cond, &db->db_ot_pool.opt_lock, 2000))
 
993
                        goto failed_1;
 
994
                if (!(table_pool = db_get_open_table_pool(db, tab->tab_id)))
 
995
                        goto failed;
 
996
        }
 
997
 
 
998
        if ((ot = table_pool->opt_free_list)) {
 
999
                /* Remove from the free list: */
 
1000
                table_pool->opt_free_list = ot->ot_otp_next_free;
 
1001
                
 
1002
                /* Remove from MRU list: */
 
1003
                if (db->db_ot_pool.otp_lr_used == ot)
 
1004
                        db->db_ot_pool.otp_lr_used = ot->ot_otp_mr_used;
 
1005
                if (db->db_ot_pool.otp_mr_used == ot)
 
1006
                        db->db_ot_pool.otp_mr_used = ot->ot_otp_lr_used;
 
1007
                if (ot->ot_otp_lr_used)
 
1008
                        ot->ot_otp_lr_used->ot_otp_mr_used = ot->ot_otp_mr_used;
 
1009
                if (ot->ot_otp_mr_used)
 
1010
                        ot->ot_otp_mr_used->ot_otp_lr_used = ot->ot_otp_lr_used;
 
1011
 
 
1012
                if (db->db_ot_pool.otp_lr_used)
 
1013
                        db->db_ot_pool.otp_free_time = db->db_ot_pool.otp_lr_used->ot_otp_free_time;
 
1014
 
 
1015
                ASSERT_NS(db->db_ot_pool.otp_total_free > 0);
 
1016
                db->db_ot_pool.otp_total_free--;
 
1017
 
 
1018
                ot->ot_thread = thread;
 
1019
                goto done_ok;
 
1020
        }
 
1021
 
 
1022
        if ((ot = xt_open_table(tab))) {
 
1023
                ot->ot_thread = thread;
 
1024
                table_pool->opt_total_open++;
 
1025
        }
 
1026
 
 
1027
        done_ok:
 
1028
        db_free_open_table_pool(NULL, table_pool);
 
1029
        xt_unlock_mutex_ns(&db->db_ot_pool.opt_lock);
 
1030
        return ot;
 
1031
 
 
1032
        failed_1:
 
1033
        db_free_open_table_pool(NULL, table_pool);
 
1034
 
 
1035
        failed:
 
1036
        xt_unlock_mutex_ns(&db->db_ot_pool.opt_lock);
 
1037
        return NULL;
 
1038
}
 
1039
 
 
1040
xtPublic xtBool xt_db_open_pool_table_ns(XTOpenTablePtr *ret_ot, XTDatabaseHPtr db, xtTableID tab_id)
 
1041
{
 
1042
        XTThreadPtr     self = xt_get_self();
 
1043
        xtBool          ok = TRUE;
 
1044
 
 
1045
        try_(a) {
 
1046
                *ret_ot = xt_db_open_pool_table(self, db, tab_id, NULL, FALSE);
 
1047
        }
 
1048
        catch_(a) {
 
1049
                ok = FALSE;
 
1050
        }
 
1051
        cont_(a);
 
1052
        return ok;
 
1053
}
 
1054
 
 
1055
xtPublic XTOpenTablePtr xt_db_open_pool_table(XTThreadPtr self, XTDatabaseHPtr db, xtTableID tab_id, int *result, xtBool i_am_background)
 
1056
{
 
1057
        XTOpenTablePtr          ot;
 
1058
        XTOpenTablePoolPtr      table_pool;
 
1059
        XTTableHPtr                     tab;
 
1060
 
 
1061
        xt_lock_mutex(self, &db->db_ot_pool.opt_lock);
 
1062
        pushr_(xt_unlock_mutex, &db->db_ot_pool.opt_lock);
 
1063
 
 
1064
        if (!(table_pool = db_get_open_table_pool(db, tab_id)))
 
1065
                xt_throw(self);
 
1066
 
 
1067
        /* Background processes do not have to wait while flushing!
 
1068
         *
 
1069
         * I think I did this so that the background process would
 
1070
         * not hang during flushing. Exact reason currently
 
1071
         * unknown (maybe not any more see {HANG-ON-FREEER} below).
 
1072
         *
 
1073
         * NOTE: I have taken out check:
 
1074
         * && !(i_am_background && table_pool->opt_flushing) below.
 
1075
         * To which the text above refers. The reason is because it
 
1076
         * has been replaced by the more general rule (described
 
1077
         * below). This was used to ensure that background
 
1078
         * threads do not hang when the table is flushed
 
1079
         * in db_lock_table_pool(). Now (see below) I make
 
1080
         * sure that a background thread does not hang during a
 
1081
         * lock table, as long as the locker is still waiting!
 
1082
         *
 
1083
         * The fact that a background thread can get a open table
 
1084
         * handle while a user thread is locking and flushing a table
 
1085
         * led to the situation that the checkpointer
 
1086
         * could flush at the same time as a user process
 
1087
         * which was flushing due to a rename.
 
1088
         *
 
1089
         * This led to the situation described here: {FLUSH-BUG},
 
1090
         * which is now fixed.
 
1091
         *
 
1092
         * {HANG-ON-FREEER}
 
1093
         * 
 
1094
         * This error occurred during count_distinct3
 
1095
         *
 
1096
         * The sweeper is waiting for the free'er, but the sweeper has a table
 
1097
         * open (Table ./test/t2 test, ID 2)
 
1098
         *
 
1099
         * if (!xt_timed_wait_cond_ns(&dcg->tcm_freeer_cond, &dcg->tcm_freeer_lock, 30000)) {
 
1100
         *      dcg->tcm_threads_waiting--;
 
1101
         *      break;
 
1102
         * }
 
1103
         * #3   0x00e2a5b6 in xt_p_cond_timedwait at pthread_xt.cc:697
 
1104
         * #4   0x00e51543 in xt_timed_wait_cond at thread_xt.cc:2053
 
1105
         * #5   0x00e38fac in XTTabCache::tc_fetch at tabcache_xt.cc:682
 
1106
         * #6   0x00e3974c in XTTabCache::xt_tc_write_cond at tabcache_xt.cc:279
 
1107
         * #7   0x00e5ce31 in xn_sw_cleanup_variation at xaction_xt.cc:2101
 
1108
         * #8   0x00e5d619 in xn_sw_cleanup_xact at xaction_xt.cc:2327
 
1109
         * #9   0x00e5dddd in xn_sw_main at xaction_xt.cc:2608
 
1110
         * #10  0x00e5e123 in xn_sw_run_thread at xaction_xt.cc:2741
 
1111
         * #11  0x00e50640 in thr_main at thread_xt.cc:1081
 
1112
         * #12  0x91fdb155 in _pthread_start
 
1113
         * #13  0x91fdb012 in thread_start
 
1114
         *
 
1115
         * The user thread is trying to drop the table but the table has 1 open table
 
1116
         * (Table ./test/t2 test, ID 2)
 
1117
         *
 
1118
         * while (table_pool->opt_total_open > 0) {
 
1119
         *      xt_timed_wait_cond(self, &db->db_ot_pool.opt_cond, &db->db_ot_pool.opt_lock, 2000);
 
1120
         * }
 
1121
         * #3   0x00e2a5b6 in xt_p_cond_timedwait at pthread_xt.cc:697
 
1122
         * #4   0x00e51543 in xt_timed_wait_cond at thread_xt.cc:2053
 
1123
         * But fix: xt_db_wait_for_open_tables() has been removed!
 
1124
         * #5   0x00de88a2 in xt_db_wait_for_open_tables at database_xt.cc:966
 
1125
         * #6   0x00e3e983 in tab_lock_table at table_xt.cc:1557
 
1126
         * #7   0x00e3eb1e in xt_drop_table at table_xt.cc:1941
 
1127
         * #8   0x00e0e78e in ha_pbxt::delete_table at ha_pbxt.cc:4848
 
1128
         * #9   0x00243a14 in handler::ha_delete_table at handler.cc:3311
 
1129
         * #10  0x00243b7e in ha_delete_table at handler.cc:1954
 
1130
         * #11  0x0025c990 in mysql_rm_table_part2 at sql_table.cc:1724
 
1131
         * #12  0x0025cee8 in mysql_rm_table at sql_table.cc:1518
 
1132
         * #13  0x0011256c in mysql_execute_command at sql_parse.cc:3357
 
1133
         * #14  0x001188ed in mysql_parse at sql_parse.cc:5929
 
1134
         * #15  0x00119663 in dispatch_command at sql_parse.cc:1216
 
1135
         * #16  0x0011a960 in do_command at sql_parse.cc:857
 
1136
         * #17  0x00105a3c in handle_one_connection at sql_connect.cc:1115
 
1137
         * #18  0x91fdb155 in _pthread_start
 
1138
         * #19  0x91fdb012 in thread_start
 
1139
         * 
 
1140
         * The free'er would like to free some memory but cannot becuase it requires
 
1141
         * an open table handle, but the pool is locked by the user thread which
 
1142
         * wants to drop the table.
 
1143
         *
 
1144
         * // Free'er wants to get an open table, but the pool is locked (by the renamer)
 
1145
         * while (table_pool->opt_locked && !(i_am_background && table_pool->opt_flushing)) {
 
1146
         *      xt_timed_wait_cond(self, &db->db_ot_pool.opt_cond, &db->db_ot_pool.opt_lock, 2000);
 
1147
         *      if (!(table_pool = db_get_open_table_pool(db, tab_id)))
 
1148
         *              xt_throw(self);
 
1149
         * }
 
1150
         * #0   0x91fb146e in __semwait_signal
 
1151
         * #1   0x91fdc3e6 in _pthread_cond_wait
 
1152
         * #2   0x920019f8 in pthread_cond_timedwait$UNIX2003
 
1153
         * #3   0x00e2a5b6 in xt_p_cond_timedwait at pthread_xt.cc:697
 
1154
         * #4   0x00e51543 in xt_timed_wait_cond at thread_xt.cc:2053
 
1155
         * #5   0x00de8d8a in xt_db_open_pool_table at database_xt.cc:1091
 
1156
         * #6   0x00e39ce3 in tabc_get_table at tabcache_xt.cc:983
 
1157
         * #7   0x00e39deb in tabc_free_page at tabcache_xt.cc:1028
 
1158
         * #8   0x00e3a5d2 in tabc_fr_main at tabcache_xt.cc:1227
 
1159
         * #9   0x00e3a954 in tabc_fr_run_thread at tabcache_xt.cc:1290
 
1160
         * #10  0x00e50640 in thr_main at thread_xt.cc:1081
 
1161
         * #11  0x91fdb155 in _pthread_start
 
1162
         * #12  0x91fdb012 in thread_start
 
1163
         *
 
1164
         * My proposed solution:
 
1165
         *
 
1166
         * Firstly, It can be assumed that background (system processes) will eventually
 
1167
         * stop activity, once all work has been done.
 
1168
         * So allowing them to proceed, even when the table is locked will
 
1169
         * not lead to a livelock.
 
1170
         *
 
1171
         * This means that we should allow background processes to proceed with a
 
1172
         * locked table, as long as the locker is waiting.
 
1173
         */
 
1174
        while (table_pool->opt_locked) {
 
1175
                if (i_am_background && table_pool->opt_locked != XT_TABLE_LOCKED) {
 
1176
                        /* Background processes can proceed, if the locker is not in the
 
1177
                         * final lock phase!
 
1178
                         */
 
1179
                        break;
 
1180
                }
 
1181
                xt_timed_wait_cond(self, &db->db_ot_pool.opt_cond, &db->db_ot_pool.opt_lock, 2000);
 
1182
                if (!(table_pool = db_get_open_table_pool(db, tab_id)))
 
1183
                        xt_throw(self);
 
1184
        }
 
1185
 
 
1186
        /* Moved from above, because db_get_open_table_pool() may return a different
 
1187
         * pool on each call!
 
1188
        */
 
1189
        pushr_(db_free_open_table_pool, table_pool);    
 
1190
        
 
1191
        if ((ot = table_pool->opt_free_list)) {
 
1192
                /* Remove from the free list: */
 
1193
                table_pool->opt_free_list = ot->ot_otp_next_free;
 
1194
                
 
1195
                /* Remove from MRU list: */
 
1196
                if (db->db_ot_pool.otp_lr_used == ot)
 
1197
                        db->db_ot_pool.otp_lr_used = ot->ot_otp_mr_used;
 
1198
                if (db->db_ot_pool.otp_mr_used == ot)
 
1199
                        db->db_ot_pool.otp_mr_used = ot->ot_otp_lr_used;
 
1200
                if (ot->ot_otp_lr_used)
 
1201
                        ot->ot_otp_lr_used->ot_otp_mr_used = ot->ot_otp_mr_used;
 
1202
                if (ot->ot_otp_mr_used)
 
1203
                        ot->ot_otp_mr_used->ot_otp_lr_used = ot->ot_otp_lr_used;
 
1204
 
 
1205
                if (db->db_ot_pool.otp_lr_used)
 
1206
                        db->db_ot_pool.otp_free_time = db->db_ot_pool.otp_lr_used->ot_otp_free_time;
 
1207
 
 
1208
                ASSERT(db->db_ot_pool.otp_total_free > 0);
 
1209
                db->db_ot_pool.otp_total_free--;
 
1210
 
 
1211
                freer_(); // db_free_open_table_pool(table_pool)
 
1212
                freer_(); // xt_unlock_mutex(&db->db_ot_pool.opt_lock)
 
1213
                ot->ot_thread = self;
 
1214
                return ot;
 
1215
        }
 
1216
 
 
1217
        if (!(tab = xt_use_table_by_id(self, db, tab_id, result))) {
 
1218
                /* The table no longer exists, ignore the change: */
 
1219
                freer_(); // db_free_open_table_pool(table_pool)
 
1220
                freer_(); // xt_unlock_mutex(&db->db_ot_pool.opt_lock)
 
1221
                return NULL;
 
1222
        }
 
1223
 
 
1224
        /* xt_use_table_by_id returns a referenced tab! */
 
1225
        pushr_(xt_heap_release, tab);
 
1226
        if ((ot = xt_open_table(tab))) {
 
1227
                ot->ot_thread = self;
 
1228
                table_pool->opt_total_open++;
 
1229
        }
 
1230
        freer_(); // xt_release_heap(tab)
 
1231
 
 
1232
        freer_(); // db_free_open_table_pool(table_pool)
 
1233
        freer_(); // xt_unlock_mutex(&db->db_ot_pool.opt_lock)
 
1234
        return ot;
 
1235
}
 
1236
 
 
1237
xtPublic void xt_db_return_table_to_pool(XTThreadPtr XT_UNUSED(self), XTOpenTablePtr ot)
 
1238
{
 
1239
        xt_db_return_table_to_pool_ns(ot);
 
1240
}
 
1241
 
 
1242
xtPublic void xt_db_return_table_to_pool_ns(XTOpenTablePtr ot)
 
1243
{
 
1244
        XTOpenTablePoolPtr      table_pool;
 
1245
        XTDatabaseHPtr          db = ot->ot_table->tab_db;
 
1246
        xtBool                          flush_table = TRUE;
 
1247
 
 
1248
        /* No open table returned to the pool should still
 
1249
         * have a cache handle!
 
1250
         */
 
1251
        ASSERT_NS(!ot->ot_ind_rhandle);
 
1252
        xt_lock_mutex_ns(&db->db_ot_pool.opt_lock);
 
1253
 
 
1254
        if (!(table_pool = db_get_open_table_pool(db, ot->ot_table->tab_id)))
 
1255
                goto failed;
 
1256
 
 
1257
        if (table_pool->opt_locked) {
 
1258
                /* Table will be closed below, because the table is
 
1259
                 * locked: */
 
1260
                if (table_pool->opt_total_open > 1)
 
1261
                        flush_table = FALSE;
 
1262
        }
 
1263
        else {
 
1264
                /* Put it on the free list: */
 
1265
                db->db_ot_pool.otp_total_free++;
 
1266
 
 
1267
                ot->ot_otp_next_free = table_pool->opt_free_list;
 
1268
                table_pool->opt_free_list = ot;
 
1269
 
 
1270
                /* This is the time the table was freed: */
 
1271
                ot->ot_otp_free_time = xt_db_approximate_time;
 
1272
 
 
1273
                /* Add to most recently used: */
 
1274
                if ((ot->ot_otp_lr_used = db->db_ot_pool.otp_mr_used))
 
1275
                        db->db_ot_pool.otp_mr_used->ot_otp_mr_used = ot;
 
1276
                ot->ot_otp_mr_used = NULL;
 
1277
                db->db_ot_pool.otp_mr_used = ot;
 
1278
                if (!db->db_ot_pool.otp_lr_used) {
 
1279
                        db->db_ot_pool.otp_lr_used = ot;
 
1280
                        db->db_ot_pool.otp_free_time = ot->ot_otp_free_time;
 
1281
                }
 
1282
 
 
1283
                ot = NULL;
 
1284
        }
 
1285
 
 
1286
        if (ot) {
 
1287
                xt_unlock_mutex_ns(&db->db_ot_pool.opt_lock);
 
1288
                xt_close_table(ot, flush_table, FALSE);
 
1289
 
 
1290
                /* assume that table_pool cannot be invalidated in between as we have table_pool->opt_total_open > 0 */
 
1291
                xt_lock_mutex_ns(&db->db_ot_pool.opt_lock);
 
1292
                table_pool->opt_total_open--;
 
1293
        }
 
1294
 
 
1295
        db_free_open_table_pool(NULL, table_pool);
 
1296
 
 
1297
        if (!xt_broadcast_cond_ns(&db->db_ot_pool.opt_cond))
 
1298
                goto failed;
 
1299
        xt_unlock_mutex_ns(&db->db_ot_pool.opt_lock);
 
1300
        
 
1301
        return;
 
1302
 
 
1303
        failed:
 
1304
        xt_unlock_mutex_ns(&db->db_ot_pool.opt_lock);
 
1305
        if (ot)
 
1306
                xt_close_table(ot, TRUE, FALSE);
 
1307
        xt_log_and_clear_exception_ns();
 
1308
}
 
1309
 
 
1310
//#define TEST_FREE_OPEN_TABLES
 
1311
 
 
1312
#ifdef DEBUG
 
1313
#undef XT_OPEN_TABLE_FREE_TIME
 
1314
#define XT_OPEN_TABLE_FREE_TIME                 5
 
1315
#endif
 
1316
 
 
1317
xtPublic void xt_db_free_unused_open_tables(XTThreadPtr self, XTDatabaseHPtr db)
 
1318
{
 
1319
        XTOpenTablePoolPtr      table_pool;
 
1320
        size_t                          count;
 
1321
        XTOpenTablePtr          ot;
 
1322
        xtBool                          flush_table = TRUE;
 
1323
        u_int                           table_count;
 
1324
 
 
1325
        /* A quick check of the oldest free table: */
 
1326
        if (xt_db_approximate_time < db->db_ot_pool.otp_free_time + XT_OPEN_TABLE_FREE_TIME)
 
1327
                return;
 
1328
 
 
1329
        table_count = db->db_table_by_id ? xt_sl_get_size(db->db_table_by_id) : 0;
 
1330
        count = table_count * 3;
 
1331
        if (count < 20)
 
1332
                count = 20;
 
1333
#ifdef TEST_FREE_OPEN_TABLES
 
1334
        count = 10;
 
1335
#endif
 
1336
        if (db->db_ot_pool.otp_total_free > count) {
 
1337
                XTOpenTablePtr  ptr, pptr;
 
1338
 
 
1339
                count = table_count * 2;
 
1340
                if (count < 10)
 
1341
                        count = 10;
 
1342
#ifdef TEST_FREE_OPEN_TABLES
 
1343
                count = 5;
 
1344
#endif
 
1345
                xt_lock_mutex(self, &db->db_ot_pool.opt_lock);
 
1346
                pushr_(xt_unlock_mutex, &db->db_ot_pool.opt_lock);
 
1347
 
 
1348
                while (db->db_ot_pool.otp_total_free > count) {
 
1349
                        ASSERT_NS(db->db_ot_pool.otp_lr_used);
 
1350
                        if (!(ot = db->db_ot_pool.otp_lr_used))
 
1351
                                break;
 
1352
 
 
1353
                        /* Check how long the open table has been free: */
 
1354
                        if (xt_db_approximate_time < ot->ot_otp_free_time + XT_OPEN_TABLE_FREE_TIME)
 
1355
                                break;
 
1356
 
 
1357
                        ot->ot_thread = self;
 
1358
 
 
1359
                        /* Remove from MRU list: */
 
1360
                        db->db_ot_pool.otp_lr_used = ot->ot_otp_mr_used;
 
1361
                        if (db->db_ot_pool.otp_mr_used == ot)
 
1362
                                db->db_ot_pool.otp_mr_used = ot->ot_otp_lr_used;
 
1363
                        if (ot->ot_otp_lr_used)
 
1364
                                ot->ot_otp_lr_used->ot_otp_mr_used = ot->ot_otp_mr_used;
 
1365
                        if (ot->ot_otp_mr_used)
 
1366
                                ot->ot_otp_mr_used->ot_otp_lr_used = ot->ot_otp_lr_used;
 
1367
 
 
1368
                        if (db->db_ot_pool.otp_lr_used)
 
1369
                                db->db_ot_pool.otp_free_time = db->db_ot_pool.otp_lr_used->ot_otp_free_time;
 
1370
 
 
1371
                        ASSERT(db->db_ot_pool.otp_total_free > 0);
 
1372
                        db->db_ot_pool.otp_total_free--;
 
1373
 
 
1374
                        if (!(table_pool = db_get_open_table_pool(db, ot->ot_table->tab_id)))
 
1375
                                xt_throw(self);
 
1376
 
 
1377
                        /* Find the open table in the table pool,
 
1378
                         * and remove it from the list:
 
1379
                         */
 
1380
                        pptr = NULL;
 
1381
                        ptr = table_pool->opt_free_list;
 
1382
                        while (ptr) {
 
1383
                                if (ptr == ot)
 
1384
                                        break;
 
1385
                                pptr = ptr;
 
1386
                                ptr = ptr->ot_otp_next_free;
 
1387
                        }
 
1388
 
 
1389
                        ASSERT_NS(ptr == ot);
 
1390
                        if (ptr == ot) {
 
1391
                                if (pptr)
 
1392
                                        pptr->ot_otp_next_free = ot->ot_otp_next_free;
 
1393
                                else
 
1394
                                        table_pool->opt_free_list = ot->ot_otp_next_free;
 
1395
                        }
 
1396
 
 
1397
                        ASSERT_NS(table_pool->opt_total_open > 0);
 
1398
                        table_pool->opt_total_open--;
 
1399
                        if (table_pool->opt_total_open > 0)
 
1400
                                flush_table = FALSE;
 
1401
                        else
 
1402
                                flush_table = TRUE;
 
1403
 
 
1404
                        db_free_open_table_pool(self, table_pool);
 
1405
 
 
1406
                        freer_();
 
1407
 
 
1408
                        /* Close the table, but not
 
1409
                         * while holding the lock.
 
1410
                         */
 
1411
                        xt_close_table(ot, flush_table, FALSE);
 
1412
 
 
1413
                        xt_lock_mutex(self, &db->db_ot_pool.opt_lock);
 
1414
                        pushr_(xt_unlock_mutex, &db->db_ot_pool.opt_lock);
 
1415
                }
 
1416
 
 
1417
                freer_();
 
1418
        }
 
1419
}
 
1420
 
 
1421
/*
 
1422
 * -----------------------------------------------------------------------
 
1423
 * THE THREAD POOL
 
1424
 */
 
1425
 
 
1426
static void db_notify_all(XTTask *tk)
 
1427
{
 
1428
        XTThreadPtr target;
 
1429
 
 
1430
        for (u_int i=0; ; i++) {
 
1431
                if (!(target = (XTThreadPtr) tk->tk_notify_threads.pl_get_pointer(i)))
 
1432
                        break;
 
1433
 
 
1434
                /* The task is on the thread's to-do list: */
 
1435
                xt_lock_thread(target);
 
1436
                if (target->st_tasks_todo.pl_remove_pointer(tk)) {
 
1437
                        /* Notify the thread that there are no more tasks to do... */
 
1438
                        if (target->st_tasks_todo.pl_is_empty())
 
1439
                                xt_signal_thread(target);
 
1440
                        /* Release tasks that were on the to-do list: */
 
1441
                        tk->tk_release();
 
1442
                }
 
1443
                xt_unlock_thread(target);
 
1444
        }
 
1445
        tk->tk_notify_threads.pl_clear();
 
1446
}
 
1447
 
 
1448
static void db_thread_pool_main(XTThreadPtr self)
 
1449
{
 
1450
        XTDatabaseHPtr  db = self->st_database;
 
1451
        XTTask                  *tk;
 
1452
        XTThreadPtr             target;
 
1453
        xtBool                  job_done = FALSE;
 
1454
 
 
1455
        for (;;) {
 
1456
                xt_lock_mutex(self, &db->db_pool_lock);
 
1457
                pushr_(xt_unlock_mutex, &db->db_pool_lock);
 
1458
 
 
1459
                if (job_done) {
 
1460
                        db->db_pool_job_count--;
 
1461
                        job_done = FALSE;
 
1462
                }
 
1463
                
 
1464
                if (self->t_quit) {
 
1465
                        freer_();
 
1466
                        break;
 
1467
                }
 
1468
 
 
1469
                while (!self->t_quit && !(tk = db->db_task_queue_front)) {
 
1470
                        /* Wait for 1/10 second (to ensure we quit on time): */
 
1471
                        xt_timed_wait_cond(self, &db->db_pool_cond, &db->db_pool_lock, 100);
 
1472
                }
 
1473
 
 
1474
                if (self->t_quit) {
 
1475
                        freer_();
 
1476
                        break;
 
1477
                }
 
1478
 
 
1479
                db->db_task_queue_front = tk->tk_task_list_next;
 
1480
                if (!db->db_task_queue_front)
 
1481
                        db->db_task_queue_back = NULL;
 
1482
 
 
1483
                freer_();
 
1484
 
 
1485
                /* Perform the task: */
 
1486
                job_done = TRUE;
 
1487
                if (!tk->tk_task(self)) {
 
1488
                        /* Transfer error to the task: */
 
1489
                        tk->tk_success = false;
 
1490
                        if ((tk->tk_exception = (XTExceptionPtr) xt_malloc_ns(sizeof(XTExceptionRec))))
 
1491
                                *tk->tk_exception = self->t_exception;
 
1492
                        else
 
1493
                                tk->tk_out_of_memory = true;
 
1494
                }
 
1495
                else
 
1496
                        tk->tk_success = true;
 
1497
 
 
1498
                tk->tk_lock();
 
1499
                tk->tk_running = FALSE;
 
1500
 
 
1501
                /* Notify any there were forgotten: */
 
1502
                db_notify_all(tk);
 
1503
 
 
1504
                /* The task is done: */
 
1505
                if (tk->tk_waiting_threads.pl_is_empty()) {
 
1506
                        /* No waiting tasks, log the error: */
 
1507
                        if (!tk->tk_success)
 
1508
                                xt_log_and_clear_exception(self);
 
1509
                }
 
1510
                else {
 
1511
                        for (u_int i=0; ; i++) {
 
1512
                                if (!(target = (XTThreadPtr) tk->tk_waiting_threads.pl_get_pointer(i)))
 
1513
                                        break;
 
1514
 
 
1515
                                /* The task is on the thread's to-do list: */
 
1516
                                xt_lock_thread(target);
 
1517
                                if (target->st_tasks_todo.pl_remove_pointer(tk)) {
 
1518
                                        /* Add to the done list: */
 
1519
                                        if (!target->st_tasks_done.pl_add_pointer(tk)) {
 
1520
                                                tk->tk_release();
 
1521
                                                xt_log_and_clear_exception(self);
 
1522
                                        }
 
1523
 
 
1524
                                        /* Notify the thread that there are no more tasks to do... */
 
1525
                                        if (target->st_tasks_todo.pl_is_empty())
 
1526
                                                xt_signal_thread(target);
 
1527
                                }
 
1528
                                xt_unlock_thread(target);
 
1529
                        }
 
1530
                        tk->tk_waiting_threads.pl_clear();
 
1531
                }
 
1532
 
 
1533
                tk->tk_unlock();
 
1534
                /* We assume the reference is required to take the lock!
 
1535
                 */
 
1536
                tk->tk_release();
 
1537
        }
 
1538
}
 
1539
 
 
1540
typedef struct DBThreadData {
 
1541
        XTDatabaseHPtr  td_db;
 
1542
} DBThreadDataRec, *DBThreadDataPtr;
 
1543
 
 
1544
static void *db_thread_pool_run_thread(XTThreadPtr self)
 
1545
{
 
1546
        DBThreadDataPtr td = (DBThreadDataPtr) self->t_data;
 
1547
        XTDatabaseHPtr  db = td->td_db;
 
1548
        volatile int    i = 0;
 
1549
 
 
1550
        /* Note, the MySQL thread will be free when the this
 
1551
         * thread quits.
 
1552
         */
 
1553
        if (!myxt_create_thread())
 
1554
                xt_throw(self);
 
1555
 
 
1556
        while (!self->t_quit && i<10) {
 
1557
                try_(a) {
 
1558
                        /* Use the database: */
 
1559
                        xt_use_database(self, db, XT_FOR_POOL);
 
1560
 
 
1561
                        /* {BACKGROUND-RELEASE-DB} */
 
1562
                        xt_heap_release(self, self->st_database);
 
1563
 
 
1564
                        db_thread_pool_main(self);
 
1565
                }
 
1566
                catch_(a) {
 
1567
                        /* This error is "normal"! */
 
1568
                        if (self->t_exception.e_xt_err != XT_ERR_NO_DICTIONARY &&
 
1569
                                !(self->t_exception.e_xt_err == XT_SIGNAL_CAUGHT &&
 
1570
                                self->t_exception.e_sys_err == SIGTERM))
 
1571
                                xt_log_and_clear_exception(self);
 
1572
                }
 
1573
                cont_(a);
 
1574
 
 
1575
                /* Avoid releasing the database (done above) */
 
1576
                self->st_database = NULL;
 
1577
                xt_unuse_database(self, self);
 
1578
 
 
1579
                if (self->t_quit)
 
1580
                        break;
 
1581
 
 
1582
                /* Pause, in case the error repeats: */
 
1583
                xt_sleep_milli_second(10000);
 
1584
                i++;
 
1585
        }
 
1586
 
 
1587
        return NULL;
 
1588
}
 
1589
 
 
1590
static void db_free_pool_thread(XTThreadPtr self, void *data)
 
1591
{
 
1592
        DBThreadDataPtr         td = (DBThreadDataPtr) data;
 
1593
        XTDatabaseHPtr          db = td->td_db;
 
1594
        XTThreadPtr                     thread, pthread;
 
1595
 
 
1596
        xt_free(self, data);
 
1597
 
 
1598
        xt_lock_mutex(self, &db->db_pool_lock);
 
1599
        pushr_(xt_unlock_mutex, &db->db_pool_lock);
 
1600
 
 
1601
        /* Remove the thread from the pool: */
 
1602
        pthread = NULL;
 
1603
        thread = db->db_thread_pool;
 
1604
        while (thread != self) {
 
1605
                pthread = thread;
 
1606
                thread = thread->st_pool_next;
 
1607
        }
 
1608
        if (thread == self) {
 
1609
                db->db_pool_thread_count--;
 
1610
                if (pthread)
 
1611
                        pthread->st_pool_next = thread->st_pool_next;
 
1612
                else
 
1613
                        db->db_thread_pool = thread->st_pool_next;
 
1614
        }
 
1615
 
 
1616
        freer_(); // xt_unlock_mutex(&db->db_pool_lock)
 
1617
}
 
1618
 
 
1619
static XTThreadPtr db_create_pool_thread(XTDatabaseHPtr db)
 
1620
{
 
1621
        DBThreadDataPtr td;
 
1622
        char                    name[PATH_MAX];
 
1623
        XTThreadPtr             thread;
 
1624
 
 
1625
 
 
1626
        /* Note, this is just a test to see if we can create a MySQL thread.
 
1627
         * On shutdown, this is sometimes not possible!
 
1628
         */
 
1629
        if (!myxt_create_thread_possible())
 
1630
                return NULL;
 
1631
 
 
1632
        if (!(td = (DBThreadDataPtr) xt_malloc_ns(sizeof(DBThreadDataRec))))
 
1633
                return NULL;
 
1634
        td->td_db = db;
 
1635
 
 
1636
        db->db_pool_thread_count++;
 
1637
        sprintf(name, "I/O-%s", xt_last_directory_of_path(db->db_main_path));
 
1638
        xt_remove_dir_char(name);
 
1639
        xt_strcat(PATH_MAX, name, "-");
 
1640
        xt_strcati(PATH_MAX, name, db->db_pool_thread_count);
 
1641
        if (!(thread = xt_create_daemon_ns(name)))
 
1642
                goto failed;
 
1643
 
 
1644
        thread->st_pool_next = db->db_thread_pool;
 
1645
        db->db_thread_pool = thread;
 
1646
 
 
1647
        xt_set_thread_data(thread, td, db_free_pool_thread);
 
1648
 
 
1649
        return thread;
 
1650
 
 
1651
        failed:
 
1652
        xt_free_ns(td);
 
1653
        return NULL;
 
1654
}
 
1655
 
 
1656
/*
 
1657
 * Create a pool thread.
 
1658
 *
 
1659
 * This ensures that there is at least one pool thread.
 
1660
 *
 
1661
 * The advantage is that if, for some reason, no more threads
 
1662
 * can be created, then we have one that can do all the work.
 
1663
 */
 
1664
static void db_create_pool_thread(XTThreadPtr self, XTDatabaseHPtr db)
 
1665
{
 
1666
        XTThreadPtr     pool_thread = NULL;
 
1667
 
 
1668
        if (!(pool_thread = db_create_pool_thread(db)))
 
1669
                xt_throw(self);
 
1670
 
 
1671
        xt_run_thread(self, pool_thread, db_thread_pool_run_thread);
 
1672
}
 
1673
 
 
1674
xtPublic void xt_db_thread_pool_init(XTThreadPtr self, XTDatabaseHPtr db)
 
1675
{
 
1676
        xt_init_mutex_with_autoname(self, &db->db_pool_lock);
 
1677
        xt_init_cond(self, &db->db_pool_cond);
 
1678
        db_create_pool_thread(self, db);
 
1679
}
 
1680
 
 
1681
xtPublic void xt_db_thread_pool_exit(XTThreadPtr self, XTDatabaseHPtr db)
 
1682
{
 
1683
        xt_db_stop_pool_threads(self, db);
 
1684
        xt_free_mutex(&db->db_pool_lock);
 
1685
        xt_free_cond(&db->db_pool_cond);
 
1686
}
 
1687
 
 
1688
xtPublic void xt_db_stop_pool_threads(XTThreadPtr self, XTDatabaseHPtr db)
 
1689
{
 
1690
        XTThreadPtr     thread;
 
1691
        xtThreadID      tid;
 
1692
 
 
1693
        if (db->db_thread_pool) {
 
1694
                xt_lock_mutex(self, &db->db_pool_lock);
 
1695
                pushr_(xt_unlock_mutex, &db->db_pool_lock);
 
1696
 
 
1697
                while ((thread = db->db_thread_pool)) {
 
1698
                        tid = thread->t_id;
 
1699
 
 
1700
                        xt_terminate_thread(self, thread);
 
1701
                        xt_broadcast_cond(self, &db->db_pool_cond);
 
1702
 
 
1703
                        freer_(); // xt_unlock_mutex(&db->db_pool_lock)
 
1704
 
 
1705
                        xt_wait_for_thread_to_exit(tid, FALSE);
 
1706
 
 
1707
                        xt_lock_mutex(self, &db->db_pool_lock);
 
1708
                        pushr_(xt_unlock_mutex, &db->db_pool_lock);
 
1709
                }
 
1710
 
 
1711
                freer_(); // xt_unlock_mutex(&db->db_pool_lock)
 
1712
        }
 
1713
}
 
1714
 
 
1715
/*
 
1716
 * notify_complete means the thread will wait for the task to complete, and process the result.
 
1717
 * notify_early means the thread will wait for a notification from the task itself.
 
1718
 *
 
1719
 * Note. we assume the caller has a reference to the task. The caller is also responsible for
 
1720
 * freeing the reference!
 
1721
 *
 
1722
 * This function will take extra references as required!
 
1723
 */
 
1724
xtPublic xtBool xt_run_async_task(XTTask *tk, xtBool notify_complete, xtBool notify_early, XTThreadPtr thread, XTDatabaseHPtr db)
 
1725
{
 
1726
        tk->tk_lock();
 
1727
        if (notify_complete || notify_early) {
 
1728
                xt_lock_thread(thread);
 
1729
                /* Count the reference is the to-do list of the thread. */
 
1730
                tk->tk_reference();
 
1731
                if (!thread->st_tasks_todo.pl_add_pointer(tk)) {
 
1732
                        tk->tk_release();
 
1733
                        xt_unlock_thread(thread);
 
1734
                        goto failed_0;
 
1735
                }
 
1736
                xt_unlock_thread(thread);
 
1737
                
 
1738
                if (notify_complete) {
 
1739
                        if (!tk->tk_waiting_threads.pl_add_pointer(thread))
 
1740
                                goto failed_1;
 
1741
                }
 
1742
                else {
 
1743
                        if (!tk->tk_notify_threads.pl_add_pointer(thread))
 
1744
                                goto failed_1;
 
1745
                }
 
1746
        }
 
1747
 
 
1748
        if (!tk->tk_running) {
 
1749
                XTThreadPtr     pool_thread = NULL;
 
1750
 
 
1751
                /* Note, the caller should already have a reference,
 
1752
                 * otherwise it would not be safe to access the task.
 
1753
                 *
 
1754
                 * When running, we add one more reference. The reference
 
1755
                 * is owned by the running thread.
 
1756
                 *
 
1757
                 * The reference will be released, when the task exection
 
1758
                 * is complete.
 
1759
                 */
 
1760
                tk->tk_reference();
 
1761
                tk->tk_running = TRUE;
 
1762
                xt_lock_mutex_ns(&db->db_pool_lock);
 
1763
 
 
1764
                /* Check if we need to start a new thread: */
 
1765
                db->db_pool_job_count++;
 
1766
                if (db->db_pool_job_count > db->db_pool_thread_count && db->db_pool_thread_count < XT_ASYNC_THREAD_COUNT) {
 
1767
                        if (!(pool_thread = db_create_pool_thread(db))) {
 
1768
                                if (thread->t_exception.e_xt_err == XT_ERR_MYSQL_NO_THREAD ||
 
1769
                                        thread->t_exception.e_xt_err == XT_ERR_MYSQL_ERROR) {
 
1770
                                        /* We can ignore this error if error:
 
1771
                                         * XT_ERR_MYSQL_NO_THREAD can occur on shutdown. If we already have
 
1772
                                         * pool threads, then this is no problem!
 
1773
                                         */
 
1774
                                        if (db->db_pool_thread_count > 0)
 
1775
                                                goto ignore_create_thread_error;
 
1776
                                }
 
1777
                                xt_unlock_mutex_ns(&db->db_pool_lock);
 
1778
                                goto failed_2;
 
1779
                        }
 
1780
                }
 
1781
 
 
1782
                ignore_create_thread_error:
 
1783
                if (db->db_task_queue_back)
 
1784
                        db->db_task_queue_back->tk_task_list_next = tk;
 
1785
                else
 
1786
                        db->db_task_queue_front = tk;
 
1787
                tk->tk_task_list_next = NULL;
 
1788
                db->db_task_queue_back = tk;
 
1789
                xt_signal_cond(NULL, &db->db_pool_cond);
 
1790
 
 
1791
                xt_unlock_mutex_ns(&db->db_pool_lock);
 
1792
 
 
1793
                if (pool_thread) {
 
1794
                        if (!xt_run_thread_ns(pool_thread, db_thread_pool_run_thread))
 
1795
                                goto failed_2;
 
1796
                }
 
1797
        }
 
1798
 
 
1799
        tk->tk_unlock();
 
1800
        
 
1801
        return OK;
 
1802
 
 
1803
        failed_2:
 
1804
        if (notify_complete)
 
1805
                tk->tk_waiting_threads.pl_remove_pointer(thread);
 
1806
        else
 
1807
                tk->tk_notify_threads.pl_remove_pointer(thread);
 
1808
 
 
1809
        failed_1:
 
1810
        xt_lock_thread(thread);
 
1811
        if (thread->st_tasks_todo.pl_remove_pointer(tk))
 
1812
                tk->tk_release();
 
1813
        xt_unlock_thread(thread);
 
1814
 
 
1815
        failed_0:
 
1816
        tk->tk_unlock();
 
1817
        tk->tk_release();
 
1818
        return FAILED;
 
1819
}
 
1820
 
 
1821
xtPublic void xt_wait_for_async_tasks(XTThreadPtr thread)
 
1822
{
 
1823
        if (!thread->st_tasks_todo.pl_is_empty()) {
 
1824
                xt_lock_thread(thread);
 
1825
                while (!thread->st_tasks_todo.pl_is_empty()) {
 
1826
                        xt_timed_wait_thread(thread, 100);
 
1827
#ifdef DEBUG
 
1828
                        XTTask *tk;
 
1829
 
 
1830
                        for (int i=0; i++; ) {
 
1831
                                if (!(tk = (XTTask *) thread->st_tasks_todo.pl_get_pointer(i)))
 
1832
                                        break;
 
1833
                                ASSERT_NS(tk->tk_running);
 
1834
                        }
 
1835
#endif
 
1836
                }
 
1837
                xt_unlock_thread(thread);
 
1838
        }
 
1839
}
 
1840
 
 
1841
xtPublic xtBool xt_wait_for_async_task_results(XTThreadPtr thread)
 
1842
{
 
1843
        XTTask *tk;
 
1844
        xtBool ok = TRUE;
 
1845
 
 
1846
        /* Wait for the task to finish: */
 
1847
        xt_wait_for_async_tasks(thread);
 
1848
 
 
1849
        /* Collect the results: */
 
1850
        while ((tk = xt_get_task_result(thread))) {
 
1851
                if (!tk->tk_success) {
 
1852
                        XTThreadPtr self = xt_get_self();
 
1853
 
 
1854
                        if (ok) {
 
1855
                                /* Transfer the first error to this thread... */
 
1856
                                if (tk->tk_exception)
 
1857
                                        self->t_exception = *tk->tk_exception;
 
1858
                                else
 
1859
                                        xt_register_errno(XT_REG_CONTEXT, ENOMEM);
 
1860
                                ok = FALSE;
 
1861
                        }
 
1862
                        else {
 
1863
                                /* Log all other errors: */
 
1864
                                if (tk->tk_exception)
 
1865
                                        xt_log_exception(self, tk->tk_exception, XT_LOG_ERROR);
 
1866
                        }
 
1867
                }
 
1868
 
 
1869
                tk->tk_release();
 
1870
        }
 
1871
        
 
1872
        return ok;
 
1873
}
 
1874
 
 
1875
/* After waiting, call this function to collect the results.
 
1876
 * NOTE: Returns a references task!
 
1877
 */
 
1878
xtPublic XTTask *xt_get_task_result(XTThreadPtr thread)
 
1879
{
 
1880
        XTTask *tk;
 
1881
 
 
1882
        xt_lock_thread(thread);
 
1883
        if ((tk = (XTTask *) thread->st_tasks_done.pl_get_pointer(thread->st_tasks_done.pl_size() - 1)))
 
1884
                thread->st_tasks_done.pl_remove_pointer(tk);
 
1885
        xt_unlock_thread(thread);
 
1886
        return tk;
 
1887
}
 
1888
 
 
1889
/*
 
1890
 * This function will wake up a waiting thread so that the 
 
1891
 * task continues without a waiting thread.
 
1892
 */
 
1893
xtPublic void xt_async_task_notify(XTTask *tk)
 
1894
{
 
1895
        if (!tk->tk_notify_threads.pl_is_empty()) {
 
1896
                tk->tk_lock();
 
1897
                db_notify_all(tk);
 
1898
                tk->tk_unlock();
 
1899
        }
 
1900
}
 
1901
 
 
1902
class XTTestTask : public XTTask {
 
1903
        public:
 
1904
        XTTestTask() : XTTask(),
 
1905
                tt_sleep_time(3),
 
1906
                tt_ref_count(0)
 
1907
        { }
 
1908
 
 
1909
        virtual void    tk_delete() { delete this; }
 
1910
        virtual xtBool  tk_task(XTThreadPtr thread);
 
1911
 
 
1912
        int                             tt_sleep_time;
 
1913
        int                             tt_ref_count;
 
1914
 
 
1915
        virtual void    tk_reference();
 
1916
        virtual void    tk_release();
 
1917
};
 
1918
 
 
1919
xtBool XTTestTask::tk_task(XTThreadPtr XT_UNUSED(thread))
 
1920
{
 
1921
        sleep(tt_sleep_time);
 
1922
        return OK;
 
1923
}
 
1924
 
 
1925
static void db_multi_async_test(XTThreadPtr self, int count)
 
1926
{
 
1927
        XTTestTask *tt;
 
1928
 
 
1929
        if (!self->st_database) {
 
1930
                xt_logf(XT_NT_WARNING, "Open database required to run this test\n");
 
1931
                return;
 
1932
        }
 
1933
 
 
1934
        for (int i=0; i<count; i++) {
 
1935
                if (!(tt = new XTTestTask()))
 
1936
                        xt_throw_errno(XT_CONTEXT, ENOMEM);
 
1937
 
 
1938
                tt->tt_sleep_time = count;
 
1939
 
 
1940
                /* Run the task: */
 
1941
                tt->tk_reference();
 
1942
                if (!xt_run_async_task(tt, TRUE, FALSE, self, self->st_database)) {
 
1943
                        tt->tk_release();
 
1944
                        xt_throw(self);
 
1945
                }
 
1946
                tt->tk_release();
 
1947
        }
 
1948
 
 
1949
        /* Wait for the task to finish: */
 
1950
        xt_wait_for_async_tasks(self);
 
1951
 
 
1952
        /* Collect the results: */
 
1953
        while ((tt = (XTTestTask *) xt_get_task_result(self)))
 
1954
                tt->tk_release();
 
1955
}
 
1956
 
 
1957
void XTTestTask::tk_reference()
 
1958
{
 
1959
        tt_ref_count++;
 
1960
}
 
1961
 
 
1962
void XTTestTask::tk_release()
 
1963
{
 
1964
        tt_ref_count--;
 
1965
        if (!tt_ref_count)
 
1966
                delete this;
 
1967
}
 
1968
 
 
1969
xtPublic void xt_unit_test_async_task(XTThreadPtr self)
 
1970
{
 
1971
        db_multi_async_test(self, 1);
 
1972
        //db_multi_async_test(self, 10, true);
 
1973
        //db_multi_async_test(self, 5);
 
1974
}
 
1975