
Chunked Transfer of HTTP Requests

程序员文章站 2024-01-08 13:02:04

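1. Background

In one project, the customer's broadband link ran at only about 256 KB/s in each direction. Whenever a template was too large, the upload kept being reset by the network. The fix was to send the request body in chunks, which reduces the amount of data transmitted at a time.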

2. Rate limiting with an nginx proxy

stream {
    upstream backend {
        server 61.145.229.28:7241;
    }
    server {
        listen 7241;
        proxy_connect_timeout 10s;
        proxy_timeout 3s;
        proxy_upload_rate 20K;
        proxy_download_rate 20K;
        proxy_pass backend;
    }
}
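
To get a feel for what the 20K limit above means in practice, here is a minimal Java sketch that estimates how long a payload takes to pass through the proxy. It assumes that the nginx size suffix K means 1024 bytes and that the rate is in bytes per second per connection. The class name, the method name and the 1 MiB payload size are hypothetical and chosen only for illustration.

```java
class RateLimitEstimate {

    /** Seconds needed to move payloadBytes at a fixed rate of bytesPerSecond. */
    static double transferSeconds(long payloadBytes, long bytesPerSecond) {
        if (bytesPerSecond <= 0) {
            throw new IllegalArgumentException("rate must be positive");
        }
        return (double) payloadBytes / bytesPerSecond;
    }

    public static void main(String[] args) {
        long rate = 20L * 1024;       // proxy_upload_rate 20K, assuming K = 1024 bytes
        long payload = 1024L * 1024;  // hypothetical 1 MiB template
        System.out.println(transferSeconds(payload, rate) + " s");
    }
}
```

Under these assumptions a 1 MiB body needs roughly 51 seconds, so the upload is long-running even though each individual write is small.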

3. Chunking in the Java program

Constants class: GeneralHttpConstant.java

package com.montnets.emp.util;

import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

@Component
public class GeneralHttpConstant {

    /**
     * Connect timeout, in seconds.
     */
    public static int CONNECT_TIMEOUT = 15;

    /**
     * Read timeout, in seconds.
     */
    public static int READ_TIMEOUT = 2 * 60;

    /**
     * Size of each chunk written, in KB.
     */
    public static int READ_SIZE = 40;

    /**
     * Sleep time after each write, in milliseconds.
     */
    public static long SLEEP_TIME = 500;

    @Value("${general.http.connectTimeout}")
    public void setConnectTimeout(int connectTimeout) {
        CONNECT_TIMEOUT = connectTimeout;
    }
    @Value("${general.http.readTimeout}")
    public void setReadTimeout(int readTimeout) {
        READ_TIMEOUT = readTimeout;
    }
    @Value("${general.http.readSize}")
    public void setReadSize(int readSize) {
        READ_SIZE = readSize;
    }
    @Value("${general.http.sleepTime}")
    public void setSleepTime(long sleepTime) {
        SLEEP_TIME = sleepTime;
    }
}
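
As a rough worked example of what the defaults above imply, the sketch below computes how many chunks a payload is split into, how much time is spent sleeping, and the upper bound on the send rate that the sleep alone imposes. It assumes one sleep after every chunk, which is how the sending code in this article uses READ_SIZE and SLEEP_TIME. The class name, the method names and the 1 MiB payload are hypothetical.

```java
class PacingEstimate {

    /** Number of read/write iterations needed for payloadBytes with the given buffer size. */
    static long chunkCount(long payloadBytes, int chunkBytes) {
        return (payloadBytes + chunkBytes - 1) / chunkBytes; // ceiling division
    }

    /** Total time spent sleeping, given one sleep after every chunk. */
    static long totalSleepMillis(long payloadBytes, int chunkBytes, long sleepMillis) {
        return chunkCount(payloadBytes, chunkBytes) * sleepMillis;
    }

    /** Upper bound on the send rate imposed by the sleep alone, in bytes per second. */
    static double maxBytesPerSecond(int chunkBytes, long sleepMillis) {
        return chunkBytes * 1000.0 / sleepMillis;
    }

    public static void main(String[] args) {
        int chunkBytes = 40 * 1024;   // READ_SIZE = 40 (KB)
        long sleepMillis = 500;       // SLEEP_TIME = 500 (ms)
        long payload = 1024L * 1024;  // hypothetical 1 MiB template
        System.out.println("chunks: " + chunkCount(payload, chunkBytes));
        System.out.println("sleep ms: " + totalSleepMillis(payload, chunkBytes, sleepMillis));
        System.out.println("max bytes/s: " + maxBytesPerSecond(chunkBytes, sleepMillis));
    }
}
```

With the defaults the sleep caps the client at about 80 KB/s. On a slower link, smaller values for general.http.readSize or larger values for general.http.sleepTime lower that cap further.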

Sending implementation: GeneralHttpUtils

package com.montnets.emp.util;

import com.montnets.emp.common.meditor.constant.RMSHttpConstant;
import com.montnets.emp.common.meditor.entity.JsonEntity;
import com.montnets.emp.common.meditor.entity.UploadTempParams;
import com.montnets.emp.common.meditor.util.MD5;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.http.message.BasicNameValuePair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.*;
import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.HttpURLConnection;
import java.net.URL;
import java.util.ArrayList;
import java.util.List;

/**
 * Utilities for plain HTTP requests.
 *
 * Date: 2020/05/23
 */
@Slf4j
public class GeneralHttpUtils {

    private static Logger logger = LoggerFactory.getLogger(GeneralHttpUtils.class);


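    /**
     * Sends the request up to {@code time} times and stops at the first non-blank response.
     * Before each retry the parameters are rebuilt so that the password digest and the
     * signature are recomputed with the timestamp of the freshly created parameter object.
     * @param uri         target URL
     * @param obj         upload parameters
     * @param pwd         password used to recompute the digest on retry
     * @param charset     charset used to decode the response
     * @param contentType value of the Content-Type header
     * @param time        maximum number of attempts
     * @return the response body, or an empty string if every attempt failed
     */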
    public static String reSendPostSeg1(String uri, UploadTempParams obj, String pwd, String charset, String contentType, int time) throws Exception {
        String resp = "";
        for (int i = 0; i < time; i++) {
            logger.info("Attempt " + (i+1) + ": sending request, request body: " + (obj.toString().length() > 600 ? obj.toString().substring(0, 600) : obj.toString()));
            resp = sendPostSeg1(uri, obj, charset, contentType);
            logger.info("Attempt " + (i+1) + ": response body: " + resp);
            if(StringUtils.isNotBlank(resp)){
                break;
            }
            // Retry: rebuild the parameters so the digest and signature use a new timestamp
            UploadTempParams param = new UploadTempParams();
            param.setUserid(obj.getUserid());
            param.setPwd(MD5.getMD5Str(obj.getUserid().toUpperCase() + RMSHttpConstant.FIXATION_SEQUENCE + pwd + param.getTimestamp()));
            param.setContent(obj.getContent());
            param.setSign(MD5.getMD5Str(obj.getContent() + param.getTimestamp() + RMSHttpConstant.FIXATION_SEQUENCE));
            param.setSmcontent(obj.getSmcontent());
            param.setSmparamreg(obj.getSmparamreg());
            param.setTitle(obj.getTitle());
            param.setTmplver(obj.getTmplver());
            param.setOrigin(obj.getOrigin());
            obj = param;
        }
        return resp;
    }

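    /**
     * Sends the HTTP request body in chunks, sleeping after each chunk.
     *
     * @return the response body, or an empty string on failure
     */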
    public static String sendPostSeg1(String uri, UploadTempParams obj, String charset, String contentType) {
        long l = System.currentTimeMillis();
        String result = "";
        InputStream in = null;
        ByteArrayInputStream bArray = null;
        BufferedOutputStream out = null;
        InputStreamReader isr = null;
        BufferedReader buffer = null;
        HttpURLConnection urlcon = null;
        List<BasicNameValuePair> params = new ArrayList<BasicNameValuePair>();
        JsonEntity jsonEntity = null;
        InputStream content = null;
        try {
            URL url = new URL(uri);
            logger.info("Connecting to -->[" + uri + "]");
            urlcon = (HttpURLConnection) url.openConnection();
            logger.info("Connection opened....");
            urlcon.setRequestMethod("POST");
            urlcon.setConnectTimeout(GeneralHttpConstant.CONNECT_TIMEOUT * 1000);
            urlcon.setReadTimeout(GeneralHttpConstant.READ_TIMEOUT * 1000);
            urlcon.setRequestProperty("Content-Type", contentType);
            // The RMS (富信) template upload server uses short connections; match the server here
            urlcon.setRequestProperty("Connection", "close");
            // Note: HttpURLConnection does not decompress gzip/deflate responses automatically
            urlcon.setRequestProperty("Accept-Encoding", "gzip,deflate");
            urlcon.setRequestProperty("Accept", "*");
            // Stream the body with chunked transfer encoding instead of buffering it all in memory
            urlcon.setChunkedStreamingMode(1024 * GeneralHttpConstant.READ_SIZE);
//            urlcon.setRequestProperty("Cache-Control", null);
//            urlcon.setRequestProperty("Pragma", null);
            urlcon.setDoInput(true);
            urlcon.setDoOutput(true);
//            urlcon.setUseCaches(false);
            // Get the output stream and write the data
            //out = urlcon.getOutputStream();
            out = new BufferedOutputStream(urlcon.getOutputStream(),1024 * GeneralHttpConstant.READ_SIZE);
            int count = 0;
            int length = 0;
            // Buffer size
            int size = 1024 * GeneralHttpConstant.READ_SIZE;
            byte[] buf = new byte[size];
            parseParam(obj, params);
            jsonEntity = new JsonEntity(params, RMSHttpConstant.UTF8_ENCODE);
            content = jsonEntity.getContent();
            // read(buf) returns the number of bytes read into the array, or -1 once the stream is exhausted
            while ((length = content.read(buf)) != -1) {
                logger.info("Writing chunk " + (++count) + ", length: " + length);
                out.write(buf, 0, length);
                // Flush so this chunk is sent now, then pause before the next one
                out.flush();
                Thread.sleep(GeneralHttpConstant.SLEEP_TIME);
            }
            // Get the input stream and read the response
            in = urlcon.getInputStream();
            isr = new InputStreamReader(in, charset);
            buffer = new BufferedReader(isr);
            StringBuffer bs = new StringBuffer();
            String line = null;
            while ((line = buffer.readLine()) != null) {
                bs.append(line);
            }
            result = bs.toString();
        } catch (Exception e) {
            logger.error("[Request failed][URL: " + uri + "][Error: " + e.getMessage() + "]");
            result = "";
        } finally {
            // Close every stream in the original order; one failed close must not skip the rest
            for (Closeable c : new Closeable[]{buffer, isr, in, content, bArray, out}) {
                try {
                    if (null != c) {
                        c.close();
                    }
                } catch (Exception e1) {
                    logger.info("[Failed to close stream][Error: " + e1.getMessage() + "]");
                }
            }
            try {
                if (null != urlcon) {
                    urlcon.disconnect();
                }
            } catch (Exception e6) {
                logger.info("[Failed to close HttpURLConnection][Error: " + e6.getMessage() + "]");
            }
        }
        logger.info("HTTP request to [" + uri + "] took: " + TimeFmtUtil.formatTime(System.currentTimeMillis()-l));
        return result;
    }

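    /**
     * Sends the request up to {@code time} times and stops at the first non-blank response.
     * @param uri         target URL
     * @param msg         request body
     * @param charset     charset used to encode the body and decode the response
     * @param contentType value of the Content-Type header
     * @param time        maximum number of attempts
     * @return the response body, or an empty string if every attempt failed
     */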
    public static String reSendPostSeg(String uri, String msg, String charset, String contentType, int time){
        String resp = "";
        for (int i = 0; i < time; i++) {
            logger.info("Attempt " + (i+1) + ": sending request, request body: " + (msg.length() > 600 ? msg.substring(0, 600) : msg));
            resp = sendPostSeg(uri, msg, charset, contentType);
            logger.info("Attempt " + (i+1) + ": response body: " + resp);
            if(StringUtils.isNotBlank(resp)){
                break;
            }
        }
        return resp;
    }

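    /**
     * Sends the HTTP request body in a single write.
     * The chunked, paced loop is commented out in this variant.
     *
     * @return the response body, or an empty string on failure
     */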
    public static String sendPostSeg(String uri, String msg, String charset, String contentType) {
        long l = System.currentTimeMillis();
        String result = "";
        InputStream in = null;
        ByteArrayInputStream bArray = null;
        OutputStream out = null;
        InputStreamReader isr = null;
        BufferedReader buffer = null;
        HttpURLConnection urlcon = null;
        try {
            URL url = new URL(uri);
            logger.info("Connecting to -->[" + uri + "]");
            urlcon = (HttpURLConnection) url.openConnection();
            logger.info("Connection opened....");
            urlcon.setRequestMethod("POST");
            urlcon.setConnectTimeout(GeneralHttpConstant.CONNECT_TIMEOUT * 1000);
            urlcon.setReadTimeout(GeneralHttpConstant.READ_TIMEOUT * 1000);
            urlcon.setRequestProperty("Content-Type", contentType);
            // The RMS (富信) template upload server uses short connections; match the server here
            urlcon.setRequestProperty("Connection", "close");
            urlcon.setRequestProperty("Accept-Encoding", "gzip,deflate");
            urlcon.setRequestProperty("Accept", "*");
//            urlcon.setRequestProperty("Cache-Control", null);
//            urlcon.setRequestProperty("Pragma", null);
            urlcon.setDoInput(true);
            urlcon.setDoOutput(true);
//            urlcon.setUseCaches(false);
            // Get the output stream and write the data
            out = urlcon.getOutputStream();
            int count = 0;
            int length = 0;
            // Buffer size
            int size = 1024 * GeneralHttpConstant.READ_SIZE;
//            byte[] buf = new byte[size];
//            logger.info("Length: "+msg.getBytes(charset).length);
//            bArray = new ByteArrayInputStream(msg.getBytes(charset));
//            // read(buf) returns the number of bytes read into the array, or -1 once the stream is exhausted
//            while ((length = bArray.read(buf)) != -1) {
//                logger.info("Writing chunk " + (++count) + ", length: " + length);
//                out.write(buf, 0, length);
//                out.flush();
//                Thread.sleep(GeneralHttpConstant.SLEEP_TIME);
//            }
            // Write the whole body at once
            byte[] bytes = msg.getBytes(charset);
            out.write(bytes);
            out.flush();
            out.close();
            // Get the input stream and read the response
            in = urlcon.getInputStream();
            isr = new InputStreamReader(in, charset);
            buffer = new BufferedReader(isr);
            StringBuffer bs = new StringBuffer();
            String line = null;
            while ((line = buffer.readLine()) != null) {
                bs.append(line);
            }
            result = bs.toString();
        } catch (Exception e) {
            logger.error("[Request failed][URL: " + uri + "][Error: " + e.getMessage() + "]");
            result = "";
        } finally {
            // Close every stream in the original order; one failed close must not skip the rest
            for (Closeable c : new Closeable[]{buffer, isr, in, bArray, out}) {
                try {
                    if (null != c) {
                        c.close();
                    }
                } catch (Exception e1) {
                    logger.info("[Failed to close stream][Error: " + e1.getMessage() + "]");
                }
            }
            try {
                if (null != urlcon) {
                    urlcon.disconnect();
                }
            } catch (Exception e6) {
                logger.info("[Failed to close HttpURLConnection][Error: " + e6.getMessage() + "]");
            }
        }
        logger.info("HTTP request to [" + uri + "] took: " + TimeFmtUtil.formatTime(System.currentTimeMillis()-l));
        return result;
    }


    public static void parseParam(Object obj, List<BasicNameValuePair> params) throws NoSuchMethodException,
            IllegalAccessException, InvocationTargetException {
        // Build the request parameters by calling the getter of every declared field
        String fieldName = null;
        String fieldNameUpper = null;
        Method getMethod = null;
        Class cls = obj.getClass();
        Field[] fields = cls.getDeclaredFields();
        Object value = null;
        for (int i = 0; i < fields.length; i++) {
            fieldName = fields[i].getName();
            if (!"serialVersionUID".equals(fieldName)) {
                fieldNameUpper = Character.toUpperCase(fieldName.charAt(0)) + fieldName.substring(1);
                getMethod = cls.getMethod("get" + fieldNameUpper);
                value = getMethod.invoke(obj);
                if (value != null) {
                    params.add(new BasicNameValuePair(fieldName, String.valueOf(value)));
                }
            }
        }
    }

}
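
The core of the technique above (chunked streaming mode plus a write, flush, sleep loop) can be tried in isolation. The following is a minimal, self-contained sketch and not the project's actual code: it uses only the JDK (Java 9 or later for readAllBytes), and the class name, the /upload path, the throwaway local server and the chunk and sleep values are all hypothetical. The local server simply replies with the number of body bytes it received.

```java
import com.sun.net.httpserver.HttpServer;

import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.InetSocketAddress;
import java.net.URL;
import java.nio.charset.StandardCharsets;

class PacedChunkedPostDemo {

    /** Sends body in pieces of at most chunkSize bytes, sleeping sleepMillis after each piece. */
    static String pacedPost(String uri, byte[] body, int chunkSize, long sleepMillis) throws Exception {
        HttpURLConnection con = (HttpURLConnection) new URL(uri).openConnection();
        try {
            con.setRequestMethod("POST");
            con.setDoOutput(true);
            con.setRequestProperty("Content-Type", "application/octet-stream");
            // Stream with Transfer-Encoding: chunked instead of buffering the whole body
            con.setChunkedStreamingMode(chunkSize);
            byte[] buf = new byte[chunkSize];
            try (InputStream src = new ByteArrayInputStream(body);
                 OutputStream out = con.getOutputStream()) {
                int n;
                while ((n = src.read(buf)) != -1) {
                    out.write(buf, 0, n);
                    out.flush();               // push this piece onto the wire now
                    Thread.sleep(sleepMillis); // pace the upload
                }
            }
            try (InputStream in = con.getInputStream()) {
                return new String(in.readAllBytes(), StandardCharsets.UTF_8);
            }
        } finally {
            con.disconnect();
        }
    }

    /** Starts a throwaway local server and posts 10,000 bytes to it in paced pieces. */
    static String demo() {
        try {
            HttpServer server = HttpServer.create(new InetSocketAddress("127.0.0.1", 0), 0);
            server.createContext("/upload", exchange -> {
                byte[] received = exchange.getRequestBody().readAllBytes();
                byte[] reply = String.valueOf(received.length).getBytes(StandardCharsets.UTF_8);
                exchange.sendResponseHeaders(200, reply.length);
                try (OutputStream os = exchange.getResponseBody()) {
                    os.write(reply);
                }
            });
            server.start();
            try {
                String uri = "http://127.0.0.1:" + server.getAddress().getPort() + "/upload";
                return pacedPost(uri, new byte[10_000], 4096, 20L);
            } finally {
                server.stop(0);
            }
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("server received " + demo() + " bytes");
    }
}
```

Try-with-resources replaces the long chain of close calls. The chunk boundaries seen on the wire need not match the buffer size exactly, since the HTTP client adds its own chunk framing.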

 

Original article: https://blog.csdn.net/qq_36555327/article/details/107352471