Flink 读取 HBase
程序员文章站
2022-07-14 13:19:15
...
package tmp
import java.util
import org.apache.flink.api.scala._
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.source.{RichSourceFunction, SourceFunction}
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.hadoop.conf
import org.apache.hadoop.hbase.client._
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.{Cell, HBaseConfiguration, HConstants, TableName}
import scala.collection.JavaConverters._
/**
 * Flink streaming job that performs a single full scan of the HBase table
 * `dim_lagou_area` and prints every row as a `(rowKey, values)` pair, where
 * `values` is the comma-joined string form of all cell values in that row.
 *
 * The ZooKeeper quorum (`linux121,linux123`), client port (`2181`) and table
 * name are hard-coded in the source function below.
 */
object Lucas {

  /**
   * Builds the pipeline (HBase scan source -> print sink) and runs it.
   *
   * @param args command-line arguments; not used
   */
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    val data: DataStream[(String, String)] = env.addSource(new RichSourceFunction[(String, String)] {
      // HBase handles are created in open() on the task side and released in close().
      var scan: Scan = _
      var con: Connection = _
      var table: Table = _

      // Cooperative stop flag: cancel() is invoked from a different thread than run().
      @volatile private var running: Boolean = true

      /** Opens the HBase connection and table handle and prepares an unrestricted scan. */
      override def open(parameters: Configuration): Unit = {
        val config: conf.Configuration = HBaseConfiguration.create()
        config.set(HConstants.ZOOKEEPER_QUORUM, "linux121,linux123")
        config.set(HConstants.ZOOKEEPER_CLIENT_PORT, "2181")
        con = ConnectionFactory.createConnection(config)
        scan = new Scan()
        table = con.getTable(TableName.valueOf("dim_lagou_area"))
      }

      /**
       * Scans the table and emits one `(rowKey, "v1,v2,...")` tuple per row.
       * Stops early once cancel() has been called; the scanner is always closed.
       */
      override def run(ctx: SourceFunction.SourceContext[(String, String)]): Unit = {
        val rs: ResultScanner = table.getScanner(scan)
        try {
          val iterator: util.Iterator[Result] = rs.iterator()
          while (running && iterator.hasNext) {
            val result: Result = iterator.next()
            val rowStr: String = Bytes.toString(result.getRow)

            // listCells() may return null for a Result without cells; treat that as "no values".
            val cells: Iterable[Cell] = Option(result.listCells()).map(_.asScala).getOrElse(Nil)

            // mkString yields "" for an empty row instead of failing on trailing-comma removal.
            val valueString: String = cells
              .map(cell => Bytes.toString(cell.getValueArray, cell.getValueOffset, cell.getValueLength))
              .mkString(",")

            ctx.collect((rowStr, valueString))
          }
        } finally {
          // Release the scanner whether the scan finished, failed or was cancelled.
          rs.close()
        }
      }

      /**
       * Requests run() to stop. Resources are not closed here, so the scanning
       * thread is never left iterating over an already-closed connection.
       */
      override def cancel(): Unit = {
        running = false
      }

      /** Releases the table and the connection; safe to call if open() failed part-way. */
      override def close(): Unit = {
        try {
          Option(table).foreach(_.close())
        } finally {
          Option(con).foreach(_.close())
        }
      }
    })

    data.print()
    env.execute()
  }
}
下一篇: 《中级》Flink 读写hbase