前两天看Spring框架参考手册,发现SpringMVC在4.0版本上支持异步请求处理。废话不多说,开始异步请求之旅。

什么是异步请求处理?

如果我们使用tomcat服务器来开发传统的servlet,那么用户的请求会经过以下流程进入到我们的servelt。

  1. 客户端发送http请求到tomcat监听的端口。
  2. tomcat connector会接收该请求到线程中,并根据http协议解析该请求。
  3. 解析完报文后,会初始化org.apache.coyote.Request,并实例化org.apache.coyote.Response。
  4. 经过装饰器转换为servelt的HttpServletRequest和HttpServletResponse。
  5. 经过tomcat的engine,host,context最终达到servlet的service方法。
  6. service方法中经过业务处理,将结果写入response中,并返回。
  7. 回到tomcat,tomcat会关闭response。
  8. tomcat释放掉该线程用于接收下一个请求。

看到以上的流程,我们可以得出一个结论:在service方法返回后,是绝对不能再写response,否则会报response已经关闭的异常。也就是说,在service中不能把一个response交给一个线程去运行,因为很可能service方法结束了,但线程中的response可能还在写。

那我们的确想把response放到另一个线程中去写数据该怎么办呢?servlet3标准支持了你的想法,并且把他称为异步servelt(async servlet)。tomcat从tomcat7开始也支持了servlet3.0标准。

upload successful

async servlet在tomcat中的流程是这样的:

  1. 客户端发送http请求到tomcat监听的端口。
  2. tomcat connector会接收该请求到线程中,并根据http协议解析该请求。
  3. 解析完报文后,会初始化org.apache.coyote.Request,并实例化org.apache.coyote.Response。
  4. 经过装饰器转换为servelt的HttpServletRequest和HttpServletResponse。
  5. 经过tomcat的engine,host,context最终达到servlet的service方法。
  6. service方法中开启异步化上下文(AsyncContext),在把response和AsyncContext交给别的线程后返回。
  7. 回到tomcat,tomcat判断该请求是否开启了异步化上下文,如果开启了,就不关闭response。
  8. tomcat释放掉该线程用于接收下一个请求。

异步请求处理的应用

如果想让你的web程序支持异步请求处理,首先得升级你的web.xml

web-app节点设置属性

<web-app xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://java.sun.com/xml/ns/javaee"
    xsi:schemaLocation="http://java.sun.com/xml/ns/javaee http://java.sun.com/xml/ns/javaee/web-app_3_0.xsd"
    version="3.0">
  <!--提升xsd到3.0-->
</web-app>  

其次,需要在servelt的load-on-startup节点后增加async-supported节点。

<servlet>
        <servlet-name>xxxxx</servlet-name>
        <servlet-class>xxxxxServlet</servlet-class>
        <load-on-startup>1</load-on-startup>
        <!--增加对异步的支持-->
        <async-supported>true</async-supported>
    </servlet>

如果你有filter,那么也需要在filter的filter-class节点后增加async-supported节点。

<filter>
        <filter-name>xxxxxFilter</filter-name>
        <filter-class>xxxxxFilter</filter-class>
        <!--增加对异步的支持-->
        <async-supported>true</async-supported>
</filter>

传统servlet开发的应用

在servlet中开启异步请求处理的代码如下

    // 在service方法中开启async
    AsyncContext context = request.startAsync(request, response); 
    
    // 将context和request和response交给自己定义的Runable接口实现类Async的实例,并提交到线程池中运行。
    // 在Async类实例的run方法处理完业务逻辑后,调用context.complete();即可正常结束该请求。
    threadPoolExecutor.execute(new Async(context,request,response))
    return;
    

使用SpringMVC开发的应用

spring配置

在SpringMVC中支持异步请求处理,需要加一项配置,我们以注解的方式为例。

在annotaion-driven节点内增加子节点async-support。

<mvc:annotation-driven>
    <!--增加对异步的支持 -->
    <!--default-timeout 表示这个异步的超时毫秒数,例如设置为5000s,表示5000s后强制断开连接,所以代码中的异步传输必须在5000s内结束-->
    <!--task-executor 表示这个异步运行在哪个线程池中,如果没有指定,默认使用SimpleAsyncTaskExecutor-->
    <mvc:async-support default-timeout="5000000" task-executor="your_thread_pool_executor">
        <!--callable 结果拦截器,可以没有-->
        <mvc:callable-interceptors>
            <bean class="callableInterceptor1"></bean>
            <bean class="callableInterceptor2"></bean>
        </mvc:callable-interceptors>
        <!--deferred 结果拦截器,可以没有-->
        <mvc:deferred-result-interceptors>
            <bean class="deferredResultInterceptor1"></bean>
            <bean class="deferredResultInterceptor2"></bean>
        </mvc:deferred-result-interceptors>
    </mvc:async-support>
</mvc:annotation-driven>

Callable和DeferredResult

最简单的使用就是使用Callable和DeferredResult返回一个对象,这个对象会被 * 发送Map会根据HttpMessageConverter被转为对应的字节转化为字节。

/**
 * 发送Map会根据HttpMessageConverter被转为对应的字节
 * @return
 */
@ResponseBody
@RequestMapping( value = "/callable" )
public Callable<Map<String, Integer> > callable()
{
    Callable<Map<String, Integer> > callable = new Callable<Map<String, Integer> >()
    {
        @Override
        public Map<String, Integer> call() throws Exception
        {
            try {
                Thread.sleep( 3000 );
            } catch ( InterruptedException e ) {
                e.printStackTrace();
            }
            Map<String, Integer> data = Maps.newHashMap();
            data.put( "ok", 1234 );
            return(data);
        }
    };
    return(callable);
}


/**
 * 发送Map会根据HttpMessageConverter被转为对应的字节
 * @return
 */
@ResponseBody
@RequestMapping( value = "/deferred" )
public DeferredResult<Map<String, Integer> > deferred()
{
    final DeferredResult<Map<String, Integer> > deferredResult = new DeferredResult<Map<String, Integer> >();
    new Thread( new Runnable()
            {
                @Override
                public void run()
                {
                    try {
                        Thread.sleep( 3000 );
                    } catch ( InterruptedException e ) {
                        e.printStackTrace();
                    }
                    Map<String, Integer> data = Maps.newHashMap();
                    data.put( "ok", 1234 );
                    deferredResult.setResult( data );
                }
            } ).start();
    return(deferredResult);
}

HTTP Streaming

ResponseBodyEmitter

Callable和DeferredResult只能异步返回一个数据,如果想在一个http连接上推送多个事件,也被成为长轮询(http 流)。

Spring MVC通过ResponseBodyEmitter返回值实现上述目标,而不是用@ResponseBody,其中发送的每一个Object都使用HttpMessageConverter写入。

前端接收较为困难,需要直接操作XHR。

这是一个例子:

@RequestMapping("/events")
public ResponseBodyEmitter handle() {
    ResponseBodyEmitter emitter = new ResponseBodyEmitter();
    // Save the emitter somewhere..
    return emitter;
}

// In some other thread
emitter.send("Hello once");

// and again later on
emitter.send("Hello again");

// and done at some point
emitter.complete();

HTTP Streaming With Server-Sent Events

SseEmitter是ResponseBodyEmitter的子类,它提供了对Server-Sent Events的支持。

/**
 * 前端用EventSource即可接收,而且发送完一个object后前端就能获取到由HttpMessageConverter转换的object的字节(一般是json)。
 * 需要限制时间到在xml中限制的时间
 * @return
 */
@RequestMapping( value = "/sse" )
public SseEmitter sse()
{
    final SseEmitter sseEmitter = new SseEmitter();
    /*
     * 假如需要立刻返回一个对象,就这这里发送一个对象即可。例如sseEmitter.send(dbService.query(xxx));
     * 这里放到了一个线程中发送数据,但实际上应该交给一个service或者component中。由专门的线程池或者线程去获取,查询操作,send数据
     */
    new Thread( new Runnable()
            {
                @Override
                public void run()
                {
                    for ( int i = 0; i < 3; i++ )
                    {
                        try {
                            Thread.sleep( 1500 );
                        } catch ( InterruptedException e ) {
                            e.printStackTrace();
                        }
                        Map<String, Integer> data = Maps.newHashMap();
                        data.put( "num", i );
                        try {
                            sseEmitter.send( data );
                        } catch ( IOException e ) {
                            e.printStackTrace();
                        }
                    }
                    sseEmitter.complete();
                }
            } ).start();
    return(sseEmitter);
}

前端使用EventSource接收,如果后端断开连接,默认会再次发起连接请求。

var source = new EventSource("/sse", {
    withCredentials: true
});

source.onopen = function(event) {
    console.log("EventSource open");
};

source.onmessage = function(event) {
    //var data = event.data;
    // handle message
    console.log("get message", event);
};

source.onerror = function(event) {
    // handle error event
    console.log(event);
};

// 如果后台关闭,前端也关闭
source.onclose = function(event) {
    console.log("get close", event);
    source.close();
}

注意:ie浏览器不支持该功能,如果需要支持ie等非先进浏览器,可以考虑使用websocket。

HTTP Streaming Directly To The OutputStream

ResponseBodyEmitter通过写对象,经过HttpMessageConverter转变为json数据发送到客户端。这也许是最常用的场景。但是有时候需要直接写OutputStream,例如下载。这个时候,可以使用StreamingResponseBody

下面是一个例子

/**
 * 直接发送bytes,适合下载数据用
 * @return
 */
@RequestMapping( value = "/direct" )
public StreamingResponseBody direct()
{
    StreamingResponseBody streamingResponseBody = new StreamingResponseBody()
    {
        @Override
        public void writeTo( OutputStream outputStream ) throws IOException
        {
            for ( int i = 0; i < 3; i++ )
            {
                try {
                    Thread.sleep( 3000 );
                } catch ( InterruptedException e ) {
                    e.printStackTrace();
                }
                try {
                    outputStream.write( ("some text" + i + "\n\n").getBytes() );
                } catch ( IOException e ) {
                    e.printStackTrace();
                }
            }
        }
    };
    return(streamingResponseBody);
}

总结

servlet3提供了异步的servlet,能够实现异步请求处理。socket依旧在,但是不归tomcat中http线程池的线程管。提前释放tomcat处理线程用于提高吞吐量,响应流不关闭,由业务方法自己处理。从这个角度来看基于servlet3的异步化完全有可能实现真正的服务端push。

标签: java, 并发, tomcat, springmvc, spring

添加新评论