欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

spark读取hbase数据 博客分类: hbase spark hbase 

程序员文章站 2024-03-22 22:39:22
...
package ceshi

import java.io.IOException
import java.util
import java.util.{Random, UUID}

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client._
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.protobuf.ProtobufUtil
import org.apache.hadoop.hbase.util.{Base64, Bytes}
import org.apache.spark.{SparkConf, SparkContext}

/**
  * Spark/HBase demo against the HBase table `spark` (column family `s`):
  *  - [[main]] reads a row-key range through Spark's `TableInputFormat`, prints the row
  *    count, then prints how often each `s:v_ip` value occurs (most frequent first);
  *  - [[readDataFromHbase]] scans the same range with the plain HBase client API;
  *  - [[putHbaseData]] loads synthetic test rows into the table.
  *
  * Created by hyt on 5/24/17.
  */
object HbaseTestInsert {

  // ZooKeeper ensemble used by every HBase client in this demo. The quorum is a
  // comma-separated host list (the original used ';' before the last host).
  private val ZookeeperQuorum = "172.16.0.226:2181,172.16.0.223:2181,172.16.0.224:2181"
  private val ZookeeperClientPort = "2181"

  // Table layout: table "spark", column family "s" with qualifiers "v_ip" and "v_time".
  private val DemoTable = "spark"
  private val Family = Bytes.toBytes("s")
  private val IpQualifier = Bytes.toBytes("v_ip")
  private val TimeQualifier = Bytes.toBytes("v_time")

  // Row-key range read by both read examples. Row keys written by putHbaseData start
  // with an epoch-millis timestamp, so this selects rows written in that time window.
  private val DemoStartRow = "1495607836079"
  private val DemoStopRow = "1495607836920"

  /**
    * Counts the rows in the demo row-key range with Spark and prints, per `s:v_ip` value,
    * how many rows carry it, sorted by descending count.
    *
    * @param args unused
    */
  def main(args: Array[String]): Unit = {
    // The serializer must be configured before the SparkContext is created: SparkContext
    // clones its SparkConf at construction, so settings applied afterwards are ignored.
    val sparkConf = new SparkConf()
      .setMaster("local[1]")
      .setAppName("HbaseTestInsert")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    val sc = new SparkContext(sparkConf)

    try {
      val conf = createHBaseConf()
      conf.set(TableInputFormat.INPUT_TABLE, DemoTable)
      conf.set(TableInputFormat.SCAN, convertScanToString(buildIpScan(DemoStartRow, DemoStopRow)))

      // Read the table and turn it into an RDD of (row key, Result) pairs.
      val hBaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
        classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
        classOf[org.apache.hadoop.hbase.client.Result])

      // Local copies so the task closure carries only these byte arrays.
      val family = Family
      val ipQualifier = IpQualifier

      // Extract the IP column once and cache it, so counting and aggregating do not scan
      // HBase twice. The raw pairs are not cached: Spark's docs warn that Hadoop record
      // readers may reuse the same Writable objects across records.
      val ips = hBaseRDD.map { case (_, result) =>
        Bytes.toString(result.getValue(family, ipQualifier))
      }.cache()

      println(ips.count())

      // collect() brings the (at most a few tens of thousands of) counts to the driver so
      // they print in sorted order; RDD.foreach(println) would print on the executors.
      ips.map(ip => (ip, 1))
        .reduceByKey(_ + _)
        .sortBy(_._2, ascending = false)
        .collect()
        .foreach(println)
    } finally {
      sc.stop()
    }
  }

  /**
    * Serializes a Scan into the Base64-encoded protobuf string that is stored under
    * the `TableInputFormat.SCAN` configuration key.
    *
    * @param scan the scan to encode
    * @return Base64 text of the scan's protobuf form
    */
  def convertScanToString(scan: Scan): String = {
    val proto = ProtobufUtil.toScan(scan)
    Base64.encodeBytes(proto.toByteArray)
  }

  /**
    * Scans the demo row-key range with the plain HBase client and prints, for each row,
    * its `s:v_ip` value followed by its row key. Prints nothing if the scanner could not
    * be opened (the error has already been printed by [[scanDataByStartAndStopRow]]).
    */
  def readDataFromHbase(): Unit =
    Option(scanDataByStartAndStopRow(DemoTable, DemoStartRow, DemoStopRow)).foreach { scanner =>
      try {
        val iterator = scanner.iterator()
        while (iterator.hasNext) {
          val result = iterator.next()
          // Bytes.toString returns null (printed as "null") for a missing cell instead of
          // throwing, and always decodes as UTF-8 rather than the platform charset.
          println(Bytes.toString(result.getValue(Family, IpQualifier)))
          println(Bytes.toString(result.getRow))
        }
      } finally {
        scanner.close()
      }
    }

  /**
    * Opens a scanner over `s:v_ip` for row keys in [startRow, endRow).
    *
    * NOTE(review): on success the HTable is left open because the returned scanner reads
    * through it; nothing ever closes it, so each call leaks one table handle.
    *
    * @param tableName HBase table to scan
    * @param startRow  first row key (inclusive)
    * @param endRow    stop row key (exclusive)
    * @return the scanner, or null if opening it failed with an IOException (the stack
    *         trace is printed)
    */
  def scanDataByStartAndStopRow(tableName: String, startRow: String, endRow: String): ResultScanner = {
    val conf = createHBaseConf()
    try {
      val table = new HTable(conf, tableName)
      try table.getScanner(buildIpScan(startRow, endRow))
      catch {
        case e: IOException =>
          // No scanner will use the table, so release it before reporting the failure.
          table.close()
          throw e
      }
    } catch {
      case e: IOException =>
        e.printStackTrace()
        null
    }
  }

  /**
    * Writes 100001 synthetic rows (i = 0 to 100000) to the demo table, flushing every
    * 100 rows. Each row key is the current epoch-millis followed by a random UUID; the
    * row stores that same timestamp in `s:v_time` and a random 192.168.x.y address in
    * `s:v_ip`. Prints each row index as progress.
    */
  def putHbaseData(): Unit = {
    val table = new HTable(createHBaseConf(), DemoTable)
    try {
      // Auto-flush off: puts are buffered client-side and sent by flushCommits().
      table.setAutoFlush(false)
      val batch = new util.ArrayList[Put]()
      val random = new Random()

      // Sends the buffered batch, if any, and clears it for reuse.
      def flushBatch(): Unit =
        if (!batch.isEmpty) {
          table.put(batch)
          table.flushCommits()
          batch.clear()
        }

      for (i <- 0 to 100000) {
        // One timestamp for both the row key and the v_time cell, so they agree.
        val now = System.currentTimeMillis()
        val put = new Put(Bytes.toBytes(s"$now${UUID.randomUUID()}"))
        put.add(Family, TimeQualifier, Bytes.toBytes(now.toString))
        put.add(Family, IpQualifier, Bytes.toBytes(randomIp(random)))
        batch.add(put)
        println(i)
        if (i % 100 == 0) flushBatch()
      }
      // Defensive final flush for a trailing partial batch; with the current bounds the
      // last iteration (i = 100000) has already flushed, so this is a no-op.
      flushBatch()
    } finally {
      table.close()
    }
  }

  /** Builds an HBase client configuration that points at the demo ZooKeeper ensemble. */
  private def createHBaseConf(): Configuration = {
    val conf = HBaseConfiguration.create()
    conf.set("hbase.zookeeper.quorum", ZookeeperQuorum)
    conf.set("hbase.zookeeper.property.clientPort", ZookeeperClientPort)
    conf
  }

  /** Builds a Scan restricted to `s:v_ip` over row keys [startRow, endRow). */
  private def buildIpScan(startRow: String, endRow: String): Scan = {
    val scan = new Scan()
    scan.addColumn(Family, IpQualifier)
    scan.setStartRow(Bytes.toBytes(startRow))
    scan.setStopRow(Bytes.toBytes(endRow))
    scan
  }

  /**
    * Returns a random address 192.168.x.y with x and y each drawn from [0, 254].
    * NOTE(review): nextInt(255) never yields 255; use 256 if that range is unintended.
    */
  private def randomIp(random: Random): String =
    s"192.168.${random.nextInt(255)}.${random.nextInt(255)}"
}

 

相关标签: hbase