欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

spark (4)spark-shell 读写hdfs 读写redis 读写hbase

程序员文章站 2022-06-16 15:45:44
...

 

(1)初学者对于spark的几个疑问 http://aperise.iteye.com/blog/2302481
(2)spark开发环境搭建 http://aperise.iteye.com/blog/2302535
(3)Spark Standalone集群安装介绍 http://aperise.iteye.com/blog/2305905
(4)spark-shell 读写hdfs 读写redis 读写hbase http://aperise.iteye.com/blog/2324253

spark-shell 读写hdfs 读写hbase 读写redis

1.进入spark-shell环境

spark使用的是standalone方式,spark通过zookeeper做了HA(Highe Available),spark master在机器hadoop31和hadoop33上面,登录时候指定每个worker在跑spark-shell任务时候使用内存为4GB
cd /home/hadoop/spark-1.6.0-bin-hadoop2.6/
bin/spark-shell --master spark://hadoop31:7077,hadoop33:7077 --executor-memory 4G

 

2.spark-shell读写hdfs

    2.1 读取HDFS上文件

    spark-shell读取位于hadoop ha集群下的目录/data/2*/2016*/*,目录采用模糊匹配方式

 

sc.textFile("hdfs://hadoop-ha-cluster/data/2*/2016*/*")

   

 

    2.2 RDD处理结果写HDFS

    spark-shell存储数据到hadoop的HDFS上,下面这种方式在/hdfsfile下会存在多个结果文件,形如:part-00001.snappy、part-00002.snappy......

 

 

sc.textFile("hdfs://hadoop-ha-cluster/data/2*/2016*/*")
.map(line=>(line.split(",")(0),line.split(",")(1),line.split(",")(2),line.split(",")(3)))
.saveAsTextFile("hdfs://hadoop-ha-cluster/hdfsfile")

     

    2.3 RDD处理结果只写一个文件到HDFS

    有时为了汇聚结果到一个文件,可以在存储文件之前增加repartition操作,这样在/hdfsfile下面只会产生一个结果文件part-00000.snappy文件

 

 

sc.textFile("hdfs://hadoop-ha-cluster/data/2*/2016*/*")
.map(line=>(line.split(",")(0),line.split(",")(1),line.split(",")(2),line.split(",")(3)))
.repartition(1)
.saveAsTextFile("hdfs://hadoop-ha-cluster/hdfsfile")

 

 

3.spark-shell读写hbase

    3.1 spark加载hbase jar

    修改spark配置文件,使得spark知道哪里加载hbase相关jar包,修改配置文件spark-env.sh,添加如下内容:

#加载hbase相关jar
export SPARK_CLASSPATH=/home/hadoop/hbase-1.2.1/lib/*

#加载jedis相关jar
export SPARK_CLASSPATH=$SPARK_CLASSPATH:/home/hadoop/spark-1.6.0-bin-hadoop2.6/lib/jedis/jedis-2.9.0.jar
export SPARK_CLASSPATH=$SPARK_CLASSPATH:/home/hadoop/spark-1.6.0-bin-hadoop2.6/lib/jedis/commons-pool2-2.4.2.jar

 

    3.2 hbase中新增数据表

    首先在hbase中创建一个数据表

cd /home/hadoop/hbase-1.2.1/bin
./hbase shell
disable 'hbase_test'
drop 'hbase_test'
create 'hbase_test', {NAME => 'comumnfamily', TTL=>'604800', COMPRESSION => 'SNAPPY'}, SPLITS => ['2','4','6','8']
quit

 

    3.3 spark-shell往hbase里面写入spark处理的结果

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.HTable
import org.apache.hadoop.hbase.client.Put

sc.textFile("hdfs://hadoop-ha-cluster/data/2*/2016*/*")
.map(line=>(line.split(",")(0),(line.split(",")(1),line.split(",")(2).toLong,line.split(",")(3).toLong)))
.reduceByKey((tuple1,tuple2)=>(tuple1._1+tuple2._1,tuple1._2+tuple2._2))
.foreachPartition{
    iterators=>{
    var tmpConf=HBaseConfiguration.create()
    tmpConf.set("hbase.zookeeper.quorum","hadoop31,hadoop32,hadoop33,hadoop34,hadoop35")
    tmpConf.set("hbase.zookeeper.property.clientPort","2181")
    var table=new HTable(tmpConf,"hbase_test")
    table.setWriteBufferSize(5*1024*1024)
    var putList=new java.util.ArrayList[Put]
    iterators.foreach{tupple=>{
        var fixednumber=("0"*(25-tupple._1.length)+tupple._1).reverse
        var rowkey=fixednumber.getBytes()
        var p=new Put(rowkey)
        p.add("comumnfamily".getBytes,"column1".getBytes,tupple._2._1.toString.getBytes)
        p.add("comumnfamily".getBytes,"column2".getBytes,tupple._2._2.toString.getBytes)
        
        putList.add(p)
        if(putList.size()>0&&putList.size()%1000==0){
            table.put(putList)
            putList.clear()
        }}}
    table.put(putList)
    }
}

     上面小的技巧首先是使用了foreachPartition,使用该操作后,对于每一个parttion,hbase的数据库链接只需建立一个,该parttion内无需频繁创建hbase链接,不用担心序列化相关问题

    第二是hbase使用批量提交,每次提交1000条记录,提高写入速度

 

    3.4 spark-shell读取hbase中的数据,写成hdfs文件

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.HConstants

val tmpConf = HBaseConfiguration.create()
tmpConf.set("hbase.zookeeper.quorum", "hadoop31,hadoop32,hadoop33,hadoop34,hadoop35")
tmpConf.set("hbase.zookeeper.property.clientPort", "2181")
tmpConf.set(TableInputFormat.INPUT_TABLE, "hbase_test")
tmpConf.set(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, "120000");   
val hBaseRDD = sc.newAPIHadoopRDD(tmpConf, classOf[TableInputFormat],classOf[ImmutableBytesWritable],classOf[Result])
val lineRdd=hBaseRDD.map(r=>
    (if(r._2.getFamilyMap("comumnfamily".getBytes).keySet.contains("column1".getBytes)){new String(r._2.getValue("data".getBytes,"log_date".getBytes))}else{"0"})+","+
    (if(r._2.getFamilyMap("comumnfamily".getBytes).keySet.contains("column2".getBytes)){new String(r._2.getValue("data".getBytes,"area_code".getBytes))}else{"0"})
)
lineRdd.repartition(1).saveAsTextFile("hdfs://hadoop-ha-cluster/hbase2hdfs")

 

4.spark-shell读写redis

    4.1 spark加载redis jar

    配置spark,使得spark知道哪里加载jedis的jar,修改配置文件spark-env.sh,添加如下内容

#加载hbase相关jar
export SPARK_CLASSPATH=/home/hadoop/hbase-1.2.1/lib/*

#加载jedis相关jar
export SPARK_CLASSPATH=$SPARK_CLASSPATH:/home/hadoop/spark-1.6.0-bin-hadoop2.6/lib/jedis/jedis-2.9.0.jar
export SPARK_CLASSPATH=$SPARK_CLASSPATH:/home/hadoop/spark-1.6.0-bin-hadoop2.6/lib/jedis/commons-pool2-2.4.2.jar

 

    4.2 spark-shell通过jedis API写数据到redis-cluster

import redis.clients.jedis.HostAndPort
import redis.clients.jedis.JedisCluster

sc.textFile("hdfs://hadoop-ha-cluster/auto_data.txt")
      .repartition(10)
      .map(line => (line.split(",")(0), line.split(",")(1), line.split(",")(2)))
      .foreachPartition { iterators => {
        val jedisClusterNodes = new java.util.HashSet[HostAndPort]
        val serverList = new java.util.ArrayList[HostAndPort]
        serverList.add(new HostAndPort("192.168.173.21", 6379))
        serverList.add(new HostAndPort("192.168.173.22", 6380))
        serverList.add(new HostAndPort("192.168.173.23", 6381))
        serverList.add(new HostAndPort("192.168.173.24", 6379))
        serverList.add(new HostAndPort("192.168.173.25", 6380))
        serverList.add(new HostAndPort("192.168.173.26", 6381))
        jedisClusterNodes.addAll(serverList)
        val jc = new JedisCluster(jedisClusterNodes)
        iterators.foreach { t => {
          var key = "auto," + t._1 + "," + t._2
          var value = t._3
          jc.set(key, value)
        }
        }
      }
      }

     此种方式中JedisCluster不需要序列化,因为使用JedisCluster的地方不在RDD里面,RDD已经通过collect汇聚结果到了当前节点

 

    4.3 spark-shell在RDD操作过程中通过jedis API使用redis-cluster中数据

import java.text.SimpleDateFormat
import java.util.{Calendar, Date}
import redis.clients.jedis.HostAndPort
import redis.clients.jedis.JedisCluster
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.HTable
import org.apache.hadoop.hbase.client.Put

object JedisClusterObject extends Serializable {
    var jedisClusterNodes=new java.util.HashSet[HostAndPort]
    var serverList=new java.util.ArrayList[HostAndPort]
    serverList.add(new HostAndPort("192.168.173.21",6379))
    serverList.add(new HostAndPort("192.168.173.22",6380))
    serverList.add(new HostAndPort("192.168.173.23",6381))
    serverList.add(new HostAndPort("192.168.173.24",6379))
    serverList.add(new HostAndPort("192.168.173.25",6380))
    serverList.add(new HostAndPort("192.168.173.26",6381))
    jedisClusterNodes.addAll(serverList)
    val jc:JedisCluster = new JedisCluster(jedisClusterNodes)
}

sc.textFile("hdfs://hadoop-ha-cluster/data/2*/2016*/*")
.map(line=>line.split(",")(0))
.map(line=>(
  line,
  if(JedisClusterObject.jc.exists("auto,"+line)){JedisClusterObject.jc.get("auto,"+line)}else{line},
  if(JedisClusterObject.jc.exists("auto,"+line)){JedisClusterObject.jc.get("auto,"+line)}else{line}
))
.saveAsTextFile("hdfs://hadoop-ha-cluster/jedistest")

     此种方式JedisCluster必须通过单例并序列化,因为JedisCluster实在RDD中使用,会被序列化后在各个节点计算中使用,否则会提示Task not Serialized :JedisCluster

 

5 本地开发工具链接spark

    5.1 本地IDEA工具中如何链接HADOOP HA环境

      hadoop安装的是采用HA的方式,现在本地开发环境开发spark时候,无法解析hadoop-ha方式下的cluster名称,原因是本地程序不知道加载的cluster ha对应的namenode名称和IP,解决办法是通过sparkconf追加参数,让spark 本地local模式知道hadoop ha配置,如下:

  val spark = SparkSession
    .builder()
    .master("local[2]")
    .appName("HtSecApp UserEvent Processor")
    .getOrCreate()

  val sc = spark.sparkContext
  val hadoopConf = sc.hadoopConfiguration

  hadoopConf.set("dfs.nameservices", "mycluster")
  hadoopConf.set("dfs.client.failover.proxy.provider.mycluster", "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider")
  hadoopConf.set("dfs.ha.namenodes.mycluster", "nn1,nn2")
  hadoopConf.set("dfs.namenode.rpc-address.mycluster.nn1", "192.168.77.38:9000")
  hadoopConf.set("dfs.namenode.rpc-address.mycluster.nn2", "192.168.77.39:9000")

    能够避免以下错误:
spark  (4)spark-shell 读写hdfs 读写redis 读写hbase
            
    
    博客分类: spark sparkspark-shellhdfsredishbase
 

    5.2 本地搭建spark开发环境

    参见博客spark (2)spark开发环境搭建

  • spark  (4)spark-shell 读写hdfs 读写redis 读写hbase
            
    
    博客分类: spark sparkspark-shellhdfsredishbase
  • 大小: 18 KB