我有一个类在一个低级库中封装了一些异步行为.想一想读或写网络的东西.当类启动时,它将尝试连接到环境,并且当成功时,它将通过从工作线程调用来发信号.
我想将这个异步行为变成一个同步调用,并且我已经创建了一个非常简化的例子,下面就是如何实现的:
ManualResetEvent readyEvent = new ManualResetEvent(false); public void Start(TimeSpan timeout) { // Simulate a background process ThreadPool.QueueUserWorkItem(_ => AsyncStart(TimeSpan.FromSeconds(1))); // Wait for startup to complete. if (!this.readyEvent.WaitOne(timeout)) throw new TimeoutException(); } void AsyncStart(TimeSpan delay) { Thread.Sleep(delay); // Simulate startup delay. this.readyEvent.Set(); }
在工作线程上运行AsyncStart只是模拟库的异步行为的一种方法,不是我的真实代码的一部分,低级库提供线程并在回调中调用我的代码.
请注意,如果启动在超时间隔内尚未完成,则Start方法将抛出TimeoutException异常.
我想重写这个代码来使用Rx.这是我的第一个尝试:
Subject<Unit> readySubject = new Subject<Unit>(); public void Start(TimeSpan timeout) { ThreadPool.QueueUserWorkItem(_ => AsyncStart(TimeSpan.FromSeconds(1))); // Point A - see below this.readySubject.Timeout(timeout).First(); } void AsyncStart(TimeSpan delay) { Thread.Sleep(delay); this.readySubject.OnNext(new Unit()); }
这是一个体面的尝试,但不幸的是它包含一个种族条件.如果启动快速完成(例如,如果延迟为0),并且如果在A点存在额外的延迟,则OnNext将在First执行之前在readySubject上调用.实质上,IObservable我正在应用超时,首先从来没有看到该启动已经完成,并且将抛出一个TimeoutException.
似乎已经创建了Observable.Defer来处理这样的问题.这是稍微复杂的尝试使用Rx:
Subject<Unit> readySubject = new Subject<Unit>(); void Start(TimeSpan timeout) { var ready = Observable.Defer(() => { ThreadPool.QueueUserWorkItem(_ => AsyncStart(TimeSpan.FromSeconds(1))); // Point B - see below return this.readySubject.AsObservable(); }); ready.Timeout(timeout).First(); } void AsyncStart(TimeSpan delay) { Thread.Sleep(delay); this.readySubject.OnNext(new Unit()); }
现在异步操作不会立即启动,但只有在使用IObservable时才会启动.不幸的是,仍然存在竞争条件,但这次在点B.如果异步操作在Defer lambda返回之前开始调用OnNext,它仍然丢失,并且TimeoutException将被超时抛出.
我知道我可以使用像Replay这样的操作来缓冲事件,但是我没有使用Rx的初始示例不会使用任何缓冲.有没有办法使用Rx来解决我的问题,没有竞争条件?在本例中,在IObservable已连接到本例的情况下才启动异步操作Timeout和First?
基于Paul Betts的答案是工作解决方案:
void Start(TimeSpan timeout) { var readySubject = new AsyncSubject<Unit>(); ThreadPool.QueueUserWorkItem(_ => AsyncStart(readySubject,TimeSpan.FromSeconds(1))); // Point C - see below readySubject.Timeout(timeout).First(); } void AsyncStart(ISubject<Unit> readySubject,TimeSpan delay) { Thread.Sleep(delay); readySubject.OnNext(new Unit()); readySubject.OnCompleted(); }
有趣的部分是当点C的延迟比AsyncStart完成所需的时间更长时. AsyncSubject保留发送的最后通知,Timeout和First将仍然按预期的方式执行.
解决方法
要知道的关键是,异步func由返回IObservable< TResult>的函数表示. – 这是魔术酱,让你在某些事情完成时发出信号.所以这里是如何“Rx-ify”更传统的异步功能,就像您在Silverlight Web服务中看到的那样:
IObservable<byte[]> readFromNetwork() { var ret = new AsyncSubject(); // Here's a traditional async function that you provide a callback to asyncReaderFunc(theFile,buffer => { ret.OnNext(buffer); ret.OnCompleted(); }); return ret; }
This is a decent attempt but unfortunately it contains a race condition.
这就是AsyncSubject所在的地方 – 这样可以确保即使asyncReaderFunc打到“订阅”,AsyncSubject仍然会“重播”发生了什么.
所以,现在我们有了我们的功能,我们可以做很多有趣的事情:
// Make it into a sync function byte[] results = readFromNetwork().First(); // Keep reading blocks one at a time until we run out readFromNetwork().Repeat().TakeUntil(x => x == null || x.Length == 0).Subscribe(bytes => { Console.WriteLine("Read {0} bytes in chunk",bytes.Length); }) // Read the entire stream and get notified when the whole deal is finished readFromNetwork() .Repeat().TakeUntil(x => x == null || x.Length == 0) .Aggregate(new MemoryStream(),(ms,bytes) => ms.Write(bytes)) .Subscribe(ms => { Console.WriteLine("Got {0} bytes in total",ms.ToArray().Length); }); // Or just get the entire thing as a MemoryStream and wait for it var memoryStream = readFromNetwork() .Repeat().TakeUntil(x => x == null || x.Length == 0) .Aggregate(new MemoryStream(),bytes) => ms.Write(bytes)) .First();