##maven
<dependency> <groupId>io.projectreactor</groupId> <artifactId>reactor-bus</artifactId> </dependency>
##config
/**
 * Spring configuration that exposes the Reactor {@code Environment} and an
 * {@code EventBus} with {@code MyEventListener} already subscribed.
 */
@Configuration
public class EventConfig {

    /**
     * Provides the Reactor environment bean: initializes the environment if it is
     * empty and assigns the error journal to it.
     *
     * @return the environment returned by {@code assignErrorJournal()}
     */
    @Bean
    Environment environment() {
        Environment env = Environment.initializeIfEmpty();
        return env.assignErrorJournal();
    }

    /**
     * Provides the event bus bean. The bus is created with the dispatcher named by
     * {@code Environment.THREAD_POOL}, and the listener is registered for the
     * {@code "myevent"} selector before the bus is returned.
     *
     * @param environment     the Reactor environment the bus is created from
     * @param myEventListener the consumer subscribed to {@code "myevent"}
     * @return the configured event bus
     */
    @Bean
    @Autowired
    public EventBus eventBus(Environment environment, MyEventListener myEventListener) {
        final EventBus bus = EventBus.create(environment, Environment.THREAD_POOL);
        bus.on($("myevent"), myEventListener);
        return bus;
    }
}
env的默认配置从reactor-core-2.0.8.RELEASE.jar!/META-INF/reactor/reactor-environment.properties中读取:
# # Copyright (c) 2011-2015 Pivotal Software Inc.,Inc. All Rights Reserved. # # Licensed under the Apache License,Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing,software # distributed under the License is distributed on an "AS IS" BASIS,# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # ## # Dispatcher configuration # # Each dispatcher must be configured with a type: # # reactor.dispatchers.<name>.type = <type> # # Legal values for <type> are dispatcherGroup,ringBuffer,synchronous,and workQueue. # Depending on the type,further configuration is be possible: # # reactor.dispatchers.<name>.size: dispatcherGroup and workQueue Dispatchers # reactor.dispatchers.<name>.backlog: dispatcherGroup,and workQueue Dispatchers # # A size less than 1 may be specified to indicate that the size should be the same as the number # of cpus. 
# A thread pool executor dispatcher,named threadPoolExecutor reactor.dispatchers.threadPoolExecutor.type = threadPoolExecutor reactor.dispatchers.threadPoolExecutor.size = 0 # Backlog is how many Task objects to warm up internally reactor.dispatchers.threadPoolExecutor.backlog = 2048 # A group of dispatchers replicated from the default dispatcher,named dispatcherGroup reactor.dispatchers.dispatcherGroup.type = dispatcherGroup reactor.dispatchers.dispatcherGroup.size = 0 reactor.dispatchers.dispatcherGroup.backlog = 2048 # A ring buffer dispatcher,named ringBuffer reactor.dispatchers.shared.type = ringBuffer reactor.dispatchers.shared.backlog = 8192 # A work queue dispatcher,named workQueue reactor.dispatchers.workQueue.type = workQueue reactor.dispatchers.workQueue.size = 0 reactor.dispatchers.workQueue.backlog = 2048 # The dispatcher named shared should be the default dispatcher reactor.dispatchers.default = shared
##event listener
/**
 * Consumer of {@code Event<MyEvent>} that logs each received event together with
 * the name of the thread it was delivered on.
 */
@Component
public class MyEventListener implements Consumer<Event<MyEvent>> {

    private static final Logger LOGGER = LoggerFactory.getLogger(MyEventListener.class);

    /**
     * Unwraps the {@code MyEvent} payload and logs its data along with the current
     * thread name.
     *
     * @param eventContextEvent the event wrapper whose payload is logged
     */
    @Override
    public void accept(Event<MyEvent> eventContextEvent) {
        final MyEvent payload = eventContextEvent.getData();
        final String threadName = Thread.currentThread().getName();
        LOGGER.info("thread {},receive event:{}", threadName, payload.getData());
    }
}
##publish event
/** Event bus used for publishing; injected by Spring. */
@Autowired
EventBus eventBus;

/**
 * Wraps {@code data} in a {@code MyEvent} and notifies the event bus under the
 * {@code "myevent"} key.
 *
 * @param data the value carried by the published event
 */
public void publishEvent(String data) {
    final MyEvent payload = new MyEvent(data);
    eventBus.notify("myevent", Event.wrap(payload));
}
##ThreadPoolExecutorDispatcher
源码位于reactor-core-2.0.8.RELEASE-sources.jar!/reactor/core/dispatch/ThreadPoolExecutorDispatcher.java。默认的ThreadPoolExecutorDispatcher在reactor-core-2.0.8.RELEASE-sources.jar!/reactor/Environment.java中创建:
/**
 * Builds a {@code ThreadPoolExecutorDispatcher} from a dispatcher configuration.
 * The pool size is read with a fallback of 0 and the backlog with a fallback of 128;
 * the configuration's name is passed through as the third constructor argument.
 *
 * @param dispatcherConfiguration the configuration to read size, backlog and name from
 * @return the newly created dispatcher
 */
private static ThreadPoolExecutorDispatcher createThreadPoolExecutorDispatcher(
        DispatcherConfiguration dispatcherConfiguration) {
    // Arguments are evaluated left to right: size first, then backlog, then name.
    return new ThreadPoolExecutorDispatcher(
            getSize(dispatcherConfiguration, 0),
            getBacklog(dispatcherConfiguration, 128),
            dispatcherConfiguration.getName());
}
构造器
/**
 * Creates a new {@literal ThreadPoolExecutorDispatcher} backed by a
 * {@link java.util.concurrent.LinkedBlockingQueue} bounded to {@code backlog} entries.
 * Rejected tasks are handled by a {@link java.util.concurrent.RejectedExecutionHandler}
 * that runs the task directly, without checking the executor's state.
 *
 * @param poolSize
 *     the pool size
 * @param backlog
 *     the capacity of the work queue
 * @param threadName
 *     the name prefix to use when creating threads
 */
public ThreadPoolExecutorDispatcher(int poolSize, int backlog, String threadName) {
    this(
            poolSize,
            threadName,
            new LinkedBlockingQueue<Runnable>(backlog),
            new RejectedExecutionHandler() {
                @Override
                public void rejectedExecution(Runnable task, ThreadPoolExecutor pool) {
                    // Execute the rejected task in place instead of dropping it.
                    task.run();
                }
            });
}
默认采用的是LinkedBlockingQueue,容量为配置文件指定的backlog;RejectedExecutionHandler采用的是在调用者线程中直接执行被拒绝的任务,效果类似于JDK的CallerRunsPolicy(如下),区别是它不检查线程池是否已经关闭。
/**
 * Rejection handler that executes a rejected task on the thread that called
 * {@code execute}. If the executor has already been shut down, the task is
 * dropped instead.
 */
public static class CallerRunsPolicy implements RejectedExecutionHandler {

    /**
     * Creates a {@code CallerRunsPolicy}.
     */
    public CallerRunsPolicy() {
    }

    /**
     * Runs task {@code r} on the calling thread unless the executor has been shut
     * down, in which case the task is discarded.
     *
     * @param r the runnable task requested to be executed
     * @param e the executor attempting to execute this task
     */
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        if (e.isShutdown()) {
            // Executor is shut down: discard the task.
            return;
        }
        r.run();
    }
}
因此,队列没有满的时候事件是异步处理的;队列满了之后,被拒绝的任务会在发布事件的调用者线程中同步执行,调用者会被阻塞到该任务执行完为止。
##doc
原文链接:https://www.f2er.com/react/304244.html