欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

使用AspectCore实现AOP模式的Redis缓存

程序员文章站 2022-03-30 16:09:01
这次的目标是实现通过标注Attribute实现缓存的功能,精简代码,减少缓存的代码侵入业务代码。 缓存内容即为Service查询汇总的内容,不做其他高大上的功能,提升短时间多次查询的响应速度,适当减轻数据库压力。 在做之前,也去看了EasyCaching的源码,这次的想法也是源于这里,AOP的方式让 ......

这次的目标是实现通过标注attribute实现缓存的功能,精简代码,减少缓存的代码侵入业务代码。

缓存内容即为service查询汇总的内容,不做其他高大上的功能,提升短时间多次查询的响应速度,适当减轻数据库压力。

在做之前,也去看了easycaching的源码,这次的想法也是源于这里,aop的方式让代码减少耦合,但是缓存策略有限。经过考虑决定,自己实现类似功能,在之后的应用中也方便对缓存策略的扩展。

本文内容也许有点不严谨的地方,仅供参考。同样欢迎各位路过的大佬提出建议。

在项目中加入aspectcore

之前有做aspectcore的总结,相关内容就不再赘述了。

在项目中加入stackexchange.redis

在stackexchange.redis和csredis中纠结了很久,也没有一个特别的有优势,最终选择了stackexchange.redis,没有理由。至于连接超时的问题,可以用异步解决。

  • 安装stackexchange.redis
install-package stackexchange.redis -version 2.0.601
  • 在appsettings.json配置redis连接信息
{
	"redis": {
		"default": {
			"connection": "127.0.0.1:6379",
			"instancename": "rediscache:",
			"defaultdb": 0
		}
	}
}
  • redisclient

用于连接redis服务器,包括创建连接,获取数据库等操作

public class redisclient : idisposable
{
	private string _connectionstring;
	private string _instancename;
	private int _defaultdb;
	private concurrentdictionary<string, connectionmultiplexer> _connections;
	public redisclient(string connectionstring, string instancename, int defaultdb = 0)
	{
		_connectionstring = connectionstring;
		_instancename = instancename;
		_defaultdb = defaultdb;
		_connections = new concurrentdictionary<string, connectionmultiplexer>();
	}

	private connectionmultiplexer getconnect()
	{
		return _connections.getoradd(_instancename, p => connectionmultiplexer.connect(_connectionstring));
	}

	public idatabase getdatabase()
	{
		return getconnect().getdatabase(_defaultdb);
	}

	public iserver getserver(string configname = null, int endpointsindex = 0)
	{
		var confoption = configurationoptions.parse(_connectionstring);
		return getconnect().getserver(confoption.endpoints[endpointsindex]);
	}

	public isubscriber getsubscriber(string configname = null)
	{
		return getconnect().getsubscriber();
	}

	public void dispose()
	{
		if (_connections != null && _connections.count > 0)
		{
			foreach (var item in _connections.values)
			{
				item.close();
			}
		}
	}
}
  • 注册服务

redis是单线程的服务,多几个redisclient的实例也是无济于事,所以依赖注入就采用singleton的方式。

public static class redisextensions
{
	public static void configredis(this iservicecollection services, iconfiguration configuration)
	{
		var section = configuration.getsection("redis:default");
		string _connectionstring = section.getsection("connection").value;
		string _instancename = section.getsection("instancename").value;
		int _defaultdb = int.parse(section.getsection("defaultdb").value ?? "0");
		services.addsingleton(new redisclient(_connectionstring, _instancename, _defaultdb));
	}
}

public class startup
{
	public void configureservices(iservicecollection services)
	{
		services.configredis(configuration);
	}
}
  • keygenerator

创建一个缓存key的生成器,以attribute中的cachekeyprefix作为前缀,之后可以扩展批量删除的功能。被拦截方法的方法名和入参也同样作为key的一部分,保证key值不重复。

public static class keygenerator
{
	public static string getcachekey(methodinfo methodinfo, object[] args, string prefix)
	{
		stringbuilder cachekey = new stringbuilder();
		cachekey.append($"{prefix}_");
		cachekey.append(methodinfo.declaringtype.name).append($"_{methodinfo.name}");
		foreach (var item in args)
		{
			cachekey.append($"_{item}");
		}
		return cachekey.tostring();
	}

	public static string getcachekeyprefix(methodinfo methodinfo, string prefix)
	{
		stringbuilder cachekey = new stringbuilder();
		cachekey.append(prefix);
        cachekey.append($"_{methodinfo.declaringtype.name}").append($"_{methodinfo.name}");
		return cachekey.tostring();
	}
}

写一套缓存拦截器

  • cacheableattribute

attribute中保存缓存的策略信息,包括过期时间,key值前缀等信息,在使用缓存时可以对这些选项值进行配置。

public class cacheableattribute : attribute
{
	/// <summary>
	/// 过期时间(秒)
	/// </summary>
	public int expiration { get; set; } = 300;

	/// <summary>
	/// key值前缀
	/// </summary>
	public string cachekeyprefix { get; set; } = string.empty;

	/// <summary>
	/// 是否高可用(异常时执行原方法)
	/// </summary>
	public bool ishighavailability { get; set; } = true;

	/// <summary>
	/// 只允许一个线程更新缓存(带锁)
	/// </summary>
	public bool onceupdate { get; set; } = false;
}
  • cacheableinterceptor

接下来就是重头戏,拦截器中的逻辑就相对于缓存的相关策略,不用的策略可以分成不同的拦截器。 这里的逻辑参考了easycaching的源码,并加入了redis分布式锁的应用。

public class cacheableinterceptor : abstractinterceptor
{
	[fromcontainer]
	private redisclient redisclient { get; set; }

	private idatabase database;

	private static readonly concurrentdictionary<type, methodinfo> typeoftaskresultmethod = new concurrentdictionary<type, methodinfo>();

	public async override task invoke(aspectcontext context, aspectdelegate next)
	{
		cacheableattribute attribute = context.getattribute<cacheableattribute>();

		if (attribute == null)
		{
			await context.invoke(next);
			return;
		}

		try
		{
			database = redisclient.getdatabase();

			string cachekey = keygenerator.getcachekey(context.servicemethod, context.parameters, attribute.cachekeyprefix);

			string cachevalue = await getcacheasync(cachekey);

			type returntype = context.getreturntype();

			if (string.isnullorwhitespace(cachevalue))
			{
				if (attribute.onceupdate)
				{
					string lockkey = $"lock_{cachekey}";
					redisvalue token = environment.machinename;

					if (await database.locktakeasync(lockkey, token, timespan.fromseconds(10)))
					{
						try
						{
							var result = await runandgetreturn(context, next);
							await setcache(cachekey, result, attribute.expiration);
							return;
						}
						finally
						{
							await database.lockreleaseasync(lockkey, token);
						}
					}
					else
					{
						for (int i = 0; i < 5; i++)
						{
							thread.sleep(i * 100 + 500);
							cachevalue = await getcacheasync(cachekey);
							if (!string.isnullorwhitespace(cachevalue))
							{
								break;
							}
						}
						if (string.isnullorwhitespace(cachevalue))
						{
							var defaultvalue = createdefaultresult(returntype);
							context.returnvalue = resultfactory(defaultvalue, returntype, context.isasync());
							return;
						}
					}
				}
				else
				{
					var result = await runandgetreturn(context, next);
					await setcache(cachekey, result, attribute.expiration);
					return;
				}
			}
			var objvalue = await deserializecache(cachekey, cachevalue, returntype);
			//缓存值不可用
			if (objvalue == null)
			{
				await context.invoke(next);
				return;
			}
				context.returnvalue = resultfactory(objvalue, returntype, context.isasync());
		}
		catch (exception)
		{
			if (context.returnvalue == null)
			{
				await context.invoke(next);
			}
		}
	}

	private async task<string> getcacheasync(string cachekey)
	{
		string cachevalue = null;
		try
		{
			cachevalue = await database.stringgetasync(cachekey);
		}
		catch (exception)
		{
			return null;
		}
		return cachevalue;
	}

	private async task<object> runandgetreturn(aspectcontext context, aspectdelegate next)
	{
		await context.invoke(next);
		return context.isasync()
		? await context.unwrapasyncreturnvalue()
		: context.returnvalue;
	}

	private async task setcache(string cachekey, object cachevalue, int expiration)
	{
		string jsonvalue = jsonconvert.serializeobject(cachevalue);
		await database.stringsetasync(cachekey, jsonvalue, timespan.fromseconds(expiration));
	}

	private async task remove(string cachekey)
	{
		await database.keydeleteasync(cachekey);
	}

	private async task<object> deserializecache(string cachekey, string cachevalue, type returntype)
	{
		try
		{
			return jsonconvert.deserializeobject(cachevalue, returntype);
		}
		catch (exception)
		{
			await remove(cachekey);
			return null;
		}
	}

	private object createdefaultresult(type returntype)
	{
		return activator.createinstance(returntype);
	}

	private object resultfactory(object result, type returntype, bool isasync)
	{
		if (isasync)
		{
			return typeoftaskresultmethod
				.getoradd(returntype, t => typeof(task)
				.getmethods()
				.first(p => p.name == "fromresult" && p.containsgenericparameters)
				.makegenericmethod(returntype))
				.invoke(null, new object[] { result });
		}
		else
		{
			return result;
		}
	}
}
  • 注册拦截器

在aspectcore中注册cacheableinterceptor拦截器,这里直接注册了用于测试的demoservice, 在正式项目中,打算用反射注册需要用到缓存的service或者method。

public static class aspectcoreextensions
{
	public static void configaspectcore(this iservicecollection services)
	{
		services.configuredynamicproxy(config =>
		{
			config.interceptors.addtyped<cacheableinterceptor>(predicates.implement(typeof(demoservice)));
		});
		services.buildaspectinjectorprovider();
	}
}

测试缓存功能

  • 在需要缓存的接口/方法上标注attribute
[cacheable(cachekeyprefix = "test", expiration = 30, onceupdate = true)]
public virtual datetimemodel gettime()
{
    return new datetimemodel
	{
	    id = gethashcode(),
		time = datetime.now
    };
}
  • 测试结果截图

请求接口,返回时间,并将返回结果缓存到redis中,保留300秒后过期。 使用AspectCore实现AOP模式的Redis缓存

相关链接