欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Flink_SQL 应用

程序员文章站 2022-06-17 09:22:27
...

1.批数据 SQL

Flink_SQL 应用
参考代码

package com.czxy.flink.stream

import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.table.api.{Table, TableEnvironment}
import org.apache.flink.table.api.scala.BatchTableEnvironment
import org.apache.flink.types.Row

/**
 * 需求:
 * 使用 Flink SQL 统计用户消费订单的总金额、 最大金额、 最小金额、 订单总数。
 * 实现步骤:
 * 1) 获取一个批处理运行环境
 * 2) 获取一个 Table 运行环境
 * 3) 创建一个样例类 Order 用来映射数据(订单名、 用户名、 订单日期、 订单金额)
 * 4) 基于本地 Order 集合创建一个 DataSet source
 * 5) 使用 Table 运行环境将 DataSet 注册为一张表
 * 6) 使用 SQL 语句来操作数据(统计用户消费订单的总金额、 最大金额、 最小金额、 订单总数)
 * 7) 使用 TableEnv.toDataSet 将 Table 转换为 DataSet
 * 8) 打印测试
 */
object BatchFlinkSql {
  //3.创建一个样例类 Order 用来映射数据(订单名、 用户名、 订单日期、 订单金额)
  case class Order(id:Int,userName:String,createTime:String,money:Double)
  
  def main(args: Array[String]): Unit = {
    //1.获取一个批处理运行环境
    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
    //2.获取一个 Table 运行环境
    val tableEnv: BatchTableEnvironment = TableEnvironment.getTableEnvironment(env)
    //4. 基于本地 Order 集合创建一个 DataSet source
    import org.apache.flink.api.scala._
    val orderDataSet: DataSet[Order] = env.fromElements(
      Order(1, "zhangsan", "2018-10-20 15:30", 358.5),
      Order(2, "zhangsan", "2018-10-20 16:30", 131.5),
      Order(3, "lisi", "2018-10-20 16:30", 127.5),
      Order(4, "lisi", "2018-10-20 16:30", 328.5),
      Order(5, "lisi", "2018-10-20 16:30", 432.5),
      Order(6, "zhaoliu", "2018-10-20 22:30", 451.0),
      Order(7, "zhaoliu", "2018-10-20 22:30", 362.0),
      Order(8, "zhaoliu", "2018-10-20 22:30", 364.0),
      Order(9, "zhaoliu", "2018-10-20 22:30", 341.0)
    )
    //5. 使用 Table 运行环境将 DataSet 注册为一张表
    tableEnv.registerDataSet("t_order",orderDataSet)
    //6.使用 SQL 语句来操作数据(统计用户消费订单的总金额、 最大金额、 最小金额、 订单总数)
    //订单名、 用户名、 订单日期、 订单金额
    val sql=
      """
        |select
        | userName,
        | sum(money) as totalMoney,
        | max(money) as maxMoney,
        | min(money) as minMoney,
        | count(1) as totalCount
        | from t_order
        | group by userName
        |""".stripMargin
    val table: Table = tableEnv.sqlQuery(sql)
    table.printSchema()
    //7.使用 TableEnv.toDataSet 将 Table 转换为 DataSet
    val result: DataSet[Row] = tableEnv.toDataSet[Row](table)
    //8.打印测试
    result.print()
  }
}

2. 流数据 SQL

Flink_SQL 应用
参考代码

package com.czxy.flink.stream

import java.util.UUID
import java.util.concurrent.TimeUnit

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.source.{RichSourceFunction, SourceFunction}
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.table.api.{Table, TableEnvironment}
import org.apache.flink.table.api.scala.StreamTableEnvironment
import org.apache.flink.types.Row
import org.apache.flink.api.scala._
import scala.util.Random

/**
 * 需求:
 * 使用 Flink SQL 来统计 5 秒内 用户的 订单总数、 订单的最大金额、 订单的最小金额。
 * 实现思路:
 * 1) 获取流处理运行环境
 * 2) 获取 Table 运行环境
 * 3) 设置处理时间为 EventTime
 * 4) 创建一个订单样例类 Order , 包含四个字段(订单 ID、 用户 ID、 订单金额、 时间戳)
 * 5) 创建一个自定义数据源
 *    a. 使用 for 循环生成 1000 个订单
 *    b. 随机生成订单 ID(UUID)
 *    c. 随机生成用户 ID(0-2)
 *    d. 随机生成订单金额(0-100)
 *    e. 时间戳为当前系统时间
 *    f. 每隔 1 秒生成一个订单
 * 6) 添加水印, 允许延迟 2 秒
 * 7) 导入 import org.apache.flink.table.api.scala._ 隐式参数
 * 8) 使用 registerDataStream 注册表, 并分别指定字段, 还要指定 rowtime 字段
 * 9) 编写 SQL 语句统计用户订单总数、 最大金额、 最小金额分组时要使用 tumble(时间列,interval '窗口时间' second) 来创建窗口
 * 10) 使用 tableEnv.sqlQuery 执行 sql 语句
 * 11) 将 SQL 的执行结果转换成 DataStream 再打印出来
 * 12) 启动流处理程序
 */
object StreamFlinkSql {

  //4.创建一个订单样例类 Order , 包含四个字段(订单 ID、 用户 ID、 订单金额、 时间戳)
  case class Order(orderId: String, userId: Int, money: Long, createTime: Long)

  def main(args: Array[String]): Unit = {
    
    //1.获取流处理运行环境
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    //2.获取 Table 运行环境
    val tableEnv: StreamTableEnvironment = TableEnvironment.getTableEnvironment(env)

    //3.设置处理时间为 EventTime
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    //5.创建一个自定义数据源
    val orderDataStream: DataStream[Order] = env.addSource(new RichSourceFunction[Order] {
      //定义一个字段boolean
      var isRunning: Boolean = true

      /**
       * a. 使用 for 循环生成 1000 个订单
       * b. 随机生成订单 ID(UUID)
       * c. 随机生成用户 ID(0-2)
       * d. 随机生成订单金额(0-100)
       * e. 时间戳为当前系统时间
       * f. 每隔 1 秒生成一个订单
       *
       * @param ctx
       */
      override def run(ctx: SourceFunction.SourceContext[Order]): Unit = {
        //a. 使用 for 循环生成 1000 个订单
        for (i <- 0 until 1000 if isRunning) {
          val order: Order = Order(UUID.randomUUID().toString, Random.nextInt(3), Random.nextInt(101), System.currentTimeMillis())
          TimeUnit.SECONDS.sleep(1)
          ctx.collect(order)
        }
      }

      override def cancel(): Unit = {
        isRunning = false
      }
    })
    //6.添加水印, 允许延迟 2 秒
    val waterMark: DataStream[Order] = orderDataStream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[Order](Time.seconds(2)) {
      override def extractTimestamp(element: Order): Long = {
        val eventTime: Long = element.createTime
        eventTime
      }
    })
    //7.导入 import org.apache.flink.table.api.scala._ 隐式参数
    import org.apache.flink.table.api.scala._
    //8.使用 registerDataStream 注册表, 并分别指定字段, 还要指定 rowtime 字段
    tableEnv.registerDataStream("t_order",waterMark,'orderId,'userId,'money,'createTime.rowtime)

    //9.编写 SQL 语句统计用户订单总数、 最大金额、
    // 最小金额分组时要使用 tumble(时间列,interval '窗口时间' second) 来创建窗口

    val sql=
      """
        | select
        |   userId,
        |   count(1) as totalCount,
        |   max(money) as maxMoney,
        |   min(money) as minMoney
        | from t_order
        | group by userId,tumble(createTime,interval '5' second)
        |""".stripMargin
    //10.使用 tableEnv.sqlQuery 执行 sql 语句
    val table: Table = tableEnv.sqlQuery(sql)
    table.printSchema()
    //11.将 SQL 的执行结果转换成 DataStream 再打印出来
    val result: DataStream[Row] = tableEnv.toAppendStream[Row](table)
    //12.打印输出
    result.print()

    env.execute(this.getClass.getSimpleName)
  }
}
相关标签: Flink