SpringBoot+AOP构建多数据源的切换实践
针对微服务架构中常用的设计模块,通常我们都会需要使用到druid作为我们的数据连接池,当架构发生扩展的时候 ,通常面对的数据存储服务器也会渐渐增加,从原本的单库架构逐渐扩展为复杂的多库架构。
当在业务层需要涉及到查询多种同数据库的场景下,我们通常需要在执行sql的时候动态指定对应的datasource。
而spring的abstractroutingdatasource则正好为我们提供了这一功能点,下边我将通过一个简单的基于springboot+aop的案例来实现如何通过自定义注解切换不同的数据源进行读数据操作,同时也将结合部分源码的内容进行讲解。
首先我们需要自定义一个专门用于申明当前java应用程序所需要使用到哪些数据源信息:
package mutidatasource.annotation; import mutidatasource.config.datasourceconfigregister; import mutidatasource.enums.supportdatasourceenum; import org.springframework.context.annotation.import; import org.springframework.stereotype.component; import java.lang.annotation.*; /** * 注入数据源 * * @author idea * @data 2020/3/7 */ @target({elementtype.method,elementtype.type}) @retention(retentionpolicy.runtime) @documented @import(datasourceconfigregister.class) public @interface appdatasource { supportdatasourceenum[] datasourcetype(); }
这里为了方便,我将测试中使用的数据源地址都配置在来enum里面,如果后边需要灵活处理的话,可以将这些配置信息抽取出来放在一些配置中心上边。
package mutidatasource.enums; import lombok.allargsconstructor; import lombok.getter; import lombok.noargsconstructor; /** * 目前支持的数据源信息 * * @author idea * @data 2020/3/7 */ @allargsconstructor @getter public enum supportdatasourceenum { prod_db("jdbc:mysql://localhost:3306/db-prod?useunicode=true&characterencoding=utf8","root","root","db-prod"), dev_db("jdbc:mysql://localhost:3306/db-dev?useunicode=true&characterencoding=utf8","root","root","db-dev"), pre_db("jdbc:mysql://localhost:3306/db-pre?useunicode=true&characterencoding=utf8","root","root","db-pre"); string url; string username; string password; string databasename; @override public string tostring() { return super.tostring().tolowercase(); } }
之所以要创建这个@appdatasource注解,是要在springboot的启动类上边进行标注:
package mutidatasource; import mutidatasource.annotation.appdatasource; import mutidatasource.enums.supportdatasourceenum; import org.springframework.boot.springapplication; import org.springframework.boot.autoconfigure.springbootapplication; /** * @author idea * @data 2020/3/7 */ @springbootapplication @appdatasource(datasourcetype = {supportdatasourceenum.dev_db, supportdatasourceenum.pre_db, supportdatasourceenum.prod_db}) public class springapplicationdemo { public static void main(string[] args) { springapplication.run(springapplicationdemo.class); } }
借助springboot的importselector 自定义一个注册器来获取启动类头部的注解所指定的数据源类型:
package mutidatasource.config; import lombok.extern.slf4j.slf4j; import mutidatasource.annotation.appdatasource; import mutidatasource.core.datasourcecontextholder; import mutidatasource.enums.supportdatasourceenum; import org.springframework.context.annotation.importselector; import org.springframework.core.annotation.annotationattributes; import org.springframework.core.type.annotationmetadata; import org.springframework.stereotype.component; /** * @author idea * @data 2020/3/7 */ @slf4j @component public class datasourceconfigregister implements importselector { @override public string[] selectimports(annotationmetadata annotationmetadata) { annotationattributes attributes = annotationattributes.frommap(annotationmetadata.getannotationattributes(appdatasource.class.getname())); system.out.println("####### datasource import #######"); if (null != attributes) { object object = attributes.get("datasourcetype"); supportdatasourceenum[] supportdatasourceenums = (supportdatasourceenum[]) object; for (supportdatasourceenum supportdatasourceenum : supportdatasourceenums) { datasourcecontextholder.adddatasource(supportdatasourceenum); } } return new string[0]; } }
好的,现在我们已经能够获取到对应的数据源类型信息了,这里你会看到一个叫做datasourcecontextholder的角色。这个对象主要是用于对每个请求线程的数据源信息做统一的分配和管理。
在多并发场景下,为了防止不同线程请求的数据源出现“互窜”情况,通常我们都会使用到threadlocal来做处理。为每一个线程都分配一个指定的,属于其内部的副本变量,当当前线程结束之前,记得将对应的线程副本也进行销毁。
package mutidatasource.core; import mutidatasource.enums.supportdatasourceenum; import java.util.hashset; /** * @author idea * @data 2020/3/7 */ public class datasourcecontextholder { private static final hashset<supportdatasourceenum> datasourceset = new hashset<>(); private static final threadlocal<string> databaseholder = new threadlocal<>(); public static void setdatabaseholder(supportdatasourceenum supportdatasourceenum) { databaseholder.set(supportdatasourceenum.tostring()); } /** * 取得当前数据源 * * @return */ public static string getdatabaseholder() { return databaseholder.get(); } /** * 添加数据源 * * @param supportdatasourceenum */ public static void adddatasource(supportdatasourceenum supportdatasourceenum) { datasourceset.add(supportdatasourceenum); } /** * 获取当期应用所支持的所有数据源 * * @return */ public static hashset<supportdatasourceenum> getdatasourceset() { return datasourceset; } /** * 清除上下文数据 */ public static void clear() { databaseholder.remove(); } }
spring内部的abstractroutingdatasource动态路由数据源里面有一个抽象方法叫做
determinecurrentlookupkey,这个方法适用于提供给开发者自定义对应数据源的查询key。
package mutidatasource.core; import org.springframework.jdbc.datasource.lookup.abstractroutingdatasource; /** * @author idea * @data 2020/3/7 */ public class dynamicdatasource extends abstractroutingdatasource { @override protected object determinecurrentlookupkey() { string datasource = datasourcecontextholder.getdatabaseholder(); return datasource; } }
这里我使用的druid数据源,所以配置数据源的配置类如下:这里面我默认该应用配置类prod数据源,用于测试使用。
package mutidatasource.core; import com.alibaba.druid.pool.druiddatasource; import lombok.extern.slf4j.slf4j; import mutidatasource.enums.supportdatasourceenum; import org.springframework.boot.autoconfigure.condition.conditionalonmissingbean; import org.springframework.context.annotation.bean; import org.springframework.context.annotation.primary; import org.springframework.stereotype.component; import javax.sql.datasource; import java.util.hashmap; import java.util.hashset; /** * @author idea * @data 2020/3/7 */ @slf4j @component public class dynamicdatasourceconfiguration { @bean @primary @conditionalonmissingbean public datasource datasource() { system.out.println("init datasource"); dynamicdatasource dynamicdatasource = new dynamicdatasource(); //设置原始数据源 hashmap<object, object> datasourcesmap = new hashmap<>(); hashset<supportdatasourceenum> dataset = datasourcecontextholder.getdatasourceset(); for (supportdatasourceenum supportdatasourceenum : dataset) { datasource datasource = this.createdatasourceproperties(supportdatasourceenum); datasourcesmap.put(supportdatasourceenum.tostring(), datasource); } dynamicdatasource.settargetdatasources(datasourcesmap); dynamicdatasource.setdefaulttargetdatasource(createdatasourceproperties(supportdatasourceenum.pre_db)); return dynamicdatasource; } private synchronized datasource createdatasourceproperties(supportdatasourceenum supportdatasourceenum) { druiddatasource druiddatasource = new druiddatasource(); druiddatasource.seturl(supportdatasourceenum.geturl()); druiddatasource.setusername(supportdatasourceenum.getusername()); druiddatasource.setpassword(supportdatasourceenum.getpassword()); //具体配置 druiddatasource.setmaxactive(100); druiddatasource.setinitialsize(5); druiddatasource.setminidle(1); druiddatasource.setmaxwait(30000); //间隔多久才进行一次检测,检测需要关闭的空闲连接,单位是毫秒 druiddatasource.settimebetweenconnecterrormillis(60000); return druiddatasource; } }
好了现在一个基础的数据源注入已经可以了,那么我们该如何借助注解来实现动态切换数据源的操作呢?
为此,我设计了一个叫做usingdatasource的注解,通过利用该注解来识别当前线程所需要使用的数据源操作:
package mutidatasource.annotation; import mutidatasource.enums.supportdatasourceenum; import java.lang.annotation.*; /** * @author idea * @data 2020/3/7 */ @target({elementtype.method,elementtype.type}) @retention(retentionpolicy.runtime) @documented public @interface usingdatasource { supportdatasourceenum type() ; }
然后,借助了spring的aop来做切面拦截:
package mutidatasource.core; import lombok.extern.slf4j.slf4j; import mutidatasource.annotation.usingdatasource; import org.aspectj.lang.joinpoint; import org.aspectj.lang.proceedingjoinpoint; import org.aspectj.lang.signature; import org.aspectj.lang.annotation.*; import org.aspectj.lang.reflect.methodsignature; import org.springframework.context.annotation.configuration; import org.springframework.core.annotation.annotationutils; import org.springframework.core.annotation.order; import org.springframework.stereotype.component; import java.lang.reflect.method; import java.util.arrays; /** * @author idea * @data 2020/3/7 */ @slf4j @aspect @configuration public class datasourceaspect { public datasourceaspect(){ system.out.println("this is init"); } @pointcut("@within(mutidatasource.annotation.usingdatasource) || " + "@annotation(mutidatasource.annotation.usingdatasource)") public void pointcut(){ } @before("pointcut() && @annotation(usingdatasource)") public void dobefore(usingdatasource usingdatasource){ log.debug("select datasource---"+usingdatasource.type()); datasourcecontextholder.setdatabaseholder(usingdatasource.type()); } @after("pointcut()") public void doafter(){ datasourcecontextholder.clear(); } }
测试类如下所示:
package mutidatasource.controller; import lombok.extern.slf4j.slf4j; import mutidatasource.annotation.usingdatasource; import mutidatasource.enums.supportdatasourceenum; import org.springframework.beans.factory.annotation.autowired; import org.springframework.jdbc.core.jdbctemplate; import org.springframework.web.bind.annotation.getmapping; import org.springframework.web.bind.annotation.requestmapping; import org.springframework.web.bind.annotation.restcontroller; /** * @author idea * @data 2020/3/8 */ @restcontroller @requestmapping(value = "/test") @slf4j public class testcontroller { @autowired private jdbctemplate jdbctemplate; @getmapping(value = "/testdev") @usingdatasource(type=supportdatasourceenum.dev_db) public void testdev() { showdata(); } @getmapping(value = "/testpre") @usingdatasource(type=supportdatasourceenum.pre_db) public void testpre() { showdata(); } private void showdata() { jdbctemplate.queryforlist("select * from test1").foreach(row -> log.info(row.tostring())); } }
最后 启动springboot服务,通过使用注解即可测试对应功能。
关于abstractroutingdatasource 动态路由数据源的注入原理,
可以看到这个内部类里面包含了多种用于做数据源映射的map数据结构。
在该类的最底部,有一个determinecurrentlookupkey函数,也就是上边我们所提及的使用于查询当前数据源key的方法。
具体代码如下:
/** * retrieve the current target datasource. determines the * {@link #determinecurrentlookupkey() current lookup key}, performs * a lookup in the {@link #settargetdatasources targetdatasources} map, * falls back to the specified * {@link #setdefaulttargetdatasource default target datasource} if necessary. * @see #determinecurrentlookupkey() */ protected datasource determinetargetdatasource() { assert.notnull(this.resolveddatasources, "datasource router not initialized"); //这里面注入我们当前线程使用的数据源 object lookupkey = determinecurrentlookupkey(); //在初始化数据源的时候需要我们去给resolveddatasources进行注入 datasource datasource = this.resolveddatasources.get(lookupkey); if (datasource == null && (this.lenientfallback || lookupkey == null)) { datasource = this.resolveddefaultdatasource; } if (datasource == null) { throw new illegalstateexception("cannot determine target datasource for lookup key [" + lookupkey + "]"); } return datasource; } /** * determine the current lookup key. this will typically be * implemented to check a thread-bound transaction context. * <p>allows for arbitrary keys. the returned key needs * to match the stored lookup key type, as resolved by the * {@link #resolvespecifiedlookupkey} method. */ @nullable protected abstract object determinecurrentlookupkey();
而在该类的afterpropertiesset里面,又有对于初始化数据源的注入操作,这里面的targetdatasources 正是上文中我们对在初始化数据源时候注入的信息。
@override public void afterpropertiesset() { if (this.targetdatasources == null) { throw new illegalargumentexception("property 'targetdatasources' is required"); } this.resolveddatasources = new hashmap<>(this.targetdatasources.size()); this.targetdatasources.foreach((key, value) -> { object lookupkey = resolvespecifiedlookupkey(key); datasource datasource = resolvespecifieddatasource(value); this.resolveddatasources.put(lookupkey, datasource); }); if (this.defaulttargetdatasource != null) { this.resolveddefaultdatasource = resolvespecifieddatasource(this.defaulttargetdatasource); } }