HTTP请求分片传输
程序员文章站
2022-04-11 13:05:52
一.背景有个项目,客户的宽带只有上下行速度大概256KB/每秒,在模板过大的情况一直被网络重置,采用分片传输,降低传输容量二.nginx代理配置限速stream { upstream backend { server 61.145.229.28:7241 } server { listen 7241; proxy_connect_timeout 10s; proxy_timeout 3s;proxy_...
一.背景
有个项目,客户的宽带上下行速度只有大约256KB/秒,模板过大时连接一直被网络重置,因此采用分片传输,降低每次传输的数据量。
二.nginx代理配置限速
# TCP (stream) proxy that rate-limits traffic forwarded to the backend.
stream {
    upstream backend {
        server 61.145.229.28:7241;
    }
    server {
        listen 7241;
        proxy_connect_timeout 10s;
        # Close the session when no data is transferred for 3 seconds.
        proxy_timeout 3s;
        # Limit the read speed from the client and from the proxied server (per connection).
        proxy_upload_rate 20K;
        proxy_download_rate 20K;
        proxy_pass backend;
    }
}
三.Java程序分片
常量类 GeneralHttpConstant.java
package com.montnets.emp.util;
import org.omg.CORBA.Environment;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
/**
 * Tunable HTTP settings exposed as static fields.
 *
 * <p>The {@code @Value}-annotated instance setters copy the configured properties into
 * the static fields. Every placeholder carries a default equal to the field's
 * initial value, so each property is optional.
 */
@Component
public class GeneralHttpConstant {
    /**
     * Connect timeout, in seconds.
     */
    public static int CONNECT_TIMEOUT = 15;
    /**
     * Read timeout, in seconds.
     */
    public static int READ_TIMEOUT = 2 * 60;
    /**
     * Size of each write slice, in KB.
     */
    public static int READ_SIZE = 40;
    /**
     * Pause after each write, in milliseconds.
     */
    public static long SLEEP_TIME = 500;

    // Keep the placeholder defaults below in sync with the field initialisers above.

    @Value("${general.http.connectTimeout:15}")
    public void setConnectTimeout(int connectTimeout) {
        CONNECT_TIMEOUT = connectTimeout;
    }

    @Value("${general.http.readTimeout:120}")
    public void setReadTimeout(int readTimeout) {
        READ_TIMEOUT = readTimeout;
    }

    @Value("${general.http.readSize:40}")
    public void setReadSize(int readSize) {
        READ_SIZE = readSize;
    }

    @Value("${general.http.sleepTime:500}")
    public void setSleepTime(long sleepTime) {
        SLEEP_TIME = sleepTime;
    }
}
发送实现 GeneralHttpUtils
package com.montnets.emp.util;
import com.montnets.emp.common.meditor.constant.RMSHttpConstant;
import com.montnets.emp.common.meditor.entity.JsonEntity;
import com.montnets.emp.common.meditor.entity.UploadTempParams;
import com.montnets.emp.common.meditor.util.MD5;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.http.message.BasicNameValuePair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.*;
import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.HttpURLConnection;
import java.net.URL;
import java.util.ArrayList;
import java.util.List;
import java.util.zip.GZIPInputStream;
import java.util.zip.InflaterInputStream;
/**
 * Plain HTTP request utilities.
 *
 * @date: 2020/05/23
 */
@Slf4j
public class GeneralHttpUtils {
private static Logger logger = LoggerFactory.getLogger(GeneralHttpUtils.class);
/**
 * Sends the request with {@link #sendPostSeg1} and retries while the response is blank,
 * making at most {@code time} attempts.
 *
 * <p>Before every retry the request is rebuilt as a new {@code UploadTempParams}: the
 * business fields are copied over and {@code pwd} / {@code sign} are recomputed with the
 * new instance's timestamp. Nothing is rebuilt after the final attempt.
 *
 * @param uri         target URL
 * @param obj         request used for the first attempt
 * @param pwd         password that is mixed into the {@code pwd} digest of retry requests
 * @param charset     charset used to decode the response
 * @param contentType value of the {@code Content-Type} request header
 * @param time        maximum number of attempts; nothing is sent when it is not positive
 * @return the first non-blank response, otherwise the last (blank) response, or
 *         {@code ""} when no attempt was made
 * @throws Exception if rebuilding the request for a retry fails
 */
public static String reSendPostSeg1(String uri, UploadTempParams obj, String pwd, String charset, String contentType, int time) throws Exception {
    String resp = "";
    UploadTempParams current = obj;
    for (int attempt = 1; attempt <= time; attempt++) {
        if (attempt > 1) {
            // The previous attempt came back blank: re-sign with a fresh timestamp before resending.
            current = rebuildForRetry(current, pwd);
        }
        String text = current.toString();
        // Only the first 600 characters of the request are logged.
        String preview = text.length() > 600 ? text.substring(0, 600) : text;
        logger.info("第 {} 次发送请求,请求报文:{}", attempt, preview);
        resp = sendPostSeg1(uri, current, charset, contentType);
        logger.info("第 {} 次响应报文:{}", attempt, resp);
        if (StringUtils.isNotBlank(resp)) {
            break;
        }
    }
    return resp;
}

/**
 * Copies the business fields of {@code previous} into a new request and recomputes
 * {@code pwd} and {@code sign} from the new request's timestamp.
 */
private static UploadTempParams rebuildForRetry(UploadTempParams previous, String pwd) throws Exception {
    UploadTempParams retry = new UploadTempParams();
    retry.setUserid(previous.getUserid());
    retry.setPwd(MD5.getMD5Str(previous.getUserid().toUpperCase() + RMSHttpConstant.FIXATION_SEQUENCE + pwd + retry.getTimestamp()));
    retry.setContent(previous.getContent());
    retry.setSign(MD5.getMD5Str(previous.getContent() + retry.getTimestamp() + RMSHttpConstant.FIXATION_SEQUENCE));
    retry.setSmcontent(previous.getSmcontent());
    retry.setSmparamreg(previous.getSmparamreg());
    retry.setTitle(previous.getTitle());
    retry.setTmplver(previous.getTmplver());
    retry.setOrigin(previous.getOrigin());
    return retry;
}
/**
 * POSTs the request in paced slices and returns the response body.
 *
 * <p>The body is produced by {@code parseParam} plus {@code JsonEntity}. It is written
 * in slices of {@code GeneralHttpConstant.READ_SIZE} KB using chunked streaming mode,
 * flushing and sleeping {@code GeneralHttpConstant.SLEEP_TIME} milliseconds after each
 * slice. Response lines are concatenated without line separators. A response whose
 * {@code Content-Encoding} is {@code gzip} or {@code deflate} is decompressed, because
 * the request advertises both encodings.
 *
 * <p>No exception escapes: any failure is logged and {@code ""} is returned. When the
 * thread is interrupted while pacing, the interrupt flag is restored before returning.
 *
 * @param uri         target URL
 * @param obj         request parameters; read through their getters by {@code parseParam}
 * @param charset     charset used to decode the response
 * @param contentType value of the {@code Content-Type} request header
 * @return the response body, or {@code ""} on any failure
 */
public static String sendPostSeg1(String uri, UploadTempParams obj, String charset, String contentType) {
    long startedAt = System.currentTimeMillis();
    String result = "";
    InputStream in = null;
    BufferedOutputStream out = null;
    BufferedReader reader = null;
    HttpURLConnection urlcon = null;
    InputStream content = null;
    try {
        URL url = new URL(uri);
        logger.info("开始连接-->【" + uri + "】");
        urlcon = (HttpURLConnection) url.openConnection();
        logger.info("连接成功....");
        urlcon.setRequestMethod("POST");
        urlcon.setConnectTimeout(GeneralHttpConstant.CONNECT_TIMEOUT * 1000);
        urlcon.setReadTimeout(GeneralHttpConstant.READ_TIMEOUT * 1000);
        urlcon.setRequestProperty("Content-Type", contentType);
        // The template upload server uses short-lived connections; match it here.
        urlcon.setRequestProperty("Connection", "close");
        urlcon.setRequestProperty("Accept-Encoding", "gzip,deflate");
        urlcon.setRequestProperty("Accept", "*");
        // Slice size in bytes, shared by the chunk length, the output buffer and the copy buffer.
        int size = 1024 * GeneralHttpConstant.READ_SIZE;
        urlcon.setChunkedStreamingMode(size);
        urlcon.setDoInput(true);
        urlcon.setDoOutput(true);
        out = new BufferedOutputStream(urlcon.getOutputStream(), size);

        List<BasicNameValuePair> params = new ArrayList<BasicNameValuePair>();
        parseParam(obj, params);
        JsonEntity jsonEntity = new JsonEntity(params, RMSHttpConstant.UTF8_ENCODE);
        content = jsonEntity.getContent();

        byte[] buf = new byte[size];
        int count = 0;
        int length;
        while ((length = content.read(buf)) != -1) {
            logger.info("第" + (++count) + "数据分片写入发送,length:" + length);
            out.write(buf, 0, length);
            // Push the slice out now, then pause so the slow link is not saturated.
            out.flush();
            Thread.sleep(GeneralHttpConstant.SLEEP_TIME);
        }
        // Finish the request body before asking for the response.
        out.close();
        out = null;

        in = urlcon.getInputStream();
        // HttpURLConnection does not decompress by itself, so honour the encodings requested above.
        String encoding = urlcon.getContentEncoding();
        if ("gzip".equalsIgnoreCase(encoding)) {
            in = new GZIPInputStream(in);
        } else if ("deflate".equalsIgnoreCase(encoding)) {
            in = new InflaterInputStream(in);
        }
        reader = new BufferedReader(new InputStreamReader(in, charset));
        StringBuilder body = new StringBuilder();
        String line;
        while ((line = reader.readLine()) != null) {
            body.append(line);
        }
        result = body.toString();
    } catch (InterruptedException e) {
        // Keep the interruption visible to the caller instead of swallowing it.
        Thread.currentThread().interrupt();
        logger.error("[请求异常][地址:" + uri + "][错误信息:" + e.getMessage() + "]", e);
        result = "";
    } catch (Exception e) {
        logger.error("[请求异常][地址:" + uri + "][错误信息:" + e.getMessage() + "]", e);
        result = "";
    } finally {
        try {
            // Closing the reader also closes the wrapped response stream.
            if (null != reader) {
                reader.close();
            } else if (null != in) {
                in.close();
            }
        } catch (Exception e1) {
            logger.info("[关闭流异常][错误信息:" + e1.getMessage() + "]");
        }
        try {
            if (null != content) {
                content.close();
            }
        } catch (Exception e2) {
            logger.info("[关闭流异常][错误信息:" + e2.getMessage() + "]");
        }
        try {
            // Only still set when the request failed before the body was finished.
            if (null != out) {
                out.close();
            }
        } catch (Exception e3) {
            logger.info("[关闭流异常][错误信息:" + e3.getMessage() + "]");
        }
        try {
            if (null != urlcon) {
                urlcon.disconnect();
            }
        } catch (Exception e4) {
            logger.info("[关闭HttpURLConnection异常][错误信息:" + e4.getMessage() + "]");
        }
    }
    logger.info("向【" + uri + "】发起http请求耗时:" + TimeFmtUtil.formatTime(System.currentTimeMillis() - startedAt));
    return result;
}
/**
 * Posts {@code msg} through {@link #sendPostSeg}, repeating while the response is
 * blank, for at most {@code time} attempts.
 *
 * @param uri         target URL
 * @param msg         request body; only its first 600 characters are logged
 * @param charset     charset for the request body and the response
 * @param contentType value of the {@code Content-Type} request header
 * @param time        maximum number of attempts; nothing is sent when it is not positive
 * @return the first non-blank response, otherwise the last (blank) response, or
 *         {@code ""} when no attempt was made
 */
public static String reSendPostSeg(String uri, String msg, String charset, String contentType, int time){
    String resp = "";
    int attempt = 0;
    while (attempt < time) {
        attempt++;
        String preview = msg;
        if (msg.length() > 600) {
            preview = msg.substring(0, 600);
        }
        logger.info("第 " + attempt + " 次发送请求,请求报文:" + preview);
        resp = sendPostSeg(uri, msg, charset, contentType);
        logger.info("第 " + attempt + " 次响应报文:" + resp);
        if (!StringUtils.isBlank(resp)) {
            // A usable answer arrived; no further attempts are needed.
            return resp;
        }
    }
    return resp;
}
/**
 * POSTs {@code msg} and returns the response body.
 *
 * <p>Despite the method name, the body is written with a single {@code write} call and
 * chunked streaming mode is not enabled. Response lines are concatenated without line
 * separators. A response whose {@code Content-Encoding} is {@code gzip} or
 * {@code deflate} is decompressed, because the request advertises both encodings.
 *
 * <p>No exception escapes: any failure is logged and {@code ""} is returned.
 *
 * @param uri         target URL
 * @param msg         request body
 * @param charset     charset used to encode the body and decode the response
 * @param contentType value of the {@code Content-Type} request header
 * @return the response body, or {@code ""} on any failure
 */
public static String sendPostSeg(String uri, String msg, String charset, String contentType) {
    long startedAt = System.currentTimeMillis();
    String result = "";
    InputStream in = null;
    OutputStream out = null;
    BufferedReader reader = null;
    HttpURLConnection urlcon = null;
    try {
        URL url = new URL(uri);
        logger.info("开始连接-->【" + uri + "】");
        urlcon = (HttpURLConnection) url.openConnection();
        logger.info("连接成功....");
        urlcon.setRequestMethod("POST");
        urlcon.setConnectTimeout(GeneralHttpConstant.CONNECT_TIMEOUT * 1000);
        urlcon.setReadTimeout(GeneralHttpConstant.READ_TIMEOUT * 1000);
        urlcon.setRequestProperty("Content-Type", contentType);
        // The template upload server uses short-lived connections; match it here.
        urlcon.setRequestProperty("Connection", "close");
        urlcon.setRequestProperty("Accept-Encoding", "gzip,deflate");
        urlcon.setRequestProperty("Accept", "*");
        urlcon.setDoInput(true);
        urlcon.setDoOutput(true);

        out = urlcon.getOutputStream();
        byte[] bytes = msg.getBytes(charset);
        out.write(bytes);
        out.flush();
        out.close();
        // Already closed; keep the finally block from closing it a second time.
        out = null;

        in = urlcon.getInputStream();
        // HttpURLConnection does not decompress by itself, so honour the encodings requested above.
        String encoding = urlcon.getContentEncoding();
        if ("gzip".equalsIgnoreCase(encoding)) {
            in = new GZIPInputStream(in);
        } else if ("deflate".equalsIgnoreCase(encoding)) {
            in = new InflaterInputStream(in);
        }
        reader = new BufferedReader(new InputStreamReader(in, charset));
        StringBuilder body = new StringBuilder();
        String line;
        while ((line = reader.readLine()) != null) {
            body.append(line);
        }
        result = body.toString();
    } catch (Exception e) {
        logger.error("[请求异常][地址:" + uri + "][错误信息:" + e.getMessage() + "]", e);
        result = "";
    } finally {
        try {
            // Closing the reader also closes the wrapped response stream.
            if (null != reader) {
                reader.close();
            } else if (null != in) {
                in.close();
            }
        } catch (Exception e1) {
            logger.info("[关闭流异常][错误信息:" + e1.getMessage() + "]");
        }
        try {
            // Only still set when the request failed before the body was finished.
            if (null != out) {
                out.close();
            }
        } catch (Exception e2) {
            logger.info("[关闭流异常][错误信息:" + e2.getMessage() + "]");
        }
        try {
            if (null != urlcon) {
                urlcon.disconnect();
            }
        } catch (Exception e3) {
            logger.info("[关闭HttpURLConnection异常][错误信息:" + e3.getMessage() + "]");
        }
    }
    logger.info("向【" + uri + "】发起http请求耗时:" + TimeFmtUtil.formatTime(System.currentTimeMillis() - startedAt));
    return result;
}
/**
 * Appends one name/value pair to {@code params} for every field declared on
 * {@code obj}'s runtime class whose getter returns a non-null value.
 *
 * <p>Only fields declared directly on the class are inspected (inherited fields are
 * not). {@code serialVersionUID} and synthetic fields are skipped; synthetic fields are
 * generated by the compiler or by instrumentation and have no getter. For every other
 * field a public no-argument method named {@code get} + capitalised field name is
 * invoked, and its result is stored via {@code String.valueOf}.
 *
 * @param obj    bean to read; must not be {@code null}
 * @param params list that receives the pairs, in the order the fields are reported
 * @throws NoSuchMethodException     if a field has no matching public getter
 * @throws IllegalAccessException    if a getter cannot be accessed
 * @throws InvocationTargetException if a getter throws
 */
public static void parseParam(Object obj, List<BasicNameValuePair> params) throws NoSuchMethodException,
        IllegalAccessException, InvocationTargetException {
    Class<?> cls = obj.getClass();
    for (Field field : cls.getDeclaredFields()) {
        String fieldName = field.getName();
        if (field.isSynthetic() || "serialVersionUID".equals(fieldName)) {
            continue;
        }
        String getterName = "get" + Character.toUpperCase(fieldName.charAt(0)) + fieldName.substring(1);
        Method getter = cls.getMethod(getterName);
        Object value = getter.invoke(obj);
        if (value != null) {
            params.add(new BasicNameValuePair(fieldName, String.valueOf(value)));
        }
    }
}
}
本文地址:https://blog.csdn.net/qq_36555327/article/details/107352471