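Quick and dirty
Is it possible to refactor a traditional event listener into a Java 8 Stream, so that the listener events become the stream source?
Long story
A client submits an arbitrary job and then listens for results: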
    Client client = new JobClient();
    client.addTaskListener(this);
    client.submitJobAsync(new MultiTaskJob()); // returns void, important (see below)

    public void onTaskResult(TaskResult result) {
        if (result.isLastResult())
            aggregateJobResults(result);
        else
            processResult(result);
    }
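Problem
For any submitted job, the client receives n results, but it does not know in advance how many it will receive (it uses isLastResult() to determine when to stop and aggregate).
Goal
I'd like to refactor the listener into a "Supplier", or something similar, so that onTaskResult() is the stream source: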
    Supplier<TaskResult> taskResultSupplier = () ->
        Stream.of( .. ) // onTaskResult() feeds this
            .map(result -> {
                if (result.isLastResult())
                    // logic here
            });
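Something like that; if I can do it without the client knowing how many results to expect, I'm golden. Right now submitJobAsync() returns void, and I'd like to keep it that way, but I'm open to other options too...
Alternatives
After reading Tomasz Nurkiewicz's article on CompletableFutures, which covers a similar scenario, I see an alternative that assumes a minor change to the client: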
    List<CompletableFuture<TaskResult>> taskFutures =
        client.submitJobAsync(new MultiTaskJob());
Here the client gets a list of CompletableFuture<TaskResult>, so we need to collect the results of the futures as they complete:
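    // processes all task result futures
    // (assumes processResult(TaskResult) returns a TaskResult; thenApply yields futures, not plain results)
    List<CompletableFuture<TaskResult>> processedFutures = taskFutures.stream()
        .map(taskResult -> taskResult.thenApply(this::processResult))
        .collect(Collectors.toList());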
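The article also shows how to use CompletableFuture.allOf(..) to perform final processing, but only after all futures have completed (it's pretty slick); that is where aggregation would happen in my case. I have no code to show here, although the article explains it well (I'm a total n00b with streams, but if I get it working I'll post the code :-D)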
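For illustration, the allOf(..) idea might look roughly like the sketch below. It is not taken from the article: the TaskResult stand-in, the allResults helper and the summing aggregate step are all hypothetical names made up for this example, and the futures are produced locally with supplyAsync rather than by a real job client.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

class AllOfSketch {

    /** Hypothetical stand-in for the question's TaskResult. */
    static final class TaskResult {
        final int value;
        TaskResult(int value) { this.value = value; }
    }

    /** Completes once every given future has completed, yielding the results in list order. */
    static CompletableFuture<List<TaskResult>> allResults(List<CompletableFuture<TaskResult>> futures) {
        CompletableFuture<Void> allDone =
                CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
        // join() does not block here: every future is already complete when this callback runs.
        return allDone.thenApply(ignored ->
                futures.stream().map(CompletableFuture::join).collect(Collectors.toList()));
    }

    /** Hypothetical aggregation step: sums the values of all results. */
    static int aggregate(List<TaskResult> results) {
        return results.stream().mapToInt(r -> r.value).sum();
    }

    public static void main(String[] args) {
        // Assumption: a changed submitJobAsync() would hand back a list like this one.
        List<CompletableFuture<TaskResult>> taskFutures = Arrays.asList(
                CompletableFuture.supplyAsync(() -> new TaskResult(1)),
                CompletableFuture.supplyAsync(() -> new TaskResult(2)),
                CompletableFuture.supplyAsync(() -> new TaskResult(3)));

        int total = allResults(taskFutures).thenApply(AllOfSketch::aggregate).join();
        System.out.println("Aggregated total: " + total); // prints "Aggregated total: 6"
    }
}
```

Note that this only works if the number of futures is known when the list is returned, which is exactly the change to the client mentioned above.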
Solution
It is possible to build a Stream around the TaskResults. See this example:
    import java.util.Iterator;
    import java.util.NoSuchElementException;
    import java.util.Spliterator;
    import java.util.Spliterators;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.stream.Stream;
    import java.util.stream.StreamSupport;

    /**
     * Created for https://stackoverflow.com/q/27670421/1266906.
     */
    public class AsyncToStream {

        public static void main(String[] args) {
            System.out.println("Unbuffered Test:");
            AsyncTaskResultIterator<TaskResult> taskListener1 = new AsyncTaskResultIterator<>();
            new TaskResultGenerator(taskListener1, 5).start();
            taskListener1.unbufferedStream().forEach(System.out::println);

            System.out.println("Buffered Test:");
            AsyncTaskResultIterator<TaskResult> taskListener2 = new AsyncTaskResultIterator<>();
            new TaskResultGenerator(taskListener2, 5).start();
            taskListener2.bufferedStream().forEach(System.out::println);
        }

        /**
         * This class wraps a sequence of TaskResults into an iterator up to the first TaskResult
         * where {@code isLastResult()} returns {@code true}
         */
        public static class AsyncTaskResultIterator<T extends TaskResult> implements Iterator<T>, TaskListener<T> {

            /**
             * This acts as an asynchronous buffer so we can easily wait for the next TaskResult
             */
            private final BlockingQueue<T> blockingQueue;

            /**
             * Becomes {@code true} once {@code TaskResult.isLastResult()} is received.
             * Volatile because the consumer thread writes it and the producer thread reads it.
             */
            private volatile boolean ended;

            public AsyncTaskResultIterator() {
                blockingQueue = new LinkedBlockingQueue<>();
            }

            /**
             * Waits on a new TaskResult and returns it as long as the previous TaskResult did not
             * specify {@code isLastResult()}. Afterwards no more elements can be retrieved.
             */
            @Override
            public T next() {
                if (ended) {
                    throw new NoSuchElementException();
                } else {
                    try {
                        T next = blockingQueue.take();
                        ended = next.isLastResult();
                        return next;
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt(); // preserve the interrupt status
                        throw new IllegalStateException("Could not retrieve next value", e);
                    }
                }
            }

            @Override
            public boolean hasNext() {
                return !ended;
            }

            /**
             * Enqueue another TaskResult for retrieval
             */
            @Override
            public void onTaskResult(T result) {
                if (ended) {
                    throw new IllegalStateException("Already received a TaskResult with isLastResult() == true");
                }
                try {
                    blockingQueue.put(result);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // preserve the interrupt status
                    throw new IllegalStateException("Could not enqueue next value", e);
                }
            }

            /**
             * Builds a Stream that acts upon the results just when they become available
             */
            public Stream<T> unbufferedStream() {
                Spliterator<T> spliterator = Spliterators.spliteratorUnknownSize(this, 0);
                return StreamSupport.stream(spliterator, false);
            }

            /**
             * Buffers all results and builds a Stream around the results
             */
            public Stream<T> bufferedStream() {
                Stream.Builder<T> builder = Stream.builder();
                this.forEachRemaining(builder);
                return builder.build();
            }
        }

        public static class TaskResultImpl implements TaskResult {
            private boolean lastResult;
            private String name;

            public TaskResultImpl(boolean lastResult, String name) {
                this.lastResult = lastResult;
                this.name = name;
            }

            @Override
            public String toString() {
                return "TaskResultImpl{" + "lastResult=" + lastResult + ",name='" + name + '\'' + '}';
            }

            @Override
            public boolean isLastResult() {
                return lastResult;
            }
        }

        public static interface TaskListener<T extends TaskResult> {
            public void onTaskResult(T result);
        }

        public static interface TaskResult {
            boolean isLastResult();
        }

        /**
         * Emits {@code count} results from a separate thread, 200 ms apart; only the final one is flagged as last.
         */
        private static class TaskResultGenerator extends Thread {
            private final TaskListener<TaskResult> taskListener;
            private final int count;

            public TaskResultGenerator(TaskListener<TaskResult> taskListener, int count) {
                this.taskListener = taskListener;
                this.count = count;
            }

            @Override
            public void run() {
                try {
                    for (int i = 1; i < count; i++) {
                        Thread.sleep(200);
                        taskListener.onTaskResult(new TaskResultImpl(false, String.valueOf(i)));
                    }
                    Thread.sleep(200);
                    taskListener.onTaskResult(new TaskResultImpl(true, String.valueOf(count)));
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
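You did not provide your TaskResult and TaskListener definitions, so I made up my own. AsyncTaskResultIterator only works for a single sequence of TaskResults. If no TaskResult with isLastResult() == true is ever supplied, next(), and with it both the unbuffered Stream and the buffered Stream generation, will wait endlessly.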
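If the endless wait is a concern, one possible variation is to bound each wait with BlockingQueue.poll(timeout, unit) instead of take(). The sketch below is only an illustration under assumptions: TimedResultIterator, Result and the timeout value are hypothetical names and choices, the results are enqueued from the calling thread to keep the example short, and a timeout is silently treated as the end of the stream (throwing an exception instead may be the better choice in real code).

```java
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

class TimeoutSketch {

    interface TaskResult {
        boolean isLastResult();
    }

    /** Hypothetical TaskResult implementation used only for this sketch. */
    static final class Result implements TaskResult {
        final String name;
        final boolean last;
        Result(String name, boolean last) { this.name = name; this.last = last; }
        @Override public boolean isLastResult() { return last; }
        @Override public String toString() { return name; }
    }

    /** Iterator that ends at the last result, or when no result arrives within the timeout. */
    static final class TimedResultIterator<T extends TaskResult> implements Iterator<T> {
        private final BlockingQueue<T> queue = new LinkedBlockingQueue<>();
        private final long timeoutMillis;
        private T lookahead;   // element fetched by hasNext() but not yet returned by next()
        private boolean ended; // true after the last result, a timeout, or an interrupt

        TimedResultIterator(long timeoutMillis) { this.timeoutMillis = timeoutMillis; }

        /** Listener callback: enqueue a result for the consumer. */
        void onTaskResult(T result) { queue.add(result); }

        @Override
        public boolean hasNext() {
            if (lookahead != null) return true;
            if (ended) return false;
            try {
                lookahead = queue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt status
            }
            if (lookahead == null) {   // timed out or interrupted: give up waiting
                ended = true;
                return false;
            }
            if (lookahead.isLastResult()) ended = true; // still return this final element
            return true;
        }

        @Override
        public T next() {
            if (!hasNext()) throw new NoSuchElementException();
            T result = lookahead;
            lookahead = null;
            return result;
        }

        Stream<T> stream() {
            return StreamSupport.stream(
                    Spliterators.spliteratorUnknownSize(this, Spliterator.ORDERED), false);
        }
    }

    public static void main(String[] args) {
        TimedResultIterator<Result> results = new TimedResultIterator<>(100);
        results.onTaskResult(new Result("1", false));
        results.onTaskResult(new Result("2", false));
        // No result with isLastResult() == true ever arrives; the stream ends after the timeout.
        List<Result> seen = results.stream().collect(Collectors.toList());
        System.out.println(seen); // prints "[1, 2]"
    }
}
```

The trade-off is that a slow producer can now be mistaken for a finished one, so the timeout has to be chosen with the longest expected gap between results in mind.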