111
111
static void ResetOutput(ArchiveHandle *AH, OutputContext savedContext);
113
113
static int restore_toc_entry(ArchiveHandle *AH, TocEntry *te,
114
RestoreOptions *ropt, bool is_parallel);
114
RestoreOptions *ropt, bool is_parallel);
115
115
static void restore_toc_entries_parallel(ArchiveHandle *AH);
116
116
static thandle spawn_restore(RestoreArgs *args);
117
117
static thandle reap_child(ParallelSlot *slots, int n_slots, int *work_status);
118
118
static bool work_in_progress(ParallelSlot *slots, int n_slots);
119
static int get_next_slot(ParallelSlot *slots, int n_slots);
119
static int get_next_slot(ParallelSlot *slots, int n_slots);
120
120
static TocEntry *get_next_work_item(ArchiveHandle *AH,
121
TocEntry **first_unprocessed,
122
ParallelSlot *slots, int n_slots);
121
TocEntry **first_unprocessed,
122
ParallelSlot *slots, int n_slots);
123
123
static parallel_restore_result parallel_restore(RestoreArgs *args);
124
124
static void mark_work_done(ArchiveHandle *AH, thandle worker, int status,
125
ParallelSlot *slots, int n_slots);
125
ParallelSlot *slots, int n_slots);
126
126
static void fix_dependencies(ArchiveHandle *AH);
127
127
static bool has_lock_conflicts(TocEntry *te1, TocEntry *te2);
128
128
static void repoint_table_dependencies(ArchiveHandle *AH,
129
DumpId tableId, DumpId tableDataId);
129
DumpId tableId, DumpId tableDataId);
130
130
static void identify_locking_dependencies(TocEntry *te,
131
TocEntry **tocsByDumpId);
131
TocEntry **tocsByDumpId);
132
132
static void reduce_dependencies(ArchiveHandle *AH, TocEntry *te);
133
133
static void mark_create_done(ArchiveHandle *AH, TocEntry *te);
134
134
static void inhibit_data_for_failed_table(ArchiveHandle *AH, TocEntry *te);
211
211
die_horribly(AH, modulename, "-C and -c are incompatible options\n");
214
* -1 is not compatible with -C, because we can't create a database
215
* inside a transaction block.
214
* -1 is not compatible with -C, because we can't create a database inside
215
* a transaction block.
217
217
if (ropt->create && ropt->single_txn)
218
218
die_horribly(AH, modulename, "-C and -1 are incompatible options\n");
452
452
if (AH->lastErrorTE == te)
455
* We failed to create the table.
456
* If --no-data-for-failed-tables was given,
457
* mark the corresponding TABLE DATA to be ignored.
455
* We failed to create the table. If
456
* --no-data-for-failed-tables was given, mark the
457
* corresponding TABLE DATA to be ignored.
459
* In the parallel case this must be done in the parent,
460
* so we just set the return value.
459
* In the parallel case this must be done in the parent, so we
460
* just set the return value.
462
462
if (ropt->noDataForFailedTables)
473
* We created the table successfully. Mark the
474
* corresponding TABLE DATA for possible truncation.
473
* We created the table successfully. Mark the corresponding
474
* TABLE DATA for possible truncation.
476
* In the parallel case this must be done in the parent,
477
* so we just set the return value.
476
* In the parallel case this must be done in the parent, so we
477
* just set the return value.
480
480
retval = WORKER_CREATE_DONE;
498
498
if ((reqs & REQ_DATA) != 0)
501
* hadDumper will be set if there is a genuine data component for
502
* this node. Otherwise, we need to check the defn field for
503
* statements that need to be executed in data-only restores.
501
* hadDumper will be set if there is a genuine data component for this
502
* node. Otherwise, we need to check the defn field for statements
503
* that need to be executed in data-only restores.
505
505
if (te->hadDumper)
508
508
* If we can output the data, then restore it.
510
if (AH->PrintTocDataPtr != NULL && (reqs & REQ_DATA) != 0)
510
if (AH->PrintTocDataPtr !=NULL && (reqs & REQ_DATA) != 0)
512
512
_printTocEntry(AH, te, ropt, true, false);
535
* In parallel restore, if we created the table earlier
536
* in the run then we wrap the COPY in a transaction and
537
* precede it with a TRUNCATE. If archiving is not on
538
* this prevents WAL-logging the COPY. This obtains a
539
* speedup similar to that from using single_txn mode
540
* in non-parallel restores.
535
* In parallel restore, if we created the table earlier in
536
* the run then we wrap the COPY in a transaction and
537
* precede it with a TRUNCATE. If archiving is not on
538
* this prevents WAL-logging the COPY. This obtains a
539
* speedup similar to that from using single_txn mode in
540
* non-parallel restores.
542
542
if (is_parallel && te->created)
562
* If we have a copy statement, use it. As of V1.3,
563
* these are separate to allow easy import from
564
* within a database connection. Pre 1.3 archives can
565
* not use DB connections and are sent to output only.
562
* If we have a copy statement, use it. As of V1.3, these
563
* are separate to allow easy import from within a
564
* database connection. Pre 1.3 archives can not use DB
565
* connections and are sent to output only.
567
* For V1.3+, the table data MUST have a copy
568
* statement so that we can go into appropriate mode
567
* For V1.3+, the table data MUST have a copy statement so
568
* that we can go into appropriate mode with libpq.
571
570
if (te->copyStmt && strlen(te->copyStmt) > 0)
1236
1235
res = lo_write(AH->connection, AH->loFd, AH->lo_buf, AH->lo_buf_used);
1237
1236
ahlog(AH, 5, ngettext("wrote %lu byte of large object data (result = %lu)\n",
1238
"wrote %lu bytes of large object data (result = %lu)\n",
1237
"wrote %lu bytes of large object data (result = %lu)\n",
1239
1238
AH->lo_buf_used),
1240
1239
(unsigned long) AH->lo_buf_used, (unsigned long) res);
1241
1240
if (res != AH->lo_buf_used)
1787
1786
AH->lookaheadLen = 0; /* Don't bother since we've reset the file */
1790
write_msg(modulename, ngettext("read %lu byte into lookahead buffer\n",
1791
"read %lu bytes into lookahead buffer\n",
1793
(unsigned long) AH->lookaheadLen);
1796
1788
/* Close the file */
1798
1790
if (fclose(fh) != 0)
2323
2315
_doSetFixedOutputState(ArchiveHandle *AH)
2325
/* Disable statement_timeout in archive for pg_restore/psql */
2317
/* Disable statement_timeout in archive for pg_restore/psql */
2326
2318
ahprintf(AH, "SET statement_timeout = 0;\n");
2328
2320
/* Select the correct character set encoding */
3078
3070
thandle ret_child;
3081
ahlog(AH,2,"entering restore_toc_entries_parallel\n");
3073
ahlog(AH, 2, "entering restore_toc_entries_parallel\n");
3083
3075
/* we haven't got round to making this work for all archive formats */
3084
3076
if (AH->ClonePtr == NULL || AH->ReopenPtr == NULL)
3090
3082
fix_dependencies(AH);
3093
* Do all the early stuff in a single connection in the parent.
3094
* There's no great point in running it in parallel, in fact it will
3095
* actually run faster in a single connection because we avoid all the
3096
* connection and setup overhead.
3085
* Do all the early stuff in a single connection in the parent. There's no
3086
* great point in running it in parallel, in fact it will actually run
3087
* faster in a single connection because we avoid all the connection and
3098
3090
while ((next_work_item = get_next_work_item(AH, &first_unprocessed,
3099
3091
NULL, 0)) != NULL)
3139
3131
* left to be done.
3142
ahlog(AH,1,"entering main parallel loop\n");
3134
ahlog(AH, 1, "entering main parallel loop\n");
3144
3136
while ((next_work_item = get_next_work_item(AH, &first_unprocessed,
3145
3137
slots, n_slots)) != NULL ||
3222
3214
_doSetFixedOutputState(AH);
3225
* Make sure there is no non-ACL work left due to, say,
3226
* circular dependencies, or some other pathological condition.
3227
* If so, do it in the single parent connection.
3217
* Make sure there is no non-ACL work left due to, say, circular
3218
* dependencies, or some other pathological condition. If so, do it in the
3219
* single parent connection.
3229
3221
for (te = AH->toc->next; te != AH->toc; te = te->next)
3289
3281
return wait(work_status);
3291
3283
static HANDLE *handles = NULL;
3292
int hindex, snum, tnum;
3296
3290
/* first time around only, make space for handles to listen on */
3297
3291
if (handles == NULL)
3298
handles = (HANDLE *) calloc(sizeof(HANDLE),n_slots);
3292
handles = (HANDLE *) calloc(sizeof(HANDLE), n_slots);
3300
3294
/* set up list of handles to listen to */
3301
for (snum=0, tnum=0; snum < n_slots; snum++)
3295
for (snum = 0, tnum = 0; snum < n_slots; snum++)
3302
3296
if (slots[snum].child_id != 0)
3303
3297
handles[tnum++] = slots[snum].child_id;
3394
3389
get_next_work_item(ArchiveHandle *AH, TocEntry **first_unprocessed,
3395
3390
ParallelSlot *slots, int n_slots)
3397
bool pref_non_data = false; /* or get from AH->ropt */
3398
TocEntry *data_te = NULL;
3392
bool pref_non_data = false; /* or get from AH->ropt */
3393
TocEntry *data_te = NULL;
3403
3399
* Bogus heuristics for pref_non_data
3405
3401
if (pref_non_data)
3409
for (k=0; k < n_slots; k++)
3405
for (k = 0; k < n_slots; k++)
3410
3406
if (slots[k].args->te != NULL &&
3411
3407
slots[k].args->te->section == SECTION_DATA)
3439
3435
* Check to see if the item would need exclusive lock on something
3440
* that a currently running item also needs lock on, or vice versa.
3441
* If so, we don't want to schedule them together.
3436
* that a currently running item also needs lock on, or vice versa. If
3437
* so, we don't want to schedule them together.
3443
3439
for (i = 0; i < n_slots && !conflicts; i++)
3445
TocEntry *running_te;
3441
TocEntry *running_te;
3447
3443
if (slots[i].args == NULL)
3488
3484
parallel_restore(RestoreArgs *args)
3490
3486
ArchiveHandle *AH = args->AH;
3491
TocEntry *te = args->te;
3487
TocEntry *te = args->te;
3492
3488
RestoreOptions *ropt = AH->ropt;
3496
* Close and reopen the input file so we have a private file pointer
3497
* that doesn't stomp on anyone else's file pointer, if we're actually
3498
* going to need to read from the file. Otherwise, just close it
3499
* except on Windows, where it will possibly be needed by other threads.
3492
* Close and reopen the input file so we have a private file pointer that
3493
* doesn't stomp on anyone else's file pointer, if we're actually going to
3494
* need to read from the file. Otherwise, just close it except on Windows,
3495
* where it will possibly be needed by other threads.
3501
* Note: on Windows, since we are using threads not processes, the
3502
* reopen call *doesn't* close the original file pointer but just opens
3497
* Note: on Windows, since we are using threads not processes, the reopen
3498
* call *doesn't* close the original file pointer but just opens a new one.
3505
if (te->section == SECTION_DATA )
3500
if (te->section == SECTION_DATA)
3506
3501
(AH->ReopenPtr) (AH);
3605
3600
TocEntry **tocsByDumpId;
3610
3605
* For some of the steps here, it is convenient to have an array that
3611
* indexes the TOC entries by dump ID, rather than searching the TOC
3612
* list repeatedly. Entries for dump IDs not present in the TOC will
3606
* indexes the TOC entries by dump ID, rather than searching the TOC list
3607
* repeatedly. Entries for dump IDs not present in the TOC will be NULL.
3615
3609
* Also, initialize the depCount fields.
3629
3623
* dependencies.
3631
3625
* Note: currently, a TABLE DATA should always have exactly one
3632
* dependency, on its TABLE item. So we don't bother to search,
3633
* but look just at the first dependency. We do trouble to make sure
3634
* that it's a TABLE, if possible. However, if the dependency isn't
3635
* in the archive then just assume it was a TABLE; this is to cover
3636
* cases where the table was suppressed but we have the data and some
3637
* dependent post-data items.
3626
* dependency, on its TABLE item. So we don't bother to search, but look
3627
* just at the first dependency. We do trouble to make sure that it's a
3628
* TABLE, if possible. However, if the dependency isn't in the archive
3629
* then just assume it was a TABLE; this is to cover cases where the table
3630
* was suppressed but we have the data and some dependent post-data items.
3639
3632
for (te = AH->toc->next; te != AH->toc; te = te->next)
3641
3634
if (strcmp(te->desc, "TABLE DATA") == 0 && te->nDeps > 0)
3643
DumpId tableId = te->dependencies[0];
3636
DumpId tableId = te->dependencies[0];
3645
3638
if (tocsByDumpId[tableId - 1] == NULL ||
3646
3639
strcmp(tocsByDumpId[tableId - 1]->desc, "TABLE") == 0)
3654
* Pre-8.4 versions of pg_dump neglected to set up a dependency from
3655
* BLOB COMMENTS to BLOBS. Cope. (We assume there's only one BLOBS
3656
* and only one BLOB COMMENTS in such files.)
3647
* Pre-8.4 versions of pg_dump neglected to set up a dependency from BLOB
3648
* COMMENTS to BLOBS. Cope. (We assume there's only one BLOBS and only
3649
* one BLOB COMMENTS in such files.)
3658
3651
if (AH->version < K_VERS_1_11)
3762
3755
* We assume the item requires exclusive lock on each TABLE DATA item
3763
* listed among its dependencies. (This was originally a dependency
3764
* on the TABLE, but fix_dependencies repointed it to the data item.
3765
* Note that all the entry types we are interested in here are POST_DATA,
3766
* so they will all have been changed this way.)
3756
* listed among its dependencies. (This was originally a dependency on
3757
* the TABLE, but fix_dependencies repointed it to the data item. Note
3758
* that all the entry types we are interested in here are POST_DATA, so
3759
* they will all have been changed this way.)
3768
3761
lockids = (DumpId *) malloc(te->nDeps * sizeof(DumpId));
3770
3763
for (i = 0; i < te->nDeps; i++)
3772
DumpId depid = te->dependencies[i];
3765
DumpId depid = te->dependencies[i];
3774
3767
if (tocsByDumpId[depid - 1] &&
3775
3768
strcmp(tocsByDumpId[depid - 1]->desc, "TABLE DATA") == 0)
3794
3787
reduce_dependencies(ArchiveHandle *AH, TocEntry *te)
3796
DumpId target = te->dumpId;
3789
DumpId target = te->dumpId;
3799
ahlog(AH,2,"reducing dependencies for %d\n",target);
3792
ahlog(AH, 2, "reducing dependencies for %d\n", target);
3802
3795
* We must examine all entries, not only the ones after the target item,
3877
3870
ArchiveHandle *clone;
3879
3872
/* Make a "flat" copy */
3880
clone = (ArchiveHandle *) malloc(sizeof(ArchiveHandle));
3873
clone = (ArchiveHandle *) malloc(sizeof(ArchiveHandle));
3881
3874
if (clone == NULL)
3882
3875
die_horribly(AH, modulename, "out of memory\n");
3883
3876
memcpy(clone, AH, sizeof(ArchiveHandle));