Spark编程模型(下)
程序员文章站
2022-03-24 17:38:08
...
创建Pair RDD
什么是Pair RDD
- 包含键值对类型的RDD类型被称作Pair RDD;
- Pair RDD通常用来进行聚合计算;
- Pair RDD通常由普通RDD做ETL转化而来。
Python:
pairs = lines.map(lambda x: (x.split(" ")[0], x))
Scala:
val pairs = lines.map(x => (x.split(" ")(0), x))
Java:
PairFunction<String, String, String> keyData = new PairFunction<String, String, String>() {
public Tuple2<String, String> call(String x){
return new Tuple(x.split(" ")[0], x);
}
};
JavaPairRDD<String, String> pairs = lines.mapToPair(keyData);
Pair RDD的transformation操作
Pair RDD可以使用所有标准RDD上的转化操作(见博文Spark编程模型(中)),还提供了特有的转换操作。
下面给大家示范一个操作,其它的自行去尝试。
[aaa@qq.com ~]$ cd /home/hadoop/app/spark-2.2.0/bin/
[aaa@qq.com bin]$ ./spark-shell //进入Spark Shell模式
scala> val rdd = sc.parallelize(Array((1,2),(3,4),(3,6)))
rdd: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[0] at parallelize at <console>:24
//其中parallelize()方法的作用是从一个集合创建RDD,本例中时从一个Array()创建
scala> rdd.take(3)
res0: Array[(Int, Int)] = Array((1,2), (3,4), (3,6))
scala> rdd.reduceByKey((x,y)=>x+y).take(2) //合并key相同的项
res3: Array[(Int, Int)] = Array((1,2), (3,10))
Pair RDD的action控制
所有的基础RDD支持的行动操作也都在Pair RDD上可用。
Pair RDD的分区控制
Spark 中所有的键值对RDD 都可以进行分区控制—自定义分区
自定义分区的好处:
1)避免数据倾斜
2)控制task并行度
自定义分区方式:
class DomainNamePartitioner(numParts: Int) extends Partitioner {
override def numPartitions: Int = numParts
override def getPartition(key: Any): Int = {
val domain = new Java.net.URL(key.toString).getHost()
val code = (domain.hashCode % numPartitions)
if(code < 0) {
code + numPartitions // 使其非负
}else{
code
}
}
// 用来让Spark区分分区函数对象的Java equals方法
override def equals(other: Any): Boolean = other match {
case dnp: DomainNamePartitioner =>
dnp.numPartitions == numPartitions
case _ =>
false
}
}
以上就是博主为大家介绍的这一板块的主要内容,这都是博主自己的学习过程,希望能给大家带来一定的指导作用,有用的还望大家点个支持,如果对你没用也望包涵,有错误烦请指出。如有期待可关注博主以第一时间获取更新哦,谢谢!