Flink DataSet API - Transformations
Data Transformations
A data transformation takes one or more DataSets and produces a new DataSet. A program can combine multiple transformations to build complex dataflows.
Map
Performs a one-to-one mapping over the DataSet: each input element produces exactly one new element.
data.map(new MapFunction<String, Integer>() {
public Integer map(String value) { return Integer.parseInt(value); }
});
FlatMap
Takes one element and produces zero, one, or more elements from it.
data.flatMap(new FlatMapFunction<String, String>() {
public void flatMap(String value, Collector<String> out) {
for (String s : value.split(" ")) {
out.collect(s);
}
}
});
MapPartition
Map and FlatMap transform individual elements of the DataSet, whereas MapPartition transforms an entire partition of the DataSet at a time.
data.mapPartition(new MapPartitionFunction<String, Long>() {
public void mapPartition(Iterable<String> values, Collector<Long> out) {
long c = 0;
for (String s : values) {
c++;
}
out.collect(c);
}
});
Filter
Keeps only the elements for which the filter function returns true.
data.filter(new FilterFunction<Integer>() {
public boolean filter(Integer value) { return value > 1000; }
});
SortPartition
Locally sorts all records in each partition of the DataSet on the specified field, in ascending or descending order.
DataSet<Tuple2<String,Integer>> in = // [...]
// Sort each partition on field 1 in ascending order; PartitionMapper is a
// user-defined MapPartitionFunction (not shown) that processes the sorted partitions.
DataSet<Integer> result = in.sortPartition(1, Order.ASCENDING)
.mapPartition(new PartitionMapper());
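A self-contained variant of the same idea is sketched below; the class name DataSetSortPartition and the Tuple2 sample data are assumptions for illustration only:

```java
import org.apache.flink.api.common.operators.Order;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;

public class DataSetSortPartition {
    public static void main(String[] args) throws Exception {
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<Tuple2<String, Integer>> in = env.fromElements(
                Tuple2.of("hello", 3),
                Tuple2.of("world", 1),
                Tuple2.of("flink", 2)
        );
        // Sort the records of each partition on field 1 in ascending order.
        DataSet<Tuple2<String, Integer>> sorted = in.sortPartition(1, Order.ASCENDING);
        sorted.print();   // print() triggers execution
    }
}
```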
Reduce
Combines a group of elements into a single element by repeatedly combining two elements into one. Reduce may be applied on a full DataSet or on a grouped DataSet.
If reduce is applied to a grouped DataSet, you can tell the runtime how to execute the combine phase of the reduce by passing a CombineHint to setCombineHint. In most cases the hash-based strategy is faster, especially when the number of distinct keys is large.
data.reduce(new ReduceFunction<Integer>() {
public Integer reduce(Integer a, Integer b) { return a + b; }
});
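For the grouped case, a minimal sketch of setting a CombineHint might look like the following; the DataSet<Tuple2<String, Integer>> named input and the per-key sum are assumptions for illustration only:

```java
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.operators.base.ReduceOperatorBase.CombineHint;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.tuple.Tuple2;

// Sum the Integer field per String key and hint the runtime to use the
// hash-based strategy for the combine (pre-aggregation) phase.
DataSet<Tuple2<String, Integer>> summed = input
    .groupBy(0)
    .reduce(new ReduceFunction<Tuple2<String, Integer>>() {
        @Override
        public Tuple2<String, Integer> reduce(Tuple2<String, Integer> a,
                                              Tuple2<String, Integer> b) {
            return Tuple2.of(a.f0, a.f1 + b.f1);
        }
    })
    .setCombineHint(CombineHint.HASH);
```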
Aggregate
Aggregates a group of values into a single value. Aggregation functions can be thought of as built-in reduce functions. Aggregate may be applied on a full DataSet or on a grouped DataSet.
The built-in aggregation functions are sum (SUM), minimum (MIN), and maximum (MAX).
On a grouped DataSet
After grouping by the String field, first sum the first Int field, then take the minimum of the Double field:
public static void main(String[] args) throws Exception {
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<Tuple3<Integer, String, Double>> input = env.fromElements(
Tuple3.of(1, "hello", 4.0),
Tuple3.of(1, "hello", 5.0),
Tuple3.of(2, "hello", 5.0),
Tuple3.of(3, "world", 6.0),
Tuple3.of(3, "world", 6.0)
);
DataSet<Tuple3<Integer, String, Double>> output = input.groupBy(1)
.aggregate(Aggregations.SUM, 0)
.and(Aggregations.MIN, 2);
output.print();   // print() triggers execution, no explicit env.execute() needed
}
Result:
(6,world,6.0)
(4,hello,4.0)
On the full DataSet
public static void main(String[] args) throws Exception {
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<Tuple3<Integer, String, Double>> input = env.fromElements(
Tuple3.of(1, "hello", 4.0),
Tuple3.of(1, "hello", 5.0),
Tuple3.of(2, "hello", 5.0),
Tuple3.of(3, "world", 6.0),
Tuple3.of(3, "world", 6.0)
);
DataSet<Tuple3<Integer, String, Double>> output = input
.aggregate(Aggregations.SUM, 0)
.and(Aggregations.MIN, 2);
output.print();   // print() triggers execution, no explicit env.execute() needed
}
Result:
(10,world,4.0)
minBy/maxBy
Selects from a group of tuples the tuple whose values in one or more fields are minimal (maximal). The fields used for the comparison must be valid key fields, i.e. comparable. If several tuples have the minimum (maximum) field values, an arbitrary one of them is returned. MinBy (MaxBy) may be applied on a full DataSet or on a grouped DataSet.
On a grouped DataSet
After grouping by the String field, first take the minimum of the Int field; if several tuples share that minimum, then take the one with the smallest Double value:
package com.ztland.flink_demo.dataSet;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple3;
public class DataSetMinBy {
public static void main(String[] args) throws Exception {
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<Tuple3<Integer, String, Double>> input = env.fromElements(
Tuple3.of(1, "hello", 5.0),
Tuple3.of(1, "hello", 4.0),
Tuple3.of(2, "hello", 5.0),
Tuple3.of(3, "world", 7.0),
Tuple3.of(4, "world", 6.0)
);
DataSet<Tuple3<Integer, String, Double>> output = input.groupBy(1)
.minBy(0,2);
output.print();   // print() triggers execution, no explicit env.execute() needed
}
}
Result:
(1,hello,4.0)
(3,world,7.0)
On the full DataSet
package com.ztland.flink_demo.dataSet;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple3;
public class DataSetCreateFlow {
public static void main(String[] args) throws Exception {
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<Tuple3<Integer, String, Double>> input = env.fromElements(
Tuple3.of(1, "hello", 5.0),
Tuple3.of(1, "hello", 4.0),
Tuple3.of(2, "hello", 5.0),
Tuple3.of(3, "world", 7.0),
Tuple3.of(4, "world", 6.0)
);
DataSet<Tuple3<Integer, String, Double>> output = input
.minBy(0,2);
output.print();   // print() triggers execution, no explicit env.execute() needed
}
}
Result:
(1,hello,4.0)
GroupReduce
Can be applied to a grouped DataSet or to the full DataSet; all elements of the group (or of the DataSet) are accessed through an iterator.
package com.ztland.flink_demo.dataSet;
import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.util.Collector;
public class DataSetReduceGroup {
public static void main(String[] args) throws Exception {
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<Tuple3<Integer, String, Double>> input = env.fromElements(
Tuple3.of(1, "hello", 5.0),
Tuple3.of(1, "hello", 4.0),
Tuple3.of(2, "hello", 5.0),
Tuple3.of(3, "world", 7.0),
Tuple3.of(4, "world", 6.0)
);
DataSet<Tuple3<Integer, String, Double>> output = input.groupBy(1)
.reduceGroup(new GroupReduceFunction<Tuple3<Integer,String,Double>, Tuple3<Integer,String,Double>>() {
private static final long serialVersionUID = 1L;
@Override
public void reduce(Iterable<Tuple3<Integer, String, Double>> in,
Collector<Tuple3<Integer, String, Double>> out) throws Exception {
// Forward every element of the group unchanged.
for (Tuple3<Integer, String, Double> tuple : in) {
out.collect(tuple);
}
}
});
output.print();   // print() triggers execution, no explicit env.execute() needed
}
}
Result:
(3,world,7.0)
(4,world,6.0)
(1,hello,5.0)
(1,hello,4.0)
(2,hello,5.0)
Distinct
Flink can deduplicate on whole elements, on selected key positions, or on the result of a key-selector function.
package com.ztland.flink_demo.dataSet;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple3;
public class DataSetDistinct {
public static void main(String[] args) throws Exception {
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<Tuple3<Integer, String, Double>> input = env.fromElements(
Tuple3.of(1, "hello", 5.0),
Tuple3.of(1, "hello", 5.0),
Tuple3.of(2, "hello", 5.0),
Tuple3.of(3, "world", 7.0),
Tuple3.of(4, "world", 6.0)
);
// Deduplicate on whole elements
// DataSet<Tuple3<Integer, String, Double>> output = input.distinct();
// Deduplicate on the Int and String fields
DataSet<Tuple3<Integer, String, Double>> output = input.distinct(0,1);
DataSet<Integer> inputInteger = env.fromElements(1,2,3,4,5,5);
// Deduplicate on the result of a key-selector function
DataSet<Integer> outputInteger = inputInteger.distinct(
x -> Math.abs(x)
);
output.print();
outputInteger.print();   // print() triggers execution, no explicit env.execute() needed
}
}
Join
Joins fall into two categories:
- Inner join
- Outer join
  - Left outer join
  - Right outer join
  - Full outer join
An inner join can be written in three forms.
The first form uses no join function. where and equalTo give the key field positions of the two DataSets being joined; pairs of elements whose values at those positions are equal are emitted.
package com.ztland.flink_demo.dataSet;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
public class DataSetJoin {
public static void main(String[] args) throws Exception {
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<Tuple2<Integer, String>> input1 = env.fromElements(
Tuple2.of(1, "hello"),
Tuple2.of(2, "world")
);
DataSet<Tuple2<String,Integer>> input2 = env.fromElements(
Tuple2.of("hello",1),
Tuple2.of("world",2)
);
// Join where field 0 of input1 equals field 1 of input2; no join function is applied to the result
DataSet<Tuple2<Tuple2<Integer, String>, Tuple2<String, Integer>>> output = input1.join(input2)
.where(0)
.equalTo(1);
output.print();   // print() triggers execution, no explicit env.execute() needed
}
}
Result:
((1,hello),(hello,1))
((2,world),(world,2))
The second form passes a JoinFunction, which turns each pair of matching elements into a single result element, as sketched below.
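A minimal sketch, reusing input1 and input2 from the example above; the string concatenation done by this JoinFunction is purely illustrative:

```java
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.tuple.Tuple2;

// Join field 0 of input1 with field 1 of input2 and let a JoinFunction
// build one result element per matching pair.
DataSet<Tuple2<Integer, String>> joined = input1.join(input2)
    .where(0)
    .equalTo(1)
    .with(new JoinFunction<Tuple2<Integer, String>, Tuple2<String, Integer>, Tuple2<Integer, String>>() {
        @Override
        public Tuple2<Integer, String> join(Tuple2<Integer, String> first, Tuple2<String, Integer> second) {
            // Keep the numeric key and combine both String fields.
            return Tuple2.of(first.f0, first.f1 + "-" + second.f0);
        }
    });
```

The third form uses a FlatJoinFunction, whose join method receives a Collector and may emit zero or more results per matching pair. The outer join variants listed above (leftOuterJoin, rightOuterJoin, fullOuterJoin) follow the same where/equalTo pattern, but they require a join function, which must handle a null on the side that has no matching element.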
Cross
Builds the Cartesian product of two DataSets.
package com.ztland.flink_demo.dataSet;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
public class DataSetCross {
public static void main(String[] args) throws Exception {
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<Integer> data1 = env.fromElements(1,2,3);
DataSet<String> data2 = env.fromElements("one","two","three");
DataSet<Tuple2<Integer, String>> result = data1.cross(data2);
result.print();   // print() triggers execution, no explicit env.execute() needed
}
}
Result:
(1,one)
(1,two)
(1,three)
(2,one)
(2,two)
(2,three)
(3,one)
(3,two)
(3,three)
Union
Similar to UNION ALL in an RDBMS, Union concatenates multiple DataSets of the same type (duplicates are kept).
package com.ztland.flink_demo.dataSet;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
public class DataSetUnion {
public static void main(String[] args) throws Exception {
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<Tuple2<String, Integer>> data1 = env.fromElements(
Tuple2.of("hello", 1),
Tuple2.of("world", 2)
);
DataSet<Tuple2<String, Integer>> data2 = env.fromElements(
Tuple2.of("hello", 1),
Tuple2.of("world", 2)
);
DataSet<Tuple2<String, Integer>> data3 = env.fromElements(
Tuple2.of("hello", 1),
Tuple2.of("world", 2)
);
DataSet<Tuple2<String, Integer>> result = data1.union(data2).union(data3);
result.print();   // print() triggers execution, no explicit env.execute() needed
}
}
Result:
(hello,1)
(hello,1)
(hello,1)
(world,2)
(world,2)
(world,2)
Data Partitioning
- Rebalance: evenly redistributes the data to the downstream tasks in a round-robin fashion.
package com.ztland.flink_demo.dataSet;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
public class DataSetDemo {
public static void main(String[] args) throws Exception {
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<String> in = env.fromElements("hello", "world");
DataSet<String> out = in.rebalance();
out.print();   // print() triggers execution, no explicit env.execute() needed
}
}
- Hash-Partition: hash-partitions the DataSet on the given tuple field.
package com.ztland.flink_demo.dataSet;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
public class DataSetDemo {
public static void main(String[] args) throws Exception {
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<Tuple2<String, Integer>> in = env.fromElements(
Tuple2.of("hello", 1),
Tuple2.of("world", 2)
);
DataSet<Tuple2<String, Integer>> out = in.partitionByHash(0);
out.print();   // print() triggers execution, no explicit env.execute() needed
}
}
- Range-Partition: range-partitions the DataSet on the given field.
package com.ztland.flink_demo.dataSet;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
public class DataSetDemo {
public static void main(String[] args) throws Exception {
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<Tuple2<String, Integer>> in = env.fromElements(
Tuple2.of("hello", 1),
Tuple2.of("world", 2)
);
DataSet<Tuple2<String, Integer>> out = in.partitionByRange(0);
out.print();   // print() triggers execution, no explicit env.execute() needed
}
}
Broadcast Variables
Broadcast variables are a way to share data among the parallel instances of an operator. Their main characteristics are:
- Dynamic data sharing. The inputs and configuration parameters shared between operators are static, whereas the data shared through a broadcast variable is produced dynamically.
- Larger objects can be distributed. A program may reference variables captured in a closure, but such shared objects have to be small; broadcast variables can distribute considerably larger objects.
- Broadcast variables are broadcast and accessed by name.
A broadcast variable is registered as a collection on the operator that needs it, and every parallel instance of that operator can access the shared data as a collection. The shared data is shipped to the nodes running the parallel instances when the task is initialized and is stored in TaskManager memory, so even though broadcast variables are intended for larger objects, they should still not be too large.
Usage:
package com.ztland.flink_demo.dataSet;
import java.util.Collection;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
public class DataSetDemo {
public static void main(String[] args) throws Exception {
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<Integer> toBroadcast = env.fromElements(1, 2, 3); // 1. The DataSet to be broadcast
DataSet<String> data = env.fromElements("a", "b");
DataSet<String> out = data.map(new RichMapFunction<String, String>() {
private Collection<Integer> broadcastSet;
@Override
public void open(Configuration parameters) throws Exception {
// 3. Access the broadcast DataSet as a Collection
broadcastSet = getRuntimeContext().getBroadcastVariable("broadcastSetName");
}
@Override
public String map(String value) throws Exception {
// The broadcast collection is available to every parallel instance of this operator.
return value + " -> " + broadcastSet;
}
}).withBroadcastSet(toBroadcast, "broadcastSetName"); // 2. Broadcast the DataSet
out.print();   // print() triggers execution, no explicit env.execute() needed
}
}
File Cache
To improve access speed and efficiency, the TaskManager copies the remote files that operator instances need to access to the local machine and caches them there.
Usage is as follows:
package com.ztland.flink_demo.dataSet;
import java.io.File;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.configuration.Configuration;
public class DataSetDemo {
public static void main(String[] args) throws Exception {
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
/**
* Register cached files; there are two kinds.
*/
// Remote file. "Remote" here is relative to the JobManager; the following registers a file on HDFS.
env.registerCachedFile("hdfs:///path/to/your/file", "hdfsFile");
// Local file, i.e. a file on the JobManager's local file system (the third argument marks it as executable).
env.registerCachedFile("file:///path/to/your/file", "localFile", true);
DataSet<String> input = env.readTextFile("");
DataSet<Integer> result = input.map(new MyMapper());
result.print();   // print() triggers execution, no explicit env.execute() needed
}
}
// A custom operator function implemented as a RichFunction; the cached file or directory
// is accessed through its registered name.
final class MyMapper extends RichMapFunction<String, Integer>{
private static final long serialVersionUID = 1L;
@Override
public void open(Configuration config) {
// access cached file via RuntimeContext and DistributedCache
File myFile = getRuntimeContext().getDistributedCache().getFile("hdfsFile");
// read the file (or navigate the directory)
}
@Override
public Integer map(String value) throws Exception {
// use content of cached file
return 0;   // placeholder: real logic would derive the result from the cached file
}
}
}
Fault Tolerance
Fault tolerance for batch programs is based on retries (Retry). Retry has two parameters: the maximum number of retries after a failure, and how long to wait after the failure before retrying. They are configured as follows:
public static void main(String[] args) throws Exception {
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// number of retries
env.setNumberOfExecutionRetries(3);
// delay between retries, in milliseconds
env.getConfig().setExecutionRetryDelay(5000);
}
Or configure it in flink-conf.yaml:
execution-retries.default: 3
execution-retries.delay: 10 s