【.NET Core项目实战-统一认证平台】第十六章 网关篇-Ocelot集成RPC服务
一、什么是rpc
rpc是“远程调用(remote procedure call)”的一个名称的缩写,并不是任何规范化的协议,也不是大众都认知的协议标准,我们更多时候使用时都是创建的自定义化(例如socket,netty)的消息方式进行调用,相比http协议,我们省掉了不少http中无用的消息内容。因此很多系统内部调用仍然采用自定义化的rpc调用模式进行通信,毕竟速度和性能是内网的关键指标之一,而标准化和语义无关性在外网中举足轻重。所以,为何api网关无法工作在rpc上,因为它没有一个像http/https那样的通用标准。
二、czarrpc简介
czarrpc是作者基于dotnetty实现的rpc通讯框架,参考了surging
和tars.net
优秀设计,目前正在内部使用中,下面就czarrpc调用方式做一个简单介绍,测试结构如下:
1、服务接口
新建一个czar.rpc.common
类库,首先需要引用czar.rpc
nuget包。
install-package czar.rpc
然后定义测试接口ihellorpc.cs
,也是目前支持的调用方式。
using czar.rpc.attributes; using czar.rpc.exceptions; using czar.rpc.metadata; using system; using system.collections.generic; using system.threading.tasks; namespace czar.rpc.common { /// <summary> /// 测试rpc实体 /// </summary> [businessexceptioninterceptor] [czarrpc("demo.rpc.hello")] public interface ihellorpc: irpcbaseservice { string hello(int no, string name); void helloholder(int no, out string name); task<string> hellotask(int no, string name); valuetask<string> hellovaluetask(int no, string name); [czaroneway] void hellooneway(int no, string name); task testbusinessexceptioninterceptor(); demomodel hellomodel(int d1, string d2, datetime d3); task<demomodel> hellomodelasync(int d1, string d2, datetime d3); demomodel hellosendmodel(demomodel model); demomodel hellosendmodelparm(string name,demomodel model); list<demomodel> hellosendmodellist(list<demomodel> model); } public class demomodel { /// <summary> /// 测试1 /// </summary> public int t1 { get; set; } /// <summary> /// 测试2 /// </summary> public string t2 { get; set; } /// <summary> /// 测试3 /// </summary> public datetime t3 { get; set; } public childmodel child { get; set; } } public class childmodel { public string c1 { get; set; } } }
2.服务端
新建一个控制台程序czar.rpc.server
,然后实现服务接口,因为都是测试数据,所以就随意实现了方法。
hellorpcserver.cs
using czar.rpc.exceptions; using system; using system.collections.generic; using system.threading.tasks; using system.linq; using system.net; using czar.rpc.common; namespace demo.rpc.server { public class hellorpcserver: ihellorpc { public endpoint czarendpoint { get; set; } public string hello(int no, string name) { string result = $"{no}: hi, {name}"; console.writeline(result); return result + " callback"; } public void helloholder(int no, out string name) { name = no.tostring() + " out"; } public void hellooneway(int no, string name) { /* 耗时操作 */ console.writeline($"from oneway - {no}: hi, {name}"); } public task<string> hellotask(int no, string name) { return task.fromresult(hello(no, name)); } public valuetask<string> hellovaluetask(int no, string name) { return new valuetask<string>(hello(no, name)); } public task testbusinessexceptioninterceptor() { throw new businessexception() { czarcode = "1", czarmessage = "test" }; } public demomodel hellomodel(int d1, string d2, datetime d3) { return new demomodel() { t1 = d1 + 1, t2 = d2 + "2", t3 = d3.adddays(1) }; } public async task<demomodel> hellomodelasync(int d1, string d2, datetime d3) { return await task.fromresult( new demomodel() { t1 = d1 + 1, t2 = d2 + "77777", t3 = d3.adddays(1) } ); } public demomodel hellosendmodel(demomodel model) { model.t1 = model.t1 + 10; model.t2 = model.t2 + "11"; model.t3 = model.t3.adddays(12); return model; } public demomodel hellosendmodelparm(string name, demomodel model) { model.t1 = model.t1 + 10; model.t2 = model.t2 + "11"; model.t3 = model.t3.adddays(12); if (model.child != null) { model.child.c1 = name+"说:"+ model.child.c1; } return model; } public list<demomodel> hellosendmodellist(list<demomodel> model) { return model.select(t => new demomodel() { t1=t.t1+10,t2=t.t2+"13",t3=t.t3.addyears(1),child=t.child }).tolist(); } } }
然后启动服务端监听。
class program { static void main(string[] args) { var host = new hostbuilder() .configurehostconfiguration(i => i.addjsonfile("czarconfig.json")) .configurelogging((hostcontext, configlogging) => { configlogging.addconsole(); }) .usecodec<jsoncodec>() .uselibuvtcphost() .useproxy() .useconsolelifetime() .build(); host.runasync().wait(); } }
启用外部使用czarconfig.json的配置文件,注意需要设置成始终复制。
{ "czarhost": { "port": 7711, //监听端口 "quietperiodseconds": 2, //退出静默时间 dotnetty特性 "shutdowntimeoutseconds": 2, //关闭超时时间 dotnetty特性 "isssl": "false", //是否启用 ssl, 客户端需要保持一致 "pfxpath": "cert/datasync.pfx", //证书 "pfxpassword": "123456" //证书密钥 } }
到此服务器端搭载完成。
3、客户端
新建客户端控制台程序czar.rpc.client
,然后配置rpc调用信息。
{ "czarhost": { "proxyendpoint": true, //是否启用动态服务地址,就是指定服务端ip "isssl": "false", //是否启用ssl "pfxpath": "cert/datasync.pfx", //证书 "pfxpassword": "123456", //证书密钥 "clientconfig": { //客户端配置 "demo.rpc.hello": { //对应服务[czarrpc("demo.rpc.hello")] 值 "host": "127.0.0.1", //服务端ip 如果proxyendpoint=false 时使用 "port": 7711, //服务端端口 如果proxyendpoint=false 时使用 "timeout": 10, //调用超时时间 "writeridletimeseconds";30 //空闲超时时间,默认为30秒,非内网环境建议设置成5分钟内。 } } } }
现在开始启用客户端信息。
class program { public static iserviceprovider service; public static iconfiguration config; static async task main(string[] args) { try { var builder = new configurationbuilder(); config = builder.addjsonfile("czarconfig.json").build(); service = new servicecollection() .addsingleton(config) .addlogging(j => j.addconsole()) .addlibuvtcpclient(config) .addproxy() .builddynamicproxyserviceprovider(); var rpc = service.getrequiredservice<ihellorpc>(); //使用的内部指定的服务器地址 rpc.czarendpoint = new ipendpoint(ipaddress.parse("127.0.0.1"), 7711); var result = string.empty; string t = "基本调用"; result = rpc.hello(18, t); console.writeline(result); result = "无返回结果"; rpc.helloholder(1, out result); console.writeline(result); result = await rpc.hellotask(2, "异步任务"); console.writeline(result); result = "单向"; rpc.hellooneway(3, "单向调用"); console.writeline(result); result = await rpc.hellovaluetask(4, "valuetask任务"); console.writeline(result); var modelresult = rpc.hellomodel(5, "返回实体", datetime.now); console.writeline($"{modelresult.t1} {modelresult.t2} {modelresult.t3.tolongdatestring()}"); var modelresult1 = await rpc.hellomodelasync(6, "返回task实体", datetime.now); console.writeline($"{modelresult1.t1} {modelresult1.t2} {modelresult1.t3.tolongdatestring()}"); var mm = new demomodel() { t1 = 7, t2 = "传实体返回实体", t3 = datetime.now, child = new childmodel() { c1 = "子类1" } }; var model2 = rpc.hellosendmodel(mm); console.writeline($"{model2.t1} {model2.t2} {model2.t3.tolongdatestring()} {model2.child.c1}"); var list = new list<demomodel>(); var mm1 = new demomodel() { t1 = 8, t2 = "传list返回list", t3 = datetime.now, child = new childmodel() { c1 = "子类2" } }; var mm3 = new demomodel() { t1 = 9, t2 = "传list返回list", t3 = datetime.now, child = new childmodel() { c1 = "子类3" } }; list.add(mm1); list.add(mm3); var list3 = rpc.hellosendmodellist(list); console.writeline($"{list3[0].t1} {list3[0].t2} {list3[0].t3.tolongdatestring()} {list3[0].child?.c1}"); var mm4 = new demomodel() { t1 = 9, t2 = "hellosendmodelparm", t3 = datetime.now, child = new childmodel() { c1 = "子类4" } }; var dd = rpc.hellosendmodelparm("hellosendmodelparm", mm4); console.writeline($"{dd.t1} {dd.t2} {dd.t3.tolongdatestring()} {dd.child.c1}"); //异常调用 await rpc.testbusinessexceptioninterceptor(); } catch (businessexception e) { console.writeline($"czarcode:{e.czarcode} czarmessage:{e.czarmessage}"); } catch (exception ex) { console.writeline(ex); } console.readline(); } }
现在整个rpc调用搭建完毕,然后分别启动服务器端和客户端,就可以看到屏幕输出内容如下。
客户端输出:
服务器端输出:
至此整个czarrpc的基本使用已经介绍完毕,感兴趣的朋友可以自行测试。
三、ocelot增加rpc支持
有了czarrpc
的通讯框架后,现在在ocelot
上实现rpc
功能简直易如反掌,现在开始添加我们的rpc
中间件,也让我们扩展的网关灵活起来。
还记得我介绍网关篇时添加中间件的步骤吗?如果不记得的可以先回去回顾下。
首先如何让网关知道这个后端调用是http
还是rpc
呢?这时应该会想到ocelot
路由配置里的downstreamscheme
,可以在这里判断我们定义的是http
还是rpc
即可。同时我们希望之前定义的所有中间件都生效,最后一步请求时如果配置下端路由rpc
,使用rpc
调用,否则使用http
调用,这样可以重复利用之前所有的中间件功能,减少重复开发。
在之前的开发的自定义限流和自定义授权中间件开发中,我们知道开发完的中间件放到哪里使用,这里就不介绍原理了,直接添加到buildczarocelotpipeline
里如下代码。
public static ocelotrequestdelegate buildczarocelotpipeline(this iocelotpipelinebuilder builder, ocelotpipelineconfiguration pipelineconfiguration) { // 注册一个全局异常 builder.useexceptionhandlermiddleware(); // 如果请求是websocket使用单独的管道 builder.mapwhen(context => context.httpcontext.websockets.iswebsocketrequest, app => { app.usedownstreamroutefindermiddleware(); app.usedownstreamrequestinitialiser(); app.useloadbalancingmiddleware(); app.usedownstreamurlcreatormiddleware(); app.usewebsocketsproxymiddleware(); }); // 添加自定义的错误管道 builder.useifnotnull(pipelineconfiguration.preerrorrespondermiddleware); //使用自定义的输出管道 builder.useczarrespondermiddleware(); // 下游路由匹配管道 builder.usedownstreamroutefindermiddleware(); //增加自定义扩展管道 if (pipelineconfiguration.mapwhenocelotpipeline != null) { foreach (var pipeline in pipelineconfiguration.mapwhenocelotpipeline) { builder.mapwhen(pipeline); } } // 使用http头部转换管道 builder.usehttpheaderstransformationmiddleware(); // 初始化下游请求管道 builder.usedownstreamrequestinitialiser(); // 使用自定义限流管道 builder.useratelimiting(); //使用请求id生成管道 builder.userequestidmiddleware(); //使用自定义授权前管道 builder.useifnotnull(pipelineconfiguration.preauthenticationmiddleware); //根据请求判断是否启用授权来使用管道 if (pipelineconfiguration.authenticationmiddleware == null) { builder.useauthenticationmiddleware(); } else { builder.use(pipelineconfiguration.authenticationmiddleware); } //添加自定义限流中间件 2018-11-18 金焰的世界 builder.useczarclientratelimitmiddleware(); //添加自定义授权中间件 2018-11-15 金焰的世界 builder.useahphauthenticationmiddleware(); //启用自定义的认证之前中间件 builder.useifnotnull(pipelineconfiguration.preauthorisationmiddleware); //是否使用自定义的认证中间件 if (pipelineconfiguration.authorisationmiddleware == null) { builder.useauthorisationmiddleware(); } else { builder.use(pipelineconfiguration.authorisationmiddleware); } // 使用自定义的参数构建中间件 builder.useifnotnull(pipelineconfiguration.prequerystringbuildermiddleware); // 使用负载均衡中间件 builder.useloadbalancingmiddleware(); // 使用下游地址创建中间件 builder.usedownstreamurlcreatormiddleware(); // 使用缓存中间件 builder.useoutputcachemiddleware(); //判断下游的是否启用rpc通信,切换到rpc处理 builder.mapwhen(context => context.downstreamreroute.downstreamscheme.equals("rpc", stringcomparison.ordinalignorecase), app => { app.useczarrpcmiddleware(); }); //使用下游请求中间件 builder.useczahttprequestermiddleware(); return builder.build(); }
这里是在最后请求前判断使用的下游请求方式,如果downstreamscheme
使用的rpc
,就使用rpc
中间件处理。
rpc处理的完整逻辑是,如何从http请求中获取想要解析的参数,这里需要设置匹配的优先级,目前设计的优先级为。
1、首先提取路由参数,如果匹配上就是用路由参数名称为key,值为value,按顺序组成第一批参数。
2、提取query参数,如有有值按顺序组成第二批参数。
3、如果非get请求,提取body内容,如果非空,组成第三批参数
4、从配置库里提取rpc路由调用的服务名称和函数名称,以及是否单向调用。
5、按照获取的数据进行rpc调用并等待返回。
看了上面的设计是不是思路很清晰了呢?
1、rpc路由表设计
create table ahphrerouterpcconfig ( rpcid int identity(1,1) not null, rerouteid int, //路由表主键 servantname varchar(100) not null, //调用的服务名称 funcname varchar(100) not null, //调用的方法名称 isoneway bit not null //是否单向调用 )
2、提取远程调用方法
根据上游路由获取远程调用的配置项目
public interface irpcrepository { /// <summary> /// 根据模板地址获取rpc请求方法 /// </summary> /// <param name="upurl">上游模板</param> /// <returns></returns> task<remoteinvokemessage> getremotemethodasync(string upurl); } public class sqlserverrpcrepository : irpcrepository { private readonly czarocelotconfiguration _option; public sqlserverrpcrepository(czarocelotconfiguration option) { _option = option; } /// <summary> /// 获取rpc调用方法 /// </summary> /// <param name="upurl"></param> /// <returns></returns> public async task<remoteinvokemessage> getremotemethodasync(string upurl) { using (var connection = new sqlconnection(_option.dbconnectionstrings)) { string sql = @"select t4.* from ahphglobalconfiguration t1 inner join ahphconfigreroutes t2 on t1.ahphid=t2.ahphid inner join ahphreroute t3 on t2.rerouteid=t3.rerouteid inner join ahphrerouterpcconfig t4 on t3.rerouteid=t4.rerouteid where isdefault=1 and t1.infostatus=1 and t3.infostatus=1 and upstreampathtemplate=@url"; var result = await connection.queryfirstordefaultasync<remoteinvokemessage>(sql, new { url = upurl }); return result; } } }
3、重写返回结果
由于rpc调用后是返回的json封装的信息,需要解析成对应的httpcontent。
using system.io; using system.net; using system.net.http; using system.threading.tasks; namespace czar.gateway.rpc { public class rpchttpcontent : httpcontent { private string result; public rpchttpcontent(string result) { this.result = result; } public rpchttpcontent(object result) { this.result = newtonsoft.json.jsonconvert.serializeobject(result); } protected override async task serializetostreamasync(stream stream, transportcontext context) { var writer = new streamwriter(stream); await writer.writeasync(result); await writer.flushasync(); } protected override bool trycomputelength(out long length) { length = result.length; return true; } } }
4、rpc中间件逻辑处理
有了前面的准备信息,现在基本可以完成逻辑代码的开发了,详细的中间件代码如下。
using czar.gateway.errors; using czar.rpc.clients; using ocelot.logging; using ocelot.middleware; using ocelot.responses; using system.collections.generic; using system.net; using system.threading.tasks; namespace czar.gateway.rpc.middleware { public class czarrpcmiddleware : ocelotmiddleware { private readonly ocelotrequestdelegate _next; private readonly irpcclientfactory _clientfactory; private readonly iczarrpcprocessor _czarrpcprocessor; public czarrpcmiddleware(ocelotrequestdelegate next, irpcclientfactory clientfactory, iocelotloggerfactory loggerfactory, iczarrpcprocessor czarrpcprocessor) : base(loggerfactory.createlogger<czarrpcmiddleware>()) { _next = next; _clientfactory = clientfactory; _czarrpcprocessor = czarrpcprocessor; } public async task invoke(downstreamcontext context) { var httpstatuscode = httpstatuscode.ok; var _param = new list<object>(); //1、提取路由参数 var tmpinfo = context.templateplaceholdernameandvalues; if (tmpinfo != null && tmpinfo.count > 0) { foreach (var tmp in tmpinfo) { _param.add(tmp.value); } } //2、提取query参数 foreach (var _q in context.httpcontext.request.query) { _param.add(_q.value.tostring()); } //3、从body里提取内容 if (context.httpcontext.request.method.toupper() != "get") { context.downstreamrequest.scheme = "http"; var requert = context.downstreamrequest.tohttprequestmessage(); if (requert.content!=null) { var json = "{}"; json = await requert.content.readasstringasync(); _param.add(json); } } //从缓存里提取 var req = await _czarrpcprocessor.getremotemethodasync(context.downstreamreroute.upstreampathtemplate.originalvalue); if (req != null) { req.parameters = _param.toarray(); var result = await _clientfactory.sendasync(req, getendpoint(context.downstreamrequest.host, context.downstreamrequest.port)); okresponse<rpchttpcontent> httpresponse; if (result.czarcode == czar.rpc.utilitys.rpcstatuscode.success) { httpresponse = new okresponse<rpchttpcontent>(new rpchttpcontent(result.czarresult?.tostring())); } else { httpresponse = new okresponse<rpchttpcontent>(new rpchttpcontent(result)); } context.httpcontext.response.contenttype = "application/json"; context.downstreamresponse = new downstreamresponse(httpresponse.data, httpstatuscode, httpresponse.data.headers, "ok"); } else {//输出错误 var error = new internalservererror($"请求路由 {context.httpcontext.request.path}未配置后端转发"); logger.logwarning($"{error}"); setpipelineerror(context, error); } } private endpoint getendpoint(string ipaddress, int port) { if (ipaddress.tryparse(ipaddress, out ipaddress ip)) { return new ipendpoint(ip, port); } else { return new dnsendpoint(ipaddress, port); } } } }
5、启动rpc客户端配置
目前rpc的客户端配置我们还没启动,只需要在addczarocelot
中添加相关注入即可。
var service = builder.first(x => x.servicetype == typeof(iconfiguration)); var configuration = (iconfiguration)service.implementationinstance; //rpc应用 builder.addsingleton<iczarrpcprocessor, czarrpcprocessor>(); builder.addsingleton<irpcrepository, sqlserverrpcrepository>(); builder.addlibuvtcpclient(configuration);
6、配置客户端
最后别忘了配置rpc客户端信息是否启用证书信息,为了配置信息的内容。
{ "czarhost": { "proxyendpoint": true, "isssl": "false", "pfxpath": "cert/datasync.pfx", "pfxpassword": "bl123456", "clientconfig": { "demo.rpc.hello": { "host": "127.0.0.1", "port": 7711, "timeout": 20 } } } }
现在让网关集成rpc功能全部配置完毕。
四、网关rpc功能测试
本次测试我在原有的网关基础上,增加不同类型的rpc调用,就按照不同维度测试rpc调用功能,本次测试案例是建立在czar.rpc 服务端基础上,正好可以测试。
1、测试路由参数
请求路径/hello/{no}/{name}
,调用的服务端方法hello
,传入的两个参数分别是no ,name
。
可以在服务器端添加断点调试,发现确实接收到请求信息,并正常返回,下面是postman
测试结果。
2、使用query方式传递参数
请求路径/rpc/query
,调用的服务端方法还是hello
,参数分别是no ,name
。
3、使用post方式传递json
请求路径/rpc/body
,调用的服务器方法是hellosendmodel
。
4、混合参数使用
请求的路径/rpc/bodyparm/{name}
,调用的服务器端方法是hellosendmodelparm
。
所有的返回结果可自行调试测试,发现都能达到预期结果。
同时此网关还是支持默认的http请求的,这里就不一一测试了。
五、总结
本篇我介绍了什么是rpc,以及czar.rpc的基本使用,然后使用czar.rpc框架集成到我们基于ocelot扩展网关中,并实现了不能方式的rpc调用,可以在几乎不改变现有流程的情况下很快速的集成进去,这也是ocelot开发框架的魅力所在。
如果在使用过程中有什么问题或建议,可以在.net core项目实战交流群(637326624)
中联系作者。
最后本文涉及的所有的源代码可在中下载预览。
推荐阅读
-
【.NET Core项目实战-统一认证平台】第二章网关篇-重构Ocelot来满足需求
-
【.NET Core项目实战-统一认证平台】第十六章 网关篇-Ocelot集成RPC服务
-
【.NET Core项目实战-统一认证平台】第十二章 授权篇-深入理解JWT生成及验证流程
-
【.NET Core项目实战-统一认证平台】第三章 网关篇-数据库存储配置(1)
-
【.NET Core项目实战-统一认证平台】第六章 网关篇-自定义客户端授权
-
【.NET Core项目实战-统一认证平台】第十三章 授权篇-如何强制有效令牌过期
-
【.NET Core项目实战-统一认证平台】第十四章 授权篇-自定义授权方式
-
【.NET Core项目实战-统一认证平台】第十五章 网关篇-使用二级缓存提升性能
-
【.NET Core项目实战-统一认证平台】第十章 授权篇-客户端授权
-
【.NET Core项目实战-统一认证平台】第五章 网关篇-自定义缓存Redis