我有一个非常简单的集成流程,其中RESTful请求使用发布 – 订阅通道转发给两个提供者.然后,将两个RESTful服务的结果聚合在一个阵列中.整合流程的草图如下所示:
@Bean
IntegrationFlow flow() throws Exception {
return IntegrationFlows.from("inputChannel")
.publishSubscribeChannel(s -> s.applySequence(true)
.subscribe(f -> f
.handle(Http.outboundGateway("http://provider1.com/...")
.httpMethod(HttpMethod.GET)
.expectedResponseType(ItemDTO[].class))
).subscribe(f -> f
.handle(Http.outboundGateway("http://provider2.com/...")
.httpMethod(HttpMethod.GET)
.expectedResponseType(ItemDTO[].class)
)
)
)
.aggregate()
.get();
}
但是,在运行我的代码时,结果数组包含仅由一个RESTful服务返回的项.我缺少任何配置步骤吗?
UPDATE
@Bean
IntegrationFlow flow() throws Exception {
return IntegrationFlows.from("inputChannel-scatter")
.publishSubscribeChannel(s -> s.applySequence(true)
.subscribe(f -> f
.handle(Http.outboundGateway("http://provider1.com/...")
.httpMethod(HttpMethod.GET)
.expectedResponseType(ItemDTO[].class))
.channel("inputChannel-gather"))
.subscribe(f -> f
.handle(Http.outboundGateway("http://provider2.com/...")
.httpMethod(HttpMethod.GET)
.expectedResponseType(ItemDTO[].class))
.channel("inputChannel-gather")))
.get();
}
@Bean
IntegrationFlow gatherFlow() {
return IntegrationFlows.from("inputChannel-gather")
.aggregate(a -> a.outputProcessor(g -> new GenericMessage
最佳答案
实际上它不会那样工作.
原文链接:https://www.f2er.com/spring/437296.html.aggregate()是该publishSubscribeChannel的第三个订阅者.
你必须切断你的两个流量.像这样:
@Bean
public IntegrationFlow publishSubscribeFlow() {
return flow -> flow
.publishSubscribeChannel(s -> s
.applySequence(true)
.subscribe(f -> f
.handle((p,h) -> "Hello")
.channel("publishSubscribeAggregateFlow.input"))
.subscribe(f -> f
.handle((p,h) -> "World!")
.channel("publishSubscribeAggregateFlow.input"))
);
}
@Bean
public IntegrationFlow publishSubscribeAggregateFlow() {
return flow -> flow
.aggregate(a -> a.outputProcessor(g -> g.getMessages()
.stream()
.
请注意两个订阅者的.channel(“publishSubscribeAggregateFlow.input”)用法.
说实话,这是任何发布 – 订阅的重点.如果我们要聚合它们,我们必须知道在哪里发送所有订户的结果.
您的用例让我想起了Scatter-Gather EIP模式.
我们还没有在DSL中实现它.
随意提出GH issue问题,我们将尝试在即将发布的1.2版本中处理它.
UPDATE
关于此事的GH问题:https://github.com/spring-projects/spring-integration-java-dsl/issues/75