1
/****************************************************************************
3
| Copyright (c) 2007 Novell, Inc.
6
| This program is free software; you can redistribute it and/or
7
| modify it under the terms of version 2 of the GNU General Public License as
8
| published by the Free Software Foundation.
10
| This program is distributed in the hope that it will be useful,
11
| but WITHOUT ANY WARRANTY; without even the implied warranty of
12
| MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13
| GNU General Public License for more details.
15
| You should have received a copy of the GNU General Public License
16
| along with this program; if not, contact Novell, Inc.
18
| To contact Novell about this file by physical or electronic mail,
19
| you may find current contact information at www.novell.com
21
| Author: Mike Lasky <mlasky@novell.com>
22
|***************************************************************************/
26
using System.Collections;
27
using System.Diagnostics;
29
using System.Runtime.Serialization;
30
using System.Runtime.Serialization.Formatters.Binary;
32
using System.Threading;
36
using Simias.Client.Event;
41
namespace Simias.Storage
44
/// Class used to implement an inprocess mutex that protects the log file from reentrancy.
46
internal class LogMutex
50
/// Table used to keep track of per log file mutexes.
52
static private Hashtable mutexTable = new Hashtable();
57
/// Initializes a new instance of the Mutex class with default properties.
59
/// <param name="collectionID">Identifier of the collection associated
60
/// with the log file.</param>
61
public LogMutex( string collectionID )
63
lock( typeof( LogMutex ) )
65
// See if a mutex already exists for this collection's logfile.
66
if ( !mutexTable.ContainsKey( collectionID ) )
68
mutexTable.Add( collectionID, new Mutex() );
74
/// Initializes a new instance of the Mutex class with a Boolean value
75
/// indicating whether the calling thread should have initial ownership
78
/// <param name="collectionID">Identifier of the collection associated
79
/// with the log file.</param>
80
/// <param name="initiallyOwned">true to give the calling thread initial
81
/// ownership of the mutex; otherwise, false.</param>
82
public LogMutex( string collectionID, bool initiallyOwned )
85
lock( typeof( LogMutex ) )
87
if ( !mutexTable.ContainsKey( collectionID ) )
89
mutexTable.Add( collectionID, new Mutex( initiallyOwned ) );
94
// If the mutex already existed and the caller specified to acquire
95
// the mutex before returning, get it now.
96
if ( !created && initiallyOwned )
98
WaitOne( collectionID );
103
#region Public Methods
105
/// Releases all resources held by the current WaitHandle.
107
/// <param name="collectionID">Identifier of the collection associated
108
/// with the log file.</param>
109
public void Close( string collectionID )
111
lock( typeof( LogMutex ) )
113
Mutex mutex = mutexTable[ collectionID ] as Mutex;
117
mutexTable.Remove( collectionID );
123
/// Releases the mutex once.
125
/// <param name="collectionID">Identifier of the collection associated
126
/// with the log file.</param>
127
public void ReleaseMutex( string collectionID )
131
lock ( typeof( LogMutex ) )
133
mutex = mutexTable[ collectionID ] as Mutex;
138
throw new SimiasException( "Log mutex does not exist for collection " + collectionID );
141
mutex.ReleaseMutex();
145
/// Blocks the current thread until the current WaitHandle receives a signal.
147
/// <param name="collectionID">Identifier of the collection associated
148
/// with the log file.</param>
149
/// <returns>true if the current instance receives a signal. If the current
150
/// instance is never signaled, WaitOne never returns.</returns>
151
public bool WaitOne( string collectionID )
155
lock( typeof( LogMutex ) )
157
mutex = mutexTable[ collectionID ] as Mutex;
162
throw new SimiasException( "Log mutex does not exist for collection " + collectionID );
165
return mutex.WaitOne();
169
/// Blocks the current thread until the current WaitHandle receives a signal,
170
/// using 32-bit signed integer to measure the time interval and specifying
171
/// whether to exit the synchronization domain before the wait.
173
/// <param name="collectionID">Identifier of the collection associated
174
/// with the log file.</param>
175
/// <param name="millisecondsTimeout">The number of milliseconds to wait, or
176
/// Timeout.Infinite (-1) to wait indefinitely. </param>
177
/// <param name="exitContext">true to exit the synchronization domain for the
178
/// context before the wait (if in a synchronized context), and reacquire it; otherwise, false.</param>
179
/// <returns>true if the current instance receives a signal; otherwise, false.</returns>
180
public bool WaitOne( string collectionID, int millisecondsTimeout, bool exitContext )
184
lock( typeof( LogMutex ) )
186
mutex = mutexTable[ collectionID ] as Mutex;
191
throw new SimiasException( "Log mutex does not exist for collection " + collectionID );
194
return mutex.WaitOne( millisecondsTimeout, exitContext );
198
/// Blocks the current thread until the current instance receives a signal,
199
/// using a TimeSpan to measure the time interval and specifying whether to
200
/// exit the synchronization domain before the wait.
202
/// <param name="collectionID">Identifier of the collection associated
203
/// with the log file.</param>
204
/// <param name="timeout">The number of milliseconds to wait, or a TimeSpan
205
/// that represents -1 milliseconds to wait indefinitely.</param>
206
/// <param name="exitContext">true to exit the synchronization domain for the
207
/// context before the wait (if in a synchronized context), and reacquire it;
208
/// otherwise, false.</param>
209
/// <returns>true if the current instance receives a signal; otherwise, false.</returns>
210
public bool WaitOne( string collectionID, TimeSpan timeout, bool exitContext )
214
lock( typeof( LogMutex ) )
216
mutex = mutexTable[ collectionID ] as Mutex;
221
throw new SimiasException( "Log mutex does not exist for collection " + collectionID );
224
return mutex.WaitOne( timeout, exitContext );
230
/// Class used to queue change log events.
232
internal class ChangeLogEvent
234
#region Class Members
236
/// Type of change events that are watched for.
238
public enum ChangeEventType
241
/// Collection was created. Create a ChangeLogWriter.
246
/// Collection was deleted. Delete the ChangeLogWriter.
251
/// Node was created in a collection.
256
/// Node was changed in a collection.
261
/// Node was deleted in a collection.
269
private ChangeEventType type;
272
/// Context for the event.
274
private NodeEventArgs args;
279
/// Gets the change event type.
281
public ChangeEventType Type
287
/// Gets the event context.
289
public NodeEventArgs Args
297
/// Initializes a new instance of the object.
299
/// <param name="type">Type of change long event.</param>
300
/// <param name="args">Context for the event.</param>
301
public ChangeLogEvent( ChangeEventType type, NodeEventArgs args )
310
/// Exception that indicates that the event context cookie has expired and that
311
/// sync must dredge for changes.
313
public class CookieExpiredException : SimiasException
317
/// Initializes a new instance of the object class.
319
public CookieExpiredException() :
320
base ( "The event context cookie has expired." )
325
/// Initializes a new instance of the object class.
327
/// <param name="innerException">The exception that cause the cookie to be invalid.</param>
328
public CookieExpiredException( Exception innerException ) :
329
base ( "The event context cookie has expired.", innerException )
337
/// Class used as a context object to look up the next set of events for a specified ChangeLogReader
341
public class EventContext
343
#region Class Members
345
/// Date and time of event.
347
private DateTime timeStamp;
350
/// Assigned event ID.
352
private ulong recordID;
355
/// Hint to where the last record was read from in the file. It may not be valid.
360
/// Used as separator in string representation.
362
private const char valueSeparator = ':';
367
/// Gets the timestamp portion of the context.
369
internal DateTime TimeStamp
371
get { return timeStamp; }
372
set { timeStamp = value; }
376
/// Gets the record ID portion of the context.
378
internal ulong RecordID
380
get { return recordID; }
381
set { recordID = value; }
385
/// Gets the hint of where the last event was read from.
390
set { hint = value; }
396
/// Initializes a new instance of the object class.
398
/// <param name="timeStamp">Date and time of event.</param>
399
/// <param name="recordID">Assigned event ID.</param>
400
/// <param name="hint">Hint to where the last record was read from in the file. It may not be valid.</param>
401
internal EventContext( DateTime timeStamp, ulong recordID, long hint )
403
this.timeStamp = timeStamp;
404
this.recordID = recordID;
409
/// Initializes a new instance from a string obtained from ToString
411
/// <param name="cookie">The string representation of the context.</param>
412
public EventContext( string cookie)
414
// Default values if cookie string is bad.
415
timeStamp = DateTime.MinValue;
417
hint = LogFileHeader.RecordSize;
419
if ( ( cookie != null ) && ( cookie != String.Empty ) )
421
string [] values = cookie.Split( valueSeparator );
422
if ( values.Length == 3 )
424
timeStamp = new DateTime( long.Parse( values[ 0 ] ) );
425
recordID = ulong.Parse( values[ 1 ] );
426
hint = long.Parse( values[ 2 ] );
433
#region Public Methods
435
/// Gets a string representation of this context.
436
/// Can be used to store this cookie for later use.
438
/// <returns>A formatted string representing the cookie.</returns>
439
public override string ToString()
441
return (timeStamp.Ticks.ToString() + valueSeparator + recordID.ToString() + valueSeparator + hint.ToString());
447
/// Contains the layout of the LogFile header information.
449
public class LogFileHeader
451
#region Class Members
453
/// Encoded lengths of the object fields.
455
private const int logFileIDSize = 16;
456
private const int maxLogRecordsSize = 4;
457
private const int maxFlagsSize = 4;
458
private const int lastRecordSize = 8;
459
private const int recordLocationSize = 8;
462
/// This is the total encoded record size.
464
private const int encodedRecordSize = logFileIDSize +
471
/// Contains the identifier for this log file.
473
private string logFileID;
476
/// Maximum number of records to keep persisted in the file.
478
private uint maxLogRecords;
486
/// Last record written to the file. This is just a hint and may
487
/// or may not be valid.
489
private ulong lastRecord;
492
/// File position of last record written to the file. This is just
493
/// a hint and may or may not be valid.
495
private long recordLocation;
500
/// Gets the length of the record.
504
get { return RecordSize; }
508
/// Gets or sets the logFileID.
510
public string LogFileID
512
get { return logFileID; }
513
set { logFileID = value; }
517
/// Gets or sets the maximum number of ChangeLog records in the file.
519
public uint MaxLogRecords
521
get { return maxLogRecords; }
522
set { maxLogRecords = value; }
526
/// Returns the size of the LogFileHeader record.
528
static public int RecordSize
530
get { return encodedRecordSize; }
534
/// Gets or sets the last record written to the file. This is only a
535
/// hint and may or may not be valid.
537
public ulong LastRecord
539
get { return lastRecord; }
540
set { lastRecord = value; }
544
/// Gets or sets the file position of the last record in the file.
545
/// This is just a hint and may or may not be valid.
547
public long RecordLocation
549
get { return recordLocation; }
550
set { recordLocation = value; }
556
/// Initializes a new instance of the struct.
558
/// <param name="ID">Contains the identifier for this log file.</param>
559
/// <param name="maxRecords">Maximum number of records to keep persisted in the file.</param>
560
public LogFileHeader( string ID, uint maxRecords )
563
maxLogRecords = maxRecords;
570
/// Initializes a new instance of the struct from an encoded byte array.
572
/// <param name="encodedRecord">LogFileHeader encoded record.</param>
573
public LogFileHeader( byte[] encodedRecord )
577
byte[] guidArray = new byte[ 16 ];
578
Array.Copy( encodedRecord, 0, guidArray, 0, guidArray.Length );
579
Guid guid = new Guid( guidArray );
580
logFileID = guid.ToString();
581
index += logFileIDSize;
583
maxLogRecords = BitConverter.ToUInt32( encodedRecord, index );
584
index += maxLogRecordsSize;
586
flags = BitConverter.ToUInt32( encodedRecord, index );
587
index += maxFlagsSize;
589
lastRecord = BitConverter.ToUInt64( encodedRecord, index );
590
index += lastRecordSize;
592
recordLocation = BitConverter.ToInt64( encodedRecord, index );
593
index += recordLocationSize;
597
#region Public Methods
599
/// Converts the object to a formatted byte array.
601
/// <returns>A formatted byte array containing the LogFileHeader data.</returns>
602
public byte[] ToByteArray()
605
byte[] result = new byte[ RecordSize ];
607
// Convert the object members to byte arrays.
608
Guid guid = new Guid( logFileID );
609
byte[] lfi = guid.ToByteArray();
610
byte[] mlr = BitConverter.GetBytes( maxLogRecords );
611
byte[] flg = BitConverter.GetBytes( flags );
612
byte[] lr = BitConverter.GetBytes( lastRecord );
613
byte[] rl = BitConverter.GetBytes( recordLocation );
615
// Copy the converted byte arrays to the resulting array.
616
Array.Copy( lfi, 0, result, index, lfi.Length );
619
Array.Copy( mlr, 0, result, index, mlr.Length );
622
Array.Copy( flg, 0, result, index, flg.Length );
625
Array.Copy( lr, 0, result, index, lr.Length );
628
Array.Copy( rl, 0, result, index, rl.Length );
637
/// Contains the layout of a ChangeLog record.
639
public class ChangeLogRecord
641
#region Class Members
643
/// Recordable change log operations.
645
public enum ChangeLogOp
648
/// The node exists but no log record has been created.
649
/// Do a brute force sync.
654
/// Node object was created.
659
/// Node object was deleted.
664
/// Node object was changed.
669
/// Node object was renamed.
675
/// Encoded lengths of the object fields.
677
private const int recordIDSize = 8;
678
private const int epochSize = 8;
679
private const int nodeIDSize = 16;
680
private const int operationSize = 4;
681
private const int flagSize = 2;
682
private const int masterRevSize = 8;
683
private const int slaveRevSize = 8;
684
private const int fileLengthSize = 8;
685
private const int typeSize = 4;
688
/// This is the total encoded record size.
690
private const int encodedRecordSize = recordIDSize +
701
/// Record identitifer for this entry.
703
private ulong recordID;
706
/// Date and time that event was recorded.
708
private DateTime epoch;
711
/// Identifier of Node object that triggered the event.
713
private string nodeID;
716
/// Node operation type.
718
private ChangeLogOp operation;
721
/// Flags passed to the event.
723
private ushort flags;
726
/// Master revision of node.
728
private ulong masterRev;
731
/// Local revision of node.
733
private ulong slaveRev;
736
/// Length of the file represented by the node if the node is a BaseFileTypeNode.
738
private long fileLength;
741
/// Base type of node.
743
private NodeTypes.NodeTypeEnum type;
748
/// Gets or sets the record epoch.
750
public DateTime Epoch
752
get { return epoch; }
753
set { epoch = value; }
757
/// Gets or sets the event ID.
759
public string EventID
761
get { return nodeID; }
762
set { nodeID = value; }
766
/// Gets the length of the record.
770
get { return RecordSize; }
774
/// Gets or set the event operation.
776
public ChangeLogOp Operation
778
get { return operation; }
779
set { operation = value; }
783
/// Gets or sets the record ID.
785
public ulong RecordID
787
get { return recordID; }
788
set { recordID = value; }
792
/// Gets or sets the flags.
796
get { return flags; }
797
set { flags = value; }
801
/// Gets or sets the master revision value.
803
public ulong MasterRev
805
get { return masterRev; }
806
set { masterRev = value; }
810
/// Gets or sets the slave revision value.
812
public ulong SlaveRev
814
get { return slaveRev; }
815
set { slaveRev = value; }
819
/// Gets or sets the file length value.
821
public long FileLength
823
get { return fileLength; }
824
set { fileLength = value; }
828
/// Gets or sets the base node type.
830
public NodeTypes.NodeTypeEnum Type
833
set { type = value; }
837
/// Returns the size of the ChangeLogRecord.
839
static public int RecordSize
841
get { return encodedRecordSize; }
847
/// Initializes a new instance of the struct.
849
/// <param name="operation">Node operation type.</param>
850
/// <param name="args">NodeEventArgs object that contains the change information.</param>
851
public ChangeLogRecord( ChangeLogOp operation, NodeEventArgs args )
854
this.epoch = args.TimeStamp;
855
this.nodeID = args.ID;
856
this.operation = operation;
857
this.flags = ( ushort )args.Flags;
858
this.masterRev = args.MasterRev;
859
this.slaveRev = args.SlaveRev;
860
this.fileLength = args.FileSize;
863
this.type = ( NodeTypes.NodeTypeEnum )Enum.Parse( typeof( NodeTypes.NodeTypeEnum ), args.Type );
867
this.type = NodeTypes.NodeTypeEnum.Node;
872
/// Initializes a new instance of the struct from an encoded byte array.
874
/// <param name="encodedRecord">ChangeLogRecord encoded record.</param>
875
public ChangeLogRecord( byte[] encodedRecord ) :
876
this( encodedRecord, 0 )
881
/// Initializes a new instance of the struct from an encoded byte array and index.
883
/// <param name="encodedRecord">ChangeLogRecord encoded record.</param>
884
/// <param name="index">Start index into the byte array.</param>
885
public ChangeLogRecord( byte[] encodedRecord, int index )
887
recordID = BitConverter.ToUInt64( encodedRecord, index );
888
index += recordIDSize;
890
epoch = new DateTime( BitConverter.ToInt64( encodedRecord, index ) );
893
byte[] guidArray = new byte[ 16 ];
894
Array.Copy( encodedRecord, index, guidArray, 0, guidArray.Length );
895
Guid guid = new Guid( guidArray );
896
nodeID = guid.ToString();
899
operation = ( ChangeLogOp )Enum.ToObject( typeof( ChangeLogOp ), BitConverter.ToInt32( encodedRecord, index ) );
900
index += operationSize;
902
flags = BitConverter.ToUInt16( encodedRecord, index );
905
masterRev = BitConverter.ToUInt64( encodedRecord, index );
906
index += masterRevSize;
908
slaveRev = BitConverter.ToUInt64( encodedRecord, index );
909
index += slaveRevSize;
911
fileLength = BitConverter.ToInt64( encodedRecord, index );
912
index += fileLengthSize;
916
type = ( NodeTypes.NodeTypeEnum )Enum.ToObject( typeof( NodeTypes.NodeTypeEnum ), BitConverter.ToInt32( encodedRecord, index ) );
920
type = NodeTypes.NodeTypeEnum.Node;
926
#region Public Methods
928
/// Converts the object to a formatted byte array.
930
/// <returns>A formatted byte array containing the ChangeLogRecord data.</returns>
931
public byte[] ToByteArray()
934
byte[] result = new byte[ RecordSize ];
936
// Convert the object members to byte arrays.
937
Guid guid = new Guid( nodeID );
938
byte[] nid = guid.ToByteArray();
939
byte[] rid = BitConverter.GetBytes( recordID );
940
byte[] ep = BitConverter.GetBytes( epoch.Ticks );
941
byte[] op = BitConverter.GetBytes( ( int )operation );
942
byte[] fl = BitConverter.GetBytes( flags );
943
byte[] mr = BitConverter.GetBytes( masterRev );
944
byte[] sr = BitConverter.GetBytes( slaveRev );
945
byte[] fil = BitConverter.GetBytes( fileLength );
946
byte[] tp = BitConverter.GetBytes( ( int )type );
948
// Copy the converted byte arrays to the resulting array.
949
Array.Copy( rid, 0, result, index, rid.Length );
952
Array.Copy( ep, 0, result, index, ep.Length );
955
Array.Copy( nid, 0, result, index, nid.Length );
958
Array.Copy( op, 0, result, index, op.Length );
961
Array.Copy( fl, 0, result, index, fl.Length );
964
Array.Copy( mr, 0, result, index, mr.Length );
967
Array.Copy( sr, 0, result, index, sr.Length );
970
Array.Copy( fil, 0, result, index, fil.Length );
973
Array.Copy( tp, 0, result, index, tp.Length );
982
/// Class that lets the ChangeLog operate as a thread service.
984
public class ChangeLog : IThreadService
986
#region Class Members
988
/// Used to log messages.
990
private static readonly ISimiasLog log = SimiasLogManager.GetLogger( typeof( ChangeLog ) );
993
/// Table used to keep track of ChangeLogWriter objects.
995
private static Hashtable logWriterTable = new Hashtable();
1000
/// Initializes a new instance of the object class.
1007
#region Public Methods
1009
/// Creates a change log writer for the specified collection.
1011
/// <param name="collectionID">The identifier for the collection.</param>
1012
public void CreateChangeLogWriter( string collectionID )
1014
lock ( logWriterTable )
1016
if ( !logWriterTable.ContainsKey( collectionID ) )
1018
// Allocate a ChangeLogWriter object for this collection and store it in the table.
1019
logWriterTable.Add( collectionID, new ChangeLogWriter( collectionID ) );
1020
log.Debug( "Added ChangeLogWriter for collection {0}", collectionID );
1026
/// Deletes a change log writer for the specified collection.
1028
/// <param name="collectionID">The identifier for the collection.</param>
1029
public void DeleteChangeLogWriter( string collectionID )
1031
lock ( logWriterTable )
1033
// Make sure the writer is in the table.
1034
if ( logWriterTable.ContainsKey( collectionID ) )
1036
// Remove the ChangeLogWriter object from the table and dispose it.
1037
ChangeLogWriter logWriter = logWriterTable[ collectionID ] as ChangeLogWriter;
1038
logWriterTable.Remove( collectionID );
1040
// Get the path to the file before disposing it.
1041
string logPath = logWriter.LogFile;
1042
logWriter.Dispose();
1044
try { File.Delete( logPath ); }
1047
log.Debug( "Deleted ChangeLogWriter for collection {0}", collectionID );
1053
#region IThreadService Members
1055
/// Starts the thread service.
1059
// Get a store object.
1060
Store store = Store.GetStore();
1062
// Get all of the collection objects and set up listeners for them.
1063
foreach (ShallowNode sn in store)
1065
CreateChangeLogWriter( sn.ID );
1068
log.Info( "Change Log Service started." );
1072
/// Resumes a paused service.
1074
public void Resume()
1079
/// Pauses a service's execution.
1088
/// <param name="message"></param>
1089
/// <param name="data"></param>
1090
public int Custom(int message, string data)
1096
/// Stops the service from executing.
1100
// Remove all of the log writers from the table and dispose them.
1101
lock ( logWriterTable )
1103
foreach ( ChangeLogWriter logWriter in logWriterTable.Values )
1105
logWriter.Dispose();
1108
// Clear the hashtable.
1109
logWriterTable.Clear();
1112
log.Info( "Change Log Service stopped." );
1118
/// Object that records all changes to the Collection Store by listening to the Store events.
1120
internal class ChangeLogWriter : IDisposable
1122
#region Class Members
1124
/// Default maximum number of records to persist.
1126
private const uint defaultMaxPersistedRecords = 25000;
1129
/// Used to log messages.
1131
private static readonly ISimiasLog log = SimiasLogManager.GetLogger( typeof( ChangeLogWriter ) );
1134
/// Collection that the log file belongs to.
1136
private string collectionID;
1139
/// Inprocess mutex used to control access to the log file.
1141
private LogMutex mutex;
1144
/// Specifies whether object is viable.
1146
private bool disposed = false;
1149
/// Subscribes to Collection Store node events.
1151
private EventSubscriber subscriber;
1154
/// Contains absolute path to the logfile.
1156
private string logFilePath;
1159
/// Contains the next record ID to assign to a ChangeLogRecord.
1161
private ulong recordID = 0;
1164
/// Contains the offset in the file to write.
1166
private long writePosition = LogFileHeader.RecordSize;
1169
/// Contains the last write position boundary.
1171
private long maxWritePosition = ( defaultMaxPersistedRecords * ChangeLogRecord.RecordSize ) + LogFileHeader.RecordSize;
1174
/// Queue used to process change events so the event thread does not have to block.
1176
private Queue eventQueue = new Queue();
1179
/// Flag that indicates if a thread is processing work on the queue.
1181
private bool threadScheduled = false;
1184
/// Contains the header to the log file.
1186
private LogFileHeader logHeader;
1191
/// Gets the file path for this change log.
1193
public string LogFile
1195
get { return logFilePath; }
1201
/// Initializes a new instance of the object class.
1203
/// <param name="collectionID">Collection identifier to listen for events.</param>
1204
public ChangeLogWriter( string collectionID )
1206
this.collectionID = collectionID;
1208
// Create the mutex that will protect this logfile.
1209
mutex = new LogMutex( collectionID, true );
1212
// Get the path to the store managed directory for this collection.
1213
string logFileDir = Path.Combine( Store.StorePath, "changelog" );
1214
if ( !Directory.Exists( logFileDir ) )
1216
Directory.CreateDirectory( logFileDir );
1219
// Build the log file path.
1220
logFilePath = Path.Combine( logFileDir, collectionID + ".changelog" );
1222
// Check to see if the file exists.
1223
if ( !File.Exists( logFilePath ) )
1226
FileStream fs = new FileStream( logFilePath, FileMode.CreateNew, FileAccess.ReadWrite );
1229
// Create the log file header.
1230
logHeader = CreateLogFileHeader( fs, collectionID );
1239
// Open the existing log file.
1240
FileStream fs = new FileStream( logFilePath, FileMode.Open, FileAccess.ReadWrite );
1243
// Get the log header.
1244
logHeader = GetLogFileHeader( fs );
1246
// Check to see if the log file was shutdown gracefully.
1247
if ( CheckIntegrity( fs, logHeader, collectionID ) )
1249
// Setup the current write position.
1250
SetCurrentWritePosition( fs );
1259
// Setup the event listeners.
1260
subscriber = new EventSubscriber(collectionID );
1261
subscriber.NodeChanged += new NodeEventHandler( OnNodeChange );
1262
subscriber.NodeCreated += new NodeEventHandler( OnNodeCreate );
1263
subscriber.NodeDeleted += new NodeEventHandler( OnNodeDelete );
1267
mutex.ReleaseMutex( collectionID );
1272
#region Private Methods
1274
/// Checks to see if the file header is valid. If it was not, the file contents are
1275
/// truncated and reinitialized.
1277
/// <param name="fs">File stream that reference the log file.</param>
1278
/// <param name="header">The log file header.</param>
1279
/// <param name="collectionID">ID of the collection being monitored.</param>
1280
/// <returns>True if the file data is good, otherwise false.</returns>
1281
private bool CheckIntegrity( FileStream fs, LogFileHeader header, string collectionID )
1285
if ( header.LogFileID != collectionID )
1287
log.Error( "Log file corrupted. Reinitializing contents." );
1289
// Truncate the file data.
1291
logHeader = CreateLogFileHeader( fs, collectionID );
1299
/// Creates a default log file header and writes it to the log file.
1301
/// <param name="fs">File stream that reference the log file.</param>
1302
/// <param name="collectionID">ID of collection being monitored.</param>
1303
/// <returns>A LogFileHeader object.</returns>
1304
private LogFileHeader CreateLogFileHeader( FileStream fs, string collectionID )
1306
// Build the new log file header.
1307
LogFileHeader header = new LogFileHeader( collectionID, defaultMaxPersistedRecords );
1312
fs.Write( header.ToByteArray(), 0, header.Length );
1314
catch ( IOException e )
1316
log.Error( "Failed to write log header. " + e.Message );
1324
/// Looks for the next write position in the log file using the hint saved the last time the file
1325
/// was written to. If the hint is invalid, then a brute force method is used.
1327
/// <param name="fs">FileStream object that references the log file.</param>
1328
private bool FindCurrentWritePosition( FileStream fs )
1330
bool foundPosition = false;
1331
bool wrapped = false;
1335
// See if a last position was saved.
1336
if ( logHeader.RecordLocation >= ( long )LogFileHeader.RecordSize )
1338
// Make sure that the saved record position in on a ChangeLogRecord boundary.
1339
if ( ( ( logHeader.RecordLocation - ( long )LogFileHeader.RecordSize ) % ( long )ChangeLogRecord.RecordSize ) == 0 )
1341
// Using the hint in the log file header, see if the write position still exists in the file.
1342
if ( logHeader.RecordLocation == ( long )LogFileHeader.RecordSize )
1344
// There are no records in the log or the log has wrapped back to the beginning.
1345
if ( fs.Length >= ( LogFileHeader.RecordSize + ChangeLogRecord.RecordSize ) )
1347
// The file has wrapped, read the entry from the end of the file.
1348
fs.Position = fs.Length - ChangeLogRecord.RecordSize;
1353
// There are no entries in the file. No need to do a read.
1354
writePosition = logHeader.RecordLocation;
1355
recordID = logHeader.LastRecord;
1356
foundPosition = true;
1361
// There is at least one ChangeLogRecord and the file hasn't wrapped.
1362
fs.Position = logHeader.RecordLocation - ChangeLogRecord.RecordSize;
1365
// No need to read if the offset was already found.
1366
if ( foundPosition == false )
1368
// Read the last valid change log record that was written.
1369
byte[] buffer = new byte[ ChangeLogRecord.RecordSize ];
1370
int bytesRead = fs.Read( buffer, 0, buffer.Length );
1371
if ( bytesRead > 0 )
1373
ChangeLogRecord record = new ChangeLogRecord( buffer );
1374
ulong lastRecordID = record.RecordID;
1376
if ( record.RecordID == ( logHeader.LastRecord - 1 ) )
1378
// Position the next for the next read if the event log has rolled over.
1381
fs.Position = LogFileHeader.RecordSize;
1384
// If there is a next record, then the file has rolled over and we need
1385
// to check the record ID to make sure it is less that the value that we
1386
// saved. If there is not an other record, then the hint is valid.
1387
bytesRead = fs.Read( buffer, 0, buffer.Length );
1388
if ( bytesRead > 0 )
1390
record = new ChangeLogRecord( buffer );
1391
if ( lastRecordID > record.RecordID )
1393
// This is the rollover point. The saved location is valid.
1394
writePosition = logHeader.RecordLocation;
1395
recordID = logHeader.LastRecord;
1396
foundPosition = true;
1401
// There are no more records and the file hasn't rolled over. The
1402
// saved location is valid.
1403
writePosition = logHeader.RecordLocation;
1404
recordID = logHeader.LastRecord;
1405
foundPosition = true;
1413
catch ( IOException e )
1415
log.Error( "FindCurrentWritePosition():" + e.Message );
1418
return foundPosition;
1422
/// Gets the log file header for the specified stream.
1424
/// <param name="fs">File stream containing the header.</param>
1425
/// <returns>A LogFileHeader object from the specified file stream if successful. Otherwise
1426
/// a null is returned.</returns>
1427
private LogFileHeader GetLogFileHeader( FileStream fs )
1429
LogFileHeader logFileHeader = null;
1433
// Position the file pointer to the beginning of the file.
1437
byte[] buffer = new byte[ LogFileHeader.RecordSize ];
1438
int bytesRead = fs.Read( buffer, 0, buffer.Length );
1439
if ( bytesRead == buffer.Length )
1441
logFileHeader = new LogFileHeader( buffer );
1444
catch ( IOException e )
1446
log.Error( "Failed to read event log header. {0}", e.Message );
1450
return logFileHeader;
1454
/// Delegate that is called when a Node object has been changed.
1456
/// <param name="args">Event arguments.</param>
1457
private void OnNodeChange( NodeEventArgs args )
1459
// Don't indicate local events.
1460
if (((NodeEventArgs.EventFlags)args.Flags & NodeEventArgs.EventFlags.LocalOnly) == 0)
1462
// Queue the event and schedule to come back later to process.
1465
// Add the event to the queue.
1466
eventQueue.Enqueue( new ChangeLogEvent( ChangeLogEvent.ChangeEventType.NodeChange, args ) );
1468
// See if a thread has already been scheduled to take care of this event.
1469
if ( threadScheduled == false )
1471
ThreadPool.QueueUserWorkItem( new WaitCallback( ProcessChangeLogEvent ) );
1472
threadScheduled = true;
1479
/// Delegate that is called when a Node object has been created.
1481
/// <param name="args">Event arguments.</param>
1482
private void OnNodeCreate( NodeEventArgs args )
1484
// Don't indicate local events.
1485
if (((NodeEventArgs.EventFlags)args.Flags & NodeEventArgs.EventFlags.LocalOnly) == 0)
1487
// Queue the event and schedule to come back later to process.
1490
// Add the event to the queue.
1491
eventQueue.Enqueue( new ChangeLogEvent( ChangeLogEvent.ChangeEventType.NodeCreate, args ) );
1493
// See if a thread has already been scheduled to take care of this event.
1494
if ( threadScheduled == false )
1496
ThreadPool.QueueUserWorkItem( new WaitCallback( ProcessChangeLogEvent ) );
1497
threadScheduled = true;
1504
/// Delegate that is called when a Node object has been deleted.
1506
/// <param name="args">Event arguments.</param>
1507
private void OnNodeDelete( NodeEventArgs args )
1509
// Don't indicate local events.
1510
if (((NodeEventArgs.EventFlags)args.Flags & NodeEventArgs.EventFlags.LocalOnly) == 0)
1512
// Queue the event and schedule to come back later to process.
1515
// Add the event to the queue.
1516
eventQueue.Enqueue( new ChangeLogEvent( ChangeLogEvent.ChangeEventType.NodeDelete, args ) );
1518
// See if a thread has already been scheduled to take care of this event.
1519
if ( threadScheduled == false )
1521
ThreadPool.QueueUserWorkItem( new WaitCallback( ProcessChangeLogEvent ) );
1522
threadScheduled = true;
1529
/// Processes node created, changed and deleted events.
1531
/// <param name="state">Not used.</param>
1532
private void ProcessChangeLogEvent( object state )
1536
ChangeLogEvent work = null;
1538
// Lock the queue before accessing it to get the work to do.
1541
if ( eventQueue.Count > 0 )
1543
work = eventQueue.Dequeue() as ChangeLogEvent;
1547
threadScheduled = false;
1552
switch ( work.Type )
1554
case ChangeLogEvent.ChangeEventType.NodeChange:
1556
ChangeLogRecord record = new ChangeLogRecord( ChangeLogRecord.ChangeLogOp.Changed, work.Args );
1561
case ChangeLogEvent.ChangeEventType.NodeCreate:
1563
ChangeLogRecord record = new ChangeLogRecord( ChangeLogRecord.ChangeLogOp.Created, work.Args );
1568
case ChangeLogEvent.ChangeEventType.NodeDelete:
1570
ChangeLogRecord record = new ChangeLogRecord( ChangeLogRecord.ChangeLogOp.Deleted, work.Args );
1579
/// Sets the next write position in the log file.
1581
/// <param name="fs">FileStream object that references the log file.</param>
1582
private void SetCurrentWritePosition( FileStream fs )
1584
// See if the hint is valid so we don't have to brute-force the lookup.
1585
if ( !FindCurrentWritePosition( fs ) )
1589
// Allocate a buffer to hold the records that are read.
1590
byte[] buffer = new byte[ ChangeLogRecord.RecordSize * 1000 ];
1592
// Skip over the file header.
1593
fs.Position = LogFileHeader.RecordSize;
1595
// Read the first record.
1596
int bytesRead = fs.Read( buffer, 0, ChangeLogRecord.RecordSize );
1597
if ( bytesRead > 0 )
1599
// Instanitate the first record to compare.
1600
ChangeLogRecord record1 = new ChangeLogRecord( buffer );
1601
ChangeLogRecord record2 = null;
1603
// Read the next bunch of records.
1604
bytesRead = fs.Read( buffer, 0, buffer.Length );
1605
while ( bytesRead > 0 )
1608
while ( ( index + ChangeLogRecord.RecordSize ) <= bytesRead )
1610
// Instantiate the next record so the id's can be compared.
1611
record2 = new ChangeLogRecord( buffer, index );
1613
// See if the record id has rolled over.
1614
if ( record1.RecordID > record2.RecordID )
1616
// Found the roll over point. Calculate the next write position.
1617
writePosition = ( fs.Position - bytesRead ) + index;
1618
recordID = record1.RecordID + 1;
1624
// Record id's are still increasing.
1625
index += ChangeLogRecord.RecordSize;
1630
// If we haven't found the roll over point, keep reading.
1631
if ( bytesRead > 0 )
1633
// Read the next buffer full.
1634
bytesRead = fs.Read( buffer, 0, buffer.Length );
1638
// There is either only one record in the file or the end of the file has been reached without
1639
// detecting the rollover point.
1640
if ( ( record2 == null ) || ( record1 == record2 ) )
1642
// Next write position is the current position if it isn't at the size limit.
1643
writePosition = ( fs.Position >= maxWritePosition ) ? LogFileHeader.RecordSize : fs.Position;
1644
recordID = record1.RecordID + 1;
1648
catch ( IOException e )
1650
log.Error( e.Message );
1656
/// Saves the change log header to the file.
1658
private void SetLogFileHeader()
1660
// Acquire the mutex protecting the log file.
1661
mutex.WaitOne( collectionID );
1666
// Open the log file.
1667
FileStream fs = new FileStream( logFilePath, FileMode.Open, FileAccess.ReadWrite );
1670
// Initialize the log file header.
1671
logHeader.LastRecord = recordID;
1672
logHeader.RecordLocation = writePosition;
1674
// Position the file pointer to the right position within the file.
1676
fs.Write( logHeader.ToByteArray(), 0, LogFileHeader.RecordSize );
1683
catch( IOException e )
1685
log.Error( "Cannot save log file header - {0}", e.Message );
1690
mutex.ReleaseMutex( collectionID );
1695
/// Writes the specified ChangeLogRecord to the ChangeLog file.
1697
/// <param name="record">ChangeLogRecord to write to file.</param>
1698
private void WriteLog( ChangeLogRecord record )
1700
// Acquire the mutex protecting the log file.
1701
mutex.WaitOne( collectionID );
1706
// Open the log file.
1707
FileStream fs = new FileStream( logFilePath, FileMode.Open, FileAccess.ReadWrite );
1710
// Add the next ID to the record.
1711
record.RecordID = recordID;
1713
// Position the file pointer to the right position within the file.
1714
fs.Position = writePosition;
1715
fs.Write( record.ToByteArray(), 0, record.Length );
1717
// Update the members for the next write operation.
1719
writePosition += record.Length;
1720
if ( writePosition >= maxWritePosition )
1722
writePosition = LogFileHeader.RecordSize;
1730
catch( IOException e )
1732
log.Error( "Lost event - Epoch: {0}, ID: {1}, Operation: {2}. Exception {3}", record.Epoch, record.EventID, record.Operation, e.Message );
1737
mutex.ReleaseMutex( collectionID );
1742
#region IDisposable Members
1744
/// Allows for quick release of managed and unmanaged resources.
1745
/// Called by applications.
1747
public void Dispose()
1750
GC.SuppressFinalize( this );
1752
// Save the log file header after disposing the object
1753
// because we don't want to take anymore events after
1754
// the log file header is saved.
1759
/// Dispose( bool disposing ) executes in two distinct scenarios.
1760
/// If disposing equals true, the method has been called directly
1761
/// or indirectly by a user's code. Managed and unmanaged resources
1762
/// can be disposed.
1763
/// If disposing equals false, the method has been called by the
1764
/// runtime from inside the finalizer and you should not reference
1765
/// other objects. Only unmanaged resources can be disposed.
1767
/// <param name="disposing">Specifies whether called from the finalizer or from the application.</param>
1768
private void Dispose( bool disposing )
1770
// Check to see if Dispose has already been called.
1773
// Protect callers from accessing the freed members.
1776
// If disposing equals true, dispose all managed and unmanaged resources.
1779
// Dispose managed resources.
1780
subscriber.Dispose();
1786
/// Use C# destructor syntax for finalization code.
1787
/// This destructor will run only if the Dispose method does not get called.
1788
/// It gives your base class the opportunity to finalize.
1789
/// Do not provide destructors in types derived from this class.
1799
/// Object that retrieves specified Collection Store changes from the ChangeLog file.
1801
public class ChangeLogReader
1803
#region Class Members
1805
/// Used to log messages.
1807
private static readonly ISimiasLog log = SimiasLogManager.GetLogger( typeof( ChangeLogReader ) );
1810
/// Collection that the log file belongs to.
1812
private string collectionID;
1815
/// Inprocess mutex used to control access to the log file.
1817
private LogMutex mutex;
1820
/// Contains absolute path to the logfile.
1822
private string logFilePath;
1825
#region Constructors
1827
/// Initializes a new instance of the object class.
1829
/// <param name="collection">Collection object to listen for events on.</param>
1830
public ChangeLogReader( Collection collection )
1832
collectionID = collection.ID;
1834
// Create the mutex that will protect this logfile.
1835
mutex = new LogMutex( collectionID );
1837
// Build the log file path.
1838
logFilePath = Path.Combine( Path.Combine( Store.StorePath, "changelog" ), collection.ID + ".changelog" );
1842
#region Private Methods
1844
/// Gets the offset of the next event to be read.
1846
/// NOTE: The entire file must be locked before making this call.
1848
/// <param name="fs">FileStream object associated with the event log file.</param>
1849
/// <param name="cookie">Event context indicating the next event to be read.</param>
1850
/// <returns>True if the next offset was found, otherwise false is returned.</returns>
1851
private bool GetReadPosition( FileStream fs, EventContext cookie )
1853
bool foundOffset = false;
1855
byte[] buffer = new byte[ ChangeLogRecord.RecordSize ];
1857
// Make sure that there is a valid cookie.
1858
if ( cookie != null )
1862
// Using the hint in the cookie, see if the read position still exists in the file.
1863
fs.Position = cookie.Hint;
1864
bytesRead = fs.Read( buffer, 0, buffer.Length );
1865
if ( bytesRead > 0 )
1867
ChangeLogRecord record = new ChangeLogRecord( buffer );
1868
if ( ( record.RecordID == cookie.RecordID ) && ( ( record.Epoch == cookie.TimeStamp ) || ( cookie.TimeStamp == DateTime.MinValue ) ) )
1870
// Found the record that we were looking for. If the cookie indicates the no data has
1871
// ever been read, then position the file pointer back to the first record so it doesn't
1872
// get skipped. Otherwise, if the record and cookie match exactly, the file pointer is
1873
// already at the right position to read the next record.
1874
if ( cookie.TimeStamp == DateTime.MinValue )
1876
// We have yet to read a record, start at the beginning.
1877
fs.Position = LogFileHeader.RecordSize;
1883
else if ( ( bytesRead == 0 ) && ( cookie.RecordID == 0 ) && ( cookie.TimeStamp == DateTime.MinValue ) )
1885
fs.Position = LogFileHeader.RecordSize;
1889
catch ( Exception e )
1891
log.Error( "GetReadPosition():" + e.Message );
1899
#region Public Methods
1901
/// Gets an EventContext object that contains the latest event information.
1903
/// <returns>An EventContext object that is up-to-date with the latest information in the event log.</returns>
1904
public EventContext GetEventContext()
1906
EventContext cookie = null;
1908
// Acquire the mutex protecting the log file.
1909
mutex.WaitOne( collectionID );
1914
// Open the log file.
1915
FileStream fs = new FileStream( logFilePath, FileMode.Open, FileAccess.Read );
1918
// Allocate a buffer to hold the records that are read.
1919
byte[] buffer = new byte[ ChangeLogRecord.RecordSize * 1000 ];
1921
// Skip over the file header.
1922
fs.Position = LogFileHeader.RecordSize;
1924
// Read the first record.
1925
int bytesRead = fs.Read( buffer, 0, ChangeLogRecord.RecordSize );
1926
if ( bytesRead > 0 )
1928
// Instanitate the first record to compare.
1929
ChangeLogRecord record1 = new ChangeLogRecord( buffer );
1930
ChangeLogRecord record2 = null;
1932
// Read the next bunch of records.
1933
bytesRead = fs.Read( buffer, 0, buffer.Length );
1934
while ( bytesRead > 0 )
1937
while ( ( index + ChangeLogRecord.RecordSize ) <= bytesRead )
1939
// Instantiate the next record so the id's can be compared.
1940
record2 = new ChangeLogRecord( buffer, index );
1942
// See if the record id has rolled over.
1943
if ( record1.RecordID > record2.RecordID )
1945
// Found the roll over point. Calculate the hint position and create
1947
long hint = ( fs.Position - bytesRead ) + ( index - ChangeLogRecord.RecordSize );
1948
cookie = new EventContext( record1.Epoch, record1.RecordID, hint );
1954
// Record id's are still increasing.
1955
index += ChangeLogRecord.RecordSize;
1960
// If we haven't found the roll over point, keep reading.
1961
if ( bytesRead > 0 )
1963
// Read the next buffer full.
1964
bytesRead = fs.Read( buffer, 0, buffer.Length );
1968
// There is either only one record in the file or the end of the file has been reached
1969
// without detecting a rollover point.
1970
if ( ( record2 == null ) || ( record1 == record2 ) )
1972
cookie = new EventContext( record1.Epoch, record1.RecordID, fs.Position - ChangeLogRecord.RecordSize );
1977
// There are no records in the file yet. Return to use the first record.
1978
cookie = new EventContext( DateTime.MinValue, 0, fs.Position );
1986
catch( IOException e )
1988
log.Error( e.Message );
1993
mutex.ReleaseMutex( collectionID );
2000
/// Gets the events that have been recorded in the ChangeLog from the specified event context.
2002
/// <param name="cookie">Event context received from call to GetEventContext method.</param>
2003
/// <param name="changeList">Receives a list of ChangeLogRecords that are the changes to the Collection Store.</param>
2004
/// <returns>True if there is more data to get. Otherwise false is returned.</returns>
2005
public bool GetEvents( EventContext cookie, out ArrayList changeList )
2007
long lastRecordOffset;
2009
// Initialize the out parameter.
2010
changeList = new ArrayList();
2011
byte[] buffer = new byte[ ChangeLogRecord.RecordSize * 100 ];
2014
// Acquire the mutex protecting the log file.
2015
mutex.WaitOne( collectionID );
2020
// Open the ChangeLog file.
2021
FileStream fs = new FileStream( logFilePath, FileMode.Open, FileAccess.Read );
2024
// Calculate where changes need to start being added based on the EventContext.
2025
if ( !GetReadPosition( fs, cookie ) )
2027
// The cookie has been pushed out of the log.
2028
throw new CookieExpiredException();
2032
bytesRead = fs.Read( buffer, 0, buffer.Length );
2034
// Calculate the offset to the last record read.
2035
lastRecordOffset = fs.Position - ChangeLogRecord.RecordSize;
2042
catch( Exception e )
2044
throw new CookieExpiredException( e );
2049
mutex.ReleaseMutex( collectionID );
2052
// Make sure that something was read.
2053
for ( int i = 0; i < bytesRead; i += ChangeLogRecord.RecordSize )
2055
changeList.Add( new ChangeLogRecord( buffer, i ) );
2058
// If there were events to pass back, update the cookie.
2059
if ( changeList.Count > 0 )
2061
ChangeLogRecord record = ( ChangeLogRecord )changeList[ changeList.Count - 1 ];
2062
cookie.TimeStamp = record.Epoch;
2063
cookie.RecordID = record.RecordID;
2064
cookie.Hint = lastRecordOffset;
2067
// If less data was read that we had buffer for, we have all of the data.
2068
return ( bytesRead == buffer.Length ) ? true : false;