/*-------------------------------------------------------------------------
 *
 * inv_api.c
 *	  routines for manipulating inversion fs large objects. This file
 *	  contains the user-level large object application interface routines.
 *
 *
 * Note: we access pg_largeobject.data using its C struct declaration.
 * This is safe because it immediately follows pageno which is an int4 field,
 * and therefore the data field will always be 4-byte aligned, even if it
 * is in the short 1-byte-header format.  We have to detoast it since it's
 * quite likely to be in compressed or short format.  We also need to check
 * for NULLs, since initdb will mark loid and pageno but not data as NOT NULL.
 *
 * Note: many of these routines leak memory in CurrentMemoryContext, as indeed
 * does most of the backend code.  We expect that CurrentMemoryContext will
 * be a short-lived context.  Data that must persist across function calls
 * is kept either in CacheMemoryContext (the Relation structs) or in the
 * memory context given to inv_open (for LargeObjectDesc structs).
 *
 *
 * Portions Copyright (c) 1996-2011, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * IDENTIFICATION
 *	  src/backend/storage/large_object/inv_api.c
 *
 *-------------------------------------------------------------------------
 */
#include "postgres.h"

#include "access/genam.h"
#include "access/heapam.h"
#include "access/sysattr.h"
#include "access/tuptoaster.h"
#include "access/xact.h"
#include "catalog/catalog.h"
#include "catalog/dependency.h"
#include "catalog/indexing.h"
#include "catalog/objectaccess.h"
#include "catalog/pg_largeobject.h"
#include "catalog/pg_largeobject_metadata.h"
#include "commands/comment.h"
#include "libpq/libpq-fs.h"
#include "miscadmin.h"
#include "storage/large_object.h"
#include "utils/fmgroids.h"
#include "utils/rel.h"
#include "utils/resowner.h"
#include "utils/snapmgr.h"
#include "utils/syscache.h"
#include "utils/tqual.h"
57
* All accesses to pg_largeobject and its index make use of a single Relation
58
* reference, so that we only need to open pg_relation once per transaction.
59
* To avoid problems when the first such reference occurs inside a
60
* subtransaction, we execute a slightly klugy maneuver to assign ownership of
61
* the Relation reference to TopTransactionResourceOwner.
63
static Relation lo_heap_r = NULL;
64
static Relation lo_index_r = NULL;
68
* Open pg_largeobject and its index, if not already done in current xact
71
open_lo_relation(void)
73
ResourceOwner currentOwner;
75
if (lo_heap_r && lo_index_r)
76
return; /* already open in current xact */
78
/* Arrange for the top xact to own these relation references */
79
currentOwner = CurrentResourceOwner;
82
CurrentResourceOwner = TopTransactionResourceOwner;
84
/* Use RowExclusiveLock since we might either read or write */
85
if (lo_heap_r == NULL)
86
lo_heap_r = heap_open(LargeObjectRelationId, RowExclusiveLock);
87
if (lo_index_r == NULL)
88
lo_index_r = index_open(LargeObjectLOidPNIndexId, RowExclusiveLock);
92
/* Ensure CurrentResourceOwner is restored on error */
93
CurrentResourceOwner = currentOwner;
97
CurrentResourceOwner = currentOwner;
101
* Clean up at main transaction end
104
close_lo_relation(bool isCommit)
106
if (lo_heap_r || lo_index_r)
109
* Only bother to close if committing; else abort cleanup will handle
114
ResourceOwner currentOwner;
116
currentOwner = CurrentResourceOwner;
119
CurrentResourceOwner = TopTransactionResourceOwner;
122
index_close(lo_index_r, NoLock);
124
heap_close(lo_heap_r, NoLock);
128
/* Ensure CurrentResourceOwner is restored on error */
129
CurrentResourceOwner = currentOwner;
133
CurrentResourceOwner = currentOwner;
142
* Same as pg_largeobject.c's LargeObjectExists(), except snapshot to
143
* read with can be specified.
146
myLargeObjectExists(Oid loid, Snapshot snapshot)
154
ScanKeyInit(&skey[0],
155
ObjectIdAttributeNumber,
156
BTEqualStrategyNumber, F_OIDEQ,
157
ObjectIdGetDatum(loid));
159
pg_lo_meta = heap_open(LargeObjectMetadataRelationId,
162
sd = systable_beginscan(pg_lo_meta,
163
LargeObjectMetadataOidIndexId, true,
166
tuple = systable_getnext(sd);
167
if (HeapTupleIsValid(tuple))
170
systable_endscan(sd);
172
heap_close(pg_lo_meta, AccessShareLock);
179
getbytealen(bytea *data)
181
Assert(!VARATT_IS_EXTENDED(data));
182
if (VARSIZE(data) < VARHDRSZ)
183
elog(ERROR, "invalid VARSIZE(data)");
184
return (VARSIZE(data) - VARHDRSZ);
189
* inv_create -- create a new large object
192
* lobjId - OID to use for new large object, or InvalidOid to pick one
197
* If lobjId is not InvalidOid, then an error occurs if the OID is already
201
inv_create(Oid lobjId)
206
* Create a new largeobject with empty data pages
208
lobjId_new = LargeObjectCreate(lobjId);
211
* dependency on the owner of largeobject
213
* The reason why we use LargeObjectRelationId instead of
214
* LargeObjectMetadataRelationId here is to provide backward compatibility
215
* to the applications which utilize a knowledge about internal layout of
216
* system catalogs. OID of pg_largeobject_metadata and loid of
217
* pg_largeobject are same value, so there are no actual differences here.
219
recordDependencyOnOwner(LargeObjectRelationId,
220
lobjId_new, GetUserId());
222
/* Post creation hook for new large object */
223
InvokeObjectAccessHook(OAT_POST_CREATE,
224
LargeObjectRelationId, lobjId_new, 0);
227
* Advance command counter to make new tuple visible to later operations.
229
CommandCounterIncrement();
235
* inv_open -- access an existing large object.
238
* Large object descriptor, appropriately filled in. The descriptor
239
* and subsidiary data are allocated in the specified memory context,
240
* which must be suitably long-lived for the caller's purposes.
243
inv_open(Oid lobjId, int flags, MemoryContext mcxt)
245
LargeObjectDesc *retval;
247
retval = (LargeObjectDesc *) MemoryContextAlloc(mcxt,
248
sizeof(LargeObjectDesc));
251
retval->subid = GetCurrentSubTransactionId();
254
if (flags & INV_WRITE)
256
retval->snapshot = SnapshotNow;
257
retval->flags = IFS_WRLOCK | IFS_RDLOCK;
259
else if (flags & INV_READ)
262
* We must register the snapshot in TopTransaction's resowner, because
263
* it must stay alive until the LO is closed rather than until the
264
* current portal shuts down.
266
retval->snapshot = RegisterSnapshotOnOwner(GetActiveSnapshot(),
267
TopTransactionResourceOwner);
268
retval->flags = IFS_RDLOCK;
271
elog(ERROR, "invalid flags: %d", flags);
273
/* Can't use LargeObjectExists here because it always uses SnapshotNow */
274
if (!myLargeObjectExists(lobjId, retval->snapshot))
276
(errcode(ERRCODE_UNDEFINED_OBJECT),
277
errmsg("large object %u does not exist", lobjId)));
283
* Closes a large object descriptor previously made by inv_open(), and
284
* releases the long-term memory used by it.
287
inv_close(LargeObjectDesc *obj_desc)
289
Assert(PointerIsValid(obj_desc));
291
if (obj_desc->snapshot != SnapshotNow)
292
UnregisterSnapshotFromOwner(obj_desc->snapshot,
293
TopTransactionResourceOwner);
299
* Destroys an existing large object (not to be confused with a descriptor!)
301
* returns -1 if failed
306
ObjectAddress object;
309
* Delete any comments and dependencies on the large object
311
object.classId = LargeObjectRelationId;
312
object.objectId = lobjId;
313
object.objectSubId = 0;
314
performDeletion(&object, DROP_CASCADE);
317
* Advance command counter so that tuple removal will be seen by later
318
* large-object operations in this transaction.
320
CommandCounterIncrement();
326
* Determine size of a large object
328
* NOTE: LOs can contain gaps, just like Unix files. We actually return
329
* the offset of the last byte + 1.
332
inv_getsize(LargeObjectDesc *obj_desc)
339
Assert(PointerIsValid(obj_desc));
343
ScanKeyInit(&skey[0],
344
Anum_pg_largeobject_loid,
345
BTEqualStrategyNumber, F_OIDEQ,
346
ObjectIdGetDatum(obj_desc->id));
348
sd = systable_beginscan_ordered(lo_heap_r, lo_index_r,
349
obj_desc->snapshot, 1, skey);
352
* Because the pg_largeobject index is on both loid and pageno, but we
353
* constrain only loid, a backwards scan should visit all pages of the
354
* large object in reverse pageno order. So, it's sufficient to examine
355
* the first valid tuple (== last valid page).
357
tuple = systable_getnext_ordered(sd, BackwardScanDirection);
358
if (HeapTupleIsValid(tuple))
360
Form_pg_largeobject data;
364
if (HeapTupleHasNulls(tuple)) /* paranoia */
365
elog(ERROR, "null field found in pg_largeobject");
366
data = (Form_pg_largeobject) GETSTRUCT(tuple);
367
datafield = &(data->data); /* see note at top of file */
369
if (VARATT_IS_EXTENDED(datafield))
371
datafield = (bytea *)
372
heap_tuple_untoast_attr((struct varlena *) datafield);
375
lastbyte = data->pageno * LOBLKSIZE + getbytealen(datafield);
380
systable_endscan_ordered(sd);
386
inv_seek(LargeObjectDesc *obj_desc, int offset, int whence)
388
Assert(PointerIsValid(obj_desc));
394
elog(ERROR, "invalid seek offset: %d", offset);
395
obj_desc->offset = offset;
398
if (offset < 0 && obj_desc->offset < ((uint32) (-offset)))
399
elog(ERROR, "invalid seek offset: %d", offset);
400
obj_desc->offset += offset;
404
uint32 size = inv_getsize(obj_desc);
406
if (offset < 0 && size < ((uint32) (-offset)))
407
elog(ERROR, "invalid seek offset: %d", offset);
408
obj_desc->offset = size + offset;
412
elog(ERROR, "invalid whence: %d", whence);
414
return obj_desc->offset;
418
inv_tell(LargeObjectDesc *obj_desc)
420
Assert(PointerIsValid(obj_desc));
422
return obj_desc->offset;
426
inv_read(LargeObjectDesc *obj_desc, char *buf, int nbytes)
432
int32 pageno = (int32) (obj_desc->offset / LOBLKSIZE);
438
Assert(PointerIsValid(obj_desc));
446
ScanKeyInit(&skey[0],
447
Anum_pg_largeobject_loid,
448
BTEqualStrategyNumber, F_OIDEQ,
449
ObjectIdGetDatum(obj_desc->id));
451
ScanKeyInit(&skey[1],
452
Anum_pg_largeobject_pageno,
453
BTGreaterEqualStrategyNumber, F_INT4GE,
454
Int32GetDatum(pageno));
456
sd = systable_beginscan_ordered(lo_heap_r, lo_index_r,
457
obj_desc->snapshot, 2, skey);
459
while ((tuple = systable_getnext_ordered(sd, ForwardScanDirection)) != NULL)
461
Form_pg_largeobject data;
465
if (HeapTupleHasNulls(tuple)) /* paranoia */
466
elog(ERROR, "null field found in pg_largeobject");
467
data = (Form_pg_largeobject) GETSTRUCT(tuple);
470
* We expect the indexscan will deliver pages in order. However,
471
* there may be missing pages if the LO contains unwritten "holes". We
472
* want missing sections to read out as zeroes.
474
pageoff = ((uint32) data->pageno) * LOBLKSIZE;
475
if (pageoff > obj_desc->offset)
477
n = pageoff - obj_desc->offset;
478
n = (n <= (nbytes - nread)) ? n : (nbytes - nread);
479
MemSet(buf + nread, 0, n);
481
obj_desc->offset += n;
486
Assert(obj_desc->offset >= pageoff);
487
off = (int) (obj_desc->offset - pageoff);
488
Assert(off >= 0 && off < LOBLKSIZE);
490
datafield = &(data->data); /* see note at top of file */
492
if (VARATT_IS_EXTENDED(datafield))
494
datafield = (bytea *)
495
heap_tuple_untoast_attr((struct varlena *) datafield);
498
len = getbytealen(datafield);
502
n = (n <= (nbytes - nread)) ? n : (nbytes - nread);
503
memcpy(buf + nread, VARDATA(datafield) + off, n);
505
obj_desc->offset += n;
515
systable_endscan_ordered(sd);
521
inv_write(LargeObjectDesc *obj_desc, const char *buf, int nbytes)
527
int32 pageno = (int32) (obj_desc->offset / LOBLKSIZE);
531
Form_pg_largeobject olddata;
538
char data[LOBLKSIZE]; /* make struct big enough */
539
int32 align_it; /* ensure struct is aligned well enough */
541
char *workb = VARDATA(&workbuf.hdr);
543
Datum values[Natts_pg_largeobject];
544
bool nulls[Natts_pg_largeobject];
545
bool replace[Natts_pg_largeobject];
546
CatalogIndexState indstate;
548
Assert(PointerIsValid(obj_desc));
551
/* enforce writability because snapshot is probably wrong otherwise */
552
if ((obj_desc->flags & IFS_WRLOCK) == 0)
554
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
555
errmsg("large object %u was not opened for writing",
558
/* check existence of the target largeobject */
559
if (!LargeObjectExists(obj_desc->id))
561
(errcode(ERRCODE_UNDEFINED_OBJECT),
562
errmsg("large object %u was already dropped", obj_desc->id)));
569
indstate = CatalogOpenIndexes(lo_heap_r);
571
ScanKeyInit(&skey[0],
572
Anum_pg_largeobject_loid,
573
BTEqualStrategyNumber, F_OIDEQ,
574
ObjectIdGetDatum(obj_desc->id));
576
ScanKeyInit(&skey[1],
577
Anum_pg_largeobject_pageno,
578
BTGreaterEqualStrategyNumber, F_INT4GE,
579
Int32GetDatum(pageno));
581
sd = systable_beginscan_ordered(lo_heap_r, lo_index_r,
582
obj_desc->snapshot, 2, skey);
588
while (nwritten < nbytes)
591
* If possible, get next pre-existing page of the LO. We expect the
592
* indexscan will deliver these in order --- but there may be holes.
596
if ((oldtuple = systable_getnext_ordered(sd, ForwardScanDirection)) != NULL)
598
if (HeapTupleHasNulls(oldtuple)) /* paranoia */
599
elog(ERROR, "null field found in pg_largeobject");
600
olddata = (Form_pg_largeobject) GETSTRUCT(oldtuple);
601
Assert(olddata->pageno >= pageno);
603
neednextpage = false;
607
* If we have a pre-existing page, see if it is the page we want to
608
* write, or a later one.
610
if (olddata != NULL && olddata->pageno == pageno)
613
* Update an existing page with fresh data.
615
* First, load old data into workbuf
617
datafield = &(olddata->data); /* see note at top of file */
619
if (VARATT_IS_EXTENDED(datafield))
621
datafield = (bytea *)
622
heap_tuple_untoast_attr((struct varlena *) datafield);
625
len = getbytealen(datafield);
626
Assert(len <= LOBLKSIZE);
627
memcpy(workb, VARDATA(datafield), len);
634
off = (int) (obj_desc->offset % LOBLKSIZE);
636
MemSet(workb + len, 0, off - len);
639
* Insert appropriate portion of new data
642
n = (n <= (nbytes - nwritten)) ? n : (nbytes - nwritten);
643
memcpy(workb + off, buf + nwritten, n);
645
obj_desc->offset += n;
647
/* compute valid length of new page */
648
len = (len >= off) ? len : off;
649
SET_VARSIZE(&workbuf.hdr, len + VARHDRSZ);
652
* Form and insert updated tuple
654
memset(values, 0, sizeof(values));
655
memset(nulls, false, sizeof(nulls));
656
memset(replace, false, sizeof(replace));
657
values[Anum_pg_largeobject_data - 1] = PointerGetDatum(&workbuf);
658
replace[Anum_pg_largeobject_data - 1] = true;
659
newtup = heap_modify_tuple(oldtuple, RelationGetDescr(lo_heap_r),
660
values, nulls, replace);
661
simple_heap_update(lo_heap_r, &newtup->t_self, newtup);
662
CatalogIndexInsert(indstate, newtup);
663
heap_freetuple(newtup);
666
* We're done with this old page.
675
* Write a brand new page.
677
* First, fill any hole
679
off = (int) (obj_desc->offset % LOBLKSIZE);
681
MemSet(workb, 0, off);
684
* Insert appropriate portion of new data
687
n = (n <= (nbytes - nwritten)) ? n : (nbytes - nwritten);
688
memcpy(workb + off, buf + nwritten, n);
690
obj_desc->offset += n;
691
/* compute valid length of new page */
693
SET_VARSIZE(&workbuf.hdr, len + VARHDRSZ);
696
* Form and insert updated tuple
698
memset(values, 0, sizeof(values));
699
memset(nulls, false, sizeof(nulls));
700
values[Anum_pg_largeobject_loid - 1] = ObjectIdGetDatum(obj_desc->id);
701
values[Anum_pg_largeobject_pageno - 1] = Int32GetDatum(pageno);
702
values[Anum_pg_largeobject_data - 1] = PointerGetDatum(&workbuf);
703
newtup = heap_form_tuple(lo_heap_r->rd_att, values, nulls);
704
simple_heap_insert(lo_heap_r, newtup);
705
CatalogIndexInsert(indstate, newtup);
706
heap_freetuple(newtup);
711
systable_endscan_ordered(sd);
713
CatalogCloseIndexes(indstate);
716
* Advance command counter so that my tuple updates will be seen by later
717
* large-object operations in this transaction.
719
CommandCounterIncrement();
725
inv_truncate(LargeObjectDesc *obj_desc, int len)
727
int32 pageno = (int32) (len / LOBLKSIZE);
732
Form_pg_largeobject olddata;
736
char data[LOBLKSIZE]; /* make struct big enough */
737
int32 align_it; /* ensure struct is aligned well enough */
739
char *workb = VARDATA(&workbuf.hdr);
741
Datum values[Natts_pg_largeobject];
742
bool nulls[Natts_pg_largeobject];
743
bool replace[Natts_pg_largeobject];
744
CatalogIndexState indstate;
746
Assert(PointerIsValid(obj_desc));
748
/* enforce writability because snapshot is probably wrong otherwise */
749
if ((obj_desc->flags & IFS_WRLOCK) == 0)
751
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
752
errmsg("large object %u was not opened for writing",
755
/* check existence of the target largeobject */
756
if (!LargeObjectExists(obj_desc->id))
758
(errcode(ERRCODE_UNDEFINED_OBJECT),
759
errmsg("large object %u was already dropped", obj_desc->id)));
763
indstate = CatalogOpenIndexes(lo_heap_r);
766
* Set up to find all pages with desired loid and pageno >= target
768
ScanKeyInit(&skey[0],
769
Anum_pg_largeobject_loid,
770
BTEqualStrategyNumber, F_OIDEQ,
771
ObjectIdGetDatum(obj_desc->id));
773
ScanKeyInit(&skey[1],
774
Anum_pg_largeobject_pageno,
775
BTGreaterEqualStrategyNumber, F_INT4GE,
776
Int32GetDatum(pageno));
778
sd = systable_beginscan_ordered(lo_heap_r, lo_index_r,
779
obj_desc->snapshot, 2, skey);
782
* If possible, get the page the truncation point is in. The truncation
783
* point may be beyond the end of the LO or in a hole.
786
if ((oldtuple = systable_getnext_ordered(sd, ForwardScanDirection)) != NULL)
788
if (HeapTupleHasNulls(oldtuple)) /* paranoia */
789
elog(ERROR, "null field found in pg_largeobject");
790
olddata = (Form_pg_largeobject) GETSTRUCT(oldtuple);
791
Assert(olddata->pageno >= pageno);
795
* If we found the page of the truncation point we need to truncate the
796
* data in it. Otherwise if we're in a hole, we need to create a page to
797
* mark the end of data.
799
if (olddata != NULL && olddata->pageno == pageno)
801
/* First, load old data into workbuf */
802
bytea *datafield = &(olddata->data); /* see note at top of
804
bool pfreeit = false;
807
if (VARATT_IS_EXTENDED(datafield))
809
datafield = (bytea *)
810
heap_tuple_untoast_attr((struct varlena *) datafield);
813
pagelen = getbytealen(datafield);
814
Assert(pagelen <= LOBLKSIZE);
815
memcpy(workb, VARDATA(datafield), pagelen);
822
off = len % LOBLKSIZE;
824
MemSet(workb + pagelen, 0, off - pagelen);
826
/* compute length of new page */
827
SET_VARSIZE(&workbuf.hdr, off + VARHDRSZ);
830
* Form and insert updated tuple
832
memset(values, 0, sizeof(values));
833
memset(nulls, false, sizeof(nulls));
834
memset(replace, false, sizeof(replace));
835
values[Anum_pg_largeobject_data - 1] = PointerGetDatum(&workbuf);
836
replace[Anum_pg_largeobject_data - 1] = true;
837
newtup = heap_modify_tuple(oldtuple, RelationGetDescr(lo_heap_r),
838
values, nulls, replace);
839
simple_heap_update(lo_heap_r, &newtup->t_self, newtup);
840
CatalogIndexInsert(indstate, newtup);
841
heap_freetuple(newtup);
846
* If the first page we found was after the truncation point, we're in
847
* a hole that we'll fill, but we need to delete the later page
848
* because the loop below won't visit it again.
852
Assert(olddata->pageno > pageno);
853
simple_heap_delete(lo_heap_r, &oldtuple->t_self);
857
* Write a brand new page.
859
* Fill the hole up to the truncation point
861
off = len % LOBLKSIZE;
863
MemSet(workb, 0, off);
865
/* compute length of new page */
866
SET_VARSIZE(&workbuf.hdr, off + VARHDRSZ);
869
* Form and insert new tuple
871
memset(values, 0, sizeof(values));
872
memset(nulls, false, sizeof(nulls));
873
values[Anum_pg_largeobject_loid - 1] = ObjectIdGetDatum(obj_desc->id);
874
values[Anum_pg_largeobject_pageno - 1] = Int32GetDatum(pageno);
875
values[Anum_pg_largeobject_data - 1] = PointerGetDatum(&workbuf);
876
newtup = heap_form_tuple(lo_heap_r->rd_att, values, nulls);
877
simple_heap_insert(lo_heap_r, newtup);
878
CatalogIndexInsert(indstate, newtup);
879
heap_freetuple(newtup);
883
* Delete any pages after the truncation point. If the initial search
884
* didn't find a page, then of course there's nothing more to do.
888
while ((oldtuple = systable_getnext_ordered(sd, ForwardScanDirection)) != NULL)
890
simple_heap_delete(lo_heap_r, &oldtuple->t_self);
894
systable_endscan_ordered(sd);
896
CatalogCloseIndexes(indstate);
899
* Advance command counter so that tuple updates will be seen by later
900
* large-object operations in this transaction.
902
CommandCounterIncrement();