欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

【.NET Core项目实战-统一认证平台】第二章网关篇-重构Ocelot来满足需求

程序员文章站 2022-05-16 14:15:50
" 【.NET Core项目实战 统一认证平台】开篇及目录索引 " 这篇文章,我们将从Ocelot的中间件源码分析,目前Ocelot已经实现那些功能,还有那些功能在我们实际项目中暂时还未实现,如果我们要使用这些功能,应该如何改造等方面来说明。 一、Ocelot源码解读 在使用一个组件前,最好我们要了 ......

【.net core项目实战-统一认证平台】开篇及目录索引

这篇文章,我们将从ocelot的中间件源码分析,目前ocelot已经实现那些功能,还有那些功能在我们实际项目中暂时还未实现,如果我们要使用这些功能,应该如何改造等方面来说明。

一、ocelot源码解读

在使用一个组件前,最好我们要了解其中的一些原理,否则在使用过程中遇到问题,也无从下手,今天我带着大家一起来解读下ocelot源码,并梳理出具体实现的原理和流程,便于我们根据需求扩展应用。
ocelot源码地址[https://github.com/threemammals/ocelot],
ocelot文档地址[]

查看.netcore相关中间件源码,我们优先找到入口方法,比如ocelot中间件使用的是app.useocelot(),我们直接搜索userocelot,我们会找到ocelotmiddlewareextensions方法,里面是ocelot中间件实际运行的方式和流程。
【.NET Core项目实战-统一认证平台】第二章网关篇-重构Ocelot来满足需求

然后继续顺藤摸瓜,查看详细的实现,我们会发现如下代码

public static async task<iapplicationbuilder> useocelot(this iapplicationbuilder builder, ocelotpipelineconfiguration pipelineconfiguration)
        {   
            //创建配置信息
            var configuration = await createconfiguration(builder);
            //监听配置信息
            configurediagnosticlistener(builder);
            //创建执行管道
            return createocelotpipeline(builder, pipelineconfiguration);
        }

然后我们继续跟踪到创建管道方法,可以发现ocelot的执行流程已经被找到,现在问题变的简单了,直接查看

private static iapplicationbuilder createocelotpipeline(iapplicationbuilder builder, ocelotpipelineconfiguration pipelineconfiguration)
{
    var pipelinebuilder = new ocelotpipelinebuilder(builder.applicationservices);
    //详细创建的管道顺序在此方法
    pipelinebuilder.buildocelotpipeline(pipelineconfiguration);

    var firstdelegate = pipelinebuilder.build();

    /*
            inject first delegate into first piece of asp.net middleware..maybe not like this
            then because we are updating the http context in ocelot it comes out correct for
            rest of asp.net..
            */

    builder.properties["analysis.nextmiddlewarename"] = "transitiontoocelotmiddleware";

    builder.use(async (context, task) =>
                {
                    var downstreamcontext = new downstreamcontext(context);
                    await firstdelegate.invoke(downstreamcontext);
                });

    return builder;
}

管道创建流程及实现,会不会感觉到摸到大动脉了,核心的功能及原理基本找到了,那以后动手术也就可以避开一些坑了,我们可以对着这个执行顺序,再查看详细的源码,按照这个执行顺序查看源码,您就会发现整个思路非常清晰,每一步的实现一目了然。为了更直观的介绍源码的解读方式,这里我们就拿我们后续要操刀的中间件来讲解下中间件的具体实现。

public static class ocelotpipelineextensions
    {
        public static ocelotrequestdelegate buildocelotpipeline(this iocelotpipelinebuilder builder,
            ocelotpipelineconfiguration pipelineconfiguration)
        {
            // this is registered to catch any global exceptions that are not handled
            // it also sets the request id if anything is set globally
            builder.useexceptionhandlermiddleware();

            // if the request is for websockets upgrade we fork into a different pipeline
            builder.mapwhen(context => context.httpcontext.websockets.iswebsocketrequest,
                app =>
                {
                    app.usedownstreamroutefindermiddleware();
                    app.usedownstreamrequestinitialiser();
                    app.useloadbalancingmiddleware();
                    app.usedownstreamurlcreatormiddleware();
                    app.usewebsocketsproxymiddleware();
                });

            // allow the user to respond with absolutely anything they want.
            builder.useifnotnull(pipelineconfiguration.preerrorrespondermiddleware);

            // this is registered first so it can catch any errors and issue an appropriate response
            builder.userespondermiddleware();

            // then we get the downstream route information
            builder.usedownstreamroutefindermiddleware();

            // this security module, ip whitelist blacklist, extended security mechanism
            builder.usesecuritymiddleware();

            //expand other branch pipes
            if (pipelineconfiguration.mapwhenocelotpipeline != null)
            {
                foreach (var pipeline in pipelineconfiguration.mapwhenocelotpipeline)
                {
                    builder.mapwhen(pipeline);
                }
            }

            // now we have the ds route we can transform headers and stuff?
            builder.usehttpheaderstransformationmiddleware();

            // initialises downstream request
            builder.usedownstreamrequestinitialiser();

            // we check whether the request is ratelimit, and if there is no continue processing
            builder.useratelimiting();

            // this adds or updates the request id (initally we try and set this based on global config in the error handling middleware)
            // if anything was set at global level and we have a different setting at re route level the global stuff will be overwritten
            // this means you can get a scenario where you have a different request id from the first piece of middleware to the request id middleware.
            builder.userequestidmiddleware();

            // allow pre authentication logic. the idea being people might want to run something custom before what is built in.
            builder.useifnotnull(pipelineconfiguration.preauthenticationmiddleware);

            // now we know where the client is going to go we can authenticate them.
            // we allow the ocelot middleware to be overriden by whatever the
            // user wants
            if (pipelineconfiguration.authenticationmiddleware == null)
            {
                builder.useauthenticationmiddleware();
            }
            else
            {
                builder.use(pipelineconfiguration.authenticationmiddleware);
            }

            // the next thing we do is look at any claims transforms in case this is important for authorisation
            builder.useclaimstoclaimsmiddleware();

            // allow pre authorisation logic. the idea being people might want to run something custom before what is built in.
            builder.useifnotnull(pipelineconfiguration.preauthorisationmiddleware);

            // now we have authenticated and done any claims transformation we 
            // can authorise the request
            // we allow the ocelot middleware to be overriden by whatever the
            // user wants
            if (pipelineconfiguration.authorisationmiddleware == null)
            {//使用自定义认证,移除默认的认证方式
                //builder.useauthorisationmiddleware();
            }
            else
            {
                builder.use(pipelineconfiguration.authorisationmiddleware);
            }

            // now we can run the claims to headers transformation middleware
            builder.useclaimstoheadersmiddleware();

            // allow the user to implement their own query string manipulation logic
            builder.useifnotnull(pipelineconfiguration.prequerystringbuildermiddleware);

            // now we can run any claims to query string transformation middleware
            builder.useclaimstoquerystringmiddleware();

            // get the load balancer for this request
            builder.useloadbalancingmiddleware();

            // this takes the downstream route we retrieved earlier and replaces any placeholders with the variables that should be used
            builder.usedownstreamurlcreatormiddleware();

            // not sure if this is the best place for this but we use the downstream url 
            // as the basis for our cache key.
            builder.useoutputcachemiddleware();

            //we fire off the request and set the response on the scoped data repo
            builder.usehttprequestermiddleware();

            return builder.build();
        }
    private static void useifnotnull(this iocelotpipelinebuilder builder,
            func<downstreamcontext, func<task>, task> middleware)
        {
            if (middleware != null)
            {
                builder.use(middleware);
            }
        }
    }

限流中间件实现解析

实现代码如下builder.useratelimiting();,我们转到定义,得到如下代码,详细的实现逻辑在clientratelimitmiddleware方法里,继续转定义到这个方法,我把方法里用到的内容注释了下。

public static class ratelimitmiddlewareextensions
{
    public static iocelotpipelinebuilder useratelimiting(this iocelotpipelinebuilder builder)
    {
        return builder.usemiddleware<clientratelimitmiddleware>();
    }
}

public class clientratelimitmiddleware : ocelotmiddleware
{
        private readonly ocelotrequestdelegate _next;
        private readonly iratelimitcounterhandler _counterhandler;
        private readonly clientratelimitprocessor _processor;

        public clientratelimitmiddleware(ocelotrequestdelegate next,
            iocelotloggerfactory loggerfactory,
            iratelimitcounterhandler counterhandler)
                :base(loggerfactory.createlogger<clientratelimitmiddleware>())
        {
            _next = next;
            _counterhandler = counterhandler;
            _processor = new clientratelimitprocessor(counterhandler);
        }
        //熟悉的tnvoke方法,所有的逻辑都在此方法里。
        public async task invoke(downstreamcontext context)
        {
            var options = context.downstreamreroute.ratelimitoptions;

            // 校验是否启用限流配置
            if (!context.downstreamreroute.enableendpointendpointratelimiting)
            {//未启用直接进入下一个中间件
                logger.loginformation($"endpointratelimiting is not enabled for {context.downstreamreroute.downstreampathtemplate.value}");
                await _next.invoke(context);
                return;
            }

            // 获取配置的校验客户端的方式
            var identity = setidentity(context.httpcontext, options);

            // 校验是否为白名单
            if (iswhitelisted(identity, options))
            {//白名单直接放行
                logger.loginformation($"{context.downstreamreroute.downstreampathtemplate.value} is white listed from rate limiting");
                await _next.invoke(context);
                return;
            }

            var rule = options.ratelimitrule;
            if (rule.limit > 0)
            {//限流数是否大于0
                // 获取当前客户端请求情况,这里需要注意_processor是从哪里注入的,后续重
                var counter = _processor.processrequest(identity, options);

                // 校验请求数是否大于限流数
                if (counter.totalrequests > rule.limit)
                {
                    //获取下次有效请求的时间,就是避免每次请求,都校验一次
                    var retryafter = _processor.retryafterfrom(counter.timestamp, rule);

                    // 写入日志
                    logblockedrequest(context.httpcontext, identity, counter, rule, context.downstreamreroute);

                    var retrystring = retryafter.tostring(system.globalization.cultureinfo.invariantculture);

                    // 抛出超出限流异常并把下次可请求时间写入header里。
                    await returnquotaexceededresponse(context.httpcontext, options, retrystring);

                    return;
                }
            }

            //如果启用了限流头部
            if (!options.disableratelimitheaders)
            {
                var headers = _processor.getratelimitheaders(context.httpcontext, identity, options);
                context.httpcontext.response.onstarting(setratelimitheaders, state: headers);
            }
            //进入下一个中间件
            await _next.invoke(context);
        }

        public virtual clientrequestidentity setidentity(httpcontext httpcontext, ratelimitoptions option)
        {
            var clientid = "client";
            if (httpcontext.request.headers.keys.contains(option.clientidheader))
            {
                clientid = httpcontext.request.headers[option.clientidheader].first();
            }

            return new clientrequestidentity(
                clientid,
                httpcontext.request.path.tostring().tolowerinvariant(),
                httpcontext.request.method.tolowerinvariant()
                );
        }

        public bool iswhitelisted(clientrequestidentity requestidentity, ratelimitoptions option)
        {
            if (option.clientwhitelist.contains(requestidentity.clientid))
            {
                return true;
            }

            return false;
        }

        public virtual void logblockedrequest(httpcontext httpcontext, clientrequestidentity identity, ratelimitcounter counter, ratelimitrule rule, downstreamreroute downstreamreroute)
        {
            logger.loginformation(
                $"request {identity.httpverb}:{identity.path} from clientid {identity.clientid} has been blocked, quota {rule.limit}/{rule.period} exceeded by {counter.totalrequests}. blocked by rule { downstreamreroute.upstreampathtemplate.originalvalue }, traceidentifier {httpcontext.traceidentifier}.");
        }

        public virtual task returnquotaexceededresponse(httpcontext httpcontext, ratelimitoptions option, string retryafter)
        {
            var message = string.isnullorempty(option.quotaexceededmessage) ? $"api calls quota exceeded! maximum admitted {option.ratelimitrule.limit} per {option.ratelimitrule.period}." : option.quotaexceededmessage;

            if (!option.disableratelimitheaders)
            {
                httpcontext.response.headers["retry-after"] = retryafter;
            }

            httpcontext.response.statuscode = option.httpstatuscode;
            return httpcontext.response.writeasync(message);
        }

        private task setratelimitheaders(object ratelimitheaders)
        {
            var headers = (ratelimitheaders)ratelimitheaders;

            headers.context.response.headers["x-rate-limit-limit"] = headers.limit;
            headers.context.response.headers["x-rate-limit-remaining"] = headers.remaining;
            headers.context.response.headers["x-rate-limit-reset"] = headers.reset;

            return task.completedtask;
        }
   }

通过源码解析,发现实现一个限流还是很简单的吗!再进一步解析,iratelimitcounterhandler clientratelimitprocessor里的相关接口又是怎么实现的呢?这时候我们就需要了解下.netcore 的运行原理,其中configureservices方法实现了依赖注入(di)的配置。这时候我们看下ocelot是在哪里进行注入的呢?

services.addocelot()是不是印象深刻呢?原来所有的注入信息都写在这里,那么问题简单了,ctrl+f查找addocelot方法,马上就能定位到servicecollectionextensions方法,然后再转到定义ocelotbuilder

public static class servicecollectionextensions
{
    public static iocelotbuilder addocelot(this iservicecollection services)
    {
        var service = services.first(x => x.servicetype == typeof(iconfiguration));
        var configuration = (iconfiguration)service.implementationinstance;
        return new ocelotbuilder(services, configuration);
    }

    public static iocelotbuilder addocelot(this iservicecollection services, iconfiguration configuration)
    {
        return new ocelotbuilder(services, configuration);
    }
}

又摸到大动脉啦,现在问题迎刃而解,原来所有的注入都写在这里,从这里可以找下我们熟悉的几个接口注入。

public ocelotbuilder(iservicecollection services, iconfiguration configurationroot)
{
    configuration = configurationroot;
    services = services;
    services.configure<fileconfiguration>(configurationroot);

    services.tryaddsingleton<iocelotcache<fileconfiguration>, inmemorycache<fileconfiguration>>();
    services.tryaddsingleton<iocelotcache<cachedresponse>, inmemorycache<cachedresponse>>();
    services.tryaddsingleton<ihttpresponseheaderreplacer, httpresponseheaderreplacer>();
    services.tryaddsingleton<ihttpcontextrequestheaderreplacer, httpcontextrequestheaderreplacer>();
    services.tryaddsingleton<iheaderfindandreplacecreator, headerfindandreplacecreator>();
    services.tryaddsingleton<iinternalconfigurationcreator, fileinternalconfigurationcreator>();
    services.tryaddsingleton<iinternalconfigurationrepository, inmemoryinternalconfigurationrepository>();
    services.tryaddsingleton<iconfigurationvalidator, fileconfigurationfluentvalidator>();
    services.tryaddsingleton<hostandportvalidator>();
    services.tryaddsingleton<ireroutescreator, reroutescreator>();
    services.tryaddsingleton<iaggregatescreator, aggregatescreator>();
    services.tryaddsingleton<ireroutekeycreator, reroutekeycreator>();
    services.tryaddsingleton<iconfigurationcreator, configurationcreator>();
    services.tryaddsingleton<idynamicscreator, dynamicscreator>();
    services.tryaddsingleton<iloadbalanceroptionscreator, loadbalanceroptionscreator>();
    services.tryaddsingleton<reroutefluentvalidator>();
    services.tryaddsingleton<fileglobalconfigurationfluentvalidator>();
    services.tryaddsingleton<fileqosoptionsfluentvalidator>();
    services.tryaddsingleton<iclaimstothingcreator, claimstothingcreator>();
    services.tryaddsingleton<iauthenticationoptionscreator, authenticationoptionscreator>();
    services.tryaddsingleton<iupstreamtemplatepatterncreator, upstreamtemplatepatterncreator>();
    services.tryaddsingleton<irequestidkeycreator, requestidkeycreator>();
    services.tryaddsingleton<iserviceproviderconfigurationcreator,serviceproviderconfigurationcreator>();
    services.tryaddsingleton<iqosoptionscreator, qosoptionscreator>();
    services.tryaddsingleton<irerouteoptionscreator, rerouteoptionscreator>();
    services.tryaddsingleton<iratelimitoptionscreator, ratelimitoptionscreator>();
    services.tryaddsingleton<ibaseurlfinder, baseurlfinder>();
    services.tryaddsingleton<iregioncreator, regioncreator>();
    services.tryaddsingleton<ifileconfigurationrepository, diskfileconfigurationrepository>();
    services.tryaddsingleton<ifileconfigurationsetter, fileandinternalconfigurationsetter>();
    services.tryaddsingleton<iservicediscoveryproviderfactory, servicediscoveryproviderfactory>();
    services.tryaddsingleton<iloadbalancerfactory, loadbalancerfactory>();
    services.tryaddsingleton<iloadbalancerhouse, loadbalancerhouse>();
    services.tryaddsingleton<iocelotloggerfactory, aspdotnetloggerfactory>();
    services.tryaddsingleton<iremoveoutputheaders, removeoutputheaders>();
    services.tryaddsingleton<iclaimtothingconfigurationparser, claimtothingconfigurationparser>();
    services.tryaddsingleton<iclaimsauthoriser, claimsauthoriser>();
    services.tryaddsingleton<iscopesauthoriser, scopesauthoriser>();
    services.tryaddsingleton<iaddclaimstorequest, addclaimstorequest>();
    services.tryaddsingleton<iaddheaderstorequest, addheaderstorequest>();
    services.tryaddsingleton<iaddqueriestorequest, addqueriestorequest>();
    services.tryaddsingleton<iclaimsparser, claimsparser>();
    services.tryaddsingleton<iurlpathtourltemplatematcher, regexurlmatcher>();
    services.tryaddsingleton<iplaceholdernameandvaluefinder, urlpathplaceholdernameandvaluefinder>();
    services.tryaddsingleton<idownstreampathplaceholderreplacer, downstreamtemplatepathplaceholderreplacer>();
    services.tryaddsingleton<idownstreamrouteprovider, downstreamroutefinder>();
    services.tryaddsingleton<idownstreamrouteprovider, downstreamroutecreator>();
    services.tryaddsingleton<idownstreamrouteproviderfactory, downstreamrouteproviderfactory>();
    services.tryaddsingleton<ihttprequester, httpclienthttprequester>();
    services.tryaddsingleton<ihttpresponder, httpcontextresponder>();
    services.tryaddsingleton<ierrorstohttpstatuscodemapper, errorstohttpstatuscodemapper>();
    services.tryaddsingleton<iratelimitcounterhandler, memorycacheratelimitcounterhandler>();
    services.tryaddsingleton<ihttpclientcache, memoryhttpclientcache>();
    services.tryaddsingleton<irequestmapper, requestmapper>();
    services.tryaddsingleton<ihttphandleroptionscreator, httphandleroptionscreator>();
    services.tryaddsingleton<idownstreamaddressescreator, downstreamaddressescreator>();
    services.tryaddsingleton<idelegatinghandlerhandlerfactory, delegatinghandlerhandlerfactory>();
    services.tryaddsingleton<ihttprequester, httpclienthttprequester>();

    // see this for why we register this as singleton http://*.com/questions/37371264/invalidoperationexception-unable-to-resolve-service-for-type-microsoft-aspnetc
    // could maybe use a scoped data repository
    services.tryaddsingleton<ihttpcontextaccessor, httpcontextaccessor>();
    services.tryaddsingleton<irequestscopeddatarepository, httpdatarepository>();
    services.addmemorycache();
    services.tryaddsingleton<ocelotdiagnosticlistener>();
    services.tryaddsingleton<imultiplexer, multiplexer>();
    services.tryaddsingleton<iresponseaggregator, simplejsonresponseaggregator>();
    services.tryaddsingleton<itracinghandlerfactory, tracinghandlerfactory>();
    services.tryaddsingleton<ifileconfigurationpolleroptions, inmemoryfileconfigurationpolleroptions>();
    services.tryaddsingleton<iaddheaderstoresponse, addheaderstoresponse>();
    services.tryaddsingleton<iplaceholders, placeholders>();
    services.tryaddsingleton<iresponseaggregatorfactory, inmemoryresponseaggregatorfactory>();
    services.tryaddsingleton<idefinedaggregatorprovider, servicelocatordefinedaggregatorprovider>();
    services.tryaddsingleton<idownstreamrequestcreator, downstreamrequestcreator>();
    services.tryaddsingleton<iframeworkdescription, frameworkdescription>();
    services.tryaddsingleton<iqosfactory, qosfactory>();
    services.tryaddsingleton<iexceptiontoerrormapper, httpexeptiontoerrormapper>();

    //add security 
    this.addsecurity();

    //add asp.net services..
    var assembly = typeof(fileconfigurationcontroller).gettypeinfo().assembly;

    services.addmvccore()
        .addapplicationpart(assembly)
        .addcontrollersasservices()
        .addauthorization()
        .addjsonformatters();

    services.addlogging();
    services.addmiddlewareanalysis();
    services.addwebencoders();
}

至此ocelot源码解析就到这里了,其他的具体实现代码就根据流程一个一个查看即可,这里就不详细讲解了,因为我们已经掌握整个ocelot代码的运行原理和实现方式及流程,项目里其他的一大堆的代码都是围绕这个流程去一步一步实现的。

有没有感觉添加一个中间件不是很复杂呢,是不是都跃跃欲试,准备尝试开发自己的自定义中间件啦,本篇就不介绍中间件的具体开发流程了,后续实战中会包含部分项目中需要用到的中间件,到时候会详细讲解如何规划和开发一个满足自己项目需求的中间件。

二、结合项目梳理功能

在完整学习完ocelot文档和源码后,我们基本掌握了ocelot目前已经实现的功能,再结合我们实际项目需求,我们梳理下还有哪些功能可能需要自己扩展实现。

项目设计网关基本需求包括路由、认证、授权、限流、缓存,仔细学习文档和源码后发现功能都已经存在,那是不是我们就可以直接拿来使用呢?这时候我们需要拿出一些复杂业务场景来对号入座,看能否实现复杂场景的一些应用。

1、授权

能否为每一个客户端设置独立的访问权限,如果客户端a可以访问服务a、服务b,客户端b只能访问服务a,从网关层面直接授权,不满足需求不路由到具体服务。从文档和代码分析后发现暂时未实现。

2、限流

能否为每一个客户端设置不能限流规则,例如客户端a为我们内容应用,我希望对服务a不启用限流,客户端b为第三方接入应用,我需要b访问服务a访问进行单独限流(30次/分钟),看能否通过配置实现自定义限流。从文档和代码分析后发现暂时未实现。

3、缓存

通过代码发现目前缓存实现的只是dictionary方式实现的缓存,不能实现分布式结构的应用。

通过分析我们发现列举的5个基本需求,尽然有3个在我们实际项目应用中可能会存在问题,如果不解决这些问题,很难直接拿这个完美的网关项目应用到正式项目,所以我们到通过扩展ocelot方法来实现我们的目的。

如何扩展呢

为了满足我们项目应用的需要,我们需要为每一个路由进行单独设置,如果还采用配置文件的方式,肯定无法满足需求,且后续网关动态增加路由、授权、限流等无法控制,所以我们需要把网关配置信息从配置文件中移到数据库中,由数据库中的路由表、限流表、授权表等方式记录当前网关的应用,且后续扩展直接在数据库中增加或减少相关配置,然后动态更新网关配置实现网关的高可用。

想一想是不是有点小激动,原来只要稍微改造下宝骏瞬间变宝马,那接下来的课程就是网关改造之旅,我会从设计、思想、编码等方面讲解下如何实现我们的第一辆宝马。

本系列文章我也是边想边写边实现,如果发现中间有任何描述或实现不当的地方,也请各位大神批评指正,我会第一时间整理并修正,避免让后续学习的人走弯路。