欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Spring:一主多从DataSource的读写分离实现 博客分类: spring AbstractRoutingDataSourceSpring 

程序员文章站 2024-03-17 22:22:04
...

对于一主多从的数据库配置, Spring的经典做法是继承AbstractRoutingDataSource. 不过个人觉得AbstractRoutingDataSource还是复杂了些, 于是自己实现了一个DynamicDataSource.

 

import org.springframework.jdbc.datasource.AbstractDataSource;

import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;

/**
 * For read-write separation.
 *
 * @author rickg@iteye.com
 */
class DynamicDataSource extends AbstractDataSource {
    private static final ThreadLocal<Boolean> readSeparable = ThreadLocal.withInitial(() -> Boolean.FALSE);
    //master datasource
    private DataSource master;
    //all-in-one slave datasources
    private SlaveDataSource slave;

    public static void setReadSeparable() {
        readSeparable.set(Boolean.TRUE);
    }

    private boolean isReadSeparable() {
        return readSeparable.get();
    }

    public DynamicDataSource setMaster(DataSource dataSource) {
        this.master = dataSource;
        return this;
    }

    public DynamicDataSource addSlave(DataSource dataSource) {
        if (slave == null) {
            slave = new SlaveDataSource();
        }
        slave.add(dataSource);
        return this;
    }

    @Override
    public Connection getConnection() throws SQLException {
        return getTargetDataSource().getConnection();
    }

    @Override
    public Connection getConnection(String username, String password) throws SQLException {
        return getTargetDataSource().getConnection(username, password);
    }

    private DataSource getTargetDataSource() {
        return isReadSeparable() && slave != null ? slave : master;
    }

    /**
     * The all-in-one slave datasources.
     */
    static class SlaveDataSource extends AbstractDataSource {
        private List<DataSource> peers = new ArrayList<>();

        public SlaveDataSource add(DataSource dataSource) {
            peers.add(dataSource);
            return this;
        }

        @Override
        public Connection getConnection() throws SQLException {
            return getTargetSlaveDataSource().getConnection();
        }

        @Override
        public Connection getConnection(String username, String password) throws SQLException {
            return getTargetSlaveDataSource().getConnection(username, password);
        }

        private DataSource getTargetSlaveDataSource() {
            return peers.get((int) (Math.random() * peers.size()));
        }
    }
}

 

 

初始化DataSource:

    @Bean
    @Primary
    public DataSource dataSource(){
        DynamicDataSource dynamicDataSource = new DynamicDataSource();
        //dynamicDataSource.setMaster(....);
        //dynamicDataSource.addSlave(...);
        //dynamicDataSource.addSlave(...);
        return dynamicDataSource;
    }

 

 

添加AOP:

import org.aspectj.lang.JoinPoint;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Before;
import org.aspectj.lang.annotation.Pointcut;
import org.aspectj.lang.reflect.MethodSignature;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.core.Ordered;

import java.lang.reflect.Method;

/**
 * For read-write separation.
 *
 * @author rickg@iteye.com
 */
@Aspect
public class DynamicDataSourceAspect implements Ordered {
    private static final Logger LOG = LoggerFactory.getLogger(DynamicDataSourceAspect.class);

    @Pointcut("@annotation(com.myCompany.annotation.ReadSeparable)")
    public void readSeparable() {
    }

    @Before("readSeparable()")
    public void selectDataSourceOnFly(JoinPoint joinPoint) {
        if (!MethodSignature.class.isInstance(joinPoint.getSignature())) {
            return;
        }
        Method targetMethod = ((MethodSignature) joinPoint.getSignature()).getMethod();
        DynamicDataSource.setReadSeparable();
        LOG.info(String.format("ReadSeparable advised on thread: {%s}, method:{%s}", Thread.currentThread().getName(), targetMethod.toString()));
    }

    @Override
    public int getOrder() {
        return -999;
    }

 

 

ReadSeparable注解:

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

/**
 * For read-write separation.
 *
 * @author rickg@iteye.com
 */
@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
public @interface ReadSeparable {
    /**
     * Alias for {@link #threshold}.
     */
    int value() default 30;

    /**
     * Define how much latency in seconds is tolerable.
     */
    int threshold() default 30;
}

 

 

在需要读写分离的方法上添加ReadSeparable注解即可. DynamicDataSourceAspect捕获到ReadSeparable的方法, 并设置DynamicDataSource的readSeparable为true.  DynamicDataSource根据readSeparable的值来决定是从master还是从slave来取得connection. 如果是从SlaveDataSource中取的话, 则从多个slave中随机取一个, 此处为负载均衡的扩展点.