1
/* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2
// vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
4
COPYING CONDITIONS NOTICE:
6
This program is free software; you can redistribute it and/or modify
7
it under the terms of version 2 of the GNU General Public License as
8
published by the Free Software Foundation, and provided that the
9
following conditions are met:
11
* Redistributions of source code must retain this COPYING
12
CONDITIONS NOTICE, the COPYRIGHT NOTICE (below), the
13
DISCLAIMER (below), the UNIVERSITY PATENT NOTICE (below), the
14
PATENT MARKING NOTICE (below), and the PATENT RIGHTS
17
* Redistributions in binary form must reproduce this COPYING
18
CONDITIONS NOTICE, the COPYRIGHT NOTICE (below), the
19
DISCLAIMER (below), the UNIVERSITY PATENT NOTICE (below), the
20
PATENT MARKING NOTICE (below), and the PATENT RIGHTS
21
GRANT (below) in the documentation and/or other materials
22
provided with the distribution.
24
You should have received a copy of the GNU General Public License
25
along with this program; if not, write to the Free Software
26
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
31
TokuDB, Tokutek Fractal Tree Indexing Library.
32
Copyright (C) 2007-2013 Tokutek, Inc.
36
This program is distributed in the hope that it will be useful, but
37
WITHOUT ANY WARRANTY; without even the implied warranty of
38
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
39
General Public License for more details.
41
UNIVERSITY PATENT NOTICE:
43
The technology is licensed by the Massachusetts Institute of
44
Technology, Rutgers State University of New Jersey, and the Research
45
Foundation of State University of New York at Stony Brook under
46
United States of America Serial No. 11/760379 and to the patents
47
and/or patent applications resulting from it.
49
PATENT MARKING NOTICE:
51
This software is covered by US Patent No. 8,185,551.
52
This software is covered by US Patent No. 8,489,638.
56
"THIS IMPLEMENTATION" means the copyrightable works distributed by
57
Tokutek as part of the Fractal Tree project.
59
"PATENT CLAIMS" means the claims of patents that are owned or
60
licensable by Tokutek, both currently or in the future; and that in
61
the absence of this license would be infringed by THIS
62
IMPLEMENTATION or by using or running THIS IMPLEMENTATION.
64
"PATENT CHALLENGE" shall mean a challenge to the validity,
65
patentability, enforceability and/or non-infringement of any of the
66
PATENT CLAIMS or otherwise opposing any of the PATENT CLAIMS.
68
Tokutek hereby grants to you, for the term and geographical scope of
69
the PATENT CLAIMS, a non-exclusive, no-charge, royalty-free,
70
irrevocable (except as stated in this section) patent license to
71
make, have made, use, offer to sell, sell, import, transfer, and
72
otherwise run, modify, and propagate the contents of THIS
73
IMPLEMENTATION, where such license applies only to the PATENT
74
CLAIMS. This grant does not include claims that would be infringed
75
only as a consequence of further modifications of THIS
76
IMPLEMENTATION. If you or your agent or licensee institute or order
77
or agree to the institution of patent litigation against any entity
78
(including a cross-claim or counterclaim in a lawsuit) alleging that
79
THIS IMPLEMENTATION constitutes direct or contributory patent
80
infringement, or inducement of patent infringement, then any rights
81
granted to you under this License shall terminate as of the date
82
such litigation is filed. If you or your agent or exclusive
83
licensee institute or order or agree to the institution of a PATENT
84
CHALLENGE, then Tokutek may terminate any rights granted to you
88
#ident "Copyright (c) 2010-2013 Tokutek Inc. All rights reserved."
92
#include "toku_pthread.h"
99
// File-scope test configuration.
// Size limit passed to fgets() when reading one input row.
enum {MAX_ROW_LEN=1024};
100
// Number of DB handles opened by run_test() and handed to the loader.
static int NUM_DBS=10;
101
// Loader flag bits: 0 by default, set to LOADER_DISALLOW_PUTS by "-p".
static int DISALLOW_PUTS=0;
102
// Loader flag bits: 0 by default, set to LOADER_COMPRESS_INTERMEDIATES by "-z".
static int COMPRESS=0;
103
// NOTE(review): presumably selects the region table instead of lineitem;
// the code that reads or sets this flag is not visible in this view.
static int USE_REGION=0;
104
// Environment directory; run_test() removes and recreates it.
static const char *envdir = TOKU_TEST_FILENAME;
106
// Forward declarations of the two row-generation callbacks that run_test()
// registers with set_generate_row_callback_for_put(). Both carry the
// "unused" attribute; presumably only one is selected per run — the
// selecting condition is not visible in this view.
static int generate_rows_for_region(DB *dest_db, DB *src_db, DBT_ARRAY *dest_keys, DBT_ARRAY *dest_vals, const DBT *src_key, const DBT *src_val) __attribute__((unused));
107
static int generate_rows_for_lineitem(DB *dest_db, DB *src_db, DBT_ARRAY *dest_keys, DBT_ARRAY *dest_vals, const DBT *src_key, const DBT *src_val) __attribute__((unused));
109
// linenumber,orderkey form a unique, primary key
110
// key is a potentially duplicate secondary key
117
// Comparison callback over two DBTs that each hold one struct tpch_key;
// run_test() installs it with set_default_bt_compare().
// The visible tests compare key first, then linenumber, then orderkey.
// Several lines of this function are not visible in this view, so the
// remaining branches and the final return value are not described here.
static __attribute__((__unused__)) int
118
tpch_dbt_cmp (DB *db, const DBT *a, const DBT *b) {
119
assert(db && a && b);
120
// Each payload must be exactly one struct tpch_key.
assert(a->size == sizeof(struct tpch_key));
121
assert(b->size == sizeof(struct tpch_key));
123
// Fields of the left-hand key (a).
unsigned int xl = (*((struct tpch_key *) a->data)).linenumber;
124
unsigned int xo = (*((struct tpch_key *) a->data)).orderkey;
125
unsigned int xk = (*((struct tpch_key *) a->data)).key;
127
// Fields of the right-hand key (b).
unsigned int yl = (*((struct tpch_key *) b->data)).linenumber;
128
unsigned int yo = (*((struct tpch_key *) b->data)).orderkey;
129
unsigned int yk = (*((struct tpch_key *) b->data)).key;
131
// printf("tpch_dbt_cmp xl:%d, yl:%d, xo:%d, yo:%d, xk:%d, yk:%d\n", xl, yl, xo, yo, xk, yk);
133
// Most significant: the secondary key field.
if (xk<yk) return -1;
136
// Next: linenumber.
if (xl<yl) return -1;
139
// NOTE(review): this returns -1 when a's orderkey is GREATER, the opposite
// direction to the two tests above — confirm that a descending orderkey
// order is intended.
if (xo>yo) return -1;
145
// File-scope counter; its use is on a line not visible in this view
// (presumably it feeds *key in tpch_read_row — TODO confirm).
static int lineno = 0;
146
// Reads the next line from fp into val, which must have room for
// MAX_ROW_LEN bytes. Returns what fgets() returns: val on success, or NULL
// at end of file or on a read error. fgets() keeps the trailing newline.
static char *tpch_read_row(FILE *fp, int *key, char *val)
149
return fgets(val, MAX_ROW_LEN , fp);
154
* split '|' separated fields into fields array
156
// Copies the characters of row into the caller-provided buffers fields[0..],
// NUL-terminating each field when it is complete, and asserts that exactly
// fields_N fields were produced.
// NOTE(review): no bound on the destination buffers is visible in this view;
// the loop and the separator test are on lines not shown here.
static void tpch_parse_row(char *row, char *fields[], int fields_N)
166
// End of the current field: terminate its buffer.
fields[field][i] = '\0';
167
//printf("field : <%s>\n", fields[field]);
172
// Ordinary character: append it to the current field.
fields[field][i++] = c;
175
// The row must contain exactly the expected number of fields.
assert(field == fields_N);
182
// Row-generation callback for the region table: turns one source row
// (src_val->data, a C string) into a single destination key/value pair.
// The key is a heap-allocated struct tpch_key whose three fields all come
// from the regionkey field; the value is the string "name|comment".
// Parts of this function (local buffer declarations, return) are not
// visible in this view.
static int generate_rows_for_region(DB *dest_db, DB *src_db, DBT_ARRAY *dest_keys, DBT_ARRAY *dest_vals, const DBT *src_key, const DBT *src_val)
184
// Exactly one key and one value are produced per source row.
toku_dbt_array_resize(dest_keys, 1);
185
toku_dbt_array_resize(dest_vals, 1);
186
DBT *dest_key = &dest_keys->dbts[0];
187
DBT *dest_val = &dest_vals->dbts[0];
192
// app_private points at a uint32_t, which must be 0 for this callback.
assert(*(uint32_t*)dest_db->app_private == 0);
198
char row[8+32+160+8];
199
// NOTE(review): sprintf does not bound the copy; a source string longer than
// sizeof(row)-1 would overflow row — consider snprintf.
sprintf(row, "%s", (char*)src_val->data);
201
const uint32_t fields_N = 3;
202
char *fields[3] = {regionkey, name, comment};
203
tpch_parse_row(row, fields, fields_N);
205
// Release buffers left in the DBTs by a previous call.
if (dest_key->flags==DB_DBT_REALLOC) {
206
if (dest_key->data) toku_free(dest_key->data);
210
if (dest_val->flags==DB_DBT_REALLOC) {
211
if (dest_val->data) toku_free(dest_val->data);
216
// All three key fields are taken from the regionkey field.
struct tpch_key *XMALLOC(key);
217
key->orderkey = atoi(regionkey);
218
key->linenumber = atoi(regionkey);
219
key->key = atoi(regionkey);
221
// The value buffer is as large as row, holding "name|comment".
char *XMALLOC_N(sizeof(row), val);
222
sprintf(val, "%s|%s", name, comment);
224
// Store the new buffers in the DBTs and flag them DB_DBT_REALLOC, which is
// what the free-on-next-call checks above test for.
dbt_init(dest_key, key, sizeof(struct tpch_key));
225
dest_key->flags = DB_DBT_REALLOC;
227
dbt_init(dest_val, val, strlen(val)+1);
228
dest_val->flags = DB_DBT_REALLOC;
238
// Row-generation callback for the lineitem table: turns one source row
// (src_val->data, a C string with 16 fields) into a single destination
// key/value pair. The key is a heap-allocated struct tpch_key; its `key`
// member depends on the destination DB's index (`which`).
// Much of this function (field buffers, the branching on `which`, return)
// is not visible in this view.
static int generate_rows_for_lineitem(DB *dest_db, DB *src_db, DBT_ARRAY *dest_keys, DBT_ARRAY *dest_vals, const DBT *src_key, const DBT *src_val)
240
// Exactly one key and one value are produced per source row.
toku_dbt_array_resize(dest_keys, 1);
241
toku_dbt_array_resize(dest_vals, 1);
242
DBT *dest_key = &dest_keys->dbts[0];
243
DBT *dest_val = &dest_vals->dbts[0];
254
char extendedprice[16];
261
char receiptdate[16];
262
char shipinstruct[32];
265
char row[16+16+16+8+8+16+8+8+8+8+16+16+16+32+16+48 + 8];
266
// NOTE(review): sprintf does not bound the copy; a source string longer than
// sizeof(row)-1 would overflow row — consider snprintf.
sprintf(row, "%s", (char*)src_val->data);
268
const uint32_t fields_N = 16;
269
char *fields[16] = {orderkey,
285
tpch_parse_row(row, fields, fields_N);
287
// Release buffers left in the DBTs by a previous call.
if (dest_key->flags==DB_DBT_REALLOC) {
288
if (dest_key->data) toku_free(dest_key->data);
292
if (dest_val->flags==DB_DBT_REALLOC) {
293
if (dest_val->data) toku_free(dest_val->data);
298
// NOTE(review): orderkey is filled from the linenumber field and linenumber
// from the orderkey field — this looks swapped; confirm whether intended.
struct tpch_key *XMALLOC(key);
299
key->orderkey = atoi(linenumber);
300
key->linenumber = atoi(orderkey);
303
// Index of the destination DB, read through app_private.
uint32_t which = *(uint32_t*)dest_db->app_private;
306
// The value is a heap copy of either the whole row or just the orderkey
// field; the condition choosing between them is not visible in this view.
val = toku_xstrdup(row);
309
val = toku_xstrdup(orderkey);
314
// Alternative settings for key->key; the control flow selecting among them
// (presumably a switch on `which`) is not visible in this view.
key->key = atoi(linenumber);
318
key->key = atoi(orderkey);
322
key->key = atoi(suppkey);
326
key->key = atoi(partkey);// not really, ...
330
key->key = atoi(partkey);
334
key->key = atoi(linenumber) + atoi(suppkey); // not really ...
337
key->key = atoi(linenumber) +atoi(partkey); // not really ...
341
key->key = atoi(suppkey) + atoi(partkey); // not really ...
347
// Store the new buffers in the DBTs and flag them DB_DBT_REALLOC, which is
// what the free-on-next-call checks above test for.
dbt_init(dest_key, key, sizeof(struct tpch_key));
348
dest_key->flags = DB_DBT_REALLOC;
350
dbt_init(dest_val, val, strlen(val)+1);
351
dest_val->flags = DB_DBT_REALLOC;
357
// Self-referential pointer used as a unique token: test_loader() passes it
// as the poll function's `extra`, and poll_function() asserts it gets it back.
static void *expect_poll_void = &expect_poll_void;
357
// test_loader() asserts this is > 0 after the load when puts are allowed;
// the line that increments it is not visible in this view.
static int poll_count=0;
358
// Progress callback registered with the loader. Prints the elapsed time and
// the progress as a percentage, and checks its arguments.
// Some lines (declaration of `now`, use of did_one, return) are not visible.
static int poll_function (void *extra, float progress) {
361
static int did_one=0;
362
static struct timeval start;
364
gettimeofday(&now, 0);
369
// Elapsed seconds since `start` (with microsecond fraction), then percent done.
printf("%6.6f %5.1f%%\n", now.tv_sec - start.tv_sec + 1e-6*(now.tv_usec - start.tv_usec), progress*100);
371
// `extra` must be the token supplied at registration.
assert(extra==expect_poll_void);
372
// Progress is a fraction in [0, 1].
assert(0.0<=progress && progress<=1.0);
377
// Drives one bulk load: opens the input table file, creates a loader over
// the NUM_DBS handles in dbs inside a fresh transaction, feeds every input
// line to loader->put(), closes the loader and commits.
// Declarations, error checks and the return statement are on lines not
// visible in this view.
static int test_loader(DB **dbs)
382
uint32_t db_flags[MAX_DBS];
383
uint32_t dbt_flags[MAX_DBS];
384
for(int i=0;i<MAX_DBS;i++) {
385
db_flags[i] = DB_NOOVERWRITE;
388
uint32_t loader_flags = DISALLOW_PUTS | COMPRESS; // set with -p and -z options
391
// select which table to load
393
fp = fopen("./region.tbl", "r");
395
// Reports the errno text; presumably only reached when fopen failed.
fprintf(stderr, "%s:%d %s\n", __FUNCTION__, __LINE__, strerror(errno));
400
fp = fopen("./lineitem.tbl", "r");
402
fprintf(stderr, "%s:%d %s\n", __FUNCTION__, __LINE__, strerror(errno));
408
// create and initialize loader
410
r = env->txn_begin(env, NULL, &txn, 0);
412
// Passes dbs[0] plus the full array of NUM_DBS handles and per-DB flags.
r = env->create_loader(env, txn, &loader, dbs[0], NUM_DBS, dbs, db_flags, dbt_flags, loader_flags);
414
r = loader->set_error_callback(loader, NULL, NULL);
416
r = loader->set_poll_function(loader, poll_function, expect_poll_void);
419
// using loader->put, put values into DB
420
printf("puts "); fflush(stdout);
425
c = tpch_read_row(fp, &k, v);
427
while ( c != NULL ) {
428
// NOTE(review): this overwrites the last character unconditionally; a final
// line without a trailing newline would lose its last character.
v[strlen(v)-1] = '\0'; // remove trailing \n
429
// Key is the int k; value is the row text including its NUL terminator.
dbt_init(&key, &k, sizeof(int));
430
dbt_init(&val, v, strlen(v)+1);
431
r = loader->put(loader, &key, &val);
437
// Progress dot every 10000 rows when verbose.
if (verbose) { if((i++%10000) == 0){printf("."); fflush(stdout);} }
438
c = tpch_read_row(fp, &k, v);
440
if(verbose) {printf("\n"); fflush(stdout);}
446
printf("closing"); fflush(stdout);
447
r = loader->close(loader);
451
// When puts are allowed, the poll function must have been called at least once.
if ( DISALLOW_PUTS == 0 ) assert(poll_count>0);
453
r = txn->commit(txn, 0);
459
// Builds a fresh environment in envdir, opens NUM_DBS databases with a
// descriptor, runs test_loader() on them, then closes everything.
// Declarations (env, r, desc, idx, ...), the table-selection condition and
// the return statement are on lines not visible in this view.
static int run_test(void)
462
// Start from an empty environment directory.
char rmcmd[32 + strlen(envdir)];
463
snprintf(rmcmd, sizeof rmcmd, "rm -rf %s", envdir);
464
r = system(rmcmd); CKERR(r);
465
r = toku_os_mkdir(envdir, S_IRWXU+S_IRWXG+S_IRWXO); CKERR(r);
467
r = db_env_create(&env, 0); CKERR(r);
468
db_env_enable_engine_status(0); // disable engine status on crash because test is expected to fail
469
r = env->set_default_bt_compare(env, tpch_dbt_cmp); CKERR(r);
470
// select which TPC-H table to load
472
// Two registrations are visible; the condition choosing between them is not.
r = env->set_generate_row_callback_for_put(env, generate_rows_for_region); CKERR(r);
476
r = env->set_generate_row_callback_for_put(env, generate_rows_for_lineitem); CKERR(r);
480
int envflags = DB_INIT_LOCK | DB_INIT_LOG | DB_INIT_MPOOL | DB_INIT_TXN | DB_CREATE | DB_PRIVATE;
481
r = env->open(env, envdir, envflags, S_IRWXU+S_IRWXG+S_IRWXO); CKERR(r);
482
env->set_errfile(env, stderr);
483
//Disable auto-checkpointing
484
r = env->checkpointing_set_period(env, 0); CKERR(r);
487
// Descriptor payload is the string "foo" including its NUL terminator.
dbt_init(&desc, "foo", sizeof("foo"));
488
char name[MAX_NAME*2];
490
// NOTE(review): no NULL check on this allocation is visible in this view.
DB **dbs = (DB**)toku_malloc(sizeof(DB*) * NUM_DBS);
493
for(int i=0;i<NUM_DBS;i++) {
495
r = db_create(&dbs[i], env, 0); CKERR(r);
496
// The row-generation callbacks read this index back through app_private.
dbs[i]->app_private = &idx[i];
497
snprintf(name, sizeof(name), "db_%04x", i);
498
r = dbs[i]->open(dbs[i], NULL, name, NULL, DB_BTREE, DB_CREATE, 0666); CKERR(r);
499
IN_TXN_COMMIT(env, NULL, txn_desc, 0, {
500
{ int chk_r = dbs[i]->change_descriptor(dbs[i], txn_desc, &desc, 0); CKERR(chk_r); }
504
// -------------------------- //
505
int testr = test_loader(dbs);
506
// -------------------------- //
508
for(int i=0;i<NUM_DBS;i++) {
509
// NOTE(review): the result of close() is not stored in r, so CKERR(r)
// re-checks a stale value and a close failure would go unnoticed.
dbs[i]->close(dbs[i], 0); CKERR(r);
512
r = env->close(env, 0); CKERR(r);
518
// ------------ infrastructure ----------
519
// Forward declaration: command-line option parser, defined further down in this file.
static void do_args(int argc, char * const argv[]);
521
int test_main(int argc, char * const *argv) {
527
// Parses the command-line options. Visible options: -v, -q, -h, -p, -z, -g
// and -e; any other argument is reported as unknown on stderr.
// The loop that advances argv, and the bodies of several branches, are on
// lines not visible in this view.
static void do_args(int argc, char * const argv[]) {
532
if (strcmp(argv[0], "-v")==0) {
534
} else if (strcmp(argv[0],"-q")==0) {
536
// Verbosity never goes below zero.
if (verbose<0) verbose=0;
537
} else if (strcmp(argv[0], "-h")==0) {
540
fprintf(stderr, "Usage: -h -p -g\n%s\n", cmd);
542
} else if (strcmp(argv[0], "-p")==0) {
543
// -p: add LOADER_DISALLOW_PUTS to the loader flags.
DISALLOW_PUTS = LOADER_DISALLOW_PUTS;
544
} else if (strcmp(argv[0], "-z")==0) {
545
// -z: add LOADER_COMPRESS_INTERMEDIATES to the loader flags.
COMPRESS = LOADER_COMPRESS_INTERMEDIATES;
546
} else if (strcmp(argv[0], "-g")==0) {
548
} else if (strcmp(argv[0], "-e") == 0) {
553
fprintf(stderr, "Unknown arg: %s\n", argv[0]);