~vcs-imports/mammoth-replicator/trunk

« back to all changes in this revision

Viewing changes to src/backend/storage/large_object/inv_api.c

  • Committer: alvherre
  • Date: 2005-12-16 21:24:52 UTC
  • Revision ID: svn-v4:db760fc0-0f08-0410-9d63-cc6633f64896:trunk:1
Initial import of the REL8_0_3 sources from the Pgsql CVS repository.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/*-------------------------------------------------------------------------
 
2
 *
 
3
 * inv_api.c
 
4
 *        routines for manipulating inversion fs large objects. This file
 
5
 *        contains the user-level large object application interface routines.
 
6
 *
 
7
 * Portions Copyright (c) 1996-2005, PostgreSQL Global Development Group
 
8
 * Portions Copyright (c) 1994, Regents of the University of California
 
9
 *
 
10
 *
 
11
 * IDENTIFICATION
 
12
 *        $PostgreSQL: pgsql/src/backend/storage/large_object/inv_api.c,v 1.108 2004-12-31 22:00:59 pgsql Exp $
 
13
 *
 
14
 *-------------------------------------------------------------------------
 
15
 */
 
16
#include "postgres.h"
 
17
 
 
18
#include "access/genam.h"
 
19
#include "access/heapam.h"
 
20
#include "access/tuptoaster.h"
 
21
#include "catalog/catalog.h"
 
22
#include "catalog/catname.h"
 
23
#include "catalog/indexing.h"
 
24
#include "catalog/pg_largeobject.h"
 
25
#include "commands/comment.h"
 
26
#include "libpq/libpq-fs.h"
 
27
#include "storage/large_object.h"
 
28
#include "utils/fmgroids.h"
 
29
#include "utils/lsyscache.h"
 
30
#include "utils/resowner.h"
 
31
 
 
32
 
 
33
/*
 
34
 * All accesses to pg_largeobject and its index make use of a single Relation
 
35
 * reference, so that we only need to open pg_relation once per transaction.
 
36
 * To avoid problems when the first such reference occurs inside a
 
37
 * subtransaction, we execute a slightly klugy maneuver to assign ownership of
 
38
 * the Relation reference to TopTransactionResourceOwner.
 
39
 */
 
40
static Relation lo_heap_r = NULL;
 
41
static Relation lo_index_r = NULL;
 
42
 
 
43
 
 
44
/*
 
45
 * Open pg_largeobject and its index, if not already done in current xact
 
46
 */
 
47
static void
 
48
open_lo_relation(void)
 
49
{
 
50
        ResourceOwner currentOwner;
 
51
 
 
52
        if (lo_heap_r && lo_index_r)
 
53
                return;                                 /* already open in current xact */
 
54
 
 
55
        /* Arrange for the top xact to own these relation references */
 
56
        currentOwner = CurrentResourceOwner;
 
57
        PG_TRY();
 
58
        {
 
59
                CurrentResourceOwner = TopTransactionResourceOwner;
 
60
 
 
61
                /* Use RowExclusiveLock since we might either read or write */
 
62
                if (lo_heap_r == NULL)
 
63
                        lo_heap_r = heap_openr(LargeObjectRelationName, RowExclusiveLock);
 
64
                if (lo_index_r == NULL)
 
65
                        lo_index_r = index_openr(LargeObjectLOidPNIndex);
 
66
        }
 
67
        PG_CATCH();
 
68
        {
 
69
                /* Ensure CurrentResourceOwner is restored on error */
 
70
                CurrentResourceOwner = currentOwner;
 
71
                PG_RE_THROW();
 
72
        }
 
73
        PG_END_TRY();
 
74
        CurrentResourceOwner = currentOwner;
 
75
}
 
76
 
 
77
/*
 
78
 * Clean up at main transaction end
 
79
 */
 
80
void
 
81
close_lo_relation(bool isCommit)
 
82
{
 
83
        if (lo_heap_r || lo_index_r)
 
84
        {
 
85
                /*
 
86
                 * Only bother to close if committing; else abort cleanup will
 
87
                 * handle it
 
88
                 */
 
89
                if (isCommit)
 
90
                {
 
91
                        ResourceOwner currentOwner;
 
92
 
 
93
                        currentOwner = CurrentResourceOwner;
 
94
                        PG_TRY();
 
95
                        {
 
96
                                CurrentResourceOwner = TopTransactionResourceOwner;
 
97
 
 
98
                                if (lo_index_r)
 
99
                                        index_close(lo_index_r);
 
100
                                if (lo_heap_r)
 
101
                                        heap_close(lo_heap_r, NoLock);
 
102
                        }
 
103
                        PG_CATCH();
 
104
                        {
 
105
                                /* Ensure CurrentResourceOwner is restored on error */
 
106
                                CurrentResourceOwner = currentOwner;
 
107
                                PG_RE_THROW();
 
108
                        }
 
109
                        PG_END_TRY();
 
110
                        CurrentResourceOwner = currentOwner;
 
111
                }
 
112
                lo_heap_r = NULL;
 
113
                lo_index_r = NULL;
 
114
        }
 
115
}
 
116
 
 
117
 
 
118
static int32
 
119
getbytealen(bytea *data)
 
120
{
 
121
        Assert(!VARATT_IS_EXTENDED(data));
 
122
        if (VARSIZE(data) < VARHDRSZ)
 
123
                elog(ERROR, "invalid VARSIZE(data)");
 
124
        return (VARSIZE(data) - VARHDRSZ);
 
125
}
 
126
 
 
127
 
 
128
/*
 
129
 *      inv_create -- create a new large object.
 
130
 *
 
131
 *              Arguments:
 
132
 *                flags
 
133
 *
 
134
 *              Returns:
 
135
 *                large object descriptor, appropriately filled in.
 
136
 */
 
137
LargeObjectDesc *
 
138
inv_create(int flags)
 
139
{
 
140
        Oid                     file_oid;
 
141
        LargeObjectDesc *retval;
 
142
 
 
143
        /*
 
144
         * Allocate an OID to be the LO's identifier.
 
145
         */
 
146
        file_oid = newoid();
 
147
 
 
148
        /* Check for duplicate (shouldn't happen) */
 
149
        if (LargeObjectExists(file_oid))
 
150
                elog(ERROR, "large object %u already exists", file_oid);
 
151
 
 
152
        /*
 
153
         * Create the LO by writing an empty first page for it in
 
154
         * pg_largeobject
 
155
         */
 
156
        LargeObjectCreate(file_oid);
 
157
 
 
158
        /*
 
159
         * Advance command counter so that new tuple will be seen by later
 
160
         * large-object operations in this transaction.
 
161
         */
 
162
        CommandCounterIncrement();
 
163
 
 
164
        /*
 
165
         * Prepare LargeObjectDesc data structure for accessing LO
 
166
         */
 
167
        retval = (LargeObjectDesc *) palloc(sizeof(LargeObjectDesc));
 
168
 
 
169
        retval->id = file_oid;
 
170
        retval->subid = GetCurrentSubTransactionId();
 
171
        retval->offset = 0;
 
172
 
 
173
        if (flags & INV_WRITE)
 
174
                retval->flags = IFS_WRLOCK | IFS_RDLOCK;
 
175
        else if (flags & INV_READ)
 
176
                retval->flags = IFS_RDLOCK;
 
177
        else
 
178
                elog(ERROR, "invalid flags: %d", flags);
 
179
 
 
180
        return retval;
 
181
}
 
182
 
 
183
/*
 
184
 *      inv_open -- access an existing large object.
 
185
 *
 
186
 *              Returns:
 
187
 *                large object descriptor, appropriately filled in.
 
188
 */
 
189
LargeObjectDesc *
 
190
inv_open(Oid lobjId, int flags)
 
191
{
 
192
        LargeObjectDesc *retval;
 
193
 
 
194
        if (!LargeObjectExists(lobjId))
 
195
                ereport(ERROR,
 
196
                                (errcode(ERRCODE_UNDEFINED_OBJECT),
 
197
                                 errmsg("large object %u does not exist", lobjId)));
 
198
 
 
199
        retval = (LargeObjectDesc *) palloc(sizeof(LargeObjectDesc));
 
200
 
 
201
        retval->id = lobjId;
 
202
        retval->subid = GetCurrentSubTransactionId();
 
203
        retval->offset = 0;
 
204
 
 
205
        if (flags & INV_WRITE)
 
206
                retval->flags = IFS_WRLOCK | IFS_RDLOCK;
 
207
        else if (flags & INV_READ)
 
208
                retval->flags = IFS_RDLOCK;
 
209
        else
 
210
                elog(ERROR, "invalid flags: %d", flags);
 
211
 
 
212
        return retval;
 
213
}
 
214
 
 
215
/*
 
216
 * Closes an existing large object descriptor.
 
217
 */
 
218
void
 
219
inv_close(LargeObjectDesc *obj_desc)
 
220
{
 
221
        Assert(PointerIsValid(obj_desc));
 
222
        pfree(obj_desc);
 
223
}
 
224
 
 
225
/*
 
226
 * Destroys an existing large object (not to be confused with a descriptor!)
 
227
 *
 
228
 * returns -1 if failed
 
229
 */
 
230
int
 
231
inv_drop(Oid lobjId)
 
232
{
 
233
        Oid                     classoid;
 
234
 
 
235
        LargeObjectDrop(lobjId);
 
236
 
 
237
        /* pg_largeobject doesn't have a hard-coded OID, so must look it up */
 
238
        classoid = get_system_catalog_relid(LargeObjectRelationName);
 
239
 
 
240
        /* Delete any comments on the large object */
 
241
        DeleteComments(lobjId, classoid, 0);
 
242
 
 
243
        /*
 
244
         * Advance command counter so that tuple removal will be seen by later
 
245
         * large-object operations in this transaction.
 
246
         */
 
247
        CommandCounterIncrement();
 
248
 
 
249
        return 1;
 
250
}
 
251
 
 
252
/*
 
253
 * Determine size of a large object
 
254
 *
 
255
 * NOTE: LOs can contain gaps, just like Unix files.  We actually return
 
256
 * the offset of the last byte + 1.
 
257
 */
 
258
static uint32
 
259
inv_getsize(LargeObjectDesc *obj_desc)
 
260
{
 
261
        bool            found = false;
 
262
        uint32          lastbyte = 0;
 
263
        ScanKeyData skey[1];
 
264
        IndexScanDesc sd;
 
265
        HeapTuple       tuple;
 
266
 
 
267
        Assert(PointerIsValid(obj_desc));
 
268
 
 
269
        open_lo_relation();
 
270
 
 
271
        ScanKeyInit(&skey[0],
 
272
                                Anum_pg_largeobject_loid,
 
273
                                BTEqualStrategyNumber, F_OIDEQ,
 
274
                                ObjectIdGetDatum(obj_desc->id));
 
275
 
 
276
        sd = index_beginscan(lo_heap_r, lo_index_r,
 
277
                                                 SnapshotNow, 1, skey);
 
278
 
 
279
        /*
 
280
         * Because the pg_largeobject index is on both loid and pageno, but we
 
281
         * constrain only loid, a backwards scan should visit all pages of the
 
282
         * large object in reverse pageno order.  So, it's sufficient to
 
283
         * examine the first valid tuple (== last valid page).
 
284
         */
 
285
        while ((tuple = index_getnext(sd, BackwardScanDirection)) != NULL)
 
286
        {
 
287
                Form_pg_largeobject data;
 
288
                bytea      *datafield;
 
289
                bool            pfreeit;
 
290
 
 
291
                found = true;
 
292
                data = (Form_pg_largeobject) GETSTRUCT(tuple);
 
293
                datafield = &(data->data);
 
294
                pfreeit = false;
 
295
                if (VARATT_IS_EXTENDED(datafield))
 
296
                {
 
297
                        datafield = (bytea *)
 
298
                                heap_tuple_untoast_attr((varattrib *) datafield);
 
299
                        pfreeit = true;
 
300
                }
 
301
                lastbyte = data->pageno * LOBLKSIZE + getbytealen(datafield);
 
302
                if (pfreeit)
 
303
                        pfree(datafield);
 
304
                break;
 
305
        }
 
306
 
 
307
        index_endscan(sd);
 
308
 
 
309
        if (!found)
 
310
                ereport(ERROR,
 
311
                                (errcode(ERRCODE_UNDEFINED_OBJECT),
 
312
                                 errmsg("large object %u does not exist", obj_desc->id)));
 
313
        return lastbyte;
 
314
}
 
315
 
 
316
int
 
317
inv_seek(LargeObjectDesc *obj_desc, int offset, int whence)
 
318
{
 
319
        Assert(PointerIsValid(obj_desc));
 
320
 
 
321
        switch (whence)
 
322
        {
 
323
                case SEEK_SET:
 
324
                        if (offset < 0)
 
325
                                elog(ERROR, "invalid seek offset: %d", offset);
 
326
                        obj_desc->offset = offset;
 
327
                        break;
 
328
                case SEEK_CUR:
 
329
                        if (offset < 0 && obj_desc->offset < ((uint32) (-offset)))
 
330
                                elog(ERROR, "invalid seek offset: %d", offset);
 
331
                        obj_desc->offset += offset;
 
332
                        break;
 
333
                case SEEK_END:
 
334
                        {
 
335
                                uint32          size = inv_getsize(obj_desc);
 
336
 
 
337
                                if (offset < 0 && size < ((uint32) (-offset)))
 
338
                                        elog(ERROR, "invalid seek offset: %d", offset);
 
339
                                obj_desc->offset = size + offset;
 
340
                        }
 
341
                        break;
 
342
                default:
 
343
                        elog(ERROR, "invalid whence: %d", whence);
 
344
        }
 
345
        return obj_desc->offset;
 
346
}
 
347
 
 
348
int
 
349
inv_tell(LargeObjectDesc *obj_desc)
 
350
{
 
351
        Assert(PointerIsValid(obj_desc));
 
352
 
 
353
        return obj_desc->offset;
 
354
}
 
355
 
 
356
int
 
357
inv_read(LargeObjectDesc *obj_desc, char *buf, int nbytes)
 
358
{
 
359
        int                     nread = 0;
 
360
        int                     n;
 
361
        int                     off;
 
362
        int                     len;
 
363
        int32           pageno = (int32) (obj_desc->offset / LOBLKSIZE);
 
364
        uint32          pageoff;
 
365
        ScanKeyData skey[2];
 
366
        IndexScanDesc sd;
 
367
        HeapTuple       tuple;
 
368
 
 
369
        Assert(PointerIsValid(obj_desc));
 
370
        Assert(buf != NULL);
 
371
 
 
372
        if (nbytes <= 0)
 
373
                return 0;
 
374
 
 
375
        open_lo_relation();
 
376
 
 
377
        ScanKeyInit(&skey[0],
 
378
                                Anum_pg_largeobject_loid,
 
379
                                BTEqualStrategyNumber, F_OIDEQ,
 
380
                                ObjectIdGetDatum(obj_desc->id));
 
381
 
 
382
        ScanKeyInit(&skey[1],
 
383
                                Anum_pg_largeobject_pageno,
 
384
                                BTGreaterEqualStrategyNumber, F_INT4GE,
 
385
                                Int32GetDatum(pageno));
 
386
 
 
387
        sd = index_beginscan(lo_heap_r, lo_index_r,
 
388
                                                 SnapshotNow, 2, skey);
 
389
 
 
390
        while ((tuple = index_getnext(sd, ForwardScanDirection)) != NULL)
 
391
        {
 
392
                Form_pg_largeobject data;
 
393
                bytea      *datafield;
 
394
                bool            pfreeit;
 
395
 
 
396
                data = (Form_pg_largeobject) GETSTRUCT(tuple);
 
397
 
 
398
                /*
 
399
                 * We assume the indexscan will deliver pages in order.  However,
 
400
                 * there may be missing pages if the LO contains unwritten
 
401
                 * "holes". We want missing sections to read out as zeroes.
 
402
                 */
 
403
                pageoff = ((uint32) data->pageno) * LOBLKSIZE;
 
404
                if (pageoff > obj_desc->offset)
 
405
                {
 
406
                        n = pageoff - obj_desc->offset;
 
407
                        n = (n <= (nbytes - nread)) ? n : (nbytes - nread);
 
408
                        MemSet(buf + nread, 0, n);
 
409
                        nread += n;
 
410
                        obj_desc->offset += n;
 
411
                }
 
412
 
 
413
                if (nread < nbytes)
 
414
                {
 
415
                        Assert(obj_desc->offset >= pageoff);
 
416
                        off = (int) (obj_desc->offset - pageoff);
 
417
                        Assert(off >= 0 && off < LOBLKSIZE);
 
418
 
 
419
                        datafield = &(data->data);
 
420
                        pfreeit = false;
 
421
                        if (VARATT_IS_EXTENDED(datafield))
 
422
                        {
 
423
                                datafield = (bytea *)
 
424
                                        heap_tuple_untoast_attr((varattrib *) datafield);
 
425
                                pfreeit = true;
 
426
                        }
 
427
                        len = getbytealen(datafield);
 
428
                        if (len > off)
 
429
                        {
 
430
                                n = len - off;
 
431
                                n = (n <= (nbytes - nread)) ? n : (nbytes - nread);
 
432
                                memcpy(buf + nread, VARDATA(datafield) + off, n);
 
433
                                nread += n;
 
434
                                obj_desc->offset += n;
 
435
                        }
 
436
                        if (pfreeit)
 
437
                                pfree(datafield);
 
438
                }
 
439
 
 
440
                if (nread >= nbytes)
 
441
                        break;
 
442
        }
 
443
 
 
444
        index_endscan(sd);
 
445
 
 
446
        return nread;
 
447
}
 
448
 
 
449
int
 
450
inv_write(LargeObjectDesc *obj_desc, char *buf, int nbytes)
 
451
{
 
452
        int                     nwritten = 0;
 
453
        int                     n;
 
454
        int                     off;
 
455
        int                     len;
 
456
        int32           pageno = (int32) (obj_desc->offset / LOBLKSIZE);
 
457
        ScanKeyData skey[2];
 
458
        IndexScanDesc sd;
 
459
        HeapTuple       oldtuple;
 
460
        Form_pg_largeobject olddata;
 
461
        bool            neednextpage;
 
462
        bytea      *datafield;
 
463
        bool            pfreeit;
 
464
        struct
 
465
        {
 
466
                bytea           hdr;
 
467
                char            data[LOBLKSIZE];
 
468
        }                       workbuf;
 
469
        char       *workb = VARATT_DATA(&workbuf.hdr);
 
470
        HeapTuple       newtup;
 
471
        Datum           values[Natts_pg_largeobject];
 
472
        char            nulls[Natts_pg_largeobject];
 
473
        char            replace[Natts_pg_largeobject];
 
474
        CatalogIndexState indstate;
 
475
 
 
476
        Assert(PointerIsValid(obj_desc));
 
477
        Assert(buf != NULL);
 
478
 
 
479
        if (nbytes <= 0)
 
480
                return 0;
 
481
 
 
482
        open_lo_relation();
 
483
 
 
484
        indstate = CatalogOpenIndexes(lo_heap_r);
 
485
 
 
486
        ScanKeyInit(&skey[0],
 
487
                                Anum_pg_largeobject_loid,
 
488
                                BTEqualStrategyNumber, F_OIDEQ,
 
489
                                ObjectIdGetDatum(obj_desc->id));
 
490
 
 
491
        ScanKeyInit(&skey[1],
 
492
                                Anum_pg_largeobject_pageno,
 
493
                                BTGreaterEqualStrategyNumber, F_INT4GE,
 
494
                                Int32GetDatum(pageno));
 
495
 
 
496
        sd = index_beginscan(lo_heap_r, lo_index_r,
 
497
                                                 SnapshotNow, 2, skey);
 
498
 
 
499
        oldtuple = NULL;
 
500
        olddata = NULL;
 
501
        neednextpage = true;
 
502
 
 
503
        while (nwritten < nbytes)
 
504
        {
 
505
                /*
 
506
                 * If possible, get next pre-existing page of the LO.  We assume
 
507
                 * the indexscan will deliver these in order --- but there may be
 
508
                 * holes.
 
509
                 */
 
510
                if (neednextpage)
 
511
                {
 
512
                        if ((oldtuple = index_getnext(sd, ForwardScanDirection)) != NULL)
 
513
                        {
 
514
                                olddata = (Form_pg_largeobject) GETSTRUCT(oldtuple);
 
515
                                Assert(olddata->pageno >= pageno);
 
516
                        }
 
517
                        neednextpage = false;
 
518
                }
 
519
 
 
520
                /*
 
521
                 * If we have a pre-existing page, see if it is the page we want
 
522
                 * to write, or a later one.
 
523
                 */
 
524
                if (olddata != NULL && olddata->pageno == pageno)
 
525
                {
 
526
                        /*
 
527
                         * Update an existing page with fresh data.
 
528
                         *
 
529
                         * First, load old data into workbuf
 
530
                         */
 
531
                        datafield = &(olddata->data);
 
532
                        pfreeit = false;
 
533
                        if (VARATT_IS_EXTENDED(datafield))
 
534
                        {
 
535
                                datafield = (bytea *)
 
536
                                        heap_tuple_untoast_attr((varattrib *) datafield);
 
537
                                pfreeit = true;
 
538
                        }
 
539
                        len = getbytealen(datafield);
 
540
                        Assert(len <= LOBLKSIZE);
 
541
                        memcpy(workb, VARDATA(datafield), len);
 
542
                        if (pfreeit)
 
543
                                pfree(datafield);
 
544
 
 
545
                        /*
 
546
                         * Fill any hole
 
547
                         */
 
548
                        off = (int) (obj_desc->offset % LOBLKSIZE);
 
549
                        if (off > len)
 
550
                                MemSet(workb + len, 0, off - len);
 
551
 
 
552
                        /*
 
553
                         * Insert appropriate portion of new data
 
554
                         */
 
555
                        n = LOBLKSIZE - off;
 
556
                        n = (n <= (nbytes - nwritten)) ? n : (nbytes - nwritten);
 
557
                        memcpy(workb + off, buf + nwritten, n);
 
558
                        nwritten += n;
 
559
                        obj_desc->offset += n;
 
560
                        off += n;
 
561
                        /* compute valid length of new page */
 
562
                        len = (len >= off) ? len : off;
 
563
                        VARATT_SIZEP(&workbuf.hdr) = len + VARHDRSZ;
 
564
 
 
565
                        /*
 
566
                         * Form and insert updated tuple
 
567
                         */
 
568
                        memset(values, 0, sizeof(values));
 
569
                        memset(nulls, ' ', sizeof(nulls));
 
570
                        memset(replace, ' ', sizeof(replace));
 
571
                        values[Anum_pg_largeobject_data - 1] = PointerGetDatum(&workbuf);
 
572
                        replace[Anum_pg_largeobject_data - 1] = 'r';
 
573
                        newtup = heap_modifytuple(oldtuple, lo_heap_r,
 
574
                                                                          values, nulls, replace);
 
575
                        simple_heap_update(lo_heap_r, &newtup->t_self, newtup);
 
576
                        CatalogIndexInsert(indstate, newtup);
 
577
                        heap_freetuple(newtup);
 
578
 
 
579
                        /*
 
580
                         * We're done with this old page.
 
581
                         */
 
582
                        oldtuple = NULL;
 
583
                        olddata = NULL;
 
584
                        neednextpage = true;
 
585
                }
 
586
                else
 
587
                {
 
588
                        /*
 
589
                         * Write a brand new page.
 
590
                         *
 
591
                         * First, fill any hole
 
592
                         */
 
593
                        off = (int) (obj_desc->offset % LOBLKSIZE);
 
594
                        if (off > 0)
 
595
                                MemSet(workb, 0, off);
 
596
 
 
597
                        /*
 
598
                         * Insert appropriate portion of new data
 
599
                         */
 
600
                        n = LOBLKSIZE - off;
 
601
                        n = (n <= (nbytes - nwritten)) ? n : (nbytes - nwritten);
 
602
                        memcpy(workb + off, buf + nwritten, n);
 
603
                        nwritten += n;
 
604
                        obj_desc->offset += n;
 
605
                        /* compute valid length of new page */
 
606
                        len = off + n;
 
607
                        VARATT_SIZEP(&workbuf.hdr) = len + VARHDRSZ;
 
608
 
 
609
                        /*
 
610
                         * Form and insert updated tuple
 
611
                         */
 
612
                        memset(values, 0, sizeof(values));
 
613
                        memset(nulls, ' ', sizeof(nulls));
 
614
                        values[Anum_pg_largeobject_loid - 1] = ObjectIdGetDatum(obj_desc->id);
 
615
                        values[Anum_pg_largeobject_pageno - 1] = Int32GetDatum(pageno);
 
616
                        values[Anum_pg_largeobject_data - 1] = PointerGetDatum(&workbuf);
 
617
                        newtup = heap_formtuple(lo_heap_r->rd_att, values, nulls);
 
618
                        simple_heap_insert(lo_heap_r, newtup);
 
619
                        CatalogIndexInsert(indstate, newtup);
 
620
                        heap_freetuple(newtup);
 
621
                }
 
622
                pageno++;
 
623
        }
 
624
 
 
625
        index_endscan(sd);
 
626
 
 
627
        CatalogCloseIndexes(indstate);
 
628
 
 
629
        /*
 
630
         * Advance command counter so that my tuple updates will be seen by
 
631
         * later large-object operations in this transaction.
 
632
         */
 
633
        CommandCounterIncrement();
 
634
 
 
635
        return nwritten;
 
636
}