~ubuntu-branches/ubuntu/precise/389-ds-base/precise

« back to all changes in this revision

Viewing changes to ldap/servers/slapd/back-ldbm/import.c

  • Committer: Package Import Robot
  • Author(s): Timo Aaltonen
  • Date: 2012-03-01 23:54:24 UTC
  • Revision ID: package-import@ubuntu.com-20120301235424-kaim42d08pic3xi3
Tags: upstream-1.2.10.2
ImportĀ upstreamĀ versionĀ 1.2.10.2

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/** BEGIN COPYRIGHT BLOCK
 
2
 * This Program is free software; you can redistribute it and/or modify it under
 
3
 * the terms of the GNU General Public License as published by the Free Software
 
4
 * Foundation; version 2 of the License.
 
5
 * 
 
6
 * This Program is distributed in the hope that it will be useful, but WITHOUT
 
7
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
 
8
 * FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
 
9
 * 
 
10
 * You should have received a copy of the GNU General Public License along with
 
11
 * this Program; if not, write to the Free Software Foundation, Inc., 59 Temple
 
12
 * Place, Suite 330, Boston, MA 02111-1307 USA.
 
13
 * 
 
14
 * In addition, as a special exception, Red Hat, Inc. gives You the additional
 
15
 * right to link the code of this Program with code not covered under the GNU
 
16
 * General Public License ("Non-GPL Code") and to distribute linked combinations
 
17
 * including the two, subject to the limitations in this paragraph. Non-GPL Code
 
18
 * permitted under this exception must only link to the code of this Program
 
19
 * through those well defined interfaces identified in the file named EXCEPTION
 
20
 * found in the source code files (the "Approved Interfaces"). The files of
 
21
 * Non-GPL Code may instantiate templates or use macros or inline functions from
 
22
 * the Approved Interfaces without causing the resulting work to be covered by
 
23
 * the GNU General Public License. Only Red Hat, Inc. may make changes or
 
24
 * additions to the list of Approved Interfaces. You must obey the GNU General
 
25
 * Public License in all respects for all of the Program code and other code used
 
26
 * in conjunction with the Program except the Non-GPL Code covered by this
 
27
 * exception. If you modify this file, you may extend this exception to your
 
28
 * version of the file, but you are not obligated to do so. If you do not wish to
 
29
 * provide this exception without modification, you must delete this exception
 
30
 * statement from your version and license this file solely under the GPL without
 
31
 * exception. 
 
32
 * 
 
33
 * 
 
34
 * Copyright (C) 2001 Sun Microsystems, Inc. Used by permission.
 
35
 * Copyright (C) 2005 Red Hat, Inc.
 
36
 * All rights reserved.
 
37
 * END COPYRIGHT BLOCK **/
 
38
 
 
39
#ifdef HAVE_CONFIG_H
 
40
#  include <config.h>
 
41
#endif
 
42
 
 
43
/*
 
44
 * the "new" ("deluxe") backend import code
 
45
 *
 
46
 * please make sure you use 4-space indentation on this file.
 
47
 */
 
48
 
 
49
#include "back-ldbm.h"
 
50
#include "vlv_srch.h"
 
51
#include "import.h"
 
52
 
 
53
#define ERR_IMPORT_ABORTED      -23
 
54
#define DRYRUN_QUIT             -24
 
55
 
 
56
 
 
57
/********** routines to manipulate the entry fifo **********/
 
58
 
 
59
/* this is pretty bogus -- could be a HUGE amount of memory */
 
60
/* Not anymore with the Import Queue Adaptative Algorithm (Regulation) */
 
61
#define MAX_FIFO_SIZE    8000
 
62
 
 
63
static int import_fifo_init(ImportJob *job)
 
64
{
 
65
    ldbm_instance *inst = job->inst;
 
66
 
 
67
    /* Work out how big the entry fifo can be */
 
68
    if (inst->inst_cache.c_maxentries > 0)
 
69
    job->fifo.size = inst->inst_cache.c_maxentries;
 
70
    else
 
71
    job->fifo.size = inst->inst_cache.c_maxsize / 1024;    /* guess */
 
72
 
 
73
    /* byte limit that should be respected to avoid memory starvation */
 
74
    /* conservative computing: multiply by .8 to allow for reasonable overflow */
 
75
    job->fifo.bsize = (inst->inst_cache.c_maxsize/10) << 3;
 
76
 
 
77
    job->fifo.c_bsize = 0;
 
78
    
 
79
    if (job->fifo.size > MAX_FIFO_SIZE)
 
80
    job->fifo.size = MAX_FIFO_SIZE;
 
81
    /* has to be at least 1 or 2, and anything less than about 100 destroys
 
82
     * the point of doing all this optimization in the first place. */
 
83
    if (job->fifo.size < 100)
 
84
        job->fifo.size = 100;
 
85
 
 
86
    /* Get memory for the entry fifo */
 
87
    /* This is used to keep a ref'ed pointer to the last <cachesize> 
 
88
     * processed entries */
 
89
    PR_ASSERT(NULL == job->fifo.item);
 
90
    job->fifo.item = (FifoItem *)slapi_ch_calloc(job->fifo.size,
 
91
                         sizeof(FifoItem));
 
92
    if (NULL == job->fifo.item) {
 
93
    /* Memory allocation error */
 
94
    return -1;
 
95
    }
 
96
    return 0;
 
97
}
 
98
 
 
99
FifoItem *import_fifo_fetch(ImportJob *job, ID id, int worker)
 
100
{
 
101
    int idx = id % job->fifo.size;
 
102
    FifoItem *fi;
 
103
 
 
104
    if (job->fifo.item) {
 
105
        fi = &(job->fifo.item[idx]);
 
106
    } else {
 
107
        return NULL;
 
108
    }
 
109
    if (fi->entry) {
 
110
        if (worker) {
 
111
            if (fi->bad) {
 
112
                if (fi->bad == FIFOITEM_BAD) {
 
113
                    fi->bad = FIFOITEM_BAD_PRINTED;
 
114
                    import_log_notice(job, "WARNING: bad entry: ID %d", id);
 
115
                }
 
116
                return NULL;
 
117
            }
 
118
            PR_ASSERT(fi->entry->ep_refcnt > 0);
 
119
        }
 
120
    }
 
121
    return fi;
 
122
}
 
123
 
 
124
static void import_fifo_destroy(ImportJob *job)
 
125
{
 
126
    /* Free any entries in the fifo first */
 
127
    struct backentry *be = NULL;
 
128
    size_t i = 0;
 
129
 
 
130
    for (i = 0; i < job->fifo.size; i++) {
 
131
        be = job->fifo.item[i].entry;
 
132
        backentry_free(&be);
 
133
        job->fifo.item[i].entry = NULL;
 
134
        job->fifo.item[i].filename = NULL;
 
135
    }
 
136
    slapi_ch_free((void **)&job->fifo.item);
 
137
    job->fifo.item = NULL;
 
138
}
 
139
 
 
140
 
 
141
/********** logging stuff **********/
 
142
 
 
143
#define LOG_BUFFER        512
 
144
 
 
145
/* this changes the 'nsTaskStatus' value, which is transient (anything logged
 
146
 * here wipes out any previous status)
 
147
 */
 
148
static void import_log_status_start(ImportJob *job)
 
149
{
 
150
    if (! job->task_status)
 
151
    job->task_status = (char *)slapi_ch_malloc(10 * LOG_BUFFER);
 
152
    if (! job->task_status)
 
153
    return;        /* out of memory? */
 
154
 
 
155
    job->task_status[0] = 0;
 
156
}
 
157
 
 
158
static void import_log_status_add_line(ImportJob *job, char *format, ...)
 
159
{
 
160
    va_list ap;
 
161
    int len = 0;
 
162
 
 
163
    if (! job->task_status)
 
164
        return;
 
165
    len = strlen(job->task_status);
 
166
    if (len + 5 > (10 * LOG_BUFFER))
 
167
        return;         /* no room */
 
168
 
 
169
    if (job->task_status[0])
 
170
        strcat(job->task_status, "\n");
 
171
 
 
172
    va_start(ap, format);
 
173
    PR_vsnprintf(job->task_status + len, (10 * LOG_BUFFER) - len, format, ap);
 
174
    va_end(ap);
 
175
}
 
176
 
 
177
static void import_log_status_done(ImportJob *job)
 
178
{
 
179
    if (job->task) {
 
180
        slapi_task_log_status(job->task, "%s", job->task_status);
 
181
    }
 
182
}
 
183
 
 
184
/* this adds a line to the 'nsTaskLog' value, which is cumulative (anything
 
185
 * logged here is added to the end)
 
186
 */
 
187
void import_log_notice(ImportJob *job, char *format, ...)
 
188
{
 
189
    va_list ap;
 
190
    char buffer[LOG_BUFFER];
 
191
 
 
192
    va_start(ap, format);
 
193
    PR_vsnprintf(buffer, LOG_BUFFER, format, ap);
 
194
    va_end(ap);
 
195
 
 
196
    if (job->task) {
 
197
        slapi_task_log_notice(job->task, "%s", buffer);
 
198
    }
 
199
    /* also save it in the logs for posterity */
 
200
    if (job->flags & FLAG_UPGRADEDNFORMAT) {
 
201
        LDAPDebug(LDAP_DEBUG_ANY, "upgradedn %s: %s\n", job->inst->inst_name,
 
202
                  buffer, 0);
 
203
    } else if (job->flags & FLAG_REINDEXING) {
 
204
        LDAPDebug(LDAP_DEBUG_ANY, "reindex %s: %s\n", job->inst->inst_name,
 
205
                  buffer, 0);
 
206
    } else {
 
207
        LDAPDebug(LDAP_DEBUG_ANY, "import %s: %s\n", job->inst->inst_name,
 
208
                  buffer, 0);
 
209
    }
 
210
}
 
211
 
 
212
static void import_task_destroy(Slapi_Task *task)
 
213
{
 
214
    ImportJob *job = (ImportJob *)slapi_task_get_data(task);
 
215
 
 
216
    if (job && job->task_status) {
 
217
        slapi_ch_free((void **)&job->task_status);
 
218
        job->task_status = NULL;
 
219
    }
 
220
    FREE(job);
 
221
    slapi_task_set_data(task, NULL);
 
222
}
 
223
 
 
224
static void import_task_abort(Slapi_Task *task)
 
225
{
 
226
    ImportJob *job;
 
227
 
 
228
    /* don't log anything from here, because we're still holding the
 
229
     * DSE lock for modify...
 
230
     */
 
231
 
 
232
    if (slapi_task_get_state(task) == SLAPI_TASK_FINISHED) {
 
233
        /* too late */
 
234
    }
 
235
 
 
236
    /*
 
237
     * Race condition.
 
238
     * If the import thread happens to finish right now we're in trouble
 
239
     * because it will free the job.
 
240
     */
 
241
 
 
242
    job = (ImportJob *)slapi_task_get_data(task);
 
243
 
 
244
    import_abort_all(job, 0); 
 
245
    while (slapi_task_get_state(task) != SLAPI_TASK_FINISHED)
 
246
         DS_Sleep(PR_MillisecondsToInterval(100));
 
247
 
 
248
}
 
249
 
 
250
 
 
251
/********** helper functions for importing **********/
 
252
 
 
253
 
 
254
/* Function used to gather a list of indexed attrs */
 
255
static int import_attr_callback(void *node, void *param)
 
256
{
 
257
    ImportJob *job = (ImportJob *)param;
 
258
    struct attrinfo *a = (struct attrinfo *)node;
 
259
 
 
260
    if (job->flags & FLAG_DRYRUN) { /* dryrun; we don't need the workers */
 
261
        return 0;
 
262
    }
 
263
    if (job->flags & FLAG_UPGRADEDNFORMAT) {
 
264
        /* Bring up import workers just for indexes having DN syntax 
 
265
         * attribute type. (except entrydn -- taken care below) */
 
266
        int rc = 0;
 
267
        Slapi_Attr attr = {0};
 
268
 
 
269
        /* 
 
270
         * Treat cn and ou specially.  Bring up the import workers for
 
271
         * cn and ou even though they are not DN syntax attribute.
 
272
         * This is done because they have some exceptional case to store
 
273
         * DN format in the admin entries such as UserPreferences.
 
274
         */
 
275
        if ((0 == PL_strcasecmp("cn", a->ai_type)) || 
 
276
            (0 == PL_strcasecmp("commonname", a->ai_type)) ||
 
277
            (0 == PL_strcasecmp("ou", a->ai_type)) ||
 
278
            (0 == PL_strcasecmp("organizationalUnit", a->ai_type))) {
 
279
            ;
 
280
        } else {
 
281
            slapi_attr_init(&attr, a->ai_type);
 
282
            rc = slapi_attr_is_dn_syntax_attr(&attr);
 
283
            attr_done(&attr);
 
284
            if (0 == rc) {
 
285
                return 0;
 
286
            }
 
287
        }
 
288
    }
 
289
 
 
290
    /* OK, so we now have hold of the attribute structure and the job info, 
 
291
     * let's see what we have.  Remember that although this function is called
 
292
     * many times, all these calls are in the context of a single thread, so we
 
293
     * don't need to worry about protecting the data in the job structure.
 
294
     */
 
295
 
 
296
    /* We need to specifically exclude the (entrydn, entryrdn) & parentid &
 
297
     * ancestorid indexes because we build those in the foreman thread.
 
298
     */
 
299
    if (IS_INDEXED(a->ai_indexmask) &&
 
300
        (strcasecmp(a->ai_type, LDBM_ENTRYDN_STR) != 0) &&
 
301
        (strcasecmp(a->ai_type, LDBM_ENTRYRDN_STR) != 0) &&
 
302
        (strcasecmp(a->ai_type, LDBM_PARENTID_STR) != 0) &&
 
303
        (strcasecmp(a->ai_type, LDBM_ANCESTORID_STR) != 0) &&
 
304
        (strcasecmp(a->ai_type, numsubordinates) != 0)) {
 
305
        /* Make an import_index_info structure, fill it in and insert into the
 
306
         * job's list */
 
307
        IndexInfo *info = CALLOC(IndexInfo);
 
308
    
 
309
        if (NULL == info) {
 
310
            /* Memory allocation error */
 
311
            return -1;
 
312
        }
 
313
        info->name = slapi_ch_strdup(a->ai_type);
 
314
        info->ai = a;
 
315
        if (NULL == info->name) {
 
316
            /* Memory allocation error */
 
317
            FREE(info);
 
318
            return -1;
 
319
        }
 
320
        info->next = job->index_list;
 
321
        job->index_list = info;
 
322
        job->number_indexers++;
 
323
    }
 
324
    return 0;
 
325
}
 
326
 
 
327
static void import_set_index_buffer_size(ImportJob *job)
 
328
{
 
329
    IndexInfo *current_index = NULL;
 
330
    size_t substring_index_count = 0;
 
331
    size_t proposed_size = 0;
 
332
 
 
333
    /* Count the substring indexes we have */
 
334
    for (current_index = job->index_list; current_index != NULL;
 
335
     current_index = current_index->next)  {
 
336
    if (current_index->ai->ai_indexmask & INDEX_SUB) {
 
337
        substring_index_count++;
 
338
    }
 
339
    }
 
340
    if (substring_index_count > 0) {
 
341
    /* Make proposed size such that if all substring indices were
 
342
     * reasonably full, we'd hit the target space */
 
343
    proposed_size = (job->job_index_buffer_size / substring_index_count) /
 
344
        IMPORT_INDEX_BUFFER_SIZE_CONSTANT;
 
345
    if (proposed_size > IMPORT_MAX_INDEX_BUFFER_SIZE) {
 
346
        proposed_size = IMPORT_MAX_INDEX_BUFFER_SIZE;
 
347
    }
 
348
    if (proposed_size < IMPORT_MIN_INDEX_BUFFER_SIZE) {
 
349
        proposed_size = 0;
 
350
    }
 
351
    }
 
352
 
 
353
    job->job_index_buffer_suggestion = proposed_size;
 
354
}
 
355
 
 
356
static void import_free_thread_data(ImportJob *job)
 
357
{
 
358
    /* DBDB free the lists etc */
 
359
    ImportWorkerInfo *worker = job->worker_list;
 
360
 
 
361
    while (worker != NULL) {
 
362
    ImportWorkerInfo *asabird = worker;
 
363
    worker = worker->next;
 
364
    if (asabird->work_type != PRODUCER)
 
365
        slapi_ch_free( (void**)&asabird);
 
366
    }
 
367
}
 
368
 
 
369
void import_free_job(ImportJob *job)
 
370
{
 
371
    /* DBDB free the lists etc */
 
372
    IndexInfo *index = job->index_list;
 
373
 
 
374
    import_free_thread_data(job);
 
375
    while (index != NULL) {
 
376
        IndexInfo *asabird = index;
 
377
        index = index->next;
 
378
        slapi_ch_free( (void**)&asabird->name);
 
379
        slapi_ch_free( (void**)&asabird);
 
380
    }
 
381
    job->index_list = NULL;
 
382
    if (NULL != job->mothers) {
 
383
        import_subcount_stuff_term(job->mothers);
 
384
        slapi_ch_free( (void**)&job->mothers);
 
385
    }
 
386
    
 
387
    ldbm_back_free_incl_excl(job->include_subtrees, job->exclude_subtrees);
 
388
    charray_free(job->input_filenames);
 
389
    if (job->fifo.size)
 
390
        import_fifo_destroy(job);
 
391
    if (NULL != job->uuid_namespace)
 
392
        slapi_ch_free((void **)&job->uuid_namespace);
 
393
    if (job->wire_lock)
 
394
        PR_DestroyLock(job->wire_lock);
 
395
    if (job->wire_cv)
 
396
        PR_DestroyCondVar(job->wire_cv);
 
397
    slapi_ch_free((void **)&job->task_status);
 
398
}
 
399
 
 
400
/* determine if we are the correct backend for this entry
 
401
 * (in a distributed suffix, some entries may be for other backends).
 
402
 * if the entry's dn actually matches one of the suffixes of the be, we 
 
403
 * automatically take it as a belonging one, for such entries must be 
 
404
 * present in EVERY backend independently of the distribution applied.
 
405
 */
 
406
int import_entry_belongs_here(Slapi_Entry *e, backend *be)
 
407
{
 
408
    Slapi_Backend *retbe;
 
409
    Slapi_DN *sdn = slapi_entry_get_sdn(e);
 
410
 
 
411
    if (slapi_be_issuffix(be, sdn))
 
412
        return 1;
 
413
 
 
414
    retbe = slapi_mapping_tree_find_backend_for_sdn(sdn);
 
415
    return (retbe == be);
 
416
}
 
417
 
 
418
 
 
419
/********** starting threads and stuff **********/
 
420
 
 
421
/* Solaris is weird---we need an LWP per thread but NSPR doesn't give us
 
422
 * one unless we make this magic belshe-call */
 
423
/* Fixed on Solaris 8; NSPR supports PR_GLOBAL_BOUND_THREAD */
 
424
#define CREATE_THREAD PR_CreateThread
 
425
 
 
426
static void import_init_worker_info(ImportWorkerInfo *info, ImportJob *job)
 
427
{
 
428
    info->command = PAUSE; 
 
429
    info->job = job;
 
430
    info->first_ID = job->first_ID;
 
431
    info->index_buffer_size = job->job_index_buffer_suggestion;
 
432
}
 
433
 
 
434
static int import_start_threads(ImportJob *job)
 
435
{
 
436
    IndexInfo *current_index = NULL;
 
437
    ImportWorkerInfo *foreman = NULL, *worker = NULL;
 
438
 
 
439
    foreman = CALLOC(ImportWorkerInfo);
 
440
    if (!foreman)
 
441
    goto error;
 
442
 
 
443
    /* start the foreman */
 
444
    import_init_worker_info(foreman, job);
 
445
    foreman->work_type = FOREMAN;
 
446
    if (! CREATE_THREAD(PR_USER_THREAD, (VFP)import_foreman, foreman,
 
447
                        PR_PRIORITY_NORMAL, PR_GLOBAL_BOUND_THREAD,
 
448
                        PR_UNJOINABLE_THREAD, SLAPD_DEFAULT_THREAD_STACKSIZE)) {
 
449
        PRErrorCode prerr = PR_GetError();
 
450
        LDAPDebug(LDAP_DEBUG_ANY, "unable to spawn import foreman thread, "
 
451
                  SLAPI_COMPONENT_NAME_NSPR " error %d (%s)\n",
 
452
                  prerr, slapd_pr_strerror(prerr), 0);
 
453
        FREE(foreman);
 
454
        goto error;
 
455
    }
 
456
 
 
457
    foreman->next = job->worker_list;
 
458
    job->worker_list = foreman;
 
459
    
 
460
    /* Start follower threads, if we are doing attribute indexing */
 
461
    current_index = job->index_list;
 
462
    if (job->flags & FLAG_INDEX_ATTRS) {
 
463
    while (current_index) {
 
464
        /* make a new thread info structure */
 
465
        worker = CALLOC(ImportWorkerInfo);
 
466
        if (! worker)
 
467
        goto error;
 
468
 
 
469
        /* fill it in */
 
470
        import_init_worker_info(worker, job);
 
471
        worker->index_info = current_index;
 
472
        worker->work_type = WORKER;
 
473
 
 
474
        /* Start the thread */
 
475
        if (! CREATE_THREAD(PR_USER_THREAD, (VFP)import_worker, worker,
 
476
                PR_PRIORITY_NORMAL, PR_GLOBAL_BOUND_THREAD,
 
477
                PR_UNJOINABLE_THREAD,
 
478
                SLAPD_DEFAULT_THREAD_STACKSIZE)) {
 
479
            PRErrorCode prerr = PR_GetError();
 
480
            LDAPDebug(LDAP_DEBUG_ANY, "unable to spawn import worker thread, "
 
481
                    SLAPI_COMPONENT_NAME_NSPR " error %d (%s)\n",
 
482
                    prerr, slapd_pr_strerror(prerr), 0);
 
483
            FREE(worker);
 
484
            goto error;
 
485
        }
 
486
 
 
487
        /* link it onto the job's thread list */
 
488
        worker->next = job->worker_list;
 
489
        job->worker_list = worker;
 
490
        current_index = current_index->next;
 
491
    }
 
492
    }
 
493
    return 0;
 
494
 
 
495
error:
 
496
    import_log_notice(job, "Import thread creation failed.");
 
497
    import_log_notice(job, "Aborting all import threads...");
 
498
    import_abort_all(job, 1);
 
499
    import_log_notice(job, "Import threads aborted.");
 
500
    return -1;
 
501
}
 
502
 
 
503
 
 
504
/********** monitoring the worker threads **********/
 
505
 
 
506
static void import_clear_progress_history(ImportJob *job)
 
507
{
 
508
    int i = 0;
 
509
 
 
510
    for (i = 0; i < IMPORT_JOB_PROG_HISTORY_SIZE /*- 1*/; i++) {
 
511
    job->progress_history[i] = job->first_ID;
 
512
    job->progress_times[i] = job->start_time;
 
513
    }
 
514
    /* reset libdb cache stats */
 
515
    job->inst->inst_cache_hits = job->inst->inst_cache_misses = 0;
 
516
}
 
517
 
 
518
static double import_grok_db_stats(ldbm_instance *inst)
 
519
{
 
520
    DB_MPOOL_STAT *mpstat = NULL;
 
521
    DB_MPOOL_FSTAT **mpfstat = NULL;
 
522
    int return_value = -1;
 
523
    double cache_hit_ratio = 0.0;
 
524
 
 
525
    return_value = dblayer_memp_stat_instance(inst, &mpstat, &mpfstat);
 
526
 
 
527
    if (!mpstat) {
 
528
        goto out;
 
529
    }
 
530
 
 
531
    if (0 == return_value) {
 
532
    unsigned long current_cache_hits = mpstat->st_cache_hit;
 
533
    unsigned long current_cache_misses = mpstat->st_cache_miss;
 
534
    
 
535
    if (inst->inst_cache_hits) {
 
536
        unsigned long hit_delta, miss_delta;
 
537
 
 
538
        hit_delta = current_cache_hits - inst->inst_cache_hits;
 
539
        miss_delta = current_cache_misses - inst->inst_cache_misses;
 
540
        if (hit_delta != 0) {
 
541
        cache_hit_ratio = (double)hit_delta /
 
542
            (double)(hit_delta + miss_delta);
 
543
        }
 
544
    }
 
545
    inst->inst_cache_misses = current_cache_misses;
 
546
    inst->inst_cache_hits = current_cache_hits;
 
547
    }
 
548
 
 
549
out:
 
550
    if (mpstat)
 
551
        slapi_ch_free((void **)&mpstat);
 
552
    if (mpfstat) {
 
553
#if 1000*DB_VERSION_MAJOR + 100*DB_VERSION_MINOR + DB_VERSION_PATCH <= 3204
 
554
            /* In DB 3.2.4 and earlier, we need to free each element */
 
555
        DB_MPOOL_FSTAT **tfsp;
 
556
        for (tfsp = mpfstat; *tfsp; tfsp++)
 
557
        slapi_ch_free((void **)tfsp);
 
558
#endif
 
559
        slapi_ch_free((void **)&mpfstat);
 
560
    }
 
561
    return cache_hit_ratio;
 
562
}
 
563
 
 
564
static char* import_decode_worker_state(int state)
 
565
{
 
566
    switch (state) {
 
567
    case WAITING:
 
568
        return "W";
 
569
    case RUNNING:
 
570
        return "R";
 
571
    case FINISHED:
 
572
        return "F";
 
573
    case ABORTED:
 
574
        return "A";
 
575
    default:
 
576
        return "?";
 
577
    }
 
578
}
 
579
 
 
580
static void import_print_worker_status(ImportWorkerInfo *info)
 
581
{
 
582
    char *name = (info->work_type == PRODUCER ? "Producer" :
 
583
                  (info->work_type == FOREMAN ? "Foreman" :
 
584
                   info->index_info->name));
 
585
 
 
586
    import_log_status_add_line(info->job,
 
587
                               "%-25s %s%10ld %7.1f", name,
 
588
                               import_decode_worker_state(info->state),
 
589
                               info->last_ID_processed, info->rate);
 
590
}
 
591
 
 
592
 
 
593
#define IMPORT_CHUNK_TEST_HOLDOFF_TIME (5*60)    /* Seconds */
 
594
 
 
595
/* Got to be lower than this: */
 
596
#define IMPORT_CHUNK_TEST_CACHE_HIT_RATIO (0.99)
 
597
/* Less than half as fast as we were doing: */
 
598
#define IMPORT_CHUNK_TEST_SLOWDOWN_RATIO_A (0.5)
 
599
/* A lot less fast than we were doing: */
 
600
#define IMPORT_CHUNK_TEST_SLOWDOWN_RATIO_B (0.1)
 
601
 
 
602
static int import_throw_in_towel(ImportJob *job, time_t current_time,
 
603
                                 ID trailing_ID)
 
604
{
 
605
    static int number_of_times_here = 0;
 
606
 
 
607
    /* secret -c option allows specific chunk size to be set... */
 
608
    if (job->merge_chunk_size != 0) {
 
609
    if ((0 != job->lead_ID) &&
 
610
            (trailing_ID > job->first_ID) &&
 
611
        (trailing_ID - job->first_ID > job->merge_chunk_size)) {
 
612
        return 1;
 
613
    }
 
614
    return 0;
 
615
    }
 
616
 
 
617
    /* Check stats to decide whether we're getting bogged down and should
 
618
     * terminate this pass.
 
619
     */
 
620
 
 
621
    /* Check #1 : are we more than 10 minutes into the chunk ? */
 
622
    if (current_time - job->start_time > IMPORT_CHUNK_TEST_HOLDOFF_TIME) {
 
623
    /* Check #2 : Have we slowed down considerably recently ? */
 
624
    if ((job->recent_progress_rate / job->average_progress_rate) < 
 
625
        IMPORT_CHUNK_TEST_SLOWDOWN_RATIO_A) {
 
626
        /* Check #3: Cache performing poorly---the puported reason
 
627
         * for the slowdown */
 
628
        if (job->cache_hit_ratio < IMPORT_CHUNK_TEST_CACHE_HIT_RATIO) {
 
629
        /* We have a winner ! */
 
630
        import_log_notice(job, "Decided to end this pass because "
 
631
                  "the progress rate has dropped below "
 
632
                  "the %.0f%% threshold.",
 
633
                  IMPORT_CHUNK_TEST_SLOWDOWN_RATIO_A*100.0);
 
634
        return 1;
 
635
        }
 
636
    } else {
 
637
        if ((job->recent_progress_rate / job->average_progress_rate) <
 
638
        IMPORT_CHUNK_TEST_SLOWDOWN_RATIO_B) {
 
639
        /* Alternative check: have we really, really slowed down, 
 
640
         * without the test for cache overflow? */
 
641
        /* This is designed to catch the case where the cache has
 
642
         * been misconfigured too large */
 
643
        if (number_of_times_here > 10) {
 
644
            /* Got to get here ten times at least */
 
645
            import_log_notice(job, "Decided to end this pass "
 
646
                      "because the progress rate "
 
647
                      "plummeted below %.0f%%",
 
648
                      IMPORT_CHUNK_TEST_SLOWDOWN_RATIO_B*100.0);
 
649
            return 1;
 
650
        }
 
651
        number_of_times_here++;
 
652
        }
 
653
    }
 
654
    }
 
655
 
 
656
    number_of_times_here = 0;
 
657
    return 0;
 
658
}
 
659
 
 
660
static void import_push_progress_history(ImportJob *job, ID current_id,
 
661
                     time_t current_time)
 
662
{
 
663
    int i = 0;
 
664
 
 
665
    for (i = 0; i < IMPORT_JOB_PROG_HISTORY_SIZE - 1; i++) {
 
666
    job->progress_history[i] = job->progress_history[i+1];
 
667
    job->progress_times[i] = job->progress_times[i+1];
 
668
    }
 
669
    job->progress_history[i] = current_id;
 
670
    job->progress_times[i] = current_time;
 
671
}
 
672
 
 
673
static void import_calc_rate(ImportWorkerInfo *info, int time_interval)
 
674
{
 
675
    size_t ids = info->last_ID_processed - info->previous_ID_counted;
 
676
    double rate = (double)ids / time_interval;
 
677
    
 
678
    if ( (info->previous_ID_counted != 0) && (info->last_ID_processed != 0) ) {
 
679
    info->rate = rate;
 
680
    } else {
 
681
    info->rate = 0;
 
682
    }
 
683
    info->previous_ID_counted = info->last_ID_processed;
 
684
}
 
685
 
 
686
/* find the rate (ids/time) of work from a worker thread between history
 
687
 * marks A and B.
 
688
 */
 
689
#define HISTORY(N)    (job->progress_history[N])
 
690
#define TIMES(N)    (job->progress_times[N])
 
691
#define PROGRESS(A, B)    ((HISTORY(B) > HISTORY(A)) ? \
 
692
                           ((double)(HISTORY(B) - HISTORY(A)) /    \
 
693
                            (double)(TIMES(B) - TIMES(A))) : \
 
694
                           (double)0)
 
695
 
 
696
static int import_monitor_threads(ImportJob *job, int *status)
 
697
{
 
698
    PRIntervalTime tenthsecond = PR_MillisecondsToInterval(100);
 
699
    ImportWorkerInfo *current_worker = NULL;
 
700
    ImportWorkerInfo *producer = NULL, *foreman = NULL;
 
701
    int finished = 0;
 
702
    int giveup = 0;
 
703
    int count = 1;              /* 1 to prevent premature status report */
 
704
    int producer_done = 0;
 
705
    const int display_interval = 200;
 
706
    time_t time_now = 0;
 
707
    time_t last_time = 0;
 
708
    time_t time_interval = 0;
 
709
    int rc = 0;
 
710
 
 
711
    for (current_worker = job->worker_list; current_worker != NULL; 
 
712
         current_worker = current_worker->next) {
 
713
        current_worker->command = RUN;
 
714
        if (current_worker->work_type == PRODUCER)
 
715
            producer = current_worker;
 
716
        if (current_worker->work_type == FOREMAN)
 
717
            foreman = current_worker;
 
718
    }
 
719
 
 
720
 
 
721
    if (job->flags & FLAG_USE_FILES)
 
722
        PR_ASSERT(producer != NULL);
 
723
 
 
724
    PR_ASSERT(foreman != NULL);
 
725
 
 
726
    if (!foreman) {
 
727
        goto error_abort;
 
728
    }
 
729
 
 
730
    time(&last_time);
 
731
    job->start_time = last_time;
 
732
    import_clear_progress_history(job);
 
733
 
 
734
    while (!finished) {
 
735
        ID trailing_ID = NOID;
 
736
 
 
737
        DS_Sleep(tenthsecond);
 
738
        finished = 1;
 
739
 
 
740
        /* First calculate the time interval since last reported */
 
741
        if (0 == (count % display_interval)) {
 
742
            time(&time_now);
 
743
            time_interval = time_now - last_time;
 
744
            last_time = time_now;
 
745
            /* Now calculate our rate of progress overall for this chunk */
 
746
            if (time_now != job->start_time) {
 
747
                /* log a cute chart of the worker progress */
 
748
                import_log_status_start(job);
 
749
                import_log_status_add_line(job,
 
750
                    "Index status for import of %s:", job->inst->inst_name);
 
751
                import_log_status_add_line(job,
 
752
                    "-------Index Task-------State---Entry----Rate-");
 
753
 
 
754
                import_push_progress_history(job, foreman->last_ID_processed,
 
755
                                             time_now);
 
756
                job->average_progress_rate = 
 
757
                    (double)(HISTORY(IMPORT_JOB_PROG_HISTORY_SIZE-1)+1 - foreman->first_ID) /
 
758
                    (double)(TIMES(IMPORT_JOB_PROG_HISTORY_SIZE-1) - job->start_time);
 
759
                job->recent_progress_rate =
 
760
                    PROGRESS(0, IMPORT_JOB_PROG_HISTORY_SIZE-1);
 
761
                job->cache_hit_ratio = import_grok_db_stats(job->inst);
 
762
            }
 
763
        }
 
764
 
 
765
        for (current_worker = job->worker_list; current_worker != NULL;
 
766
             current_worker = current_worker->next) {
 
767
            /* Calculate the ID at which the slowest worker is currently
 
768
             * processing */
 
769
            if ((trailing_ID > current_worker->last_ID_processed) &&
 
770
                (current_worker->work_type == WORKER)) {
 
771
                trailing_ID = current_worker->last_ID_processed;
 
772
            }
 
773
            if (0 == (count % display_interval) && time_interval) {
 
774
                import_calc_rate(current_worker, time_interval);
 
775
                import_print_worker_status(current_worker);
 
776
            }
 
777
            if (current_worker->state == QUIT) {
 
778
                rc = DRYRUN_QUIT; /* Set the RC; Don't abort now; 
 
779
                                     We have to stop other threads */
 
780
            } else if (current_worker->state != FINISHED) {
 
781
                finished = 0;
 
782
            }
 
783
            if (current_worker->state == ABORTED) {
 
784
                goto error_abort;
 
785
            }
 
786
        }
 
787
 
 
788
        if ((0 == (count % display_interval)) &&
 
789
            (job->start_time != time_now)) {
 
790
            char buffer[256], *p = buffer;
 
791
 
 
792
            import_log_status_done(job);
 
793
            p += sprintf(p, "Processed %lu entries ", (u_long)job->ready_ID);
 
794
            if (job->total_pass > 1)
 
795
                p += sprintf(p, "(pass %d) ", job->total_pass);
 
796
 
 
797
            p += sprintf(p, "-- average rate %.1f/sec, ",
 
798
                         job->average_progress_rate);
 
799
            p += sprintf(p, "recent rate %.1f/sec, ",
 
800
                         job->recent_progress_rate);
 
801
            p += sprintf(p, "hit ratio %.0f%%", job->cache_hit_ratio * 100.0);
 
802
            import_log_notice(job, "%s", buffer);
 
803
        }
 
804
 
 
805
        /* Then let's see if it's time to complete this import pass */
 
806
        if (!giveup) {
 
807
            giveup = import_throw_in_towel(job, time_now, trailing_ID);
 
808
            if (giveup) {
 
809
                /* If so, signal the lead thread to stop */
 
810
                import_log_notice(job, "Ending pass number %d ...",
 
811
                                  job->total_pass);
 
812
                foreman->command = STOP;
 
813
                while (foreman->state != FINISHED) {
 
814
                    DS_Sleep(tenthsecond);
 
815
                }
 
816
                import_log_notice(job, "Foreman is done; waiting for "
 
817
                                  "workers to finish...");
 
818
            }
 
819
        }
 
820
 
 
821
        /* if the producer is finished, and the foreman has caught up... */
 
822
        if (producer) {
 
823
            producer_done = (producer->state == FINISHED) ||
 
824
                            (producer->state == QUIT);
 
825
        } else {
 
826
            /* set in ldbm_back_wire_import */
 
827
            producer_done = (job->flags & FLAG_PRODUCER_DONE);
 
828
        }
 
829
        if (producer_done && (job->lead_ID == job->ready_ID)) {
 
830
            /* tell the foreman to stop if he's still working. */
 
831
            if (foreman->state != FINISHED)
 
832
                foreman->command = STOP;
 
833
 
 
834
            /* if all the workers are caught up too, we're done */
 
835
            if (trailing_ID == job->lead_ID)
 
836
                break;
 
837
        }
 
838
 
 
839
        /* if the foreman is done (end of pass) and the worker threads
 
840
         * have caught up...
 
841
         */
 
842
        if ((foreman->state == FINISHED) && (job->ready_ID == trailing_ID)) {
 
843
            break;
 
844
        }
 
845
 
 
846
        count++;
 
847
    }
 
848
 
 
849
    import_log_notice(job, "Workers finished; cleaning up...");
 
850
 
 
851
    /* Now tell all the workers to stop */
 
852
    for (current_worker = job->worker_list; current_worker != NULL;
 
853
         current_worker = current_worker->next) {
 
854
        if (current_worker->work_type != PRODUCER)
 
855
            current_worker->command = STOP;
 
856
    }
 
857
 
 
858
    /* Having done that, wait for them to say that they've stopped */
 
859
    for (current_worker = job->worker_list; current_worker != NULL; ) {
 
860
        if ((current_worker->state != FINISHED) &&
 
861
            (current_worker->state != ABORTED) &&
 
862
            (current_worker->state != QUIT) &&
 
863
            (current_worker->work_type != PRODUCER)) {
 
864
            DS_Sleep(tenthsecond);    /* Only sleep if we hit a thread that is still not done */
 
865
            continue;
 
866
        } else {
 
867
            current_worker = current_worker->next;
 
868
        }
 
869
    }
 
870
    import_log_notice(job, "Workers cleaned up.");
 
871
 
 
872
    /* If we're here and giveup is true, and the primary hadn't finished
 
873
     * processing the input files, we need to return IMPORT_INCOMPLETE_PASS */
 
874
    if (giveup && (job->input_filenames || (job->flags & FLAG_ONLINE) ||
 
875
                   (job->flags & FLAG_REINDEXING /* support multi-pass */))) {
 
876
        if (producer_done && (job->ready_ID == job->lead_ID)) {
 
877
            /* foreman caught up with the producer, and the producer is
 
878
             * done.
 
879
             */
 
880
            *status = IMPORT_COMPLETE_PASS;
 
881
        } else {
 
882
            *status = IMPORT_INCOMPLETE_PASS;
 
883
        }
 
884
    } else {
 
885
        *status = IMPORT_COMPLETE_PASS;
 
886
    }
 
887
    return rc;
 
888
 
 
889
error_abort:
 
890
    return ERR_IMPORT_ABORTED;
 
891
}
 
892
 
 
893
 
 
894
/********** running passes **********/
 
895
 
 
896
static int import_run_pass(ImportJob *job, int *status)
 
897
{
 
898
    int ret = 0;
 
899
 
 
900
    /* Start the threads running */
 
901
    ret = import_start_threads(job);
 
902
    if (ret != 0) {
 
903
        import_log_notice(job, "Starting threads failed: %d\n", ret);
 
904
        goto error;
 
905
    }
 
906
 
 
907
    /* Monitor the threads until we're done or fail */
 
908
    ret = import_monitor_threads(job, status);
 
909
    if ((ret == ERR_IMPORT_ABORTED) || (ret == DRYRUN_QUIT)) {
 
910
        goto error;
 
911
    } else if (ret != 0) {
 
912
        import_log_notice(job, "Thread monitoring aborted: %d\n", ret);
 
913
        goto error;
 
914
    }
 
915
 
 
916
error:
 
917
    return ret;
 
918
}
 
919
 
 
920
static void import_set_abort_flag_all(ImportJob *job, int wait_for_them)
 
921
{
 
922
 
 
923
    ImportWorkerInfo *worker;
 
924
 
 
925
    /* tell all the worker threads to abort */
 
926
    job->flags |= FLAG_ABORT;
 
927
 
 
928
    /* setting of the flag in the job will be detected in the worker, foreman
 
929
     * threads and if there are any threads which have a sleeptime  200 msecs
 
930
     * = import_sleep_time; after that time, they will examine the condition
 
931
     * (job->flags & FLAG_ABORT) which will unblock the thread to proceed to
 
932
     * abort. Hence, we will sleep here for atleast 3 sec to make sure clean
 
933
     * up occurs */
 
934
        /* allow all the aborts to be processed */
 
935
             DS_Sleep(PR_MillisecondsToInterval(3000)); 
 
936
 
 
937
    if (wait_for_them) {
 
938
        /* Having done that, wait for them to say that they've stopped */
 
939
        for (worker = job->worker_list; worker != NULL; ) {
 
940
            DS_Sleep(PR_MillisecondsToInterval(100));
 
941
            if ((worker->state != FINISHED) && (worker->state != ABORTED) &&
 
942
                (worker->state != QUIT)){
 
943
                continue;
 
944
            } else {
 
945
                worker = worker->next;
 
946
            }
 
947
        }
 
948
    }
 
949
}
 
950
 
 
951
 
 
952
/* tell all the threads to abort */
 
953
void import_abort_all(ImportJob *job, int wait_for_them)
 
954
{
 
955
    ImportWorkerInfo *worker;
 
956
 
 
957
    /* tell all the worker threads to abort */
 
958
    job->flags |= FLAG_ABORT;
 
959
    
 
960
    for (worker = job->worker_list; worker; worker = worker->next)
 
961
        worker->command = ABORT;
 
962
 
 
963
    if (wait_for_them) {
 
964
        /* Having done that, wait for them to say that they've stopped */
 
965
        for (worker = job->worker_list; worker != NULL; ) {
 
966
            DS_Sleep(PR_MillisecondsToInterval(100));
 
967
            if ((worker->state != FINISHED) && (worker->state != ABORTED) &&
 
968
                (worker->state != QUIT)) {
 
969
                continue;
 
970
            } else {
 
971
                worker = worker->next;
 
972
            }
 
973
        }
 
974
    }
 
975
}
 
976
 
 
977
/* Helper function to make up filenames */
 
978
int import_make_merge_filenames(char *directory, char *indexname, int pass,
 
979
                char **oldname, char **newname)
 
980
{
 
981
    /* Filenames look like this: attributename<LDBM_FILENAME_SUFFIX> 
 
982
       and need to be renamed to: attributename<LDBM_FILENAME_SUFFIX>.n
 
983
       where n is the pass number.
 
984
       */
 
985
    *oldname = slapi_ch_smprintf("%s/%s%s", directory, indexname, LDBM_FILENAME_SUFFIX);
 
986
    *newname = slapi_ch_smprintf("%s/%s.%d%s", directory, indexname, pass,
 
987
        LDBM_FILENAME_SUFFIX);
 
988
    if (!*oldname || !*newname) {
 
989
        slapi_ch_free_string(oldname);
 
990
        slapi_ch_free_string(newname);
 
991
        return -1;
 
992
    }
 
993
    return 0;
 
994
}
 
995
 
 
996
/* Task here is as follows:
 
997
 * First, if this is pass #1, check for the presence of a merge
 
998
 *     directory. If it is not present, create it.
 
999
 * If it is present, delete all the files in it.
 
1000
 * Then, flush the dblayer and close files.
 
1001
 * Now create a numbered subdir of the merge directory for this pass.
 
1002
 * Next, move the index files, except entrydn, parentid and id2entry to 
 
1003
 *     the merge subdirectory. Important to move if we can, because
 
1004
 *     that can be millions of times faster than a copy.
 
1005
 * Finally open the dblayer back up because the caller expects 
 
1006
 *     us to not muck with it.
 
1007
 */
 
1008
static int import_sweep_after_pass(ImportJob *job)
 
1009
{
 
1010
    backend *be = job->inst->inst_be;
 
1011
    int ret = 0;
 
1012
 
 
1013
    import_log_notice(job, "Sweeping files for merging later...");
 
1014
 
 
1015
    ret = dblayer_instance_close(be);
 
1016
    
 
1017
    if (0 == ret) {
 
1018
    /* Walk the list of index jobs */
 
1019
    ImportWorkerInfo *current_worker = NULL;
 
1020
    
 
1021
    for (current_worker = job->worker_list; current_worker != NULL;
 
1022
         current_worker = current_worker->next) {
 
1023
        /* Foreach job, rename the file to <filename>.n, where n is the 
 
1024
         * pass number */
 
1025
        if ((current_worker->work_type != FOREMAN) && 
 
1026
        (current_worker->work_type != PRODUCER) && 
 
1027
        (strcasecmp(current_worker->index_info->name, LDBM_PARENTID_STR) != 0)) {
 
1028
        char *newname = NULL;
 
1029
        char *oldname = NULL;
 
1030
 
 
1031
        ret = import_make_merge_filenames(job->inst->inst_dir_name,
 
1032
            current_worker->index_info->name, job->current_pass,
 
1033
            &oldname, &newname);
 
1034
        if (0 != ret) {
 
1035
            break;
 
1036
        }
 
1037
        if (PR_Access(oldname, PR_ACCESS_EXISTS) == PR_SUCCESS) {
 
1038
            ret = PR_Rename(oldname, newname);
 
1039
            if (ret != PR_SUCCESS) {
 
1040
                PRErrorCode prerr = PR_GetError();
 
1041
                import_log_notice(job, "Failed to rename file \"%s\" to \"%s\", "
 
1042
                    SLAPI_COMPONENT_NAME_NSPR " error %d (%s)",
 
1043
                    oldname, newname, prerr, slapd_pr_strerror(prerr));
 
1044
                slapi_ch_free( (void**)&newname);
 
1045
                slapi_ch_free( (void**)&oldname);
 
1046
                break;
 
1047
            }
 
1048
        }
 
1049
        slapi_ch_free( (void**)&newname);
 
1050
        slapi_ch_free( (void**)&oldname);
 
1051
        }
 
1052
    }
 
1053
 
 
1054
    ret = dblayer_instance_start(be, DBLAYER_IMPORT_MODE);
 
1055
    }
 
1056
 
 
1057
    if (0 == ret) {
 
1058
    import_log_notice(job, "Sweep done.");
 
1059
    } else {
 
1060
    if (ENOSPC == ret) {
 
1061
        import_log_notice(job, "ERROR: NO DISK SPACE LEFT in sweep phase");
 
1062
    } else {
 
1063
        import_log_notice(job, "ERROR: Sweep phase error %d (%s)", ret,
 
1064
                  dblayer_strerror(ret));
 
1065
    }
 
1066
    }
 
1067
    
 
1068
    return ret;
 
1069
}
 
1070
 
 
1071
/* when the import is done, this function is called to bring stuff back up.
 
1072
 * returns 0 on success; anything else is an error
 
1073
 */
 
1074
static int import_all_done(ImportJob *job, int ret)
 
1075
{
 
1076
    ldbm_instance *inst = job->inst;
 
1077
 
 
1078
    /* Writing this file indicates to future server startups that
 
1079
     * the db is OK unless it's in the dry run mode. */
 
1080
    if ((ret == 0) && !(job->flags & FLAG_DRYRUN)) {
 
1081
        char inst_dir[MAXPATHLEN*2];
 
1082
        char *inst_dirp = NULL;
 
1083
        inst_dirp = dblayer_get_full_inst_dir(inst->inst_li, inst,
 
1084
                                              inst_dir, MAXPATHLEN*2);
 
1085
        ret = dbversion_write(inst->inst_li, inst_dirp, NULL, DBVERSION_ALL);
 
1086
        if (inst_dirp != inst_dir)
 
1087
            slapi_ch_free_string(&inst_dirp);
 
1088
    }
 
1089
 
 
1090
    if ((job->task != NULL) && (0 == slapi_task_get_refcount(job->task))) {
 
1091
        slapi_task_finish(job->task, ret);
 
1092
    }
 
1093
 
 
1094
    if (job->flags & FLAG_ONLINE) {
 
1095
        /* make sure the indexes are online as well */
 
1096
        /* richm 20070919 - if index entries are added online, they
 
1097
           are created and marked as INDEX_OFFLINE, in anticipation
 
1098
           of someone doing a db2index.  In this case, the db2index
 
1099
           code will correctly unset the INDEX_OFFLINE flag.
 
1100
           However, if import is used to create the indexes, the
 
1101
           INDEX_OFFLINE flag will not be cleared.  So, we do that
 
1102
           here
 
1103
        */
 
1104
        IndexInfo *index = job->index_list;
 
1105
        while (index != NULL) {
 
1106
            index->ai->ai_indexmask &= ~INDEX_OFFLINE;
 
1107
            index = index->next;
 
1108
        }
 
1109
        /* start up the instance */
 
1110
        ret = dblayer_instance_start(job->inst->inst_be, DBLAYER_NORMAL_MODE);
 
1111
        if (ret != 0)
 
1112
            return ret;
 
1113
 
 
1114
        /* Reset USN slapi_counter with the last key of the entryUSN index */
 
1115
        ldbm_set_last_usn(inst->inst_be);
 
1116
 
 
1117
        /* bring backend online again */
 
1118
        slapi_mtn_be_enable(inst->inst_be);
 
1119
    }
 
1120
 
 
1121
    return ret;
 
1122
}
 
1123
 
 
1124
 
 
1125
int import_main_offline(void *arg)
 
1126
{
 
1127
    ImportJob *job = (ImportJob *)arg;
 
1128
    ldbm_instance *inst = job->inst;
 
1129
    backend *be = inst->inst_be;
 
1130
    int ret = 0;
 
1131
    time_t beginning = 0;
 
1132
    time_t end = 0;
 
1133
    int finished = 0;
 
1134
    int status = 0;
 
1135
    int verbose = 1;
 
1136
    int aborted = 0;
 
1137
    ImportWorkerInfo *producer = NULL;
 
1138
    char *opstr = "Import";
 
1139
 
 
1140
    if (job->task)
 
1141
        slapi_task_inc_refcount(job->task);
 
1142
 
 
1143
    if (job->flags & FLAG_UPGRADEDNFORMAT) {
 
1144
        if (job->flags & FLAG_DRYRUN) {
 
1145
            opstr = "Upgrade Dn Dryrun";
 
1146
        } else {
 
1147
            opstr = "Upgrade Dn";
 
1148
        }
 
1149
    } else if (job->flags & FLAG_REINDEXING) {
 
1150
        opstr = "Reindexing";
 
1151
    }
 
1152
    PR_ASSERT(inst != NULL);
 
1153
    time(&beginning);
 
1154
 
 
1155
    /* Decide which indexes are needed */
 
1156
    if (job->flags & FLAG_INDEX_ATTRS) {
 
1157
        /* Here, we get an AVL tree which contains nodes for all attributes
 
1158
         * in the schema.  Given this tree, we need to identify those nodes
 
1159
         * which are marked for indexing. */
 
1160
        avl_apply(job->inst->inst_attrs, (IFP)import_attr_callback,
 
1161
                  (caddr_t)job, -1, AVL_INORDER);
 
1162
        vlv_getindices((IFP)import_attr_callback, (void *)job, be);
 
1163
    }
 
1164
 
 
1165
    /* Determine how much index buffering space to allocate to each index */
 
1166
    import_set_index_buffer_size(job);
 
1167
 
 
1168
    /* initialize the entry FIFO */
 
1169
    ret = import_fifo_init(job);
 
1170
    if (ret) {
 
1171
        if (! (job->flags & FLAG_USE_FILES)) {
 
1172
            PR_Lock(job->wire_lock);
 
1173
            PR_NotifyCondVar(job->wire_cv);
 
1174
            PR_Unlock(job->wire_lock);
 
1175
        }
 
1176
        goto error;
 
1177
    }
 
1178
 
 
1179
    if (job->flags & FLAG_USE_FILES) {
 
1180
        /* importing from files: start up a producer thread to read the
 
1181
         * files and queue them
 
1182
         */
 
1183
        producer = CALLOC(ImportWorkerInfo);
 
1184
        if (! producer)
 
1185
            goto error;
 
1186
        
 
1187
        /* start the producer */
 
1188
        import_init_worker_info(producer, job);
 
1189
        producer->work_type = PRODUCER;
 
1190
        if (job->flags & FLAG_UPGRADEDNFORMAT)
 
1191
        {
 
1192
            if (! CREATE_THREAD(PR_USER_THREAD, (VFP)upgradedn_producer, 
 
1193
                producer, PR_PRIORITY_NORMAL, PR_GLOBAL_BOUND_THREAD,
 
1194
                PR_UNJOINABLE_THREAD, SLAPD_DEFAULT_THREAD_STACKSIZE)) {
 
1195
                PRErrorCode prerr = PR_GetError();
 
1196
                LDAPDebug(LDAP_DEBUG_ANY,
 
1197
                          "unable to spawn upgrade dn producer thread, "
 
1198
                          SLAPI_COMPONENT_NAME_NSPR " error %d (%s)\n",
 
1199
                          prerr, slapd_pr_strerror(prerr), 0);
 
1200
                goto error;
 
1201
            }
 
1202
        }
 
1203
        else if (job->flags & FLAG_REINDEXING)
 
1204
        {
 
1205
            if (! CREATE_THREAD(PR_USER_THREAD, (VFP)index_producer, producer,
 
1206
                PR_PRIORITY_NORMAL, PR_GLOBAL_BOUND_THREAD,
 
1207
                PR_UNJOINABLE_THREAD,
 
1208
                SLAPD_DEFAULT_THREAD_STACKSIZE)) {
 
1209
                PRErrorCode prerr = PR_GetError();
 
1210
                LDAPDebug(LDAP_DEBUG_ANY,
 
1211
                          "unable to spawn index producer thread, "
 
1212
                          SLAPI_COMPONENT_NAME_NSPR " error %d (%s)\n",
 
1213
                          prerr, slapd_pr_strerror(prerr), 0);
 
1214
                goto error;
 
1215
            }
 
1216
        }
 
1217
        else
 
1218
        {
 
1219
            import_log_notice(job, "Beginning import job...");
 
1220
            if (! CREATE_THREAD(PR_USER_THREAD, (VFP)import_producer, producer,
 
1221
                            PR_PRIORITY_NORMAL, PR_GLOBAL_BOUND_THREAD,
 
1222
                            PR_UNJOINABLE_THREAD,
 
1223
                            SLAPD_DEFAULT_THREAD_STACKSIZE)) {
 
1224
                        PRErrorCode prerr = PR_GetError();
 
1225
                LDAPDebug(LDAP_DEBUG_ANY,
 
1226
                        "unable to spawn import producer thread, "
 
1227
                        SLAPI_COMPONENT_NAME_NSPR " error %d (%s)\n",
 
1228
                        prerr, slapd_pr_strerror(prerr), 0);
 
1229
                goto error;
 
1230
            }
 
1231
        }
 
1232
 
 
1233
        if (0 == job->job_index_buffer_suggestion)
 
1234
                import_log_notice(job, "Index buffering is disabled.");
 
1235
        else
 
1236
                import_log_notice(job,
 
1237
                                "Index buffering enabled with bucket size %lu", 
 
1238
                                job->job_index_buffer_suggestion);
 
1239
 
 
1240
        job->worker_list = producer;
 
1241
    } else {
 
1242
        /* release the startup lock and let the entries start queueing up
 
1243
         * in for import */
 
1244
        PR_Lock(job->wire_lock);
 
1245
        PR_NotifyCondVar(job->wire_cv);
 
1246
        PR_Unlock(job->wire_lock);
 
1247
    }
 
1248
 
 
1249
    /* Run as many passes as we need to complete the job or die honourably in
 
1250
     * the attempt */
 
1251
    while (! finished) {
 
1252
        job->current_pass++;
 
1253
        job->total_pass++;
 
1254
        ret = import_run_pass(job, &status);
 
1255
        /* The following could have happened: 
 
1256
         *     (a) Some error happened such that we're hosed.
 
1257
         *         This is indicated by a non-zero return code.
 
1258
         *     (b) We finished the complete file without needing a second pass
 
1259
         *         This is indicated by a zero return code and a status of 
 
1260
         *         IMPORT_COMPLETE_PASS and current_pass == 1;
 
1261
         *     (c) We completed a pass and need at least another one
 
1262
         *         This is indicated by a zero return code and a status of
 
1263
         *         IMPORT_INCOMPLETE_PASS
 
1264
         *     (d) We just completed what turned out to be the last in a
 
1265
         *         series of passes
 
1266
         *         This is indicated by a zero return code and a status of
 
1267
         *         IMPORT_COMPLETE_PASS and current_pass > 1
 
1268
         */
 
1269
        if (ret == ERR_IMPORT_ABORTED) {
 
1270
            /* at least one of the threads has aborted -- shut down ALL
 
1271
             * of the threads */
 
1272
            import_log_notice(job, "Aborting all %s threads...", opstr);
 
1273
            /* this abort sets the  abort flag on the threads and will block for 
 
1274
             * the exit of all threads 
 
1275
             */
 
1276
            import_set_abort_flag_all(job, 1); 
 
1277
            import_log_notice(job, "%s threads aborted.", opstr);
 
1278
            aborted = 1;
 
1279
            goto error;
 
1280
        }
 
1281
        if (ret == DRYRUN_QUIT) {
 
1282
            goto error; /* Found the candidate; close the db files and quit */
 
1283
        }
 
1284
 
 
1285
        if (0 != ret) {
 
1286
            /* Some horrible fate has befallen the import */
 
1287
            import_log_notice(job, "Fatal pass error %d", ret);
 
1288
            goto error;
 
1289
        }
 
1290
 
 
1291
        /* No error, but a number of possibilities */
 
1292
        if ( IMPORT_COMPLETE_PASS == status ) {
 
1293
            if (1 == job->current_pass) {
 
1294
                /* We're done !!!! */ ;
 
1295
            } else {
 
1296
                /* Save the files, then merge */
 
1297
                ret = import_sweep_after_pass(job);
 
1298
                if (0 != ret) {
 
1299
                    goto error;
 
1300
                }
 
1301
                ret = import_mega_merge(job);
 
1302
                if (0 != ret) {
 
1303
                    goto error;
 
1304
                }
 
1305
            }
 
1306
            finished = 1;
 
1307
        } else {
 
1308
            if (IMPORT_INCOMPLETE_PASS == status) {
 
1309
                /* Need to go round again */
 
1310
                /* Time to save the files we've built for later */
 
1311
                ret = import_sweep_after_pass(job);
 
1312
                if (0 != ret) {
 
1313
                    goto error;
 
1314
                }
 
1315
                if ( (inst->inst_li->li_maxpassbeforemerge != 0) &&
 
1316
                        (job->current_pass > inst->inst_li->li_maxpassbeforemerge) )
 
1317
                {
 
1318
                        ret = import_mega_merge(job);
 
1319
                        if (0 != ret) {
 
1320
                                   goto error;
 
1321
                        }
 
1322
                        job->current_pass = 1;
 
1323
                        ret = import_sweep_after_pass(job);
 
1324
                        if (0 != ret) {
 
1325
                                   goto error;
 
1326
                        }
 
1327
                }
 
1328
 
 
1329
                /* Fixup the first_ID value to reflect previous work */
 
1330
                job->first_ID = job->ready_ID + 1;
 
1331
                import_free_thread_data(job);
 
1332
                job->worker_list = producer;
 
1333
                import_log_notice(job, "Beginning pass number %d",
 
1334
                                  job->total_pass+1);
 
1335
            } else {
 
1336
                /* Bizarro-slapd */
 
1337
                goto error;
 
1338
            }
 
1339
        }
 
1340
    }
 
1341
 
 
1342
    /* kill the producer now; we're done */
 
1343
    if (producer) {
 
1344
        import_log_notice(job, "Cleaning up producer thread...");
 
1345
        producer->command = STOP;
 
1346
        /* wait for the lead thread to stop */
 
1347
        while (producer->state != FINISHED) {
 
1348
            DS_Sleep(PR_MillisecondsToInterval(100));
 
1349
        }
 
1350
    }
 
1351
 
 
1352
    import_log_notice(job, "Indexing complete.  Post-processing...");
 
1353
    /* Now do the numsubordinates attribute */
 
1354
    /* [610066] reindexed db cannot be used in the following backup/restore */
 
1355
    if ( (!(job->flags & FLAG_REINDEXING) || (job->flags & FLAG_DN2RDN)) &&
 
1356
         (ret = update_subordinatecounts(be, job->mothers, job->encrypt, NULL))
 
1357
         != 0 ) {
 
1358
        import_log_notice(job, "Failed to update numsubordinates attributes");
 
1359
        goto error;
 
1360
    }
 
1361
    import_log_notice(job, "Generating numSubordinates complete.");
 
1362
 
 
1363
    if (!entryrdn_get_noancestorid()) {
 
1364
        /* And the ancestorid index */
 
1365
        /* Creating ancestorid from the scratch; delete the index file first. */
 
1366
        struct attrinfo *ai = NULL;
 
1367
        ainfo_get(be, "ancestorid", &ai);
 
1368
        dblayer_erase_index_file(be, ai, 0);
 
1369
 
 
1370
        if ((ret = ldbm_ancestorid_create_index(be)) != 0) {
 
1371
            import_log_notice(job, "Failed to create ancestorid index");
 
1372
            goto error;
 
1373
        }
 
1374
    }
 
1375
 
 
1376
    import_log_notice(job, "Flushing caches...");
 
1377
    if (0 != (ret = dblayer_flush(job->inst->inst_li)) ) {
 
1378
        import_log_notice(job, "Failed to flush database");
 
1379
        goto error;
 
1380
    }
 
1381
 
 
1382
    /* New way to exit the routine: check the return code.
 
1383
     * If it's non-zero, delete the database files. 
 
1384
     * Otherwise don't, but always close the database layer properly.
 
1385
     * Then return. This ensures that we can't make a half-good/half-bad
 
1386
     * Database. */
 
1387
        
 
1388
error:
 
1389
    /* If we fail, the database is now in a mess, so we delete it 
 
1390
       except dry run mode */
 
1391
    import_log_notice(job, "Closing files...");
 
1392
    cache_clear(&job->inst->inst_cache, CACHE_TYPE_ENTRY);
 
1393
    if (entryrdn_get_switch()) {
 
1394
        cache_clear(&job->inst->inst_dncache, CACHE_TYPE_DN);
 
1395
    }
 
1396
    if (aborted) {
 
1397
        /* If aborted, it's safer to rebuild the caches. */
 
1398
        cache_destroy_please(&job->inst->inst_cache, CACHE_TYPE_ENTRY);
 
1399
        if (entryrdn_get_switch()) { /* subtree-rename: on */
 
1400
            cache_destroy_please(&job->inst->inst_dncache, CACHE_TYPE_DN);
 
1401
        }
 
1402
        /* initialize the entry cache */
 
1403
        if (! cache_init(&(inst->inst_cache), DEFAULT_CACHE_SIZE,
 
1404
                         DEFAULT_CACHE_ENTRIES, CACHE_TYPE_ENTRY)) {
 
1405
            LDAPDebug0Args(LDAP_DEBUG_ANY, "import_main_offline: "
 
1406
                        "cache_init failed.  Server should be restarted.\n");
 
1407
        }
 
1408
 
 
1409
        /* initialize the dn cache */
 
1410
        if (! cache_init(&(inst->inst_dncache), DEFAULT_DNCACHE_SIZE,
 
1411
                     DEFAULT_DNCACHE_MAXCOUNT, CACHE_TYPE_DN)) {
 
1412
            LDAPDebug0Args(LDAP_DEBUG_ANY, "import_main_offline: "
 
1413
                        "dn cache_init failed.  Server should be restarted.\n");
 
1414
        }
 
1415
    }
 
1416
    if (0 != ret) {
 
1417
        if (!(job->flags & FLAG_DRYRUN)) { /* If not dryrun */
 
1418
            /* if running in the dry run mode, don't touch the db */
 
1419
            dblayer_delete_instance_dir(be);
 
1420
        }
 
1421
        dblayer_instance_close(job->inst->inst_be);
 
1422
    } else {
 
1423
        if (0 != (ret = dblayer_instance_close(job->inst->inst_be)) ) {
 
1424
            import_log_notice(job, "Failed to close database");
 
1425
        }
 
1426
    }
 
1427
    if (!(job->flags & FLAG_ONLINE))
 
1428
        dblayer_close(job->inst->inst_li, DBLAYER_IMPORT_MODE);
 
1429
    
 
1430
    time(&end);
 
1431
    if (verbose && (0 == ret)) {
 
1432
        int seconds_to_import = end - beginning;
 
1433
        size_t entries_processed = job->lead_ID - (job->starting_ID - 1);
 
1434
        double entries_per_second = 
 
1435
                    seconds_to_import ?
 
1436
                    (double)entries_processed / (double)seconds_to_import : 0;
 
1437
 
 
1438
        if (job->not_here_skipped) {
 
1439
            if (job->skipped) {
 
1440
                import_log_notice(job, 
 
1441
                            "%s complete.  Processed %lu entries " 
 
1442
                            "(%d bad entries were skipped, "
 
1443
                            "%d entries were skipped because they don't "
 
1444
                            "belong to this database) in %d seconds. "
 
1445
                            "(%.2f entries/sec)", 
 
1446
                            opstr, entries_processed,
 
1447
                            job->skipped, job->not_here_skipped,
 
1448
                            seconds_to_import, entries_per_second);
 
1449
            } else {
 
1450
                import_log_notice(job, 
 
1451
                            "%s complete.  Processed %lu entries "
 
1452
                            "(%d entries were skipped because they don't "
 
1453
                            "belong to this database) "
 
1454
                            "in %d seconds. (%.2f entries/sec)",
 
1455
                            opstr, entries_processed, 
 
1456
                            job->not_here_skipped, seconds_to_import, 
 
1457
                            entries_per_second);
 
1458
            }
 
1459
        } else {
 
1460
            if (job->skipped) {
 
1461
                import_log_notice(job, 
 
1462
                            "%s complete.  Processed %lu entries "
 
1463
                            "(%d were skipped) in %d seconds. "
 
1464
                            "(%.2f entries/sec)", 
 
1465
                            opstr, entries_processed,
 
1466
                            job->skipped, seconds_to_import,
 
1467
                            entries_per_second);
 
1468
            } else {
 
1469
                import_log_notice(job, 
 
1470
                            "%s complete.  Processed %lu entries "
 
1471
                            "in %d seconds. (%.2f entries/sec)",
 
1472
                            opstr, entries_processed, 
 
1473
                            seconds_to_import, entries_per_second);
 
1474
            }
 
1475
        }
 
1476
    }
 
1477
 
 
1478
    if (job->flags & FLAG_DRYRUN) {
 
1479
        if (0 == ret) {
 
1480
            import_log_notice(job, "%s complete.  %s is up-to-date.", 
 
1481
                              opstr, job->inst->inst_name);
 
1482
            ret = 1;
 
1483
            if (job->task) {
 
1484
                slapi_task_dec_refcount(job->task);
 
1485
            }
 
1486
            import_all_done(job, ret);
 
1487
        } else if (DRYRUN_QUIT == ret) {
 
1488
            import_log_notice(job, "%s complete.  %s needs upgradednformat.", 
 
1489
                              opstr, job->inst->inst_name);
 
1490
            if (job->task) {
 
1491
                slapi_task_dec_refcount(job->task);
 
1492
            }
 
1493
            import_all_done(job, ret);
 
1494
            ret = 0;
 
1495
        } else {
 
1496
            ret = -1;
 
1497
            if (job->task != NULL) {
 
1498
                slapi_task_finish(job->task, ret);
 
1499
            }
 
1500
        }
 
1501
    } else if (0 != ret) {
 
1502
        import_log_notice(job, "%s failed.", opstr);
 
1503
        if (job->task != NULL) {
 
1504
            slapi_task_finish(job->task, ret);
 
1505
        }
 
1506
    } else {
 
1507
        if (job->task) {
 
1508
            slapi_task_dec_refcount(job->task);
 
1509
        }
 
1510
        import_all_done(job, ret);
 
1511
    }
 
1512
 
 
1513
    /* This instance isn't busy anymore */
 
1514
    instance_set_not_busy(job->inst);
 
1515
    
 
1516
    import_free_job(job);
 
1517
    if (producer)
 
1518
        FREE(producer);
 
1519
    
 
1520
    return(ret);
 
1521
}
 
1522
 
 
1523
/*
 
1524
 * to be called by online import using PR_CreateThread()
 
1525
 * offline import directly calls import_main_offline()
 
1526
 *
 
1527
 */
 
1528
void import_main(void *arg)
 
1529
{
 
1530
    import_main_offline(arg);
 
1531
}
 
1532
 
 
1533
int ldbm_back_ldif2ldbm_deluxe(Slapi_PBlock *pb)
 
1534
{
 
1535
    backend *be = NULL;
 
1536
    int noattrindexes = 0;
 
1537
    ImportJob *job = NULL;
 
1538
    char **name_array = NULL;
 
1539
    int total_files, i;
 
1540
    int up_flags = 0;
 
1541
    PRThread *thread = NULL;
 
1542
 
 
1543
    job = CALLOC(ImportJob);
 
1544
    if (job == NULL) {
 
1545
        LDAPDebug(LDAP_DEBUG_ANY, "not enough memory to do import job\n",
 
1546
                  0, 0, 0);
 
1547
        return -1;
 
1548
    }
 
1549
 
 
1550
    slapi_pblock_get( pb, SLAPI_BACKEND, &be);
 
1551
    PR_ASSERT(NULL != be);
 
1552
    job->inst = (ldbm_instance *)be->be_instance_info;
 
1553
    slapi_pblock_get( pb, SLAPI_LDIF2DB_NOATTRINDEXES, &noattrindexes );
 
1554
    slapi_pblock_get( pb, SLAPI_LDIF2DB_FILE, &name_array );
 
1555
    slapi_pblock_get(pb, SLAPI_SEQ_TYPE, &up_flags); /* For upgrade dn and
 
1556
                                                        dn2rdn */
 
1557
 
 
1558
    /* the removedupvals field is blatantly overloaded here to mean
 
1559
     * the chunk size too.  (chunk size = number of entries that should
 
1560
     * be imported before starting a new pass.  usually for debugging.)
 
1561
     */
 
1562
    slapi_pblock_get(pb, SLAPI_LDIF2DB_REMOVEDUPVALS, &job->merge_chunk_size);
 
1563
    if (job->merge_chunk_size == 1)
 
1564
        job->merge_chunk_size = 0;
 
1565
    /* get list of specifically included and/or excluded subtrees from
 
1566
     * the front-end */
 
1567
    ldbm_back_fetch_incl_excl(pb, &job->include_subtrees,
 
1568
                  &job->exclude_subtrees);
 
1569
    /* get cn=tasks info, if any */
 
1570
    slapi_pblock_get(pb, SLAPI_BACKEND_TASK, &job->task);
 
1571
    slapi_pblock_get(pb, SLAPI_LDIF2DB_ENCRYPT, &job->encrypt);
 
1572
    /* get uniqueid info */
 
1573
    slapi_pblock_get(pb, SLAPI_LDIF2DB_GENERATE_UNIQUEID, &job->uuid_gen_type);
 
1574
    if (job->uuid_gen_type == SLAPI_UNIQUEID_GENERATE_NAME_BASED) {
 
1575
        char *namespaceid;
 
1576
 
 
1577
        slapi_pblock_get(pb, SLAPI_LDIF2DB_NAMESPACEID, &namespaceid);
 
1578
        job->uuid_namespace = slapi_ch_strdup(namespaceid);
 
1579
    }
 
1580
 
 
1581
    job->flags = FLAG_USE_FILES;
 
1582
    if (NULL == name_array) {    /* no ldif file is given -> reindexing or
 
1583
                                                             upgradedn */
 
1584
        if (up_flags & SLAPI_UPGRADEDNFORMAT) {
 
1585
            job->flags |= FLAG_UPGRADEDNFORMAT;
 
1586
            if (up_flags & SLAPI_DRYRUN) {
 
1587
                job->flags |= FLAG_DRYRUN;
 
1588
            }
 
1589
        } else {
 
1590
            job->flags |= FLAG_REINDEXING; /* call index_producer */
 
1591
            if (up_flags & SLAPI_UPGRADEDB_DN2RDN) {
 
1592
                if (entryrdn_get_switch()) {
 
1593
                    job->flags |= FLAG_DN2RDN; /* migrate to the rdn format */
 
1594
                } else {
 
1595
                    LDAPDebug1Arg(LDAP_DEBUG_ANY,
 
1596
                                  "DN to RDN option is specified, "
 
1597
                                  "but %s is not enabled\n",
 
1598
                                  CONFIG_ENTRYRDN_SWITCH);
 
1599
                    import_free_job(job);
 
1600
                    FREE(job);
 
1601
                    return -1;
 
1602
                }
 
1603
            }
 
1604
        }
 
1605
    }
 
1606
    if (!noattrindexes) {
 
1607
        job->flags |= FLAG_INDEX_ATTRS;
 
1608
    }
 
1609
    for (i = 0; name_array && name_array[i] != NULL; i++) {
 
1610
        charray_add(&job->input_filenames, slapi_ch_strdup(name_array[i]));
 
1611
    }
 
1612
    job->starting_ID = 1;
 
1613
    job->first_ID = 1;
 
1614
    job->mothers = CALLOC(import_subcount_stuff);
 
1615
 
 
1616
    /* how much space should we allocate to index buffering? */
 
1617
    job->job_index_buffer_size = import_get_index_buffer_size();
 
1618
    if (job->job_index_buffer_size == 0) {
 
1619
        /* 10% of the allocated cache size + one meg */
 
1620
        PR_Lock(job->inst->inst_li->li_config_mutex);
 
1621
        job->job_index_buffer_size = 
 
1622
               (job->inst->inst_li->li_import_cachesize/10) + (1024*1024);
 
1623
        PR_Unlock(job->inst->inst_li->li_config_mutex);
 
1624
    }
 
1625
    import_subcount_stuff_init(job->mothers);
 
1626
 
 
1627
    if (job->task != NULL) {
 
1628
        /* count files, use that to track "progress" in cn=tasks */
 
1629
        total_files = 0;
 
1630
        while (name_array && name_array[total_files] != NULL)
 
1631
            total_files++;
 
1632
        /* add 1 to account for post-import cleanup (which can take a
 
1633
         * significant amount of time)
 
1634
         */
 
1635
        /* NGK - This should eventually be cleaned up to use the public
 
1636
         * task API. */
 
1637
        if (0 == total_files) {    /* reindexing */
 
1638
            job->task->task_work = 2;
 
1639
        } else {
 
1640
            job->task->task_work = total_files + 1;
 
1641
        }
 
1642
        job->task->task_progress = 0;
 
1643
        job->task->task_state = SLAPI_TASK_RUNNING;
 
1644
        slapi_task_set_data(job->task, job);
 
1645
        slapi_task_set_destructor_fn(job->task, import_task_destroy);
 
1646
        slapi_task_set_cancel_fn(job->task, import_task_abort);
 
1647
        job->flags |= FLAG_ONLINE;
 
1648
 
 
1649
        /* create thread for import_main, so we can return */
 
1650
        thread = PR_CreateThread(PR_USER_THREAD, import_main, (void *)job,
 
1651
                                 PR_PRIORITY_NORMAL, PR_GLOBAL_THREAD,
 
1652
                                 PR_UNJOINABLE_THREAD,
 
1653
                                 SLAPD_DEFAULT_THREAD_STACKSIZE);
 
1654
        if (thread == NULL) {
 
1655
            PRErrorCode prerr = PR_GetError();
 
1656
            LDAPDebug(LDAP_DEBUG_ANY, "unable to spawn import thread, "
 
1657
                        SLAPI_COMPONENT_NAME_NSPR " error %d (%s)\n",
 
1658
                        prerr, slapd_pr_strerror(prerr), 0);
 
1659
            import_free_job(job);
 
1660
            FREE(job);
 
1661
            return -2;
 
1662
        }
 
1663
        return 0;
 
1664
    }
 
1665
 
 
1666
    /* old style -- do it all synchronously (THIS IS GOING AWAY SOON) */
 
1667
    return import_main_offline((void *)job);
 
1668
}