小赖子的英国生活和资讯

DELPHI 2007 下的 PARALLEL FOR 实现

阅读 桌面完整版

Delphi XE7 之后 语法就加了 Parallel.For 用于多线程编程. 有一个第三方开源的库 OmniThreadLibrary (OTL) 也可以用 但是在 D2007 下由于没有 匿名函数和通用模板 一些OTL的高级语法就都不能用了. The AsyncCall 也是第三方开源的 库 支持 D2006到 XE2 但是也没有 Parallel.For 语法.

下面就简单在 D2007 下实现了 多线程 并行 FOR 语法. 虽然无法向 XE7之后 或者是OTL的语法那么的舒服 但是简单用一下还是挺方便的. PARALLEL FOR 能使多线程的代码变得相对简单直接一些.

首先 我们需要定义 传递参数 的类 用于保存需要用的的数据.

type
  TParallelParam = class
    public
      Data: Pointer;
      First: integer;
      Last: integer;

      constructor Create; overload;
      constructor Create(_Data: Pointer; _First, _Last: integer); overload;
      procedure SetValues(_First, _Last: integer); overload;
      procedure SetValues(_Data: Pointer; _First, _Last: integer); overload;
      destructor Destroy; override;
  end;

constructor TParallelParam.Create;
begin
  inherited Create;
end;

constructor TParallelParam.Create(_Data: Pointer; _First, _Last: integer);
begin
  inherited Create;
  Self.SetValues(_Data, _First, _Last);
end;

procedure TParallelParam.SetValues(_First, _Last: Integer);
begin
  Self.First := _First;
  Self.Last := _Last;
end;

procedure TParallelParam.SetValues(_Data: Pointer; _First, _Last: Integer);
begin
  Self.Data := _Data;
  Self.First := _First;
  Self.Last := _Last;
end;

destructor TParallelParam.Destroy;
begin
  inherited;
end;

TParallelParam 这个类就是数据的包装. 然后我们可以定义 并行调用过程的 声名.

type
  TParallelForFunc = procedure(Index: integer; const Param: TParallelParam);

对于每个并发的线程来说 index 索引值是不一样的. 所有需要用到的数据都可以 通过 指针(比如指向一个结构体) 来传递.

然后 我们封装一下 TThread 来传递这些数据.

type
  TWorkerThread = class(TThread)
    public
      _Method: TParallelForFunc;
      _Param: TParallelParam;
      _Index: integer;
      Done: boolean;

      constructor Create(CreateSuspended: Boolean); overload;
      destructor Destroy; override;
    protected
      procedure Execute; override;
  end;

procedure TWorkerThread.Execute;
begin
  try
    Self._Method(_Index, Self._Param);
  finally
    Self.Terminate;
    Self.Done := True; // 已经完成
  end;
end;

constructor TWorkerThread.Create(CreateSuspended: Boolean);
begin
  inherited Create(CreateSuspended);
  Self.Done := False;
end;

destructor TWorkerThread.Destroy;
begin
  inherited Destroy;
end;

有了这些定义 我们就可以实现这个并发的FOR语法 第一个参数就是并发的过程.

procedure TParallelFor(Method: TParallelForFunc; const Param: TParallelParam; First, Last: integer; ThreadNumber: integer);
var
  flags: array of boolean;
  threads: array of TWorkerThread;
  tnum, len, i, cnt, j: integer;
begin
  len := Last - First + 1;
  if (len <= 0) then
  begin
    Raise Exception.Create('TParallelFor.Len = 0');
  end;
  SetLength(flags, len);
  tnum := ThreadNumber;
  if (tnum < 2) then
  begin
    tnum := 2;
  end;
  SetLength(threads, tnum);
  i := 0;
  cnt := 0; // 完成的数目
  while (cnt < len) do  // 还没完成
  begin
    // 寻找下一个可用的线程
    for j := 0 to tnum - 1 do
    begin
      // 线程还没被创建 或者 已经完成
      if ((threads[j] = nil) or (threads[j].Done) or (threads[j].Terminated)) then
      begin
        // 获得下一个任务
        while (flags[i]) do
        begin                              
          i := (i + 1) mod len;
        end;
        // 标记为已经有线程在做了
        flags[i] := True;
        // 增加任务完成 +1
        Inc(cnt);
        // 开始
        if (threads[j] <> nil) then
        begin
          threads[j].Free;
          threads[j] := nil;
        end;
        if (threads[j] = nil) then
        begin
          threads[j] := TWorkerThread.Create(True);
          threads[j].FreeOnTerminate := False;
        end;
        with threads[j] do
        begin
          _Method := Method;
          _Param := Param;
          _Index := First + i;
          Priority := tpHigher;
          Resume; // 并发
        end;
        if (cnt >= len) then
        begin
          break;
        end;     
      end;
    end;
    Sleep(0); // 主线程 不停的等啊等
  end;
  // 剩余的线程需要完成 
  for i := 0 to tnum - 1 do
  begin
    if (threads[i] <> nil) then
    begin
      WaitForSingleObject(threads[i].Handle, INFINITE);
      threads[i].Free;
    end;
  end;
end;

使用用例

把上面的代码 放在一个独立 的单元中 然后这样使用

uses
  Parallel;

implementation

type
  _PAAB = ^_AAB;

procedure Parallel1(Arg: integer; const obj: TParallelParam);
var
  j, i, k: integer;
  map: _PAAB;
begin
  map := obj.Data;
  i := obj.First;
  j := obj.Last;
  if (i <= j) then
  begin
    for k := i to j do
    begin
      if (map^[Arg][k] = 0) then
      begin
        map^[Arg][k] := 2;
      end
      else
      begin
        break;
      end;
    end;
  end
  else
  begin
    for k := i downto j do
    begin
      if (map^[Arg][k] = 0) then
      begin
        map^[Arg][k] := 2;
      end
      else
      begin
        break;
      end;
    end;
  end;
end;

procedure TestParallel;
var
  map: _AAB;
begin
  SetLength(map, 1000, 1000);
  obj := TParallelParam.Create;
  try
    obj.Data := @map;
    obj.SetValues(0, High(map));
    TParallelFor(@Parallel1, obj, 0, High(map[0]), 8); // 8 线程
  finally
    obj.Free;
  end;
end;

end.

局限性

本来可以使用 AsyncCalls 库则可以相当简化线程的调用 AsyncCall(@fun, [parameters]) 但是 AsyncCall 使用了一个全局的线程池来管理线程 当在COM+DLL库中 当 单元被移除的时候 会发生死锁 dead lock.

相关英文技术贴

1. Delphi AsyncCalls Deadlock when Terminating the Threads (com+ dll finalization)
2. OTL can’t be compiled under D2007
3. Is it ok not to free objects before unit unloads from memory (finalization section)?
4. The Ultimate Delphi Threading Library

英文原贴: DELPHI 2007 下的 PARALLEL FOR 实现

强烈推荐

微信公众号: 小赖子的英国生活和资讯 JustYYUK

阅读 桌面完整版
Exit mobile version