85
85
static const char THIS_FILE[]=__FILE__;
87
static bool needUniqueCheck(Index *index, Record *record);
88
89
//////////////////////////////////////////////////////////////////////
89
90
// Construction/Destruction
347
348
// Make insert/update atomic, then check for unique index duplicats
349
Sync sync(&syncUpdate, "Table::insert");
350
350
recordNumber = record->recordNumber = dbb->insertStub(dataSection, transaction);
354
checkUniqueIndexes(transaction, record);
356
FOR_INDEXES(index, this);
357
index->insert(record, transaction);
361
352
// Verify that record is valid
364
355
transaction->addRecord(record);
365
356
insert(record, NULL, recordNumber);
358
insertIndexes(transaction, record);
368
359
updateInversion(record, transaction);
369
360
fireTriggers(transaction, PostInsert, NULL, record);
370
361
record->release();
507
498
for (int n = 0; (record = databaseFetch(bitNumber)); ++n)
509
if (insert(record, NULL, recordNumber))
500
if (insert(record, NULL, bitNumber))
823
813
activeVersions = false;
824
814
primaryKey = NULL;
825
815
formats = NEW Format* [FORMAT_HASH_SIZE];
817
for (int n = 0; n < SYNC_VERSIONS_SIZE; n++)
820
sprintf(name, "syncPriorVersions[%02d]", n);
821
syncPriorVersions[n].setName(name);
827
825
memset (formats, 0, sizeof (Format*) * FORMAT_HASH_SIZE);
837
835
syncObject.setName("Table::syncObject");
838
836
syncTriggers.setName("Table::syncTriggers");
839
837
syncScavenge.setName("Table::syncScavenge");
840
syncUpdate.setName("Table::syncUpdate");
841
838
syncAlter.setName("Table::syncAlter");
959
Record* Table::rollbackRecord(RecordVersion * recordToRollback, Transaction *transaction)
956
void Table::rollbackRecord(RecordVersion * recordToRollback, Transaction *transaction)
961
958
#ifdef CHECK_RECORD_ACTIVITY
962
959
recordToRollback->active = false;
966
963
recordToRollback->state = recRollback;
968
965
// Find the record that will become the current version.
966
// syncPrior is not needed here. No other thread can change this
967
// priorVersion now. Changing the base record is protected by
968
// RecordLeaf::syncObject
970
Record *priorRecord = recordToRollback->priorVersion;
970
Record *priorRecord = recordToRollback->getPriorVersion();
974
priorRecord->addRef();
973
975
priorRecord->setSuperceded(false);
975
978
// Replace the current version of this record.
977
980
if (!insert(priorRecord, recordToRollback, recordToRollback->recordNumber))
979
982
if (priorRecord == NULL && priorState == recDeleted)
982
recordToRollback->printRecord("Table::rollbackRecord");
983
insert(priorRecord, recordToRollback, recordToRollback->recordNumber);
985
// The store of this record into the record leaf failed. No way to recover.
987
recordToRollback->printRecord("Table::rollbackRecord failed");
988
//insert(priorRecord, recordToRollback, recordToRollback->recordNumber);
1162
1167
database->flushInversion(transaction);
1171
@brief index update , combined with unique check (atomic)
1173
Determine if the record we intend to write will have a duplicate conflict
1174
with any pending or visible records.
1175
@details For each unique index, obtain an exclusive lock and check the index
1176
update index if the search succeeded by not finding a duplicate.
1177
Retry if a wait occurred.
1178
If a duplicate is found, an exception should be caught by the caller.
1179
non-unique indexes are updated without any check
1181
void Table::updateIndexes(Transaction *transaction, RecordVersion *record, Record *oldRecord)
1185
FOR_INDEXES(index, this);
1186
Sync sync(&(index->syncUnique), "Table::updateIndexes");
1187
if(needUniqueCheck(index,record))
1190
sync.lock(Exclusive);
1191
if (!checkUniqueIndex(index, transaction, record , &sync))
1194
index->update(oldRecord, record, transaction);
1200
@brief Uniqueness check combined with index insert (atomic)
1202
void Table::insertIndexes(Transaction *transaction, RecordVersion *record)
1206
FOR_INDEXES(index, this);
1207
Sync sync(&index->syncUnique, "Table::insertIndexes");
1209
if (needUniqueCheck(index,record))
1212
sync.lock(Exclusive);
1214
if(!checkUniqueIndex(index, transaction, record, &sync))
1218
index->insert(record, transaction);
1165
1224
void Table::update(Transaction * transaction, Record * oldRecord, int numberFields, Field** updateFields, Value * * values)
1167
1226
database->preUpdate();
1224
1283
// Make insert/update atomic, then check for unique index duplicats
1228
checkUniqueIndexes(transaction, record);
1230
FOR_INDEXES(index, this);
1231
index->update(oldRecord, record, transaction);
1235
1286
scavenge.lock(Shared);
1236
1287
validateAndInsert(transaction, record);
1237
1288
transaction->addRecord(record);
1238
1289
updated = true;
1290
updateIndexes(transaction, record, oldRecord);
1240
1292
updateInversion(record, transaction);
1241
1293
fireTriggers(transaction, PostUpdate, oldRecord, record);
1262
if (record->priorVersion)
1263
record->priorVersion->setSuperceded(false);
1314
if (record->getPriorVersion())
1315
record->getPriorVersion()->setSuperceded(false);
1265
1317
if (record->state == recLock)
1266
1318
record->deleteData();
1290
1342
Record *record;
1292
1344
for (int32 next = 0; (record = fetchNext(next));)
1294
1346
next = record->recordNumber + 1;
1296
for (Record *version = record; version; version = version->getPriorVersion())
1297
if (version->hasRecord())
1298
FOR_FIELDS(field, this)
1299
if (field->flags & SEARCHABLE)
1302
version->getValue(field->id, &value);
1303
Filter stream(tableId, field->id, version->recordNumber, &value);
1304
//value.getStream(&stream, false);
1305
database->addInversion(&stream, transaction);
1349
Sync syncPrior(getSyncPrior(record->recordNumber), "Table::reIndexInversion");
1350
syncPrior.lock(Shared);
1352
for (Record *version = record; version; version = version->getPriorVersion())
1353
if (version->hasRecord())
1354
FOR_FIELDS(field, this)
1355
if (field->flags & SEARCHABLE)
1358
version->getValue(field->id, &value);
1359
Filter stream(tableId, field->id, version->recordNumber, &value);
1360
//value.getStream(&stream, false);
1361
database->addInversion(&stream, transaction);
1309
1366
record->release();
1392
1449
database->preUpdate();
1393
1450
Sync scavenge(&syncScavenge, "Table::deleteRecord");
1452
// syncPrior is not needed here. It is handled in fetchVersion()
1394
1453
Record *candidate = fetch(orgRecord->recordNumber);
1395
1454
checkAncestor(candidate, orgRecord);
1396
1455
RecordVersion *record;
1580
1639
// Update system.tables with new section ids and cardinality
1582
PreparedStatement *statement = database->prepareStatement("update system.tables set dataSection=?, blobSection=?, cardinality=? where tableId=?");
1641
PreparedStatement *statement = database->prepareStatement ( "update system.tables set dataSection=?,"
1642
" blobSection=?, cardinality=? where tableId=?");
1583
1643
statement->setInt(1, dataSectionId);
1584
1644
statement->setInt(2, blobSectionId);
1585
1645
statement->setLong(3, cardinality);
1720
1780
next = record->recordNumber + 1;
1722
for (Record *version = record; version; version = version->getPriorVersion())
1723
if (version->hasRecord())
1724
index->insert(version, transaction);
1783
Sync syncPrior(getSyncPrior(record->recordNumber), "Table::populateIndex");
1784
syncPrior.lock(Shared);
1786
for (Record *version = record; version; version = version->getPriorVersion())
1787
if (version->hasRecord())
1788
index->insert(version, transaction);
1726
1791
record->release();
1879
1944
void Table::expungeRecordVersions(RecordVersion *record, RecordScavenge *recordScavenge)
1881
1946
ASSERT(record->state != recLock);
1882
Record *prior = record->priorVersion;
1883
record->priorVersion = NULL;
1948
Record *prior = record->clearPriorVersion();
1885
1950
if (recordScavenge)
1886
1951
for (Record *rec = prior; rec; rec = rec->getPriorVersion())
2009
2077
dbb->expungeRecord(section, recordNumber);
2012
void Table::garbageCollect(Record * leaving, Record * staying, Transaction *transaction, bool quiet)
2080
void Table::garbageCollect(Record *leaving, Record *staying, Transaction *transaction, bool quiet)
2082
if (!leaving && !staying)
2085
Sync syncPrior(getSyncPrior(leaving ? leaving : staying), "Table::garbageCollect");
2086
syncPrior.lock(Shared);
2014
2088
// Clean up field indexes
2016
2090
FOR_INDEXES(index, this);
2205
fireTriggers(transaction, PostCommit, record->priorVersion, after);
2279
fireTriggers(transaction, PostCommit, record->getPriorVersion(), after);
2375
@brief Determine if the record we intend to write will have a duplicate conflict
2376
with any pending or visible records.
2377
@details For each index, call checkUniqueIndex.
2378
Return if the search succeeded by not finding a duplicate.
2379
Retry if a wait occurred.
2380
If a duplicate is found, an exception should be caught by the caller.
2383
void Table::checkUniqueIndexes(Transaction *transaction, RecordVersion *record)
2385
Record *oldRecord = record->priorVersion;
2391
FOR_INDEXES(index, this);
2393
if (INDEX_IS_UNIQUE(index->type) &&
2394
(!oldRecord || index->changed(record, oldRecord)))
2396
retry = checkUniqueIndex(index, transaction, record);
2410
2451
@brief Determine if the record we intend to write will have a duplicate conflict
2414
2455
Return false if no duplicate was found.
2417
bool Table::checkUniqueIndex(Index *index, Transaction *transaction, RecordVersion *record)
2458
bool Table::checkUniqueIndex(Index *index, Transaction *transaction, RecordVersion *record, Sync *sync)
2420
2461
IndexKey indexKey(index);
2424
2465
for (int32 recordNumber = 0; (recordNumber = bitmap.nextSet(recordNumber)) >= 0; ++recordNumber)
2426
int retry = checkUniqueRecordVersion(recordNumber, index, transaction, record);
2467
int retry = checkUniqueRecordVersion(recordNumber, index, transaction, record, sync);
2429
2470
return true; // restart the search since a wait occurred.
2441
2482
Throw an exception if a duplicate WAS found
2444
bool Table::checkUniqueRecordVersion(int32 recordNumber, Index *index, Transaction *transaction, RecordVersion *record)
2485
bool Table::checkUniqueRecordVersion(int32 recordNumber, Index *index, Transaction *transaction, RecordVersion *record, Sync *syncUnique)
2447
Record *oldRecord = record->priorVersion;
2488
Record *oldRecord = record->getPriorVersion();
2448
2489
Transaction *activeTransaction = NULL;
2449
2490
State state = CommittedVisible;
2459
2500
if ( !(rec = fetch(recordNumber)) )
2460
2501
return false; // Check next record number.
2503
Sync syncPrior(getSyncPrior(recordNumber), "Table::checkUniqueRecordVersion");
2504
syncPrior.lock(Shared);
2462
2506
for (Record *dup = rec; dup; dup = dup->getPriorVersion())
2464
2508
if (dup == record)
2531
2575
if (state == Active)
2533
// wait for that transaction, then restart checkUniqueIndexes()
2577
syncPrior.unlock(); // release lock before wait
2578
syncUnique->unlock(); // release lock before wait
2580
// Wait for that transaction, then restart checkUniqueIndexes()
2535
2582
state = transaction->getRelativeState(dup, WAIT_IF_ACTIVE);
2548
2595
else if (activeTransaction)
2597
syncPrior.unlock(); // release lock before wait
2598
syncUnique->unlock(); // release lock before wait
2550
2600
state = transaction->getRelativeState(activeTransaction,
2551
2601
activeTransaction->transactionId, WAIT_IF_ACTIVE);
2728
2778
next = record->recordNumber + 1;
2730
for (Record *version = record; version; version = version->getPriorVersion())
2731
if (version->hasRecord())
2732
for (field = fields; field; field = field->next)
2733
if (field->type == Asciiblob || field->type == Binaryblob)
2735
int id = version->getBlobId(field->id);
2738
references.set (id);
2781
Sync syncPrior(getSyncPrior(record->recordNumber), "Table::validateBlobs");
2782
syncPrior.lock(Shared);
2784
for (Record *version = record; version; version = version->getPriorVersion())
2785
if (version->hasRecord())
2786
for (field = fields; field; field = field->next)
2787
if (field->type == Asciiblob || field->type == Binaryblob)
2789
int id = version->getBlobId(field->id);
2792
references.set (id);
2741
2796
record->release();
2943
2998
// Make insert/update atomic, then check for unique index duplicats
2945
Sync sync(&syncUpdate, "Table::insert");
2949
checkUniqueIndexes(transaction, record);
2951
FOR_INDEXES(index, this);
2952
index->insert (record, transaction);
2956
3001
// Do the actual insert
2958
3003
transaction->addRecord(record);
2959
3004
bool ret = insert(record, NULL, recordNumber);
3006
insertIndexes(transaction, record);
2963
3009
record->release();
2991
3037
void Table::update(Transaction * transaction, Record *orgRecord, Stream *stream)
2993
3039
database->preUpdate();
2994
3041
Record *candidate = fetch(orgRecord->recordNumber);
2995
3042
checkAncestor(candidate, orgRecord);
3066
3113
attachment->preUpdate(this, record);
3069
// Make insert/update atomic, then check for unique index duplicats
3073
checkUniqueIndexes(transaction, record);
3075
FOR_INDEXES(index, this);
3076
index->update(oldRecord, record, transaction);
3080
3118
//updateInversion(record, transaction);
3081
3119
scavenge.lock(Shared);
3091
3129
updated = true;
3130
// Make insert/update atomic, then check for unique index duplicats
3131
updateIndexes(transaction, record, oldRecord);
3092
3132
//fireTriggers(transaction, PostUpdate, oldRecord, record);
3094
3134
// If this is a re-update in the same transaction and the same savepoint,
3118
if (record->priorVersion)
3119
record->priorVersion->setSuperceded(false);
3158
if (record->getPriorVersion())
3159
record->getPriorVersion()->setSuperceded(false);
3121
3161
if (record->state == recLock)
3122
3162
record->deleteData();
3219
3258
void Table::validateAndInsert(Transaction *transaction, RecordVersion *record)
3221
Sync sync(&syncObject, "Table::validateAndInsert");
3260
Sync syncTable(&syncObject, "Table::validateAndInsert");
3262
// Do not need syncPrior here since this is a new record.
3263
// No other thread can see this records priorVersion pointer.
3222
3265
Record *prior = record->getPriorVersion();
3224
3267
for (int n = 0; n < 10; ++n)
3228
sync.lock(Exclusive);
3271
syncTable.lock(Exclusive);
3229
3272
Record *current = fetch(record->recordNumber);
3346
3389
void Table::unlockRecord(RecordVersion* record, bool remove)
3348
3391
//int uc = record->useCount;
3349
ASSERT(record->priorVersion);
3392
ASSERT(record->getPriorVersion());
3351
3394
if (record->state == recLock)
3353
3396
record->state = recUnlocked;
3355
if (insert(record->priorVersion, record, record->recordNumber))
3398
if (insert(record->getPriorVersion(), record, record->recordNumber))
3357
3400
if (remove && record->transaction)
3358
3401
record->transaction->removeRecord(record);
3608
3657
Record *record = treeFetch(recordNumber);
3609
3658
Record *initial = record;
3660
Sync syncPrior(getSyncPrior(recordNumber), "Table::validateUpdate");
3661
syncPrior.lock(Shared);
3613
3665
if (record->getTransactionId() == transactionId)
3678
3732
void Table::deleteRecordBacklog(int32 recordNumber)
3680
Sync sync(&syncObject, "Table::rollbackRecord");
3734
Sync sync(&syncObject, "Table::deleteRecordBacklog");
3681
3735
sync.lock(Shared);
3682
3736
int32 backlogId = backloggedRecords->get(recordNumber);
3689
3743
database->backLog->deleteRecord(backlogId);
3747
SyncObject* Table::getSyncPrior(Record* record)
3749
int lockNumber = record->recordNumber % SYNC_VERSIONS_SIZE;
3750
return syncPriorVersions + lockNumber;
3753
SyncObject* Table::getSyncPrior(int recordNumber)
3755
int lockNumber = recordNumber % SYNC_VERSIONS_SIZE;
3756
return syncPriorVersions + lockNumber;
3759
static bool needUniqueCheck(Index *index, Record *record)
3761
Record *oldRecord = record->getPriorVersion();
3762
return (INDEX_IS_UNIQUE(index->type) &&
3763
(!oldRecord || index->changed(record, oldRecord)));