一致性哈希算法(分库分表,负载均衡,实践分享)
一、一致性hash算法的介绍
在分布式应用中,应该来说使用到hash最多的地方就是rpc负载均衡和分库分表,通常对于正式意义上的分布式应用来说,扩容和收缩是一个半自动化的过程,在此期间,应用基本上是可用的,所以不能发生大规模动荡的意外,为了最小化潜在的影响,一致性hash算法就扮演了极为重要的角色。
consistent hashing 算法早在 1997 年就在论文 Consistent hashing and random trees 中被提出,目前在cache 系统中应用越来越广泛;
1 基本场景
比如你有 N 个 cache 服务器(后面简称 cache ),那么如何将一个对象 object 映射到 N 个 cache 上呢,你很可能会采用类似下面的通用方法计算 object 的 hash 值,然后均匀的映射到到 N 个 cache ;
hash(object)%N
一切都运行正常,再考虑如下的两种情况;
1 一个 cache 服务器 m down 掉了(在实际应用中必须要考虑这种情况),这样所有映射到 cache m 的对象都会失效,怎么办,需要把 cache m 从 cache 中移除,这时候 cache 是 N-1 台,映射公式变成了 hash(object)%(N-1) ;
2 由于访问加重,需要添加 cache ,这时候 cache 是 N+1 台,映射公式变成了 hash(object)%(N+1) ;
1 和 2 意味着什么?这意味着突然之间几乎所有的 cache 都失效了。对于服务器而言,这是一场灾难,洪水般的访问都会直接冲向后台服务器;
再来考虑第三个问题,由于硬件能力越来越强,你可能想让后面添加的节点多做点活,显然上面的 hash 算法也做不到。
有什么方法可以改变这个状况呢,这就是 consistent hashing...
2 hash 算法和单调性
Hash 算法的一个衡量指标是单调性( Monotonicity ),定义如下:
单调性是指如果已经有一些内容通过哈希分派到了相应的缓冲中,又有新的缓冲加入到系统中。哈希的结果应能够保证原有已分配的内容可以被映射到新的缓冲中去,而不会被映射到旧的缓冲集合中的其他缓冲区。
容易看到,上面的简单 hash 算法 hash(object)%N 难以满足单调性要求。
3 consistent hashing 算法的原理
consistent hashing 是一种 hash 算法,简单的说,在移除 / 添加一个 cache 时,它能够尽可能小的改变已存在key 映射关系,尽可能的满足单调性的要求。
下面就来按照 5 个步骤简单讲讲 consistent hashing 算法的基本原理。
3.1 环形hash 空间
考虑通常的 hash 算法都是将 value 映射到一个 32 为的 key 值,也即是 0~2^32-1 次方的数值空间;我们可以将这个空间想象成一个首( 0 )尾( 2^32-1 )相接的圆环,如下面图 1 所示的那样。
图 1 环形 hash 空间
3.2 把对象映射到hash 空间
接下来考虑 4 个对象 object1~object4 ,通过 hash 函数计算出的 hash 值 key 在环上的分布如图 2 所示。
hash(object1) = key1;
… …
hash(object4) = key4;
图 2 4 个对象的 key 值分布
3.3 把cache 映射到hash 空间
Consistent hashing 的基本思想就是将对象和 cache 都映射到同一个 hash 数值空间中,并且使用相同的 hash算法。
假设当前有 A,B 和 C 共 3 台 cache ,那么其映射结果将如图 3 所示,他们在 hash 空间中,以对应的 hash 值排列。
hash(cache A) = key A;
… …
hash(cache C) = key C;
图 3 cache 和对象的 key 值分布
说到这里,顺便提一下 cache 的 hash 计算,一般的方法可以使用 cache 机器的 IP 地址或者机器名作为 hash输入。
3.4 把对象映射到cache
现在 cache 和对象都已经通过同一个 hash 算法映射到 hash 数值空间中了,接下来要考虑的就是如何将对象映射到 cache 上面了。
在这个环形空间中,如果沿着顺时针方向从对象的 key 值出发,直到遇见一个 cache ,那么就将该对象存储在这个 cache 上,因为对象和 cache 的 hash 值是固定的,因此这个 cache 必然是唯一和确定的。这样不就找到了对象和 cache 的映射方法了吗?!
依然继续上面的例子(参见图 3 ),那么根据上面的方法,对象 object1 将被存储到 cache A 上; object2 和object3 对应到 cache C ; object4 对应到 cache B ;
3.5 考察cache 的变动
前面讲过,通过 hash 然后求余的方法带来的最大问题就在于不能满足单调性,当 cache 有所变动时, cache会失效,进而对后台服务器造成巨大的冲击,现在就来分析分析 consistent hashing 算法。
3.5.1 移除 cache
考虑假设 cache B 挂掉了,根据上面讲到的映射方法,这时受影响的将仅是那些沿 cache B 逆时针遍历直到下一个 cache ( cache C )之间的对象,也即是本来映射到 cache B 上的那些对象。
因此这里仅需要变动对象 object4 ,将其重新映射到 cache C 上即可;参见图 4 。
图 4 Cache B 被移除后的 cache 映射
3.5.2 添加 cache
再考虑添加一台新的 cache D 的情况,假设在这个环形 hash 空间中, cache D 被映射在对象 object2 和object3 之间。这时受影响的将仅是那些沿 cache D 逆时针遍历直到下一个 cache ( cache B )之间的对象(它们是也本来映射到 cache C 上对象的一部分),将这些对象重新映射到 cache D 上即可。
因此这里仅需要变动对象 object2 ,将其重新映射到 cache D 上;参见图 5 。
图 5 添加 cache D 后的映射关系
4 虚拟节点
考量 Hash 算法的另一个指标是平衡性 (Balance) ,定义如下:
平衡性:指哈希的结果能够尽可能分布到所有的缓冲中去,这样可以使得所有的缓冲空间都得到利用。
hash 算法并不是保证绝对的平衡,如果 cache 较少的话,对象并不能被均匀的映射到 cache 上,比如在上面的例子中,仅部署 cache A 和 cache C 的情况下,在 4 个对象中, cache A 仅存储了 object1 ,而 cache C 则存储了object2 、 object3 和 object4 ;分布是很不均衡的。
为了解决这种情况, consistent hashing 引入了“虚拟节点”的概念,它可以如下定义:
“虚拟节点”( virtual node )是实际节点在 hash 空间的复制品( replica ),一实际个节点对应了若干个“虚拟节点”,这个对应个数也成为“复制个数”,“虚拟节点”在 hash 空间中以 hash 值排列。
仍以仅部署 cache A 和 cache C 的情况为例,在图 4 中我们已经看到, cache 分布并不均匀。现在我们引入虚拟节点,并设置“复制个数”为 2 ,这就意味着一共会存在 4 个“虚拟节点”, cache A1, cache A2 代表了cache A ; cache C1, cache C2 代表了 cache C ;假设一种比较理想的情况,参见图 6 。
图 6 引入“虚拟节点”后的映射关系
此时,对象到“虚拟节点”的映射关系为:
objec1->cache A2 ; objec2->cache A1 ; objec3->cache C1 ; objec4->cache C2 ;
因此对象 object1 和 object2 都被映射到了 cache A 上,而 object3 和 object4 映射到了 cache C 上;平衡性有了很大提高。
引入“虚拟节点”后,映射关系就从 { 对象 -> 节点 } 转换到了 { 对象 -> 虚拟节点 } 。查询物体所在 cache 时的映射关系如图 7 所示。
图 7 查询对象所在 cache
“虚拟节点”的 hash 计算可以采用对应节点的 IP 地址加数字后缀的方式。例如假设 cache A 的 IP 地址为202.168.14.241 。
引入“虚拟节点”前,计算 cache A 的 hash 值:
Hash(“202.168.14.241”);
引入“虚拟节点”后,计算“虚拟节”点 cache A1 和 cache A2 的 hash 值:
Hash(“202.168.14.241#1”); // cache A1
Hash(“202.168.14.241#2”); // cache A2
二、一致性hash算法的Java实现
1、不带虚拟节点的
package hash;
import java.util.SortedMap;
import java.util.TreeMap;
/**
* 不带虚拟节点的一致性Hash算法
* 重点:1.如何造一个hash环,2.如何在哈希环上映射服务器节点,3.如何找到对应的节点
*/
public class ConsistentHashingWithoutVirtualNode {
//待添加入Hash环的服务器列表
private static String[] servers = { "192.168.0.0:111", "192.168.0.1:111",
"192.168.0.2:111", "192.168.0.3:111", "192.168.0.4:111" };
//key表示服务器的hash值,value表示服务器
private static SortedMap<Integer, String> sortedMap = new TreeMap<Integer, String>();
//程序初始化,将所有的服务器放入sortedMap中
static {
for (int i=0; i<servers.length; i++) {
int hash = getHash(servers[i]);
System.out.println("[" + servers[i] + "]加入集合中, 其Hash值为" + hash);
sortedMap.put(hash, servers[i]);
}
System.out.println();
}
//得到应当路由到的结点
private static String getServer(String key) {
//得到该key的hash值
int hash = getHash(key);
//得到大于该Hash值的所有Map
SortedMap<Integer, String> subMap = sortedMap.tailMap(hash);
if(subMap.isEmpty()){
//如果没有比该key的hash值大的,则从第一个node开始
Integer i = sortedMap.firstKey();
//返回对应的服务器
return sortedMap.get(i);
}else{
//第一个Key就是顺时针过去离node最近的那个结点
Integer i = subMap.firstKey();
//返回对应的服务器
return subMap.get(i);
}
}
//使用FNV1_32_HASH算法计算服务器的Hash值,这里不使用重写hashCode的方法,最终效果没区别
private static int getHash(String str) {
final int p = 16777619;
int hash = (int) 2166136261L;
for (int i = 0; i < str.length(); i++)
hash = (hash ^ str.charAt(i)) * p;
hash += hash << 13;
hash ^= hash >> 7;
hash += hash << 3;
hash ^= hash >> 17;
hash += hash << 5;
// 如果算出来的值为负数则取其绝对值
if (hash < 0)
hash = Math.abs(hash);
return hash;
}
public static void main(String[] args) {
String[] keys = {"太阳", "月亮", "星星"};
for(int i=0; i<keys.length; i++)
System.out.println("[" + keys[i] + "]的hash值为" + getHash(keys[i])
+ ", 被路由到结点[" + getServer(keys[i]) + "]");
}
}
执行结果:
[192.168.0.0:111]加入集合中, 其Hash值为575774686
[192.168.0.1:111]加入集合中, 其Hash值为8518713
[192.168.0.2:111]加入集合中, 其Hash值为1361847097
[192.168.0.3:111]加入集合中, 其Hash值为1171828661
[192.168.0.4:111]加入集合中, 其Hash值为1764547046
[太阳]的hash值为1977106057, 被路由到结点[192.168.0.1:111]
[月亮]的hash值为1132637661, 被路由到结点[192.168.0.3:111]
[星星]的hash值为880019273, 被路由到结点[192.168.0.3:111]
2、带虚拟节点的
package hash;
import java.util.LinkedList;
import java.util.List;
import java.util.SortedMap;
import java.util.TreeMap;
import org.apache.commons.lang.StringUtils;
/**
* 带虚拟节点的一致性Hash算法
*/
public class ConsistentHashingWithoutVirtualNode {
//待添加入Hash环的服务器列表
private static String[] servers = {"192.168.0.0:111", "192.168.0.1:111", "192.168.0.2:111",
"192.168.0.3:111", "192.168.0.4:111"};
//真实结点列表,考虑到服务器上线、下线的场景,即添加、删除的场景会比较频繁,这里使用LinkedList会更好
private static List<String> realNodes = new LinkedList<String>();
//虚拟节点,key表示虚拟节点的hash值,value表示虚拟节点的名称
private static SortedMap<Integer, String> virtualNodes = new TreeMap<Integer, String>();
//虚拟节点的数目,这里写死,为了演示需要,一个真实结点对应5个虚拟节点
private static final int VIRTUAL_NODES = 5;
static{
//先把原始的服务器添加到真实结点列表中
for(int i=0; i<servers.length; i++)
realNodes.add(servers[i]);
//再添加虚拟节点,遍历LinkedList使用foreach循环效率会比较高
for (String str : realNodes){
for(int i=0; i<VIRTUAL_NODES; i++){
String virtualNodeName = str + "&&VN" + String.valueOf(i);
int hash = getHash(virtualNodeName);
System.out.println("虚拟节点[" + virtualNodeName + "]被添加, hash值为" + hash);
virtualNodes.put(hash, virtualNodeName);
}
}
System.out.println();
}
//使用FNV1_32_HASH算法计算服务器的Hash值,这里不使用重写hashCode的方法,最终效果没区别
private static int getHash(String str){
final int p = 16777619;
int hash = (int)2166136261L;
for (int i = 0; i < str.length(); i++)
hash = (hash ^ str.charAt(i)) * p;
hash += hash << 13;
hash ^= hash >> 7;
hash += hash << 3;
hash ^= hash >> 17;
hash += hash << 5;
// 如果算出来的值为负数则取其绝对值
if (hash < 0)
hash = Math.abs(hash);
return hash;
}
//得到应当路由到的结点
private static String getServer(String key){
//得到该key的hash值
int hash = getHash(key);
// 得到大于该Hash值的所有Map
SortedMap<Integer, String> subMap = virtualNodes.tailMap(hash);
String virtualNode;
if(subMap.isEmpty()){
//如果没有比该key的hash值大的,则从第一个node开始
Integer i = virtualNodes.firstKey();
//返回对应的服务器
virtualNode = virtualNodes.get(i);
}else{
//第一个Key就是顺时针过去离node最近的那个结点
Integer i = subMap.firstKey();
//返回对应的服务器
virtualNode = subMap.get(i);
}
//virtualNode虚拟节点名称要截取一下
if(StringUtils.isNotBlank(virtualNode)){
return virtualNode.substring(0, virtualNode.indexOf("&&"));
}
return null;
}
public static void main(String[] args){
String[] keys = {"太阳", "月亮", "星星"};
for(int i=0; i<keys.length; i++)
System.out.println("[" + keys[i] + "]的hash值为" +
getHash(keys[i]) + ", 被路由到结点[" + getServer(keys[i]) + "]");
}
}
执行结果:
虚拟节点[192.168.0.0:111&&VN0]被添加, hash值为1686427075
虚拟节点[192.168.0.0:111&&VN1]被添加, hash值为354859081
虚拟节点[192.168.0.0:111&&VN2]被添加, hash值为1306497370
虚拟节点[192.168.0.0:111&&VN3]被添加, hash值为817889914
虚拟节点[192.168.0.0:111&&VN4]被添加, hash值为396663629
...
虚拟节点[192.168.0.4:111&&VN0]被添加, hash值为586921010
虚拟节点[192.168.0.4:111&&VN1]被添加, hash值为184078390
虚拟节点[192.168.0.4:111&&VN2]被添加, hash值为1331645117
虚拟节点[192.168.0.4:111&&VN3]被添加, hash值为918790803
虚拟节点[192.168.0.4:111&&VN4]被添加, hash值为1232193678
[太阳]的hash值为1977106057, 被路由到结点[192.168.0.2:111]
[月亮]的hash值为1132637661, 被路由到结点[192.168.0.4:111]
[星星]的hash值为880019273, 被路由到结点[192.168.0.3:111]
三、项目实践
项目背景
- 项目是一个实验室项目
- 其中有一个表叫做试验表,用于存储车型的试验数据,每个试验大概有6000条数据
- 总计初期约有2万个车型,每个车型初期包含超过50个试验。后期还会动态增长
- 试验表中的数据仅需要根据车型试验ID能取出来即可,没有其他更复杂的业务逻辑
方案决策
项目正式上线初期,数据量不会直接爆发式增长到90亿,需要时间上的积累(逐步做实验),最终可能达到90亿数据,甚至超过90亿数据。
按照我们实际了解情况,oracle存储数据量达到1千万的时候,性能擅可。而Oracle官方的说法,如单表存储1g有分区(大致500万数据),查询效率非常高。而试验表中仅四个字段,每条数据数据量较小。所以我们最终决定以1000万为节点,水平拆表。当表数据达到1千万时,即增加下一波表。进行数据自动迁移。
按照90亿的总量,1000万数据一个表的划分,最终大致会产生900个左右的表。所以我们最终使用了4个数据库。1个存储其他业务模块的表,3个存储此大数据表。每个数据库大致有300张表。性能上和数量上都可达到我们的要求。
相关表结构
试验信息表(EXPERIMENT_MESSAGE),挂接车型和试验的关系。试验数据表(EXPERIMENT_DATA),存储试验数据
试验信息表:
字段 | 含义 |
---|---|
ID | 主键,采用UUID生成 |
EXPERIMENT_ID | 试验表中的ID |
CAR_ID | 车型表中的ID |
... | 其余数十个字段省略 |
试验数据表:
字段 | 含义 |
---|---|
ID | 主键,采用UUID生成 |
EXPERIMENT_MESSAGE_ID | 对应的实验信息id |
X_VALUE | 试验数据X值 |
Y_VALUE | 试验数据Y值 |
我们采用作一致性hash的key,就是试验数据表中的EXPERIMENT_MESSAGE_ID
字段。也就是说,每个试验数据表,不存则以,存则一次性大致有6000条数据。取同理。
一致性Hash算法实现
一致性Hash算法的hash部分,采用了著名的ketama算法。在此,我们不多讨论ketama算法的细节,若各位有兴趣,请查阅ketama算法
public long hash(String key) {
if (md5 == null) {
try {
md5 = MessageDigest.getInstance("MD5");
} catch (NoSuchAlgorithmException e) {
throw new IllegalStateException("no md5 algorythm found");
}
}
md5.reset();
md5.update(key.getBytes());
byte[] bKey = md5.digest();
long res = ((long) (bKey[3] & 0xFF) << 24) |
((long) (bKey[2] & 0xFF) << 16) |
((long) (bKey[1] & 0xFF) << 8) |
(long) (bKey[0] & 0xFF);
return res & 0xffffffffL;
}
有了Hash的算法,接下来就要构造Hash环了。Hash环采用的SortedMap数据结构实现。
private final SortedMap<Long, T> circle = new TreeMap<Long, T>();
其中添加节点和移除节点部分,需要根据hash算法得到节点在环上的位置,具体代码如下:
/**
* 添加虚拟节点
* numberOfReplicas为虚拟节点的数量,初始化hash环的时候传入,我们使用300个虚拟节点
* @param node
*/
public void add(T node) {
for (int i = 0; i < numberOfReplicas; i++) {
circle.put(hashFunction.hash(node.toString() + i), node);
}
}
/**
* 移除节点
* @param node
*/
public void remove(T node) {
for (int i = 0; i < numberOfReplicas; i++) {
circle.remove(hashFunction.hash(node.toString() + i));
}
}
而hash环中得到节点部分比较特殊,根据一致性hash算法的介绍,得到hash环中的节点,实际上是计算出的hash值顺时针找到的第一个节点。
/**
* 获得一个最近的顺时针节点
* @param key 为给定键取Hash,取得顺时针方向上最近的一个虚拟节点对应的实际节点
* @return
*/
public T get(Object key) {
if (circle.isEmpty()) {
return null;
}
long hash = hashFunction.hash((String) key);
if (!circle.containsKey(hash)) {
//返回此映射的部分视图,其键大于等于 hash
SortedMap<Long, T> tailMap = circle.tailMap(hash);
hash = tailMap.isEmpty() ? circle.firstKey() : tailMap.firstKey();
}
return circle.get(hash);
}
单表拆分实践
上面完成了一致性hash算法的实现,包含了hash算法和hash环的实现。接下来就要处理具体业务中,如何使用这个hash环和算法了。
我们业务中,主要操作这张表的数据,也就是增删查。然后我们数据库拆分成了3个,所以需要增删查的操作基本一致,都是先通过一致性hash得到库,再通过一致性hash得到表。
获取数据库名的操作如下,获取到数据库后,根据数据库名到对应的连接池中获取连接。
/**
* 根据试验信息id获取其所在库名
* DatabaseType为我们数据的枚举
* @return 数据库的名称
**/
private String getDataBase(String experimentMessageId) {
//获取数据源
DatabaseType[] databasetype = DatabaseType.values();
List<String> dataBaselist = new ArrayList<>();
Map<String, DatabaseType> map = new HashMap<>();
for (DatabaseType d:databasetype) {
if (!d.equals(DatabaseType.KC)) {
dataBaselist.add(d.toString());
map.put(d.toString(), d);
}
}
//获取数据源hash
ConsistentHash<String> dataBaseCon = getConsistentHash(dataBaselist);
//获取id所在数据源
String dataBase = dataBaseCon.get(experimentMessageId);
return dataBase;
}
获取表名的操作如下,获取到数据库后,在对应的数据库中找到需要的表,再从该表中查询数据。
/**
* 根据试验信息id获取其试验数据所在表
* @return
**/
public String getTableName(String experimentMessageId) {
String dataBase = getDataBase(experimentMessageId);
//查询所有试验数据表
List<String> tables = experimentDataEODao.queryTbaleNames(dataBase, tableName);
ConsistentHash<String> consistentHash = getConsistentHash(tables);
String tableName = consistentHash.get(experimentMessageId);
return tableName;
}
剩下的增删改操作和平常一致,在此不多赘述。
数据迁移实践
一致性hash势必涉及到数据迁移问题,我们采取的数据迁移方式为定时任务,针对每个数据库在每天夜里全量扫描一次。检查是否有数据量超过1000万的表,若存在这样的表,就把现有的表数量double。
数据迁移只会在同库之间迁移,不会涉及跨数据库的情况。
此方案为初步方案,后续会改进的更加智能,根据表的数量,增加不同数量的表。而不是简单的把表数量翻倍。
表创建后,将需要迁移的表数据逐个迁移。
在连接到数据源后,我们做了如下事情进行数据迁移
1.获取库中所有的表
List<String> tables = getTables(connection, p, d.toString());
2.遍历表,检查表中数据是否超过边界线(我们为1000万)
for (int i = 0; i < tables.size(); i++) {
//查询表内数据量
int num = countByTableName(connection, p, tables.get(i));
//finalNum为边界值,此处为1000万
if (num > finalNum) {
……
}
……
}
3.根据所有的表计算现有的虚拟节点
ConsistentHash<String> consistentHashOld = getConsistentHash(tables);
4.把表加倍
List<String> tablesNew = deepCopy(tables); //注意一定要采用深复制
int tableSize = tablesNew.size();
for (int y = 0; y < tableSize; y++) {
String tableNameNew = tableName + (tablesNew.size() + 1);
//创建表
createTable(connection, p, d.toString(), tableNameNew);
tablesNew.add(tableNameNew);
tableDelete.add(tableNameNew);
}
5.计算加倍后的虚拟节点
ConsistentHash<String> consistentHashNew = getConsistentHash(tablesNew);
6.数据迁移
for (int z = 0; z < tableSize; z++) {
String tableNameOld = tablesNew.get(z);
//查询试验信息id不重复的试验数据信息
List<String> disData = selectExperimentIdDis(connection, p, tableNameOld);
List<String> deleteList = new LinkedList<>();
for (String experimentId : disData) {
//如果数据hash计算 原所在表与新建表之后不一致,执行转移
if (!consistentHashNew.get(experimentId).equals(consistentHashOld.get(experimentId))) {
//新增到新表数据
insertHash(connection, p, experimentId, consistentHashOld.get(experimentId),
consistentHashNew.get(experimentId));
//删除数据集合
deleteList.add(experimentId);
//删除旧表数据
final int defaultDelNum = 1000;
if (deleteList.size() == defaultDelNum) {
deleteInbatch(connection, p, deleteList, tableNameOld);
deleteList.clear();
}
}
}
//删除旧表数据
if (deleteList.size() > 0) {
deleteInbatch(connection, p, deleteList, tableNameOld);
}
}
总结
以上为我们所做的一致性hash实践,其中还存在很多问题,比如迁移过程单线程导致迁移较慢、自动扩容机制不智能、迁移过程中数据访问不稳定等情况。
我们将会在后续的开发中逐步进行完善改进。
以上就是我们针对一致性hash在oracle分表中的实践
转载于:https://my.oschina.net/oosc/blog/3050988