class AreaClickUDAF extends UserDefinedAggregateFunction {
// 输入数据的类型: 北京 String
override def inputSchema: StructType = {
StructType(StructField("city_name", StringType) :: Nil)
// StructType(Array(StructField("city_name", StringType)))
}
// 缓存的数据的类型: 北京->1000, 天津->5000 Map, 总的点击量 1000/?
override def bufferSchema: StructType = {
// MapType(StringType, LongType) 还需要标注 map的key的类型和value的类型
StructType(StructField("city_count", MapType(StringType, LongType)) :: StructField("total_count", LongType) :: Nil)
}
// 输出的数据类型 "北京21.2%,天津13.2%,其他65.6%" String
override def dataType: DataType = StringType
// 相同的输入是否应用有相同的输出.
override def deterministic: Boolean = true
// 给存储数据初始化
override def initialize(buffer: MutableAggregationBuffer): Unit = {
//初始化map缓存
buffer(0) = Map[String, Long]()
// 初始化总的点击量
buffer(1) = 0L
}
// 分区内合并 Map[城市名, 点击量]
override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
// 首先拿到城市名, 然后把成名作为key去查看map中是否存在, 如果存在就把对应的值 +1, 如果不存在, 则直径0+1
val cityName = input.getString(0)
// val map: collection.Map[String, Long] = buffer.getMap[String, Long](0)
val map: Map[String, Long] = buffer.getAs[Map[String, Long]](0)
buffer(0) = map + (cityName -> (map.getOrElse(cityName, 0L) + 1L))
// 碰到一个城市, 则总的点击量要+1
buffer(1) = buffer.getLong(1) + 1L
}
// 分区间的合并
override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
val map1 = buffer1.getAs[Map[String, Long]](0)
val map2 = buffer2.getAs[Map[String, Long]](0)
// 把map1的键值对与map2中的累积, 最后赋值给buffer1
buffer1(0) = map1.foldLeft(map2) {
case (map, (k, v)) =>
map + (k -> (map.getOrElse(k, 0L) + v))
}
buffer1(1) = buffer1.getLong(1) + buffer2.getLong(1)
}
// 最终的输出. "北京21.2%,天津13.2%,其他65.6%"
override def evaluate(buffer: Row): Any = {
val cityCountMap = buffer.getAs[Map[String, Long]](0)
val totalCount = buffer.getLong(1)
var citysRatio: List[CityRemark] = cityCountMap.toList.sortBy(-_._2).take(2).map {
case (cityName, count) => {
CityRemark(cityName, count.toDouble / totalCount)
}
}
// 如果城市的个数超过2才显示其他
if (cityCountMap.size > 2) {
citysRatio = citysRatio :+ CityRemark("其他", citysRatio.foldLeft(1D)(_ - _.cityRatio))
}
citysRatio.mkString(", ")
}
}
case class CityRemark(cityName: String, cityRatio: Double) {
val formatter = new DecimalFormat("0.00%")
override def toString: String = s"$cityName:${formatter.format(cityRatio)}"
}
def main(args: Array[String]): Unit = {
val ss = SparkSession.builder().master("local[*]").appName("wula").enableHiveSupport().getOrCreate()
val sc = ss.sparkContext
sc.setLogLevel("warn")
ss.sql("use testDemo01").show()
//请使用spark sql 计算各个区域前三大热门商品,并备注上每个商品在主要城市中的分布比例,超过两个城市用其他显示。
//用户行为表 时间 用户id 会话id 页面id 时间戳 搜索关键字 点击品类id 点击产品id 下单品类id 下单产品id 支付品类ids 支付产品ids 城市id
//城市表 字段依次是:城市id 城市名字 地区
//产品表: 产品id 产品名字 店铺类型
ss.udf.register("cr", new AreaClickUDAF)
val cvpDF1: DataFrame = ss.sql("select ci.*,pva.click_product_id,pi.product_name from product_info pi join user_visit_action pva on pi.product_id=pva.click_product_id join city_info ci on ci.city_id = pva.city_id where click_product_id>-1".stripMargin)
cvpDF1.createOrReplaceTempView("tb1")
// cvpDF1.show()
val cvpDF2: DataFrame = ss.sql("select tb1.area, tb1.product_name, count(*) click_count,cr(tb1.city_name) from tb1 group by area,product_name".stripMargin)
cvpDF2.createOrReplaceTempView("tb2")
ss.sql("select *,rank() over(partition by tb2.area order by tb2.click_count desc) rank from tb2 having rank <= 3".stripMargin).show()
ss.stop()
}
Spark Structured Streaming UDAF使用实例
程序员文章站
2022-07-14 21:54:17
...
上一篇: iframeèªéåºé«åº¦é®é¢
下一篇: textarea 自适应宽度和高度
推荐阅读
-
Spark Structured Streaming 实时解析mr 任务
-
structured streaming 入门级初使用(一)
-
Structured Streaming入门实例
-
在 Spark SQL 和 Spark Structured Streaming 中使用 Pulsar
-
Spark Structured Streaming UDAF使用实例
-
Spark 实战, 第 2 部分:使用 Kafka 和 Spark Streaming 构建实时数据处理系统
-
Spark Streaming vs. Structured Streaming
-
Spark Streaming实时流处理项目实战笔记——updateStateByKey算子的使用
-
Spark Streaming实时流处理项目实战笔记——使用Flume采集Log4j产生的日志
-
Spark Streaming算子开发实例