1
/* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2
// vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
5
COPYING CONDITIONS NOTICE:
7
This program is free software; you can redistribute it and/or modify
8
it under the terms of version 2 of the GNU General Public License as
9
published by the Free Software Foundation, and provided that the
10
following conditions are met:
12
* Redistributions of source code must retain this COPYING
13
CONDITIONS NOTICE, the COPYRIGHT NOTICE (below), the
14
DISCLAIMER (below), the UNIVERSITY PATENT NOTICE (below), the
15
PATENT MARKING NOTICE (below), and the PATENT RIGHTS
18
* Redistributions in binary form must reproduce this COPYING
19
CONDITIONS NOTICE, the COPYRIGHT NOTICE (below), the
20
DISCLAIMER (below), the UNIVERSITY PATENT NOTICE (below), the
21
PATENT MARKING NOTICE (below), and the PATENT RIGHTS
22
GRANT (below) in the documentation and/or other materials
23
provided with the distribution.
25
You should have received a copy of the GNU General Public License
26
along with this program; if not, write to the Free Software
27
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
32
TokuDB, Tokutek Fractal Tree Indexing Library.
33
Copyright (C) 2007-2013 Tokutek, Inc.
37
This program is distributed in the hope that it will be useful, but
38
WITHOUT ANY WARRANTY; without even the implied warranty of
39
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
40
General Public License for more details.
42
UNIVERSITY PATENT NOTICE:
44
The technology is licensed by the Massachusetts Institute of
45
Technology, Rutgers State University of New Jersey, and the Research
46
Foundation of State University of New York at Stony Brook under
47
United States of America Serial No. 11/760379 and to the patents
48
and/or patent applications resulting from it.
50
PATENT MARKING NOTICE:
52
This software is covered by US Patent No. 8,185,551.
53
This software is covered by US Patent No. 8,489,638.
57
"THIS IMPLEMENTATION" means the copyrightable works distributed by
58
Tokutek as part of the Fractal Tree project.
60
"PATENT CLAIMS" means the claims of patents that are owned or
61
licensable by Tokutek, both currently or in the future; and that in
62
the absence of this license would be infringed by THIS
63
IMPLEMENTATION or by using or running THIS IMPLEMENTATION.
65
"PATENT CHALLENGE" shall mean a challenge to the validity,
66
patentability, enforceability and/or non-infringement of any of the
67
PATENT CLAIMS or otherwise opposing any of the PATENT CLAIMS.
69
Tokutek hereby grants to you, for the term and geographical scope of
70
the PATENT CLAIMS, a non-exclusive, no-charge, royalty-free,
71
irrevocable (except as stated in this section) patent license to
72
make, have made, use, offer to sell, sell, import, transfer, and
73
otherwise run, modify, and propagate the contents of THIS
74
IMPLEMENTATION, where such license applies only to the PATENT
75
CLAIMS. This grant does not include claims that would be infringed
76
only as a consequence of further modifications of THIS
77
IMPLEMENTATION. If you or your agent or licensee institute or order
78
or agree to the institution of patent litigation against any entity
79
(including a cross-claim or counterclaim in a lawsuit) alleging that
80
THIS IMPLEMENTATION constitutes direct or contributory patent
81
infringement, or inducement of patent infringement, then any rights
82
granted to you under this License shall terminate as of the date
83
such litigation is filed. If you or your agent or exclusive
84
licensee institute or order or agree to the institution of a PATENT
85
CHALLENGE, then Tokutek may terminate any rights granted to you
89
#ident "Copyright (c) 2010-2013 Tokutek Inc. All rights reserved."
90
#ident "The technology is licensed by the Massachusetts Institute of Technology, Rutgers State University of New Jersey, and the Research Foundation of State University of New York at Stony Brook under United States of America Serial No. 11/760379 and to the patents and/or patent applications resulting from it."
93
#include <toku_assert.h>
97
#include "ftloader-internal.h"
99
#include <portability/toku_path.h>
102
static int qsort_compare_ints (const void *a, const void *b) {
103
int avalue = *(int*)a;
104
int bvalue = *(int*)b;
105
if (avalue<bvalue) return -1;
106
if (avalue>bvalue) return +1;
111
static int compare_ints (DB* UU(desc), const DBT *akey, const DBT *bkey) {
112
assert(akey->size==sizeof(int));
113
assert(bkey->size==sizeof(int));
114
return qsort_compare_ints(akey->data, bkey->data);
117
static void err_cb(DB *db UU(), int dbn UU(), int err UU(), DBT *key UU(), DBT *val UU(), void *extra UU()) {
118
fprintf(stderr, "error in test");
123
static void expect_dups_cb(DB *db UU(), int dbn UU(), int err UU(), DBT *key UU(), DBT *val UU(), void *extra UU()) {
127
static void test_merge_internal (int a[], int na, int b[], int nb, bool dups) {
128
int *MALLOC_N(na+nb, ab); // the combined array a and b
129
for (int i=0; i<na; i++) {
132
for (int i=0; i<nb; i++) {
135
struct row *MALLOC_N(na, ar);
136
struct row *MALLOC_N(nb, br);
137
for (int i=0; i<na; i++) {
138
ar[i].off = i*sizeof(a[0]);
139
ar[i].klen = sizeof(a[i]);
142
for (int i=0; i<nb; i++) {
143
br[i].off = (na+i)*sizeof(b[0]);
144
br[i].klen = sizeof(b[i]);
147
struct row *MALLOC_N(na+nb, cr);
149
struct ft_loader_s bl;
150
ft_loader_init_error_callback(&bl.error_callback);
151
ft_loader_set_error_function(&bl.error_callback, dups ? expect_dups_cb : err_cb, NULL);
152
struct rowset rs = { .memory_budget = 0, .n_rows = 0, .n_rows_limit = 0, .rows = NULL, .n_bytes = 0, .n_bytes_limit = 0,
154
merge_row_arrays_base(cr, ar, na, br, nb, 0, dest_db, compare_ints, &bl, &rs);
155
ft_loader_call_error_function(&bl.error_callback);
162
for (int k=0; k<na+nb; k++) {
163
int voff = cr[k].off;
164
int vc = *(int*)(((char*)ab)+voff);
169
} else if (vc==b[j]) {
182
ft_loader_destroy_error_callback(&bl.error_callback);
185
/* Test the basic merger. */
186
static void test_merge (void) {
188
int avals[]={1,2,3,4,5};
189
int *bvals = NULL; //icc won't let us use a zero-sized array explicitly or by [] = {} construction.
190
test_merge_internal(avals, 5, bvals, 0, false);
191
test_merge_internal(bvals, 0, avals, 5, false);
194
int avals[]={1,3,5,7};
196
test_merge_internal(avals, 4, bvals, 2, false);
197
test_merge_internal(bvals, 2, avals, 4, false);
200
int avals[]={1,2,3,5,6,7};
201
int bvals[]={2,4,5,6,8};
202
test_merge_internal(avals, 6, bvals, 5, true);
203
test_merge_internal(bvals, 5, avals, 6, true);
207
static void test_internal_mergesort_row_array (int a[], int n) {
208
struct row *MALLOC_N(n, ar);
209
for (int i=0; i<n; i++) {
210
ar[i].off = i*sizeof(a[0]);
211
ar[i].klen = sizeof(a[i]);
214
struct rowset rs = { .memory_budget = 0, .n_rows = 0, .n_rows_limit = 0, .rows = NULL, .n_bytes = 0, .n_bytes_limit = 0,
216
ft_loader_mergesort_row_array (ar, n, 0, NULL, compare_ints, NULL, &rs);
217
int *MALLOC_N(n, tmp);
218
for (int i=0; i<n; i++) {
221
qsort(tmp, n, sizeof(a[0]), qsort_compare_ints);
222
for (int i=0; i<n; i++) {
223
int voff = ar[i].off;
224
int v = *(int*)(((char*)a)+voff);
231
static void test_mergesort_row_array (void) {
233
int avals[]={5,2,1,7};
234
for (int i=0; i<=4; i++)
235
test_internal_mergesort_row_array(avals, i);
237
const int MAX_LEN = 100;
238
enum { MAX_VAL = 1000 };
239
for (int i=0; i<MAX_LEN; i++) {
241
for (int j=0; j<MAX_VAL; j++) used[j]=false;
242
int len=1+random()%MAX_LEN;
244
for (int j=0; j<len; j++) {
247
v = random()%MAX_VAL;
252
test_internal_mergesort_row_array(avals, len);
256
static void test_read_write_rows (char *tf_template) {
257
struct ft_loader_s bl;
259
bl.temp_file_template = tf_template;
260
int r = ft_loader_init_file_infos(&bl.file_infos);
263
r = ft_loader_open_temp_file(&bl, &file);
268
const char *keystrings[] = {"abc", "b", "cefgh"};
269
const char *valstrings[] = {"defg", "", "xyz"};
270
uint64_t actual_size=0;
271
for (int i=0; i<3; i++) {
273
toku_fill_dbt(&key, keystrings[i], strlen(keystrings[i]));
275
toku_fill_dbt(&val, valstrings[i], strlen(valstrings[i]));
276
r = loader_write_row(&key, &val, file, toku_bl_fidx2file(&bl, file), &dataoff, nullptr, &bl);
278
actual_size+=key.size + val.size + 8;
280
if (actual_size != dataoff) fprintf(stderr, "actual_size=%" PRIu64 ", dataoff=%" PRIu64 "\n", actual_size, dataoff);
281
assert(actual_size == dataoff);
283
r = ft_loader_fi_close(&bl.file_infos, file, true);
286
r = ft_loader_fi_reopen(&bl.file_infos, file, "r");
294
while (0==loader_read_row(toku_bl_fidx2file(&bl, file), &key, &val)) {
295
assert(strlen(keystrings[n_read])==key.size);
296
assert(strlen(valstrings[n_read])==val.size);
297
assert(0==memcmp(keystrings[n_read], key.data, key.size));
298
assert(0==memcmp(valstrings[n_read], val.data, val.size));
299
assert(key.size<=key.ulen);
300
assert(val.size<=val.ulen);
307
r = ft_loader_fi_close(&bl.file_infos, file, true);
310
r = ft_loader_fi_unlink(&bl.file_infos, file);
313
assert(bl.file_infos.n_files_open==0);
314
assert(bl.file_infos.n_files_extant==0);
316
ft_loader_fi_destroy(&bl.file_infos, false);
319
static void fill_rowset (struct rowset *rows,
323
uint64_t *size_est) {
324
init_rowset(rows, toku_ft_loader_get_rowset_budget_for_testing());
325
for (int i=0; i<n; i++) {
327
toku_fill_dbt(&key, &keys[i], sizeof keys[i]);
329
toku_fill_dbt(&val, vals[i], strlen(vals[i]));
330
add_row(rows, &key, &val);
331
*size_est += ft_loader_leafentry_size(key.size, val.size, TXNID_NONE);
335
static void verify_dbfile(int n, int sorted_keys[], const char *sorted_vals[], const char *name) {
339
toku_cachetable_create(&ct, 0, ZERO_LSN, NULL_LOGGER);
341
TOKUTXN const null_txn = NULL;
343
toku_ft_handle_create(&t);
344
toku_ft_set_bt_compare(t, compare_ints);
345
r = toku_ft_handle_open(t, name, 0, 0, ct, null_txn); assert(r==0);
347
FT_CURSOR cursor = NULL;
348
r = toku_ft_cursor(t, &cursor, NULL, false, false); assert(r == 0);
352
for (i=0; i<n; i++) {
353
struct check_pair pair = {sizeof sorted_keys[i], &sorted_keys[i], (ITEMLEN) strlen(sorted_vals[i]), sorted_vals[i], 0};
354
r = toku_ft_cursor_get(cursor, NULL, lookup_checkf, &pair, DB_NEXT);
356
assert(pair.call_count ==0);
359
assert(pair.call_count==1);
360
userdata += pair.keylen + pair.vallen;
363
struct check_pair pair; memset(&pair, 0, sizeof pair);
364
r = toku_ft_cursor_get(cursor, NULL, lookup_checkf, &pair, DB_NEXT);
367
toku_ft_cursor_close(cursor);
370
toku_ft_handle_stat64(t, NULL, &s);
371
assert(s.nkeys == (uint64_t) n && s.ndata == (uint64_t) n && s.dsize == userdata);
373
r = toku_close_ft_handle_nolsn(t, 0); assert(r==0);
374
toku_cachetable_close(&ct);
377
static void test_merge_files (const char *tf_template, const char *output_name) {
379
struct ft_loader_s bl;
381
bl.temp_file_template = tf_template;
382
bl.reserved_memory = 512*1024*1024;
383
int r = ft_loader_init_file_infos(&bl.file_infos); CKERR(r);
384
ft_loader_lock_init(&bl);
385
ft_loader_init_error_callback(&bl.error_callback);
386
ft_loader_set_fractal_workers_count_from_c(&bl);
388
struct merge_fileset fs;
389
init_merge_fileset(&fs);
391
int a_keys[] = { 1, 3, 5, 7, 8, 9};
392
int b_keys[] = { 0, 2, 4, 6 };
393
const char *a_vals[] = {"a", "c", "e", "g", "h", "i"};
394
const char *b_vals[] = {"0", "b", "d", "f"};
395
int sorted_keys[] = { 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 };
396
const char *sorted_vals[] = { "0", "a", "b", "c", "d", "e", "f", "g", "h", "i" };
397
struct rowset aset, bset;
398
uint64_t size_est = 0;
399
fill_rowset(&aset, a_keys, a_vals, 6, &size_est);
400
fill_rowset(&bset, b_keys, b_vals, 4, &size_est);
402
toku_ft_loader_set_n_rows(&bl, 6+4);
404
ft_loader_set_error_function(&bl.error_callback, err_cb, NULL);
405
r = ft_loader_sort_and_write_rows(&aset, &fs, &bl, 0, dest_db, compare_ints); CKERR(r);
406
r = ft_loader_sort_and_write_rows(&bset, &fs, &bl, 0, dest_db, compare_ints); CKERR(r);
407
assert(fs.n_temp_files==2 && fs.n_temp_files_limit >= fs.n_temp_files);
408
// destroy_rowset(&aset);
409
// destroy_rowset(&bset);
410
for (int i=0; i<2; i++) assert(fs.data_fidxs[i].idx != -1);
412
ft_loader_fi_close_all(&bl.file_infos);
415
r = queue_create(&q, 0xFFFFFFFF); // infinite queue.
418
r = merge_files(&fs, &bl, 0, dest_db, compare_ints, 0, q); CKERR(r);
420
assert(fs.n_temp_files==0);
423
toku_fill_dbt(&desc.dbt, "abcd", 4);
425
int fd = open(output_name, O_RDWR | O_CREAT | O_BINARY, S_IRWXU|S_IRWXG|S_IRWXO);
428
r = toku_loader_write_brt_from_q_in_C(&bl, &desc, fd, 1000, q, size_est, 0, 0, 0, TOKU_DEFAULT_COMPRESSION_METHOD);
431
destroy_merge_fileset(&fs);
432
ft_loader_fi_destroy(&bl.file_infos, false);
433
ft_loader_destroy_error_callback(&bl.error_callback);
434
ft_loader_lock_destroy(&bl);
437
verify_dbfile(10, sorted_keys, sorted_vals, output_name);
439
r = queue_destroy(q);
443
/* Test to see if we can open temporary files. */
444
int test_main (int argc, const char *argv[]) {
447
if (strcmp(argv[0],"-v")==0) {
449
} else if (strcmp(argv[0],"-q")==0) {
457
const char* directory = TOKU_TEST_FILENAME;
458
int r = toku_os_mkdir(directory, 0755);
459
if (r!=0) CKERR2(errno, EEXIST);
461
int templen = strlen(directory)+15;
462
char tf_template[templen];
464
int n = snprintf(tf_template, templen, "%s/tempXXXXXX", directory);
465
assert (n>0 && n<templen);
467
char output_name[templen];
469
int n = snprintf(output_name, templen, "%s/data.tokudb", directory);
470
assert (n>0 && n<templen);
472
test_read_write_rows(tf_template);
474
test_mergesort_row_array();
475
test_merge_files(tf_template, output_name);
478
char deletecmd[templen];
479
int n = snprintf(deletecmd, templen, "rm -rf %s", directory);
480
assert(n>0 && n<templen);
481
r = system(deletecmd);