Reading HBase data with Spark (blog category: hbase, spark)
package ceshi

import java.io.IOException
import java.util
import java.util.{Random, UUID}

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client._
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.protobuf.ProtobufUtil
import org.apache.hadoop.hbase.util.{Base64, Bytes}
import org.apache.spark.{SparkConf, SparkContext}

/**
  * Created by hyt on 5/24/17.
  */
object HbaseTestInsert {

  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf()
    sparkConf.setMaster("local[1]").setAppName("HbaseTestInsert")
    // Kryo must be configured before the SparkContext is created, or the setting is ignored.
    sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    val sc = new SparkContext(sparkConf)

    val conf = HBaseConfiguration.create()
    conf.set(TableInputFormat.INPUT_TABLE, "spark")
    conf.set("hbase.zookeeper.quorum", "172.16.0.226:2181,172.16.0.223:2181,172.16.0.224:2181")
    conf.set("hbase.zookeeper.property.clientPort", "2181")

    // Restrict the scan to one column and a row-key range.
    val scan: Scan = new Scan()
    scan.addColumn(Bytes.toBytes("s"), Bytes.toBytes("v_ip"))
    val startRow = "1495607836079"
    val endRow = "1495607836920"
    scan.setStartRow(Bytes.toBytes(startRow))
    scan.setStopRow(Bytes.toBytes(endRow))
    conf.set(TableInputFormat.SCAN, convertScanToString(scan))

    // Read the table and turn it into an RDD of (row key, Result) pairs.
    val hBaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
      classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
      classOf[org.apache.hadoop.hbase.client.Result])

    val count = hBaseRDD.count()
    println(count)

    // hBaseRDD.foreach { case (_, result) =>
    //   // Get the row key.
    //   val key = Bytes.toString(result.getRow)
    //   // Get a cell value by column family and qualifier.
    //   val v_ip = Bytes.toString(result.getValue("s".getBytes, "v_ip".getBytes))
    //   println("Row key:" + key + " ip:" + v_ip)
    // }

    // Count occurrences per IP and print them in descending order.
    hBaseRDD.map(x => x._2).map { x =>
      (Bytes.toString(x.getValue("s".getBytes, "v_ip".getBytes)), 1)
    }.reduceByKey(_ + _).sortBy(x => x._2, ascending = false).foreach(println)
  }

  // Serialize a Scan so it can be handed to TableInputFormat through the configuration.
  def convertScanToString(scan: Scan): String = {
    val proto = ProtobufUtil.toScan(scan)
    Base64.encodeBytes(proto.toByteArray)
  }

  def readDataFromHbase(): Unit = {
    val scanner: ResultScanner = scanDataByStartAndStopRow("spark", "1495607836079", "1495607836920")
    val iterator = scanner.iterator()
    while (iterator.hasNext) {
      val result: Result = iterator.next()
      println(new String(result.getValue(Bytes.toBytes("s"), Bytes.toBytes("v_ip"))))
      println(new String(result.getRow))
    }
  }

  def scanDataByStartAndStopRow(tableName: String, startRow: String, endRow: String): ResultScanner = {
    val conf: Configuration = HBaseConfiguration.create()
    conf.set("hbase.zookeeper.quorum", "172.16.0.226:2181,172.16.0.223:2181,172.16.0.224:2181")
    conf.set("hbase.zookeeper.property.clientPort", "2181")
    var rs: ResultScanner = null
    try {
      val table: HTable = new HTable(conf, tableName)
      val scan: Scan = new Scan()
      scan.addColumn(Bytes.toBytes("s"), Bytes.toBytes("v_ip"))
      scan.setStartRow(Bytes.toBytes(startRow))
      scan.setStopRow(Bytes.toBytes(endRow))
      rs = table.getScanner(scan)
    } catch {
      case e: IOException => e.printStackTrace()
    }
    rs
  }

  def putHbaseData(): Unit = {
    val conf: Configuration = HBaseConfiguration.create()
    conf.set("hbase.zookeeper.quorum", "172.16.0.226:2181,172.16.0.223:2181,172.16.0.224:2181")
    conf.set("hbase.zookeeper.property.clientPort", "2181")
    val table: HTable = new HTable(conf, "spark")
    // Buffer puts client-side and flush them in batches of 100.
    table.setAutoFlush(false)
    val list = new util.ArrayList[Put]()
    for (i <- 0 to 100000) {
      val v_ip = getRandomIp()
      val v_time = System.currentTimeMillis().toString
      val rowKey = System.currentTimeMillis() + UUID.randomUUID().toString
      val put: Put = new Put(Bytes.toBytes(rowKey))
      put.add(Bytes.toBytes("s"), Bytes.toBytes("v_time"), Bytes.toBytes(v_time))
      put.add(Bytes.toBytes("s"), Bytes.toBytes("v_ip"), Bytes.toBytes(v_ip))
      list.add(put)
      println(i)
      if (i % 100 == 0) {
        table.put(list)
        table.flushCommits()
        list.clear()
      }
    }
  }

  def getRandomIp(): String = {
    val random_int = new Random()
    "192" + "." + "168" + "." + random_int.nextInt(255) + "." + random_int.nextInt(255)
  }
}