我处于一个我正在工作的任务列表(启用驱动器,更改位置,等待停止,禁用)的情况.
“等待”监视IObservable< Status>,我想等待(所以我可以通过ContinueWith和其他任务进行线程).
我从OnNext处理订阅者开始执行以下任务,但这只是丑陋.我现在想到的是这种扩展方法:
public static Task<T> WaitFor<T>(this IObservable<T> source,Func<T,bool> pred) { var tcs = new TaskCompletionSource<T>(); source .Where(pred) .DistinctUntilChanged() .Take(1) //OnCompletes the observable,subscription will self-dispose .Subscribe(val => tcs.TrySetResult(val),ex => tcs.TrySetException(ex),() => tcs.TrySetCanceled()); return tcs.Task; }
(更新svick的处理OnCompleted和OnError的建议)
问题:
这是好,坏,还是丑?
我错过了一个可以做到这一点的现有扩展?
>正确的顺序是Where and DistinctUntilChanged? (我想他们是)
解决方法
至少我会改变这种扩展方法是这样的:
public static Task<T> WaitFor<T>(this IObservable<T> source,bool> pred) { return source .Where(pred) .DistinctUntilChanged() .Take(1) .ToTask(); }
使用.ToTask()比引入TaskCompletionSource要好得多.您需要引用System.Reactive.Threading.Tasks命名空间来获取.ToTask()扩展方法.
此外,DistinctUntilChanged在此代码中是多余的.你只能得到一个值,所以默认情况下它必须是不同的.
现在,我的下一个建议可能有点有争议.这个扩展是一个坏主意,因为它隐藏了正在发生的真正语义.
如果我有这两个代码片段:
var t = xs.WaitFor(x => x > 10);
要么:
var t = xs.Where(x => x > 10).Take(1).ToTask();
我通常喜欢第二个snippit,因为它清楚地显示了我发生了什么 – 我不需要记住WaitFor的语义.
除非您将WaitFor的名称更具描述性 – 或许是TakeOneAsTaskWhere,那么您将使用明确的代码来使用操作符,并使代码更难管理.
以下是不是更容易记住语义?
var t = xs.TakeOneAsTaskWhere(x => x > 10);
对我来说,底线是Rx运算符是组合的,而不是封装,但是如果你要封装它们,那么它们的含义就必须清楚.
我希望这有帮助.