1
/*-------------------------------------------------------------------------
4
* Export internal transaction IDs to user level.
6
* Note that only top-level transaction IDs are ever converted to TXID.
7
* This is important because TXIDs frequently persist beyond the global
8
* xmin horizon, or may even be shipped to other machines, so we cannot
9
* rely on being able to correlate subtransaction IDs with their parents
10
* via functions such as SubTransGetTopmostTransaction().
13
* Copyright (c) 2003-2009, PostgreSQL Global Development Group
14
* Author: Jan Wieck, Afilias USA INC.
15
* 64-bit txids: Marko Kreen, Skype Technologies
19
*-------------------------------------------------------------------------
24
#include "access/transam.h"
25
#include "access/xact.h"
27
#include "libpq/pqformat.h"
28
#include "utils/builtins.h"
29
#include "utils/snapmgr.h"
32
#ifndef INT64_IS_BUSTED
33
/* txid will be signed int8 in database, so must limit to 63 bits */
34
#define MAX_TXID UINT64CONST(0x7FFFFFFFFFFFFFFF)
36
/* we only really have 32 bits to work with :-( */
37
#define MAX_TXID UINT64CONST(0x7FFFFFFF)
40
/* Use unsigned variant internally */
43
/* sprintf format code for uint64 */
44
#define TXID_FMT UINT64_FORMAT
47
* If defined, use bsearch() function for searching for txids in snapshots
48
* that have more than the specified number of values.
50
#define USE_BSEARCH_IF_NXIP_GREATER 30
54
* Snapshot containing 8-byte txids.
59
* 4-byte length hdr, should not be touched directly.
61
* Explicit embedding is OK, as we always want correct alignment anyway.
65
uint32 nxip; /* number of txids in xip array */
68
txid xip[1]; /* in-progress txids, xmin <= xip[i] < xmax */
71
#define TXID_SNAPSHOT_SIZE(nxip) \
72
(offsetof(TxidSnapshot, xip) + sizeof(txid) * (nxip))
75
* Epoch values from xact.c
79
TransactionId last_xid;
85
* Fetch epoch data from xact.c.
88
load_xid_epoch(TxidEpoch *state)
90
GetNextXidAndEpoch(&state->last_xid, &state->epoch);
94
* do a TransactionId -> txid conversion for an XID near the given epoch
97
convert_xid(TransactionId xid, const TxidEpoch *state)
99
#ifndef INT64_IS_BUSTED
102
/* return special xids as-is */
103
if (!TransactionIdIsNormal(xid))
106
/* xid can be on either side when near wrap-around */
107
epoch = (uint64) state->epoch;
108
if (xid > state->last_xid &&
109
TransactionIdPrecedes(xid, state->last_xid))
111
else if (xid < state->last_xid &&
112
TransactionIdFollows(xid, state->last_xid))
115
return (epoch << 32) | xid;
116
#else /* INT64_IS_BUSTED */
117
/* we can't do anything with the epoch, so ignore it */
118
return (txid) xid & MAX_TXID;
119
#endif /* INT64_IS_BUSTED */
123
* txid comparator for qsort/bsearch
126
cmp_txid(const void *aa, const void *bb)
128
txid a = *(const txid *) aa;
129
txid b = *(const txid *) bb;
139
* sort a snapshot's txids, so we can use bsearch() later.
141
* For consistency of on-disk representation, we always sort even if bsearch
145
sort_snapshot(TxidSnapshot *snap)
148
qsort(snap->xip, snap->nxip, sizeof(txid), cmp_txid);
152
* check txid visibility.
155
is_visible_txid(txid value, const TxidSnapshot *snap)
157
if (value < snap->xmin)
159
else if (value >= snap->xmax)
161
#ifdef USE_BSEARCH_IF_NXIP_GREATER
162
else if (snap->nxip > USE_BSEARCH_IF_NXIP_GREATER)
166
res = bsearch(&value, snap->xip, snap->nxip, sizeof(txid), cmp_txid);
167
/* if found, transaction is still in progress */
168
return (res) ? false : true;
175
for (i = 0; i < snap->nxip; i++)
177
if (value == snap->xip[i])
185
* helper functions to use StringInfo for TxidSnapshot creation.
189
buf_init(txid xmin, txid xmax)
198
buf = makeStringInfo();
199
appendBinaryStringInfo(buf, (char *) &snap, TXID_SNAPSHOT_SIZE(0));
204
buf_add_txid(StringInfo buf, txid xid)
206
TxidSnapshot *snap = (TxidSnapshot *) buf->data;
208
/* do this before possible realloc */
211
appendBinaryStringInfo(buf, (char *) &xid, sizeof(xid));
214
static TxidSnapshot *
215
buf_finalize(StringInfo buf)
217
TxidSnapshot *snap = (TxidSnapshot *) buf->data;
219
SET_VARSIZE(snap, buf->len);
221
/* buf is not needed anymore */
229
* simple number parser.
231
* We return 0 on error, which is an invalid value for txid.
234
str2txid(const char *s, const char **endp)
237
txid cutoff = MAX_TXID / 10;
238
txid cutlim = MAX_TXID % 10;
244
if (*s < '0' || *s > '9')
251
if (val > cutoff || (val == cutoff && d > cutlim))
265
* parse snapshot from cstring
267
static TxidSnapshot *
268
parse_snapshot(const char *str)
274
const char *str_start = str;
278
xmin = str2txid(str, &endp);
283
xmax = str2txid(str, &endp);
288
/* it should look sane */
289
if (xmin == 0 || xmax == 0 || xmin > xmax)
292
/* allocate buffer */
293
buf = buf_init(xmin, xmax);
295
/* loop over values */
298
/* read next value */
299
val = str2txid(str, &endp);
302
/* require the input to be in order */
303
if (val < xmin || val >= xmax || val <= last_val)
306
buf_add_txid(buf, val);
311
else if (*str != '\0')
315
return buf_finalize(buf);
318
elog(ERROR, "invalid input for txid_snapshot: \"%s\"", str_start);
325
* txid_current() and txid_current_snapshot() are the only ones that
326
* communicate with core xid machinery. All the others work on data
331
* txid_current() returns int8
333
* Return the current top-level transaction ID as TXID
336
txid_current(PG_FUNCTION_ARGS)
341
load_xid_epoch(&state);
343
val = convert_xid(GetTopTransactionId(), &state);
345
PG_RETURN_INT64(val);
349
* txid_current_snapshot() returns txid_snapshot
351
* Return current snapshot in TXID format
353
* Note that only top-level transaction XIDs are included in the snapshot.
356
txid_current_snapshot(PG_FUNCTION_ARGS)
365
cur = GetActiveSnapshot();
367
elog(ERROR, "no active snapshot set");
369
load_xid_epoch(&state);
373
size = TXID_SNAPSHOT_SIZE(nxip);
375
SET_VARSIZE(snap, size);
378
snap->xmin = convert_xid(cur->xmin, &state);
379
snap->xmax = convert_xid(cur->xmax, &state);
381
for (i = 0; i < nxip; i++)
382
snap->xip[i] = convert_xid(cur->xip[i], &state);
384
/* we want them guaranteed to be in ascending order */
387
PG_RETURN_POINTER(snap);
391
* txid_snapshot_in(cstring) returns txid_snapshot
393
* input function for type txid_snapshot
396
txid_snapshot_in(PG_FUNCTION_ARGS)
398
char *str = PG_GETARG_CSTRING(0);
401
snap = parse_snapshot(str);
403
PG_RETURN_POINTER(snap);
407
* txid_snapshot_out(txid_snapshot) returns cstring
409
* output function for type txid_snapshot
412
txid_snapshot_out(PG_FUNCTION_ARGS)
414
TxidSnapshot *snap = (TxidSnapshot *) PG_GETARG_VARLENA_P(0);
418
initStringInfo(&str);
420
appendStringInfo(&str, TXID_FMT ":", snap->xmin);
421
appendStringInfo(&str, TXID_FMT ":", snap->xmax);
423
for (i = 0; i < snap->nxip; i++)
426
appendStringInfoChar(&str, ',');
427
appendStringInfo(&str, TXID_FMT, snap->xip[i]);
430
PG_RETURN_CSTRING(str.data);
434
* txid_snapshot_recv(internal) returns txid_snapshot
436
* binary input function for type txid_snapshot
438
* format: int4 nxip, int8 xmin, int8 xmax, int8 xip[nxip]
441
txid_snapshot_recv(PG_FUNCTION_ARGS)
443
StringInfo buf = (StringInfo) PG_GETARG_POINTER(0);
454
* load nxip and check for nonsense.
456
* The (nxip > avail) check guards against integer overflow in 'expect'.
458
nxip = pq_getmsgint(buf, 4);
459
avail = buf->len - buf->cursor;
460
expect = 8 + 8 + nxip * 8;
461
if (nxip < 0 || nxip > avail || expect > avail)
464
xmin = pq_getmsgint64(buf);
465
xmax = pq_getmsgint64(buf);
466
if (xmin == 0 || xmax == 0 || xmin > xmax || xmax > MAX_TXID)
469
snap = palloc(TXID_SNAPSHOT_SIZE(nxip));
473
SET_VARSIZE(snap, TXID_SNAPSHOT_SIZE(nxip));
475
for (i = 0; i < nxip; i++)
477
txid cur = pq_getmsgint64(buf);
479
if (cur <= last || cur < xmin || cur >= xmax)
484
PG_RETURN_POINTER(snap);
487
elog(ERROR, "invalid snapshot data");
492
* txid_snapshot_send(txid_snapshot) returns bytea
494
* binary output function for type txid_snapshot
496
* format: int4 nxip, int8 xmin, int8 xmax, int8 xip[nxip]
499
txid_snapshot_send(PG_FUNCTION_ARGS)
501
TxidSnapshot *snap = (TxidSnapshot *) PG_GETARG_VARLENA_P(0);
505
pq_begintypsend(&buf);
506
pq_sendint(&buf, snap->nxip, 4);
507
pq_sendint64(&buf, snap->xmin);
508
pq_sendint64(&buf, snap->xmax);
509
for (i = 0; i < snap->nxip; i++)
510
pq_sendint64(&buf, snap->xip[i]);
511
PG_RETURN_BYTEA_P(pq_endtypsend(&buf));
515
* txid_visible_in_snapshot(int8, txid_snapshot) returns bool
517
* is txid visible in snapshot?
520
txid_visible_in_snapshot(PG_FUNCTION_ARGS)
522
txid value = PG_GETARG_INT64(0);
523
TxidSnapshot *snap = (TxidSnapshot *) PG_GETARG_VARLENA_P(1);
525
PG_RETURN_BOOL(is_visible_txid(value, snap));
529
* txid_snapshot_xmin(txid_snapshot) returns int8
531
* return snapshot's xmin
534
txid_snapshot_xmin(PG_FUNCTION_ARGS)
536
TxidSnapshot *snap = (TxidSnapshot *) PG_GETARG_VARLENA_P(0);
538
PG_RETURN_INT64(snap->xmin);
542
* txid_snapshot_xmax(txid_snapshot) returns int8
544
* return snapshot's xmax
547
txid_snapshot_xmax(PG_FUNCTION_ARGS)
549
TxidSnapshot *snap = (TxidSnapshot *) PG_GETARG_VARLENA_P(0);
551
PG_RETURN_INT64(snap->xmax);
555
* txid_snapshot_xip(txid_snapshot) returns setof int8
557
* return in-progress TXIDs in snapshot.
560
txid_snapshot_xip(PG_FUNCTION_ARGS)
562
FuncCallContext *fctx;
566
/* on first call, set up the SRF context and take a copy of the snapshot */
567
if (SRF_IS_FIRSTCALL())
569
TxidSnapshot *arg = (TxidSnapshot *) PG_GETARG_VARLENA_P(0);
571
fctx = SRF_FIRSTCALL_INIT();
573
/* make a copy of user snapshot */
574
snap = MemoryContextAlloc(fctx->multi_call_memory_ctx, VARSIZE(arg));
575
memcpy(snap, arg, VARSIZE(arg));
577
fctx->user_fctx = snap;
580
/* return values one-by-one */
581
fctx = SRF_PERCALL_SETUP();
582
snap = fctx->user_fctx;
583
if (fctx->call_cntr < snap->nxip)
585
value = snap->xip[fctx->call_cntr];
586
SRF_RETURN_NEXT(fctx, Int64GetDatum(value));
590
SRF_RETURN_DONE(fctx);