Delphi XE7 之后 语法就加了 Parallel.For 用于多线程编程. 有一个第三方开源的库 OmniThreadLibrary (OTL) 也可以用 但是在 D2007 下由于没有 匿名函数和通用模板 一些OTL的高级语法就都不能用了. The AsyncCall 也是第三方开源的 库 支持 D2006到 XE2 但是也没有 Parallel.For 语法.
下面就简单在 D2007 下实现了 多线程 并行 FOR 语法. 虽然无法向 XE7之后 或者是OTL的语法那么的舒服 但是简单用一下还是挺方便的. PARALLEL FOR 能使多线程的代码变得相对简单直接一些.
首先 我们需要定义 传递参数 的类 用于保存需要用的的数据.
type TParallelParam = class public Data: Pointer; First: integer; Last: integer; constructor Create; overload; constructor Create(_Data: Pointer; _First, _Last: integer); overload; procedure SetValues(_First, _Last: integer); overload; procedure SetValues(_Data: Pointer; _First, _Last: integer); overload; destructor Destroy; override; end; constructor TParallelParam.Create; begin inherited Create; end; constructor TParallelParam.Create(_Data: Pointer; _First, _Last: integer); begin inherited Create; Self.SetValues(_Data, _First, _Last); end; procedure TParallelParam.SetValues(_First, _Last: Integer); begin Self.First := _First; Self.Last := _Last; end; procedure TParallelParam.SetValues(_Data: Pointer; _First, _Last: Integer); begin Self.Data := _Data; Self.First := _First; Self.Last := _Last; end; destructor TParallelParam.Destroy; begin inherited; end;
TParallelParam 这个类就是数据的包装. 然后我们可以定义 并行调用过程的 声名.
type TParallelForFunc = procedure(Index: integer; const Param: TParallelParam);
对于每个并发的线程来说 index 索引值是不一样的. 所有需要用到的数据都可以 通过 指针(比如指向一个结构体) 来传递.
然后 我们封装一下 TThread 来传递这些数据.
type TWorkerThread = class(TThread) public _Method: TParallelForFunc; _Param: TParallelParam; _Index: integer; Done: boolean; constructor Create(CreateSuspended: Boolean); overload; destructor Destroy; override; protected procedure Execute; override; end; procedure TWorkerThread.Execute; begin try Self._Method(_Index, Self._Param); finally Self.Terminate; Self.Done := True; // 已经完成 end; end; constructor TWorkerThread.Create(CreateSuspended: Boolean); begin inherited Create(CreateSuspended); Self.Done := False; end; destructor TWorkerThread.Destroy; begin inherited Destroy; end;
有了这些定义 我们就可以实现这个并发的FOR语法 第一个参数就是并发的过程.
procedure TParallelFor(Method: TParallelForFunc; const Param: TParallelParam; First, Last: integer; ThreadNumber: integer); var flags: array of boolean; threads: array of TWorkerThread; tnum, len, i, cnt, j: integer; begin len := Last - First + 1; if (len <= 0) then begin Raise Exception.Create('TParallelFor.Len = 0'); end; SetLength(flags, len); tnum := ThreadNumber; if (tnum < 2) then begin tnum := 2; end; SetLength(threads, tnum); i := 0; cnt := 0; // 完成的数目 while (cnt < len) do // 还没完成 begin // 寻找下一个可用的线程 for j := 0 to tnum - 1 do begin // 线程还没被创建 或者 已经完成 if ((threads[j] = nil) or (threads[j].Done) or (threads[j].Terminated)) then begin // 获得下一个任务 while (flags[i]) do begin i := (i + 1) mod len; end; // 标记为已经有线程在做了 flags[i] := True; // 增加任务完成 +1 Inc(cnt); // 开始 if (threads[j] <> nil) then begin threads[j].Free; threads[j] := nil; end; if (threads[j] = nil) then begin threads[j] := TWorkerThread.Create(True); threads[j].FreeOnTerminate := False; end; with threads[j] do begin _Method := Method; _Param := Param; _Index := First + i; Priority := tpHigher; Resume; // 并发 end; if (cnt >= len) then begin break; end; end; end; Sleep(0); // 主线程 不停的等啊等 end; // 剩余的线程需要完成 for i := 0 to tnum - 1 do begin if (threads[i] <> nil) then begin WaitForSingleObject(threads[i].Handle, INFINITE); threads[i].Free; end; end; end;
使用用例
把上面的代码 放在一个独立 的单元中 然后这样使用
uses Parallel; implementation type _PAAB = ^_AAB; procedure Parallel1(Arg: integer; const obj: TParallelParam); var j, i, k: integer; map: _PAAB; begin map := obj.Data; i := obj.First; j := obj.Last; if (i <= j) then begin for k := i to j do begin if (map^[Arg][k] = 0) then begin map^[Arg][k] := 2; end else begin break; end; end; end else begin for k := i downto j do begin if (map^[Arg][k] = 0) then begin map^[Arg][k] := 2; end else begin break; end; end; end; end; procedure TestParallel; var map: _AAB; begin SetLength(map, 1000, 1000); obj := TParallelParam.Create; try obj.Data := @map; obj.SetValues(0, High(map)); TParallelFor(@Parallel1, obj, 0, High(map[0]), 8); // 8 线程 finally obj.Free; end; end; end.
局限性
本来可以使用 AsyncCalls 库则可以相当简化线程的调用 AsyncCall(@fun, [parameters]) 但是 AsyncCall 使用了一个全局的线程池来管理线程 当在COM+DLL库中 当 单元被移除的时候 会发生死锁 dead lock.
相关英文技术贴
1. Delphi AsyncCalls Deadlock when Terminating the Threads (com+ dll finalization)
2. OTL can’t be compiled under D2007
3. Is it ok not to free objects before unit unloads from memory (finalization section)?
4. The Ultimate Delphi Threading Library
英文原贴: DELPHI 2007 下的 PARALLEL FOR 实现
强烈推荐
- 英国代购-畅购英伦
- TopCashBack 返现 (英国购物必备, 积少成多, 我2年来一共得了3000多英镑)
- Quidco 返现 (也是很不错的英国返现网站, 返现率高)
- 注册就送10美元, 免费使用2个月的 DigitalOcean 云主机(性价比超高, 每月只需5美元)
- 注册就送10美元, 免费使用4个月的 Vultr 云主机(性价比超高, 每月只需2.5美元)
- 注册就送10美元, 免费使用2个月的 阿里 云主机(性价比超高, 每月只需4.5美元)
- 注册就送20美元, 免费使用4个月的 Linode 云主机(性价比超高, 每月只需5美元) (折扣码: PodCastInit2022)
- PlusNet 英国光纤(超快, 超划算! 用户名 doctorlai)
- 刷了美国运通信用卡一年得到的积分 换了 485英镑
- 注册就送50英镑 – 英国最便宜最划算的电气提供商
- 能把比特币莱特币变现的银行卡! 不需要手续费就可以把虚拟货币法币兑换
微信公众号: 小赖子的英国生活和资讯 JustYYUK