解决网络型inputStream无法反复read的问题。mark/reset not supported
# 一、基础知识说明
对于大部分inputStream反复读写的问题,我们可以采用mark(int readLimit)、reset()方法解决。但是网络型的inputStream却不支持这种方法。
# 二、问题的产生
由于业务需求,需要对restTemplate添加一个拦截器,以实现根据第三方服务的response中的code字段 来判断后续进行哪一步操作。
我这里的真实场景是,需要判断第三方的token是否过期。如果过期,则获取token后,重新访问第三方服务。拦截器简易代码如下:
@Component
public class TokenRefreshInterceptor implements ClientHttpRequestInterceptor {
private Logger logger = LoggerFactory.getLogger(this.getClass());
@Autowired
private TokenServer tokenServer;
@Override
public ClientHttpResponse intercept(HttpRequest httpRequest, byte[] bytes, ClientHttpRequestExecution execution) throws IOException {
logger.info("进入拦截器TokenRefreshInterceptor。");
ClientHttpResponse response = execution.execute(httpRequest, bytes);
//读取网络inputStream
BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(response.getBody()));
StringBuilder builder = new StringBuilder();
String temp;
while ((temp = bufferedReader.readLine()) != null){
builder.append(temp);
}
logger.info("TokenRefresh拦截器,收到response。httpRequest={}, response={}, body={}",httpRequest,response,builder.toString());
//如果过期,重新生成token
if(isTokenExpired(builder.toString())){
//重新设置token请求头,并请求
httpRequest.getHeaders().set("X-Access-Token",refreshToken);
return execution.execute(httpRequest,bytes);
}
//把response返回给业务代码,继续操作
return response;
}
在上面代码中,已经对网络型的inputStream进行了读操作。所以在后续的业务代码中获取responseBody为null。
通过查阅资料得知:inputStream的read操作,底层实现是通过控制stream上的指针,来读取数据的。由于在拦截器中已经read过数据,指针已经处于stream的末端,业务代码中就无法再读取到数据了。
# 三、尝试解决1
既然问题找到了,那么重新改变stream上的指针位置即可解决。于是查阅资料,发现可以通过mark(int readLimit)、reset()方法解决。
修改代码如下:
....
ClientHttpResponse response = execution.execute(httpRequest, bytes);
InputStream bodyStream = response.getBody();
//标记位置
bodyStream.mark(bodyStream.available());
BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(bodyStream));
StringBuilder builder = new StringBuilder();
String temp;
while ((temp = bufferedReader.readLine()) != null){
builder.append(temp);
}
//重置指针
bodyStream.reset();
....
测试过程中报错如下:
Caused by: java.io.IOException: mark/reset not supported
at java.io.InputStream.reset(InputStream.java:348)
at java.io.FilterInputStream.reset(FilterInputStream.java:226)
at sun.net.www.protocol.http.HttpURLConnection$HttpInputStream.reset(HttpURLConnection.java:3408)
at cn.thecover.job.interceptor.TokenRefreshInterceptor.intercept(TokenRefreshInterceptor.java:43)
at org.springframework.http.client.InterceptingClientHttpRequest$InterceptingRequestExecution.execute(InterceptingClientHttpRequest.java:92)
at org.springframework.http.client.InterceptingClientHttpRequest.executeInternal(InterceptingClientHttpRequest.java:76)
at org.springframework.http.client.AbstractBufferingClientHttpRequest.executeInternal(AbstractBufferingClientHttpRequest.java:48)
at org.springframework.http.client.AbstractClientHttpRequest.execute(AbstractClientHttpRequest.java:53)
at org.springframework.web.client.RestTemplate.doExecute(RestTemplate.java:735)
通过查阅资料和实践 得出结论:网络型inputStream不支持mark/reset
# 四、尝试解决2
在查阅资料的过程中,有文章说针对不支持mark/reset的inputStream,可以在外层封装BufferedInputStream。
于是修改代码:
...
//使用bufferedReader进行封装
BufferedInputStream bufferedInputStream = new BufferedInputStream(bodyStream);
bufferedInputStream.mark(bodyStream.available());
StringBuilder builder = new StringBuilder();
String temp;
while ((temp = bufferedReader.readLine()) != null){
builder.append(temp);
}
//使用bufferedReader重置指针
bufferedInputStream.reset();
...
话外音:当时觉得不管怎么封装,其本质还是对网络型的inputStream的读写,真的能够成功?
抱着疑惑测试了一下,业务代码中获取的body果然还是为null。
# 五:尝试解决3
拦截器中复写方法的返回值为ClientHttpResponse,这是个接口。那么返回的实例哪一个呢?写demo进行debug。
进入SimpleClientHttpResponse源码
final class SimpleClientHttpResponse extends AbstractClientHttpResponse {
private final HttpURLConnection connection;
@Nullable
private HttpHeaders headers;
@Nullable
private InputStream responseStream;
SimpleClientHttpResponse(HttpURLConnection connection) {
this.connection = connection;
}
@Override
public int getRawStatusCode() throws IOException {
return this.connection.getResponseCode();
}
@Override
public String getStatusText() throws IOException {
String result = this.connection.getResponseMessage();
return (result != null) ? result : "";
}
@Override
public HttpHeaders getHeaders() {
if (this.headers == null) {
this.headers = new HttpHeaders();
// Header field 0 is the status line for most HttpURLConnections, but not on GAE
String name = this.connection.getHeaderFieldKey(0);
if (StringUtils.hasLength(name)) {
this.headers.add(name, this.connection.getHeaderField(0));
}
int i = 1;
while (true) {
name = this.connection.getHeaderFieldKey(i);
if (!StringUtils.hasLength(name)) {
break;
}
this.headers.add(name, this.connection.getHeaderField(i));
i++;
}
}
return this.headers;
}
@Override
public InputStream getBody() throws IOException {
InputStream errorStream = this.connection.getErrorStream();
this.responseStream = (errorStream != null ? errorStream : this.connection.getInputStream());
return this.responseStream;
}
@Override
public void close() {
try {
if (this.responseStream == null) {
getBody();
}
StreamUtils.drain(this.responseStream);
this.responseStream.close();
}
catch (Exception ex) {
// ignore
}
}
}
发现其中封装了HttpURLConnection、HttpHeaders、InputStream。
同时复写了方法:getRawStatusCode、getStatusText、getHeaders、getBody、close。
由于我这里主要是获取responseBody,故仔细阅读getBody(), 发现是通过connection来读取的。(关于restTemplate如何对inputStream进行读取操作,并转换为对应的body这里不做展开,可自行阅读源码)
但是connection在拦截器中已经使用过了,并且这里封装的inputStream在拦截器中也已经读取过了。我第一反应是偷梁换柱 把封装的inputStream给改了。但是这个类中又没有针对inputStream的set方法。
于是准备自定义个response来继承SimpleClientHttpResponse,然后改写其getBody()。
但是当我准备继承这个SimpleClientHttpResponse的时候,发现它是用final修饰的,无法进行继承。于是退而求其次:继承父类AbstractClientHttpResponse。
public class MyResponse extends AbstractClientHttpResponse {
private Integer rawStatusCode;
private String statusText;
private InputStream body;
private HttpHeaders headers;
public MyResponse(Integer rawStatusCode, String statusText, InputStream body, HttpHeaders headers/*, String bodyString*/) {
this.rawStatusCode = rawStatusCode;
this.statusText = statusText;
this.body = body;
this.headers = headers;
}
@Override
public int getRawStatusCode() throws IOException {
return rawStatusCode;
}
@Override
public String getStatusText() throws IOException {
return statusText;
}
@Override
public void close() {
try {
this.body.close();
}
catch (Exception ex) {
// ignore
}
}
@Override
public InputStream getBody() throws IOException {
return body;
}
@Override
public HttpHeaders getHeaders() {
return headers;
}
}
为了不影响其他方法(需要复写的其他方法)。
这个类通过构造方法,约束rawStatusCode、statusText、headers的返回值。
最重要的getBody()同样通过构造方法传入,不过在这里,我们就可以进行偷梁换柱 将inputStream改写为我们需要的inputStream。
以下为重新改写的拦截器代码:
ClientHttpResponse response = execution.execute(httpRequest, bytes);
InputStream bodyStream = response.getBody();
BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(bodyStream));
StringBuilder builder = new StringBuilder();
String temp;
while ((temp = bufferedReader.readLine()) != null){
builder.append(temp);
}
logger.info("TokenRefresh拦截器,收到response。httpRequest={}, response={}, body={}",httpRequest,response,builder.toString());
ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(builder.toString().getBytes(StandardCharsets.UTF_8));
//通过构造函数传入SimpleClientHttpResponse中的相关结果,保证数据统一
//使用ByteArrayInputStream进行偷梁换柱
return newMyResponse(response.getRawStatusCode()response.getStatusText(),byteArrayInpuStream,response.getHeaders());
业务代码获得body如下:
本文地址:https://blog.csdn.net/qq_34019552/article/details/110855319