Application Events and Listeners
1、自定义事件和监听
1.1、定义事件
1 package com.cjs.boot.event; 2 3 import lombok.Data; 4 org.springframework.context.ApplicationEvent; 5 6 @Data 7 public class BlackListEvent extends ApplicationEvent { 8 9 private String address; 10 11 public BlackListEvent(Object source,String address) { 12 super(source); 13 this.address = address; 14 } 15 }
1.2、定义监听
org.springframework.context.ApplicationListener; org.springframework.context.event.EventListener; 5 org.springframework.stereotype.Component; 6 7 8 class BlackListListener implements ApplicationListener<BlackListEvent> { 9 10 @Override void onApplicationEvent(BlackListEvent event) { 12 System.out.println("监听到BlackListEvent事件: " + event.getAddress()); try14 Thread.sleep(2000); 15 } catch (InterruptedException e) { 16 e.printStackTrace(); 17 } 18 19 }
1.3、注册监听
com.cjs.boot; com.cjs.boot.event.BlackListListener; org.springframework.boot.SpringApplication; org.springframework.boot.autoconfigure.SpringBootApplication; org.springframework.boot.web.server.ErrorPage; org.springframework.boot.web.server.ErrorPageRegistrar; org.springframework.boot.web.server.ErrorPageRegistry; 9 org.springframework.cache.annotation.EnableCaching; org.springframework.context.annotation.Bean; 11 org.springframework.http.HttpStatus; 12 org.springframework.scheduling.annotation.EnableAsync; 13 @SpringBootApplication 15 class CjsSpringbootExampleApplication { 16 17 static main(String[] args) { 18 19 SpringApplication springApplication = new SpringApplication(CjsSpringbootExampleApplication.20 springApplication.addListeners(new BlackListListener()); 21 springApplication.run(args); 22 23 }
1.4、发布事件
com.cjs.boot.controller; com.cjs.boot.event.BlackListEvent; org.springframework.beans.factory.annotation.Autowired; org.springframework.context.ApplicationContext; org.springframework.context.ApplicationEventPublisher; org.springframework.web.bind.annotation.GetMapping; org.springframework.web.bind.annotation.RequestMapping; org.springframework.web.bind.annotation.RestController; @RestController 12 @RequestMapping("/activity") 13 ActivityController { 14 // @Autowired private ApplicationEventPublisher publisher; 17 19 ApplicationContext publisher; 20 21 @GetMapping("/sayHello.json"22 sayHello() { 23 24 /** 25 * You may register as many event listeners as you wish,but note that by default event listeners receive events synchronously. 26 This means the publishEvent() method blocks until all listeners have finished processing the event. 27 */ 28 29 BlackListEvent event = new BlackListEvent(this,"abc@126.com"30 publisher.publishEvent(event); 31 System.out.println("事件发布成功"32 33 34 }
2、基于注解的事件监听
com.cjs.boot.event; org.springframework.stereotype.Component; @Component BlackListListener { @EventListener processBlackListEvent(BlackListEvent event) { System.out.println(123); } } --- com.cjs.boot; org.springframework.boot.autoconfigure.SpringBootApplication; @SpringBootApplication CjsSpringbootExampleApplication { main(String[] args) { SpringApplication.run(CjsSpringbootExampleApplication.,args); } }
3、异步监听
1 @EventListener 2 @Async 3 processBlackListEvent(BlackListEvent event) { 4 BlackListEvent is processed in a separate thread 5 }
4、应用
lombok.extern.slf4j.Slf4j; org.springframework.scheduling.annotation.Async; 11 java.util.ArrayList; java.util.List; java.util.concurrent.ExecutionException; java.util.concurrent.Future; java.util.concurrent.atomic.AtomicInteger; 19 * 批量送券 20 @Slf4j 22 @Component 23 BatchSendCouponListener { 24 26 CouponPresentLogService couponPresentLogService; 27 28 @Async 29 @EventListener 30 processBatchSendCouponEvent(BatchSendCouponEvent batchSendCouponEvent) { 31 Long cpId = batchSendCouponEvent.getCouponPresentId(); 32 log.info("收到BatchSendCouponEvent,cpId={}"33 List<CouponPresentLogEntity> list = couponPresentLogService.selectByPid(cpId); 34 35 handle(cpId,list,036 37 38 private void handle(Long cpId,List<CouponPresentLogEntity> list,int times) { 39 if (times >= 2) { 40 log.info("超过重试次数退出,cpId: {},剩余: {}"41 return; 42 43 44 List<Future<CouponPresentLogEntity>> futureList = new ArrayList<>(); 45 46 for (CouponPresentLogEntity entity : list) { 47 futureList.add(couponPresentLogService.present(entity)); 48 49 50 AtomicInteger count = new AtomicInteger(051 收集失败的 52 List<CouponPresentLogEntity> failList = 53 for (Future<CouponPresentLogEntity> future : futureList) { 54 55 CouponPresentLogEntity couponPresentLogEntity = future.get(); 56 if (couponPresentLogEntity.getStatus() != PresentStatusEnum.SUCCESS.getType().intValue()) { 57 failList.add(couponPresentLogEntity); 58 } 59 count.getAndIncrement(); 60 if (count.intValue() >= list.size()) { 61 List<CouponPresentLogEntity> failPresentLogList = couponPresentLogService.selectFailLogByPid(cpId); 62 if (null != failPresentLogList && failPresentLogList.size() > 063 times++64 log.info("第{}次重试,CPID: {},总计: {},失败: {}"65 handle(cpId,failPresentLogList,times); 66 } 67 68 } 69 log.error(e.getMessage(),e); 70 } (ExecutionException e) { 71 72 } 73 74 75 76 }
2 org.springframework.scheduling.annotation.AsyncResult; org.springframework.stereotype.Service; javax.annotation.Resource; import java.util.concurrent.*@Service class CouponPresentLogServiceImpl implements CouponPresentLogService { 15 CouponPresentLogDao couponPresentLogDao; @Resource CouponSendRpcService couponSendRpcService; 19 @Async("myThreadPoolTaskExecutor"20 21 public Future<CouponPresentLogEntity> present(CouponPresentLogEntity entity) { 22 23 CouponBaseResponse rst = couponSendRpcService.send(entity.getUserId(),entity.getCouponBatchKey(),"1"24 null != rst && rst.isSuccess()) { entity.setStatus(PresentStatusEnum.SUCCESS.getType()); entity.setFailureReason(PresentStatusEnum.SUCCESS.getName()); 27 }else28 String reason = (null == rst) ? "响应异常" : rst.getMsg(); 29 entity.setFailureReason(reason); 30 entity.setStatus(PresentStatusEnum.FAILURE.getType()); 31 32 } (Exception ex) { 33 log.error(ex.getMessage(),ex); 34 entity.setFailureReason(ex.getMessage()); 35 entity.setStatus(PresentStatusEnum.FAILURE.getType()); 37 couponPresentLogDao.update(entity); 38 return new AsyncResult<CouponPresentLogEntity>(entity); 40 41 42 }
5、统计异步任务执行的进度
利用Future获取执行结果,比如上面的例子中,由于不是直接提交的任务,所以用AsyncResult来返回结果@H_301_707@
上面的例子中,一个大任务,然后下面有许多子任务。在主任务中,统计各子任务的执行情况,是成功还是失败,然后统计成功多少,失败多少@H_301_707@
也可以这样写:@H_301_707@
@H_301_707@
@Autowired ThreadPoolTaskExecutor taskExecutor; Future<Object> future = taskExecutor.submit(new Callable<Object>() { @Override public Object call() throws Exception { null; } });
@H_301_707@
原文链接:https://www.f2er.com/spring/882838.html