1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
|
/*-------------------------------------------------------------------------
*
* hio.c
* POSTGRES heap access method input/output code.
*
* Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
*
* IDENTIFICATION
* src/backend/access/heap/hio.c
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "access/heapam.h"
#include "access/hio.h"
#include "access/htup_details.h"
#include "access/visibilitymap.h"
#include "storage/bufmgr.h"
#include "storage/freespace.h"
#include "storage/lmgr.h"
#include "storage/smgr.h"
/*
* RelationPutHeapTuple - place tuple at specified page
*
* !!! EREPORT(ERROR) IS DISALLOWED HERE !!! Must PANIC on failure!!!
*
* Note - caller must hold BUFFER_LOCK_EXCLUSIVE on the buffer.
*/
void
RelationPutHeapTuple(Relation relation,
Buffer buffer,
HeapTuple tuple,
bool token)
{
Page pageHeader;
OffsetNumber offnum;
/*
* A tuple that's being inserted speculatively should already have its
* token set.
*/
Assert(!token || HeapTupleHeaderIsSpeculative(tuple->t_data));
/*
* Do not allow tuples with invalid combinations of hint bits to be placed
* on a page. This combination is detected as corruption by the
* contrib/amcheck logic, so if you disable this assertion, make
* corresponding changes there.
*/
Assert(!((tuple->t_data->t_infomask & HEAP_XMAX_COMMITTED) &&
(tuple->t_data->t_infomask & HEAP_XMAX_IS_MULTI)));
/* Add the tuple to the page */
pageHeader = BufferGetPage(buffer);
offnum = PageAddItem(pageHeader, (Item) tuple->t_data,
tuple->t_len, InvalidOffsetNumber, false, true);
if (offnum == InvalidOffsetNumber)
elog(PANIC, "failed to add tuple to page");
/* Update tuple->t_self to the actual position where it was stored */
ItemPointerSet(&(tuple->t_self), BufferGetBlockNumber(buffer), offnum);
/*
* Insert the correct position into CTID of the stored tuple, too (unless
* this is a speculative insertion, in which case the token is held in
* CTID field instead)
*/
if (!token)
{
ItemId itemId = PageGetItemId(pageHeader, offnum);
HeapTupleHeader item = (HeapTupleHeader) PageGetItem(pageHeader, itemId);
item->t_ctid = tuple->t_self;
}
}
/*
* Read in a buffer in mode, using bulk-insert strategy if bistate isn't NULL.
*/
static Buffer
ReadBufferBI(Relation relation, BlockNumber targetBlock,
ReadBufferMode mode, BulkInsertState bistate)
{
Buffer buffer;
/* If not bulk-insert, exactly like ReadBuffer */
if (!bistate)
return ReadBufferExtended(relation, MAIN_FORKNUM, targetBlock,
mode, NULL);
/* If we have the desired block already pinned, re-pin and return it */
if (bistate->current_buf != InvalidBuffer)
{
if (BufferGetBlockNumber(bistate->current_buf) == targetBlock)
{
/*
* Currently the LOCK variants are only used for extending
* relation, which should never reach this branch.
*/
Assert(mode != RBM_ZERO_AND_LOCK &&
mode != RBM_ZERO_AND_CLEANUP_LOCK);
IncrBufferRefCount(bistate->current_buf);
return bistate->current_buf;
}
/* ... else drop the old buffer */
ReleaseBuffer(bistate->current_buf);
bistate->current_buf = InvalidBuffer;
}
/* Perform a read using the buffer strategy */
buffer = ReadBufferExtended(relation, MAIN_FORKNUM, targetBlock,
mode, bistate->strategy);
/* Save the selected block as target for future inserts */
IncrBufferRefCount(buffer);
bistate->current_buf = buffer;
return buffer;
}
/*
* For each heap page which is all-visible, acquire a pin on the appropriate
* visibility map page, if we haven't already got one.
*
* buffer2 may be InvalidBuffer, if only one buffer is involved. buffer1
* must not be InvalidBuffer. If both buffers are specified, block1 must
* be less than block2.
*/
static void
GetVisibilityMapPins(Relation relation, Buffer buffer1, Buffer buffer2,
BlockNumber block1, BlockNumber block2,
Buffer *vmbuffer1, Buffer *vmbuffer2)
{
bool need_to_pin_buffer1;
bool need_to_pin_buffer2;
Assert(BufferIsValid(buffer1));
Assert(buffer2 == InvalidBuffer || block1 <= block2);
while (1)
{
/* Figure out which pins we need but don't have. */
need_to_pin_buffer1 = PageIsAllVisible(BufferGetPage(buffer1))
&& !visibilitymap_pin_ok(block1, *vmbuffer1);
need_to_pin_buffer2 = buffer2 != InvalidBuffer
&& PageIsAllVisible(BufferGetPage(buffer2))
&& !visibilitymap_pin_ok(block2, *vmbuffer2);
if (!need_to_pin_buffer1 && !need_to_pin_buffer2)
return;
/* We must unlock both buffers before doing any I/O. */
LockBuffer(buffer1, BUFFER_LOCK_UNLOCK);
if (buffer2 != InvalidBuffer && buffer2 != buffer1)
LockBuffer(buffer2, BUFFER_LOCK_UNLOCK);
/* Get pins. */
if (need_to_pin_buffer1)
visibilitymap_pin(relation, block1, vmbuffer1);
if (need_to_pin_buffer2)
visibilitymap_pin(relation, block2, vmbuffer2);
/* Relock buffers. */
LockBuffer(buffer1, BUFFER_LOCK_EXCLUSIVE);
if (buffer2 != InvalidBuffer && buffer2 != buffer1)
LockBuffer(buffer2, BUFFER_LOCK_EXCLUSIVE);
/*
* If there are two buffers involved and we pinned just one of them,
* it's possible that the second one became all-visible while we were
* busy pinning the first one. If it looks like that's a possible
* scenario, we'll need to make a second pass through this loop.
*/
if (buffer2 == InvalidBuffer || buffer1 == buffer2
|| (need_to_pin_buffer1 && need_to_pin_buffer2))
break;
}
}
/*
* Extend a relation by multiple blocks to avoid future contention on the
* relation extension lock. Our goal is to pre-extend the relation by an
* amount which ramps up as the degree of contention ramps up, but limiting
* the result to some sane overall value.
*/
static void
RelationAddExtraBlocks(Relation relation, BulkInsertState bistate)
{
BlockNumber blockNum,
firstBlock = InvalidBlockNumber;
int extraBlocks;
int lockWaiters;
/* Use the length of the lock wait queue to judge how much to extend. */
lockWaiters = RelationExtensionLockWaiterCount(relation);
if (lockWaiters <= 0)
return;
/*
* It might seem like multiplying the number of lock waiters by as much as
* 20 is too aggressive, but benchmarking revealed that smaller numbers
* were insufficient. 512 is just an arbitrary cap to prevent
* pathological results.
*/
extraBlocks = Min(512, lockWaiters * 20);
do
{
Buffer buffer;
Page page;
Size freespace;
/*
* Extend by one page. This should generally match the main-line
* extension code in RelationGetBufferForTuple, except that we hold
* the relation extension lock throughout, and we don't immediately
* initialize the page (see below).
*/
buffer = ReadBufferBI(relation, P_NEW, RBM_ZERO_AND_LOCK, bistate);
page = BufferGetPage(buffer);
if (!PageIsNew(page))
elog(ERROR, "page %u of relation \"%s\" should be empty but is not",
BufferGetBlockNumber(buffer),
RelationGetRelationName(relation));
/*
* Add the page to the FSM without initializing. If we were to
* initialize here, the page would potentially get flushed out to disk
* before we add any useful content. There's no guarantee that that'd
* happen before a potential crash, so we need to deal with
* uninitialized pages anyway, thus avoid the potential for
* unnecessary writes.
*/
/* we'll need this info below */
blockNum = BufferGetBlockNumber(buffer);
freespace = BufferGetPageSize(buffer) - SizeOfPageHeaderData;
UnlockReleaseBuffer(buffer);
/* Remember first block number thus added. */
if (firstBlock == InvalidBlockNumber)
firstBlock = blockNum;
/*
* Immediately update the bottom level of the FSM. This has a good
* chance of making this page visible to other concurrently inserting
* backends, and we want that to happen without delay.
*/
RecordPageWithFreeSpace(relation, blockNum, freespace);
}
while (--extraBlocks > 0);
/*
* Updating the upper levels of the free space map is too expensive to do
* for every block, but it's worth doing once at the end to make sure that
* subsequent insertion activity sees all of those nifty free pages we
* just inserted.
*/
FreeSpaceMapVacuumRange(relation, firstBlock, blockNum + 1);
}
/*
* RelationGetBufferForTuple
*
* Returns pinned and exclusive-locked buffer of a page in given relation
* with free space >= given len.
*
* If otherBuffer is not InvalidBuffer, then it references a previously
* pinned buffer of another page in the same relation; on return, this
* buffer will also be exclusive-locked. (This case is used by heap_update;
* the otherBuffer contains the tuple being updated.)
*
* The reason for passing otherBuffer is that if two backends are doing
* concurrent heap_update operations, a deadlock could occur if they try
* to lock the same two buffers in opposite orders. To ensure that this
* can't happen, we impose the rule that buffers of a relation must be
* locked in increasing page number order. This is most conveniently done
* by having RelationGetBufferForTuple lock them both, with suitable care
* for ordering.
*
* NOTE: it is unlikely, but not quite impossible, for otherBuffer to be the
* same buffer we select for insertion of the new tuple (this could only
* happen if space is freed in that page after heap_update finds there's not
* enough there). In that case, the page will be pinned and locked only once.
*
* We also handle the possibility that the all-visible flag will need to be
* cleared on one or both pages. If so, pin on the associated visibility map
* page must be acquired before acquiring buffer lock(s), to avoid possibly
* doing I/O while holding buffer locks. The pins are passed back to the
* caller using the input-output arguments vmbuffer and vmbuffer_other.
* Note that in some cases the caller might have already acquired such pins,
* which is indicated by these arguments not being InvalidBuffer on entry.
*
* We normally use FSM to help us find free space. However,
* if HEAP_INSERT_SKIP_FSM is specified, we just append a new empty page to
* the end of the relation if the tuple won't fit on the current target page.
* This can save some cycles when we know the relation is new and doesn't
* contain useful amounts of free space.
*
* HEAP_INSERT_SKIP_FSM is also useful for non-WAL-logged additions to a
* relation, if the caller holds exclusive lock and is careful to invalidate
* relation's smgr_targblock before the first insertion --- that ensures that
* all insertions will occur into newly added pages and not be intermixed
* with tuples from other transactions. That way, a crash can't risk losing
* any committed data of other transactions. (See heap_insert's comments
* for additional constraints needed for safe usage of this behavior.)
*
* The caller can also provide a BulkInsertState object to optimize many
* insertions into the same relation. This keeps a pin on the current
* insertion target page (to save pin/unpin cycles) and also passes a
* BULKWRITE buffer selection strategy object to the buffer manager.
* Passing NULL for bistate selects the default behavior.
*
* We don't fill existing pages further than the fillfactor, except for large
* tuples in nearly-empty pages. This is OK since this routine is not
* consulted when updating a tuple and keeping it on the same page, which is
* the scenario fillfactor is meant to reserve space for.
*
* ereport(ERROR) is allowed here, so this routine *must* be called
* before any (unlogged) changes are made in buffer pool.
*/
Buffer
RelationGetBufferForTuple(Relation relation, Size len,
Buffer otherBuffer, int options,
BulkInsertState bistate,
Buffer *vmbuffer, Buffer *vmbuffer_other)
{
bool use_fsm = !(options & HEAP_INSERT_SKIP_FSM);
Buffer buffer = InvalidBuffer;
Page page;
Size nearlyEmptyFreeSpace,
pageFreeSpace = 0,
saveFreeSpace = 0,
targetFreeSpace = 0;
BlockNumber targetBlock,
otherBlock;
bool needLock;
len = MAXALIGN(len); /* be conservative */
/* Bulk insert is not supported for updates, only inserts. */
Assert(otherBuffer == InvalidBuffer || !bistate);
/*
* If we're gonna fail for oversize tuple, do it right away
*/
if (len > MaxHeapTupleSize)
ereport(ERROR,
(errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
errmsg("row is too big: size %zu, maximum size %zu",
len, MaxHeapTupleSize)));
/* Compute desired extra freespace due to fillfactor option */
saveFreeSpace = RelationGetTargetPageFreeSpace(relation,
HEAP_DEFAULT_FILLFACTOR);
/*
* Since pages without tuples can still have line pointers, we consider
* pages "empty" when the unavailable space is slight. This threshold is
* somewhat arbitrary, but it should prevent most unnecessary relation
* extensions while inserting large tuples into low-fillfactor tables.
*/
nearlyEmptyFreeSpace = MaxHeapTupleSize -
(MaxHeapTuplesPerPage / 8 * sizeof(ItemIdData));
if (len + saveFreeSpace > nearlyEmptyFreeSpace)
targetFreeSpace = Max(len, nearlyEmptyFreeSpace);
else
targetFreeSpace = len + saveFreeSpace;
if (otherBuffer != InvalidBuffer)
otherBlock = BufferGetBlockNumber(otherBuffer);
else
otherBlock = InvalidBlockNumber; /* just to keep compiler quiet */
/*
* We first try to put the tuple on the same page we last inserted a tuple
* on, as cached in the BulkInsertState or relcache entry. If that
* doesn't work, we ask the Free Space Map to locate a suitable page.
* Since the FSM's info might be out of date, we have to be prepared to
* loop around and retry multiple times. (To insure this isn't an infinite
* loop, we must update the FSM with the correct amount of free space on
* each page that proves not to be suitable.) If the FSM has no record of
* a page with enough free space, we give up and extend the relation.
*
* When use_fsm is false, we either put the tuple onto the existing target
* page or extend the relation.
*/
if (bistate && bistate->current_buf != InvalidBuffer)
targetBlock = BufferGetBlockNumber(bistate->current_buf);
else
targetBlock = RelationGetTargetBlock(relation);
if (targetBlock == InvalidBlockNumber && use_fsm)
{
/*
* We have no cached target page, so ask the FSM for an initial
* target.
*/
targetBlock = GetPageWithFreeSpace(relation, targetFreeSpace);
}
/*
* If the FSM knows nothing of the rel, try the last page before we give
* up and extend. This avoids one-tuple-per-page syndrome during
* bootstrapping or in a recently-started system.
*/
if (targetBlock == InvalidBlockNumber)
{
BlockNumber nblocks = RelationGetNumberOfBlocks(relation);
if (nblocks > 0)
targetBlock = nblocks - 1;
}
loop:
while (targetBlock != InvalidBlockNumber)
{
/*
* Read and exclusive-lock the target block, as well as the other
* block if one was given, taking suitable care with lock ordering and
* the possibility they are the same block.
*
* If the page-level all-visible flag is set, caller will need to
* clear both that and the corresponding visibility map bit. However,
* by the time we return, we'll have x-locked the buffer, and we don't
* want to do any I/O while in that state. So we check the bit here
* before taking the lock, and pin the page if it appears necessary.
* Checking without the lock creates a risk of getting the wrong
* answer, so we'll have to recheck after acquiring the lock.
*/
if (otherBuffer == InvalidBuffer)
{
/* easy case */
buffer = ReadBufferBI(relation, targetBlock, RBM_NORMAL, bistate);
if (PageIsAllVisible(BufferGetPage(buffer)))
visibilitymap_pin(relation, targetBlock, vmbuffer);
/*
* If the page is empty, pin vmbuffer to set all_frozen bit later.
*/
if ((options & HEAP_INSERT_FROZEN) &&
(PageGetMaxOffsetNumber(BufferGetPage(buffer)) == 0))
visibilitymap_pin(relation, targetBlock, vmbuffer);
LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
}
else if (otherBlock == targetBlock)
{
/* also easy case */
buffer = otherBuffer;
if (PageIsAllVisible(BufferGetPage(buffer)))
visibilitymap_pin(relation, targetBlock, vmbuffer);
LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
}
else if (otherBlock < targetBlock)
{
/* lock other buffer first */
buffer = ReadBuffer(relation, targetBlock);
if (PageIsAllVisible(BufferGetPage(buffer)))
visibilitymap_pin(relation, targetBlock, vmbuffer);
LockBuffer(otherBuffer, BUFFER_LOCK_EXCLUSIVE);
LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
}
else
{
/* lock target buffer first */
buffer = ReadBuffer(relation, targetBlock);
if (PageIsAllVisible(BufferGetPage(buffer)))
visibilitymap_pin(relation, targetBlock, vmbuffer);
LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
LockBuffer(otherBuffer, BUFFER_LOCK_EXCLUSIVE);
}
/*
* We now have the target page (and the other buffer, if any) pinned
* and locked. However, since our initial PageIsAllVisible checks
* were performed before acquiring the lock, the results might now be
* out of date, either for the selected victim buffer, or for the
* other buffer passed by the caller. In that case, we'll need to
* give up our locks, go get the pin(s) we failed to get earlier, and
* re-lock. That's pretty painful, but hopefully shouldn't happen
* often.
*
* Note that there's a small possibility that we didn't pin the page
* above but still have the correct page pinned anyway, either because
* we've already made a previous pass through this loop, or because
* caller passed us the right page anyway.
*
* Note also that it's possible that by the time we get the pin and
* retake the buffer locks, the visibility map bit will have been
* cleared by some other backend anyway. In that case, we'll have
* done a bit of extra work for no gain, but there's no real harm
* done.
*/
if (otherBuffer == InvalidBuffer || targetBlock <= otherBlock)
GetVisibilityMapPins(relation, buffer, otherBuffer,
targetBlock, otherBlock, vmbuffer,
vmbuffer_other);
else
GetVisibilityMapPins(relation, otherBuffer, buffer,
otherBlock, targetBlock, vmbuffer_other,
vmbuffer);
/*
* Now we can check to see if there's enough free space here. If so,
* we're done.
*/
page = BufferGetPage(buffer);
/*
* If necessary initialize page, it'll be used soon. We could avoid
* dirtying the buffer here, and rely on the caller to do so whenever
* it puts a tuple onto the page, but there seems not much benefit in
* doing so.
*/
if (PageIsNew(page))
{
PageInit(page, BufferGetPageSize(buffer), 0);
MarkBufferDirty(buffer);
}
pageFreeSpace = PageGetHeapFreeSpace(page);
if (targetFreeSpace <= pageFreeSpace)
{
/* use this page as future insert target, too */
RelationSetTargetBlock(relation, targetBlock);
return buffer;
}
/*
* Not enough space, so we must give up our page locks and pin (if
* any) and prepare to look elsewhere. We don't care which order we
* unlock the two buffers in, so this can be slightly simpler than the
* code above.
*/
LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
if (otherBuffer == InvalidBuffer)
ReleaseBuffer(buffer);
else if (otherBlock != targetBlock)
{
LockBuffer(otherBuffer, BUFFER_LOCK_UNLOCK);
ReleaseBuffer(buffer);
}
/* Without FSM, always fall out of the loop and extend */
if (!use_fsm)
break;
/*
* Update FSM as to condition of this page, and ask for another page
* to try.
*/
targetBlock = RecordAndGetPageWithFreeSpace(relation,
targetBlock,
pageFreeSpace,
targetFreeSpace);
}
/*
* Have to extend the relation.
*
* We have to use a lock to ensure no one else is extending the rel at the
* same time, else we will both try to initialize the same new page. We
* can skip locking for new or temp relations, however, since no one else
* could be accessing them.
*/
needLock = !RELATION_IS_LOCAL(relation);
/*
* If we need the lock but are not able to acquire it immediately, we'll
* consider extending the relation by multiple blocks at a time to manage
* contention on the relation extension lock. However, this only makes
* sense if we're using the FSM; otherwise, there's no point.
*/
if (needLock)
{
if (!use_fsm)
LockRelationForExtension(relation, ExclusiveLock);
else if (!ConditionalLockRelationForExtension(relation, ExclusiveLock))
{
/* Couldn't get the lock immediately; wait for it. */
LockRelationForExtension(relation, ExclusiveLock);
/*
* Check if some other backend has extended a block for us while
* we were waiting on the lock.
*/
targetBlock = GetPageWithFreeSpace(relation, targetFreeSpace);
/*
* If some other waiter has already extended the relation, we
* don't need to do so; just use the existing freespace.
*/
if (targetBlock != InvalidBlockNumber)
{
UnlockRelationForExtension(relation, ExclusiveLock);
goto loop;
}
/* Time to bulk-extend. */
RelationAddExtraBlocks(relation, bistate);
}
}
/*
* In addition to whatever extension we performed above, we always add at
* least one block to satisfy our own request.
*
* XXX This does an lseek - rather expensive - but at the moment it is the
* only way to accurately determine how many blocks are in a relation. Is
* it worth keeping an accurate file length in shared memory someplace,
* rather than relying on the kernel to do it for us?
*/
buffer = ReadBufferBI(relation, P_NEW, RBM_ZERO_AND_LOCK, bistate);
/*
* We need to initialize the empty new page. Double-check that it really
* is empty (this should never happen, but if it does we don't want to
* risk wiping out valid data).
*/
page = BufferGetPage(buffer);
if (!PageIsNew(page))
elog(ERROR, "page %u of relation \"%s\" should be empty but is not",
BufferGetBlockNumber(buffer),
RelationGetRelationName(relation));
PageInit(page, BufferGetPageSize(buffer), 0);
MarkBufferDirty(buffer);
/*
* The page is empty, pin vmbuffer to set all_frozen bit.
*/
if (options & HEAP_INSERT_FROZEN)
{
Assert(PageGetMaxOffsetNumber(BufferGetPage(buffer)) == 0);
visibilitymap_pin(relation, BufferGetBlockNumber(buffer), vmbuffer);
}
/*
* Release the file-extension lock; it's now OK for someone else to extend
* the relation some more.
*/
if (needLock)
UnlockRelationForExtension(relation, ExclusiveLock);
/*
* Lock the other buffer. It's guaranteed to be of a lower page number
* than the new page. To conform with the deadlock prevent rules, we ought
* to lock otherBuffer first, but that would give other backends a chance
* to put tuples on our page. To reduce the likelihood of that, attempt to
* lock the other buffer conditionally, that's very likely to work.
* Otherwise we need to lock buffers in the correct order, and retry if
* the space has been used in the mean time.
*
* Alternatively, we could acquire the lock on otherBuffer before
* extending the relation, but that'd require holding the lock while
* performing IO, which seems worse than an unlikely retry.
*/
if (otherBuffer != InvalidBuffer)
{
Assert(otherBuffer != buffer);
targetBlock = BufferGetBlockNumber(buffer);
Assert(targetBlock > otherBlock);
if (unlikely(!ConditionalLockBuffer(otherBuffer)))
{
LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
LockBuffer(otherBuffer, BUFFER_LOCK_EXCLUSIVE);
LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
/*
* Because the buffers were unlocked for a while, it's possible,
* although unlikely, that an all-visible flag became set or that
* somebody used up the available space in the new page. We can
* use GetVisibilityMapPins to deal with the first case. In the
* second case, just retry from start.
*/
GetVisibilityMapPins(relation, otherBuffer, buffer,
otherBlock, targetBlock, vmbuffer_other,
vmbuffer);
if (len > PageGetHeapFreeSpace(page))
{
LockBuffer(otherBuffer, BUFFER_LOCK_UNLOCK);
UnlockReleaseBuffer(buffer);
goto loop;
}
}
}
if (len > PageGetHeapFreeSpace(page))
{
/* We should not get here given the test at the top */
elog(PANIC, "tuple is too big: size %zu", len);
}
/*
* Remember the new page as our target for future insertions.
*
* XXX should we enter the new page into the free space map immediately,
* or just keep it for this backend's exclusive use in the short run
* (until VACUUM sees it)? Seems to depend on whether you expect the
* current backend to make more insertions or not, which is probably a
* good bet most of the time. So for now, don't add it to FSM yet.
*/
RelationSetTargetBlock(relation, BufferGetBlockNumber(buffer));
return buffer;
}
|