Flink DataSet API: partition
程序员文章站
2022-03-08 09:09:21
Overview
Rebalance: redistributes the dataset round-robin across all parallel subtasks, repartitioning it to eliminate data skew (rebalance())
Hash-Partition: partitions the dataset by the hash of the specified key; skew can still occur when a single key dominates (partitionByHash())
Range-Partition: partitions the dataset into contiguous key ranges on the specified key (partitionByRange())
Custom Partitioning: similar to DStream partitioning rules; implement the Partitioner interface and call partitionCustom(partitioner, "someKey") or partitionCustom(partitioner, 0)
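The Rebalance entry above has no demo in this article, so here is a minimal sketch; the object name, the sample data, and the skew pattern are illustrative assumptions, not part of the original:

```scala
import org.apache.flink.api.scala._

object BatchRebalanceSketch {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    // a deliberately skewed source: most records share key 1 (illustrative data)
    val skewed = env.fromElements((1, "a"), (1, "b"), (1, "c"), (1, "d"), (2, "e"))
    // rebalance() redistributes records round-robin across all parallel subtasks,
    // ignoring keys entirely, which evens out skew introduced upstream
    skewed.rebalance()
      .map(t => t._2.toUpperCase)
      .print()
  }
}
```

Because rebalance() ignores keys, use it only when downstream operators do not depend on keeping equal keys in the same partition.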
Hash-Partition / Range-Partition usage demo
import org.apache.flink.api.scala.ExecutionEnvironment
import scala.collection.mutable.ListBuffer

object BatchDemoHashRangePartitionScala {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    import org.apache.flink.api.scala._

    val data1 = ListBuffer[Tuple2[Int, String]](
      (1, "hello1"),
      (2, "hello2"), (2, "hello3"),
      (3, "hello4"), (3, "hello5"), (3, "hello6"),
      (4, "hello7"), (4, "hello8"), (4, "hello9"), (4, "hello10"),
      (5, "hello11"), (5, "hello12"), (5, "hello13"), (5, "hello14"), (5, "hello15"),
      (6, "hello16"), (6, "hello17"), (6, "hello18"), (6, "hello19"), (6, "hello20"), (6, "hello21")
    )

    val text = env.fromCollection(data1)

    // Hash partition on tuple field 0: equal keys always land in the same partition
    text.partitionByHash(0).mapPartition { it =>
      // map lazily instead of draining the iterator in a while loop:
      // returning an exhausted iterator would make print() emit nothing
      it.map { tu =>
        println("current thread id:" + Thread.currentThread().getId + "," + tu)
        tu
      }
    }.print()

    println("=====================================")

    // Range partition on tuple field 0: keys are split into contiguous ranges
    text.partitionByRange(0).mapPartition { it =>
      it.map { tu =>
        println("current thread id:" + Thread.currentThread().getId + "," + tu)
        tu
      }
    }.print()
  }
}
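The Custom Partitioning variant from the overview can be sketched in the same style; the ParityPartitioner class and its modulo routing rule are illustrative assumptions:

```scala
import org.apache.flink.api.common.functions.Partitioner
import org.apache.flink.api.scala._

object BatchDemoCustomPartitionScala {
  // illustrative rule: route each key by modulo over the number of partitions
  class ParityPartitioner extends Partitioner[Int] {
    override def partition(key: Int, numPartitions: Int): Int =
      key % numPartitions
  }

  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val data = env.fromElements(
      (1, "hello1"), (2, "hello2"), (3, "hello3"), (4, "hello4"))
    // partition on tuple field 0 using the custom partitioner,
    // then tag each record with the thread that processed it
    data.partitionCustom(new ParityPartitioner, 0)
      .mapPartition { it =>
        it.map(tu => "thread " + Thread.currentThread().getId + ": " + tu)
      }
      .print()
  }
}
```

As the overview notes, the key can also be given by name, e.g. partitionCustom(new ParityPartitioner, "_1") for a tuple field.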