public static <T> Stream<T> filterEven(Stream<T> src) { Spliterator<T> iter = src.spliterator(); AbstractSpliterator<T> res = new AbstractSpliterator<T>(Long.MAX_VALUE,Spliterator.ORDERED) { @Override public boolean tryAdvance(Consumer<? super T> action) { iter.tryAdvance(item -> {}); // discard return iter.tryAdvance(action); // use } }; return StreamSupport.stream(res,false); }
我可以通过以下方式使用它:
Stream<DomainObject> res = Files.lines(src) filterEven(res) .map(line -> toDomainObject(line))
然而,测量这种方法对使用带有副作用的filter()的下一个方法的性能,我注意到下一个方法表现更好:
final int[] counter = {0}; final Predicate<String> isEvenLine = item -> ++counter[0] % 2 == 0; Stream<DomainObject> res = Files.lines(src) .filter(line -> isEvenLine ()) .map(line -> toDomainObject(line))
我用JMH测试了性能,我没有在基准测试中包含文件负载.我之前将它加载到一个数组中.然后每个基准测试首先创建一个Stream< String>从上一个数组,然后过滤偶数行,然后应用mapToInt()来提取int字段的值,最后提取max()操作.这是它的基准之一(你可以检查整个程序here,这里有data file with about 186 lines):
@Benchmark public int maxTempFilterEven(DataSource src){ Stream<String> content = Arrays.stream(src.data) .filter(s-> s.charAt(0) != '#') // Filter comments .skip(1); // Skip line: Not available return filterEven(content) // Filter daily info and skip hourly .mapToInt(line -> parseInt(line.substring(14,16))) .max() .getAsInt(); }
我不明白为什么filter()方法比filterEven()(~50ops / ms)具有更好的性能(~80ops / ms)?
解决方法
我想我知道原因,但不幸的是我不知道如何提高基于Spliterator的解决方案的性能(至少不重写整个Streams API功能).
旁注1:设计Stream API时,性能不是最重要的设计目标.如果性能至关重要,那么很可能在没有Stream API的情况下重写代码会使代码更快. (例如,Stream API不可避免地增加了内存分配,从而增加了GC压力).另一方面,在大多数情况下,Stream API以相对较小的性能降级为代价提供更好的更高级API.
第1部分或简短的理论答案
流被设计为实现一种内部迭代,因为消费和外部迭代(即基于Spliterator)的主要手段是一种“模拟”的附加手段.因此,外部迭代涉及一些开销.懒惰增加了外部迭代效率的一些限制,并且需要支持flatMap,因此在此过程中需要使用某种动态缓冲区.
旁注2在某些情况下,基于Spliterator的迭代可能与内部迭代一样快(即在这种情况下为过滤器).特别是在直接从包含数据的Stream创建Spliterator的情况下.要查看它,您可以修改测试以将第一个过滤器具体化为Strings数组:
String[] filteredData = Arrays.stream(src.data) .filter(s-> s.charAt(0) != '#') // Filter comments .skip(1) .toArray(String[]::new);
然后将maxTempFilter和maxTempFilterEven的性能进行比较,以接受预过滤的String [] filteredData.如果你想知道为什么会这样,你可能应该阅读这个长答案的其余部分或至少第2部分.
第2部分或更长的理论答案:
流被设计为主要通过一些终端操作来消费.虽然支持逐个迭代元素但不是设计为消耗流的主要方式.
请注意,使用“功能”流API,例如map,flatMap,filter,reduce和collect,你不能说在某个步骤“我已经有足够的数据,停止迭代源并推送值”.您可以丢弃一些传入的数据(如过滤器那样)但不能停止迭代. (获取和跳过转换实际上是使用Spliterator实现的;而anyMatch,allMatch,noneMatch,findFirst,findAny等使用非公共API j.u.s.Sink.cancellationRequested,它们也更容易,因为不能有多个终端操作).如果管道中的所有转换都是同步的,您可以将它们组合成单个聚合函数(Consumer)并在一个简单的循环中调用它(可选地将循环执行分成几个线程).这就是我基于状态的过滤器的简化版本所代表的内容(请参阅“显示一些代码”部分中的代码).如果管道中有flatMap但想法仍然相同,则会变得有点复杂.
基于Spliterator的转换从根本上是不同的,因为它为管道添加了异步的消费者驱动步骤.现在Spliterator而不是源Stream驱动迭代过程.如果您直接在源Stream上请求Spliterator,它可能会返回一些只是迭代其内部数据结构的实现,这就是为什么实现预过滤数据应该消除性能差异.但是,如果为某些非空管道创建Spliterator,除了要求源通过管道逐个推送元素直到某个元素通过所有过滤器之外,没有其他(简单)选择(另请参见第二个示例)给我看一些代码部分).源元素被逐个推送而不是在某些批次中被推动的事实是使Streams变得懒惰的基本决定的结果.需要一个缓冲区而不是一个元素是支持flatMap的结果:从源中推送一个元素可以为Spliterator生成许多元素.
第3部分或给我看一些代码
这部分试图为“理论”部分中描述的代码(实际代码和模拟代码的链接)提供一些支持.
首先,你应该知道当前的Streams API实现将非终端(中间)操作累积到一个惰性管道中(参见j.u.s.AbstractPipeline及其子节点,如j.u.s.ReferencePipeline.然后,当应用终端操作时,原始元素中的所有元素流被“推”通过管道.
你看到的是两件事的结果:
>流量管道在您遇到的情况下不同的事实
在里面有一个基于Spliterator的步骤.
>您的OddLines不是管道中的第一步
static int similarToFilter(String[] data) { final int[] counter = {0}; final Predicate<String> isEvenLine = item -> ++counter[0] % 2 == 0; int skip = 1; boolean reduceEmpty = true; int reduceState = 0; for (String outerEl : data) { if (outerEl.charAt(0) != '#') { if (skip > 0) skip--; else { if (isEvenLine.test(outerEl)) { int intEl = parseInt(outerEl.substring(14,16)); if (reduceEmpty) { reduceState = intEl; reduceEmpty = false; } else { reduceState = Math.max(reduceState,intEl); } } } } } return reduceState; }
请注意,这实际上是一个单循环,内部有一些计算(过滤/转换).
另一方面,当您将Spliterator添加到管道中时,事情会发生显着变化,即使简化代码与实际发生的代码相似也会变得更大,例如:
interface Sp<T> { public boolean tryAdvance(Consumer<? super T> action); } static class ArraySp<T> implements Sp<T> { private final T[] array; private int pos; public ArraySp(T[] array) { this.array = array; } @Override public boolean tryAdvance(Consumer<? super T> action) { if (pos < array.length) { action.accept(array[pos]); pos++; return true; } else { return false; } } } static class WrappingSp<T> implements Sp<T>,Consumer<T> { private final Sp<T> sourceSp; private final Predicate<T> filter; private final ArrayList<T> buffer = new ArrayList<T>(); private int pos; public WrappingSp(Sp<T> sourceSp,Predicate<T> filter) { this.sourceSp = sourceSp; this.filter = filter; } @Override public void accept(T t) { buffer.add(t); } @Override public boolean tryAdvance(Consumer<? super T> action) { while (true) { if (pos >= buffer.size()) { pos = 0; buffer.clear(); sourceSp.tryAdvance(this); } // Failed to fill buffer if (buffer.size() == 0) return false; T nextElem = buffer.get(pos); pos++; if (filter.test(nextElem)) { action.accept(nextElem); return true; } } } } static class OddLineSp<T> implements Sp<T>,Consumer<T> { private Sp<T> sourceSp; public OddLineSp(Sp<T> sourceSp) { this.sourceSp = sourceSp; } @Override public boolean tryAdvance(Consumer<? super T> action) { if (sourceSp == null) return false; sourceSp.tryAdvance(this); if (!sourceSp.tryAdvance(action)) { sourceSp = null; } return true; } @Override public void accept(T t) { } } static class ReduceIntMax { boolean reduceEmpty = true; int reduceState = 0; public int getReduceState() { return reduceState; } public void accept(int t) { if (reduceEmpty) { reduceEmpty = false; reduceState = t; } else { reduceState = Math.max(reduceState,t); } } } static int similarToSpliterator(String[] data) { ArraySp<String> src = new ArraySp<>(data); int[] skip = new int[1]; skip[0] = 1; WrappingSp<String> firstFilter = new WrappingSp<String>(src,(s) -> { if (s.charAt(0) == '#') return false; if (skip[0] != 0) { skip[0]--; return false; } return true; }); OddLineSp<String> oddLines = new OddLineSp<>(firstFilter); final ReduceIntMax reduceIntMax = new ReduceIntMax(); while (oddLines.tryAdvance(s -> { int intValue = parseInt(s.substring(14,16)); reduceIntMax.accept(intValue); })) ; // do nothing in the loop body return reduceIntMax.getReduceState(); }
这段代码更大,因为在循环内部没有一些非平凡的有状态回调的情况下,逻辑是不可能的(或者至少非常难).这里接口Sp是j.u.s.Stream和j.u.Spliterator接口的混合.
>类ArraySp表示Arrays.stream的结果.
>类WrappingSp类似于j.u.s.StreamSpliterators.WrappingSpliterator,其在实际代码中表示用于任何非空管道的Spliterator接口的实现,即具有应用于其的至少一个中间操作的流(参见j.u.s.AbstractPipeline.spliterator method).在我的代码中,我将它与StatelessOp子类合并,并将逻辑负责过滤器方法实现.同样为了简化我实现了使用过滤器跳过.
> OddLineSp对应于您的OddLines及其产生的Stream
> ReduceIntMax表示针对int的Math.max的ReduceOps终端操作
那么在这个例子中重要的是什么?这里重要的是,由于您首先过滤原始流,因此OddLineSp是从非空管道(即WrappingSp)创建的.如果你仔细看看WrappingSp,你会注意到每次调用tryAdvance时,它都会将调用委托给sourceSp并将结果累积到缓冲区中.此外,由于管道中没有flatMap,缓冲区的元素将逐个复制.即每次调用WrappingSp.tryAdvance时,它都会调用ArraySp.tryAdvance,只返回一个元素(通过回调),并将其进一步传递给调用者提供的消费者(除非元素与过滤器不匹配,在这种情况下,ArraySp .tryAdvance将一次又一次地调用,但缓冲区永远不会一次填充多个元素).
旁注3:如果你想查看真实的代码,最有趣的地方是j.u.s.StreamSpliterators.WrappingSpliterator.tryAdvance
,它们会调用
j.u.s.StreamSpliterators.AbstractWrappingSpliterator.doAdvance
又调用j.u.s.StreamSpliterators.AbstractWrappingSpliterator.fillBuffer
,而j.u.s.StreamSpliterators.AbstractWrappingSpliterator.fillBuffer
又调用在j.u.s.StreamSpliterators.WrappingSpliterator.initPartialTraversalState
初始化的推动器
因此,影响性能的主要因素是复制到缓冲区.
不幸的是,对于我们来说,通常的Java开发人员,Stream API的当前实现几乎已经关闭,您不能仅使用继承或组合来修改内部行为的某些方面.
您可以使用一些基于反射的黑客来使复制到缓冲区更有效地适应您的特定情况并获得一些性能(但牺牲了流的懒惰)但您无法完全避免这种复制,因此基于Spliterator的代码将是反正慢了.
回到Sidenote#2中的示例,基于Spliterator的测试与物化过滤数据的工作速度更快,因为在OddLineSp之前管道中没有WrappingSp,因此不会复制到中间缓冲区.