~ubuntu-branches/ubuntu/hardy/postgresql-8.4/hardy-backports

« back to all changes in this revision

Viewing changes to src/backend/storage/ipc/sinvaladt.c

  • Committer: Bazaar Package Importer
  • Author(s): Martin Pitt
  • Date: 2009-03-20 12:00:13 UTC
  • Revision ID: james.westby@ubuntu.com-20090320120013-hogj7egc5mjncc5g
Tags: upstream-8.4~0cvs20090328
ImportĀ upstreamĀ versionĀ 8.4~0cvs20090328

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/*-------------------------------------------------------------------------
 
2
 *
 
3
 * sinvaladt.c
 
4
 *        POSTGRES shared cache invalidation data manager.
 
5
 *
 
6
 * Portions Copyright (c) 1996-2009, PostgreSQL Global Development Group
 
7
 * Portions Copyright (c) 1994, Regents of the University of California
 
8
 *
 
9
 *
 
10
 * IDENTIFICATION
 
11
 *        $PostgreSQL$
 
12
 *
 
13
 *-------------------------------------------------------------------------
 
14
 */
 
15
#include "postgres.h"
 
16
 
 
17
#include <signal.h>
 
18
#include <unistd.h>
 
19
 
 
20
#include "miscadmin.h"
 
21
#include "storage/backendid.h"
 
22
#include "storage/ipc.h"
 
23
#include "storage/proc.h"
 
24
#include "storage/shmem.h"
 
25
#include "storage/sinvaladt.h"
 
26
#include "storage/spin.h"
 
27
 
 
28
 
 
29
/*
 
30
 * Conceptually, the shared cache invalidation messages are stored in an
 
31
 * infinite array, where maxMsgNum is the next array subscript to store a
 
32
 * submitted message in, minMsgNum is the smallest array subscript containing
 
33
 * a message not yet read by all backends, and we always have maxMsgNum >=
 
34
 * minMsgNum.  (They are equal when there are no messages pending.)  For each
 
35
 * active backend, there is a nextMsgNum pointer indicating the next message it
 
36
 * needs to read; we have maxMsgNum >= nextMsgNum >= minMsgNum for every
 
37
 * backend.
 
38
 *
 
39
 * (In the current implementation, minMsgNum is a lower bound for the
 
40
 * per-process nextMsgNum values, but it isn't rigorously kept equal to the
 
41
 * smallest nextMsgNum --- it may lag behind.  We only update it when
 
42
 * SICleanupQueue is called, and we try not to do that often.)
 
43
 *
 
44
 * In reality, the messages are stored in a circular buffer of MAXNUMMESSAGES
 
45
 * entries.  We translate MsgNum values into circular-buffer indexes by
 
46
 * computing MsgNum % MAXNUMMESSAGES (this should be fast as long as
 
47
 * MAXNUMMESSAGES is a constant and a power of 2).      As long as maxMsgNum
 
48
 * doesn't exceed minMsgNum by more than MAXNUMMESSAGES, we have enough space
 
49
 * in the buffer.  If the buffer does overflow, we recover by setting the
 
50
 * "reset" flag for each backend that has fallen too far behind.  A backend
 
51
 * that is in "reset" state is ignored while determining minMsgNum.  When
 
52
 * it does finally attempt to receive inval messages, it must discard all
 
53
 * its invalidatable state, since it won't know what it missed.
 
54
 *
 
55
 * To reduce the probability of needing resets, we send a "catchup" interrupt
 
56
 * to any backend that seems to be falling unreasonably far behind.  The
 
57
 * normal behavior is that at most one such interrupt is in flight at a time;
 
58
 * when a backend completes processing a catchup interrupt, it executes
 
59
 * SICleanupQueue, which will signal the next-furthest-behind backend if
 
60
 * needed.  This avoids undue contention from multiple backends all trying
 
61
 * to catch up at once.  However, the furthest-back backend might be stuck
 
62
 * in a state where it can't catch up.  Eventually it will get reset, so it
 
63
 * won't cause any more problems for anyone but itself.  But we don't want
 
64
 * to find that a bunch of other backends are now too close to the reset
 
65
 * threshold to be saved.  So SICleanupQueue is designed to occasionally
 
66
 * send extra catchup interrupts as the queue gets fuller, to backends that
 
67
 * are far behind and haven't gotten one yet.  As long as there aren't a lot
 
68
 * of "stuck" backends, we won't need a lot of extra interrupts, since ones
 
69
 * that aren't stuck will propagate their interrupts to the next guy.
 
70
 *
 
71
 * We would have problems if the MsgNum values overflow an integer, so
 
72
 * whenever minMsgNum exceeds MSGNUMWRAPAROUND, we subtract MSGNUMWRAPAROUND
 
73
 * from all the MsgNum variables simultaneously.  MSGNUMWRAPAROUND can be
 
74
 * large so that we don't need to do this often.  It must be a multiple of
 
75
 * MAXNUMMESSAGES so that the existing circular-buffer entries don't need
 
76
 * to be moved when we do it.
 
77
 *
 
78
 * Access to the shared sinval array is protected by two locks, SInvalReadLock
 
79
 * and SInvalWriteLock.  Readers take SInvalReadLock in shared mode; this
 
80
 * authorizes them to modify their own ProcState but not to modify or even
 
81
 * look at anyone else's.  When we need to perform array-wide updates,
 
82
 * such as in SICleanupQueue, we take SInvalReadLock in exclusive mode to
 
83
 * lock out all readers.  Writers take SInvalWriteLock (always in exclusive
 
84
 * mode) to serialize adding messages to the queue.  Note that a writer
 
85
 * can operate in parallel with one or more readers, because the writer
 
86
 * has no need to touch anyone's ProcState, except in the infrequent cases
 
87
 * when SICleanupQueue is needed.  The only point of overlap is that
 
88
 * the writer wants to change maxMsgNum while readers need to read it.
 
89
 * We deal with that by having a spinlock that readers must take for just
 
90
 * long enough to read maxMsgNum, while writers take it for just long enough
 
91
 * to write maxMsgNum.  (The exact rule is that you need the spinlock to
 
92
 * read maxMsgNum if you are not holding SInvalWriteLock, and you need the
 
93
 * spinlock to write maxMsgNum unless you are holding both locks.)
 
94
 *
 
95
 * Note: since maxMsgNum is an int and hence presumably atomically readable/
 
96
 * writable, the spinlock might seem unnecessary.  The reason it is needed
 
97
 * is to provide a memory barrier: we need to be sure that messages written
 
98
 * to the array are actually there before maxMsgNum is increased, and that
 
99
 * readers will see that data after fetching maxMsgNum.  Multiprocessors
 
100
 * that have weak memory-ordering guarantees can fail without the memory
 
101
 * barrier instructions that are included in the spinlock sequences.
 
102
 */
 
103
 
 
104
 
 
105
/*
 
106
 * Configurable parameters.
 
107
 *
 
108
 * MAXNUMMESSAGES: max number of shared-inval messages we can buffer.
 
109
 * Must be a power of 2 for speed.
 
110
 *
 
111
 * MSGNUMWRAPAROUND: how often to reduce MsgNum variables to avoid overflow.
 
112
 * Must be a multiple of MAXNUMMESSAGES.  Should be large.
 
113
 *
 
114
 * CLEANUP_MIN: the minimum number of messages that must be in the buffer
 
115
 * before we bother to call SICleanupQueue.
 
116
 *
 
117
 * CLEANUP_QUANTUM: how often (in messages) to call SICleanupQueue once
 
118
 * we exceed CLEANUP_MIN.  Should be a power of 2 for speed.
 
119
 *
 
120
 * SIG_THRESHOLD: the minimum number of messages a backend must have fallen
 
121
 * behind before we'll send it SIGUSR1.
 
122
 *
 
123
 * WRITE_QUANTUM: the max number of messages to push into the buffer per
 
124
 * iteration of SIInsertDataEntries.  Noncritical but should be less than
 
125
 * CLEANUP_QUANTUM, because we only consider calling SICleanupQueue once
 
126
 * per iteration.
 
127
 */
 
128
 
 
129
#define MAXNUMMESSAGES 4096
 
130
#define MSGNUMWRAPAROUND (MAXNUMMESSAGES * 262144)
 
131
#define CLEANUP_MIN (MAXNUMMESSAGES / 2)
 
132
#define CLEANUP_QUANTUM (MAXNUMMESSAGES / 16)
 
133
#define SIG_THRESHOLD (MAXNUMMESSAGES / 2)
 
134
#define WRITE_QUANTUM 64
 
135
 
 
136
/* Per-backend state in shared invalidation structure */
 
137
typedef struct ProcState
 
138
{
 
139
        /* procPid is zero in an inactive ProcState array entry. */
 
140
        pid_t           procPid;                /* PID of backend, for signaling */
 
141
        /* nextMsgNum is meaningless if procPid == 0 or resetState is true. */
 
142
        int                     nextMsgNum;             /* next message number to read */
 
143
        bool            resetState;             /* backend needs to reset its state */
 
144
        bool            signaled;               /* backend has been sent catchup signal */
 
145
 
 
146
        /*
 
147
         * Next LocalTransactionId to use for each idle backend slot.  We keep
 
148
         * this here because it is indexed by BackendId and it is convenient to
 
149
         * copy the value to and from local memory when MyBackendId is set.
 
150
         * It's meaningless in an active ProcState entry.
 
151
         */
 
152
        LocalTransactionId nextLXID;
 
153
} ProcState;
 
154
 
 
155
/* Shared cache invalidation memory segment */
 
156
typedef struct SISeg
 
157
{
 
158
        /*
 
159
         * General state information
 
160
         */
 
161
        int                     minMsgNum;              /* oldest message still needed */
 
162
        int                     maxMsgNum;              /* next message number to be assigned */
 
163
        int                     nextThreshold;  /* # of messages to call SICleanupQueue */
 
164
        int                     lastBackend;    /* index of last active procState entry, +1 */
 
165
        int                     maxBackends;    /* size of procState array */
 
166
 
 
167
        slock_t         msgnumLock;             /* spinlock protecting maxMsgNum */
 
168
 
 
169
        /*
 
170
         * Circular buffer holding shared-inval messages
 
171
         */
 
172
        SharedInvalidationMessage buffer[MAXNUMMESSAGES];
 
173
 
 
174
        /*
 
175
         * Per-backend state info.
 
176
         *
 
177
         * We declare procState as 1 entry because C wants a fixed-size array, but
 
178
         * actually it is maxBackends entries long.
 
179
         */
 
180
        ProcState       procState[1];   /* reflects the invalidation state */
 
181
} SISeg;
 
182
 
 
183
static SISeg *shmInvalBuffer;   /* pointer to the shared inval buffer */
 
184
 
 
185
 
 
186
static LocalTransactionId nextLocalTransactionId;
 
187
 
 
188
static void CleanupInvalidationState(int status, Datum arg);
 
189
 
 
190
 
 
191
/*
 
192
 * SInvalShmemSize --- return shared-memory space needed
 
193
 */
 
194
Size
 
195
SInvalShmemSize(void)
 
196
{
 
197
        Size            size;
 
198
 
 
199
        size = offsetof(SISeg, procState);
 
200
        size = add_size(size, mul_size(sizeof(ProcState), MaxBackends));
 
201
 
 
202
        return size;
 
203
}
 
204
 
 
205
/*
 
206
 * SharedInvalBufferInit
 
207
 *              Create and initialize the SI message buffer
 
208
 */
 
209
void
 
210
CreateSharedInvalidationState(void)
 
211
{
 
212
        Size            size;
 
213
        int                     i;
 
214
        bool            found;
 
215
 
 
216
        /* Allocate space in shared memory */
 
217
        size = offsetof(SISeg, procState);
 
218
        size = add_size(size, mul_size(sizeof(ProcState), MaxBackends));
 
219
 
 
220
        shmInvalBuffer = (SISeg *)
 
221
                ShmemInitStruct("shmInvalBuffer", size, &found);
 
222
        if (found)
 
223
                return;
 
224
 
 
225
        /* Clear message counters, save size of procState array, init spinlock */
 
226
        shmInvalBuffer->minMsgNum = 0;
 
227
        shmInvalBuffer->maxMsgNum = 0;
 
228
        shmInvalBuffer->nextThreshold = CLEANUP_MIN;
 
229
        shmInvalBuffer->lastBackend = 0;
 
230
        shmInvalBuffer->maxBackends = MaxBackends;
 
231
        SpinLockInit(&shmInvalBuffer->msgnumLock);
 
232
 
 
233
        /* The buffer[] array is initially all unused, so we need not fill it */
 
234
 
 
235
        /* Mark all backends inactive, and initialize nextLXID */
 
236
        for (i = 0; i < shmInvalBuffer->maxBackends; i++)
 
237
        {
 
238
                shmInvalBuffer->procState[i].procPid = 0;                       /* inactive */
 
239
                shmInvalBuffer->procState[i].nextMsgNum = 0;            /* meaningless */
 
240
                shmInvalBuffer->procState[i].resetState = false;
 
241
                shmInvalBuffer->procState[i].signaled = false;
 
242
                shmInvalBuffer->procState[i].nextLXID = InvalidLocalTransactionId;
 
243
        }
 
244
}
 
245
 
 
246
/*
 
247
 * SharedInvalBackendInit
 
248
 *              Initialize a new backend to operate on the sinval buffer
 
249
 */
 
250
void
 
251
SharedInvalBackendInit(void)
 
252
{
 
253
        int                     index;
 
254
        ProcState  *stateP = NULL;
 
255
        SISeg      *segP = shmInvalBuffer;
 
256
 
 
257
        /*
 
258
         * This can run in parallel with read operations, and for that matter
 
259
         * with write operations; but not in parallel with additions and removals
 
260
         * of backends, nor in parallel with SICleanupQueue.  It doesn't seem
 
261
         * worth having a third lock, so we choose to use SInvalWriteLock to
 
262
         * serialize additions/removals.
 
263
         */
 
264
        LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);
 
265
 
 
266
        /* Look for a free entry in the procState array */
 
267
        for (index = 0; index < segP->lastBackend; index++)
 
268
        {
 
269
                if (segP->procState[index].procPid == 0)                /* inactive slot? */
 
270
                {
 
271
                        stateP = &segP->procState[index];
 
272
                        break;
 
273
                }
 
274
        }
 
275
 
 
276
        if (stateP == NULL)
 
277
        {
 
278
                if (segP->lastBackend < segP->maxBackends)
 
279
                {
 
280
                        stateP = &segP->procState[segP->lastBackend];
 
281
                        Assert(stateP->procPid == 0);
 
282
                        segP->lastBackend++;
 
283
                }
 
284
                else
 
285
                {
 
286
                        /*
 
287
                         * out of procState slots: MaxBackends exceeded -- report normally
 
288
                         */
 
289
                        MyBackendId = InvalidBackendId;
 
290
                        LWLockRelease(SInvalWriteLock);
 
291
                        ereport(FATAL,
 
292
                                        (errcode(ERRCODE_TOO_MANY_CONNECTIONS),
 
293
                                         errmsg("sorry, too many clients already")));
 
294
                }
 
295
        }
 
296
 
 
297
        MyBackendId = (stateP - &segP->procState[0]) + 1;
 
298
 
 
299
        /* Advertise assigned backend ID in MyProc */
 
300
        MyProc->backendId = MyBackendId;
 
301
 
 
302
        /* Fetch next local transaction ID into local memory */
 
303
        nextLocalTransactionId = stateP->nextLXID;
 
304
 
 
305
        /* mark myself active, with all extant messages already read */
 
306
        stateP->procPid = MyProcPid;
 
307
        stateP->nextMsgNum = segP->maxMsgNum;
 
308
        stateP->resetState = false;
 
309
        stateP->signaled = false;
 
310
 
 
311
        LWLockRelease(SInvalWriteLock);
 
312
 
 
313
        /* register exit routine to mark my entry inactive at exit */
 
314
        on_shmem_exit(CleanupInvalidationState, PointerGetDatum(segP));
 
315
 
 
316
        elog(DEBUG4, "my backend id is %d", MyBackendId);
 
317
}
 
318
 
 
319
/*
 
320
 * CleanupInvalidationState
 
321
 *              Mark the current backend as no longer active.
 
322
 *
 
323
 * This function is called via on_shmem_exit() during backend shutdown.
 
324
 *
 
325
 * arg is really of type "SISeg*".
 
326
 */
 
327
static void
 
328
CleanupInvalidationState(int status, Datum arg)
 
329
{
 
330
        SISeg      *segP = (SISeg *) DatumGetPointer(arg);
 
331
        ProcState  *stateP;
 
332
        int                     i;
 
333
 
 
334
        Assert(PointerIsValid(segP));
 
335
 
 
336
        LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);
 
337
 
 
338
        stateP = &segP->procState[MyBackendId - 1];
 
339
 
 
340
        /* Update next local transaction ID for next holder of this backendID */
 
341
        stateP->nextLXID = nextLocalTransactionId;
 
342
 
 
343
        /* Mark myself inactive */
 
344
        stateP->procPid = 0;
 
345
        stateP->nextMsgNum = 0;
 
346
        stateP->resetState = false;
 
347
        stateP->signaled = false;
 
348
 
 
349
        /* Recompute index of last active backend */
 
350
        for (i = segP->lastBackend; i > 0; i--)
 
351
        {
 
352
                if (segP->procState[i - 1].procPid != 0)
 
353
                        break;
 
354
        }
 
355
        segP->lastBackend = i;
 
356
 
 
357
        LWLockRelease(SInvalWriteLock);
 
358
}
 
359
 
 
360
/*
 
361
 * BackendIdIsActive
 
362
 *              Test if the given backend ID is currently assigned to a process.
 
363
 */
 
364
bool
 
365
BackendIdIsActive(int backendID)
 
366
{
 
367
        bool            result;
 
368
        SISeg      *segP = shmInvalBuffer;
 
369
 
 
370
        /* Need to lock out additions/removals of backends */
 
371
        LWLockAcquire(SInvalWriteLock, LW_SHARED);
 
372
 
 
373
        if (backendID > 0 && backendID <= segP->lastBackend)
 
374
        {
 
375
                ProcState  *stateP = &segP->procState[backendID - 1];
 
376
 
 
377
                result = (stateP->procPid != 0);
 
378
        }
 
379
        else
 
380
                result = false;
 
381
 
 
382
        LWLockRelease(SInvalWriteLock);
 
383
 
 
384
        return result;
 
385
}
 
386
 
 
387
/*
 
388
 * SIInsertDataEntries
 
389
 *              Add new invalidation message(s) to the buffer.
 
390
 */
 
391
void
 
392
SIInsertDataEntries(const SharedInvalidationMessage *data, int n)
 
393
{
 
394
        SISeg      *segP = shmInvalBuffer;
 
395
 
 
396
        /*
 
397
         * N can be arbitrarily large.  We divide the work into groups of no more
 
398
         * than WRITE_QUANTUM messages, to be sure that we don't hold the lock for
 
399
         * an unreasonably long time.  (This is not so much because we care about
 
400
         * letting in other writers, as that some just-caught-up backend might be
 
401
         * trying to do SICleanupQueue to pass on its signal, and we don't want it
 
402
         * to have to wait a long time.)  Also, we need to consider calling
 
403
         * SICleanupQueue every so often.
 
404
         */
 
405
        while (n > 0)
 
406
        {
 
407
                int             nthistime = Min(n, WRITE_QUANTUM);
 
408
                int             numMsgs;
 
409
                int             max;
 
410
 
 
411
                n -= nthistime;
 
412
 
 
413
                LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);
 
414
 
 
415
                /*
 
416
                 * If the buffer is full, we *must* acquire some space.  Clean the
 
417
                 * queue and reset anyone who is preventing space from being freed.
 
418
                 * Otherwise, clean the queue only when it's exceeded the next
 
419
                 * fullness threshold.  We have to loop and recheck the buffer state
 
420
                 * after any call of SICleanupQueue.
 
421
                 */
 
422
                for (;;)
 
423
                {
 
424
                        numMsgs = segP->maxMsgNum - segP->minMsgNum;
 
425
                        if (numMsgs + nthistime > MAXNUMMESSAGES ||
 
426
                                numMsgs >= segP->nextThreshold)
 
427
                                SICleanupQueue(true, nthistime);
 
428
                        else
 
429
                                break;
 
430
                }
 
431
 
 
432
                /*
 
433
                 * Insert new message(s) into proper slot of circular buffer
 
434
                 */
 
435
                max = segP->maxMsgNum;
 
436
                while (nthistime-- > 0)
 
437
                {
 
438
                        segP->buffer[max % MAXNUMMESSAGES] = *data++;
 
439
                        max++;
 
440
                }
 
441
 
 
442
                /* Update current value of maxMsgNum using spinlock */
 
443
                {
 
444
                        /* use volatile pointer to prevent code rearrangement */
 
445
                        volatile SISeg *vsegP = segP;
 
446
 
 
447
                        SpinLockAcquire(&vsegP->msgnumLock);
 
448
                        vsegP->maxMsgNum = max;
 
449
                        SpinLockRelease(&vsegP->msgnumLock);
 
450
                }
 
451
 
 
452
                LWLockRelease(SInvalWriteLock);
 
453
        }
 
454
}
 
455
 
 
456
/*
 
457
 * SIGetDataEntries
 
458
 *              get next SI message(s) for current backend, if there are any
 
459
 *
 
460
 * Possible return values:
 
461
 *      0:   no SI message available
 
462
 *      n>0: next n SI messages have been extracted into data[]
 
463
 * -1:   SI reset message extracted
 
464
 *
 
465
 * If the return value is less than the array size "datasize", the caller
 
466
 * can assume that there are no more SI messages after the one(s) returned.
 
467
 * Otherwise, another call is needed to collect more messages.
 
468
 *
 
469
 * NB: this can run in parallel with other instances of SIGetDataEntries
 
470
 * executing on behalf of other backends, since each instance will modify only
 
471
 * fields of its own backend's ProcState, and no instance will look at fields
 
472
 * of other backends' ProcStates.  We express this by grabbing SInvalReadLock
 
473
 * in shared mode.  Note that this is not exactly the normal (read-only)
 
474
 * interpretation of a shared lock! Look closely at the interactions before
 
475
 * allowing SInvalReadLock to be grabbed in shared mode for any other reason!
 
476
 *
 
477
 * NB: this can also run in parallel with SIInsertDataEntries.  It is not
 
478
 * guaranteed that we will return any messages added after the routine is
 
479
 * entered.
 
480
 *
 
481
 * Note: we assume that "datasize" is not so large that it might be important
 
482
 * to break our hold on SInvalReadLock into segments.
 
483
 */
 
484
int
 
485
SIGetDataEntries(SharedInvalidationMessage *data, int datasize)
 
486
{
 
487
        SISeg      *segP;
 
488
        ProcState  *stateP;
 
489
        int                     max;
 
490
        int                     n;
 
491
        
 
492
        LWLockAcquire(SInvalReadLock, LW_SHARED);
 
493
 
 
494
        segP = shmInvalBuffer;
 
495
        stateP = &segP->procState[MyBackendId - 1];
 
496
 
 
497
        /* Fetch current value of maxMsgNum using spinlock */
 
498
        {
 
499
                /* use volatile pointer to prevent code rearrangement */
 
500
                volatile SISeg *vsegP = segP;
 
501
 
 
502
                SpinLockAcquire(&vsegP->msgnumLock);
 
503
                max = vsegP->maxMsgNum;
 
504
                SpinLockRelease(&vsegP->msgnumLock);
 
505
        }
 
506
 
 
507
        if (stateP->resetState)
 
508
        {
 
509
                /*
 
510
                 * Force reset.  We can say we have dealt with any messages added
 
511
                 * since the reset, as well; and that means we should clear the
 
512
                 * signaled flag, too.
 
513
                 */
 
514
                stateP->nextMsgNum = max;
 
515
                stateP->resetState = false;
 
516
                stateP->signaled = false;
 
517
                LWLockRelease(SInvalReadLock);
 
518
                return -1;
 
519
        }
 
520
 
 
521
        /*
 
522
         * Retrieve messages and advance backend's counter, until data array is
 
523
         * full or there are no more messages.
 
524
         *
 
525
         * There may be other backends that haven't read the message(s), so we
 
526
         * cannot delete them here.  SICleanupQueue() will eventually remove them
 
527
         * from the queue.
 
528
         */
 
529
        n = 0;
 
530
        while (n < datasize && stateP->nextMsgNum < max)
 
531
        {
 
532
                data[n++] = segP->buffer[stateP->nextMsgNum % MAXNUMMESSAGES];
 
533
                stateP->nextMsgNum++;
 
534
        }
 
535
 
 
536
        /*
 
537
         * Reset our "signaled" flag whenever we have caught up completely.
 
538
         */
 
539
        if (stateP->nextMsgNum >= max)
 
540
                stateP->signaled = false;
 
541
 
 
542
        LWLockRelease(SInvalReadLock);
 
543
        return n;
 
544
}
 
545
 
 
546
/*
 
547
 * SICleanupQueue
 
548
 *              Remove messages that have been consumed by all active backends
 
549
 *
 
550
 * callerHasWriteLock is TRUE if caller is holding SInvalWriteLock.
 
551
 * minFree is the minimum number of message slots to make free.
 
552
 *
 
553
 * Possible side effects of this routine include marking one or more
 
554
 * backends as "reset" in the array, and sending a catchup interrupt (SIGUSR1)
 
555
 * to some backend that seems to be getting too far behind.  We signal at
 
556
 * most one backend at a time, for reasons explained at the top of the file.
 
557
 *
 
558
 * Caution: because we transiently release write lock when we have to signal
 
559
 * some other backend, it is NOT guaranteed that there are still minFree
 
560
 * free message slots at exit.  Caller must recheck and perhaps retry.
 
561
 */
 
562
void
 
563
SICleanupQueue(bool callerHasWriteLock, int minFree)
 
564
{
 
565
        SISeg      *segP = shmInvalBuffer;
 
566
        int                     min,
 
567
                                minsig,
 
568
                                lowbound,
 
569
                                numMsgs,
 
570
                                i;
 
571
        ProcState  *needSig = NULL;
 
572
 
 
573
        /* Lock out all writers and readers */
 
574
        if (!callerHasWriteLock)
 
575
                LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);
 
576
        LWLockAcquire(SInvalReadLock, LW_EXCLUSIVE);
 
577
 
 
578
        /*
 
579
         * Recompute minMsgNum = minimum of all backends' nextMsgNum, identify
 
580
         * the furthest-back backend that needs signaling (if any), and reset
 
581
         * any backends that are too far back.
 
582
         */
 
583
        min = segP->maxMsgNum;
 
584
        minsig = min - SIG_THRESHOLD;
 
585
        lowbound = min - MAXNUMMESSAGES + minFree;
 
586
 
 
587
        for (i = 0; i < segP->lastBackend; i++)
 
588
        {
 
589
                ProcState  *stateP = &segP->procState[i];
 
590
                int             n = stateP->nextMsgNum;
 
591
 
 
592
                /* Ignore if inactive or already in reset state */
 
593
                if (stateP->procPid == 0 || stateP->resetState)
 
594
                        continue;
 
595
 
 
596
                /*
 
597
                 * If we must free some space and this backend is preventing it,
 
598
                 * force him into reset state and then ignore until he catches up.
 
599
                 */
 
600
                if (n < lowbound)
 
601
                {
 
602
                        stateP->resetState = true;
 
603
                        /* no point in signaling him ... */
 
604
                        continue;
 
605
                }
 
606
 
 
607
                /* Track the global minimum nextMsgNum */
 
608
                if (n < min)
 
609
                        min = n;
 
610
 
 
611
                /* Also see who's furthest back of the unsignaled backends */
 
612
                if (n < minsig && !stateP->signaled)
 
613
                {
 
614
                        minsig = n;
 
615
                        needSig = stateP;
 
616
                }
 
617
        }
 
618
        segP->minMsgNum = min;
 
619
 
 
620
        /*
 
621
         * When minMsgNum gets really large, decrement all message counters so as
 
622
         * to forestall overflow of the counters.  This happens seldom enough
 
623
         * that folding it into the previous loop would be a loser.
 
624
         */
 
625
        if (min >= MSGNUMWRAPAROUND)
 
626
        {
 
627
                segP->minMsgNum -= MSGNUMWRAPAROUND;
 
628
                segP->maxMsgNum -= MSGNUMWRAPAROUND;
 
629
                for (i = 0; i < segP->lastBackend; i++)
 
630
                {
 
631
                        /* we don't bother skipping inactive entries here */
 
632
                        segP->procState[i].nextMsgNum -= MSGNUMWRAPAROUND;
 
633
                }
 
634
        }
 
635
 
 
636
        /*
 
637
         * Determine how many messages are still in the queue, and set the
 
638
         * threshold at which we should repeat SICleanupQueue().
 
639
         */
 
640
        numMsgs = segP->maxMsgNum - segP->minMsgNum;
 
641
        if (numMsgs < CLEANUP_MIN)
 
642
                segP->nextThreshold = CLEANUP_MIN;
 
643
        else
 
644
                segP->nextThreshold = (numMsgs / CLEANUP_QUANTUM + 1) * CLEANUP_QUANTUM;
 
645
 
 
646
        /*
 
647
         * Lastly, signal anyone who needs a catchup interrupt.  Since kill()
 
648
         * might not be fast, we don't want to hold locks while executing it.
 
649
         */
 
650
        if (needSig)
 
651
        {
 
652
                pid_t   his_pid = needSig->procPid;
 
653
 
 
654
                needSig->signaled = true;
 
655
                LWLockRelease(SInvalReadLock);
 
656
                LWLockRelease(SInvalWriteLock);
 
657
                elog(DEBUG4, "sending sinval catchup signal to PID %d", (int) his_pid);
 
658
                kill(his_pid, SIGUSR1);
 
659
                if (callerHasWriteLock)
 
660
                        LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);
 
661
        }
 
662
        else
 
663
        {
 
664
                LWLockRelease(SInvalReadLock);
 
665
                if (!callerHasWriteLock)
 
666
                        LWLockRelease(SInvalWriteLock);
 
667
        }
 
668
}
 
669
 
 
670
 
 
671
/*
 
672
 * GetNextLocalTransactionId --- allocate a new LocalTransactionId
 
673
 *
 
674
 * We split VirtualTransactionIds into two parts so that it is possible
 
675
 * to allocate a new one without any contention for shared memory, except
 
676
 * for a bit of additional overhead during backend startup/shutdown.
 
677
 * The high-order part of a VirtualTransactionId is a BackendId, and the
 
678
 * low-order part is a LocalTransactionId, which we assign from a local
 
679
 * counter.  To avoid the risk of a VirtualTransactionId being reused
 
680
 * within a short interval, successive procs occupying the same backend ID
 
681
 * slot should use a consecutive sequence of local IDs, which is implemented
 
682
 * by copying nextLocalTransactionId as seen above.
 
683
 */
 
684
LocalTransactionId
 
685
GetNextLocalTransactionId(void)
 
686
{
 
687
        LocalTransactionId result;
 
688
 
 
689
        /* loop to avoid returning InvalidLocalTransactionId at wraparound */
 
690
        do
 
691
        {
 
692
                result = nextLocalTransactionId++;
 
693
        } while (!LocalTransactionIdIsValid(result));
 
694
 
 
695
        return result;
 
696
}