spark streaming updateStateByKey
程序员文章站
2022-07-11 12:23:16
...
1、updateStateByKey(updateFun)
2、updateFun
因为数据量大所以使用HyperLogLogPlus
val updateFuc = (values: Seq[String], state: Option[HyperLogLogPlus]) => {
if (state.nonEmpty) {
val hll = state.get
for (value <- values) { hll.offer(value) }
Option(hll)
} else {
val hll = new HyperLogLogPlus(14)
for (value <- values) { hll.offer(value) }
Option(hll)
}
}
updateStateByKey通用统计
//preVauleState为某key的原始统计值,currValues为某key当前需要进行统计的值
val addFunction = (currValues : Seq[Double],preVauleState : Option[Double]) => {
val currentSum = currValues.sum
val previousSum = preVauleState.getOrElse(0.0)
Some(currentSum + previousSum)
}
updateStateByKey(addFunction)
hyperloglog的java版使用
算法原理不是太明白
public void test(){
final int seed = 123456;
HashFunction hash = Hashing.murmur3_128(seed);
// data on which to calculate distinct count
final Integer[] data = new Integer[]{1, 1, 2, 3, 4, 5, 6, 6,
6, 7, 7, 7, 7, 8, 10};
final HLL hll = new HLL(13, 5); //number of bucket and bits per bucket
for (int item : data) {
final long value = hash.newHasher().putInt(item).hash().asLong();
hll.addRaw(value);
}
System.out.println("Distinct count="+ hll.cardinality());
}
maven
<dependency>
<groupId>net.agkn</groupId>
<artifactId>hll</artifactId>
<version>1.6.0</version>
</dependency>
上一篇: hyperloglog应用
下一篇: 亿网文交管理者孟建州告诉你算法代码的实现
推荐阅读
-
使用docker快速搭建Spark集群的方法教程
-
spark 安装教程(spark环境搭建及配置)
-
将string类型的数据类型转换为spark rdd时报错的解决方法
-
Spark SQL,如何将 DataFrame 转为 json 格式
-
spark: RDD与DataFrame之间的相互转换方法
-
用python + hadoop streaming 分布式编程(一) -- 原理介绍,样例程序与本地调试
-
Spark SQL常见4种数据源详解
-
spark dataframe 将一列展开,把该列所有值都变成新列的方法
-
CarbonData 大数据交互式分析实践(spark summit)
-
spark平台基本组成(spark安装详细教程)