diff options
Diffstat (limited to 'src/backend/storage/large_object')
-rw-r--r-- | src/backend/storage/large_object/Makefile | 18 |
-rw-r--r-- | src/backend/storage/large_object/inv_api.c | 955 |
2 files changed, 973 insertions, 0 deletions
diff --git a/src/backend/storage/large_object/Makefile b/src/backend/storage/large_object/Makefile new file mode 100644 index 0000000..8a6bc36 --- /dev/null +++ b/src/backend/storage/large_object/Makefile @@ -0,0 +1,18 @@ +#------------------------------------------------------------------------- +# +# Makefile-- +# Makefile for storage/large_object +# +# IDENTIFICATION +# src/backend/storage/large_object/Makefile +# +#------------------------------------------------------------------------- + +subdir = src/backend/storage/large_object +top_builddir = ../../../.. +include $(top_builddir)/src/Makefile.global + +OBJS = \ + inv_api.o + +include $(top_srcdir)/src/backend/common.mk diff --git a/src/backend/storage/large_object/inv_api.c b/src/backend/storage/large_object/inv_api.c new file mode 100644 index 0000000..c98606a --- /dev/null +++ b/src/backend/storage/large_object/inv_api.c @@ -0,0 +1,955 @@ +/*------------------------------------------------------------------------- + * + * inv_api.c + * routines for manipulating inversion fs large objects. This file + * contains the user-level large object application interface routines. + * + * + * Note: we access pg_largeobject.data using its C struct declaration. + * This is safe because it immediately follows pageno which is an int4 field, + * and therefore the data field will always be 4-byte aligned, even if it + * is in the short 1-byte-header format. We have to detoast it since it's + * quite likely to be in compressed or short format. We also need to check + * for NULLs, since initdb will mark loid and pageno but not data as NOT NULL. + * + * Note: many of these routines leak memory in CurrentMemoryContext, as indeed + * does most of the backend code. We expect that CurrentMemoryContext will + * be a short-lived context. Data that must persist across function calls + * is kept either in CacheMemoryContext (the Relation structs) or in the + * memory context given to inv_open (for LargeObjectDesc structs). 
+ * + * + * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * + * IDENTIFICATION + * src/backend/storage/large_object/inv_api.c + * + *------------------------------------------------------------------------- + */ +#include "postgres.h" + +#include <limits.h> + +#include "access/detoast.h" +#include "access/genam.h" +#include "access/htup_details.h" +#include "access/sysattr.h" +#include "access/table.h" +#include "access/xact.h" +#include "catalog/dependency.h" +#include "catalog/indexing.h" +#include "catalog/objectaccess.h" +#include "catalog/pg_largeobject.h" +#include "catalog/pg_largeobject_metadata.h" +#include "libpq/libpq-fs.h" +#include "miscadmin.h" +#include "storage/large_object.h" +#include "utils/acl.h" +#include "utils/fmgroids.h" +#include "utils/rel.h" +#include "utils/snapmgr.h" + + +/* + * GUC: backwards-compatibility flag to suppress LO permission checks + */ +bool lo_compat_privileges; + +/* + * All accesses to pg_largeobject and its index make use of a single Relation + * reference, so that we only need to open pg_relation once per transaction. + * To avoid problems when the first such reference occurs inside a + * subtransaction, we execute a slightly klugy maneuver to assign ownership of + * the Relation reference to TopTransactionResourceOwner. 
+ */ +static Relation lo_heap_r = NULL; +static Relation lo_index_r = NULL; + + +/* + * Open pg_largeobject and its index, if not already done in current xact + */ +static void +open_lo_relation(void) +{ + ResourceOwner currentOwner; + + if (lo_heap_r && lo_index_r) + return; /* already open in current xact */ + + /* Arrange for the top xact to own these relation references */ + currentOwner = CurrentResourceOwner; + CurrentResourceOwner = TopTransactionResourceOwner; + + /* Use RowExclusiveLock since we might either read or write */ + if (lo_heap_r == NULL) + lo_heap_r = table_open(LargeObjectRelationId, RowExclusiveLock); + if (lo_index_r == NULL) + lo_index_r = index_open(LargeObjectLOidPNIndexId, RowExclusiveLock); + + CurrentResourceOwner = currentOwner; +} + +/* + * Clean up at main transaction end + */ +void +close_lo_relation(bool isCommit) +{ + if (lo_heap_r || lo_index_r) + { + /* + * Only bother to close if committing; else abort cleanup will handle + * it + */ + if (isCommit) + { + ResourceOwner currentOwner; + + currentOwner = CurrentResourceOwner; + CurrentResourceOwner = TopTransactionResourceOwner; + + if (lo_index_r) + index_close(lo_index_r, NoLock); + if (lo_heap_r) + table_close(lo_heap_r, NoLock); + + CurrentResourceOwner = currentOwner; + } + lo_heap_r = NULL; + lo_index_r = NULL; + } +} + + +/* + * Same as pg_largeobject.c's LargeObjectExists(), except snapshot to + * read with can be specified. 
+ */ +static bool +myLargeObjectExists(Oid loid, Snapshot snapshot) +{ + Relation pg_lo_meta; + ScanKeyData skey[1]; + SysScanDesc sd; + HeapTuple tuple; + bool retval = false; + + ScanKeyInit(&skey[0], + Anum_pg_largeobject_metadata_oid, + BTEqualStrategyNumber, F_OIDEQ, + ObjectIdGetDatum(loid)); + + pg_lo_meta = table_open(LargeObjectMetadataRelationId, + AccessShareLock); + + sd = systable_beginscan(pg_lo_meta, + LargeObjectMetadataOidIndexId, true, + snapshot, 1, skey); + + tuple = systable_getnext(sd); + if (HeapTupleIsValid(tuple)) + retval = true; + + systable_endscan(sd); + + table_close(pg_lo_meta, AccessShareLock); + + return retval; +} + + +/* + * Extract data field from a pg_largeobject tuple, detoasting if needed + * and verifying that the length is sane. Returns data pointer (a bytea *), + * data length, and an indication of whether to pfree the data pointer. + */ +static void +getdatafield(Form_pg_largeobject tuple, + bytea **pdatafield, + int *plen, + bool *pfreeit) +{ + bytea *datafield; + int len; + bool freeit; + + datafield = &(tuple->data); /* see note at top of file */ + freeit = false; + if (VARATT_IS_EXTENDED(datafield)) + { + datafield = (bytea *) + detoast_attr((struct varlena *) datafield); + freeit = true; + } + len = VARSIZE(datafield) - VARHDRSZ; + if (len < 0 || len > LOBLKSIZE) + ereport(ERROR, + (errcode(ERRCODE_DATA_CORRUPTED), + errmsg("pg_largeobject entry for OID %u, page %d has invalid data field size %d", + tuple->loid, tuple->pageno, len))); + *pdatafield = datafield; + *plen = len; + *pfreeit = freeit; +} + + +/* + * inv_create -- create a new large object + * + * Arguments: + * lobjId - OID to use for new large object, or InvalidOid to pick one + * + * Returns: + * OID of new object + * + * If lobjId is not InvalidOid, then an error occurs if the OID is already + * in use. 
+ */ +Oid +inv_create(Oid lobjId) +{ + Oid lobjId_new; + + /* + * Create a new largeobject with empty data pages + */ + lobjId_new = LargeObjectCreate(lobjId); + + /* + * dependency on the owner of largeobject + * + * The reason why we use LargeObjectRelationId instead of + * LargeObjectMetadataRelationId here is to provide backward compatibility + * to the applications which utilize a knowledge about internal layout of + * system catalogs. OID of pg_largeobject_metadata and loid of + * pg_largeobject are same value, so there are no actual differences here. + */ + recordDependencyOnOwner(LargeObjectRelationId, + lobjId_new, GetUserId()); + + /* Post creation hook for new large object */ + InvokeObjectPostCreateHook(LargeObjectRelationId, lobjId_new, 0); + + /* + * Advance command counter to make new tuple visible to later operations. + */ + CommandCounterIncrement(); + + return lobjId_new; +} + +/* + * inv_open -- access an existing large object. + * + * Returns a large object descriptor, appropriately filled in. + * The descriptor and subsidiary data are allocated in the specified + * memory context, which must be suitably long-lived for the caller's + * purposes. If the returned descriptor has a snapshot associated + * with it, the caller must ensure that it also lives long enough, + * e.g. by calling RegisterSnapshotOnOwner + */ +LargeObjectDesc * +inv_open(Oid lobjId, int flags, MemoryContext mcxt) +{ + LargeObjectDesc *retval; + Snapshot snapshot = NULL; + int descflags = 0; + + /* + * Historically, no difference is made between (INV_WRITE) and (INV_WRITE + * | INV_READ), the caller being allowed to read the large object + * descriptor in either case. + */ + if (flags & INV_WRITE) + descflags |= IFS_WRLOCK | IFS_RDLOCK; + if (flags & INV_READ) + descflags |= IFS_RDLOCK; + + if (descflags == 0) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("invalid flags for opening a large object: %d", + flags))); + + /* Get snapshot. 
If write is requested, use an instantaneous snapshot. */ + if (descflags & IFS_WRLOCK) + snapshot = NULL; + else + snapshot = GetActiveSnapshot(); + + /* Can't use LargeObjectExists here because we need to specify snapshot */ + if (!myLargeObjectExists(lobjId, snapshot)) + ereport(ERROR, + (errcode(ERRCODE_UNDEFINED_OBJECT), + errmsg("large object %u does not exist", lobjId))); + + /* Apply permission checks, again specifying snapshot */ + if ((descflags & IFS_RDLOCK) != 0) + { + if (!lo_compat_privileges && + pg_largeobject_aclcheck_snapshot(lobjId, + GetUserId(), + ACL_SELECT, + snapshot) != ACLCHECK_OK) + ereport(ERROR, + (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), + errmsg("permission denied for large object %u", + lobjId))); + } + if ((descflags & IFS_WRLOCK) != 0) + { + if (!lo_compat_privileges && + pg_largeobject_aclcheck_snapshot(lobjId, + GetUserId(), + ACL_UPDATE, + snapshot) != ACLCHECK_OK) + ereport(ERROR, + (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), + errmsg("permission denied for large object %u", + lobjId))); + } + + /* OK to create a descriptor */ + retval = (LargeObjectDesc *) MemoryContextAlloc(mcxt, + sizeof(LargeObjectDesc)); + retval->id = lobjId; + retval->offset = 0; + retval->flags = descflags; + + /* caller sets if needed, not used by the functions in this file */ + retval->subid = InvalidSubTransactionId; + + /* + * The snapshot (if any) is just the currently active snapshot. The + * caller will replace it with a longer-lived copy if needed. + */ + retval->snapshot = snapshot; + + return retval; +} + +/* + * Closes a large object descriptor previously made by inv_open(), and + * releases the long-term memory used by it. + */ +void +inv_close(LargeObjectDesc *obj_desc) +{ + Assert(PointerIsValid(obj_desc)); + pfree(obj_desc); +} + +/* + * Destroys an existing large object (not to be confused with a descriptor!) + * + * Note we expect caller to have done any required permissions check. 
+ */ +int +inv_drop(Oid lobjId) +{ + ObjectAddress object; + + /* + * Delete any comments and dependencies on the large object + */ + object.classId = LargeObjectRelationId; + object.objectId = lobjId; + object.objectSubId = 0; + performDeletion(&object, DROP_CASCADE, 0); + + /* + * Advance command counter so that tuple removal will be seen by later + * large-object operations in this transaction. + */ + CommandCounterIncrement(); + + /* For historical reasons, we always return 1 on success. */ + return 1; +} + +/* + * Determine size of a large object + * + * NOTE: LOs can contain gaps, just like Unix files. We actually return + * the offset of the last byte + 1. + */ +static uint64 +inv_getsize(LargeObjectDesc *obj_desc) +{ + uint64 lastbyte = 0; + ScanKeyData skey[1]; + SysScanDesc sd; + HeapTuple tuple; + + Assert(PointerIsValid(obj_desc)); + + open_lo_relation(); + + ScanKeyInit(&skey[0], + Anum_pg_largeobject_loid, + BTEqualStrategyNumber, F_OIDEQ, + ObjectIdGetDatum(obj_desc->id)); + + sd = systable_beginscan_ordered(lo_heap_r, lo_index_r, + obj_desc->snapshot, 1, skey); + + /* + * Because the pg_largeobject index is on both loid and pageno, but we + * constrain only loid, a backwards scan should visit all pages of the + * large object in reverse pageno order. So, it's sufficient to examine + * the first valid tuple (== last valid page). 
+ */ + tuple = systable_getnext_ordered(sd, BackwardScanDirection); + if (HeapTupleIsValid(tuple)) + { + Form_pg_largeobject data; + bytea *datafield; + int len; + bool pfreeit; + + if (HeapTupleHasNulls(tuple)) /* paranoia */ + elog(ERROR, "null field found in pg_largeobject"); + data = (Form_pg_largeobject) GETSTRUCT(tuple); + getdatafield(data, &datafield, &len, &pfreeit); + lastbyte = (uint64) data->pageno * LOBLKSIZE + len; + if (pfreeit) + pfree(datafield); + } + + systable_endscan_ordered(sd); + + return lastbyte; +} + +int64 +inv_seek(LargeObjectDesc *obj_desc, int64 offset, int whence) +{ + int64 newoffset; + + Assert(PointerIsValid(obj_desc)); + + /* + * We allow seek/tell if you have either read or write permission, so no + * need for a permission check here. + */ + + /* + * Note: overflow in the additions is possible, but since we will reject + * negative results, we don't need any extra test for that. + */ + switch (whence) + { + case SEEK_SET: + newoffset = offset; + break; + case SEEK_CUR: + newoffset = obj_desc->offset + offset; + break; + case SEEK_END: + newoffset = inv_getsize(obj_desc) + offset; + break; + default: + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("invalid whence setting: %d", whence))); + newoffset = 0; /* keep compiler quiet */ + break; + } + + /* + * use errmsg_internal here because we don't want to expose INT64_FORMAT + * in translatable strings; doing better is not worth the trouble + */ + if (newoffset < 0 || newoffset > MAX_LARGE_OBJECT_SIZE) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg_internal("invalid large object seek target: " INT64_FORMAT, + newoffset))); + + obj_desc->offset = newoffset; + return newoffset; +} + +int64 +inv_tell(LargeObjectDesc *obj_desc) +{ + Assert(PointerIsValid(obj_desc)); + + /* + * We allow seek/tell if you have either read or write permission, so no + * need for a permission check here. 
+ */ + + return obj_desc->offset; +} + +int +inv_read(LargeObjectDesc *obj_desc, char *buf, int nbytes) +{ + int nread = 0; + int64 n; + int64 off; + int len; + int32 pageno = (int32) (obj_desc->offset / LOBLKSIZE); + uint64 pageoff; + ScanKeyData skey[2]; + SysScanDesc sd; + HeapTuple tuple; + + Assert(PointerIsValid(obj_desc)); + Assert(buf != NULL); + + if ((obj_desc->flags & IFS_RDLOCK) == 0) + ereport(ERROR, + (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), + errmsg("permission denied for large object %u", + obj_desc->id))); + + if (nbytes <= 0) + return 0; + + open_lo_relation(); + + ScanKeyInit(&skey[0], + Anum_pg_largeobject_loid, + BTEqualStrategyNumber, F_OIDEQ, + ObjectIdGetDatum(obj_desc->id)); + + ScanKeyInit(&skey[1], + Anum_pg_largeobject_pageno, + BTGreaterEqualStrategyNumber, F_INT4GE, + Int32GetDatum(pageno)); + + sd = systable_beginscan_ordered(lo_heap_r, lo_index_r, + obj_desc->snapshot, 2, skey); + + while ((tuple = systable_getnext_ordered(sd, ForwardScanDirection)) != NULL) + { + Form_pg_largeobject data; + bytea *datafield; + bool pfreeit; + + if (HeapTupleHasNulls(tuple)) /* paranoia */ + elog(ERROR, "null field found in pg_largeobject"); + data = (Form_pg_largeobject) GETSTRUCT(tuple); + + /* + * We expect the indexscan will deliver pages in order. However, + * there may be missing pages if the LO contains unwritten "holes". We + * want missing sections to read out as zeroes. + */ + pageoff = ((uint64) data->pageno) * LOBLKSIZE; + if (pageoff > obj_desc->offset) + { + n = pageoff - obj_desc->offset; + n = (n <= (nbytes - nread)) ? n : (nbytes - nread); + MemSet(buf + nread, 0, n); + nread += n; + obj_desc->offset += n; + } + + if (nread < nbytes) + { + Assert(obj_desc->offset >= pageoff); + off = (int) (obj_desc->offset - pageoff); + Assert(off >= 0 && off < LOBLKSIZE); + + getdatafield(data, &datafield, &len, &pfreeit); + if (len > off) + { + n = len - off; + n = (n <= (nbytes - nread)) ? 
n : (nbytes - nread); + memcpy(buf + nread, VARDATA(datafield) + off, n); + nread += n; + obj_desc->offset += n; + } + if (pfreeit) + pfree(datafield); + } + + if (nread >= nbytes) + break; + } + + systable_endscan_ordered(sd); + + return nread; +} + +int +inv_write(LargeObjectDesc *obj_desc, const char *buf, int nbytes) +{ + int nwritten = 0; + int n; + int off; + int len; + int32 pageno = (int32) (obj_desc->offset / LOBLKSIZE); + ScanKeyData skey[2]; + SysScanDesc sd; + HeapTuple oldtuple; + Form_pg_largeobject olddata; + bool neednextpage; + bytea *datafield; + bool pfreeit; + union + { + bytea hdr; + /* this is to make the union big enough for a LO data chunk: */ + char data[LOBLKSIZE + VARHDRSZ]; + /* ensure union is aligned well enough: */ + int32 align_it; + } workbuf; + char *workb = VARDATA(&workbuf.hdr); + HeapTuple newtup; + Datum values[Natts_pg_largeobject]; + bool nulls[Natts_pg_largeobject]; + bool replace[Natts_pg_largeobject]; + CatalogIndexState indstate; + + Assert(PointerIsValid(obj_desc)); + Assert(buf != NULL); + + /* enforce writability because snapshot is probably wrong otherwise */ + if ((obj_desc->flags & IFS_WRLOCK) == 0) + ereport(ERROR, + (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), + errmsg("permission denied for large object %u", + obj_desc->id))); + + if (nbytes <= 0) + return 0; + + /* this addition can't overflow because nbytes is only int32 */ + if ((nbytes + obj_desc->offset) > MAX_LARGE_OBJECT_SIZE) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("invalid large object write request size: %d", + nbytes))); + + open_lo_relation(); + + indstate = CatalogOpenIndexes(lo_heap_r); + + ScanKeyInit(&skey[0], + Anum_pg_largeobject_loid, + BTEqualStrategyNumber, F_OIDEQ, + ObjectIdGetDatum(obj_desc->id)); + + ScanKeyInit(&skey[1], + Anum_pg_largeobject_pageno, + BTGreaterEqualStrategyNumber, F_INT4GE, + Int32GetDatum(pageno)); + + sd = systable_beginscan_ordered(lo_heap_r, lo_index_r, + obj_desc->snapshot, 2, skey); + + 
oldtuple = NULL; + olddata = NULL; + neednextpage = true; + + while (nwritten < nbytes) + { + /* + * If possible, get next pre-existing page of the LO. We expect the + * indexscan will deliver these in order --- but there may be holes. + */ + if (neednextpage) + { + if ((oldtuple = systable_getnext_ordered(sd, ForwardScanDirection)) != NULL) + { + if (HeapTupleHasNulls(oldtuple)) /* paranoia */ + elog(ERROR, "null field found in pg_largeobject"); + olddata = (Form_pg_largeobject) GETSTRUCT(oldtuple); + Assert(olddata->pageno >= pageno); + } + neednextpage = false; + } + + /* + * If we have a pre-existing page, see if it is the page we want to + * write, or a later one. + */ + if (olddata != NULL && olddata->pageno == pageno) + { + /* + * Update an existing page with fresh data. + * + * First, load old data into workbuf + */ + getdatafield(olddata, &datafield, &len, &pfreeit); + memcpy(workb, VARDATA(datafield), len); + if (pfreeit) + pfree(datafield); + + /* + * Fill any hole + */ + off = (int) (obj_desc->offset % LOBLKSIZE); + if (off > len) + MemSet(workb + len, 0, off - len); + + /* + * Insert appropriate portion of new data + */ + n = LOBLKSIZE - off; + n = (n <= (nbytes - nwritten)) ? n : (nbytes - nwritten); + memcpy(workb + off, buf + nwritten, n); + nwritten += n; + obj_desc->offset += n; + off += n; + /* compute valid length of new page */ + len = (len >= off) ? len : off; + SET_VARSIZE(&workbuf.hdr, len + VARHDRSZ); + + /* + * Form and insert updated tuple + */ + memset(values, 0, sizeof(values)); + memset(nulls, false, sizeof(nulls)); + memset(replace, false, sizeof(replace)); + values[Anum_pg_largeobject_data - 1] = PointerGetDatum(&workbuf); + replace[Anum_pg_largeobject_data - 1] = true; + newtup = heap_modify_tuple(oldtuple, RelationGetDescr(lo_heap_r), + values, nulls, replace); + CatalogTupleUpdateWithInfo(lo_heap_r, &newtup->t_self, newtup, + indstate); + heap_freetuple(newtup); + + /* + * We're done with this old page. 
+ */ + oldtuple = NULL; + olddata = NULL; + neednextpage = true; + } + else + { + /* + * Write a brand new page. + * + * First, fill any hole + */ + off = (int) (obj_desc->offset % LOBLKSIZE); + if (off > 0) + MemSet(workb, 0, off); + + /* + * Insert appropriate portion of new data + */ + n = LOBLKSIZE - off; + n = (n <= (nbytes - nwritten)) ? n : (nbytes - nwritten); + memcpy(workb + off, buf + nwritten, n); + nwritten += n; + obj_desc->offset += n; + /* compute valid length of new page */ + len = off + n; + SET_VARSIZE(&workbuf.hdr, len + VARHDRSZ); + + /* + * Form and insert updated tuple + */ + memset(values, 0, sizeof(values)); + memset(nulls, false, sizeof(nulls)); + values[Anum_pg_largeobject_loid - 1] = ObjectIdGetDatum(obj_desc->id); + values[Anum_pg_largeobject_pageno - 1] = Int32GetDatum(pageno); + values[Anum_pg_largeobject_data - 1] = PointerGetDatum(&workbuf); + newtup = heap_form_tuple(lo_heap_r->rd_att, values, nulls); + CatalogTupleInsertWithInfo(lo_heap_r, newtup, indstate); + heap_freetuple(newtup); + } + pageno++; + } + + systable_endscan_ordered(sd); + + CatalogCloseIndexes(indstate); + + /* + * Advance command counter so that my tuple updates will be seen by later + * large-object operations in this transaction. 
+ */ + CommandCounterIncrement(); + + return nwritten; +} + +void +inv_truncate(LargeObjectDesc *obj_desc, int64 len) +{ + int32 pageno = (int32) (len / LOBLKSIZE); + int32 off; + ScanKeyData skey[2]; + SysScanDesc sd; + HeapTuple oldtuple; + Form_pg_largeobject olddata; + union + { + bytea hdr; + /* this is to make the union big enough for a LO data chunk: */ + char data[LOBLKSIZE + VARHDRSZ]; + /* ensure union is aligned well enough: */ + int32 align_it; + } workbuf; + char *workb = VARDATA(&workbuf.hdr); + HeapTuple newtup; + Datum values[Natts_pg_largeobject]; + bool nulls[Natts_pg_largeobject]; + bool replace[Natts_pg_largeobject]; + CatalogIndexState indstate; + + Assert(PointerIsValid(obj_desc)); + + /* enforce writability because snapshot is probably wrong otherwise */ + if ((obj_desc->flags & IFS_WRLOCK) == 0) + ereport(ERROR, + (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), + errmsg("permission denied for large object %u", + obj_desc->id))); + + /* + * use errmsg_internal here because we don't want to expose INT64_FORMAT + * in translatable strings; doing better is not worth the trouble + */ + if (len < 0 || len > MAX_LARGE_OBJECT_SIZE) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg_internal("invalid large object truncation target: " INT64_FORMAT, + len))); + + open_lo_relation(); + + indstate = CatalogOpenIndexes(lo_heap_r); + + /* + * Set up to find all pages with desired loid and pageno >= target + */ + ScanKeyInit(&skey[0], + Anum_pg_largeobject_loid, + BTEqualStrategyNumber, F_OIDEQ, + ObjectIdGetDatum(obj_desc->id)); + + ScanKeyInit(&skey[1], + Anum_pg_largeobject_pageno, + BTGreaterEqualStrategyNumber, F_INT4GE, + Int32GetDatum(pageno)); + + sd = systable_beginscan_ordered(lo_heap_r, lo_index_r, + obj_desc->snapshot, 2, skey); + + /* + * If possible, get the page the truncation point is in. The truncation + * point may be beyond the end of the LO or in a hole. 
+ */ + olddata = NULL; + if ((oldtuple = systable_getnext_ordered(sd, ForwardScanDirection)) != NULL) + { + if (HeapTupleHasNulls(oldtuple)) /* paranoia */ + elog(ERROR, "null field found in pg_largeobject"); + olddata = (Form_pg_largeobject) GETSTRUCT(oldtuple); + Assert(olddata->pageno >= pageno); + } + + /* + * If we found the page of the truncation point we need to truncate the + * data in it. Otherwise if we're in a hole, we need to create a page to + * mark the end of data. + */ + if (olddata != NULL && olddata->pageno == pageno) + { + /* First, load old data into workbuf */ + bytea *datafield; + int pagelen; + bool pfreeit; + + getdatafield(olddata, &datafield, &pagelen, &pfreeit); + memcpy(workb, VARDATA(datafield), pagelen); + if (pfreeit) + pfree(datafield); + + /* + * Fill any hole + */ + off = len % LOBLKSIZE; + if (off > pagelen) + MemSet(workb + pagelen, 0, off - pagelen); + + /* compute length of new page */ + SET_VARSIZE(&workbuf.hdr, off + VARHDRSZ); + + /* + * Form and insert updated tuple + */ + memset(values, 0, sizeof(values)); + memset(nulls, false, sizeof(nulls)); + memset(replace, false, sizeof(replace)); + values[Anum_pg_largeobject_data - 1] = PointerGetDatum(&workbuf); + replace[Anum_pg_largeobject_data - 1] = true; + newtup = heap_modify_tuple(oldtuple, RelationGetDescr(lo_heap_r), + values, nulls, replace); + CatalogTupleUpdateWithInfo(lo_heap_r, &newtup->t_self, newtup, + indstate); + heap_freetuple(newtup); + } + else + { + /* + * If the first page we found was after the truncation point, we're in + * a hole that we'll fill, but we need to delete the later page + * because the loop below won't visit it again. + */ + if (olddata != NULL) + { + Assert(olddata->pageno > pageno); + CatalogTupleDelete(lo_heap_r, &oldtuple->t_self); + } + + /* + * Write a brand new page. 
+ * + * Fill the hole up to the truncation point + */ + off = len % LOBLKSIZE; + if (off > 0) + MemSet(workb, 0, off); + + /* compute length of new page */ + SET_VARSIZE(&workbuf.hdr, off + VARHDRSZ); + + /* + * Form and insert new tuple + */ + memset(values, 0, sizeof(values)); + memset(nulls, false, sizeof(nulls)); + values[Anum_pg_largeobject_loid - 1] = ObjectIdGetDatum(obj_desc->id); + values[Anum_pg_largeobject_pageno - 1] = Int32GetDatum(pageno); + values[Anum_pg_largeobject_data - 1] = PointerGetDatum(&workbuf); + newtup = heap_form_tuple(lo_heap_r->rd_att, values, nulls); + CatalogTupleInsertWithInfo(lo_heap_r, newtup, indstate); + heap_freetuple(newtup); + } + + /* + * Delete any pages after the truncation point. If the initial search + * didn't find a page, then of course there's nothing more to do. + */ + if (olddata != NULL) + { + while ((oldtuple = systable_getnext_ordered(sd, ForwardScanDirection)) != NULL) + { + CatalogTupleDelete(lo_heap_r, &oldtuple->t_self); + } + } + + systable_endscan_ordered(sd); + + CatalogCloseIndexes(indstate); + + /* + * Advance command counter so that tuple updates will be seen by later + * large-object operations in this transaction. + */ + CommandCounterIncrement(); +} |