Spark (4): Reading and writing HDFS, Redis, and HBase from spark-shell
(1) Questions beginners often have about Spark | http://aperise.iteye.com/blog/2302481 |
(2) Setting up a Spark development environment | http://aperise.iteye.com/blog/2302535 |
(3) Installing a Spark Standalone cluster | http://aperise.iteye.com/blog/2305905 |
(4) Reading and writing HDFS, Redis, and HBase from spark-shell | http://aperise.iteye.com/blog/2324253 |
Reading and writing HDFS, HBase, and Redis from spark-shell
1. Start spark-shell
bin/spark-shell --master spark://hadoop31:7077,hadoop33:7077 --executor-memory 4G
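2. Reading and writing HDFS from spark-shell
2.1 Reading files from HDFS
spark-shell reads the path /data/2*/2016*/* on the Hadoop HA cluster. The path is a glob pattern, so every matching directory is read.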
sc.textFile("hdfs://hadoop-ha-cluster/data/2*/2016*/*")
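As a rough illustration of which paths the pattern /data/2*/2016*/* selects, the sketch below checks a few sample paths with the JDK's glob matcher. This is only an approximation: Hadoop uses its own glob implementation, and the sample paths and the GlobSketch name are made up for the example.

```scala
import java.nio.file.{FileSystems, Paths}

object GlobSketch {
  // Approximation only: this is the JDK matcher, not Hadoop's own glob code.
  // In both, a plain `*` matches any characters within a single path segment.
  private val matcher =
    FileSystems.getDefault.getPathMatcher("glob:/data/2*/2016*/*")

  // True when the (hypothetical) path would be selected by the pattern.
  def matches(path: String): Boolean = matcher.matches(Paths.get(path))
}
```

For instance, GlobSketch.matches("/data/21/20160801/a.log") holds, while a path under /data/31 or one nested a level deeper does not match.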
2.2 Writing RDD results to HDFS
spark-shell saves data to HDFS. Written this way, /hdfsfile will contain several result files, one per partition, named like part-00000.snappy, part-00001.snappy, ...
sc.textFile("hdfs://hadoop-ha-cluster/data/2*/2016*/*")
  .map { line =>
    val f = line.split(",")  // split once, then take the first four fields
    (f(0), f(1), f(2), f(3))
  }
  .saveAsTextFile("hdfs://hadoop-ha-cluster/hdfsfile")
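The map step indexes fields 0 to 3 directly, so a line with fewer than four fields would throw an ArrayIndexOutOfBoundsException. A minimal sketch of a more defensive parser follows; LineParser is a hypothetical helper, not part of the original post.

```scala
object LineParser {
  // Returns None for lines with fewer than four comma-separated fields.
  // The -1 limit keeps trailing empty fields, so "a,b,c," still yields four.
  def parse(line: String): Option[(String, String, String, String)] = {
    val f = line.split(",", -1)
    if (f.length >= 4) Some((f(0), f(1), f(2), f(3))) else None
  }
}
```

In the job it could replace the map step with .flatMap(line => LineParser.parse(line)), which silently drops malformed lines.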
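2.3 Writing RDD results to HDFS as a single file
To gather the results into one file, add a repartition step before saving; /hdfsfile will then contain a single result file, part-00000.snappy. This funnels all the data through one task, so it suits small results.
sc.textFile("hdfs://hadoop-ha-cluster/data/2*/2016*/*")
  .map { line =>
    val f = line.split(",")  // split once, then take the first four fields
    (f(0), f(1), f(2), f(3))
  }
  .repartition(1)
  .saveAsTextFile("hdfs://hadoop-ha-cluster/hdfsfile")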
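3. Reading and writing HBase from spark-shell
3.1 Loading the HBase jars into Spark
Edit the Spark configuration so that Spark knows where to load the HBase jars: open spark-env.sh and add the lines below. Spark 1.x marks SPARK_CLASSPATH as deprecated in favor of spark.driver.extraClassPath and spark.executor.extraClassPath, but it is the setting used throughout this post.
export SPARK_CLASSPATH=/home/hadoop/hbase-1.2.1/lib/*
# load the jedis jars
export SPARK_CLASSPATH=$SPARK_CLASSPATH:/home/hadoop/spark-1.6.0-bin-hadoop2.6/lib/jedis/jedis-2.9.0.jar
export SPARK_CLASSPATH=$SPARK_CLASSPATH:/home/hadoop/spark-1.6.0-bin-hadoop2.6/lib/jedis/commons-pool2-2.4.2.jar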
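3.2 Creating a table in HBase
First create a table in HBase. The disable and drop commands remove any existing hbase_test table so the script can be rerun. The table has one column family with a TTL of 604800 seconds (7 days), SNAPPY compression, and four pre-split points.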
./hbase shell
disable 'hbase_test'
drop 'hbase_test'
create 'hbase_test', {NAME => 'comumnfamily', TTL=>'604800', COMPRESSION => 'SNAPPY'}, SPLITS => ['2','4','6','8']
quit
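To see how the four split points divide the key space into five regions, the following sketch maps a row key to a region index. It compares keys as ASCII strings as a stand-in for HBase's byte-wise ordering; RegionSketch and regionIndex are hypothetical names, not part of the HBase API.

```scala
object RegionSketch {
  // The split points from the create statement.
  val splits = Vector("2", "4", "6", "8")

  // Region 0 holds keys below "2", region 1 holds ["2", "4"), and so on;
  // region 4 holds every key from "8" upward.
  def regionIndex(rowKey: String): Int = splits.count(s => s <= rowKey)
}
```

A key starting with 0 or 1 lands in region 0, and a key starting with 8 or 9 lands in region 4, so keys whose first character is evenly spread over the digits are spread evenly over the regions.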
3.3 Writing Spark results to HBase from spark-shell
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.HTable
import org.apache.hadoop.hbase.client.Put

sc.textFile("hdfs://hadoop-ha-cluster/data/2*/2016*/*")
  .map { line =>
    val f = line.split(",")
    (f(0), (f(1), f(2).toLong, f(3).toLong))
  }
  // the reduce function must return the same 3-tuple type as the values it combines
  .reduceByKey((a, b) => (a._1 + b._1, a._2 + b._2, a._3 + b._3))
  .foreachPartition { rows =>
    // one HBase connection per partition, created on the executor
    val tmpConf = HBaseConfiguration.create()
    tmpConf.set("hbase.zookeeper.quorum", "hadoop31,hadoop32,hadoop33,hadoop34,hadoop35")
    tmpConf.set("hbase.zookeeper.property.clientPort", "2181")
    val table = new HTable(tmpConf, "hbase_test")
    table.setWriteBufferSize(5 * 1024 * 1024)
    val putList = new java.util.ArrayList[Put]
    try {
      rows.foreach { row =>
        // left-pad the key with zeros to 25 characters, then reverse it
        val rowkey = ("0" * (25 - row._1.length) + row._1).reverse.getBytes()
        val p = new Put(rowkey)
        p.add("comumnfamily".getBytes, "column1".getBytes, row._2._1.toString.getBytes)
        p.add("comumnfamily".getBytes, "column2".getBytes, row._2._2.toString.getBytes)
        putList.add(p)
        // submit in batches of 1000 records
        if (putList.size() % 1000 == 0) {
          table.put(putList)
          putList.clear()
        }
      }
      // submit whatever is left over from the last incomplete batch
      if (!putList.isEmpty) table.put(putList)
    } finally {
      table.close()  // flushes buffered writes and releases the connection
    }
  }
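Two small techniques are used above. The first is foreachPartition: each partition opens a single HBase connection instead of creating one per record, and because the connection is created inside the partition on the executor, there are no serialization issues to worry about.
The second is batching: HBase writes are submitted 1000 records at a time, which speeds up the write.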
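The row key construction and the batching can be tried out without a cluster. The sketch below is a plain-Scala illustration under the assumption that the ids are short numeric strings; RowKeySketch is a hypothetical name. Reversing the padded id puts its last character first, which presumably is meant to spread sequential ids across the pre-split regions.

```scala
object RowKeySketch {
  val Width = 25

  // Left-pad the id with zeros to a fixed width, then reverse it so the
  // fastest-changing character of the id leads the row key.
  def rowKey(id: String): String = ("0" * (Width - id.length) + id).reverse

  // Cut a stream of rows into batches of at most `size` elements,
  // the same grouping the write loop does with its 1000-record list.
  def batches[A](rows: Iterator[A], size: Int): Iterator[Seq[A]] =
    rows.grouped(size)
}
```

For example, RowKeySketch.rowKey("123") starts with "321" and is 25 characters long, and 2500 rows in batches of 1000 produce batches of 1000, 1000, and 500.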
3.4 Reading data from HBase in spark-shell and writing it to an HDFS file
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.HConstants
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat

val tmpConf = HBaseConfiguration.create()
tmpConf.set("hbase.zookeeper.quorum", "hadoop31,hadoop32,hadoop33,hadoop34,hadoop35")
tmpConf.set("hbase.zookeeper.property.clientPort", "2181")
tmpConf.set(TableInputFormat.INPUT_TABLE, "hbase_test")
tmpConf.set(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, "120000")

val hBaseRDD = sc.newAPIHadoopRDD(tmpConf, classOf[TableInputFormat],
  classOf[ImmutableBytesWritable], classOf[Result])

val lineRdd = hBaseRDD.map { case (_, result) =>
  val family = "comumnfamily".getBytes
  // getValue returns null when the row has no such column; fall back to "0".
  // The cells read here are the ones written in 3.3 (column1 and column2).
  def cell(qualifier: String): String =
    Option(result.getValue(family, qualifier.getBytes)).map(b => new String(b)).getOrElse("0")
  cell("column1") + "," + cell("column2")
}

lineRdd.repartition(1).saveAsTextFile("hdfs://hadoop-ha-cluster/hbase2hdfs")
4. Reading and writing Redis from spark-shell
4.1 Loading the Redis jars into Spark
Configure Spark so that it knows where to load the jedis jars: edit spark-env.sh and add the following (the same settings as in section 3.1):
export SPARK_CLASSPATH=/home/hadoop/hbase-1.2.1/lib/*
# load the jedis jars
export SPARK_CLASSPATH=$SPARK_CLASSPATH:/home/hadoop/spark-1.6.0-bin-hadoop2.6/lib/jedis/jedis-2.9.0.jar
export SPARK_CLASSPATH=$SPARK_CLASSPATH:/home/hadoop/spark-1.6.0-bin-hadoop2.6/lib/jedis/commons-pool2-2.4.2.jar
4.2 Writing data to redis-cluster from spark-shell through the jedis API
import redis.clients.jedis.HostAndPort
import redis.clients.jedis.JedisCluster

sc.textFile("hdfs://hadoop-ha-cluster/auto_data.txt")
  .repartition(10)
  .map { line =>
    val f = line.split(",")
    (f(0), f(1), f(2))
  }
  .foreachPartition { rows =>
    // one JedisCluster client per partition, created on the executor
    val jedisClusterNodes = new java.util.HashSet[HostAndPort]
    jedisClusterNodes.add(new HostAndPort("192.168.173.21", 6379))
    jedisClusterNodes.add(new HostAndPort("192.168.173.22", 6380))
    jedisClusterNodes.add(new HostAndPort("192.168.173.23", 6381))
    jedisClusterNodes.add(new HostAndPort("192.168.173.24", 6379))
    jedisClusterNodes.add(new HostAndPort("192.168.173.25", 6380))
    jedisClusterNodes.add(new HostAndPort("192.168.173.26", 6381))
    val jc = new JedisCluster(jedisClusterNodes)
    try {
      rows.foreach { t =>
        val key = "auto," + t._1 + "," + t._2
        val value = t._3
        jc.set(key, value)
      }
    } finally {
      jc.close()  // release the connections once the partition is done
    }
  }
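With this approach JedisCluster does not need to be serialized: it is created inside foreachPartition, so each partition builds its own client on the executor and no client object is shipped from the driver.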
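The per-partition pattern can be exercised without Redis. In the sketch below, FakeClient is a hypothetical stand-in for a client such as JedisCluster that simply records what it is asked to store; writePartition has the same shape as a function passed to foreachPartition.

```scala
object PartitionResourceSketch {
  // Hypothetical stand-in for a real client; it only records the writes.
  final class FakeClient extends java.io.Closeable {
    val written = scala.collection.mutable.ArrayBuffer.empty[(String, String)]
    var closed = false
    def set(key: String, value: String): Unit = { written += ((key, value)); () }
    override def close(): Unit = { closed = true }
  }

  // Use one client for every record of the partition and always close it,
  // even if a write fails. Returns the number of records written.
  def writePartition(records: Iterator[(String, String, String)], client: FakeClient): Int =
    try {
      var n = 0
      records.foreach { case (a, b, value) =>
        client.set("auto," + a + "," + b, value)
        n += 1
      }
      n
    } finally client.close()
}
```

The client is opened once per partition rather than once per record, and the finally block makes sure its connections are released when the partition finishes.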
4.3 Using redis-cluster data inside RDD operations from spark-shell through the jedis API
import redis.clients.jedis.HostAndPort
import redis.clients.jedis.JedisCluster

object JedisClusterObject extends Serializable {
  val jedisClusterNodes = new java.util.HashSet[HostAndPort]
  jedisClusterNodes.add(new HostAndPort("192.168.173.21", 6379))
  jedisClusterNodes.add(new HostAndPort("192.168.173.22", 6380))
  jedisClusterNodes.add(new HostAndPort("192.168.173.23", 6381))
  jedisClusterNodes.add(new HostAndPort("192.168.173.24", 6379))
  jedisClusterNodes.add(new HostAndPort("192.168.173.25", 6380))
  jedisClusterNodes.add(new HostAndPort("192.168.173.26", 6381))
  val jc: JedisCluster = new JedisCluster(jedisClusterNodes)
}

sc.textFile("hdfs://hadoop-ha-cluster/data/2*/2016*/*")
  .map(line => line.split(",")(0))
  .map { id =>
    // jc.get returns null when the key does not exist, so a single lookup
    // replaces the separate exists and get calls; fall back to the id itself
    val value = Option(JedisClusterObject.jc.get("auto," + id)).getOrElse(id)
    (id, value, value)
  }
  .saveAsTextFile("hdfs://hadoop-ha-cluster/jedistest")
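With this approach JedisCluster is used inside RDD operations, which run on the executors, so it has to live in a singleton object marked Serializable. A Scala object is initialized on first access in each JVM, so every executor builds its own JedisCluster instead of receiving one captured from the driver. Without the singleton, Spark fails with "Task not serializable" for JedisCluster.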
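The once-per-JVM initialization of a Scala object can be observed in plain Scala. In the sketch below, InitCounter and ExpensiveClient are hypothetical names; the counter only exists to make the initialization visible. How objects defined inside spark-shell are wrapped by the REPL is more involved, so treat this as an illustration of the language rule only.

```scala
object InitCounter {
  var count = 0
}

object ExpensiveClient {
  // The body of an object runs once, the first time the object is accessed.
  InitCounter.count += 1
  val name: String = "client-" + InitCounter.count
}
```

Reading ExpensiveClient.name any number of times leaves InitCounter.count at 1, which is why a client held in a singleton is built only once per executor JVM.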
5. Connecting to Spark from local development tools
5.1 Connecting to a Hadoop HA environment from a local IDEA project
Hadoop is installed in HA mode. When developing Spark code locally, the program cannot resolve the HA cluster name, because it does not know the NameNode names and addresses behind the HA nameservice. The fix is to set the HA parameters on the SparkContext's Hadoop configuration, so that Spark in local mode knows the Hadoop HA setup, as follows:
import org.apache.spark.sql.SparkSession

val spark = SparkSession
  .builder()
  .master("local[2]")
  .appName("HtSecApp UserEvent Processor")
  .getOrCreate()
val sc = spark.sparkContext
val hadoopConf = sc.hadoopConfiguration
hadoopConf.set("dfs.nameservices", "mycluster")
hadoopConf.set("dfs.client.failover.proxy.provider.mycluster", "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider")
hadoopConf.set("dfs.ha.namenodes.mycluster", "nn1,nn2")
hadoopConf.set("dfs.namenode.rpc-address.mycluster.nn1", "192.168.77.38:9000")
hadoopConf.set("dfs.namenode.rpc-address.mycluster.nn2", "192.168.77.39:9000")
This avoids the following error: