Hive UDF: a unique BigInt generator
I. Background
MySQL inserts go faster with an auto-increment bigint primary key, since rows can simply keep being appended at the end of the file, which is why we need something like this.
Now, the backend does have a dedicated ID-generation service, but bulk inserts from the data platform would surely overwhelm it. They won't let us play with them, so we have to play by ourselves.
II. Options
1. Fetch ID segments from Redis and increment them inside the program (a sketch follows below).
Problem: depending on an external Redis is a hassle, and the segments have to be persisted.
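For reference, here is a minimal sketch of what the segment approach could look like, assuming the Jedis client; the Redis address, key name, and segment size are made up for illustration:

import redis.clients.jedis.Jedis;

// Hypothetical sketch of option 1: each worker reserves a block of ids with
// one atomic INCRBY, then hands them out locally until the block runs out.
public class RedisSegmentIdGenerator {
    private static final long SEGMENT_SIZE = 1000L;      // assumed block size
    private final Jedis jedis = new Jedis("localhost");  // assumed Redis address
    private long current = 0L;
    private long max = 0L;

    public synchronized long nextId() {
        if (current >= max) {
            // Atomically claim the next segment: [max - SEGMENT_SIZE, max)
            max = jedis.incrBy("id:segment", SEGMENT_SIZE);  // hypothetical key
            current = max - SEGMENT_SIZE;
        }
        return current++;
    }
}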
2. A Python service with registration; search online for the details.
Problem: it needs machines plus a server and clients, and our Python version is too old... awkward, and we don't want to casually upgrade the whole cluster.
3. Twitter's Snowflake algorithm.
Reference: https://www.jianshu.com/p/54a87a7c3622
Parameters: workId and datacenterId must be unique.
Problem:
1. I can't guarantee uniqueness for either ID. Map tasks may run on the same machine at the same time, so there is no good way to assign the parameters per instance.
Solution:
1. In our scenario the primary keys only need to be unique within a single job. So for workId I use the map task's ID: within one job, every map task's ID is guaranteed to be different. datacenterId can simply default to 0.
III. The code:
public class MagicSnowFlake {

    // Start-of-epoch timestamp: 2017-01-01 00:00:00
    private final static long twepoch = 1483200000000L;

    // Map-task id: 9 bits, plus 1 datacenter bit = 10 bits in total
    private final static long mapIdBits = 9L;

    // Datacenter id: 1 bit by default -- we are small, there aren't that many data centers
    private final static long dataCenterIdBits = 1L;

    // Max map-task id: 2^9 - 1 = 511. (The original comment claimed a 16-bit
    // limit of 65535, but only 9 bits are reserved, so the real cap is 511.)
    private final static long ipIdMax = ~(-1L << mapIdBits);
    private final static long dataCenterIdMax = ~(-1L << dataCenterIdBits);

    // The sequence occupies 12 bits of the id
    private final static long seqBits = 12L;
    // Max sequence: 4095, i.e. 2^12 - 1
    private final static long seqMax = ~(-1L << seqBits);

    // 64-bit layout: sign bit 0 | 41-bit timestamp | map id | datacenter id | 12-bit sequence
    private final static long dataCenterIdLeftShift = seqBits;
    private final static long mapIdLeftShift = seqBits + dataCenterIdBits;
    // Note: the original code shifted the timestamp by seqBits + dataCenterIdBits
    // + mapIdLeftShift (26 bits); the correct shift uses mapIdBits (22 bits),
    // otherwise the map id and timestamp bits overlap.
    private final static long timeLeftShift = seqBits + dataCenterIdBits + mapIdBits;

    // Map-task id (0~511)
    private long ipId;
    // Datacenter id (0~1)
    private long dataCenterId;
    // Sequence within the current millisecond (0~4095)
    private long seq = 0L;
    // Timestamp of the last generated id
    private long lastTime = -1L;

    public MagicSnowFlake(long ipId, long dataCenterId) {
        if (ipId < 0 || ipId > ipIdMax) {
            throw new IllegalArgumentException("ipId out of range (0~" + ipIdMax + "): " + ipId);
        }
        if (dataCenterId < 0 || dataCenterId > dataCenterIdMax) {
            throw new IllegalArgumentException("dataCenterId out of range (0~" + dataCenterIdMax + "): " + dataCenterId);
        }
        this.ipId = ipId;
        this.dataCenterId = dataCenterId;
    }

    public synchronized long nextId() {
        long nowTime = System.currentTimeMillis();
        if (nowTime < lastTime) {
            // The clock moved backwards; refuse to generate ids rather than risk duplicates
            throw new IllegalStateException("Current time is before the last id's timestamp: " + nowTime);
        }
        if (nowTime == lastTime) {
            seq = (seq + 1) & seqMax;
            if (seq == 0) {
                // Sequence exhausted within this millisecond: spin until the next one
                nowTime = getNextTimeStamp();
            }
        } else {
            seq = 0L;
        }
        lastTime = nowTime;
        return ((nowTime - twepoch) << timeLeftShift)
                | (ipId << mapIdLeftShift)
                | (dataCenterId << dataCenterIdLeftShift)
                | seq;
    }

    private long getNextTimeStamp() {
        long nowTime;
        do {
            nowTime = System.currentTimeMillis();
        } while (nowTime <= lastTime);
        return nowTime;
    }

    public static void main(String[] args) {
        System.out.println(Long.MAX_VALUE);
        MagicSnowFlake msf = new MagicSnowFlake(1, 1);
        System.out.println(msf.nextId());
        System.out.println(~(-1L << 15));
    }
}
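As a sanity check on the bit layout, here is a small decoder of my own (not part of the original code) that splits a generated id back into its four fields using the same shifts and masks:

public class SnowFlakeDecoder {
    // These constants mirror the layout in MagicSnowFlake above.
    private static final long TWEPOCH = 1483200000000L;
    private static final long SEQ_BITS = 12L;
    private static final long DC_BITS = 1L;
    private static final long MAP_BITS = 9L;

    public static void main(String[] args) {
        long id = new MagicSnowFlake(5, 0).nextId();
        long seq = id & ~(-1L << SEQ_BITS);                              // lowest 12 bits
        long dc = (id >> SEQ_BITS) & ~(-1L << DC_BITS);                  // next 1 bit
        long mapId = (id >> (SEQ_BITS + DC_BITS)) & ~(-1L << MAP_BITS);  // next 9 bits
        long ts = (id >> (SEQ_BITS + DC_BITS + MAP_BITS)) + TWEPOCH;     // remaining 41 bits
        System.out.println("timestamp=" + ts + " mapId=" + mapId
                + " dataCenterId=" + dc + " seq=" + seq);
    }
}

Running this should print mapId=5, dataCenterId=0 and the current millisecond timestamp, confirming that no fields overlap.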
The UDF part:
import org.apache.hadoop.hive.ql.exec.MapredContext;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.UDFType;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;

/**
 * @author <a href="mailto:huoguo@2dfire.com">火锅</a>
 * @time 18/3/8
 */
@UDFType(deterministic = false, stateful = true)
public class LongIdUDF extends GenericUDF {

    private static final char SEPARATOR = '_';
    private static final String ATTEMPT = "attempt";
    private long mapTaskId = 0L;
    private int increment = 0;
    private MagicSnowFlake snowFlake;

    @Override
    public void configure(MapredContext context) {
        // Called once per task: derive a per-map worker id from the task attempt id
        increment = context.getJobConf().getNumMapTasks();
        if (increment == 0) {
            throw new IllegalArgumentException("mapred.map.tasks is zero");
        }
        mapTaskId = getInitId(context.getJobConf().get("mapred.task.id"), increment);
        if (mapTaskId == 0L) {
            throw new IllegalArgumentException("mapred.task.id");
        }
    }

    @Override
    public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentException {
        return PrimitiveObjectInspectorFactory.javaLongObjectInspector;
    }

    @Override
    public Long evaluate(DeferredObject[] arguments) throws HiveException {
        if (snowFlake == null) {
            int dataCenterId = Integer.parseInt(arguments[0].get().toString());
            snowFlake = new MagicSnowFlake(getMapTaskId(), dataCenterId);
        }
        return snowFlake.nextId();
    }

    @Override
    public String getDisplayString(String[] children) {
        return "getLongId(0)";
    }

    private synchronized long getMapTaskId() {
        return mapTaskId;
    }

    // Parse a task attempt id such as attempt_1478926768563_0537_m_000004_0
    // and return the task number plus one (here 000004 -> 5).
    private long getInitId(String taskAttemptIDstr, int numTasks) throws IllegalArgumentException {
        try {
            String[] parts = taskAttemptIDstr.split(Character.toString(SEPARATOR));
            if (parts.length == 6) {
                if (parts[0].equals(ATTEMPT)) {
                    if (!parts[3].equals("m") && !parts[3].equals("r")) {
                        throw new Exception();
                    }
                    long result = Long.parseLong(parts[4]);
                    if (result >= numTasks) {
                        throw new Exception("TaskAttemptId string : " + taskAttemptIDstr
                                + " parse ID [" + result + "] >= numTasks[" + numTasks + "] ..");
                    }
                    return result + 1;
                }
            }
        } catch (Exception e) {
            // fall through to the IllegalArgumentException below
        }
        throw new IllegalArgumentException("TaskAttemptId string : " + taskAttemptIDstr + " is not properly formed");
    }

    public static void main(String[] args) {
        String s = "attempt_1478926768563_0537_m_000004_4";
        System.out.println(new LongIdUDF().getInitId(s, 5));
    }
}
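To call it from Hive, the registration looks roughly like this (a sketch only; the jar path is a placeholder and the function name is whatever you choose):

ADD JAR hdfs:///path/to/long-id-udf.jar;   -- placeholder path
CREATE TEMPORARY FUNCTION getLongId AS 'LongIdUDF';
-- The single argument is the dataCenterId (0 by default in our setup):
SELECT getLongId(0) AS id, t.* FROM some_table t;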
Summary:
1. I copied the code from somewhere and adapted it myself; I forget where, but ultimately it all comes from Twitter anyway.
2. Tested it on a 3-billion-row table: no duplicates.
3. If you find any problems, please point them out promptly.