欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

.netcore实现一个读写分离的数据库访问中间件

程序员文章站 2022-04-29 13:41:41
在实际业务系统中,当单个数据库不能承载负载压力的时候,一般我们采用数据库读写分离的方式来分担数据库负载。主库承担写以及事务操作,从库承担读操作。 为了支持多种数据库我们先定义一个数据类型字典。key为连接字符串,value为数据库类型: /// /// 数据库方言集合 /// < ......

在实际业务系统中,当单个数据库不能承载负载压力的时候,一般我们采用数据库读写分离的方式来分担数据库负载。主库承担写以及事务操作,从库承担读操作。

为了支持多种数据库我们先定义一个数据类型字典。key为连接字符串,value为数据库类型:

        /// <summary>
        /// 数据库方言集合
        /// </summary>
        private readonly dictionary<string, databasedialectenum> dialectdictionary
          = new dictionary<string, databasedialectenum>
          {
              ["sqlconnection"] = databasedialectenum.mssql,
              ["sqlceconnection"] = databasedialectenum.sqlce,
              ["npgsqlconnection"] = databasedialectenum.postgres,
              ["sqliteconnection"] = databasedialectenum.sqllite,
              ["mysqlconnection"] = databasedialectenum.mysql,
              ["fbconnection"] = databasedialectenum.firebase
          };

这样我们切换不同的数据库只需要配置数据库连接字符串即可。

以mssql为例,配置数据库连接字符串

  "connectionstring": {
    "sqlconnection": "data source=.;initial catalog=db;user id=sa;password=**;enlist=false;max pool size=500;min pool size=50;multipleactiveresultsets=true",
    "sqlconnection_slaver_1": "data source=.;initial catalog=db;user id=sa;password=**;enlist=false;max pool size=500;min pool size=50;multipleactiveresultsets=true",
    "sqlconnection_slaver_2": "data source=.;initial catalog=db;user id=sa;password=**;enlist=false;max pool size=500;min pool size=50;multipleactiveresultsets=true"
  }
key: sqlconnection为主库(master)连接字符串,key: sqlconnection_slaver_1和sqlconnection_slaver_2为两个从库(slaver)连接字符串。多个从库(slaver)可以实现随机访问。也可以采用其他算法来负载均衡。

根据字符串连接配置我们得到 主库 连接串,和从库连接串集合。同时根据连接串的key 确定数据库种类。代码如下:

        /// <summary>
        /// 主数据库连接串
        /// </summary>
        private string masterconnectionstring { get; set; }
        /// <summary>
        /// 从数据库连接串集合
        /// </summary>
        private list<string> slaverconnectionstrings { get; set; } = new list<string>();
        public connectionfactory(iconfiguration configuration, iloggerfactory loggerfactory)
        {
            _logger = loggerfactory.createlogger<connectionfactory>();
            var connectionkeys = configuration.getsection("connectionstring").getchildren().select(s => s.key).toarray();
            foreach (var connkey in connectionkeys)
            {
                var connsplit = connkey.split('_');
                if (connsplit.length == 1)
                {
                    masterconnectionstring = configuration[$"connectionstring:{connkey}"];
            //根据连接字符串约定,确定数据库类型 databasedialect = dialectdictionary[connkey]; } else { slaverconnectionstrings.add(configuration[$"connectionstring:{connkey}"]); } } }
        /// <summary>
        /// 数据库类型
        /// </summary>
        public databasedialectenum databasedialect { get; private set; }

获取主库连接

        private idbconnection getmasterconnection()
        {
            return getconnection(masterconnectionstring);
        }

获取从库连接,这里采用随机算法,如果没有配置从库,这里会返回主库连接。

        private idbconnection getslaverconnection()
        {
            int sc = slaverconnectionstrings.count();
            if (sc > 0)
            {
                random random = new random();
                int index = random.next(0, sc);
                return getconnection(slaverconnectionstrings[index]);
            }
            else
            {
                _logger.loginformation("没有设置从库,将建立主库连接");
                return getmasterconnection();
            }
        }    
        private idbconnection getconnection(string connectionstring) => databasedialect switch
        {
            databasedialectenum.mssql =>new profileddbconnection(new sqlconnection(connectionstring),miniprofiler.current),
            databasedialectenum.mysql => new profileddbconnection(new mysqlconnection(connectionstring), miniprofiler.current),
            _ => throw new notimplementedexception()
        };
注:这里采用miniprofiler来监控数据库连接性能,所以 返回的connection用profileddbconnection进行了包装。

主从数据源类型如下:

    public enum datasourceenum
    {
        master,
        slave
    }

本connectionfactory为单例模式,存在多线程访问的情况,所以数据源设置为threadlocal<datasourceenum>,线程内共享。

private static threadlocal<datasourceenum> threadlocal = new threadlocal<datasourceenum>();
        /// <summary>
        /// 当前线程数据源 
        /// </summary>
        /// <param name="sourceenum"></param>     
        public datasourceenum datasource
        {
            set { threadlocal.value = value; }
            get { return threadlocal.value; }
        }

下面正式获取idbconnection

        public idbconnection getdbconnection()
        {
            if (datasource == datasourceenum.master)
            {
                return getmasterconnection();
            }
            else
            {
                return getslaverconnection();
            }
        }

使用:

根据文章开头所描述的实际操作来进行主从库访问。

        private idbconnection getdbconnection(datasourceenum datasource)
        {
            connectionfactory.datasource = datasource;
            return connectionfactory.getdbconnection();
        }
using var connection = getdbconnection(datasourceenum.master);
 connection.execute(sql, param, currenttransaction, null, commandtype)
 using var connection = getdbconnection(datasourceenum.slave);
 connection.get<t>(id, currenttransaction, commandtimeout)

 

奉上全部代码

    public class connectionfactory : iconnectionfactory
    {
        private readonly ilogger _logger;
        private static threadlocal<datasourceenum> threadlocal = new threadlocal<datasourceenum>();
        static connectionfactory()
        {
            //设置dapper的tablename取值
            sqlmapperextensions.tablenamemapper = (type) => type.name;
        } 

        /// <summary>
        /// 当前线程数据源 
        /// </summary>
        /// <param name="sourceenum"></param>     
        public datasourceenum datasource
        {
            set { threadlocal.value = value; }
            get { return threadlocal.value; }
        }

        /// <summary>
        /// 主数据库连接串
        /// </summary>
        private string masterconnectionstring { get; set; }
        /// <summary>
        /// 从数据库连接串集合
        /// </summary>
        private list<string> slaverconnectionstrings { get; set; } = new list<string>();
        public connectionfactory(iconfiguration configuration, iloggerfactory loggerfactory)
        {
            _logger = loggerfactory.createlogger<connectionfactory>();
            var connectionkeys = configuration.getsection("connectionstring").getchildren().select(s => s.key).toarray();
            foreach (var connkey in connectionkeys)
            {
                var connsplit = connkey.split('_');
                if (connsplit.length == 1)
                {
                    masterconnectionstring = configuration[$"connectionstring:{connkey}"];
                    databasedialect = dialectdictionary[connkey];
                }
                else
                {
                    slaverconnectionstrings.add(configuration[$"connectionstring:{connkey}"]);
                }

            }
        }
        /// <summary>
        /// 数据库方言集合
        /// </summary>
        private readonly dictionary<string, databasedialectenum> dialectdictionary
          = new dictionary<string, databasedialectenum>
          {
              ["sqlconnection"] = databasedialectenum.mssql,
              ["sqlceconnection"] = databasedialectenum.sqlce,
              ["npgsqlconnection"] = databasedialectenum.postgres,
              ["sqliteconnection"] = databasedialectenum.sqllite,
              ["mysqlconnection"] = databasedialectenum.mysql,
              ["fbconnection"] = databasedialectenum.firebase
          };
        /// <summary>
        /// 数据库方言
        /// </summary>
        public databasedialectenum databasedialect { get; private set; }

        private idbconnection getconnection(string connectionstring) => databasedialect switch
        {
            databasedialectenum.mssql =>new profileddbconnection(new sqlconnection(connectionstring),miniprofiler.current),
            databasedialectenum.mysql => new profileddbconnection(new mysqlconnection(connectionstring), miniprofiler.current),
            _ => throw new notimplementedexception()
        };
        public idbconnection getdbconnection()
        {
            if (datasource == datasourceenum.master)
            {
                return getmasterconnection();
            }
            else
            {
                return getslaverconnection();
            }
        }
        private idbconnection getmasterconnection()
        {
            return getconnection(masterconnectionstring);
        }
        private idbconnection getslaverconnection()
        {
            int sc = slaverconnectionstrings.count();
            if (sc > 0)
            {
                random random = new random();
                int index = random.next(0, sc);
                return getconnection(slaverconnectionstrings[index]);
            }
            else
            {
                _logger.loginformation("没有设置从库,将从建立主库连接");
                return getmasterconnection();
            }
        }    
    }

    public enum datasourceenum
    {
        master,
        slave
    }