Jedis一致性哈希实现
程序员文章站
2024-03-19 22:40:46
...
一致性哈希算法介绍:一致性哈希算法
导入Jedis依赖
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>2.9.0</version>
</dependency>
代码如下
import redis.clients.jedis.JedisPoolConfig;
import redis.clients.jedis.JedisShardInfo;
import redis.clients.jedis.ShardedJedis;
import redis.clients.jedis.ShardedJedisPool;
import java.util.Arrays;
import java.util.List;
/**
 * Minimal demo of client-side sharding with Jedis: writes one key through a
 * {@link ShardedJedisPool} and reads it back on a second borrowed connection.
 *
 * <p>Both the pool and every borrowed connection are managed with
 * try-with-resources, so each connection is released exactly once and the pool
 * itself is shut down when the demo finishes, even if a Redis call throws.
 */
public class Test {
    /**
     * Runs the demo against the two configured Redis shards.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        // Connection pool configuration
        JedisPoolConfig poolConfig = new JedisPoolConfig();
        poolConfig.setMaxTotal(10);
        poolConfig.setMaxIdle(1);
        poolConfig.setMaxWaitMillis(200);
        poolConfig.setTestOnBorrow(false);
        poolConfig.setTestOnReturn(false);

        // Shard definitions
        JedisShardInfo shardInfo1 = new JedisShardInfo("192.168.137.128", 6379, 500);
        JedisShardInfo shardInfo2 = new JedisShardInfo("192.168.137.129", 6379, 500);
        List<JedisShardInfo> infoList = Arrays.asList(shardInfo1, shardInfo2);

        // Build the pool from the shard definitions; closing it releases all pooled connections.
        try (ShardedJedisPool jedisPool = new ShardedJedisPool(poolConfig, infoList)) {
            // Each borrow gets its own scoped variable, so a failed borrow can never
            // cause a previously returned connection to be closed a second time.
            try (ShardedJedis jedis = jedisPool.getResource()) {
                jedis.set("test", "this is a test");
            }

            try (ShardedJedis jedis = jedisPool.getResource()) {
                String value = jedis.get("test");
                System.out.println(value);
            }
        }
    }
}
实际效果如下:
D:\Redis-x64-2.8.2402>redis-cli.exe -h 192.168.137.128 -p 6379
192.168.137.128:6379> get test
(nil)
192.168.137.128:6379> exit
D:\Redis-x64-2.8.2402>redis-cli.exe -h 192.168.137.129 -p 6379
192.168.137.129:6379> get test
"this is a test"
192.168.137.129:6379> exit
由于是从连接池获取连接信息,先来看下连接池的实现方式
// Pool of ShardedJedis connections. The three constructors chain into each other,
// each one filling in a default for the argument the caller left out.
public class ShardedJedisPool extends Pool<ShardedJedis> {
// No hashing algorithm given: falls back to Hashing.MURMUR_HASH.
public ShardedJedisPool(final GenericObjectPoolConfig poolConfig,
List<JedisShardInfo> shards) {
this(poolConfig, shards, Hashing.MURMUR_HASH);
}
// No key-tag pattern given: passes null for it.
public ShardedJedisPool(final GenericObjectPoolConfig poolConfig,
List<JedisShardInfo> shards, Hashing algo) {
this(poolConfig, shards, algo, null);
}
// Wraps shards, algorithm and key-tag pattern in a ShardedJedisFactory and
// hands that factory, together with the pool config, to the Pool superclass.
public ShardedJedisPool(final GenericObjectPoolConfig poolConfig,
List<JedisShardInfo> shards, Hashing algo, Pattern keyTagPattern) {
super(poolConfig, new ShardedJedisFactory(shards, algo, keyTagPattern));
}
......
}
// Generic base class of the Jedis pools; delegates pooling to a GenericObjectPool.
public abstract class Pool<T> implements Closeable {
// The pool is implemented on top of Apache Commons Pool 2:
// import org.apache.commons.pool2.PooledObjectFactory;
// import org.apache.commons.pool2.impl.GenericObjectPool;
protected GenericObjectPool<T> internalPool;
// Initializes the internal pool as part of construction.
public Pool(final GenericObjectPoolConfig poolConfig,
PooledObjectFactory<T> factory) {
initPool(poolConfig, factory);
}
// In this walkthrough, factory = ShardedJedisFactory.
// Replaces the internal pool with a new GenericObjectPool built from the given
// factory and config; an already existing internal pool is closed first.
public void initPool(final GenericObjectPoolConfig poolConfig,
PooledObjectFactory<T> factory) {
if (this.internalPool != null) {
try {
closeInternalPool();
// NOTE(review): any exception raised while closing the old pool is silently discarded.
} catch (Exception e) {}
}
this.internalPool = new GenericObjectPool<T>(factory, poolConfig);
}
......
}
连接池初始化完成后,下一步是从连接池获取连接
public class ShardedJedisPool extends Pool<ShardedJedis> {
......
// Borrows a ShardedJedis through Pool.getResource() and registers this pool as
// its data source before returning it.
// NOTE(review): presumably the data source is what lets close() hand the
// connection back to this pool - confirm against ShardedJedis.close().
@Override
public ShardedJedis getResource() {
ShardedJedis jedis = super.getResource();
jedis.setDataSource(this);
return jedis;
}
......
}
public abstract class Pool<T> implements Closeable {
......
// Borrows an object from the internal GenericObjectPool and translates failures
// into Jedis exceptions, keeping the original exception as the cause:
//   NoSuchElementException -> JedisException
//   any other Exception    -> JedisConnectionException
public T getResource() {
try {
return internalPool.borrowObject();
} catch (NoSuchElementException nse) {
throw new JedisException("Could not get a resource from the pool", nse);
} catch (Exception e) {
throw new JedisConnectionException(
"Could not get a resource from the pool", e);
}
}
......
}
// Abbreviated excerpt of the Commons Pool 2 implementation; "......" marks omitted code.
public class GenericObjectPool<T> extends BaseGenericObjectPool<T>
implements ObjectPool<T>, GenericObjectPoolMXBean, UsageTracking<T> {
// Borrows an object using the pool's configured maximum wait time.
public T borrowObject() throws Exception {
return this.borrowObject(this.getMaxWaitMillis());
}
// Takes the first idle object if one is available; otherwise creates a new one.
public T borrowObject(long borrowMaxWaitMillis) throws Exception {
......
p = (PooledObject)this.idleObjects.pollFirst();
if(p == null) {
p = this.create();
......
}
// Creates a new pooled object by delegating to the factory
// (the ShardedJedisFactory in this walkthrough).
private PooledObject<T> create() throws Exception {
......
p = this.factory.makeObject();
......
}
......
}
看下ShardedJedisFactory的实现
public class ShardedJedisPool extends Pool<ShardedJedis> {
......
// Factory the pool uses to create its ShardedJedis instances.
private static class ShardedJedisFactory
implements PooledObjectFactory<ShardedJedis> {
private List<JedisShardInfo> shards;
private Hashing algo;
private Pattern keyTagPattern;
// Values in this walkthrough:
// shards = the shard info list
// algo = Hashing.MURMUR_HASH
// keyTagPattern = null
public ShardedJedisFactory(List<JedisShardInfo> shards,
Hashing algo, Pattern keyTagPattern) {
this.shards = shards;
this.algo = algo;
this.keyTagPattern = keyTagPattern;
}
// Builds a new ShardedJedis from the complete shard list and wraps it in a
// DefaultPooledObject for the pool.
@Override
public PooledObject<ShardedJedis> makeObject() throws Exception {
ShardedJedis jedis = new ShardedJedis(shards, algo, keyTagPattern);
return new DefaultPooledObject<ShardedJedis>(jedis);
}
......
}
......
}
每个连接都保存了所有的Redis实例信息
public class ShardedJedis extends BinaryShardedJedis
implements JedisCommands, Closeable {
// Forwards all arguments unchanged to BinaryShardedJedis.
// Values in this walkthrough:
// shards = the shard info list
// algo = Hashing.MURMUR_HASH
// keyTagPattern = null
public ShardedJedis(List<JedisShardInfo> shards, Hashing algo, Pattern keyTagPattern) {
super(shards, algo, keyTagPattern);
}
......
}
public class BinaryShardedJedis extends Sharded<Jedis, JedisShardInfo>
implements BinaryJedisCommands {
// Forwards all arguments unchanged to Sharded, which builds the hash ring.
// Values in this walkthrough:
// shards = the shard info list
// algo = Hashing.MURMUR_HASH
// keyTagPattern = null
public BinaryShardedJedis(List<JedisShardInfo> shards, Hashing algo,
Pattern keyTagPattern) {
super(shards, algo, keyTagPattern);
}
......
}
使用Redis实例信息构建一致性哈希环
// Abbreviated excerpt: builds the consistent-hash ring from the shard list.
public class Sharded<R, S extends ShardInfo<R>> {
// The ring: hash value -> shard info (e.g. "192.168.137.128:6379")
private TreeMap<Long, S> nodes;
// Shard info (e.g. "192.168.137.128:6379") -> the resource created for it
// (a Jedis instance in this walkthrough)
private final Map<ShardInfo<R>, R> resources =
new LinkedHashMap<ShardInfo<R>, R>();
// Stores the algorithm and tag pattern, then builds the ring.
// Values in this walkthrough:
// shards = the shard info list
// algo = Hashing.MURMUR_HASH
// keyTagPattern = null
public Sharded(List<S> shards, Hashing algo, Pattern tagPattern) {
this.algo = algo;
this.tagPattern = tagPattern;
initialize(shards);
}
// Builds the consistent-hash ring: every shard is placed on the ring
// 160 * weight times, and one resource is created per shard.
private void initialize(List<S> shards) {
nodes = new TreeMap<Long, S>();
for (int i = 0; i != shards.size(); ++i) {
final S shardInfo = shards.get(i);
// Unnamed shards are hashed by their position in the list.
if (shardInfo.getName() == null)
// Multiplying by 160 is what creates the virtual nodes
for (int n = 0; n < 160 * shardInfo.getWeight(); n++) {
nodes.put(this.algo.hash("SHARD-" + i + "-NODE-" + n), shardInfo);
}
// Named shards are hashed by name and weight instead of list position.
else
for (int n = 0; n < 160 * shardInfo.getWeight(); n++) {
nodes.put(this.algo.hash(shardInfo.getName()
+ "*" + shardInfo.getWeight() + n), shardInfo);
}
// Create the shard's resource once and remember it under its shard info.
resources.put(shardInfo, shardInfo.createResource());
}
}
向Redis中设置缓存时,根据缓存的key确定写入的Redis实例
public class ShardedJedis extends BinaryShardedJedis
implements JedisCommands, Closeable {
// Looks up the shard responsible for the key and runs SET on that shard's Jedis.
// In this walkthrough: key = "test", value = "this is a test"
public String set(String key, String value) {
Jedis j = getShard(key);
return j.set(key, value);
}
......
}
// Abbreviated excerpt: resolves a key to its shard by walking the hash ring.
public class Sharded<R, S extends ShardInfo<R>> {
// The ring: hash value -> shard info (e.g. "192.168.137.128:6379")
private TreeMap<Long, S> nodes;
// Shard info (e.g. "192.168.137.128:6379") -> the resource created for it
// (a Jedis instance in this walkthrough)
private final Map<ShardInfo<R>, R> resources =
new LinkedHashMap<ShardInfo<R>, R>();
// Returns the resource registered for the shard that owns the key.
public R getShard(String key) {
return resources.get(getShardInfo(key));
}
// Reduces the key to its key tag, encodes it to bytes and delegates to the
// byte[] overload.
public S getShardInfo(String key) {
return getShardInfo(SafeEncoder.encode(getKeyTag(key)));
}
// Ring lookup: picks the first node whose hash is greater than or equal to the
// key's hash; when no such node exists, wraps around to the first node on the ring.
public S getShardInfo(byte[] key) {
SortedMap<Long, S> tail = nodes.tailMap(algo.hash(key));
if (tail.isEmpty()) {
return nodes.get(nodes.firstKey());
}
return tail.get(tail.firstKey());
}
......
}