Flink DataSet API - Transformations


Data Transformations

A data transformation turns one or more DataSets into a new DataSet. A program can chain multiple transformations together to build complex pipelines.

Map

Applies a one-to-one mapping over the DataSet: each input element produces exactly one output element.

data.map(new MapFunction<String, Integer>() {
  public Integer map(String value) { return Integer.parseInt(value); }
});

FlatMap

Takes one element and produces zero, one, or more elements from it.

data.flatMap(new FlatMapFunction<String, String>() {
  public void flatMap(String value, Collector<String> out) {
    for (String s : value.split(" ")) {
      out.collect(s);
    }
  }
});

MapPartition

Map and FlatMap operate on individual elements of the DataSet, whereas MapPartition operates on an entire partition at a time.

data.mapPartition(new MapPartitionFunction<String, Long>() {
  public void mapPartition(Iterable<String> values, Collector<Long> out) {
    // emit one element count per partition
    long c = 0;
    for (String s : values) {
      c++;
    }
    out.collect(c);
  }
});

Filter

Keeps only the elements for which the filter function returns true.

data.filter(new FilterFunction<Integer>() {
  public boolean filter(Integer value) { return value > 1000; }
});

SortPartition

Locally sorts each partition of the DataSet on the specified field, in ascending or descending order.

DataSet<Tuple2<String,Integer>> in = // [...]
// sort every partition on the second tuple field, then process each partition
// (PartitionMapper stands for a user-defined MapPartitionFunction)
DataSet<Integer> result = in.sortPartition(1, Order.ASCENDING)
                            .mapPartition(new PartitionMapper());

Reduce

Combines a group of elements into a single element by repeatedly merging two elements into one. Reduce can be applied to a full DataSet or to a grouped DataSet.
When reduce is applied to a grouped DataSet, you can tell the runtime how to execute the combine phase of the reduce by passing a CombineHint to setCombineHint. The hash-based strategy should be faster in most cases, especially when the number of distinct keys is large; a sketch follows the example below.

data.reduce(new ReduceFunction<Integer>() {
  public Integer reduce(Integer a, Integer b) { return a + b; }
});
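
A minimal, hedged sketch of a grouped reduce with an explicit combine strategy (the Tuple2 data, the key position, and the summing reducer are illustrative; CombineHint comes from org.apache.flink.api.common.operators.base.ReduceOperatorBase):

DataSet<Tuple2<String, Integer>> data = // [...]
DataSet<Tuple2<String, Integer>> summed = data
  .groupBy(0)                          // group on the String key field
  .reduce(new ReduceFunction<Tuple2<String, Integer>>() {
    public Tuple2<String, Integer> reduce(Tuple2<String, Integer> a, Tuple2<String, Integer> b) {
      return Tuple2.of(a.f0, a.f1 + b.f1); // sum the counts per key
    }
  })
  .setCombineHint(CombineHint.HASH);   // request the hash-based combine strategy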

Aggregate

Aggregates a group of values into a single value. Aggregation functions can be thought of as built-in reduce functions. Aggregate can be applied to a full DataSet or to a grouped DataSet.
The available aggregation functions are SUM, MIN, and MAX.

On a grouped DataSet

After grouping by the String field, sum the first Int field and then take the minimum of the Double field:

public static void main(String[] args) throws Exception {
		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
		
		DataSet<Tuple3<Integer, String, Double>> input = env.fromElements(
				Tuple3.of(1, "hello", 4.0),
				Tuple3.of(1, "hello", 5.0),
				Tuple3.of(2, "hello", 5.0),
				Tuple3.of(3, "world", 6.0),
				Tuple3.of(3, "world", 6.0)
				);
		
		DataSet<Tuple3<Integer, String, Double>> output = input.groupBy(1)
				.aggregate(Aggregations.SUM, 0)
				.and(Aggregations.MIN, 2);
		
		output.print(); // print() triggers execution, so a separate env.execute() call is not needed

	}
	
Result:
(6,world,6.0)
(4,hello,4.0)

On the full DataSet

	public static void main(String[] args) throws Exception {
		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
		
		DataSet<Tuple3<Integer, String, Double>> input = env.fromElements(
				Tuple3.of(1, "hello", 4.0),
				Tuple3.of(1, "hello", 5.0),
				Tuple3.of(2, "hello", 5.0),
				Tuple3.of(3, "world", 6.0),
				Tuple3.of(3, "world", 6.0)
				);
		
		DataSet<Tuple3<Integer, String, Double>> output = input
				.aggregate(Aggregations.SUM, 0)
				.and(Aggregations.MIN, 2);
		
		output.print(); // print() triggers execution

	}
		
Result:
(10,world,4.0)

minBy/maxBy

Selects from a group of tuples the tuple with the minimum (maximum) value in one or more fields. The fields used for the comparison must be valid key fields, i.e. comparable. If several tuples share the minimum (maximum) field values, an arbitrary one of them is returned. MinBy (MaxBy) can be applied to a full DataSet or to a grouped DataSet.

On a grouped DataSet

After grouping by the String field, select the tuple with the minimum Int field; if several tuples tie on it, break the tie with the minimum Double field:

package com.ztland.flink_demo.dataSet;

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple3;


public class DataSetMinBy {

	public static void main(String[] args) throws Exception {
		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
		
		DataSet<Tuple3<Integer, String, Double>> input = env.fromElements(
				Tuple3.of(1, "hello", 5.0),
				Tuple3.of(1, "hello", 4.0),
				Tuple3.of(2, "hello", 5.0),
				Tuple3.of(3, "world", 7.0),
				Tuple3.of(4, "world", 6.0)
				);
		
		DataSet<Tuple3<Integer, String, Double>> output = input.groupBy(1)
				.minBy(0, 2);

		output.print(); // print() triggers execution

	}

}

Result:
(1,hello,4.0)
(3,world,7.0)

On the full DataSet

package com.ztland.flink_demo.dataSet;

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple3;


public class DataSetCreateFlow {

	public static void main(String[] args) throws Exception {
		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
		
		DataSet<Tuple3<Integer, String, Double>> input = env.fromElements(
				Tuple3.of(1, "hello", 5.0),
				Tuple3.of(1, "hello", 4.0),
				Tuple3.of(2, "hello", 5.0),
				Tuple3.of(3, "world", 7.0),
				Tuple3.of(4, "world", 6.0)
				);
		
		DataSet<Tuple3<Integer, String, Double>> output = input
				.minBy(0,2);
		
		output.print(); // print() triggers execution

	}

}
Result:
(1,hello,4.0)
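
maxBy works symmetrically; for instance, on the same input, a hedged one-liner selecting the tuple with the maximum Int field (ties broken by the maximum Double field) would be:

DataSet<Tuple3<Integer, String, Double>> output = input.maxBy(0, 2);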

GroupReduce

Can be applied to a grouped DataSet or to a full DataSet; the function accesses all elements of the group through an iterator.

package com.ztland.flink_demo.dataSet;

import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.util.Collector;


public class DataSetReduceGroup {

	public static void main(String[] args) throws Exception {
		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

		DataSet<Tuple3<Integer, String, Double>> input = env.fromElements(
				Tuple3.of(1, "hello", 5.0),
				Tuple3.of(1, "hello", 4.0),
				Tuple3.of(2, "hello", 5.0),
				Tuple3.of(3, "world", 7.0),
				Tuple3.of(4, "world", 6.0)
				);

		DataSet<Tuple3<Integer, String, Double>> output = input.groupBy(1)
				.reduceGroup(new GroupReduceFunction<Tuple3<Integer,String,Double>, Tuple3<Integer,String,Double>>() {

					private static final long serialVersionUID = 1L;

					@Override
					public void reduce(Iterable<Tuple3<Integer, String, Double>> in,
							Collector<Tuple3<Integer, String, Double>> out) throws Exception {
						// forward every element of the group unchanged
						for(Tuple3<Integer, String, Double> tuple : in) {
							out.collect(tuple);
						}
					}
				});

		output.print(); // print() triggers execution

	}

}

Result:
(3,world,7.0)
(4,world,6.0)
(1,hello,5.0)
(1,hello,4.0)
(2,hello,5.0)

Distinct

Flink can deduplicate whole elements, deduplicate on key field positions, or deduplicate on the result of a key-selector expression.

package com.ztland.flink_demo.dataSet;

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple3;


public class DataSetDistinct {

	public static void main(String[] args) throws Exception {
		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

		DataSet<Tuple3<Integer, String, Double>> input = env.fromElements(
				Tuple3.of(1, "hello", 5.0),
				Tuple3.of(1, "hello", 5.0),
				Tuple3.of(2, "hello", 5.0),
				Tuple3.of(3, "world", 7.0),
				Tuple3.of(4, "world", 6.0)
				);

		// deduplicate whole elements
		//		DataSet<Tuple3<Integer, String, Double>> output = input.distinct();

		// deduplicate on the Int and String fields
		DataSet<Tuple3<Integer, String, Double>> output = input.distinct(0,1);

		DataSet<Integer> inputInteger = env.fromElements(1,2,3,4,5,5);

		// deduplicate on the result of a key-selector expression
		DataSet<Integer> outputInteger = inputInteger.distinct(
				x -> Math.abs(x)
				);

		output.print();        // each print() triggers execution of the plan built so far

		outputInteger.print();

	}

}

Join

Joins fall into two classes:

  • inner join
  • outer join
    • left outer join
    • right outer join
    • full outer join

An inner join can be written in three forms: without a join function, with a JoinFunction, or with a FlatJoinFunction.
In the form without a join function, where and equalTo define the key positions in the first and second DataSet; elements whose values at those positions are equal are joined, and each match is emitted as a Tuple2 of the two elements.

package com.ztland.flink_demo.dataSet;

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;


public class DataSetJoin {

	public static void main(String[] args) throws Exception {
		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

		DataSet<Tuple2<Integer, String>> input1 = env.fromElements(
				Tuple2.of(1, "hello"),
				Tuple2.of(2, "world")
				);

		DataSet<Tuple2<String,Integer>> input2 = env.fromElements(
				Tuple2.of("hello",1),
				Tuple2.of("world",2)
				);
		// join elements whose field 0 of input1 equals field 1 of input2; no join function is applied to the pairs
		DataSet<Tuple2<Tuple2<Integer, String>, Tuple2<String, Integer>>> output = input1.join(input2)
				.where(0)
				.equalTo(1);
		
		output.print(); // print() triggers execution

	}

}

Result:
((1,hello),(hello,1))
((2,world),(world,2))

With a JoinFunction, each matching pair of elements is passed to the function, which turns it into a single result element:
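
Below is a minimal, hedged sketch that reuses input1 and input2 from the example above (the flattened output layout is illustrative; JoinFunction comes from org.apache.flink.api.common.functions.JoinFunction):

DataSet<Tuple2<String, Integer>> joined = input1.join(input2)
    .where(0)
    .equalTo(1)
    .with(new JoinFunction<Tuple2<Integer, String>, Tuple2<String, Integer>, Tuple2<String, Integer>>() {
      @Override
      public Tuple2<String, Integer> join(Tuple2<Integer, String> first, Tuple2<String, Integer> second) {
        // flatten the matched pair into one tuple: the shared String key plus the left Integer
        return Tuple2.of(first.f1, first.f0);
      }
    });

The third form uses a FlatJoinFunction instead, which may emit zero, one, or more result elements per matched pair through a Collector.

Outer joins are expressed the same way via leftOuterJoin, rightOuterJoin, and fullOuterJoin. They require a join function, and the side without a match is passed in as null. A hedged left outer join sketch on the same inputs:

DataSet<Tuple2<Integer, String>> leftJoined = input1.leftOuterJoin(input2)
    .where(0)
    .equalTo(1)
    .with(new JoinFunction<Tuple2<Integer, String>, Tuple2<String, Integer>, Tuple2<Integer, String>>() {
      @Override
      public Tuple2<Integer, String> join(Tuple2<Integer, String> first, Tuple2<String, Integer> second) {
        // second is null when no element of input2 matches first
        return Tuple2.of(first.f0, second == null ? "no match" : second.f0);
      }
    });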

Cross

Builds the Cartesian product of two DataSets, pairing every element of the first with every element of the second.

package com.ztland.flink_demo.dataSet;

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;


public class DataSetCross {

	public static void main(String[] args) throws Exception {
		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

		DataSet<Integer> data1 = env.fromElements(1,2,3);
		DataSet<String> data2 = env.fromElements("one","two","three");
		DataSet<Tuple2<Integer, String>> result = data1.cross(data2);

		result.print(); // print() triggers execution

	}

}
Result:
(1,one)
(1,two)
(1,three)
(2,one)
(2,two)
(2,three)
(3,one)
(3,two)
(3,three)

Union

Similar to UNION ALL in an RDBMS: Union concatenates two or more DataSets of the same type without removing duplicates.

package com.ztland.flink_demo.dataSet;

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;


public class DataSetUnion {

	public static void main(String[] args) throws Exception {
		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

		DataSet<Tuple2<String, Integer>> data1 = env.fromElements(
				Tuple2.of("hello", 1),
				Tuple2.of("world", 2)
				);
		DataSet<Tuple2<String, Integer>> data2 = env.fromElements(
				Tuple2.of("hello", 1),
				Tuple2.of("world", 2)
				);
		DataSet<Tuple2<String, Integer>> data3 = env.fromElements(
				Tuple2.of("hello", 1),
				Tuple2.of("world", 2)
				);
		
		DataSet<Tuple2<String, Integer>> result = data1.union(data2).union(data3);

		result.print(); // print() triggers execution

	}

}

Result:
(hello,1)
(hello,1)
(hello,1)
(world,2)
(world,2)
(world,2)

Data Partitioning

  • Rebalance: evenly redistributes the data to the downstream tasks in a round-robin fashion.

     ```
     package com.ztland.flink_demo.dataSet;
     
     import org.apache.flink.api.java.DataSet;
     import org.apache.flink.api.java.ExecutionEnvironment;
     
     
     public class DataSetDemo {
     
     	public static void main(String[] args) throws Exception {
     		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
     
     		DataSet<String> in = env.fromElements(
     				"hello",
     				"world"
     				);
     		
     		DataSet<String> out = in.rebalance();
     
     		out.print(); // print() triggers execution
     
     	}
     
     }
     
     ```
    
  • Hash-Partition: hash-partitions the DataSet on a given tuple field.
    package com.ztland.flink_demo.dataSet;
    
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.tuple.Tuple2;
    
    
    public class DataSetDemo {
    
    	public static void main(String[] args) throws Exception {
    		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    
    		DataSet<Tuple2<String, Integer>> in = env.fromElements(
    				Tuple2.of("hello", 1),
    				Tuple2.of("world", 2)
    				);
    		
    		DataSet<Tuple2<String, Integer>> out = in.partitionByHash(0);
    
    		out.print(); // print() triggers execution
    
    	}
    
    }
    
    
  • Range-Partition: partitions the DataSet by value ranges of a given field.

    package com.ztland.flink_demo.dataSet;
    
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.tuple.Tuple2;
    
    
    public class DataSetDemo {
    
    	public static void main(String[] args) throws Exception {
    		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    
    		DataSet<Tuple2<String, Integer>> in = env.fromElements(
    				Tuple2.of("hello", 1),
    				Tuple2.of("world", 2)
    				);
    		
    		DataSet<Tuple2<String, Integer>> out = in.partitionByRange(0);
    
    		out.print(); // print() triggers execution
    
    	}
    
    }
    
    

Broadcast Variables

Broadcast variables are a way to share data among the parallel instances of an operator. Their main characteristics:

  • Dynamic data sharing. Inputs and configuration parameters shared between operators are static, whereas the data shared through a broadcast variable is produced dynamically by another DataSet.
  • Larger objects can be distributed. A function may reference variables captured in its closure, but such shared objects must stay small; broadcast variables can distribute much larger objects.
  • Broadcast variables are registered and accessed by name.
    A broadcast variable is attached as a DataSet to the operator that needs the shared data, and every parallel instance of that operator can read it as a Collection. The data is shipped to the nodes running the parallel instances during task initialization and held in TaskManager memory, so even though broadcast variables can carry larger objects than closures, they still should not be too large.

Usage:

package com.ztland.flink_demo.dataSet;

import java.util.Collection;

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;


public class DataSetDemo {

	public static void main(String[] args) throws Exception {
		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

		DataSet<Integer> toBroadcast = env.fromElements(1, 2, 3); // 1. Create the DataSet to be broadcast

		DataSet<String> data = env.fromElements("a", "b");

		DataSet<String> out = data.map(new RichMapFunction<String, String>() {

		    private Collection<Integer> broadcastSet;

		    @Override
		    public void open(Configuration parameters) throws Exception {
		      // 3. Access the broadcast DataSet as a Collection
		      broadcastSet = getRuntimeContext().getBroadcastVariable("broadcastSetName");
		    }

		    @Override
		    public String map(String value) throws Exception {
		        // broadcastSet is now available to every parallel instance here
		        return value;
		    }
		}).withBroadcastSet(toBroadcast, "broadcastSetName"); // 2. Broadcast the DataSet

		out.print(); // print() triggers execution

	}

}

File Cache

To improve access speed and efficiency, the TaskManager copies the remote files needed by operator instances to the local machine and caches them there.
Usage:

package com.ztland.flink_demo.dataSet;

import java.io.File;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.configuration.Configuration;



public class DataSetDemo {

	public static void main(String[] args) throws Exception {

		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
		/**
		 * Register cached files; there are two kinds.
		 */
		// A remote file ("remote" relative to the JobManager); this registers an HDFS file
		env.registerCachedFile("hdfs:///path/to/your/file", "hdfsFile");

		// A local file, i.e. a file local to the JobManager; the third argument marks it executable
		env.registerCachedFile("file:///path/to/your/file", "localFile", true);

		DataSet<String> input = env.readTextFile(""); // supply an input path here

		DataSet<Integer> result = input.map(new MyMapper());

		result.print(); // print() triggers execution; env.execute() alone would fail without a sink

	}

}

// A custom operator function built on a RichFunction; the cached file or directory is accessed by its registered name

final class MyMapper extends RichMapFunction<String, Integer>{

	private static final long serialVersionUID = 1L;

	@Override
	public void open(Configuration config) {

		// access cached file via RuntimeContext and DistributedCache
		File myFile = getRuntimeContext().getDistributedCache().getFile("hdfsFile");
		// read the file (or navigate the directory)
	}

	@Override
	public Integer map(String value) throws Exception {
		// use the content of the cached file here; this stub simply returns null
		return null;
	}

} 

Fault Tolerance

Batch programs achieve fault tolerance through retries (Retry). Retrying takes two parameters: the maximum number of retries after a failure, and the delay before a retry starts. They are configured as follows:

public static void main(String[] args) throws Exception {

		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
		
		// maximum number of execution retries
		env.setNumberOfExecutionRetries(3);
		
		// delay between retries, in milliseconds
		env.getConfig().setExecutionRetryDelay(5000);

	}

Alternatively, configure it in flink-conf.yaml:

execution-retries.default: 3
execution-retries.delay: 10 s