c# – “合并”流的流以生成每个流的最新值的流

前端之家收集整理的这篇文章主要介绍了c# – “合并”流的流以生成每个流的最新值的流前端之家小编觉得挺不错的,现在分享给大家,也给大家做个参考。
我有一个IObservable< IObservable< T>>其中每个内部IObservable< T>是一个值流,后跟最终的OnCompleted事件.

我想将其转换为IObservable< IEnumerable< T>>,这是一个由未完成的任何内部流的最新值组成的流.它应该产生一个新的IEnumerable< T>每当从一个内部流(或内部流到期)产生新值时

最容易用大理石图显示(我希望它足够全面):

  1. input ---.----.---.----------------
  2. | | '-f-----g-|
  3. | 'd------e---------|
  4. 'a--b----c-----|
  5.  
  6. result ---a--b-b--c-c-c-e-e-e---[]-
  7. d d d e f g
  8. f f

([]是一个空的IEnumerable< T>而 – |代表OnCompleted)

您可以看到它略微类似于CombineLatest操作.
我一直在玩Join和GroupJoin无济于事,但我觉得这几乎肯定是正确的方向.

我想在这个运算符中使用尽可能少的状态.

更新

我已经更新了这个问题,不仅包括单值序列 – 结果IObservable< IEnumerable< T>>应该只包含每个序列的最新值 – 如果序列没有产生值,则不应包括它.

解决方法

这是昨天基于您的解决方案的版本,针对新要求进行了调整.基本思路是将引用放入易腐变集合中,然后在内部序列生成新值时更新引用的值.

我还修改了正确跟踪内部订阅并取消订阅外部observable是否取消订阅.

如果任何流产生错误,也进行修改以将其全部拆除.

最后,我修正了一些可能违反Rx指南的竞争条件.如果你的内部observable是从不同的线程同时触发,你可以同时调用obs.OnNext,这是一个很大的禁忌.所以我使用相同的锁来控制每个内部observable以防止这种情况(参见同步调用).请注意,因此,您可能会使用常规双链表而不是PerishableCollection,因为现在使用该集合的所有代码都在一个锁中,因此您不需要PerishableCollection的线程保证.

  1. // Acts as a reference to the current value stored in the list
  2. private class BoxedValue<T>
  3. {
  4. public T Value;
  5. public BoxedValue(T initialValue) { Value = initialValue; }
  6. }
  7.  
  8. public static IObservable<IEnumerable<T>> MergeLatest<T>(this IObservable<IObservable<T>> source)
  9. {
  10. return Observable.Create<IEnumerable<T>>(obs =>
  11. {
  12. var collection = new PerishableCollection<BoxedValue<T>>();
  13. var outerSubscription = new SingleAssignmentDisposable();
  14. var subscriptions = new CompositeDisposable(outerSubscription);
  15. var innerLock = new object();
  16.  
  17. outerSubscription.Disposable = source.Subscribe(duration =>
  18. {
  19. BoxedValue<T> value = null;
  20. var lifetime = new DisposableLifetime(); // essentially a CancellationToken
  21. var subscription = new SingleAssignmentDisposable();
  22.  
  23. subscriptions.Add(subscription);
  24. subscription.Disposable = duration.Synchronize(innerLock)
  25. .Subscribe(
  26. x =>
  27. {
  28. if (value == null)
  29. {
  30. value = new BoxedValue<T>(x);
  31. collection.Add(value,lifetime.Lifetime);
  32. }
  33. else
  34. {
  35. value.Value = x;
  36. }
  37. obs.OnNext(collection.CurrentItems().Select(p => p.Value.Value));
  38. },obs.OnError,// handle an error in the stream.
  39. () => // on complete
  40. {
  41. if (value != null)
  42. {
  43. lifetime.Dispose(); // removes the item
  44. obs.OnNext(collection.CurrentItems().Select(p => p.Value.Value));
  45. subscriptions.Remove(subscription); // remove this subscription
  46. }
  47. }
  48. );
  49. });
  50.  
  51. return subscriptions;
  52. });
  53. }

猜你在找的C#相关文章