这是我的Interval定义:
m_interval = Observable.Interval(TimeSpan.FromSeconds(5),m_schedulerProvider.EventLoop) .ObserveOn(m_schedulerProvider.EventLoop) .Select(l => Observable.FromAsync(DoWork)) .Concat() .Subscribe();
在上面的代码中,我在Interval和amp;中提供了IScheduler.从SchedulerProvider观察,以便我可以更快地进行单元测试(TestScheduler.AdvanceBy).此外,DoWork是一种异步方法.
在我的特定情况下,我希望每5秒调用一次DoWork函数.这里的问题是我希望5秒是DoWork结束和另一个开始之间的时间.因此,如果DoWork执行时间超过5秒,那么说10秒,第一个呼叫将是5秒,第二个呼叫是15秒.
不幸的是,以下测试证明它不像那样:
[Fact] public void MultiPluginStatusHelperShouldWaitForNextQuery() { m_queryHelperMock .Setup(x => x.CustomQueryAsync()) .Callback(() => Thread.Sleep(10000)) .Returns(Task.FromResult(new QueryCompletedEventData())) .Verifiable() ; var multiPluginStatusHelper = m_container.GetInstance<IMultiPluginStatusHelper>(); multiPluginStatusHelper.MillisecondsInterval = 5000; m_testSchedulerProvider.EventLoopScheduler.AdvanceBy(TimeSpan.FromMilliseconds(5000).Ticks); m_testSchedulerProvider.EventLoopScheduler.AdvanceBy(TimeSpan.FromMilliseconds(5000).Ticks); m_queryHelperMock.Verify(x => x.CustomQueryAsync(),Times.Once); }
DoWork调用CustomQueryAsync,测试失败,表示已调用两次.它应该只被调用一次,因为.Callback(()=> Thread.Sleep(1000))强制延迟.
我在这做错了什么?
我的实际实施来自this example.
解决方法
通常在轮询一些不可观察的数据源时会出现这个问题.当我遇到它时,我使用了一段时间后写的RepeatAfterDelay运算符:
public static IObservable<T> RepeatAfterDelay<T>(this IObservable<T> source,TimeSpan delay,IScheduler scheduler) { var repeatSignal = Observable .Empty<T>() .Delay(delay,scheduler); // when source finishes,wait for the specified // delay,then repeat. return source.Concat(repeatSignal).Repeat(); }
这就是我使用它的方式:
// do first set of work immediately,and then every 5 seconds do it again m_interval = Observable .FromAsync(DoWork) .RepeatAfterDelay(TimeSpan.FromSeconds(5),scheduler) .Subscribe(); // wait 5 seconds,then do first set of work,then again every 5 seconds m_interval = Observable .Timer(TimeSpan.FromSeconds(5),scheduler) .SelectMany(_ => Observable .FromAsync(DoWork) .RepeatAfterDelay(TimeSpan.FromSeconds(5),scheduler)) .Subscribe();