1
/* Copyright (c) 2009 PrimeBase Technologies GmbH, Germany
3
* PrimeBase Media Stream for MySQL
5
* This program is free software; you can redistribute it and/or modify
6
* it under the terms of the GNU General Public License as published by
7
* the Free Software Foundation; either version 2 of the License, or
8
* (at your option) any later version.
10
* This program is distributed in the hope that it will be useful,
11
* but WITHOUT ANY WARRANTY; without even the implied warranty of
12
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13
* GNU General Public License for more details.
15
* You should have received a copy of the GNU General Public License
16
* along with this program; if not, write to the Free Software
17
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
25
* PBMS transaction cache.
29
#include "cslib/CSConfig.h"
32
#include "cslib/CSGlobal.h"
34
#include "trans_cache_ms.h"
36
// Increment used when the transaction list has to grow. If the list starts to
// grow it is probably because a backup is in progress, so it could get quite
// large. (Not referenced in the visible code; presumably used elsewhere in the
// file — confirm.)
#define LIST_INC_SIZE 256

// Lower bound on the number of transaction slots in the cache: requested cache
// sizes smaller than this are raised to it. A list of this size should be able
// to handle a normal transaction load.
#define MIN_LIST_SIZE 32

// Number of per-transaction records initially allocated for each cache slot's
// record list; the list is grown on demand when more records are added.
#define MIN_CACHE_RECORDS 2
40
// One cached log record (a reference or dereference operation) belonging to a
// transaction held in the transaction cache. Each cache slot owns a
// dynamically sized array of these (myTransPtr list).
typedef struct myTrans {
	uint8_t		tc_type;		// The transaction type. If the first bit is set then the transaction is an autocommit.
	uint32_t	tc_db_id;		// The database ID for the operation.
	uint32_t	tc_tab_id;		// The table ID for the operation.
	bool		tc_rolled_back;	// 'true' if this action has been rolled back.
	uint64_t	tc_blob_id;		// The blob ID for the operation.
	uint64_t	tc_blob_ref_id;	// The blob reference id.
	uint64_t	tc_position;	// The log position of the record.
} myTransRec, *myTransPtr;
50
typedef struct TransList {
55
uint64_t log_position; // The transaction log position of the start of the transaction.
56
MS_TxnState terminated; //
57
size_t size; // The allocated size of the list.
58
size_t len; // The number of records in the list that are being used.
60
} TransListRec, *TransListPtr;
62
MSTransCache::MSTransCache(): CSSharedRefObject(),
69
tc_TotalTransCount(0),
70
tc_TotalCacheCount(0),
71
tc_ReLoadingThread(NULL),
78
MSTransCache::~MSTransCache()
81
for (uint32_t i = 0; i < tc_Size; i++) {
83
cs_free(tc_List[i].list);
89
MSTransCache *MSTransCache::newMSTransCache(uint32_t min_size)
91
MSTransCache *tl = NULL;
94
new_(tl, MSTransCache());
97
if (MIN_LIST_SIZE > min_size)
98
min_size = MIN_LIST_SIZE;
100
tl->tc_Initialize(min_size);
108
void MSTransCache::tc_Initialize(uint32_t size)
113
size++; // Add an extra for the overflow
114
tc_List = (TransListPtr) cs_malloc(size * sizeof(TransListRec));
116
// Give each new transaction list record a short list of transaction records
117
for (uint32_t i = 0; i < tc_Size; i++) {
118
tc_List[i].list = (myTransPtr) cs_malloc(MIN_CACHE_RECORDS * sizeof(myTransRec));
119
tc_List[i].size = MIN_CACHE_RECORDS;
122
tc_List[i].log_position = 0;
123
tc_List[i].terminated = MS_Running;
126
tc_OverFlow = tc_List + tc_Size;
128
tc_OverFlow->list = NULL;
129
tc_OverFlow->size = 0;
130
tc_OverFlow->len = 0;
131
tc_OverFlow->tid = 0;
132
tc_OverFlow->log_position = 0;
133
tc_OverFlow->terminated = MS_Running;
137
//--------------------
138
void MSTransCache::tc_SetSize(uint32_t cache_size)
144
if (cache_size < MIN_LIST_SIZE)
145
cache_size = MIN_LIST_SIZE;
147
// If the cache is being reduced then free the record
148
// lists if the transactions about to be removed.
149
for (uint32_t i = cache_size +1; i < tc_Size; i++) {
151
cs_free(tc_List[i].list);
154
// Add one to cache_size for overflow.
155
cs_realloc((void **) &tc_List, (cache_size +1) * sizeof(TransListRec));
157
if (cache_size > tc_Size) {
158
// Move the overflow record.
159
memcpy(tc_List + cache_size, tc_List + tc_Size, sizeof(TransListRec));
161
for (uint32_t i = tc_Size; i < cache_size; i++) {
162
tc_List[i].list = (myTransPtr) cs_malloc(MIN_CACHE_RECORDS * sizeof(myTransRec));
163
tc_List[i].size = MIN_CACHE_RECORDS;
166
tc_List[i].log_position = 0;
167
tc_List[i].terminated = MS_Running;
173
tc_Size = cache_size;
174
tc_OverFlow = tc_List + tc_Size;
181
bool MSTransCache::tc_ShoulReloadCache()
183
return (((tc_Used +1) < tc_Size) && tc_Full);
186
uint64_t MSTransCache::tc_StartCacheReload(bool startup)
192
ASSERT((startup) || tc_Full);
193
tc_ReLoadingThread = self;
194
tc_OverFlowTID = tc_OverFlow->tid;
197
self->myTransRef = 0;
202
return_(tc_OverFlow->log_position);
205
bool MSTransCache::tc_ContinueCacheReload()
207
// Reload should continue until the list is full again and the termination records
208
// for the first and overflow transactions have been found.
210
// It is assumed the reload will also stop if there are no more records to
211
// be read in from the log.
213
return ((tc_List[tc_First].terminated == MS_Running) || // Keep searching for the terminator for the first txn.
214
(tc_OverFlow->tid == tc_OverFlowTID) || // The old overflow txn has not yet been loaded.
215
(tc_OverFlow->terminated == MS_Running) // If the overflow tnx is terminated then the cache is also full.
220
void MSTransCache::tc_CompleteCacheReload()
224
tc_ReLoadingThread = NULL;
225
if (tc_OverFlowTID) { // Clear the overflow condition;
226
tc_OverFlow->tid = 0;
234
// Index of the overflow record in tc_List: the extra record allocated just
// past the tc_Size regular slots (tc_OverFlow == tc_List + tc_Size).
// Expands to the member tc_Size, so it is only usable inside MSTransCache
// member functions.
#define OVERFLOW_TREF (tc_Size)
// One past the overflow slot: a TRef at or above this value does not index
// tc_List, and the lookup code treats it as "not found yet".
#define MAX_TREF (OVERFLOW_TREF +1)
237
// Create a new transaction record for the specified
239
TRef MSTransCache::tc_NewTransaction(uint32_t tid)
246
if (self != tc_ReLoadingThread) {
247
tc_TotalTransCount++;
250
// Once we have entered an overflow state we remain in it until
251
// the cache has been reloaded even if there is now space in the cache.
252
// This is to ensure that the transactions are loaded into the cache
253
// in the correct order.
254
// While reloading, make sure that any attempt to add a transaction by any thread
255
// other than tc_ReLoadingThread recieves an overflow condition.
258
if (tc_ReLoadingThread != self) {
265
ASSERT(tc_OverFlow->tid == tid); // The first txn reloaded should be the overflow txn
269
if (tid == tc_OverFlowTID) {
271
tc_OverFlow->old_tid = tid;
273
tc_OverFlow->tid = 0;
274
tc_OverFlow->terminated = MS_Running;
275
ASSERT((tc_Used +1) < tc_Size); // There should be room in the list for the old everflow txn.
276
} else if (tc_OverFlowTID == 0) {
277
// We are seaching for the end of the overflow txn
278
// and found the start of another txn.
284
if ((tc_Used +1) == tc_Size){
285
// The cache is full.
287
tc_OverFlow->tid = tid; // save the tid of the first transaction to overflow.
288
tc_OverFlow->log_position = -1;
289
tc_OverFlow->len = 0;
290
tc_OverFlow->terminated = MS_Running;
300
if (self != tc_ReLoadingThread) {
301
tc_TotalCacheCount++;
308
static uint32_t last_tid = 0;
309
static bool last_state = false;
310
if (tc_Recovering != last_state)
313
last_state = tc_Recovering;
314
if (!( ((last_tid + 1) == tid) || !last_tid))
315
printf("Expected tid %"PRIu32"\n", last_tid + 1);
316
ASSERT( ((last_tid + 1) == tid) || !last_tid);
321
tc_List[ref].tid = tid;
322
tc_List[ref].len = 0;
323
tc_List[ref].log_position = -1;
324
tc_List[ref].terminated = MS_Running;
326
// Update these after initializing the structure because
327
// the reader thread may read it as soon as tc_EOL is updated.
331
if (tc_EOL == tc_Size)
336
self->myTransRef = ref;
337
self->myCacheVersion = tc_CacheVersion;
341
void MSTransCache::tc_FindTXNRef(uint32_t tid, TRef *tref)
343
uint32_t i = tc_First;
346
// Search for the record
347
if (tc_First > tc_EOL) {
348
for (; i < OVERFLOW_TREF && *tref >= MAX_TREF; i++) {
349
if (tc_List[i].tid == tid)
355
for (; i < tc_EOL && *tref >= MAX_TREF; i++) {
356
if (tc_List[i].tid == tid)
360
// Do not return the overflow reference if the tid = tc_OverFlowTID.
361
// This may seem a bit strange but it is needed so that the overflow txn
362
// will get a new non-overflow cache slot when it is reloaded.
363
if ((*tref >= MAX_TREF) && (tid == tc_OverFlow->tid) && (tid != tc_OverFlowTID))
364
*tref = OVERFLOW_TREF;
367
self->myTransRef = *tref;
368
self->myCacheVersion = tc_CacheVersion;
372
// Add a transaction record to an already existing transaction
373
// or possibly creating a new one. Depending on the record added this may
374
// also commit or rollback the transaction.
375
void MSTransCache::tc_AddRec(uint64_t log_position, MSTransPtr rec, TRef tref)
383
if (tref == TRANS_CACHE_UNKNOWN_REF) { // It is coming from a reload
384
ASSERT(tc_ReLoadingThread == self); // Sanity check here
386
if ((self->myTID == rec->tr_id) && (self->myTransRef != TRANS_CACHE_UNKNOWN_REF))
387
tref = self->myTransRef;
389
tc_FindTXNRef(rec->tr_id, &tref);
390
if (tref == TRANS_CACHE_UNKNOWN_REF) {
391
if (!TRANS_IS_START(rec->tr_type))
392
goto done; // Ignore partial tansaction reloads.
394
tref = tc_NewTransaction(rec->tr_id);
399
ASSERT((tref <= MAX_TREF) || (tref == TRANS_CACHE_NEW_REF));
400
ASSERT(self->myTID == rec->tr_id);
403
if (tref >= OVERFLOW_TREF) {
404
if (tref == TRANS_CACHE_NEW_REF) {
405
ASSERT(TRANS_IS_START(rec->tr_type));
406
tref = tc_NewTransaction(rec->tr_id);
407
} else if (self->myCacheVersion != tc_CacheVersion) {
408
// Check to see if the transaction if now in the cache
409
tc_FindTXNRef(rec->tr_id, &tref);
412
if (tref >= OVERFLOW_TREF){ // Overflow.
413
if (tref == OVERFLOW_TREF) {
414
if (!tc_OverFlow->len)
415
tc_OverFlow->log_position = log_position;
418
if (TRANS_IS_TERMINATED(rec->tr_type)) {
419
if (rec->tr_type == MS_RollBackTxn)
420
tc_OverFlow->terminated = MS_RolledBack;
421
else if (rec->tr_type == MS_RecoveredTxn)
422
tc_OverFlow->terminated = MS_Recovered;
424
tc_OverFlow->terminated = MS_Committed;
432
lrec = tc_List + tref;
435
ASSERT(lrec->tid == rec->tr_id);
437
if (!lrec->len) { // The first record in the transaction
438
lrec->log_position = log_position;
439
} else if (( (TRANS_TYPE(rec->tr_type) == MS_ReferenceTxn) || (TRANS_TYPE(rec->tr_type) == MS_DereferenceTxn)) && !tc_Recovering) {
440
// Make sure the record isn't already in the list.
441
// This can happen during cache reload.
442
for (uint32_t i = 0; i < lrec->len; i++) {
443
if (lrec->list[i].tc_position == log_position)
448
// During recovery there is no need to cache the records.
449
if (!tc_Recovering) {
450
switch (TRANS_TYPE(rec->tr_type)) {
453
case MS_RecoveredTxn:
454
// This is handled below;
457
case MS_PartialRollBackTxn:
459
// The rollback position is stored in the place for the database id.
460
for (uint32_t i = rec->tr_db_id;i < lrec->len; i++)
461
lrec->list[i].tc_rolled_back = true;
466
case MS_ReferenceTxn:
467
case MS_DereferenceTxn:
471
if (lrec->len == lrec->size) { //Grow the list if required
472
cs_realloc((void **) &(lrec->list), (lrec->size + 10)* sizeof(myTransRec));
476
my_rec = lrec->list + lrec->len;
477
my_rec->tc_type = rec->tr_type;
478
my_rec->tc_db_id = rec->tr_db_id;
479
my_rec->tc_tab_id = rec->tr_tab_id;
480
my_rec->tc_blob_id = rec->tr_blob_id;
481
my_rec->tc_blob_ref_id = rec->tr_blob_ref_id;
482
my_rec->tc_position = log_position;
483
my_rec->tc_rolled_back = false;
490
} else if ( (TRANS_TYPE(rec->tr_type) == MS_ReferenceTxn) || (TRANS_TYPE(rec->tr_type) == MS_DereferenceTxn))
494
// Check to see if this is a commit or rollback
495
// Do this last because as soon as it is marked as terminated
496
// the reader thread may start processing it.
497
if (TRANS_IS_TERMINATED(rec->tr_type)) {
498
if (rec->tr_type == MS_RollBackTxn)
499
lrec->terminated = MS_RolledBack;
500
else if (rec->tr_type == MS_RecoveredTxn)
501
lrec->terminated = MS_Recovered;
503
lrec->terminated = MS_Committed;
511
// Get the transaction ref of the first transaction in the list.
512
// Sets *terminated to true or false depending on whether the transaction is terminated.
513
// If there is no transaction then false is returned.
514
bool MSTransCache::tc_GetTransaction(TRef *ref, bool *terminated)
519
ASSERT(tc_List[tc_First].tid);
522
*terminated = (tc_List[tc_First].terminated != MS_Running);
528
bool MSTransCache::tc_GetTransactionStartPosition(uint64_t *log_position)
530
if ((!tc_Used) || (tc_List[tc_First].len == 0))
533
*log_position = tc_List[tc_First].log_position;
538
MS_TxnState MSTransCache::tc_TransactionState(TRef ref)
540
ASSERT((ref < tc_Size) && tc_List[ref].tid);
542
return tc_List[ref].terminated;
545
uint32_t MSTransCache::tc_GetTransactionID(TRef ref)
547
ASSERT((ref < tc_Size) && tc_List[ref].tid);
549
return (tc_List[ref].tid);
552
// Remove the transaction and all records associated with it.
553
void MSTransCache::tc_FreeTransaction(TRef tref)
557
ASSERT(tc_Used && (tref < tc_Size) && tc_List[tref].tid);
561
static uint32_t last_tid = 0;
562
static bool last_state = false;
563
if (tc_Recovering != last_state)
566
last_state = tc_Recovering;
567
ASSERT( ((last_tid + 1) == tc_List[tref].tid) || !last_tid);
568
last_tid = tc_List[tref].tid;
572
lrec = tc_List + tref;
574
lrec->old_tid = lrec->tid;
579
if (lrec->size > 10) { // Free up some excess records.
580
cs_realloc((void **) &(lrec->list), 10* sizeof(myTransRec));
587
if (tref == tc_First) { // Reset the start of the list.
588
TRef eol = tc_EOL; // cache this incase it changes
590
// Skip any unused records indicated by a zero tid.
591
if (tc_First > eol) {
592
for (; tc_First < tc_Size && !tc_List[tc_First].tid; tc_First++) ;
594
if (tc_First == tc_Size)
598
for (; tc_First < eol && !tc_List[tc_First].tid; tc_First++) ;
601
ASSERT( (tc_Used == 0 && tc_First == tc_EOL) || (tc_Used != 0 && tc_First != tc_EOL));
608
//--------------------
609
bool MSTransCache::tc_GetRecAt(TRef tref, size_t index, MSTransPtr rec, MS_TxnState *state)
614
ASSERT(tc_Used && (tref < tc_Size) && tc_List[tref].tid);
617
static uint32_t last_tid = 0;
618
ASSERT( ((last_tid + 1) == tc_List[tref].tid) || (last_tid == tc_List[tref].tid) || !last_tid);
619
last_tid = tc_List[tref].tid;
623
lrec = tc_List + tref;
624
if (index < lrec->len) {
625
myTransPtr my_rec = lrec->list + index;
627
rec->tr_type = my_rec->tc_type;
628
rec->tr_db_id = my_rec->tc_db_id;
629
rec->tr_tab_id = my_rec->tc_tab_id;
630
rec->tr_blob_id = my_rec->tc_blob_id;
631
rec->tr_blob_ref_id = my_rec->tc_blob_ref_id;
632
rec->tr_id = lrec->tid;
634
if (my_rec->tc_rolled_back)
635
*state = MS_RolledBack;
637
*state = lrec->terminated;
645
//--------------------
646
void MSTransCache::tc_dropDatabase(uint32_t db_id)
651
for (uint32_t i = 0; i < tc_Size; i++) {
652
myTransPtr rec = tc_List[i].list;
654
uint32_t list_len = tc_List[i].len;
656
if (rec->tc_db_id == db_id)