欢迎您访问程序员文章站,本站旨在为大家提供并分享程序员计算机编程知识!
您现在的位置是: 首页

Jedis一致性哈希实现

程序员文章站 2024-03-19 22:40:46
...

一致性哈希算法介绍:一致性哈希算法

导入Jedis依赖

<dependency>
    <groupId>redis.clients</groupId>
    <artifactId>jedis</artifactId>
    <version>2.9.0</version>
</dependency>

代码如下

import redis.clients.jedis.JedisPoolConfig;
import redis.clients.jedis.JedisShardInfo;
import redis.clients.jedis.ShardedJedis;
import redis.clients.jedis.ShardedJedisPool;

import java.util.Arrays;
import java.util.List;

/**
 * Minimal demo of client-side sharding with {@code ShardedJedisPool}:
 * writes the key {@code "test"} through one pooled connection, then reads it
 * back through another and prints the value.
 *
 * <p>The pool and every borrowed connection are managed with
 * try-with-resources, so each connection is closed exactly once and the pool
 * itself is shut down when the demo finishes, even if a Redis call throws.
 */
public class Test {
    public static void main(String[] args) {
        // Connection-pool configuration
        JedisPoolConfig poolConfig = new JedisPoolConfig();
        poolConfig.setMaxTotal(10);
        poolConfig.setMaxIdle(1);
        poolConfig.setMaxWaitMillis(200);
        poolConfig.setTestOnBorrow(false);
        poolConfig.setTestOnReturn(false);

        // Shard definitions: the two Redis instances that form the hash ring
        JedisShardInfo shardInfo1 = new JedisShardInfo("192.168.137.128", 6379, 500);
        JedisShardInfo shardInfo2 = new JedisShardInfo("192.168.137.129", 6379, 500);
        List<JedisShardInfo> infoList = Arrays.asList(shardInfo1, shardInfo2);

        // Build the pool from the shard definitions; closing it at the end
        // releases the pooled connections instead of leaking them.
        try (ShardedJedisPool jedisPool = new ShardedJedisPool(poolConfig, infoList)) {
            // Each operation borrows its own connection. A fresh variable per
            // block guarantees a failed borrow can never re-close a connection
            // that was already closed by an earlier block.
            try (ShardedJedis jedis = jedisPool.getResource()) {
                jedis.set("test", "this is a test");
            }

            try (ShardedJedis jedis = jedisPool.getResource()) {
                String value = jedis.get("test");
                System.out.println(value);
            }
        }
    }
}

实际效果如下:

D:\Redis-x64-2.8.2402>redis-cli.exe -h 192.168.137.128 -p 6379
192.168.137.128:6379> get test
(nil)
192.168.137.128:6379> exit

D:\Redis-x64-2.8.2402>redis-cli.exe -h 192.168.137.129 -p 6379
192.168.137.129:6379> get test
"this is a test"
192.168.137.129:6379> exit

由于是从连接池获取连接信息,先来看下连接池的实现方式

// Excerpt of ShardedJedisPool showing only its constructor chain
// (other members are elided with "......").
public class ShardedJedisPool extends Pool<ShardedJedis> {
    // Convenience constructor: delegates with Hashing.MURMUR_HASH as the
    // hashing algorithm.
    public ShardedJedisPool(final GenericObjectPoolConfig poolConfig, 
            List<JedisShardInfo> shards) {
        this(poolConfig, shards, Hashing.MURMUR_HASH);
    }
    
    // Delegates with a null key-tag pattern.
    public ShardedJedisPool(final GenericObjectPoolConfig poolConfig,
            List<JedisShardInfo> shards, Hashing algo) {
        this(poolConfig, shards, algo, null);
    }

    // Terminal constructor: wraps the shard list, hashing algorithm and
    // key-tag pattern in a ShardedJedisFactory and passes that factory,
    // together with the pool configuration, to the Pool superclass.
    public ShardedJedisPool(final GenericObjectPoolConfig poolConfig, 
            List<JedisShardInfo> shards, Hashing algo, Pattern keyTagPattern) {          
        super(poolConfig, new ShardedJedisFactory(shards, algo, keyTagPattern));
    }
    ......
}
// Excerpt of the generic Pool base class showing how the underlying object
// pool is created (other members are elided with "......").
public abstract class Pool<T> implements Closeable {
    
    // The connection pool is backed by Apache Commons Pool 2:
    // import org.apache.commons.pool2.PooledObjectFactory;
    // import org.apache.commons.pool2.impl.GenericObjectPool;
    protected GenericObjectPool<T> internalPool;

    // Creates the pool by delegating straight to initPool.
    public Pool(final GenericObjectPoolConfig poolConfig, 
            PooledObjectFactory<T> factory) {
        initPool(poolConfig, factory);
    }
    
    // In this walkthrough, factory is the ShardedJedisFactory.
    // If an internal pool already exists it is closed first, then a new
    // GenericObjectPool is built from the given factory and configuration.
    public void initPool(final GenericObjectPoolConfig poolConfig, 
            PooledObjectFactory<T> factory) {
        if (this.internalPool != null) {
            // NOTE(review): any exception raised while closing the previous
            // pool is silently discarded by the empty catch below.
            try {
                closeInternalPool();
            } catch (Exception e) {}
        }
        this.internalPool = new GenericObjectPool<T>(factory, poolConfig);
    }
    ......
}

连接池初始化完成后,下一步是从连接池获取连接

// Excerpt of ShardedJedisPool showing how a connection is handed out
// (other members are elided with "......").
public class ShardedJedisPool extends Pool<ShardedJedis> {
    ......
    // Borrows a ShardedJedis through Pool.getResource() and records this pool
    // on it as its data source before returning it.
    // NOTE(review): presumably the data source is what lets the connection be
    // handed back to this pool later — confirm against ShardedJedis.close().
    @Override
    public ShardedJedis getResource() {
        ShardedJedis jedis = super.getResource();
        jedis.setDataSource(this);
        return jedis;
    }
    ......
}
// Excerpt of Pool showing the generic borrow operation
// (other members are elided with "......").
public abstract class Pool<T> implements Closeable {
    ......
    // Borrows an object from the underlying GenericObjectPool.
    // A NoSuchElementException is rethrown as JedisException; any other
    // exception is rethrown as JedisConnectionException. Both keep the
    // original exception as the cause.
    public T getResource() {
        try {
            return internalPool.borrowObject();
        } catch (NoSuchElementException nse) {
               throw new JedisException("Could not get a resource from the pool", nse);
        } catch (Exception e) {
                throw new JedisConnectionException(
                        "Could not get a resource from the pool", e);
        }
    }
    ......
}
// Heavily abridged excerpt of Apache Commons Pool 2's GenericObjectPool;
// "......" marks elided code, so the braces shown here are not balanced.
public class GenericObjectPool<T> extends BaseGenericObjectPool<T> 
        implements ObjectPool<T>, GenericObjectPoolMXBean, UsageTracking<T> {
    // Borrows an object using the pool's configured maximum wait time.
    public T borrowObject() throws Exception {
        return this.borrowObject(this.getMaxWaitMillis());
    }

    // Visible part of the borrow logic: take the first idle object if there
    // is one; when none is idle, create a new object instead.
    public T borrowObject(long borrowMaxWaitMillis) throws Exception {
        ......
        p = (PooledObject)this.idleObjects.pollFirst();
        if(p == null) {
            p = this.create();
        ......
    }

    // Creates a new pooled object by asking the factory for one.
    // In this walkthrough the factory is the ShardedJedisFactory.
    private PooledObject<T> create() throws Exception {
        ......
        p = this.factory.makeObject();
        ......
    }
    ......
}

看下ShardedJedisFactory的实现

// Excerpt of ShardedJedisPool showing its nested object factory
// (other members are elided with "......").
public class ShardedJedisPool extends Pool<ShardedJedis> {
    ......
    // Factory the object pool uses to create ShardedJedis instances.
    private static class ShardedJedisFactory 
            implements PooledObjectFactory<ShardedJedis> {
        
        private List<JedisShardInfo> shards;
        private Hashing algo;
        private Pattern keyTagPattern;
        
        // Values in this walkthrough:
        // shards = the shard definitions passed to the pool
        // algo = Hashing.MURMUR_HASH
        // keyTagPattern = null
        public ShardedJedisFactory(List<JedisShardInfo> shards, 
                Hashing algo, Pattern keyTagPattern) {
            this.shards = shards;
            this.algo = algo;
            this.keyTagPattern = keyTagPattern;
        }

        // Builds a new ShardedJedis from the full shard list, the hashing
        // algorithm and the key-tag pattern, and wraps it in a
        // DefaultPooledObject for the pool.
        @Override
        public PooledObject<ShardedJedis> makeObject() throws Exception {
            ShardedJedis jedis = new ShardedJedis(shards, algo, keyTagPattern);
            return new DefaultPooledObject<ShardedJedis>(jedis);
        }
        ......
    }
    ......
}

每个连接都保存了所有的Redis实例信息

// Excerpt of ShardedJedis showing only the constructor used by the factory
// (other members are elided with "......").
public class ShardedJedis extends BinaryShardedJedis 
        implements JedisCommands, Closeable {

    // Values in this walkthrough:
    // shards = the shard definitions passed to the pool
    // algo = Hashing.MURMUR_HASH
    // keyTagPattern = null
    // Passes all three arguments unchanged to BinaryShardedJedis.
    public ShardedJedis(List<JedisShardInfo> shards, Hashing algo, Pattern keyTagPattern) {
        super(shards, algo, keyTagPattern);
    }
    ......
}
// Excerpt of BinaryShardedJedis showing only the constructor on the
// ShardedJedis construction path (other members are elided with "......").
public class BinaryShardedJedis extends Sharded<Jedis, JedisShardInfo> 
        implements BinaryJedisCommands {

    // Values in this walkthrough:
    // shards = the shard definitions passed to the pool
    // algo = Hashing.MURMUR_HASH
    // keyTagPattern = null
    // Passes all three arguments unchanged to Sharded.
    public BinaryShardedJedis(List<JedisShardInfo> shards, Hashing algo, 
            Pattern keyTagPattern) {
        super(shards, algo, keyTagPattern);
    }
    ......
}

使用Redis实例信息构建一致性哈希环

// Excerpt of Sharded showing how the consistent-hash ring is built.
// The excerpt is incomplete: the algo and tagPattern fields and the end of
// the class are not shown.
public class Sharded<R, S extends ShardInfo<R>> {
    // The ring: hash value -> shard info (e.g. the shard "192.168.137.128:6379")
    private TreeMap<Long, S> nodes;
    // Shard info (e.g. "192.168.137.128:6379") -> its resource (a Jedis here)
    private final Map<ShardInfo<R>, R> resources = 
            new LinkedHashMap<ShardInfo<R>, R>();

    // Values in this walkthrough:
    // shards = the shard definitions passed to the pool
    // algo = Hashing.MURMUR_HASH
    // keyTagPattern = null
    // Stores the algorithm and tag pattern, then builds the ring.
    public Sharded(List<S> shards, Hashing algo, Pattern tagPattern) {
        this.algo = algo;
        this.tagPattern = tagPattern;
        initialize(shards);
    }

    // Builds the consistent-hash ring.
    // Every shard contributes 160 * weight entries to the nodes map, each
    // keyed by the hash of a generated string and all mapping to that shard.
    // After its entries are added, the shard's resource is created and
    // registered in the resources map.
    private void initialize(List<S> shards) {
        nodes = new TreeMap<Long, S>();
        
        for (int i = 0; i != shards.size(); ++i) {
            final S shardInfo = shards.get(i);
            // Unnamed shard: ring keys are derived from the shard's index in
            // the list ("SHARD-<i>-NODE-<n>").
            if (shardInfo.getName() == null) 
                // Multiplying by 160 is what creates the virtual nodes
                for (int n = 0; n < 160 * shardInfo.getWeight(); n++) {
                    nodes.put(this.algo.hash("SHARD-" + i + "-NODE-" + n), shardInfo);
                }
            // Named shard: ring keys are derived from name, "*", weight and n.
            else 
                for (int n = 0; n < 160 * shardInfo.getWeight(); n++) {
                    nodes.put(this.algo.hash(shardInfo.getName() 
                            + "*" + shardInfo.getWeight() + n), shardInfo);
                }
            resources.put(shardInfo, shardInfo.createResource());
        }
    }

向Redis中设置缓存时,根据缓存的key确定写入的Redis实例

// Excerpt of ShardedJedis showing how a write is routed to a shard
// (other members are elided with "......").
public class ShardedJedis extends BinaryShardedJedis 
        implements JedisCommands, Closeable {

    // In this walkthrough: key = "test", value = "this is a test".
    // Looks up the Jedis instance responsible for the key and runs SET on it.
    public String set(String key, String value) {
        Jedis j = getShard(key);
        return j.set(key, value);
    }
    ......
}  
// Excerpt of Sharded showing how a key is mapped to a shard on the ring
// (other members are elided with "......").
public class Sharded<R, S extends ShardInfo<R>> {
    // The ring: hash value -> shard info (e.g. the shard "192.168.137.128:6379")
    private TreeMap<Long, S> nodes;
    // Shard info (e.g. "192.168.137.128:6379") -> its resource (a Jedis here)
    private final Map<ShardInfo<R>, R> resources = 
            new LinkedHashMap<ShardInfo<R>, R>();

    // Returns the resource registered for the shard that owns the key.
    public R getShard(String key) {
        return resources.get(getShardInfo(key));
    }

    // Resolves the shard for a string key: the result of getKeyTag(key)
    // (not shown in this excerpt) is encoded to bytes and looked up on the ring.
    public S getShardInfo(String key) {
        return getShardInfo(SafeEncoder.encode(getKeyTag(key)));
    }

    // Ring lookup: hash the key, then take the first ring entry whose
    // position is at or after that hash. If there is no such entry, wrap
    // around to the first entry on the ring.
    public S getShardInfo(byte[] key) {
        SortedMap<Long, S> tail = nodes.tailMap(algo.hash(key));
        if (tail.isEmpty()) {
            return nodes.get(nodes.firstKey());
        }
        return tail.get(tail.firstKey());
    }
    ......
}