
Flink DataSet API: Partitioning

程序员文章站 2022-03-08 09:09:21

Basic introduction

Rebalance: rebalances the dataset by repartitioning it round-robin, eliminating data skew (`rebalance()`)

Hash-Partition: partitions the dataset by the hash of the specified key; if records are concentrated on a single key, data skew can still occur (`partitionByHash()`)

Range-Partition: partitions the dataset into key ranges based on the specified key (`partitionByRange()`)

Custom Partitioning: similar to DStream partitioning rules; a custom partitioner must implement the `Partitioner` interface and is applied with `partitionCustom(partitioner, "someKey")` or `partitionCustom(partitioner, 0)`
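A minimal sketch of the custom-partitioning bullet above (the `ParityPartitioner` name and the even/odd routing are illustrative assumptions, not from the original article):

```scala
import org.apache.flink.api.common.functions.Partitioner
import org.apache.flink.api.scala._

// Hypothetical partitioner: routes each key by its remainder, so with
// parallelism 2, even keys go to partition 0 and odd keys to partition 1.
class ParityPartitioner extends Partitioner[Int] {
  override def partition(key: Int, numPartitions: Int): Int =
    key % numPartitions
}

object BatchDemoCustomPartitionScala {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val data = env.fromElements((1, "a"), (2, "b"), (3, "c"), (4, "d"))
    data
      .partitionCustom(new ParityPartitioner, 0) // 0 = first tuple field is the key
      .map(t => (Thread.currentThread().getId, t))
      .print()
  }
}
```

`partitionCustom` accepts either a field position (as here) or a field name for case classes, matching the two call forms listed above.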

Hash-Partition and Range-Partition demo

import org.apache.flink.api.scala.ExecutionEnvironment

import scala.collection.mutable.ListBuffer

object BatchDemoHashRangePartitionScala {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    import org.apache.flink.api.scala._
    // Deliberately skewed data: keys 1..6 appear with increasing frequency.
    val data1 = ListBuffer[(Int, String)]()
    data1.append((1,"hello1"))
    data1.append((2,"hello2"))
    data1.append((2,"hello3"))
    data1.append((3,"hello4"))
    data1.append((3,"hello5"))
    data1.append((3,"hello6"))
    data1.append((4,"hello7"))
    data1.append((4,"hello8"))
    data1.append((4,"hello9"))
    data1.append((4,"hello10"))
    data1.append((5,"hello11"))
    data1.append((5,"hello12"))
    data1.append((5,"hello13"))
    data1.append((5,"hello14"))
    data1.append((5,"hello15"))
    data1.append((6,"hello16"))
    data1.append((6,"hello17"))
    data1.append((6,"hello18"))
    data1.append((6,"hello19"))
    data1.append((6,"hello20"))
    data1.append((6,"hello21"))
    val text = env.fromCollection(data1)
    // Hash partition on the first tuple field; records with the same key
    // always land in the same partition.
    text.partitionByHash(0).mapPartition(it => {
      // Map lazily instead of draining the iterator with a while loop;
      // returning an already-exhausted iterator would make print() emit nothing.
      it.map(tu => {
        println("current thread id:" + Thread.currentThread().getId + "," + tu)
        tu
      })
    }).print()
    println("=====================================")
    // Range partition on the first tuple field; keys are split into
    // contiguous, sorted ranges across partitions.
    text.partitionByRange(0).mapPartition(it => {
      it.map(tu => {
        println("current thread id:" + Thread.currentThread().getId + "," + tu)
        tu
      })
    }).print()
  }
}
