
Using Spark window functions

程序员文章站 2022-07-12 18:12:23

1. Using window with ranking functions

Consider this requirement: for each seller, rank their orders (order_id) by sale amount, from the highest price to the lowest.

package com.waitingfy

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._


object WindowTest {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .master("local[2]")
      .appName("WindowTest")
      .getOrCreate()
    import spark.implicits._

    val orders = Seq(
      ("1", "s1", "2017-05-01", 100),
      ("2", "s1", "2017-05-02", 200),
      ("3", "s1", "2017-05-02", 200),
      ("4", "s2", "2017-05-01", 300),
      ("5", "s2", "2017-05-01", 100),
      ("6", "s3", "2017-05-01", 100),
      ("6", "s3", "2017-05-02", 50)
    ).toDF("order_id", "seller_id", "order_date", "price")


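    // One window per seller, rows ordered by price from highest to lowest
    val rankSpec = Window.partitionBy("seller_id").orderBy(orders("price").desc)

    // dense_rank: equal prices share a rank, and the next rank has no gap
    val shopOrderRank =
      orders.withColumn("rank", dense_rank().over(rankSpec))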

    shopOrderRank.show()

    spark.close()
  }
}

Output:
+--------+---------+----------+-----+----+
|order_id|seller_id|order_date|price|rank|
+--------+---------+----------+-----+----+
|       4|       s2|2017-05-01|  300|   1|
|       5|       s2|2017-05-01|  100|   2|
|       6|       s3|2017-05-01|  100|   1|
|       6|       s3|2017-05-02|   50|   2|
|       2|       s1|2017-05-02|  200|   1|
|       3|       s1|2017-05-02|  200|   1|
|       1|       s1|2017-05-01|  100|   2|
+--------+---------+----------+-----+----+
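This is a minimal plain-Scala sketch of the dense_rank rule (no Spark needed). It reproduces the table above on a local Seq. It only illustrates the semantics and is not how Spark computes them. The names DenseRankSketch, Order and denseRank are hypothetical.

```scala
// Plain-Scala illustration of dense_rank() over
// Window.partitionBy("seller_id").orderBy(price desc).
// Hypothetical helper, not Spark code: it only mirrors the semantics on a local Seq.
object DenseRankSketch {
  final case class Order(orderId: String, sellerId: String, orderDate: String, price: Int)

  val orders: Seq[Order] = Seq(
    Order("1", "s1", "2017-05-01", 100),
    Order("2", "s1", "2017-05-02", 200),
    Order("3", "s1", "2017-05-02", 200),
    Order("4", "s2", "2017-05-01", 300),
    Order("5", "s2", "2017-05-01", 100),
    Order("6", "s3", "2017-05-01", 100),
    Order("6", "s3", "2017-05-02", 50)
  )

  // Within each seller (the partition), a row's dense rank is
  // 1 + the number of DISTINCT prices strictly greater than its own price.
  def denseRank(rows: Seq[Order]): Seq[(Order, Int)] =
    rows.groupBy(_.sellerId).values.toSeq.flatMap { partition =>
      val distinctDesc = partition.map(_.price).distinct.sorted(Ordering[Int].reverse)
      partition.sortBy(-_.price).map(o => o -> (distinctDesc.indexOf(o.price) + 1))
    }

  def main(args: Array[String]): Unit =
    denseRank(orders).foreach { case (o, r) =>
      println(s"${o.orderId} ${o.sellerId} ${o.orderDate} ${o.price} rank=$r")
    }
}
```

The row order printed across sellers may differ from Spark's output, but the rank assigned to each order matches the table.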

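Note that dense_rank is used here. Orders with the same amount share a rank, and the next rank follows without a gap (s1's 100 order is ranked 2).
If rank is used instead (rank().over(rankSpec)), ties still share a rank but the following rank skips ahead, so s1's 100 order is ranked 3: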

+--------+---------+----------+-----+----+
|order_id|seller_id|order_date|price|rank|
+--------+---------+----------+-----+----+
|       4|       s2|2017-05-01|  300|   1|
|       5|       s2|2017-05-01|  100|   2|
|       6|       s3|2017-05-01|  100|   1|
|       6|       s3|2017-05-02|   50|   2|
|       2|       s1|2017-05-02|  200|   1|
|       3|       s1|2017-05-02|  200|   1|
|       1|       s1|2017-05-01|  100|   3|
+--------+---------+----------+-----+----+
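The gap comes from how rank is defined: a row's rank is 1 plus the number of rows in its partition with a strictly higher price. Below is a minimal plain-Scala sketch of that rule on the same sample data. RankSketch and rank are hypothetical names, not Spark API.

```scala
// Plain-Scala illustration of rank() semantics for a window partitioned by
// seller and ordered by price descending. Hypothetical helper, not Spark code.
object RankSketch {
  final case class Order(orderId: String, sellerId: String, price: Int)

  val orders: Seq[Order] = Seq(
    Order("1", "s1", 100), Order("2", "s1", 200), Order("3", "s1", 200),
    Order("4", "s2", 300), Order("5", "s2", 100),
    Order("6", "s3", 100), Order("6", "s3", 50)
  )

  // rank = 1 + number of rows in the same partition with a strictly higher price.
  // Tied rows share a rank; the next rank skips by the size of the tie group.
  def rank(rows: Seq[Order]): Seq[(Order, Int)] =
    rows.groupBy(_.sellerId).values.toSeq.flatMap { partition =>
      partition.sortBy(-_.price).map(o => o -> (partition.count(_.price > o.price) + 1))
    }

  def main(args: Array[String]): Unit =
    rank(orders).foreach { case (o, r) => println(s"${o.orderId} ${o.sellerId} ${o.price} rank=$r") }
}
```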

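With row_number (row_number().over(rankSpec)), every row gets a distinct, consecutive number, even when prices tie. Which of the tied orders 2 and 3 gets 1 is not guaranteed unless the window's orderBy includes a tie-breaking column. The result is: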

+--------+---------+----------+-----+----+
|order_id|seller_id|order_date|price|rank|
+--------+---------+----------+-----+----+
|       4|       s2|2017-05-01|  300|   1|
|       5|       s2|2017-05-01|  100|   2|
|       6|       s3|2017-05-01|  100|   1|
|       6|       s3|2017-05-02|   50|   2|
|       2|       s1|2017-05-02|  200|   1|
|       3|       s1|2017-05-02|  200|   2|
|       1|       s1|2017-05-01|  100|   3|
+--------+---------+----------+-----+----+
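The sketch below mirrors row_number in plain Scala. It assumes order_id as a tie-breaker so that tied prices get a deterministic order. In Spark, the comparable change would be adding that column to the window, for example Window.partitionBy("seller_id").orderBy(orders("price").desc, orders("order_id")). RowNumberSketch and rowNumber are hypothetical names.

```scala
// Plain-Scala illustration of row_number() over a window partitioned by seller
// and ordered by price descending. Hypothetical helper, not Spark code.
object RowNumberSketch {
  final case class Order(orderId: String, sellerId: String, price: Int)

  val orders: Seq[Order] = Seq(
    Order("1", "s1", 100), Order("2", "s1", 200), Order("3", "s1", 200),
    Order("4", "s2", 300), Order("5", "s2", 100),
    Order("6", "s3", 100), Order("6", "s3", 50)
  )

  // Sort each partition by price descending, then by orderId ascending
  // (assumed tie-breaker), and number rows 1, 2, 3, ... with no shared numbers.
  def rowNumber(rows: Seq[Order]): Seq[(Order, Int)] =
    rows.groupBy(_.sellerId).values.toSeq.flatMap { partition =>
      partition
        .sortBy(o => (-o.price, o.orderId))
        .zipWithIndex
        .map { case (o, i) => o -> (i + 1) }
    }

  def main(args: Array[String]): Unit =
    rowNumber(orders).foreach { case (o, n) => println(s"${o.orderId} ${o.sellerId} ${o.price} row=$n") }
}
```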

2. Moving averages and moving sums: using rowsBetween

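Now the requirement is to compute, for each seller, the average price of their 2 most recent orders, meaning the current order and the one immediately before it when ordered by order_date.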

package com.waitingfy

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._


object WindowTest {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .master("local[2]")
      .appName("WindowTest")
      .getOrCreate()
    import spark.implicits._

    val orders = Seq(
      ("1", "s1", "2017-05-01", 100),
      ("2", "s1", "2017-05-02", 200),
      ("3", "s1", "2017-05-02", 200),
      ("4", "s2", "2017-05-01", 300),
      ("5", "s2", "2017-05-01", 100),
      ("6", "s3", "2017-05-01", 100),
      ("6", "s3", "2017-05-02", 50)
    ).toDF("order_id", "seller_id", "order_date", "price")


    // Per seller, ordered by date; the frame spans the previous row (-1)
    // through the current row (0), i.e. at most 2 orders
    val rankSpec = Window.partitionBy("seller_id").orderBy(orders("order_date")).rowsBetween(-1, 0)

    // Average price over that frame of at most 2 orders
    val shopOrderRank =
      orders.withColumn("avg sum", avg("price").over(rankSpec))

    shopOrderRank.show()

    spark.close()
  }
}

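Note the use of rowsBetween(-1, 0). The frame is counted in rows relative to the current row: -1 is the row immediately before it and 0 is the current row. The first order of each seller has no preceding row, so its average is just its own price. Orders 2 and 3 of s1 share the date 2017-05-02, so which of them is averaged with order 1 depends on row order unless a tie-breaking column is added to orderBy. The output is: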

+--------+---------+----------+-----+-------+
|order_id|seller_id|order_date|price|avg sum|
+--------+---------+----------+-----+-------+
|       4|       s2|2017-05-01|  300|  300.0|
|       5|       s2|2017-05-01|  100|  200.0|
|       6|       s3|2017-05-01|  100|  100.0|
|       6|       s3|2017-05-02|   50|   75.0|
|       1|       s1|2017-05-01|  100|  100.0|
|       2|       s1|2017-05-02|  200|  150.0|
|       3|       s1|2017-05-02|  200|  200.0|
+--------+---------+----------+-----+-------+
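Below is a minimal plain-Scala sketch of the same ROWS frame. It computes both the moving sum and the moving average, since the section title mentions both. It assumes order_id as a tie-breaker within equal order_date values. In Spark that would be orderBy(orders("order_date"), orders("order_id")), and the moving sum would be sum("price").over(rankSpec). MovingAvgSketch and movingAgg are hypothetical names.

```scala
// Plain-Scala illustration of a ROWS frame rowsBetween(-preceding, 0),
// partitioned by seller and ordered by date. Hypothetical helper, not Spark code.
object MovingAvgSketch {
  final case class Order(orderId: String, sellerId: String, orderDate: String, price: Int)

  val orders: Seq[Order] = Seq(
    Order("1", "s1", "2017-05-01", 100),
    Order("2", "s1", "2017-05-02", 200),
    Order("3", "s1", "2017-05-02", 200),
    Order("4", "s2", "2017-05-01", 300),
    Order("5", "s2", "2017-05-01", 100),
    Order("6", "s3", "2017-05-01", 100),
    Order("6", "s3", "2017-05-02", 50)
  )

  // For each row, take up to `preceding` earlier rows plus the current row
  // in the same partition, and return (order, frame sum, frame average).
  def movingAgg(rows: Seq[Order], preceding: Int): Seq[(Order, Long, Double)] =
    rows.groupBy(_.sellerId).values.toSeq.flatMap { partition =>
      // orderId is an assumed tie-breaker for orders on the same date
      val sorted = partition.sortBy(o => (o.orderDate, o.orderId)).toVector
      sorted.indices.map { i =>
        val frame = sorted.slice(math.max(0, i - preceding), i + 1)
        val sum = frame.map(_.price.toLong).sum
        (sorted(i), sum, sum.toDouble / frame.size)
      }
    }

  def main(args: Array[String]): Unit =
    movingAgg(orders, preceding = 1).foreach { case (o, s, a) =>
      println(s"${o.orderId} ${o.sellerId} ${o.orderDate} ${o.price} sum=$s avg=$a")
    }
}
```

With preceding = 1 the averages match the table above. Passing preceding = 2 would correspond to rowsBetween(-2, 0), i.e. the current order plus up to two earlier ones.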

http://www.waitingfy.com/archives/4409