~jlukas79/+junk/mysql-server

« back to all changes in this revision

Viewing changes to storage/falcon/Table.cpp

manual merge 6.0-main --> 6.0-bka-review

Show diffs side-by-side

added added

removed removed

Lines of Context:
84
84
#undef THIS_FILE
85
85
static const char THIS_FILE[]=__FILE__;
86
86
#endif
 
87
static bool needUniqueCheck(Index *index, Record *record);
87
88
 
88
89
//////////////////////////////////////////////////////////////////////
89
90
// Construction/Destruction
346
347
 
347
348
                // Make insert/update atomic, then check for unique index duplicats
348
349
                
349
 
                Sync sync(&syncUpdate, "Table::insert");
350
350
                recordNumber = record->recordNumber = dbb->insertStub(dataSection, transaction);
351
 
                
352
 
                if (indexes)
353
 
                        {
354
 
                        checkUniqueIndexes(transaction, record);
355
 
 
356
 
                        FOR_INDEXES(index, this);
357
 
                                index->insert(record, transaction);
358
 
                        END_FOR;
359
 
                        }
360
351
 
361
352
                // Verify that record is valid
362
353
 
364
355
                transaction->addRecord(record);
365
356
                insert(record, NULL, recordNumber);
366
357
                inserted = true;
367
 
                
 
358
                insertIndexes(transaction, record);
368
359
                updateInversion(record, transaction);
369
360
                fireTriggers(transaction, PostInsert, NULL, record);
370
361
                record->release();
506
497
                        
507
498
                        for (int n = 0; (record = databaseFetch(bitNumber)); ++n)
508
499
                                {
509
 
                                if (insert(record, NULL, recordNumber))
 
500
                                if (insert(record, NULL, bitNumber))
510
501
                                        {
511
502
                                        record->poke();
512
503
                                        
527
518
                                sync.unlock();
528
519
                                ASSERT(n < 2);
529
520
                                }
530
 
                                        
 
521
                        
531
522
                        // Record has gotten lost; no serious cause for concern
532
523
                        
533
524
                        sync.lock(Shared);
739
730
        buildFieldVector();
740
731
        resultSet->close();
741
732
        statement->close();
742
 
 
743
733
}
744
734
 
745
735
void Table::loadIndexes()
823
813
        activeVersions = false;
824
814
        primaryKey = NULL;
825
815
        formats = NEW Format* [FORMAT_HASH_SIZE];
 
816
 
 
817
        for (int n = 0; n < SYNC_VERSIONS_SIZE; n++)
 
818
                {
 
819
                char name[64];
 
820
                sprintf(name, "syncPriorVersions[%02d]", n);
 
821
                syncPriorVersions[n].setName(name);
 
822
                }
 
823
                
826
824
        triggers = NULL;
827
825
        memset (formats, 0, sizeof (Format*) * FORMAT_HASH_SIZE);
828
826
        maxFieldId = 0;
837
835
        syncObject.setName("Table::syncObject");
838
836
        syncTriggers.setName("Table::syncTriggers");
839
837
        syncScavenge.setName("Table::syncScavenge");
840
 
        syncUpdate.setName("Table::syncUpdate");
841
838
        syncAlter.setName("Table::syncAlter");
842
839
}
843
840
 
956
953
        return NULL;    
957
954
}
958
955
 
959
 
Record* Table::rollbackRecord(RecordVersion * recordToRollback, Transaction *transaction)
 
956
void Table::rollbackRecord(RecordVersion * recordToRollback, Transaction *transaction)
960
957
{
961
958
#ifdef CHECK_RECORD_ACTIVITY
962
959
        recordToRollback->active = false;
966
963
        recordToRollback->state = recRollback;
967
964
 
968
965
        // Find the record that will become the current version.
 
966
        // syncPrior is not needed here. No other thread can change this
 
967
        // priorVersion now. Changing the base record is protected by
 
968
        // RecordLeaf::syncObject
969
969
 
970
 
        Record *priorRecord = recordToRollback->priorVersion;
 
970
        Record *priorRecord = recordToRollback->getPriorVersion();
971
971
 
972
972
        if (priorRecord)
 
973
                {
 
974
                priorRecord->addRef();
973
975
                priorRecord->setSuperceded(false);
 
976
                }
974
977
 
975
978
        // Replace the current version of this record.
976
979
 
977
980
        if (!insert(priorRecord, recordToRollback, recordToRollback->recordNumber))
978
981
                {
979
982
                if (priorRecord == NULL && priorState == recDeleted)
980
 
                        return priorRecord;
981
 
                        
982
 
                recordToRollback->printRecord("Table::rollbackRecord");
983
 
                insert(priorRecord, recordToRollback, recordToRollback->recordNumber);
 
983
                        return;
 
984
 
 
985
                // The store of this record into the record leaf failed. No way to recover.
 
986
 
 
987
                recordToRollback->printRecord("Table::rollbackRecord failed");
 
988
                //insert(priorRecord, recordToRollback, recordToRollback->recordNumber);
984
989
                ASSERT(false);
985
990
                }
986
991
 
991
996
 
992
997
        if (backloggedRecords)
993
998
                deleteRecordBacklog(recordToRollback->recordNumber);
994
 
                        
995
 
        return priorRecord;
 
999
        
 
1000
        if (priorRecord)
 
1001
                priorRecord->release();
996
1002
}
997
1003
 
998
1004
void Table::addFormat(Format * format)
1136
1142
        database->flushInversion(transaction);
1137
1143
}
1138
1144
 
1139
 
 
1140
1145
void Table::makeNotSearchable(Field *field, Transaction *transaction)
1141
1146
{
1142
1147
        Record *record;
1162
1167
        database->flushInversion(transaction);
1163
1168
}
1164
1169
 
 
1170
/**
 
1171
@brief          index update , combined with unique check (atomic)
 
1172
 
 
1173
                Determine if the record we intend to write will have a duplicate conflict
 
1174
                        with any pending or visible records.
 
1175
@details        For each  unique index, obtain an exclusive lock and check the index
 
1176
                        update index if the search succeeded by not finding a duplicate.
 
1177
                        Retry if a wait occurred.
 
1178
                        If a duplicate is found, an exception should be caught by the caller.
 
1179
                non-unique indexes are  updated without any check
 
1180
**/
 
1181
void Table::updateIndexes(Transaction *transaction, RecordVersion *record, Record *oldRecord)
 
1182
{
 
1183
        if (indexes)
 
1184
        {
 
1185
        FOR_INDEXES(index, this);
 
1186
                Sync sync(&(index->syncUnique), "Table::updateIndexes");
 
1187
                if(needUniqueCheck(index,record))
 
1188
                        for(;;)
 
1189
                        {
 
1190
                        sync.lock(Exclusive);
 
1191
                        if (!checkUniqueIndex(index, transaction, record , &sync))
 
1192
                                break;
 
1193
                        }
 
1194
                index->update(oldRecord, record, transaction);
 
1195
        END_FOR;
 
1196
        }
 
1197
}
 
1198
 
 
1199
/**
 
1200
@brief          Uniqueness check combined with index insert (atomic)
 
1201
**/
 
1202
void Table::insertIndexes(Transaction *transaction, RecordVersion *record)
 
1203
{
 
1204
        if (indexes)
 
1205
                {
 
1206
                FOR_INDEXES(index, this);
 
1207
                        Sync sync(&index->syncUnique, "Table::insertIndexes");
 
1208
                        
 
1209
                        if (needUniqueCheck(index,record))
 
1210
                                for(;;)
 
1211
                                        {
 
1212
                                        sync.lock(Exclusive);
 
1213
                                        
 
1214
                                        if(!checkUniqueIndex(index, transaction, record, &sync))
 
1215
                                                break;
 
1216
                                        }
 
1217
                                
 
1218
                        index->insert(record, transaction);
 
1219
                END_FOR;
 
1220
                }
 
1221
}
 
1222
 
 
1223
 
1165
1224
void Table::update(Transaction * transaction, Record * oldRecord, int numberFields, Field** updateFields, Value * * values)
1166
1225
{
1167
1226
        database->preUpdate();
1223
1282
                
1224
1283
                // Make insert/update atomic, then check for unique index duplicats
1225
1284
 
1226
 
                if (indexes)
1227
 
                        {
1228
 
                        checkUniqueIndexes(transaction, record);
1229
 
 
1230
 
                        FOR_INDEXES(index, this);
1231
 
                                index->update(oldRecord, record, transaction);
1232
 
                        END_FOR;
1233
 
                        }
1234
1285
 
1235
1286
                scavenge.lock(Shared);
1236
1287
                validateAndInsert(transaction, record);
1237
1288
                transaction->addRecord(record);
1238
1289
                updated = true;
 
1290
                updateIndexes(transaction, record, oldRecord);
1239
1291
 
1240
1292
                updateInversion(record, transaction);
1241
1293
                fireTriggers(transaction, PostUpdate, oldRecord, record);
1259
1311
                
1260
1312
                if (record)
1261
1313
                        {
1262
 
                        if (record->priorVersion)
1263
 
                                record->priorVersion->setSuperceded(false);
 
1314
                        if (record->getPriorVersion())
 
1315
                                record->getPriorVersion()->setSuperceded(false);
1264
1316
                                                                
1265
1317
                        if (record->state == recLock)
1266
1318
                                record->deleteData();
1288
1340
                return;
1289
1341
 
1290
1342
        Record *record;
1291
 
 
 
1343
        
1292
1344
        for (int32 next = 0; (record = fetchNext(next));)
1293
1345
                {
1294
1346
                next = record->recordNumber + 1;
1295
1347
                
1296
 
                for (Record *version = record; version; version = version->getPriorVersion())
1297
 
                        if (version->hasRecord())
1298
 
                                FOR_FIELDS(field, this)
1299
 
                                        if (field->flags & SEARCHABLE)
1300
 
                                                {
1301
 
                                                Value value;
1302
 
                                                version->getValue(field->id, &value);
1303
 
                                                Filter stream(tableId, field->id, version->recordNumber, &value);
1304
 
                                                //value.getStream(&stream, false);
1305
 
                                                database->addInversion(&stream, transaction);
1306
 
                                                }
1307
 
                                END_FOR;
1308
 
                                
 
1348
                {
 
1349
                        Sync syncPrior(getSyncPrior(record->recordNumber), "Table::reIndexInversion");
 
1350
                        syncPrior.lock(Shared);
 
1351
        
 
1352
                        for (Record *version = record; version; version = version->getPriorVersion())
 
1353
                                if (version->hasRecord())
 
1354
                                        FOR_FIELDS(field, this)
 
1355
                                                if (field->flags & SEARCHABLE)
 
1356
                                                        {
 
1357
                                                        Value value;
 
1358
                                                        version->getValue(field->id, &value);
 
1359
                                                        Filter stream(tableId, field->id, version->recordNumber, &value);
 
1360
                                                        //value.getStream(&stream, false);
 
1361
                                                        database->addInversion(&stream, transaction);
 
1362
                                                        }
 
1363
                                        END_FOR;
 
1364
                }
 
1365
                        
1309
1366
                record->release();
1310
1367
                }
1311
1368
}
1391
1448
{
1392
1449
        database->preUpdate();
1393
1450
        Sync scavenge(&syncScavenge, "Table::deleteRecord");
 
1451
 
 
1452
        // syncPrior is not needed here.  It is handled in fetchVersion()
1394
1453
        Record *candidate = fetch(orgRecord->recordNumber);
1395
1454
        checkAncestor(candidate, orgRecord);
1396
1455
        RecordVersion *record;
1404
1463
                if (candidate->getSavePointId() == transaction->curSavePointId)
1405
1464
                        {
1406
1465
                        record = (RecordVersion*) candidate;
1407
 
                        ASSERT(record->priorVersion == orgRecord);
 
1466
                        ASSERT(record->getPriorVersion() == orgRecord);
1408
1467
                        wasLock = true;
1409
1468
                        }
1410
1469
                else
1579
1638
        
1580
1639
        // Update system.tables with new section ids and cardinality
1581
1640
        
1582
 
        PreparedStatement *statement = database->prepareStatement("update system.tables set dataSection=?, blobSection=?, cardinality=? where tableId=?");
 
1641
        PreparedStatement *statement = database->prepareStatement (     "update system.tables set dataSection=?,"
 
1642
                                                                                                                                " blobSection=?, cardinality=? where tableId=?");
1583
1643
        statement->setInt(1, dataSectionId);
1584
1644
        statement->setInt(2, blobSectionId);
1585
1645
        statement->setLong(3, cardinality);
1719
1779
                {
1720
1780
                next = record->recordNumber + 1;
1721
1781
 
1722
 
                for (Record *version = record; version; version = version->getPriorVersion())
1723
 
                        if (version->hasRecord())
1724
 
                                index->insert(version, transaction);
 
1782
                {
 
1783
                        Sync syncPrior(getSyncPrior(record->recordNumber), "Table::populateIndex");
 
1784
                        syncPrior.lock(Shared);
 
1785
                
 
1786
                        for (Record *version = record; version; version = version->getPriorVersion())
 
1787
                                if (version->hasRecord())
 
1788
                                        index->insert(version, transaction);
 
1789
                }
1725
1790
 
1726
1791
                record->release();
1727
1792
 
1879
1944
void Table::expungeRecordVersions(RecordVersion *record, RecordScavenge *recordScavenge)
1880
1945
{
1881
1946
        ASSERT(record->state != recLock);
1882
 
        Record *prior = record->priorVersion;
1883
 
        record->priorVersion = NULL;
 
1947
 
 
1948
        Record *prior = record->clearPriorVersion();
1884
1949
        
1885
1950
        if (recordScavenge)
1886
1951
                for (Record *rec = prior; rec; rec = rec->getPriorVersion())
1932
1997
                        return isDuplicate;
1933
1998
                }
1934
1999
 
 
2000
        Sync syncPrior(getSyncPrior(recordChain), "Table::duplicateBlob");
 
2001
        syncPrior.lock(Shared);
 
2002
        
1935
2003
        for (Record *record = recordChain; record; record = record->getPriorVersion())
1936
2004
                if (record->hasRecord())
1937
2005
                        {
2009
2077
        dbb->expungeRecord(section, recordNumber);
2010
2078
}
2011
2079
 
2012
 
void Table::garbageCollect(Record * leaving, Record * staying, Transaction *transaction, bool quiet)
 
2080
void Table::garbageCollect(Record *leaving, Record *staying, Transaction *transaction, bool quiet)
2013
2081
{
 
2082
        if (!leaving && !staying)
 
2083
                return;
 
2084
 
 
2085
        Sync syncPrior(getSyncPrior(leaving ? leaving : staying), "Table::garbageCollect");
 
2086
        syncPrior.lock(Shared);
 
2087
        
2014
2088
        // Clean up field indexes
2015
2089
 
2016
2090
        FOR_INDEXES(index, this);
2202
2276
 
2203
2277
        try
2204
2278
                {
2205
 
                fireTriggers(transaction, PostCommit, record->priorVersion, after);
 
2279
                fireTriggers(transaction, PostCommit, record->getPriorVersion(), after);
2206
2280
                }
2207
2281
        catch (...)
2208
2282
                {
2371
2445
 
2372
2446
        return true;
2373
2447
}
2374
 
/**
2375
 
@brief          Determine if the record we intend to write will have a duplicate conflict
2376
 
                        with any pending or visible records.
2377
 
@details        For each index, call checkUniqueIndex.  
2378
 
                        Return if the search succeeded by not finding a duplicate.
2379
 
                        Retry if a wait occurred.
2380
 
                        If a duplicate is found, an exception should be caught by the caller.
2381
 
**/
2382
 
 
2383
 
void Table::checkUniqueIndexes(Transaction *transaction, RecordVersion *record)
2384
 
{
2385
 
        Record *oldRecord = record->priorVersion;
2386
 
        bool retry = true;
2387
 
 
2388
 
        while (retry)
2389
 
                {
2390
 
                retry = false;
2391
 
                FOR_INDEXES(index, this);
2392
 
                        {
2393
 
                        if (INDEX_IS_UNIQUE(index->type) &&
2394
 
                                (!oldRecord || index->changed(record, oldRecord)))
2395
 
                                {
2396
 
                                retry = checkUniqueIndex(index, transaction, record);
2397
 
                                if (retry)
2398
 
                                        break;
2399
 
                                }
2400
 
                        if (retry)
2401
 
                                break;
2402
 
                        }
2403
 
                END_FOR;
2404
 
                }
2405
 
 
2406
 
        return;
2407
 
}
 
2448
 
2408
2449
 
2409
2450
/**
2410
2451
@brief          Determine if the record we intend to write will have a duplicate conflict
2414
2455
                        Return false if no duplicate was found.
2415
2456
**/
2416
2457
 
2417
 
bool Table::checkUniqueIndex(Index *index, Transaction *transaction, RecordVersion *record)
 
2458
bool Table::checkUniqueIndex(Index *index, Transaction *transaction, RecordVersion *record, Sync *sync)
2418
2459
{
2419
2460
        Bitmap bitmap;
2420
2461
        IndexKey indexKey(index);
2423
2464
 
2424
2465
        for (int32 recordNumber = 0; (recordNumber = bitmap.nextSet(recordNumber)) >= 0; ++recordNumber)
2425
2466
                {
2426
 
                int retry = checkUniqueRecordVersion(recordNumber, index, transaction, record);
 
2467
                int retry = checkUniqueRecordVersion(recordNumber, index, transaction, record, sync);
2427
2468
                
2428
2469
                if (retry)
2429
2470
                        return true;  // restart the search since a wait occurred.
2441
2482
                        Throw an exception if a duplicate WAS found
2442
2483
**/
2443
2484
 
2444
 
bool Table::checkUniqueRecordVersion(int32 recordNumber, Index *index, Transaction *transaction, RecordVersion *record)
 
2485
bool Table::checkUniqueRecordVersion(int32 recordNumber, Index *index, Transaction *transaction, RecordVersion *record, Sync *syncUnique)
2445
2486
{
2446
2487
        Record *rec;
2447
 
        Record *oldRecord = record->priorVersion;
 
2488
        Record *oldRecord = record->getPriorVersion();
2448
2489
        Transaction *activeTransaction = NULL;
2449
2490
        State state = CommittedVisible;
2450
2491
 
2459
2500
        if ( !(rec = fetch(recordNumber)) )
2460
2501
                return false;    // Check next record number.
2461
2502
 
 
2503
        Sync syncPrior(getSyncPrior(recordNumber), "Table::checkUniqueRecordVersion");
 
2504
        syncPrior.lock(Shared);
 
2505
        
2462
2506
        for (Record *dup = rec; dup; dup = dup->getPriorVersion())
2463
2507
                {
2464
2508
                if (dup == record)
2530
2574
                        {
2531
2575
                        if (state == Active)
2532
2576
                                {
2533
 
                                // wait for that transaction, then restart checkUniqueIndexes()
 
2577
                                syncPrior.unlock(); // release lock before wait
 
2578
                                syncUnique->unlock(); // release lock before wait
 
2579
 
 
2580
                                // Wait for that transaction, then restart checkUniqueIndexes()
2534
2581
 
2535
2582
                                state = transaction->getRelativeState(dup, WAIT_IF_ACTIVE);
2536
2583
 
2547
2594
 
2548
2595
                        else if (activeTransaction)
2549
2596
                                {
 
2597
                                syncPrior.unlock(); // release lock before wait
 
2598
                                syncUnique->unlock(); // release lock before wait
 
2599
 
2550
2600
                                state = transaction->getRelativeState(activeTransaction,
2551
2601
                                                activeTransaction->transactionId, WAIT_IF_ACTIVE);
2552
2602
 
2727
2777
                {
2728
2778
                next = record->recordNumber + 1;
2729
2779
                
2730
 
                for (Record *version = record; version; version = version->getPriorVersion())
2731
 
                        if (version->hasRecord())
2732
 
                                for (field = fields; field; field = field->next)
2733
 
                                        if (field->type == Asciiblob || field->type == Binaryblob)
2734
 
                                                {
2735
 
                                                int id = version->getBlobId(field->id);
2736
 
                                                
2737
 
                                                if (id >= 0)
2738
 
                                                        references.set (id);
2739
 
                                                }
 
2780
                {
 
2781
                        Sync syncPrior(getSyncPrior(record->recordNumber), "Table::validateBlobs");
 
2782
                        syncPrior.lock(Shared);
 
2783
                
 
2784
                        for (Record *version = record; version; version = version->getPriorVersion())
 
2785
                                if (version->hasRecord())
 
2786
                                        for (field = fields; field; field = field->next)
 
2787
                                                if (field->type == Asciiblob || field->type == Binaryblob)
 
2788
                                                        {
 
2789
                                                        int id = version->getBlobId(field->id);
 
2790
                                        
 
2791
                                                        if (id >= 0)
 
2792
                                                                references.set (id);
 
2793
                                                        }
 
2794
                }
2740
2795
                                                
2741
2796
                record->release();
2742
2797
                }
2942
2997
                
2943
2998
                // Make insert/update atomic, then check for unique index duplicats
2944
2999
 
2945
 
                Sync sync(&syncUpdate, "Table::insert");
2946
 
                
2947
 
                if (indexes)
2948
 
                        {
2949
 
                        checkUniqueIndexes(transaction, record);
2950
 
 
2951
 
                        FOR_INDEXES(index, this);
2952
 
                                index->insert (record, transaction);
2953
 
                        END_FOR;
2954
 
                        }
2955
3000
 
2956
3001
                // Do the actual insert
2957
3002
 
2958
3003
                transaction->addRecord(record);
2959
3004
                bool ret = insert(record, NULL, recordNumber);
 
3005
                inserted = true;
 
3006
                insertIndexes(transaction, record);
2960
3007
                ASSERT(ret);
2961
 
                inserted = true;
2962
3008
 
2963
3009
                record->release();
2964
3010
                }
2991
3037
void Table::update(Transaction * transaction, Record *orgRecord, Stream *stream)
2992
3038
{
2993
3039
        database->preUpdate();
 
3040
        
2994
3041
        Record *candidate = fetch(orgRecord->recordNumber);
2995
3042
        checkAncestor(candidate, orgRecord);
2996
3043
        
3066
3113
                                attachment->preUpdate(this, record);
3067
3114
                END_FOR;
3068
3115
 
3069
 
                // Make insert/update atomic, then check for unique index duplicats
3070
 
 
3071
 
                if (indexes)
3072
 
                        {
3073
 
                        checkUniqueIndexes(transaction, record);
3074
 
 
3075
 
                        FOR_INDEXES(index, this);
3076
 
                                index->update(oldRecord, record, transaction);
3077
 
                        END_FOR;
3078
 
                        }
 
3116
 
3079
3117
 
3080
3118
                //updateInversion(record, transaction);
3081
3119
                scavenge.lock(Shared);
3089
3127
                        }
3090
3128
 
3091
3129
                updated = true;
 
3130
                // Make insert/update atomic, then check for unique index duplicats
 
3131
                updateIndexes(transaction, record, oldRecord);
3092
3132
                //fireTriggers(transaction, PostUpdate, oldRecord, record);
3093
3133
 
3094
3134
                // If this is a re-update in the same transaction and the same savepoint,
3115
3155
        
3116
3156
                if (record)
3117
3157
                        {
3118
 
                        if (record->priorVersion)
3119
 
                                record->priorVersion->setSuperceded(false);
 
3158
                        if (record->getPriorVersion())
 
3159
                                record->getPriorVersion()->setSuperceded(false);
3120
3160
                                                                
3121
3161
                        if (record->state == recLock)
3122
3162
                                record->deleteData();
3130
3170
                }
3131
3171
}
3132
3172
 
3133
 
 
3134
3173
void Table::rename(const char *newSchema, const char *newName)
3135
3174
{
3136
3175
        try
3218
3257
 
3219
3258
void Table::validateAndInsert(Transaction *transaction, RecordVersion *record)
3220
3259
{
3221
 
        Sync sync(&syncObject, "Table::validateAndInsert");
 
3260
        Sync syncTable(&syncObject, "Table::validateAndInsert");
 
3261
 
 
3262
        // Do not need syncPrior here since this is a new record.
 
3263
        // No other thread can see this records priorVersion pointer.
 
3264
 
3222
3265
        Record *prior = record->getPriorVersion();
3223
3266
 
3224
3267
        for (int n = 0; n < 10; ++n)
3225
3268
                {
3226
3269
                if (prior)
3227
3270
                        {
3228
 
                        sync.lock(Exclusive);
 
3271
                        syncTable.lock(Exclusive);
3229
3272
                        Record *current = fetch(record->recordNumber);
3230
3273
 
3231
3274
                        if (current)
3240
3283
 
3241
3284
                                        TransId transId = current->getTransactionId();
3242
3285
                                        current->release();
3243
 
                                        sync.unlock();
 
3286
                                        syncTable.unlock();
3244
3287
 
3245
3288
                                        if (transaction->waitForTransaction(transId))
3246
3289
                                                {
3266
3309
                        
3267
3310
                record->active = false;
3268
3311
                }
3269
 
 
 
3312
                
3270
3313
        throw SQLError(UPDATE_CONFLICT, "unexpected update conflict in table %s.%s record %d", schemaName, name, record->recordNumber);
3271
3314
}
3272
3315
 
3346
3389
void Table::unlockRecord(RecordVersion* record, bool remove)
3347
3390
{
3348
3391
        //int uc = record->useCount;
3349
 
        ASSERT(record->priorVersion);
 
3392
        ASSERT(record->getPriorVersion());
3350
3393
        
3351
3394
        if (record->state == recLock)
3352
3395
                {
3353
3396
                record->state = recUnlocked;
3354
3397
 
3355
 
                if (insert(record->priorVersion, record, record->recordNumber))
 
3398
                if (insert(record->getPriorVersion(), record, record->recordNumber))
3356
3399
                        {
3357
3400
                        if (remove && record->transaction)
3358
3401
                                record->transaction->removeRecord(record);
3364
3407
 
3365
3408
void Table::checkAncestor(Record* current, Record* oldRecord)
3366
3409
{
 
3410
        Sync syncPrior(getSyncPrior(current), "Table::checkAncestor");
 
3411
        syncPrior.lock(Shared);
 
3412
        
3367
3413
        for (Record *record = current; record; record = record->getPriorVersion())
3368
3414
                if (record == oldRecord)
3369
3415
                        return;
3402
3448
                if (record->state != recLock)
3403
3449
                        return record;
3404
3450
 
 
3451
                Sync syncPrior(getSyncPrior(record), "Table::fetchForUpdate");
 
3452
                syncPrior.lock(Shared);
 
3453
        
3405
3454
                Record *prior = record->getPriorVersion();
3406
3455
                prior->addRef();
3407
3456
                record->release();
3608
3657
        Record *record = treeFetch(recordNumber);
3609
3658
        Record *initial = record;
3610
3659
        
 
3660
        Sync syncPrior(getSyncPrior(recordNumber), "Table::validateUpdate");
 
3661
        syncPrior.lock(Shared);
 
3662
        
3611
3663
        while (record)
3612
3664
                {
3613
3665
                if (record->getTransactionId() == transactionId)
3621
3673
                
3622
3674
                if (!transaction || transaction->state == Committed)
3623
3675
                        {
 
3676
#ifdef CHECK_RECORD_ACTIVITY
 
3677
                        record->active = false;
 
3678
#endif
3624
3679
                        record->release();
3625
 
                        
3626
3680
                        return false;
3627
3681
                        }
3628
3682
                
3677
3731
 
3678
3732
void Table::deleteRecordBacklog(int32 recordNumber)
3679
3733
{
3680
 
        Sync sync(&syncObject, "Table::rollbackRecord");
 
3734
        Sync sync(&syncObject, "Table::deleteRecordBacklog");
3681
3735
        sync.lock(Shared);
3682
3736
        int32 backlogId = backloggedRecords->get(recordNumber);
3683
3737
        
3689
3743
                database->backLog->deleteRecord(backlogId);
3690
3744
                }
3691
3745
}
 
3746
 
 
3747
SyncObject* Table::getSyncPrior(Record* record)
 
3748
{
 
3749
        int lockNumber = record->recordNumber % SYNC_VERSIONS_SIZE;
 
3750
        return syncPriorVersions + lockNumber;
 
3751
}
 
3752
 
 
3753
SyncObject* Table::getSyncPrior(int recordNumber)
 
3754
{
 
3755
        int lockNumber = recordNumber % SYNC_VERSIONS_SIZE;
 
3756
        return syncPriorVersions + lockNumber;
 
3757
}
 
3758
 
 
3759
static bool needUniqueCheck(Index *index, Record *record)
 
3760
{
 
3761
        Record *oldRecord = record->getPriorVersion();
 
3762
        return (INDEX_IS_UNIQUE(index->type) &&
 
3763
                (!oldRecord || index->changed(record, oldRecord)));
 
3764
}