基于Gor实现流量复制(加middleware功能增强)
最近做功能重构,在上线前要求验证重构后的代码与老代码实现逻辑是否一致,基于这个需求,需要在生产环境做一个功能将生产服务器上的流量复制一份发送到测试服务器上。
就这个事情这几天考察了三种技术,1.基于nginx+lua脚本,2.tcpcopy,3.gor。这里大概说一下这三种方案:
- 1.nginx+lua脚本
这种方案的思路是在生产服务器前端架一层壳子,将请求拦截,然后基于lua-nginx-module模块,写lua脚本,使用其内置的ngx.location.capture_multi,对后端发起多个异步并发请求,然后统一将结果返回给前端。
该方案需要安装nginx,以及依赖lua-nginx-module,ngx_devel_kit等模块,然后需要写lua代码来复制请求。
- 2.tcpcopy
这种方案是工作在网络等TCP和IP层做请求复制,因为其间架构调整过两次,现在的实现架构是在生产环境启动tcpcopy进程,测试环境启动intecept进程,然后配置复制请求的路径。
- 3.gor
这是今天搜到的用Go语言写的工具,在生产服务器上安装一个tar包,用root权限启动命令即可
1方案安装步骤较多,需要理解nginx处理请求的过程和lua脚本语法以及相关请求调用的API。2.安装简单一些,只需要在生产和测试服务器分别安装tcpcopy和intecept即可(当然前提是他们依赖的libpcap之类也有了,否则也要安装),然后启动命令加参数即可,但由于该方案工作在较为底层,看起来比较重,实际跑了个例子就没继续研究。3.安装最简单,下载一个tar包,解压,sudo执行即搞定。
个人比较倾向于3,所以这里就介绍一下gor的实现方式:
- 1.下载
根据操作系统环境下载安装包 https://github.com/buger/gor/releases,建议选择master分支的,我的是Mac,所以选择了tar包
- 2.解压
tar -xvf gor_v0.14.1_mac.tar.gz
- 3. 验证
sudo ./gor --input-raw :8080 --output-http http://192.168.22.33:8080
至此就搞定了,简单吧!!!
这条命令是监控本地的8080端口,并实时复制请求到需要192.168.22.33的8080端口上,需要本地root执行权限。
下面是我的扩展用法:
- 4. 保存请求到文件
sudo ./gor --input-raw :8080 --output-file requests.gor
这里将8080端口的请求保存到本地文件上,可以用于线上请求记录之后的功能回放
- 5. 根据文件回放请求
sudo ./gor --input-file requests.gor --output-http http://192.168.22.33:8080
将上面保存的文件请求回放到192.168.22.33服务器的8080端口上
- 6.url过滤
包含/order的URL才发送请求
sudo ./gor --input-raw :8080 --output-http http://192.168.22.33:8080 --http-allow-url ^/order.
- 7.url过滤+记录文件+请求回放
sudo ./gor --input-raw :8080 --output-file gor-order-requests.gor --output-http http://192.168.22.33:8080 --http-allow-url ^/order.
sudo ./gor --input-file gor-order-requests.gor --output-http http://192.168.22.33:8080
- 8.url过滤+记录文件+记录响应
sudo ./gor --input-raw-track-response --input-raw :8080 --output-file gor-order-request-response.gor --http-allow-url ^/order.
下面是别人整理的一些详细配置说明,可以参考一下
-cpuprofile string write cpu profile to file -debug verbose 打开debug模式,显示所有接口的流量 -http-allow-header value 用一个正则表达式来匹配http头部,如果请求的头部没有匹配上,则被拒绝 gor --input-raw :8080 --output-http staging.com --http-allow-header api-version:^v1 (default []) -http-allow-method value 类似于一个白名单机制来允许通过的http请求方法,除此之外的方法都被拒绝. gor --input-raw :8080 --output-http staging.com --http-allow-method GET --http-allow-method OPTIONS (default []) -http-allow-url value 一个正则表达式用来匹配url, 用来过滤完全匹配的的url,在此之外的都被过滤掉 gor --input-raw :8080 --output-http staging.com --http-allow-url ^www. (default []) -http-disallow-header value 用一个正则表达式来匹配http头部,匹配到的请求会被拒绝掉 gor --input-raw :8080 --output-http staging.com --http-disallow-header "User-Agent: Replayed by Gor" (default []) -http-disallow-url value 用一个正则表达式来匹配url,如果请求匹配上了,则会被拒绝 gor --input-raw :8080 --output-http staging.com --http-disallow-url ^www. (default []) -http-header-limiter value 读取请求,基于FNV32-1A散列来拒绝一定比例的特殊请求 gor --input-raw :8080 --output-http staging.com --http-header-imiter user-id:25% (default []) -http-original-host 在--output-http的输出中,通常gor会使用取代请求的http头,所以应该禁用该选项,保留原始的主机头 -http-param-limiter value Takes a fraction of requests, consistently taking or rejecting a request based on the FNV32-1A hash of a specific GET param: gor --input-raw :8080 --output-http staging.com --http-param-limiter user_id:25% (default []) -http-rewrite-url value Rewrite the request url based on a mapping: gor --input-raw :8080 --output-http staging.com --http-rewrite-url /v1/user/([^\/]+)/ping:/v2/user/$1/ping (default []) -http-set-header value Inject additional headers to http reqest: gor --input-raw :8080 --output-http staging.com --http-set-header 'User-Agent: Gor' (default []) -http-set-param value Set request url param, if param already exists it will be overwritten: gor --input-raw :8080 --output-http staging.com --http-set-param api_key=1 (default []) -input-dummy value Used for testing outputs. Emits 'Get /' request every 1s (default []) -input-file value 从一个文件中读取请求 gor --input-file ./requests.gor --output-http staging.com (default []) -input-http value 从一个http接口读取请求 # Listen for http on 9000 gor --input-http :9000 --output-http staging.com (default []) -input-raw value Capture traffic from given port (use RAW sockets and require *sudo* access): # Capture traffic from 8080 port gor --input-raw :8080 --output-http staging.com (default []) -input-tcp value 用来在多个gor之间流转流量 # Receive requests from other Gor instances on 28020 port, and redirect output to staging gor --input-tcp :28020 --output-http staging.com (default []) -memprofile string write memory profile to this file -middleware string Used for modifying traffic using external command -output-dummy value 用来测试输入,打印出接收的数据. (default []) -output-file value 把进入的请求写入一个文件中 gor --input-raw :80 --output-file ./requests.gor (default []) -output-http value 转发进入的请求到一个http地址上 # Redirect all incoming requests to staging.com address gor --input-raw :80 --output-http http://staging.com (default []) -output-http-elasticsearch string 把请求和响应状态发送到ElasticSearch: gor --input-raw :8080 --output-http staging.com --output-http-elasticsearch 'es_host:api_port/index_name' -output-http-redirects int 设置多少次重定向被允许 -output-http-stats 每5秒钟输出一次输出队列的状态 -output-http-timeout duration 指定http的request/response超时时间,默认是5秒 -output-http-workers int gor默认是动态的扩展工作者数量,你也可以指定固定数量的工作者 -output-tcp value 用来在多个gor之间流转流量 # Listen for requests on 80 port and forward them to other Gor instance on 28020 port gor --input-raw :80 --output-tcp replay.local:28020 (default []) -output-tcp-stats 每5秒钟报告一次tcp输出队列的状态 -split-output true By default each output gets same traffic. If set to true it splits traffic equally among all outputs. -stats 打开输出队列的状态 -verbose Turn on more verbose output
当然最好的还是直接关注作者的git项目:https://github.com/buger/gor/wiki
下面是我基于这个工具做的一个middleware的介绍:
关于Middleware的原理建议看看https://github.com/buger/gor/wiki/Middleware,不再赘述。这里介绍下我做的MiddleWare实现的功能:
1.我需要将生产环境的请求回放到测试环境,然后将生产的响应和测试的响应结果做比对,以校验功能重构是否正常。所以需要在生产的日志结果中加一个标记(比如自增长的ID或随机数等),同时在请求回放的时候能够将该标记带到测试环境去。
原来想法是加一个自定义的请求Header,经过试验发现这并不能将结果带到测试输出的请求响应文件中,导致无法根据两份日志文件比对。所以直接在请求体的第一行开头中加上一个自定义固定的URL参数:GorRequestId=***&,这个GorRequestId的值取的就是请求块中第一行的第二项。根据git上的描述,该值本来就是作者来做request和response的比对用的。
2.gor支持根据URL匹配过滤请求,但目前还不能同时过滤出请求对应的响应,我通过自定义的java版middleware来实现了这个需求,原理是在解析请求块的时候记录下需要输出的URL的requestId到一个HashSet中,在解析响应体的时候根据requestId匹配过滤输出。利用的就是请求和响应公用一个requestId这个特性。(这个问题我已经向作者提了isssue:https://github.com/buger/gor/issues/344,根据回复后续会实现该功能。)
下面就是我的代码实现:
package go.middleware; import javax.xml.bind.DatatypeConverter; import java.io.BufferedInputStream; import java.io.BufferedReader; import java.io.InputStreamReader; import java.util.HashSet; import java.util.Set; /** * Gor中间件Java版本,增强的功能有: * * 1.在请求体中注入参数GorRequestId,用于请求回放时的原始请求比对 * 2.支持根据url配置过滤请求和响应的输出 * <p> * Created by niwei on 16/7/22. */ public class Stdout { private static final String SPLITTER_HEADER_BODY_SPLITTER = "\r\n\r\n"; private static final String SPLITTER_HEAD_FIRST_LINE = "\n"; private static final String SPLITTER_HEADER_ITEM = " "; /** * payload type, possible values: 1 - request, 2 - original response, 3 - replayed response */ private static final String PAYLOAD_TYPE_REQUEST = "1"; private static final String PAYLOAD_TYPE_ORIGINAL_RESPONSE = "2"; /** * 定义新增加的requestId参数名称 */ private static String INJECT_TO_REQUEST_ENTITY_REQUEST_ID = "GorRequestId"; /** * 定义需要输出的请求和响应的requestId */ private static Set<String> recordRequestIds = new HashSet<>(); /** * convert hex to string * * @param hexStr * @return * @throws Exception */ public static String hexDecode(String hexStr) throws Exception { byte[] decodedHex = DatatypeConverter.parseHexBinary(hexStr); String decodedString = new String(decodedHex, "UTF-8"); return decodedString; } /** * convert string to hex * * @param str * @return * @throws Exception */ public static String encodeHex(String str) throws Exception { if (str == null) { return null; } byte[] strBytes = str.getBytes(); String encodeString = DatatypeConverter.printHexBinary(strBytes); return encodeString; } private static String getRequestHeader(String key, String value) { StringBuilder result = new StringBuilder(SPLITTER_HEAD_FIRST_LINE); result.append(key).append(":").append(SPLITTER_HEADER_ITEM).append(value); return result.toString(); } /** * gor原始内容增强 * * @param content 原始的gor工具输出的内容 * @param allowUrlRegular 允许记录文件的url正则表达式 * @return 增强后输出的内容 */ public static String enhanceContent(String content, String allowUrlRegular) { if ((allowUrlRegular == null) || (allowUrlRegular.trim().equals(""))){ allowUrlRegular = "*"; } String result = content; /** * get first line content */ String[] lines = content.split(SPLITTER_HEAD_FIRST_LINE); if (lines == null || lines.length <= 1) { return result; } String firstLine = lines[0]; String secondLine = lines[1]; String[] firstLineItems = firstLine.split(SPLITTER_HEADER_ITEM); if (firstLineItems.length != 3) { return result; } else { String payloadType = firstLineItems[0]; String requestId = firstLineItems[1]; if (PAYLOAD_TYPE_REQUEST.equals(payloadType)) { String[] secondLineItems = secondLine.split(SPLITTER_HEADER_ITEM); String url = secondLineItems[1]; String uri = url; int urlIndex = url.indexOf("?"); if (urlIndex > 0) { uri = url.substring(0, urlIndex); } String requestIdPair = INJECT_TO_REQUEST_ENTITY_REQUEST_ID + "=" + requestId + "&"; result = content.replaceFirst(SPLITTER_HEADER_BODY_SPLITTER, SPLITTER_HEADER_BODY_SPLITTER + requestIdPair); boolean isMatch = false; String[] allowUrls = allowUrlRegular.split(","); for (String allowUrl : allowUrls) { if (uri.matches(allowUrl)){ recordRequestIds.add(requestId); isMatch = true; break; } } if(!isMatch){ //URL不能匹配上的则不输出到文件 result = ""; } } else if (PAYLOAD_TYPE_ORIGINAL_RESPONSE.equals(payloadType)) { if (recordRequestIds.contains(requestId)) { recordRequestIds.remove(requestId); } else {//不再recordRequestIds记录中则不输出到文件 result = ""; } } } return result; } /** * java go.GorEnhance * * @param args * @throws Exception */ public static void main(String[] args) throws Exception { String line; StringBuilder allowUrlRegular = new StringBuilder(); int bytesRead = 0; byte[] buffer = new byte[1024]; try (BufferedInputStream bufferedInput = new BufferedInputStream(Class.class.getClassLoader().getSystemResourceAsStream("go/middleware/allow-url.txt"))) { while ((bytesRead = bufferedInput.read(buffer)) != -1) { allowUrlRegular.append(new String(buffer, 0, bytesRead)); } } BufferedReader stdin = new BufferedReader(new InputStreamReader( System.in)); while ((line = stdin.readLine()) != null) { System.out.println(encodeHex(enhanceContent(hexDecode(line), allowUrlRegular.toString()))); } } }
在运行gor命令时,加上参数--middleware "java go.middleware.Stdout" 就可以了。代码中的go/middleware/allow-url.txt是在当前类的同级目录下增加的一个URL过滤的配置文件:比如.*confirm.*,就将只记录URL中包含confirm的请求,如果有多项URL则直接以逗号(,)分割即可。
本项目源码已经放在github上:https://github.com/niweicumt/copyflow
下一篇: 使用源码编译安装nginx