欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

springMVC请求异步处理之(DeferredResultMethod,ResponseBodyEmitter)ReturnValueHandler

程序员文章站 2022-06-28 17:43:10
前言在了解StreamingResponseBody,Callable,WebAsyncTask作为Controller方法返回值,针对请求异步处理后,我们再来了解下SpringMVC其他两个支持异步的返回值处理器。DeferredResultMethodReturnValueHandlerDeferredResultMethodReturnValueHandler是处理返回值DeferredResult的,DeferredResult翻译成中文即延迟结果,参看它的源码,可以发现它的功能和WebAsy...

前言

在了解StreamingResponseBody,Callable,WebAsyncTask作为Controller方法返回值,针对请求异步处理后,我们再来了解下SpringMVC其他两个支持异步的返回值处理器。

DeferredResultMethodReturnValueHandler

DeferredResultMethodReturnValueHandler是处理返回值DeferredResult的,DeferredResult翻译成中文即延迟结果,参看它的源码,可以发现它的功能和WebAsyncTask类似,原理都是另起线程来处理业务,(个人理解)但区别在于什么时候执行,WebAsyncTask是有请求进来,就会交给线程池去处理,而DeferredResult可以是任意的时候,超时了两者都可以通过设置超时回调函数进行处理。

DeferredResult可以在任意时间进行处理的前提是需要我们自己去进行管理,我们需要维护每一笔请求对应的DeferredResult,然后在合适的时间点调用DeferredResult#setResult即可。

demo引用自他人博客

@Slf4j
@RestController
public class ApolloController {

    // 值为List,因为监视同一个名称空间的长轮询可能有N个(毕竟可能有多个客户端用同一份配置嘛)
    private Map<String, List<DeferredResult<String>>> watchRequests = new ConcurrentHashMap<>();

    @GetMapping(value = "/all/watchrequests")
    public Object getWatchRequests() {
        return watchRequests;
    }

    // 模拟长轮询:apollo客户端来监听配置文件的变更~  可以指定namespace 监视指定的NameSpace
    @GetMapping(value = "/watch/{namespace}")
    public DeferredResult<String> watch(@PathVariable("namespace") String namespace) {
        log.info("Request received,namespace is" + namespace + ",当前时间:" + System.currentTimeMillis());

        DeferredResult<String> deferredResult = new DeferredResult<>();

        //当deferredResult完成时(不论是超时还是异常还是正常完成),都应该移除watchRequests中相应的watch key
        deferredResult.onCompletion(() -> {
            log.info("onCompletion,移除对namespace:" + namespace + "的监视~");
            List<DeferredResult<String>> list = watchRequests.get(namespace);
            list.remove(deferredResult);
            if (list.isEmpty()) {
                watchRequests.remove(namespace);
            }
        });
        //这里还需要添加异常以及超时时对deferredResult的处理
		...
        List<DeferredResult<String>> list = watchRequests.computeIfAbsent(namespace, (k) -> new ArrayList<>());
        list.add(deferredResult);
        return deferredResult;


    }

    //模拟发布namespace配置:修改配置
    @GetMapping(value = "/publish/{namespace}")
    public void publishConfig(@PathVariable("namespace") String namespace) {
        //do Something for update config
        if (watchRequests.containsKey(namespace)) {
            List<DeferredResult<String>> deferredResults = watchRequests.get(namespace);

            //通知所有watch这个namespace变更的长轮训配置变更结果
            for (DeferredResult<String> deferredResult : deferredResults) {
                deferredResult.setResult(namespace + " changed,时间为" + System.currentTimeMillis());
            }
        }

    }
}

ResponseBodyEmitterReturnValueHandler

ResponseBodyEmitterReturnValueHandler是针对ResponseBodyEmitter及其子类,另外还包括使用ResponseEntity包装的类的返回值处理器。在spring5.0中,还支持响应适配类型。

	public boolean supportsReturnType(MethodParameter returnType) {
		Class<?> bodyType = ResponseEntity.class.isAssignableFrom(returnType.getParameterType()) ?
				ResolvableType.forMethodParameter(returnType).getGeneric().resolve() :
				returnType.getParameterType();

		return (bodyType != null && (ResponseBodyEmitter.class.isAssignableFrom(bodyType) ||
				// 响应适配类型
				this.reactiveHandler.isReactiveType(bodyType)));
	}

ResponseBodyEmitter是一个流式发射器,不同于StreamingResponseBody,它可以发送多个值到OutputStream中,同时它利用了DeferredResult的特性, 直至调用它的complete方法才算发送完成。而它的子类SseEmitter则限定了响应头类型为MediaType.TEXT_EVENT_STREAM。接下来我们一起来看下这个返回处理器的源码。

public void handleReturnValue(@Nullable Object returnValue, MethodParameter returnType,
			ModelAndViewContainer mavContainer, NativeWebRequest webRequest) throws Exception {
		// 返回值为空处理
		...
		HttpServletResponse response = webRequest.getNativeResponse(HttpServletResponse.class);
		
		ServerHttpResponse outputMessage = new ServletServerHttpResponse(response);
		// 返回值为ResponseEntity<ResponseBodyEmitter> 或 ResponseEntity<SseEmitter>时的处理
		...
		ServletRequest request = webRequest.getNativeRequest(ServletRequest.class);

		ResponseBodyEmitter emitter;
		if (returnValue instanceof ResponseBodyEmitter) {
			emitter = (ResponseBodyEmitter) returnValue;
		}else {
			// 这里是响应式编程解析的部分,暂时不去了解
			....
		}
		// 默认空实现,SseEmitter中覆盖重写,设置了响应头类型为MediaType.TEXT_EVENT_STREAM
		emitter.extendResponse(outputMessage);

		// 流式场景不需要对响应缓存
		ShallowEtagHeaderFilter.disableContentCaching(request);

		// 包装响应以忽略进一步的头更改,头将在第一次写入时刷新
		outputMessage = new StreamingServletServerHttpResponse(outputMessage);

		HttpMessageConvertingHandler handler;
		try {
			// 这里使用了DeferredResult
			DeferredResult<?> deferredResult = new DeferredResult<>(emitter.getTimeout());
			WebAsyncUtils.getAsyncManager(webRequest).startDeferredResultProcessing(deferredResult, mavContainer);
			handler = new HttpMessageConvertingHandler(outputMessage, deferredResult);
		}
		catch (Throwable ex) {
			emitter.initializeWithError(ex);
			throw ex;
		}
		// 这块是主要逻辑
		emitter.initialize(handler);
	}
	synchronized void initialize(Handler handler) throws IOException {
		this.handler = handler;

		try {
			// 遍历之前发送的数据
			for (DataWithMediaType sendAttempt : this.earlySendAttempts) {
				// 这里会调用handler的send方法
				sendInternal(sendAttempt.getData(), sendAttempt.getMediaType());
			}
		}finally {
			this.earlySendAttempts.clear();
		}
		// 数据是否已经发完了
		if (this.complete) {
			// 有没有报错
			if (this.failure != null) {
				this.handler.completeWithError(this.failure);
			}else {
				// 这里最终会调用DefferedResult.setResult
				this.handler.complete();
			}
		}else {
			this.handler.onTimeout(this.timeoutCallback);
			this.handler.onError(this.errorCallback);
			this.handler.onCompletion(this.completionCallback);
		}
	}
private class HttpMessageConvertingHandler implements ResponseBodyEmitter.Handler {
		...
		
		@SuppressWarnings("unchecked")
		private <T> void sendInternal(T data, @Nullable MediaType mediaType) throws IOException {
			// RequestMappingHandlerAdapter实例化的时候会设置,例如ByteArrayHttpMessageConverter,StringHttpMessageConverter
			for (HttpMessageConverter<?> converter : ResponseBodyEmitterReturnValueHandler.this.sseMessageConverters) {
				if (converter.canWrite(data.getClass(), mediaType)) {
					// 将消息写入输出流
					((HttpMessageConverter<T>) converter).write(data, mediaType, this.outputMessage);
					this.outputMessage.flush();
					return;
				}
			}
			throw new IllegalArgumentException("No suitable converter for " + data.getClass());
		}

		@Override
		public void complete() {
			try {
				this.outputMessage.flush();
				// 将请求重新分派给容器
				this.deferredResult.setResult(null);
			}catch (IOException ex) {
				this.deferredResult.setErrorResult(ex);
			}
		}
		...
	}

demo

 	private Map<String, ResponseBodyEmitter> map = new ConcurrentHashMap<> ();

    @RequestMapping(value="/test6/{num}",produces = {"text/html;charset=utf-8"})
    @ResponseBody
    public ResponseBodyEmitter test6(@PathVariable String num){
        System.out.println ("请求开始" + Thread.currentThread ());
        ResponseBodyEmitter emitter = new ResponseBodyEmitter (15000L);
        emitter.onCompletion (()->{map.remove (num);});
        emitter.onError (t ->{map.remove (num);});
        emitter.onTimeout (()->{map.remove (num);});
        map.put (num,emitter);
        return emitter;
    }

    @RequestMapping(value="/test7/{num}")
    public void test7(@PathVariable String num,HttpServletResponse response) throws IOException {
        ServletOutputStream outputStream = response.getOutputStream ();
        if(map.containsKey (num)){

            try {
                for (int i = 0; i < 5; i++) {
                    map.get (num).send (RandomUtils.nextInt (1,100000));
                    Thread.sleep (1000L);
                }
            } catch (Exception e) {
                e.printStackTrace ();
            }finally {
                map.get (num).complete ();
                outputStream.write ("发送ok".getBytes ());
            }
        }else{
            outputStream.write ("无对应请求".getBytes ());
        }
    }

参考博客

Servlet 3.0 新特性详解
高性能关键技术之—体验Spring MVC的异步模式
DEFERREDRESULT使用方式和场景
Spring MVC的@ Async,DeferredResult和Callable之间的区别

本文地址:https://blog.csdn.net/weixin_36959304/article/details/109568743

相关标签: SpringMvc java