~ifolder-dev/simias/trunk-packaging

« back to all changes in this revision

Viewing changes to src/core/ChangeLog/.svn/text-base/ChangeLog.cs.svn-base

  • Committer: Jorge O. Castro
  • Date: 2007-12-03 06:56:46 UTC
  • Revision ID: jorge@ubuntu.com-20071203065646-mupcnjcwgm5mnhyt
* Remove a bunch of .svn directories we no longer need.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
/****************************************************************************
2
 
 |
3
 
 | Copyright (c) 2007 Novell, Inc.
4
 
 | All Rights Reserved.
5
 
 |
6
 
 | This program is free software; you can redistribute it and/or
7
 
 | modify it under the terms of version 2 of the GNU General Public License as
8
 
 | published by the Free Software Foundation.
9
 
 |
10
 
 | This program is distributed in the hope that it will be useful,
11
 
 | but WITHOUT ANY WARRANTY; without even the implied warranty of
12
 
 | MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13
 
 | GNU General Public License for more details.
14
 
 |
15
 
 | You should have received a copy of the GNU General Public License
16
 
 | along with this program; if not, contact Novell, Inc.
17
 
 |
18
 
 | To contact Novell about this file by physical or electronic mail,
19
 
 | you may find current contact information at www.novell.com 
20
 
 |
21
 
 |  Author: Mike Lasky <mlasky@novell.com>
22
 
 |***************************************************************************/
23
 
 
24
 
 
25
 
using System;
26
 
using System.Collections;
27
 
using System.Diagnostics;
28
 
using System.IO;
29
 
using System.Runtime.Serialization;
30
 
using System.Runtime.Serialization.Formatters.Binary;
31
 
using System.Text;
32
 
using System.Threading;
33
 
 
34
 
using Simias;
35
 
using Simias.Client;
36
 
using Simias.Client.Event;
37
 
using Simias.Event;
38
 
using Simias.Service;
39
 
using Simias.Storage;
40
 
 
41
 
namespace Simias.Storage
42
 
{
43
 
        /// <summary>
44
 
        /// Class used to implement an inprocess mutex that protects the log file from reentrancy.
45
 
        /// </summary>
46
 
        internal class LogMutex
47
 
        {
48
 
                #region Class Members
49
 
                /// <summary>
50
 
                /// Table used to keep track of per log file mutexes.
51
 
                /// </summary>
52
 
                static private Hashtable mutexTable = new Hashtable();
53
 
                #endregion
54
 
 
55
 
                #region Constructor
56
 
                /// <summary>
57
 
                /// Initializes a new instance of the Mutex class with default properties.
58
 
                /// </summary>
59
 
                /// <param name="collectionID">Identifier of the collection associated 
60
 
                /// with the log file.</param>
61
 
                public LogMutex( string collectionID )
62
 
                {
63
 
                        lock( typeof( LogMutex ) )
64
 
                        {
65
 
                                // See if a mutex already exists for this collection's logfile.
66
 
                                if ( !mutexTable.ContainsKey( collectionID ) )
67
 
                                {
68
 
                                        mutexTable.Add( collectionID, new Mutex() );
69
 
                                }
70
 
                        }
71
 
                }
72
 
 
73
 
                /// <summary>
74
 
                /// Initializes a new instance of the Mutex class with a Boolean value 
75
 
                /// indicating whether the calling thread should have initial ownership 
76
 
                /// of the mutex.
77
 
                /// </summary>
78
 
                /// <param name="collectionID">Identifier of the collection associated 
79
 
                /// with the log file.</param>
80
 
                /// <param name="initiallyOwned">true to give the calling thread initial 
81
 
                /// ownership of the mutex; otherwise, false.</param>
82
 
                public LogMutex( string collectionID, bool initiallyOwned )
83
 
                {
84
 
                        bool created = false;
85
 
                        lock( typeof( LogMutex ) )
86
 
                        {
87
 
                                if ( !mutexTable.ContainsKey( collectionID ) )
88
 
                                {
89
 
                                        mutexTable.Add( collectionID, new Mutex( initiallyOwned ) );
90
 
                                        created = true;
91
 
                                }
92
 
                        }
93
 
 
94
 
                        // If the mutex already existed and the caller specified to acquire
95
 
                        // the mutex before returning, get it now.
96
 
                        if ( !created && initiallyOwned )
97
 
                        {
98
 
                                WaitOne( collectionID );
99
 
                        }
100
 
                }
101
 
                #endregion
102
 
 
103
 
                #region Public Methods
104
 
                /// <summary>
105
 
                /// Releases all resources held by the current WaitHandle.
106
 
                /// </summary>
107
 
                /// <param name="collectionID">Identifier of the collection associated 
108
 
                /// with the log file.</param>
109
 
                public void Close( string collectionID )
110
 
                {
111
 
                        lock( typeof( LogMutex ) )
112
 
                        {
113
 
                                Mutex mutex = mutexTable[ collectionID ] as Mutex;
114
 
                                if ( mutex != null )
115
 
                                {
116
 
                                        mutex.Close();
117
 
                                        mutexTable.Remove( collectionID );
118
 
                                }
119
 
                        }
120
 
                }
121
 
 
122
 
                /// <summary>
123
 
                /// Releases the mutex once.
124
 
                /// </summary>
125
 
                /// <param name="collectionID">Identifier of the collection associated 
126
 
                /// with the log file.</param>
127
 
                public void ReleaseMutex( string collectionID )
128
 
                {
129
 
                        Mutex mutex = null;
130
 
 
131
 
                        lock ( typeof( LogMutex ) )
132
 
                        {
133
 
                                mutex = mutexTable[ collectionID ] as Mutex;
134
 
                        }
135
 
 
136
 
                        if ( mutex == null )
137
 
                        {
138
 
                                throw new SimiasException( "Log mutex does not exist for collection " + collectionID );
139
 
                        }
140
 
 
141
 
                        mutex.ReleaseMutex();
142
 
                }
143
 
 
144
 
                /// <summary>
145
 
                /// Blocks the current thread until the current WaitHandle receives a signal.
146
 
                /// </summary>
147
 
                /// <param name="collectionID">Identifier of the collection associated 
148
 
                /// with the log file.</param>
149
 
                /// <returns>true if the current instance receives a signal. If the current 
150
 
                /// instance is never signaled, WaitOne never returns.</returns>
151
 
                public bool WaitOne( string collectionID )
152
 
                {
153
 
                        Mutex mutex = null;
154
 
 
155
 
                        lock( typeof( LogMutex ) )
156
 
                        {
157
 
                                mutex = mutexTable[ collectionID ] as Mutex;
158
 
                        }
159
 
 
160
 
                        if ( mutex == null )
161
 
                        {
162
 
                                throw new SimiasException( "Log mutex does not exist for collection " + collectionID );
163
 
                        }
164
 
 
165
 
                        return mutex.WaitOne();
166
 
                }
167
 
 
168
 
                /// <summary>
169
 
                /// Blocks the current thread until the current WaitHandle receives a signal, 
170
 
                /// using 32-bit signed integer to measure the time interval and specifying 
171
 
                /// whether to exit the synchronization domain before the wait.
172
 
                /// </summary>
173
 
                /// <param name="collectionID">Identifier of the collection associated 
174
 
                /// with the log file.</param>
175
 
                /// <param name="millisecondsTimeout">The number of milliseconds to wait, or 
176
 
                /// Timeout.Infinite (-1) to wait indefinitely. </param>
177
 
                /// <param name="exitContext">true to exit the synchronization domain for the 
178
 
                /// context before the wait (if in a synchronized context), and reacquire it; otherwise, false.</param>
179
 
                /// <returns>true if the current instance receives a signal; otherwise, false.</returns>
180
 
                public bool WaitOne( string collectionID, int millisecondsTimeout, bool exitContext )
181
 
                {
182
 
                        Mutex mutex = null;
183
 
 
184
 
                        lock( typeof( LogMutex ) )
185
 
                        {
186
 
                                mutex = mutexTable[ collectionID ] as Mutex;
187
 
                        }
188
 
 
189
 
                        if ( mutex == null )
190
 
                        {
191
 
                                throw new SimiasException( "Log mutex does not exist for collection " + collectionID );
192
 
                        }
193
 
 
194
 
                        return mutex.WaitOne( millisecondsTimeout, exitContext );
195
 
                }
196
 
 
197
 
                /// <summary>
198
 
                /// Blocks the current thread until the current instance receives a signal, 
199
 
                /// using a TimeSpan to measure the time interval and specifying whether to 
200
 
                /// exit the synchronization domain before the wait.
201
 
                /// </summary>
202
 
                /// <param name="collectionID">Identifier of the collection associated 
203
 
                /// with the log file.</param>
204
 
                /// <param name="timeout">The number of milliseconds to wait, or a TimeSpan 
205
 
                /// that represents -1 milliseconds to wait indefinitely.</param>
206
 
                /// <param name="exitContext">true to exit the synchronization domain for the 
207
 
                /// context before the wait (if in a synchronized context), and reacquire it; 
208
 
                /// otherwise, false.</param>
209
 
                /// <returns>true if the current instance receives a signal; otherwise, false.</returns>
210
 
                public bool WaitOne( string collectionID, TimeSpan timeout, bool exitContext )
211
 
                {
212
 
                        Mutex mutex = null;
213
 
 
214
 
                        lock( typeof( LogMutex ) )
215
 
                        {
216
 
                                mutex = mutexTable[ collectionID ] as Mutex;
217
 
                        }
218
 
 
219
 
                        if ( mutex == null )
220
 
                        {
221
 
                                throw new SimiasException( "Log mutex does not exist for collection " + collectionID );
222
 
                        }
223
 
 
224
 
                        return mutex.WaitOne( timeout, exitContext );
225
 
                }
226
 
                #endregion
227
 
        }
228
 
 
229
 
        /// <summary>
230
 
        /// Class used to queue change log events.
231
 
        /// </summary>
232
 
        internal class ChangeLogEvent
233
 
        {
234
 
                #region Class Members
235
 
                /// <summary>
236
 
                /// Type of change events that are watched for.
237
 
                /// </summary>
238
 
                public enum ChangeEventType
239
 
                {
240
 
                        /// <summary>
241
 
                        /// Collection was created. Create a ChangeLogWriter.
242
 
                        /// </summary>
243
 
                        CollectionCreate,
244
 
 
245
 
                        /// <summary>
246
 
                        /// Collection was deleted. Delete the ChangeLogWriter.
247
 
                        /// </summary>
248
 
                        CollectionDelete,
249
 
 
250
 
                        /// <summary>
251
 
                        /// Node was created in a collection.
252
 
                        /// </summary>
253
 
                        NodeCreate,
254
 
 
255
 
                        /// <summary>
256
 
                        /// Node was changed in a collection.
257
 
                        /// </summary>
258
 
                        NodeChange,
259
 
 
260
 
                        /// <summary>
261
 
                        /// Node was deleted in a collection.
262
 
                        /// </summary>
263
 
                        NodeDelete
264
 
                }
265
 
 
266
 
                /// <summary>
267
 
                /// Type of event.
268
 
                /// </summary>
269
 
                private ChangeEventType type;
270
 
 
271
 
                /// <summary>
272
 
                /// Context for the event.
273
 
                /// </summary>
274
 
                private NodeEventArgs args;
275
 
                #endregion
276
 
 
277
 
                #region Properties
278
 
                /// <summary>
279
 
                /// Gets the change event type.
280
 
                /// </summary>
281
 
                public ChangeEventType Type
282
 
                {
283
 
                        get { return type; }
284
 
                }
285
 
 
286
 
                /// <summary>
287
 
                /// Gets the event context.
288
 
                /// </summary>
289
 
                public NodeEventArgs Args
290
 
                {
291
 
                        get { return args; }
292
 
                }
293
 
                #endregion
294
 
 
295
 
                #region Constructor
296
 
                /// <summary>
297
 
                /// Initializes a new instance of the object.
298
 
                /// </summary>
299
 
                /// <param name="type">Type of change long event.</param>
300
 
                /// <param name="args">Context for the event.</param>
301
 
                public ChangeLogEvent( ChangeEventType type, NodeEventArgs args )
302
 
                {
303
 
                        this.type = type;
304
 
                        this.args = args;
305
 
                }
306
 
                #endregion
307
 
        }
308
 
 
309
 
        /// <summary>
310
 
        /// Exception that indicates that the event context cookie has expired and that
311
 
        /// sync must dredge for changes.
312
 
        /// </summary>
313
 
        public class CookieExpiredException : SimiasException
314
 
        {
315
 
                #region Constructors
316
 
                /// <summary>
317
 
                /// Initializes a new instance of the object class.
318
 
                /// </summary>
319
 
                public CookieExpiredException() :
320
 
                        base ( "The event context cookie has expired." )
321
 
                {
322
 
                }
323
 
 
324
 
                /// <summary>
325
 
                /// Initializes a new instance of the object class.
326
 
                /// </summary>
327
 
                /// <param name="innerException">The exception that cause the cookie to be invalid.</param>
328
 
                public CookieExpiredException( Exception innerException ) :
329
 
                        base ( "The event context cookie has expired.", innerException )
330
 
                {
331
 
                }
332
 
                #endregion
333
 
        }
334
 
 
335
 
 
336
 
        /// <summary>
337
 
        /// Class used as a context object to look up the next set of events for a specified ChangeLogReader
338
 
        /// object.
339
 
        /// </summary>
340
 
        [ Serializable ]
341
 
        public class EventContext
342
 
        {
343
 
                #region Class Members
344
 
                /// <summary>
345
 
                /// Date and time of event.
346
 
                /// </summary>
347
 
                private DateTime timeStamp;
348
 
 
349
 
                /// <summary>
350
 
                /// Assigned event ID.
351
 
                /// </summary>
352
 
                private ulong recordID;
353
 
 
354
 
                /// <summary>
355
 
                /// Hint to where the last record was read from in the file. It may not be valid.
356
 
                /// </summary>
357
 
                private long hint;
358
 
 
359
 
                /// <summary>
360
 
                /// Used as separator in string representation.
361
 
                /// </summary>
362
 
                private const char valueSeparator = ':';
363
 
                #endregion
364
 
 
365
 
                #region Properties
366
 
                /// <summary>
367
 
                /// Gets the timestamp portion of the context.
368
 
                /// </summary>
369
 
                internal DateTime TimeStamp
370
 
                {
371
 
                        get { return timeStamp; }
372
 
                        set { timeStamp = value; }
373
 
                }
374
 
 
375
 
                /// <summary>
376
 
                /// Gets the record ID portion of the context.
377
 
                /// </summary>
378
 
                internal ulong RecordID
379
 
                {
380
 
                        get { return recordID; }
381
 
                        set { recordID = value; }
382
 
                }
383
 
 
384
 
                /// <summary>
385
 
                /// Gets the hint of where the last event was read from.
386
 
                /// </summary>
387
 
                internal long Hint
388
 
                {
389
 
                        get { return hint; }
390
 
                        set { hint = value; }
391
 
                }
392
 
                #endregion
393
 
 
394
 
                #region Constructor
395
 
                /// <summary>
396
 
                /// Initializes a new instance of the object class.
397
 
                /// </summary>
398
 
                /// <param name="timeStamp">Date and time of event.</param>
399
 
                /// <param name="recordID">Assigned event ID.</param>
400
 
                /// <param name="hint">Hint to where the last record was read from in the file. It may not be valid.</param>
401
 
                internal EventContext( DateTime timeStamp, ulong recordID, long hint )
402
 
                {
403
 
                        this.timeStamp = timeStamp;
404
 
                        this.recordID = recordID;
405
 
                        this.hint = hint;
406
 
                }
407
 
 
408
 
                /// <summary>
409
 
                /// Initializes a new instance from a string obtained from ToString
410
 
                /// </summary>
411
 
                /// <param name="cookie">The string representation of the context.</param>
412
 
                public EventContext( string cookie)
413
 
                {
414
 
                        // Default values if cookie string is bad.
415
 
                        timeStamp = DateTime.MinValue;
416
 
                        recordID = 0;
417
 
                        hint = LogFileHeader.RecordSize;
418
 
 
419
 
                        if ( ( cookie != null ) && ( cookie != String.Empty ) )
420
 
                        {
421
 
                                string [] values = cookie.Split( valueSeparator );
422
 
                                if ( values.Length == 3 )
423
 
                                {
424
 
                                        timeStamp = new DateTime( long.Parse( values[ 0 ] ) );
425
 
                                        recordID = ulong.Parse( values[ 1 ] );
426
 
                                        hint = long.Parse( values[ 2 ] );
427
 
                                }
428
 
                        }
429
 
                }
430
 
 
431
 
                #endregion
432
 
 
433
 
                #region Public Methods
434
 
                /// <summary>
435
 
                /// Gets a string representation of this context.
436
 
                /// Can be used to store this cookie for later use.
437
 
                /// </summary>
438
 
                /// <returns>A formatted string representing the cookie.</returns>
439
 
                public override string ToString()
440
 
                {
441
 
                        return (timeStamp.Ticks.ToString() + valueSeparator + recordID.ToString() + valueSeparator + hint.ToString());
442
 
                }
443
 
                #endregion
444
 
        }
445
 
 
446
 
        /// <summary>
447
 
        /// Contains the layout of the LogFile header information.
448
 
        /// </summary>
449
 
        public class LogFileHeader
450
 
        {
451
 
                #region Class Members
452
 
                /// <summary>
453
 
                /// Encoded lengths of the object fields.
454
 
                /// </summary>
455
 
                private const int logFileIDSize = 16;
456
 
                private const int maxLogRecordsSize = 4;
457
 
                private const int maxFlagsSize = 4;
458
 
                private const int lastRecordSize = 8;
459
 
                private const int recordLocationSize = 8;
460
 
 
461
 
                /// <summary>
462
 
                /// This is the total encoded record size.
463
 
                /// </summary>
464
 
                private const int encodedRecordSize = logFileIDSize + 
465
 
                                                                                          maxLogRecordsSize + 
466
 
                                                                                          maxFlagsSize +
467
 
                                                                                          lastRecordSize +
468
 
                                                                                          recordLocationSize;
469
 
 
470
 
                /// <summary>
471
 
                /// Contains the identifier for this log file.
472
 
                /// </summary>
473
 
                private string logFileID;
474
 
 
475
 
                /// <summary>
476
 
                /// Maximum number of records to keep persisted in the file.
477
 
                /// </summary>
478
 
                private uint maxLogRecords;
479
 
 
480
 
                /// <summary>
481
 
                /// Flags
482
 
                /// </summary>
483
 
                private uint flags;
484
 
 
485
 
                /// <summary>
486
 
                /// Last record written to the file. This is just a hint and may
487
 
                /// or may not be valid.
488
 
                /// </summary>
489
 
                private ulong lastRecord;
490
 
 
491
 
                /// <summary>
492
 
                /// File position of last record written to the file. This is just
493
 
                /// a hint and may or may not be valid.
494
 
                /// </summary>
495
 
                private long recordLocation;
496
 
                #endregion
497
 
 
498
 
                #region Properties
499
 
                /// <summary>
500
 
                /// Gets the length of the record.
501
 
                /// </summary>
502
 
                public int Length
503
 
                {
504
 
                        get { return RecordSize; }
505
 
                }
506
 
 
507
 
                /// <summary>
508
 
                /// Gets or sets the logFileID.
509
 
                /// </summary>
510
 
                public string LogFileID
511
 
                {
512
 
                        get { return logFileID; }
513
 
                        set { logFileID = value; }
514
 
                }
515
 
 
516
 
                /// <summary>
517
 
                /// Gets or sets the maximum number of ChangeLog records in the file.
518
 
                /// </summary>
519
 
                public uint MaxLogRecords
520
 
                {
521
 
                        get { return maxLogRecords; }
522
 
                        set { maxLogRecords = value; }
523
 
                }
524
 
 
525
 
                /// <summary>
526
 
                /// Returns the size of the LogFileHeader record.
527
 
                /// </summary>
528
 
                static public int RecordSize
529
 
                {
530
 
                        get { return encodedRecordSize; }
531
 
                }
532
 
 
533
 
                /// <summary>
534
 
                /// Gets or sets the last record written to the file. This is only a
535
 
                /// hint and may or may not be valid.
536
 
                /// </summary>
537
 
                public ulong LastRecord
538
 
                {
539
 
                        get { return lastRecord; }
540
 
                        set { lastRecord = value; }
541
 
                }
542
 
 
543
 
                /// <summary>
544
 
                /// Gets or sets the file position of the last record in the file.
545
 
                /// This is just a hint and may or may not be valid.
546
 
                /// </summary>
547
 
                public long RecordLocation
548
 
                {
549
 
                        get { return recordLocation; }
550
 
                        set { recordLocation = value; }
551
 
                }
552
 
                #endregion
553
 
 
554
 
                #region Constructor
555
 
                /// <summary>
556
 
                /// Initializes a new instance of the struct.
557
 
                /// </summary>
558
 
                /// <param name="ID">Contains the identifier for this log file.</param>
559
 
                /// <param name="maxRecords">Maximum number of records to keep persisted in the file.</param>
560
 
                public LogFileHeader( string ID, uint maxRecords )
561
 
                {
562
 
                        logFileID = ID;
563
 
                        maxLogRecords = maxRecords;
564
 
                        flags = 0;
565
 
                        lastRecord = 0;
566
 
                        recordLocation = 0;
567
 
                }
568
 
 
569
 
                /// <summary>
570
 
                /// Initializes a new instance of the struct from an encoded byte array.
571
 
                /// </summary>
572
 
                /// <param name="encodedRecord">LogFileHeader encoded record.</param>
573
 
                public LogFileHeader( byte[] encodedRecord )
574
 
                {
575
 
                        int index = 0;
576
 
 
577
 
                        byte[] guidArray = new byte[ 16 ];
578
 
                        Array.Copy( encodedRecord, 0, guidArray, 0, guidArray.Length );
579
 
                        Guid guid = new Guid( guidArray );
580
 
                        logFileID = guid.ToString();
581
 
                        index += logFileIDSize;
582
 
 
583
 
                        maxLogRecords = BitConverter.ToUInt32( encodedRecord, index );
584
 
                        index += maxLogRecordsSize;
585
 
 
586
 
                        flags = BitConverter.ToUInt32( encodedRecord, index );
587
 
                        index += maxFlagsSize;
588
 
 
589
 
                        lastRecord = BitConverter.ToUInt64( encodedRecord, index );
590
 
                        index += lastRecordSize;
591
 
 
592
 
                        recordLocation = BitConverter.ToInt64( encodedRecord, index );
593
 
                        index += recordLocationSize;
594
 
                }
595
 
                #endregion
596
 
 
597
 
                #region Public Methods
598
 
                /// <summary>
599
 
                /// Converts the object to a formatted byte array.
600
 
                /// </summary>
601
 
                /// <returns>A formatted byte array containing the LogFileHeader data.</returns>
602
 
                public byte[] ToByteArray()
603
 
                {
604
 
                        int index = 0;
605
 
                        byte[] result = new byte[ RecordSize ];
606
 
 
607
 
                        // Convert the object members to byte arrays.
608
 
                        Guid guid = new Guid( logFileID );
609
 
                        byte[] lfi = guid.ToByteArray();
610
 
                        byte[] mlr = BitConverter.GetBytes( maxLogRecords );
611
 
                        byte[] flg = BitConverter.GetBytes( flags );
612
 
                        byte[] lr = BitConverter.GetBytes( lastRecord );
613
 
                        byte[] rl = BitConverter.GetBytes( recordLocation );
614
 
 
615
 
                        // Copy the converted byte arrays to the resulting array.
616
 
                        Array.Copy( lfi, 0, result, index, lfi.Length );
617
 
                        index += lfi.Length;
618
 
 
619
 
                        Array.Copy( mlr, 0, result, index, mlr.Length );
620
 
                        index += mlr.Length;
621
 
 
622
 
                        Array.Copy( flg, 0, result, index, flg.Length );
623
 
                        index += flg.Length;
624
 
 
625
 
                        Array.Copy( lr, 0, result, index, lr.Length );
626
 
                        index += lr.Length;
627
 
 
628
 
                        Array.Copy( rl, 0, result, index, rl.Length );
629
 
                        index += rl.Length;
630
 
 
631
 
                        return result;
632
 
                }
633
 
                #endregion
634
 
        }
635
 
 
636
 
        /// <summary>
637
 
        /// Contains the layout of a ChangeLog record.
638
 
        /// </summary>
639
 
        public class ChangeLogRecord
640
 
        {
641
 
                #region Class Members
642
 
                /// <summary>
643
 
                /// Recordable change log operations.
644
 
                /// </summary>
645
 
                public enum ChangeLogOp
646
 
                {
647
 
                        /// <summary>
648
 
                        /// The node exists but no log record has been created.
649
 
                        /// Do a brute force sync.
650
 
                        /// </summary>
651
 
                        Unknown,
652
 
 
653
 
                        /// <summary>
654
 
                        /// Node object was created.
655
 
                        /// </summary>
656
 
                        Created,
657
 
 
658
 
                        /// <summary>
659
 
                        /// Node object was deleted.
660
 
                        /// </summary>
661
 
                        Deleted,
662
 
 
663
 
                        /// <summary>
664
 
                        /// Node object was changed.
665
 
                        /// </summary>
666
 
                        Changed,
667
 
 
668
 
                        /// <summary>
669
 
                        /// Node object was renamed.
670
 
                        /// </summary>
671
 
                        Renamed
672
 
                };
673
 
 
674
 
                /// <summary>
675
 
                /// Encoded lengths of the object fields.
676
 
                /// </summary>
677
 
                private const int recordIDSize = 8;
678
 
                private const int epochSize = 8;
679
 
                private const int nodeIDSize = 16;
680
 
                private const int operationSize = 4;
681
 
                private const int flagSize = 2;
682
 
                private const int masterRevSize = 8;
683
 
                private const int slaveRevSize = 8;
684
 
                private const int fileLengthSize = 8;
685
 
                private const int typeSize = 4;
686
 
 
687
 
                /// <summary>
688
 
                /// This is the total encoded record size.
689
 
                /// </summary>
690
 
                private const int encodedRecordSize = recordIDSize + 
691
 
                                                                                          epochSize + 
692
 
                                                                                          nodeIDSize + 
693
 
                                                                                          operationSize + 
694
 
                                                                                          flagSize +
695
 
                                                                                          masterRevSize +
696
 
                                                                                          slaveRevSize +
697
 
                                                                                          fileLengthSize +
698
 
                                                                                          typeSize;
699
 
 
700
 
                /// <summary>
701
 
                /// Record identitifer for this entry.
702
 
                /// </summary>
703
 
                private ulong recordID;
704
 
 
705
 
                /// <summary>
706
 
                /// Date and time that event was recorded.
707
 
                /// </summary>
708
 
                private DateTime epoch;
709
 
 
710
 
                /// <summary>
711
 
                /// Identifier of Node object that triggered the event.
712
 
                /// </summary>
713
 
                private string nodeID;
714
 
 
715
 
                /// <summary>
716
 
                /// Node operation type.
717
 
                /// </summary>
718
 
                private ChangeLogOp operation;
719
 
 
720
 
                /// <summary>
721
 
                /// Flags passed to the event.
722
 
                /// </summary>
723
 
                private ushort flags;
724
 
 
725
 
                /// <summary>
726
 
                /// Master revision of node.
727
 
                /// </summary>
728
 
                private ulong masterRev;
729
 
 
730
 
                /// <summary>
731
 
                /// Local revision of node.
732
 
                /// </summary>
733
 
                private ulong slaveRev;
734
 
 
735
 
                /// <summary>
736
 
                /// Length of the file represented by the node if the node is a BaseFileTypeNode.
737
 
                /// </summary>
738
 
                private long fileLength;
739
 
 
740
 
                /// <summary>
741
 
                /// Base type of node.
742
 
                /// </summary>
743
 
                private NodeTypes.NodeTypeEnum type;
744
 
                #endregion
745
 
 
746
 
                #region Properties
747
 
                /// <summary>
748
 
                /// Gets or sets the record epoch.
749
 
                /// </summary>
750
 
                public DateTime Epoch
751
 
                {
752
 
                        get { return epoch; }
753
 
                        set { epoch = value; }
754
 
                }
755
 
 
756
 
                /// <summary>
757
 
                /// Gets or sets the event ID.
758
 
                /// </summary>
759
 
                public string EventID
760
 
                {
761
 
                        get { return nodeID; }
762
 
                        set { nodeID = value; }
763
 
                }
764
 
 
765
 
                /// <summary>
766
 
                /// Gets the length of the record.
767
 
                /// </summary>
768
 
                public int Length
769
 
                {
770
 
                        get { return RecordSize; }
771
 
                }
772
 
 
773
 
                /// <summary>
774
 
                /// Gets or set the event operation.
775
 
                /// </summary>
776
 
                public ChangeLogOp Operation
777
 
                {
778
 
                        get { return operation; }
779
 
                        set { operation = value; }
780
 
                }
781
 
 
782
 
                /// <summary>
783
 
                /// Gets or sets the record ID.
784
 
                /// </summary>
785
 
                public ulong RecordID
786
 
                {
787
 
                        get { return recordID; }
788
 
                        set { recordID = value; }
789
 
                }
790
 
 
791
 
                /// <summary>
792
 
                /// Gets or sets the flags.
793
 
                /// </summary>
794
 
                public ushort Flags
795
 
                {
796
 
                        get { return flags; }
797
 
                        set { flags = value; }
798
 
                }
799
 
 
800
 
                /// <summary>
801
 
                /// Gets or sets the master revision value.
802
 
                /// </summary>
803
 
                public ulong MasterRev
804
 
                {
805
 
                        get { return masterRev; }
806
 
                        set { masterRev = value; }
807
 
                }
808
 
 
809
 
                /// <summary>
810
 
                /// Gets or sets the slave revision value.
811
 
                /// </summary>
812
 
                public ulong SlaveRev
813
 
                {
814
 
                        get { return slaveRev; }
815
 
                        set { slaveRev = value; }
816
 
                }
817
 
 
818
 
                /// <summary>
819
 
                /// Gets or sets the file length value.
820
 
                /// </summary>
821
 
                public long FileLength
822
 
                {
823
 
                        get { return fileLength; }
824
 
                        set { fileLength = value; }
825
 
                }
826
 
 
827
 
                /// <summary>
828
 
                /// Gets or sets the base node type.
829
 
                /// </summary>
830
 
                public NodeTypes.NodeTypeEnum Type
831
 
                {
832
 
                        get { return type; }
833
 
                        set { type = value; }
834
 
                }
835
 
 
836
 
                /// <summary>
837
 
                /// Returns the size of the ChangeLogRecord.
838
 
                /// </summary>
839
 
                static public int RecordSize
840
 
                {
841
 
                        get { return encodedRecordSize; }
842
 
                }
843
 
                #endregion
844
 
 
845
 
                #region Constructor
846
 
                /// <summary>
847
 
                /// Initializes a new instance of the struct.
848
 
                /// </summary>
849
 
                /// <param name="operation">Node operation type.</param>
850
 
                /// <param name="args">NodeEventArgs object that contains the change information.</param>
851
 
                public ChangeLogRecord( ChangeLogOp operation, NodeEventArgs args )
852
 
                {
853
 
                        this.recordID = 0;
854
 
                        this.epoch = args.TimeStamp;
855
 
                        this.nodeID = args.ID;
856
 
                        this.operation = operation;
857
 
                        this.flags = ( ushort )args.Flags;
858
 
                        this.masterRev = args.MasterRev;
859
 
                        this.slaveRev = args.SlaveRev;
860
 
                        this.fileLength = args.FileSize;
861
 
                        try
862
 
                        {
863
 
                                this.type = ( NodeTypes.NodeTypeEnum )Enum.Parse( typeof( NodeTypes.NodeTypeEnum ), args.Type );
864
 
                        }
865
 
                        catch
866
 
                        {
867
 
                                this.type = NodeTypes.NodeTypeEnum.Node;
868
 
                        }
869
 
                }
870
 
 
871
 
                /// <summary>
872
 
                /// Initializes a new instance of the struct from an encoded byte array.
873
 
                /// </summary>
874
 
                /// <param name="encodedRecord">ChangeLogRecord encoded record.</param>
875
 
                public ChangeLogRecord( byte[] encodedRecord ) :
876
 
                        this( encodedRecord, 0 )
877
 
                {
878
 
                }
879
 
 
880
 
                /// <summary>
881
 
                /// Initializes a new instance of the struct from an encoded byte array and index.
882
 
                /// </summary>
883
 
                /// <param name="encodedRecord">ChangeLogRecord encoded record.</param>
884
 
                /// <param name="index">Start index into the byte array.</param>
885
 
                public ChangeLogRecord( byte[] encodedRecord, int index )
886
 
                {
887
 
                        recordID = BitConverter.ToUInt64( encodedRecord, index );
888
 
                        index += recordIDSize;
889
 
 
890
 
                        epoch = new DateTime( BitConverter.ToInt64( encodedRecord, index ) );
891
 
                        index += epochSize;
892
 
 
893
 
                        byte[] guidArray = new byte[ 16 ];
894
 
                        Array.Copy( encodedRecord, index, guidArray, 0, guidArray.Length );
895
 
                        Guid guid = new Guid( guidArray );
896
 
                        nodeID = guid.ToString();
897
 
                        index += nodeIDSize;
898
 
 
899
 
                        operation = ( ChangeLogOp )Enum.ToObject( typeof( ChangeLogOp ), BitConverter.ToInt32( encodedRecord, index ) );
900
 
                        index += operationSize;
901
 
 
902
 
                        flags = BitConverter.ToUInt16( encodedRecord, index );
903
 
                        index += flagSize;
904
 
 
905
 
                        masterRev = BitConverter.ToUInt64( encodedRecord, index );
906
 
                        index += masterRevSize;
907
 
 
908
 
                        slaveRev = BitConverter.ToUInt64( encodedRecord, index );
909
 
                        index += slaveRevSize;
910
 
 
911
 
                        fileLength = BitConverter.ToInt64( encodedRecord, index );
912
 
                        index += fileLengthSize;
913
 
 
914
 
                        try
915
 
                        {
916
 
                                type = ( NodeTypes.NodeTypeEnum )Enum.ToObject( typeof( NodeTypes.NodeTypeEnum ), BitConverter.ToInt32( encodedRecord, index ) );
917
 
                        }
918
 
                        catch
919
 
                        {
920
 
                                type = NodeTypes.NodeTypeEnum.Node;
921
 
                        }
922
 
                        index += typeSize;
923
 
                }
924
 
                #endregion
925
 
 
926
 
                #region Public Methods
927
 
                /// <summary>
928
 
                /// Converts the object to a formatted byte array.
929
 
                /// </summary>
930
 
                /// <returns>A formatted byte array containing the ChangeLogRecord data.</returns>
931
 
                public byte[] ToByteArray()
932
 
                {
933
 
                        int index = 0;
934
 
                        byte[] result = new byte[ RecordSize ];
935
 
 
936
 
                        // Convert the object members to byte arrays.
937
 
                        Guid guid = new Guid( nodeID );
938
 
                        byte[] nid = guid.ToByteArray();
939
 
                        byte[] rid = BitConverter.GetBytes( recordID );
940
 
                        byte[] ep = BitConverter.GetBytes( epoch.Ticks );
941
 
                        byte[] op = BitConverter.GetBytes( ( int )operation );
942
 
                        byte[] fl = BitConverter.GetBytes( flags );
943
 
                        byte[] mr = BitConverter.GetBytes( masterRev );
944
 
                        byte[] sr = BitConverter.GetBytes( slaveRev );
945
 
                        byte[] fil = BitConverter.GetBytes( fileLength );
946
 
                        byte[] tp = BitConverter.GetBytes( ( int )type );
947
 
 
948
 
                        // Copy the converted byte arrays to the resulting array.
949
 
                        Array.Copy( rid, 0, result, index, rid.Length );
950
 
                        index += rid.Length;
951
 
 
952
 
                        Array.Copy( ep, 0, result, index, ep.Length );
953
 
                        index += ep.Length;
954
 
 
955
 
                        Array.Copy( nid, 0, result, index, nid.Length );
956
 
                        index += nid.Length;
957
 
 
958
 
                        Array.Copy( op, 0, result, index, op.Length );
959
 
                        index += op.Length;
960
 
 
961
 
                        Array.Copy( fl, 0, result, index, fl.Length );
962
 
                        index += fl.Length;
963
 
 
964
 
                        Array.Copy( mr, 0, result, index, mr.Length );
965
 
                        index += mr.Length;
966
 
 
967
 
                        Array.Copy( sr, 0, result, index, sr.Length );
968
 
                        index += sr.Length;
969
 
 
970
 
                        Array.Copy( fil, 0, result, index, fil.Length );
971
 
                        index += fil.Length;
972
 
 
973
 
                        Array.Copy( tp, 0, result, index, tp.Length );
974
 
                        index += tp.Length;
975
 
 
976
 
                        return result;
977
 
                }
978
 
                #endregion
979
 
        }
980
 
 
981
 
        /// <summary>
982
 
        /// Class that lets the ChangeLog operate as a thread service.
983
 
        /// </summary>
984
 
        public class ChangeLog : IThreadService
985
 
        {
986
 
                #region Class Members
987
 
                /// <summary>
988
 
                /// Used to log messages.
989
 
                /// </summary>
990
 
                private static readonly ISimiasLog log = SimiasLogManager.GetLogger( typeof( ChangeLog ) );
991
 
 
992
 
                /// <summary>
993
 
                /// Table used to keep track of ChangeLogWriter objects.
994
 
                /// </summary>
995
 
                private static Hashtable logWriterTable = new Hashtable();
996
 
                #endregion
997
 
 
998
 
                #region Constructor
999
 
                /// <summary>
1000
 
                /// Initializes a new instance of the object class.
1001
 
                /// </summary>
1002
 
                public ChangeLog()
1003
 
                {
1004
 
                }
1005
 
                #endregion
1006
 
 
1007
 
                #region Public Methods
1008
 
                /// <summary>
1009
 
                /// Creates a change log writer for the specified collection.
1010
 
                /// </summary>
1011
 
                /// <param name="collectionID">The identifier for the collection.</param>
1012
 
                public void CreateChangeLogWriter( string collectionID )
1013
 
                {
1014
 
                        lock ( logWriterTable )
1015
 
                        {
1016
 
                                if ( !logWriterTable.ContainsKey( collectionID ) )
1017
 
                                {
1018
 
                                        // Allocate a ChangeLogWriter object for this collection and store it in the table.
1019
 
                                        logWriterTable.Add( collectionID, new ChangeLogWriter( collectionID ) );
1020
 
                                        log.Debug( "Added ChangeLogWriter for collection {0}", collectionID );
1021
 
                                }
1022
 
                        }
1023
 
                }
1024
 
 
1025
 
                /// <summary>
1026
 
                /// Deletes a change log writer for the specified collection.
1027
 
                /// </summary>
1028
 
                /// <param name="collectionID">The identifier for the collection.</param>
1029
 
                public void DeleteChangeLogWriter( string collectionID )
1030
 
                {
1031
 
                        lock ( logWriterTable )
1032
 
                        {
1033
 
                                // Make sure the writer is in the table.
1034
 
                                if ( logWriterTable.ContainsKey( collectionID ) )
1035
 
                                {
1036
 
                                        // Remove the ChangeLogWriter object from the table and dispose it.
1037
 
                                        ChangeLogWriter logWriter = logWriterTable[ collectionID ] as ChangeLogWriter;
1038
 
                                        logWriterTable.Remove( collectionID );
1039
 
 
1040
 
                                        // Get the path to the file before disposing it.
1041
 
                                        string logPath = logWriter.LogFile;
1042
 
                                        logWriter.Dispose();
1043
 
 
1044
 
                                        try { File.Delete( logPath ); } 
1045
 
                                        catch {}
1046
 
 
1047
 
                                        log.Debug( "Deleted ChangeLogWriter for collection {0}", collectionID );
1048
 
                                }
1049
 
                        }
1050
 
                }
1051
 
                #endregion
1052
 
 
1053
 
                #region IThreadService Members
1054
 
                /// <summary>
1055
 
                /// Starts the thread service.
1056
 
                /// </summary>
1057
 
                public void Start()
1058
 
                {
1059
 
                        // Get a store object.
1060
 
                        Store store = Store.GetStore();
1061
 
 
1062
 
                        // Get all of the collection objects and set up listeners for them.
1063
 
                        foreach (ShallowNode sn in store)
1064
 
                        {
1065
 
                                CreateChangeLogWriter( sn.ID );
1066
 
                        }
1067
 
 
1068
 
                        log.Info( "Change Log Service started." );
1069
 
                }
1070
 
 
1071
 
                /// <summary>
1072
 
                /// Resumes a paused service. 
1073
 
                /// </summary>
1074
 
                public void Resume()
1075
 
                {
1076
 
                }
1077
 
 
1078
 
                /// <summary>
1079
 
                /// Pauses a service's execution.
1080
 
                /// </summary>
1081
 
                public void Pause()
1082
 
                {
1083
 
                }
1084
 
 
1085
 
                /// <summary>
1086
 
                /// Custom.
1087
 
                /// </summary>
1088
 
                /// <param name="message"></param>
1089
 
                /// <param name="data"></param>
1090
 
                public int Custom(int message, string data)
1091
 
                {
1092
 
                        return 0;
1093
 
                }
1094
 
 
1095
 
                /// <summary>
1096
 
                /// Stops the service from executing.
1097
 
                /// </summary>
1098
 
                public void Stop()
1099
 
                {
1100
 
                        // Remove all of the log writers from the table and dispose them.
1101
 
                        lock ( logWriterTable )
1102
 
                        {
1103
 
                                foreach ( ChangeLogWriter logWriter in logWriterTable.Values )
1104
 
                                {
1105
 
                                        logWriter.Dispose();
1106
 
                                }
1107
 
 
1108
 
                                // Clear the hashtable.
1109
 
                                logWriterTable.Clear();
1110
 
                        }
1111
 
 
1112
 
                        log.Info( "Change Log Service stopped." );
1113
 
                }
1114
 
                #endregion
1115
 
        }
1116
 
 
1117
 
        /// <summary>
1118
 
        /// Object that records all changes to the Collection Store by listening to the Store events.
1119
 
        /// </summary>
1120
 
        internal class ChangeLogWriter : IDisposable
1121
 
        {
1122
 
                #region Class Members
1123
 
                /// <summary>
1124
 
                /// Default maximum number of records to persist.
1125
 
                /// </summary>
1126
 
                private const uint defaultMaxPersistedRecords = 25000;
1127
 
 
1128
 
                /// <summary>
1129
 
                /// Used to log messages.
1130
 
                /// </summary>
1131
 
                private static readonly ISimiasLog log = SimiasLogManager.GetLogger( typeof( ChangeLogWriter ) );
1132
 
 
1133
 
                /// <summary>
1134
 
                /// Collection that the log file belongs to.
1135
 
                /// </summary>
1136
 
                private string collectionID;
1137
 
        
1138
 
                /// <summary>
1139
 
                /// Inprocess mutex used to control access to the log file.
1140
 
                /// </summary>
1141
 
                private LogMutex mutex;
1142
 
 
1143
 
                /// <summary>
1144
 
                /// Specifies whether object is viable.
1145
 
                /// </summary>
1146
 
                private bool disposed = false;
1147
 
 
1148
 
                /// <summary>
1149
 
                /// Subscribes to Collection Store node events.
1150
 
                /// </summary>
1151
 
                private EventSubscriber subscriber;
1152
 
 
1153
 
                /// <summary>
1154
 
                /// Contains absolute path to the logfile.
1155
 
                /// </summary>
1156
 
                private string logFilePath;
1157
 
 
1158
 
                /// <summary>
1159
 
                /// Contains the next record ID to assign to a ChangeLogRecord.
1160
 
                /// </summary>
1161
 
                private ulong recordID = 0;
1162
 
 
1163
 
                /// <summary>
1164
 
                /// Contains the offset in the file to write.
1165
 
                /// </summary>
1166
 
                private long writePosition = LogFileHeader.RecordSize;
1167
 
 
1168
 
                /// <summary>
1169
 
                /// Contains the last write position boundary.
1170
 
                /// </summary>
1171
 
                private long maxWritePosition = ( defaultMaxPersistedRecords * ChangeLogRecord.RecordSize ) + LogFileHeader.RecordSize;
1172
 
 
1173
 
                /// <summary>
1174
 
                /// Queue used to process change events so the event thread does not have to block.
1175
 
                /// </summary>
1176
 
                private Queue eventQueue = new Queue();
1177
 
 
1178
 
                /// <summary>
1179
 
                /// Flag that indicates if a thread is processing work on the queue.
1180
 
                /// </summary>
1181
 
                private bool threadScheduled = false;
1182
 
 
1183
 
                /// <summary>
1184
 
                /// Contains the header to the log file.
1185
 
                /// </summary>
1186
 
                private LogFileHeader logHeader;
1187
 
                #endregion
1188
 
 
1189
 
                #region Properties
1190
 
                /// <summary>
1191
 
                /// Gets the file path for this change log.
1192
 
                /// </summary>
1193
 
                public string LogFile
1194
 
                {
1195
 
                        get { return logFilePath; }
1196
 
                }
1197
 
                #endregion
1198
 
 
1199
 
                #region Constructor
1200
 
                /// <summary>
1201
 
                /// Initializes a new instance of the object class.
1202
 
                /// </summary>
1203
 
                /// <param name="collectionID">Collection identifier to listen for events.</param>
1204
 
                public ChangeLogWriter( string collectionID )
1205
 
                {
1206
 
                        this.collectionID = collectionID;
1207
 
 
1208
 
                        // Create the mutex that will protect this logfile.
1209
 
                        mutex = new LogMutex( collectionID, true );
1210
 
                        try
1211
 
                        {
1212
 
                                // Get the path to the store managed directory for this collection.
1213
 
                                string logFileDir = Path.Combine( Store.StorePath, "changelog" );
1214
 
                                if ( !Directory.Exists( logFileDir ) )
1215
 
                                {
1216
 
                                        Directory.CreateDirectory( logFileDir );
1217
 
                                }
1218
 
 
1219
 
                                // Build the log file path.
1220
 
                                logFilePath = Path.Combine( logFileDir, collectionID + ".changelog" );
1221
 
 
1222
 
                                // Check to see if the file exists.
1223
 
                                if ( !File.Exists( logFilePath ) )
1224
 
                                {
1225
 
                                        // Create the file.
1226
 
                                        FileStream fs = new FileStream( logFilePath, FileMode.CreateNew, FileAccess.ReadWrite );
1227
 
                                        try
1228
 
                                        {
1229
 
                                                // Create the log file header.
1230
 
                                                logHeader = CreateLogFileHeader( fs, collectionID );
1231
 
                                        }
1232
 
                                        finally
1233
 
                                        {
1234
 
                                                fs.Close();
1235
 
                                        }
1236
 
                                }
1237
 
                                else
1238
 
                                {
1239
 
                                        // Open the existing log file.
1240
 
                                        FileStream fs = new FileStream( logFilePath, FileMode.Open, FileAccess.ReadWrite );
1241
 
                                        try
1242
 
                                        {
1243
 
                                                // Get the log header.
1244
 
                                                logHeader = GetLogFileHeader( fs );
1245
 
 
1246
 
                                                // Check to see if the log file was shutdown gracefully.
1247
 
                                                if ( CheckIntegrity( fs, logHeader, collectionID ) )
1248
 
                                                {
1249
 
                                                        // Setup the current write position.
1250
 
                                                        SetCurrentWritePosition( fs );
1251
 
                                                }
1252
 
                                        }
1253
 
                                        finally
1254
 
                                        {
1255
 
                                                fs.Close();
1256
 
                                        }
1257
 
                                }
1258
 
 
1259
 
                                // Setup the event listeners.
1260
 
                                subscriber = new EventSubscriber(collectionID );
1261
 
                                subscriber.NodeChanged += new NodeEventHandler( OnNodeChange );
1262
 
                                subscriber.NodeCreated += new NodeEventHandler( OnNodeCreate );
1263
 
                                subscriber.NodeDeleted += new NodeEventHandler( OnNodeDelete );
1264
 
                        }
1265
 
                        finally
1266
 
                        {
1267
 
                                mutex.ReleaseMutex( collectionID );
1268
 
                        }
1269
 
                }
1270
 
                #endregion
1271
 
 
1272
 
                #region Private Methods
1273
 
                /// <summary>
1274
 
                /// Checks to see if the file header is valid. If it was not, the file contents are
1275
 
                /// truncated and reinitialized.
1276
 
                /// </summary>
1277
 
                /// <param name="fs">File stream that reference the log file.</param>
1278
 
                /// <param name="header">The log file header.</param>
1279
 
                /// <param name="collectionID">ID of the collection being monitored.</param>
1280
 
                /// <returns>True if the file data is good, otherwise false.</returns>
1281
 
                private bool CheckIntegrity( FileStream fs, LogFileHeader header, string collectionID )
1282
 
                {
1283
 
                        bool result = true;
1284
 
 
1285
 
                        if ( header.LogFileID != collectionID )
1286
 
                        {
1287
 
                                log.Error( "Log file corrupted. Reinitializing contents." );
1288
 
 
1289
 
                                // Truncate the file data.
1290
 
                                fs.SetLength( 0 );
1291
 
                                logHeader = CreateLogFileHeader( fs, collectionID );
1292
 
                                result = false;
1293
 
                        }
1294
 
 
1295
 
                        return result;
1296
 
                }
1297
 
 
1298
 
                /// <summary>
1299
 
                /// Creates a default log file header and writes it to the log file.
1300
 
                /// </summary>
1301
 
                /// <param name="fs">File stream that reference the log file.</param>
1302
 
                /// <param name="collectionID">ID of collection being monitored.</param>
1303
 
                /// <returns>A LogFileHeader object.</returns>
1304
 
                private LogFileHeader CreateLogFileHeader( FileStream fs, string collectionID )
1305
 
                {
1306
 
                        // Build the new log file header.
1307
 
                        LogFileHeader header = new LogFileHeader( collectionID, defaultMaxPersistedRecords );
1308
 
 
1309
 
                        try
1310
 
                        {
1311
 
                                fs.Position = 0;
1312
 
                                fs.Write( header.ToByteArray(), 0, header.Length );
1313
 
                        }
1314
 
                        catch ( IOException e )
1315
 
                        {
1316
 
                                log.Error( "Failed to write log header. " + e.Message );
1317
 
                                throw;
1318
 
                        }
1319
 
 
1320
 
                        return header;
1321
 
                }
1322
 
 
1323
 
                /// <summary>
1324
 
                /// Looks for the next write position in the log file using the hint saved the last time the file
1325
 
                /// was written to. If the hint is invalid, then a brute force method is used.
1326
 
                /// </summary>
1327
 
                /// <param name="fs">FileStream object that references the log file.</param>
1328
 
                private bool FindCurrentWritePosition( FileStream fs )
1329
 
                {
1330
 
                        bool foundPosition = false;
1331
 
                        bool wrapped = false;
1332
 
 
1333
 
                        try
1334
 
                        {
1335
 
                                // See if a last position was saved.
1336
 
                                if ( logHeader.RecordLocation >= ( long )LogFileHeader.RecordSize )
1337
 
                                {
1338
 
                                        // Make sure that the saved record position in on a ChangeLogRecord boundary.
1339
 
                                        if ( ( ( logHeader.RecordLocation - ( long )LogFileHeader.RecordSize ) % ( long )ChangeLogRecord.RecordSize ) == 0 )
1340
 
                                        {
1341
 
                                                // Using the hint in the log file header, see if the write position still exists in the file.
1342
 
                                                if ( logHeader.RecordLocation == ( long )LogFileHeader.RecordSize )
1343
 
                                                {
1344
 
                                                        // There are no records in the log or the log has wrapped back to the beginning.
1345
 
                                                        if ( fs.Length >= ( LogFileHeader.RecordSize + ChangeLogRecord.RecordSize ) )
1346
 
                                                        {
1347
 
                                                                // The file has wrapped, read the entry from the end of the file.
1348
 
                                                                fs.Position = fs.Length - ChangeLogRecord.RecordSize;
1349
 
                                                                wrapped = true;
1350
 
                                                        }
1351
 
                                                        else
1352
 
                                                        {
1353
 
                                                                // There are no entries in the file. No need to do a read.
1354
 
                                                                writePosition = logHeader.RecordLocation;
1355
 
                                                                recordID = logHeader.LastRecord;
1356
 
                                                                foundPosition = true;
1357
 
                                                        }
1358
 
                                                }
1359
 
                                                else
1360
 
                                                {
1361
 
                                                        // There is at least one ChangeLogRecord and the file hasn't wrapped.
1362
 
                                                        fs.Position = logHeader.RecordLocation - ChangeLogRecord.RecordSize;
1363
 
                                                }
1364
 
 
1365
 
                                                // No need to read if the offset was already found.
1366
 
                                                if ( foundPosition == false )
1367
 
                                                {
1368
 
                                                        // Read the last valid change log record that was written. 
1369
 
                                                        byte[] buffer = new byte[ ChangeLogRecord.RecordSize ];
1370
 
                                                        int bytesRead = fs.Read( buffer, 0, buffer.Length );
1371
 
                                                        if ( bytesRead > 0 )
1372
 
                                                        {
1373
 
                                                                ChangeLogRecord record = new ChangeLogRecord( buffer );
1374
 
                                                                ulong lastRecordID = record.RecordID;
1375
 
 
1376
 
                                                                if ( record.RecordID == ( logHeader.LastRecord - 1 ) )
1377
 
                                                                {
1378
 
                                                                        // Position the next for the next read if the event log has rolled over.
1379
 
                                                                        if ( wrapped )
1380
 
                                                                        {
1381
 
                                                                                fs.Position = LogFileHeader.RecordSize;
1382
 
                                                                        }
1383
 
 
1384
 
                                                                        // If there is a next record, then the file has rolled over and we need
1385
 
                                                                        // to check the record ID to make sure it is less that the value that we
1386
 
                                                                        // saved. If there is not an other record, then the hint is valid.
1387
 
                                                                        bytesRead = fs.Read( buffer, 0, buffer.Length );
1388
 
                                                                        if ( bytesRead > 0 )
1389
 
                                                                        {
1390
 
                                                                                record = new ChangeLogRecord( buffer );
1391
 
                                                                                if ( lastRecordID > record.RecordID )
1392
 
                                                                                {
1393
 
                                                                                        // This is the rollover point. The saved location is valid.
1394
 
                                                                                        writePosition = logHeader.RecordLocation;
1395
 
                                                                                        recordID = logHeader.LastRecord;
1396
 
                                                                                        foundPosition = true;
1397
 
                                                                                }
1398
 
                                                                        }
1399
 
                                                                        else
1400
 
                                                                        {
1401
 
                                                                                // There are no more records and the file hasn't rolled over. The
1402
 
                                                                                // saved location is valid.
1403
 
                                                                                writePosition = logHeader.RecordLocation;
1404
 
                                                                                recordID = logHeader.LastRecord;
1405
 
                                                                                foundPosition = true;
1406
 
                                                                        }
1407
 
                                                                }
1408
 
                                                        }
1409
 
                                                }
1410
 
                                        }
1411
 
                                }
1412
 
                        }
1413
 
                        catch ( IOException e )
1414
 
                        {
1415
 
                                log.Error( "FindCurrentWritePosition():" + e.Message );
1416
 
                        }
1417
 
 
1418
 
                        return foundPosition;
1419
 
                }
1420
 
 
1421
 
                /// <summary>
1422
 
                /// Gets the log file header for the specified stream.
1423
 
                /// </summary>
1424
 
                /// <param name="fs">File stream containing the header.</param>
1425
 
                /// <returns>A LogFileHeader object from the specified file stream if successful. Otherwise
1426
 
                /// a null is returned.</returns>
1427
 
                private LogFileHeader GetLogFileHeader( FileStream fs )
1428
 
                {
1429
 
                        LogFileHeader logFileHeader = null;
1430
 
 
1431
 
                        try
1432
 
                        {
1433
 
                                // Position the file pointer to the beginning of the file.
1434
 
                                fs.Position = 0;
1435
 
 
1436
 
                                // Read the data.
1437
 
                                byte[] buffer = new byte[ LogFileHeader.RecordSize ];
1438
 
                                int bytesRead = fs.Read( buffer, 0, buffer.Length );
1439
 
                                if ( bytesRead == buffer.Length )
1440
 
                                {
1441
 
                                        logFileHeader = new LogFileHeader( buffer );
1442
 
                                }
1443
 
                        }
1444
 
                        catch ( IOException e )
1445
 
                        {
1446
 
                                log.Error( "Failed to read event log header. {0}", e.Message );
1447
 
                                throw;
1448
 
                        }
1449
 
 
1450
 
                        return logFileHeader;
1451
 
                }
1452
 
 
1453
 
                /// <summary>
1454
 
                /// Delegate that is called when a Node object has been changed.
1455
 
                /// </summary>
1456
 
                /// <param name="args">Event arguments.</param>
1457
 
                private void OnNodeChange( NodeEventArgs args )
1458
 
                {
1459
 
                        // Don't indicate local events.
1460
 
                        if (((NodeEventArgs.EventFlags)args.Flags & NodeEventArgs.EventFlags.LocalOnly) == 0)
1461
 
                        {
1462
 
                                // Queue the event and schedule to come back later to process.
1463
 
                                lock ( eventQueue )
1464
 
                                {
1465
 
                                        // Add the event to the queue.
1466
 
                                        eventQueue.Enqueue( new ChangeLogEvent( ChangeLogEvent.ChangeEventType.NodeChange, args ) );
1467
 
                                        
1468
 
                                        // See if a thread has already been scheduled to take care of this event.
1469
 
                                        if ( threadScheduled == false )
1470
 
                                        {
1471
 
                                                ThreadPool.QueueUserWorkItem( new WaitCallback( ProcessChangeLogEvent ) );
1472
 
                                                threadScheduled = true;
1473
 
                                        }
1474
 
                                }
1475
 
                        }
1476
 
                }
1477
 
 
1478
 
                /// <summary>
1479
 
                /// Delegate that is called when a Node object has been created.
1480
 
                /// </summary>
1481
 
                /// <param name="args">Event arguments.</param>
1482
 
                private void OnNodeCreate( NodeEventArgs args )
1483
 
                {
1484
 
                        // Don't indicate local events.
1485
 
                        if (((NodeEventArgs.EventFlags)args.Flags & NodeEventArgs.EventFlags.LocalOnly) == 0)
1486
 
                        {
1487
 
                                // Queue the event and schedule to come back later to process.
1488
 
                                lock ( eventQueue )
1489
 
                                {
1490
 
                                        // Add the event to the queue.
1491
 
                                        eventQueue.Enqueue( new ChangeLogEvent( ChangeLogEvent.ChangeEventType.NodeCreate, args ) );
1492
 
                                        
1493
 
                                        // See if a thread has already been scheduled to take care of this event.
1494
 
                                        if ( threadScheduled == false )
1495
 
                                        {
1496
 
                                                ThreadPool.QueueUserWorkItem( new WaitCallback( ProcessChangeLogEvent ) );
1497
 
                                                threadScheduled = true;
1498
 
                                        }
1499
 
                                }
1500
 
                        }
1501
 
                }
1502
 
 
1503
 
                /// <summary>
1504
 
                /// Delegate that is called when a Node object has been deleted.
1505
 
                /// </summary>
1506
 
                /// <param name="args">Event arguments.</param>
1507
 
                private void OnNodeDelete( NodeEventArgs args )
1508
 
                {
1509
 
                        // Don't indicate local events.
1510
 
                        if (((NodeEventArgs.EventFlags)args.Flags & NodeEventArgs.EventFlags.LocalOnly) == 0)
1511
 
                        {
1512
 
                                // Queue the event and schedule to come back later to process.
1513
 
                                lock ( eventQueue )
1514
 
                                {
1515
 
                                        // Add the event to the queue.
1516
 
                                        eventQueue.Enqueue( new ChangeLogEvent( ChangeLogEvent.ChangeEventType.NodeDelete, args ) );
1517
 
                                        
1518
 
                                        // See if a thread has already been scheduled to take care of this event.
1519
 
                                        if ( threadScheduled == false )
1520
 
                                        {
1521
 
                                                ThreadPool.QueueUserWorkItem( new WaitCallback( ProcessChangeLogEvent ) );
1522
 
                                                threadScheduled = true;
1523
 
                                        }
1524
 
                                }
1525
 
                        }
1526
 
                }
1527
 
 
1528
 
                /// <summary>
1529
 
                /// Processes node created, changed and deleted events.
1530
 
                /// </summary>
1531
 
                /// <param name="state">Not used.</param>
1532
 
                private void ProcessChangeLogEvent( object state )
1533
 
                {
1534
 
                        while ( true )
1535
 
                        {
1536
 
                                ChangeLogEvent work = null;
1537
 
 
1538
 
                                // Lock the queue before accessing it to get the work to do.
1539
 
                                lock ( eventQueue )
1540
 
                                {
1541
 
                                        if ( eventQueue.Count > 0 )
1542
 
                                        {
1543
 
                                                work = eventQueue.Dequeue() as ChangeLogEvent;
1544
 
                                        }
1545
 
                                        else
1546
 
                                        {
1547
 
                                                threadScheduled = false;
1548
 
                                                break;
1549
 
                                        }
1550
 
                                }
1551
 
 
1552
 
                                switch ( work.Type )
1553
 
                                {
1554
 
                                        case ChangeLogEvent.ChangeEventType.NodeChange:
1555
 
                                        {
1556
 
                                                ChangeLogRecord record = new ChangeLogRecord( ChangeLogRecord.ChangeLogOp.Changed, work.Args );
1557
 
                                                WriteLog( record );
1558
 
                                                break;
1559
 
                                        }
1560
 
 
1561
 
                                        case ChangeLogEvent.ChangeEventType.NodeCreate:
1562
 
                                        {
1563
 
                                                ChangeLogRecord record = new ChangeLogRecord( ChangeLogRecord.ChangeLogOp.Created, work.Args );
1564
 
                                                WriteLog( record );
1565
 
                                                break;
1566
 
                                        }
1567
 
                                        
1568
 
                                        case ChangeLogEvent.ChangeEventType.NodeDelete:
1569
 
                                        {
1570
 
                                                ChangeLogRecord record = new ChangeLogRecord( ChangeLogRecord.ChangeLogOp.Deleted, work.Args );
1571
 
                                                WriteLog( record );
1572
 
                                                break;
1573
 
                                        }
1574
 
                                }
1575
 
                        }
1576
 
                }
1577
 
 
1578
 
                /// <summary>
1579
 
                /// Sets the next write position in the log file.
1580
 
                /// </summary>
1581
 
                /// <param name="fs">FileStream object that references the log file.</param>
1582
 
                private void SetCurrentWritePosition( FileStream fs )
1583
 
                {
1584
 
                        // See if the hint is valid so we don't have to brute-force the lookup.
1585
 
                        if ( !FindCurrentWritePosition( fs ) )
1586
 
                        {
1587
 
                                try
1588
 
                                {
1589
 
                                        // Allocate a buffer to hold the records that are read.
1590
 
                                        byte[] buffer = new byte[ ChangeLogRecord.RecordSize * 1000 ];
1591
 
 
1592
 
                                        // Skip over the file header.
1593
 
                                        fs.Position = LogFileHeader.RecordSize;
1594
 
 
1595
 
                                        // Read the first record.
1596
 
                                        int bytesRead = fs.Read( buffer, 0, ChangeLogRecord.RecordSize );
1597
 
                                        if ( bytesRead > 0 )
1598
 
                                        {
1599
 
                                                // Instanitate the first record to compare.
1600
 
                                                ChangeLogRecord record1 = new ChangeLogRecord( buffer );
1601
 
                                                ChangeLogRecord record2 = null;
1602
 
 
1603
 
                                                // Read the next bunch of records.
1604
 
                                                bytesRead = fs.Read( buffer, 0, buffer.Length );
1605
 
                                                while ( bytesRead > 0 )
1606
 
                                                {
1607
 
                                                        int index = 0;
1608
 
                                                        while ( ( index + ChangeLogRecord.RecordSize ) <= bytesRead )
1609
 
                                                        {
1610
 
                                                                // Instantiate the next record so the id's can be compared.
1611
 
                                                                record2 = new ChangeLogRecord( buffer, index );
1612
 
 
1613
 
                                                                // See if the record id has rolled over.
1614
 
                                                                if ( record1.RecordID > record2.RecordID )
1615
 
                                                                {
1616
 
                                                                        // Found the roll over point. Calculate the next write position.
1617
 
                                                                        writePosition = ( fs.Position - bytesRead ) + index;
1618
 
                                                                        recordID = record1.RecordID + 1;
1619
 
                                                                        bytesRead = 0;
1620
 
                                                                        break;
1621
 
                                                                }
1622
 
                                                                else
1623
 
                                                                {
1624
 
                                                                        // Record id's are still increasing.
1625
 
                                                                        index += ChangeLogRecord.RecordSize;
1626
 
                                                                        record1 = record2;
1627
 
                                                                }
1628
 
                                                        }
1629
 
 
1630
 
                                                        // If we haven't found the roll over point, keep reading.
1631
 
                                                        if ( bytesRead > 0 )
1632
 
                                                        {
1633
 
                                                                // Read the next buffer full.
1634
 
                                                                bytesRead = fs.Read( buffer, 0, buffer.Length );
1635
 
                                                        }
1636
 
                                                }
1637
 
 
1638
 
                                                // There is either only one record in the file or the end of the file has been reached without
1639
 
                                                // detecting the rollover point.
1640
 
                                                if ( ( record2 == null ) || ( record1 == record2 ) )
1641
 
                                                {
1642
 
                                                        // Next write position is the current position if it isn't at the size limit.
1643
 
                                                        writePosition = ( fs.Position >= maxWritePosition ) ? LogFileHeader.RecordSize : fs.Position;
1644
 
                                                        recordID = record1.RecordID + 1;
1645
 
                                                }
1646
 
                                        }
1647
 
                                }
1648
 
                                catch ( IOException e )
1649
 
                                {
1650
 
                                        log.Error( e.Message );
1651
 
                                }
1652
 
                        }
1653
 
                }
1654
 
 
1655
 
                /// <summary>
1656
 
                /// Saves the change log header to the file.
1657
 
                /// </summary>
1658
 
                private void SetLogFileHeader()
1659
 
                {
1660
 
                        // Acquire the mutex protecting the log file.
1661
 
                        mutex.WaitOne( collectionID );
1662
 
                        try
1663
 
                        {
1664
 
                                try
1665
 
                                {
1666
 
                                        // Open the log file.
1667
 
                                        FileStream fs = new FileStream( logFilePath, FileMode.Open, FileAccess.ReadWrite );
1668
 
                                        try
1669
 
                                        {
1670
 
                                                // Initialize the log file header.
1671
 
                                                logHeader.LastRecord = recordID;
1672
 
                                                logHeader.RecordLocation = writePosition;
1673
 
 
1674
 
                                                // Position the file pointer to the right position within the file.
1675
 
                                                fs.Position = 0;
1676
 
                                                fs.Write( logHeader.ToByteArray(), 0, LogFileHeader.RecordSize );
1677
 
                                        }
1678
 
                                        finally
1679
 
                                        {
1680
 
                                                fs.Close();
1681
 
                                        }
1682
 
                                }
1683
 
                                catch( IOException e )
1684
 
                                {
1685
 
                                        log.Error( "Cannot save log file header - {0}", e.Message );
1686
 
                                }
1687
 
                        }
1688
 
                        finally
1689
 
                        {
1690
 
                                mutex.ReleaseMutex( collectionID );
1691
 
                        }
1692
 
                }
1693
 
 
1694
 
                /// <summary>
1695
 
                /// Writes the specified ChangeLogRecord to the ChangeLog file.
1696
 
                /// </summary>
1697
 
                /// <param name="record">ChangeLogRecord to write to file.</param>
1698
 
                private void WriteLog( ChangeLogRecord record )
1699
 
                {
1700
 
                        // Acquire the mutex protecting the log file.
1701
 
                        mutex.WaitOne( collectionID );
1702
 
                        try
1703
 
                        {
1704
 
                                try
1705
 
                                {
1706
 
                                        // Open the log file.
1707
 
                                        FileStream fs = new FileStream( logFilePath, FileMode.Open, FileAccess.ReadWrite );
1708
 
                                        try
1709
 
                                        {
1710
 
                                                // Add the next ID to the record.
1711
 
                                                record.RecordID = recordID;
1712
 
 
1713
 
                                                // Position the file pointer to the right position within the file.
1714
 
                                                fs.Position = writePosition;
1715
 
                                                fs.Write( record.ToByteArray(), 0, record.Length );
1716
 
 
1717
 
                                                // Update the members for the next write operation.
1718
 
                                                ++recordID;
1719
 
                                                writePosition += record.Length;
1720
 
                                                if ( writePosition >= maxWritePosition )
1721
 
                                                {
1722
 
                                                        writePosition = LogFileHeader.RecordSize;
1723
 
                                                }
1724
 
                                        }
1725
 
                                        finally
1726
 
                                        {
1727
 
                                                fs.Close();
1728
 
                                        }
1729
 
                                }
1730
 
                                catch( IOException e )
1731
 
                                {
1732
 
                                        log.Error( "Lost event - Epoch: {0}, ID: {1}, Operation: {2}. Exception {3}", record.Epoch, record.EventID, record.Operation, e.Message );
1733
 
                                }
1734
 
                        }
1735
 
                        finally
1736
 
                        {
1737
 
                                mutex.ReleaseMutex( collectionID );
1738
 
                        }
1739
 
                }
1740
 
                #endregion
1741
 
 
1742
 
                #region IDisposable Members
1743
 
                /// <summary>
1744
 
                /// Allows for quick release of managed and unmanaged resources.
1745
 
                /// Called by applications.
1746
 
                /// </summary>
1747
 
                public void Dispose()
1748
 
                {
1749
 
                        Dispose( true );
1750
 
                        GC.SuppressFinalize( this );
1751
 
 
1752
 
                        // Save the log file header after disposing the object
1753
 
                        // because we don't want to take anymore events after
1754
 
                        // the log file header is saved.
1755
 
                        SetLogFileHeader();
1756
 
                }
1757
 
 
1758
 
                /// <summary>
1759
 
                /// Dispose( bool disposing ) executes in two distinct scenarios.
1760
 
                /// If disposing equals true, the method has been called directly
1761
 
                /// or indirectly by a user's code. Managed and unmanaged resources
1762
 
                /// can be disposed.
1763
 
                /// If disposing equals false, the method has been called by the 
1764
 
                /// runtime from inside the finalizer and you should not reference 
1765
 
                /// other objects. Only unmanaged resources can be disposed.
1766
 
                /// </summary>
1767
 
                /// <param name="disposing">Specifies whether called from the finalizer or from the application.</param>
1768
 
                private void Dispose( bool disposing )
1769
 
                {
1770
 
                        // Check to see if Dispose has already been called.
1771
 
                        if ( !disposed )
1772
 
                        {
1773
 
                                // Protect callers from accessing the freed members.
1774
 
                                disposed = true;
1775
 
 
1776
 
                                // If disposing equals true, dispose all managed and unmanaged resources.
1777
 
                                if ( disposing )
1778
 
                                {
1779
 
                                        // Dispose managed resources.
1780
 
                                        subscriber.Dispose();
1781
 
                                }
1782
 
                        }
1783
 
                }
1784
 
                
1785
 
                /// <summary>
1786
 
                /// Use C# destructor syntax for finalization code.
1787
 
                /// This destructor will run only if the Dispose method does not get called.
1788
 
                /// It gives your base class the opportunity to finalize.
1789
 
                /// Do not provide destructors in types derived from this class.
1790
 
                /// </summary>
1791
 
                ~ChangeLogWriter()      
1792
 
                {
1793
 
                        Dispose( false );
1794
 
                }
1795
 
                #endregion
1796
 
        }
1797
 
 
1798
 
        /// <summary>
1799
 
        /// Object that retrieves specified Collection Store changes from the ChangeLog file.
1800
 
        /// </summary>
1801
 
        public class ChangeLogReader
1802
 
        {
1803
 
                #region Class Members
1804
 
                /// <summary>
1805
 
                /// Used to log messages.
1806
 
                /// </summary>
1807
 
                private static readonly ISimiasLog log = SimiasLogManager.GetLogger( typeof( ChangeLogReader ) );
1808
 
 
1809
 
                /// <summary>
1810
 
                /// Collection that the log file belongs to.
1811
 
                /// </summary>
1812
 
                private string collectionID;
1813
 
        
1814
 
                /// <summary>
1815
 
                /// Inprocess mutex used to control access to the log file.
1816
 
                /// </summary>
1817
 
                private LogMutex mutex;
1818
 
 
1819
 
                /// <summary>
1820
 
                /// Contains absolute path to the logfile.
1821
 
                /// </summary>
1822
 
                private string logFilePath;
1823
 
                #endregion
1824
 
 
1825
 
                #region Constructors
1826
 
                /// <summary>
1827
 
                /// Initializes a new instance of the object class.
1828
 
                /// </summary>
1829
 
                /// <param name="collection">Collection object to listen for events on.</param>
1830
 
                public ChangeLogReader( Collection collection )
1831
 
                {
1832
 
                        collectionID = collection.ID;
1833
 
 
1834
 
                        // Create the mutex that will protect this logfile.
1835
 
                        mutex = new LogMutex( collectionID );
1836
 
 
1837
 
                        // Build the log file path.
1838
 
                        logFilePath = Path.Combine( Path.Combine( Store.StorePath, "changelog" ), collection.ID + ".changelog" );
1839
 
                }
1840
 
                #endregion
1841
 
 
1842
 
                #region Private Methods
1843
 
                /// <summary>
1844
 
                /// Gets the offset of the next event to be read.
1845
 
                /// 
1846
 
                /// NOTE: The entire file must be locked before making this call.
1847
 
                /// </summary>
1848
 
                /// <param name="fs">FileStream object associated with the event log file.</param>
1849
 
                /// <param name="cookie">Event context indicating the next event to be read.</param>
1850
 
                /// <returns>True if the next offset was found, otherwise false is returned.</returns>
1851
 
                private bool GetReadPosition( FileStream fs, EventContext cookie )
1852
 
                {
1853
 
                        bool foundOffset = false;
1854
 
                        int bytesRead = 0;
1855
 
                        byte[] buffer = new byte[ ChangeLogRecord.RecordSize ];
1856
 
 
1857
 
                        // Make sure that there is a valid cookie.
1858
 
                        if ( cookie != null )
1859
 
                        {
1860
 
                                try
1861
 
                                {
1862
 
                                        // Using the hint in the cookie, see if the read position still exists in the file.
1863
 
                                        fs.Position = cookie.Hint;
1864
 
                                        bytesRead = fs.Read( buffer, 0, buffer.Length );
1865
 
                                        if ( bytesRead > 0 )
1866
 
                                        {
1867
 
                                                ChangeLogRecord record = new ChangeLogRecord( buffer );
1868
 
                                                if ( ( record.RecordID == cookie.RecordID ) && ( ( record.Epoch == cookie.TimeStamp ) || ( cookie.TimeStamp == DateTime.MinValue ) ) )
1869
 
                                                {
1870
 
                                                        // Found the record that we were looking for. If the cookie indicates the no data has
1871
 
                                                        // ever been read, then position the file pointer back to the first record so it doesn't
1872
 
                                                        // get skipped. Otherwise, if the record and cookie match exactly, the file pointer is
1873
 
                                                        // already at the right position to read the next record.
1874
 
                                                        if ( cookie.TimeStamp == DateTime.MinValue )
1875
 
                                                        {
1876
 
                                                                // We have yet to read a record, start at the beginning.
1877
 
                                                                fs.Position = LogFileHeader.RecordSize;
1878
 
                                                        }
1879
 
 
1880
 
                                                        foundOffset = true;
1881
 
                                                }
1882
 
                                        }
1883
 
                                        else if ( ( bytesRead == 0 ) && ( cookie.RecordID == 0 ) && ( cookie.TimeStamp == DateTime.MinValue ) )
1884
 
                                        {
1885
 
                                                fs.Position = LogFileHeader.RecordSize;
1886
 
                                                foundOffset = true;
1887
 
                                        }
1888
 
                                }
1889
 
                                catch ( Exception e )
1890
 
                                {
1891
 
                                        log.Error( "GetReadPosition():" + e.Message );
1892
 
                                }
1893
 
                        }
1894
 
 
1895
 
                        return foundOffset;
1896
 
                }
1897
 
                #endregion
1898
 
 
1899
 
                #region Public Methods
1900
 
                /// <summary>
1901
 
                /// Gets an EventContext object that contains the latest event information.
1902
 
                /// </summary>
1903
 
                /// <returns>An EventContext object that is up-to-date with the latest information in the event log.</returns>
1904
 
                public EventContext GetEventContext()
1905
 
                {
1906
 
                        EventContext cookie = null;
1907
 
 
1908
 
                        // Acquire the mutex protecting the log file.
1909
 
                        mutex.WaitOne( collectionID );
1910
 
                        try
1911
 
                        {
1912
 
                                try
1913
 
                                {
1914
 
                                        // Open the log file.
1915
 
                                        FileStream fs = new FileStream( logFilePath, FileMode.Open, FileAccess.Read );
1916
 
                                        try
1917
 
                                        {
1918
 
                                                // Allocate a buffer to hold the records that are read.
1919
 
                                                byte[] buffer = new byte[ ChangeLogRecord.RecordSize * 1000 ];
1920
 
 
1921
 
                                                // Skip over the file header.
1922
 
                                                fs.Position = LogFileHeader.RecordSize;
1923
 
 
1924
 
                                                // Read the first record.
1925
 
                                                int bytesRead = fs.Read( buffer, 0, ChangeLogRecord.RecordSize );
1926
 
                                                if ( bytesRead > 0 )
1927
 
                                                {
1928
 
                                                        // Instanitate the first record to compare.
1929
 
                                                        ChangeLogRecord record1 = new ChangeLogRecord( buffer );
1930
 
                                                        ChangeLogRecord record2 = null;
1931
 
 
1932
 
                                                        // Read the next bunch of records.
1933
 
                                                        bytesRead = fs.Read( buffer, 0, buffer.Length );
1934
 
                                                        while ( bytesRead > 0 )
1935
 
                                                        {
1936
 
                                                                int index = 0;
1937
 
                                                                while ( ( index + ChangeLogRecord.RecordSize ) <= bytesRead )
1938
 
                                                                {
1939
 
                                                                        // Instantiate the next record so the id's can be compared.
1940
 
                                                                        record2 = new ChangeLogRecord( buffer, index );
1941
 
 
1942
 
                                                                        // See if the record id has rolled over.
1943
 
                                                                        if ( record1.RecordID > record2.RecordID )
1944
 
                                                                        {
1945
 
                                                                                // Found the roll over point. Calculate the hint position and create
1946
 
                                                                                // the cookie.
1947
 
                                                                                long hint = ( fs.Position - bytesRead ) + ( index - ChangeLogRecord.RecordSize );
1948
 
                                                                                cookie = new EventContext( record1.Epoch, record1.RecordID, hint );
1949
 
                                                                                bytesRead = 0;
1950
 
                                                                                break;
1951
 
                                                                        }
1952
 
                                                                        else
1953
 
                                                                        {
1954
 
                                                                                // Record id's are still increasing.
1955
 
                                                                                index += ChangeLogRecord.RecordSize;
1956
 
                                                                                record1 = record2;
1957
 
                                                                        }
1958
 
                                                                }
1959
 
 
1960
 
                                                                // If we haven't found the roll over point, keep reading.
1961
 
                                                                if ( bytesRead > 0 )
1962
 
                                                                {
1963
 
                                                                        // Read the next buffer full.
1964
 
                                                                        bytesRead = fs.Read( buffer, 0, buffer.Length );
1965
 
                                                                }
1966
 
                                                        }
1967
 
 
1968
 
                                                        // There is either only one record in the file or the end of the file has been reached
1969
 
                                                        // without detecting a rollover point.
1970
 
                                                        if ( ( record2 == null ) || ( record1 == record2 ) )
1971
 
                                                        {
1972
 
                                                                cookie = new EventContext( record1.Epoch, record1.RecordID, fs.Position - ChangeLogRecord.RecordSize );
1973
 
                                                        }
1974
 
                                                }
1975
 
                                                else
1976
 
                                                {
1977
 
                                                        // There are no records in the file yet. Return to use the first record.
1978
 
                                                        cookie = new EventContext( DateTime.MinValue, 0, fs.Position );
1979
 
                                                }
1980
 
                                        }
1981
 
                                        finally
1982
 
                                        {
1983
 
                                                fs.Close();
1984
 
                                        }
1985
 
                                }
1986
 
                                catch( IOException e )
1987
 
                                {
1988
 
                                        log.Error( e.Message );
1989
 
                                }
1990
 
                        }
1991
 
                        finally
1992
 
                        {
1993
 
                                mutex.ReleaseMutex( collectionID );
1994
 
                        }
1995
 
 
1996
 
                        return cookie;
1997
 
                }
1998
 
 
1999
 
                /// <summary>
2000
 
                /// Gets the events that have been recorded in the ChangeLog from the specified event context.
2001
 
                /// </summary>
2002
 
                /// <param name="cookie">Event context received from call to GetEventContext method.</param>
2003
 
                /// <param name="changeList">Receives a list of ChangeLogRecords that are the changes to the Collection Store.</param>
2004
 
                /// <returns>True if there is more data to get. Otherwise false is returned.</returns>
2005
 
                public bool GetEvents( EventContext cookie, out ArrayList changeList )
2006
 
                {
2007
 
                        long lastRecordOffset;
2008
 
 
2009
 
                        // Initialize the out parameter.
2010
 
                        changeList = new ArrayList();
2011
 
                        byte[] buffer = new byte[ ChangeLogRecord.RecordSize * 100 ];
2012
 
                        int bytesRead = 0;
2013
 
 
2014
 
                        // Acquire the mutex protecting the log file.
2015
 
                        mutex.WaitOne( collectionID );
2016
 
                        try
2017
 
                        {
2018
 
                                try
2019
 
                                {
2020
 
                                        // Open the ChangeLog file.
2021
 
                                        FileStream fs = new FileStream( logFilePath, FileMode.Open, FileAccess.Read );
2022
 
                                        try
2023
 
                                        {
2024
 
                                                // Calculate where changes need to start being added based on the EventContext.
2025
 
                                                if ( !GetReadPosition( fs, cookie ) )
2026
 
                                                {
2027
 
                                                        // The cookie has been pushed out of the log.
2028
 
                                                        throw new CookieExpiredException();
2029
 
                                                }
2030
 
 
2031
 
                                                // Read the data.
2032
 
                                                bytesRead = fs.Read( buffer, 0, buffer.Length );
2033
 
 
2034
 
                                                // Calculate the offset to the last record read.
2035
 
                                                lastRecordOffset = fs.Position - ChangeLogRecord.RecordSize;
2036
 
                                        }
2037
 
                                        finally
2038
 
                                        {
2039
 
                                                fs.Close();
2040
 
                                        }
2041
 
                                }
2042
 
                                catch( Exception e )
2043
 
                                {
2044
 
                                        throw new CookieExpiredException( e );
2045
 
                                }
2046
 
                        }
2047
 
                        finally
2048
 
                        {
2049
 
                                mutex.ReleaseMutex( collectionID );
2050
 
                        }
2051
 
 
2052
 
                        // Make sure that something was read.
2053
 
                        for ( int i = 0; i < bytesRead; i += ChangeLogRecord.RecordSize )
2054
 
                        {
2055
 
                                changeList.Add( new ChangeLogRecord( buffer, i ) );
2056
 
                        }
2057
 
 
2058
 
                        // If there were events to pass back, update the cookie.
2059
 
                        if ( changeList.Count > 0 )
2060
 
                        {
2061
 
                                ChangeLogRecord record = ( ChangeLogRecord )changeList[ changeList.Count - 1 ];
2062
 
                                cookie.TimeStamp = record.Epoch;
2063
 
                                cookie.RecordID = record.RecordID;
2064
 
                                cookie.Hint = lastRecordOffset;
2065
 
                        }
2066
 
 
2067
 
                        // If less data was read that we had buffer for, we have all of the data.
2068
 
                        return ( bytesRead == buffer.Length ) ? true : false;
2069
 
                }
2070
 
                #endregion
2071
 
        }
2072
 
}