Real-Time Data Warehouse (Part 3), Requirement 1: Cumulative Statistics to Date of City, Province, Total Order Amount, and Total Order Count


Requirement 1: compute, over all data to date, the order count and total order amount per city and province.

Approach

  1. Define custom data sources that read the dim_lagou_area and lagou_trade_orders tables from HBase.
  2. Build Flink tables from the two streams.
  3. Run the join and aggregation SQL:
SELECT
  areas.city AS city,
  areas.province AS province,
  COUNT(orders.orderId) AS totalNumber,
  SUM(orders.totalMoney) AS totalMoney
FROM lagou_trade_orders orders
INNER JOIN dim_lagou_area areas ON orders.areaId = areas.areaId
GROUP BY areas.city, areas.province
  4. Convert the result table back into a stream and print it. Because the grouped aggregate is continuously updated as new orders arrive, this must be a retract stream rather than an append-only stream.

Implementation

1. Define the case class for order records, and implement the custom sources by extending RichSourceFunction and overriding its open, run, and close methods.

package dws

case class TradeOrders(
  orderId: Int,
  orderNo: String,
  userId: Int,
  status: Int,
  totalMoney: Double,
  areaId: Int
)
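
The dimension-side case class, dim.DimArea, was defined in an earlier part of this series and is not repeated in the post. A sketch of what it presumably looks like, with field names and types inferred from the constructor call DimArea(areaId, aname, cid, city, proid, province) used in the main program below:

package dim

// Sketch only: inferred from the constructor call in this post,
// not copied from the original series.
case class DimArea(
  areaId: Int,      // area id; also the HBase rowkey of dim_lagou_area
  aname: String,    // area name
  cid: Int,         // city id
  city: String,     // city name
  proid: Int,       // province id
  province: String  // province name
)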

package dws

import java.util

import myUtils.ConnHBase
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.source.{RichSourceFunction, SourceFunction}
import org.apache.hadoop.hbase.{Cell, TableName}
import org.apache.hadoop.hbase.client.{Connection, Result, ResultScanner, Scan, Table}
import org.apache.hadoop.hbase.util.Bytes

import scala.collection.JavaConverters._
// Reads the dim_lagou_area dimension table from HBase once and emits each row
// as (rowkey, comma-separated cell values).
class ReadDimArea extends RichSourceFunction[(String, String)] {
  private var conn: Connection = _
  private var table: Table = _
  private var scan: Scan = _
  private var stop = false // set by cancel() to stop emitting

  override def open(parameters: Configuration): Unit = {
    conn = new ConnHBase().connToHabse
    table = conn.getTable(TableName.valueOf("dim_lagou_area"))
    scan = new Scan()
    scan.addFamily(Bytes.toBytes("f1"))
  }

  override def close(): Unit = {
    if (table != null) {
      table.close()
    }
    if (conn != null) {
      conn.close()
    }
  }

  override def run(sourceContext: SourceFunction.SourceContext[(String, String)]): Unit = {
    if (!stop) {
      val rs: ResultScanner = table.getScanner(scan)
      val iterator: util.Iterator[Result] = rs.iterator()
      while (iterator.hasNext && !stop) {
        val result: Result = iterator.next()
        val rowKey: String = Bytes.toString(result.getRow)
        val buffer = new StringBuffer()
        for (cell: Cell <- result.listCells().asScala) {
          val value: String = Bytes.toString(cell.getValueArray, cell.getValueOffset, cell.getValueLength)
          buffer.append(value).append(",")
        }
        // Drop the trailing comma.
        val valueString: String = buffer.replace(buffer.length() - 1, buffer.length(), "").toString
        sourceContext.collect((rowKey, valueString))
      }
      rs.close()
    }
  }

  override def cancel(): Unit = {
    stop = true
  }
}
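
Both sources depend on a small helper, myUtils.ConnHBase, that the post never shows (note the method name connToHabse, typo and all, matches the call sites above). A minimal sketch of what it might look like; the ZooKeeper host linux121 is a placeholder you would replace with your own quorum:

package myUtils

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.{Connection, ConnectionFactory}

// Sketch only: the original series defines this class, but this post omits it.
class ConnHBase {
  def connToHabse: Connection = { // name kept as-is to match the call sites
    val conf: Configuration = HBaseConfiguration.create()
    conf.set("hbase.zookeeper.quorum", "linux121") // placeholder host
    conf.set("hbase.zookeeper.property.clientPort", "2181")
    ConnectionFactory.createConnection(conf)
  }
}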

package dws

import java.util

import myUtils.ConnHBase
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.source.{RichSourceFunction, SourceFunction}
import org.apache.hadoop.hbase.{Cell, TableName}
import org.apache.hadoop.hbase.client.{Connection, Result, ResultScanner, Scan, Table}
import org.apache.hadoop.hbase.util.Bytes

import scala.collection.JavaConverters._

// Reads the lagou_trade_orders fact table from HBase once, emitting
// (rowkey, comma-separated cell values) just like ReadDimArea.
class ReadTradeOrder extends RichSourceFunction[(String, String)] {
  private var connection: Connection = _
  private var htable: Table = _
  private var scan: Scan = _
  private var stop = false // set by cancel() to stop emitting

  override def open(parameters: Configuration): Unit = {
    connection = new ConnHBase().connToHabse
    htable = connection.getTable(TableName.valueOf("lagou_trade_orders"))
    scan = new Scan()
    scan.addFamily(Bytes.toBytes("f1"))
  }

  override def close(): Unit = {
    if (htable != null) {
      htable.close()
    }
    if (connection != null) {
      connection.close()
    }
  }

  override def run(sourceContext: SourceFunction.SourceContext[(String, String)]): Unit = {
    val rs: ResultScanner = htable.getScanner(scan)
    val iterator: util.Iterator[Result] = rs.iterator()
    while (iterator.hasNext && !stop) {
      val result: Result = iterator.next()
      val rowKey: String = Bytes.toString(result.getRow)
      val buffer = new StringBuffer()
      for (cell: Cell <- result.listCells().asScala) {
        val value: String = Bytes.toString(cell.getValueArray, cell.getValueOffset, cell.getValueLength)
        buffer.append(value).append(",")
      }
      // Drop the trailing comma.
      val valueString: String = buffer.replace(buffer.length() - 1, buffer.length(), "").toString
      sourceContext.collect((rowKey, valueString))
    }
    rs.close()
  }

  override def cancel(): Unit = {
    stop = true
  }
}
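
The two sources duplicate the row-to-string parsing verbatim. Not part of the original post, but one way to factor that out is a small shared helper, sketched here:

package dws

import org.apache.hadoop.hbase.Cell
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.util.Bytes

import scala.collection.JavaConverters._

// Sketch only: a helper both sources could call instead of repeating
// the cell-concatenation loop.
object HBaseRowParser {
  // Turns an HBase Result into (rowkey, "v1,v2,...,vn"),
  // with the cell values joined in scan order.
  def parse(result: Result): (String, String) = {
    val rowKey = Bytes.toString(result.getRow)
    val values = result.listCells().asScala.map { cell: Cell =>
      Bytes.toString(cell.getValueArray, cell.getValueOffset, cell.getValueLength)
    }
    (rowKey, values.mkString(","))
  }
}

With this in place, the scan loop of each run method reduces to sourceContext.collect(HBaseRowParser.parse(result)).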

2. Process the two streams with Flink SQL
package dws

import dim.DimArea
import org.apache.flink.streaming.api.{CheckpointingMode, TimeCharacteristic}
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.api.scala._
import org.apache.flink.table.api.Table
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.types.Row

// Requirement 1: cumulative totals of order count and order amount per city and province
object TotalCityOrder {

  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.enableCheckpointing(5000) // checkpoint every 5 s
    // Event time is declared but never used: the query below has no time attributes.
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)

    // Attach the two custom HBase sources
    val dimAreaStream: DataStream[(String, String)] = env.addSource(new ReadDimArea)
    val tradeOrderStream: DataStream[(String, String)] = env.addSource(new ReadTradeOrder)

    // dim_lagou_area: rowkey = areaId; value fields = aname, cid, city, proid, province
    val areaStream: DataStream[DimArea] = dimAreaStream.map(x => {
      val areaId: Int = x._1.toInt
      val datas: Array[String] = x._2.split(",")
      val aname: String = datas(0).trim
      val cid: Int = datas(1).trim.toInt
      val city: String = datas(2).trim
      val proid: Int = datas(3).trim.toInt
      val province: String = datas(4).trim
      DimArea(areaId, aname, cid, city, proid, province)
    })

    // Sample lagou_trade_orders record: rowkey = 23 (orderId); value fields:
    //   (110108, 2020-06-28 19:02:18, 1, 0, 0, 2020-10-22 06:16:26, 23a0b124546,
    //    4, 2020-06-28 19:02:18, 0.12, 2, 5391.0, 0, 0, 32)
    // Indices used below: 0 = areaId, 6 = orderNo, 10 = status,
    // 11 = totalMoney, 14 = userId.
    val orderStream: DataStream[TradeOrders] = tradeOrderStream.map(x => {
      val orderid: Int = x._1.toInt
      val datas: Array[String] = x._2.split(",")
      val orderNo: String = datas(6).trim
      val userId: Int = datas(14).trim.toInt
      val status: Int = datas(10).toInt
      val totalMoney: Double = datas(11).toDouble
      val areaId: Int = datas(0).toInt
      TradeOrders(orderid, orderNo, userId, status, totalMoney, areaId)
    })

    // Debug prints, disabled:
    // tradeOrderStream.print(); dimAreaStream.print(); orderStream.print()

    val tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(env)
    tableEnv.createTemporaryView("dim_lagou_area",areaStream)
    tableEnv.createTemporaryView("lagou_trade_orders",orderStream)
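    // The case-class field names (areaId, aname, cid, city, proid, province;
    // orderId, orderNo, userId, status, totalMoney, areaId) become the column
    // names of these temporary views, which is what the SQL below refers to.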


    val sql =
      """
        |SELECT
        |  areas.city AS city,
        |  areas.province AS province,
        |  COUNT(orders.orderId) AS totalNumber,
        |  SUM(orders.totalMoney) AS totalMoney
        |FROM lagou_trade_orders orders
        |INNER JOIN dim_lagou_area areas ON orders.areaId = areas.areaId
        |GROUP BY areas.city, areas.province
        |""".stripMargin

    val resultTable: Table = tableEnv.sqlQuery(sql)

    // A grouped aggregate over unbounded streams is continuously updated, so the
    // table converts to a retract stream of (Boolean, Row): false retracts a
    // previous result, true emits the new one. Keep only the additions.
    val result: DataStream[(Boolean, Row)] = tableEnv.toRetractStream[Row](resultTable)
    result.filter(_._1).print()

    env.execute()
  }

}
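
Each printed line is therefore the latest running total for one (city, province) pair, re-emitted as new orders arrive. As an aside not in the original post: on Flink 1.13 or later, the last two lines of main could instead use toChangelogStream, which carries the change kind on the Row itself rather than in a separate Boolean; a minimal sketch, assuming a 1.13+ dependency:

    // Sketch only (Flink 1.13+): changelog stream instead of retract stream.
    val changelog: DataStream[Row] = tableEnv.toChangelogStream(resultTable)
    changelog.print() // each Row carries a RowKind: INSERT, UPDATE_BEFORE, UPDATE_AFTER, or DELETE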

  3. Run the job; the console prints the per-city and per-province running totals.