美团分布式ID生成框架Leaf源码分析及优化改进
最近做了一个面试题解答的开源项目,大家可以看一看,如果对大家有帮助,希望大家帮忙给一个star,谢谢大家了!
《面试指北》项目地址:https://github.com/notfound9/interviewguide
本文主要是对美团的分布式id框架leaf的原理进行介绍,针对leaf原项目中的一些issue,对leaf项目进行功能增强,问题修复及优化改进,改进后的项目地址在这里:
leaf项目改进计划 https://github.com/notfound9/leaf
leaf原理分析
snowflake生成id的模式
7849276-4d1955394baa3c6d.png
snowflake算法对于id的位数是上图这样分配的:
1位的符号位+41位时间戳+10位workid+12位序列号
加起来一共是64个二进制位,正好与java中的long类型的位数一样。
美团的leaf框架对于snowflake算法进行了一些位数调整,位数分配是这样:
最大41位时间差+10位的workid+12位序列化
虽然看美团对leaf的介绍文章里面说
leaf-snowflake方案完全沿用snowflake方案的bit位设计,即是“1+41+10+12”的方式组装id号。
其实看代码里面是没有专门设置符号位的,如果timestamp过大,导致时间差占用42个二进制位,时间差的第一位为1时,可能生成的id转换为十进制后会是负数:
//timestampleftshift是22,workeridshift是12 long id = ((timestamp - twepoch) << timestampleftshift) | (workerid << workeridshift) | sequence;
时间差是什么?
因为时间戳是以1970年01月01日00时00分00秒作为起始点,其实我们一般取的时间戳其实是起始点到现在的时间差,如果我们能确定我们取的时间都是某个时间点以后的时间,那么可以将时间戳的起始点改成这个时间点,leaf项目中,如果不设置起始时间,默认是2010年11月4日09:42:54,这样可以使得支持的最大时间增长,leaf框架的支持最大时间是起始点之后的69年。
workid怎么分配?
leaf使用zookeeper作为注册中心,每次机器启动时去zookeeper特定路径/forever/下读取子节点列表,每个子节点存储了ip:port及对应的workid,遍历子节点列表,如果存在当前ip:port对应的workid,就使用节点信息中存储的workid,不存在就创建一个永久有序节点,将序号作为workid,并且将workid信息写入本地缓存文件workerid.properties,供启动时连接zookeeper失败,读取使用。因为workid只分配了10个二进制位,所以取值范围是0-1023。
序列号怎么生成?
序列号是12个二进制位,取值范围是0到4095,主要保证同一个leaf服务在同一毫秒内,生成的id的唯一性。
序列号是生成流程如下:
1.当前时间戳与上一个id的时间戳在同一毫秒内,那么对sequence+1,如果sequence+1超过了4095,那么进行等待,等到下一毫秒到了之后再生成id。
2.当前时间戳与上一个id的时间戳不在同一毫秒内,取一个100以内的随机数作为序列号。
if (lasttimestamp == timestamp) { sequence = (sequence + 1) & sequencemask; if (sequence == 0) { //seq 为0的时候表示是下一毫秒时间开始对seq做随机 sequence = random.nextint(100); timestamp = tilnextmillis(lasttimestamp); } } else { //如果是新的ms开始 sequence = random.nextint(100); } lasttimestamp = timestamp;
segment生成id的模式
5e4ff128.png
这种模式需要依赖mysql,表字段biz_tag代表业务名,max_id代表该业务目前已分配的最大id值,step代表每次leaf往数据库请求时,一次性分配的id数量。
大致流程就是每个leaf服务在内存中有两个segment实例,每个segement保存一个分段的id,
一个segment是当前用于分配id,有一个value属性保存这个分段已分配的最大id,以及一个max属性这个分段最大的id。
另外一个segement是备用的,当一个segement用完时,会进行切换,使用另一个segement进行使用。
当一个segement的分段id使用率达到10%时,就会触发另一个segement去db获取分段id,初始化好分段id供之后使用。
segment { private atomiclong value = new atomiclong(0); private volatile long max; private volatile int step; } segmentbuffer { private string key; private segment[] segments; //双buffer private volatile int currentpos; //当前的使用的segment的index private volatile boolean nextready; //下一个segment是否处于可切换状态 private volatile boolean initok; //是否初始化完成 private final atomicboolean threadrunning; //线程是否在运行中 private final readwritelock lock; private volatile int step; private volatile int minstep; private volatile long updatetimestamp; }
leaf项目改进
目前leaf项目存在的问题是
snowflake生成id相关:
1.注册中心只支持zookeeper
而对于一些小公司或者项目组,其他业务没有使用到zookeeper的话,为了部署leaf服务而维护一个zookeeper集群的代价太大。所以原项目中有issue在问”怎么支持非zookeeper的注册中心“,由于一般项目中使用mysql的概率会大很多,所以增加了使用mysql作为注册中心,本地配置作为注册中心的功能。
2.潜在的时钟回拨问题
由于启动前,服务器时间调到了以前的时间或者进行了回拨,连接zookeeper失败时会使用本地缓存文件workerid.properties中的workerid,而没有校验该id生成的最大时间戳,可能会造成id重复,对这个问题进行了修复。
3.时间差过大时,生成id为负数
因为缺少对时间差的校验,当时间差过大,转换为二进制数后超过41位后,在生成id时会造成溢出,使得符号位为1,生成id为负数。
segement生成id相关:
没有太多问题,主要是根据一些issue对代码进行了性能优化。
具体改进如下:
snowflake生成id相关的改进:
1.针对leaf原项目中的issue#84,增加zk_recycle模式(注册中心为zk,workid循环使用)
2.针对leaf原项目中的issue#100,增加mysql模式(注册中心为mysql)
3.针对leaf原项目中的issue#100,增加local模式(注册中心为本地项目配置)
4.针对leaf原项目中的issue#84,修复启动时时钟回拨的问题
5.针对leaf原项目中的issue#106,修复时间差过大,超过41位溢出,导致生成的id负数的问题
segement生成id相关的改进:
1.针对leaf原项目中的issue#68,优化segmentidgenimpl.updatecachefromdb()方法。
2.针对leaf原项目中的 issue#88,使用位运算&替换取模运算
snowflake算法生成id的相关改进
leaf项目原来的注册中心的模式(我们暂时命令为zk_normal模式)
使用zookeeper作为注册中心,每次机器启动时去zookeeper特定路径下读取子节点列表,如果存在当前ip:port对应的workid,就使用节点信息中存储的workid,不存在就创建一个永久有序节点,将序号作为workid,并且将workid信息写入本地缓存文件workerid.properties,供启动时连接zookeeper失败,读取使用。
1.针对leaf原项目中的issue#84,增加zk_recycle模式(注册中心为zk,workid循环使用)
问题详情:
issue#84:workid是否支持回收?
snowflakeservice模式中,workid是否支持回收?分布式环境下,每次重新部署可能就换了一个ip,如果没有回收的话1024个机器标识很快就会消耗完,为什么zk不用临时节点去存储呢,这样能动态感知服务上下线,对workid进行管理回收?
解决方案:
开发了zk_recycle模式,针对使用snowflake生成分布式id的技术方案,原本是使用zookeeper作为注册中心为每个服务根据ip:port分配一个固定的workid,workid生成范围为0到1023,workid不支持回收,所以在leaf的原项目中有人提出了一个issue#84 workid是否支持回收?,因为当部署leaf的服务的ip和port不固定时,如果workid不支持回收,当workid超过最大值时,会导致生成的分布式id的重复。所以增加了workid循环使用的模式zk_recycle。
如何使用zk_recycle模式?
在leaf/leaf-server/src/main/resources/leaf.properties中添加以下配置
//开启snowflake服务 leaf.snowflake.enable=true //leaf服务的端口,用于生成workid leaf.snowflake.port= //将snowflake模式设置为zk_recycle,此时注册中心为zookeeper,并且workerid可复用 leaf.snowflake.mode=zk_recycle //zookeeper的地址 leaf.snowflake.zk.address=localhost:2181
启动leafserverapplication,调用/api/snowflake/get/test就可以获得此种模式下生成的分布式id。
curl domain/api/snowflake/get/test 1256557484213448722
zk_recycle模式实现原理
按照上面的配置在leaf.properties里面进行配置后,
if(mode.equals(snowflakemode.zk_recycle)) {//注册中心为zk,对ip:port分配的workid是课循环利用的模式 string zkaddress = properties.getproperty(constants.leaf_snowflake_zk_address); recyclablezookeeperholder holder = new recyclablezookeeperholder(utils.getip(),port,zkaddress); idgen = new snowflakeidgenimpl(holder); if (idgen.init()) { logger.info("snowflake service init successfully in mode " + mode); } else { throw new initexception("snowflake service init fail"); } }
此时snowflakeidgenimpl使用的holder是recyclablezookeeperholder的实例,workid是可循环利用的,recyclablezookeeperholder工作流程如下:
1.首先会在未使用的workid池(zookeeper路径为/snowflake/leaf.name/recycle/notuse/)中生成所有workid。
2.然后每次服务器启动时都是去未使用的workid池取一个新的workid,然后放到正在使用的workid池(zookeeper路径为/snowflake/leaf.name/recycle/inuse/)下,将此workid用于id生成,并且定时上报时间戳,更新zookeeper中的节点信息。
3.并且定时检测正在使用的workid池,发现某个workid超过最大时间没有更新时间戳的workid,会把它从正在使用的workid池移出,然后放到未使用的workid池中,以供workid循环使用。
4.并且正在使用这个很长时间没有更新时间戳的workid的服务器,在发现自己超过最大时间,还没有上报时间戳成功后,会停止id生成服务,以防workid被其他服务器循环使用,导致id重复。
2.针对leaf原项目中的issue#100,增加mysql模式(注册中心为mysql)
问题详情:
issue#100:如何使用非zk的注册中心?
解决方案:
开发了mysql模式,这种模式注册中心为mysql,针对每个ip:port的workid是固定的。
如何使用这种mysql模式?
需要先在数据库执行项目中的leaf_workerid_alloc.sql,完成建表,然后在leaf/leaf-server/src/main/resources/leaf.properties中添加以下配置
//开启snowflake服务 leaf.snowflake.enable=true //leaf服务的端口,用于生成workid leaf.snowflake.port= //将snowflake模式设置为mysql,此时注册中心为zookeeper,workerid为固定分配 leaf.snowflake.mode=mysql //mysql数据库地址 leaf.jdbc.url= leaf.jdbc.username= leaf.jdbc.password=
启动leafserverapplication,调用/api/snowflake/get/test就可以获得此种模式下生成的分布式id。
curl domain/api/snowflake/get/test 1256557484213448722
实现原理
使用上面的配置后,此时snowflakeidgenimpl使用的holder是snowflakemysqlholder的实例。实现原理与leaf原项目默认的模式,使用zookeeper作为注册中心,每个ip:port的workid是固定的实现原理类似,只是注册,获取workid,及更新时间戳是与mysql进行交互,而不是zookeeper。
if (mode.equals(snowflakemode.mysql)) {//注册中心为mysql druiddatasource datasource = new druiddatasource(); datasource.seturl(properties.getproperty(constants.leaf_jdbc_url)); datasource.setusername(properties.getproperty(constants.leaf_jdbc_username)); datasource.setpassword(properties.getproperty(constants.leaf_jdbc_password)); datasource.init(); // config dao workeridallocdao dao = new workeridallocdaoimpl(datasource); snowflakemysqlholder holder = new snowflakemysqlholder(utils.getip(), port, dao); idgen = new snowflakeidgenimpl(holder); if (idgen.init()) { logger.info("snowflake service init successfully in mode " + mode); } else { throw new initexception("snowflake service init fail"); } }
3.针对leaf原项目中的issue#100,增加local模式(注册中心为本地项目配置)
问题详情:
issue#100:如何使用非zk的注册中心?
解决方案:
开发了local模式,这种模式就是适用于部署leaf服务的ip和port基本不会变化的情况,就是在leaf项目中的配置文件leaf.properties中显式得配置某某ip:某某port对应哪个workid,每次部署新机器时,将ip:port的时候在项目中添加这个配置,然后启动时项目会去读取leaf.properties中的配置,读取完写入本地缓存文件workid.json,下次启动时直接读取workid.json,最大时间戳也每次同步到机器上的缓存文件workid.json中。
如何使用这种local模式?
在leaf/leaf-server/src/main/resources/leaf.properties中添加以下配置
//开启snowflake服务 leaf.snowflake.enable=true //leaf服务的端口,用于生成workid leaf.snowflake.port= #注册中心为local的的模式 #leaf.snowflake.mode=local #leaf.snowflake.local.workidmap= #workidmap的格式是这样的{"leaf服务的ip:端口":"固定的workid"},例如:{"10.1.46.33:8080":1,"10.1.46.33:8081":2}
启动leafserverapplication,调用/api/snowflake/get/test就可以获得此种模式下生成的分布式id。
curl domain/api/snowflake/get/test 1256557484213448722
4.针对leaf原项目中的issue#84,修复启动时时钟回拨的问题
问题详情:
issue#84:因为当使用默认的模式(我们暂时命令为zk_normal模式),注册中心为zookeeper,workid不可复用,上面介绍了这种模式的工作流程,当leaf服务启动时,连接zookeeper失败,那么会去本机缓存中读取workerid.properties文件,读取workid进行使用,但是由于workerid.properties中只存了workid信息,没有存储上次上报的最大时间戳,所以没有进行时间戳判断,所以如果机器的当前时间被修改到之前,就可能会导致生成的id重复。
解决方案:
所以增加了更新时间戳到本地缓存的机制,每次在上报时间戳时将时间戳同时写入本机缓存workerid.properties,并且当使用本地缓存workerid.properties中的workid时,对时间戳进行校验,当前系统时间戳<缓存中的时间戳时,才使用这个workerid。
//连接失败,使用本地workerid.properties中的workerid,并且对时间戳进行校验。 try { properties properties = new properties(); properties.load(new fileinputstream(new file(prop_path.replace("{port}", port + "")))); long maxtimestamp = long.valueof(properties.getproperty("maxtimestamp")); if (maxtimestamp!=null && system.currenttimemillis() <maxtimestamp) { throw new checklasttimeexception("init timestamp check error,forever node timestamp gt this node time"); } workerid = integer.valueof(properties.getproperty("workerid")); logger.warn("start failed ,use local node file properties workerid-{}", workerid); } catch (exception e1) { logger.error("read file error ", e1); return false; } //定时任务每3s执行一次updatenewdata()方法,调用更新updatelocalworkerid()更新缓存文件workerid.properties void updatenewdata(curatorframework curator, string path) { try { if (system.currenttimemillis() < lastupdatetime) { return; } curator.setdata().forpath(path, builddata().getbytes()); updatelocalworkerid(workerid); lastupdatetime = system.currenttimemillis(); } catch (exception e) { logger.info("update init data error path is {} error is {}", path, e); } }
5.针对leaf原项目中的issue#106,修复时间差过大,超过41位溢出,导致生成的id负数的问题
问题详情:
因为leaf框架是沿用snowflake的位数分配
最大41位时间差+10位的workid+12位序列化,但是由于snowflake是强制要求第一位为符号位0,否则生成的id转换为十进制后会是复试,但是leaf项目中没有对时间差进行校验,当时间戳过大或者自定义的twepoch设置不当过小,会导致计算得到的时间差过大,转化为2进制后超过41位,且第一位为1,会导致生成的long类型的id为负数,例如当timestamp = twepoch+2199023255552l时,
此时在生成id时,timestamp - twepoch会等于2199023255552,2199023255552转换为二进制后是1+41个0,此时生成的id由于符号位是1,id会是负数-9223372036854775793
long id = ((timestamp - twepoch) << timestampleftshift) | (workerid << workeridshift) | sequence;
解决方案:
//一开始将最大的maxtimestamp计算好 this.maxtimestamp = ~(-1l << timestampbits) + twepoch; //然后生成id时进行校验 if (timestamp>maxtimestamp) { throw new overmaxtimestampexception("current timestamp is over maxtimestamp, the generate id will be negative"); }
针对segement生成分布式id相关的改进
1.针对leaf原项目中的issue#68,优化segmentidgenimpl.updatecachefromdb()方法
针对issue#68里面的优化方案,对segement buffer的缓存数据与db数据同步的工作流程进行了进一步优化,主要是对
对segmentidgenimpl.updatecachefromdb()方法进行了优化。
原方案工作流程:
1.遍历cachetags,将dbtags的副本inserttagsset中存在的元素移除,使得inserttagsset只有db新增的tag
2.遍历inserttagsset,将这些新增的元素添加到cache中
3.遍历dbtags,将cachetags的副本removetagsset中存在的元素移除,使得removetagsset只有cache中过期的tag
4.遍历removetagsset,将过期的元素移除cache
这种方案需要经历四次循环,使用两个hashset分别存储db中新增的tag,cache中过期的tag,
并且为了筛选出新增的tag,过期的tag,对每个现在使用的tag有两次删除操作,
原有方案代码如下:
list<string> dbtags = dao.getalltags(); if (dbtags == null || dbtags.isempty()) { return; } list<string> cachetags = new arraylist<string>(cache.keyset()); set<string> inserttagsset = new hashset<>(dbtags); set<string> removetagsset = new hashset<>(cachetags); //db中新加的tags灌进cache for(int i = 0; i < cachetags.size(); i++){ string tmp = cachetags.get(i); if(inserttagsset.contains(tmp)){ inserttagsset.remove(tmp); } } for (string tag : inserttagsset) { segmentbuffer buffer = new segmentbuffer(); buffer.setkey(tag); segment segment = buffer.getcurrent(); segment.setvalue(new atomiclong(0)); segment.setmax(0); segment.setstep(0); cache.put(tag, buffer); logger.info("add tag {} from db to idcache, segmentbuffer {}", tag, buffer); } //cache中已失效的tags从cache删除 for(int i = 0; i < dbtags.size(); i++){ string tmp = dbtags.get(i); if(removetagsset.contains(tmp)){ removetagsset.remove(tmp); } } for (string tag : removetagsset) { cache.remove(tag); logger.info("remove tag {} from idcache", tag); }
实际上我们并不需要这些中间过程,现方案工作流程:
只需要遍历dbtags,判断cache中是否存在这个key,不存在就是新增元素,进行新增。
遍历cachetags,判断dbset中是否存在这个key,不存在就是过期元素,进行删除。
现有方案代码:
list<string> dbtags = dao.getalltags(); if (dbtags == null || dbtags.isempty()) { return; } //将dbtags中新加的tag添加cache,通过遍历dbtags,判断是否在cache中存在,不存在就添加到cache for (string dbtag : dbtags) { if (cache.containskey(dbtag)==false) { segmentbuffer buffer = new segmentbuffer(); buffer.setkey(dbtag); segment segment = buffer.getcurrent(); segment.setvalue(new atomiclong(0)); segment.setmax(0); segment.setstep(0); cache.put(dbtag, buffer); logger.info("add tag {} from db to idcache, segmentbuffer {}", dbtag, buffer); } } list<string> cachetags = new arraylist<string>(cache.keyset()); set<string> dbtagset = new hashset<>(dbtags); //将cache中已失效的tag从cache删除,通过遍历cachetags,判断是否在dbtagset中存在,不存在说明过期,直接删除 for (string cachetag : cachetags) { if (dbtagset.contains(cachetag) == false) { cache.remove(cachetag); logger.info("remove tag {} from idcache", cachetag); } }
两个方案对比:
- 空间复杂度
相比原方案需要使用两个hashset,这种方案的只需要使用一个hashset,空间复杂度会低一些。 - 时间复杂度
总遍历次数会比原来的少,时间复杂度更低,因为判断是新增,过期的情况就直接处理了,不需要后续再单独遍历,
而且不需要对cache和dbtag的交集进行删除操作,因为原来方案为了获得新增的元素,是将dbset的副本中现有元素进行删除得到。 - 代码可读性
原方案是4个for循环,总共35行代码,现方案是2个for循环,总共25行代码,更加简洁易懂。
2.针对leaf原项目中的issue#88,使用位运算&替换取模运算
这个更新是针对这个issue#88 提出的问题,使用位运算&来代替取模运算%,执行效率更高。
原代码:
public int nextpos() { return (currentpos + 1) % 2; }
现代码:
public int nextpos() { return (currentpos + 1) & 1; }