欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

一致性hash算法 - consistent hashing

程序员文章站 2024-03-19 21:44:22
...

consistent hashing 算法早在 1997 年就在论文 Consistent hashing and random trees 中被提出,目前在cache 系统中应用越来越广泛;

1 基本场景

比如你有 N  cache 服务器(后面简称 cache ),那么如何将一个对象 object 映射到 N  cache 上呢,你很可能会采用类似下面的通用方法计算 object  hash 值,然后均匀的映射到到 N  cache 

hash(object)%N

一切都运行正常,再考虑如下的两种情况;

一个 cache 服务器 m down 掉了(在实际应用中必须要考虑这种情况),这样所有映射到 cache m 的对象都会失效,怎么办,需要把 cache m  cache 中移除,这时候 cache  N-1 台,映射公式变成了 hash(object)%(N-1)

由于访问加重,需要添加 cache ,这时候 cache  N+1 台,映射公式变成了 hash(object)%(N+1) 

1  2 意味着什么?这意味着突然之间几乎所有的 cache 都失效了。对于服务器而言,这是一场灾难,洪水般的访问都会直接冲向后台服务器;

再来考虑第三个问题,由于硬件能力越来越强,你可能想让后面添加的节点多做点活,显然上面的 hash 算法也做不到。

  有什么方法可以改变这个状况呢,这就是 consistent hashing...

2 hash 算法和单调性

   Hash 算法的一个衡量指标是单调性( Monotonicity ),定义如下:

  单调性是指如果已经有一些内容通过哈希分派到了相应的缓冲中,又有新的缓冲加入到系统中。哈希的结果应能够保证原有已分配的内容可以被映射到新的缓冲中去,而不会被映射到旧的缓冲集合中的其他缓冲区。

容易看到,上面的简单 hash 算法 hash(object)%N 难以满足单调性要求。

3 consistent hashing 算法的原理

consistent hashing 是一种 hash 算法,简单的说,在移除 / 添加一个 cache 时,它能够尽可能小的改变已存在 key 映射关系,尽可能的满足单调性的要求。

下面就来按照 5 个步骤简单讲讲 consistent hashing 算法的基本原理。

3.1 环形hash 空间

考虑通常的 hash 算法都是将 value 映射到一个 32 为的 key 值,也即是 0~2^32-1 次方的数值空间;我们可以将这个空间想象成一个首( 0 )尾( 2^32-1 )相接的圆环,如下面图 1 所示的那样。

一致性hash算法 - consistent hashing

 1 环形 hash 空间

3.2 把对象映射到hash 空间

接下来考虑 4 个对象 object1~object4 ,通过 hash 函数计算出的 hash  key 在环上的分布如图 2 所示。

hash(object1) = key1;

… …

hash(object4) = key4;

一致性hash算法 - consistent hashing

 2 4 个对象的 key 值分布

3.3 cache 映射到hash 空间

Consistent hashing 的基本思想就是将对象和 cache 都映射到同一个 hash 数值空间中,并且使用相同的 hash算法。

假设当前有 A,B  C  3  cache ,那么其映射结果将如图 3 所示,他们在 hash 空间中,以对应的 hash 值排列。

hash(cache A) = key A;

… …

hash(cache C) = key C;

一致性hash算法 - consistent hashing

 3 cache 和对象的 key 值分布

 

说到这里,顺便提一下 cache  hash 计算,一般的方法可以使用 cache 机器的 IP 地址或者机器名作为 hash输入。

3.4 把对象映射到cache

现在 cache 和对象都已经通过同一个 hash 算法映射到 hash 数值空间中了,接下来要考虑的就是如何将对象映射到 cache 上面了。

在这个环形空间中,如果沿着顺时针方向从对象的 key 值出发,直到遇见一个 cache ,那么就将该对象存储在这个 cache 上,因为对象和 cache  hash 值是固定的,因此这个 cache 必然是唯一和确定的。这样不就找到了对象和 cache 的映射方法了吗?!

依然继续上面的例子(参见图 3 ),那么根据上面的方法,对象 object1 将被存储到 cache A 上; object2 object3 对应到 cache C  object4 对应到 cache B 

3.5 考察cache 的变动

前面讲过,通过 hash 然后求余的方法带来的最大问题就在于不能满足单调性,当 cache 有所变动时, cache会失效,进而对后台服务器造成巨大的冲击,现在就来分析分析 consistent hashing 算法。

3.5.1 移除 cache

考虑假设 cache B 挂掉了,根据上面讲到的映射方法,这时受影响的将仅是那些沿 cache B 逆时针遍历直到下一个 cache  cache C )之间的对象,也即是本来映射到 cache B 上的那些对象。

因此这里仅需要变动对象 object4 ,将其重新映射到 cache C 上即可;参见图 4 

一致性hash算法 - consistent hashing

 4 Cache B 被移除后的 cache 映射

3.5.2 添加 cache

再考虑添加一台新的 cache D 的情况,假设在这个环形 hash 空间中, cache D 被映射在对象 object2 object3 之间。这时受影响的将仅是那些沿 cache D 逆时针遍历直到下一个 cache  cache B )之间的对象(它们是也本来映射到 cache C 上对象的一部分),将这些对象重新映射到 cache D 上即可。

 

因此这里仅需要变动对象 object2 ,将其重新映射到 cache D 上;参见图 5 

一致性hash算法 - consistent hashing

 添加 cache D 后的映射关系

虚拟节点

考量 Hash 算法的另一个指标是平衡性 (Balance) ,定义如下:

平衡性

  平衡性是指哈希的结果能够尽可能分布到所有的缓冲中去,这样可以使得所有的缓冲空间都得到利用。

hash 算法并不是保证绝对的平衡,如果 cache 较少的话,对象并不能被均匀的映射到 cache 上,比如在上面的例子中,仅部署 cache A  cache C 的情况下,在 4 个对象中, cache A 仅存储了 object1 ,而 cache C 则存储了object2  object3  object4 ;分布是很不均衡的。

为了解决这种情况, consistent hashing 引入了“虚拟节点”的概念,它可以如下定义:

“虚拟节点”( virtual node )是实际节点在 hash 空间的复制品( replica ),一实际个节点对应了若干个“虚拟节点”,这个对应个数也成为“复制个数”,“虚拟节点”在 hash 空间中以 hash 值排列。

仍以仅部署 cache A  cache C 的情况为例,在图 4 中我们已经看到, cache 分布并不均匀。现在我们引入虚拟节点,并设置“复制个数”为 2 ,这就意味着一共会存在 4 个“虚拟节点”, cache A1, cache A2 代表了cache A  cache C1, cache C2 代表了 cache C ;假设一种比较理想的情况,参见图 6 

一致性hash算法 - consistent hashing

 引入“虚拟节点”后的映射关系

 

此时,对象到“虚拟节点”的映射关系为:

objec1->cache A2  objec2->cache A1  objec3->cache C1  objec4->cache C2 

因此对象 object1  object2 都被映射到了 cache A 上,而 object3  object4 映射到了 cache C 上;平衡性有了很大提高。

引入“虚拟节点”后,映射关系就从 { 对象 -> 节点 } 转换到了 { 对象 -> 虚拟节点 } 。查询物体所在 cache 时的映射关系如图 7 所示。

一致性hash算法 - consistent hashing

 查询对象所在 cache

 

“虚拟节点”的 hash 计算可以采用对应节点的 IP 地址加数字后缀的方式。例如假设 cache A  IP 地址为202.168.14.241 

引入“虚拟节点”前,计算 cache A  hash 值:

Hash(“202.168.14.241”);

引入“虚拟节点”后,计算“虚拟节”点 cache A1  cache A2  hash 值:

Hash(“202.168.14.241#1”);  // cache A1

Hash(“202.168.14.241#2”);  // cache A2

小结

Consistent hashing 的基本原理就是这些,具体的分布性等理论分析应该是很复杂的,不过一般也用不到。

http://weblogs.java.net/blog/2007/11/27/consistent-hashing 上面有一个 Java 版本的例子,可以参考。

http://blog.csdn.net/mayongzhan/archive/2009/06/25/4298834.aspx 转载了一个 PHP 版的实现代码。

http://www.codeproject.com/KB/recipes/lib-conhash.aspx C语言版本

 

 

一些参考资料地址:

http://portal.acm.org/citation.cfm?id=258660

http://en.wikipedia.org/wiki/Consistent_hashing

http://www.spiteful.com/2008/03/17/programmers-toolbox-part-3-consistent-hashing/

 http://weblogs.java.net/blog/2007/11/27/consistent-hashing

http://tech.idv2.com/2008/07/24/memcached-004/

http://blog.csdn.net/mayongzhan/archive/2009/06/25/4298834.aspx

 

代码如下:AbstractConsistentHash


import java.util.Collection;
import java.util.SortedMap;
import java.util.TreeMap;

/**
 * 将Hash算法跟虚拟节点的虚拟方式给抽出来,模版方法设计模式
 * 
 * @Author 妳那伊抹微笑
 *
 */
public abstract class AbstractConsistentHash {

	/**
	 * 复制节点,增加每个节点的复制节点有利于负载均衡
	 */
	protected int numberOfReplicas = 8;
	
	/**
	 * 一致性哈希
	 */
	protected SortedMap<Long, String> circle = new TreeMap<Long, String>();

	/**
	 * 自定义一致性哈希算法
	 */
	protected abstract long defineHash(String key);

	/**
	 * 自定义circle key
	 */
	protected abstract String defineCircleKey(String node, int i);

	public AbstractConsistentHash() {
	}

	public AbstractConsistentHash(int numberOfReplicas, Collection<String> nodes) {
		this.numberOfReplicas = numberOfReplicas;
		
		for (String node : nodes) {
			addNode(node);
		}
	}

	/**
	 * 添加节点
	 */
	public void addNode(String node) {
		for (int i = 0; i < this.numberOfReplicas; i++) {
			/*
			 * 计算key的哈希算法
			 */
			long key = defineHash(defineCircleKey(node, i));

			circle.put(key, node);
		}
	}

	/**
	 * 删除节点
	 */
	public void removeNode(String node) {
		for (int i = 0; i < this.numberOfReplicas; i++) {
			/*
			 * 计算key的哈希算法
			 */
			long key = defineHash(defineCircleKey(node, i));
			
			circle.remove(key);
		}
	}

	/**
	 * 查找节点,取得顺时针方向上最近的一个虚拟节点对应的实际节点
	 */
	public String getNode(String node) {
		if (circle.isEmpty()) {
			return null;
		}
		
		/*
		 * 计算key的哈希算法
		 */
		long key = defineHash(node);

		if (!circle.containsKey(key)) {
			SortedMap<Long, String> tailMap = circle.tailMap(key);
			key = tailMap.isEmpty() ? circle.firstKey() : tailMap.firstKey();
		}
		
		return circle.get(key);
	}
	
}

 

   实现类1:CRC32ConsistentHash

   


import java.util.Set;
import java.util.zip.CRC32;

/**
 * HashCode 一致性哈希,使用CRC32算法计算hash值
 * 
 * @Author 妳那伊抹微笑
 *
 */
public class CRC32ConsistentHash extends AbstractConsistentHash {

	public CRC32ConsistentHash(int numberOfReplicas, Set<String> nodes) {
		super(numberOfReplicas, nodes);
	}

	@Override
	protected long defineHash(String key) {
		CRC32 hash = new CRC32();
		hash.update(key.getBytes());
		return (int)hash.getValue();
	}
	
	@Override
	protected String defineCircleKey(String node, int i) {
		return (node+i);
	}

 

    实现类2:HashCodeConsistentHash

    

import java.util.Set;

/**
 * HashCode 一致性哈希,使用自带的hashCode计算hash值
 * 
 * @Author 妳那伊抹微笑
 *
 */
public class HashCodeConsistentHash extends AbstractConsistentHash {

	public HashCodeConsistentHash(int numberOfReplicas, Set<String> nodes) {
		super(numberOfReplicas, nodes);
	}

	@Override
	protected long defineHash(String key) {
		return key.hashCode();
	}
	
	@Override
	protected String defineCircleKey(String node, int i) {
		if(i > 99){
			return node + "-" + i;
		}else if(i > 9){
			return node + "-0" + i;
		}else{
			return node + "-00" + i;
		}
	}
	
}

 

  实现类3:MD5ConsistentHash

  


import java.nio.charset.Charset;
import java.security.MessageDigest;
import java.util.Set;

/**
 * HashCode 一致性哈希,使用自MD5算法计算hash值
 * 
 * @Author 妳那伊抹微笑
 *
 */
public class MD5ConsistentHash extends AbstractConsistentHash{

	public MD5ConsistentHash(int numberOfReplicas, Set<String> nodes) {
		super(numberOfReplicas, nodes);
	}

	@Override
	protected String defineCircleKey(String node, int i) {
		if(i > 99){
			return node + "-" + i;
		}else if(i > 9){
			return node + "-0" + i;
		}else{
			return node + "-00" + i;
		}
	}

	@Override
	protected long defineHash(String key) {
		try {
			MessageDigest md5 = MessageDigest.getInstance("MD5");
			md5.update(key.getBytes(Charset.forName("UTF8")));
			byte[] digest = md5.digest();

			long hash = 0;
			for (int i = 0; i < 4; i++) {
				hash += ((long) (digest[i * 4 + 3] & 0xFF) << 24)
						| ((long) (digest[i * 4 + 2] & 0xFF) << 16)
						| ((long) (digest[i * 4 + 1] & 0xFF) << 8)
						| ((long) (digest[i * 4 + 0] & 0xFF));
			}
			System.out.println(key+ "--->" + hash);
			return hash;
		} catch (Exception ex) {
			return -1;
		}
	}

 

   redis集群:ConsistentHashRedisUtils

   

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;


/**
* 一致性哈希Utils, 实现了redis中hash的一般操作
*
* @Author 妳那伊抹微笑
*
*/
public class ConsistentHashRedisUtils {

	/**
	 * redis server properties 配置文件的路径
	 */
	private static final String REDIS_SERVERS_HANGZHOU_PROPERTIES = "../../conf/analysis-redis-servers-hangzhou.properties";

	/**
	 * 日志对象
	 */
	private Log log = LogFactory.getLog(this.getClass());

	/**
	 * 一致性哈希
	 */
	private AbstractConsistentHash hash;

	/**
	 * Redis连接池
	 */
	private Map<String, JedisPool> jedisPools = new HashMap<String, JedisPool>();;

	/**
	 * 外网地址到内网地IP映射
	 */
	private Map<String, String> redisNodeMaps = new LinkedHashMap<String, String>();

	/**
	 * 是否转换到内网地址创建jedis对象
	 */
	private boolean isConvertIp = false;

	/**
	 * 虚拟节点个数
	 */
	private static int numberOfReplicas = 8;

	public ConsistentHashRedisUtils() {
		this(numberOfReplicas);
	}

	public ConsistentHashRedisUtils(int numberOfReplicas) {
		redisNodeMaps = getRedisConfNodeMaps();
		hash = new CRC32ConsistentHash(numberOfReplicas, redisNodeMaps.keySet());
	}

	/**
	 * hset
	 *
	 * @param key
	 * @param field
	 * @param value
	 */
	public void hset(String key, String field, String value) {
		JedisPool pool = null;
		Jedis jedis = null;

		try {
			pool = getJedisPool(key);
			jedis = pool.getResource();
			jedis.hset(key, field, value);
		} catch (Exception e) {
			log.error("Redis hset exception : " + key, e);
			pool.returnBrokenResource(jedis);
		} finally {
			pool.returnResource(jedis);
		}
	}

	/**
	 * hincrBy
	 *
	 * @param key
	 * @param field
	 * @param value
	 */
	public void hincrBy(String key, String field, long value) {
		JedisPool pool = null;
		Jedis jedis = null;

		try {
			pool = getJedisPool(key);
			jedis = pool.getResource();
			jedis.hincrBy(key, field, value);
		} catch (Exception e) {
			log.error("Redis hincrBy exception : " + key, e);
			pool.returnBrokenResource(jedis);
		} finally {
			pool.returnResource(jedis);
		}
	}

	/**
	 * hget
	 *
	 * @param key
	 * @param field
	 * @return?
	 */
	public String hget(String key, String field) {
		JedisPool pool = null;
		Jedis jedis = null;
		String result = null;

		try {
			pool = getJedisPool(key);
			jedis = pool.getResource();
			result = jedis.hget(key, field);
		} catch (Exception e) {
			log.error("Redis hget exception : " + key, e);
			pool.returnBrokenResource(jedis);
		} finally {
			pool.returnResource(jedis);
		}
		return result;
	}

	/**
	 * hgetAll
	 *
	 * @param key
	 * @return map
	 */
	public Map<String, String> hgetAll(String key) {
		JedisPool pool = null;
		Jedis jedis = null;
		Map<String, String> result = null;

		try {
			pool = getJedisPool(key);
			jedis = pool.getResource();
			result = jedis.hgetAll(key);
		} catch (Exception e) {
			log.error("Redis hgetAll exception : " + key, e);
			pool.returnBrokenResource(jedis);
		} finally {
			pool.returnResource(jedis);
		}
		return result;
	}

	/**
	 * hdel
	 *
	 * @param key
	 * @param field
	 */
	public void hdel(String key, String field) {
		JedisPool pool = null;
		Jedis jedis = null;
		try {
			pool = getJedisPool(key);
			jedis = pool.getResource();
			jedis.hdel(key, field);
		} catch (Exception e) {
			log.error("Redis hdel exception : " + key, e);
			pool.returnBrokenResource(jedis);
		} finally {
			pool.returnResource(jedis);
		}
	}

	/**
	 * 通过userid获取对应的jedis连接
	 *
	 * @param userid
	 * @return
	 * @throws Exception
	 */
	private JedisPool getJedisPool(String userid) throws Exception {
		String key = this.hash.getNode(userid);
		JedisPool pool = this.jedisPools.get(key);

		if (pool == null) {
			String outKey = key;
			/*
			 * 将外网ip转换内网ip
			 */
			if (this.isConvertIp) {
				key = this.redisNodeMaps.get(key);
				if (key == null) {
					throw new Exception("not " + key + " in redisNodeMaps");
				}
			}
			String[] host_port = key.split(":");
			pool = getJedisPool(host_port);
			log.info("-------------------->Redis ip--->port : " + host_port[0] + "--->" + host_port[1]);

			this.jedisPools.put(outKey, pool);
		}
		return pool;
	}

	/**
	 * 获得redis的连接池
	 *
	 * @param host_port
	 * @return
	 */
	private JedisPool getJedisPool(String[] host_port) {
		JedisPoolConfig config = new JedisPoolConfig();
//		config.setMaxActive(500);
		config.setMaxIdle(10);
//		config.setMaxWait(3000000);
		config.setTestOnBorrow(true);
		config.setTestOnReturn(true);
		return new JedisPool(config, host_port[0], Integer.parseInt(host_port[1]));
	}

	/**
	 * 关闭所有redis链接
	 */
	public void closeAllConnection() throws Exception {
		for (String key : this.jedisPools.keySet()) {
			JedisPool pool = this.jedisPools.get(key);

			if (pool != null) {
				pool.destroy();
			}
		}
	}

	/**
	 * 从配置文件中获取Redis集群的IP
	 *
	 * @return
	 */
	private Map<String, String> getRedisConfNodeMaps() {
		Map<String, String> rmaps = null;
		String line = null;
		try {
			BufferedReader reader = new BufferedReader(
					new InputStreamReader(this.getClass().getResourceAsStream(REDIS_SERVERS_HANGZHOU_PROPERTIES)));
			rmaps = new LinkedHashMap<String, String>();
			String[] splits = null;
			String outIp = null;
			String inIp = null;
			while ((line = reader.readLine()) != null) {
				if (line.trim().length() < 5 || line.trim().startsWith("#"))
					continue;
				log.info("redis server : " + line);
				splits = line.split(" +|\t+");
				outIp = splits[0];
				inIp = splits[1];
				rmaps.put(outIp, inIp);
			}
			log.info(
					"ConsistentHashRedisUtils initialize configure ../../conf/analysis-redis-servers-hangzhou.properties is successful ...");
		} catch (Exception e) {
			log.error("ConsistentHashRedisUtils initialize error : " + line, e);
		}
		return rmaps;
	}

}

 

 

  问题:

   1、减少节点

以上的代码还没有解决数据转移的问题,比如其中一台Redis服务器挂掉了,应该将数据转移到该服务器顺时针最近的那一台活着的服务器上去,而且以后数据恢复,还需要自己写代码做数据校验,数据找回等等!!!这也是一个麻烦的问题 、、、(至少数据不会丢失了,这点还是不错的,数据校验跟数据找回的代码写好后,也可以一劳永逸了,嘎嘎!)

2、增加节点
由于使用了虚拟节点的方式,上面的虚拟节点数量为8,现在新增一台Redis服务器,也会虚拟出8台虚拟节点出来,这里原先假设的是有12台Redis服务器,假如现在的这8台虚拟节点如果刚好分布到了其他8台虚拟节点之间,也就是意味着另外8台Redis中差不多每台Rdis服务器都会有八分之一的数据会跑到现在新增的这台Redis服务器上,这样也会造成数据混乱,这该如何解决 、、、也就是新增节点之后还是破坏了原有的映射关系,当然知道了这样的原理之后也是可以写代码数据校验跟找回的,重新平衡服务器的数据,不过那也太麻烦点了,伤不起啊!假如这里虚拟节点的数据变大了,那以后增加节点是不是要做的工作就更大了,这 、、、

 

转载:http://blog.csdn.net/u012185296/article/details/28427251