Tomcat是如何实现异步Servlet的 - 掘金
DeferredResult解决了什么问题 - 掘金
SpringMVC 异步处理模式分析

内容概要

  • Tomcat 内部的异步请求实现
  • SpringMVCDeferredResult 如何对接异步请求的
  • Tomcat 过滤器在异步下的表现

Tomcat 内部的异步请求实现

我们直接借助 SpringBoot 框架来实现一个 Servlet,这里只展示 Servlet 代码:

  @WebServlet(urlPatterns = "/async", asyncSupported = true)
  @Slf4j
  public class AsyncServlet extends HttpServlet {
      ExecutorService executorService = Executors.newSingleThreadExecutor();
 
      @Override
      protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
          //开启异步,获取异步上下文    
          final AsyncContext ctx = req.startAsync();
          // 提交线程池异步执行    
          executorService.execute(new Runnable() {
              @Override
              public void run() {
                  try {
                      log.info("async Service 准备执行了");
                      //模拟耗时任务    
                      Thread.sleep(10000 L);
                      ctx.getResponse().getWriter().print("async servlet");
                      log.info("async Service 执行了");
                  } catch (IOException e) {
                      e.printStackTrace();
                  } catch (InterruptedException e) {
                      e.printStackTrace();
                  }
                  //最后执行完成后完成回调。    
                  ctx.complete();
              }
          });
      }
  }

上面的代码实现了一个异步的 Servlet,实现了 doGet 方法注意在 SpringBoot 中使用需要再启动类加上 @ServletComponentScan 注解来扫描 Servlet。既然代码写好了,我们来看看实际运行效果。

我们发送一个请求后,看到页面有响应,同时,看到请求时间花费了 10.05s,那么我们这个 Servlet 算是能正常运行啦。有同学肯定会问,这不是异步 servlet 吗?你的响应时间并没有加快,有什么用呢?对,我们的响应时间并不能加快,还是会取决于我们的业务逻辑,但是我们的异步 servlet 请求后,依赖于业务的异步执行,我们可以立即返回,也就是说,Tomcat 的线程可以立即回收,默认情况下,Tomcat 的核心线程是 10 ,最大线程数是 200 ,我们能及时回收线程,也就意味着我们能处理更多的请求,能够增加我们的吞吐量,这也是异步 Servlet 的主要作用。

异步 Servlet 的内部原理

了解完异步 Servlet 的作用后,我们来看看,Tomcat 是如何是先异步 Servlet 的。其实上面的代码,主要核心逻辑就两部分,final AsyncContext ctx = req.startAsync()ctx.complete() 那我们来看看他们究竟做了什么?

   public AsyncContext startAsync(ServletRequest request,
       ServletResponse response) {
       if (!isAsyncSupported()) {
           IllegalStateException ise =
               new IllegalStateException(sm.getString("request.asyncNotSupported"));
           log.warn(sm.getString("coyoteRequest.noAsync",
               StringUtils.join(getNonAsyncClassNames())), ise);
           throw ise;
       }
 
       if (asyncContext == null) {
           asyncContext = new AsyncContextImpl(this);
       }
 
       asyncContext.setStarted(getContext(), request, response,
           request == getRequest() && response == getResponse().getResponse());
       asyncContext.setTimeout(getConnector().getAsyncTimeout());
 
       return asyncContext;
   }

我们发现 req.startAsync() 只是保存了一个异步上下文,同时设置一些基础信息,比如 Timeout,顺便提一下,这里设置的默认超时时间是 30S ,如果你的异步处理逻辑超过 30S ,此时执行 ctx.complete() 就会抛出 IllegalStateException 异常。

我们来看看 ctx.complete() 的逻辑

  public void complete() {
      if (log.isDebugEnabled()) {
          logDebug("complete   ");
      }
      check();
      request.getCoyoteRequest().action(ActionCode.ASYNC_COMPLETE, null);
  }
  //类:AbstractProcessor 
  public final void action(ActionCode actionCode, Object param) {
      case ASYNC_COMPLETE: {
          clearDispatches();
          if (asyncStateMachine.asyncComplete()) {
              processSocketEvent(SocketEvent.OPEN_READ, true);
          }
          break;
      }
  }
  //类:AbstractProcessor 
  protected void processSocketEvent(SocketEvent event, boolean dispatch) {
      SocketWrapperBase < ? > socketWrapper = getSocketWrapper();
      if (socketWrapper != null) {
          socketWrapper.processSocket(event, dispatch);
      }
  }
  //类:AbstractEndpoint
  public boolean processSocket(SocketWrapperBase < S > socketWrapper,
      SocketEvent event, boolean dispatch) {
      //省略部分代码
      SocketProcessorBase < S > sc = null;
      if (processorCache != null) {
          sc = processorCache.pop();
      }
      if (sc == null) {
          sc = createSocketProcessor(socketWrapper, event);
      } else {
          sc.reset(socketWrapper, event);
      }
      Executor executor = getExecutor();
      if (dispatch && executor != null) {
          executor.execute(sc);
      } else {
          sc.run();
      }
 
      return true;
  }

所以,这里最终会调用 AbstractEndpointprocessSocket 方法,之前看过我前面博客的同学应该有印象,EndPoint 是用来接受和处理请求的,接下来就会交给 Processor 去进行协议处理。

类: AbstractProcessorLight
public SocketState process(SocketWrapperBase < ? > socketWrapper, SocketEvent status)
throws IOException {
    //省略部分diam
    SocketState state = SocketState.CLOSED;
    Iterator < DispatchType > dispatches = null;
    do {
        if (dispatches != null) {
            DispatchType nextDispatch = dispatches.next();
            state = dispatch(nextDispatch.getSocketStatus());
        } else if (status == SocketEvent.DISCONNECT) {
 
        } else if (isAsync() || isUpgrade() || state == SocketState.ASYNC_END) {
            state = dispatch(status);
            if (state == SocketState.OPEN) {
                state = service(socketWrapper);
            }
        } else if (status == SocketEvent.OPEN_WRITE) {
            state = SocketState.LONG;
        } else if (status == SocketEvent.OPEN_READ) {
            state = service(socketWrapper);
        } else {
            state = SocketState.CLOSED;
        }
 
    } while (state == SocketState.ASYNC_END ||
        dispatches != null && state != SocketState.CLOSED);
 
    return state;
}

这部分是重点,AbstractProcessorLight 会根据 SocketEvent 的状态来判断是不是要去调用 service(socketWrapper),该方法最终会去调用到容器,从而完成业务逻辑的调用,我们这个请求是执行完成后调用的,肯定不能进容器了,不然就是死循环了,这里通过 isAsync() 判断,就会进入 dispatch(status),最终会调用 CoyoteAdapterasyncDispatch 方法

public boolean asyncDispatch(org.apache.coyote.Request req, org.apache.coyote.Response res,
    SocketEvent status) throws Exception {
    //省略部分代码  
    Request request = (Request) req.getNote(ADAPTER_NOTES);
    Response response = (Response) res.getNote(ADAPTER_NOTES);
    boolean success = true;
    AsyncContextImpl asyncConImpl = request.getAsyncContextInternal();
    try {
        if (!request.isAsync()) {
            response.setSuspended(false);
        }
 
        if (status == SocketEvent.TIMEOUT) {
            if (!asyncConImpl.timeout()) {
                asyncConImpl.setErrorState(null, false);
            }
        } else if (status == SocketEvent.ERROR) {
 
        }
 
        if (!request.isAsyncDispatching() && request.isAsync()) {
            WriteListener writeListener = res.getWriteListener();
            ReadListener readListener = req.getReadListener();
            if (writeListener != null && status == SocketEvent.OPEN_WRITE) {
                ClassLoader oldCL = null;
                try {
                    oldCL = request.getContext().bind(false, null);
                    res.onWritePossible(); //这里执行浏览器响应,写入数据
                    if (request.isFinished() && req.sendAllDataReadEvent() &&
                        readListener != null) {
                        readListener.onAllDataRead();
                    }
                } catch (Throwable t) {
 
                } finally {
                    request.getContext().unbind(false, oldCL);
                }
            }
        }
    }
    //这里判断异步正在进行,说明这不是一个完成方法的回调,是一个正常异步请求,继续调用容器。
    if (request.isAsyncDispatching()) {
        connector.getService().getContainer().getPipeline().getFirst().invoke(
            request, response);
        Throwable t = (Throwable) request.getAttribute(RequestDispatcher.ERROR_EXCEPTION);
        if (t != null) {
            asyncConImpl.setErrorState(t, true);
        }
    }
    //注意,这里,如果超时或者出错,request.isAsync()会返回false,这里是为了尽快的输出错误给客户端。
    if (!request.isAsync()) {
        //这里也是输出逻辑
        request.finishRequest();
        response.finishResponse();
    }
    //销毁request和response
    if (!success || !request.isAsync()) {
        updateWrapperErrorCount(request, response);
        request.recycle();
        response.recycle();
    }
}
return success;
}

上面的代码就是 ctx.complete() 执行最终的方法了(当然省略了很多细节),完成了数据的输出,最终输出到浏览器。

这里有同学可能会说,我知道异步执行完后,调用 ctx.complete() 会输出到浏览器,但是,第一次 doGet 请求执行完成后,Tomcat 是怎么知道不用返回到客户端的呢?关键代码在 CoyoteAdapter 中的 service 方法,部分代码如下:

  postParseSuccess = postParseRequest(req, request, res, response);
  //省略部分代码
  if (postParseSuccess) {
      request.setAsyncSupported(
          connector.getService().getContainer().getPipeline().isAsyncSupported());
      connector.getService().getContainer().getPipeline().getFirst().invoke(
          request, response);
  }
  if (request.isAsync()) {
      async = true;
  } else {
      //输出数据到客户端
      request.finishRequest();
      response.finishResponse();
      if (!async) {
          updateWrapperErrorCount(request, response);
          //销毁request和response
          request.recycle();
          response.recycle();
      }

这部分代码在调用完 Servlet 后,会通过 request.isAsync() 来判断是否是异步请求,如果是异步请求,就设置 async = true。如果是非异步请求就执行输出数据到客户端逻辑,同时销毁 requestresponse。这里就完成了请求结束后不响应客户端的操作。

SpringMVCDeferredResult 如何对接异步请求的

1.编写 DeferredResult 返回类型 api

@GetMapping("/deferredresult/test")
public DeferredResult < String > testDeferredResult(long sleepTime) {
    DeferredResult < String > deferredResult = new DeferredResult < > (5000 L, "server side timeout");
    executorService.submit(() - > {
        try {
            Thread.sleep(sleepTime);
            deferredResult.setResult("server response successfully");
        } catch (InterruptedException e) {
            log.error("occur error", e);
        }
    });
    return deferredResult;
}

2.接口调用

525

这样就完成了 DeferredResult 异步调用,当然我们也可以在 DeferredResult 设置超时相关逻辑。

3.原理与源码分析

为了方便理解,找了一张图来看一下 DeferredResult 做了什么事情。
525

  • 接收到请求后,将请求暂存并且释放容器线程,用来接收新的请求
  • 容器超时逻辑和业务正常处理逻辑将结果塞到 DeferredResult 返回调用

spring 对于 DeferredResult 请求处理

1.请求预处理

当然 DeferredResult 处理逻辑也脱离不了 spring mvc 的支持,也是要走到 DispatcherServlet 来处理请求:

protected void doDispatch(HttpServletRequest request, HttpServletResponse response) throws Exception {
    HttpServletRequest processedRequest = request;
    HandlerExecutionChain mappedHandler = null;
    boolean multipartRequestParsed = false;
    //1.生成异步管理器  
    WebAsyncManager asyncManager = WebAsyncUtils.getAsyncManager(request);
    try {
        try {
            ModelAndView mv = null;
            Object dispatchException = null;
            try {
                //省略…  
                //2.异步处理逻辑  
                mv = ha.handle(processedRequest, response, mappedHandler.getHandler());
                //3.如果异步处理一开始,返回调用  
                if (asyncManager.isConcurrentHandlingStarted()) {
                    return;
                }
                this.applyDefaultViewName(processedRequest, mv);
                mappedHandler.applyPostHandle(processedRequest, response, mv);
            } catch (Exception var20) {
                dispatchException = var20;
            } catch (Throwable var21) {
                dispatchException = new NestedServletException("Handler dispatch failed", var21);
            }
            this.processDispatchResult(processedRequest, response, mappedHandler, mv, (Exception) dispatchException);
        } catch (Exception var22) {
            this.triggerAfterCompletion(processedRequest, response, mappedHandler, var22);
        } catch (Throwable var23) {
            this.triggerAfterCompletion(processedRequest, response, mappedHandler, new NestedServletException("Handler processing failed", var23));
        }
    } finally {
        //省略…  
    }
}

对于支持 DeferredResult 异步处理逻辑有三个关键点:

  • 生成异步管理器
  • 执行异步处理逻辑
  • 如果异步处理已经开始,返回调用

HandleAdapter#handle 会调用到 DeferredResultMethodReturnValueHandler 的 handleReturnValue 方法:

public void handleReturnValue(@Nullable Object returnValue, MethodParameter returnType, ModelAndViewContainer mavContainer, NativeWebRequest webRequest) throws Exception {
    if (returnValue == null) {
        mavContainer.setRequestHandled(true);
    } else {
        DeferredResult result;
        if (returnValue instanceof DeferredResult) {
            result = (DeferredResult) returnValue;
        } else if (returnValue instanceof ListenableFuture) {
            result = this.adaptListenableFuture((ListenableFuture) returnValue);
        } else {
            if (!(returnValue instanceof CompletionStage)) {
                throw new IllegalStateException("Unexpected return value type: " + returnValue);
            }
            result = this.adaptCompletionStage((CompletionStage) returnValue);
        }
        WebAsyncUtils.getAsyncManager(webRequest).startDeferredResultProcessing(result, new Object[] {
            mavContainer
        });
    }
}

进入 WebAsyncManagerstartDeferredResultProcessing 方法:

public void startDeferredResultProcessing(final DeferredResult < ? > deferredResult, Object… processingContext) throws Exception {
    //省略…  
    this.startAsyncProcessing(processingContext);
    try {
        interceptorChain.applyPreProcess(this.asyncWebRequest, deferredResult);
        deferredResult.setResultHandler((result) - > {
            result = interceptorChain.applyPostProcess(this.asyncWebRequest, deferredResult, result);
            this.setConcurrentResultAndDispatch(result);
        });
    } catch (Throwable var7) {
        this.setConcurrentResultAndDispatch(var7);
    }
}

startAsyncProcessing 方法开启异步处理 (asyncManager.isConcurrentHandlingStarted 会用到),然后进入 DeferredResult 的 setResultHandler 设置结果处理器:

public final void setResultHandler(DeferredResult.DeferredResultHandler resultHandler) {
    Assert.notNull(resultHandler, "DeferredResultHandler is required");
    if (!this.expired) {
        Object resultToHandle;
        synchronized(this) {
            if (this.expired) {
                return;
            }
            resultToHandle = this.result;
            if (resultToHandle == RESULT_NONE) {
                this.resultHandler = resultHandler;
                return;
            }
        }
        try {
            resultHandler.handleResult(resultToHandle);
        } catch (Throwable var5) {
            logger.debug("Failed to process async result", var5);
        }
    }
}

由于逻辑未处理,结果未设置,所以逻辑会走到设置结果处理器代码块,然后返回,此时返回值解析过程结束了,同时由于异步 servlet 的特性,tomcat 的连接也得到了释放。
预处理流程如下:
525

2.返回值处理

这个时候容器连接得到了释放,然而问题并没有解决,请求处理只完成了一半,业务处理返回值并没有真正返回。
我们在业务线程池处理调用了 DeferredResult 的 setResult 方法,最终会调用内部 setResultInternal:

private boolean setResultInternal(Object result) {
    if (this.isSetOrExpired()) {
        return false;
    } else {
        DeferredResult.DeferredResultHandler resultHandlerToUse;
        synchronized(this) {
            if (this.isSetOrExpired()) {
                return false;
            }
            this.result = result;
            resultHandlerToUse = this.resultHandler;
            if (resultHandlerToUse == null) {
                return true;
            }
            this.resultHandler = null;
        }
        resultHandlerToUse.handleResult(result);
        return true;
    }
}

这里会调用之前传入的函数式接口来处理:

deferredResult.setResultHandler((result) - > {
    result = interceptorChain.applyPostProcess(this.asyncWebRequest, deferredResult, result);
    this.setConcurrentResultAndDispatch(result);
});

然后调用 setConcurrentResultAndDispatch:

private void setConcurrentResultAndDispatch(Object result) {
    synchronized(this) {
        if (this.concurrentResult != RESULT_NONE) {
            return;
        }
        this.concurrentResult = result;
        this.errorHandlingInProgress = result instanceof Throwable;
    }
    if (this.asyncWebRequest.isAsyncComplete()) {
        //…  
    } else {
        this.asyncWebRequest.dispatch();
    }
}

如果异步处理完成则返回调用,否则执行异步请求分发,该段代码执行完成会发起一次新的请求到后台,又被 DispatcherServlet 类接收到(但是不会再进入 controller 了),最终将结果响应给调用方。

tomcat 容器维度对异步支持

我们再从容器维度对 DeferredResult 异步请求的处理做一下分析,分别是请求超时和主动 setResult 返回。

1.请求超时

565

Connector 是 tomcat 的最核心的组件之一,主要的职责就是负责接收客户端连接和客户端请求的处理加工,初始化和启动会执行 Protocal 相关初始化和启动操作,看一下 AbstractProtocol 的启动:

public void start() throws Exception {  
 if (this.getLog().isInfoEnabled()) {  
 this.getLog().info(sm.getString("abstractProtocolHandler.start", new Object[]{this.getName()}));  
 this.logPortOffset();  
 }  
 this.endpoint.start();  
 this.monitorFuture = this.getUtilityExecutor().scheduleWithFixedDelay(() -> {  
 this.startAsyncTimeout();  
 }, 0L, 60L, TimeUnit.SECONDS);  
}

延时 60 秒执行启动异步超时支持逻辑,调用 startAsyncTimeout:

protected void startAsyncTimeout() {
    if (this.timeoutFuture == null || this.timeoutFuture.isDone()) {
        //省略…  
        this.timeoutFuture = this.getUtilityExecutor().scheduleAtFixedRate(() - > {
            long now = System.currentTimeMillis();
            Iterator var3 = this.waitingProcessors.iterator();
            while (var3.hasNext()) {
                Processor processor = (Processor) var3.next();
                processor.timeoutAsync(now);
            }
        }, 1 L, 1 L, TimeUnit.SECONDS);
    }
}

异步请求会被放入 waitingProcessors 中,并且设置了超时时间,tomcat 会有一个线程每隔 1 秒遍历 waitingProcessors 里面的 processor,检查是否过期,如果过期会往 tomcat 线程池投掷超时事件:

private void doTimeoutAsync() {
    this.setAsyncTimeout(-1 L);
    this.asyncTimeoutGeneration = this.asyncStateMachine.getCurrentGeneration();
    this.processSocketEvent(SocketEvent.TIMEOUT, true);
}

线程池跑到这个任务的时候就知道这个已经超时请求任务,此时就会将超时值塞入到请求中,具体是通过之前设置的 DeferredResult 相关的拦截器中的 handleTimeout,比如 spring 自己提供的拦截器:

public class TimeoutDeferredResultProcessingInterceptor implements DeferredResultProcessingInterceptor {
    public TimeoutDeferredResultProcessingInterceptor() {}
    public < T > boolean handleTimeout(NativeWebRequest request, DeferredResult < T > result) throws Exception {
        result.setErrorResult(new AsyncRequestTimeoutException());
        return false;
    }
}

最终会把值放到管理异步请求 AsyncManager 中并重新下发请求交给 DispatcherServlet#doDispatch 处理,第二次进来的时候发现 AsyncManager 已经有值了,把结果进行包装然后直接返回调用了。
超时逻辑处理流程如下:
525

2.setResult 主动返回

业务线程在执行完逻辑,将结果塞回到 DeferredResult 时也会调用 setResultInternal,赋值完成后调用 AsyncWebRequest#dispatch 方法重新下发请求,DispatcherServlet 处理时发现 AsyncManager 已经有值了,封装后直接返回,后边逻辑和超时逻辑一样。
处理流程如下:
525

Tomcat 异步过滤器

Tomcat 创建过滤器的逻辑如下。

public static ApplicationFilterChain createFilterChain(ServletRequest request,
    Wrapper wrapper, Servlet servlet) {
 
    // If there is no servlet to execute, return null  
    if (servlet == null) {
        return null;
    }
 
    // Create and initialize a filter chain object  
    ApplicationFilterChain filterChain = null;
    if (request instanceof Request) {
        Request req = (Request) request;
        if (Globals.IS_SECURITY_ENABLED) {
            // Security: Do not recycle  
            filterChain = new ApplicationFilterChain();
        } else {
            filterChain = (ApplicationFilterChain) req.getFilterChain();
            if (filterChain == null) {
                filterChain = new ApplicationFilterChain();
                req.setFilterChain(filterChain);
            }
        }
    } else {
        // Request dispatcher in use  
        filterChain = new ApplicationFilterChain();
    }
 
    filterChain.setServlet(servlet);
    filterChain.setServletSupportsAsync(wrapper.isAsyncSupported());
 
    // Acquire the filter mappings for this Context  
    StandardContext context = (StandardContext) wrapper.getParent();
    FilterMap filterMaps[] = context.findFilterMaps();
 
    // If there are no filter mappings, we are done  
    if ((filterMaps == null) || (filterMaps.length == 0)) {
        return filterChain;
    }
 
    // Acquire the information we will need to match filter mappings  
    DispatcherType dispatcher =
        (DispatcherType) request.getAttribute(Globals.DISPATCHER_TYPE_ATTR);
 
    String requestPath = null;
    Object attribute = request.getAttribute(Globals.DISPATCHER_REQUEST_PATH_ATTR);
    if (attribute != null) {
        requestPath = attribute.toString();
    }
 
    String servletName = wrapper.getName();
 
    // Add the relevant path-mapped filters to this filter chain  
    for (FilterMap filterMap: filterMaps) {
        if (!matchDispatcher(filterMap, dispatcher)) {
            continue;
        }
        if (!matchFiltersURL(filterMap, requestPath)) {
            continue;
        }
        ApplicationFilterConfig filterConfig = (ApplicationFilterConfig)
        context.findFilterConfig(filterMap.getFilterName());
        if (filterConfig == null) {
            // FIXME - log configuration problem  
            continue;
        }
        filterChain.addFilter(filterConfig);
    }
 
    // Add filters that match on servlet name second  
    for (FilterMap filterMap: filterMaps) {
        if (!matchDispatcher(filterMap, dispatcher)) {
            continue;
        }
        if (!matchFiltersServlet(filterMap, servletName)) {
            continue;
        }
        ApplicationFilterConfig filterConfig = (ApplicationFilterConfig)
        context.findFilterConfig(filterMap.getFilterName());
        if (filterConfig == null) {
            // FIXME - log configuration problem  
            continue;
        }
        filterChain.addFilter(filterConfig);
    }
 
    // Return the completed filter chain  
    return filterChain;
}

简单讲就是

  • 匹配分发类型
  • 匹配拦截的 URL
  • 匹配过滤的 servlet

这里 matchDispatcher 在异步中会变成 javax.servlet.DispatcherType#ASYNC ,从而判断过滤器是否能够正常过滤。如果过滤器创建时,没有设置该类型,则无法匹配。
设置方案如下

FilterRegistration.Dynamic servletFilter = servletContext.addFilter("gzip", new GzipFilter());
// 分发器类型
EnumSet < DispatcherType > dispatcherTypes = EnumSet.of(DispatcherType.REQUEST, DispatcherType.FORWARD, DispatcherType.ERROR);
servletFilter.addMappingForUrlPatterns(dispatcherTypes, false, ServerConfig.getInstance().getServletMapping());
public enum DispatcherType {  
    /**  
     * {@link RequestDispatcher#forward(ServletRequest, ServletResponse)}  
     */  
    FORWARD,  
    /**  
     * {@link RequestDispatcher#include(ServletRequest, ServletResponse)}  
     */  
    INCLUDE,  
    /**  
     * Normal (non-dispatched) requests.     
     **/    
     REQUEST,  
    /**  
     * {@link AsyncContext#dispatch()}, {@link AsyncContext#dispatch(String)}  
     * and  
     * {@link AsyncContext#addListener(AsyncListener, ServletRequest, ServletResponse)}  
     */  
    ASYNC,  
    /**  
     * When the container has passed processing to the error handler mechanism     * such as a defined error page.     
     * 
     **/    
     ERROR  
}