Spring 4.x 支持异步请求处理

前两天看Spring框架参考手册,发现SpringMVC在4.0版本上支持异步请求处理。废话不多说,开始异步请求之旅。

什么是异步请求处理?

如果我们使用tomcat服务器来开发传统的servlet,那么用户的请求会经过以下流程进入到我们的servelt。

  1. 客户端发送http请求到tomcat监听的端口。
  2. tomcat connector会接收该请求到线程中,并根据http协议解析该请求。
  3. 解析完报文后,会初始化org.apache.coyote.Request,并实例化org.apache.coyote.Response。
  4. 经过装饰器转换为servelt的HttpServletRequest和HttpServletResponse。
  5. 经过tomcat的engine,host,context最终达到servlet的service方法。
  6. service方法中经过业务处理,将结果写入response中,并返回。
  7. 回到tomcat,tomcat会关闭response。
  8. tomcat释放掉该线程用于接收下一个请求。

看到以上的流程,我们可以得出一个结论:在service方法返回后,是绝对不能再写response,否则会报response已经关闭的异常。也就是说,在service中不能把一个response交给一个线程去运行,因为很可能service方法结束了,但线程中的response可能还在写。

那我们的确想把response放到另一个线程中去写数据该怎么办呢?servlet3标准支持了你的想法,并且把他称为异步servelt(async servlet)。tomcat从tomcat7开始也支持了servlet3.0标准。

upload successful

async servlet在tomcat中的流程是这样的:

  1. 客户端发送http请求到tomcat监听的端口。
  2. tomcat connector会接收该请求到线程中,并根据http协议解析该请求。
  3. 解析完报文后,会初始化org.apache.coyote.Request,并实例化org.apache.coyote.Response。
  4. 经过装饰器转换为servelt的HttpServletRequest和HttpServletResponse。
  5. 经过tomcat的engine,host,context最终达到servlet的service方法。
  6. service方法中开启异步化上下文(AsyncContext),在把response和AsyncContext交给别的线程后返回。
  7. 回到tomcat,tomcat判断该请求是否开启了异步化上下文,如果开启了,就不关闭response。
  8. tomcat释放掉该线程用于接收下一个请求。

异步请求处理的应用

如果想让你的web程序支持异步请求处理,首先得升级你的web.xml

web-app节点设置属性

1
2
3
4
5
<web-app xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://java.sun.com/xml/ns/javaee"
xsi:schemaLocation="http://java.sun.com/xml/ns/javaee http://java.sun.com/xml/ns/javaee/web-app_3_0.xsd"
version="3.0">
<!--提升xsd到3.0-->
</web-app>

其次,需要在servelt的load-on-startup节点后增加async-supported节点。

1
2
3
4
5
6
7
<servlet>
<servlet-name>xxxxx</servlet-name>
<servlet-class>xxxxxServlet</servlet-class>
<load-on-startup>1</load-on-startup>
<!--增加对异步的支持-->
<async-supported>true</async-supported>
</servlet>

如果你有filter,那么也需要在filter的filter-class节点后增加async-supported节点。

1
2
3
4
5
6
<filter>
<filter-name>xxxxxFilter</filter-name>
<filter-class>xxxxxFilter</filter-class>
<!--增加对异步的支持-->
<async-supported>true</async-supported>
</filter>

传统servlet开发的应用

在servlet中开启异步请求处理的代码如下

1
2
3
4
5
6
7
// 在service方法中开启async
AsyncContext context = request.startAsync(request, response);

// 将context和request和response交给自己定义的Runable接口实现类Async的实例,并提交到线程池中运行。
// 在Async类实例的run方法处理完业务逻辑后,调用context.complete();即可正常结束该请求。
threadPoolExecutor.execute(new Async(context,request,response))
return;

使用SpringMVC开发的应用

spring配置

在SpringMVC中支持异步请求处理,需要加一项配置,我们以注解的方式为例。

在annotaion-driven节点内增加子节点async-support。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
<mvc:annotation-driven>
<!--增加对异步的支持 -->
<!--default-timeout 表示这个异步的超时毫秒数,例如设置为5000s,表示5000s后强制断开连接,所以代码中的异步传输必须在5000s内结束-->
<!--task-executor 表示这个异步运行在哪个线程池中,如果没有指定,默认使用SimpleAsyncTaskExecutor-->
<mvc:async-support default-timeout="5000000" task-executor="your_thread_pool_executor">
<!--callable 结果拦截器,可以没有-->
<mvc:callable-interceptors>
<bean class="callableInterceptor1"></bean>
<bean class="callableInterceptor2"></bean>
</mvc:callable-interceptors>
<!--deferred 结果拦截器,可以没有-->
<mvc:deferred-result-interceptors>
<bean class="deferredResultInterceptor1"></bean>
<bean class="deferredResultInterceptor2"></bean>
</mvc:deferred-result-interceptors>
</mvc:async-support>
</mvc:annotation-driven>

Callable和DeferredResult

最简单的使用就是使用Callable和DeferredResult返回一个对象,这个对象会被* 发送Map会根据HttpMessageConverter被转为对应的字节转化为字节。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
/**
* 发送Map会根据HttpMessageConverter被转为对应的字节
* @return
*/
@ResponseBody
@RequestMapping( value = "/callable" )
public Callable<Map<String, Integer> > callable()
{
Callable<Map<String, Integer> > callable = new Callable<Map<String, Integer> >()
{
@Override
public Map<String, Integer> call() throws Exception
{
try {
Thread.sleep( 3000 );
} catch ( InterruptedException e ) {
e.printStackTrace();
}
Map<String, Integer> data = Maps.newHashMap();
data.put( "ok", 1234 );
return(data);
}
};
return(callable);
}


/**
* 发送Map会根据HttpMessageConverter被转为对应的字节
* @return
*/
@ResponseBody
@RequestMapping( value = "/deferred" )
public DeferredResult<Map<String, Integer> > deferred()
{
final DeferredResult<Map<String, Integer> > deferredResult = new DeferredResult<Map<String, Integer> >();
new Thread( new Runnable()
{
@Override
public void run()
{
try {
Thread.sleep( 3000 );
} catch ( InterruptedException e ) {
e.printStackTrace();
}
Map<String, Integer> data = Maps.newHashMap();
data.put( "ok", 1234 );
deferredResult.setResult( data );
}
} ).start();
return(deferredResult);
}

HTTP Streaming

ResponseBodyEmitter

Callable和DeferredResult只能异步返回一个数据,如果想在一个http连接上推送多个事件,也被成为长轮询(http 流)。

Spring MVC通过ResponseBodyEmitter返回值实现上述目标,而不是用@ResponseBody,其中发送的每一个Object都使用HttpMessageConverter写入。

前端接收较为困难,需要直接操作XHR。

这是一个例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@RequestMapping("/events")
public ResponseBodyEmitter handle() {
ResponseBodyEmitter emitter = new ResponseBodyEmitter();
// Save the emitter somewhere..
return emitter;
}

// In some other thread
emitter.send("Hello once");

// and again later on
emitter.send("Hello again");

// and done at some point
emitter.complete();

HTTP Streaming With Server-Sent Events

SseEmitter是ResponseBodyEmitter的子类,它提供了对Server-Sent Events的支持。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
/**
* 前端用EventSource即可接收,而且发送完一个object后前端就能获取到由HttpMessageConverter转换的object的字节(一般是json)。
* 需要限制时间到在xml中限制的时间
* @return
*/
@RequestMapping( value = "/sse" )
public SseEmitter sse()
{
final SseEmitter sseEmitter = new SseEmitter();
/*
* 假如需要立刻返回一个对象,就这这里发送一个对象即可。例如sseEmitter.send(dbService.query(xxx));
* 这里放到了一个线程中发送数据,但实际上应该交给一个service或者component中。由专门的线程池或者线程去获取,查询操作,send数据
*/
new Thread( new Runnable()
{
@Override
public void run()
{
for ( int i = 0; i < 3; i++ )
{
try {
Thread.sleep( 1500 );
} catch ( InterruptedException e ) {
e.printStackTrace();
}
Map<String, Integer> data = Maps.newHashMap();
data.put( "num", i );
try {
sseEmitter.send( data );
} catch ( IOException e ) {
e.printStackTrace();
}
}
sseEmitter.complete();
}
} ).start();
return(sseEmitter);
}

前端使用EventSource接收,如果后端断开连接,默认会再次发起连接请求。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
var source = new EventSource("/sse", {
withCredentials: true
});

source.onopen = function(event) {
console.log("EventSource open");
};

source.onmessage = function(event) {
//var data = event.data;
// handle message
console.log("get message", event);
};

source.onerror = function(event) {
// handle error event
console.log(event);
};

// 如果后台关闭,前端也关闭
source.onclose = function(event) {
console.log("get close", event);
source.close();
}

注意:ie浏览器不支持该功能,如果需要支持ie等非先进浏览器,可以考虑使用websocket。

HTTP Streaming Directly To The OutputStream

ResponseBodyEmitter通过写对象,经过HttpMessageConverter转变为json数据发送到客户端。这也许是最常用的场景。但是有时候需要直接写OutputStream,例如下载。这个时候,可以使用StreamingResponseBody

下面是一个例子

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
/**
* 直接发送bytes,适合下载数据用
* @return
*/
@RequestMapping( value = "/direct" )
public StreamingResponseBody direct()
{
StreamingResponseBody streamingResponseBody = new StreamingResponseBody()
{
@Override
public void writeTo( OutputStream outputStream ) throws IOException
{
for ( int i = 0; i < 3; i++ )
{
try {
Thread.sleep( 3000 );
} catch ( InterruptedException e ) {
e.printStackTrace();
}
try {
outputStream.write( ("some text" + i + "\n\n").getBytes() );
} catch ( IOException e ) {
e.printStackTrace();
}
}
}
};
return(streamingResponseBody);
}

总结

servlet3提供了异步的servlet,能够实现异步请求处理。socket依旧在,但是不归tomcat中http线程池的线程管。提前释放tomcat处理线程用于提高吞吐量,响应流不关闭,由业务方法自己处理。从这个角度来看基于servlet3的异步化完全有可能实现真正的服务端push。