.netcore实现一个读写分离的数据库访问中间件
程序员文章站
2022-04-29 13:41:41
在实际业务系统中,当单个数据库不能承载负载压力的时候,一般我们采用数据库读写分离的方式来分担数据库负载。主库承担写以及事务操作,从库承担读操作。 为了支持多种数据库我们先定义一个数据类型字典。key为连接字符串,value为数据库类型: /// /// 数据库方言集合 /// < ......
在实际业务系统中,当单个数据库不能承载负载压力的时候,一般我们采用数据库读写分离的方式来分担数据库负载。主库承担写以及事务操作,从库承担读操作。
为了支持多种数据库我们先定义一个数据类型字典。key为连接字符串,value为数据库类型:
/// <summary> /// 数据库方言集合 /// </summary> private readonly dictionary<string, databasedialectenum> dialectdictionary = new dictionary<string, databasedialectenum> { ["sqlconnection"] = databasedialectenum.mssql, ["sqlceconnection"] = databasedialectenum.sqlce, ["npgsqlconnection"] = databasedialectenum.postgres, ["sqliteconnection"] = databasedialectenum.sqllite, ["mysqlconnection"] = databasedialectenum.mysql, ["fbconnection"] = databasedialectenum.firebase };
这样我们切换不同的数据库只需要配置数据库连接字符串即可。
以mssql为例,配置数据库连接字符串
"connectionstring": { "sqlconnection": "data source=.;initial catalog=db;user id=sa;password=**;enlist=false;max pool size=500;min pool size=50;multipleactiveresultsets=true", "sqlconnection_slaver_1": "data source=.;initial catalog=db;user id=sa;password=**;enlist=false;max pool size=500;min pool size=50;multipleactiveresultsets=true", "sqlconnection_slaver_2": "data source=.;initial catalog=db;user id=sa;password=**;enlist=false;max pool size=500;min pool size=50;multipleactiveresultsets=true" }
key: sqlconnection为主库(master)连接字符串,key: sqlconnection_slaver_1和sqlconnection_slaver_2为两个从库(slaver)连接字符串。多个从库(slaver)可以实现随机访问。也可以采用其他算法来负载均衡。
根据字符串连接配置我们得到 主库 连接串,和从库连接串集合。同时根据连接串的key 确定数据库种类。代码如下:
/// <summary> /// 主数据库连接串 /// </summary> private string masterconnectionstring { get; set; } /// <summary> /// 从数据库连接串集合 /// </summary> private list<string> slaverconnectionstrings { get; set; } = new list<string>(); public connectionfactory(iconfiguration configuration, iloggerfactory loggerfactory) { _logger = loggerfactory.createlogger<connectionfactory>(); var connectionkeys = configuration.getsection("connectionstring").getchildren().select(s => s.key).toarray(); foreach (var connkey in connectionkeys) { var connsplit = connkey.split('_'); if (connsplit.length == 1) { masterconnectionstring = configuration[$"connectionstring:{connkey}"];
//根据连接字符串约定,确定数据库类型 databasedialect = dialectdictionary[connkey]; } else { slaverconnectionstrings.add(configuration[$"connectionstring:{connkey}"]); } } }
/// <summary> /// 数据库类型 /// </summary> public databasedialectenum databasedialect { get; private set; }
获取主库连接
private idbconnection getmasterconnection() { return getconnection(masterconnectionstring); }
获取从库连接,这里采用随机算法,如果没有配置从库,这里会返回主库连接。
private idbconnection getslaverconnection() { int sc = slaverconnectionstrings.count(); if (sc > 0) { random random = new random(); int index = random.next(0, sc); return getconnection(slaverconnectionstrings[index]); } else { _logger.loginformation("没有设置从库,将建立主库连接"); return getmasterconnection(); } }
private idbconnection getconnection(string connectionstring) => databasedialect switch { databasedialectenum.mssql =>new profileddbconnection(new sqlconnection(connectionstring),miniprofiler.current), databasedialectenum.mysql => new profileddbconnection(new mysqlconnection(connectionstring), miniprofiler.current), _ => throw new notimplementedexception() };
注:这里采用miniprofiler来监控数据库连接性能,所以 返回的connection用profileddbconnection进行了包装。
主从数据源类型如下:
public enum datasourceenum { master, slave }
本connectionfactory为单例模式,存在多线程访问的情况,所以数据源设置为threadlocal<datasourceenum>,线程内共享。
private static threadlocal<datasourceenum> threadlocal = new threadlocal<datasourceenum>(); /// <summary> /// 当前线程数据源 /// </summary> /// <param name="sourceenum"></param> public datasourceenum datasource { set { threadlocal.value = value; } get { return threadlocal.value; } }
下面正式获取idbconnection
public idbconnection getdbconnection() { if (datasource == datasourceenum.master) { return getmasterconnection(); } else { return getslaverconnection(); } }
使用:
根据文章开头所描述的实际操作来进行主从库访问。
private idbconnection getdbconnection(datasourceenum datasource) { connectionfactory.datasource = datasource; return connectionfactory.getdbconnection(); }
using var connection = getdbconnection(datasourceenum.master); connection.execute(sql, param, currenttransaction, null, commandtype)
using var connection = getdbconnection(datasourceenum.slave); connection.get<t>(id, currenttransaction, commandtimeout)
奉上全部代码
public class connectionfactory : iconnectionfactory { private readonly ilogger _logger; private static threadlocal<datasourceenum> threadlocal = new threadlocal<datasourceenum>(); static connectionfactory() { //设置dapper的tablename取值 sqlmapperextensions.tablenamemapper = (type) => type.name; } /// <summary> /// 当前线程数据源 /// </summary> /// <param name="sourceenum"></param> public datasourceenum datasource { set { threadlocal.value = value; } get { return threadlocal.value; } } /// <summary> /// 主数据库连接串 /// </summary> private string masterconnectionstring { get; set; } /// <summary> /// 从数据库连接串集合 /// </summary> private list<string> slaverconnectionstrings { get; set; } = new list<string>(); public connectionfactory(iconfiguration configuration, iloggerfactory loggerfactory) { _logger = loggerfactory.createlogger<connectionfactory>(); var connectionkeys = configuration.getsection("connectionstring").getchildren().select(s => s.key).toarray(); foreach (var connkey in connectionkeys) { var connsplit = connkey.split('_'); if (connsplit.length == 1) { masterconnectionstring = configuration[$"connectionstring:{connkey}"]; databasedialect = dialectdictionary[connkey]; } else { slaverconnectionstrings.add(configuration[$"connectionstring:{connkey}"]); } } } /// <summary> /// 数据库方言集合 /// </summary> private readonly dictionary<string, databasedialectenum> dialectdictionary = new dictionary<string, databasedialectenum> { ["sqlconnection"] = databasedialectenum.mssql, ["sqlceconnection"] = databasedialectenum.sqlce, ["npgsqlconnection"] = databasedialectenum.postgres, ["sqliteconnection"] = databasedialectenum.sqllite, ["mysqlconnection"] = databasedialectenum.mysql, ["fbconnection"] = databasedialectenum.firebase }; /// <summary> /// 数据库方言 /// </summary> public databasedialectenum databasedialect { get; private set; } private idbconnection getconnection(string connectionstring) => databasedialect switch { databasedialectenum.mssql =>new profileddbconnection(new sqlconnection(connectionstring),miniprofiler.current), databasedialectenum.mysql => new profileddbconnection(new mysqlconnection(connectionstring), miniprofiler.current), _ => throw new notimplementedexception() }; public idbconnection getdbconnection() { if (datasource == datasourceenum.master) { return getmasterconnection(); } else { return getslaverconnection(); } } private idbconnection getmasterconnection() { return getconnection(masterconnectionstring); } private idbconnection getslaverconnection() { int sc = slaverconnectionstrings.count(); if (sc > 0) { random random = new random(); int index = random.next(0, sc); return getconnection(slaverconnectionstrings[index]); } else { _logger.loginformation("没有设置从库,将从建立主库连接"); return getmasterconnection(); } } } public enum datasourceenum { master, slave }
上一篇: 二货办事你大可放心的笑吧...
下一篇: 当前互联网已经分配的端口和服务