I can't get good parallelization of stream processing when the source of the stream is a Reader. Running the code below on a quad-core CPU, I observe three cores being used at first, then a drop to two, and finally to one. Overall CPU utilization hovers around 50%.
Note these characteristics of the example:

> only 6,000 lines;
> each line takes about 20 ms to process;
> the whole procedure takes about a minute.

This means all the pressure is on the CPU and I/O is minimal. The example is a sitting duck for automatic parallelization.
import static java.util.concurrent.TimeUnit.NANOSECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;
... class imports elided ...

public class Main {
    static final AtomicLong totalTime = new AtomicLong();

    public static void main(String[] args) throws IOException {
        final long start = System.nanoTime();
        final Path inputPath = createInput();
        System.out.println("Start processing");
        try (PrintWriter w = new PrintWriter(Files.newBufferedWriter(Paths.get("output.txt")))) {
            Files.lines(inputPath).parallel().map(Main::processLine)
                 .forEach(w::println);
        }
        final double cpuTime = totalTime.get(), realTime = System.nanoTime() - start;
        final int cores = Runtime.getRuntime().availableProcessors();
        System.out.println("          Cores: " + cores);
        System.out.format("       cpu time: %.2f s\n", cpuTime / SECONDS.toNanos(1));
        System.out.format("      Real time: %.2f s\n", realTime / SECONDS.toNanos(1));
        System.out.format("cpu utilization: %.2f%%", 100.0 * cpuTime / realTime / cores);
    }

    private static String processLine(String line) {
        final long localStart = System.nanoTime();
        double ret = 0;
        for (int i = 0; i < line.length(); i++)
            for (int j = 0; j < line.length(); j++)
                ret += Math.pow(line.charAt(i), line.charAt(j) / 32.0);
        final long took = System.nanoTime() - localStart;
        totalTime.getAndAdd(took);
        return NANOSECONDS.toMillis(took) + " " + ret;
    }

    private static Path createInput() throws IOException {
        final Path inputPath = Paths.get("input.txt");
        try (PrintWriter w = new PrintWriter(Files.newBufferedWriter(inputPath))) {
            for (int i = 0; i < 6_000; i++) {
                final String text = String.valueOf(System.nanoTime());
                for (int j = 0; j < 25; j++) w.print(text);
                w.println();
            }
        }
        return inputPath;
    }
}
My typical output:
          Cores: 4
       cpu time: 110.23 s
      Real time: 53.60 s
cpu utilization: 51.41%
For comparison, if I use a slightly modified variant where I first collect into a list and then process:
Files.lines(inputPath).collect(toList()).parallelStream()
     .map(Main::processLine)
     .forEach(w::println);
I get this typical output:
          Cores: 4
       cpu time: 138.43 s
      Real time: 35.00 s
cpu utilization: 98.87%
Note that I originally observed this on a reader over a servlet input stream, so it is not specific to FileReader.
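The common trait of these sources is a spliterator over an iterator of unknown size. A minimal probe (my own illustration, not part of the original example; it models a reader-backed stream with Spliterators.spliteratorUnknownSize, which is what BufferedReader#lines uses under the hood) shows the contrast with a collected list:

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Spliterator;
import java.util.Spliterators;

public class SpliteratorProbe {
    public static void main(String[] args) {
        // An iterator-backed source cannot report its size, so the
        // splitting machinery has to guess batch sizes.
        Iterator<String> it = Arrays.asList("a", "b", "c").iterator();
        Spliterator<String> unsized =
            Spliterators.spliteratorUnknownSize(it, Spliterator.ORDERED);
        System.out.println(unsized.estimateSize());                      // Long.MAX_VALUE
        System.out.println(unsized.hasCharacteristics(Spliterator.SIZED)); // false

        // Collecting into a List first yields a SIZED spliterator,
        // which the fork/join machinery can split evenly.
        List<String> list = Arrays.asList("a", "b", "c");
        Spliterator<String> sized = list.spliterator();
        System.out.println(sized.estimateSize());                        // 3
        System.out.println(sized.hasCharacteristics(Spliterator.SIZED)); // true
    }
}
```

This is why the collect(toList()) variant above parallelizes so much better: the size is known up front and the splits come out balanced.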
Solution
Here is the answer, found in the source code of Spliterators.IteratorSpliterator, the spliterator used by BufferedReader#lines():
@Override public Spliterator<T> trySplit() {
    /*
     * Split into arrays of arithmetically increasing batch
     * sizes.  This will only improve parallel performance if
     * per-element Consumer actions are more costly than
     * transferring them into an array.  The use of an
     * arithmetic progression in split sizes provides overhead
     * vs parallelism bounds that do not particularly favor or
     * penalize cases of lightweight vs heavyweight element
     * operations, across combinations of #elements vs #cores,
     * whether or not either are known.  We generate
     * O(sqrt(#elements)) splits, allowing O(sqrt(#cores))
     * potential speedup.
     */
    Iterator<? extends T> i;
    long s;
    if ((i = it) == null) {
        i = it = collection.iterator();
        s = est = (long) collection.size();
    }
    else
        s = est;
    if (s > 1 && i.hasNext()) {
        int n = batch + BATCH_UNIT;
        if (n > s)
            n = (int) s;
        if (n > MAX_BATCH)
            n = MAX_BATCH;
        Object[] a = new Object[n];
        int j = 0;
        do { a[j] = i.next(); } while (++j < n && i.hasNext());
        batch = j;
        if (est != Long.MAX_VALUE)
            est -= j;
        return new ArraySpliterator<>(a, 0, j, characteristics);
    }
    return null;
}
Also noteworthy are these constants:
static final int BATCH_UNIT = 1 << 10;  // batch array size increment
static final int MAX_BATCH = 1 << 25;  // max batch array size;
So in my example, where I use 6,000 elements, I get just three batches because the batch-size step is 1024. That exactly explains my observation: initially three cores are used, dropping to two and then one as the smaller batches complete. In the meantime I tried a modified example with 60,000 elements, and then I got almost 100% CPU utilization.
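To see where the three batches come from, here is a small simulation I wrote (not JDK code; it replays the arithmetic progression from trySplit above, assuming the source reports an unknown size so the `n > s` clamp never fires and splitting only stops when the iterator runs dry):

```java
import java.util.ArrayList;
import java.util.List;

public class BatchSim {
    static final int BATCH_UNIT = 1 << 10; // 1024, as in IteratorSpliterator
    static final int MAX_BATCH = 1 << 25;

    // Returns the batch sizes an unsized source of totalElements
    // would be carved into by repeated trySplit calls.
    static List<Integer> batches(int totalElements) {
        List<Integer> out = new ArrayList<>();
        int batch = 0;           // size of the previous batch
        int remaining = totalElements;
        while (remaining > 0) {
            int n = Math.min(batch + BATCH_UNIT, MAX_BATCH);
            int taken = Math.min(n, remaining); // iterator may run dry mid-batch
            out.add(taken);
            batch = taken;
            remaining -= taken;
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(batches(6_000));        // [1024, 2048, 2928]
        System.out.println(batches(60_000).size()); // 11
    }
}
```

With 6,000 elements the progression 1024, 2048 leaves only 2,928 for the third batch, matching the three-cores-then-fewer pattern; with 60,000 elements there are eleven batches, enough to keep all cores fed for most of the run.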
To solve my problem, I have developed the code below. It allows me to turn any existing stream into one whose Spliterator#trySplit splits it into batches of a specified size. The simplest way to use it, for the use case from my question, is like this:
toFixedBatchStream(Files.newBufferedReader(inputPath).lines(),20)
On a lower level, the class below is a spliterator wrapper that changes the wrapped spliterator's trySplit behavior and leaves the other aspects unchanged.
import static java.util.Spliterators.spliterator;
import static java.util.stream.StreamSupport.stream;

import java.util.Comparator;
import java.util.Spliterator;
import java.util.function.Consumer;
import java.util.stream.Stream;

public class FixedBatchSpliteratorWrapper<T> implements Spliterator<T> {
    private final Spliterator<T> spliterator;
    private final int batchSize;
    private final int characteristics;
    private long est;

    public FixedBatchSpliteratorWrapper(Spliterator<T> toWrap, long est, int batchSize) {
        final int c = toWrap.characteristics();
        this.characteristics = (c & SIZED) != 0 ? c | SUBSIZED : c;
        this.spliterator = toWrap;
        this.est = est;
        this.batchSize = batchSize;
    }
    public FixedBatchSpliteratorWrapper(Spliterator<T> toWrap, int batchSize) {
        this(toWrap, toWrap.estimateSize(), batchSize);
    }

    public static <T> Stream<T> toFixedBatchStream(Stream<T> in, int batchSize) {
        return stream(new FixedBatchSpliteratorWrapper<>(in.spliterator(), batchSize), true);
    }

    @Override public Spliterator<T> trySplit() {
        final HoldingConsumer<T> holder = new HoldingConsumer<>();
        if (!spliterator.tryAdvance(holder)) return null;
        final Object[] a = new Object[batchSize];
        int j = 0;
        do a[j] = holder.value; while (++j < batchSize && tryAdvance(holder));
        if (est != Long.MAX_VALUE) est -= j;
        return spliterator(a, 0, j, characteristics());
    }
    @Override public boolean tryAdvance(Consumer<? super T> action) {
        return spliterator.tryAdvance(action);
    }
    @Override public void forEachRemaining(Consumer<? super T> action) {
        spliterator.forEachRemaining(action);
    }
    @Override public Comparator<? super T> getComparator() {
        if (hasCharacteristics(SORTED)) return null;
        throw new IllegalStateException();
    }
    @Override public long estimateSize() { return est; }
    @Override public int characteristics() { return characteristics; }

    static final class HoldingConsumer<T> implements Consumer<T> {
        Object value;
        @Override public void accept(T value) { this.value = value; }
    }
}
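As a sanity check that fixed-size batching neither loses nor duplicates elements, here is a self-contained demo I put together (my own addition; FixedBatch is a condensed copy of the wrapper above so that this file compiles on its own):

```java
import static java.util.Spliterators.spliterator;
import static java.util.stream.StreamSupport.stream;

import java.util.Spliterator;
import java.util.function.Consumer;
import java.util.stream.IntStream;
import java.util.stream.Stream;

public class FixedBatchDemo {
    // Condensed copy of FixedBatchSpliteratorWrapper: only the parts
    // needed to demonstrate fixed-size splitting.
    static final class FixedBatch<T> implements Spliterator<T> {
        private final Spliterator<T> source;
        private final int batchSize;
        private final int characteristics;
        private long est;

        FixedBatch(Spliterator<T> source, int batchSize) {
            final int c = source.characteristics();
            this.characteristics = (c & SIZED) != 0 ? c | SUBSIZED : c;
            this.source = source;
            this.batchSize = batchSize;
            this.est = source.estimateSize();
        }
        @Override public Spliterator<T> trySplit() {
            final HoldingConsumer<T> holder = new HoldingConsumer<>();
            if (!source.tryAdvance(holder)) return null;
            final Object[] a = new Object[batchSize];
            int j = 0;
            do a[j] = holder.value; while (++j < batchSize && source.tryAdvance(holder));
            if (est != Long.MAX_VALUE) est -= j;
            return spliterator(a, 0, j, characteristics());
        }
        @Override public boolean tryAdvance(Consumer<? super T> action) {
            return source.tryAdvance(action);
        }
        @Override public long estimateSize() { return est; }
        @Override public int characteristics() { return characteristics; }
    }

    static final class HoldingConsumer<T> implements Consumer<T> {
        Object value;
        @Override public void accept(T value) { this.value = value; }
    }

    static <T> Stream<T> toFixedBatchStream(Stream<T> in, int batchSize) {
        return stream(new FixedBatch<>(in.spliterator(), batchSize), true);
    }

    public static void main(String[] args) {
        // 100 elements split into batches of 7, summed in parallel:
        // every element must appear exactly once for the sum to be right.
        final Stream<Integer> src = IntStream.range(0, 100).boxed();
        final long sum = toFixedBatchStream(src, 7).mapToLong(Integer::longValue).sum();
        System.out.println(sum); // 4950 = 0 + 1 + ... + 99
    }
}
```

With a per-element workload as heavy as processLine above, even a batch size as small as 7 or 20 amortizes the splitting overhead while keeping every worker thread busy.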