Section 13: DataStream partitioning (Java)
程序员文章站
2022-06-16 16:42:14
...
Previous: Section 12, DataStream operators (Java)
1. Partitioning in the DataStream API
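- Random partitioning: records are distributed randomly across the downstream partitions.
  dataStream.shuffle()
- Rebalancing: repartitions the data round-robin across all downstream partitions, which evens out the load and removes data skew.
  dataStream.rebalance()
- Rescaling: see the explanation in section 2.
  dataStream.rescale()
- Custom partitioning: user-defined partitioning. The partitioner must implement the Partitioner interface.
  dataStream.partitionCustom(partitioner, "someKey")
  or dataStream.partitionCustom(partitioner, 0);
- Broadcasting: covered separately in a later section.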
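The difference between shuffle() and rebalance() is easier to see in a concrete model. The plain-Java sketch below shows how an upstream subtask might pick a downstream channel under each strategy. It is a simplified model, not Flink's implementation: the class and method names are hypothetical, and Flink's own round-robin partitioner starts at a random channel rather than at channel 0.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Random;

// Simplified model of downstream channel selection (NOT Flink source code; names are hypothetical).
class ChannelSelectionSketch {

    // shuffle(): every record goes to a uniformly random channel.
    static int shuffle(Random random, int numChannels) {
        return random.nextInt(numChannels);
    }

    // rebalance(): cycle through all channels so each one gets an (almost) equal share.
    static final class RoundRobin {
        private int next;

        RoundRobin(int start) {
            this.next = start;
        }

        int select(int numChannels) {
            int channel = next;
            next = (next + 1) % numChannels;
            return channel;
        }
    }

    // Routes `records` records round-robin and counts how many land on each channel.
    static int[] rebalanceCounts(int records, int numChannels) {
        RoundRobin rr = new RoundRobin(0); // simplification: Flink picks a random start channel
        int[] counts = new int[numChannels];
        for (int i = 0; i < records; i++) {
            counts[rr.select(numChannels)]++;
        }
        return counts;
    }

    public static void main(String[] args) {
        Random random = new Random(42);
        List<Integer> picks = new ArrayList<>();
        for (int i = 0; i < 8; i++) {
            picks.add(shuffle(random, 4));
        }
        System.out.println("shuffle picks:    " + picks);
        System.out.println("rebalance counts: " + Arrays.toString(rebalanceCounts(10, 4))); // [3, 3, 2, 2]
    }
}
```

Random selection is only balanced on average, while round-robin guarantees that no channel receives more than one record more than any other.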
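2. Rescaling explained
For example:
If the upstream operation has 2 parallel instances and the downstream operation has 4, then one upstream instance sends its results to two of the downstream instances, and the other upstream instance sends its results to the remaining two. Conversely, if the downstream operation has 2 parallel instances and the upstream operation has 4, then two of the upstream instances send their results to one downstream instance, and the other two send theirs to the other downstream instance.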
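The mapping in this example can be computed with contiguous slices. The sketch below is a simplified model of the "each upstream instance talks to a subset of downstream instances" idea. The class and method names are hypothetical, and Flink's runtime wiring may differ in detail from these exact slice boundaries.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of rescale() wiring (hypothetical names, not Flink source code).
class RescaleSketch {

    // For each upstream subtask index, returns the downstream subtask indices it sends to.
    static List<List<Integer>> rescaleTargets(int upstream, int downstream) {
        List<List<Integer>> targets = new ArrayList<>();
        for (int u = 0; u < upstream; u++) {
            targets.add(new ArrayList<>());
        }
        if (upstream <= downstream) {
            // Fan-out: upstream subtask u owns the downstream slice [u*down/up, (u+1)*down/up).
            for (int u = 0; u < upstream; u++) {
                int start = u * downstream / upstream;
                int end = (u + 1) * downstream / upstream;
                for (int d = start; d < end; d++) {
                    targets.get(u).add(d);
                }
            }
        } else {
            // Fan-in: downstream subtask d reads from the upstream slice [d*up/down, (d+1)*up/down).
            for (int d = 0; d < downstream; d++) {
                int start = d * upstream / downstream;
                int end = (d + 1) * upstream / downstream;
                for (int u = start; u < end; u++) {
                    targets.get(u).add(d);
                }
            }
        }
        return targets;
    }

    public static void main(String[] args) {
        System.out.println("2 -> 4: " + rescaleTargets(2, 4)); // [[0, 1], [2, 3]]
        System.out.println("4 -> 2: " + rescaleTargets(4, 2)); // [[0], [0], [1], [1]]
    }
}
```

Within its slice, an upstream subtask then distributes records round-robin, so the load is still spread out, just not across every downstream instance.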
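Difference between Rescaling and Rebalancing:
Rebalancing performs a full repartition: every upstream instance can send data to every downstream instance. Rescaling does not. Each upstream instance sends only to its own subset of downstream instances, as in the example above, so depending on how slots are assigned, the data can often stay local instead of crossing the network.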
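One rough way to quantify "full repartition" is to count the upstream-to-downstream channels each strategy opens. The sketch below is a back-of-the-envelope model under a stated assumption: rebalance() connects every upstream subtask to every downstream subtask, while rescale() connects each subtask on the larger side to exactly one partner on the smaller side, as in the 2-to-4 example. The class name is hypothetical.

```java
// Back-of-the-envelope channel count per strategy (hypothetical helper, simplified model).
class ChannelCountSketch {

    // rebalance(): all-to-all, so every upstream subtask has a channel to every downstream subtask.
    static int rebalanceChannels(int upstream, int downstream) {
        return upstream * downstream;
    }

    // rescale(): pointwise; each subtask on the larger side has exactly one partner,
    // so the number of channels equals the larger parallelism.
    static int rescaleChannels(int upstream, int downstream) {
        return Math.max(upstream, downstream);
    }

    public static void main(String[] args) {
        System.out.println("2 -> 4 rebalance: " + rebalanceChannels(2, 4)); // 8
        System.out.println("2 -> 4 rescale:   " + rescaleChannels(2, 4));   // 4
    }
}
```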
3. Code example
MyPartition.java
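package xuwei.custormPartition;

import org.apache.flink.api.common.functions.Partitioner;

/**
 * Custom partitioner: even keys go to partition 0, odd keys go to partition 1.
 * Requires at least 2 downstream partitions (parallel instances), because it can return 1.
 */
public class MyPartition implements Partitioner<Long> {
    @Override
    public int partition(Long key, int numPartition) {
        System.out.println("Total number of partitions: " + numPartition);
        if (key % 2 == 0) {
            return 0;
        } else {
            return 1;
        }
    }
}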
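MyPartition only ever returns 0 or 1, so if the downstream operator has more than two partitions, the others receive nothing. A possible variant, sketched below in plain Java, spreads keys over all partitions with a modulo. To keep the sketch compilable without Flink, SimplePartitioner is a hypothetical stand-in that mirrors the partition(key, numPartitions) shape of Flink's Partitioner; in a real job you would implement org.apache.flink.api.common.functions.Partitioner<Long> instead.

```java
// Hypothetical stand-in mirroring the shape of Flink's Partitioner#partition(key, numPartitions).
interface SimplePartitioner<K> {
    int partition(K key, int numPartitions);
}

class PartitionerVariants {

    // Same rule as MyPartition: even keys -> 0, odd keys -> 1 (needs at least 2 partitions).
    static final SimplePartitioner<Long> PARITY =
            (key, numPartitions) -> key % 2 == 0 ? 0 : 1;

    // Modulo variant: uses every available partition; floorMod keeps negative keys in range.
    static final SimplePartitioner<Long> MODULO =
            (key, numPartitions) -> (int) Math.floorMod(key, (long) numPartitions);

    public static void main(String[] args) {
        for (long key = -2; key <= 4; key++) {
            System.out.println(key + " -> parity " + PARITY.partition(key, 3)
                    + ", modulo " + MODULO.partition(key, 3));
        }
    }
}
```

Math.floorMod is used instead of % because % yields a negative result for negative keys, which would be an invalid partition index.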
StreamingDemoMyPartition.java
package xuwei.custormPartition;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import xuwei.custormSource.MyNoParalleSource;

/**
 * Uses a custom partitioner:
 * records are partitioned by whether the value is even or odd.
 */
public class StreamingDemoMyPartition {
    public static void main(String[] args) throws Exception {
        // Get the Flink execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Custom non-parallel source (defined in package xuwei.custormSource, not shown here)
        DataStreamSource<Long> text = env.addSource(new MyNoParalleSource());
        // Convert each Long into a Tuple1<Long> so it can be partitioned by field position 0
        DataStream<Tuple1<Long>> tupData = text.map(new MapFunction<Long, Tuple1<Long>>() {
            @Override
            public Tuple1<Long> map(Long value) throws Exception {
                return new Tuple1<>(value);
            }
        });
        // Partition on field 0 with MyPartition (even -> partition 0, odd -> partition 1)
        DataStream<Tuple1<Long>> partitionData = tupData.partitionCustom(new MyPartition(), 0);
        DataStream<Long> result = partitionData.map(new MapFunction<Tuple1<Long>, Long>() {
            @Override
            public Long map(Tuple1<Long> value) throws Exception {
                // Print the thread id to show which parallel instance handles each value
                System.out.println("Current thread id: " + Thread.currentThread().getId() + ", value: " + value);
                return value.f0;
            }
        });
        // Print the results with a parallelism of 1
        result.print().setParallelism(1);
        env.execute("StreamingDemoMyPartition");
    }
}
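The routing this job performs can be mimicked without Flink to see which subtask ends up with which value. The sketch below assumes the source emits 1, 2, 3 and so on (MyNoParalleSource is not shown in this section, so that is an assumption) and that the partitioned map runs with 2 parallel instances. The class and method names are hypothetical, and the loop only stands in for partitionCustom; no Flink job is started.

```java
import java.util.ArrayList;
import java.util.List;

// Pure-Java simulation of the parity routing in StreamingDemoMyPartition (hypothetical names).
class DemoRoutingSketch {

    // Same rule as MyPartition.
    static int parity(long key) {
        return key % 2 == 0 ? 0 : 1;
    }

    // Returns one list per downstream subtask holding the values that subtask would receive.
    static List<List<Long>> route(long from, long to, int numSubtasks) {
        List<List<Long>> perSubtask = new ArrayList<>();
        for (int i = 0; i < numSubtasks; i++) {
            perSubtask.add(new ArrayList<>());
        }
        for (long value = from; value <= to; value++) {
            perSubtask.get(parity(value)).add(value);
        }
        return perSubtask;
    }

    public static void main(String[] args) {
        List<List<Long>> result = route(1, 6, 2);
        System.out.println("subtask 0 sees " + result.get(0)); // [2, 4, 6]
        System.out.println("subtask 1 sees " + result.get(1)); // [1, 3, 5]
    }
}
```

In the real job, this split shows up as all even values being printed with one thread id and all odd values with another.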
When run, the job keeps printing to the console indefinitely.
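We can also choose the number of partitions ourselves by setting the job parallelism:
// Get the Flink execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(2); // use 2 parallel instances, i.e. 2 partitions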
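Setting the parallelism to 2 matters because MyPartition can return index 1, and that index exists only when there are at least 2 downstream partitions. When running locally, the default parallelism is usually the number of CPU cores, so without this setting the partition count printed by MyPartition depends on the machine. The plain-Java check below (hypothetical helper names) shows that the parity rule produces an out-of-range index when there is only one partition.

```java
// Checks whether a parity rule's output stays within [0, numChannels) (hypothetical helper).
class ParallelismCheckSketch {

    // Same rule as MyPartition.
    static int parity(long key) {
        return key % 2 == 0 ? 0 : 1;
    }

    // True if every key in [from, to] maps to a valid channel index.
    static boolean allIndicesValid(long from, long to, int numChannels) {
        for (long key = from; key <= to; key++) {
            int channel = parity(key);
            if (channel < 0 || channel >= numChannels) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println("parallelism 1 ok? " + allIndicesValid(1, 10, 1)); // false
        System.out.println("parallelism 2 ok? " + allIndicesValid(1, 10, 2)); // true
    }
}
```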