Spring+MyBatis实现数据库读写分离方案
程序员文章站
2024-03-31 19:59:46
推荐第四种
方案1
通过mybatis配置文件创建读写分离两个datasource,每个sqlsessionfactorybean对象的mapperlocations属...
推荐第四种
方案1
通过 MyBatis 配置文件创建读、写两个 DataSource,并为每个 SqlSessionFactoryBean 对象的 mapperLocations 属性指定各自(读/写)数据源对应的映射文件。将所有读操作配置在读文件中,所有写操作配置在写文件中。
优点:实现简单
缺点:维护麻烦,需要对原有的xml文件进行重新修改,不支持多读,不易扩展
实现方式
<!--
  Scheme 1: one Druid pool and one SqlSessionFactoryBean per role (read / write).
  Mapper XML files under mapper/read go to the read pool, those under mapper/write
  go to the write pool.
  Framework class and property names use their real, case-sensitive spelling;
  bean ids and placeholder keys are kept exactly as used elsewhere on this page.
-->

<!-- Shared Druid pool settings; the concrete pools below inherit from this template. -->
<bean id="abstractdatasource" abstract="true" class="com.alibaba.druid.pool.DruidDataSource"
      init-method="init" destroy-method="close">
    <property name="driverClassName" value="com.microsoft.sqlserver.jdbc.SQLServerDriver"/>
    <!-- Maximum time (ms) to wait for a connection from the pool -->
    <property name="maxWait" value="60000"/>
    <!-- Interval (ms) between runs of the idle-connection evictor -->
    <property name="timeBetweenEvictionRunsMillis" value="60000"/>
    <!-- Minimum time (ms) a connection may sit idle before it can be evicted -->
    <property name="minEvictableIdleTimeMillis" value="300000"/>
    <property name="validationQuery" value="select 'x'"/>
    <property name="testWhileIdle" value="true"/>
    <property name="testOnBorrow" value="false"/>
    <property name="testOnReturn" value="false"/>
    <!-- Enable the prepared-statement cache and size it per connection -->
    <property name="poolPreparedStatements" value="true"/>
    <property name="maxPoolPreparedStatementPerConnectionSize" value="20"/>
    <property name="filters" value="config"/>
    <property name="connectionProperties" value="config.decrypt=true"/>
</bean>

<!-- Pool used for read statements -->
<bean id="readdatasource" parent="abstractdatasource">
    <!-- url / user / password -->
    <property name="url" value="${read.jdbc.url}"/>
    <property name="username" value="${read.jdbc.user}"/>
    <property name="password" value="${read.jdbc.password}"/>
    <!-- initial / minimum idle / maximum active pool sizes -->
    <property name="initialSize" value="${read.jdbc.initpoolsize}"/>
    <property name="minIdle" value="10"/>
    <property name="maxActive" value="${read.jdbc.maxpoolsize}"/>
</bean>

<!-- Pool used for write statements -->
<bean id="writedatasource" parent="abstractdatasource">
    <!-- url / user / password -->
    <property name="url" value="${write.jdbc.url}"/>
    <property name="username" value="${write.jdbc.user}"/>
    <property name="password" value="${write.jdbc.password}"/>
    <!-- initial / minimum idle / maximum active pool sizes -->
    <property name="initialSize" value="${write.jdbc.initpoolsize}"/>
    <property name="minIdle" value="10"/>
    <property name="maxActive" value="${write.jdbc.maxpoolsize}"/>
</bean>

<!-- Session factory bound to the read pool; loads only the read mappers -->
<bean id="readsqlsessionfactory" class="org.mybatis.spring.SqlSessionFactoryBean">
    <property name="dataSource" ref="readdatasource"/>
    <property name="mapperLocations" value="classpath:mapper/read/*.xml"/>
</bean>

<!-- Session factory bound to the write pool; loads only the write mappers -->
<bean id="writesqlsessionfactory" class="org.mybatis.spring.SqlSessionFactoryBean">
    <property name="dataSource" ref="writedatasource"/>
    <property name="mapperLocations" value="classpath:mapper/write/*.xml"/>
</bean>
方案2
通过 Spring AOP 在业务层实现读写分离:在 DAO 层方法调用前定义切面,利用 Spring 的 AbstractRoutingDataSource 解决多数据源的问题,实现动态选择数据源。
优点:通过注解的方法在dao每个方法上配置数据源,原有代码改动量少,易扩展,支持多读
缺点:需要在dao每个方法上配置注解,人工管理,容易出错
实现方式
/**
 * Routing target for a database call: the read side or the write side.
 *
 * <p>The constant names are also used as lookup-key prefixes by the routing
 * data source, so they must not be renamed independently of it.
 */
public enum dynamicdatasourceglobal {
    /** Route to a read data source. */
    read,
    /** Route to the write data source. */
    write;
}
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

/**
 * Marks a DAO method with the data source role it should run against.
 *
 * <p>Retention is {@code RUNTIME} so the annotation is kept in the class file
 * and can be read reflectively while the application runs. When {@link #value()}
 * is not given, the method is routed to the read side.
 *
 * @author shma1664
 */
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
public @interface datasource {

    /** Role to route the annotated method to; defaults to {@code read}. */
    dynamicdatasourceglobal value() default dynamicdatasourceglobal.read;
}
/**
 * Per-thread holder for the data source role selected for the current call.
 *
 * <p>Values are stored in a {@link ThreadLocal}; callers that set a value are
 * expected to call {@link #cleardatasource()} afterwards so that a pooled thread
 * does not carry the choice over to an unrelated request.
 *
 * <p>User: mashaohua, date: 2016-07-07 13:35
 */
public final class dynamicdatasourceholder {

    private static final ThreadLocal<dynamicdatasourceglobal> holder =
            new ThreadLocal<dynamicdatasourceglobal>();

    private dynamicdatasourceholder() {
        // static utility; not meant to be instantiated
    }

    /**
     * Records the role for the current thread.
     *
     * @param datasource role to use; {@code null} behaves like "nothing selected"
     */
    public static void putdatasource(dynamicdatasourceglobal datasource) {
        holder.set(datasource);
    }

    /**
     * @return the role recorded for the current thread, or {@code null} if none was set
     */
    public static dynamicdatasourceglobal getdatasource() {
        return holder.get();
    }

    /** Removes the role recorded for the current thread. */
    public static void cleardatasource() {
        holder.remove();
    }
}
import org.springframework.jdbc.datasource.lookup.abstractroutingdatasource; import java.util.hashmap; import java.util.list; import java.util.map; import java.util.concurrent.threadlocalrandom; import java.util.concurrent.atomic.atomiclong; import java.util.concurrent.locks.lock; import java.util.concurrent.locks.reentrantlock; /** * created by idea * user: mashaohua * date: 2016-07-14 10:56 * desc: 动态数据源实现读写分离 */ public class dynamicdatasource extends abstractroutingdatasource { private object writedatasource; //写数据源 private list<object> readdatasources; //多个读数据源 private int readdatasourcesize; //读数据源个数 private int readdatasourcepollpattern = 0; //获取读数据源方式,0:随机,1:轮询 private atomiclong counter = new atomiclong(0); private static final long max_pool = long.max_value; private final lock lock = new reentrantlock(); @override public void afterpropertiesset() { if (this.writedatasource == null) { throw new illegalargumentexception("property 'writedatasource' is required"); } setdefaulttargetdatasource(writedatasource); map<object, object> targetdatasources = new hashmap<>(); targetdatasources.put(dynamicdatasourceglobal.write.name(), writedatasource); if (this.readdatasources == null) { readdatasourcesize = 0; } else { for(int i=0; i<readdatasources.size(); i++) { targetdatasources.put(dynamicdatasourceglobal.read.name() + i, readdatasources.get(i)); } readdatasourcesize = readdatasources.size(); } settargetdatasources(targetdatasources); super.afterpropertiesset(); } @override protected object determinecurrentlookupkey() { dynamicdatasourceglobal dynamicdatasourceglobal = dynamicdatasourceholder.getdatasource(); if(dynamicdatasourceglobal == null || dynamicdatasourceglobal == dynamicdatasourceglobal.write || readdatasourcesize <= 0) { return dynamicdatasourceglobal.write.name(); } int index = 1; if(readdatasourcepollpattern == 1) { //轮询方式 long currvalue = counter.incrementandget(); if((currvalue + 1) >= max_pool) { try { lock.lock(); if((currvalue + 1) >= max_pool) { 
counter.set(0); } } finally { lock.unlock(); } } index = (int) (currvalue % readdatasourcesize); } else { //随机方式 index = threadlocalrandom.current().nextint(0, readdatasourcesize); } return dynamicdatasourceglobal.name() + index; } public void setwritedatasource(object writedatasource) { this.writedatasource = writedatasource; } public void setreaddatasources(list<object> readdatasources) { this.readdatasources = readdatasources; } public void setreaddatasourcepollpattern(int readdatasourcepollpattern) { this.readdatasourcepollpattern = readdatasourcepollpattern; } }
import org.apache.log4j.logger; import org.aspectj.lang.joinpoint; import org.aspectj.lang.reflect.methodsignature; import java.lang.reflect.method; /** * created by idea * user: mashaohua * date: 2016-07-07 13:39 * desc: 定义选择数据源切面 */ public class dynamicdatasourceaspect { private static final logger logger = logger.getlogger(dynamicdatasourceaspect.class); public void pointcut(){}; public void before(joinpoint point) { object target = point.gettarget(); string methodname = point.getsignature().getname(); class<?>[] clazz = target.getclass().getinterfaces(); class<?>[] parametertypes = ((methodsignature) point.getsignature()).getmethod().getparametertypes(); try { method method = clazz[0].getmethod(methodname, parametertypes); if (method != null && method.isannotationpresent(datasource.class)) { datasource data = method.getannotation(datasource.class); dynamicdatasourceholder.putdatasource(data.value()); } } catch (exception e) { logger.error(string.format("choose datasource error, method:%s, msg:%s", methodname, e.getmessage())); } } public void after(joinpoint point) { dynamicdatasourceholder.cleardatasource(); } }
<?xml version="1.0" encoding="utf-8"?>
<!--
  Scheme 2: annotation + AOP driven routing.
  A routing data source (dynamicdatasource) fronts one write pool and two read pools;
  an aspect around the DAO interfaces records the role requested by each method.
  Framework class, property and namespace names use their real, case-sensitive
  spelling; project class names, bean ids and placeholder keys are kept as used on
  this page.
-->
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:tx="http://www.springframework.org/schema/tx"
       xmlns:aop="http://www.springframework.org/schema/aop"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.1.xsd
                           http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.1.xsd
                           http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-4.1.xsd">

    <!-- Shared Druid pool settings; the driver class is inherited by every child pool. -->
    <bean id="abstractdatasource" abstract="true" class="com.alibaba.druid.pool.DruidDataSource"
          init-method="init" destroy-method="close">
        <property name="driverClassName" value="com.microsoft.sqlserver.jdbc.SQLServerDriver"/>
        <!-- Maximum time (ms) to wait for a connection from the pool -->
        <property name="maxWait" value="60000"/>
        <!-- Interval (ms) between runs of the idle-connection evictor -->
        <property name="timeBetweenEvictionRunsMillis" value="60000"/>
        <!-- Minimum time (ms) a connection may sit idle before it can be evicted -->
        <property name="minEvictableIdleTimeMillis" value="300000"/>
        <property name="validationQuery" value="select 'x'"/>
        <property name="testWhileIdle" value="true"/>
        <property name="testOnBorrow" value="false"/>
        <property name="testOnReturn" value="false"/>
        <!-- Enable the prepared-statement cache and size it per connection -->
        <property name="poolPreparedStatements" value="true"/>
        <property name="maxPoolPreparedStatementPerConnectionSize" value="20"/>
        <property name="filters" value="config"/>
        <property name="connectionProperties" value="config.decrypt=true"/>
    </bean>

    <!-- First read pool -->
    <bean id="datasourceread1" parent="abstractdatasource">
        <property name="url" value="${read1.jdbc.url}"/>
        <property name="username" value="${read1.jdbc.user}"/>
        <property name="password" value="${read1.jdbc.password}"/>
        <property name="initialSize" value="${read1.jdbc.initpoolsize}"/>
        <property name="minIdle" value="${read1.jdbc.minpoolsize}"/>
        <property name="maxActive" value="${read1.jdbc.maxpoolsize}"/>
    </bean>

    <!-- Second read pool -->
    <bean id="datasourceread2" parent="abstractdatasource">
        <property name="url" value="${read2.jdbc.url}"/>
        <property name="username" value="${read2.jdbc.user}"/>
        <property name="password" value="${read2.jdbc.password}"/>
        <property name="initialSize" value="${read2.jdbc.initpoolsize}"/>
        <property name="minIdle" value="${read2.jdbc.minpoolsize}"/>
        <property name="maxActive" value="${read2.jdbc.maxpoolsize}"/>
    </bean>

    <!-- Write pool -->
    <bean id="datasourcewrite" parent="abstractdatasource">
        <property name="url" value="${write.jdbc.url}"/>
        <property name="username" value="${write.jdbc.user}"/>
        <property name="password" value="${write.jdbc.password}"/>
        <property name="initialSize" value="${write.jdbc.initpoolsize}"/>
        <property name="minIdle" value="${write.jdbc.minpoolsize}"/>
        <property name="maxActive" value="${write.jdbc.maxpoolsize}"/>
    </bean>

    <!-- Routing data source: one write target, two read targets -->
    <bean id="datasource" class="com.test.api.dao.datasource.dynamicdatasource">
        <property name="writedatasource" ref="datasourcewrite"/>
        <property name="readdatasources">
            <list>
                <ref bean="datasourceread1"/>
                <ref bean="datasourceread2"/>
            </list>
        </property>
        <!-- 1 = round-robin, 0 = random -->
        <property name="readdatasourcepollpattern" value="1"/>
        <property name="defaultTargetDataSource" ref="datasourcewrite"/>
    </bean>

    <!-- NOTE(review): a transaction opened above the DAO layer obtains its connection
         before the DAO aspect records a role, so it would use the default (write)
         target; confirm this is the intended behaviour. -->
    <tx:annotation-driven transaction-manager="transactionmanager"/>
    <bean id="transactionmanager" class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
        <property name="dataSource" ref="datasource"/>
    </bean>

    <!-- MyBatis session factory on top of the routing data source -->
    <bean id="sqlsessionfactory" class="org.mybatis.spring.SqlSessionFactoryBean">
        <property name="dataSource" ref="datasource"/>
        <property name="mapperLocations" value="classpath:mapper/*.xml"/>
    </bean>

    <!-- Scans the package (and sub-packages) for mapper interfaces -->
    <bean class="org.mybatis.spring.mapper.MapperScannerConfigurer">
        <property name="basePackage" value="com.test.api.dao.inte"/>
        <property name="sqlSessionFactoryBeanName" value="sqlsessionfactory"/>
    </bean>

    <!-- Aspect that records / clears the requested role around every DAO method -->
    <bean id="dynamicdatasourceaspect" class="com.test.api.dao.datasource.dynamicdatasourceaspect"/>
    <aop:config>
        <aop:aspect id="c" ref="dynamicdatasourceaspect">
            <aop:pointcut id="tx" expression="execution(* com.test.api.dao.inte..*.*(..))"/>
            <aop:before pointcut-ref="tx" method="before"/>
            <aop:after pointcut-ref="tx" method="after"/>
        </aop:aspect>
    </aop:config>
</beans>
方案3
通过 MyBatis 的 Plugin 在业务层实现数据库读写分离:在 MyBatis 创建 Statement 对象前通过拦截器选择真正的数据源,在拦截器中根据 SQL 命令类型(select、update、insert、delete)选择数据源。
优点:原有代码不变,支持多读,易扩展
缺点:
实现方式
/**
 * Connection proxy contract used for read/write routing: a {@link java.sql.Connection}
 * that can additionally be told which physical connection to bind to.
 *
 * <p>{@code java.sql.Connection} is written fully qualified so this listing does
 * not depend on an import line.
 *
 * <p>User: mashaohua, date: 2016-07-19 15:40
 */
public interface connectionproxy extends java.sql.Connection {

    /**
     * Routes to the physical connection identified by the given key.
     *
     * @param key data source identifier used for read/write routing
     * @return the underlying connection selected for {@code key}
     */
    java.sql.Connection gettargetconnection(String key);
}
import java.lang.reflect.invocationhandler; import java.lang.reflect.invocationtargetexception; import java.lang.reflect.method; import java.lang.reflect.proxy; import java.sql.connection; import java.sql.sqlexception; import java.util.arraylist; import java.util.list; import java.util.logging.logger; import javax.sql.datasource; import org.springframework.beans.factory.initializingbean; import org.springframework.jdbc.datasource.abstractdatasource; import org.springframework.jdbc.datasource.lookup.datasourcelookup; import org.springframework.jdbc.datasource.lookup.jndidatasourcelookup; import org.springframework.util.assert; public abstract class abstractdynamicdatasourceproxy extends abstractdatasource implements initializingbean { private list<object> readdatasources; private list<datasource> resolvedreaddatasources; private object writedatasource; private datasource resolvedwritedatasource; private int readdatasourcepollpattern = 0; private int readdssize; private boolean defaultautocommit = true; private int defaulttransactionisolation = connection.transaction_read_committed; public static final string read = "read"; public static final string write = "write"; private datasourcelookup datasourcelookup = new jndidatasourcelookup(); @override public connection getconnection() throws sqlexception { return (connection) proxy.newproxyinstance( com.autohome.api.dealer.tuan.dao.rwmybatis.connectionproxy.class.getclassloader(), new class[] {com.autohome.api.dealer.tuan.dao.rwmybatis.connectionproxy.class}, new rwconnectioninvocationhandler()); } @override public connection getconnection(string username, string password) throws sqlexception { return (connection) proxy.newproxyinstance( com.autohome.api.dealer.tuan.dao.rwmybatis.connectionproxy.class.getclassloader(), new class[] {com.autohome.api.dealer.tuan.dao.rwmybatis.connectionproxy.class}, new rwconnectioninvocationhandler(username,password)); } public int getreaddssize(){ return readdssize; } public 
list<datasource> getresolvedreaddatasources() { return resolvedreaddatasources; } public void afterpropertiesset() throws exception { if(writedatasource == null){ throw new illegalargumentexception("property 'writedatasource' is required"); } this.resolvedwritedatasource = resolvespecifieddatasource(writedatasource); resolvedreaddatasources = new arraylist<datasource>(readdatasources.size()); for(object item : readdatasources){ resolvedreaddatasources.add(resolvespecifieddatasource(item)); } readdssize = readdatasources.size(); } protected datasource determinetargetdatasource(string key) { assert.notnull(this.resolvedreaddatasources, "datasource router not initialized"); if(write.equals(key)){ return resolvedwritedatasource; }else{ return loadreaddatasource(); } } public logger getparentlogger() { // noop just ignore return null; } /** * 获取真实的data source * @param datasource (jndi | real data source) * @return * @throws illegalargumentexception */ protected datasource resolvespecifieddatasource(object datasource) throws illegalargumentexception { if (datasource instanceof datasource) { return (datasource) datasource; } else if (datasource instanceof string) { return this.datasourcelookup.getdatasource((string) datasource); } else { throw new illegalargumentexception( "illegal data source value - only [javax.sql.datasource] and string supported: " + datasource); } } protected abstract datasource loadreaddatasource(); public void setreaddssize(int readdssize) { this.readdssize = readdssize; } public list<object> getreaddatasources() { return readdatasources; } public void setreaddatasources(list<object> readdatasources) { this.readdatasources = readdatasources; } public object getwritedatasource() { return writedatasource; } public void setwritedatasource(object writedatasource) { this.writedatasource = writedatasource; } public void setresolvedreaddatasources(list<datasource> resolvedreaddatasources) { this.resolvedreaddatasources = resolvedreaddatasources; } public 
datasource getresolvedwritedatasource() { return resolvedwritedatasource; } public void setresolvedwritedatasource(datasource resolvedwritedatasource) { this.resolvedwritedatasource = resolvedwritedatasource; } public int getreaddatasourcepollpattern() { return readdatasourcepollpattern; } public void setreaddatasourcepollpattern(int readdatasourcepollpattern) { this.readdatasourcepollpattern = readdatasourcepollpattern; } /** * invocation handler that defers fetching an actual jdbc connection * until first creation of a statement. */ private class rwconnectioninvocationhandler implements invocationhandler { private string username; private string password; private boolean readonly = boolean.false; private integer transactionisolation; private boolean autocommit; private boolean closed = false; private connection target; public rwconnectioninvocationhandler() { } public rwconnectioninvocationhandler(string username, string password) { this(); this.username = username; this.password = password; } public object invoke(object proxy, method method, object[] args) throws throwable { // invocation on connectionproxy interface coming in... if (method.getname().equals("equals")) { // we must avoid fetching a target connection for "equals". // only consider equal when proxies are identical. return (proxy == args[0] ? boolean.true : boolean.false); } else if (method.getname().equals("hashcode")) { // we must avoid fetching a target connection for "hashcode", // and we must return the same hash code even when the target // connection has been fetched: use hashcode of connection proxy. return new integer(system.identityhashcode(proxy)); } else if (method.getname().equals("gettargetconnection")) { // handle gettargetconnection method: return underlying connection. 
return gettargetconnection(method,args); } if (!hastargetconnection()) { // no physical target connection kept yet -> // resolve transaction demarcation methods without fetching // a physical jdbc connection until absolutely necessary. if (method.getname().equals("tostring")) { return "rw routing datasource proxy"; } else if (method.getname().equals("isreadonly")) { return this.readonly; } else if (method.getname().equals("setreadonly")) { this.readonly = (boolean) args[0]; return null; } else if (method.getname().equals("gettransactionisolation")) { if (this.transactionisolation != null) { return this.transactionisolation; } return defaulttransactionisolation; // else fetch actual connection and check there, // because we didn't have a default specified. } else if (method.getname().equals("settransactionisolation")) { this.transactionisolation = (integer) args[0]; return null; } else if (method.getname().equals("getautocommit")) { if (this.autocommit != null) return this.autocommit; return defaultautocommit; // else fetch actual connection and check there, // because we didn't have a default specified. } else if (method.getname().equals("setautocommit")) { this.autocommit = (boolean) args[0]; return null; } else if (method.getname().equals("commit")) { // ignore: no statements created yet. return null; } else if (method.getname().equals("rollback")) { // ignore: no statements created yet. return null; } else if (method.getname().equals("getwarnings")) { return null; } else if (method.getname().equals("clearwarnings")) { return null; } else if (method.getname().equals("isclosed")) { return (this.closed ? boolean.true : boolean.false); } else if (method.getname().equals("close")) { // ignore: no target connection yet. this.closed = true; return null; } else if (this.closed) { // connection proxy closed, without ever having fetched a // physical jdbc connection: throw corresponding sqlexception. 
throw new sqlexception("illegal operation: connection is closed"); } } // target connection already fetched, // or target connection necessary for current operation -> // invoke method on target connection. try { return method.invoke(target, args); } catch (invocationtargetexception ex) { throw ex.gettargetexception(); } } /** * return whether the proxy currently holds a target connection. */ private boolean hastargetconnection() { return (this.target != null); } /** * return the target connection, fetching it and initializing it if necessary. */ private connection gettargetconnection(method operation,object[] args) throws sqlexception { if (this.target == null) { string key = (string) args[0]; // no target connection held -> fetch one. if (logger.isdebugenabled()) { logger.debug("connecting to database for operation '" + operation.getname() + "'"); } // fetch physical connection from datasource. this.target = (this.username != null) ? determinetargetdatasource(key).getconnection(this.username, this.password) : determinetargetdatasource(key).getconnection(); // if we still lack default connection properties, check them now. //checkdefaultconnectionproperties(this.target); // apply kept transaction settings, if any. if (this.readonly.booleanvalue()) { this.target.setreadonly(this.readonly.booleanvalue()); } if (this.transactionisolation != null) { this.target.settransactionisolation(this.transactionisolation.intvalue()); } if (this.autocommit != null && this.autocommit.booleanvalue() != this.target.getautocommit()) { this.target.setautocommit(this.autocommit.booleanvalue()); } } else { // target connection already held -> return it. if (logger.isdebugenabled()) { logger.debug("using existing database connection for operation '" + operation.getname() + "'"); } } return this.target; } } }
import javax.sql.datasource; import java.util.concurrent.threadlocalrandom; import java.util.concurrent.atomic.atomiclong; import java.util.concurrent.locks.lock; import java.util.concurrent.locks.reentrantlock; /** * created by idea * user: mashaohua * date: 2016-07-19 16:04 * desc: */ public class dynamicroutingdatasourceproxy extends abstractdynamicdatasourceproxy { private atomiclong counter = new atomiclong(0); private static final long max_pool = long.max_value; private final lock lock = new reentrantlock(); @override protected datasource loadreaddatasource() { int index = 1; if(getreaddatasourcepollpattern() == 1) { //轮询方式 long currvalue = counter.incrementandget(); if((currvalue + 1) >= max_pool) { try { lock.lock(); if((currvalue + 1) >= max_pool) { counter.set(0); } } finally { lock.unlock(); } } index = (int) (currvalue % getreaddssize()); } else { //随机方式 index = threadlocalrandom.current().nextint(0, getreaddssize()); } return getresolvedreaddatasources().get(index); } }
import org.apache.ibatis.executor.statement.routingstatementhandler; import org.apache.ibatis.executor.statement.statementhandler; import org.apache.ibatis.mapping.mappedstatement; import org.apache.ibatis.mapping.sqlcommandtype; import org.apache.ibatis.plugin.*; import java.sql.connection; import java.util.properties; /** * 拦截器 */ @intercepts({ @signature(type = statementhandler.class, method = "prepare", args = { connection.class }) }) public class dynamicplugin implements interceptor { public object intercept(invocation invocation) throws throwable { connection conn = (connection)invocation.getargs()[0]; //如果是采用了我们代理,则路由数据源 if(conn instanceof com.autohome.api.dealer.tuan.dao.rwmybatis.connectionproxy){ statementhandler statementhandler = (statementhandler) invocation .gettarget(); mappedstatement mappedstatement = null; if (statementhandler instanceof routingstatementhandler) { statementhandler delegate = (statementhandler) reflectionutils .getfieldvalue(statementhandler, "delegate"); mappedstatement = (mappedstatement) reflectionutils.getfieldvalue( delegate, "mappedstatement"); } else { mappedstatement = (mappedstatement) reflectionutils.getfieldvalue( statementhandler, "mappedstatement"); } string key = abstractdynamicdatasourceproxy.write; if(mappedstatement.getsqlcommandtype() == sqlcommandtype.select){ key = abstractdynamicdatasourceproxy.read; }else{ key = abstractdynamicdatasourceproxy.write; } connectionproxy connectionproxy = (connectionproxy)conn; connectionproxy.gettargetconnection(key); } return invocation.proceed(); } public object plugin(object target) { return plugin.wrap(target, this); } public void setproperties(properties properties) { //noop } }
import org.apache.ibatis.logging.log; import org.apache.ibatis.logging.logfactory; import java.lang.reflect.*; public class reflectionutils { private static final log logger = logfactory.getlog(reflectionutils.class); /** * 直接设置对象属性值,无视private/protected修饰符,不经过setter函数. */ public static void setfieldvalue(final object object, final string fieldname, final object value) { field field = getdeclaredfield(object, fieldname); if (field == null) throw new illegalargumentexception("could not find field [" + fieldname + "] on target [" + object + "]"); makeaccessible(field); try { field.set(object, value); } catch (illegalaccessexception e) { } } /** * 直接读取对象属性值,无视private/protected修饰符,不经过getter函数. */ public static object getfieldvalue(final object object, final string fieldname) { field field = getdeclaredfield(object, fieldname); if (field == null) throw new illegalargumentexception("could not find field [" + fieldname + "] on target [" + object + "]"); makeaccessible(field); object result = null; try { result = field.get(object); } catch (illegalaccessexception e) { } return result; } /** * 直接调用对象方法,无视private/protected修饰符. */ public static object invokemethod(final object object, final string methodname, final class<?>[] parametertypes, final object[] parameters) throws invocationtargetexception { method method = getdeclaredmethod(object, methodname, parametertypes); if (method == null) throw new illegalargumentexception("could not find method [" + methodname + "] on target [" + object + "]"); method.setaccessible(true); try { return method.invoke(object, parameters); } catch (illegalaccessexception e) { } return null; } /** * 循环向上转型,获取对象的declaredfield. 
*/ protected static field getdeclaredfield(final object object, final string fieldname) { for (class<?> superclass = object.getclass(); superclass != object.class; superclass = superclass .getsuperclass()) { try { return superclass.getdeclaredfield(fieldname); } catch (nosuchfieldexception e) { } } return null; } /** * 循环向上转型,获取对象的declaredfield. */ protected static void makeaccessible(final field field) { if (!modifier.ispublic(field.getmodifiers()) || !modifier.ispublic(field.getdeclaringclass().getmodifiers())) { field.setaccessible(true); } } /** * 循环向上转型,获取对象的declaredmethod. */ protected static method getdeclaredmethod(object object, string methodname, class<?>[] parametertypes) { for (class<?> superclass = object.getclass(); superclass != object.class; superclass = superclass .getsuperclass()) { try { return superclass.getdeclaredmethod(methodname, parametertypes); } catch (nosuchmethodexception e) { } } return null; } /** * 通过反射,获得class定义中声明的父类的泛型参数的类型. * eg. * public userdao extends hibernatedao<user> * * @param clazz the class to introspect * @return the first generic declaration, or object.class if cannot be determined */ @suppresswarnings("unchecked") public static <t> class<t> getsuperclassgenrictype(final class clazz) { return getsuperclassgenrictype(clazz, 0); } /** * 通过反射,获得class定义中声明的父类的泛型参数的类型. * eg. 
* public userdao extends hibernatedao<user> * * @param clazz the class to introspect * @return the first generic declaration, or object.class if cannot be determined */ @suppresswarnings("unchecked") public static class getsuperclassgenrictype(final class clazz, final int index) { type gentype = clazz.getgenericsuperclass(); if (!(gentype instanceof parameterizedtype)) { logger.warn(clazz.getsimplename() + "'s superclass not parameterizedtype"); return object.class; } type[] params = ((parameterizedtype) gentype).getactualtypearguments(); if (index >= params.length || index < 0) { logger.warn("index: " + index + ", size of " + clazz.getsimplename() + "'s parameterized type: " + params.length); return object.class; } if (!(params[index] instanceof class)) { logger.warn(clazz.getsimplename() + " not set the actual class on superclass generic parameter"); return object.class; } return (class) params[index]; } /** * 将反射时的checked exception转换为unchecked exception. */ public static illegalargumentexception converttouncheckedexception(exception e) { if (e instanceof illegalaccessexception || e instanceof illegalargumentexception || e instanceof nosuchmethodexception) return new illegalargumentexception("refelction exception.", e); else return new illegalargumentexception(e); } }
<?xml version="1.0" encoding="utf-8"?>
<!DOCTYPE configuration PUBLIC "-//mybatis.org//DTD Config 3.0//EN" "http://mybatis.org/dtd/mybatis-3-config.dtd">
<!-- MyBatis configuration that registers the read/write routing interceptor. -->
<configuration>
    <plugins>
        <plugin interceptor="com.test.api.dao.mybatis.dynamicplugin">
        </plugin>
    </plugins>
</configuration>
<?xml version="1.0" encoding="utf-8"?>
<!--
  Scheme 3: MyBatis plugin + lazy connection proxy.
  The data source hands out connection proxies; the MyBatis interceptor registered
  through mybatis-plugin-config.xml tells each proxy whether to bind to the write
  pool or to one of the read pools.
  Framework class, property and namespace names use their real, case-sensitive
  spelling; project class names, bean ids and placeholder keys are kept as used on
  this page.
-->
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:tx="http://www.springframework.org/schema/tx"
       xmlns:aop="http://www.springframework.org/schema/aop"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.1.xsd
                           http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.1.xsd
                           http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-4.1.xsd">

    <!-- Shared Druid pool settings; the driver class is inherited by every child pool. -->
    <bean id="abstractdatasource" abstract="true" class="com.alibaba.druid.pool.DruidDataSource"
          init-method="init" destroy-method="close">
        <property name="driverClassName" value="com.microsoft.sqlserver.jdbc.SQLServerDriver"/>
        <!-- Maximum time (ms) to wait for a connection from the pool -->
        <property name="maxWait" value="60000"/>
        <!-- Interval (ms) between runs of the idle-connection evictor -->
        <property name="timeBetweenEvictionRunsMillis" value="60000"/>
        <!-- Minimum time (ms) a connection may sit idle before it can be evicted -->
        <property name="minEvictableIdleTimeMillis" value="300000"/>
        <property name="validationQuery" value="select 'x'"/>
        <property name="testWhileIdle" value="true"/>
        <property name="testOnBorrow" value="false"/>
        <property name="testOnReturn" value="false"/>
        <!-- Enable the prepared-statement cache and size it per connection -->
        <property name="poolPreparedStatements" value="true"/>
        <property name="maxPoolPreparedStatementPerConnectionSize" value="20"/>
        <property name="filters" value="config"/>
        <property name="connectionProperties" value="config.decrypt=true"/>
    </bean>

    <!-- First read pool -->
    <bean id="datasourceread1" parent="abstractdatasource">
        <property name="url" value="${read1.jdbc.url}"/>
        <property name="username" value="${read1.jdbc.user}"/>
        <property name="password" value="${read1.jdbc.password}"/>
        <property name="initialSize" value="${read1.jdbc.initpoolsize}"/>
        <property name="minIdle" value="${read1.jdbc.minpoolsize}"/>
        <property name="maxActive" value="${read1.jdbc.maxpoolsize}"/>
    </bean>

    <!-- Second read pool -->
    <bean id="datasourceread2" parent="abstractdatasource">
        <property name="url" value="${read2.jdbc.url}"/>
        <property name="username" value="${read2.jdbc.user}"/>
        <property name="password" value="${read2.jdbc.password}"/>
        <property name="initialSize" value="${read2.jdbc.initpoolsize}"/>
        <property name="minIdle" value="${read2.jdbc.minpoolsize}"/>
        <property name="maxActive" value="${read2.jdbc.maxpoolsize}"/>
    </bean>

    <!-- Write pool -->
    <bean id="datasourcewrite" parent="abstractdatasource">
        <property name="url" value="${write.jdbc.url}"/>
        <property name="username" value="${write.jdbc.user}"/>
        <property name="password" value="${write.jdbc.password}"/>
        <property name="initialSize" value="${write.jdbc.initpoolsize}"/>
        <property name="minIdle" value="${write.jdbc.minpoolsize}"/>
        <property name="maxActive" value="${write.jdbc.maxpoolsize}"/>
    </bean>

    <!-- Data source that hands out lazy, routable connection proxies -->
    <bean id="datasource" class="com.test.api.dao.datasource.dynamicroutingdatasourceproxy">
        <property name="writedatasource" ref="datasourcewrite"/>
        <property name="readdatasources">
            <list>
                <ref bean="datasourceread1"/>
                <ref bean="datasourceread2"/>
            </list>
        </property>
        <!-- 1 = round-robin, 0 = random -->
        <property name="readdatasourcepollpattern" value="1"/>
    </bean>

    <tx:annotation-driven transaction-manager="transactionmanager"/>
    <bean id="transactionmanager" class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
        <property name="dataSource" ref="datasource"/>
    </bean>

    <!-- MyBatis session factory; configLocation registers the routing interceptor -->
    <bean id="sqlsessionfactory" class="org.mybatis.spring.SqlSessionFactoryBean">
        <property name="dataSource" ref="datasource"/>
        <property name="mapperLocations" value="classpath:mapper/*.xml"/>
        <property name="configLocation" value="classpath:mybatis-plugin-config.xml"/>
    </bean>

    <bean id="sqlsessiontemplate" class="org.mybatis.spring.SqlSessionTemplate">
        <constructor-arg ref="sqlsessionfactory"/>
    </bean>

    <!-- Scans the package for mapper interfaces and creates a proxy for each one.
         The template is referenced by bean name rather than by direct reference, so
         it is not instantiated before property placeholders are resolved. -->
    <bean id="mapper" class="org.mybatis.spring.mapper.MapperScannerConfigurer">
        <property name="basePackage" value="com.test.api.dao.inte"/>
        <property name="sqlSessionTemplateBeanName" value="sqlsessiontemplate"/>
    </bean>
</beans>
方案4
如果你的后台结构是 Spring + MyBatis,可以通过 Spring 的 AbstractRoutingDataSource 和 MyBatis Plugin 拦截器实现非常友好的读写分离,原有代码不需要任何改变。推荐第四种方案。
import org.springframework.jdbc.datasource.lookup.abstractroutingdatasource; import java.util.hashmap; import java.util.map; /** * created by idea * user: mashaohua * date: 2016-07-14 10:56 * desc: 动态数据源实现读写分离 */ public class dynamicdatasource extends abstractroutingdatasource { private object writedatasource; //写数据源 private object readdatasource; //读数据源 @override public void afterpropertiesset() { if (this.writedatasource == null) { throw new illegalargumentexception("property 'writedatasource' is required"); } setdefaulttargetdatasource(writedatasource); map<object, object> targetdatasources = new hashmap<>(); targetdatasources.put(dynamicdatasourceglobal.write.name(), writedatasource); if(readdatasource != null) { targetdatasources.put(dynamicdatasourceglobal.read.name(), readdatasource); } settargetdatasources(targetdatasources); super.afterpropertiesset(); } @override protected object determinecurrentlookupkey() { dynamicdatasourceglobal dynamicdatasourceglobal = dynamicdatasourceholder.getdatasource(); if(dynamicdatasourceglobal == null || dynamicdatasourceglobal == dynamicdatasourceglobal.write) { return dynamicdatasourceglobal.write.name(); } return dynamicdatasourceglobal.read.name(); } public void setwritedatasource(object writedatasource) { this.writedatasource = writedatasource; } public object getwritedatasource() { return writedatasource; } public object getreaddatasource() { return readdatasource; } public void setreaddatasource(object readdatasource) { this.readdatasource = readdatasource; } }
/**
 * Identifies which kind of data source a database operation should use.
 *
 * <p>created by idea, user: mashaohua, date: 2016-07-14 10:58
 */
public enum dynamicdatasourceglobal {

    /** Marker for the read data source. */
    read,

    /** Marker for the write data source. */
    write
}
public final class dynamicdatasourceholder { private static final threadlocal<dynamicdatasourceglobal> holder = new threadlocal<dynamicdatasourceglobal>(); private dynamicdatasourceholder() { // } public static void putdatasource(dynamicdatasourceglobal datasource){ holder.set(datasource); } public static dynamicdatasourceglobal getdatasource(){ return holder.get(); } public static void cleardatasource() { holder.remove(); } }
import org.springframework.jdbc.datasource.datasourcetransactionmanager; import org.springframework.transaction.transactiondefinition; /** * created by idea * user: mashaohua * date: 2016-08-10 14:34 * desc: */ public class dynamicdatasourcetransactionmanager extends datasourcetransactionmanager { /** * 只读事务到读库,读写事务到写库 * @param transaction * @param definition */ @override protected void dobegin(object transaction, transactiondefinition definition) { //设置数据源 boolean readonly = definition.isreadonly(); if(readonly) { dynamicdatasourceholder.putdatasource(dynamicdatasourceglobal.read); } else { dynamicdatasourceholder.putdatasource(dynamicdatasourceglobal.write); } super.dobegin(transaction, definition); } /** * 清理本地线程的数据源 * @param transaction */ @override protected void docleanupaftercompletion(object transaction) { super.docleanupaftercompletion(transaction); dynamicdatasourceholder.cleardatasource(); } }
import org.apache.ibatis.executor.executor; import org.apache.ibatis.executor.keygen.selectkeygenerator; import org.apache.ibatis.mapping.boundsql; import org.apache.ibatis.mapping.mappedstatement; import org.apache.ibatis.mapping.sqlcommandtype; import org.apache.ibatis.plugin.*; import org.apache.ibatis.session.resulthandler; import org.apache.ibatis.session.rowbounds; import org.slf4j.logger; import org.slf4j.loggerfactory; import org.springframework.transaction.support.transactionsynchronizationmanager; import java.util.locale; import java.util.map; import java.util.properties; import java.util.concurrent.concurrenthashmap; /** * created by idea * user: mashaohua * date: 2016-08-10 11:09 * desc: */ @intercepts({ @signature(type = executor.class, method = "update", args = { mappedstatement.class, object.class }), @signature(type = executor.class, method = "query", args = { mappedstatement.class, object.class, rowbounds.class, resulthandler.class }) }) public class dynamicplugin implements interceptor { protected static final logger logger = loggerfactory.getlogger(dynamicplugin.class); private static final string regex = ".*insert\\u0020.*|.*delete\\u0020.*|.*update\\u0020.*"; private static final map<string, dynamicdatasourceglobal> cachemap = new concurrenthashmap<>(); @override public object intercept(invocation invocation) throws throwable { boolean synchronizationactive = transactionsynchronizationmanager.issynchronizationactive(); if(!synchronizationactive) { object[] objects = invocation.getargs(); mappedstatement ms = (mappedstatement) objects[0]; dynamicdatasourceglobal dynamicdatasourceglobal = null; if((dynamicdatasourceglobal = cachemap.get(ms.getid())) == null) { //读方法 if(ms.getsqlcommandtype().equals(sqlcommandtype.select)) { //!selectkey 为自增id查询主键(select last_insert_id() )方法,使用主库 if(ms.getid().contains(selectkeygenerator.select_key_suffix)) { dynamicdatasourceglobal = dynamicdatasourceglobal.write; } else { boundsql boundsql = 
ms.getsqlsource().getboundsql(objects[1]); string sql = boundsql.getsql().tolowercase(locale.china).replaceall("[\\t\\n\\r]", " "); if(sql.matches(regex)) { dynamicdatasourceglobal = dynamicdatasourceglobal.write; } else { dynamicdatasourceglobal = dynamicdatasourceglobal.read; } } }else{ dynamicdatasourceglobal = dynamicdatasourceglobal.write; } logger.warn("设置方法[{}] use [{}] strategy, sqlcommandtype [{}]..", ms.getid(), dynamicdatasourceglobal.name(), ms.getsqlcommandtype().name()); cachemap.put(ms.getid(), dynamicdatasourceglobal); } dynamicdatasourceholder.putdatasource(dynamicdatasourceglobal); } return invocation.proceed(); } @override public object plugin(object target) { if (target instanceof executor) { return plugin.wrap(target, this); } else { return target; } } @override public void setproperties(properties properties) { // } }
以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,同时也希望大家多多支持!
上一篇: ASP.net 路径问题 详细说明
推荐阅读
-
Spring+MyBatis实现数据库读写分离方案
-
thinkphp下MySQL数据库读写分离代码剖析
-
Spring+Mybatis 实现aop数据库读写分离与多数据库源配置操作
-
SpringMVC4+MyBatis+SQL Server2014实现数据库读写分离
-
MySQL的使用中实现读写分离的教程
-
spring集成mybatis实现mysql数据库读写分离
-
CentOS6中MySql5.6数据库主从复制/读写分离(二)
-
MySQL主从复制(Master-Slave)与读写分离(MySQL-Proxy)实践 博客分类: 数据库集群
-
Linux MySQL数据库集群实战 读写分离
-
Linux运维之HAProxy实现web页面的动静分离、读写分离