1
/*-------------------------------------------------------------------------
4
* routines for manipulating inversion fs large objects. This file
5
* contains the user-level large object application interface routines.
7
* Portions Copyright (c) 1996-2005, PostgreSQL Global Development Group
8
* Portions Copyright (c) 1994, Regents of the University of California
12
* $PostgreSQL: pgsql/src/backend/storage/large_object/inv_api.c,v 1.108 2004-12-31 22:00:59 pgsql Exp $
14
*-------------------------------------------------------------------------
18
#include "access/genam.h"
19
#include "access/heapam.h"
20
#include "access/tuptoaster.h"
21
#include "catalog/catalog.h"
22
#include "catalog/catname.h"
23
#include "catalog/indexing.h"
24
#include "catalog/pg_largeobject.h"
25
#include "commands/comment.h"
26
#include "libpq/libpq-fs.h"
27
#include "storage/large_object.h"
28
#include "utils/fmgroids.h"
29
#include "utils/lsyscache.h"
30
#include "utils/resowner.h"
34
* All accesses to pg_largeobject and its index make use of a single Relation
35
* reference, so that we only need to open pg_largeobject once per transaction.
36
* To avoid problems when the first such reference occurs inside a
37
* subtransaction, we execute a slightly klugy maneuver to assign ownership of
38
* the Relation reference to TopTransactionResourceOwner.
40
static Relation lo_heap_r = NULL;
41
static Relation lo_index_r = NULL;
45
* Open pg_largeobject and its index, if not already done in current xact
48
/*
 * open_lo_relation: make sure both lo_heap_r and lo_index_r are open.
 *
 * Returns at once when both statics are already non-NULL.  Otherwise the
 * current value of CurrentResourceOwner is saved, CurrentResourceOwner is
 * pointed at TopTransactionResourceOwner, and whichever of the two
 * references is still NULL is opened (the heap via heap_openr with
 * RowExclusiveLock, the index via index_openr).  The saved owner is then
 * put back.
 *
 * NOTE(review): this listing omits some original lines (return type,
 * braces, and presumably an error-trapping construct around the opens,
 * which would explain the two identical restore statements) -- confirm
 * against the complete file.
 */
open_lo_relation(void)
50
ResourceOwner currentOwner;
52
if (lo_heap_r && lo_index_r)
53
return; /* already open in current xact */
55
/* Arrange for the top xact to own these relation references */
56
currentOwner = CurrentResourceOwner;
59
CurrentResourceOwner = TopTransactionResourceOwner;
61
/* Use RowExclusiveLock since we might either read or write */
62
if (lo_heap_r == NULL)
63
lo_heap_r = heap_openr(LargeObjectRelationName, RowExclusiveLock);
64
if (lo_index_r == NULL)
65
lo_index_r = index_openr(LargeObjectLOidPNIndex);
69
/* Ensure CurrentResourceOwner is restored on error */
70
CurrentResourceOwner = currentOwner;
74
CurrentResourceOwner = currentOwner;
78
* Clean up at main transaction end
81
/*
 * close_lo_relation: release the cached lo_heap_r / lo_index_r references.
 *
 * When either static is set, the visible code saves CurrentResourceOwner,
 * switches it to TopTransactionResourceOwner, calls index_close on
 * lo_index_r and heap_close on lo_heap_r (with NoLock), and then restores
 * the saved owner.
 *
 * NOTE(review): isCommit is not referenced by any line visible here; the
 * surviving comment fragment suggests the closes run only when committing.
 * The resetting of the two statics and the error-trapping construct are
 * likewise not visible in this listing -- confirm against the full file.
 */
close_lo_relation(bool isCommit)
83
if (lo_heap_r || lo_index_r)
86
* Only bother to close if committing; else abort cleanup will
91
ResourceOwner currentOwner;
93
currentOwner = CurrentResourceOwner;
96
CurrentResourceOwner = TopTransactionResourceOwner;
99
index_close(lo_index_r);
101
heap_close(lo_heap_r, NoLock);
105
/* Ensure CurrentResourceOwner is restored on error */
106
CurrentResourceOwner = currentOwner;
110
CurrentResourceOwner = currentOwner;
119
/*
 * getbytealen: payload length of a bytea value.
 *
 * data must not be in extended form (checked with an Assert).  Raises
 * elog(ERROR) when VARSIZE(data) is smaller than VARHDRSZ; otherwise
 * returns VARSIZE(data) minus VARHDRSZ.
 *
 * NOTE(review): the return type line and the braces are not visible in
 * this listing.
 */
getbytealen(bytea *data)
121
Assert(!VARATT_IS_EXTENDED(data));
122
if (VARSIZE(data) < VARHDRSZ)
123
elog(ERROR, "invalid VARSIZE(data)");
124
return (VARSIZE(data) - VARHDRSZ);
129
* inv_create -- create a new large object.
135
* large object descriptor, appropriately filled in.
138
/*
 * inv_create: create a new large object and build a descriptor for it.
 *
 * Visible steps: raise elog(ERROR) if LargeObjectExists(file_oid) is true,
 * call LargeObjectCreate(file_oid), call CommandCounterIncrement, palloc a
 * LargeObjectDesc, and fill in id (file_oid) and subid (the value of
 * GetCurrentSubTransactionId).  The flags field becomes
 * IFS_WRLOCK | IFS_RDLOCK when INV_WRITE is set in flags, IFS_RDLOCK when
 * only INV_READ is set, and otherwise elog(ERROR) reports invalid flags.
 *
 * NOTE(review): the declaration and assignment of file_oid, any
 * initialisation of the descriptor offset, and the return statement are
 * not visible in this listing -- confirm against the full file.
 */
inv_create(int flags)
141
LargeObjectDesc *retval;
144
* Allocate an OID to be the LO's identifier.
148
/* Check for duplicate (shouldn't happen) */
149
if (LargeObjectExists(file_oid))
150
elog(ERROR, "large object %u already exists", file_oid);
153
* Create the LO by writing an empty first page for it in
156
LargeObjectCreate(file_oid);
159
* Advance command counter so that new tuple will be seen by later
160
* large-object operations in this transaction.
162
CommandCounterIncrement();
165
* Prepare LargeObjectDesc data structure for accessing LO
167
retval = (LargeObjectDesc *) palloc(sizeof(LargeObjectDesc));
169
retval->id = file_oid;
170
retval->subid = GetCurrentSubTransactionId();
173
if (flags & INV_WRITE)
174
retval->flags = IFS_WRLOCK | IFS_RDLOCK;
175
else if (flags & INV_READ)
176
retval->flags = IFS_RDLOCK;
178
elog(ERROR, "invalid flags: %d", flags);
184
* inv_open -- access an existing large object.
187
* large object descriptor, appropriately filled in.
190
/*
 * inv_open: build a descriptor for an existing large object.
 *
 * When LargeObjectExists(lobjId) is false, an error carrying
 * ERRCODE_UNDEFINED_OBJECT and a does-not-exist message is reported.
 * Otherwise a LargeObjectDesc is palloc-ed, subid is set from
 * GetCurrentSubTransactionId, and the flags field is derived exactly as in
 * the visible part of inv_create: IFS_WRLOCK | IFS_RDLOCK for INV_WRITE,
 * IFS_RDLOCK for INV_READ, elog(ERROR) for anything else.
 *
 * NOTE(review): the head of the error-report call, the assignments of the
 * descriptor id and offset, and the return statement are not visible in
 * this listing -- confirm against the full file.
 */
inv_open(Oid lobjId, int flags)
192
LargeObjectDesc *retval;
194
if (!LargeObjectExists(lobjId))
196
(errcode(ERRCODE_UNDEFINED_OBJECT),
197
errmsg("large object %u does not exist", lobjId)));
199
retval = (LargeObjectDesc *) palloc(sizeof(LargeObjectDesc));
202
retval->subid = GetCurrentSubTransactionId();
205
if (flags & INV_WRITE)
206
retval->flags = IFS_WRLOCK | IFS_RDLOCK;
207
else if (flags & INV_READ)
208
retval->flags = IFS_RDLOCK;
210
elog(ERROR, "invalid flags: %d", flags);
216
* Closes an existing large object descriptor.
219
/*
 * inv_close: close a large object descriptor.
 *
 * The only statement visible here asserts that obj_desc is a valid
 * pointer.
 *
 * NOTE(review): whatever releases the descriptor is not visible in this
 * listing -- confirm against the full file.
 */
inv_close(LargeObjectDesc *obj_desc)
221
Assert(PointerIsValid(obj_desc));
226
* Destroys an existing large object (not to be confused with a descriptor!)
228
* returns -1 if failed
235
/*
 * Body of the routine that destroys a large object (its header line is
 * not visible in this listing).
 *
 * Visible steps, in order: LargeObjectDrop(lobjId) removes the object,
 * get_system_catalog_relid looks up the relation id for
 * LargeObjectRelationName, DeleteComments is called with that class id
 * and sub-id 0, and CommandCounterIncrement is called afterwards.
 *
 * NOTE(review): the declaration of classoid and the return statement are
 * not visible here -- confirm against the full file.
 */
LargeObjectDrop(lobjId);
237
/* pg_largeobject doesn't have a hard-coded OID, so must look it up */
238
classoid = get_system_catalog_relid(LargeObjectRelationName);
240
/* Delete any comments on the large object */
241
DeleteComments(lobjId, classoid, 0);
244
* Advance command counter so that tuple removal will be seen by later
245
* large-object operations in this transaction.
247
CommandCounterIncrement();
253
* Determine size of a large object
255
* NOTE: LOs can contain gaps, just like Unix files. We actually return
256
* the offset of the last byte + 1.
259
/*
 * inv_getsize: compute the size of the large object behind obj_desc.
 *
 * A one-key index scan (loid equal to obj_desc->id) is started on
 * lo_heap_r / lo_index_r under SnapshotNow and stepped with
 * BackwardScanDirection.  For a fetched tuple the data column is
 * detoasted when it is in extended form, and lastbyte is computed as
 * pageno * LOBLKSIZE plus the byte length reported by getbytealen.
 * An error with ERRCODE_UNDEFINED_OBJECT and a does-not-exist message is
 * also visible.
 *
 * NOTE(review): the local declarations, the loop exit, the end of the
 * scan, the condition guarding the error report, and the return statement
 * are not visible in this listing -- confirm against the full file.
 */
inv_getsize(LargeObjectDesc *obj_desc)
267
Assert(PointerIsValid(obj_desc));
271
ScanKeyInit(&skey[0],
272
Anum_pg_largeobject_loid,
273
BTEqualStrategyNumber, F_OIDEQ,
274
ObjectIdGetDatum(obj_desc->id));
276
sd = index_beginscan(lo_heap_r, lo_index_r,
277
SnapshotNow, 1, skey);
280
* Because the pg_largeobject index is on both loid and pageno, but we
281
* constrain only loid, a backwards scan should visit all pages of the
282
* large object in reverse pageno order. So, it's sufficient to
283
* examine the first valid tuple (== last valid page).
285
while ((tuple = index_getnext(sd, BackwardScanDirection)) != NULL)
287
Form_pg_largeobject data;
292
data = (Form_pg_largeobject) GETSTRUCT(tuple);
293
datafield = &(data->data);
295
if (VARATT_IS_EXTENDED(datafield))
297
datafield = (bytea *)
298
heap_tuple_untoast_attr((varattrib *) datafield);
301
lastbyte = data->pageno * LOBLKSIZE + getbytealen(datafield);
311
(errcode(ERRCODE_UNDEFINED_OBJECT),
312
errmsg("large object %u does not exist", obj_desc->id)));
317
/*
 * inv_seek: reposition obj_desc->offset and return the new value.
 *
 * Three assignments are visible:
 *   - absolute: offset is stored directly (an invalid-seek-offset error
 *     is raised on the line just before it);
 *   - relative to the current position: a negative offset whose
 *     magnitude exceeds the current offset is rejected, otherwise offset
 *     is added;
 *   - relative to the object size from inv_getsize: a negative offset
 *     whose magnitude exceeds the size is rejected, otherwise the result
 *     is size + offset.
 * Any other whence value raises an invalid-whence error.
 *
 * NOTE(review): the tests on whence that select between these cases
 * (presumably SEEK_SET / SEEK_CUR / SEEK_END) and the condition guarding
 * the first error are not visible in this listing -- confirm against the
 * full file.
 */
inv_seek(LargeObjectDesc *obj_desc, int offset, int whence)
319
Assert(PointerIsValid(obj_desc));
325
elog(ERROR, "invalid seek offset: %d", offset);
326
obj_desc->offset = offset;
329
if (offset < 0 && obj_desc->offset < ((uint32) (-offset)))
330
elog(ERROR, "invalid seek offset: %d", offset);
331
obj_desc->offset += offset;
335
uint32 size = inv_getsize(obj_desc);
337
if (offset < 0 && size < ((uint32) (-offset)))
338
elog(ERROR, "invalid seek offset: %d", offset);
339
obj_desc->offset = size + offset;
343
elog(ERROR, "invalid whence: %d", whence);
345
return obj_desc->offset;
349
/*
 * inv_tell: report the current position of a large object descriptor.
 *
 * Asserts that obj_desc is a valid pointer and returns obj_desc->offset
 * unchanged.
 */
inv_tell(LargeObjectDesc *obj_desc)
351
Assert(PointerIsValid(obj_desc));
353
return obj_desc->offset;
357
/*
 * inv_read: copy data from the large object into buf, starting at
 * obj_desc->offset, and advance obj_desc->offset as bytes are produced.
 *
 * The starting page is obj_desc->offset / LOBLKSIZE.  A two-key index
 * scan (loid equal to obj_desc->id, pageno greater than or equal to the
 * starting page) is run forward under SnapshotNow.  For each tuple:
 *   - pageoff is the byte offset at which that page begins; when it lies
 *     beyond the current offset, the gap is emitted as zero bytes into
 *     buf (capped at the room left, nbytes - nread);
 *   - off is the position inside the page, asserted to be within
 *     0 .. LOBLKSIZE - 1;
 *   - the data column is detoasted when in extended form, its length is
 *     taken with getbytealen, and up to nbytes - nread bytes are copied
 *     from VARDATA(datafield) + off into buf + nread.
 *
 * NOTE(review): the local declarations, the updates of nread, the
 * computation of n before the copy, the loop exit tests, the end of the
 * scan and the return statement are not visible in this listing --
 * confirm against the full file.
 */
inv_read(LargeObjectDesc *obj_desc, char *buf, int nbytes)
363
int32 pageno = (int32) (obj_desc->offset / LOBLKSIZE);
369
Assert(PointerIsValid(obj_desc));
377
ScanKeyInit(&skey[0],
378
Anum_pg_largeobject_loid,
379
BTEqualStrategyNumber, F_OIDEQ,
380
ObjectIdGetDatum(obj_desc->id));
382
ScanKeyInit(&skey[1],
383
Anum_pg_largeobject_pageno,
384
BTGreaterEqualStrategyNumber, F_INT4GE,
385
Int32GetDatum(pageno));
387
sd = index_beginscan(lo_heap_r, lo_index_r,
388
SnapshotNow, 2, skey);
390
while ((tuple = index_getnext(sd, ForwardScanDirection)) != NULL)
392
Form_pg_largeobject data;
396
data = (Form_pg_largeobject) GETSTRUCT(tuple);
399
* We assume the indexscan will deliver pages in order. However,
400
* there may be missing pages if the LO contains unwritten
401
* "holes". We want missing sections to read out as zeroes.
403
pageoff = ((uint32) data->pageno) * LOBLKSIZE;
404
if (pageoff > obj_desc->offset)
406
n = pageoff - obj_desc->offset;
407
n = (n <= (nbytes - nread)) ? n : (nbytes - nread);
408
MemSet(buf + nread, 0, n);
410
obj_desc->offset += n;
415
Assert(obj_desc->offset >= pageoff);
416
off = (int) (obj_desc->offset - pageoff);
417
Assert(off >= 0 && off < LOBLKSIZE);
419
datafield = &(data->data);
421
if (VARATT_IS_EXTENDED(datafield))
423
datafield = (bytea *)
424
heap_tuple_untoast_attr((varattrib *) datafield);
427
len = getbytealen(datafield);
431
n = (n <= (nbytes - nread)) ? n : (nbytes - nread);
432
memcpy(buf + nread, VARDATA(datafield) + off, n);
434
obj_desc->offset += n;
450
/*
 * inv_write: store nbytes from buf into the large object, starting at
 * obj_desc->offset, and advance obj_desc->offset as bytes are stored.
 *
 * Setup: the starting page is obj_desc->offset / LOBLKSIZE, catalog index
 * state is opened on lo_heap_r, and a two-key forward index scan (loid
 * equal to obj_desc->id, pageno greater than or equal to the starting
 * page) is begun under SnapshotNow.
 *
 * The loop runs while nwritten is below nbytes and has two visible paths:
 *   - existing page (olddata is set and its pageno equals the target):
 *     the old contents are copied into the work buffer (detoasted first
 *     when in extended form), the range between the old length and the
 *     in-page write offset is zeroed, the new bytes are copied in, the
 *     varlena size is set to len + VARHDRSZ, and a tuple built by
 *     heap_modifytuple (replacing only the data column) is passed to
 *     simple_heap_update followed by CatalogIndexInsert;
 *   - new page: the work buffer is zeroed up to the in-page offset, the
 *     new bytes are copied in, and a tuple built by heap_formtuple from
 *     loid, pageno and the work buffer is passed to simple_heap_insert
 *     followed by CatalogIndexInsert.
 * Afterwards the catalog index state is closed and
 * CommandCounterIncrement is called.
 *
 * NOTE(review): several original lines are not visible in this listing,
 * including the remaining local declarations (workbuf, skey, sd, ...),
 * the updates of nwritten, off, len and pageno, the guard that presumably
 * protects the MemSet of off - len bytes, the else branch separating the
 * two paths, the end of the scan, and the return statement -- confirm
 * against the full file before relying on this description.
 */
inv_write(LargeObjectDesc *obj_desc, char *buf, int nbytes)
456
int32 pageno = (int32) (obj_desc->offset / LOBLKSIZE);
460
Form_pg_largeobject olddata;
467
char data[LOBLKSIZE];
469
char *workb = VARATT_DATA(&workbuf.hdr);
471
Datum values[Natts_pg_largeobject];
472
char nulls[Natts_pg_largeobject];
473
char replace[Natts_pg_largeobject];
474
CatalogIndexState indstate;
476
Assert(PointerIsValid(obj_desc));
484
indstate = CatalogOpenIndexes(lo_heap_r);
486
ScanKeyInit(&skey[0],
487
Anum_pg_largeobject_loid,
488
BTEqualStrategyNumber, F_OIDEQ,
489
ObjectIdGetDatum(obj_desc->id));
491
ScanKeyInit(&skey[1],
492
Anum_pg_largeobject_pageno,
493
BTGreaterEqualStrategyNumber, F_INT4GE,
494
Int32GetDatum(pageno));
496
sd = index_beginscan(lo_heap_r, lo_index_r,
497
SnapshotNow, 2, skey);
503
while (nwritten < nbytes)
506
* If possible, get next pre-existing page of the LO. We assume
507
* the indexscan will deliver these in order --- but there may be
512
if ((oldtuple = index_getnext(sd, ForwardScanDirection)) != NULL)
514
olddata = (Form_pg_largeobject) GETSTRUCT(oldtuple);
515
Assert(olddata->pageno >= pageno);
517
neednextpage = false;
521
* If we have a pre-existing page, see if it is the page we want
522
* to write, or a later one.
524
if (olddata != NULL && olddata->pageno == pageno)
527
* Update an existing page with fresh data.
529
* First, load old data into workbuf
531
datafield = &(olddata->data);
533
if (VARATT_IS_EXTENDED(datafield))
535
datafield = (bytea *)
536
heap_tuple_untoast_attr((varattrib *) datafield);
539
len = getbytealen(datafield);
540
Assert(len <= LOBLKSIZE);
541
memcpy(workb, VARDATA(datafield), len);
548
off = (int) (obj_desc->offset % LOBLKSIZE);
550
MemSet(workb + len, 0, off - len);
553
* Insert appropriate portion of new data
556
n = (n <= (nbytes - nwritten)) ? n : (nbytes - nwritten);
557
memcpy(workb + off, buf + nwritten, n);
559
obj_desc->offset += n;
561
/* compute valid length of new page */
562
len = (len >= off) ? len : off;
563
VARATT_SIZEP(&workbuf.hdr) = len + VARHDRSZ;
566
* Form and insert updated tuple
568
memset(values, 0, sizeof(values));
569
memset(nulls, ' ', sizeof(nulls));
570
memset(replace, ' ', sizeof(replace));
571
values[Anum_pg_largeobject_data - 1] = PointerGetDatum(&workbuf);
572
replace[Anum_pg_largeobject_data - 1] = 'r';
573
newtup = heap_modifytuple(oldtuple, lo_heap_r,
574
values, nulls, replace);
575
simple_heap_update(lo_heap_r, &newtup->t_self, newtup);
576
CatalogIndexInsert(indstate, newtup);
577
heap_freetuple(newtup);
580
* We're done with this old page.
589
* Write a brand new page.
591
* First, fill any hole
593
off = (int) (obj_desc->offset % LOBLKSIZE);
595
MemSet(workb, 0, off);
598
* Insert appropriate portion of new data
601
n = (n <= (nbytes - nwritten)) ? n : (nbytes - nwritten);
602
memcpy(workb + off, buf + nwritten, n);
604
obj_desc->offset += n;
605
/* compute valid length of new page */
607
VARATT_SIZEP(&workbuf.hdr) = len + VARHDRSZ;
610
* Form and insert updated tuple
612
memset(values, 0, sizeof(values));
613
memset(nulls, ' ', sizeof(nulls));
614
values[Anum_pg_largeobject_loid - 1] = ObjectIdGetDatum(obj_desc->id);
615
values[Anum_pg_largeobject_pageno - 1] = Int32GetDatum(pageno);
616
values[Anum_pg_largeobject_data - 1] = PointerGetDatum(&workbuf);
617
newtup = heap_formtuple(lo_heap_r->rd_att, values, nulls);
618
simple_heap_insert(lo_heap_r, newtup);
619
CatalogIndexInsert(indstate, newtup);
620
heap_freetuple(newtup);
627
CatalogCloseIndexes(indstate);
630
* Advance command counter so that my tuple updates will be seen by
631
* later large-object operations in this transaction.
633
CommandCounterIncrement();