~ximion/listaller/master

« back to all changes in this revision

Viewing changes to lib/mtprocs.pas

  • Committer: Matthias Klumpp
  • Date: 2009-12-30 22:33:48 UTC
  • Revision ID: git-v1:9eb5299c9fc4fc3bc980b625e4876139716bb101
Restructured source code

Moved all additional code to /src, created dirs for 3rd-party components,
all libraries go to /lib, daemons to /helper, source code documentation to
/docs, all bindings to /bindings.
Included configure script which needs a little wor now.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
{ Unit for light weight threads.
 
2
 
 
3
  This file is part of the Free Pascal run time library.
 
4
 
 
5
  Copyright (C) 2008 Mattias Gaertner mattias@freepascal.org
 
6
 
 
7
  See the file COPYING.FPC, included in this distribution,
 
8
  for details about the copyright.
 
9
 
 
10
  This program is distributed in the hope that it will be useful,
 
11
  but WITHOUT ANY WARRANTY; without even the implied warranty of
 
12
  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
 
13
 
 
14
 **********************************************************************}
 
15
{
 
16
  Abstract:
 
17
    Light weight threads.
 
18
    This unit provides methods to easily run a procedure/method with several
 
19
    threads at once.
 
20
}
 
21
unit MTProcs;
 
22
 
 
23
{$mode objfpc}{$H+}
 
24
 
 
25
{$inline on}
 
26
 
 
27
interface
 
28
 
 
29
uses
 
30
  Classes, SysUtils, MTPCPU;
 
31
 
 
32
type
 
33
  TProcThreadGroup = class;
 
34
  TProcThreadPool = class;
 
35
  TProcThread = class;
 
36
 
 
37
  { TMultiThreadProcItem }
 
38
 
 
39
  TMTPThreadState = (
 
40
    mtptsNone,
 
41
    mtptsActive,
 
42
    mtptsWaitingForIndex,
 
43
    mtptsWaitingFailed,
 
44
    mtptsInactive,
 
45
    mtptsTerminated
 
46
    );
 
47
 
 
48
  TMultiThreadProcItem = class
 
49
  private
 
50
    FGroup: TProcThreadGroup;
 
51
    FIndex: PtrInt;
 
52
    FThread: TProcThread;
 
53
    FWaitingForIndexEnd: PtrInt;
 
54
    FWaitingForIndexStart: PtrInt;
 
55
    fWaitForPool: PRTLEvent;
 
56
    FState: TMTPThreadState;
 
57
  public
 
58
    destructor Destroy; override;
 
59
    function WaitForIndexRange(StartIndex, EndIndex: PtrInt): boolean;
 
60
    function WaitForIndex(Index: PtrInt): boolean; inline;
 
61
    procedure CalcBlock(Index, BlockSize, LoopLength: PtrInt;
 
62
                        out BlockStart, BlockEnd: PtrInt); inline;
 
63
    property Index: PtrInt read FIndex;
 
64
    property Group: TProcThreadGroup read FGroup;
 
65
    property WaitingForIndexStart: PtrInt read FWaitingForIndexStart;
 
66
    property WaitingForIndexEnd: PtrInt read FWaitingForIndexEnd;
 
67
    property Thread: TProcThread read FThread;
 
68
  end;
 
69
 
 
70
  { TProcThread }
 
71
 
 
72
  TMTPThreadList = (
 
73
    mtptlPool,
 
74
    mtptlGroup
 
75
    );
 
76
 
 
77
  TProcThread = class(TThread)
 
78
  private
 
79
    FItem: TMultiThreadProcItem;
 
80
    FNext, FPrev: array[TMTPThreadList] of TProcThread;
 
81
    procedure AddToList(var First: TProcThread; ListType: TMTPThreadList); inline;
 
82
    procedure RemoveFromList(var First: TProcThread; ListType: TMTPThreadList); inline;
 
83
    procedure Terminating(aPool: TProcThreadPool; E: Exception);
 
84
  public
 
85
    constructor Create;
 
86
    destructor Destroy; override;
 
87
    procedure Execute; override;
 
88
    property Item: TMultiThreadProcItem read FItem;
 
89
  end;
 
90
 
 
91
  TMTMethod = procedure(Index: PtrInt; Data: Pointer;
 
92
                        Item: TMultiThreadProcItem) of object;
 
93
  TMTProcedure = procedure(Index: PtrInt; Data: Pointer;
 
94
                           Item: TMultiThreadProcItem);
 
95
 
 
96
  { TProcThreadGroup
 
97
    Each task creates a new group of threads.
 
98
    A group can either need more threads or it has finished and waits for its
 
99
    threads to end.
 
100
    The thread that created the group is not in the list FFirstThread. }
 
101
 
 
102
  TMTPGroupState = (
 
103
    mtpgsNone,
 
104
    mtpgsNeedThreads, // the groups waiting for more threads to help
 
105
    mtpgsFinishing,   // the groups waiting for its threads to finish
 
106
    mtpgsException    // there was an exception => close asap
 
107
    );
 
108
 
 
109
  TProcThreadGroup = class
 
110
  private
 
111
    FEndIndex: PtrInt;
 
112
    FException: Exception;
 
113
    FFirstRunningIndex: PtrInt;
 
114
    FFirstThread: TProcThread;
 
115
    FLastRunningIndex: PtrInt;
 
116
    FMaxThreads: PtrInt;
 
117
    FNext, FPrev: TProcThreadGroup;
 
118
    FPool: TProcThreadPool;
 
119
    FStarterItem: TMultiThreadProcItem;
 
120
    FStartIndex: PtrInt;
 
121
    FState: TMTPGroupState;
 
122
    FTaskData: Pointer;
 
123
    FTaskFrame: Pointer;
 
124
    FTaskMethod: TMTMethod;
 
125
    FTaskProcedure: TMTProcedure;
 
126
    FThreadCount: PtrInt;
 
127
    procedure AddToList(var First: TProcThreadGroup; ListType: TMTPGroupState); inline;
 
128
    procedure RemoveFromList(var First: TProcThreadGroup); inline;
 
129
    function NeedMoreThreads: boolean; inline;
 
130
    procedure AddThread(AThread: TProcThread);
 
131
    procedure RemoveThread(AThread: TProcThread); inline;
 
132
    procedure Run(Index: PtrInt; Data: Pointer; Item: TMultiThreadProcItem); inline;
 
133
    procedure IndexComplete(Index: PtrInt);
 
134
    procedure WakeThreadsWaitingForIndex;
 
135
    function HasFinishedIndex(aStartIndex, aEndIndex: PtrInt): boolean;
 
136
    procedure EnterExceptionState(E: Exception);
 
137
  public
 
138
    constructor Create;
 
139
    destructor Destroy; override;
 
140
    property Pool: TProcThreadPool read FPool;
 
141
    property StartIndex: PtrInt read FStartIndex;
 
142
    property EndIndex: PtrInt read FEndIndex;
 
143
    property FirstRunningIndex: PtrInt read FFirstRunningIndex; // first started
 
144
    property LastRunningIndex: PtrInt read FLastRunningIndex; // last started
 
145
    property TaskData: Pointer read FTaskData;
 
146
    property TaskMethod: TMTMethod read FTaskMethod;
 
147
    property TaskProcedure: TMTProcedure read FTaskProcedure;
 
148
    property TaskFrame: Pointer read FTaskFrame;
 
149
    property MaxThreads: PtrInt read FMaxThreads;
 
150
    property StarterItem: TMultiThreadProcItem read FStarterItem;
 
151
  end;
 
152
 
 
153
  { TLightWeightThreadPool
 
154
    Group 0 are the inactive threads }
 
155
 
 
156
  { TProcThreadPool }
 
157
 
 
158
  TProcThreadPool = class
 
159
  private
 
160
    FMaxThreadCount: PtrInt;
 
161
    FThreadCount: PtrInt;
 
162
    FFirstInactiveThread: TProcThread;
 
163
    FFirstActiveThread: TProcThread;
 
164
    FFirstTerminatedThread: TProcThread;
 
165
    FFirstGroupNeedThreads: TProcThreadGroup;
 
166
    FFirstGroupFinishing: TProcThreadGroup;
 
167
    FCritSection: TRTLCriticalSection;
 
168
    FDestroying: boolean;
 
169
    Aborted: Boolean;
 
170
    procedure SetMaxThreadCount(const AValue: PtrInt);
 
171
    procedure CleanTerminatedThreads;
 
172
    procedure DoParallelIntern(const AMethod: TMTMethod;
 
173
      const AProc: TMTProcedure; const AFrame: Pointer;
 
174
      StartIndex, EndIndex: PtrInt;
 
175
      Data: Pointer = nil; MaxThreads: PtrInt = 0);
 
176
  public
 
177
    // for debugging only: the critical section is public:
 
178
    procedure EnterPoolCriticalSection; inline;
 
179
    procedure LeavePoolCriticalSection; inline;
 
180
  public
 
181
    constructor Create;
 
182
    destructor Destroy; override;
 
183
 
 
184
    procedure DoParallel(const AMethod: TMTMethod;
 
185
      StartIndex, EndIndex: PtrInt;
 
186
      Data: Pointer = nil; MaxThreads: PtrInt = 0); inline;
 
187
    procedure DoParallel(const AProc: TMTProcedure;
 
188
      StartIndex, EndIndex: PtrInt;
 
189
      Data: Pointer = nil; MaxThreads: PtrInt = 0); inline;
 
190
 
 
191
    // experimental
 
192
    procedure DoParallelLocalProc(const LocalProc: Pointer;
 
193
      StartIndex, EndIndex: PtrInt;
 
194
      Data: Pointer = nil; MaxThreads: PtrInt = 0); // do not make this inline!
 
195
 
 
196
    // utility functions for loops:
 
197
    procedure CalcBlockSize(LoopLength: PtrInt;
 
198
      out BlockCount, BlockSize: PtrInt; MinBlockSize: PtrInt = 0); inline;
 
199
    procedure StopThreads;
 
200
  public
 
201
    property MaxThreadCount: PtrInt read FMaxThreadCount write SetMaxThreadCount;
 
202
    property ThreadCount: PtrInt read FThreadCount;
 
203
  end;
 
204
 
 
205
var
 
206
  ProcThreadPool: TProcThreadPool = nil;
 
207
 
 
208
 
 
209
implementation
 
210
 
 
211
{ TMultiThreadProcItem }
 
212
 
 
213
destructor TMultiThreadProcItem.Destroy;
 
214
begin
 
215
  if fWaitForPool<>nil then begin
 
216
    RTLeventdestroy(fWaitForPool);
 
217
    fWaitForPool:=nil;
 
218
  end;
 
219
  inherited Destroy;
 
220
end;
 
221
 
 
222
function TMultiThreadProcItem.WaitForIndexRange(
 
223
  StartIndex, EndIndex: PtrInt): boolean;
 
224
var
 
225
  aPool: TProcThreadPool;
 
226
begin
 
227
  //WriteLn('TLightWeightThreadItem.WaitForIndexRange START Index='+IntToStr(Index)+' StartIndex='+IntToStr(StartIndex)+' EndIndex='+IntToStr(EndIndex));
 
228
  if (EndIndex>=Index) then exit(false);
 
229
  if EndIndex<StartIndex then exit(true);
 
230
  if Group=nil then exit(true); // a single threaded group has no group object
 
231
  // multi threaded group
 
232
  aPool:=Group.Pool;
 
233
  if aPool.FDestroying then exit(false); // no more wait allowed
 
234
  aPool.EnterPoolCriticalSection;
 
235
  try
 
236
    if Group.FState=mtpgsException then begin
 
237
      //WriteLn('TLightWeightThreadItem.WaitForIndexRange Index='+IntToStr(Index)+', Group closing because of error');
 
238
      exit(false);
 
239
    end;
 
240
    if Group.HasFinishedIndex(StartIndex,EndIndex) then begin
 
241
      //WriteLn('TLightWeightThreadItem.WaitForIndexRange Index='+IntToStr(Index)+', range already finished');
 
242
      exit(true);
 
243
    end;
 
244
    FState:=mtptsWaitingForIndex;
 
245
    FWaitingForIndexStart:=StartIndex;
 
246
    FWaitingForIndexEnd:=EndIndex;
 
247
    if fWaitForPool=nil then
 
248
      fWaitForPool:=RTLEventCreate;
 
249
    RTLeventResetEvent(fWaitForPool);
 
250
  finally
 
251
    aPool.LeavePoolCriticalSection;
 
252
  end;
 
253
  //WriteLn('TLightWeightThreadItem.WaitForIndexRange '+IntToStr(Index)+' waiting ... ');
 
254
  RTLeventWaitFor(fWaitForPool);
 
255
  Result:=FState=mtptsActive;
 
256
  FState:=mtptsActive;
 
257
  //WriteLn('TLightWeightThreadItem.WaitForIndexRange END '+IntToStr(Index));
 
258
end;
 
259
 
 
260
function TMultiThreadProcItem.WaitForIndex(Index: PtrInt): boolean; inline;
 
261
begin
 
262
  Result:=WaitForIndexRange(Index,Index);
 
263
end;
 
264
 
 
265
procedure TMultiThreadProcItem.CalcBlock(Index, BlockSize, LoopLength: PtrInt;
 
266
  out BlockStart, BlockEnd: PtrInt);
 
267
begin
 
268
  BlockStart:=BlockSize*Index;
 
269
  BlockEnd:=BlockStart+BlockSize;
 
270
  if LoopLength<BlockEnd then BlockEnd:=LoopLength;
 
271
  dec(BlockEnd);
 
272
end;
 
273
 
 
274
{ TProcThread }
 
275
 
 
276
procedure TProcThread.AddToList(var First: TProcThread;
 
277
  ListType: TMTPThreadList);
 
278
begin
 
279
  FNext[ListType]:=First;
 
280
  if FNext[ListType]<>nil then
 
281
    FNext[ListType].FPrev[ListType]:=Self;
 
282
  First:=Self;
 
283
end;
 
284
 
 
285
procedure TProcThread.RemoveFromList(var First: TProcThread;
 
286
  ListType: TMTPThreadList);
 
287
begin
 
288
  if First=Self then
 
289
    First:=FNext[ListType];
 
290
  if FNext[ListType]<>nil then
 
291
    FNext[ListType].FPrev[ListType]:=FPrev[ListType];
 
292
  if FPrev[ListType]<>nil then
 
293
    FPrev[ListType].FNext[ListType]:=FNext[ListType];
 
294
  FNext[ListType]:=nil;
 
295
  FPrev[ListType]:=nil;
 
296
end;
 
297
 
 
298
procedure TProcThread.Terminating(aPool: TProcThreadPool;
 
299
  E: Exception);
 
300
begin
 
301
  aPool.EnterPoolCriticalSection;
 
302
  try
 
303
    // remove from group
 
304
    if Item.FGroup<>nil then begin
 
305
      // an exception occured
 
306
      Item.FGroup.EnterExceptionState(E);
 
307
      Item.FGroup.RemoveThread(Self);
 
308
      Item.FGroup:=nil;
 
309
    end;
 
310
    // move to pool's terminated threads
 
311
    case Item.FState of
 
312
    mtptsActive:   RemoveFromList(aPool.FFirstActiveThread,mtptlPool);
 
313
    mtptsInactive: RemoveFromList(aPool.FFirstInactiveThread,mtptlPool);
 
314
    end;
 
315
    AddToList(aPool.FFirstTerminatedThread,mtptlPool);
 
316
    Item.FState:=mtptsTerminated;
 
317
  finally
 
318
    aPool.LeavePoolCriticalSection;
 
319
  end;
 
320
end;
 
321
 
 
322
constructor TProcThread.Create;
 
323
begin
 
324
  inherited Create(true);
 
325
  fItem:=TMultiThreadProcItem.Create;
 
326
  fItem.fWaitForPool:=RTLEventCreate;
 
327
  fItem.FThread:=Self;
 
328
end;
 
329
 
 
330
destructor TProcThread.Destroy;
 
331
begin
 
332
  FreeAndNil(FItem);
 
333
  inherited Destroy;
 
334
end;
 
335
 
 
336
procedure TProcThread.Execute;
 
337
var
 
338
  aPool: TProcThreadPool;
 
339
  Group: TProcThreadGroup;
 
340
  ok: Boolean;
 
341
  E: Exception;
 
342
begin
 
343
  aPool:=Item.Group.Pool;
 
344
  ok:=false;
 
345
  try
 
346
    repeat
 
347
      // work
 
348
      Group:=Item.Group;
 
349
      Group.Run(Item.Index,Group.TaskData,Item);
 
350
 
 
351
      aPool.EnterPoolCriticalSection;
 
352
      try
 
353
        Group.IndexComplete(Item.Index);
 
354
 
 
355
        // find next work
 
356
        if Group.LastRunningIndex<Group.EndIndex then begin
 
357
          // next index of group
 
358
          inc(Group.FLastRunningIndex);
 
359
          Item.FIndex:=Group.FLastRunningIndex;
 
360
        end else begin
 
361
          // remove from group
 
362
          RemoveFromList(Group.FFirstThread,mtptlGroup);
 
363
          dec(Group.FThreadCount);
 
364
          Item.FGroup:=nil;
 
365
          Group:=nil;
 
366
          if aPool.FFirstGroupNeedThreads<>nil then begin
 
367
            // add to new group
 
368
            aPool.FFirstGroupNeedThreads.AddThread(Self);
 
369
            Group:=Item.Group;
 
370
          end else begin
 
371
            // mark inactive
 
372
            RemoveFromList(aPool.FFirstActiveThread,mtptlPool);
 
373
            AddToList(aPool.FFirstInactiveThread,mtptlPool);
 
374
            Item.FState:=mtptsInactive;
 
375
            RTLeventResetEvent(Item.fWaitForPool);
 
376
          end;
 
377
        end;
 
378
      finally
 
379
        aPool.LeavePoolCriticalSection;
 
380
      end;
 
381
      // wait for new work
 
382
      if Item.FState=mtptsInactive then
 
383
        RTLeventWaitFor(Item.fWaitForPool);
 
384
    until Item.Group=nil;
 
385
    ok:=true;
 
386
  except
 
387
    // stop the exception and store it
 
388
    E:=Exception(AcquireExceptionObject);
 
389
    Terminating(aPool,E);
 
390
  end;
 
391
  if ok then
 
392
    Terminating(aPool,nil);
 
393
end;
 
394
 
 
395
{ TProcThreadGroup }
 
396
 
 
397
procedure TProcThreadGroup.AddToList(var First: TProcThreadGroup;
 
398
  ListType: TMTPGroupState);
 
399
begin
 
400
  FNext:=First;
 
401
  if FNext<>nil then
 
402
    FNext.FPrev:=Self;
 
403
  First:=Self;
 
404
  FState:=ListType;
 
405
end;
 
406
 
 
407
procedure TProcThreadGroup.RemoveFromList(
 
408
  var First: TProcThreadGroup);
 
409
begin
 
410
  if First=Self then
 
411
    First:=FNext;
 
412
  if FNext<>nil then
 
413
    FNext.FPrev:=FPrev;
 
414
  if FPrev<>nil then
 
415
    FPrev.FNext:=FNext;
 
416
  FNext:=nil;
 
417
  FPrev:=nil;
 
418
  FState:=mtpgsNone;
 
419
end;
 
420
 
 
421
function TProcThreadGroup.NeedMoreThreads: boolean;
 
422
begin
 
423
  Result:=(FLastRunningIndex<FEndIndex) and (FThreadCount<FMaxThreads)
 
424
      and (FState<>mtpgsException);
 
425
end;
 
426
 
 
427
procedure TProcThreadGroup.AddThread(AThread: TProcThread);
 
428
begin
 
429
  AThread.Item.FGroup:=Self;
 
430
  AThread.AddToList(FFirstThread,mtptlGroup);
 
431
  inc(FThreadCount);
 
432
  inc(FLastRunningIndex);
 
433
  AThread.Item.FIndex:=FLastRunningIndex;
 
434
  if not NeedMoreThreads then begin
 
435
    RemoveFromList(Pool.FFirstGroupNeedThreads);
 
436
    AddToList(Pool.FFirstGroupFinishing,mtpgsFinishing);
 
437
  end;
 
438
end;
 
439
 
 
440
procedure TProcThreadGroup.RemoveThread(AThread: TProcThread);
 
441
begin
 
442
  AThread.RemoveFromList(FFirstThread,mtptlGroup);
 
443
  dec(FThreadCount);
 
444
end;
 
445
 
 
446
procedure TProcThreadGroup.Run(Index: PtrInt; Data: Pointer;
 
447
  Item: TMultiThreadProcItem); inline;
 
448
begin
 
449
  if Assigned(FTaskFrame) then begin
 
450
    CallLocalProc(FTaskProcedure,FTaskFrame,Index,Data,Item)
 
451
  end else begin
 
452
    if Assigned(FTaskProcedure) then
 
453
      FTaskProcedure(Index,Data,Item)
 
454
    else
 
455
      FTaskMethod(Index,Data,Item)
 
456
  end;
 
457
end;
 
458
 
 
459
procedure TProcThreadGroup.IndexComplete(Index: PtrInt);
 
460
var
 
461
  AThread: TProcThread;
 
462
  NewFirstRunningThread: PtrInt;
 
463
begin
 
464
  // update FirstRunningIndex
 
465
  NewFirstRunningThread:=FStarterItem.Index;
 
466
  AThread:=FFirstThread;
 
467
  while AThread<>nil do begin
 
468
    if (NewFirstRunningThread>aThread.Item.Index)
 
469
    and (aThread.Item.Index<>Index) then
 
470
      NewFirstRunningThread:=aThread.Item.Index;
 
471
    aThread:=aThread.FNext[mtptlGroup];
 
472
  end;
 
473
  FFirstRunningIndex:=NewFirstRunningThread;
 
474
  // wake up threads (Note: do this even if FFirstRunningIndex has not changed)
 
475
  WakeThreadsWaitingForIndex;
 
476
end;
 
477
 
 
478
procedure TProcThreadGroup.WakeThreadsWaitingForIndex;
 
479
var
 
480
  aThread: TProcThread;
 
481
begin
 
482
  if FState<>mtpgsException then begin
 
483
    // wake up waiting threads
 
484
    aThread:=FFirstThread;
 
485
    while aThread<>nil do begin
 
486
      if (aThread.Item.FState=mtptsWaitingForIndex)
 
487
      and HasFinishedIndex(aThread.Item.WaitingForIndexStart,
 
488
                           aThread.Item.WaitingForIndexEnd)
 
489
      then begin
 
490
        // wake up the thread
 
491
        aThread.Item.FState:=mtptsActive;
 
492
        RTLeventSetEvent(aThread.Item.fWaitForPool);
 
493
      end;
 
494
      aThread:=aThread.FNext[mtptlGroup];
 
495
    end;
 
496
    if (FStarterItem.FState=mtptsWaitingForIndex)
 
497
    and HasFinishedIndex(FStarterItem.WaitingForIndexStart,FStarterItem.WaitingForIndexEnd)
 
498
    then begin
 
499
      // wake up the starter thread of this group
 
500
      FStarterItem.FState:=mtptsActive;
 
501
      RTLeventSetEvent(FStarterItem.fWaitForPool);
 
502
    end;
 
503
  end else begin
 
504
    // end group: wake up waiting threads
 
505
    aThread:=FFirstThread;
 
506
    while aThread<>nil do begin
 
507
      if (aThread.Item.FState=mtptsWaitingForIndex)
 
508
      then begin
 
509
        // end group: wake up the thread
 
510
        aThread.Item.FState:=mtptsWaitingFailed;
 
511
        RTLeventSetEvent(aThread.Item.fWaitForPool);
 
512
      end;
 
513
      aThread:=aThread.FNext[mtptlGroup];
 
514
    end;
 
515
    if (FStarterItem.FState=mtptsWaitingForIndex)
 
516
    then begin
 
517
      // end group: wake up the starter thread of this group
 
518
      FStarterItem.FState:=mtptsWaitingFailed;
 
519
      RTLeventSetEvent(FStarterItem.fWaitForPool);
 
520
    end;
 
521
  end;
 
522
end;
 
523
 
 
524
function TProcThreadGroup.HasFinishedIndex(
 
525
  aStartIndex, aEndIndex: PtrInt): boolean;
 
526
var
 
527
  AThread: TProcThread;
 
528
begin
 
529
  // test the finished range
 
530
  if FFirstRunningIndex>aEndIndex then exit(true);
 
531
  // test the unfinished range
 
532
  if FLastRunningIndex<aEndIndex then exit(false);
 
533
  // test the active range
 
534
  AThread:=FFirstThread;
 
535
  while AThread<>nil do begin
 
536
    if (AThread.Item.Index>=aStartIndex)
 
537
    and (AThread.Item.Index<=aEndIndex) then
 
538
      exit(false);
 
539
    AThread:=AThread.FNext[mtptlGroup];
 
540
  end;
 
541
  if (FStarterItem.Index>=aStartIndex)
 
542
  and (FStarterItem.Index<=aEndIndex) then
 
543
    exit(false);
 
544
  Result:=true;
 
545
end;
 
546
 
 
547
procedure TProcThreadGroup.EnterExceptionState(E: Exception);
 
548
begin
 
549
  if FState=mtpgsException then exit;
 
550
  case FState of
 
551
  mtpgsFinishing: RemoveFromList(Pool.FFirstGroupFinishing);
 
552
  mtpgsNeedThreads: RemoveFromList(Pool.FFirstGroupNeedThreads);
 
553
  end;
 
554
  FState:=mtpgsException;
 
555
  FException:=E;
 
556
  WakeThreadsWaitingForIndex;
 
557
end;
 
558
 
 
559
constructor TProcThreadGroup.Create;
 
560
begin
 
561
  FStarterItem:=TMultiThreadProcItem.Create;
 
562
  FStarterItem.FGroup:=Self;
 
563
end;
 
564
 
 
565
destructor TProcThreadGroup.Destroy;
 
566
begin
 
567
  FreeAndNil(FStarterItem);
 
568
  inherited Destroy;
 
569
end;
 
570
 
 
571
{ TProcThreadPool }
 
572
 
 
573
procedure TProcThreadPool.SetMaxThreadCount(const AValue: PtrInt);
 
574
begin
 
575
  if FMaxThreadCount=AValue then exit;
 
576
  if AValue<1 then raise Exception.Create('TLightWeightThreadPool.SetMaxThreadCount');
 
577
  FMaxThreadCount:=AValue;
 
578
end;
 
579
 
 
580
procedure TProcThreadPool.CleanTerminatedThreads;
 
581
var
 
582
  AThread: TProcThread;
 
583
begin
 
584
  while FFirstTerminatedThread<>nil do begin
 
585
    AThread:=FFirstTerminatedThread;
 
586
    AThread.RemoveFromList(FFirstTerminatedThread,mtptlPool);
 
587
    AThread.Free;
 
588
  end;
 
589
end;
 
590
 
 
591
constructor TProcThreadPool.Create;
 
592
begin
 
593
  FMaxThreadCount:=GetSystemThreadCount;
 
594
  if FMaxThreadCount<1 then
 
595
    FMaxThreadCount:=1;
 
596
  InitCriticalSection(FCritSection);
 
597
end;
 
598
 
 
599
destructor TProcThreadPool.Destroy;
 
600
 
 
601
  procedure WakeWaitingStarterItems(Group: TProcThreadGroup);
 
602
  begin
 
603
    while Group<>nil do begin
 
604
      if Group.StarterItem.FState=mtptsWaitingForIndex then begin
 
605
        Group.StarterItem.FState:=mtptsWaitingFailed;
 
606
        RTLeventSetEvent(Group.StarterItem.fWaitForPool);
 
607
      end;
 
608
      Group:=Group.FNext;
 
609
    end;
 
610
  end;
 
611
 
 
612
var
 
613
  AThread: TProcThread;
 
614
begin
 
615
  FDestroying:=true;
 
616
  // wake up all waiting threads
 
617
  EnterPoolCriticalSection;
 
618
  try
 
619
    AThread:=FFirstActiveThread;
 
620
    while AThread<>nil do begin
 
621
      if aThread.Item.FState=mtptsWaitingForIndex then begin
 
622
        aThread.Item.FState:=mtptsWaitingFailed;
 
623
        RTLeventSetEvent(AThread.Item.fWaitForPool);
 
624
      end;
 
625
      AThread:=AThread.FNext[mtptlPool];
 
626
    end;
 
627
    WakeWaitingStarterItems(FFirstGroupNeedThreads);
 
628
    WakeWaitingStarterItems(FFirstGroupFinishing);
 
629
  finally
 
630
    LeavePoolCriticalSection;
 
631
  end;
 
632
 
 
633
  // wait for all active threads to become inactive
 
634
  while FFirstActiveThread<>nil do
 
635
    Sleep(10);
 
636
 
 
637
  // wake up all inactive threads (without new work they will terminate)
 
638
  EnterPoolCriticalSection;
 
639
  try
 
640
    AThread:=FFirstInactiveThread;
 
641
    while AThread<>nil do begin
 
642
      RTLeventSetEvent(AThread.Item.fWaitForPool);
 
643
      AThread:=AThread.FNext[mtptlPool];
 
644
    end;
 
645
  finally
 
646
    LeavePoolCriticalSection;
 
647
  end;
 
648
 
 
649
  // wait for all threads to terminate
 
650
  while FFirstInactiveThread<>nil do
 
651
    Sleep(10);
 
652
 
 
653
  // free threads
 
654
  CleanTerminatedThreads;
 
655
 
 
656
  DoneCriticalsection(FCritSection);
 
657
  inherited Destroy;
 
658
end;
 
659
 
 
660
procedure TProcThreadPool.EnterPoolCriticalSection;
 
661
begin
 
662
  EnterCriticalsection(FCritSection);
 
663
end;
 
664
 
 
665
procedure TProcThreadPool.LeavePoolCriticalSection;
 
666
begin
 
667
  LeaveCriticalsection(FCritSection);
 
668
end;
 
669
 
 
670
procedure TProcThreadPool.DoParallel(const AMethod: TMTMethod;
 
671
  StartIndex, EndIndex: PtrInt; Data: Pointer; MaxThreads: PtrInt);
 
672
begin
 
673
  if not Assigned(AMethod) then exit;
 
674
  DoParallelIntern(AMethod,nil,nil,StartIndex,EndIndex,Data,MaxThreads);
 
675
end;
 
676
 
 
677
procedure TProcThreadPool.DoParallel(const AProc: TMTProcedure;
 
678
  StartIndex, EndIndex: PtrInt; Data: Pointer; MaxThreads: PtrInt);
 
679
begin
 
680
  if not Assigned(AProc) then exit;
 
681
  DoParallelIntern(nil,AProc,nil,StartIndex,EndIndex,Data,MaxThreads);
 
682
end;
 
683
 
 
684
procedure TProcThreadPool.StopThreads;
 
685
begin
 
686
 Aborted:=true;
 
687
end;
 
688
 
 
689
procedure TProcThreadPool.DoParallelLocalProc(const LocalProc: Pointer;
 
690
  StartIndex, EndIndex: PtrInt; Data: Pointer; MaxThreads: PtrInt);
 
691
var
 
692
  Frame: Pointer;
 
693
begin
 
694
  if not Assigned(LocalProc) then exit;
 
695
  Frame:=get_caller_frame(get_frame);
 
696
  DoParallelIntern(nil,TMTProcedure(LocalProc),Frame,StartIndex,EndIndex,
 
697
                   Data,MaxThreads);
 
698
end;
 
699
 
 
700
procedure TProcThreadPool.CalcBlockSize(LoopLength: PtrInt; out BlockCount,
 
701
  BlockSize: PtrInt; MinBlockSize: PtrInt);
 
702
begin
 
703
  if LoopLength<=0 then begin
 
704
    BlockCount:=0;
 
705
    BlockSize:=1;
 
706
    exit;
 
707
  end;
 
708
  // split work into equally sized blocks
 
709
  BlockCount:=ProcThreadPool.MaxThreadCount;
 
710
  BlockSize:=(LoopLength div BlockCount);
 
711
  if (BlockSize<MinBlockSize) then BlockSize:=MinBlockSize;
 
712
  BlockCount:=((LoopLength-1) div BlockSize)+1;
 
713
end;
 
714
 
 
715
procedure TProcThreadPool.DoParallelIntern(const AMethod: TMTMethod;
 
716
  const AProc: TMTProcedure; const AFrame: Pointer;
 
717
  StartIndex, EndIndex: PtrInt; Data: Pointer; MaxThreads: PtrInt);
 
718
var
 
719
  Group: TProcThreadGroup;
 
720
  Index: PtrInt;
 
721
  AThread: TProcThread;
 
722
  NewThread: Boolean;
 
723
  Item: TMultiThreadProcItem;
 
724
  HelperThreadException: Exception;
 
725
begin
 
726
  if (StartIndex>EndIndex) then exit; // nothing to do
 
727
  if FDestroying then raise Exception.Create('Pool destroyed');
 
728
 
 
729
  if (MaxThreads>MaxThreadCount) or (MaxThreads<=0) then
 
730
    MaxThreads:=MaxThreadCount;
 
731
  if (StartIndex=EndIndex) or (MaxThreads<=1) then begin
 
732
    // single threaded
 
733
    Item:=TMultiThreadProcItem.Create;
 
734
    try
 
735
      for Index:=StartIndex to EndIndex do begin
 
736
        Item.FIndex:=Index;
 
737
        if Assigned(AFrame) then begin
 
738
          CallLocalProc(AProc,AFrame,Index,Data,Item)
 
739
        end else begin
 
740
          if Assigned(AProc) then
 
741
            AProc(Index,Data,Item)
 
742
          else
 
743
            AMethod(Index,Data,Item)
 
744
        end;
 
745
      end;
 
746
    finally
 
747
      Item.Free;
 
748
    end;
 
749
    exit;
 
750
  end;
 
751
 
 
752
  // create a new group
 
753
  Group:=TProcThreadGroup.Create;
 
754
  Group.FPool:=Self;
 
755
  Group.FTaskData:=Data;
 
756
  Group.FTaskMethod:=AMethod;
 
757
  Group.FTaskProcedure:=AProc;
 
758
  Group.FTaskFrame:=AFrame;
 
759
  Group.FStartIndex:=StartIndex;
 
760
  Group.FEndIndex:=EndIndex;
 
761
  Group.FFirstRunningIndex:=StartIndex;
 
762
  Group.FLastRunningIndex:=StartIndex;
 
763
  Group.FMaxThreads:=MaxThreads;
 
764
  Group.FThreadCount:=1;
 
765
  Group.FStarterItem.FState:=mtptsActive;
 
766
  Group.FStarterItem.FIndex:=StartIndex;
 
767
  HelperThreadException:=nil;
 
768
  try
 
769
    // start threads
 
770
    EnterPoolCriticalSection;
 
771
    try
 
772
      Group.AddToList(FFirstGroupNeedThreads,mtpgsNeedThreads);
 
773
      while Group.NeedMoreThreads do begin
 
774
        AThread:=FFirstInactiveThread;
 
775
        NewThread:=false;
 
776
        if AThread<>nil then begin
 
777
          AThread.RemoveFromList(FFirstInactiveThread,mtptlPool);
 
778
        end else if FThreadCount<FMaxThreadCount then begin
 
779
          AThread:=TProcThread.Create;
 
780
          if Assigned(AThread.FatalException) then
 
781
            raise AThread.FatalException;
 
782
          NewThread:=true;
 
783
          inc(FThreadCount);
 
784
        end else begin
 
785
          break;
 
786
        end;
 
787
        // add to Group
 
788
        Group.AddThread(AThread);
 
789
        // start thread
 
790
        AThread.AddToList(FFirstActiveThread,mtptlPool);
 
791
        AThread.Item.FState:=mtptsActive;
 
792
        if NewThread then
 
793
          AThread.Resume
 
794
        else
 
795
          RTLeventSetEvent(AThread.Item.fWaitForPool);
 
796
      end;
 
797
    finally
 
798
      LeavePoolCriticalSection;
 
799
    end;
 
800
 
 
801
    // run until no more Index left
 
802
    Index:=StartIndex;
 
803
    repeat
 
804
      Group.FStarterItem.FIndex:=Index;
 
805
      Group.Run(Index,Data,Group.FStarterItem);
 
806
 
 
807
      EnterPoolCriticalSection;
 
808
      try
 
809
        Group.IndexComplete(Index);
 
810
        if (Group.FLastRunningIndex<Group.EndIndex) and (Group.FState<>mtpgsException)
 
811
        then begin
 
812
          inc(Group.FLastRunningIndex);
 
813
          Index:=Group.FLastRunningIndex;
 
814
        end else begin
 
815
          Index:=StartIndex;
 
816
        end;
 
817
      finally
 
818
        LeavePoolCriticalSection;
 
819
      end;
 
820
    until (Index=StartIndex)or(Aborted);
 
821
  finally
 
822
    // wait for Group to finish
 
823
    if Group.FFirstThread<>nil then begin
 
824
      EnterPoolCriticalSection;
 
825
      try
 
826
        Group.FStarterItem.FState:=mtptsInactive;
 
827
        Group.FStarterItem.fIndex:=EndIndex;// needed for Group.HasFinishedIndex
 
828
        // wake threads waiting for starter thread to finish
 
829
        if Group.FStarterItem.FState<>mtptsInactive then
 
830
          Group.EnterExceptionState(nil)
 
831
        else
 
832
          Group.WakeThreadsWaitingForIndex;
 
833
      finally
 
834
        LeavePoolCriticalSection;
 
835
      end;
 
836
      // waiting with exponential spin lock
 
837
      Index:=0;
 
838
      while Group.FFirstThread<>nil do begin
 
839
        sleep(Index);
 
840
        Index:=Index*2+1;
 
841
        if Index>30 then Index:=30;
 
842
      end;
 
843
    end;
 
844
    // remove group from pool
 
845
    EnterPoolCriticalSection;
 
846
    try
 
847
      case Group.FState of
 
848
      mtpgsNeedThreads: Group.RemoveFromList(FFirstGroupNeedThreads);
 
849
      mtpgsFinishing: Group.RemoveFromList(FFirstGroupFinishing);
 
850
      end;
 
851
    finally
 
852
      LeavePoolCriticalSection;
 
853
    end;
 
854
    HelperThreadException:=Group.FException;
 
855
    Group.Free;
 
856
    // free terminated threads (terminated, because of exceptions)
 
857
    CleanTerminatedThreads;
 
858
  end;
 
859
  // if the exception occured in a helper thread raise it now
 
860
  if HelperThreadException<>nil then
 
861
    raise HelperThreadException;
 
862
end;
 
863
 
 
864
initialization
 
865
  ProcThreadPool:=TProcThreadPool.Create;
 
866
 
 
867
finalization
 
868
  ProcThreadPool.Free;
 
869
  ProcThreadPool:=nil;
 
870
 
 
871
end.
 
872