/** BEGIN COPYRIGHT BLOCK
 * This Program is free software; you can redistribute it and/or modify it under
 * the terms of the GNU General Public License as published by the Free Software
 * Foundation; version 2 of the License.
 *
 * This Program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
 * FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License along with
 * this Program; if not, write to the Free Software Foundation, Inc., 59 Temple
 * Place, Suite 330, Boston, MA 02111-1307 USA.
 *
 * In addition, as a special exception, Red Hat, Inc. gives You the additional
 * right to link the code of this Program with code not covered under the GNU
 * General Public License ("Non-GPL Code") and to distribute linked combinations
 * including the two, subject to the limitations in this paragraph. Non-GPL Code
 * permitted under this exception must only link to the code of this Program
 * through those well defined interfaces identified in the file named EXCEPTION
 * found in the source code files (the "Approved Interfaces"). The files of
 * Non-GPL Code may instantiate templates or use macros or inline functions from
 * the Approved Interfaces without causing the resulting work to be covered by
 * the GNU General Public License. Only Red Hat, Inc. may make changes or
 * additions to the list of Approved Interfaces. You must obey the GNU General
 * Public License in all respects for all of the Program code and other code used
 * in conjunction with the Program except the Non-GPL Code covered by this
 * exception. If you modify this file, you may extend this exception to your
 * version of the file, but you are not obligated to do so. If you do not wish to
 * provide this exception without modification, you must delete this exception
 * statement from your version and license this file solely under the GPL without
 * exception.
 *
 * Copyright (C) 2001 Sun Microsystems, Inc. Used by permission.
 * Copyright (C) 2005 Red Hat, Inc.
 * All rights reserved.
 * END COPYRIGHT BLOCK **/
/*
 * the "new" ("deluxe") backend import code
 *
 * please make sure you use 4-space indentation on this file.
 */
#include "back-ldbm.h"
#include "vlv_srch.h"
#include "import.h"
53
#define ERR_IMPORT_ABORTED -23
54
#define DRYRUN_QUIT -24
57
/********** routines to manipulate the entry fifo **********/
59
/* this is pretty bogus -- could be a HUGE amount of memory */
60
/* Not anymore with the Import Queue Adaptative Algorithm (Regulation) */
61
#define MAX_FIFO_SIZE 8000
63
/*
 * Size and allocate the entry fifo for an import job.
 *
 * The fifo keeps ref'ed pointers to the last <cachesize> processed entries.
 * Its element count is derived from the instance entry cache (entry limit if
 * configured, otherwise a guess from the byte limit / 1024), clamped to
 * [100, MAX_FIFO_SIZE].  fifo.bsize is a byte budget (80% of the cache byte
 * limit) meant to avoid memory starvation.
 *
 * Returns 0 on success, -1 on allocation failure.
 *
 * NOTE(review): the source text of this function was corrupted (interleaved
 * line-number artifacts, dropped lines); the else-branch, the lower clamp and
 * the return paths were reconstructed from the surviving fragments -- verify
 * against upstream before merging.
 */
static int import_fifo_init(ImportJob *job)
{
    ldbm_instance *inst = job->inst;

    /* Work out how big the entry fifo can be */
    if (inst->inst_cache.c_maxentries > 0)
        job->fifo.size = inst->inst_cache.c_maxentries;
    else
        job->fifo.size = inst->inst_cache.c_maxsize / 1024; /* guess */

    /* byte limit that should be respected to avoid memory starvation */
    /* conservative computing: multiply by .8 to allow for reasonable overflow */
    job->fifo.bsize = (inst->inst_cache.c_maxsize/10) << 3;

    job->fifo.c_bsize = 0;

    if (job->fifo.size > MAX_FIFO_SIZE)
        job->fifo.size = MAX_FIFO_SIZE;
    /* has to be at least 1 or 2, and anything less than about 100 destroys
     * the point of doing all this optimization in the first place. */
    if (job->fifo.size < 100)
        job->fifo.size = 100;

    /* Get memory for the entry fifo */
    /* This is used to keep a ref'ed pointer to the last <cachesize>
     * processed entries */
    PR_ASSERT(NULL == job->fifo.item);
    job->fifo.item = (FifoItem *)slapi_ch_calloc(job->fifo.size,
                                                 sizeof(FifoItem));
    if (NULL == job->fifo.item) {
        /* Memory allocation error */
        return -1;
    }
    return 0;
}
/*
 * Fetch the fifo slot for entry ID (slots are reused modulo fifo.size).
 *
 * Returns NULL if the fifo is not allocated, or (for worker threads) if the
 * slot's entry has been marked bad -- the first worker to see a bad entry
 * logs a warning, later ones stay quiet (FIFOITEM_BAD_PRINTED).
 *
 * NOTE(review): reconstructed from corrupted source fragments (the
 * else/early-return framing around the visible lines was missing) -- verify
 * against upstream before merging.
 */
FifoItem *import_fifo_fetch(ImportJob *job, ID id, int worker)
{
    int idx = id % job->fifo.size;
    FifoItem *fi;

    if (job->fifo.item) {
        fi = &(job->fifo.item[idx]);
    } else {
        return NULL;
    }
    if (fi->entry && worker) {
        if (fi->bad) {
            if (fi->bad == FIFOITEM_BAD) {
                fi->bad = FIFOITEM_BAD_PRINTED;
                import_log_notice(job, "WARNING: bad entry: ID %d", id);
            }
            return NULL;
        }
        PR_ASSERT(fi->entry->ep_refcnt > 0);
    }
    return fi;
}
/*
 * Release every entry still held in the import fifo, then free the fifo
 * array itself.  Safe to call when the fifo was never allocated.
 *
 * NOTE(review): reconstructed from corrupted source fragments; the per-entry
 * release calls (orig lines 132-135) were dropped from this copy -- verify
 * the exact free/clear sequence against upstream before merging.
 */
static void import_fifo_destroy(ImportJob *job)
{
    /* Free any entries in the fifo first */
    struct backentry *be = NULL;
    size_t i = 0;

    for (i = 0; i < job->fifo.size; i++) {
        be = job->fifo.item[i].entry;
        backentry_clear_entry(be);
        backentry_free(&be);
        job->fifo.item[i].entry = NULL;
        job->fifo.item[i].filename = NULL;
    }
    slapi_ch_free((void **)&job->fifo.item);
    job->fifo.item = NULL;
}
/********** logging stuff **********/
143
#define LOG_BUFFER 512
145
/* this changes the 'nsTaskStatus' value, which is transient (anything logged
146
* here wipes out any previous status)
148
/*
 * Begin a fresh status report: lazily allocate the job's task_status
 * buffer (10 * LOG_BUFFER bytes) and reset it to the empty string.
 * Anything previously logged via import_log_status_add_line is wiped.
 */
static void import_log_status_start(ImportJob *job)
{
    if (! job->task_status)
        job->task_status = (char *)slapi_ch_malloc(10 * LOG_BUFFER);
    if (! job->task_status)
        return;                 /* out of memory? */

    job->task_status[0] = 0;
}
/*
 * Append one printf-style formatted line to the job's task_status buffer
 * (newline-separated).  Silently drops the line when the buffer is missing
 * or (nearly) full.
 *
 * NOTE(review): reconstructed from corrupted source fragments; the va_list
 * declaration and the terminating va_end were missing from this copy.
 */
static void import_log_status_add_line(ImportJob *job, char *format, ...)
{
    va_list ap;
    int len = 0;

    if (! job->task_status)
        return;
    len = strlen(job->task_status);
    if (len + 5 > (10 * LOG_BUFFER))
        return;                 /* no room */

    if (job->task_status[0])
        strcat(job->task_status, "\n");

    va_start(ap, format);
    PR_vsnprintf(job->task_status + len, (10 * LOG_BUFFER) - len, format, ap);
    va_end(ap);
}
/*
 * Publish the accumulated task_status buffer as the task's transient
 * 'nsTaskStatus' value.
 *
 * NOTE(review): reconstructed from corrupted source fragments; the guard for
 * a NULL job->task was presumably present in the original -- confirm.
 */
static void import_log_status_done(ImportJob *job)
{
    if (job->task) {
        slapi_task_log_status(job->task, "%s", job->task_status);
    }
}
/* this adds a line to the 'nsTaskLog' value, which is cumulative (anything
185
* logged here is added to the end)
187
/*
 * Log one printf-style message both to the task's cumulative 'nsTaskLog'
 * value (if a task is attached) and to the server error log, prefixed with
 * the operation kind (upgradedn / reindex / import) and the instance name.
 *
 * NOTE(review): reconstructed from corrupted source fragments; the va_list
 * declaration, va_end, the task guard and the trailing else-branch were
 * missing from this copy -- verify against upstream.
 */
void import_log_notice(ImportJob *job, char *format, ...)
{
    va_list ap;
    char buffer[LOG_BUFFER];

    va_start(ap, format);
    PR_vsnprintf(buffer, LOG_BUFFER, format, ap);
    va_end(ap);

    if (job->task) {
        slapi_task_log_notice(job->task, "%s", buffer);
    }
    /* also save it in the logs for posterity */
    if (job->flags & FLAG_UPGRADEDNFORMAT) {
        LDAPDebug(LDAP_DEBUG_ANY, "upgradedn %s: %s\n", job->inst->inst_name,
                  buffer, 0);
    } else if (job->flags & FLAG_REINDEXING) {
        LDAPDebug(LDAP_DEBUG_ANY, "reindex %s: %s\n", job->inst->inst_name,
                  buffer, 0);
    } else {
        LDAPDebug(LDAP_DEBUG_ANY, "import %s: %s\n", job->inst->inst_name,
                  buffer, 0);
    }
}
/*
 * Task-framework destructor: free the import job attached to a task (its
 * status buffer first, then the job itself) and detach it from the task.
 *
 * NOTE(review): reconstructed from corrupted source fragments; the final
 * free of the job struct (orig lines 219-220) was missing from this copy --
 * verify against upstream.
 */
static void import_task_destroy(Slapi_Task *task)
{
    ImportJob *job = (ImportJob *)slapi_task_get_data(task);

    if (job && job->task_status) {
        slapi_ch_free((void **)&job->task_status);
        job->task_status = NULL;
    }
    FREE(job);
    slapi_task_set_data(task, NULL);
}
/*
 * Task-framework abort hook: flag every import thread to abort, then block
 * until the import thread marks the task FINISHED.  Must not log anything
 * (caller still holds the DSE lock for modify).
 *
 * NOTE(review): reconstructed from corrupted source fragments; the early
 * "already finished" handling (orig lines 233-241) was partially missing --
 * verify against upstream.
 */
static void import_task_abort(Slapi_Task *task)
{
    ImportJob *job;

    /* don't log anything from here, because we're still holding the
     * DSE lock for modify...
     */

    if (slapi_task_get_state(task) == SLAPI_TASK_FINISHED) {
        /* too late to abort anything */
        return;
    }

    /*
     * If the import thread happens to finish right now we're in trouble
     * because it will free the job.
     */
    job = (ImportJob *)slapi_task_get_data(task);

    import_abort_all(job, 0);
    while (slapi_task_get_state(task) != SLAPI_TASK_FINISHED)
        DS_Sleep(PR_MillisecondsToInterval(100));
}
/********** helper functions for importing **********/
254
/* Function used to gather a list of indexed attrs */
255
static int import_attr_callback(void *node, void *param)
257
ImportJob *job = (ImportJob *)param;
258
struct attrinfo *a = (struct attrinfo *)node;
260
if (job->flags & FLAG_DRYRUN) { /* dryrun; we don't need the workers */
263
if (job->flags & FLAG_UPGRADEDNFORMAT) {
264
/* Bring up import workers just for indexes having DN syntax
265
* attribute type. (except entrydn -- taken care below) */
267
Slapi_Attr attr = {0};
270
* Treat cn and ou specially. Bring up the import workers for
271
* cn and ou even though they are not DN syntax attribute.
272
* This is done because they have some exceptional case to store
273
* DN format in the admin entries such as UserPreferences.
275
if ((0 == PL_strcasecmp("cn", a->ai_type)) ||
276
(0 == PL_strcasecmp("commonname", a->ai_type)) ||
277
(0 == PL_strcasecmp("ou", a->ai_type)) ||
278
(0 == PL_strcasecmp("organizationalUnit", a->ai_type))) {
281
slapi_attr_init(&attr, a->ai_type);
282
rc = slapi_attr_is_dn_syntax_attr(&attr);
290
/* OK, so we now have hold of the attribute structure and the job info,
291
* let's see what we have. Remember that although this function is called
292
* many times, all these calls are in the context of a single thread, so we
293
* don't need to worry about protecting the data in the job structure.
296
/* We need to specifically exclude the (entrydn, entryrdn) & parentid &
297
* ancestorid indexes because we build those in the foreman thread.
299
if (IS_INDEXED(a->ai_indexmask) &&
300
(strcasecmp(a->ai_type, LDBM_ENTRYDN_STR) != 0) &&
301
(strcasecmp(a->ai_type, LDBM_ENTRYRDN_STR) != 0) &&
302
(strcasecmp(a->ai_type, LDBM_PARENTID_STR) != 0) &&
303
(strcasecmp(a->ai_type, LDBM_ANCESTORID_STR) != 0) &&
304
(strcasecmp(a->ai_type, numsubordinates) != 0)) {
305
/* Make an import_index_info structure, fill it in and insert into the
307
IndexInfo *info = CALLOC(IndexInfo);
310
/* Memory allocation error */
313
info->name = slapi_ch_strdup(a->ai_type);
315
if (NULL == info->name) {
316
/* Memory allocation error */
320
info->next = job->index_list;
321
job->index_list = info;
322
job->number_indexers++;
327
/*
 * Decide how much index buffering space to suggest per index: divide the
 * job's total index-buffer budget among the substring indexes (scaled by
 * IMPORT_INDEX_BUFFER_SIZE_CONSTANT), clamp to the configured maximum, and
 * disable buffering (0) when the share would fall below the minimum.  The
 * result is stored in job->job_index_buffer_suggestion.
 *
 * NOTE(review): reconstructed from corrupted source fragments (closing
 * braces and the below-minimum branch were dropped) -- verify against
 * upstream before merging.
 */
static void import_set_index_buffer_size(ImportJob *job)
{
    IndexInfo *current_index = NULL;
    size_t substring_index_count = 0;
    size_t proposed_size = 0;

    /* Count the substring indexes we have */
    for (current_index = job->index_list; current_index != NULL;
         current_index = current_index->next) {
        if (current_index->ai->ai_indexmask & INDEX_SUB) {
            substring_index_count++;
        }
    }
    if (substring_index_count > 0) {
        /* Make proposed size such that if all substring indices were
         * reasonably full, we'd hit the target space */
        proposed_size = (job->job_index_buffer_size / substring_index_count) /
                        IMPORT_INDEX_BUFFER_SIZE_CONSTANT;
        if (proposed_size > IMPORT_MAX_INDEX_BUFFER_SIZE) {
            proposed_size = IMPORT_MAX_INDEX_BUFFER_SIZE;
        }
        if (proposed_size < IMPORT_MIN_INDEX_BUFFER_SIZE) {
            proposed_size = 0;
        }
    }

    job->job_index_buffer_suggestion = proposed_size;
}
/*
 * Free every ImportWorkerInfo on the job's worker list except the producer
 * (the producer's info is owned/freed elsewhere).
 */
static void import_free_thread_data(ImportJob *job)
{
    /* DBDB free the lists etc */
    ImportWorkerInfo *worker = job->worker_list;

    while (worker != NULL) {
        ImportWorkerInfo *asabird = worker;
        worker = worker->next;
        if (asabird->work_type != PRODUCER)
            slapi_ch_free( (void**)&asabird);
    }
}
/*
 * Tear down an import job: worker-info list, index list, subcount
 * ("mothers") table, include/exclude subtree lists, input filename array,
 * the entry fifo, the uuid namespace, the wire-import lock/condvar and the
 * status buffer.  Does NOT free the job struct itself (see
 * import_task_destroy).
 *
 * NOTE(review): reconstructed from corrupted source fragments; the guards
 * around fifo/lock/condvar teardown (orig lines 389-399) were partially
 * missing -- verify against upstream before merging.
 */
void import_free_job(ImportJob *job)
{
    /* DBDB free the lists etc */
    IndexInfo *index = job->index_list;

    import_free_thread_data(job);
    while (index != NULL) {
        IndexInfo *asabird = index;
        index = index->next;
        slapi_ch_free( (void**)&asabird->name);
        slapi_ch_free( (void**)&asabird);
    }
    job->index_list = NULL;
    if (NULL != job->mothers) {
        import_subcount_stuff_term(job->mothers);
        slapi_ch_free( (void**)&job->mothers);
    }

    ldbm_back_free_incl_excl(job->include_subtrees, job->exclude_subtrees);
    charray_free(job->input_filenames);

    if (job->fifo.size) {
        import_fifo_destroy(job);
    }
    if (NULL != job->uuid_namespace) {
        slapi_ch_free((void **)&job->uuid_namespace);
    }
    if (job->wire_lock) {
        PR_DestroyLock(job->wire_lock);
    }
    if (job->wire_cv) {
        PR_DestroyCondVar(job->wire_cv);
    }
    slapi_ch_free((void **)&job->task_status);
}
/* determine if we are the correct backend for this entry
401
* (in a distributed suffix, some entries may be for other backends).
402
* if the entry's dn actually matches one of the suffixes of the be, we
403
* automatically take it as a belonging one, for such entries must be
404
* present in EVERY backend independently of the distribution applied.
406
int import_entry_belongs_here(Slapi_Entry *e, backend *be)
408
Slapi_Backend *retbe;
409
Slapi_DN *sdn = slapi_entry_get_sdn(e);
411
if (slapi_be_issuffix(be, sdn))
414
retbe = slapi_mapping_tree_find_backend_for_sdn(sdn);
415
return (retbe == be);
419
/********** starting threads and stuff **********/
421
/* Solaris is weird---we need an LWP per thread but NSPR doesn't give us
422
* one unless we make this magic belshe-call */
423
/* Fixed on Solaris 8; NSPR supports PR_GLOBAL_BOUND_THREAD */
424
#define CREATE_THREAD PR_CreateThread
426
/*
 * Initialize a worker-info struct for an import job: start PAUSEd, wired to
 * the job, beginning at the job's first ID, with the job's suggested index
 * buffer size.
 *
 * NOTE(review): source text was corrupted; the info->job assignment (orig
 * line 429) was reconstructed -- confirm the full original field list.
 */
static void import_init_worker_info(ImportWorkerInfo *info, ImportJob *job)
{
    info->command = PAUSE;
    info->job = job;
    info->first_ID = job->first_ID;
    info->index_buffer_size = job->job_index_buffer_suggestion;
}
/*
 * Spawn the foreman thread and (when attribute indexing is on) one worker
 * thread per gathered index, linking each ImportWorkerInfo onto
 * job->worker_list.  On any failure, aborts all already-started threads.
 *
 * Returns 0 on success, -1 on allocation or thread-creation failure.
 *
 * NOTE(review): reconstructed from corrupted source fragments; the
 * allocation checks, per-failure cleanup and the success/error returns were
 * missing from this copy -- verify against upstream before merging.
 */
static int import_start_threads(ImportJob *job)
{
    IndexInfo *current_index = NULL;
    ImportWorkerInfo *foreman = NULL, *worker = NULL;

    foreman = CALLOC(ImportWorkerInfo);
    if (!foreman)
        goto error;

    /* start the foreman */
    import_init_worker_info(foreman, job);
    foreman->work_type = FOREMAN;
    if (! CREATE_THREAD(PR_USER_THREAD, (VFP)import_foreman, foreman,
                        PR_PRIORITY_NORMAL, PR_GLOBAL_BOUND_THREAD,
                        PR_UNJOINABLE_THREAD, SLAPD_DEFAULT_THREAD_STACKSIZE)) {
        PRErrorCode prerr = PR_GetError();
        LDAPDebug(LDAP_DEBUG_ANY, "unable to spawn import foreman thread, "
                  SLAPI_COMPONENT_NAME_NSPR " error %d (%s)\n",
                  prerr, slapd_pr_strerror(prerr), 0);
        FREE(foreman);
        goto error;
    }

    foreman->next = job->worker_list;
    job->worker_list = foreman;

    /* Start follower threads, if we are doing attribute indexing */
    current_index = job->index_list;
    if (job->flags & FLAG_INDEX_ATTRS) {
        while (current_index) {
            /* make a new thread info structure */
            worker = CALLOC(ImportWorkerInfo);
            if (!worker)
                goto error;

            /* fill it in */
            import_init_worker_info(worker, job);
            worker->index_info = current_index;
            worker->work_type = WORKER;

            /* Start the thread */
            if (! CREATE_THREAD(PR_USER_THREAD, (VFP)import_worker, worker,
                                PR_PRIORITY_NORMAL, PR_GLOBAL_BOUND_THREAD,
                                PR_UNJOINABLE_THREAD,
                                SLAPD_DEFAULT_THREAD_STACKSIZE)) {
                PRErrorCode prerr = PR_GetError();
                LDAPDebug(LDAP_DEBUG_ANY, "unable to spawn import worker thread, "
                          SLAPI_COMPONENT_NAME_NSPR " error %d (%s)\n",
                          prerr, slapd_pr_strerror(prerr), 0);
                FREE(worker);
                goto error;
            }

            /* link it onto the job's thread list */
            worker->next = job->worker_list;
            job->worker_list = worker;
            current_index = current_index->next;
        }
    }
    return 0;

error:
    import_log_notice(job, "Import thread creation failed.");
    import_log_notice(job, "Aborting all import threads...");
    import_abort_all(job, 1);
    import_log_notice(job, "Import threads aborted.");
    return -1;
}
/********** monitoring the worker threads **********/
506
/*
 * Reset the job's progress history: every history slot starts at the job's
 * first ID / start time, and the instance's libdb cache-hit counters are
 * zeroed so the next rate computation starts fresh.
 */
static void import_clear_progress_history(ImportJob *job)
{
    int i = 0;

    for (i = 0; i < IMPORT_JOB_PROG_HISTORY_SIZE /*- 1*/; i++) {
        job->progress_history[i] = job->first_ID;
        job->progress_times[i] = job->start_time;
    }
    /* reset libdb cache stats */
    job->inst->inst_cache_hits = job->inst->inst_cache_misses = 0;
}
/*
 * Sample the libdb memory-pool statistics for an instance and return the
 * cache hit ratio (hits / (hits + misses)) observed since the previous call;
 * the raw counters are cached on the instance so the next call computes a
 * fresh delta.  Returns 0.0 when no delta is available.
 *
 * NOTE(review): reconstructed from corrupted source fragments; the
 * stat-failure handling and the #if/#endif framing around the per-element
 * free were missing from this copy -- verify against upstream.
 */
static double import_grok_db_stats(ldbm_instance *inst)
{
    DB_MPOOL_STAT *mpstat = NULL;
    DB_MPOOL_FSTAT **mpfstat = NULL;
    int return_value = -1;
    double cache_hit_ratio = 0.0;

    return_value = dblayer_memp_stat_instance(inst, &mpstat, &mpfstat);

    if (0 == return_value) {
        unsigned long current_cache_hits = mpstat->st_cache_hit;
        unsigned long current_cache_misses = mpstat->st_cache_miss;

        if (inst->inst_cache_hits) {
            unsigned long hit_delta, miss_delta;

            hit_delta = current_cache_hits - inst->inst_cache_hits;
            miss_delta = current_cache_misses - inst->inst_cache_misses;
            if (hit_delta != 0) {
                cache_hit_ratio = (double)hit_delta /
                    (double)(hit_delta + miss_delta);
            }
        }
        inst->inst_cache_misses = current_cache_misses;
        inst->inst_cache_hits = current_cache_hits;
    }
    if (mpstat) {
        slapi_ch_free((void **)&mpstat);
    }
    if (mpfstat) {
#if 1000*DB_VERSION_MAJOR + 100*DB_VERSION_MINOR + DB_VERSION_PATCH <= 3204
        /* In DB 3.2.4 and earlier, we need to free each element */
        DB_MPOOL_FSTAT **tfsp;

        for (tfsp = mpfstat; *tfsp; tfsp++)
            slapi_ch_free((void **)tfsp);
#endif
        slapi_ch_free((void **)&mpfstat);
    }
    return cache_hit_ratio;
}
static char* import_decode_worker_state(int state)
580
/*
 * Append one row of the progress chart for a single thread: its display
 * name (Producer / Foreman / index name), decoded state, last processed ID
 * and processing rate.
 */
static void import_print_worker_status(ImportWorkerInfo *info)
{
    char *name = (info->work_type == PRODUCER ? "Producer" :
                  (info->work_type == FOREMAN ? "Foreman" :
                   info->index_info->name));

    import_log_status_add_line(info->job,
                               "%-25s %s%10ld %7.1f", name,
                               import_decode_worker_state(info->state),
                               info->last_ID_processed, info->rate);
}
#define IMPORT_CHUNK_TEST_HOLDOFF_TIME (5*60) /* Seconds */
595
/* Got to be lower than this: */
596
#define IMPORT_CHUNK_TEST_CACHE_HIT_RATIO (0.99)
597
/* Less than half as fast as we were doing: */
598
#define IMPORT_CHUNK_TEST_SLOWDOWN_RATIO_A (0.5)
599
/* A lot less fast than we were doing: */
600
#define IMPORT_CHUNK_TEST_SLOWDOWN_RATIO_B (0.1)
602
static int import_throw_in_towel(ImportJob *job, time_t current_time,
605
static int number_of_times_here = 0;
607
/* secret -c option allows specific chunk size to be set... */
608
if (job->merge_chunk_size != 0) {
609
if ((0 != job->lead_ID) &&
610
(trailing_ID > job->first_ID) &&
611
(trailing_ID - job->first_ID > job->merge_chunk_size)) {
617
/* Check stats to decide whether we're getting bogged down and should
618
* terminate this pass.
621
/* Check #1 : are we more than 10 minutes into the chunk ? */
622
if (current_time - job->start_time > IMPORT_CHUNK_TEST_HOLDOFF_TIME) {
623
/* Check #2 : Have we slowed down considerably recently ? */
624
if ((job->recent_progress_rate / job->average_progress_rate) <
625
IMPORT_CHUNK_TEST_SLOWDOWN_RATIO_A) {
626
/* Check #3: Cache performing poorly---the puported reason
627
* for the slowdown */
628
if (job->cache_hit_ratio < IMPORT_CHUNK_TEST_CACHE_HIT_RATIO) {
629
/* We have a winner ! */
630
import_log_notice(job, "Decided to end this pass because "
631
"the progress rate has dropped below "
632
"the %.0f%% threshold.",
633
IMPORT_CHUNK_TEST_SLOWDOWN_RATIO_A*100.0);
637
if ((job->recent_progress_rate / job->average_progress_rate) <
638
IMPORT_CHUNK_TEST_SLOWDOWN_RATIO_B) {
639
/* Alternative check: have we really, really slowed down,
640
* without the test for cache overflow? */
641
/* This is designed to catch the case where the cache has
642
* been misconfigured too large */
643
if (number_of_times_here > 10) {
644
/* Got to get here ten times at least */
645
import_log_notice(job, "Decided to end this pass "
646
"because the progress rate "
647
"plummeted below %.0f%%",
648
IMPORT_CHUNK_TEST_SLOWDOWN_RATIO_B*100.0);
651
number_of_times_here++;
656
number_of_times_here = 0;
660
static void import_push_progress_history(ImportJob *job, ID current_id,
665
for (i = 0; i < IMPORT_JOB_PROG_HISTORY_SIZE - 1; i++) {
666
job->progress_history[i] = job->progress_history[i+1];
667
job->progress_times[i] = job->progress_times[i+1];
669
job->progress_history[i] = current_id;
670
job->progress_times[i] = current_time;
673
/*
 * Compute a worker's recent processing rate (IDs per second over
 * time_interval) and remember the current ID for the next sample.  The rate
 * is only recorded once the worker has processed at least one ID in two
 * consecutive samples; otherwise it is reported as 0.
 *
 * NOTE(review): reconstructed from corrupted source fragments; the
 * assignment/else branch (orig lines 679-685) was missing -- verify against
 * upstream.
 */
static void import_calc_rate(ImportWorkerInfo *info, int time_interval)
{
    size_t ids = info->last_ID_processed - info->previous_ID_counted;
    double rate = (double)ids / time_interval;

    if ( (info->previous_ID_counted != 0) && (info->last_ID_processed != 0) ) {
        info->rate = rate;
    } else {
        info->rate = 0;
    }
    info->previous_ID_counted = info->last_ID_processed;
}
/* find the rate (ids/time) of work from a worker thread between history
 * samples A and B of the job's progress window.
 *
 * NOTE(review): source text was corrupted; the false-branch of PROGRESS
 * (orig line 694) was dropped and is reconstructed here as 0.0 -- verify
 * against upstream.
 */
#define HISTORY(N) (job->progress_history[N])
#define TIMES(N) (job->progress_times[N])
#define PROGRESS(A, B) ((HISTORY(B) > HISTORY(A)) ? \
                        ((double)(HISTORY(B) - HISTORY(A)) / \
                         (double)(TIMES(B) - TIMES(A))) : \
                        (double)0.0)
/*
 * NOTE(review): the text of this function is corrupted -- the bare integers
 * on their own lines below are line-number artifacts from a bad extraction,
 * and several original lines are missing outright (the main while-loop
 * framing, declarations of time_now/rc/giveup/finished, the ABORTED goto
 * path, and various closing braces).  The surviving code is left
 * byte-identical; only comments were added.  Recover the full body from
 * upstream before building.
 *
 * Visible intent: command all workers to RUN, then poll them every 100ms;
 * every `display_interval` iterations log a progress chart and recompute
 * average/recent rates and the DB cache hit ratio; ask
 * import_throw_in_towel() whether to end the pass early; detect
 * producer/foreman/worker completion; finally stop all workers and report
 * IMPORT_COMPLETE_PASS or IMPORT_INCOMPLETE_PASS through *status.
 */
static int import_monitor_threads(ImportJob *job, int *status)
698
PRIntervalTime tenthsecond = PR_MillisecondsToInterval(100);
699
ImportWorkerInfo *current_worker = NULL;
700
ImportWorkerInfo *producer = NULL, *foreman = NULL;
703
int count = 1; /* 1 to prevent premature status report */
704
int producer_done = 0;
705
const int display_interval = 200;
707
time_t last_time = 0;
708
time_t time_interval = 0;
711
/* kick every thread out of PAUSE and find the producer/foreman */
for (current_worker = job->worker_list; current_worker != NULL;
712
current_worker = current_worker->next) {
713
current_worker->command = RUN;
714
if (current_worker->work_type == PRODUCER)
715
producer = current_worker;
716
if (current_worker->work_type == FOREMAN)
717
foreman = current_worker;
721
if (job->flags & FLAG_USE_FILES)
722
PR_ASSERT(producer != NULL);
724
PR_ASSERT(foreman != NULL);
731
job->start_time = last_time;
732
import_clear_progress_history(job);
735
/* NOTE(review): the enclosing monitor loop header is missing here */
ID trailing_ID = NOID;
737
DS_Sleep(tenthsecond);
740
/* First calculate the time interval since last reported */
741
if (0 == (count % display_interval)) {
743
time_interval = time_now - last_time;
744
last_time = time_now;
745
/* Now calculate our rate of progress overall for this chunk */
746
if (time_now != job->start_time) {
747
/* log a cute chart of the worker progress */
748
import_log_status_start(job);
749
import_log_status_add_line(job,
750
"Index status for import of %s:", job->inst->inst_name);
751
import_log_status_add_line(job,
752
"-------Index Task-------State---Entry----Rate-");
754
import_push_progress_history(job, foreman->last_ID_processed,
756
job->average_progress_rate =
757
(double)(HISTORY(IMPORT_JOB_PROG_HISTORY_SIZE-1)+1 - foreman->first_ID) /
758
(double)(TIMES(IMPORT_JOB_PROG_HISTORY_SIZE-1) - job->start_time);
759
job->recent_progress_rate =
760
PROGRESS(0, IMPORT_JOB_PROG_HISTORY_SIZE-1);
761
job->cache_hit_ratio = import_grok_db_stats(job->inst);
765
/* scan all threads: track the slowest WORKER and per-thread states */
for (current_worker = job->worker_list; current_worker != NULL;
766
current_worker = current_worker->next) {
767
/* Calculate the ID at which the slowest worker is currently
769
if ((trailing_ID > current_worker->last_ID_processed) &&
770
(current_worker->work_type == WORKER)) {
771
trailing_ID = current_worker->last_ID_processed;
773
if (0 == (count % display_interval) && time_interval) {
774
import_calc_rate(current_worker, time_interval);
775
import_print_worker_status(current_worker);
777
if (current_worker->state == QUIT) {
778
rc = DRYRUN_QUIT; /* Set the RC; Don't abort now;
779
We have to stop other threads */
780
} else if (current_worker->state != FINISHED) {
783
if (current_worker->state == ABORTED) {
788
if ((0 == (count % display_interval)) &&
789
(job->start_time != time_now)) {
790
char buffer[256], *p = buffer;
792
import_log_status_done(job);
793
p += sprintf(p, "Processed %lu entries ", (u_long)job->ready_ID);
794
if (job->total_pass > 1)
795
p += sprintf(p, "(pass %d) ", job->total_pass);
797
p += sprintf(p, "-- average rate %.1f/sec, ",
798
job->average_progress_rate);
799
p += sprintf(p, "recent rate %.1f/sec, ",
800
job->recent_progress_rate);
801
p += sprintf(p, "hit ratio %.0f%%", job->cache_hit_ratio * 100.0);
802
import_log_notice(job, "%s", buffer);
805
/* Then let's see if it's time to complete this import pass */
807
giveup = import_throw_in_towel(job, time_now, trailing_ID);
809
/* If so, signal the lead thread to stop */
810
import_log_notice(job, "Ending pass number %d ...",
812
foreman->command = STOP;
813
while (foreman->state != FINISHED) {
814
DS_Sleep(tenthsecond);
816
import_log_notice(job, "Foreman is done; waiting for "
817
"workers to finish...");
821
/* if the producer is finished, and the foreman has caught up... */
823
producer_done = (producer->state == FINISHED) ||
824
(producer->state == QUIT);
826
/* set in ldbm_back_wire_import */
827
producer_done = (job->flags & FLAG_PRODUCER_DONE);
829
if (producer_done && (job->lead_ID == job->ready_ID)) {
830
/* tell the foreman to stop if he's still working. */
831
if (foreman->state != FINISHED)
832
foreman->command = STOP;
834
/* if all the workers are caught up too, we're done */
835
if (trailing_ID == job->lead_ID)
839
/* if the foreman is done (end of pass) and the worker threads
842
if ((foreman->state == FINISHED) && (job->ready_ID == trailing_ID)) {
849
import_log_notice(job, "Workers finished; cleaning up...");
851
/* Now tell all the workers to stop */
852
for (current_worker = job->worker_list; current_worker != NULL;
853
current_worker = current_worker->next) {
854
if (current_worker->work_type != PRODUCER)
855
current_worker->command = STOP;
858
/* Having done that, wait for them to say that they've stopped */
859
for (current_worker = job->worker_list; current_worker != NULL; ) {
860
if ((current_worker->state != FINISHED) &&
861
(current_worker->state != ABORTED) &&
862
(current_worker->state != QUIT) &&
863
(current_worker->work_type != PRODUCER)) {
864
DS_Sleep(tenthsecond); /* Only sleep if we hit a thread that is still not done */
867
current_worker = current_worker->next;
870
import_log_notice(job, "Workers cleaned up.");
872
/* If we're here and giveup is true, and the primary hadn't finished
873
* processing the input files, we need to return IMPORT_INCOMPLETE_PASS */
874
if (giveup && (job->input_filenames || (job->flags & FLAG_ONLINE) ||
875
(job->flags & FLAG_REINDEXING /* support multi-pass */))) {
876
if (producer_done && (job->ready_ID == job->lead_ID)) {
877
/* foreman caught up with the producer, and the producer is
880
*status = IMPORT_COMPLETE_PASS;
882
*status = IMPORT_INCOMPLETE_PASS;
885
*status = IMPORT_COMPLETE_PASS;
890
/* NOTE(review): this return is the abort path; the normal-return lines
 * that preceded it are missing from this copy */
return ERR_IMPORT_ABORTED;
894
/********** running passes **********/
896
/*
 * Run one import pass: start the foreman/worker threads, then monitor them
 * until the pass completes or fails.  The pass-completion status
 * (IMPORT_COMPLETE_PASS / IMPORT_INCOMPLETE_PASS) is returned through
 * *status; the function's return value is 0, ERR_IMPORT_ABORTED,
 * DRYRUN_QUIT, or another nonzero error.
 *
 * NOTE(review): reconstructed from corrupted source fragments; the error
 * label and the ABORTED/DRYRUN log line were missing from this copy --
 * verify against upstream.
 */
static int import_run_pass(ImportJob *job, int *status)
{
    int ret = 0;

    /* Start the threads running */
    ret = import_start_threads(job);
    if (ret != 0) {
        import_log_notice(job, "Starting threads failed: %d\n", ret);
        goto error;
    }

    /* Monitor the threads until we're done or fail */
    ret = import_monitor_threads(job, status);
    if ((ret == ERR_IMPORT_ABORTED) || (ret == DRYRUN_QUIT)) {
        import_log_notice(job, "Thread monitoring returned: %d\n", ret);
        goto error;
    } else if (ret != 0) {
        import_log_notice(job, "Thread monitoring aborted: %d\n", ret);
        goto error;
    }

error:
    return ret;
}
/*
 * Abort an import by setting FLAG_ABORT on the job (threads poll this flag
 * roughly every 200ms) rather than commanding each thread individually.
 * Sleeps 3 seconds to let the aborts propagate; when wait_for_them is set,
 * additionally spins until every worker reports FINISHED/ABORTED/QUIT.
 *
 * NOTE(review): reconstructed from corrupted source fragments; the
 * wait_for_them guard and the wait-loop advance logic were missing from
 * this copy -- verify against upstream.
 */
static void import_set_abort_flag_all(ImportJob *job, int wait_for_them)
{
    ImportWorkerInfo *worker;

    /* tell all the worker threads to abort */
    job->flags |= FLAG_ABORT;

    /* setting of the flag in the job will be detected in the worker, foreman
     * threads and if there are any threads which have a sleeptime 200 msecs
     * = import_sleep_time; after that time, they will examine the condition
     * (job->flags & FLAG_ABORT) which will unblock the thread to proceed to
     * abort. Hence, we will sleep here for atleast 3 sec to make sure clean
     * up is done. */
    /* allow all the aborts to be processed */
    DS_Sleep(PR_MillisecondsToInterval(3000));

    if (wait_for_them) {
        /* Having done that, wait for them to say that they've stopped */
        for (worker = job->worker_list; worker != NULL; ) {
            DS_Sleep(PR_MillisecondsToInterval(100));
            if ((worker->state != FINISHED) && (worker->state != ABORTED) &&
                (worker->state != QUIT)){
                continue;       /* this one is still running; poll again */
            }
            worker = worker->next;
        }
    }
}
/* tell all the threads to abort */
953
void import_abort_all(ImportJob *job, int wait_for_them)
955
ImportWorkerInfo *worker;
957
/* tell all the worker threads to abort */
958
job->flags |= FLAG_ABORT;
960
for (worker = job->worker_list; worker; worker = worker->next)
961
worker->command = ABORT;
964
/* Having done that, wait for them to say that they've stopped */
965
for (worker = job->worker_list; worker != NULL; ) {
966
DS_Sleep(PR_MillisecondsToInterval(100));
967
if ((worker->state != FINISHED) && (worker->state != ABORTED) &&
968
(worker->state != QUIT)) {
971
worker = worker->next;
977
/* Helper function to make up filenames */
978
int import_make_merge_filenames(char *directory, char *indexname, int pass,
979
char **oldname, char **newname)
981
/* Filenames look like this: attributename<LDBM_FILENAME_SUFFIX>
982
and need to be renamed to: attributename<LDBM_FILENAME_SUFFIX>.n
983
where n is the pass number.
985
*oldname = slapi_ch_smprintf("%s/%s%s", directory, indexname, LDBM_FILENAME_SUFFIX);
986
*newname = slapi_ch_smprintf("%s/%s.%d%s", directory, indexname, pass,
987
LDBM_FILENAME_SUFFIX);
988
if (!*oldname || !*newname) {
989
slapi_ch_free_string(oldname);
990
slapi_ch_free_string(newname);
996
/* Task here is as follows:
997
* First, if this is pass #1, check for the presence of a merge
998
* directory. If it is not present, create it.
999
* If it is present, delete all the files in it.
1000
* Then, flush the dblayer and close files.
1001
* Now create a numbered subdir of the merge directory for this pass.
1002
* Next, move the index files, except entrydn, parentid and id2entry to
1003
* the merge subdirectory. Important to move if we can, because
1004
* that can be millions of times faster than a copy.
1005
* Finally open the dblayer back up because the caller expects
1006
* us to not muck with it.
1008
static int import_sweep_after_pass(ImportJob *job)
1010
backend *be = job->inst->inst_be;
1013
import_log_notice(job, "Sweeping files for merging later...");
1015
ret = dblayer_instance_close(be);
1018
/* Walk the list of index jobs */
1019
ImportWorkerInfo *current_worker = NULL;
1021
for (current_worker = job->worker_list; current_worker != NULL;
1022
current_worker = current_worker->next) {
1023
/* Foreach job, rename the file to <filename>.n, where n is the
1025
if ((current_worker->work_type != FOREMAN) &&
1026
(current_worker->work_type != PRODUCER) &&
1027
(strcasecmp(current_worker->index_info->name, LDBM_PARENTID_STR) != 0)) {
1028
char *newname = NULL;
1029
char *oldname = NULL;
1031
ret = import_make_merge_filenames(job->inst->inst_dir_name,
1032
current_worker->index_info->name, job->current_pass,
1033
&oldname, &newname);
1037
if (PR_Access(oldname, PR_ACCESS_EXISTS) == PR_SUCCESS) {
1038
ret = PR_Rename(oldname, newname);
1039
if (ret != PR_SUCCESS) {
1040
PRErrorCode prerr = PR_GetError();
1041
import_log_notice(job, "Failed to rename file \"%s\" to \"%s\", "
1042
SLAPI_COMPONENT_NAME_NSPR " error %d (%s)",
1043
oldname, newname, prerr, slapd_pr_strerror(prerr));
1044
slapi_ch_free( (void**)&newname);
1045
slapi_ch_free( (void**)&oldname);
1049
slapi_ch_free( (void**)&newname);
1050
slapi_ch_free( (void**)&oldname);
1054
ret = dblayer_instance_start(be, DBLAYER_IMPORT_MODE);
1058
import_log_notice(job, "Sweep done.");
1060
if (ENOSPC == ret) {
1061
import_log_notice(job, "ERROR: NO DISK SPACE LEFT in sweep phase");
1063
import_log_notice(job, "ERROR: Sweep phase error %d (%s)", ret,
1064
dblayer_strerror(ret));
1071
/* when the import is done, this function is called to bring stuff back up.
1072
* returns 0 on success; anything else is an error
1074
static int import_all_done(ImportJob *job, int ret)
1076
ldbm_instance *inst = job->inst;
1078
/* Writing this file indicates to future server startups that
1079
* the db is OK unless it's in the dry run mode. */
1080
if ((ret == 0) && !(job->flags & FLAG_DRYRUN)) {
1081
char inst_dir[MAXPATHLEN*2];
1082
char *inst_dirp = NULL;
1083
inst_dirp = dblayer_get_full_inst_dir(inst->inst_li, inst,
1084
inst_dir, MAXPATHLEN*2);
1085
ret = dbversion_write(inst->inst_li, inst_dirp, NULL, DBVERSION_ALL);
1086
if (inst_dirp != inst_dir)
1087
slapi_ch_free_string(&inst_dirp);
1090
if ((job->task != NULL) && (0 == slapi_task_get_refcount(job->task))) {
1091
slapi_task_finish(job->task, ret);
1094
if (job->flags & FLAG_ONLINE) {
1095
/* make sure the indexes are online as well */
1096
/* richm 20070919 - if index entries are added online, they
1097
are created and marked as INDEX_OFFLINE, in anticipation
1098
of someone doing a db2index. In this case, the db2index
1099
code will correctly unset the INDEX_OFFLINE flag.
1100
However, if import is used to create the indexes, the
1101
INDEX_OFFLINE flag will not be cleared. So, we do that
1104
IndexInfo *index = job->index_list;
1105
while (index != NULL) {
1106
index->ai->ai_indexmask &= ~INDEX_OFFLINE;
1107
index = index->next;
1109
/* start up the instance */
1110
ret = dblayer_instance_start(job->inst->inst_be, DBLAYER_NORMAL_MODE);
1114
/* Reset USN slapi_counter with the last key of the entryUSN index */
1115
ldbm_set_last_usn(inst->inst_be);
1117
/* bring backend online again */
1118
slapi_mtn_be_enable(inst->inst_be);
1125
/* import_main_offline: drives a complete import / reindex / upgrade-dn job
 * for one ldbm instance -- spawns the appropriate producer thread, runs
 * import passes until done, post-processes (numSubordinates, ancestorid),
 * flushes/closes the database and reports the final status.
 * NOTE(review): this extraction is garbled -- stray numeric lines are
 * interleaved and many original lines (declarations, braces, comment
 * terminators) are missing, so several code spans below are swallowed by
 * unterminated comments.  Code tokens are preserved byte-for-byte; only
 * complete comment lines were added. */
int import_main_offline(void *arg)
1127
ImportJob *job = (ImportJob *)arg;
1128
ldbm_instance *inst = job->inst;
1129
backend *be = inst->inst_be;
1131
time_t beginning = 0;
1137
ImportWorkerInfo *producer = NULL;
1138
char *opstr = "Import";
1141
slapi_task_inc_refcount(job->task);
1143
/* Pick the operation name used in all log messages, based on job type. */
if (job->flags & FLAG_UPGRADEDNFORMAT) {
1144
if (job->flags & FLAG_DRYRUN) {
1145
opstr = "Upgrade Dn Dryrun";
1147
opstr = "Upgrade Dn";
1149
} else if (job->flags & FLAG_REINDEXING) {
1150
opstr = "Reindexing";
1152
PR_ASSERT(inst != NULL);
1155
/* Decide which indexes are needed */
1156
if (job->flags & FLAG_INDEX_ATTRS) {
1157
/* Here, we get an AVL tree which contains nodes for all attributes
1158
 * in the schema. Given this tree, we need to identify those nodes
1159
 * which are marked for indexing. */
1160
avl_apply(job->inst->inst_attrs, (IFP)import_attr_callback,
1161
(caddr_t)job, -1, AVL_INORDER);
1162
vlv_getindices((IFP)import_attr_callback, (void *)job, be);
1165
/* Determine how much index buffering space to allocate to each index */
1166
import_set_index_buffer_size(job);
1168
/* initialize the entry FIFO */
1169
ret = import_fifo_init(job);
1171
/* Wire import (entries arrive over the network rather than from files):
 * notify the waiter on wire_cv -- presumably the thread feeding entries;
 * TODO(review) confirm against the caller. */
if (! (job->flags & FLAG_USE_FILES)) {
1172
PR_Lock(job->wire_lock);
1173
PR_NotifyCondVar(job->wire_cv);
1174
PR_Unlock(job->wire_lock);
1179
if (job->flags & FLAG_USE_FILES) {
1180
/* importing from files: start up a producer thread to read the
1181
 * files and queue them
1183
producer = CALLOC(ImportWorkerInfo);
1187
/* start the producer */
1188
import_init_worker_info(producer, job);
1189
producer->work_type = PRODUCER;
1190
/* Choose the producer routine by job type: upgradedn_producer,
 * index_producer (reindex) or import_producer (plain LDIF import). */
if (job->flags & FLAG_UPGRADEDNFORMAT)
1192
if (! CREATE_THREAD(PR_USER_THREAD, (VFP)upgradedn_producer,
1193
producer, PR_PRIORITY_NORMAL, PR_GLOBAL_BOUND_THREAD,
1194
PR_UNJOINABLE_THREAD, SLAPD_DEFAULT_THREAD_STACKSIZE)) {
1195
PRErrorCode prerr = PR_GetError();
1196
LDAPDebug(LDAP_DEBUG_ANY,
1197
"unable to spawn upgrade dn producer thread, "
1198
SLAPI_COMPONENT_NAME_NSPR " error %d (%s)\n",
1199
prerr, slapd_pr_strerror(prerr), 0);
1203
else if (job->flags & FLAG_REINDEXING)
1205
if (! CREATE_THREAD(PR_USER_THREAD, (VFP)index_producer, producer,
1206
PR_PRIORITY_NORMAL, PR_GLOBAL_BOUND_THREAD,
1207
PR_UNJOINABLE_THREAD,
1208
SLAPD_DEFAULT_THREAD_STACKSIZE)) {
1209
PRErrorCode prerr = PR_GetError();
1210
LDAPDebug(LDAP_DEBUG_ANY,
1211
"unable to spawn index producer thread, "
1212
SLAPI_COMPONENT_NAME_NSPR " error %d (%s)\n",
1213
prerr, slapd_pr_strerror(prerr), 0);
1219
import_log_notice(job, "Beginning import job...");
1220
if (! CREATE_THREAD(PR_USER_THREAD, (VFP)import_producer, producer,
1221
PR_PRIORITY_NORMAL, PR_GLOBAL_BOUND_THREAD,
1222
PR_UNJOINABLE_THREAD,
1223
SLAPD_DEFAULT_THREAD_STACKSIZE)) {
1224
PRErrorCode prerr = PR_GetError();
1225
LDAPDebug(LDAP_DEBUG_ANY,
1226
"unable to spawn import producer thread, "
1227
SLAPI_COMPONENT_NAME_NSPR " error %d (%s)\n",
1228
prerr, slapd_pr_strerror(prerr), 0);
1233
if (0 == job->job_index_buffer_suggestion)
1234
import_log_notice(job, "Index buffering is disabled.");
1236
import_log_notice(job,
1237
"Index buffering enabled with bucket size %lu",
1238
job->job_index_buffer_suggestion);
1240
job->worker_list = producer;
1242
/* release the startup lock and let the entries start queueing up
1244
PR_Lock(job->wire_lock);
1245
PR_NotifyCondVar(job->wire_cv);
1246
PR_Unlock(job->wire_lock);
1249
/* Run as many passes as we need to complete the job or die honourably in
1251
while (! finished) {
1252
job->current_pass++;
1254
ret = import_run_pass(job, &status);
1255
/* The following could have happened:
1256
 * (a) Some error happened such that we're hosed.
1257
 *     This is indicated by a non-zero return code.
1258
 * (b) We finished the complete file without needing a second pass
1259
 *     This is indicated by a zero return code and a status of
1260
 *     IMPORT_COMPLETE_PASS and current_pass == 1;
1261
 * (c) We completed a pass and need at least another one
1262
 *     This is indicated by a zero return code and a status of
1263
 *     IMPORT_INCOMPLETE_PASS
1264
 * (d) We just completed what turned out to be the last in a
1266
 *     This is indicated by a zero return code and a status of
1267
 *     IMPORT_COMPLETE_PASS and current_pass > 1
1269
if (ret == ERR_IMPORT_ABORTED) {
1270
/* at least one of the threads has aborted -- shut down ALL
1272
import_log_notice(job, "Aborting all %s threads...", opstr);
1273
/* this abort sets the abort flag on the threads and will block for
1274
 * the exit of all threads
1276
import_set_abort_flag_all(job, 1);
1277
import_log_notice(job, "%s threads aborted.", opstr);
1281
if (ret == DRYRUN_QUIT) {
1282
goto error; /* Found the candidate; close the db files and quit */
1286
/* Some horrible fate has befallen the import */
1287
import_log_notice(job, "Fatal pass error %d", ret);
1291
/* No error, but a number of possibilities */
1292
if ( IMPORT_COMPLETE_PASS == status ) {
1293
if (1 == job->current_pass) {
1294
/* We're done !!!! */ ;
1296
/* Save the files, then merge */
1297
ret = import_sweep_after_pass(job);
1301
ret = import_mega_merge(job);
1308
if (IMPORT_INCOMPLETE_PASS == status) {
1309
/* Need to go round again */
1310
/* Time to save the files we've built for later */
1311
ret = import_sweep_after_pass(job);
1315
if ( (inst->inst_li->li_maxpassbeforemerge != 0) &&
1316
(job->current_pass > inst->inst_li->li_maxpassbeforemerge) )
1318
ret = import_mega_merge(job);
1322
job->current_pass = 1;
1323
ret = import_sweep_after_pass(job);
1329
/* Fixup the first_ID value to reflect previous work */
1330
job->first_ID = job->ready_ID + 1;
1331
import_free_thread_data(job);
1332
job->worker_list = producer;
1333
import_log_notice(job, "Beginning pass number %d",
1342
/* kill the producer now; we're done */
1344
import_log_notice(job, "Cleaning up producer thread...");
1345
/* Ask the producer to STOP, then poll (100 ms sleeps) until it reports
 * FINISHED before tearing anything down. */
producer->command = STOP;
1346
/* wait for the lead thread to stop */
1347
while (producer->state != FINISHED) {
1348
DS_Sleep(PR_MillisecondsToInterval(100));
1352
import_log_notice(job, "Indexing complete. Post-processing...");
1353
/* Now do the numsubordinates attribute */
1354
/* [610066] reindexed db cannot be used in the following backup/restore */
1355
if ( (!(job->flags & FLAG_REINDEXING) || (job->flags & FLAG_DN2RDN)) &&
1356
(ret = update_subordinatecounts(be, job->mothers, job->encrypt, NULL))
1358
import_log_notice(job, "Failed to update numsubordinates attributes");
1361
import_log_notice(job, "Generating numSubordinates complete.");
1363
if (!entryrdn_get_noancestorid()) {
1364
/* And the ancestorid index */
1365
/* Creating ancestorid from the scratch; delete the index file first. */
1366
struct attrinfo *ai = NULL;
1367
ainfo_get(be, "ancestorid", &ai);
1368
dblayer_erase_index_file(be, ai, 0);
1370
if ((ret = ldbm_ancestorid_create_index(be)) != 0) {
1371
import_log_notice(job, "Failed to create ancestorid index");
1376
import_log_notice(job, "Flushing caches...");
1377
if (0 != (ret = dblayer_flush(job->inst->inst_li)) ) {
1378
import_log_notice(job, "Failed to flush database");
1382
/* New way to exit the routine: check the return code.
1383
 * If it's non-zero, delete the database files.
1384
 * Otherwise don't, but always close the database layer properly.
1385
 * Then return. This ensures that we can't make a half-good/half-bad
1389
/* If we fail, the database is now in a mess, so we delete it
1390
   except dry run mode */
1391
import_log_notice(job, "Closing files...");
1392
cache_clear(&job->inst->inst_cache, CACHE_TYPE_ENTRY);
1393
if (entryrdn_get_switch()) {
1394
cache_clear(&job->inst->inst_dncache, CACHE_TYPE_DN);
1397
/* If aborted, it's safer to rebuild the caches. */
1398
cache_destroy_please(&job->inst->inst_cache, CACHE_TYPE_ENTRY);
1399
if (entryrdn_get_switch()) { /* subtree-rename: on */
1400
cache_destroy_please(&job->inst->inst_dncache, CACHE_TYPE_DN);
1402
/* initialize the entry cache */
1403
if (! cache_init(&(inst->inst_cache), DEFAULT_CACHE_SIZE,
1404
DEFAULT_CACHE_ENTRIES, CACHE_TYPE_ENTRY)) {
1405
LDAPDebug0Args(LDAP_DEBUG_ANY, "import_main_offline: "
1406
"cache_init failed. Server should be restarted.\n");
1409
/* initialize the dn cache */
1410
if (! cache_init(&(inst->inst_dncache), DEFAULT_DNCACHE_SIZE,
1411
DEFAULT_DNCACHE_MAXCOUNT, CACHE_TYPE_DN)) {
1412
LDAPDebug0Args(LDAP_DEBUG_ANY, "import_main_offline: "
1413
"dn cache_init failed. Server should be restarted.\n");
1417
if (!(job->flags & FLAG_DRYRUN)) { /* If not dryrun */
1418
/* if running in the dry run mode, don't touch the db */
1419
dblayer_delete_instance_dir(be);
1421
dblayer_instance_close(job->inst->inst_be);
1423
if (0 != (ret = dblayer_instance_close(job->inst->inst_be)) ) {
1424
import_log_notice(job, "Failed to close database");
1427
if (!(job->flags & FLAG_ONLINE))
1428
dblayer_close(job->inst->inst_li, DBLAYER_IMPORT_MODE);
1431
/* Verbose-mode throughput report (entries/sec over the whole job).
 * NOTE(review): 'ret', 'finished', 'status', 'verbose' and 'end' are used
 * below but not declared in the visible lines -- presumably declared in
 * lines lost to the extraction gaps; confirm against the real source. */
if (verbose && (0 == ret)) {
1432
int seconds_to_import = end - beginning;
1433
size_t entries_processed = job->lead_ID - (job->starting_ID - 1);
1434
double entries_per_second =
1436
(double)entries_processed / (double)seconds_to_import : 0;
1438
if (job->not_here_skipped) {
1440
import_log_notice(job,
1441
"%s complete. Processed %lu entries "
1442
"(%d bad entries were skipped, "
1443
"%d entries were skipped because they don't "
1444
"belong to this database) in %d seconds. "
1445
"(%.2f entries/sec)",
1446
opstr, entries_processed,
1447
job->skipped, job->not_here_skipped,
1448
seconds_to_import, entries_per_second);
1450
import_log_notice(job,
1451
"%s complete. Processed %lu entries "
1452
"(%d entries were skipped because they don't "
1453
"belong to this database) "
1454
"in %d seconds. (%.2f entries/sec)",
1455
opstr, entries_processed,
1456
job->not_here_skipped, seconds_to_import,
1457
entries_per_second);
1461
import_log_notice(job,
1462
"%s complete. Processed %lu entries "
1463
"(%d were skipped) in %d seconds. "
1464
"(%.2f entries/sec)",
1465
opstr, entries_processed,
1466
job->skipped, seconds_to_import,
1467
entries_per_second);
1469
import_log_notice(job,
1470
"%s complete. Processed %lu entries "
1471
"in %d seconds. (%.2f entries/sec)",
1472
opstr, entries_processed,
1473
seconds_to_import, entries_per_second);
1478
/* Final status: log the outcome (dry-run variants included), drop the
 * task reference and call import_all_done() on the appropriate paths. */
if (job->flags & FLAG_DRYRUN) {
1480
import_log_notice(job, "%s complete. %s is up-to-date.",
1481
opstr, job->inst->inst_name);
1484
slapi_task_dec_refcount(job->task);
1486
import_all_done(job, ret);
1487
} else if (DRYRUN_QUIT == ret) {
1488
import_log_notice(job, "%s complete. %s needs upgradednformat.",
1489
opstr, job->inst->inst_name);
1491
slapi_task_dec_refcount(job->task);
1493
import_all_done(job, ret);
1497
if (job->task != NULL) {
1498
slapi_task_finish(job->task, ret);
1501
} else if (0 != ret) {
1502
import_log_notice(job, "%s failed.", opstr);
1503
if (job->task != NULL) {
1504
slapi_task_finish(job->task, ret);
1508
slapi_task_dec_refcount(job->task);
1510
import_all_done(job, ret);
1513
/* This instance isn't busy anymore */
1514
instance_set_not_busy(job->inst);
1516
import_free_job(job);
1524
/* import_main: thread entry point for ONLINE import (started via
 * PR_CreateThread); it simply delegates to import_main_offline().
 * NOTE(review): the comment opener and this function's brace pair were
 * lost in the garbled extraction; code tokens below are preserved
 * byte-for-byte. */
* to be called by online import using PR_CreateThread()
1525
* offline import directly calls import_main_offline()
1528
void import_main(void *arg)
1530
import_main_offline(arg);
1533
int ldbm_back_ldif2ldbm_deluxe(Slapi_PBlock *pb)
1536
int noattrindexes = 0;
1537
ImportJob *job = NULL;
1538
char **name_array = NULL;
1541
PRThread *thread = NULL;
1543
job = CALLOC(ImportJob);
1545
LDAPDebug(LDAP_DEBUG_ANY, "not enough memory to do import job\n",
1550
slapi_pblock_get( pb, SLAPI_BACKEND, &be);
1551
PR_ASSERT(NULL != be);
1552
job->inst = (ldbm_instance *)be->be_instance_info;
1553
slapi_pblock_get( pb, SLAPI_LDIF2DB_NOATTRINDEXES, &noattrindexes );
1554
slapi_pblock_get( pb, SLAPI_LDIF2DB_FILE, &name_array );
1555
slapi_pblock_get(pb, SLAPI_SEQ_TYPE, &up_flags); /* For upgrade dn and
1558
/* the removedupvals field is blatantly overloaded here to mean
1559
* the chunk size too. (chunk size = number of entries that should
1560
* be imported before starting a new pass. usually for debugging.)
1562
slapi_pblock_get(pb, SLAPI_LDIF2DB_REMOVEDUPVALS, &job->merge_chunk_size);
1563
if (job->merge_chunk_size == 1)
1564
job->merge_chunk_size = 0;
1565
/* get list of specifically included and/or excluded subtrees from
1567
ldbm_back_fetch_incl_excl(pb, &job->include_subtrees,
1568
&job->exclude_subtrees);
1569
/* get cn=tasks info, if any */
1570
slapi_pblock_get(pb, SLAPI_BACKEND_TASK, &job->task);
1571
slapi_pblock_get(pb, SLAPI_LDIF2DB_ENCRYPT, &job->encrypt);
1572
/* get uniqueid info */
1573
slapi_pblock_get(pb, SLAPI_LDIF2DB_GENERATE_UNIQUEID, &job->uuid_gen_type);
1574
if (job->uuid_gen_type == SLAPI_UNIQUEID_GENERATE_NAME_BASED) {
1577
slapi_pblock_get(pb, SLAPI_LDIF2DB_NAMESPACEID, &namespaceid);
1578
job->uuid_namespace = slapi_ch_strdup(namespaceid);
1581
job->flags = FLAG_USE_FILES;
1582
if (NULL == name_array) { /* no ldif file is given -> reindexing or
1584
if (up_flags & SLAPI_UPGRADEDNFORMAT) {
1585
job->flags |= FLAG_UPGRADEDNFORMAT;
1586
if (up_flags & SLAPI_DRYRUN) {
1587
job->flags |= FLAG_DRYRUN;
1590
job->flags |= FLAG_REINDEXING; /* call index_producer */
1591
if (up_flags & SLAPI_UPGRADEDB_DN2RDN) {
1592
if (entryrdn_get_switch()) {
1593
job->flags |= FLAG_DN2RDN; /* migrate to the rdn format */
1595
LDAPDebug1Arg(LDAP_DEBUG_ANY,
1596
"DN to RDN option is specified, "
1597
"but %s is not enabled\n",
1598
CONFIG_ENTRYRDN_SWITCH);
1599
import_free_job(job);
1606
if (!noattrindexes) {
1607
job->flags |= FLAG_INDEX_ATTRS;
1609
for (i = 0; name_array && name_array[i] != NULL; i++) {
1610
charray_add(&job->input_filenames, slapi_ch_strdup(name_array[i]));
1612
job->starting_ID = 1;
1614
job->mothers = CALLOC(import_subcount_stuff);
1616
/* how much space should we allocate to index buffering? */
1617
job->job_index_buffer_size = import_get_index_buffer_size();
1618
if (job->job_index_buffer_size == 0) {
1619
/* 10% of the allocated cache size + one meg */
1620
PR_Lock(job->inst->inst_li->li_config_mutex);
1621
job->job_index_buffer_size =
1622
(job->inst->inst_li->li_import_cachesize/10) + (1024*1024);
1623
PR_Unlock(job->inst->inst_li->li_config_mutex);
1625
import_subcount_stuff_init(job->mothers);
1627
if (job->task != NULL) {
1628
/* count files, use that to track "progress" in cn=tasks */
1630
while (name_array && name_array[total_files] != NULL)
1632
/* add 1 to account for post-import cleanup (which can take a
1633
* significant amount of time)
1635
/* NGK - This should eventually be cleaned up to use the public
1637
if (0 == total_files) { /* reindexing */
1638
job->task->task_work = 2;
1640
job->task->task_work = total_files + 1;
1642
job->task->task_progress = 0;
1643
job->task->task_state = SLAPI_TASK_RUNNING;
1644
slapi_task_set_data(job->task, job);
1645
slapi_task_set_destructor_fn(job->task, import_task_destroy);
1646
slapi_task_set_cancel_fn(job->task, import_task_abort);
1647
job->flags |= FLAG_ONLINE;
1649
/* create thread for import_main, so we can return */
1650
thread = PR_CreateThread(PR_USER_THREAD, import_main, (void *)job,
1651
PR_PRIORITY_NORMAL, PR_GLOBAL_THREAD,
1652
PR_UNJOINABLE_THREAD,
1653
SLAPD_DEFAULT_THREAD_STACKSIZE);
1654
if (thread == NULL) {
1655
PRErrorCode prerr = PR_GetError();
1656
LDAPDebug(LDAP_DEBUG_ANY, "unable to spawn import thread, "
1657
SLAPI_COMPONENT_NAME_NSPR " error %d (%s)\n",
1658
prerr, slapd_pr_strerror(prerr), 0);
1659
import_free_job(job);
1666
/* old style -- do it all synchronously (THIS IS GOING AWAY SOON) */
1667
return import_main_offline((void *)job);