欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

扩展gRPC支持consul服务发现和Polly策略

程序员文章站 2022-05-23 18:50:41
gRPC由于需要用工具生成代码实现,可开发性不是很高,在扩展这方面不是很友好 最近研究了下,进行了扩展,不需要额外的工具生成,直接使用默认Grpc.Tools生成的代理类即可 相关源码在文章底部 客户端目标: 能配置consul地址和服务名称,在调用client时能正确请求到真实的服务地址 在调用方 ......

grpc由于需要用工具生成代码实现,可开发性不是很高,在扩展这方面不是很友好

最近研究了下,进行了扩展,不需要额外的工具生成,直接使用默认grpc.tools生成的代理类即可

相关源码在文章底部

客户端目标:

  • 能配置consul地址和服务名称,在调用client时能正确请求到真实的服务地址
  • 在调用方法时,能使用polly策略重试,超时,和熔断

查看grpc生成的代码,可以看到client实例化有有两个构造方法,以测试为例

      /// <summary>creates a new client for greeter</summary>
      /// <param name="channel">the channel to use to make remote calls.</param>
      public greeterclient(grpc::channelbase channel) : base(channel)
      {
      }
      /// <summary>creates a new client for greeter that uses a custom <c>callinvoker</c>.</summary>
      /// <param name="callinvoker">the callinvoker to use to make remote calls.</param>
      public greeterclient(grpc::callinvoker callinvoker) : base(callinvoker)
      {
      }

1.可传入一个channelbase实例化

2.可传入一个callinvoker实例化

channel可实现为

channel createchannel(string address)
        {
            var channeloptions = new list<channeloption>()
                    {
                        new channeloption(channeloptions.maxreceivemessagelength, int.maxvalue),
                        new channeloption(channeloptions.maxsendmessagelength, int.maxvalue),
                    };
            var channel = new channel(address, channelcredentials.insecure, channeloptions);
            return channel;
        }

在这里,可以从consul地址按服务名获取真实的服务地址,生成channel

 

callinvoker为一个抽象类,若要对方法执行过程干预,则需要重写这个方法,大致实现为

public class grpccallinvoker : callinvoker
    {
        public readonly channel channel;
        public grpccallinvoker(channel channel)
        {
            channel = grpcpreconditions.checknotnull(channel); 
        }

        public override tresponse blockingunarycall<trequest, tresponse>(method<trequest, tresponse> method, string host, calloptions options, trequest request)
        {
            return calls.blockingunarycall(createcall(method, host, options), request);
        }

        public override asyncunarycall<tresponse> asyncunarycall<trequest, tresponse>(method<trequest, tresponse> method, string host, calloptions options, trequest request)
        {
            return calls.asyncunarycall(createcall(method, host, options), request);
        }

        public override asyncserverstreamingcall<tresponse> asyncserverstreamingcall<trequest, tresponse>(method<trequest, tresponse> method, string host, calloptions options, trequest request)
        {
            return calls.asyncserverstreamingcall(createcall(method, host, options), request);
        }

        public override asyncclientstreamingcall<trequest, tresponse> asyncclientstreamingcall<trequest, tresponse>(method<trequest, tresponse> method, string host, calloptions options)
        {
            return calls.asyncclientstreamingcall(createcall(method, host, options));
        }

        public override asyncduplexstreamingcall<trequest, tresponse> asyncduplexstreamingcall<trequest, tresponse>(method<trequest, tresponse> method, string host, calloptions options)
        {
            return calls.asyncduplexstreamingcall(createcall(method, host, options));
        }

        protected virtual callinvocationdetails<trequest, tresponse> createcall<trequest, tresponse>(method<trequest, tresponse> method, string host, calloptions options)
            where trequest : class 
            where tresponse : class
        {
            return new callinvocationdetails<trequest, tresponse>(channel, method, host, options);
        }
    }

这里可以传入上面创建的channel,在createcall方法里,则可以对调用方法进行控制

完整实现为

public class grpccallinvoker : callinvoker
    {
        grpcclientoptions _options;
        igrpcconnect _grpcconnect;
        public grpccallinvoker(igrpcconnect grpcconnect)
        {
            _options = grpcconnect.getoptions();
            _grpcconnect = grpcconnect;
        }

        public override tresponse blockingunarycall<trequest, tresponse>(method<trequest, tresponse> method, string host, calloptions options, trequest request)
        {
            return calls.blockingunarycall(createcall(method, host, options), request);
        }

        public override asyncunarycall<tresponse> asyncunarycall<trequest, tresponse>(method<trequest, tresponse> method, string host, calloptions options, trequest request)
        {
            return calls.asyncunarycall(createcall(method, host, options), request);
        }

        public override asyncserverstreamingcall<tresponse> asyncserverstreamingcall<trequest, tresponse>(method<trequest, tresponse> method, string host, calloptions options, trequest request)
        {
            return calls.asyncserverstreamingcall(createcall(method, host, options), request);
        }

        public override asyncclientstreamingcall<trequest, tresponse> asyncclientstreamingcall<trequest, tresponse>(method<trequest, tresponse> method, string host, calloptions options)
        {
            return calls.asyncclientstreamingcall(createcall(method, host, options));
        }

        public override asyncduplexstreamingcall<trequest, tresponse> asyncduplexstreamingcall<trequest, tresponse>(method<trequest, tresponse> method, string host, calloptions options)
        {
            return calls.asyncduplexstreamingcall(createcall(method, host, options));
        }

        protected virtual callinvocationdetails<trequest, tresponse> createcall<trequest, tresponse>(method<trequest, tresponse> method, string host, calloptions options)
            where trequest : class
            where tresponse : class
        {
            var methodname = $"{method.servicename}.{method.name}";
            var key = methodname.substring(methodname.indexof(".") + 1).tolower();
            var a = _options.methodpolicies.trygetvalue(key, out pollyattribute methodpollyattr);
            if (!a)
            {
                _options.methodpolicies.trygetvalue("", out methodpollyattr);
            }
            calloptions options2;
            //重写header
            if (options.headers != null)
            {
                options2 = options;
            }
            else
            {
                options2 = new calloptions(_grpcconnect.getmetadata(), options.deadline, options.cancellationtoken);
            }

            var pollydata = pollyextension.invoke(methodpollyattr, () =>
            {
                var callres = new callinvocationdetails<trequest, tresponse>(_grpcconnect.getchannel(), method, host, options2);
                return new pollyextension.pollydata<callinvocationdetails<trequest, tresponse>>() { data = callres };
            }, $"{methodname}");
            var response = pollydata.data;
            if (!string.isnullorempty(pollydata.error))
            {
                throw new exception(pollydata.error);
            }
            return response;
            //return new callinvocationdetails<trequest, tresponse>(channel.invoke(), method, host, options2);
        }
    }

 

其中传入了pollyattribute,由pollyextension.invoke来完成polly策略的实现,具体代码可在源码里找到

从上面代码可以看到,callinvoker里可以传入了igrpcconnect,由方法igrpcconnect.getchannel()获取channel

client实例化

.net framework实现为

     public t getclient<t>()
        {
            var a = instancecache.trygetvalue(typeof(t), out object instance);
            if (!a)
            {
                var grpccallinvoker = new grpccallinvoker(this);
                instance = system.activator.createinstance(typeof(t), grpccallinvoker);
                instancecache.tryadd(typeof(t), instance);
            }
            return (t)instance;
        }

core则简单点,直接注入实现

var client = provider.getservice<greeter.greeterclient>();

服务端注册

和其它服务注册一样,填入正确的服务地址和名称就行了,但是在check里得改改,grpc的健康检查参数是不同的,并且在consul客户端里没有这个参数,得自已写

以下代码是我封装过的,可查看源码

public void configure(iapplicationbuilder app, iwebhostenvironment env)
        {
            if (env.isdevelopment())
            {
                app.usedeveloperexceptionpage();
            }

            app.userouting();

            app.useendpoints(endpoints =>
            {
                endpoints.mapgrpcservice<greeterservice>();
                endpoints.mapgrpcservice<healthcheckservice>();
                endpoints.mapget("/", async context =>
                {
                    await context.response.writeasync("communication with grpc endpoints must be made through a grpc client. to learn how to create a client, visit: https://go.microsoft.com/fwlink/?linkid=2086909");
                });
            });

            //注册服务
            var consulclient = new crl.core.consulclient.consul("http://localhost:8500");
            var info = new crl.core.consulclient.serviceregistrationinfo
            {
                address = "127.0.0.1",
                name = "grpcserver",
                id = "grpcserver1",
                port = 50001,
                tags = new[] { "v1" },
                check = new crl.core.consulclient.checkregistrationinfo()
                {
                    grpc = "127.0.0.1:50001",
                    interval = "10s",
                    grpcusetls = false,
                    deregistercriticalserviceafter = "90m"
                }
            };
            consulclient.deregisterservice(info.id);
            var a = consulclient.registerservice(info);

        }

 客户端完整封装代码为

core扩展方法,设置grpcclientoptions来配置consul地址和polly策略,直接注入了client类型

同时添加了统一header传递,使整个服务都能用一个头发送请求,不用再在方法后面跟参数

public static class grpcextensions
    {
        public static void addgrpcextend(this iservicecollection services, action<grpcclientoptions> setupaction, params assembly[] assemblies)
        {
            services.configure(setupaction);
            services.addsingleton<igrpcconnect, grpcconnect>();
            services.addscoped<callinvoker, grpccallinvoker>();
            foreach (var assembyle in assemblies)
            {
                var types = assembyle.gettypes();
                foreach (var type in types)
                {
                    if(typeof(clientbase).isassignablefrom(type))
                    {
                        services.addsingleton(type);
                    }
                }
            }
        }
    }
 class program
    {
        static iserviceprovider provider;
        static program()
        {
            var builder = new configurationbuilder();

            var configuration = builder.build();

            var services = new servicecollection();
            services.addsingleton<iconfiguration>(configuration);
            services.addoptions();

            services.addgrpcextend(op =>
            {
                op.host = "127.0.0.1";
                op.port = 50001;
                op.useconsuldiscover("http://localhost:8500", "grpcserver");//使用consul服务发现
                op.addpolicy("greeter.sayhello", new crl.core.remoting.pollyattribute() { retrycount = 3 });//定义方法polly策略
            }, system.reflection.assembly.getexecutingassembly());

            provider = services.buildserviceprovider();
        }


        static void main(string[] args)
        {            
            //设置允许不安全的http2支持
            appcontext.setswitch("system.net.http.socketshttphandler.http2unencryptedsupport", true);

            var grpcconnect = provider.getservice<igrpcconnect>();
            //认证
            //https://www.cnblogs.com/stulzq/p/11897628.html
            var token = "";
            var headers = new metadata { { "authorization", $"bearer {token}" } };
            grpcconnect.setmetadata(headers);


        label1:
            var client = provider.getservice<greeter.greeterclient>();
            var reply = client.sayhello(
                new hellorequest { name = "test" });
            console.writeline("greeter 服务返回数据: " + reply.message);

            console.readline();
            goto label1;
        }
    }

 

运行服务端,结果为

扩展gRPC支持consul服务发现和Polly策略

可以看到服务注册成功,状态检查也成功

运行客户端

扩展gRPC支持consul服务发现和Polly策略

客户端正确调用并返回了结果

项目源码:

https://github.com/crl2020/crl.netstandard/tree/master/grpc

除了grpc实现了服务发现和polly策略,本框架对api代理,动态api,rpc也一起实现了

api代理测试
https://github.com/crl2020/crl.netstandard/tree/master/dynamicwebapi/apiproxytest

动态api测试

https://github.com/crl2020/crl.netstandard/tree/master/dynamicwebapi/dynamicwebapiclient

rcp测试

https://github.com/crl2020/crl.netstandard/tree/master/rpc/rpcclient

ps:被上家公司坑了一把,又碰上疫情,最近一年简值了!-_ 求推荐郑州地区工作,加微信hubroxx