【.NET Core项目实战-统一认证平台】第二章网关篇-重构Ocelot来满足需求
【.net core项目实战-统一认证平台】开篇及目录索引
这篇文章,我们将从ocelot的中间件源码分析,目前ocelot已经实现那些功能,还有那些功能在我们实际项目中暂时还未实现,如果我们要使用这些功能,应该如何改造等方面来说明。
一、ocelot源码解读
在使用一个组件前,最好我们要了解其中的一些原理,否则在使用过程中遇到问题,也无从下手,今天我带着大家一起来解读下ocelot源码,并梳理出具体实现的原理和流程,便于我们根据需求扩展应用。
ocelot源码地址[https://github.com/threemammals/ocelot],
ocelot文档地址[]
查看.netcore
相关中间件源码,我们优先找到入口方法,比如ocelot中间件使用的是app.useocelot()
,我们直接搜索userocelot,我们会找到ocelotmiddlewareextensions
方法,里面是ocelot中间件实际运行的方式和流程。
然后继续顺藤摸瓜,查看详细的实现,我们会发现如下代码
public static async task<iapplicationbuilder> useocelot(this iapplicationbuilder builder, ocelotpipelineconfiguration pipelineconfiguration) { //创建配置信息 var configuration = await createconfiguration(builder); //监听配置信息 configurediagnosticlistener(builder); //创建执行管道 return createocelotpipeline(builder, pipelineconfiguration); }
然后我们继续跟踪到创建管道方法,可以发现ocelot的执行流程已经被找到,现在问题变的简单了,直接查看
private static iapplicationbuilder createocelotpipeline(iapplicationbuilder builder, ocelotpipelineconfiguration pipelineconfiguration) { var pipelinebuilder = new ocelotpipelinebuilder(builder.applicationservices); //详细创建的管道顺序在此方法 pipelinebuilder.buildocelotpipeline(pipelineconfiguration); var firstdelegate = pipelinebuilder.build(); /* inject first delegate into first piece of asp.net middleware..maybe not like this then because we are updating the http context in ocelot it comes out correct for rest of asp.net.. */ builder.properties["analysis.nextmiddlewarename"] = "transitiontoocelotmiddleware"; builder.use(async (context, task) => { var downstreamcontext = new downstreamcontext(context); await firstdelegate.invoke(downstreamcontext); }); return builder; }
管道创建流程及实现,会不会感觉到摸到大动脉了,核心的功能及原理基本找到了,那以后动手术也就可以避开一些坑了,我们可以对着这个执行顺序,再查看详细的源码,按照这个执行顺序查看源码,您就会发现整个思路非常清晰,每一步的实现一目了然。为了更直观的介绍源码的解读方式,这里我们就拿我们后续要操刀的中间件来讲解下中间件的具体实现。
public static class ocelotpipelineextensions { public static ocelotrequestdelegate buildocelotpipeline(this iocelotpipelinebuilder builder, ocelotpipelineconfiguration pipelineconfiguration) { // this is registered to catch any global exceptions that are not handled // it also sets the request id if anything is set globally builder.useexceptionhandlermiddleware(); // if the request is for websockets upgrade we fork into a different pipeline builder.mapwhen(context => context.httpcontext.websockets.iswebsocketrequest, app => { app.usedownstreamroutefindermiddleware(); app.usedownstreamrequestinitialiser(); app.useloadbalancingmiddleware(); app.usedownstreamurlcreatormiddleware(); app.usewebsocketsproxymiddleware(); }); // allow the user to respond with absolutely anything they want. builder.useifnotnull(pipelineconfiguration.preerrorrespondermiddleware); // this is registered first so it can catch any errors and issue an appropriate response builder.userespondermiddleware(); // then we get the downstream route information builder.usedownstreamroutefindermiddleware(); // this security module, ip whitelist blacklist, extended security mechanism builder.usesecuritymiddleware(); //expand other branch pipes if (pipelineconfiguration.mapwhenocelotpipeline != null) { foreach (var pipeline in pipelineconfiguration.mapwhenocelotpipeline) { builder.mapwhen(pipeline); } } // now we have the ds route we can transform headers and stuff? builder.usehttpheaderstransformationmiddleware(); // initialises downstream request builder.usedownstreamrequestinitialiser(); // we check whether the request is ratelimit, and if there is no continue processing builder.useratelimiting(); // this adds or updates the request id (initally we try and set this based on global config in the error handling middleware) // if anything was set at global level and we have a different setting at re route level the global stuff will be overwritten // this means you can get a scenario where you have a different request id from the first piece of middleware to the request id middleware. builder.userequestidmiddleware(); // allow pre authentication logic. the idea being people might want to run something custom before what is built in. builder.useifnotnull(pipelineconfiguration.preauthenticationmiddleware); // now we know where the client is going to go we can authenticate them. // we allow the ocelot middleware to be overriden by whatever the // user wants if (pipelineconfiguration.authenticationmiddleware == null) { builder.useauthenticationmiddleware(); } else { builder.use(pipelineconfiguration.authenticationmiddleware); } // the next thing we do is look at any claims transforms in case this is important for authorisation builder.useclaimstoclaimsmiddleware(); // allow pre authorisation logic. the idea being people might want to run something custom before what is built in. builder.useifnotnull(pipelineconfiguration.preauthorisationmiddleware); // now we have authenticated and done any claims transformation we // can authorise the request // we allow the ocelot middleware to be overriden by whatever the // user wants if (pipelineconfiguration.authorisationmiddleware == null) {//使用自定义认证,移除默认的认证方式 //builder.useauthorisationmiddleware(); } else { builder.use(pipelineconfiguration.authorisationmiddleware); } // now we can run the claims to headers transformation middleware builder.useclaimstoheadersmiddleware(); // allow the user to implement their own query string manipulation logic builder.useifnotnull(pipelineconfiguration.prequerystringbuildermiddleware); // now we can run any claims to query string transformation middleware builder.useclaimstoquerystringmiddleware(); // get the load balancer for this request builder.useloadbalancingmiddleware(); // this takes the downstream route we retrieved earlier and replaces any placeholders with the variables that should be used builder.usedownstreamurlcreatormiddleware(); // not sure if this is the best place for this but we use the downstream url // as the basis for our cache key. builder.useoutputcachemiddleware(); //we fire off the request and set the response on the scoped data repo builder.usehttprequestermiddleware(); return builder.build(); } private static void useifnotnull(this iocelotpipelinebuilder builder, func<downstreamcontext, func<task>, task> middleware) { if (middleware != null) { builder.use(middleware); } } }
限流中间件实现解析
实现代码如下builder.useratelimiting();
,我们转到定义,得到如下代码,详细的实现逻辑在clientratelimitmiddleware
方法里,继续转定义到这个方法,我把方法里用到的内容注释了下。
public static class ratelimitmiddlewareextensions { public static iocelotpipelinebuilder useratelimiting(this iocelotpipelinebuilder builder) { return builder.usemiddleware<clientratelimitmiddleware>(); } } public class clientratelimitmiddleware : ocelotmiddleware { private readonly ocelotrequestdelegate _next; private readonly iratelimitcounterhandler _counterhandler; private readonly clientratelimitprocessor _processor; public clientratelimitmiddleware(ocelotrequestdelegate next, iocelotloggerfactory loggerfactory, iratelimitcounterhandler counterhandler) :base(loggerfactory.createlogger<clientratelimitmiddleware>()) { _next = next; _counterhandler = counterhandler; _processor = new clientratelimitprocessor(counterhandler); } //熟悉的tnvoke方法,所有的逻辑都在此方法里。 public async task invoke(downstreamcontext context) { var options = context.downstreamreroute.ratelimitoptions; // 校验是否启用限流配置 if (!context.downstreamreroute.enableendpointendpointratelimiting) {//未启用直接进入下一个中间件 logger.loginformation($"endpointratelimiting is not enabled for {context.downstreamreroute.downstreampathtemplate.value}"); await _next.invoke(context); return; } // 获取配置的校验客户端的方式 var identity = setidentity(context.httpcontext, options); // 校验是否为白名单 if (iswhitelisted(identity, options)) {//白名单直接放行 logger.loginformation($"{context.downstreamreroute.downstreampathtemplate.value} is white listed from rate limiting"); await _next.invoke(context); return; } var rule = options.ratelimitrule; if (rule.limit > 0) {//限流数是否大于0 // 获取当前客户端请求情况,这里需要注意_processor是从哪里注入的,后续重 var counter = _processor.processrequest(identity, options); // 校验请求数是否大于限流数 if (counter.totalrequests > rule.limit) { //获取下次有效请求的时间,就是避免每次请求,都校验一次 var retryafter = _processor.retryafterfrom(counter.timestamp, rule); // 写入日志 logblockedrequest(context.httpcontext, identity, counter, rule, context.downstreamreroute); var retrystring = retryafter.tostring(system.globalization.cultureinfo.invariantculture); // 抛出超出限流异常并把下次可请求时间写入header里。 await returnquotaexceededresponse(context.httpcontext, options, retrystring); return; } } //如果启用了限流头部 if (!options.disableratelimitheaders) { var headers = _processor.getratelimitheaders(context.httpcontext, identity, options); context.httpcontext.response.onstarting(setratelimitheaders, state: headers); } //进入下一个中间件 await _next.invoke(context); } public virtual clientrequestidentity setidentity(httpcontext httpcontext, ratelimitoptions option) { var clientid = "client"; if (httpcontext.request.headers.keys.contains(option.clientidheader)) { clientid = httpcontext.request.headers[option.clientidheader].first(); } return new clientrequestidentity( clientid, httpcontext.request.path.tostring().tolowerinvariant(), httpcontext.request.method.tolowerinvariant() ); } public bool iswhitelisted(clientrequestidentity requestidentity, ratelimitoptions option) { if (option.clientwhitelist.contains(requestidentity.clientid)) { return true; } return false; } public virtual void logblockedrequest(httpcontext httpcontext, clientrequestidentity identity, ratelimitcounter counter, ratelimitrule rule, downstreamreroute downstreamreroute) { logger.loginformation( $"request {identity.httpverb}:{identity.path} from clientid {identity.clientid} has been blocked, quota {rule.limit}/{rule.period} exceeded by {counter.totalrequests}. blocked by rule { downstreamreroute.upstreampathtemplate.originalvalue }, traceidentifier {httpcontext.traceidentifier}."); } public virtual task returnquotaexceededresponse(httpcontext httpcontext, ratelimitoptions option, string retryafter) { var message = string.isnullorempty(option.quotaexceededmessage) ? $"api calls quota exceeded! maximum admitted {option.ratelimitrule.limit} per {option.ratelimitrule.period}." : option.quotaexceededmessage; if (!option.disableratelimitheaders) { httpcontext.response.headers["retry-after"] = retryafter; } httpcontext.response.statuscode = option.httpstatuscode; return httpcontext.response.writeasync(message); } private task setratelimitheaders(object ratelimitheaders) { var headers = (ratelimitheaders)ratelimitheaders; headers.context.response.headers["x-rate-limit-limit"] = headers.limit; headers.context.response.headers["x-rate-limit-remaining"] = headers.remaining; headers.context.response.headers["x-rate-limit-reset"] = headers.reset; return task.completedtask; } }
通过源码解析,发现实现一个限流还是很简单的吗!再进一步解析,iratelimitcounterhandler
clientratelimitprocessor里的相关接口
又是怎么实现的呢?这时候我们就需要了解下.netcore 的运行原理,其中configureservices
方法实现了依赖注入(di)的配置。这时候我们看下ocelot
是在哪里进行注入的呢?
services.addocelot()
是不是印象深刻呢?原来所有的注入信息都写在这里,那么问题简单了,ctrl+f
查找addocelot
方法,马上就能定位到servicecollectionextensions
方法,然后再转到定义ocelotbuilder
public static class servicecollectionextensions { public static iocelotbuilder addocelot(this iservicecollection services) { var service = services.first(x => x.servicetype == typeof(iconfiguration)); var configuration = (iconfiguration)service.implementationinstance; return new ocelotbuilder(services, configuration); } public static iocelotbuilder addocelot(this iservicecollection services, iconfiguration configuration) { return new ocelotbuilder(services, configuration); } }
又摸到大动脉啦,现在问题迎刃而解,原来所有的注入都写在这里,从这里可以找下我们熟悉的几个接口注入。
public ocelotbuilder(iservicecollection services, iconfiguration configurationroot) { configuration = configurationroot; services = services; services.configure<fileconfiguration>(configurationroot); services.tryaddsingleton<iocelotcache<fileconfiguration>, inmemorycache<fileconfiguration>>(); services.tryaddsingleton<iocelotcache<cachedresponse>, inmemorycache<cachedresponse>>(); services.tryaddsingleton<ihttpresponseheaderreplacer, httpresponseheaderreplacer>(); services.tryaddsingleton<ihttpcontextrequestheaderreplacer, httpcontextrequestheaderreplacer>(); services.tryaddsingleton<iheaderfindandreplacecreator, headerfindandreplacecreator>(); services.tryaddsingleton<iinternalconfigurationcreator, fileinternalconfigurationcreator>(); services.tryaddsingleton<iinternalconfigurationrepository, inmemoryinternalconfigurationrepository>(); services.tryaddsingleton<iconfigurationvalidator, fileconfigurationfluentvalidator>(); services.tryaddsingleton<hostandportvalidator>(); services.tryaddsingleton<ireroutescreator, reroutescreator>(); services.tryaddsingleton<iaggregatescreator, aggregatescreator>(); services.tryaddsingleton<ireroutekeycreator, reroutekeycreator>(); services.tryaddsingleton<iconfigurationcreator, configurationcreator>(); services.tryaddsingleton<idynamicscreator, dynamicscreator>(); services.tryaddsingleton<iloadbalanceroptionscreator, loadbalanceroptionscreator>(); services.tryaddsingleton<reroutefluentvalidator>(); services.tryaddsingleton<fileglobalconfigurationfluentvalidator>(); services.tryaddsingleton<fileqosoptionsfluentvalidator>(); services.tryaddsingleton<iclaimstothingcreator, claimstothingcreator>(); services.tryaddsingleton<iauthenticationoptionscreator, authenticationoptionscreator>(); services.tryaddsingleton<iupstreamtemplatepatterncreator, upstreamtemplatepatterncreator>(); services.tryaddsingleton<irequestidkeycreator, requestidkeycreator>(); services.tryaddsingleton<iserviceproviderconfigurationcreator,serviceproviderconfigurationcreator>(); services.tryaddsingleton<iqosoptionscreator, qosoptionscreator>(); services.tryaddsingleton<irerouteoptionscreator, rerouteoptionscreator>(); services.tryaddsingleton<iratelimitoptionscreator, ratelimitoptionscreator>(); services.tryaddsingleton<ibaseurlfinder, baseurlfinder>(); services.tryaddsingleton<iregioncreator, regioncreator>(); services.tryaddsingleton<ifileconfigurationrepository, diskfileconfigurationrepository>(); services.tryaddsingleton<ifileconfigurationsetter, fileandinternalconfigurationsetter>(); services.tryaddsingleton<iservicediscoveryproviderfactory, servicediscoveryproviderfactory>(); services.tryaddsingleton<iloadbalancerfactory, loadbalancerfactory>(); services.tryaddsingleton<iloadbalancerhouse, loadbalancerhouse>(); services.tryaddsingleton<iocelotloggerfactory, aspdotnetloggerfactory>(); services.tryaddsingleton<iremoveoutputheaders, removeoutputheaders>(); services.tryaddsingleton<iclaimtothingconfigurationparser, claimtothingconfigurationparser>(); services.tryaddsingleton<iclaimsauthoriser, claimsauthoriser>(); services.tryaddsingleton<iscopesauthoriser, scopesauthoriser>(); services.tryaddsingleton<iaddclaimstorequest, addclaimstorequest>(); services.tryaddsingleton<iaddheaderstorequest, addheaderstorequest>(); services.tryaddsingleton<iaddqueriestorequest, addqueriestorequest>(); services.tryaddsingleton<iclaimsparser, claimsparser>(); services.tryaddsingleton<iurlpathtourltemplatematcher, regexurlmatcher>(); services.tryaddsingleton<iplaceholdernameandvaluefinder, urlpathplaceholdernameandvaluefinder>(); services.tryaddsingleton<idownstreampathplaceholderreplacer, downstreamtemplatepathplaceholderreplacer>(); services.tryaddsingleton<idownstreamrouteprovider, downstreamroutefinder>(); services.tryaddsingleton<idownstreamrouteprovider, downstreamroutecreator>(); services.tryaddsingleton<idownstreamrouteproviderfactory, downstreamrouteproviderfactory>(); services.tryaddsingleton<ihttprequester, httpclienthttprequester>(); services.tryaddsingleton<ihttpresponder, httpcontextresponder>(); services.tryaddsingleton<ierrorstohttpstatuscodemapper, errorstohttpstatuscodemapper>(); services.tryaddsingleton<iratelimitcounterhandler, memorycacheratelimitcounterhandler>(); services.tryaddsingleton<ihttpclientcache, memoryhttpclientcache>(); services.tryaddsingleton<irequestmapper, requestmapper>(); services.tryaddsingleton<ihttphandleroptionscreator, httphandleroptionscreator>(); services.tryaddsingleton<idownstreamaddressescreator, downstreamaddressescreator>(); services.tryaddsingleton<idelegatinghandlerhandlerfactory, delegatinghandlerhandlerfactory>(); services.tryaddsingleton<ihttprequester, httpclienthttprequester>(); // see this for why we register this as singleton http://*.com/questions/37371264/invalidoperationexception-unable-to-resolve-service-for-type-microsoft-aspnetc // could maybe use a scoped data repository services.tryaddsingleton<ihttpcontextaccessor, httpcontextaccessor>(); services.tryaddsingleton<irequestscopeddatarepository, httpdatarepository>(); services.addmemorycache(); services.tryaddsingleton<ocelotdiagnosticlistener>(); services.tryaddsingleton<imultiplexer, multiplexer>(); services.tryaddsingleton<iresponseaggregator, simplejsonresponseaggregator>(); services.tryaddsingleton<itracinghandlerfactory, tracinghandlerfactory>(); services.tryaddsingleton<ifileconfigurationpolleroptions, inmemoryfileconfigurationpolleroptions>(); services.tryaddsingleton<iaddheaderstoresponse, addheaderstoresponse>(); services.tryaddsingleton<iplaceholders, placeholders>(); services.tryaddsingleton<iresponseaggregatorfactory, inmemoryresponseaggregatorfactory>(); services.tryaddsingleton<idefinedaggregatorprovider, servicelocatordefinedaggregatorprovider>(); services.tryaddsingleton<idownstreamrequestcreator, downstreamrequestcreator>(); services.tryaddsingleton<iframeworkdescription, frameworkdescription>(); services.tryaddsingleton<iqosfactory, qosfactory>(); services.tryaddsingleton<iexceptiontoerrormapper, httpexeptiontoerrormapper>(); //add security this.addsecurity(); //add asp.net services.. var assembly = typeof(fileconfigurationcontroller).gettypeinfo().assembly; services.addmvccore() .addapplicationpart(assembly) .addcontrollersasservices() .addauthorization() .addjsonformatters(); services.addlogging(); services.addmiddlewareanalysis(); services.addwebencoders(); }
至此ocelot
源码解析就到这里了,其他的具体实现代码就根据流程一个一个查看即可,这里就不详细讲解了,因为我们已经掌握整个ocelot代码的运行原理和实现方式及流程,项目里其他的一大堆的代码都是围绕这个流程去一步一步实现的。
有没有感觉添加一个中间件不是很复杂呢,是不是都跃跃欲试,准备尝试开发自己的自定义中间件啦,本篇就不介绍中间件的具体开发流程了,后续实战中会包含部分项目中需要用到的中间件,到时候会详细讲解如何规划和开发一个满足自己项目需求的中间件。
二、结合项目梳理功能
在完整学习完ocelot文档和源码后,我们基本掌握了ocelot目前已经实现的功能,再结合我们实际项目需求,我们梳理下还有哪些功能可能需要自己扩展实现。
项目设计网关基本需求包括路由、认证、授权、限流、缓存,仔细学习文档和源码后发现功能都已经存在,那是不是我们就可以直接拿来使用呢?这时候我们需要拿出一些复杂业务场景来对号入座,看能否实现复杂场景的一些应用。
1、授权
能否为每一个客户端设置独立的访问权限,如果客户端a可以访问服务a、服务b,客户端b只能访问服务a,从网关层面直接授权,不满足需求不路由到具体服务。从文档和代码分析后发现暂时未实现。
2、限流
能否为每一个客户端设置不能限流规则,例如客户端a为我们内容应用,我希望对服务a不启用限流,客户端b为第三方接入应用,我需要b访问服务a访问进行单独限流(30次/分钟),看能否通过配置实现自定义限流。从文档和代码分析后发现暂时未实现。
3、缓存
通过代码发现目前缓存实现的只是dictionary方式实现的缓存,不能实现分布式结构的应用。
通过分析我们发现列举的5个基本需求,尽然有3个在我们实际项目应用中可能会存在问题,如果不解决这些问题,很难直接拿这个完美的网关项目应用到正式项目,所以我们到通过扩展ocelot方法来实现我们的目的。
如何扩展呢
为了满足我们项目应用的需要,我们需要为每一个路由进行单独设置,如果还采用配置文件的方式,肯定无法满足需求,且后续网关动态增加路由、授权、限流等无法控制,所以我们需要把网关配置信息从配置文件中移到数据库中,由数据库中的路由表、限流表、授权表等方式记录当前网关的应用,且后续扩展直接在数据库中增加或减少相关配置,然后动态更新网关配置实现网关的高可用。
想一想是不是有点小激动,原来只要稍微改造下宝骏瞬间变宝马,那接下来的课程就是网关改造之旅,我会从设计、思想、编码等方面讲解下如何实现我们的第一辆宝马。
本系列文章我也是边想边写边实现,如果发现中间有任何描述或实现不当的地方,也请各位大神批评指正,我会第一时间整理并修正,避免让后续学习的人走弯路。
上一篇: Redis数据类型基本操作
推荐阅读
-
【.NET Core项目实战-统一认证平台】第二章网关篇-重构Ocelot来满足需求
-
【.NET Core项目实战-统一认证平台】第十六章 网关篇-Ocelot集成RPC服务
-
【.NET Core项目实战-统一认证平台】第三章 网关篇-数据库存储配置(1)
-
【.NET Core项目实战-统一认证平台】第六章 网关篇-自定义客户端授权
-
【.NET Core项目实战-统一认证平台】第十五章 网关篇-使用二级缓存提升性能
-
【.NET Core项目实战-统一认证平台】第五章 网关篇-自定义缓存Redis
-
【.NET Core项目实战-统一认证平台】第七章 网关篇-自定义客户端限流
-
【.NET Core项目实战-统一认证平台】第十六章 网关篇-Ocelot集成RPC服务
-
【.NET Core项目实战-统一认证平台】第二章网关篇-重构Ocelot来满足需求
-
【.NET Core项目实战-统一认证平台】第六章 网关篇-自定义客户端授权