
Reading HBase with Flink

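A straightforward way to pull a dimension table out of HBase into a Flink streaming job is to wrap an HBase client scan in a custom RichSourceFunction: create the Connection and Table in open(), walk the ResultScanner in run() and emit one (rowKey, values) tuple per row, and release the client when the source shuts down. Because the class implements the plain SourceFunction interface rather than ParallelSourceFunction, Flink runs it with parallelism 1, which suits a one-shot scan of a small dimension table such as the dim_lagou_area table used below.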
package tmp

import java.util
import org.apache.flink.api.scala._
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.source.{RichSourceFunction, SourceFunction}
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.hadoop.conf
import org.apache.hadoop.hbase.client._
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.{Cell, HBaseConfiguration, HConstants, TableName}
import scala.collection.JavaConverters._


object Lucas {
  def main(args: Array[String]): Unit = {

    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    // Non-parallel custom source: scan the HBase table once and emit
    // each row as (row key, comma-separated cell values).
    val data: DataStream[(String, String)] = env.addSource(new RichSourceFunction[(String, String)] {
      var scan: Scan = _
      var con: Connection = _
      var table: Table = _
      // Cancellation flag checked by the scan loop in run().
      @volatile var isRunning = true

      override def open(parameters: Configuration): Unit = {
        // Point the HBase client at the cluster's ZooKeeper quorum;
        // the host names and port here are the author's environment.
        val config: conf.Configuration = HBaseConfiguration.create()
        config.set(HConstants.ZOOKEEPER_QUORUM, "linux121,linux123")
        config.set(HConstants.ZOOKEEPER_CLIENT_PORT, "2181")
        // One connection and table handle per source instance, created
        // once before run() starts and reused for the whole scan.
        con = ConnectionFactory.createConnection(config)
        scan = new Scan()
        table = con.getTable(TableName.valueOf("dim_lagou_area"))
      }

      override def run(ctx: SourceFunction.SourceContext[(String, String)]): Unit = {
        val rs: ResultScanner = table.getScanner(scan)
        try {
          val iterator: util.Iterator[Result] = rs.iterator()
          while (isRunning && iterator.hasNext) {
            val result: Result = iterator.next()
            val rowStr: String = Bytes.toString(result.getRow)
            // Decode each cell value from its backing array and join the
            // values with commas. mkString also handles a row with an
            // empty cell list, where the original StringBuffer trimming
            // (sb.replace at length - 1) threw StringIndexOutOfBoundsException.
            val cells: util.List[Cell] = result.listCells()
            val valueString: String = cells.asScala.map { cell =>
              Bytes.toString(cell.getValueArray, cell.getValueOffset, cell.getValueLength)
            }.mkString(",")
            ctx.collect((rowStr, valueString))
          }
        } finally {
          rs.close() // always release the scanner, even if the loop was cancelled
        }
      }

      // cancel() should only signal run() to stop; closing the connection
      // here could race with the scan loop. Resources are released in close().
      override def cancel(): Unit = {
        isRunning = false
      }

      override def close(): Unit = {
        if (table != null) table.close()
        if (con != null) con.close()
      }
    })
    
    data.print()
    env.execute()

  }


}
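The article gives no build file. A minimal build.sbt sketch that should compile the snippet, assuming Flink 1.13 with the Scala API and an HBase 2.x client (the artifact versions below are assumptions, not taken from the article):

// Assumed dependencies -- versions are illustrative; match your cluster.
libraryDependencies ++= Seq(
  "org.apache.flink" %% "flink-streaming-scala" % "1.13.6",
  "org.apache.flink" %% "flink-clients"         % "1.13.6",
  "org.apache.hbase"  % "hbase-client"          % "2.4.13"
)

Downstream operators see ordinary (String, String) tuples. For example, a hypothetical follow-up step (not part of the original article) that splits the comma-joined values back into individual columns:

// Hypothetical follow-up inside main(), after data is defined: split each
// row's comma-joined values back into an array of column values. The -1
// limit keeps trailing empty values that a plain split would drop.
val columns: DataStream[(String, Array[String])] =
  data.map(t => (t._1, t._2.split(",", -1)))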
