~ubuntu-branches/ubuntu/hardy/postgresql-8.4/hardy-backports

« back to all changes in this revision

Viewing changes to src/backend/utils/adt/txid.c

  • Committer: Bazaar Package Importer
  • Author(s): Martin Pitt
  • Date: 2009-03-20 12:00:13 UTC
  • Revision ID: james.westby@ubuntu.com-20090320120013-hogj7egc5mjncc5g
Tags: upstream-8.4~0cvs20090328
ImportĀ upstreamĀ versionĀ 8.4~0cvs20090328

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/*-------------------------------------------------------------------------
 
2
 * txid.c
 
3
 *
 
4
 *      Export internal transaction IDs to user level.
 
5
 *
 
6
 * Note that only top-level transaction IDs are ever converted to TXID.
 
7
 * This is important because TXIDs frequently persist beyond the global
 
8
 * xmin horizon, or may even be shipped to other machines, so we cannot
 
9
 * rely on being able to correlate subtransaction IDs with their parents
 
10
 * via functions such as SubTransGetTopmostTransaction().
 
11
 *
 
12
 *
 
13
 *      Copyright (c) 2003-2009, PostgreSQL Global Development Group
 
14
 *      Author: Jan Wieck, Afilias USA INC.
 
15
 *      64-bit txids: Marko Kreen, Skype Technologies
 
16
 *
 
17
 *      $PostgreSQL$
 
18
 *
 
19
 *-------------------------------------------------------------------------
 
20
 */
 
21
 
 
22
#include "postgres.h"
 
23
 
 
24
#include "access/transam.h"
 
25
#include "access/xact.h"
 
26
#include "funcapi.h"
 
27
#include "libpq/pqformat.h"
 
28
#include "utils/builtins.h"
 
29
#include "utils/snapmgr.h"
 
30
 
 
31
 
 
32
#ifndef INT64_IS_BUSTED
 
33
/* txid will be signed int8 in database, so must limit to 63 bits */
 
34
#define MAX_TXID   UINT64CONST(0x7FFFFFFFFFFFFFFF)
 
35
#else
 
36
/* we only really have 32 bits to work with :-( */
 
37
#define MAX_TXID   UINT64CONST(0x7FFFFFFF)
 
38
#endif
 
39
 
 
40
/* Use unsigned variant internally */
 
41
typedef uint64 txid;
 
42
 
 
43
/* sprintf format code for uint64 */
 
44
#define TXID_FMT UINT64_FORMAT
 
45
 
 
46
/*
 
47
 * If defined, use bsearch() function for searching for txids in snapshots
 
48
 * that have more than the specified number of values.
 
49
 */
 
50
#define USE_BSEARCH_IF_NXIP_GREATER 30
 
51
 
 
52
 
 
53
/*
 
54
 * Snapshot containing 8byte txids.
 
55
 */
 
56
typedef struct
 
57
{
 
58
        /*
 
59
         * 4-byte length hdr, should not be touched directly.
 
60
         *
 
61
         * Explicit embedding is ok as we want always correct alignment anyway.
 
62
         */
 
63
        int32           __varsz;
 
64
 
 
65
        uint32          nxip;                   /* number of txids in xip array */
 
66
        txid            xmin;
 
67
        txid            xmax;
 
68
        txid            xip[1];                 /* in-progress txids, xmin <= xip[i] < xmax */
 
69
} TxidSnapshot;
 
70
 
 
71
#define TXID_SNAPSHOT_SIZE(nxip) \
 
72
        (offsetof(TxidSnapshot, xip) + sizeof(txid) * (nxip))
 
73
 
 
74
/*
 
75
 * Epoch values from xact.c
 
76
 */
 
77
typedef struct
 
78
{
 
79
        TransactionId last_xid;
 
80
        uint32          epoch;
 
81
} TxidEpoch;
 
82
 
 
83
 
 
84
/*
 
85
 * Fetch epoch data from xact.c.
 
86
 */
 
87
static void
 
88
load_xid_epoch(TxidEpoch *state)
 
89
{
 
90
        GetNextXidAndEpoch(&state->last_xid, &state->epoch);
 
91
}
 
92
 
 
93
/*
 
94
 * do a TransactionId -> txid conversion for an XID near the given epoch
 
95
 */
 
96
static txid
 
97
convert_xid(TransactionId xid, const TxidEpoch *state)
 
98
{
 
99
#ifndef INT64_IS_BUSTED
 
100
        uint64          epoch;
 
101
 
 
102
        /* return special xid's as-is */
 
103
        if (!TransactionIdIsNormal(xid))
 
104
                return (txid) xid;
 
105
 
 
106
        /* xid can be on either side when near wrap-around */
 
107
        epoch = (uint64) state->epoch;
 
108
        if (xid > state->last_xid &&
 
109
                TransactionIdPrecedes(xid, state->last_xid))
 
110
                epoch--;
 
111
        else if (xid < state->last_xid &&
 
112
                         TransactionIdFollows(xid, state->last_xid))
 
113
                epoch++;
 
114
 
 
115
        return (epoch << 32) | xid;
 
116
#else                                                   /* INT64_IS_BUSTED */
 
117
        /* we can't do anything with the epoch, so ignore it */
 
118
        return (txid) xid & MAX_TXID;
 
119
#endif   /* INT64_IS_BUSTED */
 
120
}
 
121
 
 
122
/*
 
123
 * txid comparator for qsort/bsearch
 
124
 */
 
125
static int
 
126
cmp_txid(const void *aa, const void *bb)
 
127
{
 
128
        txid            a = *(const txid *) aa;
 
129
        txid            b = *(const txid *) bb;
 
130
 
 
131
        if (a < b)
 
132
                return -1;
 
133
        if (a > b)
 
134
                return 1;
 
135
        return 0;
 
136
}
 
137
 
 
138
/*
 
139
 * sort a snapshot's txids, so we can use bsearch() later.
 
140
 *
 
141
 * For consistency of on-disk representation, we always sort even if bsearch
 
142
 * will not be used.
 
143
 */
 
144
static void
 
145
sort_snapshot(TxidSnapshot *snap)
 
146
{
 
147
        if (snap->nxip > 1)
 
148
                qsort(snap->xip, snap->nxip, sizeof(txid), cmp_txid);
 
149
}
 
150
 
 
151
/*
 
152
 * check txid visibility.
 
153
 */
 
154
static bool
 
155
is_visible_txid(txid value, const TxidSnapshot *snap)
 
156
{
 
157
        if (value < snap->xmin)
 
158
                return true;
 
159
        else if (value >= snap->xmax)
 
160
                return false;
 
161
#ifdef USE_BSEARCH_IF_NXIP_GREATER
 
162
        else if (snap->nxip > USE_BSEARCH_IF_NXIP_GREATER)
 
163
        {
 
164
                void       *res;
 
165
 
 
166
                res = bsearch(&value, snap->xip, snap->nxip, sizeof(txid), cmp_txid);
 
167
                /* if found, transaction is still in progress */
 
168
                return (res) ? false : true;
 
169
        }
 
170
#endif
 
171
        else
 
172
        {
 
173
                uint32          i;
 
174
 
 
175
                for (i = 0; i < snap->nxip; i++)
 
176
                {
 
177
                        if (value == snap->xip[i])
 
178
                                return false;
 
179
                }
 
180
                return true;
 
181
        }
 
182
}
 
183
 
 
184
/*
 
185
 * helper functions to use StringInfo for TxidSnapshot creation.
 
186
 */
 
187
 
 
188
static StringInfo
 
189
buf_init(txid xmin, txid xmax)
 
190
{
 
191
        TxidSnapshot snap;
 
192
        StringInfo      buf;
 
193
 
 
194
        snap.xmin = xmin;
 
195
        snap.xmax = xmax;
 
196
        snap.nxip = 0;
 
197
 
 
198
        buf = makeStringInfo();
 
199
        appendBinaryStringInfo(buf, (char *) &snap, TXID_SNAPSHOT_SIZE(0));
 
200
        return buf;
 
201
}
 
202
 
 
203
static void
 
204
buf_add_txid(StringInfo buf, txid xid)
 
205
{
 
206
        TxidSnapshot *snap = (TxidSnapshot *) buf->data;
 
207
 
 
208
        /* do this before possible realloc */
 
209
        snap->nxip++;
 
210
 
 
211
        appendBinaryStringInfo(buf, (char *) &xid, sizeof(xid));
 
212
}
 
213
 
 
214
static TxidSnapshot *
 
215
buf_finalize(StringInfo buf)
 
216
{
 
217
        TxidSnapshot *snap = (TxidSnapshot *) buf->data;
 
218
 
 
219
        SET_VARSIZE(snap, buf->len);
 
220
 
 
221
        /* buf is not needed anymore */
 
222
        buf->data = NULL;
 
223
        pfree(buf);
 
224
 
 
225
        return snap;
 
226
}
 
227
 
 
228
/*
 
229
 * simple number parser.
 
230
 *
 
231
 * We return 0 on error, which is invalid value for txid.
 
232
 */
 
233
static txid
 
234
str2txid(const char *s, const char **endp)
 
235
{
 
236
        txid            val = 0;
 
237
        txid            cutoff = MAX_TXID / 10;
 
238
        txid            cutlim = MAX_TXID % 10;
 
239
 
 
240
        for (; *s; s++)
 
241
        {
 
242
                unsigned        d;
 
243
 
 
244
                if (*s < '0' || *s > '9')
 
245
                        break;
 
246
                d = *s - '0';
 
247
 
 
248
                /*
 
249
                 * check for overflow
 
250
                 */
 
251
                if (val > cutoff || (val == cutoff && d > cutlim))
 
252
                {
 
253
                        val = 0;
 
254
                        break;
 
255
                }
 
256
 
 
257
                val = val * 10 + d;
 
258
        }
 
259
        if (endp)
 
260
                *endp = s;
 
261
        return val;
 
262
}
 
263
 
 
264
/*
 
265
 * parse snapshot from cstring
 
266
 */
 
267
static TxidSnapshot *
 
268
parse_snapshot(const char *str)
 
269
{
 
270
        txid            xmin;
 
271
        txid            xmax;
 
272
        txid            last_val = 0,
 
273
                                val;
 
274
        const char *str_start = str;
 
275
        const char *endp;
 
276
        StringInfo      buf;
 
277
 
 
278
        xmin = str2txid(str, &endp);
 
279
        if (*endp != ':')
 
280
                goto bad_format;
 
281
        str = endp + 1;
 
282
 
 
283
        xmax = str2txid(str, &endp);
 
284
        if (*endp != ':')
 
285
                goto bad_format;
 
286
        str = endp + 1;
 
287
 
 
288
        /* it should look sane */
 
289
        if (xmin == 0 || xmax == 0 || xmin > xmax)
 
290
                goto bad_format;
 
291
 
 
292
        /* allocate buffer */
 
293
        buf = buf_init(xmin, xmax);
 
294
 
 
295
        /* loop over values */
 
296
        while (*str != '\0')
 
297
        {
 
298
                /* read next value */
 
299
                val = str2txid(str, &endp);
 
300
                str = endp;
 
301
 
 
302
                /* require the input to be in order */
 
303
                if (val < xmin || val >= xmax || val <= last_val)
 
304
                        goto bad_format;
 
305
 
 
306
                buf_add_txid(buf, val);
 
307
                last_val = val;
 
308
 
 
309
                if (*str == ',')
 
310
                        str++;
 
311
                else if (*str != '\0')
 
312
                        goto bad_format;
 
313
        }
 
314
 
 
315
        return buf_finalize(buf);
 
316
 
 
317
bad_format:
 
318
        elog(ERROR, "invalid input for txid_snapshot: \"%s\"", str_start);
 
319
        return NULL;
 
320
}
 
321
 
 
322
/*
 
323
 * Public functions.
 
324
 *
 
325
 * txid_current() and txid_current_snapshot() are the only ones that
 
326
 * communicate with core xid machinery.  All the others work on data
 
327
 * returned by them.
 
328
 */
 
329
 
 
330
/*
 
331
 * txid_current() returns int8
 
332
 *
 
333
 *              Return the current toplevel transaction ID as TXID
 
334
 */
 
335
Datum
 
336
txid_current(PG_FUNCTION_ARGS)
 
337
{
 
338
        txid            val;
 
339
        TxidEpoch       state;
 
340
 
 
341
        load_xid_epoch(&state);
 
342
 
 
343
        val = convert_xid(GetTopTransactionId(), &state);
 
344
 
 
345
        PG_RETURN_INT64(val);
 
346
}
 
347
 
 
348
/*
 
349
 * txid_current_snapshot() returns txid_snapshot
 
350
 *
 
351
 *              Return current snapshot in TXID format
 
352
 *
 
353
 * Note that only top-transaction XIDs are included in the snapshot.
 
354
 */
 
355
Datum
 
356
txid_current_snapshot(PG_FUNCTION_ARGS)
 
357
{
 
358
        TxidSnapshot *snap;
 
359
        uint32          nxip,
 
360
                                i,
 
361
                                size;
 
362
        TxidEpoch       state;
 
363
        Snapshot        cur;
 
364
 
 
365
        cur = GetActiveSnapshot();
 
366
        if (cur == NULL)
 
367
                elog(ERROR, "no active snapshot set");
 
368
 
 
369
        load_xid_epoch(&state);
 
370
 
 
371
        /* allocate */
 
372
        nxip = cur->xcnt;
 
373
        size = TXID_SNAPSHOT_SIZE(nxip);
 
374
        snap = palloc(size);
 
375
        SET_VARSIZE(snap, size);
 
376
 
 
377
        /* fill */
 
378
        snap->xmin = convert_xid(cur->xmin, &state);
 
379
        snap->xmax = convert_xid(cur->xmax, &state);
 
380
        snap->nxip = nxip;
 
381
        for (i = 0; i < nxip; i++)
 
382
                snap->xip[i] = convert_xid(cur->xip[i], &state);
 
383
 
 
384
        /* we want them guaranteed to be in ascending order */
 
385
        sort_snapshot(snap);
 
386
 
 
387
        PG_RETURN_POINTER(snap);
 
388
}
 
389
 
 
390
/*
 
391
 * txid_snapshot_in(cstring) returns txid_snapshot
 
392
 *
 
393
 *              input function for type txid_snapshot
 
394
 */
 
395
Datum
 
396
txid_snapshot_in(PG_FUNCTION_ARGS)
 
397
{
 
398
        char       *str = PG_GETARG_CSTRING(0);
 
399
        TxidSnapshot *snap;
 
400
 
 
401
        snap = parse_snapshot(str);
 
402
 
 
403
        PG_RETURN_POINTER(snap);
 
404
}
 
405
 
 
406
/*
 
407
 * txid_snapshot_out(txid_snapshot) returns cstring
 
408
 *
 
409
 *              output function for type txid_snapshot
 
410
 */
 
411
Datum
 
412
txid_snapshot_out(PG_FUNCTION_ARGS)
 
413
{
 
414
        TxidSnapshot *snap = (TxidSnapshot *) PG_GETARG_VARLENA_P(0);
 
415
        StringInfoData str;
 
416
        uint32          i;
 
417
 
 
418
        initStringInfo(&str);
 
419
 
 
420
        appendStringInfo(&str, TXID_FMT ":", snap->xmin);
 
421
        appendStringInfo(&str, TXID_FMT ":", snap->xmax);
 
422
 
 
423
        for (i = 0; i < snap->nxip; i++)
 
424
        {
 
425
                if (i > 0)
 
426
                        appendStringInfoChar(&str, ',');
 
427
                appendStringInfo(&str, TXID_FMT, snap->xip[i]);
 
428
        }
 
429
 
 
430
        PG_RETURN_CSTRING(str.data);
 
431
}
 
432
 
 
433
/*
 
434
 * txid_snapshot_recv(internal) returns txid_snapshot
 
435
 *
 
436
 *              binary input function for type txid_snapshot
 
437
 *
 
438
 *              format: int4 nxip, int8 xmin, int8 xmax, int8 xip
 
439
 */
 
440
Datum
 
441
txid_snapshot_recv(PG_FUNCTION_ARGS)
 
442
{
 
443
        StringInfo      buf = (StringInfo) PG_GETARG_POINTER(0);
 
444
        TxidSnapshot *snap;
 
445
        txid            last = 0;
 
446
        int                     nxip;
 
447
        int                     i;
 
448
        int                     avail;
 
449
        int                     expect;
 
450
        txid            xmin,
 
451
                                xmax;
 
452
 
 
453
        /*
 
454
         * load nxip and check for nonsense.
 
455
         *
 
456
         * (nxip > avail) check is against int overflows in 'expect'.
 
457
         */
 
458
        nxip = pq_getmsgint(buf, 4);
 
459
        avail = buf->len - buf->cursor;
 
460
        expect = 8 + 8 + nxip * 8;
 
461
        if (nxip < 0 || nxip > avail || expect > avail)
 
462
                goto bad_format;
 
463
 
 
464
        xmin = pq_getmsgint64(buf);
 
465
        xmax = pq_getmsgint64(buf);
 
466
        if (xmin == 0 || xmax == 0 || xmin > xmax || xmax > MAX_TXID)
 
467
                goto bad_format;
 
468
 
 
469
        snap = palloc(TXID_SNAPSHOT_SIZE(nxip));
 
470
        snap->xmin = xmin;
 
471
        snap->xmax = xmax;
 
472
        snap->nxip = nxip;
 
473
        SET_VARSIZE(snap, TXID_SNAPSHOT_SIZE(nxip));
 
474
 
 
475
        for (i = 0; i < nxip; i++)
 
476
        {
 
477
                txid            cur = pq_getmsgint64(buf);
 
478
 
 
479
                if (cur <= last || cur < xmin || cur >= xmax)
 
480
                        goto bad_format;
 
481
                snap->xip[i] = cur;
 
482
                last = cur;
 
483
        }
 
484
        PG_RETURN_POINTER(snap);
 
485
 
 
486
bad_format:
 
487
        elog(ERROR, "invalid snapshot data");
 
488
        return (Datum) NULL;
 
489
}
 
490
 
 
491
/*
 
492
 * txid_snapshot_send(txid_snapshot) returns bytea
 
493
 *
 
494
 *              binary output function for type txid_snapshot
 
495
 *
 
496
 *              format: int4 nxip, int8 xmin, int8 xmax, int8 xip
 
497
 */
 
498
Datum
 
499
txid_snapshot_send(PG_FUNCTION_ARGS)
 
500
{
 
501
        TxidSnapshot *snap = (TxidSnapshot *) PG_GETARG_VARLENA_P(0);
 
502
        StringInfoData buf;
 
503
        uint32          i;
 
504
 
 
505
        pq_begintypsend(&buf);
 
506
        pq_sendint(&buf, snap->nxip, 4);
 
507
        pq_sendint64(&buf, snap->xmin);
 
508
        pq_sendint64(&buf, snap->xmax);
 
509
        for (i = 0; i < snap->nxip; i++)
 
510
                pq_sendint64(&buf, snap->xip[i]);
 
511
        PG_RETURN_BYTEA_P(pq_endtypsend(&buf));
 
512
}
 
513
 
 
514
/*
 
515
 * txid_visible_in_snapshot(int8, txid_snapshot) returns bool
 
516
 *
 
517
 *              is txid visible in snapshot ?
 
518
 */
 
519
Datum
 
520
txid_visible_in_snapshot(PG_FUNCTION_ARGS)
 
521
{
 
522
        txid            value = PG_GETARG_INT64(0);
 
523
        TxidSnapshot *snap = (TxidSnapshot *) PG_GETARG_VARLENA_P(1);
 
524
 
 
525
        PG_RETURN_BOOL(is_visible_txid(value, snap));
 
526
}
 
527
 
 
528
/*
 
529
 * txid_snapshot_xmin(txid_snapshot) returns int8
 
530
 *
 
531
 *              return snapshot's xmin
 
532
 */
 
533
Datum
 
534
txid_snapshot_xmin(PG_FUNCTION_ARGS)
 
535
{
 
536
        TxidSnapshot *snap = (TxidSnapshot *) PG_GETARG_VARLENA_P(0);
 
537
 
 
538
        PG_RETURN_INT64(snap->xmin);
 
539
}
 
540
 
 
541
/*
 
542
 * txid_snapshot_xmax(txid_snapshot) returns int8
 
543
 *
 
544
 *              return snapshot's xmax
 
545
 */
 
546
Datum
 
547
txid_snapshot_xmax(PG_FUNCTION_ARGS)
 
548
{
 
549
        TxidSnapshot *snap = (TxidSnapshot *) PG_GETARG_VARLENA_P(0);
 
550
 
 
551
        PG_RETURN_INT64(snap->xmax);
 
552
}
 
553
 
 
554
/*
 
555
 * txid_snapshot_xip(txid_snapshot) returns setof int8
 
556
 *
 
557
 *              return in-progress TXIDs in snapshot.
 
558
 */
 
559
Datum
 
560
txid_snapshot_xip(PG_FUNCTION_ARGS)
 
561
{
 
562
        FuncCallContext *fctx;
 
563
        TxidSnapshot *snap;
 
564
        txid            value;
 
565
 
 
566
        /* on first call initialize snap_state and get copy of snapshot */
 
567
        if (SRF_IS_FIRSTCALL())
 
568
        {
 
569
                TxidSnapshot *arg = (TxidSnapshot *) PG_GETARG_VARLENA_P(0);
 
570
 
 
571
                fctx = SRF_FIRSTCALL_INIT();
 
572
 
 
573
                /* make a copy of user snapshot */
 
574
                snap = MemoryContextAlloc(fctx->multi_call_memory_ctx, VARSIZE(arg));
 
575
                memcpy(snap, arg, VARSIZE(arg));
 
576
 
 
577
                fctx->user_fctx = snap;
 
578
        }
 
579
 
 
580
        /* return values one-by-one */
 
581
        fctx = SRF_PERCALL_SETUP();
 
582
        snap = fctx->user_fctx;
 
583
        if (fctx->call_cntr < snap->nxip)
 
584
        {
 
585
                value = snap->xip[fctx->call_cntr];
 
586
                SRF_RETURN_NEXT(fctx, Int64GetDatum(value));
 
587
        }
 
588
        else
 
589
        {
 
590
                SRF_RETURN_DONE(fctx);
 
591
        }
 
592
}