试图在单个生产者多消费者方案中使用TThreadedQueue(Generics.Collections)。 (Delphi-XE)。
想法是将对象推入队列,并让几个工作线程排空队列。
想法是将对象推入队列,并让几个工作线程排空队列。
它不能按预期工作,但。
当两个或多个工作线程调用PopItem时,会从TThreadedQueue抛出访问冲突。
如果PopItem的调用序列化了一个临界区,一切都很好。
当然TThreadedQueue应该能够处理多个消费者,所以我缺少的东西或者这是一个纯的bug在TThreadedQueue?
这里是一个简单的例子来产生错误。
program TestThreadedQueue; {$APPTYPE CONSOLE} uses // FastMM4 in '..\..\..\FastMM4\FastMM4.pas',Windows,Messages,Classes,SysUtils,SyncObjs,Generics.Collections; type TThreadTaskMsg = class(TObject) private threadID : integer; threadMsg : string; public Constructor Create( ID : integer; const msg : string); end; type TThreadReader = class(TThread) private fPopQueue : TThreadedQueue<TObject>; fSync : TCriticalSection; fMsg : TThreadTaskMsg; fException : Exception; procedure DoSync; procedure DoHandleException; public Constructor Create( popQueue : TThreadedQueue<TObject>; sync : TCriticalSection); procedure Execute; override; end; Constructor TThreadReader.Create( popQueue : TThreadedQueue<TObject>; sync : TCriticalSection); begin fPopQueue:= popQueue; fMsg:= nil; fSync:= sync; Self.FreeOnTerminate:= FALSE; fException:= nil; Inherited Create( FALSE); end; procedure TThreadReader.DoSync ; begin WriteLn(fMsg.threadMsg + ' ' + IntToStr(fMsg.threadId)); end; procedure TThreadReader.DoHandleException; begin WriteLn('Exception ->' + fException.Message); end; procedure TThreadReader.Execute; var signal : TWaitResult; begin NameThreadForDebugging('QueuePop worker'); while not Terminated do begin try {- Calling PopItem can return empty without waittime !? Let other threads in by sleeping. } Sleep(20); {- Serializing calls to PopItem works } if Assigned(fSync) then fSync.Enter; try signal:= fPopQueue.PopItem( TObject(fMsg)); finally if Assigned(fSync) then fSync.Release; end; if (signal = wrSignaled) then begin try if Assigned(fMsg) then begin fMsg.threadMsg:= '<Thread id :' +IntToStr( Self.threadId) + '>'; fMsg.Free; // We are just dumping the message in this test //Synchronize( Self.DoSync); //PostMessage( fParentForm.Handle,WM_TestQueue_Message,Cardinal(fMsg),0); end; except on E:Exception do begin end; end; end; except FException:= Exception(ExceptObject); try if not (FException is EAbort) then begin {Synchronize(} DoHandleException; //); end; finally FException:= nil; end; end; end; end; Constructor TThreadTaskMsg.Create( ID : Integer; Const msg : string); begin Inherited Create; threadID:= ID; threadMsg:= msg; end; var fSync : TCriticalSection; fThreadQueue : TThreadedQueue<TObject>; fReaderArr : array[1..4] of TThreadReader; i : integer; begin try IsMultiThread:= TRUE; fSync:= TCriticalSection.Create; fThreadQueue:= TThreadedQueue<TObject>.Create(1024,1,100); try {- Calling without fSync throws exceptions when two or more threads calls PopItem at the same time } WriteLn('Creating worker threads ...'); for i:= 1 to 4 do fReaderArr[i]:= TThreadReader.Create( fThreadQueue,Nil); {- Calling with fSync works ! } //for i:= 1 to 4 do fReaderArr[i]:= TThreadReader.Create( fThreadQueue,fSync); WriteLn('Init done. Pushing items ...'); for i:= 1 to 100 do fThreadQueue.PushItem( TThreadTaskMsg.Create( i,'')); ReadLn; finally for i:= 1 to 4 do fReaderArr[i].Free; fThreadQueue.Free; fSync.Free; end; except on E: Exception do begin Writeln(E.ClassName,': ',E.Message); ReadLn; end; end; end.
更新:导致TThreadedQueue崩溃的TMonitor中的错误是在Delphi XE2中修复的。
更新2:上面的测试强调队列在空状态。 Darian Miller发现,在满状态下强调队列,仍然可以再现XE2中的错误。错误再次出现在TMonitor中。有关更多信息,请参阅下面的答案。还有一个链接到QC101114。
更新3:
有了Delphi-XE2更新4,有一个宣布的修正TMonitor将解决问题在TThreadedQueue。我的测试到目前为止不能再现任何错误在TThreadedQueue了。
当队列为空且已满时,测试单个生产者/多个消费者线程。
还测试了多个生产者/多个消费者。我改变了读者线程和写线程从1到100没有任何毛刺。但是知道历史,我敢于别人打破TMonitor。