【.NET Core项目实战-统一认证平台】第七章 网关篇-自定义客户端限流
【.net core项目实战-统一认证平台】开篇及目录索引
上篇文章我介绍了如何在网关上增加自定义客户端授权功能,从设计到编码实现,一步一步详细讲解,相信大家也掌握了自定义中间件的开发技巧了,本篇我们将介绍如何实现自定义客户端的限流功能,来进一步完善网关的基础功能。
.netcore项目实战交流群(637326624),有兴趣的朋友可以在群里交流讨论。
一、功能描述
限流就是为了保证网关在高并发或瞬时并发时,在服务能承受范围内,牺牲部分请求为代价,保证系统的整体可用性而做的安全策略,避免单个服务影响整体网关的服务能力。
比如网关有商品查询接口 ,能接受的极限请求是每秒100次查询,如果此时不限流,可能因为瞬时请求太大,造成服务卡死或崩溃的情况,这种情况可以使用ocelot
客户端全局限流即可满足需求,现在又有一个需求,我需要把接口开放给a公司,他们也要查询这个商品接口,这时a公司请求频率也是我们设置的每秒100次请求,显然我们不希望a公司有这么高的请求频率,我只会给a公司最大每秒一次的请求,那怎么实现呢?这时我们就无法通过ocelot
配置限流来进行自定义控制了,这块就需要我们增加自定义限流管道来实现功能。
下面我们就该功能如何实现展开讲解,希望大家先理解下功能需求,然后在延伸到具体实现。
二、数据库设计
限流这块设计表结构和关系如下。
主要有限流规则表、路由限流规则表、限流组表、限流组策略表、客户端授权限流组表、客户端白名单表组成,设计思想就是客户端请求时先检查是否在白名单,如果白名单不存在,就检查是否在限流组里,如果在限流组里校验限流的规则是什么,然后比对这个规则和当前请求次数看是否能够继续访问,如果超过限流策略直接返回429状态,否则路由到下端请求。
梳理下后发现流程不是很复杂,最起码实现的思路非常清晰,然后我们就运用上篇自定义授权中间件的方式来开发我们第二个中间件,自定义限流中间件。
三、功能实现
1、功能开启配置
网关应该支持自定义客户端限流中间件是否启用,因为一些小型项目是不需要对每个客户端进行单独限流的,中型和大型项目才有可能遇到自定义配置情况,所以我们需要在配置文件增加配置选项。在ahphocelotconfiguration.cs
配置类中增加属性,默认不开启。
/// <summary> /// 金焰的世界 /// 2018-11-18 /// 是否开启自定义限流,默认不开启 /// </summary> public bool clientratelimit { get; set; } = false; /// <summary> /// 金焰的世界 /// 2018-11-18 /// 客户端限流缓存时间,默认30分钟 /// </summary> public int clientratelimitcachetime { get; set; } = 1800;
那我们如何把自定义的限流增加到网关流程里呢?这块我们就需要订制自己的限流中间件。
2、实现客户端限流中间件
首先我们定义一个自定义限流中间件ahphclientratelimitmiddleware
,需要继承ocelotmiddleware
,然后我们要实现invoke
方法,详细代码如下。
using ctr.ahphocelot.configuration; using ctr.ahphocelot.errors; using ocelot.logging; using ocelot.middleware; using system; using system.collections.generic; using system.linq; using system.text; using system.threading.tasks; namespace ctr.ahphocelot.ratelimit.middleware { /// <summary> /// 金焰的世界 /// 2018-11-18 /// 自定义客户端限流中间件 /// </summary> public class ahphclientratelimitmiddleware : ocelotmiddleware { private readonly iclientratelimitprocessor _clientratelimitprocessor; private readonly ocelotrequestdelegate _next; private readonly ahphocelotconfiguration _options; public ahphclientratelimitmiddleware(ocelotrequestdelegate next, iocelotloggerfactory loggerfactory, iclientratelimitprocessor clientratelimitprocessor, ahphocelotconfiguration options) : base(loggerfactory.createlogger<ahphclientratelimitmiddleware>()) { _next = next; _clientratelimitprocessor = clientratelimitprocessor; _options = options; } public async task invoke(downstreamcontext context) { var clientid = "client_cjy"; //使用默认的客户端 if (!context.iserror) { if (!_options.clientratelimit) { logger.loginformation($"未启用客户端限流中间件"); await _next.invoke(context); } else { //非认证的渠道 if (!context.downstreamreroute.isauthenticated) { if (context.httpcontext.request.headers.keys.contains(_options.clientkey)) { clientid = context.httpcontext.request.headers[_options.clientkey].first(); } } else {//认证过的渠道,从claim中提取 var clientclaim = context.httpcontext.user.claims.firstordefault(p => p.type == _options.clientkey); if (!string.isnullorempty(clientclaim?.value)) { clientid = clientclaim?.value; } } //路由地址 var path = context.downstreamreroute.upstreampathtemplate.originalvalue; //1、校验路由是否有限流策略 //2、校验客户端是否被限流了 //3、校验客户端是否启动白名单 //4、校验是否触发限流及计数 if (await _clientratelimitprocessor.checkclientratelimitresultasync(clientid, path)) { await _next.invoke(context); } else { var error = new ratelimitoptionserror($"请求路由 {context.httpcontext.request.path}触发限流策略"); logger.logwarning($"路由地址 {context.httpcontext.request.path} 触发限流策略. {error}"); setpipelineerror(context, error); } } } else { await _next.invoke(context); } } } }
首先我们来分析下我们的代码,为了知道是哪个客户端请求了我们网关,需要提取clientid
,分别从无需授权接口和需要授权接口两个方式提取,如果提取不到值直接给定默认值,放到全局限流里,防止绕过限流策略。然后根据客户端通过4步检验下是否允许访问(后面会介绍这4步怎么实现),如果满足限流策略直接返回限流错误提醒。
有了这个中间件,那么如何添加到ocelot的管道里呢?上一篇介绍的非常详细,这篇我就不介绍了,自定义限流中间件扩展ahphclientratelimitmiddlewareextensions
,代码如下。
using ocelot.middleware.pipeline; using system; using system.collections.generic; using system.text; namespace ctr.ahphocelot.ratelimit.middleware { /// <summary> /// 金焰的世界 /// 2018-11-18 /// 限流中间件扩展 /// </summary> public static class ahphclientratelimitmiddlewareextensions { public static iocelotpipelinebuilder useahphauthenticationmiddleware(this iocelotpipelinebuilder builder) { return builder.usemiddleware<ahphclientratelimitmiddleware>(); } } }
有了这个中间件扩展后,我们就在管道的合适地方加入我们自定义的中间件。我们添加我们自定义的管道扩展ocelotpipelineextensions
,然后把自定义限流中间件加入到认证之后。
//添加自定义限流中间件 2018-11-18 金焰的世界 builder.useahphclientratelimitmiddleware();
现在我们完成了网关的扩展和应用,是时候把定义的iclientratelimitprocessor
接口实现了 ,是不是感觉做一个中间件很简单呢?而且每一步都是层层关联,只要一步一步按照自己的想法往下写就能实现。
3、结合数据库实现校验及缓存
首先我们新建ahphclientratelimitprocessor
类来实现接口,中间增加必要的缓存和业务逻辑,详细代码如下。
using ctr.ahphocelot.configuration; using ocelot.cache; using system; using system.collections.generic; using system.text; using system.threading.tasks; namespace ctr.ahphocelot.ratelimit { /// <summary> /// 金焰的世界 /// 2018-11-19 /// 实现客户端限流处理器 /// </summary> public class ahphclientratelimitprocessor : iclientratelimitprocessor { private readonly ahphocelotconfiguration _options; private readonly iocelotcache<clientrolemodel> _ocelotcache; private readonly iocelotcache<ratelimitrulemodel> _ratelimitrulecache; private readonly iocelotcache<ahphclientratelimitcounter?> _clientratelimitcounter; private readonly iclientratelimitrepository _clientratelimitrepository; private static readonly object _processlocker = new object(); public ahphclientratelimitprocessor(ahphocelotconfiguration options,iclientratelimitrepository clientratelimitrepository, iocelotcache<ahphclientratelimitcounter?> clientratelimitcounter, iocelotcache<clientrolemodel> ocelotcache, iocelotcache<ratelimitrulemodel> ratelimitrulecache) { _options = options; _clientratelimitrepository = clientratelimitrepository; _clientratelimitcounter = clientratelimitcounter; _ocelotcache = ocelotcache; _ratelimitrulecache = ratelimitrulecache; } /// <summary> /// 校验客户端限流结果 /// </summary> /// <param name="clientid">客户端id</param> /// <param name="path">请求地址</param> /// <returns></returns> public async task<bool> checkclientratelimitresultasync(string clientid, string path) { var result = false; var clientrule = new list<ahphclientratelimitoptions>(); //1、校验路由是否有限流策略 result = !await checkrerouteruleasync(path); if (!result) {//2、校验客户端是否被限流了 var limitresult = await checkclientratelimitasync(clientid, path); result = !limitresult.ratelimit; clientrule = limitresult.ratelimitoptions; } if (!result) {//3、校验客户端是否启动白名单 result = await checkclientreroutewhitelistasync(clientid, path); } if (!result) {//4、校验是否触发限流及计数 result = checkratelimitresult(clientrule); } return result; } /// <summary> /// 检验是否启用限流规则 /// </summary> /// <param name="path">请求地址</param> /// <returns></returns> private async task<bool> checkrerouteruleasync(string path) { var region = _options.rediskeyprefix + "checkrerouteruleasync"; var key = region + path; var cacheresult = _ocelotcache.get(key, region); if (cacheresult != null) {//提取缓存数据 return cacheresult.role; } else {//重新获取限流策略 var result = await _clientratelimitrepository.checkrerouteruleasync(path); _ocelotcache.add(key, new clientrolemodel() { cachetime = datetime.now, role = result }, timespan.fromseconds(_options.clientratelimitcachetime), region); return result; } } /// <summary> /// 校验客户端限流规则 /// </summary> /// <param name="clientid">客户端id</param> /// <param name="path">请求地址</param> /// <returns></returns> private async task<(bool ratelimit, list<ahphclientratelimitoptions> ratelimitoptions)> checkclientratelimitasync(string clientid, string path) { var region = _options.rediskeyprefix + "checkclientratelimitasync"; var key = region + clientid + path; var cacheresult = _ratelimitrulecache.get(key, region); if (cacheresult != null) {//提取缓存数据 return (cacheresult.ratelimit, cacheresult.ratelimitoptions); } else {//重新获取限流策略 var result = await _clientratelimitrepository.checkclientratelimitasync(clientid, path); _ratelimitrulecache.add(key, new ratelimitrulemodel() { ratelimit=result.ratelimit, ratelimitoptions=result.ratelimitoptions }, timespan.fromseconds(_options.clientratelimitcachetime), region); return result; } } /// <summary> /// 校验是否设置了路由白名单 /// </summary> /// <param name="clientid">客户端id</param> /// <param name="path">请求地址</param> /// <returns></returns> private async task<bool> checkclientreroutewhitelistasync(string clientid, string path) { var region = _options.rediskeyprefix + "checkclientreroutewhitelistasync"; var key = region +clientid+ path; var cacheresult = _ocelotcache.get(key, region); if (cacheresult != null) {//提取缓存数据 return cacheresult.role; } else {//重新获取限流策略 var result = await _clientratelimitrepository.checkclientreroutewhitelistasync(clientid,path); _ocelotcache.add(key, new clientrolemodel() { cachetime = datetime.now, role = result }, timespan.fromseconds(_options.clientratelimitcachetime), region); return result; } } /// <summary> /// 校验完整的限流规则 /// </summary> /// <param name="ratelimitoptions">限流配置</param> /// <returns></returns> private bool checkratelimitresult(list<ahphclientratelimitoptions> ratelimitoptions) { bool result = true; if (ratelimitoptions != null && ratelimitoptions.count > 0) {//校验策略 foreach (var op in ratelimitoptions) { ahphclientratelimitcounter counter = new ahphclientratelimitcounter(datetime.utcnow, 1); //分别对每个策略校验 var enableprefix = _options.rediskeyprefix + "ratelimitrule"; var key = ahphocelothelper.computecounterkey(enableprefix, op.clientid, op.period, op.ratelimitpath); var periodtimestamp = ahphocelothelper.converttosecond(op.period); lock (_processlocker) { var ratelimitcounter = _clientratelimitcounter.get(key, enableprefix); if (ratelimitcounter.hasvalue) {//提取当前的计数情况 // 请求次数增长 var totalrequests = ratelimitcounter.value.totalrequests + 1; // 深拷贝 counter = new ahphclientratelimitcounter(ratelimitcounter.value.timestamp, totalrequests); } else {//写入限流策略 _clientratelimitcounter.add(key, counter,timespan.fromseconds(periodtimestamp), enableprefix); } } if (counter.totalrequests > op.limit) {//更新请求记录,并标记为失败 result = false; } if (counter.totalrequests > 1 && counter.totalrequests <= op.limit) {//更新缓存配置信息 //获取限流剩余时间 var cur = (int)(counter.timestamp.addseconds(periodtimestamp) - datetime.utcnow).totalseconds; _clientratelimitcounter.add(key, counter, timespan.fromseconds(cur), enableprefix); } } } return result; } } }
我们来分析下这块代码,里面涉及了限流的提取和实现规则,首先我们注入了数据库实体接口和缓存信息,实现步骤是参照之前的流程。
主要流程如下:
1、路由是否启用限流,如果未启用直接完成校验,如果进行第2步判断.
2、客户端对应的路由是否设置了限流规则,如果未设置,直接完成校验,否则进入第3步判断.
3、客户端是否开启了路由白名单功能,如果开启了直接完成校验,否则进入第4步。
4、使用redis来进行限流的判断。使用的就是计数器方法,结合redis设置key的过期时间来实现的。
为了减少后端请求,在数据库提取的方法前都加入了缓存,现在我们需要把用到的接口添加到入口进行注入。
builder.services.addsingleton<iocelotcache<ratelimitrulemodel>, inrediscache<ratelimitrulemodel>>(); builder.services.addsingleton<iocelotcache<ahphclientratelimitcounter?>, inrediscache<ahphclientratelimitcounter?>>();
现在我们还剩下iclientratelimitrepository
接口未实现,现在只要实现这个接口,然后注入下,我们就完成了限流中间件的开发了,我们根据限流的流程,梳理了实现,现在有3个方法需要进行实现。
新建sqlserverclientratelimitrepository
类,来开始实现我们与数据库的操作,有了上面的分析思路,现在就是把一个一个详细确定的方法实现而已,太简单了,只要花了几分钟后,就可以瞬间写出如下代码。
using ctr.ahphocelot.configuration; using ctr.ahphocelot.ratelimit; using dapper; using system; using system.collections.generic; using system.data.sqlclient; using system.text; using system.threading.tasks; namespace ctr.ahphocelot.database.sqlserver { /// <summary> /// 金焰的世界 /// 2018-11-19 /// 客户端限流信息提取 /// </summary> public class sqlserverclientratelimitrepository : iclientratelimitrepository { private readonly ahphocelotconfiguration _option; public sqlserverclientratelimitrepository(ahphocelotconfiguration option) { _option = option; } /// <summary> /// 校验客户端限流规则 /// </summary> /// <param name="clientid">客户端id</param> /// <param name="path">请求地址</param> /// <returns></returns> public async task<(bool ratelimit, list<ahphclientratelimitoptions> ratelimitoptions)> checkclientratelimitasync(string clientid, string path) { using (var connection = new sqlconnection(_option.dbconnectionstrings)) { string sql = @"select distinct upstreampathtemplate as ratelimitpath,limitperiod as period,limitnum as limit,clientid from ahphreroute t1 inner join ahphreroutelimitrule t2 on t1.rerouteid=t2.rerouteid inner join ahphlimitrule t3 on t2.ruleid=t3.ruleid inner join ahphlimitgrouprule t4 on t2.reroutelimitid=t4.reroutelimitid inner join ahphlimitgroup t5 on t4.limitgroupid=t5.limitgroupid inner join ahphclientlimitgroup t6 on t5.limitgroupid=t6.limitgroupid inner join ahphclients t7 on t6.id=t7.id where t1.infostatus=1 and t1.upstreampathtemplate=@path and t3.infostatus=1 and t5.infostatus=1 and clientid=@clientid and enabled=1"; var result = (await connection.queryasync<ahphclientratelimitoptions>(sql, new { clientid, path }))?.aslist(); if (result != null && result.count > 0) { return (true, result); } else { return (false, null); } } } /// <summary> /// 校验是否设置了路由白名单 /// </summary> /// <param name="clientid">客户端id</param> /// <param name="path">请求地址</param> /// <returns></returns> public async task<bool> checkclientreroutewhitelistasync(string clientid, string path) { using (var connection = new sqlconnection(_option.dbconnectionstrings)) { string sql = @"select count(1) from ahphreroute t1 inner join ahphclientreroutewhitelist t2 on t1.rerouteid=t2.rerouteid inner join ahphclients t3 on t2.id=t3.id where t1.infostatus=1 and upstreampathtemplate=@path and clientid=@clientid and enabled=1"; var result = await connection.queryfirstordefaultasync<int>(sql, new { clientid,path }); return result > 0; } } /// <summary> /// 校验是否启用限流规则 /// </summary> /// <param name="path">请求地址</param> /// <returns></returns> public async task<bool> checkrerouteruleasync(string path) { using (var connection = new sqlconnection(_option.dbconnectionstrings)) { string sql = @"select count(1) from ahphreroute t1 inner join ahphreroutelimitrule t2 on t1.rerouteid=t2.rerouteid inner join ahphlimitrule t3 on t2.ruleid=t3.ruleid where t1.infostatus=1 and upstreampathtemplate=@path and t3.infostatus=1"; var result = await connection.queryfirstordefaultasync<int>(sql, new { path }); return result > 0; } } } }
主要就是注意下表之间的关系,把实现注入到addahphocelot
里,现在就可以测试开始自定义客户端限流中间件。
builder.services.addsingleton<iclientratelimitrepository, sqlserverclientratelimitrepository>();
4、测试限流中间件
为了把把所有情况都测试一遍,先从开启限流,什么都不写入看是否能够正常运行。
option.clientratelimit = true;
还记得我们上篇的两个客户端和能访问的页面吗?就用它们来测试,结果显示正常,说明不开启限流没有影响。
开启/cjy/values
2个限流规则,一个每1分钟访问1次,一个每1分钟访问60次。
--1、插入限流规则 insert into ahphlimitrule values('每1分钟访问1次','1m',1,1); insert into ahphlimitrule values('每1分钟访问60次','1m',60,1); --2、应用到/cjy/values路由 insert into ahphreroutelimitrule values(1,1); insert into ahphreroutelimitrule values(2,1);
因为还未给客户端应用规则,所以应该也是可以正常访问,可以使用postman
测试下,测试时需要注意下缓存,因为所有的访问都启用的默认缓存策略,经测试得到预期效果。
现在开始把限流分别应用到客户端1和客户端2,看下限流效果。
--3、插入测试分组 insert into ahphlimitgroup values('限流分组1','',1); insert into ahphlimitgroup values('限流分组2','',1); --4、分组应用策略 insert into ahphlimitgrouprule values(1,1); insert into ahphlimitgrouprule values(2,2); --5、客户端应用限流分组 insert into ahphclientlimitgroup values(2,1); insert into ahphclientlimitgroup values(3,2);
然后使用postman
测试客户端1和客户端2,结果如下,超过设置的频率后不返回结果,达到预期目的,但是返回的是404错误,强迫症患者表示这不优雅啊,应该是429 too many requests
,那我们如何修改呢?
这里就需要了解下错误信息是如何输出的,需要查看ocelot
源码,您会发现ierrorstohttpstatuscodemapper
接口和errorstohttpstatuscodemapper
实现,代码如下,
using system.collections.generic; using system.linq; using ocelot.errors; namespace ocelot.responder { public class errorstohttpstatuscodemapper : ierrorstohttpstatuscodemapper { public int map(list<error> errors) { if (errors.any(e => e.code == oceloterrorcode.unauthenticatederror)) { return 401; } if (errors.any(e => e.code == oceloterrorcode.unauthorizederror || e.code == oceloterrorcode.claimvaluenotauthorisederror || e.code == oceloterrorcode.scopenotauthorisederror || e.code == oceloterrorcode.userdoesnothaveclaimerror || e.code == oceloterrorcode.cannotfindclaimerror)) { return 403; } if (errors.any(e => e.code == oceloterrorcode.requesttimedouterror)) { return 503; } if (errors.any(e => e.code == oceloterrorcode.unabletofinddownstreamrouteerror)) { return 404; } if (errors.any(e => e.code == oceloterrorcode.unabletocompleterequesterror)) { return 500; } return 404; } } }
可以发现因为未定义ratelimitoptionserror
错误的状态码,增加一个判断即可,那我们重写下把,然后集成在我们自己的中间件里,这块在后期有很多扩展能够用到,增加如下代码。
if (errors.any(e => e.code == oceloterrorcode.ratelimitoptionserror)) { return 429; }
然后重新注入下。
builder.services.addsingleton<ierrorstohttpstatuscodemapper, ahpherrorstohttpstatuscodemapper>();
在重新测试下访问限流地址。
奈斯,达到了我们预期的效果,.netcore
开发魅力体现出来了吗?
我们增加客户端1的路由白名单,然后再继续测试看是否解除限流限制?
--6、设置客户端1/cjy/values路由白名单 insert into ahphclientreroutewhitelist values(1,2);
注意测试时清除缓存
经测试不受限流控制,达到了我们最终目的,到此限流功能全部实现。
5、增加mysql支持
直接重写iclientratelimitrepository
实现,然后注入实现。
builder.services.addsingleton<iclientratelimitrepository, mysqlclientratelimitrepository>();
四、总结及预告
本篇我们讲解的是网关如何实现自定义客户端限流功能,从设计到实现一步一步详细讲解,虽然只用一篇就写完了,但是涉及的知识点还是非常多的,希望大家认真理解实现的思想,看我是如何从规划到实现的,为了更好的帮助大家理解。大家可以根据博客内容自己手动实现下,有利于消化,如果在操作中遇到什么问题,可以加.net core项目实战交流群(qq群号:637326624)
咨询作者。
从下一篇开始介绍identityserver4
的相关应用,并配合我们的网关实现认证,在跟我教程学习的朋友,可以自己先预习下。
上一篇: 【解决办法】HTTP状态 404 - 未找到 文.件[/register.jsp] 未找到 源服务器未能找到目标资源的表示或者是不愿公开一个已经存在的资源表示。
下一篇: 缓存反向代理-Varnish
推荐阅读
-
【.NET Core项目实战-统一认证平台】第二章网关篇-重构Ocelot来满足需求
-
【.NET Core项目实战-统一认证平台】第十六章 网关篇-Ocelot集成RPC服务
-
【.NET Core项目实战-统一认证平台】第九章 授权篇-使用Dapper持久化IdentityServer4
-
【.NET Core项目实战-统一认证平台】第十二章 授权篇-深入理解JWT生成及验证流程
-
【.NET Core项目实战-统一认证平台】第三章 网关篇-数据库存储配置(1)
-
【.NET Core项目实战-统一认证平台】第六章 网关篇-自定义客户端授权
-
【.NET Core项目实战-统一认证平台】第十三章 授权篇-如何强制有效令牌过期
-
【.NET Core项目实战-统一认证平台】第十四章 授权篇-自定义授权方式
-
【.NET Core项目实战-统一认证平台】第十五章 网关篇-使用二级缓存提升性能
-
【.NET Core项目实战-统一认证平台】第十章 授权篇-客户端授权