我有一个IObservable< IObservable< T>>其中每个内部IObservable< T>是一个值流,后跟最终的OnCompleted事件.
我想将其转换为IObservable< IEnumerable< T>>,这是一个由未完成的任何内部流的最新值组成的流.它应该产生一个新的IEnumerable< T>每当从一个内部流(或内部流到期)产生新值时
最容易用大理石图显示(我希望它足够全面):
- input ---.----.---.----------------
- | | '-f-----g-|
- | 'd------e---------|
- 'a--b----c-----|
- result ---a--b-b--c-c-c-e-e-e---[]-
- d d d e f g
- f f
([]是一个空的IEnumerable< T>而 – |代表OnCompleted)
您可以看到它略微类似于CombineLatest操作.
我一直在玩Join和GroupJoin无济于事,但我觉得这几乎肯定是正确的方向.
我想在这个运算符中使用尽可能少的状态.
更新
我已经更新了这个问题,不仅包括单值序列 – 结果IObservable< IEnumerable< T>>应该只包含每个序列的最新值 – 如果序列没有产生值,则不应包括它.
解决方法
这是昨天基于您的解决方案的版本,针对新要求进行了调整.基本思路是将引用放入易腐变集合中,然后在内部序列生成新值时更新引用的值.
我还修改了正确跟踪内部订阅并取消订阅外部observable是否取消订阅.
最后,我修正了一些可能违反Rx指南的竞争条件.如果你的内部observable是从不同的线程同时触发,你可以同时调用obs.OnNext,这是一个很大的禁忌.所以我使用相同的锁来控制每个内部observable以防止这种情况(参见同步调用).请注意,因此,您可能会使用常规双链表而不是PerishableCollection,因为现在使用该集合的所有代码都在一个锁中,因此您不需要PerishableCollection的线程保证.
- // Acts as a reference to the current value stored in the list
- private class BoxedValue<T>
- {
- public T Value;
- public BoxedValue(T initialValue) { Value = initialValue; }
- }
- public static IObservable<IEnumerable<T>> MergeLatest<T>(this IObservable<IObservable<T>> source)
- {
- return Observable.Create<IEnumerable<T>>(obs =>
- {
- var collection = new PerishableCollection<BoxedValue<T>>();
- var outerSubscription = new SingleAssignmentDisposable();
- var subscriptions = new CompositeDisposable(outerSubscription);
- var innerLock = new object();
- outerSubscription.Disposable = source.Subscribe(duration =>
- {
- BoxedValue<T> value = null;
- var lifetime = new DisposableLifetime(); // essentially a CancellationToken
- var subscription = new SingleAssignmentDisposable();
- subscriptions.Add(subscription);
- subscription.Disposable = duration.Synchronize(innerLock)
- .Subscribe(
- x =>
- {
- if (value == null)
- {
- value = new BoxedValue<T>(x);
- collection.Add(value,lifetime.Lifetime);
- }
- else
- {
- value.Value = x;
- }
- obs.OnNext(collection.CurrentItems().Select(p => p.Value.Value));
- },obs.OnError,// handle an error in the stream.
- () => // on complete
- {
- if (value != null)
- {
- lifetime.Dispose(); // removes the item
- obs.OnNext(collection.CurrentItems().Select(p => p.Value.Value));
- subscriptions.Remove(subscription); // remove this subscription
- }
- }
- );
- });
- return subscriptions;
- });
- }