~ubuntu-branches/ubuntu/precise/postgresql-9.1/precise-security

« back to all changes in this revision

Viewing changes to src/backend/storage/lmgr/lock.c

  • Committer: Bazaar Package Importer
  • Author(s): Martin Pitt
  • Date: 2011-05-11 10:41:53 UTC
  • Revision ID: james.westby@ubuntu.com-20110511104153-psbh2o58553fv1m0
Tags: upstream-9.1~beta1
Import upstream version 9.1~beta1

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/*-------------------------------------------------------------------------
 
2
 *
 
3
 * lock.c
 
4
 *        POSTGRES primary lock mechanism
 
5
 *
 
6
 * Portions Copyright (c) 1996-2011, PostgreSQL Global Development Group
 
7
 * Portions Copyright (c) 1994, Regents of the University of California
 
8
 *
 
9
 *
 
10
 * IDENTIFICATION
 
11
 *        src/backend/storage/lmgr/lock.c
 
12
 *
 
13
 * NOTES
 
14
 *        A lock table is a shared memory hash table.  When
 
15
 *        a process tries to acquire a lock of a type that conflicts
 
16
 *        with existing locks, it is put to sleep using the routines
 
17
 *        in storage/lmgr/proc.c.
 
18
 *
 
19
 *        For the most part, this code should be invoked via lmgr.c
 
20
 *        or another lock-management module, not directly.
 
21
 *
 
22
 *      Interface:
 
23
 *
 
24
 *      InitLocks(), GetLocksMethodTable(),
 
25
 *      LockAcquire(), LockRelease(), LockReleaseAll(),
 
26
 *      LockCheckConflicts(), GrantLock()
 
27
 *
 
28
 *-------------------------------------------------------------------------
 
29
 */
 
30
#include "postgres.h"
 
31
 
 
32
#include <signal.h>
 
33
#include <unistd.h>
 
34
 
 
35
#include "access/transam.h"
 
36
#include "access/twophase.h"
 
37
#include "access/twophase_rmgr.h"
 
38
#include "miscadmin.h"
 
39
#include "pg_trace.h"
 
40
#include "pgstat.h"
 
41
#include "storage/standby.h"
 
42
#include "utils/memutils.h"
 
43
#include "utils/ps_status.h"
 
44
#include "utils/resowner.h"
 
45
 
 
46
 
 
47
/* This configuration variable is used to set the lock table size */
int			max_locks_per_xact; /* set by guc.c */

/*
 * Upper bound on the number of lock table entries: each backend plus each
 * potential prepared transaction may hold up to max_locks_per_xact locks.
 * (mul_size/add_size check for overflow of the Size arithmetic.)
 */
#define NLOCKENTS() \
	mul_size(max_locks_per_xact, add_size(MaxBackends, max_prepared_xacts))
 
52
 
 
53
 
 
54
/*
 * Data structures defining the semantics of the standard lock methods.
 *
 * The conflict table defines the semantics of the various lock modes.
 * LockConflicts[mode] is a bitmask with bit (1 << m) set for each lock
 * mode m that conflicts with holding "mode".  Index 0 (no lock) is unused.
 */
static const LOCKMASK LockConflicts[] = {
	0,

	/* AccessShareLock */
	(1 << AccessExclusiveLock),

	/* RowShareLock */
	(1 << ExclusiveLock) | (1 << AccessExclusiveLock),

	/* RowExclusiveLock */
	(1 << ShareLock) | (1 << ShareRowExclusiveLock) |
	(1 << ExclusiveLock) | (1 << AccessExclusiveLock),

	/* ShareUpdateExclusiveLock */
	(1 << ShareUpdateExclusiveLock) |
	(1 << ShareLock) | (1 << ShareRowExclusiveLock) |
	(1 << ExclusiveLock) | (1 << AccessExclusiveLock),

	/* ShareLock */
	(1 << RowExclusiveLock) | (1 << ShareUpdateExclusiveLock) |
	(1 << ShareRowExclusiveLock) |
	(1 << ExclusiveLock) | (1 << AccessExclusiveLock),

	/* ShareRowExclusiveLock */
	(1 << RowExclusiveLock) | (1 << ShareUpdateExclusiveLock) |
	(1 << ShareLock) | (1 << ShareRowExclusiveLock) |
	(1 << ExclusiveLock) | (1 << AccessExclusiveLock),

	/* ExclusiveLock */
	(1 << RowShareLock) |
	(1 << RowExclusiveLock) | (1 << ShareUpdateExclusiveLock) |
	(1 << ShareLock) | (1 << ShareRowExclusiveLock) |
	(1 << ExclusiveLock) | (1 << AccessExclusiveLock),

	/* AccessExclusiveLock */
	(1 << AccessShareLock) | (1 << RowShareLock) |
	(1 << RowExclusiveLock) | (1 << ShareUpdateExclusiveLock) |
	(1 << ShareLock) | (1 << ShareRowExclusiveLock) |
	(1 << ExclusiveLock) | (1 << AccessExclusiveLock)

};
 
100
 
 
101
/*
 * Names of lock modes, for debug printouts.
 *
 * Indexed by lock mode number; entry 0 is the invalid/no-lock slot, so the
 * array must stay in step with the lock mode definitions.
 */
static const char *const lock_mode_names[] =
{
	"INVALID",
	"AccessShareLock",
	"RowShareLock",
	"RowExclusiveLock",
	"ShareUpdateExclusiveLock",
	"ShareLock",
	"ShareRowExclusiveLock",
	"ExclusiveLock",
	"AccessExclusiveLock"
};
 
114
 
 
115
#ifndef LOCK_DEBUG
/* Stand-in trace flag (always off) when lock tracing is compiled out */
static bool Dummy_trace = false;
#endif
 
118
 
 
119
/*
 * Lock method table for the standard (DEFAULT) lock method: ordinary
 * transactional locks, traced via Trace_locks when LOCK_DEBUG is enabled.
 */
static const LockMethodData default_lockmethod = {
	AccessExclusiveLock,		/* highest valid lock mode number */
	true,						/* transactional */
	LockConflicts,
	lock_mode_names,
#ifdef LOCK_DEBUG
	&Trace_locks
#else
	&Dummy_trace
#endif
};
 
130
 
 
131
/*
 * Lock method table for USER locks; identical to the default method except
 * that it is traced via Trace_userlocks when LOCK_DEBUG is enabled.
 */
static const LockMethodData user_lockmethod = {
	AccessExclusiveLock,		/* highest valid lock mode number */
	true,						/* transactional */
	LockConflicts,
	lock_mode_names,
#ifdef LOCK_DEBUG
	&Trace_userlocks
#else
	&Dummy_trace
#endif
};
 
142
 
 
143
/*
 * map from lock method id to the lock table data structures
 *
 * Slot 0 is NULL so that a zero lockmethodid is invalid; callers range-check
 * ids against lengthof(LockMethods) (see GetLocksMethodTable).
 */
static const LockMethod LockMethods[] = {
	NULL,
	&default_lockmethod,
	&user_lockmethod
};
 
151
 
 
152
 
 
153
/* Record that's written to 2PC state file when a lock is persisted */
typedef struct TwoPhaseLockRecord
{
	LOCKTAG		locktag;		/* identity of the locked object */
	LOCKMODE	lockmode;		/* mode of the persisted lock */
} TwoPhaseLockRecord;
 
159
 
 
160
 
 
161
/*
 * Pointers to hash tables containing lock state
 *
 * The LockMethodLockHash and LockMethodProcLockHash hash tables are in
 * shared memory; LockMethodLocalHash is local to each backend.
 */
static HTAB *LockMethodLockHash;		/* shared: LOCKTAG -> LOCK */
static HTAB *LockMethodProcLockHash;	/* shared: PROCLOCKTAG -> PROCLOCK */
static HTAB *LockMethodLocalHash;		/* per-backend: LOCALLOCKTAG -> LOCALLOCK */
 
170
 
 
171
 
 
172
/* private state for GrantAwaitedLock */
static LOCALLOCK *awaitedLock;		/* lock currently being waited for */
static ResourceOwner awaitedOwner;	/* owner recorded for the awaited lock */
 
175
 
 
176
 
 
177
#ifdef LOCK_DEBUG
 
178
 
 
179
/*------
 
180
 * The following configuration options are available for lock debugging:
 
181
 *
 
182
 *         TRACE_LOCKS          -- give a bunch of output what's going on in this file
 
183
 *         TRACE_USERLOCKS      -- same but for user locks
 
184
 *         TRACE_LOCK_OIDMIN-- do not trace locks for tables below this oid
 
185
 *                                                 (use to avoid output on system tables)
 
186
 *         TRACE_LOCK_TABLE -- trace locks on this table (oid) unconditionally
 
187
 *         DEBUG_DEADLOCKS      -- currently dumps locks at untimely occasions ;)
 
188
 *
 
189
 * Furthermore, but in storage/lmgr/lwlock.c:
 
190
 *         TRACE_LWLOCKS        -- trace lightweight locks (pretty useless)
 
191
 *
 
192
 * Define LOCK_DEBUG at compile time to get all these enabled.
 
193
 * --------
 
194
 */
 
195
 
 
196
int			Trace_lock_oidmin = FirstNormalObjectId;	/* don't trace tables below this OID */
bool		Trace_locks = false;	/* trace the default lock method */
bool		Trace_userlocks = false;	/* trace user locks */
int			Trace_lock_table = 0;	/* trace this table OID unconditionally */
bool		Debug_deadlocks = false;	/* dump lock state at untimely occasions */
 
201
 
 
202
 
 
203
/*
 * Should we emit trace output for this lock tag?  True when the lock
 * method's trace flag is set and the object's OID (locktag_field2) is at
 * least Trace_lock_oidmin, or when the OID matches Trace_lock_table exactly.
 */
inline static bool
LOCK_DEBUG_ENABLED(const LOCKTAG *tag)
{
	return
		(*(LockMethods[tag->locktag_lockmethodid]->trace_flag) &&
		 ((Oid) tag->locktag_field2 >= (Oid) Trace_lock_oidmin))
		|| (Trace_lock_table &&
			(tag->locktag_field2 == Trace_lock_table));
}
 
212
 
 
213
 
 
214
/*
 * Log a LOCK's full state -- tag fields, grant mask, per-mode requested and
 * granted counts, and wait queue length -- if tracing is enabled for it.
 * "type" is the lock mode of current interest; only its name is printed.
 * Note: the requested[]/granted[] dumps are hard-wired for 7 lock modes.
 */
inline static void
LOCK_PRINT(const char *where, const LOCK *lock, LOCKMODE type)
{
	if (LOCK_DEBUG_ENABLED(&lock->tag))
		elog(LOG,
			 "%s: lock(%p) id(%u,%u,%u,%u,%u,%u) grantMask(%x) "
			 "req(%d,%d,%d,%d,%d,%d,%d)=%d "
			 "grant(%d,%d,%d,%d,%d,%d,%d)=%d wait(%d) type(%s)",
			 where, lock,
			 lock->tag.locktag_field1, lock->tag.locktag_field2,
			 lock->tag.locktag_field3, lock->tag.locktag_field4,
			 lock->tag.locktag_type, lock->tag.locktag_lockmethodid,
			 lock->grantMask,
			 lock->requested[1], lock->requested[2], lock->requested[3],
			 lock->requested[4], lock->requested[5], lock->requested[6],
			 lock->requested[7], lock->nRequested,
			 lock->granted[1], lock->granted[2], lock->granted[3],
			 lock->granted[4], lock->granted[5], lock->granted[6],
			 lock->granted[7], lock->nGranted,
			 lock->waitProcs.size,
			 LockMethods[LOCK_LOCKMETHOD(*lock)]->lockModeNames[type]);
}
 
236
 
 
237
 
 
238
/*
 * Log a PROCLOCK's state (its LOCK, lock method, owning PGPROC, and hold
 * mask) if tracing is enabled for the underlying lock's tag.
 */
inline static void
PROCLOCK_PRINT(const char *where, const PROCLOCK *proclockP)
{
	if (LOCK_DEBUG_ENABLED(&proclockP->tag.myLock->tag))
		elog(LOG,
			 "%s: proclock(%p) lock(%p) method(%u) proc(%p) hold(%x)",
			 where, proclockP, proclockP->tag.myLock,
			 PROCLOCK_LOCKMETHOD(*(proclockP)),
			 proclockP->tag.myProc, (int) proclockP->holdMask);
}
 
248
#else                                                   /* not LOCK_DEBUG */
 
249
 
 
250
#define LOCK_PRINT(where, lock, type)
 
251
#define PROCLOCK_PRINT(where, proclockP)
 
252
#endif   /* not LOCK_DEBUG */
 
253
 
 
254
 
 
255
/* Forward declarations of file-local helper functions */
static uint32 proclock_hash(const void *key, Size keysize);
static void RemoveLocalLock(LOCALLOCK *locallock);
static void GrantLockLocal(LOCALLOCK *locallock, ResourceOwner owner);
static void WaitOnLock(LOCALLOCK *locallock, ResourceOwner owner);
static void ReleaseLockForOwner(LOCALLOCK *locallock, ResourceOwner owner);
static bool UnGrantLock(LOCK *lock, LOCKMODE lockmode,
			PROCLOCK *proclock, LockMethod lockMethodTable);
static void CleanUpLock(LOCK *lock, PROCLOCK *proclock,
			LockMethod lockMethodTable, uint32 hashcode,
			bool wakeupNeeded);
 
265
 
 
266
 
 
267
/*
 * InitLocks -- Initialize the lock manager's data structures.
 *
 * This is called from CreateSharedMemoryAndSemaphores(), which see for
 * more comments.  In the normal postmaster case, the shared hash tables
 * are created here, as well as a locallock hash table that will remain
 * unused and empty in the postmaster itself.  Backends inherit the pointers
 * to the shared tables via fork(), and also inherit an image of the locallock
 * hash table, which they proceed to use.  In the EXEC_BACKEND case, each
 * backend re-executes this code to obtain pointers to the already existing
 * shared hash tables and to create its locallock hash table.
 */
void
InitLocks(void)
{
	HASHCTL		info;
	int			hash_flags;
	long		init_table_size,
				max_table_size;

	/*
	 * Compute init/max size to request for lock hashtables.  Note these
	 * calculations must agree with LockShmemSize!
	 */
	max_table_size = NLOCKENTS();
	init_table_size = max_table_size / 2;

	/*
	 * Allocate hash table for LOCK structs.  This stores per-locked-object
	 * information.
	 */
	MemSet(&info, 0, sizeof(info));
	info.keysize = sizeof(LOCKTAG);
	info.entrysize = sizeof(LOCK);
	info.hash = tag_hash;
	info.num_partitions = NUM_LOCK_PARTITIONS;
	hash_flags = (HASH_ELEM | HASH_FUNCTION | HASH_PARTITION);

	LockMethodLockHash = ShmemInitHash("LOCK hash",
									   init_table_size,
									   max_table_size,
									   &info,
									   hash_flags);

	/* Assume an average of 2 holders per lock */
	max_table_size *= 2;
	init_table_size *= 2;

	/*
	 * Allocate hash table for PROCLOCK structs.  This stores
	 * per-lock-per-holder information.  Note the custom hash function,
	 * needed to keep PROCLOCKs in the same partition as their LOCKs.
	 */
	info.keysize = sizeof(PROCLOCKTAG);
	info.entrysize = sizeof(PROCLOCK);
	info.hash = proclock_hash;
	info.num_partitions = NUM_LOCK_PARTITIONS;
	hash_flags = (HASH_ELEM | HASH_FUNCTION | HASH_PARTITION);

	LockMethodProcLockHash = ShmemInitHash("PROCLOCK hash",
										   init_table_size,
										   max_table_size,
										   &info,
										   hash_flags);

	/*
	 * Allocate non-shared hash table for LOCALLOCK structs.  This stores lock
	 * counts and resource owner information.
	 *
	 * The non-shared table could already exist in this process (this occurs
	 * when the postmaster is recreating shared memory after a backend crash).
	 * If so, delete and recreate it.  (We could simply leave it, since it
	 * ought to be empty in the postmaster, but for safety let's zap it.)
	 */
	if (LockMethodLocalHash)
		hash_destroy(LockMethodLocalHash);

	info.keysize = sizeof(LOCALLOCKTAG);
	info.entrysize = sizeof(LOCALLOCK);
	info.hash = tag_hash;
	hash_flags = (HASH_ELEM | HASH_FUNCTION);

	LockMethodLocalHash = hash_create("LOCALLOCK hash",
									  16,
									  &info,
									  hash_flags);
}
 
353
 
 
354
 
 
355
/*
 
356
 * Fetch the lock method table associated with a given lock
 
357
 */
 
358
LockMethod
 
359
GetLocksMethodTable(const LOCK *lock)
 
360
{
 
361
        LOCKMETHODID lockmethodid = LOCK_LOCKMETHOD(*lock);
 
362
 
 
363
        Assert(0 < lockmethodid && lockmethodid < lengthof(LockMethods));
 
364
        return LockMethods[lockmethodid];
 
365
}
 
366
 
 
367
 
 
368
/*
 * Compute the hash code associated with a LOCKTAG.
 *
 * To avoid unnecessary recomputations of the hash code, we try to do this
 * just once per function, and then pass it around as needed.  Aside from
 * passing the hashcode to hash_search_with_hash_value(), we can extract
 * the lock partition number from the hashcode.
 */
uint32
LockTagHashCode(const LOCKTAG *locktag)
{
	/* Delegate to the LOCK hash table's own hash function (tag_hash) */
	return get_hash_value(LockMethodLockHash, (const void *) locktag);
}
 
381
 
 
382
/*
 * Compute the hash code associated with a PROCLOCKTAG.
 *
 * Because we want to use just one set of partition locks for both the
 * LOCK and PROCLOCK hash tables, we have to make sure that PROCLOCKs
 * fall into the same partition number as their associated LOCKs.
 * dynahash.c expects the partition number to be the low-order bits of
 * the hash code, and therefore a PROCLOCKTAG's hash code must have the
 * same low-order bits as the associated LOCKTAG's hash code.  We achieve
 * this with this specialized hash function.
 *
 * NB: any change here must be mirrored in ProcLockHashCode().
 */
static uint32
proclock_hash(const void *key, Size keysize)
{
	const PROCLOCKTAG *proclocktag = (const PROCLOCKTAG *) key;
	uint32		lockhash;
	Datum		procptr;

	Assert(keysize == sizeof(PROCLOCKTAG));

	/* Look into the associated LOCK object, and compute its hash code */
	lockhash = LockTagHashCode(&proclocktag->myLock->tag);

	/*
	 * To make the hash code also depend on the PGPROC, we xor the proc
	 * struct's address into the hash code, left-shifted so that the
	 * partition-number bits don't change.  Since this is only a hash, we
	 * don't care if we lose high-order bits of the address; use an
	 * intermediate variable to suppress cast-pointer-to-int warnings.
	 */
	procptr = PointerGetDatum(proclocktag->myProc);
	lockhash ^= ((uint32) procptr) << LOG2_NUM_LOCK_PARTITIONS;

	return lockhash;
}
 
417
 
 
418
/*
 
419
 * Compute the hash code associated with a PROCLOCKTAG, given the hashcode
 
420
 * for its underlying LOCK.
 
421
 *
 
422
 * We use this just to avoid redundant calls of LockTagHashCode().
 
423
 */
 
424
static inline uint32
 
425
ProcLockHashCode(const PROCLOCKTAG *proclocktag, uint32 hashcode)
 
426
{
 
427
        uint32          lockhash = hashcode;
 
428
        Datum           procptr;
 
429
 
 
430
        /*
 
431
         * This must match proclock_hash()!
 
432
         */
 
433
        procptr = PointerGetDatum(proclocktag->myProc);
 
434
        lockhash ^= ((uint32) procptr) << LOG2_NUM_LOCK_PARTITIONS;
 
435
 
 
436
        return lockhash;
 
437
}
 
438
 
 
439
 
 
440
/*
 * LockAcquire -- Check for lock conflicts, sleep if conflict found,
 *		set lock if/when no conflicts.
 *
 * Inputs:
 *	locktag: unique identifier for the lockable object
 *	lockmode: lock mode to acquire
 *	sessionLock: if true, acquire lock for session not current transaction
 *	dontWait: if true, don't wait to acquire lock
 *
 * Returns one of:
 *		LOCKACQUIRE_NOT_AVAIL		lock not available, and dontWait=true
 *		LOCKACQUIRE_OK				lock successfully acquired
 *		LOCKACQUIRE_ALREADY_HELD	incremented count for lock already held
 *
 * In the normal case where dontWait=false and the caller doesn't need to
 * distinguish a freshly acquired lock from one already taken earlier in
 * this same transaction, there is no need to examine the return value.
 *
 * Side Effects: The lock is acquired and recorded in lock tables.
 *
 * NOTE: if we wait for the lock, there is no way to abort the wait
 * short of aborting the transaction.
 */
LockAcquireResult
LockAcquire(const LOCKTAG *locktag,
			LOCKMODE lockmode,
			bool sessionLock,
			bool dontWait)
{
	/* Convenience wrapper: always report out-of-shared-memory as ERROR */
	return LockAcquireExtended(locktag, lockmode, sessionLock, dontWait, true);
}
 
472
 
 
473
/*
 
474
 * LockAcquireExtended - allows us to specify additional options
 
475
 *
 
476
 * reportMemoryError specifies whether a lock request that fills the
 
477
 * lock table should generate an ERROR or not. This allows a priority
 
478
 * caller to note that the lock table is full and then begin taking
 
479
 * extreme action to reduce the number of other lock holders before
 
480
 * retrying the action.
 
481
 */
 
482
LockAcquireResult
 
483
LockAcquireExtended(const LOCKTAG *locktag,
 
484
                                        LOCKMODE lockmode,
 
485
                                        bool sessionLock,
 
486
                                        bool dontWait,
 
487
                                        bool reportMemoryError)
 
488
{
 
489
        LOCKMETHODID lockmethodid = locktag->locktag_lockmethodid;
 
490
        LockMethod      lockMethodTable;
 
491
        LOCALLOCKTAG localtag;
 
492
        LOCALLOCK  *locallock;
 
493
        LOCK       *lock;
 
494
        PROCLOCK   *proclock;
 
495
        PROCLOCKTAG proclocktag;
 
496
        bool            found;
 
497
        ResourceOwner owner;
 
498
        uint32          hashcode;
 
499
        uint32          proclock_hashcode;
 
500
        int                     partition;
 
501
        LWLockId        partitionLock;
 
502
        int                     status;
 
503
        bool            log_lock = false;
 
504
 
 
505
        if (lockmethodid <= 0 || lockmethodid >= lengthof(LockMethods))
 
506
                elog(ERROR, "unrecognized lock method: %d", lockmethodid);
 
507
        lockMethodTable = LockMethods[lockmethodid];
 
508
        if (lockmode <= 0 || lockmode > lockMethodTable->numLockModes)
 
509
                elog(ERROR, "unrecognized lock mode: %d", lockmode);
 
510
 
 
511
        if (RecoveryInProgress() && !InRecovery &&
 
512
                (locktag->locktag_type == LOCKTAG_OBJECT ||
 
513
                 locktag->locktag_type == LOCKTAG_RELATION) &&
 
514
                lockmode > RowExclusiveLock)
 
515
                ereport(ERROR,
 
516
                                (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
 
517
                                 errmsg("cannot acquire lock mode %s on database objects while recovery is in progress",
 
518
                                                lockMethodTable->lockModeNames[lockmode]),
 
519
                                 errhint("Only RowExclusiveLock or less can be acquired on database objects during recovery.")));
 
520
 
 
521
#ifdef LOCK_DEBUG
 
522
        if (LOCK_DEBUG_ENABLED(locktag))
 
523
                elog(LOG, "LockAcquire: lock [%u,%u] %s",
 
524
                         locktag->locktag_field1, locktag->locktag_field2,
 
525
                         lockMethodTable->lockModeNames[lockmode]);
 
526
#endif
 
527
 
 
528
        /* Session locks are never transactional, else check table */
 
529
        if (!sessionLock && lockMethodTable->transactional)
 
530
                owner = CurrentResourceOwner;
 
531
        else
 
532
                owner = NULL;
 
533
 
 
534
        /*
 
535
         * Find or create a LOCALLOCK entry for this lock and lockmode
 
536
         */
 
537
        MemSet(&localtag, 0, sizeof(localtag));         /* must clear padding */
 
538
        localtag.lock = *locktag;
 
539
        localtag.mode = lockmode;
 
540
 
 
541
        locallock = (LOCALLOCK *) hash_search(LockMethodLocalHash,
 
542
                                                                                  (void *) &localtag,
 
543
                                                                                  HASH_ENTER, &found);
 
544
 
 
545
        /*
 
546
         * if it's a new locallock object, initialize it
 
547
         */
 
548
        if (!found)
 
549
        {
 
550
                locallock->lock = NULL;
 
551
                locallock->proclock = NULL;
 
552
                locallock->hashcode = LockTagHashCode(&(localtag.lock));
 
553
                locallock->nLocks = 0;
 
554
                locallock->numLockOwners = 0;
 
555
                locallock->maxLockOwners = 8;
 
556
                locallock->lockOwners = NULL;
 
557
                locallock->lockOwners = (LOCALLOCKOWNER *)
 
558
                        MemoryContextAlloc(TopMemoryContext,
 
559
                                                  locallock->maxLockOwners * sizeof(LOCALLOCKOWNER));
 
560
        }
 
561
        else
 
562
        {
 
563
                /* Make sure there will be room to remember the lock */
 
564
                if (locallock->numLockOwners >= locallock->maxLockOwners)
 
565
                {
 
566
                        int                     newsize = locallock->maxLockOwners * 2;
 
567
 
 
568
                        locallock->lockOwners = (LOCALLOCKOWNER *)
 
569
                                repalloc(locallock->lockOwners,
 
570
                                                 newsize * sizeof(LOCALLOCKOWNER));
 
571
                        locallock->maxLockOwners = newsize;
 
572
                }
 
573
        }
 
574
 
 
575
        /*
 
576
         * If we already hold the lock, we can just increase the count locally.
 
577
         */
 
578
        if (locallock->nLocks > 0)
 
579
        {
 
580
                GrantLockLocal(locallock, owner);
 
581
                return LOCKACQUIRE_ALREADY_HELD;
 
582
        }
 
583
 
 
584
        /*
 
585
         * Emit a WAL record if acquisition of this lock needs to be replayed in a
 
586
         * standby server. Only AccessExclusiveLocks can conflict with lock types
 
587
         * that read-only transactions can acquire in a standby server.
 
588
         *
 
589
         * Make sure this definition matches the one in
 
590
         * GetRunningTransactionLocks().
 
591
         *
 
592
         * First we prepare to log, then after lock acquired we issue log record.
 
593
         */
 
594
        if (lockmode >= AccessExclusiveLock &&
 
595
                locktag->locktag_type == LOCKTAG_RELATION &&
 
596
                !RecoveryInProgress() &&
 
597
                XLogStandbyInfoActive())
 
598
        {
 
599
                LogAccessExclusiveLockPrepare();
 
600
                log_lock = true;
 
601
        }
 
602
 
 
603
        /*
 
604
         * Otherwise we've got to mess with the shared lock table.
 
605
         */
 
606
        hashcode = locallock->hashcode;
 
607
        partition = LockHashPartition(hashcode);
 
608
        partitionLock = LockHashPartitionLock(hashcode);
 
609
 
 
610
        LWLockAcquire(partitionLock, LW_EXCLUSIVE);
 
611
 
 
612
        /*
 
613
         * Find or create a lock with this tag.
 
614
         *
 
615
         * Note: if the locallock object already existed, it might have a pointer
 
616
         * to the lock already ... but we probably should not assume that that
 
617
         * pointer is valid, since a lock object with no locks can go away
 
618
         * anytime.
 
619
         */
 
620
        lock = (LOCK *) hash_search_with_hash_value(LockMethodLockHash,
 
621
                                                                                                (void *) locktag,
 
622
                                                                                                hashcode,
 
623
                                                                                                HASH_ENTER_NULL,
 
624
                                                                                                &found);
 
625
        if (!lock)
 
626
        {
 
627
                LWLockRelease(partitionLock);
 
628
                if (reportMemoryError)
 
629
                        ereport(ERROR,
 
630
                                        (errcode(ERRCODE_OUT_OF_MEMORY),
 
631
                                         errmsg("out of shared memory"),
 
632
                                         errhint("You might need to increase max_locks_per_transaction.")));
 
633
                else
 
634
                        return LOCKACQUIRE_NOT_AVAIL;
 
635
        }
 
636
        locallock->lock = lock;
 
637
 
 
638
        /*
 
639
         * if it's a new lock object, initialize it
 
640
         */
 
641
        if (!found)
 
642
        {
 
643
                lock->grantMask = 0;
 
644
                lock->waitMask = 0;
 
645
                SHMQueueInit(&(lock->procLocks));
 
646
                ProcQueueInit(&(lock->waitProcs));
 
647
                lock->nRequested = 0;
 
648
                lock->nGranted = 0;
 
649
                MemSet(lock->requested, 0, sizeof(int) * MAX_LOCKMODES);
 
650
                MemSet(lock->granted, 0, sizeof(int) * MAX_LOCKMODES);
 
651
                LOCK_PRINT("LockAcquire: new", lock, lockmode);
 
652
        }
 
653
        else
 
654
        {
 
655
                LOCK_PRINT("LockAcquire: found", lock, lockmode);
 
656
                Assert((lock->nRequested >= 0) && (lock->requested[lockmode] >= 0));
 
657
                Assert((lock->nGranted >= 0) && (lock->granted[lockmode] >= 0));
 
658
                Assert(lock->nGranted <= lock->nRequested);
 
659
        }
 
660
 
 
661
        /*
 
662
         * Create the hash key for the proclock table.
 
663
         */
 
664
        proclocktag.myLock = lock;
 
665
        proclocktag.myProc = MyProc;
 
666
 
 
667
        proclock_hashcode = ProcLockHashCode(&proclocktag, hashcode);
 
668
 
 
669
        /*
 
670
         * Find or create a proclock entry with this tag
 
671
         */
 
672
        proclock = (PROCLOCK *) hash_search_with_hash_value(LockMethodProcLockHash,
 
673
                                                                                                                (void *) &proclocktag,
 
674
                                                                                                                proclock_hashcode,
 
675
                                                                                                                HASH_ENTER_NULL,
 
676
                                                                                                                &found);
 
677
        if (!proclock)
 
678
        {
 
679
                /* Ooops, not enough shmem for the proclock */
 
680
                if (lock->nRequested == 0)
 
681
                {
 
682
                        /*
 
683
                         * There are no other requestors of this lock, so garbage-collect
 
684
                         * the lock object.  We *must* do this to avoid a permanent leak
 
685
                         * of shared memory, because there won't be anything to cause
 
686
                         * anyone to release the lock object later.
 
687
                         */
 
688
                        Assert(SHMQueueEmpty(&(lock->procLocks)));
 
689
                        if (!hash_search_with_hash_value(LockMethodLockHash,
 
690
                                                                                         (void *) &(lock->tag),
 
691
                                                                                         hashcode,
 
692
                                                                                         HASH_REMOVE,
 
693
                                                                                         NULL))
 
694
                                elog(PANIC, "lock table corrupted");
 
695
                }
 
696
                LWLockRelease(partitionLock);
 
697
                if (reportMemoryError)
 
698
                        ereport(ERROR,
 
699
                                        (errcode(ERRCODE_OUT_OF_MEMORY),
 
700
                                         errmsg("out of shared memory"),
 
701
                                         errhint("You might need to increase max_locks_per_transaction.")));
 
702
                else
 
703
                        return LOCKACQUIRE_NOT_AVAIL;
 
704
        }
 
705
        locallock->proclock = proclock;
 
706
 
 
707
        /*
 
708
         * If new, initialize the new entry
 
709
         */
 
710
        if (!found)
 
711
        {
 
712
                proclock->holdMask = 0;
 
713
                proclock->releaseMask = 0;
 
714
                /* Add proclock to appropriate lists */
 
715
                SHMQueueInsertBefore(&lock->procLocks, &proclock->lockLink);
 
716
                SHMQueueInsertBefore(&(MyProc->myProcLocks[partition]),
 
717
                                                         &proclock->procLink);
 
718
                PROCLOCK_PRINT("LockAcquire: new", proclock);
 
719
        }
 
720
        else
 
721
        {
 
722
                PROCLOCK_PRINT("LockAcquire: found", proclock);
 
723
                Assert((proclock->holdMask & ~lock->grantMask) == 0);
 
724
 
 
725
#ifdef CHECK_DEADLOCK_RISK
 
726
 
 
727
                /*
 
728
                 * Issue warning if we already hold a lower-level lock on this object
 
729
                 * and do not hold a lock of the requested level or higher. This
 
730
                 * indicates a deadlock-prone coding practice (eg, we'd have a
 
731
                 * deadlock if another backend were following the same code path at
 
732
                 * about the same time).
 
733
                 *
 
734
                 * This is not enabled by default, because it may generate log entries
 
735
                 * about user-level coding practices that are in fact safe in context.
 
736
                 * It can be enabled to help find system-level problems.
 
737
                 *
 
738
                 * XXX Doing numeric comparison on the lockmodes is a hack; it'd be
 
739
                 * better to use a table.  For now, though, this works.
 
740
                 */
 
741
                {
 
742
                        int                     i;
 
743
 
 
744
                        for (i = lockMethodTable->numLockModes; i > 0; i--)
 
745
                        {
 
746
                                if (proclock->holdMask & LOCKBIT_ON(i))
 
747
                                {
 
748
                                        if (i >= (int) lockmode)
 
749
                                                break;  /* safe: we have a lock >= req level */
 
750
                                        elog(LOG, "deadlock risk: raising lock level"
 
751
                                                 " from %s to %s on object %u/%u/%u",
 
752
                                                 lockMethodTable->lockModeNames[i],
 
753
                                                 lockMethodTable->lockModeNames[lockmode],
 
754
                                                 lock->tag.locktag_field1, lock->tag.locktag_field2,
 
755
                                                 lock->tag.locktag_field3);
 
756
                                        break;
 
757
                                }
 
758
                        }
 
759
                }
 
760
#endif   /* CHECK_DEADLOCK_RISK */
 
761
        }
 
762
 
 
763
        /*
 
764
         * lock->nRequested and lock->requested[] count the total number of
 
765
         * requests, whether granted or waiting, so increment those immediately.
 
766
         * The other counts don't increment till we get the lock.
 
767
         */
 
768
        lock->nRequested++;
 
769
        lock->requested[lockmode]++;
 
770
        Assert((lock->nRequested > 0) && (lock->requested[lockmode] > 0));
 
771
 
 
772
        /*
 
773
         * We shouldn't already hold the desired lock; else locallock table is
 
774
         * broken.
 
775
         */
 
776
        if (proclock->holdMask & LOCKBIT_ON(lockmode))
 
777
                elog(ERROR, "lock %s on object %u/%u/%u is already held",
 
778
                         lockMethodTable->lockModeNames[lockmode],
 
779
                         lock->tag.locktag_field1, lock->tag.locktag_field2,
 
780
                         lock->tag.locktag_field3);
 
781
 
 
782
        /*
 
783
         * If lock requested conflicts with locks requested by waiters, must join
 
784
         * wait queue.  Otherwise, check for conflict with already-held locks.
 
785
         * (That's last because most complex check.)
 
786
         */
 
787
        if (lockMethodTable->conflictTab[lockmode] & lock->waitMask)
 
788
                status = STATUS_FOUND;
 
789
        else
 
790
                status = LockCheckConflicts(lockMethodTable, lockmode,
 
791
                                                                        lock, proclock, MyProc);
 
792
 
 
793
        if (status == STATUS_OK)
 
794
        {
 
795
                /* No conflict with held or previously requested locks */
 
796
                GrantLock(lock, proclock, lockmode);
 
797
                GrantLockLocal(locallock, owner);
 
798
        }
 
799
        else
 
800
        {
 
801
                Assert(status == STATUS_FOUND);
 
802
 
 
803
                /*
 
804
                 * We can't acquire the lock immediately.  If caller specified no
 
805
                 * blocking, remove useless table entries and return NOT_AVAIL without
 
806
                 * waiting.
 
807
                 */
 
808
                if (dontWait)
 
809
                {
 
810
                        if (proclock->holdMask == 0)
 
811
                        {
 
812
                                SHMQueueDelete(&proclock->lockLink);
 
813
                                SHMQueueDelete(&proclock->procLink);
 
814
                                if (!hash_search_with_hash_value(LockMethodProcLockHash,
 
815
                                                                                                 (void *) &(proclock->tag),
 
816
                                                                                                 proclock_hashcode,
 
817
                                                                                                 HASH_REMOVE,
 
818
                                                                                                 NULL))
 
819
                                        elog(PANIC, "proclock table corrupted");
 
820
                        }
 
821
                        else
 
822
                                PROCLOCK_PRINT("LockAcquire: NOWAIT", proclock);
 
823
                        lock->nRequested--;
 
824
                        lock->requested[lockmode]--;
 
825
                        LOCK_PRINT("LockAcquire: conditional lock failed", lock, lockmode);
 
826
                        Assert((lock->nRequested > 0) && (lock->requested[lockmode] >= 0));
 
827
                        Assert(lock->nGranted <= lock->nRequested);
 
828
                        LWLockRelease(partitionLock);
 
829
                        if (locallock->nLocks == 0)
 
830
                                RemoveLocalLock(locallock);
 
831
                        return LOCKACQUIRE_NOT_AVAIL;
 
832
                }
 
833
 
 
834
                /*
 
835
                 * In Hot Standby perform early deadlock detection in normal backends.
 
836
                 * If deadlock found we release partition lock but do not return.
 
837
                 */
 
838
                if (RecoveryInProgress() && !InRecovery)
 
839
                        CheckRecoveryConflictDeadlock(partitionLock);
 
840
 
 
841
                /*
 
842
                 * Set bitmask of locks this process already holds on this object.
 
843
                 */
 
844
                MyProc->heldLocks = proclock->holdMask;
 
845
 
 
846
                /*
 
847
                 * Sleep till someone wakes me up.
 
848
                 */
 
849
 
 
850
                TRACE_POSTGRESQL_LOCK_WAIT_START(locktag->locktag_field1,
 
851
                                                                                 locktag->locktag_field2,
 
852
                                                                                 locktag->locktag_field3,
 
853
                                                                                 locktag->locktag_field4,
 
854
                                                                                 locktag->locktag_type,
 
855
                                                                                 lockmode);
 
856
 
 
857
                WaitOnLock(locallock, owner);
 
858
 
 
859
                TRACE_POSTGRESQL_LOCK_WAIT_DONE(locktag->locktag_field1,
 
860
                                                                                locktag->locktag_field2,
 
861
                                                                                locktag->locktag_field3,
 
862
                                                                                locktag->locktag_field4,
 
863
                                                                                locktag->locktag_type,
 
864
                                                                                lockmode);
 
865
 
 
866
                /*
 
867
                 * NOTE: do not do any material change of state between here and
 
868
                 * return.      All required changes in locktable state must have been
 
869
                 * done when the lock was granted to us --- see notes in WaitOnLock.
 
870
                 */
 
871
 
 
872
                /*
 
873
                 * Check the proclock entry status, in case something in the ipc
 
874
                 * communication doesn't work correctly.
 
875
                 */
 
876
                if (!(proclock->holdMask & LOCKBIT_ON(lockmode)))
 
877
                {
 
878
                        PROCLOCK_PRINT("LockAcquire: INCONSISTENT", proclock);
 
879
                        LOCK_PRINT("LockAcquire: INCONSISTENT", lock, lockmode);
 
880
                        /* Should we retry ? */
 
881
                        LWLockRelease(partitionLock);
 
882
                        elog(ERROR, "LockAcquire failed");
 
883
                }
 
884
                PROCLOCK_PRINT("LockAcquire: granted", proclock);
 
885
                LOCK_PRINT("LockAcquire: granted", lock, lockmode);
 
886
        }
 
887
 
 
888
        LWLockRelease(partitionLock);
 
889
 
 
890
        /*
 
891
         * Emit a WAL record if acquisition of this lock need to be replayed in a
 
892
         * standby server.
 
893
         */
 
894
        if (log_lock)
 
895
        {
 
896
                /*
 
897
                 * Decode the locktag back to the original values, to avoid sending
 
898
                 * lots of empty bytes with every message.      See lock.h to check how a
 
899
                 * locktag is defined for LOCKTAG_RELATION
 
900
                 */
 
901
                LogAccessExclusiveLock(locktag->locktag_field1,
 
902
                                                           locktag->locktag_field2);
 
903
        }
 
904
 
 
905
        return LOCKACQUIRE_OK;
 
906
}
 
907
 
 
908
/*
 
909
 * Subroutine to free a locallock entry
 
910
 */
 
911
static void
 
912
RemoveLocalLock(LOCALLOCK *locallock)
 
913
{
 
914
        pfree(locallock->lockOwners);
 
915
        locallock->lockOwners = NULL;
 
916
        if (!hash_search(LockMethodLocalHash,
 
917
                                         (void *) &(locallock->tag),
 
918
                                         HASH_REMOVE, NULL))
 
919
                elog(WARNING, "locallock table corrupted");
 
920
}
 
921
 
 
922
/*
 
923
 * LockCheckConflicts -- test whether requested lock conflicts
 
924
 *              with those already granted
 
925
 *
 
926
 * Returns STATUS_FOUND if conflict, STATUS_OK if no conflict.
 
927
 *
 
928
 * NOTES:
 
929
 *              Here's what makes this complicated: one process's locks don't
 
930
 * conflict with one another, no matter what purpose they are held for
 
931
 * (eg, session and transaction locks do not conflict).
 
932
 * So, we must subtract off our own locks when determining whether the
 
933
 * requested new lock conflicts with those already held.
 
934
 */
 
935
int
 
936
LockCheckConflicts(LockMethod lockMethodTable,
 
937
                                   LOCKMODE lockmode,
 
938
                                   LOCK *lock,
 
939
                                   PROCLOCK *proclock,
 
940
                                   PGPROC *proc)
 
941
{
 
942
        int                     numLockModes = lockMethodTable->numLockModes;
 
943
        LOCKMASK        myLocks;
 
944
        LOCKMASK        otherLocks;
 
945
        int                     i;
 
946
 
 
947
        /*
 
948
         * first check for global conflicts: If no locks conflict with my request,
 
949
         * then I get the lock.
 
950
         *
 
951
         * Checking for conflict: lock->grantMask represents the types of
 
952
         * currently held locks.  conflictTable[lockmode] has a bit set for each
 
953
         * type of lock that conflicts with request.   Bitwise compare tells if
 
954
         * there is a conflict.
 
955
         */
 
956
        if (!(lockMethodTable->conflictTab[lockmode] & lock->grantMask))
 
957
        {
 
958
                PROCLOCK_PRINT("LockCheckConflicts: no conflict", proclock);
 
959
                return STATUS_OK;
 
960
        }
 
961
 
 
962
        /*
 
963
         * Rats.  Something conflicts.  But it could still be my own lock. We have
 
964
         * to construct a conflict mask that does not reflect our own locks, but
 
965
         * only lock types held by other processes.
 
966
         */
 
967
        myLocks = proclock->holdMask;
 
968
        otherLocks = 0;
 
969
        for (i = 1; i <= numLockModes; i++)
 
970
        {
 
971
                int                     myHolding = (myLocks & LOCKBIT_ON(i)) ? 1 : 0;
 
972
 
 
973
                if (lock->granted[i] > myHolding)
 
974
                        otherLocks |= LOCKBIT_ON(i);
 
975
        }
 
976
 
 
977
        /*
 
978
         * now check again for conflicts.  'otherLocks' describes the types of
 
979
         * locks held by other processes.  If one of these conflicts with the kind
 
980
         * of lock that I want, there is a conflict and I have to sleep.
 
981
         */
 
982
        if (!(lockMethodTable->conflictTab[lockmode] & otherLocks))
 
983
        {
 
984
                /* no conflict. OK to get the lock */
 
985
                PROCLOCK_PRINT("LockCheckConflicts: resolved", proclock);
 
986
                return STATUS_OK;
 
987
        }
 
988
 
 
989
        PROCLOCK_PRINT("LockCheckConflicts: conflicting", proclock);
 
990
        return STATUS_FOUND;
 
991
}
 
992
 
 
993
/*
 
994
 * GrantLock -- update the lock and proclock data structures to show
 
995
 *              the lock request has been granted.
 
996
 *
 
997
 * NOTE: if proc was blocked, it also needs to be removed from the wait list
 
998
 * and have its waitLock/waitProcLock fields cleared.  That's not done here.
 
999
 *
 
1000
 * NOTE: the lock grant also has to be recorded in the associated LOCALLOCK
 
1001
 * table entry; but since we may be awaking some other process, we can't do
 
1002
 * that here; it's done by GrantLockLocal, instead.
 
1003
 */
 
1004
void
 
1005
GrantLock(LOCK *lock, PROCLOCK *proclock, LOCKMODE lockmode)
 
1006
{
 
1007
        lock->nGranted++;
 
1008
        lock->granted[lockmode]++;
 
1009
        lock->grantMask |= LOCKBIT_ON(lockmode);
 
1010
        if (lock->granted[lockmode] == lock->requested[lockmode])
 
1011
                lock->waitMask &= LOCKBIT_OFF(lockmode);
 
1012
        proclock->holdMask |= LOCKBIT_ON(lockmode);
 
1013
        LOCK_PRINT("GrantLock", lock, lockmode);
 
1014
        Assert((lock->nGranted > 0) && (lock->granted[lockmode] > 0));
 
1015
        Assert(lock->nGranted <= lock->nRequested);
 
1016
}
 
1017
 
 
1018
/*
 
1019
 * UnGrantLock -- opposite of GrantLock.
 
1020
 *
 
1021
 * Updates the lock and proclock data structures to show that the lock
 
1022
 * is no longer held nor requested by the current holder.
 
1023
 *
 
1024
 * Returns true if there were any waiters waiting on the lock that
 
1025
 * should now be woken up with ProcLockWakeup.
 
1026
 */
 
1027
static bool
 
1028
UnGrantLock(LOCK *lock, LOCKMODE lockmode,
 
1029
                        PROCLOCK *proclock, LockMethod lockMethodTable)
 
1030
{
 
1031
        bool            wakeupNeeded = false;
 
1032
 
 
1033
        Assert((lock->nRequested > 0) && (lock->requested[lockmode] > 0));
 
1034
        Assert((lock->nGranted > 0) && (lock->granted[lockmode] > 0));
 
1035
        Assert(lock->nGranted <= lock->nRequested);
 
1036
 
 
1037
        /*
 
1038
         * fix the general lock stats
 
1039
         */
 
1040
        lock->nRequested--;
 
1041
        lock->requested[lockmode]--;
 
1042
        lock->nGranted--;
 
1043
        lock->granted[lockmode]--;
 
1044
 
 
1045
        if (lock->granted[lockmode] == 0)
 
1046
        {
 
1047
                /* change the conflict mask.  No more of this lock type. */
 
1048
                lock->grantMask &= LOCKBIT_OFF(lockmode);
 
1049
        }
 
1050
 
 
1051
        LOCK_PRINT("UnGrantLock: updated", lock, lockmode);
 
1052
 
 
1053
        /*
 
1054
         * We need only run ProcLockWakeup if the released lock conflicts with at
 
1055
         * least one of the lock types requested by waiter(s).  Otherwise whatever
 
1056
         * conflict made them wait must still exist.  NOTE: before MVCC, we could
 
1057
         * skip wakeup if lock->granted[lockmode] was still positive. But that's
 
1058
         * not true anymore, because the remaining granted locks might belong to
 
1059
         * some waiter, who could now be awakened because he doesn't conflict with
 
1060
         * his own locks.
 
1061
         */
 
1062
        if (lockMethodTable->conflictTab[lockmode] & lock->waitMask)
 
1063
                wakeupNeeded = true;
 
1064
 
 
1065
        /*
 
1066
         * Now fix the per-proclock state.
 
1067
         */
 
1068
        proclock->holdMask &= LOCKBIT_OFF(lockmode);
 
1069
        PROCLOCK_PRINT("UnGrantLock: updated", proclock);
 
1070
 
 
1071
        return wakeupNeeded;
 
1072
}
 
1073
 
 
1074
/*
 
1075
 * CleanUpLock -- clean up after releasing a lock.      We garbage-collect the
 
1076
 * proclock and lock objects if possible, and call ProcLockWakeup if there
 
1077
 * are remaining requests and the caller says it's OK.  (Normally, this
 
1078
 * should be called after UnGrantLock, and wakeupNeeded is the result from
 
1079
 * UnGrantLock.)
 
1080
 *
 
1081
 * The appropriate partition lock must be held at entry, and will be
 
1082
 * held at exit.
 
1083
 */
 
1084
static void
 
1085
CleanUpLock(LOCK *lock, PROCLOCK *proclock,
 
1086
                        LockMethod lockMethodTable, uint32 hashcode,
 
1087
                        bool wakeupNeeded)
 
1088
{
 
1089
        /*
 
1090
         * If this was my last hold on this lock, delete my entry in the proclock
 
1091
         * table.
 
1092
         */
 
1093
        if (proclock->holdMask == 0)
 
1094
        {
 
1095
                uint32          proclock_hashcode;
 
1096
 
 
1097
                PROCLOCK_PRINT("CleanUpLock: deleting", proclock);
 
1098
                SHMQueueDelete(&proclock->lockLink);
 
1099
                SHMQueueDelete(&proclock->procLink);
 
1100
                proclock_hashcode = ProcLockHashCode(&proclock->tag, hashcode);
 
1101
                if (!hash_search_with_hash_value(LockMethodProcLockHash,
 
1102
                                                                                 (void *) &(proclock->tag),
 
1103
                                                                                 proclock_hashcode,
 
1104
                                                                                 HASH_REMOVE,
 
1105
                                                                                 NULL))
 
1106
                        elog(PANIC, "proclock table corrupted");
 
1107
        }
 
1108
 
 
1109
        if (lock->nRequested == 0)
 
1110
        {
 
1111
                /*
 
1112
                 * The caller just released the last lock, so garbage-collect the lock
 
1113
                 * object.
 
1114
                 */
 
1115
                LOCK_PRINT("CleanUpLock: deleting", lock, 0);
 
1116
                Assert(SHMQueueEmpty(&(lock->procLocks)));
 
1117
                if (!hash_search_with_hash_value(LockMethodLockHash,
 
1118
                                                                                 (void *) &(lock->tag),
 
1119
                                                                                 hashcode,
 
1120
                                                                                 HASH_REMOVE,
 
1121
                                                                                 NULL))
 
1122
                        elog(PANIC, "lock table corrupted");
 
1123
        }
 
1124
        else if (wakeupNeeded)
 
1125
        {
 
1126
                /* There are waiters on this lock, so wake them up. */
 
1127
                ProcLockWakeup(lockMethodTable, lock);
 
1128
        }
 
1129
}
 
1130
 
 
1131
/*
 
1132
 * GrantLockLocal -- update the locallock data structures to show
 
1133
 *              the lock request has been granted.
 
1134
 *
 
1135
 * We expect that LockAcquire made sure there is room to add a new
 
1136
 * ResourceOwner entry.
 
1137
 */
 
1138
static void
 
1139
GrantLockLocal(LOCALLOCK *locallock, ResourceOwner owner)
 
1140
{
 
1141
        LOCALLOCKOWNER *lockOwners = locallock->lockOwners;
 
1142
        int                     i;
 
1143
 
 
1144
        Assert(locallock->numLockOwners < locallock->maxLockOwners);
 
1145
        /* Count the total */
 
1146
        locallock->nLocks++;
 
1147
        /* Count the per-owner lock */
 
1148
        for (i = 0; i < locallock->numLockOwners; i++)
 
1149
        {
 
1150
                if (lockOwners[i].owner == owner)
 
1151
                {
 
1152
                        lockOwners[i].nLocks++;
 
1153
                        return;
 
1154
                }
 
1155
        }
 
1156
        lockOwners[i].owner = owner;
 
1157
        lockOwners[i].nLocks = 1;
 
1158
        locallock->numLockOwners++;
 
1159
}
 
1160
 
 
1161
/*
 
1162
 * GrantAwaitedLock -- call GrantLockLocal for the lock we are doing
 
1163
 *              WaitOnLock on.
 
1164
 *
 
1165
 * proc.c needs this for the case where we are booted off the lock by
 
1166
 * timeout, but discover that someone granted us the lock anyway.
 
1167
 *
 
1168
 * We could just export GrantLockLocal, but that would require including
 
1169
 * resowner.h in lock.h, which creates circularity.
 
1170
 */
 
1171
void
 
1172
GrantAwaitedLock(void)
 
1173
{
 
1174
        GrantLockLocal(awaitedLock, awaitedOwner);
 
1175
}
 
1176
 
 
1177
/*
 
1178
 * WaitOnLock -- wait to acquire a lock
 
1179
 *
 
1180
 * Caller must have set MyProc->heldLocks to reflect locks already held
 
1181
 * on the lockable object by this process.
 
1182
 *
 
1183
 * The appropriate partition lock must be held at entry.
 
1184
 */
 
1185
static void
 
1186
WaitOnLock(LOCALLOCK *locallock, ResourceOwner owner)
 
1187
{
 
1188
        LOCKMETHODID lockmethodid = LOCALLOCK_LOCKMETHOD(*locallock);
 
1189
        LockMethod      lockMethodTable = LockMethods[lockmethodid];
 
1190
        char       *volatile new_status = NULL;
 
1191
 
 
1192
        LOCK_PRINT("WaitOnLock: sleeping on lock",
 
1193
                           locallock->lock, locallock->tag.mode);
 
1194
 
 
1195
        /* Report change to waiting status */
 
1196
        if (update_process_title)
 
1197
        {
 
1198
                const char *old_status;
 
1199
                int                     len;
 
1200
 
 
1201
                old_status = get_ps_display(&len);
 
1202
                new_status = (char *) palloc(len + 8 + 1);
 
1203
                memcpy(new_status, old_status, len);
 
1204
                strcpy(new_status + len, " waiting");
 
1205
                set_ps_display(new_status, false);
 
1206
                new_status[len] = '\0'; /* truncate off " waiting" */
 
1207
        }
 
1208
        pgstat_report_waiting(true);
 
1209
 
 
1210
        awaitedLock = locallock;
 
1211
        awaitedOwner = owner;
 
1212
 
 
1213
        /*
 
1214
         * NOTE: Think not to put any shared-state cleanup after the call to
 
1215
         * ProcSleep, in either the normal or failure path.  The lock state must
 
1216
         * be fully set by the lock grantor, or by CheckDeadLock if we give up
 
1217
         * waiting for the lock.  This is necessary because of the possibility
 
1218
         * that a cancel/die interrupt will interrupt ProcSleep after someone else
 
1219
         * grants us the lock, but before we've noticed it. Hence, after granting,
 
1220
         * the locktable state must fully reflect the fact that we own the lock;
 
1221
         * we can't do additional work on return.
 
1222
         *
 
1223
         * We can and do use a PG_TRY block to try to clean up after failure, but
 
1224
         * this still has a major limitation: elog(FATAL) can occur while waiting
 
1225
         * (eg, a "die" interrupt), and then control won't come back here. So all
 
1226
         * cleanup of essential state should happen in LockWaitCancel, not here.
 
1227
         * We can use PG_TRY to clear the "waiting" status flags, since doing that
 
1228
         * is unimportant if the process exits.
 
1229
         */
 
1230
        PG_TRY();
 
1231
        {
 
1232
                if (ProcSleep(locallock, lockMethodTable) != STATUS_OK)
 
1233
                {
 
1234
                        /*
 
1235
                         * We failed as a result of a deadlock, see CheckDeadLock(). Quit
 
1236
                         * now.
 
1237
                         */
 
1238
                        awaitedLock = NULL;
 
1239
                        LOCK_PRINT("WaitOnLock: aborting on lock",
 
1240
                                           locallock->lock, locallock->tag.mode);
 
1241
                        LWLockRelease(LockHashPartitionLock(locallock->hashcode));
 
1242
 
 
1243
                        /*
 
1244
                         * Now that we aren't holding the partition lock, we can give an
 
1245
                         * error report including details about the detected deadlock.
 
1246
                         */
 
1247
                        DeadLockReport();
 
1248
                        /* not reached */
 
1249
                }
 
1250
        }
 
1251
        PG_CATCH();
 
1252
        {
 
1253
                /* In this path, awaitedLock remains set until LockWaitCancel */
 
1254
 
 
1255
                /* Report change to non-waiting status */
 
1256
                pgstat_report_waiting(false);
 
1257
                if (update_process_title)
 
1258
                {
 
1259
                        set_ps_display(new_status, false);
 
1260
                        pfree(new_status);
 
1261
                }
 
1262
 
 
1263
                /* and propagate the error */
 
1264
                PG_RE_THROW();
 
1265
        }
 
1266
        PG_END_TRY();
 
1267
 
 
1268
        awaitedLock = NULL;
 
1269
 
 
1270
        /* Report change to non-waiting status */
 
1271
        pgstat_report_waiting(false);
 
1272
        if (update_process_title)
 
1273
        {
 
1274
                set_ps_display(new_status, false);
 
1275
                pfree(new_status);
 
1276
        }
 
1277
 
 
1278
        LOCK_PRINT("WaitOnLock: wakeup on lock",
 
1279
                           locallock->lock, locallock->tag.mode);
 
1280
}
 
1281
 
 
1282
/*
 
1283
 * Remove a proc from the wait-queue it is on (caller must know it is on one).
 
1284
 * This is only used when the proc has failed to get the lock, so we set its
 
1285
 * waitStatus to STATUS_ERROR.
 
1286
 *
 
1287
 * Appropriate partition lock must be held by caller.  Also, caller is
 
1288
 * responsible for signaling the proc if needed.
 
1289
 *
 
1290
 * NB: this does not clean up any locallock object that may exist for the lock.
 
1291
 */
 
1292
void
 
1293
RemoveFromWaitQueue(PGPROC *proc, uint32 hashcode)
 
1294
{
 
1295
        LOCK       *waitLock = proc->waitLock;
 
1296
        PROCLOCK   *proclock = proc->waitProcLock;
 
1297
        LOCKMODE        lockmode = proc->waitLockMode;
 
1298
        LOCKMETHODID lockmethodid = LOCK_LOCKMETHOD(*waitLock);
 
1299
 
 
1300
        /* Make sure proc is waiting */
 
1301
        Assert(proc->waitStatus == STATUS_WAITING);
 
1302
        Assert(proc->links.next != NULL);
 
1303
        Assert(waitLock);
 
1304
        Assert(waitLock->waitProcs.size > 0);
 
1305
        Assert(0 < lockmethodid && lockmethodid < lengthof(LockMethods));
 
1306
 
 
1307
        /* Remove proc from lock's wait queue */
 
1308
        SHMQueueDelete(&(proc->links));
 
1309
        waitLock->waitProcs.size--;
 
1310
 
 
1311
        /* Undo increments of request counts by waiting process */
 
1312
        Assert(waitLock->nRequested > 0);
 
1313
        Assert(waitLock->nRequested > proc->waitLock->nGranted);
 
1314
        waitLock->nRequested--;
 
1315
        Assert(waitLock->requested[lockmode] > 0);
 
1316
        waitLock->requested[lockmode]--;
 
1317
        /* don't forget to clear waitMask bit if appropriate */
 
1318
        if (waitLock->granted[lockmode] == waitLock->requested[lockmode])
 
1319
                waitLock->waitMask &= LOCKBIT_OFF(lockmode);
 
1320
 
 
1321
        /* Clean up the proc's own state, and pass it the ok/fail signal */
 
1322
        proc->waitLock = NULL;
 
1323
        proc->waitProcLock = NULL;
 
1324
        proc->waitStatus = STATUS_ERROR;
 
1325
 
 
1326
        /*
 
1327
         * Delete the proclock immediately if it represents no already-held locks.
 
1328
         * (This must happen now because if the owner of the lock decides to
 
1329
         * release it, and the requested/granted counts then go to zero,
 
1330
         * LockRelease expects there to be no remaining proclocks.) Then see if
 
1331
         * any other waiters for the lock can be woken up now.
 
1332
         */
 
1333
        CleanUpLock(waitLock, proclock,
 
1334
                                LockMethods[lockmethodid], hashcode,
 
1335
                                true);
 
1336
}
 
1337
 
 
1338
/*
 
1339
 * LockRelease -- look up 'locktag' and release one 'lockmode' lock on it.
 
1340
 *              Release a session lock if 'sessionLock' is true, else release a
 
1341
 *              regular transaction lock.
 
1342
 *
 
1343
 * Side Effects: find any waiting processes that are now wakable,
 
1344
 *              grant them their requested locks and awaken them.
 
1345
 *              (We have to grant the lock here to avoid a race between
 
1346
 *              the waking process and any new process to
 
1347
 *              come along and request the lock.)
 
1348
 */
 
1349
bool
 
1350
LockRelease(const LOCKTAG *locktag, LOCKMODE lockmode, bool sessionLock)
 
1351
{
 
1352
        LOCKMETHODID lockmethodid = locktag->locktag_lockmethodid;
 
1353
        LockMethod      lockMethodTable;
 
1354
        LOCALLOCKTAG localtag;
 
1355
        LOCALLOCK  *locallock;
 
1356
        LOCK       *lock;
 
1357
        PROCLOCK   *proclock;
 
1358
        LWLockId        partitionLock;
 
1359
        bool            wakeupNeeded;
 
1360
 
 
1361
        if (lockmethodid <= 0 || lockmethodid >= lengthof(LockMethods))
 
1362
                elog(ERROR, "unrecognized lock method: %d", lockmethodid);
 
1363
        lockMethodTable = LockMethods[lockmethodid];
 
1364
        if (lockmode <= 0 || lockmode > lockMethodTable->numLockModes)
 
1365
                elog(ERROR, "unrecognized lock mode: %d", lockmode);
 
1366
 
 
1367
#ifdef LOCK_DEBUG
 
1368
        if (LOCK_DEBUG_ENABLED(locktag))
 
1369
                elog(LOG, "LockRelease: lock [%u,%u] %s",
 
1370
                         locktag->locktag_field1, locktag->locktag_field2,
 
1371
                         lockMethodTable->lockModeNames[lockmode]);
 
1372
#endif
 
1373
 
 
1374
        /*
 
1375
         * Find the LOCALLOCK entry for this lock and lockmode
 
1376
         */
 
1377
        MemSet(&localtag, 0, sizeof(localtag));         /* must clear padding */
 
1378
        localtag.lock = *locktag;
 
1379
        localtag.mode = lockmode;
 
1380
 
 
1381
        locallock = (LOCALLOCK *) hash_search(LockMethodLocalHash,
 
1382
                                                                                  (void *) &localtag,
 
1383
                                                                                  HASH_FIND, NULL);
 
1384
 
 
1385
        /*
 
1386
         * let the caller print its own error message, too. Do not ereport(ERROR).
 
1387
         */
 
1388
        if (!locallock || locallock->nLocks <= 0)
 
1389
        {
 
1390
                elog(WARNING, "you don't own a lock of type %s",
 
1391
                         lockMethodTable->lockModeNames[lockmode]);
 
1392
                return FALSE;
 
1393
        }
 
1394
 
 
1395
        /*
 
1396
         * Decrease the count for the resource owner.
 
1397
         */
 
1398
        {
 
1399
                LOCALLOCKOWNER *lockOwners = locallock->lockOwners;
 
1400
                ResourceOwner owner;
 
1401
                int                     i;
 
1402
 
 
1403
                /* Session locks are never transactional, else check table */
 
1404
                if (!sessionLock && lockMethodTable->transactional)
 
1405
                        owner = CurrentResourceOwner;
 
1406
                else
 
1407
                        owner = NULL;
 
1408
 
 
1409
                for (i = locallock->numLockOwners - 1; i >= 0; i--)
 
1410
                {
 
1411
                        if (lockOwners[i].owner == owner)
 
1412
                        {
 
1413
                                Assert(lockOwners[i].nLocks > 0);
 
1414
                                if (--lockOwners[i].nLocks == 0)
 
1415
                                {
 
1416
                                        /* compact out unused slot */
 
1417
                                        locallock->numLockOwners--;
 
1418
                                        if (i < locallock->numLockOwners)
 
1419
                                                lockOwners[i] = lockOwners[locallock->numLockOwners];
 
1420
                                }
 
1421
                                break;
 
1422
                        }
 
1423
                }
 
1424
                if (i < 0)
 
1425
                {
 
1426
                        /* don't release a lock belonging to another owner */
 
1427
                        elog(WARNING, "you don't own a lock of type %s",
 
1428
                                 lockMethodTable->lockModeNames[lockmode]);
 
1429
                        return FALSE;
 
1430
                }
 
1431
        }
 
1432
 
 
1433
        /*
 
1434
         * Decrease the total local count.      If we're still holding the lock, we're
 
1435
         * done.
 
1436
         */
 
1437
        locallock->nLocks--;
 
1438
 
 
1439
        if (locallock->nLocks > 0)
 
1440
                return TRUE;
 
1441
 
 
1442
        /*
 
1443
         * Otherwise we've got to mess with the shared lock table.
 
1444
         */
 
1445
        partitionLock = LockHashPartitionLock(locallock->hashcode);
 
1446
 
 
1447
        LWLockAcquire(partitionLock, LW_EXCLUSIVE);
 
1448
 
 
1449
        /*
 
1450
         * We don't need to re-find the lock or proclock, since we kept their
 
1451
         * addresses in the locallock table, and they couldn't have been removed
 
1452
         * while we were holding a lock on them.
 
1453
         */
 
1454
        lock = locallock->lock;
 
1455
        LOCK_PRINT("LockRelease: found", lock, lockmode);
 
1456
        proclock = locallock->proclock;
 
1457
        PROCLOCK_PRINT("LockRelease: found", proclock);
 
1458
 
 
1459
        /*
 
1460
         * Double-check that we are actually holding a lock of the type we want to
 
1461
         * release.
 
1462
         */
 
1463
        if (!(proclock->holdMask & LOCKBIT_ON(lockmode)))
 
1464
        {
 
1465
                PROCLOCK_PRINT("LockRelease: WRONGTYPE", proclock);
 
1466
                LWLockRelease(partitionLock);
 
1467
                elog(WARNING, "you don't own a lock of type %s",
 
1468
                         lockMethodTable->lockModeNames[lockmode]);
 
1469
                RemoveLocalLock(locallock);
 
1470
                return FALSE;
 
1471
        }
 
1472
 
 
1473
        /*
 
1474
         * Do the releasing.  CleanUpLock will waken any now-wakable waiters.
 
1475
         */
 
1476
        wakeupNeeded = UnGrantLock(lock, lockmode, proclock, lockMethodTable);
 
1477
 
 
1478
        CleanUpLock(lock, proclock,
 
1479
                                lockMethodTable, locallock->hashcode,
 
1480
                                wakeupNeeded);
 
1481
 
 
1482
        LWLockRelease(partitionLock);
 
1483
 
 
1484
        RemoveLocalLock(locallock);
 
1485
        return TRUE;
 
1486
}
 
1487
 
 
1488
/*
 
1489
 * LockReleaseSession -- Release all session locks of the specified lock method
 
1490
 *              that are held by the current process.
 
1491
 */
 
1492
void
 
1493
LockReleaseSession(LOCKMETHODID lockmethodid)
 
1494
{
 
1495
        HASH_SEQ_STATUS status;
 
1496
        LOCALLOCK  *locallock;
 
1497
 
 
1498
        if (lockmethodid <= 0 || lockmethodid >= lengthof(LockMethods))
 
1499
                elog(ERROR, "unrecognized lock method: %d", lockmethodid);
 
1500
 
 
1501
        hash_seq_init(&status, LockMethodLocalHash);
 
1502
 
 
1503
        while ((locallock = (LOCALLOCK *) hash_seq_search(&status)) != NULL)
 
1504
        {
 
1505
                /* Ignore items that are not of the specified lock method */
 
1506
                if (LOCALLOCK_LOCKMETHOD(*locallock) != lockmethodid)
 
1507
                        continue;
 
1508
 
 
1509
                ReleaseLockForOwner(locallock, NULL);
 
1510
        }
 
1511
}
 
1512
 
 
1513
/*
 
1514
 * LockReleaseAll -- Release all locks of the specified lock method that
 
1515
 *              are held by the current process.
 
1516
 *
 
1517
 * Well, not necessarily *all* locks.  The available behaviors are:
 
1518
 *              allLocks == true: release all locks including session locks.
 
1519
 *              allLocks == false: release all non-session locks.
 
1520
 */
 
1521
void
 
1522
LockReleaseAll(LOCKMETHODID lockmethodid, bool allLocks)
 
1523
{
 
1524
        HASH_SEQ_STATUS status;
 
1525
        LockMethod      lockMethodTable;
 
1526
        int                     i,
 
1527
                                numLockModes;
 
1528
        LOCALLOCK  *locallock;
 
1529
        LOCK       *lock;
 
1530
        PROCLOCK   *proclock;
 
1531
        int                     partition;
 
1532
 
 
1533
        if (lockmethodid <= 0 || lockmethodid >= lengthof(LockMethods))
 
1534
                elog(ERROR, "unrecognized lock method: %d", lockmethodid);
 
1535
        lockMethodTable = LockMethods[lockmethodid];
 
1536
 
 
1537
#ifdef LOCK_DEBUG
 
1538
        if (*(lockMethodTable->trace_flag))
 
1539
                elog(LOG, "LockReleaseAll: lockmethod=%d", lockmethodid);
 
1540
#endif
 
1541
 
 
1542
        numLockModes = lockMethodTable->numLockModes;
 
1543
 
 
1544
        /*
 
1545
         * First we run through the locallock table and get rid of unwanted
 
1546
         * entries, then we scan the process's proclocks and get rid of those. We
 
1547
         * do this separately because we may have multiple locallock entries
 
1548
         * pointing to the same proclock, and we daren't end up with any dangling
 
1549
         * pointers.
 
1550
         */
 
1551
        hash_seq_init(&status, LockMethodLocalHash);
 
1552
 
 
1553
        while ((locallock = (LOCALLOCK *) hash_seq_search(&status)) != NULL)
 
1554
        {
 
1555
                if (locallock->proclock == NULL || locallock->lock == NULL)
 
1556
                {
 
1557
                        /*
 
1558
                         * We must've run out of shared memory while trying to set up this
 
1559
                         * lock.  Just forget the local entry.
 
1560
                         */
 
1561
                        Assert(locallock->nLocks == 0);
 
1562
                        RemoveLocalLock(locallock);
 
1563
                        continue;
 
1564
                }
 
1565
 
 
1566
                /* Ignore items that are not of the lockmethod to be removed */
 
1567
                if (LOCALLOCK_LOCKMETHOD(*locallock) != lockmethodid)
 
1568
                        continue;
 
1569
 
 
1570
                /*
 
1571
                 * If we are asked to release all locks, we can just zap the entry.
 
1572
                 * Otherwise, must scan to see if there are session locks. We assume
 
1573
                 * there is at most one lockOwners entry for session locks.
 
1574
                 */
 
1575
                if (!allLocks)
 
1576
                {
 
1577
                        LOCALLOCKOWNER *lockOwners = locallock->lockOwners;
 
1578
 
 
1579
                        /* If it's above array position 0, move it down to 0 */
 
1580
                        for (i = locallock->numLockOwners - 1; i > 0; i--)
 
1581
                        {
 
1582
                                if (lockOwners[i].owner == NULL)
 
1583
                                {
 
1584
                                        lockOwners[0] = lockOwners[i];
 
1585
                                        break;
 
1586
                                }
 
1587
                        }
 
1588
 
 
1589
                        if (locallock->numLockOwners > 0 &&
 
1590
                                lockOwners[0].owner == NULL &&
 
1591
                                lockOwners[0].nLocks > 0)
 
1592
                        {
 
1593
                                /* Fix the locallock to show just the session locks */
 
1594
                                locallock->nLocks = lockOwners[0].nLocks;
 
1595
                                locallock->numLockOwners = 1;
 
1596
                                /* We aren't deleting this locallock, so done */
 
1597
                                continue;
 
1598
                        }
 
1599
                }
 
1600
 
 
1601
                /* Mark the proclock to show we need to release this lockmode */
 
1602
                if (locallock->nLocks > 0)
 
1603
                        locallock->proclock->releaseMask |= LOCKBIT_ON(locallock->tag.mode);
 
1604
 
 
1605
                /* And remove the locallock hashtable entry */
 
1606
                RemoveLocalLock(locallock);
 
1607
        }
 
1608
 
 
1609
        /*
 
1610
         * Now, scan each lock partition separately.
 
1611
         */
 
1612
        for (partition = 0; partition < NUM_LOCK_PARTITIONS; partition++)
 
1613
        {
 
1614
                LWLockId        partitionLock = FirstLockMgrLock + partition;
 
1615
                SHM_QUEUE  *procLocks = &(MyProc->myProcLocks[partition]);
 
1616
 
 
1617
                proclock = (PROCLOCK *) SHMQueueNext(procLocks, procLocks,
 
1618
                                                                                         offsetof(PROCLOCK, procLink));
 
1619
 
 
1620
                if (!proclock)
 
1621
                        continue;                       /* needn't examine this partition */
 
1622
 
 
1623
                LWLockAcquire(partitionLock, LW_EXCLUSIVE);
 
1624
 
 
1625
                while (proclock)
 
1626
                {
 
1627
                        bool            wakeupNeeded = false;
 
1628
                        PROCLOCK   *nextplock;
 
1629
 
 
1630
                        /* Get link first, since we may unlink/delete this proclock */
 
1631
                        nextplock = (PROCLOCK *)
 
1632
                                SHMQueueNext(procLocks, &proclock->procLink,
 
1633
                                                         offsetof(PROCLOCK, procLink));
 
1634
 
 
1635
                        Assert(proclock->tag.myProc == MyProc);
 
1636
 
 
1637
                        lock = proclock->tag.myLock;
 
1638
 
 
1639
                        /* Ignore items that are not of the lockmethod to be removed */
 
1640
                        if (LOCK_LOCKMETHOD(*lock) != lockmethodid)
 
1641
                                goto next_item;
 
1642
 
 
1643
                        /*
 
1644
                         * In allLocks mode, force release of all locks even if locallock
 
1645
                         * table had problems
 
1646
                         */
 
1647
                        if (allLocks)
 
1648
                                proclock->releaseMask = proclock->holdMask;
 
1649
                        else
 
1650
                                Assert((proclock->releaseMask & ~proclock->holdMask) == 0);
 
1651
 
 
1652
                        /*
 
1653
                         * Ignore items that have nothing to be released, unless they have
 
1654
                         * holdMask == 0 and are therefore recyclable
 
1655
                         */
 
1656
                        if (proclock->releaseMask == 0 && proclock->holdMask != 0)
 
1657
                                goto next_item;
 
1658
 
 
1659
                        PROCLOCK_PRINT("LockReleaseAll", proclock);
 
1660
                        LOCK_PRINT("LockReleaseAll", lock, 0);
 
1661
                        Assert(lock->nRequested >= 0);
 
1662
                        Assert(lock->nGranted >= 0);
 
1663
                        Assert(lock->nGranted <= lock->nRequested);
 
1664
                        Assert((proclock->holdMask & ~lock->grantMask) == 0);
 
1665
 
 
1666
                        /*
 
1667
                         * Release the previously-marked lock modes
 
1668
                         */
 
1669
                        for (i = 1; i <= numLockModes; i++)
 
1670
                        {
 
1671
                                if (proclock->releaseMask & LOCKBIT_ON(i))
 
1672
                                        wakeupNeeded |= UnGrantLock(lock, i, proclock,
 
1673
                                                                                                lockMethodTable);
 
1674
                        }
 
1675
                        Assert((lock->nRequested >= 0) && (lock->nGranted >= 0));
 
1676
                        Assert(lock->nGranted <= lock->nRequested);
 
1677
                        LOCK_PRINT("LockReleaseAll: updated", lock, 0);
 
1678
 
 
1679
                        proclock->releaseMask = 0;
 
1680
 
 
1681
                        /* CleanUpLock will wake up waiters if needed. */
 
1682
                        CleanUpLock(lock, proclock,
 
1683
                                                lockMethodTable,
 
1684
                                                LockTagHashCode(&lock->tag),
 
1685
                                                wakeupNeeded);
 
1686
 
 
1687
        next_item:
 
1688
                        proclock = nextplock;
 
1689
                }                                               /* loop over PROCLOCKs within this partition */
 
1690
 
 
1691
                LWLockRelease(partitionLock);
 
1692
        }                                                       /* loop over partitions */
 
1693
 
 
1694
#ifdef LOCK_DEBUG
 
1695
        if (*(lockMethodTable->trace_flag))
 
1696
                elog(LOG, "LockReleaseAll done");
 
1697
#endif
 
1698
}
 
1699
 
 
1700
/*
 
1701
 * LockReleaseCurrentOwner
 
1702
 *              Release all locks belonging to CurrentResourceOwner
 
1703
 */
 
1704
void
 
1705
LockReleaseCurrentOwner(void)
 
1706
{
 
1707
        HASH_SEQ_STATUS status;
 
1708
        LOCALLOCK  *locallock;
 
1709
 
 
1710
        hash_seq_init(&status, LockMethodLocalHash);
 
1711
 
 
1712
        while ((locallock = (LOCALLOCK *) hash_seq_search(&status)) != NULL)
 
1713
        {
 
1714
                /* Ignore items that must be nontransactional */
 
1715
                if (!LockMethods[LOCALLOCK_LOCKMETHOD(*locallock)]->transactional)
 
1716
                        continue;
 
1717
 
 
1718
                ReleaseLockForOwner(locallock, CurrentResourceOwner);
 
1719
        }
 
1720
}
 
1721
 
 
1722
/*
 
1723
 * Subroutine to release a lock belonging to the 'owner' if found.
 
1724
 * 'owner' can be NULL to release a session lock.
 
1725
 */
 
1726
static void
 
1727
ReleaseLockForOwner(LOCALLOCK *locallock, ResourceOwner owner)
 
1728
{
 
1729
        int                     i;
 
1730
        LOCALLOCKOWNER *lockOwners;
 
1731
 
 
1732
        /* Scan to see if there are any locks belonging to the owner */
 
1733
        lockOwners = locallock->lockOwners;
 
1734
        for (i = locallock->numLockOwners - 1; i >= 0; i--)
 
1735
        {
 
1736
                if (lockOwners[i].owner == owner)
 
1737
                {
 
1738
                        Assert(lockOwners[i].nLocks > 0);
 
1739
                        if (lockOwners[i].nLocks < locallock->nLocks)
 
1740
                        {
 
1741
                                /*
 
1742
                                 * We will still hold this lock after forgetting this
 
1743
                                 * ResourceOwner.
 
1744
                                 */
 
1745
                                locallock->nLocks -= lockOwners[i].nLocks;
 
1746
                                /* compact out unused slot */
 
1747
                                locallock->numLockOwners--;
 
1748
                                if (i < locallock->numLockOwners)
 
1749
                                        lockOwners[i] = lockOwners[locallock->numLockOwners];
 
1750
                        }
 
1751
                        else
 
1752
                        {
 
1753
                                Assert(lockOwners[i].nLocks == locallock->nLocks);
 
1754
                                /* We want to call LockRelease just once */
 
1755
                                lockOwners[i].nLocks = 1;
 
1756
                                locallock->nLocks = 1;
 
1757
                                if (!LockRelease(&locallock->tag.lock,
 
1758
                                                                 locallock->tag.mode,
 
1759
                                                                 owner == NULL))
 
1760
                                        elog(WARNING, "ReleaseLockForOwner: failed??");
 
1761
                        }
 
1762
                        break;
 
1763
                }
 
1764
        }
 
1765
}
 
1766
 
 
1767
/*
 
1768
 * LockReassignCurrentOwner
 
1769
 *              Reassign all locks belonging to CurrentResourceOwner to belong
 
1770
 *              to its parent resource owner
 
1771
 */
 
1772
void
 
1773
LockReassignCurrentOwner(void)
 
1774
{
 
1775
        ResourceOwner parent = ResourceOwnerGetParent(CurrentResourceOwner);
 
1776
        HASH_SEQ_STATUS status;
 
1777
        LOCALLOCK  *locallock;
 
1778
        LOCALLOCKOWNER *lockOwners;
 
1779
 
 
1780
        Assert(parent != NULL);
 
1781
 
 
1782
        hash_seq_init(&status, LockMethodLocalHash);
 
1783
 
 
1784
        while ((locallock = (LOCALLOCK *) hash_seq_search(&status)) != NULL)
 
1785
        {
 
1786
                int                     i;
 
1787
                int                     ic = -1;
 
1788
                int                     ip = -1;
 
1789
 
 
1790
                /* Ignore items that must be nontransactional */
 
1791
                if (!LockMethods[LOCALLOCK_LOCKMETHOD(*locallock)]->transactional)
 
1792
                        continue;
 
1793
 
 
1794
                /*
 
1795
                 * Scan to see if there are any locks belonging to current owner or
 
1796
                 * its parent
 
1797
                 */
 
1798
                lockOwners = locallock->lockOwners;
 
1799
                for (i = locallock->numLockOwners - 1; i >= 0; i--)
 
1800
                {
 
1801
                        if (lockOwners[i].owner == CurrentResourceOwner)
 
1802
                                ic = i;
 
1803
                        else if (lockOwners[i].owner == parent)
 
1804
                                ip = i;
 
1805
                }
 
1806
 
 
1807
                if (ic < 0)
 
1808
                        continue;                       /* no current locks */
 
1809
 
 
1810
                if (ip < 0)
 
1811
                {
 
1812
                        /* Parent has no slot, so just give it child's slot */
 
1813
                        lockOwners[ic].owner = parent;
 
1814
                }
 
1815
                else
 
1816
                {
 
1817
                        /* Merge child's count with parent's */
 
1818
                        lockOwners[ip].nLocks += lockOwners[ic].nLocks;
 
1819
                        /* compact out unused slot */
 
1820
                        locallock->numLockOwners--;
 
1821
                        if (ic < locallock->numLockOwners)
 
1822
                                lockOwners[ic] = lockOwners[locallock->numLockOwners];
 
1823
                }
 
1824
        }
 
1825
}
 
1826
 
 
1827
 
 
1828
/*
 
1829
 * GetLockConflicts
 
1830
 *              Get an array of VirtualTransactionIds of xacts currently holding locks
 
1831
 *              that would conflict with the specified lock/lockmode.
 
1832
 *              xacts merely awaiting such a lock are NOT reported.
 
1833
 *
 
1834
 * The result array is palloc'd and is terminated with an invalid VXID.
 
1835
 *
 
1836
 * Of course, the result could be out of date by the time it's returned,
 
1837
 * so use of this function has to be thought about carefully.
 
1838
 *
 
1839
 * Note we never include the current xact's vxid in the result array,
 
1840
 * since an xact never blocks itself.  Also, prepared transactions are
 
1841
 * ignored, which is a bit more debatable but is appropriate for current
 
1842
 * uses of the result.
 
1843
 */
 
1844
VirtualTransactionId *
 
1845
GetLockConflicts(const LOCKTAG *locktag, LOCKMODE lockmode)
 
1846
{
 
1847
        static VirtualTransactionId *vxids;
 
1848
        LOCKMETHODID lockmethodid = locktag->locktag_lockmethodid;
 
1849
        LockMethod      lockMethodTable;
 
1850
        LOCK       *lock;
 
1851
        LOCKMASK        conflictMask;
 
1852
        SHM_QUEUE  *procLocks;
 
1853
        PROCLOCK   *proclock;
 
1854
        uint32          hashcode;
 
1855
        LWLockId        partitionLock;
 
1856
        int                     count = 0;
 
1857
 
 
1858
        if (lockmethodid <= 0 || lockmethodid >= lengthof(LockMethods))
 
1859
                elog(ERROR, "unrecognized lock method: %d", lockmethodid);
 
1860
        lockMethodTable = LockMethods[lockmethodid];
 
1861
        if (lockmode <= 0 || lockmode > lockMethodTable->numLockModes)
 
1862
                elog(ERROR, "unrecognized lock mode: %d", lockmode);
 
1863
 
 
1864
        /*
 
1865
         * Allocate memory to store results, and fill with InvalidVXID.  We only
 
1866
         * need enough space for MaxBackends + a terminator, since prepared xacts
 
1867
         * don't count. InHotStandby allocate once in TopMemoryContext.
 
1868
         */
 
1869
        if (InHotStandby)
 
1870
        {
 
1871
                if (vxids == NULL)
 
1872
                        vxids = (VirtualTransactionId *)
 
1873
                                MemoryContextAlloc(TopMemoryContext,
 
1874
                                                   sizeof(VirtualTransactionId) * (MaxBackends + 1));
 
1875
        }
 
1876
        else
 
1877
                vxids = (VirtualTransactionId *)
 
1878
                        palloc0(sizeof(VirtualTransactionId) * (MaxBackends + 1));
 
1879
 
 
1880
        /*
 
1881
         * Look up the lock object matching the tag.
 
1882
         */
 
1883
        hashcode = LockTagHashCode(locktag);
 
1884
        partitionLock = LockHashPartitionLock(hashcode);
 
1885
 
 
1886
        LWLockAcquire(partitionLock, LW_SHARED);
 
1887
 
 
1888
        lock = (LOCK *) hash_search_with_hash_value(LockMethodLockHash,
 
1889
                                                                                                (void *) locktag,
 
1890
                                                                                                hashcode,
 
1891
                                                                                                HASH_FIND,
 
1892
                                                                                                NULL);
 
1893
        if (!lock)
 
1894
        {
 
1895
                /*
 
1896
                 * If the lock object doesn't exist, there is nothing holding a lock
 
1897
                 * on this lockable object.
 
1898
                 */
 
1899
                LWLockRelease(partitionLock);
 
1900
                return vxids;
 
1901
        }
 
1902
 
 
1903
        /*
 
1904
         * Examine each existing holder (or awaiter) of the lock.
 
1905
         */
 
1906
        conflictMask = lockMethodTable->conflictTab[lockmode];
 
1907
 
 
1908
        procLocks = &(lock->procLocks);
 
1909
 
 
1910
        proclock = (PROCLOCK *) SHMQueueNext(procLocks, procLocks,
 
1911
                                                                                 offsetof(PROCLOCK, lockLink));
 
1912
 
 
1913
        while (proclock)
 
1914
        {
 
1915
                if (conflictMask & proclock->holdMask)
 
1916
                {
 
1917
                        PGPROC     *proc = proclock->tag.myProc;
 
1918
 
 
1919
                        /* A backend never blocks itself */
 
1920
                        if (proc != MyProc)
 
1921
                        {
 
1922
                                VirtualTransactionId vxid;
 
1923
 
 
1924
                                GET_VXID_FROM_PGPROC(vxid, *proc);
 
1925
 
 
1926
                                /*
 
1927
                                 * If we see an invalid VXID, then either the xact has already
 
1928
                                 * committed (or aborted), or it's a prepared xact.  In either
 
1929
                                 * case we may ignore it.
 
1930
                                 */
 
1931
                                if (VirtualTransactionIdIsValid(vxid))
 
1932
                                        vxids[count++] = vxid;
 
1933
                        }
 
1934
                }
 
1935
 
 
1936
                proclock = (PROCLOCK *) SHMQueueNext(procLocks, &proclock->lockLink,
 
1937
                                                                                         offsetof(PROCLOCK, lockLink));
 
1938
        }
 
1939
 
 
1940
        LWLockRelease(partitionLock);
 
1941
 
 
1942
        if (count > MaxBackends)        /* should never happen */
 
1943
                elog(PANIC, "too many conflicting locks found");
 
1944
 
 
1945
        return vxids;
 
1946
}
 
1947
 
 
1948
 
 
1949
/*
 
1950
 * AtPrepare_Locks
 
1951
 *              Do the preparatory work for a PREPARE: make 2PC state file records
 
1952
 *              for all locks currently held.
 
1953
 *
 
1954
 * Non-transactional locks are ignored, as are VXID locks.
 
1955
 *
 
1956
 * There are some special cases that we error out on: we can't be holding
 
1957
 * any session locks (should be OK since only VACUUM uses those) and we
 
1958
 * can't be holding any locks on temporary objects (since that would mess
 
1959
 * up the current backend if it tries to exit before the prepared xact is
 
1960
 * committed).
 
1961
 */
 
1962
void
 
1963
AtPrepare_Locks(void)
 
1964
{
 
1965
        HASH_SEQ_STATUS status;
 
1966
        LOCALLOCK  *locallock;
 
1967
 
 
1968
        /*
 
1969
         * We don't need to touch shared memory for this --- all the necessary
 
1970
         * state information is in the locallock table.
 
1971
         */
 
1972
        hash_seq_init(&status, LockMethodLocalHash);
 
1973
 
 
1974
        while ((locallock = (LOCALLOCK *) hash_seq_search(&status)) != NULL)
 
1975
        {
 
1976
                TwoPhaseLockRecord record;
 
1977
                LOCALLOCKOWNER *lockOwners = locallock->lockOwners;
 
1978
                int                     i;
 
1979
 
 
1980
                /* Ignore nontransactional locks */
 
1981
                if (!LockMethods[LOCALLOCK_LOCKMETHOD(*locallock)]->transactional)
 
1982
                        continue;
 
1983
 
 
1984
                /*
 
1985
                 * Ignore VXID locks.  We don't want those to be held by prepared
 
1986
                 * transactions, since they aren't meaningful after a restart.
 
1987
                 */
 
1988
                if (locallock->tag.lock.locktag_type == LOCKTAG_VIRTUALTRANSACTION)
 
1989
                        continue;
 
1990
 
 
1991
                /* Ignore it if we don't actually hold the lock */
 
1992
                if (locallock->nLocks <= 0)
 
1993
                        continue;
 
1994
 
 
1995
                /* Scan to verify there are no session locks */
 
1996
                for (i = locallock->numLockOwners - 1; i >= 0; i--)
 
1997
                {
 
1998
                        /* elog not ereport since this should not happen */
 
1999
                        if (lockOwners[i].owner == NULL)
 
2000
                                elog(ERROR, "cannot PREPARE when session locks exist");
 
2001
                }
 
2002
 
 
2003
                /*
 
2004
                 * Create a 2PC record.
 
2005
                 */
 
2006
                memcpy(&(record.locktag), &(locallock->tag.lock), sizeof(LOCKTAG));
 
2007
                record.lockmode = locallock->tag.mode;
 
2008
 
 
2009
                RegisterTwoPhaseRecord(TWOPHASE_RM_LOCK_ID, 0,
 
2010
                                                           &record, sizeof(TwoPhaseLockRecord));
 
2011
        }
 
2012
}
 
2013
 
 
2014
/*
 
2015
 * PostPrepare_Locks
 
2016
 *              Clean up after successful PREPARE
 
2017
 *
 
2018
 * Here, we want to transfer ownership of our locks to a dummy PGPROC
 
2019
 * that's now associated with the prepared transaction, and we want to
 
2020
 * clean out the corresponding entries in the LOCALLOCK table.
 
2021
 *
 
2022
 * Note: by removing the LOCALLOCK entries, we are leaving dangling
 
2023
 * pointers in the transaction's resource owner.  This is OK at the
 
2024
 * moment since resowner.c doesn't try to free locks retail at a toplevel
 
2025
 * transaction commit or abort.  We could alternatively zero out nLocks
 
2026
 * and leave the LOCALLOCK entries to be garbage-collected by LockReleaseAll,
 
2027
 * but that probably costs more cycles.
 
2028
 */
 
2029
void
 
2030
PostPrepare_Locks(TransactionId xid)
 
2031
{
 
2032
        PGPROC     *newproc = TwoPhaseGetDummyProc(xid);
 
2033
        HASH_SEQ_STATUS status;
 
2034
        LOCALLOCK  *locallock;
 
2035
        LOCK       *lock;
 
2036
        PROCLOCK   *proclock;
 
2037
        PROCLOCKTAG proclocktag;
 
2038
        bool            found;
 
2039
        int                     partition;
 
2040
 
 
2041
        /* This is a critical section: any error means big trouble */
 
2042
        START_CRIT_SECTION();
 
2043
 
 
2044
        /*
 
2045
         * First we run through the locallock table and get rid of unwanted
 
2046
         * entries, then we scan the process's proclocks and transfer them to the
 
2047
         * target proc.
 
2048
         *
 
2049
         * We do this separately because we may have multiple locallock entries
 
2050
         * pointing to the same proclock, and we daren't end up with any dangling
 
2051
         * pointers.
 
2052
         */
 
2053
        hash_seq_init(&status, LockMethodLocalHash);
 
2054
 
 
2055
        while ((locallock = (LOCALLOCK *) hash_seq_search(&status)) != NULL)
 
2056
        {
 
2057
                if (locallock->proclock == NULL || locallock->lock == NULL)
 
2058
                {
 
2059
                        /*
 
2060
                         * We must've run out of shared memory while trying to set up this
 
2061
                         * lock.  Just forget the local entry.
 
2062
                         */
 
2063
                        Assert(locallock->nLocks == 0);
 
2064
                        RemoveLocalLock(locallock);
 
2065
                        continue;
 
2066
                }
 
2067
 
 
2068
                /* Ignore nontransactional locks */
 
2069
                if (!LockMethods[LOCALLOCK_LOCKMETHOD(*locallock)]->transactional)
 
2070
                        continue;
 
2071
 
 
2072
                /* Ignore VXID locks */
 
2073
                if (locallock->tag.lock.locktag_type == LOCKTAG_VIRTUALTRANSACTION)
 
2074
                        continue;
 
2075
 
 
2076
                /* We already checked there are no session locks */
 
2077
 
 
2078
                /* Mark the proclock to show we need to release this lockmode */
 
2079
                if (locallock->nLocks > 0)
 
2080
                        locallock->proclock->releaseMask |= LOCKBIT_ON(locallock->tag.mode);
 
2081
 
 
2082
                /* And remove the locallock hashtable entry */
 
2083
                RemoveLocalLock(locallock);
 
2084
        }
 
2085
 
 
2086
        /*
 
2087
         * Now, scan each lock partition separately.
 
2088
         */
 
2089
        for (partition = 0; partition < NUM_LOCK_PARTITIONS; partition++)
 
2090
        {
 
2091
                LWLockId        partitionLock = FirstLockMgrLock + partition;
 
2092
                SHM_QUEUE  *procLocks = &(MyProc->myProcLocks[partition]);
 
2093
 
 
2094
                proclock = (PROCLOCK *) SHMQueueNext(procLocks, procLocks,
 
2095
                                                                                         offsetof(PROCLOCK, procLink));
 
2096
 
 
2097
                if (!proclock)
 
2098
                        continue;                       /* needn't examine this partition */
 
2099
 
 
2100
                LWLockAcquire(partitionLock, LW_EXCLUSIVE);
 
2101
 
 
2102
                while (proclock)
 
2103
                {
 
2104
                        PROCLOCK   *nextplock;
 
2105
                        LOCKMASK        holdMask;
 
2106
                        PROCLOCK   *newproclock;
 
2107
 
 
2108
                        /* Get link first, since we may unlink/delete this proclock */
 
2109
                        nextplock = (PROCLOCK *)
 
2110
                                SHMQueueNext(procLocks, &proclock->procLink,
 
2111
                                                         offsetof(PROCLOCK, procLink));
 
2112
 
 
2113
                        Assert(proclock->tag.myProc == MyProc);
 
2114
 
 
2115
                        lock = proclock->tag.myLock;
 
2116
 
 
2117
                        /* Ignore nontransactional locks */
 
2118
                        if (!LockMethods[LOCK_LOCKMETHOD(*lock)]->transactional)
 
2119
                                goto next_item;
 
2120
 
 
2121
                        /* Ignore VXID locks */
 
2122
                        if (lock->tag.locktag_type == LOCKTAG_VIRTUALTRANSACTION)
 
2123
                                goto next_item;
 
2124
 
 
2125
                        PROCLOCK_PRINT("PostPrepare_Locks", proclock);
 
2126
                        LOCK_PRINT("PostPrepare_Locks", lock, 0);
 
2127
                        Assert(lock->nRequested >= 0);
 
2128
                        Assert(lock->nGranted >= 0);
 
2129
                        Assert(lock->nGranted <= lock->nRequested);
 
2130
                        Assert((proclock->holdMask & ~lock->grantMask) == 0);
 
2131
 
 
2132
                        /*
 
2133
                         * Since there were no session locks, we should be releasing all
 
2134
                         * locks
 
2135
                         */
 
2136
                        if (proclock->releaseMask != proclock->holdMask)
 
2137
                                elog(PANIC, "we seem to have dropped a bit somewhere");
 
2138
 
 
2139
                        holdMask = proclock->holdMask;
 
2140
 
 
2141
                        /*
 
2142
                         * We cannot simply modify proclock->tag.myProc to reassign
 
2143
                         * ownership of the lock, because that's part of the hash key and
 
2144
                         * the proclock would then be in the wrong hash chain.  So, unlink
 
2145
                         * and delete the old proclock; create a new one with the right
 
2146
                         * contents; and link it into place.  We do it in this order to be
 
2147
                         * certain we won't run out of shared memory (the way dynahash.c
 
2148
                         * works, the deleted object is certain to be available for
 
2149
                         * reallocation).
 
2150
                         */
 
2151
                        SHMQueueDelete(&proclock->lockLink);
 
2152
                        SHMQueueDelete(&proclock->procLink);
 
2153
                        if (!hash_search(LockMethodProcLockHash,
 
2154
                                                         (void *) &(proclock->tag),
 
2155
                                                         HASH_REMOVE, NULL))
 
2156
                                elog(PANIC, "proclock table corrupted");
 
2157
 
 
2158
                        /*
 
2159
                         * Create the hash key for the new proclock table.
 
2160
                         */
 
2161
                        proclocktag.myLock = lock;
 
2162
                        proclocktag.myProc = newproc;
 
2163
 
 
2164
                        newproclock = (PROCLOCK *) hash_search(LockMethodProcLockHash,
 
2165
                                                                                                   (void *) &proclocktag,
 
2166
                                                                                                   HASH_ENTER_NULL, &found);
 
2167
                        if (!newproclock)
 
2168
                                ereport(PANIC,  /* should not happen */
 
2169
                                                (errcode(ERRCODE_OUT_OF_MEMORY),
 
2170
                                                 errmsg("out of shared memory"),
 
2171
                                                 errdetail("Not enough memory for reassigning the prepared transaction's locks.")));
 
2172
 
 
2173
                        /*
 
2174
                         * If new, initialize the new entry
 
2175
                         */
 
2176
                        if (!found)
 
2177
                        {
 
2178
                                newproclock->holdMask = 0;
 
2179
                                newproclock->releaseMask = 0;
 
2180
                                /* Add new proclock to appropriate lists */
 
2181
                                SHMQueueInsertBefore(&lock->procLocks, &newproclock->lockLink);
 
2182
                                SHMQueueInsertBefore(&(newproc->myProcLocks[partition]),
 
2183
                                                                         &newproclock->procLink);
 
2184
                                PROCLOCK_PRINT("PostPrepare_Locks: new", newproclock);
 
2185
                        }
 
2186
                        else
 
2187
                        {
 
2188
                                PROCLOCK_PRINT("PostPrepare_Locks: found", newproclock);
 
2189
                                Assert((newproclock->holdMask & ~lock->grantMask) == 0);
 
2190
                        }
 
2191
 
 
2192
                        /*
 
2193
                         * Pass over the identified lock ownership.
 
2194
                         */
 
2195
                        Assert((newproclock->holdMask & holdMask) == 0);
 
2196
                        newproclock->holdMask |= holdMask;
 
2197
 
 
2198
        next_item:
 
2199
                        proclock = nextplock;
 
2200
                }                                               /* loop over PROCLOCKs within this partition */
 
2201
 
 
2202
                LWLockRelease(partitionLock);
 
2203
        }                                                       /* loop over partitions */
 
2204
 
 
2205
        END_CRIT_SECTION();
 
2206
}
 
2207
 
 
2208
 
 
2209
/*
 
2210
 * Estimate shared-memory space used for lock tables
 
2211
 */
 
2212
Size
 
2213
LockShmemSize(void)
 
2214
{
 
2215
        Size            size = 0;
 
2216
        long            max_table_size;
 
2217
 
 
2218
        /* lock hash table */
 
2219
        max_table_size = NLOCKENTS();
 
2220
        size = add_size(size, hash_estimate_size(max_table_size, sizeof(LOCK)));
 
2221
 
 
2222
        /* proclock hash table */
 
2223
        max_table_size *= 2;
 
2224
        size = add_size(size, hash_estimate_size(max_table_size, sizeof(PROCLOCK)));
 
2225
 
 
2226
        /*
 
2227
         * Since NLOCKENTS is only an estimate, add 10% safety margin.
 
2228
         */
 
2229
        size = add_size(size, size / 10);
 
2230
 
 
2231
        return size;
 
2232
}
 
2233
 
 
2234
/*
 
2235
 * GetLockStatusData - Return a summary of the lock manager's internal
 
2236
 * status, for use in a user-level reporting function.
 
2237
 *
 
2238
 * The return data consists of an array of PROCLOCK objects, with the
 
2239
 * associated PGPROC and LOCK objects for each.  Note that multiple
 
2240
 * copies of the same PGPROC and/or LOCK objects are likely to appear.
 
2241
 * It is the caller's responsibility to match up duplicates if wanted.
 
2242
 *
 
2243
 * The design goal is to hold the LWLocks for as short a time as possible;
 
2244
 * thus, this function simply makes a copy of the necessary data and releases
 
2245
 * the locks, allowing the caller to contemplate and format the data for as
 
2246
 * long as it pleases.
 
2247
 */
 
2248
LockData *
 
2249
GetLockStatusData(void)
 
2250
{
 
2251
        LockData   *data;
 
2252
        PROCLOCK   *proclock;
 
2253
        HASH_SEQ_STATUS seqstat;
 
2254
        int                     els;
 
2255
        int                     el;
 
2256
        int                     i;
 
2257
 
 
2258
        data = (LockData *) palloc(sizeof(LockData));
 
2259
 
 
2260
        /*
 
2261
         * Acquire lock on the entire shared lock data structure.  We can't
 
2262
         * operate one partition at a time if we want to deliver a self-consistent
 
2263
         * view of the state.
 
2264
         *
 
2265
         * Since this is a read-only operation, we take shared instead of
 
2266
         * exclusive lock.      There's not a whole lot of point to this, because all
 
2267
         * the normal operations require exclusive lock, but it doesn't hurt
 
2268
         * anything either. It will at least allow two backends to do
 
2269
         * GetLockStatusData in parallel.
 
2270
         *
 
2271
         * Must grab LWLocks in partition-number order to avoid LWLock deadlock.
 
2272
         */
 
2273
        for (i = 0; i < NUM_LOCK_PARTITIONS; i++)
 
2274
                LWLockAcquire(FirstLockMgrLock + i, LW_SHARED);
 
2275
 
 
2276
        /* Now we can safely count the number of proclocks */
 
2277
        els = hash_get_num_entries(LockMethodProcLockHash);
 
2278
 
 
2279
        data->nelements = els;
 
2280
        data->proclocks = (PROCLOCK *) palloc(sizeof(PROCLOCK) * els);
 
2281
        data->procs = (PGPROC *) palloc(sizeof(PGPROC) * els);
 
2282
        data->locks = (LOCK *) palloc(sizeof(LOCK) * els);
 
2283
 
 
2284
        /* Now scan the tables to copy the data */
 
2285
        hash_seq_init(&seqstat, LockMethodProcLockHash);
 
2286
 
 
2287
        el = 0;
 
2288
        while ((proclock = (PROCLOCK *) hash_seq_search(&seqstat)))
 
2289
        {
 
2290
                PGPROC     *proc = proclock->tag.myProc;
 
2291
                LOCK       *lock = proclock->tag.myLock;
 
2292
 
 
2293
                memcpy(&(data->proclocks[el]), proclock, sizeof(PROCLOCK));
 
2294
                memcpy(&(data->procs[el]), proc, sizeof(PGPROC));
 
2295
                memcpy(&(data->locks[el]), lock, sizeof(LOCK));
 
2296
 
 
2297
                el++;
 
2298
        }
 
2299
 
 
2300
        /*
 
2301
         * And release locks.  We do this in reverse order for two reasons: (1)
 
2302
         * Anyone else who needs more than one of the locks will be trying to lock
 
2303
         * them in increasing order; we don't want to release the other process
 
2304
         * until it can get all the locks it needs. (2) This avoids O(N^2)
 
2305
         * behavior inside LWLockRelease.
 
2306
         */
 
2307
        for (i = NUM_LOCK_PARTITIONS; --i >= 0;)
 
2308
                LWLockRelease(FirstLockMgrLock + i);
 
2309
 
 
2310
        Assert(el == data->nelements);
 
2311
 
 
2312
        return data;
 
2313
}
 
2314
 
 
2315
/*
 
2316
 * Returns a list of currently held AccessExclusiveLocks, for use
 
2317
 * by GetRunningTransactionData().
 
2318
 */
 
2319
xl_standby_lock *
 
2320
GetRunningTransactionLocks(int *nlocks)
 
2321
{
 
2322
        PROCLOCK   *proclock;
 
2323
        HASH_SEQ_STATUS seqstat;
 
2324
        int                     i;
 
2325
        int                     index;
 
2326
        int                     els;
 
2327
        xl_standby_lock *accessExclusiveLocks;
 
2328
 
 
2329
        /*
 
2330
         * Acquire lock on the entire shared lock data structure.
 
2331
         *
 
2332
         * Must grab LWLocks in partition-number order to avoid LWLock deadlock.
 
2333
         */
 
2334
        for (i = 0; i < NUM_LOCK_PARTITIONS; i++)
 
2335
                LWLockAcquire(FirstLockMgrLock + i, LW_SHARED);
 
2336
 
 
2337
        /* Now scan the tables to copy the data */
 
2338
        hash_seq_init(&seqstat, LockMethodProcLockHash);
 
2339
 
 
2340
        /* Now we can safely count the number of proclocks */
 
2341
        els = hash_get_num_entries(LockMethodProcLockHash);
 
2342
 
 
2343
        /*
 
2344
         * Allocating enough space for all locks in the lock table is overkill,
 
2345
         * but it's more convenient and faster than having to enlarge the array.
 
2346
         */
 
2347
        accessExclusiveLocks = palloc(els * sizeof(xl_standby_lock));
 
2348
 
 
2349
        /*
 
2350
         * If lock is a currently granted AccessExclusiveLock then it will have
 
2351
         * just one proclock holder, so locks are never accessed twice in this
 
2352
         * particular case. Don't copy this code for use elsewhere because in the
 
2353
         * general case this will give you duplicate locks when looking at
 
2354
         * non-exclusive lock types.
 
2355
         */
 
2356
        index = 0;
 
2357
        while ((proclock = (PROCLOCK *) hash_seq_search(&seqstat)))
 
2358
        {
 
2359
                /* make sure this definition matches the one used in LockAcquire */
 
2360
                if ((proclock->holdMask & LOCKBIT_ON(AccessExclusiveLock)) &&
 
2361
                        proclock->tag.myLock->tag.locktag_type == LOCKTAG_RELATION)
 
2362
                {
 
2363
                        PGPROC     *proc = proclock->tag.myProc;
 
2364
                        LOCK       *lock = proclock->tag.myLock;
 
2365
 
 
2366
                        accessExclusiveLocks[index].xid = proc->xid;
 
2367
                        accessExclusiveLocks[index].dbOid = lock->tag.locktag_field1;
 
2368
                        accessExclusiveLocks[index].relOid = lock->tag.locktag_field2;
 
2369
 
 
2370
                        index++;
 
2371
                }
 
2372
        }
 
2373
 
 
2374
        /*
 
2375
         * And release locks.  We do this in reverse order for two reasons: (1)
 
2376
         * Anyone else who needs more than one of the locks will be trying to lock
 
2377
         * them in increasing order; we don't want to release the other process
 
2378
         * until it can get all the locks it needs. (2) This avoids O(N^2)
 
2379
         * behavior inside LWLockRelease.
 
2380
         */
 
2381
        for (i = NUM_LOCK_PARTITIONS; --i >= 0;)
 
2382
                LWLockRelease(FirstLockMgrLock + i);
 
2383
 
 
2384
        *nlocks = index;
 
2385
        return accessExclusiveLocks;
 
2386
}
 
2387
 
 
2388
/* Provide the textual name of any lock mode */
 
2389
const char *
 
2390
GetLockmodeName(LOCKMETHODID lockmethodid, LOCKMODE mode)
 
2391
{
 
2392
        Assert(lockmethodid > 0 && lockmethodid < lengthof(LockMethods));
 
2393
        Assert(mode > 0 && mode <= LockMethods[lockmethodid]->numLockModes);
 
2394
        return LockMethods[lockmethodid]->lockModeNames[mode];
 
2395
}
 
2396
 
 
2397
#ifdef LOCK_DEBUG
 
2398
/*
 
2399
 * Dump all locks in the given proc's myProcLocks lists.
 
2400
 *
 
2401
 * Caller is responsible for having acquired appropriate LWLocks.
 
2402
 */
 
2403
void
 
2404
DumpLocks(PGPROC *proc)
 
2405
{
 
2406
        SHM_QUEUE  *procLocks;
 
2407
        PROCLOCK   *proclock;
 
2408
        LOCK       *lock;
 
2409
        int                     i;
 
2410
 
 
2411
        if (proc == NULL)
 
2412
                return;
 
2413
 
 
2414
        if (proc->waitLock)
 
2415
                LOCK_PRINT("DumpLocks: waiting on", proc->waitLock, 0);
 
2416
 
 
2417
        for (i = 0; i < NUM_LOCK_PARTITIONS; i++)
 
2418
        {
 
2419
                procLocks = &(proc->myProcLocks[i]);
 
2420
 
 
2421
                proclock = (PROCLOCK *) SHMQueueNext(procLocks, procLocks,
 
2422
                                                                                         offsetof(PROCLOCK, procLink));
 
2423
 
 
2424
                while (proclock)
 
2425
                {
 
2426
                        Assert(proclock->tag.myProc == proc);
 
2427
 
 
2428
                        lock = proclock->tag.myLock;
 
2429
 
 
2430
                        PROCLOCK_PRINT("DumpLocks", proclock);
 
2431
                        LOCK_PRINT("DumpLocks", lock, 0);
 
2432
 
 
2433
                        proclock = (PROCLOCK *)
 
2434
                                SHMQueueNext(procLocks, &proclock->procLink,
 
2435
                                                         offsetof(PROCLOCK, procLink));
 
2436
                }
 
2437
        }
 
2438
}
 
2439
 
 
2440
/*
 
2441
 * Dump all lmgr locks.
 
2442
 *
 
2443
 * Caller is responsible for having acquired appropriate LWLocks.
 
2444
 */
 
2445
void
 
2446
DumpAllLocks(void)
 
2447
{
 
2448
        PGPROC     *proc;
 
2449
        PROCLOCK   *proclock;
 
2450
        LOCK       *lock;
 
2451
        HASH_SEQ_STATUS status;
 
2452
 
 
2453
        proc = MyProc;
 
2454
 
 
2455
        if (proc && proc->waitLock)
 
2456
                LOCK_PRINT("DumpAllLocks: waiting on", proc->waitLock, 0);
 
2457
 
 
2458
        hash_seq_init(&status, LockMethodProcLockHash);
 
2459
 
 
2460
        while ((proclock = (PROCLOCK *) hash_seq_search(&status)) != NULL)
 
2461
        {
 
2462
                PROCLOCK_PRINT("DumpAllLocks", proclock);
 
2463
 
 
2464
                lock = proclock->tag.myLock;
 
2465
                if (lock)
 
2466
                        LOCK_PRINT("DumpAllLocks", lock, 0);
 
2467
                else
 
2468
                        elog(LOG, "DumpAllLocks: proclock->tag.myLock = NULL");
 
2469
        }
 
2470
}
 
2471
#endif   /* LOCK_DEBUG */
 
2472
 
 
2473
/*
 
2474
 * LOCK 2PC resource manager's routines
 
2475
 */
 
2476
 
 
2477
/*
 
2478
 * Re-acquire a lock belonging to a transaction that was prepared.
 
2479
 *
 
2480
 * Because this function is run at db startup, re-acquiring the locks should
 
2481
 * never conflict with running transactions because there are none.  We
 
2482
 * assume that the lock state represented by the stored 2PC files is legal.
 
2483
 *
 
2484
 * When switching from Hot Standby mode to normal operation, the locks will
 
2485
 * be already held by the startup process. The locks are acquired for the new
 
2486
 * procs without checking for conflicts, so we don'get a conflict between the
 
2487
 * startup process and the dummy procs, even though we will momentarily have
 
2488
 * a situation where two procs are holding the same AccessExclusiveLock,
 
2489
 * which isn't normally possible because the conflict. If we're in standby
 
2490
 * mode, but a recovery snapshot hasn't been established yet, it's possible
 
2491
 * that some but not all of the locks are already held by the startup process.
 
2492
 *
 
2493
 * This approach is simple, but also a bit dangerous, because if there isn't
 
2494
 * enough shared memory to acquire the locks, an error will be thrown, which
 
2495
 * is promoted to FATAL and recovery will abort, bringing down postmaster.
 
2496
 * A safer approach would be to transfer the locks like we do in
 
2497
 * AtPrepare_Locks, but then again, in hot standby mode it's possible for
 
2498
 * read-only backends to use up all the shared lock memory anyway, so that
 
2499
 * replaying the WAL record that needs to acquire a lock will throw an error
 
2500
 * and PANIC anyway.
 
2501
 */
 
2502
void
 
2503
lock_twophase_recover(TransactionId xid, uint16 info,
 
2504
                                          void *recdata, uint32 len)
 
2505
{
 
2506
        TwoPhaseLockRecord *rec = (TwoPhaseLockRecord *) recdata;
 
2507
        PGPROC     *proc = TwoPhaseGetDummyProc(xid);
 
2508
        LOCKTAG    *locktag;
 
2509
        LOCKMODE        lockmode;
 
2510
        LOCKMETHODID lockmethodid;
 
2511
        LOCK       *lock;
 
2512
        PROCLOCK   *proclock;
 
2513
        PROCLOCKTAG proclocktag;
 
2514
        bool            found;
 
2515
        uint32          hashcode;
 
2516
        uint32          proclock_hashcode;
 
2517
        int                     partition;
 
2518
        LWLockId        partitionLock;
 
2519
        LockMethod      lockMethodTable;
 
2520
 
 
2521
        Assert(len == sizeof(TwoPhaseLockRecord));
 
2522
        locktag = &rec->locktag;
 
2523
        lockmode = rec->lockmode;
 
2524
        lockmethodid = locktag->locktag_lockmethodid;
 
2525
 
 
2526
        if (lockmethodid <= 0 || lockmethodid >= lengthof(LockMethods))
 
2527
                elog(ERROR, "unrecognized lock method: %d", lockmethodid);
 
2528
        lockMethodTable = LockMethods[lockmethodid];
 
2529
 
 
2530
        hashcode = LockTagHashCode(locktag);
 
2531
        partition = LockHashPartition(hashcode);
 
2532
        partitionLock = LockHashPartitionLock(hashcode);
 
2533
 
 
2534
        LWLockAcquire(partitionLock, LW_EXCLUSIVE);
 
2535
 
 
2536
        /*
 
2537
         * Find or create a lock with this tag.
 
2538
         */
 
2539
        lock = (LOCK *) hash_search_with_hash_value(LockMethodLockHash,
 
2540
                                                                                                (void *) locktag,
 
2541
                                                                                                hashcode,
 
2542
                                                                                                HASH_ENTER_NULL,
 
2543
                                                                                                &found);
 
2544
        if (!lock)
 
2545
        {
 
2546
                LWLockRelease(partitionLock);
 
2547
                ereport(ERROR,
 
2548
                                (errcode(ERRCODE_OUT_OF_MEMORY),
 
2549
                                 errmsg("out of shared memory"),
 
2550
                  errhint("You might need to increase max_locks_per_transaction.")));
 
2551
        }
 
2552
 
 
2553
        /*
 
2554
         * if it's a new lock object, initialize it
 
2555
         */
 
2556
        if (!found)
 
2557
        {
 
2558
                lock->grantMask = 0;
 
2559
                lock->waitMask = 0;
 
2560
                SHMQueueInit(&(lock->procLocks));
 
2561
                ProcQueueInit(&(lock->waitProcs));
 
2562
                lock->nRequested = 0;
 
2563
                lock->nGranted = 0;
 
2564
                MemSet(lock->requested, 0, sizeof(int) * MAX_LOCKMODES);
 
2565
                MemSet(lock->granted, 0, sizeof(int) * MAX_LOCKMODES);
 
2566
                LOCK_PRINT("lock_twophase_recover: new", lock, lockmode);
 
2567
        }
 
2568
        else
 
2569
        {
 
2570
                LOCK_PRINT("lock_twophase_recover: found", lock, lockmode);
 
2571
                Assert((lock->nRequested >= 0) && (lock->requested[lockmode] >= 0));
 
2572
                Assert((lock->nGranted >= 0) && (lock->granted[lockmode] >= 0));
 
2573
                Assert(lock->nGranted <= lock->nRequested);
 
2574
        }
 
2575
 
 
2576
        /*
 
2577
         * Create the hash key for the proclock table.
 
2578
         */
 
2579
        proclocktag.myLock = lock;
 
2580
        proclocktag.myProc = proc;
 
2581
 
 
2582
        proclock_hashcode = ProcLockHashCode(&proclocktag, hashcode);
 
2583
 
 
2584
        /*
 
2585
         * Find or create a proclock entry with this tag
 
2586
         */
 
2587
        proclock = (PROCLOCK *) hash_search_with_hash_value(LockMethodProcLockHash,
 
2588
                                                                                                                (void *) &proclocktag,
 
2589
                                                                                                                proclock_hashcode,
 
2590
                                                                                                                HASH_ENTER_NULL,
 
2591
                                                                                                                &found);
 
2592
        if (!proclock)
 
2593
        {
 
2594
                /* Ooops, not enough shmem for the proclock */
 
2595
                if (lock->nRequested == 0)
 
2596
                {
 
2597
                        /*
 
2598
                         * There are no other requestors of this lock, so garbage-collect
 
2599
                         * the lock object.  We *must* do this to avoid a permanent leak
 
2600
                         * of shared memory, because there won't be anything to cause
 
2601
                         * anyone to release the lock object later.
 
2602
                         */
 
2603
                        Assert(SHMQueueEmpty(&(lock->procLocks)));
 
2604
                        if (!hash_search_with_hash_value(LockMethodLockHash,
 
2605
                                                                                         (void *) &(lock->tag),
 
2606
                                                                                         hashcode,
 
2607
                                                                                         HASH_REMOVE,
 
2608
                                                                                         NULL))
 
2609
                                elog(PANIC, "lock table corrupted");
 
2610
                }
 
2611
                LWLockRelease(partitionLock);
 
2612
                ereport(ERROR,
 
2613
                                (errcode(ERRCODE_OUT_OF_MEMORY),
 
2614
                                 errmsg("out of shared memory"),
 
2615
                  errhint("You might need to increase max_locks_per_transaction.")));
 
2616
        }
 
2617
 
 
2618
        /*
 
2619
         * If new, initialize the new entry
 
2620
         */
 
2621
        if (!found)
 
2622
        {
 
2623
                proclock->holdMask = 0;
 
2624
                proclock->releaseMask = 0;
 
2625
                /* Add proclock to appropriate lists */
 
2626
                SHMQueueInsertBefore(&lock->procLocks, &proclock->lockLink);
 
2627
                SHMQueueInsertBefore(&(proc->myProcLocks[partition]),
 
2628
                                                         &proclock->procLink);
 
2629
                PROCLOCK_PRINT("lock_twophase_recover: new", proclock);
 
2630
        }
 
2631
        else
 
2632
        {
 
2633
                PROCLOCK_PRINT("lock_twophase_recover: found", proclock);
 
2634
                Assert((proclock->holdMask & ~lock->grantMask) == 0);
 
2635
        }
 
2636
 
 
2637
        /*
 
2638
         * lock->nRequested and lock->requested[] count the total number of
 
2639
         * requests, whether granted or waiting, so increment those immediately.
 
2640
         */
 
2641
        lock->nRequested++;
 
2642
        lock->requested[lockmode]++;
 
2643
        Assert((lock->nRequested > 0) && (lock->requested[lockmode] > 0));
 
2644
 
 
2645
        /*
 
2646
         * We shouldn't already hold the desired lock.
 
2647
         */
 
2648
        if (proclock->holdMask & LOCKBIT_ON(lockmode))
 
2649
                elog(ERROR, "lock %s on object %u/%u/%u is already held",
 
2650
                         lockMethodTable->lockModeNames[lockmode],
 
2651
                         lock->tag.locktag_field1, lock->tag.locktag_field2,
 
2652
                         lock->tag.locktag_field3);
 
2653
 
 
2654
        /*
 
2655
         * We ignore any possible conflicts and just grant ourselves the lock. Not
 
2656
         * only because we don't bother, but also to avoid deadlocks when
 
2657
         * switching from standby to normal mode. See function comment.
 
2658
         */
 
2659
        GrantLock(lock, proclock, lockmode);
 
2660
 
 
2661
        LWLockRelease(partitionLock);
 
2662
}
 
2663
 
 
2664
/*
 
2665
 * Re-acquire a lock belonging to a transaction that was prepared, when
 
2666
 * when starting up into hot standby mode.
 
2667
 */
 
2668
void
 
2669
lock_twophase_standby_recover(TransactionId xid, uint16 info,
 
2670
                                                          void *recdata, uint32 len)
 
2671
{
 
2672
        TwoPhaseLockRecord *rec = (TwoPhaseLockRecord *) recdata;
 
2673
        LOCKTAG    *locktag;
 
2674
        LOCKMODE        lockmode;
 
2675
        LOCKMETHODID lockmethodid;
 
2676
 
 
2677
        Assert(len == sizeof(TwoPhaseLockRecord));
 
2678
        locktag = &rec->locktag;
 
2679
        lockmode = rec->lockmode;
 
2680
        lockmethodid = locktag->locktag_lockmethodid;
 
2681
 
 
2682
        if (lockmethodid <= 0 || lockmethodid >= lengthof(LockMethods))
 
2683
                elog(ERROR, "unrecognized lock method: %d", lockmethodid);
 
2684
 
 
2685
        if (lockmode == AccessExclusiveLock &&
 
2686
                locktag->locktag_type == LOCKTAG_RELATION)
 
2687
        {
 
2688
                StandbyAcquireAccessExclusiveLock(xid,
 
2689
                                                                                locktag->locktag_field1 /* dboid */ ,
 
2690
                                                                          locktag->locktag_field2 /* reloid */ );
 
2691
        }
 
2692
}
 
2693
 
 
2694
 
 
2695
/*
 * 2PC processing routine for COMMIT PREPARED case.
 *
 * Find and release the lock indicated by the 2PC record.
 *
 * xid/info/recdata/len follow the standard 2PC record-callback convention;
 * recdata must point to a TwoPhaseLockRecord (asserted below).
 */
void
lock_twophase_postcommit(TransactionId xid, uint16 info,
						 void *recdata, uint32 len)
{
	TwoPhaseLockRecord *rec = (TwoPhaseLockRecord *) recdata;
	PGPROC	   *proc = TwoPhaseGetDummyProc(xid);
	LOCKTAG    *locktag;
	LOCKMODE	lockmode;
	LOCKMETHODID lockmethodid;
	LOCK	   *lock;
	PROCLOCK   *proclock;
	PROCLOCKTAG proclocktag;
	uint32		hashcode;
	uint32		proclock_hashcode;
	LWLockId	partitionLock;
	LockMethod	lockMethodTable;
	bool		wakeupNeeded;

	Assert(len == sizeof(TwoPhaseLockRecord));
	locktag = &rec->locktag;
	lockmode = rec->lockmode;
	lockmethodid = locktag->locktag_lockmethodid;

	/* Validate the lock method id before indexing into LockMethods. */
	if (lockmethodid <= 0 || lockmethodid >= lengthof(LockMethods))
		elog(ERROR, "unrecognized lock method: %d", lockmethodid);
	lockMethodTable = LockMethods[lockmethodid];

	/* All manipulation below happens under the lock partition's LWLock. */
	hashcode = LockTagHashCode(locktag);
	partitionLock = LockHashPartitionLock(hashcode);

	LWLockAcquire(partitionLock, LW_EXCLUSIVE);

	/*
	 * Re-find the lock object (it had better be there).
	 */
	lock = (LOCK *) hash_search_with_hash_value(LockMethodLockHash,
												(void *) locktag,
												hashcode,
												HASH_FIND,
												NULL);
	if (!lock)
		elog(PANIC, "failed to re-find shared lock object");

	/*
	 * Re-find the proclock object (ditto).  Its hash key is the (lock, proc)
	 * pair, where proc is the prepared transaction's dummy PGPROC.
	 */
	proclocktag.myLock = lock;
	proclocktag.myProc = proc;

	proclock_hashcode = ProcLockHashCode(&proclocktag, hashcode);

	proclock = (PROCLOCK *) hash_search_with_hash_value(LockMethodProcLockHash,
														(void *) &proclocktag,
														proclock_hashcode,
														HASH_FIND,
														NULL);
	if (!proclock)
		elog(PANIC, "failed to re-find shared proclock object");

	/*
	 * Double-check that we are actually holding a lock of the type we want to
	 * release.
	 */
	if (!(proclock->holdMask & LOCKBIT_ON(lockmode)))
	{
		PROCLOCK_PRINT("lock_twophase_postcommit: WRONGTYPE", proclock);
		/* Release the partition lock before the WARNING so we don't hold it */
		LWLockRelease(partitionLock);
		elog(WARNING, "you don't own a lock of type %s",
			 lockMethodTable->lockModeNames[lockmode]);
		return;
	}

	/*
	 * Do the releasing.  CleanUpLock will waken any now-wakable waiters.
	 */
	wakeupNeeded = UnGrantLock(lock, lockmode, proclock, lockMethodTable);

	CleanUpLock(lock, proclock,
				lockMethodTable, hashcode,
				wakeupNeeded);

	LWLockRelease(partitionLock);
}
 
2783
 
 
2784
/*
 * 2PC processing routine for ROLLBACK PREPARED case.
 *
 * This is actually just the same as the COMMIT case: releasing the lock
 * recorded in the 2PC file is identical whether the prepared transaction
 * committed or aborted, so delegate to lock_twophase_postcommit.
 */
void
lock_twophase_postabort(TransactionId xid, uint16 info,
						void *recdata, uint32 len)
{
	lock_twophase_postcommit(xid, info, recdata, len);
}