~ubuntu-branches/ubuntu/oneiric/postgresql-9.1/oneiric-security

« back to all changes in this revision

Viewing changes to src/backend/storage/large_object/inv_api.c

  • Committer: Bazaar Package Importer
  • Author(s): Martin Pitt
  • Date: 2011-05-11 10:41:53 UTC
  • Revision ID: james.westby@ubuntu.com-20110511104153-psbh2o58553fv1m0
Tags: upstream-9.1~beta1
Import upstream version 9.1~beta1

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/*-------------------------------------------------------------------------
 
2
 *
 
3
 * inv_api.c
 
4
 *        routines for manipulating inversion fs large objects. This file
 
5
 *        contains the user-level large object application interface routines.
 
6
 *
 
7
 *
 
8
 * Note: we access pg_largeobject.data using its C struct declaration.
 
9
 * This is safe because it immediately follows pageno which is an int4 field,
 
10
 * and therefore the data field will always be 4-byte aligned, even if it
 
11
 * is in the short 1-byte-header format.  We have to detoast it since it's
 
12
 * quite likely to be in compressed or short format.  We also need to check
 
13
 * for NULLs, since initdb will mark loid and pageno but not data as NOT NULL.
 
14
 *
 
15
 * Note: many of these routines leak memory in CurrentMemoryContext, as indeed
 
16
 * does most of the backend code.  We expect that CurrentMemoryContext will
 
17
 * be a short-lived context.  Data that must persist across function calls
 
18
 * is kept either in CacheMemoryContext (the Relation structs) or in the
 
19
 * memory context given to inv_open (for LargeObjectDesc structs).
 
20
 *
 
21
 *
 
22
 * Portions Copyright (c) 1996-2011, PostgreSQL Global Development Group
 
23
 * Portions Copyright (c) 1994, Regents of the University of California
 
24
 *
 
25
 *
 
26
 * IDENTIFICATION
 
27
 *        src/backend/storage/large_object/inv_api.c
 
28
 *
 
29
 *-------------------------------------------------------------------------
 
30
 */
 
31
#include "postgres.h"
 
32
 
 
33
#include "access/genam.h"
 
34
#include "access/heapam.h"
 
35
#include "access/sysattr.h"
 
36
#include "access/tuptoaster.h"
 
37
#include "access/xact.h"
 
38
#include "catalog/catalog.h"
 
39
#include "catalog/dependency.h"
 
40
#include "catalog/indexing.h"
 
41
#include "catalog/objectaccess.h"
 
42
#include "catalog/pg_largeobject.h"
 
43
#include "catalog/pg_largeobject_metadata.h"
 
44
#include "commands/comment.h"
 
45
#include "libpq/libpq-fs.h"
 
46
#include "miscadmin.h"
 
47
#include "storage/large_object.h"
 
48
#include "utils/fmgroids.h"
 
49
#include "utils/rel.h"
 
50
#include "utils/resowner.h"
 
51
#include "utils/snapmgr.h"
 
52
#include "utils/syscache.h"
 
53
#include "utils/tqual.h"
 
54
 
 
55
 
 
56
/*
 
57
 * All accesses to pg_largeobject and its index make use of a single Relation
 
58
 * reference, so that we only need to open pg_largeobject once per transaction.
 
59
 * To avoid problems when the first such reference occurs inside a
 
60
 * subtransaction, we execute a slightly klugy maneuver to assign ownership of
 
61
 * the Relation reference to TopTransactionResourceOwner.
 
62
 */
 
63
static Relation lo_heap_r = NULL;
 
64
static Relation lo_index_r = NULL;
 
65
 
 
66
 
 
67
/*
 
68
 * Open pg_largeobject and its index, if not already done in current xact
 
69
 */
 
70
static void
 
71
open_lo_relation(void)
 
72
{
 
73
        ResourceOwner currentOwner;
 
74
 
 
75
        if (lo_heap_r && lo_index_r)
 
76
                return;                                 /* already open in current xact */
 
77
 
 
78
        /* Arrange for the top xact to own these relation references */
 
79
        currentOwner = CurrentResourceOwner;
 
80
        PG_TRY();
 
81
        {
 
82
                CurrentResourceOwner = TopTransactionResourceOwner;
 
83
 
 
84
                /* Use RowExclusiveLock since we might either read or write */
 
85
                if (lo_heap_r == NULL)
 
86
                        lo_heap_r = heap_open(LargeObjectRelationId, RowExclusiveLock);
 
87
                if (lo_index_r == NULL)
 
88
                        lo_index_r = index_open(LargeObjectLOidPNIndexId, RowExclusiveLock);
 
89
        }
 
90
        PG_CATCH();
 
91
        {
 
92
                /* Ensure CurrentResourceOwner is restored on error */
 
93
                CurrentResourceOwner = currentOwner;
 
94
                PG_RE_THROW();
 
95
        }
 
96
        PG_END_TRY();
 
97
        CurrentResourceOwner = currentOwner;
 
98
}
 
99
 
 
100
/*
 
101
 * Clean up at main transaction end
 
102
 */
 
103
void
 
104
close_lo_relation(bool isCommit)
 
105
{
 
106
        if (lo_heap_r || lo_index_r)
 
107
        {
 
108
                /*
 
109
                 * Only bother to close if committing; else abort cleanup will handle
 
110
                 * it
 
111
                 */
 
112
                if (isCommit)
 
113
                {
 
114
                        ResourceOwner currentOwner;
 
115
 
 
116
                        currentOwner = CurrentResourceOwner;
 
117
                        PG_TRY();
 
118
                        {
 
119
                                CurrentResourceOwner = TopTransactionResourceOwner;
 
120
 
 
121
                                if (lo_index_r)
 
122
                                        index_close(lo_index_r, NoLock);
 
123
                                if (lo_heap_r)
 
124
                                        heap_close(lo_heap_r, NoLock);
 
125
                        }
 
126
                        PG_CATCH();
 
127
                        {
 
128
                                /* Ensure CurrentResourceOwner is restored on error */
 
129
                                CurrentResourceOwner = currentOwner;
 
130
                                PG_RE_THROW();
 
131
                        }
 
132
                        PG_END_TRY();
 
133
                        CurrentResourceOwner = currentOwner;
 
134
                }
 
135
                lo_heap_r = NULL;
 
136
                lo_index_r = NULL;
 
137
        }
 
138
}
 
139
 
 
140
 
 
141
/*
 
142
 * Same as pg_largeobject.c's LargeObjectExists(), except snapshot to
 
143
 * read with can be specified.
 
144
 */
 
145
static bool
 
146
myLargeObjectExists(Oid loid, Snapshot snapshot)
 
147
{
 
148
        Relation        pg_lo_meta;
 
149
        ScanKeyData skey[1];
 
150
        SysScanDesc sd;
 
151
        HeapTuple       tuple;
 
152
        bool            retval = false;
 
153
 
 
154
        ScanKeyInit(&skey[0],
 
155
                                ObjectIdAttributeNumber,
 
156
                                BTEqualStrategyNumber, F_OIDEQ,
 
157
                                ObjectIdGetDatum(loid));
 
158
 
 
159
        pg_lo_meta = heap_open(LargeObjectMetadataRelationId,
 
160
                                                   AccessShareLock);
 
161
 
 
162
        sd = systable_beginscan(pg_lo_meta,
 
163
                                                        LargeObjectMetadataOidIndexId, true,
 
164
                                                        snapshot, 1, skey);
 
165
 
 
166
        tuple = systable_getnext(sd);
 
167
        if (HeapTupleIsValid(tuple))
 
168
                retval = true;
 
169
 
 
170
        systable_endscan(sd);
 
171
 
 
172
        heap_close(pg_lo_meta, AccessShareLock);
 
173
 
 
174
        return retval;
 
175
}
 
176
 
 
177
 
 
178
static int32
 
179
getbytealen(bytea *data)
 
180
{
 
181
        Assert(!VARATT_IS_EXTENDED(data));
 
182
        if (VARSIZE(data) < VARHDRSZ)
 
183
                elog(ERROR, "invalid VARSIZE(data)");
 
184
        return (VARSIZE(data) - VARHDRSZ);
 
185
}
 
186
 
 
187
 
 
188
/*
 
189
 *      inv_create -- create a new large object
 
190
 *
 
191
 *      Arguments:
 
192
 *        lobjId - OID to use for new large object, or InvalidOid to pick one
 
193
 *
 
194
 *      Returns:
 
195
 *        OID of new object
 
196
 *
 
197
 * If lobjId is not InvalidOid, then an error occurs if the OID is already
 
198
 * in use.
 
199
 */
 
200
Oid
 
201
inv_create(Oid lobjId)
 
202
{
 
203
        Oid                     lobjId_new;
 
204
 
 
205
        /*
 
206
         * Create a new largeobject with empty data pages
 
207
         */
 
208
        lobjId_new = LargeObjectCreate(lobjId);
 
209
 
 
210
        /*
 
211
         * dependency on the owner of largeobject
 
212
         *
 
213
         * The reason why we use LargeObjectRelationId instead of
 
214
         * LargeObjectMetadataRelationId here is to provide backward compatibility
 
215
         * to the applications which utilize a knowledge about internal layout of
 
216
         * system catalogs. OID of pg_largeobject_metadata and loid of
 
217
         * pg_largeobject are same value, so there are no actual differences here.
 
218
         */
 
219
        recordDependencyOnOwner(LargeObjectRelationId,
 
220
                                                        lobjId_new, GetUserId());
 
221
 
 
222
        /* Post creation hook for new large object */
 
223
        InvokeObjectAccessHook(OAT_POST_CREATE,
 
224
                                                   LargeObjectRelationId, lobjId_new, 0);
 
225
 
 
226
        /*
 
227
         * Advance command counter to make new tuple visible to later operations.
 
228
         */
 
229
        CommandCounterIncrement();
 
230
 
 
231
        return lobjId_new;
 
232
}
 
233
 
 
234
/*
 
235
 *      inv_open -- access an existing large object.
 
236
 *
 
237
 *              Returns:
 
238
 *                Large object descriptor, appropriately filled in.  The descriptor
 
239
 *                and subsidiary data are allocated in the specified memory context,
 
240
 *                which must be suitably long-lived for the caller's purposes.
 
241
 */
 
242
LargeObjectDesc *
 
243
inv_open(Oid lobjId, int flags, MemoryContext mcxt)
 
244
{
 
245
        LargeObjectDesc *retval;
 
246
 
 
247
        retval = (LargeObjectDesc *) MemoryContextAlloc(mcxt,
 
248
                                                                                                        sizeof(LargeObjectDesc));
 
249
 
 
250
        retval->id = lobjId;
 
251
        retval->subid = GetCurrentSubTransactionId();
 
252
        retval->offset = 0;
 
253
 
 
254
        if (flags & INV_WRITE)
 
255
        {
 
256
                retval->snapshot = SnapshotNow;
 
257
                retval->flags = IFS_WRLOCK | IFS_RDLOCK;
 
258
        }
 
259
        else if (flags & INV_READ)
 
260
        {
 
261
                /*
 
262
                 * We must register the snapshot in TopTransaction's resowner, because
 
263
                 * it must stay alive until the LO is closed rather than until the
 
264
                 * current portal shuts down.
 
265
                 */
 
266
                retval->snapshot = RegisterSnapshotOnOwner(GetActiveSnapshot(),
 
267
                                                                                                TopTransactionResourceOwner);
 
268
                retval->flags = IFS_RDLOCK;
 
269
        }
 
270
        else
 
271
                elog(ERROR, "invalid flags: %d", flags);
 
272
 
 
273
        /* Can't use LargeObjectExists here because it always uses SnapshotNow */
 
274
        if (!myLargeObjectExists(lobjId, retval->snapshot))
 
275
                ereport(ERROR,
 
276
                                (errcode(ERRCODE_UNDEFINED_OBJECT),
 
277
                                 errmsg("large object %u does not exist", lobjId)));
 
278
 
 
279
        return retval;
 
280
}
 
281
 
 
282
/*
 
283
 * Closes a large object descriptor previously made by inv_open(), and
 
284
 * releases the long-term memory used by it.
 
285
 */
 
286
void
 
287
inv_close(LargeObjectDesc *obj_desc)
 
288
{
 
289
        Assert(PointerIsValid(obj_desc));
 
290
 
 
291
        if (obj_desc->snapshot != SnapshotNow)
 
292
                UnregisterSnapshotFromOwner(obj_desc->snapshot,
 
293
                                                                        TopTransactionResourceOwner);
 
294
 
 
295
        pfree(obj_desc);
 
296
}
 
297
 
 
298
/*
 
299
 * Destroys an existing large object (not to be confused with a descriptor!)
 
300
 *
 
301
 * returns -1 if failed
 
302
 */
 
303
int
 
304
inv_drop(Oid lobjId)
 
305
{
 
306
        ObjectAddress object;
 
307
 
 
308
        /*
 
309
         * Delete any comments and dependencies on the large object
 
310
         */
 
311
        object.classId = LargeObjectRelationId;
 
312
        object.objectId = lobjId;
 
313
        object.objectSubId = 0;
 
314
        performDeletion(&object, DROP_CASCADE);
 
315
 
 
316
        /*
 
317
         * Advance command counter so that tuple removal will be seen by later
 
318
         * large-object operations in this transaction.
 
319
         */
 
320
        CommandCounterIncrement();
 
321
 
 
322
        return 1;
 
323
}
 
324
 
 
325
/*
 
326
 * Determine size of a large object
 
327
 *
 
328
 * NOTE: LOs can contain gaps, just like Unix files.  We actually return
 
329
 * the offset of the last byte + 1.
 
330
 */
 
331
static uint32
 
332
inv_getsize(LargeObjectDesc *obj_desc)
 
333
{
 
334
        uint32          lastbyte = 0;
 
335
        ScanKeyData skey[1];
 
336
        SysScanDesc sd;
 
337
        HeapTuple       tuple;
 
338
 
 
339
        Assert(PointerIsValid(obj_desc));
 
340
 
 
341
        open_lo_relation();
 
342
 
 
343
        ScanKeyInit(&skey[0],
 
344
                                Anum_pg_largeobject_loid,
 
345
                                BTEqualStrategyNumber, F_OIDEQ,
 
346
                                ObjectIdGetDatum(obj_desc->id));
 
347
 
 
348
        sd = systable_beginscan_ordered(lo_heap_r, lo_index_r,
 
349
                                                                        obj_desc->snapshot, 1, skey);
 
350
 
 
351
        /*
 
352
         * Because the pg_largeobject index is on both loid and pageno, but we
 
353
         * constrain only loid, a backwards scan should visit all pages of the
 
354
         * large object in reverse pageno order.  So, it's sufficient to examine
 
355
         * the first valid tuple (== last valid page).
 
356
         */
 
357
        tuple = systable_getnext_ordered(sd, BackwardScanDirection);
 
358
        if (HeapTupleIsValid(tuple))
 
359
        {
 
360
                Form_pg_largeobject data;
 
361
                bytea      *datafield;
 
362
                bool            pfreeit;
 
363
 
 
364
                if (HeapTupleHasNulls(tuple))   /* paranoia */
 
365
                        elog(ERROR, "null field found in pg_largeobject");
 
366
                data = (Form_pg_largeobject) GETSTRUCT(tuple);
 
367
                datafield = &(data->data);              /* see note at top of file */
 
368
                pfreeit = false;
 
369
                if (VARATT_IS_EXTENDED(datafield))
 
370
                {
 
371
                        datafield = (bytea *)
 
372
                                heap_tuple_untoast_attr((struct varlena *) datafield);
 
373
                        pfreeit = true;
 
374
                }
 
375
                lastbyte = data->pageno * LOBLKSIZE + getbytealen(datafield);
 
376
                if (pfreeit)
 
377
                        pfree(datafield);
 
378
        }
 
379
 
 
380
        systable_endscan_ordered(sd);
 
381
 
 
382
        return lastbyte;
 
383
}
 
384
 
 
385
int
 
386
inv_seek(LargeObjectDesc *obj_desc, int offset, int whence)
 
387
{
 
388
        Assert(PointerIsValid(obj_desc));
 
389
 
 
390
        switch (whence)
 
391
        {
 
392
                case SEEK_SET:
 
393
                        if (offset < 0)
 
394
                                elog(ERROR, "invalid seek offset: %d", offset);
 
395
                        obj_desc->offset = offset;
 
396
                        break;
 
397
                case SEEK_CUR:
 
398
                        if (offset < 0 && obj_desc->offset < ((uint32) (-offset)))
 
399
                                elog(ERROR, "invalid seek offset: %d", offset);
 
400
                        obj_desc->offset += offset;
 
401
                        break;
 
402
                case SEEK_END:
 
403
                        {
 
404
                                uint32          size = inv_getsize(obj_desc);
 
405
 
 
406
                                if (offset < 0 && size < ((uint32) (-offset)))
 
407
                                        elog(ERROR, "invalid seek offset: %d", offset);
 
408
                                obj_desc->offset = size + offset;
 
409
                        }
 
410
                        break;
 
411
                default:
 
412
                        elog(ERROR, "invalid whence: %d", whence);
 
413
        }
 
414
        return obj_desc->offset;
 
415
}
 
416
 
 
417
int
 
418
inv_tell(LargeObjectDesc *obj_desc)
 
419
{
 
420
        Assert(PointerIsValid(obj_desc));
 
421
 
 
422
        return obj_desc->offset;
 
423
}
 
424
 
 
425
int
 
426
inv_read(LargeObjectDesc *obj_desc, char *buf, int nbytes)
 
427
{
 
428
        int                     nread = 0;
 
429
        int                     n;
 
430
        int                     off;
 
431
        int                     len;
 
432
        int32           pageno = (int32) (obj_desc->offset / LOBLKSIZE);
 
433
        uint32          pageoff;
 
434
        ScanKeyData skey[2];
 
435
        SysScanDesc sd;
 
436
        HeapTuple       tuple;
 
437
 
 
438
        Assert(PointerIsValid(obj_desc));
 
439
        Assert(buf != NULL);
 
440
 
 
441
        if (nbytes <= 0)
 
442
                return 0;
 
443
 
 
444
        open_lo_relation();
 
445
 
 
446
        ScanKeyInit(&skey[0],
 
447
                                Anum_pg_largeobject_loid,
 
448
                                BTEqualStrategyNumber, F_OIDEQ,
 
449
                                ObjectIdGetDatum(obj_desc->id));
 
450
 
 
451
        ScanKeyInit(&skey[1],
 
452
                                Anum_pg_largeobject_pageno,
 
453
                                BTGreaterEqualStrategyNumber, F_INT4GE,
 
454
                                Int32GetDatum(pageno));
 
455
 
 
456
        sd = systable_beginscan_ordered(lo_heap_r, lo_index_r,
 
457
                                                                        obj_desc->snapshot, 2, skey);
 
458
 
 
459
        while ((tuple = systable_getnext_ordered(sd, ForwardScanDirection)) != NULL)
 
460
        {
 
461
                Form_pg_largeobject data;
 
462
                bytea      *datafield;
 
463
                bool            pfreeit;
 
464
 
 
465
                if (HeapTupleHasNulls(tuple))   /* paranoia */
 
466
                        elog(ERROR, "null field found in pg_largeobject");
 
467
                data = (Form_pg_largeobject) GETSTRUCT(tuple);
 
468
 
 
469
                /*
 
470
                 * We expect the indexscan will deliver pages in order.  However,
 
471
                 * there may be missing pages if the LO contains unwritten "holes". We
 
472
                 * want missing sections to read out as zeroes.
 
473
                 */
 
474
                pageoff = ((uint32) data->pageno) * LOBLKSIZE;
 
475
                if (pageoff > obj_desc->offset)
 
476
                {
 
477
                        n = pageoff - obj_desc->offset;
 
478
                        n = (n <= (nbytes - nread)) ? n : (nbytes - nread);
 
479
                        MemSet(buf + nread, 0, n);
 
480
                        nread += n;
 
481
                        obj_desc->offset += n;
 
482
                }
 
483
 
 
484
                if (nread < nbytes)
 
485
                {
 
486
                        Assert(obj_desc->offset >= pageoff);
 
487
                        off = (int) (obj_desc->offset - pageoff);
 
488
                        Assert(off >= 0 && off < LOBLKSIZE);
 
489
 
 
490
                        datafield = &(data->data);      /* see note at top of file */
 
491
                        pfreeit = false;
 
492
                        if (VARATT_IS_EXTENDED(datafield))
 
493
                        {
 
494
                                datafield = (bytea *)
 
495
                                        heap_tuple_untoast_attr((struct varlena *) datafield);
 
496
                                pfreeit = true;
 
497
                        }
 
498
                        len = getbytealen(datafield);
 
499
                        if (len > off)
 
500
                        {
 
501
                                n = len - off;
 
502
                                n = (n <= (nbytes - nread)) ? n : (nbytes - nread);
 
503
                                memcpy(buf + nread, VARDATA(datafield) + off, n);
 
504
                                nread += n;
 
505
                                obj_desc->offset += n;
 
506
                        }
 
507
                        if (pfreeit)
 
508
                                pfree(datafield);
 
509
                }
 
510
 
 
511
                if (nread >= nbytes)
 
512
                        break;
 
513
        }
 
514
 
 
515
        systable_endscan_ordered(sd);
 
516
 
 
517
        return nread;
 
518
}
 
519
 
 
520
int
 
521
inv_write(LargeObjectDesc *obj_desc, const char *buf, int nbytes)
 
522
{
 
523
        int                     nwritten = 0;
 
524
        int                     n;
 
525
        int                     off;
 
526
        int                     len;
 
527
        int32           pageno = (int32) (obj_desc->offset / LOBLKSIZE);
 
528
        ScanKeyData skey[2];
 
529
        SysScanDesc sd;
 
530
        HeapTuple       oldtuple;
 
531
        Form_pg_largeobject olddata;
 
532
        bool            neednextpage;
 
533
        bytea      *datafield;
 
534
        bool            pfreeit;
 
535
        struct
 
536
        {
 
537
                bytea           hdr;
 
538
                char            data[LOBLKSIZE];        /* make struct big enough */
 
539
                int32           align_it;       /* ensure struct is aligned well enough */
 
540
        }                       workbuf;
 
541
        char       *workb = VARDATA(&workbuf.hdr);
 
542
        HeapTuple       newtup;
 
543
        Datum           values[Natts_pg_largeobject];
 
544
        bool            nulls[Natts_pg_largeobject];
 
545
        bool            replace[Natts_pg_largeobject];
 
546
        CatalogIndexState indstate;
 
547
 
 
548
        Assert(PointerIsValid(obj_desc));
 
549
        Assert(buf != NULL);
 
550
 
 
551
        /* enforce writability because snapshot is probably wrong otherwise */
 
552
        if ((obj_desc->flags & IFS_WRLOCK) == 0)
 
553
                ereport(ERROR,
 
554
                                (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
 
555
                                 errmsg("large object %u was not opened for writing",
 
556
                                                obj_desc->id)));
 
557
 
 
558
        /* check existence of the target largeobject */
 
559
        if (!LargeObjectExists(obj_desc->id))
 
560
                ereport(ERROR,
 
561
                                (errcode(ERRCODE_UNDEFINED_OBJECT),
 
562
                           errmsg("large object %u was already dropped", obj_desc->id)));
 
563
 
 
564
        if (nbytes <= 0)
 
565
                return 0;
 
566
 
 
567
        open_lo_relation();
 
568
 
 
569
        indstate = CatalogOpenIndexes(lo_heap_r);
 
570
 
 
571
        ScanKeyInit(&skey[0],
 
572
                                Anum_pg_largeobject_loid,
 
573
                                BTEqualStrategyNumber, F_OIDEQ,
 
574
                                ObjectIdGetDatum(obj_desc->id));
 
575
 
 
576
        ScanKeyInit(&skey[1],
 
577
                                Anum_pg_largeobject_pageno,
 
578
                                BTGreaterEqualStrategyNumber, F_INT4GE,
 
579
                                Int32GetDatum(pageno));
 
580
 
 
581
        sd = systable_beginscan_ordered(lo_heap_r, lo_index_r,
 
582
                                                                        obj_desc->snapshot, 2, skey);
 
583
 
 
584
        oldtuple = NULL;
 
585
        olddata = NULL;
 
586
        neednextpage = true;
 
587
 
 
588
        while (nwritten < nbytes)
 
589
        {
 
590
                /*
 
591
                 * If possible, get next pre-existing page of the LO.  We expect the
 
592
                 * indexscan will deliver these in order --- but there may be holes.
 
593
                 */
 
594
                if (neednextpage)
 
595
                {
 
596
                        if ((oldtuple = systable_getnext_ordered(sd, ForwardScanDirection)) != NULL)
 
597
                        {
 
598
                                if (HeapTupleHasNulls(oldtuple))                /* paranoia */
 
599
                                        elog(ERROR, "null field found in pg_largeobject");
 
600
                                olddata = (Form_pg_largeobject) GETSTRUCT(oldtuple);
 
601
                                Assert(olddata->pageno >= pageno);
 
602
                        }
 
603
                        neednextpage = false;
 
604
                }
 
605
 
 
606
                /*
 
607
                 * If we have a pre-existing page, see if it is the page we want to
 
608
                 * write, or a later one.
 
609
                 */
 
610
                if (olddata != NULL && olddata->pageno == pageno)
 
611
                {
 
612
                        /*
 
613
                         * Update an existing page with fresh data.
 
614
                         *
 
615
                         * First, load old data into workbuf
 
616
                         */
 
617
                        datafield = &(olddata->data);           /* see note at top of file */
 
618
                        pfreeit = false;
 
619
                        if (VARATT_IS_EXTENDED(datafield))
 
620
                        {
 
621
                                datafield = (bytea *)
 
622
                                        heap_tuple_untoast_attr((struct varlena *) datafield);
 
623
                                pfreeit = true;
 
624
                        }
 
625
                        len = getbytealen(datafield);
 
626
                        Assert(len <= LOBLKSIZE);
 
627
                        memcpy(workb, VARDATA(datafield), len);
 
628
                        if (pfreeit)
 
629
                                pfree(datafield);
 
630
 
 
631
                        /*
 
632
                         * Fill any hole
 
633
                         */
 
634
                        off = (int) (obj_desc->offset % LOBLKSIZE);
 
635
                        if (off > len)
 
636
                                MemSet(workb + len, 0, off - len);
 
637
 
 
638
                        /*
 
639
                         * Insert appropriate portion of new data
 
640
                         */
 
641
                        n = LOBLKSIZE - off;
 
642
                        n = (n <= (nbytes - nwritten)) ? n : (nbytes - nwritten);
 
643
                        memcpy(workb + off, buf + nwritten, n);
 
644
                        nwritten += n;
 
645
                        obj_desc->offset += n;
 
646
                        off += n;
 
647
                        /* compute valid length of new page */
 
648
                        len = (len >= off) ? len : off;
 
649
                        SET_VARSIZE(&workbuf.hdr, len + VARHDRSZ);
 
650
 
 
651
                        /*
 
652
                         * Form and insert updated tuple
 
653
                         */
 
654
                        memset(values, 0, sizeof(values));
 
655
                        memset(nulls, false, sizeof(nulls));
 
656
                        memset(replace, false, sizeof(replace));
 
657
                        values[Anum_pg_largeobject_data - 1] = PointerGetDatum(&workbuf);
 
658
                        replace[Anum_pg_largeobject_data - 1] = true;
 
659
                        newtup = heap_modify_tuple(oldtuple, RelationGetDescr(lo_heap_r),
 
660
                                                                           values, nulls, replace);
 
661
                        simple_heap_update(lo_heap_r, &newtup->t_self, newtup);
 
662
                        CatalogIndexInsert(indstate, newtup);
 
663
                        heap_freetuple(newtup);
 
664
 
 
665
                        /*
 
666
                         * We're done with this old page.
 
667
                         */
 
668
                        oldtuple = NULL;
 
669
                        olddata = NULL;
 
670
                        neednextpage = true;
 
671
                }
 
672
                else
 
673
                {
 
674
                        /*
 
675
                         * Write a brand new page.
 
676
                         *
 
677
                         * First, fill any hole
 
678
                         */
 
679
                        off = (int) (obj_desc->offset % LOBLKSIZE);
 
680
                        if (off > 0)
 
681
                                MemSet(workb, 0, off);
 
682
 
 
683
                        /*
 
684
                         * Insert appropriate portion of new data
 
685
                         */
 
686
                        n = LOBLKSIZE - off;
 
687
                        n = (n <= (nbytes - nwritten)) ? n : (nbytes - nwritten);
 
688
                        memcpy(workb + off, buf + nwritten, n);
 
689
                        nwritten += n;
 
690
                        obj_desc->offset += n;
 
691
                        /* compute valid length of new page */
 
692
                        len = off + n;
 
693
                        SET_VARSIZE(&workbuf.hdr, len + VARHDRSZ);
 
694
 
 
695
                        /*
 
696
                         * Form and insert updated tuple
 
697
                         */
 
698
                        memset(values, 0, sizeof(values));
 
699
                        memset(nulls, false, sizeof(nulls));
 
700
                        values[Anum_pg_largeobject_loid - 1] = ObjectIdGetDatum(obj_desc->id);
 
701
                        values[Anum_pg_largeobject_pageno - 1] = Int32GetDatum(pageno);
 
702
                        values[Anum_pg_largeobject_data - 1] = PointerGetDatum(&workbuf);
 
703
                        newtup = heap_form_tuple(lo_heap_r->rd_att, values, nulls);
 
704
                        simple_heap_insert(lo_heap_r, newtup);
 
705
                        CatalogIndexInsert(indstate, newtup);
 
706
                        heap_freetuple(newtup);
 
707
                }
 
708
                pageno++;
 
709
        }
 
710
 
 
711
        systable_endscan_ordered(sd);
 
712
 
 
713
        CatalogCloseIndexes(indstate);
 
714
 
 
715
        /*
 
716
         * Advance command counter so that my tuple updates will be seen by later
 
717
         * large-object operations in this transaction.
 
718
         */
 
719
        CommandCounterIncrement();
 
720
 
 
721
        return nwritten;
 
722
}
 
723
 
 
724
void
 
725
inv_truncate(LargeObjectDesc *obj_desc, int len)
 
726
{
 
727
        int32           pageno = (int32) (len / LOBLKSIZE);
 
728
        int                     off;
 
729
        ScanKeyData skey[2];
 
730
        SysScanDesc sd;
 
731
        HeapTuple       oldtuple;
 
732
        Form_pg_largeobject olddata;
 
733
        struct
 
734
        {
 
735
                bytea           hdr;
 
736
                char            data[LOBLKSIZE];        /* make struct big enough */
 
737
                int32           align_it;       /* ensure struct is aligned well enough */
 
738
        }                       workbuf;
 
739
        char       *workb = VARDATA(&workbuf.hdr);
 
740
        HeapTuple       newtup;
 
741
        Datum           values[Natts_pg_largeobject];
 
742
        bool            nulls[Natts_pg_largeobject];
 
743
        bool            replace[Natts_pg_largeobject];
 
744
        CatalogIndexState indstate;
 
745
 
 
746
        Assert(PointerIsValid(obj_desc));
 
747
 
 
748
        /* enforce writability because snapshot is probably wrong otherwise */
 
749
        if ((obj_desc->flags & IFS_WRLOCK) == 0)
 
750
                ereport(ERROR,
 
751
                                (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
 
752
                                 errmsg("large object %u was not opened for writing",
 
753
                                                obj_desc->id)));
 
754
 
 
755
        /* check existence of the target largeobject */
 
756
        if (!LargeObjectExists(obj_desc->id))
 
757
                ereport(ERROR,
 
758
                                (errcode(ERRCODE_UNDEFINED_OBJECT),
 
759
                           errmsg("large object %u was already dropped", obj_desc->id)));
 
760
 
 
761
        open_lo_relation();
 
762
 
 
763
        indstate = CatalogOpenIndexes(lo_heap_r);
 
764
 
 
765
        /*
 
766
         * Set up to find all pages with desired loid and pageno >= target
 
767
         */
 
768
        ScanKeyInit(&skey[0],
 
769
                                Anum_pg_largeobject_loid,
 
770
                                BTEqualStrategyNumber, F_OIDEQ,
 
771
                                ObjectIdGetDatum(obj_desc->id));
 
772
 
 
773
        ScanKeyInit(&skey[1],
 
774
                                Anum_pg_largeobject_pageno,
 
775
                                BTGreaterEqualStrategyNumber, F_INT4GE,
 
776
                                Int32GetDatum(pageno));
 
777
 
 
778
        sd = systable_beginscan_ordered(lo_heap_r, lo_index_r,
 
779
                                                                        obj_desc->snapshot, 2, skey);
 
780
 
 
781
        /*
 
782
         * If possible, get the page the truncation point is in. The truncation
 
783
         * point may be beyond the end of the LO or in a hole.
 
784
         */
 
785
        olddata = NULL;
 
786
        if ((oldtuple = systable_getnext_ordered(sd, ForwardScanDirection)) != NULL)
 
787
        {
 
788
                if (HeapTupleHasNulls(oldtuple))                /* paranoia */
 
789
                        elog(ERROR, "null field found in pg_largeobject");
 
790
                olddata = (Form_pg_largeobject) GETSTRUCT(oldtuple);
 
791
                Assert(olddata->pageno >= pageno);
 
792
        }
 
793
 
 
794
        /*
 
795
         * If we found the page of the truncation point we need to truncate the
 
796
         * data in it.  Otherwise if we're in a hole, we need to create a page to
 
797
         * mark the end of data.
 
798
         */
 
799
        if (olddata != NULL && olddata->pageno == pageno)
 
800
        {
 
801
                /* First, load old data into workbuf */
 
802
                bytea      *datafield = &(olddata->data);               /* see note at top of
 
803
                                                                                                                 * file */
 
804
                bool            pfreeit = false;
 
805
                int                     pagelen;
 
806
 
 
807
                if (VARATT_IS_EXTENDED(datafield))
 
808
                {
 
809
                        datafield = (bytea *)
 
810
                                heap_tuple_untoast_attr((struct varlena *) datafield);
 
811
                        pfreeit = true;
 
812
                }
 
813
                pagelen = getbytealen(datafield);
 
814
                Assert(pagelen <= LOBLKSIZE);
 
815
                memcpy(workb, VARDATA(datafield), pagelen);
 
816
                if (pfreeit)
 
817
                        pfree(datafield);
 
818
 
 
819
                /*
 
820
                 * Fill any hole
 
821
                 */
 
822
                off = len % LOBLKSIZE;
 
823
                if (off > pagelen)
 
824
                        MemSet(workb + pagelen, 0, off - pagelen);
 
825
 
 
826
                /* compute length of new page */
 
827
                SET_VARSIZE(&workbuf.hdr, off + VARHDRSZ);
 
828
 
 
829
                /*
 
830
                 * Form and insert updated tuple
 
831
                 */
 
832
                memset(values, 0, sizeof(values));
 
833
                memset(nulls, false, sizeof(nulls));
 
834
                memset(replace, false, sizeof(replace));
 
835
                values[Anum_pg_largeobject_data - 1] = PointerGetDatum(&workbuf);
 
836
                replace[Anum_pg_largeobject_data - 1] = true;
 
837
                newtup = heap_modify_tuple(oldtuple, RelationGetDescr(lo_heap_r),
 
838
                                                                   values, nulls, replace);
 
839
                simple_heap_update(lo_heap_r, &newtup->t_self, newtup);
 
840
                CatalogIndexInsert(indstate, newtup);
 
841
                heap_freetuple(newtup);
 
842
        }
 
843
        else
 
844
        {
 
845
                /*
 
846
                 * If the first page we found was after the truncation point, we're in
 
847
                 * a hole that we'll fill, but we need to delete the later page
 
848
                 * because the loop below won't visit it again.
 
849
                 */
 
850
                if (olddata != NULL)
 
851
                {
 
852
                        Assert(olddata->pageno > pageno);
 
853
                        simple_heap_delete(lo_heap_r, &oldtuple->t_self);
 
854
                }
 
855
 
 
856
                /*
 
857
                 * Write a brand new page.
 
858
                 *
 
859
                 * Fill the hole up to the truncation point
 
860
                 */
 
861
                off = len % LOBLKSIZE;
 
862
                if (off > 0)
 
863
                        MemSet(workb, 0, off);
 
864
 
 
865
                /* compute length of new page */
 
866
                SET_VARSIZE(&workbuf.hdr, off + VARHDRSZ);
 
867
 
 
868
                /*
 
869
                 * Form and insert new tuple
 
870
                 */
 
871
                memset(values, 0, sizeof(values));
 
872
                memset(nulls, false, sizeof(nulls));
 
873
                values[Anum_pg_largeobject_loid - 1] = ObjectIdGetDatum(obj_desc->id);
 
874
                values[Anum_pg_largeobject_pageno - 1] = Int32GetDatum(pageno);
 
875
                values[Anum_pg_largeobject_data - 1] = PointerGetDatum(&workbuf);
 
876
                newtup = heap_form_tuple(lo_heap_r->rd_att, values, nulls);
 
877
                simple_heap_insert(lo_heap_r, newtup);
 
878
                CatalogIndexInsert(indstate, newtup);
 
879
                heap_freetuple(newtup);
 
880
        }
 
881
 
 
882
        /*
 
883
         * Delete any pages after the truncation point.  If the initial search
 
884
         * didn't find a page, then of course there's nothing more to do.
 
885
         */
 
886
        if (olddata != NULL)
 
887
        {
 
888
                while ((oldtuple = systable_getnext_ordered(sd, ForwardScanDirection)) != NULL)
 
889
                {
 
890
                        simple_heap_delete(lo_heap_r, &oldtuple->t_self);
 
891
                }
 
892
        }
 
893
 
 
894
        systable_endscan_ordered(sd);
 
895
 
 
896
        CatalogCloseIndexes(indstate);
 
897
 
 
898
        /*
 
899
         * Advance command counter so that tuple updates will be seen by later
 
900
         * large-object operations in this transaction.
 
901
         */
 
902
        CommandCounterIncrement();
 
903
}