~pbms-core/pbms/5.11-beta

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
/* Copyright (c) 2009 PrimeBase Technologies GmbH, Germany
 *
 * PrimeBase Media Stream for MySQL
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
 *
 * Barry Leslie
 *
 * 2009-06-09
 *
 * H&G2JCtL
 *
 * PBMS transaction handling.
 *
 * PBMS uses 1 circular transaction log. All BLOB reference operations are written to this log
 * and are applied to the repository when committed. There is 1 thread dedicated to reading the 
 * transaction log and applying the changes. During an engine level backup this thread is suspended 
 * so that no transactions will be applied to the repository files as they are backed up.
 *
 */
 
#ifndef __TRANSLOG_MS_H__
#define __TRANSLOG_MS_H__

#include <stddef.h>		/* offsetof(), used by MSTrans::txn_SetCheckPoint(). */

#include "CSDefs.h"
#include "CSFile.h"

/* Module build switches.
 * NOTE(review): both are defined unconditionally here, so the crash-test hooks
 * below are compiled into every build that includes this header. */
#define CHECK_TIDS
#define CRASH_TEST

#if defined(CRASH_TEST)
/* Crash-test hook variable (defined elsewhere). MAX_CRASH_POINT is 10 when
 * crash testing is compiled in and 0 otherwise. */
extern csWord4	trans_test_crash_point;
#define MAX_CRASH_POINT 10
#else
#define MAX_CRASH_POINT 0
#endif

/* Transaction reference; presumably a transaction ID (see MSTrans::txn_CurrentTxn). */
typedef csWord4 TRef;

/*
	Transaction log info:
	
	The transaction log is a circular log of fixed-length records. There is assumed to be one 
	reader thread and multiple writer threads. As records are written the 'eol' (End Of Log)
	marker is advanced, and as they are read the 'start' marker is advanced. When either marker
	reaches the end of the log it wraps around: the marker is positioned back at the top of
	the list.
	
	When both markers are at the same location the log is empty. The log is full if the 
	eol marker is just behind the start marker.
	
	If an overflow occurs then the overflow flag in the log header is set and records are written 
	to the end of the log. New records continue to be written to the end of the log until the 
	reader thread has read ALL of the records in the non-overflow portion of the list. When all 
	of these records have been read, the list size is adjusted to include the overflow
	records, the start and eol markers are repositioned, and the overflow flag in the 
	header is switched off.
	
	
*/
/* Header stored at the start of the transaction log file.
 * NOTE(review): the field order defines the on-disk layout; individual fields are
 * rewritten in place using offsetof() (see MSTrans::txn_SetCheckPoint()), so do not
 * reorder, insert or remove fields without a header version change. */
typedef struct MSDiskTransHead {
	CSDiskValue4			th_magic_4;					/* Table magic number. */
	CSDiskValue2			th_version_2;				/* The header version. */

	CSDiskValue4			th_next_txn_id_4;			/* The next valid transaction ID. */

	CSDiskValue2			th_check_point_2;			/* How many records may be written or read before the start/end positions are updated in the header. */
	
	CSDiskValue4			th_requested_cache_size_4;	/* The requested transaction cache size, in transactions. */

	CSDiskValue8			th_list_size_8;				/* The transaction log list size in records. */
	CSDiskValue8			th_requested_list_size_8;	/* The desired list size. The log will be adjusted to this size as soon as it is convenient.*/

	CSDiskValue1			th_recovered_1;				/* A flag to indicate if the log was closed properly. */

	CSDiskValue1			th_overflow_1;				/* A flag to indicate if overflow has occurred. */

	// th_start_8 and th_eol_8 are always written at the same time.
	CSDiskValue8			th_start_8;					/* The index of the first valid record. */
	CSDiskValue8			th_eol_8;					/* The index of the first unused record or End Of Log (eol). */
	CSDiskValue1			th_checksum_1;				/* The current record checksum seed. */
} MSDiskTransHeadRec, *MSDiskTransHeadPtr;


/* A single transaction log record as read from / written to the log by MSTrans. */
typedef struct MSTrans_tag {
	csWord4	tr_id;			// The transaction ID
	csWord1	tr_type;		// The low nibble is the MS_Txn record type (TRANS_TYPE()); bit 0x80 is the autocommit flag (TRANS_IS_AUTOCOMMIT()); bit 0x40 is the TRANS_SET_START()/TRANS_IS_START() flag.
	csWord4	tr_db_id;		// The database ID for the operation. For MS_PartialRollBackTxn records this holds the rollback count instead.
	csWord4	tr_tab_id;		// The table ID for the operation.
	csWord8	tr_blob_id;		// The blob ID for the operation.
	csWord8	tr_blob_ref_id;	// The blob reference id.
	csWord1	tr_check;		// The transaction record checksum.
} MSTransRec, *MSTransPtr;


/* Performance statistics snapshot, filled in by MSTrans::txn_GetStats(). */
typedef struct MSTransStats {
	csWord8	ts_LogSize;			// The number of records in the transaction log.
	csWord4	ts_PercentFull;		// The % of the desired log size in use. This can be > 100%.
	csWord8	ts_MaxSize;			// The log size high water mark.
	csWord4	ts_OverflowCount;	// The number of times the log has overflowed.
	bool	ts_IsOverflowing;	// Whether the log is currently in the overflow state.
	
	csWord4 ts_TransCacheSize;	// The number of transactions currently in the cache.
	csWord4 ts_PercentTransCacheUsed;	// The % of the transaction cache currently in use.
	csWord4	ts_PercentCacheHit; // The % of the transactions that were cached on writing.
} MSTransStatsRec, *MSTransStatsPtr;

/* Record types stored in the low nibble of MSTransRec::tr_type.
 * The explicit values match the original implicit numbering; they are
 * persisted in log records, so they must never change. */
typedef enum {
	MS_RollBackTxn			= 0,
	MS_PartialRollBackTxn	= 1,	/* tr_db_id carries the rollback count. */
	MS_CommitTxn			= 2,
	MS_ReferenceTxn			= 3,
	MS_DereferenceTxn		= 4,
	MS_RecoveredTxn			= 5
} MS_Txn;

/* State of a transaction as reported by the MSTrans reader-side queries. */
typedef enum {
	MS_Running		= 0,
	MS_RolledBack	= 1,
	MS_Committed	= 2,
	MS_Recovered	= 3
} MS_TxnState;


/* Flag and type accessors for MSTransRec::tr_type.
 *   bit 0x80  : autocommit flag
 *   bit 0x40  : TRANS_SET_START()/TRANS_IS_START() flag
 *   low nibble: the MS_Txn record type
 * Every argument is parenthesized so that expression arguments such as
 * TRANS_TYPE(a | b) expand as intended; previously they bound as a | (b & mask).
 * NOTE: TRANS_TYPE_IS_TERMINATED and TRANS_IS_TERMINATED evaluate their argument
 * more than once, so do not pass expressions with side effects. */
#define TRANS_SET_AUTOCOMMIT(t) ((t) |= 0X80)
#define TRANS_IS_AUTOCOMMIT(t) ((t) & 0X80)

#define TRANS_SET_START(t) ((t) |= 0X40)
#define TRANS_IS_START(t) ((t) & 0X40)

#define TRANS_TYPE_IS_TERMINATED(t) (((t) == MS_RollBackTxn) || ((t) == MS_CommitTxn) || ((t) == MS_RecoveredTxn))
#define TRANS_IS_TERMINATED(t) (TRANS_TYPE_IS_TERMINATED(TRANS_TYPE(t)) || TRANS_IS_AUTOCOMMIT(t))
#define TRANS_TYPE(t) ((t) & 0X0F)

/* Callback polled by MSTrans::txn_ReadLog() to decide whether to keep reading. */
typedef bool (*CanContinueFunc)();

/* Callback invoked by MSTrans::txn_ReadLog() with each record read and its log position. */
typedef void (*LoadFunc)(csWord8 position, MSTransPtr record);

class MSTransCache;
/* The PBMS circular transaction log.
 * Writer threads append records; a single reader thread (txn_reader) consumes them.
 * Lock order: the reader daemon is always locked before this object (see txn_SetCheckPoint()). */
class MSTrans : public CSSharedRefObject {

public:
	
	MSTrans();
	~MSTrans();
	
	// Writes a transaction record of the given type to the log.
	void txn_LogTransaction(MS_Txn type, bool autocommit = false, csWord4 db_id = 0, csWord4 tab_id = 0, csWord8 blob_id = 0, csWord8 blob_ref_id = 0);

	void txn_LogPartialRollBack(csWord4 rollBackCount)
	{
		/* Partial rollbacks store the rollback count in the place of the database id. */
		txn_LogTransaction(MS_PartialRollBackTxn, false, rollBackCount);
	}
	
	// Sets how many records may be written or read before the start/eol positions are
	// flushed to the header, and persists the value in the log header (write + flush + sync).
	// The value is first raised to at least 10. If it then exceeds the current or the
	// requested log size, it is reduced to half of that size, so very small logs can still
	// end up with a value below 10.
	void txn_SetCheckPoint(csWord2 checkpoint)
	{
		enter_();
		
		// Important lock order. Writer threads never lock the reader but the reader
		// may lock this object so always lock the reader first.
		lock_(txn_reader);
		lock_(this);
		
		txn_MaxCheckPoint = checkpoint;
		
		if (txn_MaxCheckPoint < 10)
			txn_MaxCheckPoint = 10;
			
		if (txn_MaxCheckPoint > txn_MaxRecords)
			txn_MaxCheckPoint = txn_MaxRecords/2;
		
		if (txn_MaxCheckPoint > txn_ReqestedMaxRecords)
			txn_MaxCheckPoint = txn_ReqestedMaxRecords/2;
		
		CS_SET_DISK_2(txn_DiskHeader.th_check_point_2, txn_MaxCheckPoint);
		
		// Rewrite only this header field in place; its size comes from the field itself
		// rather than a hard-coded byte count.
		txn_File->write(&(txn_DiskHeader.th_check_point_2), offsetof(MSDiskTransHeadRec, th_check_point_2), sizeof(txn_DiskHeader.th_check_point_2));
		txn_File->flush();
		txn_File->sync();
		
		unlock_(this);
		unlock_(txn_reader);
		
		exit_();
	}
	
	void txn_SetCacheSize(csWord4 new_size);
	
	// txn_SetLogSize() may not take effect immediately but will be done
	// when there is free space at the end of the log.
	void txn_SetLogSize(csWord8 new_size);
	
	void txn_Close();	
	
	csWord8	txn_GetSize();		// Returns the size of the log in transaction records.
	
	csWord8	txn_GetNumRecords()	// Returns the number of transactions records waiting to be processed.
	{							// This doesn't include overflow.
		csWord8 size;
		if (txn_Start == txn_EOL)
			size = 0;
		else if (txn_Start < txn_EOL) 
			size = txn_EOL - txn_Start;
		else // The used region wraps around the end of the circular list.
			size = txn_MaxRecords - (txn_Start - txn_EOL);
			
		return size;
	}

	// While a backup is in progress the transaction thread will not be signaled 
	// about completed transactions.
	void txn_BackupStarting() 
	{
		txn_Doingbackup = true;
		txn_reader->suspend();
	}
	
	bool txn_haveNextTransaction();
	
	void txn_BackupCompleted()
	{
		txn_Doingbackup = false;
		txn_reader->resume();
	}
	
	// The following should only be called by the transaction processing thread.
	
	// txn_GetNextTransaction() gets the next completed transaction.
	// If there is none ready it waits for one.	
	void txn_GetNextTransaction(MSTransPtr tran, MS_TxnState *state); 
		
	void txn_SetReader(CSDaemon *reader) {txn_reader = reader;}
	
	// Search the transaction log for a MS_ReferenceTxn record for the given BLOB.
	bool txn_FindBlobRef(MS_TxnState *state, csWord4 db_id, csWord4 tab_id, csWord8 blob_id);
	
	// Mark all transactions for a given database as dropped. Including committed transactions.
	void txn_dropDatabase(csWord4 db_id);
	

	csWord8 txn_GetStartPosition() { return txn_Start;}
	
	const char	*txn_GetTXNLogPath() {return txn_File->myFilePath->getCString();}
private:
	friend class ReadTXNLog;
	
	csWord2		txn_MaxCheckPoint;	// The maximum records to be written or read before the positions in the header are updated.

	// These fields are only used by the reader thread:
	bool		txn_Doingbackup;// Is the database being backed up.
	CSDaemon	*txn_reader;	// The transaction log reader daemon. (unreferenced)
	bool		txn_IsTxnValid;	// Is the current transaction valid.
	TRef		txn_CurrentTxn; // The current transaction.
	csWord4		txn_TxnIndex;	// The record index into the current transaction.
	csInt4		txn_StartCheckPoint; // Counter to determine when the read position should be flushed.
	
	void txn_PerformIdleTasks();
	
	MSTransCache	*txn_TransCache;	// Transaction cache
	
	void txn_ResizeLog();
	
	void txn_NewTransaction(); // Clears the old transaction ID
	
	// The log is full when it has overflowed, or when only one free slot remains
	// (eol just behind start).
	bool txn_IsFull()
	{
		return (txn_HaveOverflow || ((txn_GetNumRecords() +1) == txn_MaxRecords));
	}
	
	
	csWord4				txn_BlockingTransaction; // The transaction ID the transaction thread is waiting on.

	MSDiskTransHeadRec	txn_DiskHeader;
	CSFile				*txn_File;
	
	csInt4				txn_EOLCheckPoint; // Counter to determine when the EOL position should be flushed.
	
	// The size of the transaction log can be adjusted by setting txn_ReqestedMaxRecords.
	// The log size will be adjusted as soon as there are free slots at the bottom of the list.
	csWord8				txn_MaxRecords;			// The number of record slots in the current list.
	csWord8				txn_ReqestedMaxRecords;	// The number of record slots requested.  	

	csWord8				txn_HighWaterMark; // Keeps track of the log size high water mark.
	csWord8				txn_OverflowCount; // A count of the number of times the transaction log has overflowed.
#ifdef DEBUG	
public:
	void				txn_DumpLog(const char *file);
#endif
private:	// Restore private access: without this the members below were public in DEBUG builds only.
	csWord4				txn_MaxTID;
	bool				txn_Recovered;				// Has the log been recovered.
	bool				txn_HaveOverflow;			// A flag to indicate the list has overflowed.
	csWord8				txn_Overflow;				// The index of the next overflow record. 
	csWord8				txn_EOL;					// The index of the first unused record or End Of Log (eol). 
	csWord8				txn_Start;					// The index of the first valid record. 

public:	
	void txn_GetStats(MSTransStatsPtr stats);		// Get the current performance statistics.
	
private:
	csWord1				txn_Checksum;				// The current record checksum seed. 
	
	void txn_SetFile(CSFile *tr_file);		// Set the file to use for the transaction log.
	bool txn_ValidRecord(MSTransPtr rec);	// Check to see if a record is valid.
	void txn_GetRecordAt(csWord8 index, MSTransPtr rec); // Reads 1 record from the log.
	void txn_ResetReadPosition(csWord8 pos);	// Reset txn_Start
	void txn_ResetEOL();
		
	void txn_Recover();							// Recover the transaction log.
	
	void txn_ReadLog(csWord8 read_start, bool log_locked, CanContinueFunc canContinue, LoadFunc load); // A generic method for reading the log
	void txn_LoadTransactionCache(csWord8 read_start);	// Load the transactions in the log into cache.
	
	void txn_AddTransaction(csWord1 tran_type, bool autocommit = false, csWord4 db_id = 0, csWord4 tab_id = 0, csWord8 blob_id = 0, csWord8 blob_ref_id = 0);

	
public:
	static MSTrans* txn_NewMSTrans(const char *log_path, bool dump_log = false);
};

/* Reader over an MSTrans log (a friend of MSTrans).
 * rl_CanContinue() and rl_Load() are virtual so subclasses can control the scan and
 * consume each record read. */
class ReadTXNLog {
	public:
	ReadTXNLog(MSTrans *txn_log): rl_log(txn_log){}

	// The class has virtual methods and is designed to be subclassed. Deleting a
	// subclass instance through a ReadTXNLog pointer is undefined behavior unless
	// the destructor is virtual.
	virtual ~ReadTXNLog() {}
	
	MSTrans *rl_log;	// The log being read; ReadTXNLog does not release it.
	void rl_ReadLog(csWord8 read_start, bool log_locked);
	virtual bool rl_CanContinue();
	virtual void rl_Load(csWord8 log_position, MSTransPtr rec);
	void rl_Store(csWord8 log_position, MSTransPtr rec);
	void rl_Flush();
};

#endif //__TRANSLOG_MS_H__