Real-Time Data Warehouse (3) Requirement 1: Full-Volume Statistics to Date — City, Province, Total Order Amount, Total Order Count
Approach
- Define custom sources that read the dim_lagou_area and lagou_trade_orders tables from HBase
- Build Flink tables from the resulting streams
- Run the join SQL:
SELECT
  areas.city AS city,
  areas.province AS province,
  COUNT(orders.orderId) AS totalNumber,
  SUM(orders.totalMoney) AS totalMoney
FROM lagou_trade_orders orders
INNER JOIN dim_lagou_area areas ON orders.areaId = areas.areaId
GROUP BY areas.city, areas.province
- Convert the result table back into a DataStream and print it
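A non-windowed GROUP BY over unbounded streams produces a continuously updating result table, which is why the code below converts it with toRetractStream rather than toAppendStream and then filters on the retract flag.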
Implementation
1. Define the case class, plus custom sources that extend RichSourceFunction and override the open, run, close, and cancel methods
package dws

// Order fact parsed from the lagou_trade_orders HBase table
case class TradeOrders(
  orderId: Int,
  orderNo: String,
  userId: Int,
  status: Int,
  totalMoney: Double,
  areaId: Int
)
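Both sources call a myUtils.ConnHBase helper that this article never lists. A minimal sketch of what it might look like, assuming a ZooKeeper quorum on localhost (both settings are placeholders to adjust for your cluster):

package myUtils

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.{Connection, ConnectionFactory}

class ConnHBase {
  // The method name connToHabse (sic) is kept as-is to match the call sites in this article
  def connToHabse: Connection = {
    val conf = HBaseConfiguration.create()
    conf.set("hbase.zookeeper.quorum", "localhost")         // assumption: your ZooKeeper hosts
    conf.set("hbase.zookeeper.property.clientPort", "2181") // assumption: default client port
    ConnectionFactory.createConnection(conf)
  }
}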
package dws

import java.util

import myUtils.ConnHBase
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.source.{RichSourceFunction, SourceFunction}
import org.apache.hadoop.hbase.TableName
import org.apache.hadoop.hbase.client.{Connection, Result, ResultScanner, Scan, Table}
import org.apache.hadoop.hbase.util.Bytes

import scala.collection.JavaConverters._

// Emits each row of the dim_lagou_area dimension table as (rowKey, comma-joined column values)
class ReadDimArea extends RichSourceFunction[(String, String)] {
  private var conn: Connection = _
  private var table: Table = _
  private var scan: Scan = _
  private var flag = false

  // Open the HBase connection once per task and prepare a full scan of family f1
  override def open(parameters: Configuration): Unit = {
    conn = new ConnHBase().connToHabse
    table = conn.getTable(TableName.valueOf("dim_lagou_area"))
    scan = new Scan()
    scan.addFamily("f1".getBytes())
  }

  override def close(): Unit = {
    if (table != null) {
      table.close()
    }
    if (conn != null) {
      conn.close()
    }
  }

  // Scan the table once; each cell value is appended to a comma-separated string
  override def run(sourceContext: SourceFunction.SourceContext[(String, String)]): Unit = {
    if (!flag) {
      val rs: ResultScanner = table.getScanner(scan)
      val iterator: util.Iterator[Result] = rs.iterator()
      while (iterator.hasNext && !flag) {
        val result: Result = iterator.next()
        val rowKey: String = Bytes.toString(result.getRow)
        val buffer = new StringBuffer()
        for (cell <- result.listCells().asScala) {
          val value: String = Bytes.toString(cell.getValueArray, cell.getValueOffset, cell.getValueLength)
          buffer.append(value).append(",")
        }
        // drop the trailing comma
        val valueString: String = buffer.replace(buffer.length() - 1, buffer.length(), "").toString
        sourceContext.collect((rowKey, valueString))
      }
      rs.close()
    }
  }

  // Signal run() to stop scanning
  override def cancel(): Unit = {
    flag = true
  }
}
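The dim.DimArea case class is imported but never listed in the article. From the DimArea(areaId, aname, cid, city, proid, province) construction in TotalCityOrder below, it presumably looks like the following — the field order is grounded in that call, and areaId, city, and province must match the SQL, while aname, cid, and proid simply mirror the local variable names and are assumptions:

package dim

// Area dimension row: area id and name, city id and name, province id and name
case class DimArea(
  areaId: Int,
  aname: String,
  cid: Int,
  city: String,
  proid: Int,
  province: String
)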
package dws

import java.util

import myUtils.ConnHBase
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.source.{RichSourceFunction, SourceFunction}
import org.apache.hadoop.hbase.TableName
import org.apache.hadoop.hbase.client.{Connection, Result, ResultScanner, Scan, Table}
import org.apache.hadoop.hbase.util.Bytes

import scala.collection.JavaConverters._

// Emits each row of the lagou_trade_orders fact table as (rowKey, comma-joined column values)
class ReadTradeOrder extends RichSourceFunction[(String, String)] {
  private var connection: Connection = _
  private var htable: Table = _
  private var scan: Scan = _
  private var flag = false

  override def open(parameters: Configuration): Unit = {
    connection = new ConnHBase().connToHabse
    htable = connection.getTable(TableName.valueOf("lagou_trade_orders"))
    scan = new Scan()
    scan.addFamily("f1".getBytes())
  }

  override def close(): Unit = {
    if (htable != null) {
      htable.close()
    }
    if (connection != null) {
      connection.close()
    }
  }

  override def run(sourceContext: SourceFunction.SourceContext[(String, String)]): Unit = {
    val rs: ResultScanner = htable.getScanner(scan)
    val iterator: util.Iterator[Result] = rs.iterator()
    while (iterator.hasNext && !flag) {
      val result: Result = iterator.next()
      val rowKey: String = Bytes.toString(result.getRow)
      val buffer = new StringBuffer()
      for (cell <- result.listCells().asScala) {
        val value: String = Bytes.toString(cell.getValueArray, cell.getValueOffset, cell.getValueLength)
        buffer.append(value).append(",")
      }
      // drop the trailing comma
      val valueString: String = buffer.replace(buffer.length() - 1, buffer.length(), "").toString
      sourceContext.collect((rowKey, valueString))
    }
    rs.close()
  }

  // Signal run() to stop scanning
  override def cancel(): Unit = {
    flag = true
  }
}
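Either source can be sanity-checked on its own before wiring up the SQL. A minimal sketch (the object name is hypothetical):

package dws

import org.apache.flink.streaming.api.scala._

object SourceSmokeTest {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Print the raw (rowKey, comma-joined values) tuples emitted by the scan
    env.addSource(new ReadTradeOrder).print()
    env.execute("source-smoke-test")
  }
}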
- Process the two streams with Flink SQL
package dws

import dim.DimArea
import org.apache.flink.streaming.api.{CheckpointingMode, TimeCharacteristic}
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.api.scala._
import org.apache.flink.table.api.Table
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.types.Row

// Requirement 1: full-volume query of city, province, total order amount, and total order count
object TotalCityOrder {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.enableCheckpointing(5000)
    // Event time is declared here, but the non-windowed aggregation below does not depend on it
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)

    // Read the two HBase tables as (rowKey, values) streams
    val dimAreaStream: DataStream[(String, String)] = env.addSource(new ReadDimArea)
    val tradeOrderStream: DataStream[(String, String)] = env.addSource(new ReadTradeOrder)

    // Parse the dimension rows: the rowKey is the area id; the values hold name/city/province fields
    val areaStream: DataStream[DimArea] = dimAreaStream.map(x => {
      val areaId: Int = x._1.toInt
      val datas: Array[String] = x._2.split(",")
      val aname: String = datas(0).trim
      val cid: Int = datas(1).trim.toInt
      val city: String = datas(2).trim
      val proid: Int = datas(3).trim.toInt
      val province: String = datas(4).trim
      DimArea(areaId, aname, cid, city, proid, province)
    })
    // Sample raw order tuple: rowKey "23" plus the comma-joined f1 values
    // "110108,2020-06-28 19:02:18,1,0,0,2020-10-22 06:16:26,23a0b124546,4,2020-06-28 19:02:18,0.12,2,5391.0,0,0,32"
    // relevant indices: 0 = areaId, 6 = orderNo, 10 = status, 11 = totalMoney, 14 = userId
    // Parse the order rows: the rowKey is the order id; pick only the fields the query needs
    val orderStream: DataStream[TradeOrders] = tradeOrderStream.map(x => {
      val orderId: Int = x._1.toInt
      val datas: Array[String] = x._2.split(",")
      val orderNo: String = datas(6).trim
      val userId: Int = datas(14).trim.toInt
      val status: Int = datas(10).toInt
      val totalMoney: Double = datas(11).toDouble
      val areaId: Int = datas(0).toInt
      TradeOrders(orderId, orderNo, userId, status, totalMoney, areaId)
    })
    // Register both streams as tables and run the join + aggregation
    val tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(env)
    tableEnv.createTemporaryView("dim_lagou_area", areaStream)
    tableEnv.createTemporaryView("lagou_trade_orders", orderStream)

    val sql =
      """
        |SELECT
        |  areas.city AS city,
        |  areas.province AS province,
        |  COUNT(orders.orderId) AS totalNumber,
        |  SUM(orders.totalMoney) AS totalMoney
        |FROM lagou_trade_orders orders
        |INNER JOIN dim_lagou_area areas ON orders.areaId = areas.areaId
        |GROUP BY areas.city, areas.province
        |""".stripMargin
    val resultTable: Table = tableEnv.sqlQuery(sql)

    // The non-windowed GROUP BY yields an updating table, so convert it to a retract stream
    // and keep only the insert/update records (retract flag == true)
    val result: DataStream[(Boolean, Row)] = tableEnv.toRetractStream[Row](resultTable)
    result.filter(_._1).print()
    env.execute()
  }
}
- Printed results
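Because the result is a retract stream, the same (city, province) key is printed repeatedly as more orders are scanned, with totalNumber and totalMoney growing on each update; the last record emitted per key holds the full-volume totals once the scan finishes.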