以下代码:
Observable .just(0,1,2,3,4,5,6,7,8,9) .doOnNext(item -> System.out.println("source emitting " + item)) .groupBy(item -> { System.out.println("groupBy called for " + item); return item % 3; }) .subscribe(observable -> { System.out.println("got observable " + observable + " for key " + observable.getKey()); observable.subscribe(item -> { System.out.println("key " + observable.getKey() + ",item " + item); }); });
让我感到困惑.我得到的输出是:
source emitting 0 groupBy called for 0 got observable rx.observables.GroupedObservable@42110406 for key 0 key 0,item 0 source emitting 1 groupBy called for 1 got observable rx.observables.GroupedObservable@1698c449 for key 1 key 1,item 1 source emitting 2 groupBy called for 2 got observable rx.observables.GroupedObservable@5ef04b5 for key 2 key 2,item 2 source emitting 3 groupBy called for 3 key 0,item 3 source emitting 4 groupBy called for 4 key 1,item 4 source emitting 5 groupBy called for 5 key 2,item 5 source emitting 6 groupBy called for 6 key 0,item 6 source emitting 7 groupBy called for 7 key 1,item 7 source emitting 8 groupBy called for 8 key 2,item 8 source emitting 9 groupBy called for 9 key 0,item 9
因此,在顶级订阅方法中,我按预期从GroupedObservable获得3个可观察对象.然后,我一个接一个地订阅了分组的observable – 这里我不明白的事情:
为什么原始项目仍以原始序列(即0,3 ……)发出,而不是0,9 ……用于键0,后面是1,7为键1,键2的后面是2,8;
我想我明白了如何创建组:
1. 0 is emitted,the key function is called and it gets 0 2. it is checked if an observable for 0 exists,it doesn't,so a new one is created and emitted,and then it emits 0 3. the same happens for source items 1 and 2 as they both create new groups,and observables with key 1 and 2 are emitted,and they emit 1 and 2 correspondingly 4. source item 3 is emitted,the key function is called and it gets 0 5. it is checked if an observable for 0 exists,it does -> no new grouped observable is created nor emitted,but 3 is emitted by the already existing observable 6. etc. until the source sequence is drained
似乎虽然我逐个获得了分组的可观测量,但它们的排放在某种程度上是交错的.这是怎么发生的?
解决方法
Why are the original items still emitted in the original sequence (i.e. 0,…) and not 0,9 … for key 0,followed by 1,7 for key 1,followed by 2,8 for key 2?
你已经回答了自己的问题.您按照它们发出的顺序对项目流进行操作.因此,每一个都被发射出来,它会从运算符链中传递下来,你会看到你在这里显示的输出.
您期望的替代输出需要链等待,直到源停止为所有组发出项目.假设你有Observable.just(0,0).那么你期望(0,0),(1,4),(2)作为你的输出组.如果你拥有4个无限的流量怎么办?您的订户永远不会从第一组收到0,3 ..
您可以创建您正在寻找的行为. toList运算符将缓存输出直到源完成,然后传递List< R>.给订户:
.subscribe(observable -> { System.out.println("got observable " + observable + " for key " + observable.getKey()); observable.toList().subscribe(items -> { // items is a List<Integer> System.out.println("key " + observable.getKey() + ",items " + items); }); });