欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

解决网络型inputStream无法反复read的问题。mark/reset not supported

程序员文章站 2022-03-26 10:31:01
# 一、基础知识说明对于大部分inputStream反复读写的问题,我们可以采用mark(int readLimit)、reset()方法解决。但是网络型的inputStream却不支持这种方法。# 二、问题的产生由于业务需求,需要对restTemplate添加一个拦截器,以实现根据第三方服务的response中的code字段 来判断后续进行哪一步操作。我这里的真实场景是,需要判断第三方的token是否过期。如果过期,则获取token后,重新访问第三方服务。拦截器简易代码如下:@Com...

# 一、基础知识说明

对于大部分inputStream反复读写的问题,我们可以采用mark(int readLimit)、reset()方法解决。但是网络型的inputStream却不支持这种方法。

# 二、问题的产生

由于业务需求,需要对restTemplate添加一个拦截器,以实现根据第三方服务的response中的code字段 来判断后续进行哪一步操作。

我这里的真实场景是,需要判断第三方的token是否过期。如果过期,则获取token后,重新访问第三方服务。拦截器简易代码如下:

@Component
public class TokenRefreshInterceptor implements ClientHttpRequestInterceptor {
    private Logger logger = LoggerFactory.getLogger(this.getClass());

    @Autowired
    private TokenServer tokenServer;

    @Override
    public ClientHttpResponse intercept(HttpRequest httpRequest, byte[] bytes, ClientHttpRequestExecution execution) throws IOException {
        logger.info("进入拦截器TokenRefreshInterceptor。");
        ClientHttpResponse response = execution.execute(httpRequest, bytes);
        //读取网络inputStream
        BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(response.getBody()));
        StringBuilder builder = new StringBuilder();
        String temp;
        while ((temp = bufferedReader.readLine()) != null){
            builder.append(temp);
        }

        logger.info("TokenRefresh拦截器,收到response。httpRequest={}, response={}, body={}",httpRequest,response,builder.toString());

        //如果过期,重新生成token
        if(isTokenExpired(builder.toString())){
            //重新设置token请求头,并请求
            httpRequest.getHeaders().set("X-Access-Token",refreshToken);
            return execution.execute(httpRequest,bytes);
        }
        //把response返回给业务代码,继续操作
        return response;
    }

 

在上面代码中,已经对网络型的inputStream进行了读操作。所以在后续的业务代码中获取responseBody为null。

解决网络型inputStream无法反复read的问题。mark/reset not supported

通过查阅资料得知:inputStream的read操作,底层实现是通过控制stream上的指针,来读取数据的。由于在拦截器中已经read过数据,指针已经处于stream的末端,业务代码中就无法再读取到数据了。

# 三、尝试解决1

既然问题找到了,那么重新改变stream上的指针位置即可解决。于是查阅资料,发现可以通过mark(int readLimit)、reset()方法解决。

修改代码如下:

....
ClientHttpResponse response = execution.execute(httpRequest, bytes);
InputStream bodyStream = response.getBody();
//标记位置
bodyStream.mark(bodyStream.available());
BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(bodyStream));
StringBuilder builder = new StringBuilder();
String temp;
while ((temp = bufferedReader.readLine()) != null){
    builder.append(temp);
}
//重置指针
bodyStream.reset();
....

 

测试过程中报错如下:

Caused by: java.io.IOException: mark/reset not supported
    at java.io.InputStream.reset(InputStream.java:348)
    at java.io.FilterInputStream.reset(FilterInputStream.java:226)
    at sun.net.www.protocol.http.HttpURLConnection$HttpInputStream.reset(HttpURLConnection.java:3408)
    at cn.thecover.job.interceptor.TokenRefreshInterceptor.intercept(TokenRefreshInterceptor.java:43)
    at org.springframework.http.client.InterceptingClientHttpRequest$InterceptingRequestExecution.execute(InterceptingClientHttpRequest.java:92)
    at org.springframework.http.client.InterceptingClientHttpRequest.executeInternal(InterceptingClientHttpRequest.java:76)
    at org.springframework.http.client.AbstractBufferingClientHttpRequest.executeInternal(AbstractBufferingClientHttpRequest.java:48)
    at org.springframework.http.client.AbstractClientHttpRequest.execute(AbstractClientHttpRequest.java:53)
    at org.springframework.web.client.RestTemplate.doExecute(RestTemplate.java:735)

 

通过查阅资料和实践 得出结论:网络型inputStream不支持mark/reset

# 四、尝试解决2

在查阅资料的过程中,有文章说针对不支持mark/reset的inputStream,可以在外层封装BufferedInputStream。

于是修改代码:

...
//使用bufferedReader进行封装
BufferedInputStream bufferedInputStream = new BufferedInputStream(bodyStream);
bufferedInputStream.mark(bodyStream.available());
StringBuilder builder = new StringBuilder();
String temp;
while ((temp = bufferedReader.readLine()) != null){
    builder.append(temp);
}
//使用bufferedReader重置指针
bufferedInputStream.reset();
...


话外音:当时觉得不管怎么封装,其本质还是对网络型的inputStream的读写,真的能够成功?

抱着疑惑测试了一下,业务代码中获取的body果然还是为null。

# 五:尝试解决3

拦截器中复写方法的返回值为ClientHttpResponse,这是个接口。那么返回的实例哪一个呢?写demo进行debug。

解决网络型inputStream无法反复read的问题。mark/reset not supported

进入SimpleClientHttpResponse源码

final class SimpleClientHttpResponse extends AbstractClientHttpResponse {

    private final HttpURLConnection connection;

    @Nullable
    private HttpHeaders headers;

    @Nullable
    private InputStream responseStream;


    SimpleClientHttpResponse(HttpURLConnection connection) {
        this.connection = connection;
    }


    @Override
    public int getRawStatusCode() throws IOException {
        return this.connection.getResponseCode();
    }

    @Override
    public String getStatusText() throws IOException {
        String result = this.connection.getResponseMessage();
        return (result != null) ? result : "";
    }

    @Override
    public HttpHeaders getHeaders() {
        if (this.headers == null) {
            this.headers = new HttpHeaders();
            // Header field 0 is the status line for most HttpURLConnections, but not on GAE
            String name = this.connection.getHeaderFieldKey(0);
            if (StringUtils.hasLength(name)) {
                this.headers.add(name, this.connection.getHeaderField(0));
            }
            int i = 1;
            while (true) {
                name = this.connection.getHeaderFieldKey(i);
                if (!StringUtils.hasLength(name)) {
                    break;
                }
                this.headers.add(name, this.connection.getHeaderField(i));
                i++;
            }
        }
        return this.headers;
    }

    @Override
    public InputStream getBody() throws IOException {
        InputStream errorStream = this.connection.getErrorStream();
        this.responseStream = (errorStream != null ? errorStream : this.connection.getInputStream());
        return this.responseStream;
    }

    @Override
    public void close() {
        try {
            if (this.responseStream == null) {
                getBody();
            }
            StreamUtils.drain(this.responseStream);
            this.responseStream.close();
        }
        catch (Exception ex) {
            // ignore
        }
    }

}


发现其中封装了HttpURLConnection、HttpHeaders、InputStream。

同时复写了方法:getRawStatusCode、getStatusText、getHeaders、getBody、close。

由于我这里主要是获取responseBody,故仔细阅读getBody(), 发现是通过connection来读取的。(关于restTemplate如何对inputStream进行读取操作,并转换为对应的body这里不做展开,可自行阅读源码)

但是connection在拦截器中已经使用过了,并且这里封装的inputStream在拦截器中也已经读取过了。我第一反应是偷梁换柱 把封装的inputStream给改了。但是这个类中又没有针对inputStream的set方法。

于是准备自定义个response来继承SimpleClientHttpResponse,然后改写其getBody()。

但是当我准备继承这个SimpleClientHttpResponse的时候,发现它是用final修饰的,无法进行继承。于是退而求其次:继承父类AbstractClientHttpResponse。

public class MyResponse extends AbstractClientHttpResponse {

    private Integer rawStatusCode;
    private String statusText;
    private InputStream body;
    private HttpHeaders headers;


    public MyResponse(Integer rawStatusCode, String statusText, InputStream body, HttpHeaders headers/*, String bodyString*/) {
        this.rawStatusCode = rawStatusCode;
        this.statusText = statusText;
        this.body = body;
        this.headers = headers;
    }

    @Override
    public int getRawStatusCode() throws IOException {
        return rawStatusCode;
    }

    @Override
    public String getStatusText() throws IOException {
        return statusText;
    }

    @Override
    public void close() {
        try {
            this.body.close();
        }
        catch (Exception ex) {
            // ignore
        }
    }

    @Override
    public InputStream getBody() throws IOException {
        return body;
    }

    @Override
    public HttpHeaders getHeaders() {
        return headers;
    }

}


为了不影响其他方法(需要复写的其他方法)。
这个类通过构造方法,约束rawStatusCode、statusText、headers的返回值。

最重要的getBody()同样通过构造方法传入,不过在这里,我们就可以进行偷梁换柱 将inputStream改写为我们需要的inputStream。

以下为重新改写的拦截器代码:

ClientHttpResponse response = execution.execute(httpRequest, bytes);
InputStream bodyStream = response.getBody();

BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(bodyStream));
StringBuilder builder = new StringBuilder();
String temp;
while ((temp = bufferedReader.readLine()) != null){
    builder.append(temp);
}

logger.info("TokenRefresh拦截器,收到response。httpRequest={}, response={}, body={}",httpRequest,response,builder.toString());

ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(builder.toString().getBytes(StandardCharsets.UTF_8));
//通过构造函数传入SimpleClientHttpResponse中的相关结果,保证数据统一
//使用ByteArrayInputStream进行偷梁换柱
return newMyResponse(response.getRawStatusCode()response.getStatusText(),byteArrayInpuStream,response.getHeaders());

 

业务代码获得body如下:

解决网络型inputStream无法反复read的问题。mark/reset not supported

 


    

本文地址:https://blog.csdn.net/qq_34019552/article/details/110855319

相关标签: java http 网络