Flink Learning Journey ---- DataSet API Development Overview
DataSet API Development Overview
Overview
DataSet Overview
DataSet programs in Flink follow a very regular pattern: you simply implement transformations on data sets.
Data sets are initially created from sources, for example by reading files or loading local collections.
Results are returned through sinks, which can write the data to local files or to standard output.
Flink programs can run in a variety of environments; the overall shape of a program is sketched below.
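Before looking at each API in detail, here is a minimal sketch of that shape: obtain an ExecutionEnvironment, build a DataSet from a source, apply a transformation, and emit the result through a sink. The class name and values are illustrative only, not from the original article:
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;

import java.util.Arrays;

// Minimal end-to-end DataSet program: source -> transformation -> sink
public class DataSetAnatomy {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.fromCollection(Arrays.asList(1, 2, 3))      // source: a local collection
           .map(new MapFunction<Integer, Integer>() {   // transformation: double each value
               @Override
               public Integer map(Integer value) throws Exception {
                   return value * 2;
               }
           })
           .print();                                    // sink: standard output (triggers execution)
    }
}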
Building a DataSource
How to build a DataSource:
// Create a DataSet from a collection
import org.apache.flink.api.java.ExecutionEnvironment;

import java.util.ArrayList;
import java.util.List;

public class DataSetSource {

    public static void main(String[] args) throws Exception {
        // Obtain the execution environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        fromCollection(env);
    }

    // Build a data source from a local collection
    public static void fromCollection(ExecutionEnvironment env) throws Exception {
        List<Integer> list = new ArrayList<>();
        for (int i = 1; i <= 10; i++) {
            list.add(i);
        }
        // Hand the collection to the environment as a source
        env.fromCollection(list).print();
    }
}
// Create a DataSet from a file or from a directory.
// Reading a directory processes all files inside it; in batch mode the order does not matter.
// This method belongs to the same DataSetSource class and can be called from main().
public static void textFile(ExecutionEnvironment env) throws Exception {
    String filePath = "file:///Users/hbin/Desktop/workspace/flinkproject/hello.txt";
    env.readTextFile(filePath).print();
    System.out.println("------------ separator ------------");
    filePath = "file:///Users/hbin/Desktop/workspace/flinkproject/inputs";
    env.readTextFile(filePath).print();
}
Transformation
map
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;

import java.util.ArrayList;
import java.util.List;

// The transformation methods shown in the rest of this section also belong to this class
// (they additionally need FilterFunction, MapPartitionFunction, FlatMapFunction, JoinFunction,
// Tuple2/Tuple3, Order and Collector imports).
public class DataSetTransformation {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        mapFunction(env);
    }

    // map: apply a function to every element (here: add 1 to each number)
    public static void mapFunction(ExecutionEnvironment env) throws Exception {
        List<Integer> list = new ArrayList<>();
        for (int i = 1; i <= 10; i++) {
            list.add(i);
        }
        DataSource<Integer> data = env.fromCollection(list);
        data.map(new MapFunction<Integer, Integer>() {
            @Override
            public Integer map(Integer input) throws Exception {
                return input + 1;
            }
        }).print();
    }
}
filter
// filter: after adding 1 to every element, keep only the values greater than 5
public static void filterFunction(ExecutionEnvironment env) throws Exception {
    List<Integer> list = new ArrayList<>();
    for (int i = 1; i <= 10; i++) {
        list.add(i);
    }
    DataSource<Integer> data = env.fromCollection(list);
    data.map(new MapFunction<Integer, Integer>() {
        @Override
        public Integer map(Integer input) throws Exception {
            return input + 1;
        }
    }).filter(new FilterFunction<Integer>() {
        @Override
        public boolean filter(Integer input) throws Exception {
            return input > 5;
        }
    }).print();
}
mapPartition: called once per partition rather than once per element. This is useful when writing data out to an external store, because it reduces the number of database connections that have to be opened.
public static void mapPartitionFunction(ExecutionEnvironment env) throws Exception {
    List<String> list = new ArrayList<>();
    for (int i = 1; i <= 10; i++) {
        list.add("Student" + i);
    }
    DataSource<String> data = env.fromCollection(list).setParallelism(6);
    data.mapPartition(new MapPartitionFunction<String, String>() {
        @Override
        public void mapPartition(Iterable<String> iterable, Collector<String> collector) throws Exception {
            // One connection per partition instead of one per element
            String connection = DBUtils.getConnection();
            System.out.println("connection=" + connection);
            // ... write the elements of this partition using the connection ...
            for (String element : iterable) {
                collector.collect(element);
            }
            DBUtils.returnConnection();
        }
    }).print();
}
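DBUtils in the example above is not a Flink class; it stands in for whatever connection-pool helper your project uses. A minimal hypothetical stub, just so the example compiles and runs, could look like this:
// Hypothetical helper, only for demonstrating mapPartition; replace with a real connection pool.
public class DBUtils {

    // Pretend to hand out a database connection (just a random id string here)
    public static String getConnection() {
        return "connection-" + (int) (Math.random() * 10000);
    }

    // Pretend to give the connection back to the pool
    public static void returnConnection() {
        // no-op in this sketch
    }
}
With setParallelism(6), the connection is obtained at most 6 times (once per partition) instead of 10 times (once per element), which is the point of using mapPartition instead of map here.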
first
public static void firstFunction(ExecutionEnvironment env) throws Exception {
    List<Tuple2<Integer, String>> info = new ArrayList<Tuple2<Integer, String>>();
    info.add(new Tuple2(1, "Hadoop"));
    info.add(new Tuple2(1, "Spark"));
    info.add(new Tuple2(1, "Flink"));
    info.add(new Tuple2(2, "Java"));
    info.add(new Tuple2(2, "SpringBoot"));
    info.add(new Tuple2(3, "Linux"));
    info.add(new Tuple2(4, "Flutter"));
    DataSource<Tuple2<Integer, String>> data = env.fromCollection(info);
    data.first(3).print(); // print the first three records
    System.out.println("----------- separator -----------");
    data.groupBy(0).first(2).print(); // group by the first field, take two records per group
    System.out.println("----------- separator -----------");
    data.groupBy(0).sortGroup(1, Order.DESCENDING).first(2).print(); // group by the first field, sort within each group, take two records per group
}
flatMap: each input element can be mapped to zero, one, or many output elements.
public static void flatMapFunction(ExecutionEnvironment env) throws Exception {
    List<String> info = new ArrayList<String>();
    info.add("hadoop,spark");
    info.add("hadoop,flink");
    info.add("flink,flink");
    DataSource<String> data = env.fromCollection(info);
    data.flatMap(new FlatMapFunction<String, String>() {
        @Override
        public void flatMap(String input, Collector<String> collector) throws Exception {
            // split the line into individual words
            String[] splits = input.split(",");
            for (String split : splits) {
                collector.collect(split);
            }
        }
    }).map(new MapFunction<String, Tuple2<String, Integer>>() {
        @Override
        public Tuple2<String, Integer> map(String s) throws Exception {
            // attach a count of 1 to every word
            return new Tuple2<String, Integer>(s, 1);
        }
    }).groupBy(0).sum(1).print(); // group by the word and sum the counts
}
distinct: deduplication
public static void distinctFunction(ExecutionEnvironment env) throws Exception {
    List<String> info = new ArrayList<String>();
    info.add("hadoop,spark");
    info.add("hadoop,flink");
    info.add("flink,flink");
    DataSource<String> data = env.fromCollection(info);
    data.flatMap(new FlatMapFunction<String, String>() {
        @Override
        public void flatMap(String input, Collector<String> collector) throws Exception {
            // split the line into individual words
            String[] splits = input.split(",");
            for (String split : splits) {
                collector.collect(split);
            }
        }
    }).distinct().print(); // only distinct words remain
}
join: inner join
public static void joinFunction(ExecutionEnvironment env) throws Exception {
    List<Tuple2<Integer, String>> info1 = new ArrayList<Tuple2<Integer, String>>();
    info1.add(new Tuple2(1, "PK哥"));
    info1.add(new Tuple2(2, "J哥"));
    info1.add(new Tuple2(3, "小队长"));
    info1.add(new Tuple2(5, "猪头户"));
    List<Tuple2<Integer, String>> info2 = new ArrayList<Tuple2<Integer, String>>();
    info2.add(new Tuple2(1, "北京"));
    info2.add(new Tuple2(2, "上海"));
    info2.add(new Tuple2(3, "成都"));
    info2.add(new Tuple2(4, "杭州"));
    DataSource<Tuple2<Integer, String>> data1 = env.fromCollection(info1);
    DataSource<Tuple2<Integer, String>> data2 = env.fromCollection(info2);
    // where() selects the key of the left data set, equalTo() the key of the right
    data1.join(data2).where(0).equalTo(0).with(new JoinFunction<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple3<Integer, String, String>>() {
        @Override
        public Tuple3<Integer, String, String> join(Tuple2<Integer, String> first, Tuple2<Integer, String> second) throws Exception {
            return new Tuple3<Integer, String, String>(first.f0, first.f1, second.f1);
        }
    }).print();
}
Outer join: remember to null-check the side that may have no matching record.
public static void outJoinFunction(ExecutionEnvironment env) throws Exception {
    List<Tuple2<Integer, String>> info1 = new ArrayList<Tuple2<Integer, String>>();
    info1.add(new Tuple2(1, "PK哥"));
    info1.add(new Tuple2(2, "J哥"));
    info1.add(new Tuple2(3, "小队长"));
    info1.add(new Tuple2(5, "猪头户"));
    List<Tuple2<Integer, String>> info2 = new ArrayList<Tuple2<Integer, String>>();
    info2.add(new Tuple2(1, "北京"));
    info2.add(new Tuple2(2, "上海"));
    info2.add(new Tuple2(3, "成都"));
    info2.add(new Tuple2(4, "杭州"));
    DataSource<Tuple2<Integer, String>> data1 = env.fromCollection(info1);
    DataSource<Tuple2<Integer, String>> data2 = env.fromCollection(info2);

    // Left outer join: the right side (second) may be null
    // data1.leftOuterJoin(data2).where(0).equalTo(0).with(new JoinFunction<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple3<Integer, String, String>>() {
    //     @Override
    //     public Tuple3<Integer, String, String> join(Tuple2<Integer, String> first, Tuple2<Integer, String> second) throws Exception {
    //         if (second == null) {
    //             return new Tuple3<Integer, String, String>(first.f0, first.f1, "-");
    //         } else {
    //             return new Tuple3<Integer, String, String>(first.f0, first.f1, second.f1);
    //         }
    //     }
    // }).print();

    // Right outer join: the left side (first) may be null
    // data1.rightOuterJoin(data2).where(0).equalTo(0).with(new JoinFunction<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple3<Integer, String, String>>() {
    //     @Override
    //     public Tuple3<Integer, String, String> join(Tuple2<Integer, String> first, Tuple2<Integer, String> second) throws Exception {
    //         if (first == null) {
    //             return new Tuple3<Integer, String, String>(second.f0, "-", second.f1);
    //         } else {
    //             return new Tuple3<Integer, String, String>(second.f0, first.f1, second.f1);
    //         }
    //     }
    // }).print();

    // Full outer join: either side may be null
    data1.fullOuterJoin(data2).where(0).equalTo(0).with(new JoinFunction<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple3<Integer, String, String>>() {
        @Override
        public Tuple3<Integer, String, String> join(Tuple2<Integer, String> first, Tuple2<Integer, String> second) throws Exception {
            if (first == null) {
                return new Tuple3<Integer, String, String>(second.f0, "-", second.f1);
            } else if (second == null) {
                return new Tuple3<Integer, String, String>(first.f0, first.f1, "-");
            } else {
                return new Tuple3<Integer, String, String>(first.f0, first.f1, second.f1);
            }
        }
    }).print();
}
cross: Cartesian product of two data sets
public static void crossFunction(ExecutionEnvironment env) throws Exception {
    List<String> info1 = new ArrayList<String>();
    info1.add("曼联");
    info1.add("曼城");
    List<String> info2 = new ArrayList<String>();
    info2.add("3");
    info2.add("1");
    info2.add("0");
    DataSource<String> data1 = env.fromCollection(info1);
    DataSource<String> data2 = env.fromCollection(info2);
    // every element of data1 is paired with every element of data2 (2 x 3 = 6 pairs)
    data1.cross(data2).print();
}
Sink
Writing to a file
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.core.fs.FileSystem;

import java.util.ArrayList;
import java.util.List;

public class DatasetSink {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        List<Integer> info = new ArrayList<>();
        for (int i = 1; i <= 10; i++) {
            info.add(i);
        }
        String filePath = "file:///Users/hbin/Desktop/workspace/flinkproject/write.txt";
        DataSource<Integer> data = env.fromCollection(info);
        // NO_OVERWRITE fails if the target already exists; use OVERWRITE to replace it
        data.writeAsText(filePath, FileSystem.WriteMode.NO_OVERWRITE);
        env.execute("DatasetSink");
    }
}
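Note that when the sink runs with parallelism greater than 1, writeAsText generally creates a directory named write.txt containing one file per subtask; to get a single output file you can set the sink parallelism to 1, for example:
// Write a single output file instead of a directory of per-subtask files
data.writeAsText(filePath, FileSystem.WriteMode.OVERWRITE).setParallelism(1);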
Counters (accumulators). In a parallel job, a plain local variable would only count the elements seen by its own subtask; an accumulator registered with the runtime context is merged across all subtasks when the job finishes.
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.accumulators.LongCounter;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FileSystem;

public class Counter {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSource<String> data = env.fromElements("hadoop", "spark", "flink", "pyspark", "storm");
        DataSet<String> info = data.map(new RichMapFunction<String, String>() {

            LongCounter counter = new LongCounter(); // define an accumulator

            @Override
            public void open(Configuration parameters) throws Exception {
                super.open(parameters);
                getRuntimeContext().addAccumulator("ele-counts-Java", counter); // register the accumulator
            }

            @Override
            public String map(String s) throws Exception {
                counter.add(1); // increment the accumulator for every element
                return s;
            }
        });
        String filePath = "file:///Users/hbin/Desktop/workspace/flinkproject/write1.txt";
        info.writeAsText(filePath, FileSystem.WriteMode.OVERWRITE).setParallelism(3);
        JobExecutionResult jobResult = env.execute("Counter");
        long num = jobResult.getAccumulatorResult("ele-counts-Java");
        System.out.println("Counter=" + num);
    }
}
Distributed Cache
Introduction
This works much like Hadoop's distributed cache: a local (or HDFS) file is registered with the execution environment and made available to the parallel instances of user functions, which can then access it as a local file. It is typically used to share external files such as dictionaries or trained machine-learning models.
import org.apache.commons.io.FileUtils;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.configuration.Configuration;

import java.io.File;
import java.util.ArrayList;
import java.util.List;

public class DistributedCache {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        String filePath = "file:///Users/hbin/Desktop/workspace/flinkproject/hello.txt";
        // Step 1: register a local/HDFS file under a name
        env.registerCachedFile(filePath, "pk-java-dc");
        DataSource<String> data = env.fromElements("hadoop", "spark", "flink", "pyspark", "storm");
        data.map(new RichMapFunction<String, String>() {

            List<String> list = new ArrayList<String>();

            @Override
            public void open(Configuration parameters) throws Exception {
                super.open(parameters);
                // Step 2: retrieve the cached file in open() and read its contents
                File file = getRuntimeContext().getDistributedCache().getFile("pk-java-dc");
                List<String> lines = FileUtils.readLines(file);
                for (String line : lines) {
                    list.add(line);
                    System.out.println("line=" + line);
                }
            }

            @Override
            public String map(String s) throws Exception {
                return s;
            }
        }).print();
    }
}
That covers the main batch-processing APIs; try the examples yourself and look at the results. The full source code is on GitHub (see the project repository link). Writing this up takes effort, so a like and a star would be much appreciated.