这次的目标是实现通过标注attribute实现缓存的功能,精简代码,减少缓存的代码侵入业务代码。

缓存内容即为service查询汇总的内容,不做其他高大上的功能,提升短时间多次查询的响应速度,适当减轻数据库压力。

在做之前,也去看了easycaching的源码,这次的想法也是源于这里,aop的方式让代码减少耦合,但是缓存策略有限。经过考虑决定,自己实现类似功能,在之后的应用中也方便对缓存策略的扩展。

本文内容也许有点不严谨的地方,仅供参考。同样欢迎各位路过的大佬提出建议。

在项目中加入aspectcore

之前有做aspectcore的总结,相关内容就不再赘述了。

  • asp.net core 3.0 使用aspectcore-framework实现aop
  • github:相关代码

在项目中加入stackexchange.redis

在stackexchange.redis和csredis中纠结了很久,也没有一个特别的有优势,最终选择了stackexchange.redis,没有理由。至于连接超时的问题,可以用异步解决。

  • 安装stackexchange.redis
install-package stackexchange.redis -version 2.0.601
  • 在appsettings.json配置redis连接信息
{
	"redis": {
		"default": {
			"connection": "127.0.0.1:6379",
			"instancename": "rediscache:",
			"defaultdb": 0
		}
	}
}
  • redisclient

用于连接redis服务器,包括创建连接,获取数据库等操作

public class redisclient : idisposable
{
	private string _connectionstring;
	private string _instancename;
	private int _defaultdb;
	private concurrentdictionary<string, connectionmultiplexer> _connections;
	public redisclient(string connectionstring, string instancename, int defaultdb = 0)
	{
		_connectionstring = connectionstring;
		_instancename = instancename;
		_defaultdb = defaultdb;
		_connections = new concurrentdictionary<string, connectionmultiplexer>();
	}

	private connectionmultiplexer getconnect()
	{
		return _connections.getoradd(_instancename, p => connectionmultiplexer.connect(_connectionstring));
	}

	public idatabase getdatabase()
	{
		return getconnect().getdatabase(_defaultdb);
	}

	public iserver getserver(string configname = null, int endpointsindex = 0)
	{
		var confoption = configurationoptions.parse(_connectionstring);
		return getconnect().getserver(confoption.endpoints[endpointsindex]);
	}

	public isubscriber getsubscriber(string configname = null)
	{
		return getconnect().getsubscriber();
	}

	public void dispose()
	{
		if (_connections != null && _connections.count > 0)
		{
			foreach (var item in _connections.values)
			{
				item.close();
			}
		}
	}
}
  • 注册服务

redis是单线程的服务,多几个redisclient的实例也是无济于事,所以依赖注入就采用singleton的方式。

public static class redisextensions
{
	public static void configredis(this iservicecollection services, iconfiguration configuration)
	{
		var section = configuration.getsection("redis:default");
		string _connectionstring = section.getsection("connection").value;
		string _instancename = section.getsection("instancename").value;
		int _defaultdb = int.parse(section.getsection("defaultdb").value ?? "0");
		services.addsingleton(new redisclient(_connectionstring, _instancename, _defaultdb));
	}
}

public class startup
{
	public void configureservices(iservicecollection services)
	{
		services.configredis(configuration);
	}
}
  • keygenerator

创建一个缓存key的生成器,以attribute中的cachekeyprefix作为前缀,之后可以扩展批量删除的功能。被拦截方法的方法名和入参也同样作为key的一部分,保证key值不重复。

public static class keygenerator
{
	public static string getcachekey(methodinfo methodinfo, object[] args, string prefix)
	{
		stringbuilder cachekey = new stringbuilder();
		cachekey.append($"{prefix}_");
		cachekey.append(methodinfo.declaringtype.name).append($"_{methodinfo.name}");
		foreach (var item in args)
		{
			cachekey.append($"_{item}");
		}
		return cachekey.tostring();
	}

	public static string getcachekeyprefix(methodinfo methodinfo, string prefix)
	{
		stringbuilder cachekey = new stringbuilder();
		cachekey.append(prefix);
        cachekey.append($"_{methodinfo.declaringtype.name}").append($"_{methodinfo.name}");
		return cachekey.tostring();
	}
}

写一套缓存拦截器

  • cacheableattribute

attribute中保存缓存的策略信息,包括过期时间,key值前缀等信息,在使用缓存时可以对这些选项值进行配置。

public class cacheableattribute : attribute
{
	/// <summary>
	/// 过期时间(秒)
	/// </summary>
	public int expiration { get; set; } = 300;

	/// <summary>
	/// key值前缀
	/// </summary>
	public string cachekeyprefix { get; set; } = string.empty;

	/// <summary>
	/// 是否高可用(异常时执行原方法)
	/// </summary>
	public bool ishighavailability { get; set; } = true;

	/// <summary>
	/// 只允许一个线程更新缓存(带锁)
	/// </summary>
	public bool onceupdate { get; set; } = false;
}
  • cacheableinterceptor

接下来就是重头戏,拦截器中的逻辑就相对于缓存的相关策略,不用的策略可以分成不同的拦截器。 这里的逻辑参考了easycaching的源码,并加入了redis分布式锁的应用。

public class cacheableinterceptor : abstractinterceptor
{
	[fromcontainer]
	private redisclient redisclient { get; set; }

	private idatabase database;

	private static readonly concurrentdictionary<type, methodinfo> typeoftaskresultmethod = new concurrentdictionary<type, methodinfo>();

	public async override task invoke(aspectcontext context, aspectdelegate next)
	{
		cacheableattribute attribute = context.getattribute<cacheableattribute>();

		if (attribute == null)
		{
			await context.invoke(next);
			return;
		}

		try
		{
			database = redisclient.getdatabase();

			string cachekey = keygenerator.getcachekey(context.servicemethod, context.parameters, attribute.cachekeyprefix);

			string cachevalue = await getcacheasync(cachekey);

			type returntype = context.getreturntype();

			if (string.isnullorwhitespace(cachevalue))
			{
				if (attribute.onceupdate)
				{
					string lockkey = $"lock_{cachekey}";
					redisvalue token = environment.machinename;

					if (await database.locktakeasync(lockkey, token, timespan.fromseconds(10)))
					{
						try
						{
							var result = await runandgetreturn(context, next);
							await setcache(cachekey, result, attribute.expiration);
							return;
						}
						finally
						{
							await database.lockreleaseasync(lockkey, token);
						}
					}
					else
					{
						for (int i = 0; i < 5; i++)
						{
							thread.sleep(i * 100 + 500);
							cachevalue = await getcacheasync(cachekey);
							if (!string.isnullorwhitespace(cachevalue))
							{
								break;
							}
						}
						if (string.isnullorwhitespace(cachevalue))
						{
							var defaultvalue = createdefaultresult(returntype);
							context.returnvalue = resultfactory(defaultvalue, returntype, context.isasync());
							return;
						}
					}
				}
				else
				{
					var result = await runandgetreturn(context, next);
					await setcache(cachekey, result, attribute.expiration);
					return;
				}
			}
			var objvalue = await deserializecache(cachekey, cachevalue, returntype);
			//缓存值不可用
			if (objvalue == null)
			{
				await context.invoke(next);
				return;
			}
				context.returnvalue = resultfactory(objvalue, returntype, context.isasync());
		}
		catch (exception)
		{
			if (context.returnvalue == null)
			{
				await context.invoke(next);
			}
		}
	}

	private async task<string> getcacheasync(string cachekey)
	{
		string cachevalue = null;
		try
		{
			cachevalue = await database.stringgetasync(cachekey);
		}
		catch (exception)
		{
			return null;
		}
		return cachevalue;
	}

	private async task<object> runandgetreturn(aspectcontext context, aspectdelegate next)
	{
		await context.invoke(next);
		return context.isasync()
		? await context.unwrapasyncreturnvalue()
		: context.returnvalue;
	}

	private async task setcache(string cachekey, object cachevalue, int expiration)
	{
		string jsonvalue = jsonconvert.serializeobject(cachevalue);
		await database.stringsetasync(cachekey, jsonvalue, timespan.fromseconds(expiration));
	}

	private async task remove(string cachekey)
	{
		await database.keydeleteasync(cachekey);
	}

	private async task<object> deserializecache(string cachekey, string cachevalue, type returntype)
	{
		try
		{
			return jsonconvert.deserializeobject(cachevalue, returntype);
		}
		catch (exception)
		{
			await remove(cachekey);
			return null;
		}
	}

	private object createdefaultresult(type returntype)
	{
		return activator.createinstance(returntype);
	}

	private object resultfactory(object result, type returntype, bool isasync)
	{
		if (isasync)
		{
			return typeoftaskresultmethod
				.getoradd(returntype, t => typeof(task)
				.getmethods()
				.first(p => p.name == "fromresult" && p.containsgenericparameters)
				.makegenericmethod(returntype))
				.invoke(null, new object[] { result });
		}
		else
		{
			return result;
		}
	}
}
  • 注册拦截器

在aspectcore中注册cacheableinterceptor拦截器,这里直接注册了用于测试的demoservice, 在正式项目中,打算用反射注册需要用到缓存的service或者method。

public static class aspectcoreextensions
{
	public static void configaspectcore(this iservicecollection services)
	{
		services.configuredynamicproxy(config =>
		{
			config.interceptors.addtyped<cacheableinterceptor>(predicates.implement(typeof(demoservice)));
		});
		services.buildaspectinjectorprovider();
	}
}

测试缓存功能

  • 在需要缓存的接口/方法上标注attribute
[cacheable(cachekeyprefix = "test", expiration = 30, onceupdate = true)]
public virtual datetimemodel gettime()
{
    return new datetimemodel
	{
	    id = gethashcode(),
		time = datetime.now
    };
}
  • 测试结果截图

请求接口,返回时间,并将返回结果缓存到redis中,保留300秒后过期。

相关链接

  • github:本文代码
  • github:easycaching
  • 官方文档:easycaching