1
/* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2
// vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
5
COPYING CONDITIONS NOTICE:
7
This program is free software; you can redistribute it and/or modify
8
it under the terms of version 2 of the GNU General Public License as
9
published by the Free Software Foundation, and provided that the
10
following conditions are met:
12
* Redistributions of source code must retain this COPYING
13
CONDITIONS NOTICE, the COPYRIGHT NOTICE (below), the
14
DISCLAIMER (below), the UNIVERSITY PATENT NOTICE (below), the
15
PATENT MARKING NOTICE (below), and the PATENT RIGHTS
18
* Redistributions in binary form must reproduce this COPYING
19
CONDITIONS NOTICE, the COPYRIGHT NOTICE (below), the
20
DISCLAIMER (below), the UNIVERSITY PATENT NOTICE (below), the
21
PATENT MARKING NOTICE (below), and the PATENT RIGHTS
22
GRANT (below) in the documentation and/or other materials
23
provided with the distribution.
25
You should have received a copy of the GNU General Public License
26
along with this program; if not, write to the Free Software
27
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
32
TokuDB, Tokutek Fractal Tree Indexing Library.
33
Copyright (C) 2007-2013 Tokutek, Inc.
37
This program is distributed in the hope that it will be useful, but
38
WITHOUT ANY WARRANTY; without even the implied warranty of
39
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
40
General Public License for more details.
42
UNIVERSITY PATENT NOTICE:
44
The technology is licensed by the Massachusetts Institute of
45
Technology, Rutgers State University of New Jersey, and the Research
46
Foundation of State University of New York at Stony Brook under
47
United States of America Serial No. 11/760379 and to the patents
48
and/or patent applications resulting from it.
50
PATENT MARKING NOTICE:
52
This software is covered by US Patent No. 8,185,551.
53
This software is covered by US Patent No. 8,489,638.
57
"THIS IMPLEMENTATION" means the copyrightable works distributed by
58
Tokutek as part of the Fractal Tree project.
60
"PATENT CLAIMS" means the claims of patents that are owned or
61
licensable by Tokutek, both currently or in the future; and that in
62
the absence of this license would be infringed by THIS
63
IMPLEMENTATION or by using or running THIS IMPLEMENTATION.
65
"PATENT CHALLENGE" shall mean a challenge to the validity,
66
patentability, enforceability and/or non-infringement of any of the
67
PATENT CLAIMS or otherwise opposing any of the PATENT CLAIMS.
69
Tokutek hereby grants to you, for the term and geographical scope of
70
the PATENT CLAIMS, a non-exclusive, no-charge, royalty-free,
71
irrevocable (except as stated in this section) patent license to
72
make, have made, use, offer to sell, sell, import, transfer, and
73
otherwise run, modify, and propagate the contents of THIS
74
IMPLEMENTATION, where such license applies only to the PATENT
75
CLAIMS. This grant does not include claims that would be infringed
76
only as a consequence of further modifications of THIS
77
IMPLEMENTATION. If you or your agent or licensee institute or order
78
or agree to the institution of patent litigation against any entity
79
(including a cross-claim or counterclaim in a lawsuit) alleging that
80
THIS IMPLEMENTATION constitutes direct or contributory patent
81
infringement, or inducement of patent infringement, then any rights
82
granted to you under this License shall terminate as of the date
83
such litigation is filed. If you or your agent or exclusive
84
licensee institute or order or agree to the institution of a PATENT
85
CHALLENGE, then Tokutek may terminate any rights granted to you
89
#ident "Copyright (c) 2010-2013 Tokutek Inc. All rights reserved."
90
#ident "The technology is licensed by the Massachusetts Institute of Technology, Rutgers State University of New Jersey, and the Research Foundation of State University of New York at Stony Brook under United States of America Serial No. 11/760379 and to the patents and/or patent applications resulting from it."
92
// test the loader write dbfile function
94
#define DONT_DEPRECATE_WRITES
95
#define DONT_DEPRECATE_MALLOC
98
#include "ftloader-internal.h"
99
#include <portability/toku_path.h>
101
static int event_count, event_count_trigger;
103
static void my_assert_hook (void) {
104
fprintf(stderr, "event_count=%d\n", event_count);
107
static void reset_event_counts(void) {
108
event_count = event_count_trigger = 0;
111
static void event_hit(void) {
114
static int loader_poll_callback(void *UU(extra), float UU(progress)) {
117
if (event_count_trigger == event_count) {
119
if (verbose) printf("%s %d\n", __FUNCTION__, event_count);
127
static size_t bad_fwrite (const void *ptr, size_t size, size_t nmemb, FILE *stream) {
130
if (event_count_trigger == event_count) {
132
if (verbose) printf("%s %d\n", __FUNCTION__, event_count);
136
r = fwrite(ptr, size, nmemb, stream);
138
errno = ferror(stream);
144
static ssize_t bad_write(int fd, const void * bp, size_t len) {
147
if (event_count_trigger == event_count) {
149
if (verbose) printf("%s %d\n", __FUNCTION__, event_count);
153
r = write(fd, bp, len);
158
static ssize_t bad_pwrite(int fd, const void * bp, size_t len, toku_off_t off) {
161
if (event_count_trigger == event_count) {
163
if (verbose) printf("%s %d\n", __FUNCTION__, event_count);
167
r = pwrite(fd, bp, len, off);
173
bad_fdopen(int fd, const char * mode) {
176
if (event_count_trigger == event_count) {
178
if (verbose) printf("%s %d\n", __FUNCTION__, event_count);
182
rval = fdopen(fd, mode);
188
bad_fopen(const char *filename, const char *mode) {
191
if (event_count_trigger == event_count) {
193
if (verbose) printf("%s %d\n", __FUNCTION__, event_count);
197
rval = fopen(filename, mode);
204
bad_open(const char *path, int oflag, int mode) {
207
if (event_count_trigger == event_count) {
209
if (verbose) printf("%s %d\n", __FUNCTION__, event_count);
213
rval = open(path, oflag, mode);
221
bad_fclose(FILE * stream) {
224
// Must close the stream even in the "error case" because otherwise there is no way to get the memory back.
225
rval = fclose(stream);
227
if (event_count_trigger == event_count) {
228
if (verbose) printf("%s %d\n", __FUNCTION__, event_count);
236
int bad_read_errno = 0;
239
bad_read(int fd, void *buf, size_t count) {
242
if (event_count_trigger == event_count) {
244
if (verbose) printf("%s %d\n", __FUNCTION__, event_count);
245
errno = bad_read_errno;
248
rval = read(fd, buf, count);
252
static int my_malloc_event = 1;
253
static int my_malloc_count = 0, my_big_malloc_count = 0;
254
static int my_realloc_count = 0, my_big_realloc_count = 0;
256
static void reset_my_malloc_counts(void) {
257
my_malloc_count = my_big_malloc_count = 0;
258
my_realloc_count = my_big_realloc_count = 0;
261
size_t min_malloc_error_size = 0;
263
static void *my_malloc(size_t n) {
265
if (n >= min_malloc_error_size) {
266
my_big_malloc_count++;
267
if (my_malloc_event) {
269
if (event_count == event_count_trigger) {
271
if (verbose) printf("%s %d\n", __FUNCTION__, event_count);
280
static int do_realloc_errors = 1;
282
static void *my_realloc(void *p, size_t n) {
284
if (n >= min_malloc_error_size) {
285
my_big_realloc_count++;
286
if (do_realloc_errors) {
288
if (event_count == event_count_trigger) {
290
if (verbose) printf("%s %d\n", __FUNCTION__, event_count);
296
return os_realloc(p, n);
300
static int qsort_compare_ints (const void *a, const void *b) {
301
int avalue = *(int*)a;
302
int bvalue = *(int*)b;
303
if (avalue<bvalue) return -1;
304
if (avalue>bvalue) return +1;
309
static int compare_ints (DB* UU(desc), const DBT *akey, const DBT *bkey) {
310
assert(akey->size==sizeof(int));
311
assert(bkey->size==sizeof(int));
312
return qsort_compare_ints(akey->data, bkey->data);
315
static char *errorstr_static (int err) {
316
static char errorstr[100];
317
toku_ft_strerror_r(err, errorstr, sizeof(errorstr));
322
static void err_cb(DB *db UU(), int dbn, int err, DBT *key UU(), DBT *val UU(), void *extra UU()) {
323
fprintf(stderr, "error in test dbn=%d err=%d (%s)\n", dbn, err, errorstr_static(err));
327
enum { N_SOURCES = 2, N_DEST_DBS=1 };
331
static char *make_fname(const char *directory, const char *fname, int idx) {
332
int len = strlen(directory)+strlen(fname)+20;
333
char *XMALLOC_N(len, result);
334
int r = snprintf(result, len, "%s/%s%d", directory, fname, idx);
336
return result; // don't care that it's a little too long.
340
struct consumer_thunk {
345
static void *consumer_thread (void *ctv) {
346
struct consumer_thunk *cthunk = (struct consumer_thunk *)ctv;
349
int r = queue_deq(cthunk->q, &item, NULL, NULL);
350
if (r==EOF) return NULL;
352
struct rowset *rowset = (struct rowset *)item;
353
cthunk->n_read += rowset->n_rows;
354
destroy_rowset(rowset);
360
static void test (const char *directory, bool is_error) {
362
int *XMALLOC_N(N_SOURCES, fds);
364
char **XMALLOC_N(N_SOURCES, fnames);
365
int *XMALLOC_N(N_SOURCES, n_records_in_fd);
366
for (int i=0; i<N_SOURCES; i++) {
367
fnames[i] = make_fname(directory, "temp", i);
368
fds[i] = open(fnames[i], O_CREAT|O_RDWR, S_IRWXU);
370
n_records_in_fd[i] = 0;
372
for (int i=0; i<N_RECORDS; i++) {
374
int fdi = random()%N_SOURCES;
376
{ int r = write(fd, &size, 4); assert(r==4); }
377
{ int r = write(fd, &i, 4); assert(r==4); }
378
{ int r = write(fd, &size, 4); assert(r==4); }
379
{ int r = write(fd, &i, 4); assert(r==4); }
380
n_records_in_fd[fdi]++;
382
for (int i=0; i<N_SOURCES; i++) {
383
toku_off_t r = lseek(fds[i], 0, SEEK_SET);
388
FT_HANDLE *XCALLOC_N(N_DEST_DBS, brts);
389
DB* *XCALLOC_N(N_DEST_DBS, dbs);
390
const char **XMALLOC_N(N_DEST_DBS, new_fnames_in_env);
391
for (int i=0; i<N_DEST_DBS; i++) {
393
snprintf(s, sizeof(s), "db%d.db", i);
394
new_fnames_in_env[i] = toku_strdup(s);
395
assert(new_fnames_in_env[i]);
397
ft_compare_func *XMALLOC_N(N_DEST_DBS, bt_compare_functions);
398
bt_compare_functions[0] = compare_ints;
400
enum {CACHETABLE_SIZE = 64*1024};
402
toku_cachetable_create(&ct, CACHETABLE_SIZE, (LSN){1}, NULL);
406
int r = toku_ft_loader_internal_init (&bl,
408
(generate_row_for_put_func)NULL,
410
N_DEST_DBS, brts, dbs,
412
bt_compare_functions,
415
TXNID_NONE, true, 0, false);
419
ft_loader_init_error_callback(&bl->error_callback);
420
ft_loader_set_error_function(&bl->error_callback, err_cb, NULL);
421
ft_loader_init_poll_callback(&bl->poll_callback);
422
ft_loader_set_poll_function(&bl->poll_callback, loader_poll_callback, NULL);
423
ft_loader_set_fractal_workers_count_from_c(bl);
426
{ int r = queue_create(&q, 1000); assert(r==0); }
428
const int MERGE_BUF_SIZE = 100000; // bigger than 64K so that we will trigger malloc issues.
429
{ int r = create_dbufio_fileset(&bfs, N_SOURCES, fds, MERGE_BUF_SIZE, false); assert(r==0); }
430
FIDX *XMALLOC_N(N_SOURCES, src_fidxs);
431
assert(bl->file_infos.n_files==0);
432
bl->file_infos.n_files = N_SOURCES;
433
bl->file_infos.n_files_limit = N_SOURCES;
434
bl->file_infos.n_files_open = 0;
435
bl->file_infos.n_files_extant = 0;
436
XREALLOC_N(bl->file_infos.n_files_limit, bl->file_infos.file_infos);
437
for (int i=0; i<N_SOURCES; i++) {
438
// all we really need is the number of records in the file. The rest of the file_info is unused by the dbufio code.n
439
bl->file_infos.file_infos[i].n_rows = n_records_in_fd[i];
440
// However we need these for the destroy method to work right.
441
bl->file_infos.file_infos[i].is_extant = false;
442
bl->file_infos.file_infos[i].is_open = false;
443
bl->file_infos.file_infos[i].buffer = NULL;
444
src_fidxs[i].idx = i;
446
toku_pthread_t consumer;
447
struct consumer_thunk cthunk = {q, 0};
449
int r = toku_pthread_create(&consumer, NULL, consumer_thread, (void*)&cthunk);
453
toku_set_func_malloc_only(my_malloc);
454
toku_set_func_realloc_only(my_realloc);
455
ft_loader_set_os_fwrite(bad_fwrite);
456
toku_set_func_write(bad_write);
457
toku_set_func_pwrite(bad_pwrite);
458
toku_set_func_fdopen(bad_fdopen);
459
toku_set_func_fopen(bad_fopen);
460
toku_set_func_open(bad_open);
461
toku_set_func_fclose(bad_fclose);
462
if (bad_read_errno) toku_set_func_read(bad_read);
466
int r = toku_merge_some_files_using_dbufio(true, FIDX_NULL, q, N_SOURCES, bfs, src_fidxs, bl, 0, (DB*)NULL, compare_ints, 10000);
467
if (is_error && r!=0) {
470
if (r!=0) printf("%s:%d r=%d (%s)\n", __FILE__, __LINE__, r, errorstr_static(r));
474
panic_dbufio_fileset(bfs, r);
477
int r = queue_eof(q);
481
toku_set_func_malloc(NULL);
482
toku_set_func_realloc(NULL);
483
ft_loader_set_os_fwrite(NULL);
484
toku_set_func_write(NULL);
485
toku_set_func_pwrite(NULL);
486
toku_set_func_fdopen(NULL);
487
toku_set_func_fopen(NULL);
488
toku_set_func_open(NULL);
489
toku_set_func_fclose(NULL);
490
toku_set_func_read(NULL);
491
do_assert_hook = my_assert_hook;
495
int r = toku_pthread_join(consumer, &vresult);
497
assert(vresult==NULL);
498
//printf("n_read = %ld, N_SOURCES=%d N_RECORDS=%d\n", cthunk.n_read, N_SOURCES, N_RECORDS);
500
assert(cthunk.n_read == N_RECORDS);
503
//printf("%s:%d Destroying\n", __FILE__, __LINE__);
505
int r = queue_destroy(bl->primary_rowset_queue);
509
int r = queue_destroy(q);
512
toku_ft_loader_internal_destroy(bl, false);
514
toku_cachetable_close(&ct);
516
for (int i=0; i<N_DEST_DBS; i++) {
517
toku_free((void*)new_fnames_in_env[i]);
519
for (int i=0; i<N_SOURCES; i++) {
520
toku_free(fnames[i]);
522
destroy_dbufio_fileset(bfs);
527
toku_free(new_fnames_in_env);
528
toku_free(bt_compare_functions);
530
toku_free(src_fidxs);
531
toku_free(n_records_in_fd);
535
static int usage(const char *progname, int n) {
536
fprintf(stderr, "Usage:\n %s [-v] [-q] [-r %d] [-s] [-m] [-tend NEVENTS] directory\n", progname, n);
537
fprintf(stderr, "[-v] turn on verbose\n");
538
fprintf(stderr, "[-q] turn off verbose\n");
539
fprintf(stderr, "[-r %d] set the number of rows\n", n);
540
fprintf(stderr, "[-s] set the small loader size factor\n");
541
fprintf(stderr, "[-m] inject big malloc failures\n");
542
fprintf(stderr, "[-tend NEVENTS] stop testing after N events\n");
543
fprintf(stderr, "[-bad_read_errno ERRNO]\n");
547
int test_main (int argc, const char *argv[]) {
550
const char *progname=argv[0];
553
if (strcmp(argv[0],"-h")==0) {
554
return usage(progname, N_RECORDS);
555
} else if (strcmp(argv[0],"-v")==0) {
557
} else if (strcmp(argv[0],"-q")==0) {
559
} else if (strcmp(argv[0],"-r") == 0) {
561
N_RECORDS = atoi(argv[0]);
562
} else if (strcmp(argv[0],"-s") == 0) {
563
toku_ft_loader_set_size_factor(1);
564
} else if (strcmp(argv[0],"-m") == 0) {
566
} else if (strcmp(argv[0],"-tend") == 0 && argc > 1) {
568
tend = atoi(argv[0]);
569
} else if (strcmp(argv[0],"-tstart") == 0 && argc > 1) {
571
tstart = atoi(argv[0]);
572
} else if (strcmp(argv[0], "-bad_read_errno") == 0 && argc > 1) {
574
bad_read_errno = atoi(argv[0]);
575
} else if (argc!=1) {
576
return usage(progname, N_RECORDS);
583
const char* directory = TOKU_TEST_FILENAME;
584
char unlink_all[strlen(directory)+20];
585
snprintf(unlink_all, strlen(directory)+20, "rm -rf %s", directory);
587
int templen = strlen(directory)+15;
588
char tf_template[templen];
589
int tlen = snprintf(tf_template, templen, "%s/tempXXXXXX", directory);
590
assert (tlen>0 && tlen<templen);
592
char output_name[templen];
593
int olen = snprintf(output_name, templen, "%s/test.tokudb", directory);
594
assert (olen>0 && olen<templen);
598
r = system(unlink_all); CKERR(r);
599
r = toku_os_mkdir(directory, 0755); CKERR(r);
600
test(directory, false);
602
if (verbose) printf("my_malloc_count=%d big_count=%d\n", my_malloc_count, my_big_malloc_count);
605
int event_limit = event_count;
606
if (tend>=0 && tend<event_limit) event_limit=tend;
607
if (verbose) printf("event_limit=%d\n", event_limit);
609
for (int i = tstart+1; i <= event_limit; i++) {
610
reset_event_counts();
611
reset_my_malloc_counts();
612
event_count_trigger = i;
613
r = system(unlink_all); CKERR(r);
614
r = toku_os_mkdir(directory, 0755); CKERR(r);
615
if (verbose) printf("event=%d\n", i);
616
test(directory, true);
618
r = system(unlink_all); CKERR(r);