欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

EF Core 实现读写分离的最佳方案

程序员文章站 2022-10-24 09:22:41
前言 公司之前使用Ado.net和Dapper进行数据访问层的操作, 进行读写分离也比较简单, 只要使用对应的数据库连接字符串即可. 而最近要迁移到新系统中,新系统使用.net core和EF Core进行数据访问. 所以趁着国庆假期拿出一两天时间研究了一下如何EF Core进行读写分离. 思路 根 ......

前言

公司之前使用ado.net和dapper进行数据访问层的操作, 进行读写分离也比较简单, 只要使用对应的数据库连接字符串即可. 而最近要迁移到新系统中,新系统使用.net core和ef core进行数据访问. 所以趁着国庆假期拿出一两天时间研究了一下如何ef core进行读写分离.

思路

根据园子里的jeffcky大神的博客, 参考
entityframework core进行读写分离最佳实践方式,了解一下(一)?
entityframework core进行读写分离最佳实践方式,了解一下(二)?

最简单的思路就是使用手动切换ef core上下文的连接, 即context.database.getdbconnection().connectionstring = "xxx", 但必须要先创建上下文, 再关闭之前的连接, 才能进行切换
另一种方式是通过监听diagnostic来将进行查询的sql切换到从库执行, 这种方式虽然可以实现无感知的切换操作, 但不能满足公司的业务需求. 在后台管理或其他对数据实时性要求比较高的项目里,查询操作也都应该走主库,而这种方式却会切换到从库去. 另一方面就是假若公司的库比较多,每种业务都对应了一个库, 每个库都对应了一种dbcontext, 这种情况下, 要实现自动切换就变得很复杂了.

上面的两种方式都是从切换数据库连接入手,但是频繁的切换数据库连接势必会对性能造成影响. 我认为最理想的方式是要避免数据库连接的切换, 且能够适应多dbcontext的情况, 在创建上下文实例时,就指定好是访问主库还是从库, 而不是在后期再进行数据库切换. 因此, 在上下文实例化时,就传入相应的数据库连接字符串, 这样一来dbcontext的创建就需要交由我们自己来进行, 就不是由di容器进行创建了. 同时仓储应该区分为只读和可读可写两种,以防止其他人对从库进行写操作.

实现

    public interface ireadonlyrepository<tentity, tkey>
        where tentity : class, ientity<tkey>
        where tkey : iequatable<tkey>
    {}

    public interface irepository<tentity, tkey> : ireadonlyrepository<tentity, tkey>
    where tentity : class, ientity<tkey>
    where tkey : iequatable<tkey>
    {}

ireadonlyrepository接口是只读仓储接口,提供查询相关方法,irepository接口是可读可写仓储接口,提供增删查改等方法, 接口的实现就那些东西这里就省略了.

    public interface irepositoryfactory
    {
        irepository<tentity, tkey> getrepository<tentity, tkey>(iunitofwork unitofwork)
            where tentity : class, ientity<tkey>
            where tkey : iequatable<tkey>;
         ireadonlyrepository<tentity, tkey> getreadonlyrepository<tentity, tkey>(iunitofwork unitofwork)
                where tentity : class, ientity<tkey>
                where tkey : iequatable<tkey>;
    }
    public class repositoryfactory : irepositoryfactory
    {
        public repositoryfactory()
        {
        }

        public irepository<tentity, tkey> getrepository<tentity, tkey>(iunitofwork unitofwork)
            where tentity : class, ientity<tkey>
            where tkey : iequatable<tkey>
        {
            return new repository<tentity, tkey>(unitofwork);
        }

        public ireadonlyrepository<tentity, tkey> getreadonlyrepository<tentity, tkey>(iunitofwork unitofwork)
            where tentity : class, ientity<tkey>
            where tkey : iequatable<tkey>
        {
            return new readonlyrepository<tentity, tkey>(unitofwork);
        }
    }

repositoryfactory提供仓储对象的实例化

    public interface iunitofwork : idisposable
    {
        public dbcontext dbcontext { get; }

        /// <summary>
        /// 获取只读仓储对象
        /// </summary>
        ireadonlyrepository<tentity, tkey> getreadonlyrepository<tentity, tkey>()
            where tentity : class, ientity<tkey>
            where tkey : iequatable<tkey>;

        /// <summary>
        /// 获取仓储对象
        /// </summary>
        irepository<tentity, tkey> getrepository<tentity, tkey>()
            where tentity : class, ientity<tkey>
            where tkey : iequatable<tkey>;
        int savechanges();
        task<int> savechangesasync(cancellationtoken canceltoken = default);
    }
    
    public class unitofwork : iunitofwork
    {
        private readonly iserviceprovider _serviceprovider;
        private readonly dbcontext _dbcontext;
        private readonly irepositoryfactory _repositoryfactory;
        private bool _disposed;

        public unitofwork(iserviceprovider serviceprovider, dbcontext context)
        {
            check.notnull(serviceprovider, nameof(serviceprovider));
            _serviceprovider = serviceprovider;
            _dbcontext = context;
            _repositoryfactory = serviceprovider.getrequiredservice<irepositoryfactory>();
        }
        public dbcontext dbcontext { get => _dbcontext; }
        public ireadonlyrepository<tentity, tkey> getreadonlyrepository<tentity, tkey>()
            where tentity : class, ientity<tkey>
            where tkey : iequatable<tkey>
        {
            return _repositoryfactory.getreadonlyrepository<tentity, tkey>(this);
        }

        public irepository<tentity, tkey> getrepository<tentity, tkey>()
            where tentity : class, ientity<tkey>
            where tkey : iequatable<tkey>
        {
            return _repositoryfactory.getrepository<tentity, tkey>(this);
        }
        
        public void dispose()
        {
            if (_disposed)
            {
                return;
            }

            _dbcontext?.dispose();
            _disposed = true;
        }
        
        // 其他略
    }
    /// <summary>
    /// 数据库提供者接口
    /// </summary>
    public interface idbprovider : idisposable
    {
        /// <summary>
        /// 根据上下文类型及数据库名称获取unitofwork对象, dbname为null时默认为第一个数据库名称
        /// </summary>
        iunitofwork getunitofwork(type dbcontexttype, string dbname = null);
    }

idbprovider 接口, 根据上下文类型和配置文件中的数据库连接字符串名称创建iunitofwork, 在di中的生命周期是scoped,在销毁的同时会销毁数据库上下文对象, 下面是它的实现, 为了提高性能使用了expression来代替反射.

public class dbprovider : idbprovider
    {
        private readonly iserviceprovider _serviceprovider;
        private readonly concurrentdictionary<string, iunitofwork> _works = new concurrentdictionary<string, iunitofwork>();
        private static concurrentdictionary<type, func<iserviceprovider, dbcontextoptions, dbcontext>> _expressionfactorydict =
            new concurrentdictionary<type, func<iserviceprovider, dbcontextoptions, dbcontext>>();

        public dbprovider(iserviceprovider serviceprovider)
        {
            _serviceprovider = serviceprovider;
        }

        public iunitofwork getunitofwork(type dbcontexttype, string dbname = null)
        {
            var key = string.format("{0}${1}$", dbname, dbcontexttype.fullname);
            iunitofwork unitofwork;
            if (_works.trygetvalue(key, out unitofwork))
            {
                return unitofwork;
            }
            else
            {
                dbcontext dbcontext;
                var dbconnectionoptionsmap = _serviceprovider.getrequiredservice<ioptions<fxoptions>>().value.dbconnections;
                if (dbconnectionoptionsmap == null || dbconnectionoptionsmap.count <= 0)
                {
                    throw new exception("无法获取数据库配置");
                }

                dbconnectionoptions dbconnectionoptions = dbname == null ? dbconnectionoptionsmap.first().value : dbconnectionoptionsmap[dbname];

                var builderoptions = _serviceprovider.getservices<dbcontextoptionsbuilderoptions>()
                     ?.where(d => (d.dbname == null || d.dbname == dbname) && (d.dbcontexttype == null || d.dbcontexttype == dbcontexttype))
                     ?.orderbydescending(d => d.dbname)
                     ?.orderbydescending(d => d.dbcontexttype);
                if (builderoptions == null || !builderoptions.any())
                {
                    throw new exception("无法获取匹配的dbcontextoptionsbuilder");
                }

                var dbuser = _serviceprovider.getservices<idbcontextoptionsbuilderuser>()?.firstordefault(u => u.type == dbconnectionoptions.databasetype);
                if (dbuser == null)
                {
                    throw new exception($"无法解析类型为“{dbconnectionoptions.databasetype}”的 {typeof(idbcontextoptionsbuilderuser).fullname} 实例");
                }
                
                var dbcontextoptions = dbuser.use(builderoptions.first().builder, dbconnectionoptions.connectionstring).options;
                if (_expressionfactorydict.trygetvalue(dbcontexttype, out func<iserviceprovider, dbcontextoptions, dbcontext> factory))
                {
                    dbcontext = factory(_serviceprovider, dbcontextoptions);
                }
                else
                {
                    // 使用expression创建dbcontext
                    var constructormethod = dbcontexttype.getconstructors()
                        .where(c => c.ispublic && !c.isabstract && !c.isstatic)
                        .orderbydescending(c => c.getparameters().length)
                        .firstordefault();
                    if (constructormethod == null)
                    {
                        throw new exception("无法获取有效的上下文构造器");
                    }

                    var dbcontextoptionsbuildertype = typeof(dbcontextoptionsbuilder<>);
                    var dbcontextoptionstype = typeof(dbcontextoptions);
                    var dbcontextoptionsgenerictype = typeof(dbcontextoptions<>);
                    var serviceprovidertype = typeof(iserviceprovider);
                    var getservicemethod = serviceprovidertype.getmethod("getservice");
                    var lambdaparameterexpressions = new parameterexpression[2];
                    lambdaparameterexpressions[0] = (expression.parameter(serviceprovidertype, "serviceprovider"));
                    lambdaparameterexpressions[1] = (expression.parameter(dbcontextoptionstype, "dbcontextoptions"));
                    var paramtypes = constructormethod.getparameters();
                    var argumentexpressions = new expression[paramtypes.length];
                    for (int i = 0; i < paramtypes.length; i++)
                    {
                        var ptype = paramtypes[i];
                        if (ptype.parametertype == dbcontextoptionstype ||
                            (ptype.parametertype.isgenerictype && ptype.parametertype.getgenerictypedefinition() == dbcontextoptionsgenerictype))
                        {
                            argumentexpressions[i] = expression.convert(lambdaparameterexpressions[1], ptype.parametertype);
                        }
                        else if (ptype.parametertype == serviceprovidertype)
                        {
                            argumentexpressions[i] = lambdaparameterexpressions[0];
                        }
                        else
                        {
                            argumentexpressions[i] = expression.call(lambdaparameterexpressions[0], getservicemethod);
                        }
                    }

                    factory = expression
                        .lambda<func<iserviceprovider, dbcontextoptions, dbcontext>>(
                            expression.convert(expression.new(constructormethod, argumentexpressions), typeof(dbcontext)), lambdaparameterexpressions.asenumerable())
                        .compile();
                    _expressionfactorydict.tryadd(dbcontexttype, factory);

                    dbcontext = factory(_serviceprovider, dbcontextoptions);
                }

                var unitofworkfactory = _serviceprovider.getrequiredservice<iunitofworkfactory>();
                unitofwork = unitofworkfactory.getunitofwork(_serviceprovider, dbcontext);
                _works.tryadd(key, unitofwork);
                return unitofwork;
            }
        }

        public void dispose()
        {
            if (_works != null && _works.count > 0)
            {
                foreach (var unitofwork in _works.values)
                    unitofwork.dispose();
                _works.clear();
            }
        }
    }
    
    public static class dbproviderextensions
    {
        public static iunitofwork getunitofwork<tdbcontext>(this idbprovider provider, string dbname = null)
        {
            if (provider == null)
                return null;
            return provider.getunitofwork(typeof(tdbcontext), dbname);
        }
    }
    /// <summary>
    /// 业务系统配置选项
    /// </summary>
    public class fxoptions
    {
        public fxoptions()
        {
        }

        /// <summary>
        /// 默认数据库类型
        /// </summary>
        public databasetype defaultdatabasetype { get; set; } = databasetype.sqlserver;

        /// <summary>
        /// 数据库连接配置
        /// </summary>
        public idictionary<string, dbconnectionoptions> dbconnections { get; set; }

    }
    
    public class fxoptionssetup: iconfigureoptions<fxoptions>
    {
        private readonly iconfiguration _configuration;

        public fxoptionssetup(iconfiguration configuration)
        {
            _configuration = configuration;
        }

        /// <summary>
        /// 配置options各属性信息
        /// </summary>
        /// <param name="options"></param>
        public void configure(fxoptions options)
        {
            setdbconnectionsoptions(options);
            // ...
        }

        private void setdbconnectionsoptions(fxoptions options)
        {
            var dbconnectionmap = new dictionary<string, dbconnectionoptions>();
            options.dbconnections = dbconnectionmap;
            iconfiguration section = _configuration.getsection("fxcore:dbconnections");
            dictionary<string, dbconnectionoptions> dict = section.get<dictionary<string, dbconnectionoptions>>();
            if (dict == null || dict.count == 0)
            {
                string connectionstring = _configuration["connectionstrings:defaultdbcontext"];
                if (connectionstring == null)
                {
                    return;
                }
                dbconnectionmap.add("defaultdb", new dbconnectionoptions
                {
                    connectionstring = connectionstring,
                    databasetype = options.defaultdatabasetype
                });

                return;
            }

            var ambiguous = dict.keys.groupby(d => d).firstordefault(d => d.count() > 1);
            if (ambiguous != null)
            {
                throw new exception($"数据上下文配置中存在多个配置节点拥有同一个数据库连接名称,存在二义性:{ambiguous.first()}");
            }
            foreach (var db in dict)
            {
                dbconnectionmap.add(db.key, db.value);
            }
        }
    }
    
    /// <summary>
    /// dbcontextoptionsbuilder配置选项
    /// </summary>
    public class dbcontextoptionsbuilderoptions
    {
        /// <summary>
        /// 配置dbcontextoptionsbuilder, dbname指定数据库名称, 为null时表示所有数据库,默认为null
        /// </summary>
        /// <param name="build"></param>
        /// <param name="dbname"></param>
        /// <param name="dbcontexttype"></param>
        public dbcontextoptionsbuilderoptions(dbcontextoptionsbuilder build, string dbname = null, type dbcontexttype = null)
        {
            builder = build;
            dbname = dbname;
            dbcontexttype = dbcontexttype;
        }

        public dbcontextoptionsbuilder builder { get; }
        public string dbname { get; }
        public type dbcontexttype { get; }
    }

fxoptions是业务系统的配置选项(随便取得), 在通过service.getservice<ioptions>()时会调用iconfigureoptions完成fxoptions的初始化. dbcontextoptionsbuilderoptions用来提供dbcontextoptionsbuilder的相关配置

    public interface idbcontextoptionsbuilderuser
    {
        /// <summary>
        /// 获取 数据库类型名称,如 sqlserver,mysql,sqlite等
        /// </summary>
        databasetype type { get; }

        /// <summary>
        /// 使用数据库
        /// </summary>
        /// <param name="builder">创建器</param>
        /// <param name="connectionstring">连接字符串</param>
        /// <returns></returns>
        dbcontextoptionsbuilder use(dbcontextoptionsbuilder builder, string connectionstring);
    }
    
    public class sqlserverdbcontextoptionsbuilderuser : idbcontextoptionsbuilderuser
    {
        public databasetype type => databasetype.sqlserver;

        public dbcontextoptionsbuilder use(dbcontextoptionsbuilder builder, string connectionstring)
        {
            return builder.usesqlserver(connectionstring);
        }
    }

idbcontextoptionsbuilderuser接口用来适配不同的数据库来源

使用

{
    "fxcore": {
        "dbconnections": {
            "testdb": {
                "connectionstring": "xxx",
                "databasetype": "sqlserver"
            },
            "testdb_read": {
                "connectionstring": "xxx",
                "databasetype": "sqlserver"
            }
        }
    }
}
    class program
    {
        static void main(string[] args)
        {
            var config = new configurationbuilder()
                 .addjsonfile("appsettings.json")
                 .build();
            var services = new servicecollection()
                .addsingleton<iconfiguration>(config)
                .addoptions()
                .addsingleton<iconfigureoptions<fxoptions>, fxoptionssetup>()
                .addscoped<idbprovider, dbprovider>()
                .addsingleton<iunitofworkfactory, unitofworkfactory>()
                .addsingleton<irepositoryfactory, repositoryfactory>()
                .addsingleton<idbcontextoptionsbuilderuser, sqlserverdbcontextoptionsbuilderuser>()
                .addsingleton<dbcontextoptionsbuilderoptions>(new dbcontextoptionsbuilderoptions(new dbcontextoptionsbuilder<testdbcontext>(), null, typeof(testdbcontext)));

            var serviceprovider = services.buildserviceprovider();

            var dbprovider = serviceprovider.getrequiredservice<idbprovider>();
            var uow = dbprovider.getunitofwork<testdbcontext>("testdb"); // 访问主库

            var repodbtest = uow.getrepository<dbtest, int>();
            var obj = new dbtest { name = "123", date = datetime.now.date };
            repodbtest.insert(obj);
            uow.savechanges();
            
            console.readkey();
            
            var uow2 = dbprovider.getunitofwork<testdbcontext>("testdb_read");

             var uow2 = dbprovider.getunitofwork<testdbcontext>("testdb_read"); // 访问从库
            var repodbtest2 = uow2.getreadonlyrepository<dbtest, int>();
            var data2 = repodbtest2.getfirstordefault();
            console.writeline($"id: {data2.id} name: {data2.name}");
            console.readkey();
        }
    }

这里直接用控制台来做一个例子,中间多了一个console.readkey()是因为我本地没有配置主从模式,所以实际上我是先插入数据,然后复制到另一个数据库里,再进行读取的.

总结

本文给出的解决方案适用于系统中存在多个不同的上下文,能够适应复杂的业务场景.但对已有代码的侵入性比较大,不知道有没有更好的方案,欢迎一起探讨.