c# – 如何在没有引入竞争条件的情况下等待RX主题的回复?

我有一个服务允许来电者发送命令和异步接收响应.在一个实际的应用中,这些动作相当断开(一些动作会发送一个命令,响应将被独立处理).

但是,在我的测试中,我需要发送一个命令,然后在继续测试之前等待(第一个)响应.

响应使用RX发布,我的第一次尝试代码是这样的:

service.SendCommand("BLAH");
await service.Responses.FirstAsync();

这个问题是,FirstAsync只有在响应到来之后才能工作,等待已经被打了.如果服务流程非常快,那么测试就会等待着.

我的下一次尝试解决这个问题是在发送命令之前调用FirstAsync(),以便即使在等待之前到达,也会产生结果:

var firstResponse = service.Responses.FirstAsync();
service.SendCommand("BLAH");
await firstResponse;

然而,这仍然以同样的方式失败.似乎只有当等待命中(GetAwaiter被调用)它才开始收听;所以存在完全相同的竞争条件.

如果我将主题改为具有缓冲区(或计时器)的ReplaySubject,那么我可以“解决”这个;然而在我的生产课上这样做是没有意义的;它只会用于测试.

在RX中能做到这一点的“正确”方法是什么?如何设置一个将不会引入竞争条件的方式在流上接收第一个事件的东西?

这是一个小测试,以“单线程”的方式说明问题.这个测试会不定期地停止:

[Fact]
public async Task MyTest()
{
    var x = new Subject<bool>();

    // Subscribe to the first bool (but don't await it yet)
    var firstBool = x.FirstAsync();

    // Send the first bool
    x.OnNext(true);

    // Await the task that receives the first bool
    var b = await firstBool; // <-- hangs here; presumably because firstBool didn't start monitoring until GetAwaiter was called?


    Assert.Equal(true,b);
}

我甚至尝试在我的测试中调用Replay(),它会缓冲结果;但这并不改变任何事情:

[Fact]
public async Task MyTest()
{
    var x = new Subject<bool>();

    var firstBool = x.Replay();

    // Send the first bool
    x.OnNext(true);

    // Await the task that receives the first bool
    var b = await firstBool.FirstAsync(); // <-- Still hangs here


    Assert.Equal(true,b);
}

解决方法

您可以使用AsyncSubject来执行此操作
[Fact]
public async Task MyTest()
{
    var x = new Subject<bool>();

    var firstBool = x.FirstAsync().PublishLast(); // PublishLast wraps an AsyncSubject
    firstBool.Connect();

    // Send the first bool
    x.OnNext(true);

    // Await the task that receives the first bool
    var b = await firstBool;


    Assert.Equal(true,b);
}

AsyncSubject基本上缓存在调用OnComplete之前的最后接收的值,然后重播它.

相关文章

在项目中使用SharpZipLib压缩文件夹的时候,遇到如果目录较深,则压缩包中的文件夹同样比较深的问题。比...
项目需要,几十万张照片需要计算出每个照片的特征值(调用C++编写的DLL)。 业务流程:选择照片...
var array = new byte[4]; var i = Encoding.UTF8.GetBytes(100.ToString(&quot;x2&quot;));//...
其实很简单,因为Combox的Item是一个K/V的object,那么就可以把它的items转换成IEnumerable&lt;Dic...
把.net4.6安装包打包进安装程序。 关键脚本如下: 头部引用字符串对比库 !include &quot;WordFunc....
项目需求(Winform)可以批量打印某个模板,经过百度和摸索,使用iTextSharp+ZXing.NetʿreeSp...