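Extending gRPC with Consul service discovery and Polly policies
Because gRPC relies on tool-generated code, it leaves little room for customization and is not very friendly to extension.
I recently looked into this and built an extension that needs no extra code generation tool: it works directly with the proxy classes generated by the default Grpc.Tools.
The source code is linked at the bottom of the article.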
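Client goals:
- Configure a Consul address and a service name, so that calls made through the client are routed to the real service address
- Apply Polly policies (retry, timeout, and circuit breaking) when a method is called
Looking at the code gRPC generates, the client has two constructors. Taking the test project's Greeter service as an example: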
    /// <summary>Creates a new client for Greeter</summary>
    /// <param name="channel">The channel to use to make remote calls.</param>
    public GreeterClient(grpc::ChannelBase channel) : base(channel)
    {
    }
    /// <summary>Creates a new client for Greeter that uses a custom <c>CallInvoker</c>.</summary>
    /// <param name="callInvoker">The callInvoker to use to make remote calls.</param>
    public GreeterClient(grpc::CallInvoker callInvoker) : base(callInvoker)
    {
    }
1. It can be instantiated from a ChannelBase
2. It can be instantiated from a CallInvoker
A channel can be created like this:
    Channel CreateChannel(string address)
    {
        var channelOptions = new List<ChannelOption>()
        {
            new ChannelOption(ChannelOptions.MaxReceiveMessageLength, int.MaxValue),
            new ChannelOption(ChannelOptions.MaxSendMessageLength, int.MaxValue),
        };
        var channel = new Channel(address, ChannelCredentials.Insecure, channelOptions);
        return channel;
    }
This is the point where the real service address can be looked up in Consul by service name and used to create the channel.
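The lookup step can be sketched as follows. This is a minimal sketch, not the code from the repository: the names `ConsulLookup`, `ParseAddresses`, and `ResolveAsync` are hypothetical, and it assumes the Consul agent's HTTP endpoint `/v1/health/service/<name>?passing=true`, which returns a JSON array whose entries carry `Service.Address` and `Service.Port` (with `Node.Address` as the fallback when the service address is empty).

```csharp
using System;
using System.Collections.Generic;
using System.Net.Http;
using System.Text.Json;
using System.Threading.Tasks;

// Hypothetical helper, for illustration only.
public static class ConsulLookup
{
    // Turns the JSON array returned by /v1/health/service/<name>
    // into a list of "host:port" strings.
    public static List<string> ParseAddresses(string json)
    {
        var result = new List<string>();
        using (var doc = JsonDocument.Parse(json))
        {
            foreach (var entry in doc.RootElement.EnumerateArray())
            {
                var service = entry.GetProperty("Service");
                var address = service.GetProperty("Address").GetString();
                if (string.IsNullOrEmpty(address))
                {
                    // Consul leaves Service.Address empty when the service
                    // was registered without one; use the node address instead.
                    address = entry.GetProperty("Node").GetProperty("Address").GetString();
                }
                var port = service.GetProperty("Port").GetInt32();
                result.Add($"{address}:{port}");
            }
        }
        return result;
    }

    // Asks Consul for the passing instances of a service and picks one at random.
    public static async Task<string> ResolveAsync(HttpClient http, string consulUrl, string serviceName)
    {
        var url = $"{consulUrl.TrimEnd('/')}/v1/health/service/{Uri.EscapeDataString(serviceName)}?passing=true";
        var json = await http.GetStringAsync(url);
        var addresses = ParseAddresses(json);
        if (addresses.Count == 0)
        {
            throw new InvalidOperationException($"No healthy instance of '{serviceName}' found in Consul.");
        }
        return addresses[new Random().Next(addresses.Count)];
    }
}
```

The string returned by `ResolveAsync` has the `host:port` form that `CreateChannel(address)` expects. Picking a random instance is only one possible choice; round-robin or caching the result for a short time would fit equally well.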
CallInvoker is an abstract class. To intervene in how a method call is executed, its methods have to be overridden. A rough implementation:
    using Grpc.Core;
    using Grpc.Core.Utils;

    public class GrpcCallInvoker : CallInvoker
    {
        public readonly Channel Channel;

        public GrpcCallInvoker(Channel channel)
        {
            Channel = GrpcPreconditions.CheckNotNull(channel);
        }

        public override TResponse BlockingUnaryCall<TRequest, TResponse>(Method<TRequest, TResponse> method, string host, CallOptions options, TRequest request)
        {
            return Calls.BlockingUnaryCall(CreateCall(method, host, options), request);
        }

        public override AsyncUnaryCall<TResponse> AsyncUnaryCall<TRequest, TResponse>(Method<TRequest, TResponse> method, string host, CallOptions options, TRequest request)
        {
            return Calls.AsyncUnaryCall(CreateCall(method, host, options), request);
        }

        public override AsyncServerStreamingCall<TResponse> AsyncServerStreamingCall<TRequest, TResponse>(Method<TRequest, TResponse> method, string host, CallOptions options, TRequest request)
        {
            return Calls.AsyncServerStreamingCall(CreateCall(method, host, options), request);
        }

        public override AsyncClientStreamingCall<TRequest, TResponse> AsyncClientStreamingCall<TRequest, TResponse>(Method<TRequest, TResponse> method, string host, CallOptions options)
        {
            return Calls.AsyncClientStreamingCall(CreateCall(method, host, options));
        }

        public override AsyncDuplexStreamingCall<TRequest, TResponse> AsyncDuplexStreamingCall<TRequest, TResponse>(Method<TRequest, TResponse> method, string host, CallOptions options)
        {
            return Calls.AsyncDuplexStreamingCall(CreateCall(method, host, options));
        }

        // Every call type goes through this method, so it is the single place to hook in.
        protected virtual CallInvocationDetails<TRequest, TResponse> CreateCall<TRequest, TResponse>(Method<TRequest, TResponse> method, string host, CallOptions options)
            where TRequest : class
            where TResponse : class
        {
            return new CallInvocationDetails<TRequest, TResponse>(Channel, method, host, options);
        }
    }
The channel created above can be passed in here, and the CreateCall method is where the call can be controlled.
The full implementation:
    using System;
    using Grpc.Core;

    public class GrpcCallInvoker : CallInvoker
    {
        GrpcClientOptions _options;
        IGrpcConnect _grpcConnect;

        public GrpcCallInvoker(IGrpcConnect grpcConnect)
        {
            _options = grpcConnect.GetOptions();
            _grpcConnect = grpcConnect;
        }

        public override TResponse BlockingUnaryCall<TRequest, TResponse>(Method<TRequest, TResponse> method, string host, CallOptions options, TRequest request)
        {
            return Calls.BlockingUnaryCall(CreateCall(method, host, options), request);
        }

        public override AsyncUnaryCall<TResponse> AsyncUnaryCall<TRequest, TResponse>(Method<TRequest, TResponse> method, string host, CallOptions options, TRequest request)
        {
            return Calls.AsyncUnaryCall(CreateCall(method, host, options), request);
        }

        public override AsyncServerStreamingCall<TResponse> AsyncServerStreamingCall<TRequest, TResponse>(Method<TRequest, TResponse> method, string host, CallOptions options, TRequest request)
        {
            return Calls.AsyncServerStreamingCall(CreateCall(method, host, options), request);
        }

        public override AsyncClientStreamingCall<TRequest, TResponse> AsyncClientStreamingCall<TRequest, TResponse>(Method<TRequest, TResponse> method, string host, CallOptions options)
        {
            return Calls.AsyncClientStreamingCall(CreateCall(method, host, options));
        }

        public override AsyncDuplexStreamingCall<TRequest, TResponse> AsyncDuplexStreamingCall<TRequest, TResponse>(Method<TRequest, TResponse> method, string host, CallOptions options)
        {
            return Calls.AsyncDuplexStreamingCall(CreateCall(method, host, options));
        }

        protected virtual CallInvocationDetails<TRequest, TResponse> CreateCall<TRequest, TResponse>(Method<TRequest, TResponse> method, string host, CallOptions options)
            where TRequest : class
            where TResponse : class
        {
            // Look up the policy registered for this method; fall back to the default policy (key "").
            var methodName = $"{method.ServiceName}.{method.Name}";
            var key = methodName.Substring(methodName.IndexOf(".") + 1).ToLower();
            var found = _options.MethodPolicies.TryGetValue(key, out PollyAttribute methodPollyAttr);
            if (!found)
            {
                _options.MethodPolicies.TryGetValue("", out methodPollyAttr);
            }

            // Override the headers: keep headers set on the call, otherwise use the shared metadata.
            CallOptions options2;
            if (options.Headers != null)
            {
                options2 = options;
            }
            else
            {
                options2 = new CallOptions(_grpcConnect.GetMetadata(), options.Deadline, options.CancellationToken);
            }

            // Build the call details under the Polly policy; the channel comes from IGrpcConnect.
            var pollyData = PollyExtension.Invoke(methodPollyAttr, () =>
            {
                var callRes = new CallInvocationDetails<TRequest, TResponse>(_grpcConnect.GetChannel(), method, host, options2);
                return new PollyExtension.PollyData<CallInvocationDetails<TRequest, TResponse>>() { Data = callRes };
            }, $"{methodName}");
            var response = pollyData.Data;
            if (!string.IsNullOrEmpty(pollyData.Error))
            {
                throw new Exception(pollyData.Error);
            }
            return response;
        }
    }
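A PollyAttribute is passed in here, and PollyExtension.Invoke applies the Polly policy; the code for it can be found in the source.
As the code above shows, the CallInvoker receives an IGrpcConnect and obtains the channel from IGrpcConnect.GetChannel().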
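The policy lookup in CreateCall depends on how the key is derived from the method name, which is easy to get wrong when registering a policy. The sketch below isolates that one step; `PolicyKey` is a hypothetical helper name, and the service names are assumed values (`greet.Greeter` is what the default template's `package greet;` would produce).

```csharp
// Hypothetical helper that isolates the key derivation used in CreateCall.
public static class PolicyKey
{
    // Joins service and method name, drops everything up to and including
    // the first '.', and lower-cases the remainder.
    public static string For(string serviceName, string methodName)
    {
        var fullName = $"{serviceName}.{methodName}";
        return fullName.Substring(fullName.IndexOf('.') + 1).ToLower();
    }
}
```

With these inputs, `PolicyKey.For("greet.Greeter", "SayHello")` gives `greeter.sayhello`, `PolicyKey.For("Greeter", "SayHello")` gives `sayhello` because a service without a package loses its own name, and `PolicyKey.For("a.b.Greeter", "SayHello")` gives `b.greeter.sayhello` because only the first package segment is removed. A policy has to be registered under exactly the resulting key, otherwise the call falls back to the default policy stored under the empty key.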
Client instantiation
On .NET Framework:
    public T GetClient<T>()
    {
        // Reuse the cached client for this type if one exists.
        var found = instanceCache.TryGetValue(typeof(T), out object instance);
        if (!found)
        {
            // Create the generated client through its CallInvoker constructor.
            var grpcCallInvoker = new GrpcCallInvoker(this);
            instance = System.Activator.CreateInstance(typeof(T), grpcCallInvoker);
            instanceCache.TryAdd(typeof(T), instance);
        }
        return (T)instance;
    }
On .NET Core it is simpler: the client is resolved directly from dependency injection.
    var client = provider.GetService<Greeter.GreeterClient>();
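Server registration
Registration works like any other service: supply the correct service address and name. The check is what has to change. gRPC health checks take different parameters, and the Consul client does not expose them, so they have to be written by hand.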
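To make the gRPC-specific check parameters concrete, here is a minimal sketch that registers a service through the Consul agent's HTTP API instead of a client library. It is not the wrapper used in this project: `ConsulRegistration`, `BuildPayload`, and `RegisterAsync` are hypothetical names, and the sketch assumes the agent endpoint `/v1/agent/service/register` with the check fields `GRPC`, `GRPCUseTLS`, `Interval`, and `DeregisterCriticalServiceAfter`. Consul's gRPC check calls the standard gRPC health checking service, so the server must expose one.

```csharp
using System.Net.Http;
using System.Text;
using System.Text.Json;
using System.Threading.Tasks;

// Hypothetical helper, for illustration only.
public static class ConsulRegistration
{
    // Builds the JSON body of a service registration with a gRPC health check.
    // Property names are written exactly as Consul expects them, because
    // System.Text.Json keeps declared names by default.
    public static string BuildPayload(string id, string name, string address, int port)
    {
        var payload = new
        {
            ID = id,
            Name = name,
            Address = address,
            Port = port,
            Check = new
            {
                GRPC = $"{address}:{port}",            // gRPC endpoint Consul will probe
                GRPCUseTLS = false,                    // plain-text HTTP/2
                Interval = "10s",                      // probe every 10 seconds
                DeregisterCriticalServiceAfter = "90m" // drop the service after 90 minutes critical
            }
        };
        return JsonSerializer.Serialize(payload);
    }

    // Sends the registration to the local Consul agent.
    public static async Task RegisterAsync(HttpClient http, string consulUrl, string payload)
    {
        var content = new StringContent(payload, Encoding.UTF8, "application/json");
        var response = await http.PutAsync($"{consulUrl.TrimEnd('/')}/v1/agent/service/register", content);
        response.EnsureSuccessStatusCode();
    }
}
```

A call such as `BuildPayload("grpcserver1", "grpcserver", "127.0.0.1", 50001)` produces a body with the same values as the registration shown in this section.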
The code below uses my own wrapper; see the source for details.
    public void Configure(IApplicationBuilder app, IWebHostEnvironment env)
    {
        if (env.IsDevelopment())
        {
            app.UseDeveloperExceptionPage();
        }

        app.UseRouting();

        app.UseEndpoints(endpoints =>
        {
            endpoints.MapGrpcService<GreeterService>();
            // Health check service that Consul's gRPC check will call.
            endpoints.MapGrpcService<HealthCheckService>();

            endpoints.MapGet("/", async context =>
            {
                await context.Response.WriteAsync("Communication with gRPC endpoints must be made through a gRPC client. To learn how to create a client, visit: https://go.microsoft.com/fwlink/?linkid=2086909");
            });
        });

        // Register the service with Consul.
        var consulClient = new CRL.Core.ConsulClient.Consul("http://localhost:8500");
        var info = new CRL.Core.ConsulClient.ServiceRegistrationInfo
        {
            Address = "127.0.0.1",
            Name = "grpcServer",
            ID = "grpcServer1",
            Port = 50001,
            Tags = new[] { "v1" },
            Check = new CRL.Core.ConsulClient.CheckRegistrationInfo()
            {
                GRPC = "127.0.0.1:50001",
                Interval = "10s",
                GRPCUseTLS = false,
                DeregisterCriticalServiceAfter = "90m"
            }
        };
        // Remove any stale registration with the same ID before registering again.
        consulClient.DeregisterService(info.ID);
        var a = consulClient.RegisterService(info);
    }
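The complete client-side wrapper
The .NET Core extension method takes a GrpcClientOptions callback to configure the Consul address and the Polly policies, and registers the client types for injection.
It also adds shared header passing, so the whole service sends its requests with one set of headers instead of passing them as an extra argument on every call.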
    public static class GrpcExtensions
    {
        public static void AddGrpcExtend(this IServiceCollection services, Action<GrpcClientOptions> setupAction, params Assembly[] assemblies)
        {
            services.Configure(setupAction);
            services.AddSingleton<IGrpcConnect, GrpcConnect>();
            services.AddScoped<CallInvoker, GrpcCallInvoker>();
            // Register every generated gRPC client found in the given assemblies.
            foreach (var assembly in assemblies)
            {
                var types = assembly.GetTypes();
                foreach (var type in types)
                {
                    if (typeof(ClientBase).IsAssignableFrom(type))
                    {
                        services.AddSingleton(type);
                    }
                }
            }
        }
    }
    class Program
    {
        static IServiceProvider provider;

        static Program()
        {
            var builder = new ConfigurationBuilder();
            var configuration = builder.Build();

            var services = new ServiceCollection();
            services.AddSingleton<IConfiguration>(configuration);
            services.AddOptions();

            services.AddGrpcExtend(op =>
            {
                op.Host = "127.0.0.1";
                op.Port = 50001;
                // Use Consul service discovery.
                op.UseConsulDiscover("http://localhost:8500", "grpcServer");
                // Define the Polly policy for a method.
                op.AddPolicy("Greeter.SayHello", new CRL.Core.Remoting.PollyAttribute() { RetryCount = 3 });
            }, System.Reflection.Assembly.GetExecutingAssembly());

            provider = services.BuildServiceProvider();
        }

        static void Main(string[] args)
        {
            // Allow unencrypted HTTP/2.
            AppContext.SetSwitch("System.Net.Http.SocketsHttpHandler.Http2UnencryptedSupport", true);
            var grpcConnect = provider.GetService<IGrpcConnect>();
            // Authentication
            // https://www.cnblogs.com/stulzq/p/11897628.html
            var token = "";
            var headers = new Metadata { { "Authorization", $"Bearer {token}" } };
            grpcConnect.SetMetadata(headers);

            // Call the service again each time Enter is pressed.
            while (true)
            {
                var client = provider.GetService<Greeter.GreeterClient>();
                var reply = client.SayHello(new HelloRequest { Name = "test" });
                Console.WriteLine("Greeter service returned: " + reply.Message);
                Console.ReadLine();
            }
        }
    }
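Running the server: the service registers successfully and its health check passes.
Running the client: the client calls the service correctly and gets the result back.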
Project source:
https://github.com/crl2020/crl.netstandard/tree/master/grpc
Besides service discovery and Polly policies for gRPC, the framework also implements API proxying, dynamic API, and RPC.
API proxy test
https://github.com/crl2020/crl.netstandard/tree/master/dynamicwebapi/apiproxytest
Dynamic API test
https://github.com/crl2020/crl.netstandard/tree/master/dynamicwebapi/dynamicwebapiclient
RPC test
https://github.com/crl2020/crl.netstandard/tree/master/rpc/rpcclient