在我目前的代码库中,我使用的是“作业”和“作业管理器”的概念.作业仅用于处理初始消息,发送响应,然后等待响应.
我已经有了基于同步套接字的现有代码,其中网络线程正在等待数据到达,然后将其传递给事件处理程序,最终传递给作业管理器.
作业管理器查找将处理消息的作业,并将其传递.
所以场景是这样的:
>工作经理获得新消息并启动工作.
>作业启动,处理消息并发送回复消息.
>此时,作业将等待对回复的响应.
这是一个伪代码示例:
class MyJob : Job { public override void RunJob( IPacketMsg packet ) { // handle packet var myReply = new Packet(); SendReply( myReply ); await GetResponse(); } }
但我不完全确定如何在第3步继续.工作经理将获得响应,然后将其交给正在运行的工作.但我不知道如何让工作等待回应.
我考虑过创建一个等待在WaitHandle上阻塞的任务,但这是最好的解决方案吗?
在这种情况下我还能做些什么吗?
编辑
关于Async CTP的主题,在没有使用UI的情况下会发生什么.我已经阅读了Eric Lippert的Async博客,但我不相信它曾经触及过如何在没有UI线程的情况下在后台工作的主题(它是关闭背景工作者还是……?)
解决方法
- Job manager gets a new message and launches a job.
- The job starts,processes the message,and sends a reply message.
- At this point the job would wait for a response to the reply.
首先,我应该提到Async CTP非常好地处理异步操作,但异步事件并没有那么多.您可能想要考虑基于Rx的方法.但是让我们继续使用Async CTP.
您有两个基本选项来创建任务:
>有代表.例如,Task.Factory.StartNew将在线程池上运行委托.自定义任务工厂和调度程序为任务委托提供了更多选项(例如,指定委托必须在STA线程上运行).
>没有代表.例如,TaskFactory.FromAsync包装现有的Begin / End方法对,TaskEx.FromResult返回“future constant”,TaskCompletionSource可用于显式控制Task(FromAsync和FromResult都在内部使用TCS).
如果作业处理受cpu约束,则将其传递给Task.Factory.StartNew是有意义的.我将假设作业处理受cpu限制.
// Responds to a new message by starting a new job on the thread pool. private void RespondToNewMessage(IPacketMsg message) { IJob job = ..; Task.Factory.StartNew(job.RunJob(message)); } // Holds tasks waiting for a response. private ConcurrentDictionary<int,TaskCompletionSource<IResponse>> responseTasks = ..; // Asynchronously gets a response for the specified reply. public Task<IResponse> GetResponseForReplyAsync(int replyId) { var tcs = new TaskCompletionSource<IResponse>(); responseTasks.Add(replyId,tcs); return tcs.Task; } // Responds to a new response by completing and removing its task. private void RespondToResponse(IResponse response) { var tcs = responseTasks[response.ReplyId]; responseTasks.Remove(response.ReplyId); tcs.TrySetComplete(response); }
这个想法是,工作经理还管理一系列优秀的回复.为了实现这一点,我引入了一个简单的int reply标识符,作业管理器可以使用该标识符来确定响应哪个响应.
现在工作可以像这样工作:
public override void RunJob(IPacketMsg packet) { // handle packet var myReply = new Packet(); var response = jobManager.GetResponseForReplyAsync(myReply.ReplyId); SendReply(myReply); await response; }
因为我们将作业放在线程池线程上,所以有一些棘手的事情:
>必须在发送回复之前调用GetResponseForReplyAsync(注册任务),然后等待.这是为了避免在我们有机会注册之前发送回复和收到回复的情况.
> RespondToResponse将在完成任务之前删除任务注册,以防万一完成任务导致另一个回复以相同的ID发送.
如果作业足够短,不需要将它们放在线程池线程上,那么可以简化解决方案.