~ubuntu-branches/ubuntu/oneiric/postgresql-9.1/oneiric-security

« back to all changes in this revision

Viewing changes to src/backend/access/gist/gistxlog.c

  • Committer: Bazaar Package Importer
  • Author(s): Martin Pitt
  • Date: 2011-05-11 10:41:53 UTC
  • Revision ID: james.westby@ubuntu.com-20110511104153-psbh2o58553fv1m0
Tags: upstream-9.1~beta1
ImportĀ upstreamĀ versionĀ 9.1~beta1

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/*-------------------------------------------------------------------------
 
2
 *
 
3
 * gistxlog.c
 
4
 *        WAL replay logic for GiST.
 
5
 *
 
6
 *
 
7
 * Portions Copyright (c) 1996-2011, PostgreSQL Global Development Group
 
8
 * Portions Copyright (c) 1994, Regents of the University of California
 
9
 *
 
10
 * IDENTIFICATION
 
11
 *                       src/backend/access/gist/gistxlog.c
 
12
 *-------------------------------------------------------------------------
 
13
 */
 
14
#include "postgres.h"
 
15
 
 
16
#include "access/gist_private.h"
 
17
#include "access/xlogutils.h"
 
18
#include "miscadmin.h"
 
19
#include "storage/bufmgr.h"
 
20
#include "utils/memutils.h"
 
21
#include "utils/rel.h"
 
22
 
 
23
typedef struct
 
24
{
 
25
        gistxlogPage *header;
 
26
        IndexTuple *itup;
 
27
} NewPage;
 
28
 
 
29
typedef struct
 
30
{
 
31
        gistxlogPageSplit *data;
 
32
        NewPage    *page;
 
33
} PageSplitRecord;
 
34
 
 
35
static MemoryContext opCtx;             /* working memory for operations */
 
36
 
 
37
/*
 
38
 * Replay the clearing of F_FOLLOW_RIGHT flag.
 
39
 */
 
40
static void
 
41
gistRedoClearFollowRight(RelFileNode node, XLogRecPtr lsn,
 
42
                                                 BlockNumber leftblkno)
 
43
{
 
44
        Buffer          buffer;
 
45
 
 
46
        buffer = XLogReadBuffer(node, leftblkno, false);
 
47
        if (BufferIsValid(buffer))
 
48
        {
 
49
                Page            page = (Page) BufferGetPage(buffer);
 
50
 
 
51
                /*
 
52
                 * Note that we still update the page even if page LSN is equal to the
 
53
                 * LSN of this record, because the updated NSN is not included in the
 
54
                 * full page image.
 
55
                 */
 
56
                if (!XLByteLT(lsn, PageGetLSN(page)))
 
57
                {
 
58
                        GistPageGetOpaque(page)->nsn = lsn;
 
59
                        GistClearFollowRight(page);
 
60
 
 
61
                        PageSetLSN(page, lsn);
 
62
                        PageSetTLI(page, ThisTimeLineID);
 
63
                        MarkBufferDirty(buffer);
 
64
                }
 
65
                UnlockReleaseBuffer(buffer);
 
66
        }
 
67
}
 
68
 
 
69
/*
 
70
 * redo any page update (except page split)
 
71
 */
 
72
static void
 
73
gistRedoPageUpdateRecord(XLogRecPtr lsn, XLogRecord *record)
 
74
{
 
75
        char       *begin = XLogRecGetData(record);
 
76
        gistxlogPageUpdate *xldata = (gistxlogPageUpdate *) begin;
 
77
        Buffer          buffer;
 
78
        Page            page;
 
79
        char       *data;
 
80
 
 
81
        if (BlockNumberIsValid(xldata->leftchild))
 
82
                gistRedoClearFollowRight(xldata->node, lsn, xldata->leftchild);
 
83
 
 
84
        /* nothing more to do if page was backed up (and no info to do it with) */
 
85
        if (record->xl_info & XLR_BKP_BLOCK_1)
 
86
                return;
 
87
 
 
88
        buffer = XLogReadBuffer(xldata->node, xldata->blkno, false);
 
89
        if (!BufferIsValid(buffer))
 
90
                return;
 
91
        page = (Page) BufferGetPage(buffer);
 
92
 
 
93
        if (XLByteLE(lsn, PageGetLSN(page)))
 
94
        {
 
95
                UnlockReleaseBuffer(buffer);
 
96
                return;
 
97
        }
 
98
 
 
99
        data = begin + sizeof(gistxlogPageUpdate);
 
100
 
 
101
        /* Delete old tuples */
 
102
        if (xldata->ntodelete > 0)
 
103
        {
 
104
                int                     i;
 
105
                OffsetNumber *todelete = (OffsetNumber *) data;
 
106
 
 
107
                data += sizeof(OffsetNumber) * xldata->ntodelete;
 
108
 
 
109
                for (i = 0; i < xldata->ntodelete; i++)
 
110
                        PageIndexTupleDelete(page, todelete[i]);
 
111
                if (GistPageIsLeaf(page))
 
112
                        GistMarkTuplesDeleted(page);
 
113
        }
 
114
 
 
115
        /* add tuples */
 
116
        if (data - begin < record->xl_len)
 
117
        {
 
118
                OffsetNumber off = (PageIsEmpty(page)) ? FirstOffsetNumber :
 
119
                OffsetNumberNext(PageGetMaxOffsetNumber(page));
 
120
 
 
121
                while (data - begin < record->xl_len)
 
122
                {
 
123
                        IndexTuple      itup = (IndexTuple) data;
 
124
                        Size            sz = IndexTupleSize(itup);
 
125
                        OffsetNumber l;
 
126
 
 
127
                        data += sz;
 
128
 
 
129
                        l = PageAddItem(page, (Item) itup, sz, off, false, false);
 
130
                        if (l == InvalidOffsetNumber)
 
131
                                elog(ERROR, "failed to add item to GiST index page, size %d bytes",
 
132
                                         (int) sz);
 
133
                        off++;
 
134
                }
 
135
        }
 
136
        else
 
137
        {
 
138
                /*
 
139
                 * special case: leafpage, nothing to insert, nothing to delete, then
 
140
                 * vacuum marks page
 
141
                 */
 
142
                if (GistPageIsLeaf(page) && xldata->ntodelete == 0)
 
143
                        GistClearTuplesDeleted(page);
 
144
        }
 
145
 
 
146
        if (!GistPageIsLeaf(page) && PageGetMaxOffsetNumber(page) == InvalidOffsetNumber && xldata->blkno == GIST_ROOT_BLKNO)
 
147
 
 
148
                /*
 
149
                 * all links on non-leaf root page was deleted by vacuum full, so root
 
150
                 * page becomes a leaf
 
151
                 */
 
152
                GistPageSetLeaf(page);
 
153
 
 
154
        GistPageGetOpaque(page)->rightlink = InvalidBlockNumber;
 
155
        PageSetLSN(page, lsn);
 
156
        PageSetTLI(page, ThisTimeLineID);
 
157
        MarkBufferDirty(buffer);
 
158
        UnlockReleaseBuffer(buffer);
 
159
}
 
160
 
 
161
static void
 
162
gistRedoPageDeleteRecord(XLogRecPtr lsn, XLogRecord *record)
 
163
{
 
164
        gistxlogPageDelete *xldata = (gistxlogPageDelete *) XLogRecGetData(record);
 
165
        Buffer          buffer;
 
166
        Page            page;
 
167
 
 
168
        /* nothing else to do if page was backed up (and no info to do it with) */
 
169
        if (record->xl_info & XLR_BKP_BLOCK_1)
 
170
                return;
 
171
 
 
172
        buffer = XLogReadBuffer(xldata->node, xldata->blkno, false);
 
173
        if (!BufferIsValid(buffer))
 
174
                return;
 
175
 
 
176
        page = (Page) BufferGetPage(buffer);
 
177
        GistPageSetDeleted(page);
 
178
 
 
179
        PageSetLSN(page, lsn);
 
180
        PageSetTLI(page, ThisTimeLineID);
 
181
        MarkBufferDirty(buffer);
 
182
        UnlockReleaseBuffer(buffer);
 
183
}
 
184
 
 
185
static void
 
186
decodePageSplitRecord(PageSplitRecord *decoded, XLogRecord *record)
 
187
{
 
188
        char       *begin = XLogRecGetData(record),
 
189
                           *ptr;
 
190
        int                     j,
 
191
                                i = 0;
 
192
 
 
193
        decoded->data = (gistxlogPageSplit *) begin;
 
194
        decoded->page = (NewPage *) palloc(sizeof(NewPage) * decoded->data->npage);
 
195
 
 
196
        ptr = begin + sizeof(gistxlogPageSplit);
 
197
        for (i = 0; i < decoded->data->npage; i++)
 
198
        {
 
199
                Assert(ptr - begin < record->xl_len);
 
200
                decoded->page[i].header = (gistxlogPage *) ptr;
 
201
                ptr += sizeof(gistxlogPage);
 
202
 
 
203
                decoded->page[i].itup = (IndexTuple *)
 
204
                        palloc(sizeof(IndexTuple) * decoded->page[i].header->num);
 
205
                j = 0;
 
206
                while (j < decoded->page[i].header->num)
 
207
                {
 
208
                        Assert(ptr - begin < record->xl_len);
 
209
                        decoded->page[i].itup[j] = (IndexTuple) ptr;
 
210
                        ptr += IndexTupleSize((IndexTuple) ptr);
 
211
                        j++;
 
212
                }
 
213
        }
 
214
}
 
215
 
 
216
static void
 
217
gistRedoPageSplitRecord(XLogRecPtr lsn, XLogRecord *record)
 
218
{
 
219
        gistxlogPageSplit *xldata = (gistxlogPageSplit *) XLogRecGetData(record);
 
220
        PageSplitRecord xlrec;
 
221
        Buffer          buffer;
 
222
        Page            page;
 
223
        int                     i;
 
224
        bool            isrootsplit = false;
 
225
 
 
226
        if (BlockNumberIsValid(xldata->leftchild))
 
227
                gistRedoClearFollowRight(xldata->node, lsn, xldata->leftchild);
 
228
        decodePageSplitRecord(&xlrec, record);
 
229
 
 
230
        /* loop around all pages */
 
231
        for (i = 0; i < xlrec.data->npage; i++)
 
232
        {
 
233
                NewPage    *newpage = xlrec.page + i;
 
234
                int                     flags;
 
235
 
 
236
                if (newpage->header->blkno == GIST_ROOT_BLKNO)
 
237
                {
 
238
                        Assert(i == 0);
 
239
                        isrootsplit = true;
 
240
                }
 
241
 
 
242
                buffer = XLogReadBuffer(xlrec.data->node, newpage->header->blkno, true);
 
243
                Assert(BufferIsValid(buffer));
 
244
                page = (Page) BufferGetPage(buffer);
 
245
 
 
246
                /* ok, clear buffer */
 
247
                if (xlrec.data->origleaf && newpage->header->blkno != GIST_ROOT_BLKNO)
 
248
                        flags = F_LEAF;
 
249
                else
 
250
                        flags = 0;
 
251
                GISTInitBuffer(buffer, flags);
 
252
 
 
253
                /* and fill it */
 
254
                gistfillbuffer(page, newpage->itup, newpage->header->num, FirstOffsetNumber);
 
255
 
 
256
                if (newpage->header->blkno == GIST_ROOT_BLKNO)
 
257
                {
 
258
                        GistPageGetOpaque(page)->rightlink = InvalidBlockNumber;
 
259
                        GistPageGetOpaque(page)->nsn = xldata->orignsn;
 
260
                        GistClearFollowRight(page);
 
261
                }
 
262
                else
 
263
                {
 
264
                        if (i < xlrec.data->npage - 1)
 
265
                                GistPageGetOpaque(page)->rightlink = xlrec.page[i + 1].header->blkno;
 
266
                        else
 
267
                                GistPageGetOpaque(page)->rightlink = xldata->origrlink;
 
268
                        GistPageGetOpaque(page)->nsn = xldata->orignsn;
 
269
                        if (i < xlrec.data->npage - 1 && !isrootsplit)
 
270
                                GistMarkFollowRight(page);
 
271
                        else
 
272
                                GistClearFollowRight(page);
 
273
                }
 
274
 
 
275
                PageSetLSN(page, lsn);
 
276
                PageSetTLI(page, ThisTimeLineID);
 
277
                MarkBufferDirty(buffer);
 
278
                UnlockReleaseBuffer(buffer);
 
279
        }
 
280
}
 
281
 
 
282
static void
 
283
gistRedoCreateIndex(XLogRecPtr lsn, XLogRecord *record)
 
284
{
 
285
        RelFileNode *node = (RelFileNode *) XLogRecGetData(record);
 
286
        Buffer          buffer;
 
287
        Page            page;
 
288
 
 
289
        buffer = XLogReadBuffer(*node, GIST_ROOT_BLKNO, true);
 
290
        Assert(BufferIsValid(buffer));
 
291
        page = (Page) BufferGetPage(buffer);
 
292
 
 
293
        GISTInitBuffer(buffer, F_LEAF);
 
294
 
 
295
        PageSetLSN(page, lsn);
 
296
        PageSetTLI(page, ThisTimeLineID);
 
297
 
 
298
        MarkBufferDirty(buffer);
 
299
        UnlockReleaseBuffer(buffer);
 
300
}
 
301
 
 
302
void
 
303
gist_redo(XLogRecPtr lsn, XLogRecord *record)
 
304
{
 
305
        uint8           info = record->xl_info & ~XLR_INFO_MASK;
 
306
        MemoryContext oldCxt;
 
307
 
 
308
        /*
 
309
         * GIST indexes do not require any conflict processing. NB: If we ever
 
310
         * implement a similar optimization we have in b-tree, and remove killed
 
311
         * tuples outside VACUUM, we'll need to handle that here.
 
312
         */
 
313
        RestoreBkpBlocks(lsn, record, false);
 
314
 
 
315
        oldCxt = MemoryContextSwitchTo(opCtx);
 
316
        switch (info)
 
317
        {
 
318
                case XLOG_GIST_PAGE_UPDATE:
 
319
                        gistRedoPageUpdateRecord(lsn, record);
 
320
                        break;
 
321
                case XLOG_GIST_PAGE_DELETE:
 
322
                        gistRedoPageDeleteRecord(lsn, record);
 
323
                        break;
 
324
                case XLOG_GIST_PAGE_SPLIT:
 
325
                        gistRedoPageSplitRecord(lsn, record);
 
326
                        break;
 
327
                case XLOG_GIST_CREATE_INDEX:
 
328
                        gistRedoCreateIndex(lsn, record);
 
329
                        break;
 
330
                default:
 
331
                        elog(PANIC, "gist_redo: unknown op code %u", info);
 
332
        }
 
333
 
 
334
        MemoryContextSwitchTo(oldCxt);
 
335
        MemoryContextReset(opCtx);
 
336
}
 
337
 
 
338
static void
 
339
out_target(StringInfo buf, RelFileNode node)
 
340
{
 
341
        appendStringInfo(buf, "rel %u/%u/%u",
 
342
                                         node.spcNode, node.dbNode, node.relNode);
 
343
}
 
344
 
 
345
static void
 
346
out_gistxlogPageUpdate(StringInfo buf, gistxlogPageUpdate *xlrec)
 
347
{
 
348
        out_target(buf, xlrec->node);
 
349
        appendStringInfo(buf, "; block number %u", xlrec->blkno);
 
350
}
 
351
 
 
352
static void
 
353
out_gistxlogPageDelete(StringInfo buf, gistxlogPageDelete *xlrec)
 
354
{
 
355
        appendStringInfo(buf, "page_delete: rel %u/%u/%u; blkno %u",
 
356
                                xlrec->node.spcNode, xlrec->node.dbNode, xlrec->node.relNode,
 
357
                                         xlrec->blkno);
 
358
}
 
359
 
 
360
static void
 
361
out_gistxlogPageSplit(StringInfo buf, gistxlogPageSplit *xlrec)
 
362
{
 
363
        appendStringInfo(buf, "page_split: ");
 
364
        out_target(buf, xlrec->node);
 
365
        appendStringInfo(buf, "; block number %u splits to %d pages",
 
366
                                         xlrec->origblkno, xlrec->npage);
 
367
}
 
368
 
 
369
void
 
370
gist_desc(StringInfo buf, uint8 xl_info, char *rec)
 
371
{
 
372
        uint8           info = xl_info & ~XLR_INFO_MASK;
 
373
 
 
374
        switch (info)
 
375
        {
 
376
                case XLOG_GIST_PAGE_UPDATE:
 
377
                        appendStringInfo(buf, "page_update: ");
 
378
                        out_gistxlogPageUpdate(buf, (gistxlogPageUpdate *) rec);
 
379
                        break;
 
380
                case XLOG_GIST_PAGE_DELETE:
 
381
                        out_gistxlogPageDelete(buf, (gistxlogPageDelete *) rec);
 
382
                        break;
 
383
                case XLOG_GIST_PAGE_SPLIT:
 
384
                        out_gistxlogPageSplit(buf, (gistxlogPageSplit *) rec);
 
385
                        break;
 
386
                case XLOG_GIST_CREATE_INDEX:
 
387
                        appendStringInfo(buf, "create_index: rel %u/%u/%u",
 
388
                                                         ((RelFileNode *) rec)->spcNode,
 
389
                                                         ((RelFileNode *) rec)->dbNode,
 
390
                                                         ((RelFileNode *) rec)->relNode);
 
391
                        break;
 
392
                default:
 
393
                        appendStringInfo(buf, "unknown gist op code %u", info);
 
394
                        break;
 
395
        }
 
396
}
 
397
 
 
398
void
 
399
gist_xlog_startup(void)
 
400
{
 
401
        opCtx = createTempGistContext();
 
402
}
 
403
 
 
404
void
 
405
gist_xlog_cleanup(void)
 
406
{
 
407
        MemoryContextDelete(opCtx);
 
408
}
 
409
 
 
410
/*
 
411
 * Write WAL record of a page split.
 
412
 */
 
413
XLogRecPtr
 
414
gistXLogSplit(RelFileNode node, BlockNumber blkno, bool page_is_leaf,
 
415
                          SplitedPageLayout *dist,
 
416
                          BlockNumber origrlink, GistNSN orignsn,
 
417
                          Buffer leftchildbuf)
 
418
{
 
419
        XLogRecData *rdata;
 
420
        gistxlogPageSplit xlrec;
 
421
        SplitedPageLayout *ptr;
 
422
        int                     npage = 0,
 
423
                                cur;
 
424
        XLogRecPtr      recptr;
 
425
 
 
426
        for (ptr = dist; ptr; ptr = ptr->next)
 
427
                npage++;
 
428
 
 
429
        rdata = (XLogRecData *) palloc(sizeof(XLogRecData) * (npage * 2 + 2));
 
430
 
 
431
        xlrec.node = node;
 
432
        xlrec.origblkno = blkno;
 
433
        xlrec.origrlink = origrlink;
 
434
        xlrec.orignsn = orignsn;
 
435
        xlrec.origleaf = page_is_leaf;
 
436
        xlrec.npage = (uint16) npage;
 
437
        xlrec.leftchild =
 
438
                BufferIsValid(leftchildbuf) ? BufferGetBlockNumber(leftchildbuf) : InvalidBlockNumber;
 
439
 
 
440
        rdata[0].data = (char *) &xlrec;
 
441
        rdata[0].len = sizeof(gistxlogPageSplit);
 
442
        rdata[0].buffer = InvalidBuffer;
 
443
 
 
444
        cur = 1;
 
445
 
 
446
        /*
 
447
         * Include a full page image of the child buf. (only necessary if a
 
448
         * checkpoint happened since the child page was split)
 
449
         */
 
450
        if (BufferIsValid(leftchildbuf))
 
451
        {
 
452
                rdata[cur - 1].next = &(rdata[cur]);
 
453
                rdata[cur].data = NULL;
 
454
                rdata[cur].len = 0;
 
455
                rdata[cur].buffer = leftchildbuf;
 
456
                rdata[cur].buffer_std = true;
 
457
                cur++;
 
458
        }
 
459
 
 
460
        for (ptr = dist; ptr; ptr = ptr->next)
 
461
        {
 
462
                rdata[cur - 1].next = &(rdata[cur]);
 
463
                rdata[cur].buffer = InvalidBuffer;
 
464
                rdata[cur].data = (char *) &(ptr->block);
 
465
                rdata[cur].len = sizeof(gistxlogPage);
 
466
                cur++;
 
467
 
 
468
                rdata[cur - 1].next = &(rdata[cur]);
 
469
                rdata[cur].buffer = InvalidBuffer;
 
470
                rdata[cur].data = (char *) (ptr->list);
 
471
                rdata[cur].len = ptr->lenlist;
 
472
                cur++;
 
473
        }
 
474
        rdata[cur - 1].next = NULL;
 
475
 
 
476
        recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_PAGE_SPLIT, rdata);
 
477
 
 
478
        pfree(rdata);
 
479
        return recptr;
 
480
}
 
481
 
 
482
/*
 
483
 * Write XLOG record describing a page update. The update can include any
 
484
 * number of deletions and/or insertions of tuples on a single index page.
 
485
 *
 
486
 * If this update inserts a downlink for a split page, also record that
 
487
 * the F_FOLLOW_RIGHT flag on the child page is cleared and NSN set.
 
488
 *
 
489
 * Note that both the todelete array and the tuples are marked as belonging
 
490
 * to the target buffer; they need not be stored in XLOG if XLogInsert decides
 
491
 * to log the whole buffer contents instead.  Also, we take care that there's
 
492
 * at least one rdata item referencing the buffer, even when ntodelete and
 
493
 * ituplen are both zero; this ensures that XLogInsert knows about the buffer.
 
494
 */
 
495
XLogRecPtr
 
496
gistXLogUpdate(RelFileNode node, Buffer buffer,
 
497
                           OffsetNumber *todelete, int ntodelete,
 
498
                           IndexTuple *itup, int ituplen,
 
499
                           Buffer leftchildbuf)
 
500
{
 
501
        XLogRecData *rdata;
 
502
        gistxlogPageUpdate *xlrec;
 
503
        int                     cur,
 
504
                                i;
 
505
        XLogRecPtr      recptr;
 
506
 
 
507
        rdata = (XLogRecData *) palloc(sizeof(XLogRecData) * (4 + ituplen));
 
508
        xlrec = (gistxlogPageUpdate *) palloc(sizeof(gistxlogPageUpdate));
 
509
 
 
510
        xlrec->node = node;
 
511
        xlrec->blkno = BufferGetBlockNumber(buffer);
 
512
        xlrec->ntodelete = ntodelete;
 
513
        xlrec->leftchild =
 
514
                BufferIsValid(leftchildbuf) ? BufferGetBlockNumber(leftchildbuf) : InvalidBlockNumber;
 
515
 
 
516
        rdata[0].buffer = buffer;
 
517
        rdata[0].buffer_std = true;
 
518
        rdata[0].data = NULL;
 
519
        rdata[0].len = 0;
 
520
        rdata[0].next = &(rdata[1]);
 
521
 
 
522
        rdata[1].data = (char *) xlrec;
 
523
        rdata[1].len = sizeof(gistxlogPageUpdate);
 
524
        rdata[1].buffer = InvalidBuffer;
 
525
        rdata[1].next = &(rdata[2]);
 
526
 
 
527
        rdata[2].data = (char *) todelete;
 
528
        rdata[2].len = sizeof(OffsetNumber) * ntodelete;
 
529
        rdata[2].buffer = buffer;
 
530
        rdata[2].buffer_std = true;
 
531
 
 
532
        cur = 3;
 
533
 
 
534
        /* new tuples */
 
535
        for (i = 0; i < ituplen; i++)
 
536
        {
 
537
                rdata[cur - 1].next = &(rdata[cur]);
 
538
                rdata[cur].data = (char *) (itup[i]);
 
539
                rdata[cur].len = IndexTupleSize(itup[i]);
 
540
                rdata[cur].buffer = buffer;
 
541
                rdata[cur].buffer_std = true;
 
542
                cur++;
 
543
        }
 
544
 
 
545
        /*
 
546
         * Include a full page image of the child buf. (only necessary if a
 
547
         * checkpoint happened since the child page was split)
 
548
         */
 
549
        if (BufferIsValid(leftchildbuf))
 
550
        {
 
551
                rdata[cur - 1].next = &(rdata[cur]);
 
552
                rdata[cur].data = NULL;
 
553
                rdata[cur].len = 0;
 
554
                rdata[cur].buffer = leftchildbuf;
 
555
                rdata[cur].buffer_std = true;
 
556
                cur++;
 
557
        }
 
558
        rdata[cur - 1].next = NULL;
 
559
 
 
560
        recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_PAGE_UPDATE, rdata);
 
561
 
 
562
        pfree(rdata);
 
563
        return recptr;
 
564
}