基于Mysql的Sequence实现方法
团队更换新框架。新的业务全部使用新的框架,甚至是新的数据库--mysql。
这边之前一直是使用oracle,各种订单号、流水号、批次号啥的,都是直接使用oracle的sequence提供的数字序列号。现在数据库更换成mysql了,显然以前的老方法不能适用了。
需要新写一个:
•分布式场景使用
•满足一定的并发要求
找了一些相关的资料,发现mysql这方面的实现,原理都是一条数据库记录,不断update它的值。然后大部分的实现方案,都用到了函数。
贴一下网上的代码:
基于mysql函数实现
表结构
create table `t_sequence` ( `sequence_name` varchar(64) character set utf8 collate utf8_general_ci not null comment '序列名称' , `value` int(11) null default null comment '当前值' , primary key (`sequence_name`) ) engine=innodb default character set=utf8 collate=utf8_general_ci row_format=compact ;
获取下一个值
create definer = `root`@`localhost` function `nextval`(sequence_name varchar(64)) returns int(11) begin declare current integer; set current = 0; update t_sequence t set t.value = t.value + 1 where t.sequence_name = sequence_name; select t.value into current from t_sequence t where t.sequence_name = sequence_name; return current; end;
并发场景有可能会出问题,虽然可以在业务层加锁,但分布式场景就无法保证了,然后效率应该也不会高。
自己实现一个,java版
原理:
•读取一条记录,缓存一个数据段,如:0-100,将记录的当前值从0修改为100
•数据库乐观锁更新,允许重试
•读取数据从缓存中读取,用完再读取数据库
不废话,上代码:
基于java实现
表结构
每次update,都是将seq_value设置为seq_value+step
create table `t_pub_sequence` ( `seq_name` varchar(128) character set utf8 not null comment '序列名称', `seq_value` bigint(20) not null comment '目前序列值', `min_value` bigint(20) not null comment '最小值', `max_value` bigint(20) not null comment '最大值', `step` bigint(20) not null comment '每次取值的数量', `tm_create` datetime not null comment '创建时间', `tm_smp` datetime not null default current_timestamp on update current_timestamp comment '修改时间', primary key (`seq_name`) ) engine=innodb default charset=utf8mb4 comment='流水号生成表';
sequence接口
/** * <p></p> * @author coderzl * @title mysqlsequence * @description 基于mysql数据库实现的序列 * @date 2017/6/6 23:03 */ public interface mysqlsequence { /** * <p> * 获取指定sequence的序列号 * </p> * @param seqname sequence名 * @return string 序列号 */ public string nextval(string seqname); }
序列区间
用于本地缓存一段序列,从min到max区间
/** * <p></p> * * @author coderzl * @title sequencerange * @description 序列区间,用于缓存序列 * @date 2017/6/6 22:58 */ @data public class sequencerange { private final long min; private final long max; /** */ private final atomiclong value; /** 是否超限 */ private volatile boolean over = false; /** * 构造. * * @param min * @param max */ public sequencerange(long min, long max) { this.min = min; this.max = max; this.value = new atomiclong(min); } /** * <p>gets and increment</p> * * @return */ public long getandincrement() { long currentvalue = value.getandincrement(); if (currentvalue > max) { over = true; return -1; } return currentvalue; } }
bo
对应数据库记录
@data public class mysqlsequencebo { /** * seq名 */ private string seqname; /** * 当前值 */ private long seqvalue; /** * 最小值 */ private long minvalue; /** * 最大值 */ private long maxvalue; /** * 每次取值的数量 */ private long step; /** */ private date tmcreate; /** */ private date tmsmp; public boolean validate(){ //一些简单的校验。如当前值必须在最大最小值之间。step值不能大于max与min的差 if (stringutil.isblank(seqname) || minvalue < 0 || maxvalue <= 0 || step <= 0 || minvalue >= maxvalue || maxvalue - minvalue <= step ||seqvalue < minvalue || seqvalue > maxvalue ) { return false; } return true; } }
dao
增删改查,其实就用到了改和查
public interface mysqlsequencedao { /** * */ public int createsequence(mysqlsequencebo bo); public int updsequence(@param("seqname") string seqname, @param("oldvalue") long oldvalue ,@param("newvalue") long newvalue); public int delsequence(@param("seqname") string seqname); public mysqlsequencebo getsequence(@param("seqname") string seqname); public list<mysqlsequencebo> getall(); }
mapper
<?xml version="1.0" encoding="utf-8" ?> <!doctype mapper public "-//mybatis.org//dtd mapper 3.0//en" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" > <mapper namespace="com.xxxxx.core.sequence.impl.dao.mysqlsequencedao" > <resultmap id="baseresultmap" type="com.xxxxx.core.sequence.impl.mysqlsequencebo" > <result column="seq_name" property="seqname" jdbctype="varchar" /> <result column="seq_value" property="seqvalue" jdbctype="bigint" /> <result column="min_value" property="minvalue" jdbctype="bigint" /> <result column="max_value" property="maxvalue" jdbctype="bigint" /> <result column="step" property="step" jdbctype="bigint" /> <result column="tm_create" property="tmcreate" jdbctype="timestamp" /> <result column="tm_smp" property="tmsmp" jdbctype="timestamp" /> </resultmap> <delete id="delsequence" parametertype="java.lang.string" > delete from t_pub_sequence where seq_name = #{seqname,jdbctype=varchar} </delete> <insert id="createsequence" parametertype="com.xxxxx.core.sequence.impl.mysqlsequencebo" > insert into t_pub_sequence (seq_name,seq_value,min_value,max_value,step,tm_create) values (#{seqname,jdbctype=varchar}, #{seqvalue,jdbctype=bigint}, #{minvalue,jdbctype=bigint}, #{maxvalue,jdbctype=bigint}, #{step,jdbctype=bigint}, now()) </insert> <update id="updsequence" parametertype="com.xxxxx.core.sequence.impl.mysqlsequencebo" > update t_pub_sequence set seq_value = #{newvalue,jdbctype=bigint} where seq_name = #{seqname,jdbctype=varchar} and seq_value = #{oldvalue,jdbctype=bigint} </update> <select id="getall" resultmap="baseresultmap" > select seq_name, seq_value, min_value, max_value, step from t_pub_sequence </select> <select id="getsequence" resultmap="baseresultmap" > select seq_name, seq_value, min_value, max_value, step from t_pub_sequence where seq_name = #{seqname,jdbctype=varchar} </select> </mapper>
接口实现
@repository("mysqlsequence") public class mysqlsequenceimpl implements mysqlsequence{ @autowired private mysqlsequencefactory mysqlsequencefactory; /** * <p> * 获取指定sequence的序列号 * </p> * * @param seqname sequence名 * @return string 序列号 * @author coderzl */ @override public string nextval(string seqname) { return objects.tostring(mysqlsequencefactory.getnextval(seqname)); } }
工厂
工厂只做了两件事
•服务启动的时候,初始化数据库中所有sequence【完成序列区间缓存】
•获取sequence的下一个值
@component public class mysqlsequencefactory { private final lock lock = new reentrantlock(); /** */ private map<string,mysqlsequenceholder> holdermap = new concurrenthashmap<>(); @autowired private mysqlsequencedao msqlsequencedao; /** 单个sequence初始化乐观锁更新失败重试次数 */ @value("${seq.init.retry:5}") private int initretrynum; /** 单个sequence更新序列区间乐观锁更新失败重试次数 */ @value("${seq.get.retry:20}") private int getretrynum; @postconstruct private void init(){ //初始化所有sequence initall(); } /** * <p> 加载表中所有sequence,完成初始化 </p> * @return void * @author coderzl */ private void initall(){ try { lock.lock(); list<mysqlsequencebo> bolist = msqlsequencedao.getall(); if (bolist == null) { throw new illegalargumentexception("the sequencerecord is null!"); } for (mysqlsequencebo bo : bolist) { mysqlsequenceholder holder = new mysqlsequenceholder(msqlsequencedao, bo,initretrynum,getretrynum); holder.init(); holdermap.put(bo.getseqname(), holder); } }finally { lock.unlock(); } } /** * <p> </p> * @param seqname * @return long * @author coderzl */ public long getnextval(string seqname){ mysqlsequenceholder holder = holdermap.get(seqname); if (holder == null) { try { lock.lock(); holder = holdermap.get(seqname); if (holder != null){ return holder.getnextval(); } mysqlsequencebo bo = msqlsequencedao.getsequence(seqname); holder = new mysqlsequenceholder(msqlsequencedao, bo,initretrynum,getretrynum); holder.init(); holdermap.put(seqname, holder); }finally { lock.unlock(); } } return holder.getnextval(); } }
单一sequence的holder
•init() 初始化 其中包括参数校验,数据库记录更新,创建序列区间
•getnextval() 获取下一个值
public class mysqlsequenceholder { private final lock lock = new reentrantlock(); /** seqname */ private string seqname; /** sequencedao */ private mysqlsequencedao sequencedao; private mysqlsequencebo sequencebo; /** */ private sequencerange sequencerange; /** 是否初始化 */ private volatile boolean isinitialize = false; /** sequence初始化重试次数 */ private int initretrynum; /** sequence获取重试次数 */ private int getretrynum; /** * <p> 构造方法 </p> * @title mysqlsequenceholder * @param sequencedao * @param sequencebo * @param initretrynum 初始化时,数据库更新失败后重试次数 * @param getretrynum 获取nextval时,数据库更新失败后重试次数 * @return * @author coderzl */ public mysqlsequenceholder(mysqlsequencedao sequencedao, mysqlsequencebo sequencebo,int initretrynum,int getretrynum) { this.sequencedao = sequencedao; this.sequencebo = sequencebo; this.initretrynum = initretrynum; this.getretrynum = getretrynum; if(sequencebo != null) this.seqname = sequencebo.getseqname(); } /** * <p> 初始化 </p> * @title init * @param * @return void * @author coderzl */ public void init(){ if (isinitialize == true) { throw new sequenceexception("[" + seqname + "] the mysqlsequenceholder has inited"); } if (sequencedao == null) { throw new sequenceexception("[" + seqname + "] the sequencedao is null"); } if (seqname == null || seqname.trim().length() == 0) { throw new sequenceexception("[" + seqname + "] the sequencename is null"); } if (sequencebo == null) { throw new sequenceexception("[" + seqname + "] the sequencebo is null"); } if (!sequencebo.validate()){ throw new sequenceexception("[" + seqname + "] the sequencebo validate fail. bo:"+sequencebo); } // 初始化该sequence try { initsequencerecord(sequencebo); } catch (sequenceexception e) { throw e; } isinitialize = true; } /** * <p> 获取下一个序列号 </p> * @title getnextval * @param * @return long * @author coderzl */ public long getnextval(){ if(isinitialize == false){ throw new sequenceexception("[" + seqname + "] the mysqlsequenceholder not inited"); } if(sequencerange == null){ throw new sequenceexception("[" + seqname + "] the sequencerange is null"); } long curvalue = sequencerange.getandincrement(); if(curvalue == -1){ try{ lock.lock(); curvalue = sequencerange.getandincrement(); if(curvalue != -1){ return curvalue; } sequencerange = retryrange(); curvalue = sequencerange.getandincrement(); }finally { lock.unlock(); } } return curvalue; } /** * <p> 初始化当前这条记录 </p> * @title initsequencerecord * @description * @param sequencebo * @return void * @author coderzl */ private void initsequencerecord(mysqlsequencebo sequencebo){ //在限定次数内,乐观锁更新数据库记录 for(int i = 1; i < initretrynum; i++){ //查询bo mysqlsequencebo curbo = sequencedao.getsequence(sequencebo.getseqname()); if(curbo == null){ throw new sequenceexception("[" + seqname + "] the current sequencebo is null"); } if (!curbo.validate()){ throw new sequenceexception("[" + seqname + "] the current sequencebo validate fail"); } //改变当前值 long newvalue = curbo.getseqvalue()+curbo.getstep(); //检查当前值 if(!checkcurrentvalue(newvalue,curbo)){ newvalue = resetcurrentvalue(curbo); } int result = sequencedao.updsequence(sequencebo.getseqname(),curbo.getseqvalue(),newvalue); if(result > 0){ sequencerange = new sequencerange(curbo.getseqvalue(),newvalue - 1); curbo.setseqvalue(newvalue); this.sequencebo = curbo; return; }else{ continue; } } //限定次数内,更新失败,抛出异常 throw new sequenceexception("[" + seqname + "] sequencebo update error"); } /** * <p> 检查新值是否合法 新的当前值是否在最大最小值之间</p> * @param curvalue * @param curbo * @return boolean * @author coderzl */ private boolean checkcurrentvalue(long curvalue,mysqlsequencebo curbo){ if(curvalue > curbo.getminvalue() && curvalue <= curbo.getmaxvalue()){ return true; } return false; } /** * <p> 重置sequence当前值 :当前sequence达到最大值时,重新从最小值开始 </p> * @title resetcurrentvalue * @param curbo * @return long * @author coderzl */ private long resetcurrentvalue(mysqlsequencebo curbo){ return curbo.getminvalue(); } /** * <p> 缓存区间使用完毕时,重新读取数据库记录,缓存新序列段 </p> * @title retryrange * @param sequencerange * @author coderzl */ private sequencerange retryrange(){ for(int i = 1; i < getretrynum; i++){ //查询bo mysqlsequencebo curbo = sequencedao.getsequence(sequencebo.getseqname()); if(curbo == null){ throw new sequenceexception("[" + seqname + "] the current sequencebo is null"); } if (!curbo.validate()){ throw new sequenceexception("[" + seqname + "] the current sequencebo validate fail"); } //改变当前值 long newvalue = curbo.getseqvalue()+curbo.getstep(); //检查当前值 if(!checkcurrentvalue(newvalue,curbo)){ newvalue = resetcurrentvalue(curbo); } int result = sequencedao.updsequence(sequencebo.getseqname(),curbo.getseqvalue(),newvalue); if(result > 0){ sequencerange = new sequencerange(curbo.getseqvalue(),newvalue - 1); curbo.setseqvalue(newvalue); this.sequencebo = curbo; return sequencerange; }else{ continue; } } throw new sequenceexception("[" + seqname + "] sequencebo update error"); } }
总结
•当服务重启或异常的时候,会丢失当前服务所缓存且未用完的序列
•分布式场景,多个服务同时初始化,或者重新获取sequence时,乐观锁会保证彼此不冲突。a服务获取0-99,b服务会获取100-199,以此类推
•当该sequence获取较为频繁时,增大step值,能提升性能。但同时服务异常时,损失的序列也较多
•修改数据库里sequence的一些属性值,比如step,max等,再下一次从数据库获取时,会启用新的参数
•sequence只是提供了有限个序列号(最多max-min个),达到max后,会循环从头开始。
•由于sequence会循环,所以达到max后,再获取,就不会唯一。建议使用sequence来做业务流水号时,拼接时间。如:20170612235101+序列号
业务id拼接方法
@service public class jrngeneratorservice { private static final string seq_name = "t_seq_test"; /** sequence服务 */ @autowired private mysqlsequence mysqlsequence; public string generatejrn() { try { string sequence = mysqlsequence.getnextvalue(seq_name); sequence = leftpadding(sequence,8); calendar calendar = calendar.getinstance(); simpledateformat sdateformat = new simpledateformat("yyyymmddhhmmss"); string nowdate = sdateformat.format(calendar.gettime()); nowdate.substring(4, nowdate.length()); string jrn = nowdate + sequence + randomutil.getfixedlengthrandom(6);//10位时间+8位序列 + 6位随机数=24位流水号 return jrn; } catch (exception e) { //todo } } private string leftpadding(string seq,int len){ string res =""; string str =""; if(seq.length()<len){ for(int i=0;i<len-seq.length();i++){ str +="0"; } } res =str+seq; return res; } }
以上这篇基于mysql的sequence实现方法就是小编分享给大家的全部内容了,希望能给大家一个参考,也希望大家多多支持。
上一篇: MySQL无法启动的解决办法
下一篇: vue2组件实现懒加载浅析