从源码上理解Netty并发工具-Promise

前端之家收集整理的这篇文章主要介绍了从源码上理解Netty并发工具-Promise前端之家小编觉得挺不错的,现在分享给大家,也给大家做个参考。

前提

最近一直在看Netty相关的内容,也在编写一个轻量级的RPC框架来练手,途中发现了Netty的源码有很多亮点,某些实现甚至可以用苛刻来形容。另外,Netty提供的工具类也是相当优秀,可以开箱即用。这里分析一下个人比较喜欢的领域,并发方面的一个Netty工具模块 - Promise

环境版本:

  • Netty:4.1.44.Final
  • JDK1.8

Promise简介

Promise,中文翻译为承诺或者许诺,含义是人与人之间,一个人对另一个人所说的具有一定憧憬的话,一般是可以实现的。

io.netty.util.concurrent.Promise在注释中只有一句话:特殊的可写的io.netty.util.concurrent.FuturePromise接口是io.netty.util.concurrent.Future的子接口)。而io.netty.util.concurrent.Futurejava.util.concurrent.Future的扩展,表示一个异步操作的结果。我们知道,JDK并发包中的Future是不可写,也没有提供可监听的入口(没有应用观察者模式),而Promise很好地弥补了这两个问题。另一方面从继承关系来看,DefaultPromise是这些接口的最终实现类,所以分析源码的时候需要把重心放在DefaultPromise类。一般一个模块提供的功能都由接口定义,这里分析一下两个接口的功能列表:

  • io.netty.util.concurrent.Promise
  • io.netty.util.concurrent.Future

先看io.netty.util.concurrent.Future接口:

  1. public interface Future<V> extends java.util.concurrent.Future<V> {
  2. // I/O操作是否执行成功
  3. boolean isSuccess();
  4. // 标记是否可以通过下面的cancel(boolean mayInterruptIfRunning)取消I/O操作
  5. boolean isCancellable();
  6. // 返回I/O操作的异常实例 - 如果I/O操作本身是成功的,此方法返回null
  7. Throwable cause();
  8. // 为当前Future实例添加监听Future操作完成的监听器 - isDone()方法激活之后所有监听器实例会得到回调
  9. Future<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);
  10. Future<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
  11. // 为当前Future移除监听Future操作完成的监听器
  12. Future<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener);
  13. Future<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
  14. // 同步等待Future完成得到最终结果(成功)或者抛出异常(失败),响应中断
  15. Future<V> sync() throws InterruptedException;
  16. // 同步等待Future完成得到最终结果(成功)或者抛出异常(失败),不响应中断
  17. Future<V> syncUninterruptibly();
  18. // 等待Future完成,响应中断
  19. Future<V> await() throws InterruptedException;
  20. // 等待Future完成,不响应中断
  21. Future<V> awaitUninterruptibly();
  22. // 带超时时限的等待Future完成,响应中断
  23. boolean await(long timeout,TimeUnit unit) throws InterruptedException;
  24. boolean await(long timeoutMillis) throws InterruptedException;
  25. // 带超时时限的等待Future完成,不响应中断
  26. boolean awaitUninterruptibly(long timeout,TimeUnit unit);
  27. boolean awaitUninterruptibly(long timeoutMillis);
  28. // 非阻塞马上返回Future的结果,如果Future未完成,此方法一定返回null;有些场景下如果Future成功获取到的结果是null则需要二次检查isDone()方法是否为true
  29. V getNow();
  30. // 取消当前Future实例的执行,如果取消成功会抛出CancellationException异常
  31. @Override
  32. boolean cancel(boolean mayInterruptIfRunning);
  33. }

sync()await()方法类似,只是sync()会检查异常执行的情况,一旦发现执行异常马上把异常实例包装抛出,而await()方法对异常无感知。

接着看io.netty.util.concurrent.Promise接口:

  1. public interface Promise<V> extends Future<V> {
  2. // 标记当前Future成功,设置结果,如果设置成功,则通知所有的监听器,如果Future已经成功或者失败,则抛出IllegalStateException
  3. Promise<V> setSuccess(V result);
  4. // 标记当前Future成功,设置结果,如果设置成功,则通知所有的监听器并且返回true,否则返回false
  5. boolean trySuccess(V result);
  6. // 标记当前Future失败,设置结果为异常实例,如果设置成功,则通知所有的监听器,如果Future已经成功或者失败,则抛出IllegalStateException
  7. Promise<V> setFailure(Throwable cause);
  8. // 标记当前Future失败,设置结果为异常实例,如果设置成功,则通知所有的监听器并且返回true,否则返回false
  9. boolean tryFailure(Throwable cause);
  10. // 标记当前的Promise实例为不可取消,设置成功返回true,否则返回false
  11. boolean setUncancellable();
  12. // 下面的方法和io.netty.util.concurrent.Future中的方法基本一致,只是修改了返回类型为Promise
  13. @Override
  14. Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);
  15. @Override
  16. Promise<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
  17. @Override
  18. Promise<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener);
  19. @Override
  20. Promise<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
  21. @Override
  22. Promise<V> await() throws InterruptedException;
  23. @Override
  24. Promise<V> awaitUninterruptibly();
  25. @Override
  26. Promise<V> sync() throws InterruptedException;
  27. @Override
  28. Promise<V> syncUninterruptibly();
  29. }

到此,Promise接口的所有功能都分析完毕,接下来从源码角度详细分析Promise的实现。

Promise源码实现

Promise的实现类为io.netty.util.concurrent.DefaultPromise(其实DefaultPromise还有很多子类,某些实现是为了定制特定的场景做了扩展),而DefaultPromise继承自io.netty.util.concurrent.AbstractFuture

  1. public abstract class AbstractFuture<V> implements Future<V> {
  2. // 永久阻塞等待获取结果的方法
  3. @Override
  4. public V get() throws InterruptedException,ExecutionException {
  5. // 调用响应中断的永久等待方法进行阻塞
  6. await();
  7. // 从永久阻塞中唤醒后,先判断Future是否执行异常
  8. Throwable cause = cause();
  9. if (cause == null) {
  10. // 异常为空说明执行成功,调用getNow()方法返回结果
  11. return getNow();
  12. }
  13. // 异常为空不为空,这里区分特定的取消异常则转换为CancellationException抛出
  14. if (cause instanceof CancellationException) {
  15. throw (CancellationException) cause;
  16. }
  17. // 非取消异常的其他所有异常都被包装为执行异常ExecutionException抛出
  18. throw new ExecutionException(cause);
  19. }
  20. // 带超时阻塞等待获取结果的方法
  21. @Override
  22. public V get(long timeout,TimeUnit unit) throws InterruptedException,ExecutionException,TimeoutException {
  23. // 调用响应中断的带超时时限等待方法进行阻塞
  24. if (await(timeout,unit)) {
  25. // 从带超时时限阻塞中唤醒后,先判断Future是否执行异常
  26. Throwable cause = cause();
  27. if (cause == null) {
  28. // 异常为空说明执行成功,调用getNow()方法返回结果
  29. return getNow();
  30. }
  31. // 异常为空不为空,这里区分特定的取消异常则转换为CancellationException抛出
  32. if (cause instanceof CancellationException) {
  33. throw (CancellationException) cause;
  34. }
  35. // 在非等待超时的前提下,非取消异常的其他所有异常都被包装为执行异常ExecutionException抛出
  36. throw new ExecutionException(cause);
  37. }
  38. // 方法步入此处说明等待超时,则抛出超时异常TimeoutException
  39. throw new TimeoutException();
  40. }
  41. }

AbstractFuture仅仅对get()get(long timeout,TimeUnit unit)两个方法进行了实现,其实这两处的实现和java.util.concurrent.FutureTask中的实现方式十分相似。

DefaultPromise的源码比较多,这里分开多个部分去阅读,先看它的属性和构造函数

  1. public class DefaultPromise<V> extends AbstractFuture<V> implements Promise<V> {
  2. // 正常日志的日志句柄,InternalLogger是Netty内部封装的日志接口
  3. private static final InternalLogger logger = InternalLoggerFactory.getInstance(DefaultPromise.class);
  4. // 任务拒绝执行时候的日志句柄 - Promise需要作为一个任务提交到线程中执行,如果任务拒绝则使用此日志句柄打印日志
  5. private static final InternalLogger rejectedExecutionLogger =
  6. InternalLoggerFactory.getInstance(DefaultPromise.class.getName() + ".rejectedExecution");
  7. // 监听器的最大栈深度,默认值为8,这个值是防止嵌套回调调用的时候栈深度过大导致内存溢出,后面会举个例子说明它的用法
  8. private static final int MAX_LISTENER_STACK_DEPTH = Math.min(8,SystemPropertyUtil.getInt("io.netty.defaultPromise.maxListenerStackDepth",8));
  9. // 结果更新器,用于CAS更新结果result的值
  10. @SuppressWarnings("rawtypes")
  11. private static final AtomicReferenceFieldUpdater<DefaultPromise,Object> RESULT_UPDATER =
  12. AtomicReferenceFieldUpdater.newUpdater(DefaultPromise.class,Object.class,"result");
  13. // 用于填充result的值,当设置结果result传入null,Promise执行成功,用这个值去表示成功的结果
  14. private static final Object SUCCESS = new Object();
  15. // 用于填充result的值,表示Promise不能被取消
  16. private static final Object UNCANCELLABLE = new Object();
  17. // CancellationException实例的持有器,用于判断Promise取消状态和抛出CancellationException
  18. private static final CauseHolder CANCELLATION_CAUSE_HOLDER = new CauseHolder(ThrowableUtil.unknownStackTrace(
  19. new CancellationException(),DefaultPromise.class,"cancel(...)"));
  20. // CANCELLATION_CAUSE_HOLDER的异常栈信息元素数组
  21. private static final StackTraceElement[] CANCELLATION_STACK = CANCELLATION_CAUSE_HOLDER.cause.getStackTrace();
  22. // 真正的结果对象,使用Object类型,最终有可能为null、真正的结果实例、SUCCESS、UNCANCELLABLE或者CANCELLATION_CAUSE_HOLDER等等
  23. private volatile Object result;
  24. // 事件执行器,这里暂时不做展开,可以理解为单个调度线程
  25. private final EventExecutor executor;
  26. // 监听器集合,可能是单个GenericFutureListener实例或者DefaultFutureListeners(监听器集合)实例
  27. private Object listeners;
  28. // 等待获取结果的线程数量
  29. private short waiters;
  30. // 标记是否正在回调监听器
  31. private boolean notifyingListeners;
  32. // 构造函数依赖于EventExecutor
  33. public DefaultPromise(EventExecutor executor) {
  34. this.executor = checkNotNull(executor,"executor");
  35. }
  36. protected DefaultPromise() {
  37. // only for subclasses - 这个构造函数预留给子类
  38. executor = null;
  39. }
  40. // ... 省略其他代码 ...
  41. // 私有静态内部类,用于存放Throwable实例,也就是持有异常的原因实例
  42. private static final class CauseHolder {
  43. final Throwable cause;
  44. CauseHolder(Throwable cause) {
  45. this.cause = cause;
  46. }
  47. }
  48. // 私有静态内部类,用于覆盖CancellationException的栈信息为前面定义的CANCELLATION_STACK,同时覆盖了toString()返回CancellationException的全类名
  49. private static final class LeanCancellationException extends CancellationException {
  50. private static final long serialVersionUID = 2794674970981187807L;
  51. @Override
  52. public Throwable fillInStackTrace() {
  53. setStackTrace(CANCELLATION_STACK);
  54. return this;
  55. }
  56. @Override
  57. public String toString() {
  58. return CancellationException.class.getName();
  59. }
  60. }
  61. // ... 省略其他代码 ...
  62. }

Promise目前支持两种类型的监听器:

  • GenericFutureListener支持泛型的Future监听器。
  • GenericProgressiveFutureListener:它是GenericFutureListener的子类,支持进度表示和支持泛型的Future监听器(有些场景需要多个步骤实现,类似于进度条那样)。
  1. // GenericFutureListener
  2. public interface GenericFutureListener<F extends Future<?>> extends EventListener {
  3. void operationComplete(F future) throws Exception;
  4. }
  5. // GenericProgressiveFutureListener
  6. public interface GenericProgressiveFutureListener<F extends ProgressiveFuture<?>> extends GenericFutureListener<F> {
  7. void operationProgressed(F future,long progress,long total) throws Exception;
  8. }

为了让Promise支持多个监听器,Netty添加了一个默认修饰符修饰的DefaultFutureListeners类用于保存监听器实例数组:

  1. // DefaultFutureListeners
  2. final class DefaultFutureListeners {
  3. private GenericFutureListener<? extends Future<?>>[] listeners;
  4. private int size;
  5. private int progressiveSize; // the number of progressive listeners
  6. // 这个构造相对特别,是为了让Promise中的listeners(Object类型)实例由单个GenericFutureListener实例转换为DefaultFutureListeners类型
  7. @SuppressWarnings("unchecked")
  8. DefaultFutureListeners(GenericFutureListener<? extends Future<?>> first,GenericFutureListener<? extends Future<?>> second) {
  9. listeners = new GenericFutureListener[2];
  10. listeners[0] = first;
  11. listeners[1] = second;
  12. size = 2;
  13. if (first instanceof GenericProgressiveFutureListener) {
  14. progressiveSize ++;
  15. }
  16. if (second instanceof GenericProgressiveFutureListener) {
  17. progressiveSize ++;
  18. }
  19. }
  20. public void add(GenericFutureListener<? extends Future<?>> l) {
  21. GenericFutureListener<? extends Future<?>>[] listeners = this.listeners;
  22. final int size = this.size;
  23. // 注意这里,每次扩容数组长度是原来的2倍
  24. if (size == listeners.length) {
  25. this.listeners = listeners = Arrays.copyOf(listeners,size << 1);
  26. }
  27. // 把当前的GenericFutureListener加入数组中
  28. listeners[size] = l;
  29. // 监听器总数量加1
  30. this.size = size + 1;
  31. // 如果为GenericProgressiveFutureListener,则带进度指示的监听器总数量加1
  32. if (l instanceof GenericProgressiveFutureListener) {
  33. progressiveSize ++;
  34. }
  35. }
  36. public void remove(GenericFutureListener<? extends Future<?>> l) {
  37. final GenericFutureListener<? extends Future<?>>[] listeners = this.listeners;
  38. int size = this.size;
  39. for (int i = 0; i < size; i ++) {
  40. if (listeners[i] == l) {
  41. // 计算需要需要移动的监听器的下标
  42. int listenersToMove = size - i - 1;
  43. if (listenersToMove > 0) {
  44. // listenersToMove后面的元素全部移动到数组的前端
  45. System.arraycopy(listeners,i + 1,listeners,i,listenersToMove);
  46. }
  47. // 当前监听器总量的最后一个位置设置为null,数量减1
  48. listeners[-- size] = null;
  49. this.size = size;
  50. // 如果监听器是GenericProgressiveFutureListener,则带进度指示的监听器总数量减1
  51. if (l instanceof GenericProgressiveFutureListener) {
  52. progressiveSize --;
  53. }
  54. return;
  55. }
  56. }
  57. }
  58. // 返回监听器实例数组
  59. public GenericFutureListener<? extends Future<?>>[] listeners() {
  60. return listeners;
  61. }
  62. // 返回监听器总数量
  63. public int size() {
  64. return size;
  65. }
  66. // 返回带进度指示的监听器总数量
  67. public int progressiveSize() {
  68. return progressiveSize;
  69. }
  70. }

接下来看DefaultPromise的剩余方法实现,笔者觉得DefaultPromise方法实现在代码顺序上是有一定的艺术的。先看几个判断Promise执行状态的方法

  1. public class DefaultPromise<V> extends AbstractFuture<V> implements Promise<V> {
  2. // ... 省略其他代码 ...
  3. @Override
  4. public boolean setUncancellable() {
  5. // 通过结果更新器CAS更新result为UNCANCELLABLE,期望旧值为null,更新值为UNCANCELLABLE属性,如果成功则返回true
  6. if (RESULT_UPDATER.compareAndSet(this,null,UNCANCELLABLE)) {
  7. return true;
  8. }
  9. Object result = this.result;
  10. // 步入这里说明result当前值不为null,isDone0()和isCancelled0()都是终态,这里如果命中终态就返回false
  11. //(笔者注:其实可以这样认为,这里result不能为null,如果不为终态,它只能是UNCANCELLABLE属性实例)
  12. return !isDone0(result) || !isCancelled0(result);
  13. }
  14. @Override
  15. public boolean isSuccess() {
  16. Object result = this.result;
  17. // 如果执行成功,则结果不为null,同时不为UNCANCELLABLE,同时不为CauseHolder类型
  18. //(笔者注:其实可以这样认为,Promise为成功,则result只能是一个开发者定义的实例或者SUCCESS属性实例)
  19. return result != null && result != UNCANCELLABLE && !(result instanceof CauseHolder);
  20. }
  21. @Override
  22. public boolean isCancellable() {
  23. // 是否可取消的,result为null说明Promise处于初始化状态尚未执行,则认为可以取消
  24. return result == null;
  25. }
  26. @Override
  27. public Throwable cause() {
  28. // 通过当前result获取Throwable实例
  29. return cause0(result);
  30. }
  31. private Throwable cause0(Object result) {
  32. // result非CauseHolder类型,则直接返回null
  33. if (!(result instanceof CauseHolder)) {
  34. return null;
  35. }
  36. // 如果result为CANCELLATION_CAUSE_HOLDER(静态CancellationException的持有)
  37. if (result == CANCELLATION_CAUSE_HOLDER) {
  38. // 则新建一个自定义LeanCancellationException实例
  39. CancellationException ce = new LeanCancellationException();
  40. // 如果CAS更新结果result为LeanCancellationException新实例则返回
  41. if (RESULT_UPDATER.compareAndSet(this,CANCELLATION_CAUSE_HOLDER,new CauseHolder(ce))) {
  42. return ce;
  43. }
  44. // 走到这里说明了result是非CANCELLATION_CAUSE_HOLDER的自定义CauseHolder实例
  45. result = this.result;
  46. }
  47. // 兜底返回CauseHolder持有的cause
  48. return ((CauseHolder) result).cause;
  49. }
  50. // 静态方法,判断Promise是否为取消,依据是result必须是CauseHolder类型,同时CauseHolder中的cause必须为CancellationException类型或者其子类
  51. private static boolean isCancelled0(Object result) {
  52. return result instanceof CauseHolder && ((CauseHolder) result).cause instanceof CancellationException;
  53. }
  54. // 静态方法,判断Promise是否完成,依据是result不为null同时不为UNCANCELLABLE属性实例
  55. private static boolean isDone0(Object result) {
  56. return result != null && result != UNCANCELLABLE;
  57. }
  58. // 判断Promise实例是否取消
  59. @Override
  60. public boolean isCancelled() {
  61. return isCancelled0(result);
  62. }
  63. // 判断Promise实例是否完成
  64. @Override
  65. public boolean isDone() {
  66. return isDone0(result);
  67. }
  68. // ... 省略其他代码 ...
  69. }

接着看监听器的添加和移除方法(这其中也包含了通知监听器的逻辑):

  1. public class DefaultPromise<V> extends AbstractFuture<V> implements Promise<V> {
  2. // ... 省略其他代码 ...
  3. @Override
  4. public Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener) {
  5. // 入参非空校验
  6. checkNotNull(listener,"listener");
  7. // 加锁,锁定的对象是Promise实例自身
  8. synchronized (this) {
  9. // 添加监听器
  10. addListener0(listener);
  11. }
  12. // 如果Promise实例已经执行完毕,则通知监听器进行回调
  13. if (isDone()) {
  14. notifyListeners();
  15. }
  16. return this;
  17. }
  18. @Override
  19. public Promise<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners) {
  20. // 入参非空校验
  21. checkNotNull(listeners,"listeners");
  22. // 加锁,锁定的对象是Promise实例自身
  23. synchronized (this) {
  24. // 遍历入参数组添加监听器,有空元素直接跳出
  25. for (GenericFutureListener<? extends Future<? super V>> listener : listeners) {
  26. if (listener == null) {
  27. break;
  28. }
  29. addListener0(listener);
  30. }
  31. }
  32. // 如果Promise实例已经执行完毕,则通知监听器进行回调
  33. if (isDone()) {
  34. notifyListeners();
  35. }
  36. return this;
  37. }
  38. @Override
  39. public Promise<V> removeListener(final GenericFutureListener<? extends Future<? super V>> listener) {
  40. // 入参非空校验
  41. checkNotNull(listener,"listener");
  42. // 加锁,锁定的对象是Promise实例自身
  43. synchronized (this) {
  44. // 移除监听器
  45. removeListener0(listener);
  46. }
  47. return this;
  48. }
  49. @Override
  50. public Promise<V> removeListeners(final GenericFutureListener<? extends Future<? super V>>... listeners) {
  51. // 入参非空校验
  52. checkNotNull(listeners,"listeners");
  53. // 加锁,锁定的对象是Promise实例自身
  54. synchronized (this) {
  55. // 遍历入参数组移除监听器,有空元素直接跳出
  56. for (GenericFutureListener<? extends Future<? super V>> listener : listeners) {
  57. if (listener == null) {
  58. break;
  59. }
  60. removeListener0(listener);
  61. }
  62. }
  63. return this;
  64. }
  65. private void addListener0(GenericFutureListener<? extends Future<? super V>> listener) {
  66. // 如果Promise实例持有listeners为null,则直接设置为入参listener
  67. if (listeners == null) {
  68. listeners = listener;
  69. } else if (listeners instanceof DefaultFutureListeners) {
  70. // 如果当前Promise实例持有listeners的是DefaultFutureListeners类型,则调用它的add()方法进行添加
  71. ((DefaultFutureListeners) listeners).add(listener);
  72. } else {
  73. // 步入这里说明当前Promise实例持有listeners为单个GenericFutureListener实例,需要转换为DefaultFutureListeners实例
  74. listeners = new DefaultFutureListeners((GenericFutureListener<?>) listeners,listener);
  75. }
  76. }
  77. private void removeListener0(GenericFutureListener<? extends Future<? super V>> listener) {
  78. // 如果当前Promise实例持有listeners的是DefaultFutureListeners类型,则调用它的remove()方法进行移除
  79. if (listeners instanceof DefaultFutureListeners) {
  80. ((DefaultFutureListeners) listeners).remove(listener);
  81. } else if (listeners == listener) {
  82. // 如果当前Promise实例持有listeners不为DefaultFutureListeners类型,也就是单个GenericFutureListener并且和传入的listener相同,
  83. // 则Promise实例持有listeners置为null
  84. listeners = null;
  85. }
  86. }
  87. private void notifyListeners() {
  88. EventExecutor executor = executor();
  89. // 当前执行线程是事件循环线程,那么直接同步调用,简单来说就是调用notifyListeners()方法的线程和EventExecutor是同一个线程
  90. if (executor.inEventLoop()) {
  91. // 下面的ThreadLocal和listenerStackDepth是调用栈深度保护相关,博文会另起一个章节专门讲解这个问题,这里可以暂时忽略
  92. final InternalThreadLocalMap threadLocals = InternalThreadLocalMap.get();
  93. final int stackDepth = threadLocals.futureListenerStackDepth();
  94. if (stackDepth < MAX_LISTENER_STACK_DEPTH) {
  95. threadLocals.setFutureListenerStackDepth(stackDepth + 1);
  96. try {
  97. notifyListenersNow();
  98. } finally {
  99. threadLocals.setFutureListenerStackDepth(stackDepth);
  100. }
  101. return;
  102. }
  103. }
  104. // 当前执行线程不是事件循环线程,则把notifyListenersNow()包装为Runnable实例放到EventExecutor中执行
  105. safeExecute(executor,new Runnable() {
  106. @Override
  107. public void run() {
  108. notifyListenersNow();
  109. }
  110. });
  111. }
  112. // 使用EventExecutor进行任务执行,execute()方法抛出的异常会使用rejectedExecutionLogger句柄打印
  113. private static void safeExecute(EventExecutor executor,Runnable task) {
  114. try {
  115. executor.execute(task);
  116. } catch (Throwable t) {
  117. rejectedExecutionLogger.error("Failed to submit a listener notification task. Event loop shut down?",t);
  118. }
  119. }
  120. // 马上通知所有监听器进行回调
  121. private void notifyListenersNow() {
  122. Object listeners;
  123. // 这里加锁,在锁的保护下设置notifyingListeners的值,如果多个线程调用同一个Promise实例的notifyListenersNow()方法
  124. // 命中notifyingListeners的线程可以直接返回
  125. synchronized (this) {
  126. // Only proceed if there are listeners to notify and we are not already notifying listeners.
  127. if (notifyingListeners || this.listeners == null) {
  128. return;
  129. }
  130. notifyingListeners = true;
  131. // 临时变量listeners存放瞬时的监听器实例,方便下一步设置Promise实例的listeners为null
  132. listeners = this.listeners;
  133. // 重置当前Promise实例的listeners为null
  134. this.listeners = null;
  135. }
  136. for (;;) {
  137. if (listeners instanceof DefaultFutureListeners) {
  138. // 多个监听器情况下的通知
  139. notifyListeners0((DefaultFutureListeners) listeners);
  140. } else {
  141. // 单个监听器情况下的通知
  142. notifyListener0(this,(GenericFutureListener<?>) listeners);
  143. }
  144. synchronized (this) {
  145. if (this.listeners == null) {
  146. // 这里因为没有异常抛出的可能,不用在finally块中编写,重置notifyingListeners为false并且返回跳出循环
  147. notifyingListeners = false;
  148. return;
  149. }
  150. // 临时变量listeners存放瞬时的监听器实例,回调操作判断是基于临时实例去做 - 这里可能由另一个线程更新了listeners的值
  151. listeners = this.listeners;
  152. // 重置当前Promise实例的listeners为null,确保监听器只会被回调一次,下一次跳出for死循环
  153. this.listeners = null;
  154. }
  155. }
  156. }
  157. // 遍历DefaultFutureListeners中的listeners数组,调用静态方法notifyListener0()
  158. private void notifyListeners0(DefaultFutureListeners listeners) {
  159. GenericFutureListener<?>[] a = listeners.listeners();
  160. int size = listeners.size();
  161. for (int i = 0; i < size; i ++) {
  162. notifyListener0(this,a[i]);
  163. }
  164. }
  165. // 这个静态方法是最终监听器回调的方法,也就是简单调用GenericFutureListener#operationComplete()传入的是当前的Promise实例,捕获一切异常打印warn日志
  166. @SuppressWarnings({ "unchecked","rawtypes" })
  167. private static void notifyListener0(Future future,GenericFutureListener l) {
  168. try {
  169. l.operationComplete(future);
  170. } catch (Throwable t) {
  171. if (logger.isWarnEnabled()) {
  172. logger.warn("An exception was thrown by " + l.getClass().getName() + ".operationComplete()",t);
  173. }
  174. }
  175. }
  176. }

然后看wait()sync()方法体系:

  1. public class DefaultPromise<V> extends AbstractFuture<V> implements Promise<V> {
  2. // ... 省略其他代码 ...
  3. @Override
  4. public Promise<V> await() throws InterruptedException {
  5. // 如果Promise执行完毕,直接返回
  6. if (isDone()) {
  7. return this;
  8. }
  9. // 如果当前线程中断则直接抛出InterruptedException
  10. if (Thread.interrupted()) {
  11. throw new InterruptedException(toString());
  12. }
  13. // 死锁检测
  14. checkDeadLock();
  15. // 加锁,加锁对象是当前Promise实例
  16. synchronized (this) {
  17. // 这里设置一个死循环,终止条件是isDone()为true
  18. while (!isDone()) {
  19. // 等待线程数加1
  20. incWaiters();
  21. try {
  22. // 这里调用的是Object#wait()方法进行阻塞,如果线程被中断会抛出InterruptedException
  23. wait();
  24. } finally {
  25. // 解除阻塞后等待线程数减1
  26. decWaiters();
  27. }
  28. }
  29. }
  30. return this;
  31. }
  32. @Override
  33. public Promise<V> awaitUninterruptibly() {
  34. // 如果Promise执行完毕,直接返回
  35. if (isDone()) {
  36. return this;
  37. }
  38. // 死锁检测
  39. checkDeadLock();
  40. boolean interrupted = false;
  41. // 加锁,加锁对象是当前Promise实例
  42. synchronized (this) {
  43. // 这里设置一个死循环,终止条件是isDone()为true
  44. while (!isDone()) {
  45. // 等待线程数加1
  46. incWaiters();
  47. try {
  48. // 这里调用的是Object#wait()方法进行阻塞,捕获了InterruptedException异常,如果抛出InterruptedException记录线程的中断状态到interrupted
  49. wait();
  50. } catch (InterruptedException e) {
  51. // Interrupted while waiting.
  52. interrupted = true;
  53. } finally {
  54. // 解除阻塞后等待线程数减1
  55. decWaiters();
  56. }
  57. }
  58. }
  59. // 如果线程被中断跳出等待阻塞,则清除线程的中断标志位
  60. if (interrupted) {
  61. Thread.currentThread().interrupt();
  62. }
  63. return this;
  64. }
  65. // 后面的几个带超时时限的wait()方法都是调用await0()
  66. @Override
  67. public boolean await(long timeout,TimeUnit unit) throws InterruptedException {
  68. return await0(unit.toNanos(timeout),true);
  69. }
  70. @Override
  71. public boolean await(long timeoutMillis) throws InterruptedException {
  72. return await0(MILLISECONDS.toNanos(timeoutMillis),true);
  73. }
  74. @Override
  75. public boolean awaitUninterruptibly(long timeout,TimeUnit unit) {
  76. try {
  77. return await0(unit.toNanos(timeout),false);
  78. } catch (InterruptedException e) {
  79. // Should not be raised at all.
  80. throw new InternalError();
  81. }
  82. }
  83. @Override
  84. public boolean awaitUninterruptibly(long timeoutMillis) {
  85. try {
  86. return await0(MILLISECONDS.toNanos(timeoutMillis),false);
  87. } catch (InterruptedException e) {
  88. // Should not be raised at all.
  89. throw new InternalError();
  90. }
  91. }
  92. // 检查死锁,这里判断了等待线程是事件循环线程则直接抛出BlockingOperationException异常
  93. // 简单来说就是:Promise的执行线程和等待结果的线程,不能是同一个线程,否则依赖会成环
  94. protected void checkDeadLock() {
  95. EventExecutor e = executor();
  96. if (e != null && e.inEventLoop()) {
  97. throw new BlockingOperationException(toString());
  98. }
  99. }
  100. @Override
  101. public Promise<V> sync() throws InterruptedException {
  102. // 同步永久阻塞等待
  103. await();
  104. // 阻塞等待解除,如果执行存在异常,则直接抛出
  105. rethrowIfFailed();
  106. return this;
  107. }
  108. @Override
  109. public Promise<V> syncUninterruptibly() {
  110. // 同步永久阻塞等待 - 响应中断
  111. awaitUninterruptibly();
  112. // 塞等待解除,如果执行存在异常,则直接抛出
  113. rethrowIfFailed();
  114. return this;
  115. }
  116. // waiters加1,如果超过Short.MAX_VALUE则抛出IllegalStateException
  117. private void incWaiters() {
  118. if (waiters == Short.MAX_VALUE) {
  119. throw new IllegalStateException("too many waiters: " + this);
  120. }
  121. ++waiters;
  122. }
  123. // waiters减1
  124. private void decWaiters() {
  125. --waiters;
  126. }
  127. // cause不为null则抛出
  128. private void rethrowIfFailed() {
  129. Throwable cause = cause();
  130. if (cause == null) {
  131. return;
  132. }
  133. PlatformDependent.throwException(cause);
  134. }
  135. private boolean await0(long timeoutNanos,boolean interruptable) throws InterruptedException {
  136. // 如果Promise执行完毕,直接返回
  137. if (isDone()) {
  138. return true;
  139. }
  140. // 如果超时时限小于0那么返回isDone()的结果
  141. if (timeoutNanos <= 0) {
  142. return isDone();
  143. }
  144. // 如果允许中断,当前线程的中断标志位为true,则抛出InterruptedException
  145. if (interruptable && Thread.interrupted()) {
  146. throw new InterruptedException(toString());
  147. }
  148. // 死锁检测
  149. checkDeadLock();
  150. // 记录当前的纳秒时间戳
  151. long startTime = System.nanoTime();
  152. // 等待时间的长度 - 单位为纳秒
  153. long waitTime = timeoutNanos;
  154. // 记录线程是否被中断
  155. boolean interrupted = false;
  156. try {
  157. // 死循环
  158. for (;;) {
  159. synchronized (this) {
  160. // 如果Promise执行完毕,直接返回true - 这一步是先验判断,命中了就不需要阻塞等待
  161. if (isDone()) {
  162. return true;
  163. }
  164. // 等待线程数加1
  165. incWaiters();
  166. try {
  167. // 这里调用的是带超时时限的Object#wait()方法进行阻塞
  168. wait(waitTime / 1000000,(int) (waitTime % 1000000));
  169. } catch (InterruptedException e) {
  170. // 线程被中断并且外部允许中断,那么直接抛出InterruptedException
  171. if (interruptable) {
  172. throw e;
  173. } else {
  174. // 否则只记录中断过的状态
  175. interrupted = true;
  176. }
  177. } finally {
  178. // 解除阻塞后等待线程数减1
  179. decWaiters();
  180. }
  181. }
  182. // 解除阻塞后,如果Promise执行完毕,直接返回true
  183. if (isDone()) {
  184. return true;
  185. } else {
  186. // 步入这里说明Promise尚未执行完毕,则重新计算等待时间间隔的长度数量(修正),如果大于0则进入下一轮循环
  187. waitTime = timeoutNanos - (System.nanoTime() - startTime);
  188. if (waitTime <= 0) {
  189. return isDone();
  190. }
  191. }
  192. }
  193. } finally {
  194. // 如果线程被中断跳出等待阻塞,则清除线程的中断标志位
  195. if (interrupted) {
  196. Thread.currentThread().interrupt();
  197. }
  198. }
  199. }
  200. // ... 省略其他代码 ...
  201. }

最后是几个设置结果和获取结果的方法

  1. public class DefaultPromise<V> extends AbstractFuture<V> implements Promise<V> {
  2. // ... 省略其他代码 ...
  3. @Override
  4. public Promise<V> setSuccess(V result) {
  5. // 设置成功结果,如果设置成功则返回当前Promise实例
  6. if (setSuccess0(result)) {
  7. return this;
  8. }
  9. // 设置失败说明了多次设置,Promise已经执行完毕,则抛出异常
  10. throw new IllegalStateException("complete already: " + this);
  11. }
  12. @Override
  13. public boolean trySuccess(V result) {
  14. // 设置成功结果,返回的布尔值表示成功或失败
  15. return setSuccess0(result);
  16. }
  17. @Override
  18. public Promise<V> setFailure(Throwable cause) {
  19. // 设置失败结果,如果设置成功则返回当前Promise实例
  20. if (setFailure0(cause)) {
  21. return this;
  22. }
  23. // 设置失败说明了多次设置,Promise已经执行完毕,则抛出异常
  24. throw new IllegalStateException("complete already: " + this,cause);
  25. }
  26. @Override
  27. public boolean tryFailure(Throwable cause) {
  28. // 设置失败结果,返回的布尔值表示成功或失败
  29. return setFailure0(cause);
  30. }
  31. @SuppressWarnings("unchecked")
  32. @Override
  33. public V getNow() {
  34. // 非阻塞获取结果,如果result是CauseHolder类型、SUCCESS属性实例或者UNCANCELLABLE实行实例则返回null,否则返回转换类型后的result值
  35. // 对异常无感知,如果CauseHolder包裹了异常,此方法依然返回null
  36. Object result = this.result;
  37. if (result instanceof CauseHolder || result == SUCCESS || result == UNCANCELLABLE) {
  38. return null;
  39. }
  40. return (V) result;
  41. }
  42. @SuppressWarnings("unchecked")
  43. @Override
  44. public V get() throws InterruptedException,ExecutionException {
  45. // 永久阻塞获取结果
  46. Object result = this.result;
  47. // 如果Promise未执行完毕则进行永久阻塞等待
  48. if (!isDone0(result)) {
  49. await();
  50. // 更新结果临时变量
  51. result = this.result;
  52. }
  53. // result为SUCCESS属性实例或者UNCANCELLABLE属性实例的时候直接返回null
  54. if (result == SUCCESS || result == UNCANCELLABLE) {
  55. return null;
  56. }
  57. // 如果result为CauseHolder类型,则获取其中持有的cause属性,也有可能为null
  58. Throwable cause = cause0(result);
  59. if (cause == null) {
  60. // 执行成功的前提下转换类型后的result值返回
  61. return (V) result;
  62. }
  63. // 取消的情况,抛出CancellationException
  64. if (cause instanceof CancellationException) {
  65. throw (CancellationException) cause;
  66. }
  67. // 剩余的情况一律封装为ExecutionException异常
  68. throw new ExecutionException(cause);
  69. }
  70. @SuppressWarnings("unchecked")
  71. @Override
  72. public V get(long timeout,TimeoutException {
  73. // 带超时时限的阻塞获取结果
  74. Object result = this.result;
  75. // 如果Promise未执行完毕则进行带超时时限的阻塞等待
  76. if (!isDone0(result)) {
  77. if (!await(timeout,unit)) {
  78. // 等待超时直接抛出TimeoutException
  79. throw new TimeoutException();
  80. }
  81. // 更新结果临时变量
  82. result = this.result;
  83. }
  84. // result为SUCCESS属性实例或者UNCANCELLABLE属性实例的时候直接返回null
  85. if (result == SUCCESS || result == UNCANCELLABLE) {
  86. return null;
  87. }
  88. // 如果result为CauseHolder类型,则获取其中持有的cause属性,也有可能为null
  89. Throwable cause = cause0(result);
  90. if (cause == null) {
  91. // 执行成功的前提下转换类型后的result值返回
  92. return (V) result;
  93. }
  94. // 取消的情况,抛出CancellationException
  95. if (cause instanceof CancellationException) {
  96. throw (CancellationException) cause;
  97. }
  98. // 剩余的情况一律封装为ExecutionException异常
  99. throw new ExecutionException(cause);
  100. }
  101. @Override
  102. public boolean cancel(boolean mayInterruptIfRunning) {
  103. // CAS更新result为CANCELLATION_CAUSE_HOLDER,result的期望值必须为null
  104. if (RESULT_UPDATER.compareAndSet(this,CANCELLATION_CAUSE_HOLDER)) {
  105. // 判断是否需要进行等待线程的通知
  106. if (checkNotifyWaiters()) {
  107. // 通知监听器进行回调
  108. notifyListeners();
  109. }
  110. return true;
  111. }
  112. return false;
  113. }
  114. private boolean setSuccess0(V result) {
  115. // 设置执行成功的结果,如果入参result为null,则选用SUCCESS属性,否则使用result
  116. return setValue0(result == null ? SUCCESS : result);
  117. }
  118. private boolean setFailure0(Throwable cause) {
  119. // 设置执行失败的结果,入参是Throwable类型,封装为CauseHolder,存放在CauseHolder实例的cause属性
  120. return setValue0(new CauseHolder(checkNotNull(cause,"cause")));
  121. }
  122. private boolean setValue0(Object objResult) {
  123. // CAS更新result为入参objResult,result的期望值必须为null或者UNCANCELLABLE才能更新成功
  124. if (RESULT_UPDATER.compareAndSet(this,objResult) || RESULT_UPDATER.compareAndSet(this,UNCANCELLABLE,objResult)) {
  125. // 判断是否需要进行等待线程的通知
  126. if (checkNotifyWaiters()) {
  127. // 通知监听器进行回调
  128. notifyListeners();
  129. }
  130. return true;
  131. }
  132. return false;
  133. }
  134. // 判断是否需要进行等待线程的通知 - 其实是判断是否需要通知监听器回调
  135. private synchronized boolean checkNotifyWaiters() {
  136. // 如果等待线程数量大于0则调用Object#notifyAll()唤醒所有等待线程
  137. if (waiters > 0) {
  138. notifyAll();
  139. }
  140. // 如果listeners不为空(也就是存在监听器)的时候才返回true
  141. return listeners != null;
  142. }
  143. // ... 省略其他代码 ...
  144. }

Promise的基本使用

要使用NettyPromise模块,并不需要引入Netty的所有依赖,这里只需要引入netty-common

  1. <dependency>
  2. <groupId>io.netty</groupId>
  3. <artifactId>netty-common</artifactId>
  4. <version>4.1.44.Final</version>
  5. </dependency>

EventExecutor选取方面,Netty已经准备了一个GlobalEventExecutor用于全局事件处理,这里可以直接选用(当然也可以自行实现EventExecutor或者用EventExecutor的其他实现类):

  1. EventExecutor executor = GlobalEventExecutor.INSTANCE;
  2. Promise<String> promise = new DefaultPromise<>(executor);

这里设计一个场景:异步下载一个链接的资源到磁盘上,下载完成之后需要异步通知下载完的磁盘文件路径,得到通知之后打印下载结果到控制台中。

  1. public class PromiseMain {
  2. public static void main(String[] args) throws Exception {
  3. String url = "http://xxx.yyy.zzz";
  4. EventExecutor executor = GlobalEventExecutor.INSTANCE;
  5. Promise<DownloadResult> promise = new DefaultPromise<>(executor);
  6. promise.addListener(new DownloadResultListener());
  7. Thread thread = new Thread(() -> {
  8. try {
  9. System.out.println("开始下载资源,url:" + url);
  10. long start = System.currentTimeMillis();
  11. // 模拟下载耗时
  12. Thread.sleep(2000);
  13. String location = "C:\\xxx\\yyy\\z.md";
  14. long cost = System.currentTimeMillis() - start;
  15. System.out.println(String.format("下载资源成功,url:%s,保存到:%s,耗时:%d ms",url,location,cost));
  16. DownloadResult result = new DownloadResult();
  17. result.setUrl(url);
  18. result.setFileDiskLocation(location);
  19. result.setCost(cost);
  20. // 通知结果
  21. promise.setSuccess(result);
  22. } catch (Exception ignore) {
  23. }
  24. },"Download-Thread");
  25. thread.start();
  26. Thread.sleep(Long.MAX_VALUE);
  27. }
  28. @Data
  29. private static class DownloadResult {
  30. private String url;
  31. private String fileDiskLocation;
  32. private long cost;
  33. }
  34. private static class DownloadResultListener implements GenericFutureListener<Future<DownloadResult>> {
  35. @Override
  36. public void operationComplete(Future<DownloadResult> future) throws Exception {
  37. if (future.isSuccess()) {
  38. DownloadResult downloadResult = future.getNow();
  39. System.out.println(String.format("下载完成通知,文件磁盘路径:%s,downloadResult.getUrl(),downloadResult.getFileDiskLocation(),downloadResult.getCost()));
  40. }
  41. }
  42. }
  43. }

执行后控制台输出

  1. 开始下载资源,url:http://xxx.yyy.zzz
  2. 下载资源成功,url:http://xxx.yyy.zzz,保存到:C:\xxx\yyy\z.md,耗时:2000 ms
  3. 下载完成通知,文件磁盘路径:C:\xxx\yyy\z.md,耗时:2000 ms

Promise适用的场景很多,除了异步通知的场景也能用于同步调用,它在设计上比JUCFuture灵活很多,基于Future扩展出很多新的特性,有需要的可以单独引入此依赖直接使用。

Promise监听器栈深度的问题

有些时候,由于封装或者人为编码异常等原因,监听器的回调可能出现基于多个Promise形成的链(参考Issue-5302a promise listener chain),这样子有可能出现递归调用深度过大而导致栈溢出,因此需要设置一个阈值,限制递归调用的最大栈深度,这个深度阈值暂且称为栈深度保护阈值,默认值是8,可以通过系统参数io.netty.defaultPromise.maxListenerStackDepth覆盖设置。这里贴出前面提到过的代码块:

  1. private void notifyListeners() {
  2. EventExecutor executor = executor();
  3. // 事件执行器必须是事件循环类型,也就是executor.inEventLoop()为true的时候才启用递归栈深度保护
  4. if (executor.inEventLoop()) {
  5. // 获取当前线程绑定的InternalThreadLocalMap实例,这里类似于ThreadLocal
  6. final InternalThreadLocalMap threadLocals = InternalThreadLocalMap.get();
  7. // 获取当前线程的监听器调用栈深度
  8. final int stackDepth = threadLocals.futureListenerStackDepth();
  9. // 监听器调用栈深度如果不超过阈值MAX_LISTENER_STACK_DEPTH
  10. if (stackDepth < MAX_LISTENER_STACK_DEPTH) {
  11. // 调用notifyListenersNow()前先设置监听器调用栈深度 + 1
  12. threadLocals.setFutureListenerStackDepth(stackDepth + 1);
  13. try {
  14. notifyListenersNow();
  15. } finally {
  16. // 调用notifyListenersNow()完毕后设置监听器调用栈深度为调用前的数值,也就是恢复线程的监听器调用栈深度
  17. threadLocals.setFutureListenerStackDepth(stackDepth);
  18. }
  19. return;
  20. }
  21. }
  22. // 如果监听器调用栈深度超过阈值MAX_LISTENER_STACK_DEPTH,则直接每次通知监听器当成一个新的异步任务处理
  23. safeExecute(executor,new Runnable() {
  24. @Override
  25. public void run() {
  26. notifyListenersNow();
  27. }
  28. });
  29. }

如果我们想模拟一个例子触发监听器调用栈深度保护,那么只需要想办法在同一个EventLoop类型的线程中递归调用notifyListeners()方法即可。

最典型的例子就是在上一个Promise监听器回调的方法里面触发下一个Promise的监听器的setSuccess()(简单理解就是套娃),画个图理解一下:

测试代码

  1. public class PromiseListenerMain {
  2. private static final AtomicInteger COUNTER = new AtomicInteger(0);
  3. public static void main(String[] args) throws Exception {
  4. EventExecutor executor = ImmediateEventExecutor.INSTANCE;
  5. // root
  6. Promise<String> root = new DefaultPromise<>(executor);
  7. Promise<String> p1 = new DefaultPromise<>(executor);
  8. Promise<String> p2 = new DefaultPromise<>(executor);
  9. Promise<String> p3 = new DefaultPromise<>(executor);
  10. Promise<String> p4 = new DefaultPromise<>(executor);
  11. Promise<String> p5 = new DefaultPromise<>(executor);
  12. Promise<String> p6 = new DefaultPromise<>(executor);
  13. Promise<String> p7 = new DefaultPromise<>(executor);
  14. Promise<String> p8 = new DefaultPromise<>(executor);
  15. Promise<String> p9 = new DefaultPromise<>(executor);
  16. Promise<String> p10 = new DefaultPromise<>(executor);
  17. p1.addListener(new Listener(p2));
  18. p2.addListener(new Listener(p3));
  19. p3.addListener(new Listener(p4));
  20. p4.addListener(new Listener(p5));
  21. p5.addListener(new Listener(p6));
  22. p6.addListener(new Listener(p7));
  23. p7.addListener(new Listener(p8));
  24. p8.addListener(new Listener(p9));
  25. p9.addListener(new Listener(p10));
  26. root.addListener(new Listener(p1));
  27. root.setSuccess("success");
  28. Thread.sleep(Long.MAX_VALUE);
  29. }
  30. private static class Listener implements GenericFutureListener<Future<String>> {
  31. private final String name;
  32. private final Promise<String> promise;
  33. public Listener(Promise<String> promise) {
  34. this.name = "listener-" + COUNTER.getAndIncrement();
  35. this.promise = promise;
  36. }
  37. @Override
  38. public void operationComplete(Future<String> future) throws Exception {
  39. System.out.println(String.format("监听器[%s]回调成功...",name));
  40. if (null != promise) {
  41. promise.setSuccess("success");
  42. }
  43. }
  44. }
  45. }

因为有safeExecute()兜底执行,上面的所有Promise都会回调,这里可以采用IDEA的高级断点功能,在步入断点的地方添加额外的日志,输出如下:

  1. MAX_LISTENER_STACK_DEPTH(notifyListenersNow)执行---
  2. 监听器[listener-9]回调成功...
  3. MAX_LISTENER_STACK_DEPTH(notifyListenersNow)执行---
  4. 监听器[listener-0]回调成功...
  5. MAX_LISTENER_STACK_DEPTH(notifyListenersNow)执行---
  6. 监听器[listener-1]回调成功...
  7. MAX_LISTENER_STACK_DEPTH(notifyListenersNow)执行---
  8. 监听器[listener-2]回调成功...
  9. MAX_LISTENER_STACK_DEPTH(notifyListenersNow)执行---
  10. 监听器[listener-3]回调成功...
  11. MAX_LISTENER_STACK_DEPTH(notifyListenersNow)执行---
  12. 监听器[listener-4]回调成功...
  13. MAX_LISTENER_STACK_DEPTH(notifyListenersNow)执行---
  14. 监听器[listener-5]回调成功...
  15. MAX_LISTENER_STACK_DEPTH(notifyListenersNow)执行---
  16. 监听器[listener-6]回调成功...
  17. safeExecute(notifyListenersNow)执行----------
  18. 监听器[listener-7]回调成功...
  19. safeExecute(notifyListenersNow)执行----------
  20. 监听器[listener-8]回调成功...

这里笔者有点疑惑,如果调用栈深度大于8,超出的部分会包装为Runnable实例提交到事件执行器执行,岂不是把递归栈溢出的隐患变成了内存溢出的隐患(因为异步任务也有可能积压,除非拒绝任务提交,那么具体要看EventExecutor的实现了)?

小结

Netty提供的Promise工具的源码和使用方式都分析完了,设计理念和代码都是十分值得借鉴,同时能够开箱即用,可以在日常编码中直接引入,减少重复造轮子的劳动和风险。

个人博客

(本文完 e-a-20200123 c-3-d)

技术公众号(《Throwable文摘》),不定期推送笔者原创技术文章(绝不抄袭或者转载):

娱乐公众号(《天天沙雕》),甄选奇趣沙雕图文和视频不定期推送,缓解生活工作压力:

猜你在找的netty相关文章