使用AspectCore实现AOP模式的Redis缓存
这次的目标是实现通过标注attribute实现缓存的功能,精简代码,减少缓存的代码侵入业务代码。
缓存内容即为service查询汇总的内容,不做其他高大上的功能,提升短时间多次查询的响应速度,适当减轻数据库压力。
在做之前,也去看了easycaching的源码,这次的想法也是源于这里,aop的方式让代码减少耦合,但是缓存策略有限。经过考虑决定,自己实现类似功能,在之后的应用中也方便对缓存策略的扩展。
本文内容也许有点不严谨的地方,仅供参考。同样欢迎各位路过的大佬提出建议。
在项目中加入aspectcore
之前有做aspectcore的总结,相关内容就不再赘述了。
在项目中加入stackexchange.redis
在stackexchange.redis和csredis中纠结了很久,也没有一个特别的有优势,最终选择了stackexchange.redis,没有理由。至于连接超时的问题,可以用异步解决。
- 安装stackexchange.redis
install-package stackexchange.redis -version 2.0.601
- 在appsettings.json配置redis连接信息
{ "redis": { "default": { "connection": "127.0.0.1:6379", "instancename": "rediscache:", "defaultdb": 0 } } }
- redisclient
用于连接redis服务器,包括创建连接,获取数据库等操作
public class redisclient : idisposable { private string _connectionstring; private string _instancename; private int _defaultdb; private concurrentdictionary<string, connectionmultiplexer> _connections; public redisclient(string connectionstring, string instancename, int defaultdb = 0) { _connectionstring = connectionstring; _instancename = instancename; _defaultdb = defaultdb; _connections = new concurrentdictionary<string, connectionmultiplexer>(); } private connectionmultiplexer getconnect() { return _connections.getoradd(_instancename, p => connectionmultiplexer.connect(_connectionstring)); } public idatabase getdatabase() { return getconnect().getdatabase(_defaultdb); } public iserver getserver(string configname = null, int endpointsindex = 0) { var confoption = configurationoptions.parse(_connectionstring); return getconnect().getserver(confoption.endpoints[endpointsindex]); } public isubscriber getsubscriber(string configname = null) { return getconnect().getsubscriber(); } public void dispose() { if (_connections != null && _connections.count > 0) { foreach (var item in _connections.values) { item.close(); } } } }
- 注册服务
redis是单线程的服务,多几个redisclient的实例也是无济于事,所以依赖注入就采用singleton的方式。
public static class redisextensions { public static void configredis(this iservicecollection services, iconfiguration configuration) { var section = configuration.getsection("redis:default"); string _connectionstring = section.getsection("connection").value; string _instancename = section.getsection("instancename").value; int _defaultdb = int.parse(section.getsection("defaultdb").value ?? "0"); services.addsingleton(new redisclient(_connectionstring, _instancename, _defaultdb)); } } public class startup { public void configureservices(iservicecollection services) { services.configredis(configuration); } }
- keygenerator
创建一个缓存key的生成器,以attribute中的cachekeyprefix作为前缀,之后可以扩展批量删除的功能。被拦截方法的方法名和入参也同样作为key的一部分,保证key值不重复。
public static class keygenerator { public static string getcachekey(methodinfo methodinfo, object[] args, string prefix) { stringbuilder cachekey = new stringbuilder(); cachekey.append($"{prefix}_"); cachekey.append(methodinfo.declaringtype.name).append($"_{methodinfo.name}"); foreach (var item in args) { cachekey.append($"_{item}"); } return cachekey.tostring(); } public static string getcachekeyprefix(methodinfo methodinfo, string prefix) { stringbuilder cachekey = new stringbuilder(); cachekey.append(prefix); cachekey.append($"_{methodinfo.declaringtype.name}").append($"_{methodinfo.name}"); return cachekey.tostring(); } }
写一套缓存拦截器
- cacheableattribute
attribute中保存缓存的策略信息,包括过期时间,key值前缀等信息,在使用缓存时可以对这些选项值进行配置。
public class cacheableattribute : attribute { /// <summary> /// 过期时间(秒) /// </summary> public int expiration { get; set; } = 300; /// <summary> /// key值前缀 /// </summary> public string cachekeyprefix { get; set; } = string.empty; /// <summary> /// 是否高可用(异常时执行原方法) /// </summary> public bool ishighavailability { get; set; } = true; /// <summary> /// 只允许一个线程更新缓存(带锁) /// </summary> public bool onceupdate { get; set; } = false; }
- cacheableinterceptor
接下来就是重头戏,拦截器中的逻辑就相对于缓存的相关策略,不用的策略可以分成不同的拦截器。 这里的逻辑参考了easycaching的源码,并加入了redis分布式锁的应用。
public class cacheableinterceptor : abstractinterceptor { [fromcontainer] private redisclient redisclient { get; set; } private idatabase database; private static readonly concurrentdictionary<type, methodinfo> typeoftaskresultmethod = new concurrentdictionary<type, methodinfo>(); public async override task invoke(aspectcontext context, aspectdelegate next) { cacheableattribute attribute = context.getattribute<cacheableattribute>(); if (attribute == null) { await context.invoke(next); return; } try { database = redisclient.getdatabase(); string cachekey = keygenerator.getcachekey(context.servicemethod, context.parameters, attribute.cachekeyprefix); string cachevalue = await getcacheasync(cachekey); type returntype = context.getreturntype(); if (string.isnullorwhitespace(cachevalue)) { if (attribute.onceupdate) { string lockkey = $"lock_{cachekey}"; redisvalue token = environment.machinename; if (await database.locktakeasync(lockkey, token, timespan.fromseconds(10))) { try { var result = await runandgetreturn(context, next); await setcache(cachekey, result, attribute.expiration); return; } finally { await database.lockreleaseasync(lockkey, token); } } else { for (int i = 0; i < 5; i++) { thread.sleep(i * 100 + 500); cachevalue = await getcacheasync(cachekey); if (!string.isnullorwhitespace(cachevalue)) { break; } } if (string.isnullorwhitespace(cachevalue)) { var defaultvalue = createdefaultresult(returntype); context.returnvalue = resultfactory(defaultvalue, returntype, context.isasync()); return; } } } else { var result = await runandgetreturn(context, next); await setcache(cachekey, result, attribute.expiration); return; } } var objvalue = await deserializecache(cachekey, cachevalue, returntype); //缓存值不可用 if (objvalue == null) { await context.invoke(next); return; } context.returnvalue = resultfactory(objvalue, returntype, context.isasync()); } catch (exception) { if (context.returnvalue == null) { await context.invoke(next); } } } private async task<string> getcacheasync(string cachekey) { string cachevalue = null; try { cachevalue = await database.stringgetasync(cachekey); } catch (exception) { return null; } return cachevalue; } private async task<object> runandgetreturn(aspectcontext context, aspectdelegate next) { await context.invoke(next); return context.isasync() ? await context.unwrapasyncreturnvalue() : context.returnvalue; } private async task setcache(string cachekey, object cachevalue, int expiration) { string jsonvalue = jsonconvert.serializeobject(cachevalue); await database.stringsetasync(cachekey, jsonvalue, timespan.fromseconds(expiration)); } private async task remove(string cachekey) { await database.keydeleteasync(cachekey); } private async task<object> deserializecache(string cachekey, string cachevalue, type returntype) { try { return jsonconvert.deserializeobject(cachevalue, returntype); } catch (exception) { await remove(cachekey); return null; } } private object createdefaultresult(type returntype) { return activator.createinstance(returntype); } private object resultfactory(object result, type returntype, bool isasync) { if (isasync) { return typeoftaskresultmethod .getoradd(returntype, t => typeof(task) .getmethods() .first(p => p.name == "fromresult" && p.containsgenericparameters) .makegenericmethod(returntype)) .invoke(null, new object[] { result }); } else { return result; } } }
- 注册拦截器
在aspectcore中注册cacheableinterceptor拦截器,这里直接注册了用于测试的demoservice, 在正式项目中,打算用反射注册需要用到缓存的service或者method。
public static class aspectcoreextensions { public static void configaspectcore(this iservicecollection services) { services.configuredynamicproxy(config => { config.interceptors.addtyped<cacheableinterceptor>(predicates.implement(typeof(demoservice))); }); services.buildaspectinjectorprovider(); } }
测试缓存功能
- 在需要缓存的接口/方法上标注attribute
[cacheable(cachekeyprefix = "test", expiration = 30, onceupdate = true)] public virtual datetimemodel gettime() { return new datetimemodel { id = gethashcode(), time = datetime.now }; }
- 测试结果截图
请求接口,返回时间,并将返回结果缓存到redis中,保留300秒后过期。