
spark streaming updateStateByKey

程序员文章站 2022-07-11 12:23:16

1. updateStateByKey(updateFun)

2. updateFun

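Because the data volume is large, HyperLogLogPlus is used to keep an approximate distinct count per key instead of storing every value.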

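import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus

// values: new values for a key in the current batch
// state: the sketch kept for that key from earlier batches, if any
val updateFun = (values: Seq[String], state: Option[HyperLogLogPlus]) => {
  // Reuse the existing sketch, or create one with precision p = 14 for a new key
  val hll = state.getOrElse(new HyperLogLogPlus(14))
  values.foreach(value => hll.offer(value))
  Some(hll)
}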
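
To give a rough sense of why a sketch suits large data volumes, the following Java snippet works out the register count, approximate error, and dense storage for precision 14. It is only a back-of-the-envelope sketch: it assumes the classic HyperLogLog error bound of 1.04 / sqrt(m) and 6-bit registers, the class name HllSizing is made up for illustration, and the actual memory layout of HyperLogLogPlus may differ.

```java
// Back-of-the-envelope sizing for a HyperLogLog sketch with precision p.
// Assumptions: classic 1.04 / sqrt(m) error bound and a caller-chosen register width;
// a real library may store its registers differently.
class HllSizing {
    // Number of registers m = 2^p.
    static int registers(int p) {
        return 1 << p;
    }

    // Approximate relative standard error of the estimate.
    static double standardError(int p) {
        return 1.04 / Math.sqrt(registers(p));
    }

    // Dense register storage in bytes for the given register width.
    static int denseBytes(int p, int bitsPerRegister) {
        return registers(p) * bitsPerRegister / 8;
    }

    public static void main(String[] args) {
        int p = 14; // same precision as new HyperLogLogPlus(14)
        System.out.println("registers      = " + registers(p));      // 16384
        System.out.println("standard error = " + standardError(p));  // 0.008125
        System.out.println("dense bytes    = " + denseBytes(p, 6));  // 12288
    }
}
```

Under these assumptions the per-key state stays at roughly 12 KB with an error below 1%, no matter how many distinct values arrive, whereas an exact set grows with every new value.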


General-purpose aggregation with updateStateByKey

// prevValueState is the accumulated value for a key so far;
// currValues are the values for that key that arrived in the current batch
val addFunction = (currValues: Seq[Double], prevValueState: Option[Double]) => {
  val currentSum = currValues.sum
  val previousSum = prevValueState.getOrElse(0.0)
  Some(currentSum + previousSum)
}

updateStateByKey(addFunction)
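
To make the contract of the update function concrete, here is a plain-Java simulation with no Spark involved. It is a sketch under stated assumptions: the class RunningSumDemo, the helper applyBatch, and the two sample batches are invented for illustration. It mimics two behaviours of updateStateByKey: the function is called for every key that has state, even when the batch brings no new values for it, and an empty result removes the key.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

// Plain-Java simulation of the updateStateByKey contract; not Spark code.
class RunningSumDemo {
    // Mirrors addFunction: values from this batch plus the previous state give the new state.
    static Optional<Double> addFunction(List<Double> currValues, Optional<Double> prevState) {
        double currentSum = 0.0;
        for (double v : currValues) currentSum += v;
        return Optional.of(currentSum + prevState.orElse(0.0));
    }

    // Hypothetical helper: visit every key that has state or new values, as one batch interval would.
    static Map<String, Double> applyBatch(Map<String, Double> state, Map<String, List<Double>> batch) {
        Set<String> keys = new TreeSet<>(state.keySet());
        keys.addAll(batch.keySet());
        Map<String, Double> next = new TreeMap<>();
        for (String key : keys) {
            List<Double> values = batch.getOrDefault(key, Collections.<Double>emptyList());
            Optional<Double> updated = addFunction(values, Optional.ofNullable(state.get(key)));
            updated.ifPresent(v -> next.put(key, v)); // an empty Optional would drop the key
        }
        return next;
    }

    // Runs two made-up batches and returns the final state.
    static Map<String, Double> demo() {
        Map<String, List<Double>> batch1 = new TreeMap<>();
        batch1.put("a", Arrays.asList(1.0, 2.0));
        batch1.put("b", Arrays.asList(5.0));
        Map<String, List<Double>> batch2 = new TreeMap<>();
        batch2.put("a", Arrays.asList(4.0));

        Map<String, Double> state = applyBatch(new TreeMap<>(), batch1);
        return applyBatch(state, batch2);
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints {a=7.0, b=5.0}
    }
}
```

Key "b" receives nothing in the second batch, yet its sum is carried forward because the update function still runs with an empty value list.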

Using HyperLogLog from Java

I do not fully understand the principle behind the algorithm.

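    // Imports belong at the top of the enclosing source file
    import com.google.common.hash.HashFunction;
    import com.google.common.hash.Hashing;
    import net.agkn.hll.HLL;

    public void test() {
        final int seed = 123456;
        // HLL.addRaw expects values that are already hashed, so hash each item first
        HashFunction hash = Hashing.murmur3_128(seed);
        // Data on which to calculate the distinct count (9 distinct values)
        final Integer[] data = new Integer[]{1, 1, 2, 3, 4, 5, 6, 6,
                6, 7, 7, 7, 7, 8, 10};
        // log2m = 13 (2^13 registers), regwidth = 5 (bits per register)
        final HLL hll = new HLL(13, 5);
        for (int item : data) {
            final long value = hash.newHasher().putInt(item).hash().asLong();
            hll.addRaw(value);
        }
        System.out.println("Distinct count=" + hll.cardinality());
    }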

Maven dependency

<dependency>
    <groupId>net.agkn</groupId>
    <artifactId>hll</artifactId>
    <version>1.6.0</version>
</dependency>
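
As a rough illustration of the principle behind the HLL class used above, here is a minimal HyperLogLog written from the textbook description. It is not the net.agkn.hll implementation: the class TinyHll is hypothetical, the hash is a SplitMix64-style mixer chosen as a stand-in for murmur3, and only the small-range (linear counting) correction is included. The idea is that the top bits of each hash pick a register, each register remembers the longest run of leading zero bits it has seen, and the registers are combined with a harmonic mean.

```java
// Minimal HyperLogLog for illustration only; not the net.agkn.hll implementation.
class TinyHll {
    private final int p;            // number of hash bits used as the register index
    private final byte[] registers; // m = 2^p registers, each holding the largest rank seen

    TinyHll(int p) {
        this.p = p;
        this.registers = new byte[1 << p];
    }

    // Assumed stand-in hash (SplitMix64-style mixer); any well-mixed 64-bit hash would do.
    static long hash64(long x) {
        long z = x + 0x9e3779b97f4a7c15L;
        z = (z ^ (z >>> 30)) * 0xbf58476d1ce4e5b9L;
        z = (z ^ (z >>> 27)) * 0x94d049bb133111ebL;
        return z ^ (z >>> 31);
    }

    void add(long item) {
        long h = hash64(item);
        int index = (int) (h >>> (64 - p)); // top p bits pick a register
        long rest = h << p;                 // remaining 64 - p bits
        // rank = position of the first 1 bit in the remaining bits, capped when they are all zero
        int rank = Math.min(Long.numberOfLeadingZeros(rest) + 1, 64 - p + 1);
        if (rank > registers[index]) registers[index] = (byte) rank;
    }

    double estimate() {
        int m = registers.length;
        double sum = 0.0;
        int zeros = 0;
        for (byte r : registers) {
            sum += Math.pow(2.0, -r);
            if (r == 0) zeros++;
        }
        double alpha = 0.7213 / (1.0 + 1.079 / m); // bias constant, valid for m >= 128
        double raw = alpha * m * (double) m / sum;
        // Small-range correction: use linear counting while many registers are still empty
        if (raw <= 2.5 * m && zeros > 0) return m * Math.log((double) m / zeros);
        return raw;
    }

    public static void main(String[] args) {
        int[] data = {1, 1, 2, 3, 4, 5, 6, 6, 6, 7, 7, 7, 7, 8, 10};
        TinyHll hll = new TinyHll(13);
        for (int item : data) hll.add(item);
        System.out.println("Estimated distinct count = " + Math.round(hll.estimate()));
    }
}
```

Duplicates never change a register, which is why repeated values do not inflate the count. The result is an estimate, so for larger inputs it will usually be close to the true distinct count rather than exact.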