欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

hive udf 唯一bigInt 生成器

程序员文章站 2022-06-07 23:12:29
...

一、背景

        mysql数据由于自增的bigint 主键,会插入更快,因为能持续往文件末尾插入嘛,因此需要这个东西。

        然后呢,服务端有专门生产id的接口,但是数据中心批量插入,肯定会拉暴他们,不让我们一起玩,只能自己玩。

 

二、方案

        1.redis 获取数据段,程序内部自增。

        问题:要用外部redis麻烦,而且要持久化

 

        2.python 服务注册的,这个自己网上搜索。

        问题:要机器,要服务端和客户端,而且python 我们版本不够高。。。尴尬,不想随便升级整个集群

 

 

        3.twitter 的 snowflake 算法

         参考: https://www.jianshu.com/p/54a87a7c3622

 

         参数:workId  datacenterId 唯一

  

         问题:

          1.两个ID 我都没法唯一。MAP 阶段可能是在同一个机器上,同时执行,参数不好搞

          

          解决方案:

          1.我们场景是只需要同一个任务的主键不重复就行。因此workId 我选取map 的ID,毕竟同一个任务,每个MAP的ID 肯定不同。 datacenterId   可以先默认0.

 

三.上代码:

      

public class MagicSnowFlake {

    //其实时间戳   2017-01-01 00:00:00
    private final static long twepoch = 1483200000000l;

 // 改到16 65535,认为MAP的最大数量限制

    private final static long ipIdMax = 65535;

    // 默认1位,我们小,没那么多数据中心,意思一下
    private final static long dataCenterIdBits = 1L;

  // 9+1 (10 )

    private final static long mapIdBits = 9L;

    private final static long dataCenterIdMax = ~ (-1L << dataCenterIdBits);

    //序列在id中占的位数 12bit
    private final static long seqBits = 12L;

    //序列最大值 4095 即2的12次方减一。
    private final static long seqMax = ~(-1L << seqBits);

    // 64位的数字:首位0  随后41位表示时间戳 MAP_ID 最后12位序列号
    private final static long dataCenterIdLeftShift = seqBits;
    private final static long mapIdLeftShift = seqBits + dataCenterIdBits;
    private final static long timeLeftShift = seqBits  + dataCenterIdBits + mapIdLeftShift;

    //IP标识(0~255)
    private long ipId;

    // 数据中心ID(0~3)
    private long dataCenterId;

    // 毫秒内序列(0~4095)
    private long seq = 0L;

    // 上次生成ID的时间截
    private long lastTime = -1L;

    public MagicSnowFlake(long ipId, long dataCenterId) {
        if(ipId < 0 || ipId > ipIdMax) {
            System.out.println(" ---------- ipId不在正常范围内(0~"+ipIdMax +") " + ipId);
            System.exit(0);
        }

        if(dataCenterId < 0 || dataCenterId > dataCenterIdMax) {
            System.out.println(" ---------- dataCenterId不在正常范围内(0~"+dataCenterIdMax +") " + dataCenterId);
            System.exit(0);
        }

        this.ipId = ipId;
        this.dataCenterId = dataCenterId;
    }

    public synchronized long nextId() {
        long nowTime = System.currentTimeMillis();

        if(nowTime < lastTime) {
            System.out.println(" ---------- 当前时间前于上次操作时间,当前时间有误: " + nowTime);
            System.exit(0);
        }

        if(nowTime == lastTime) {
            seq = (seq + 1) & seqMax;
            if(seq == 0) {
                nowTime = getNextTimeStamp();
            }
        } else {
            seq = 0L;
        }

        lastTime = nowTime;


        return ((nowTime - twepoch) << timeLeftShift)
                | (ipId << mapIdLeftShift)
                | (dataCenterId << dataCenterIdLeftShift)
                | seq;
    }

    private long getNextTimeStamp() {
        long nowTime;
        do {
            nowTime = System.currentTimeMillis();
        } while(nowTime <= lastTime);
        return nowTime;
    }

    public static void main(String[] args) {
        System.out.println(Long.MAX_VALUE);
        MagicSnowFlake msf = new MagicSnowFlake(1, 1);
        msf.nextId();
        System.out.println(~ (-1L << 15));
    }
}

 

       

 

    UDF 部分

   

import org.apache.hadoop.hive.ql.exec.MapredContext;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.UDFType;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;

/**
 * @author <a href="mailto:huoguo@2dfire.com">火锅</a>
 * @time 18/3/8
 */
@UDFType(deterministic = false, stateful = true)
public class LongIdUDF extends GenericUDF {
    private static final char SEPARATOR = '_';
    private static final String ATTEMPT = "attempt";
    private long mapTaskId = 0l;
    private int increment = 0;

    private MagicSnowFlake snowFlake;



    @Override
    public void configure(MapredContext context) {
        increment = context.getJobConf().getNumMapTasks();
        if(increment == 0) {
            throw new IllegalArgumentException("mapred.map.tasks is zero");
        }

        mapTaskId = getInitId(context.getJobConf().get("mapred.task.id"),increment);
        if(mapTaskId == 0l) {
            throw new IllegalArgumentException("mapred.task.id");
        }
    }

    @Override
    public ObjectInspector initialize(ObjectInspector[] arguments)
            throws UDFArgumentException {
        return PrimitiveObjectInspectorFactory.javaLongObjectInspector;
    }

    @Override
    public Long evaluate(DeferredObject[] arguments) throws HiveException {
        if(snowFlake == null){
            int dataCenterId = Integer.parseInt(arguments[0].get().toString());
            snowFlake = new MagicSnowFlake(getMapTaskId(),dataCenterId);
        }
        return snowFlake.nextId();
    }

    @Override
    public String getDisplayString(String[] children) {
        return "getLongId(0)";
    }


    private synchronized long getMapTaskId() {
        return mapTaskId;
    }

    //attempt_1478926768563_0537_m_000004_0 // return 0+1
    private long getInitId (String taskAttemptIDstr,int numTasks)
            throws IllegalArgumentException {
        try {
            String[] parts = taskAttemptIDstr.split(Character.toString(SEPARATOR));
            if(parts.length == 6) {
                if(parts[0].equals(ATTEMPT)) {
                    if(!parts[3].equals("m") && !parts[3].equals("r")) {
                        throw new Exception();
                    }
                    long result = Long.parseLong(parts[4]);
                    if(result >= numTasks) { //if taskid >= numtasks
                        throw new Exception("TaskAttemptId string : " + taskAttemptIDstr  + "  parse ID [" + result + "] >= numTasks[" + numTasks + "] ..");
                    }
                    return result + 1;
                }
            }
        } catch (Exception e) {}
        throw new IllegalArgumentException("TaskAttemptId string : " + taskAttemptIDstr + " is  not properly formed");
    }


    public static void main(String[] args) {
        String s = "attempt_1478926768563_0537_m_000004_4";
        System.out.println(new LongIdUDF().getInitId(s,5));
    }

}

 

 

   小结:

          1.copy的代码自己改造的,忘记位置了,总的来说是出自twitter 嘛。

          2.用了30亿的表进行测试,没重复

          3.有问题 请及时提出,