
Real-Time Data Warehouse Project (Part 2): Building the DIM-Layer Area Dimension Table



I. Approach

  1. Import the lagou_area table from MySQL into the lagou_area table in HBase (both HBase tables must exist first; see the creation sketch after this list)
  2. Build a source that reads the lagou_area data from HBase
  3. Transform the stream with Flink operators to obtain a Flink Table
  4. Write the transformation SQL
  5. Convert the table back into a stream
  6. Write the data into HBase with a custom sink
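
Before the job runs, both HBase tables need to exist with column family f1. A minimal creation sketch, assuming the HBase 2.x client API and a ZooKeeper quorum on localhost (adjust both to your cluster):

package dim

import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{ColumnFamilyDescriptorBuilder, ConnectionFactory, TableDescriptorBuilder}

// One-off setup: create lagou_area (import target) and dim_lagou_area (sink target).
object CreateDimTables {
  def main(args: Array[String]): Unit = {
    val conf = HBaseConfiguration.create()
    conf.set("hbase.zookeeper.quorum", "localhost")            // assumption: your ZK quorum
    conf.set("hbase.zookeeper.property.clientPort", "2181")    // assumption: default port
    val conn = ConnectionFactory.createConnection(conf)
    val admin = conn.getAdmin
    for (name <- Seq("lagou_area", "dim_lagou_area")) {
      val tn = TableName.valueOf(name)
      if (!admin.tableExists(tn)) {
        admin.createTable(
          TableDescriptorBuilder.newBuilder(tn)
            .setColumnFamily(ColumnFamilyDescriptorBuilder.of("f1"))
            .build())
      }
    }
    admin.close()
    conn.close()
  }
}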

II. Implementation

  1. Define the case classes
package dim

// Output row of the dimension table: one record per district (area),
// enriched with its city and province.
case class DimArea(
                    areaId: Int,
                    aname: String,
                    cid: Int,
                    city: String,
                    proId: Int,
                    province: String
                  )

// Raw row parsed from the HBase lagou_area table: id, name, and parent id.
case class AreaDetail(id: Int, name: String, pid: Int)
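
The myUtils.ConnHBase helper used below is referenced but never shown; a minimal sketch, assuming HBase is reachable through a local ZooKeeper (the method name connToHabse, typo included, matches the calls in the reader and writer):

package myUtils

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.{Connection, ConnectionFactory}

class ConnHBase {
  // Opens a new HBase connection; callers are responsible for closing it.
  def connToHabse: Connection = {
    val conf = HBaseConfiguration.create()
    conf.set("hbase.zookeeper.quorum", "localhost")            // assumption: your ZK quorum
    conf.set("hbase.zookeeper.property.clientPort", "2181")    // assumption: default port
    ConnectionFactory.createConnection(conf)
  }
}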

  2. A custom HBaseReader source that reads the lagou_area data
package dim

import java.util

import myUtils.ConnHBase
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.source.{RichSourceFunction, SourceFunction}
import org.apache.hadoop.hbase.{Cell, TableName}
import org.apache.hadoop.hbase.client.{Connection, Result, ResultScanner, Scan, Table}
import org.apache.hadoop.hbase.util.Bytes

import scala.collection.JavaConverters._

// Note: this does not read HBase incrementally — a plain Scan emits a
// one-time snapshot of the table, after which run() returns.
class HBaseReader extends RichSourceFunction[(String, String)] {

  private var conn: Connection = _
  private var table: Table = _
  private var scan: Scan = _


  override def run(sourceContext: SourceFunction.SourceContext[(String, String)]): Unit = {
    val rs: ResultScanner = table.getScanner(scan)
    val iterator: util.Iterator[Result] = rs.iterator()

    while (iterator.hasNext) {
      val result: Result = iterator.next()
      val rowKey: String = Bytes.toString(result.getRow)
      // Join all cell values of the row with "-"; the order follows the
      // lexicographic order of the column qualifiers in family f1.
      val valueString: String = result.listCells().asScala
        .map(cell => Bytes.toString(cell.getValueArray, cell.getValueOffset, cell.getValueLength))
        .mkString("-")
      sourceContext.collect((rowKey, valueString))
    }
    rs.close()
  }

  override def cancel(): Unit = {

  }

  override def open(parameters: Configuration): Unit = {

    conn = new ConnHBase().connToHabse
    table = conn.getTable(TableName.valueOf("lagou_area"))
    scan = new Scan()
    scan.addFamily("f1".getBytes())

  }

  override def close(): Unit = {
    try {
      if(table!=null){
        table.close()
      }
      if(conn!=null){
        conn.close()
      }

    }catch {
      case e:Exception=>println(e.getMessage)
    }

  }
}

  3. A custom HBaseWriter sink that writes the dim_lagou_area data
package dim

import myUtils.ConnHBase
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
import org.apache.hadoop.hbase.TableName
import org.apache.hadoop.hbase.client.{Connection, Put, Table}

class HBaseWriter extends RichSinkFunction[DimArea] {

  var connection: Connection = _
  var hTable: Table = _
  override def open(parameters: Configuration): Unit = {
    connection = new ConnHBase().connToHabse
    hTable = connection.getTable(TableName.valueOf("dim_lagou_area"))
  }

  override def close(): Unit = {
    if(hTable != null) {
      hTable.close()
    }
    if (connection != null) {
      connection.close()
    }
  }

  override def invoke(value: DimArea, context: SinkFunction.Context[_]): Unit = {
    insertDimArea(hTable,value)
  }



  def insertDimArea(table: Table, value: DimArea): Unit = {
    // Row key is the district id; all other fields go into column family f1.
    val put = new Put(value.areaId.toString.getBytes())
    put.addColumn("f1".getBytes(), "aname".getBytes(), value.aname.getBytes())
    put.addColumn("f1".getBytes(), "cid".getBytes(), value.cid.toString.getBytes())
    put.addColumn("f1".getBytes(), "city".getBytes(), value.city.getBytes())
    put.addColumn("f1".getBytes(), "proId".getBytes(), value.proId.toString.getBytes())
    put.addColumn("f1".getBytes(), "province".getBytes(), value.province.getBytes())
    table.put(put)
  }
}
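
To spot-check the sink output, a small hypothetical read-back utility (reusing the same connection helper) can scan dim_lagou_area and print each row:

package dim

import myUtils.ConnHBase
import org.apache.hadoop.hbase.TableName
import org.apache.hadoop.hbase.client.Scan
import org.apache.hadoop.hbase.util.Bytes

import scala.collection.JavaConverters._

// One-off check: print every row of dim_lagou_area as rowKey -> qualifier=value pairs.
object VerifyDimArea {
  def main(args: Array[String]): Unit = {
    val conn = new ConnHBase().connToHabse
    val table = conn.getTable(TableName.valueOf("dim_lagou_area"))
    val scanner = table.getScanner(new Scan().addFamily("f1".getBytes()))
    for (result <- scanner.asScala) {
      val rowKey = Bytes.toString(result.getRow)
      val cols = result.listCells().asScala.map { cell =>
        val q = Bytes.toString(cell.getQualifierArray, cell.getQualifierOffset, cell.getQualifierLength)
        val v = Bytes.toString(cell.getValueArray, cell.getValueOffset, cell.getValueLength)
        s"$q=$v"
      }.mkString(", ")
      println(s"$rowKey -> $cols")
    }
    scanner.close()
    table.close()
    conn.close()
  }
}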

  4. Process the data with Flink and the Table API
    Approach:
    1. Initialize the environment and obtain a StreamExecutionEnvironment
    2. Read the stream data and parse it
    3. Register a temporary table over the stream
    4. Write the transformation SQL
    5. Convert the Flink Table back into a stream
    6. Write the result into the HBase table via the custom sink
package dim

import org.apache.flink.streaming.api.{CheckpointingMode, TimeCharacteristic}
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.api.scala._
import org.apache.flink.table.api.Table
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.types.Row
object AreaDetailInfo {


  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.enableCheckpointing(5000)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)

    val data: DataStream[(String, String)] = env.addSource(new HBaseReader)

    //data.print()

    // HBaseReader joins the cell values with "-" in qualifier order, so the
    // indices below depend on the column layout of lagou_area; here index 5
    // holds the name and index 6 the parent id.
    val dataStream: DataStream[AreaDetail] = data.map(x => {
      val id: Int = x._1.toInt
      val datas: Array[String] = x._2.split("-")
      val name: String = datas(5).trim
      val pid: Int = datas(6).trim.toInt
      AreaDetail(id, name, pid)
    })

    val tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(env)

    tableEnv.createTemporaryView("lagou_area",dataStream)


    val sql: String =
      """
        |select a.id as areaId,a.name as aname,a.pid as cid,b.name as city,c.id as proid,c.name as province
        |from lagou_area as a
        |inner join lagou_area as b on a.pid = b.id
        |inner join lagou_area as c on b.pid = c.id
        |""".stripMargin

    val areaTable: Table = tableEnv.sqlQuery(sql)


    // toRetractStream emits (flag, row): flag == true marks an insert/update,
    // false marks a retraction. Keep only the true records before writing.
    val resultStream: DataStream[DimArea] = tableEnv
      .toRetractStream[DimArea](areaTable)
      .filter(_._1)
      .map(_._2)

    resultStream.print()

    resultStream.addSink(new HBaseWriter)
    env.execute("dimareabuild")



  }

}
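
To make the three-way self-join concrete, here is a hypothetical mini dataset (the ids and names are made up for illustration) and the DimArea row the SQL produces from it:

// Sample lagou_area rows as AreaDetail(id, name, pid):
//   AreaDetail(110101, "Dongcheng", 110100)    // district; pid points to its city
//   AreaDetail(110100, "Beijing City", 110000) // city; pid points to its province
//   AreaDetail(110000, "Beijing", 0)           // province
//
// Joining a (district) to b (city) on a.pid = b.id, then b to c (province)
// on b.pid = c.id, yields one output row per district:
//   DimArea(110101, "Dongcheng", 110100, "Beijing City", 110000, "Beijing")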
