/*-------------------------------------------------------------------------
 *
 * sinvaladt.c
 *	  POSTGRES shared cache invalidation data manager.
 *
 * Portions Copyright (c) 1996-2009, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 *-------------------------------------------------------------------------
 */
#include "postgres.h"

#include <signal.h>
#include <unistd.h>

#include "miscadmin.h"
#include "storage/backendid.h"
#include "storage/ipc.h"
#include "storage/proc.h"
#include "storage/shmem.h"
#include "storage/sinvaladt.h"
#include "storage/spin.h"

/*
 * Conceptually, the shared cache invalidation messages are stored in an
 * infinite array, where maxMsgNum is the next array subscript to store a
 * submitted message in, minMsgNum is the smallest array subscript containing
 * a message not yet read by all backends, and we always have maxMsgNum >=
 * minMsgNum.  (They are equal when there are no messages pending.)  For each
 * active backend, there is a nextMsgNum pointer indicating the next message
 * it needs to read; we have maxMsgNum >= nextMsgNum >= minMsgNum for every
 * backend.
 *
 * (In the current implementation, minMsgNum is a lower bound for the
 * per-process nextMsgNum values, but it isn't rigorously kept equal to the
 * smallest nextMsgNum --- it may lag behind.  We only update it when
 * SICleanupQueue is called, and we try not to do that often.)
 *
 * In reality, the messages are stored in a circular buffer of MAXNUMMESSAGES
 * entries.  We translate MsgNum values into circular-buffer indexes by
 * computing MsgNum % MAXNUMMESSAGES (this should be fast as long as
 * MAXNUMMESSAGES is a constant and a power of 2).  As long as maxMsgNum
 * doesn't exceed minMsgNum by more than MAXNUMMESSAGES, we have enough space
 * in the buffer.  If the buffer does overflow, we recover by setting the
 * "reset" flag for each backend that has fallen too far behind.  A backend
 * that is in "reset" state is ignored while determining minMsgNum.  When
 * it does finally attempt to receive inval messages, it must discard all
 * its invalidatable state, since it won't know what it missed.
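 *
 * For example (an illustrative sketch, using the settings defined below):
 * with MAXNUMMESSAGES = 4096, message number 8195 lives in buffer slot
 * 8195 % 4096 = 3, and the buffer is completely full exactly when
 *
 *		maxMsgNum - minMsgNum == MAXNUMMESSAGES
 *
 * since every slot then holds a message some backend still needs to read.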
 *
 * To reduce the probability of needing resets, we send a "catchup" interrupt
 * to any backend that seems to be falling unreasonably far behind.  The
 * normal behavior is that at most one such interrupt is in flight at a time;
 * when a backend completes processing a catchup interrupt, it executes
 * SICleanupQueue, which will signal the next-furthest-behind backend if
 * needed.  This avoids undue contention from multiple backends all trying
 * to catch up at once.  However, the furthest-back backend might be stuck
 * in a state where it can't catch up.  Eventually it will get reset, so it
 * won't cause any more problems for anyone but itself.  But we don't want
 * to find that a bunch of other backends are now too close to the reset
 * threshold to be saved.  So SICleanupQueue is designed to occasionally
 * send extra catchup interrupts as the queue gets fuller, to backends that
 * are far behind and haven't gotten one yet.  As long as there aren't a lot
 * of "stuck" backends, we won't need a lot of extra interrupts, since ones
 * that aren't stuck will propagate their interrupts to the next laggard.
 *
 * We would have problems if the MsgNum values overflow an integer, so
 * whenever minMsgNum exceeds MSGNUMWRAPAROUND, we subtract MSGNUMWRAPAROUND
 * from all the MsgNum variables simultaneously.  MSGNUMWRAPAROUND can be
 * large so that we don't need to do this often.  It must be a multiple of
 * MAXNUMMESSAGES so that the existing circular-buffer entries don't need
 * to be moved when we do it.
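 *
 * (To see why the multiple-of-MAXNUMMESSAGES rule suffices, note that for
 * any integer k,
 *
 *		(n - k * MAXNUMMESSAGES) % MAXNUMMESSAGES == n % MAXNUMMESSAGES,
 *
 * so the subtraction leaves every pending message in the same buffer slot
 * it occupied before.)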
 *
 * Access to the shared sinval array is protected by two locks, SInvalReadLock
 * and SInvalWriteLock.  Readers take SInvalReadLock in shared mode; this
 * authorizes them to modify their own ProcState but not to modify or even
 * look at anyone else's.  When we need to perform array-wide updates,
 * such as in SICleanupQueue, we take SInvalReadLock in exclusive mode to
 * lock out all readers.  Writers take SInvalWriteLock (always in exclusive
 * mode) to serialize adding messages to the queue.  Note that a writer
 * can operate in parallel with one or more readers, because the writer
 * has no need to touch anyone's ProcState, except in the infrequent cases
 * when SICleanupQueue is needed.  The only point of overlap is that
 * the writer wants to change maxMsgNum while readers need to read it.
 * We deal with that by having a spinlock that readers must take for just
 * long enough to read maxMsgNum, while writers take it for just long enough
 * to write maxMsgNum.  (The exact rule is that you need the spinlock to
 * read maxMsgNum if you are not holding SInvalWriteLock, and you need the
 * spinlock to write maxMsgNum unless you are holding both locks.)
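 *
 * In summary:
 *
 *		operation					locks taken
 *		------------------------	-----------------------------------
 *		read own ProcState			SInvalReadLock (shared)
 *		array-wide update			SInvalReadLock (exclusive), plus
 *									SInvalWriteLock if not already held
 *		append messages				SInvalWriteLock (exclusive)
 *		read maxMsgNum				msgnumLock, unless holding
 *									SInvalWriteLock
 *		write maxMsgNum				msgnumLock, unless holding both
 *									LWLocks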
 *
 * Note: since maxMsgNum is an int and hence presumably atomically readable/
 * writable, the spinlock might seem unnecessary.  The reason it is needed
 * is to provide a memory barrier: we need to be sure that messages written
 * to the array are actually there before maxMsgNum is increased, and that
 * readers will see that data after fetching maxMsgNum.  Multiprocessors
 * that have weak memory-ordering guarantees can fail without the memory
 * barrier instructions that are included in the spinlock sequences.
 */


/*
 * Configurable parameters.
 *
 * MAXNUMMESSAGES: max number of shared-inval messages we can buffer.
 * Must be a power of 2 for speed.
 *
 * MSGNUMWRAPAROUND: how often to reduce MsgNum variables to avoid overflow.
 * Must be a multiple of MAXNUMMESSAGES.  Should be large.
 *
 * CLEANUP_MIN: the minimum number of messages that must be in the buffer
 * before we bother to call SICleanupQueue.
 *
 * CLEANUP_QUANTUM: how often (in messages) to call SICleanupQueue once
 * we exceed CLEANUP_MIN.  Should be a power of 2 for speed.
 *
 * SIG_THRESHOLD: the minimum number of messages a backend must have fallen
 * behind before we'll send it SIGUSR1.
 *
 * WRITE_QUANTUM: the max number of messages to push into the buffer per
 * iteration of SIInsertDataEntries.  Noncritical but should be less than
 * CLEANUP_QUANTUM, because we only consider calling SICleanupQueue once
 * per iteration.
 */

#define MAXNUMMESSAGES 4096
#define MSGNUMWRAPAROUND (MAXNUMMESSAGES * 262144)
#define CLEANUP_MIN (MAXNUMMESSAGES / 2)
#define CLEANUP_QUANTUM (MAXNUMMESSAGES / 16)
#define SIG_THRESHOLD (MAXNUMMESSAGES / 2)
#define WRITE_QUANTUM 64
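
/*
 * With the settings above, the concrete values are: MAXNUMMESSAGES = 4096,
 * MSGNUMWRAPAROUND = 4096 * 262144 = 2^30, CLEANUP_MIN = 2048,
 * CLEANUP_QUANTUM = 256, and SIG_THRESHOLD = 2048 messages.
 */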

/* Per-backend state in shared invalidation structure */
typedef struct ProcState
{
	/* procPid is zero in an inactive ProcState array entry. */
	pid_t		procPid;		/* PID of backend, for signaling */
	/* nextMsgNum is meaningless if procPid == 0 or resetState is true. */
	int			nextMsgNum;		/* next message number to read */
	bool		resetState;		/* backend needs to reset its state */
	bool		signaled;		/* backend has been sent catchup signal */

	/*
	 * Next LocalTransactionId to use for each idle backend slot.  We keep
	 * this here because it is indexed by BackendId and it is convenient to
	 * copy the value to and from local memory when MyBackendId is set.
	 * It's meaningless in an active ProcState entry.
	 */
	LocalTransactionId nextLXID;
} ProcState;

/* Shared cache invalidation memory segment */
typedef struct SISeg
{
	/*
	 * General state information
	 */
	int			minMsgNum;		/* oldest message still needed */
	int			maxMsgNum;		/* next message number to be assigned */
	int			nextThreshold;	/* # of messages to call SICleanupQueue */
	int			lastBackend;	/* index of last active procState entry, +1 */
	int			maxBackends;	/* size of procState array */

	slock_t		msgnumLock;		/* spinlock protecting maxMsgNum */

	/*
	 * Circular buffer holding shared-inval messages
	 */
	SharedInvalidationMessage buffer[MAXNUMMESSAGES];

	/*
	 * Per-backend state info.
	 *
	 * We declare procState as 1 entry because C wants a fixed-size array,
	 * but actually it is maxBackends entries long.
	 */
	ProcState	procState[1];	/* reflects the invalidation state */
} SISeg;

static SISeg *shmInvalBuffer;	/* pointer to the shared inval buffer */

static LocalTransactionId nextLocalTransactionId;

static void CleanupInvalidationState(int status, Datum arg);

/*
 * SInvalShmemSize --- return shared-memory space needed
 */
Size
SInvalShmemSize(void)
{
	Size		size;

	size = offsetof(SISeg, procState);
	size = add_size(size, mul_size(sizeof(ProcState), MaxBackends));

	return size;
}

/*
 * CreateSharedInvalidationState
 *		Create and initialize the SI message buffer
 */
void
CreateSharedInvalidationState(void)
{
	Size		size;
	int			i;
	bool		found;

	/* Allocate space in shared memory */
	size = offsetof(SISeg, procState);
	size = add_size(size, mul_size(sizeof(ProcState), MaxBackends));

	shmInvalBuffer = (SISeg *)
		ShmemInitStruct("shmInvalBuffer", size, &found);
	if (found)
		return;

	/* Clear message counters, save size of procState array, init spinlock */
	shmInvalBuffer->minMsgNum = 0;
	shmInvalBuffer->maxMsgNum = 0;
	shmInvalBuffer->nextThreshold = CLEANUP_MIN;
	shmInvalBuffer->lastBackend = 0;
	shmInvalBuffer->maxBackends = MaxBackends;
	SpinLockInit(&shmInvalBuffer->msgnumLock);

	/* The buffer[] array is initially all unused, so we need not fill it */

	/* Mark all backends inactive, and initialize nextLXID */
	for (i = 0; i < shmInvalBuffer->maxBackends; i++)
	{
		shmInvalBuffer->procState[i].procPid = 0;	/* inactive */
		shmInvalBuffer->procState[i].nextMsgNum = 0;	/* meaningless */
		shmInvalBuffer->procState[i].resetState = false;
		shmInvalBuffer->procState[i].signaled = false;
		shmInvalBuffer->procState[i].nextLXID = InvalidLocalTransactionId;
	}
}

/*
 * SharedInvalBackendInit
 *		Initialize a new backend to operate on the sinval buffer
 */
void
SharedInvalBackendInit(void)
{
	int			index;
	ProcState  *stateP = NULL;
	SISeg	   *segP = shmInvalBuffer;

	/*
	 * This can run in parallel with read operations, and for that matter
	 * with write operations; but not in parallel with additions and removals
	 * of backends, nor in parallel with SICleanupQueue.  It doesn't seem
	 * worth having a third lock, so we choose to use SInvalWriteLock to
	 * serialize additions/removals.
	 */
	LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);

	/* Look for a free entry in the procState array */
	for (index = 0; index < segP->lastBackend; index++)
	{
		if (segP->procState[index].procPid == 0)	/* inactive slot? */
		{
			stateP = &segP->procState[index];
			break;
		}
	}

	if (stateP == NULL)
	{
		if (segP->lastBackend < segP->maxBackends)
		{
			stateP = &segP->procState[segP->lastBackend];
			Assert(stateP->procPid == 0);
			segP->lastBackend++;
		}
		else
		{
			/*
			 * out of procState slots: MaxBackends exceeded -- report normally
			 */
			MyBackendId = InvalidBackendId;
			LWLockRelease(SInvalWriteLock);
			ereport(FATAL,
					(errcode(ERRCODE_TOO_MANY_CONNECTIONS),
					 errmsg("sorry, too many clients already")));
		}
	}

	MyBackendId = (stateP - &segP->procState[0]) + 1;
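	/* note: procState slots are 0-based, while BackendIds are 1-based */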

	/* Advertise assigned backend ID in MyProc */
	MyProc->backendId = MyBackendId;

	/* Fetch next local transaction ID into local memory */
	nextLocalTransactionId = stateP->nextLXID;

	/* mark myself active, with all extant messages already read */
	stateP->procPid = MyProcPid;
	stateP->nextMsgNum = segP->maxMsgNum;
	stateP->resetState = false;
	stateP->signaled = false;

	LWLockRelease(SInvalWriteLock);

	/* register exit routine to mark my entry inactive at exit */
	on_shmem_exit(CleanupInvalidationState, PointerGetDatum(segP));

	elog(DEBUG4, "my backend id is %d", MyBackendId);
}

/*
 * CleanupInvalidationState
 *		Mark the current backend as no longer active.
 *
 * This function is called via on_shmem_exit() during backend shutdown.
 *
 * arg is really of type "SISeg*".
 */
static void
CleanupInvalidationState(int status, Datum arg)
{
	SISeg	   *segP = (SISeg *) DatumGetPointer(arg);
	ProcState  *stateP;
	int			i;

	Assert(PointerIsValid(segP));

	LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);

	stateP = &segP->procState[MyBackendId - 1];

	/* Update next local transaction ID for next holder of this backendID */
	stateP->nextLXID = nextLocalTransactionId;

	/* Mark myself inactive */
	stateP->procPid = 0;
	stateP->nextMsgNum = 0;
	stateP->resetState = false;
	stateP->signaled = false;

	/* Recompute index of last active backend */
	for (i = segP->lastBackend; i > 0; i--)
	{
		if (segP->procState[i - 1].procPid != 0)
			break;
	}
	segP->lastBackend = i;

	LWLockRelease(SInvalWriteLock);
}

/*
 * BackendIdIsActive
 *		Test if the given backend ID is currently assigned to a process.
 */
bool
BackendIdIsActive(int backendID)
{
	bool		result;
	SISeg	   *segP = shmInvalBuffer;

	/* Need to lock out additions/removals of backends */
	LWLockAcquire(SInvalWriteLock, LW_SHARED);

	if (backendID > 0 && backendID <= segP->lastBackend)
	{
		ProcState  *stateP = &segP->procState[backendID - 1];

		result = (stateP->procPid != 0);
	}
	else
		result = false;

	LWLockRelease(SInvalWriteLock);

	return result;
}

/*
 * SIInsertDataEntries
 *		Add new invalidation message(s) to the buffer.
 */
void
SIInsertDataEntries(const SharedInvalidationMessage *data, int n)
{
	SISeg	   *segP = shmInvalBuffer;

	/*
	 * N can be arbitrarily large.  We divide the work into groups of no more
	 * than WRITE_QUANTUM messages, to be sure that we don't hold the lock
	 * for an unreasonably long time.  (This is not so much because we care
	 * about letting in other writers, as that some just-caught-up backend
	 * might be trying to do SICleanupQueue to pass on its signal, and we
	 * don't want it to have to wait a long time.)  Also, we need to consider
	 * calling SICleanupQueue every so often.
	 */
	while (n > 0)
	{
		int			nthistime = Min(n, WRITE_QUANTUM);
		int			numMsgs;
		int			max;

		n -= nthistime;

		LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);

		/*
		 * If the buffer is full, we *must* acquire some space.  Clean the
		 * queue and reset anyone who is preventing space from being freed.
		 * Otherwise, clean the queue only when it's exceeded the next
		 * fullness threshold.  We have to loop and recheck the buffer state
		 * after any call of SICleanupQueue.
		 */
		for (;;)
		{
			numMsgs = segP->maxMsgNum - segP->minMsgNum;
			if (numMsgs + nthistime > MAXNUMMESSAGES ||
				numMsgs >= segP->nextThreshold)
				SICleanupQueue(true, nthistime);
			else
				break;
		}

		/*
		 * Insert new message(s) into proper slot of circular buffer
		 */
		max = segP->maxMsgNum;
		while (nthistime-- > 0)
		{
			segP->buffer[max % MAXNUMMESSAGES] = *data++;
			max++;
		}

		/* Update current value of maxMsgNum using spinlock */
		{
			/* use volatile pointer to prevent code rearrangement */
			volatile SISeg *vsegP = segP;

			SpinLockAcquire(&vsegP->msgnumLock);
			vsegP->maxMsgNum = max;
			SpinLockRelease(&vsegP->msgnumLock);
		}

		LWLockRelease(SInvalWriteLock);
	}
}
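
/*
 * Illustrative caller sketch (hypothetical, not part of this file): the
 * invalidation layer can batch up messages and hand them over in a single
 * call, letting SIInsertDataEntries do its own WRITE_QUANTUM chunking:
 *
 *		SharedInvalidationMessage msgs[32];
 *		int			nmsgs = 0;
 *
 *		... fill msgs[0..nmsgs-1] ...
 *		SIInsertDataEntries(msgs, nmsgs);
 */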

/*
 * SIGetDataEntries
 *		get next SI message(s) for current backend, if there are any
 *
 * Possible return values:
 *	0:	 no SI message available
 *	n>0: next n SI messages have been extracted into data[]
 *	-1:  SI reset message extracted
 *
 * If the return value is less than the array size "datasize", the caller
 * can assume that there are no more SI messages after the one(s) returned.
 * Otherwise, another call is needed to collect more messages.
 *
 * NB: this can run in parallel with other instances of SIGetDataEntries
 * executing on behalf of other backends, since each instance will modify
 * only fields of its own backend's ProcState, and no instance will look at
 * fields of other backends' ProcStates.  We express this by grabbing
 * SInvalReadLock in shared mode.  Note that this is not exactly the normal
 * (read-only) interpretation of a shared lock!  Look closely at the
 * interactions before allowing SInvalReadLock to be grabbed in shared mode
 * for any other reason!
 *
 * NB: this can also run in parallel with SIInsertDataEntries.  It is not
 * guaranteed that we will return any messages added after the routine is
 * entered.
 *
 * Note: we assume that "datasize" is not so large that it might be
 * important to break our hold on SInvalReadLock into segments.
 */
int
SIGetDataEntries(SharedInvalidationMessage *data, int datasize)
{
	SISeg	   *segP;
	ProcState  *stateP;
	int			max;
	int			n;

	LWLockAcquire(SInvalReadLock, LW_SHARED);

	segP = shmInvalBuffer;
	stateP = &segP->procState[MyBackendId - 1];

	/* Fetch current value of maxMsgNum using spinlock */
	{
		/* use volatile pointer to prevent code rearrangement */
		volatile SISeg *vsegP = segP;

		SpinLockAcquire(&vsegP->msgnumLock);
		max = vsegP->maxMsgNum;
		SpinLockRelease(&vsegP->msgnumLock);
	}

	if (stateP->resetState)
	{
		/*
		 * Force reset.  We can say we have dealt with any messages added
		 * since the reset, as well; and that means we should clear the
		 * signaled flag, too.
		 */
		stateP->nextMsgNum = max;
		stateP->resetState = false;
		stateP->signaled = false;
		LWLockRelease(SInvalReadLock);
		return -1;
	}

	/*
	 * Retrieve messages and advance backend's counter, until data array is
	 * full or there are no more messages.
	 *
	 * There may be other backends that haven't read the message(s), so we
	 * cannot delete them here.  SICleanupQueue() will eventually remove them
	 * from the queue.
	 */
	n = 0;
	while (n < datasize && stateP->nextMsgNum < max)
	{
		data[n++] = segP->buffer[stateP->nextMsgNum % MAXNUMMESSAGES];
		stateP->nextMsgNum++;
	}

	/*
	 * Reset our "signaled" flag whenever we have caught up completely.
	 */
	if (stateP->nextMsgNum >= max)
		stateP->signaled = false;

	LWLockRelease(SInvalReadLock);
	return n;
}
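
/*
 * Illustrative reader sketch (hypothetical caller; ResetAllCaches is an
 * assumed helper name): per the contract above, a backend drains the queue
 * by looping until fewer messages than the array size come back, treating
 * -1 as an order to discard all cached state:
 *
 *		SharedInvalidationMessage msgs[32];
 *		int			n;
 *
 *		do
 *		{
 *			n = SIGetDataEntries(msgs, lengthof(msgs));
 *			if (n < 0)
 *				ResetAllCaches();		-- hypothetical reset hook
 *			else
 *				... process msgs[0..n-1] ...
 *		} while (n == lengthof(msgs));
 */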

/*
 * SICleanupQueue
 *		Remove messages that have been consumed by all active backends
 *
 * callerHasWriteLock is TRUE if caller is holding SInvalWriteLock.
 * minFree is the minimum number of message slots to make free.
 *
 * Possible side effects of this routine include marking one or more
 * backends as "reset" in the array, and sending a catchup interrupt
 * (SIGUSR1) to some backend that seems to be getting too far behind.
 * We signal at most one backend at a time, for reasons explained at the
 * top of the file.
 *
 * Caution: because we transiently release write lock when we have to signal
 * some other backend, it is NOT guaranteed that there are still minFree
 * free message slots at exit.  Caller must recheck and perhaps retry.
 */
void
SICleanupQueue(bool callerHasWriteLock, int minFree)
{
	SISeg	   *segP = shmInvalBuffer;
	int			min,
				minsig,
				lowbound,
				numMsgs,
				i;
	ProcState  *needSig = NULL;

	/* Lock out all writers and readers */
	if (!callerHasWriteLock)
		LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);
	LWLockAcquire(SInvalReadLock, LW_EXCLUSIVE);

	/*
	 * Recompute minMsgNum = minimum of all backends' nextMsgNum, identify
	 * the furthest-back backend that needs signaling (if any), and reset
	 * any backends that are too far back.  lowbound is the smallest
	 * nextMsgNum a backend may have while still leaving minFree slots
	 * reclaimable.
	 */
	min = segP->maxMsgNum;
	minsig = min - SIG_THRESHOLD;
	lowbound = min - MAXNUMMESSAGES + minFree;

	for (i = 0; i < segP->lastBackend; i++)
	{
		ProcState  *stateP = &segP->procState[i];
		int			n = stateP->nextMsgNum;

		/* Ignore if inactive or already in reset state */
		if (stateP->procPid == 0 || stateP->resetState)
			continue;

		/*
		 * If we must free some space and this backend is preventing it,
		 * force him into reset state and then ignore until he catches up.
		 */
		if (n < lowbound)
		{
			stateP->resetState = true;
			/* no point in signaling him ... */
			continue;
		}

		/* Track the global minimum nextMsgNum */
		if (n < min)
			min = n;

		/* Also see who's furthest back of the unsignaled backends */
		if (n < minsig && !stateP->signaled)
		{
			minsig = n;
			needSig = stateP;
		}
	}
	segP->minMsgNum = min;

	/*
	 * When minMsgNum gets really large, decrement all message counters so as
	 * to forestall overflow of the counters.  This happens seldom enough
	 * that folding it into the previous loop would be a loser.
	 */
	if (min >= MSGNUMWRAPAROUND)
	{
		segP->minMsgNum -= MSGNUMWRAPAROUND;
		segP->maxMsgNum -= MSGNUMWRAPAROUND;
		for (i = 0; i < segP->lastBackend; i++)
		{
			/* we don't bother skipping inactive entries here */
			segP->procState[i].nextMsgNum -= MSGNUMWRAPAROUND;
		}
	}

	/*
	 * Determine how many messages are still in the queue, and set the
	 * threshold at which we should repeat SICleanupQueue().
	 */
	numMsgs = segP->maxMsgNum - segP->minMsgNum;
	if (numMsgs < CLEANUP_MIN)
		segP->nextThreshold = CLEANUP_MIN;
	else
		segP->nextThreshold = (numMsgs / CLEANUP_QUANTUM + 1) * CLEANUP_QUANTUM;
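
	/*
	 * Worked example: with CLEANUP_QUANTUM = 256, numMsgs = 1000 gives
	 * (1000 / 256 + 1) * 256 = 1024, i.e. the next multiple of
	 * CLEANUP_QUANTUM strictly above numMsgs.
	 */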

	/*
	 * Lastly, signal anyone who needs a catchup interrupt.  Since kill()
	 * might not be fast, we don't want to hold locks while executing it.
	 */
	if (needSig)
	{
		pid_t		his_pid = needSig->procPid;

		needSig->signaled = true;
		LWLockRelease(SInvalReadLock);
		LWLockRelease(SInvalWriteLock);
		elog(DEBUG4, "sending sinval catchup signal to PID %d",
			 (int) his_pid);
		kill(his_pid, SIGUSR1);
		if (callerHasWriteLock)
			LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);
	}
	else
	{
		LWLockRelease(SInvalReadLock);
		if (!callerHasWriteLock)
			LWLockRelease(SInvalWriteLock);
	}
}

/*
 * GetNextLocalTransactionId --- allocate a new LocalTransactionId
 *
 * We split VirtualTransactionIds into two parts so that it is possible
 * to allocate a new one without any contention for shared memory, except
 * for a bit of additional overhead during backend startup/shutdown.
 * The high-order part of a VirtualTransactionId is a BackendId, and the
 * low-order part is a LocalTransactionId, which we assign from a local
 * counter.  To avoid the risk of a VirtualTransactionId being reused
 * within a short interval, successive procs occupying the same backend ID
 * slot should use a consecutive sequence of local IDs, which is implemented
 * by copying nextLocalTransactionId as seen above.
 */
LocalTransactionId
GetNextLocalTransactionId(void)
{
	LocalTransactionId result;

	/* loop to avoid returning InvalidLocalTransactionId at wraparound */
	do
	{
		result = nextLocalTransactionId++;
	} while (!LocalTransactionIdIsValid(result));

	return result;
}
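
/*
 * Illustrative sketch (assumed struct field names): a VirtualTransactionId
 * is then formed by pairing the backend's slot number with the locally
 * assigned counter value, e.g.
 *
 *		vxid.backendId = MyBackendId;
 *		vxid.localTransactionId = GetNextLocalTransactionId();
 */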