
Flink Learning Journey: DataSet API Development Overview


Overview

DataSet Overview

A DataSet program in Flink follows a very regular pattern: you implement transformations on data sets.
Data sets are initially created from sources, for example by reading files or by loading local collections.
Results are returned through sinks (destinations), which can for example write the data to local files or to standard output.
Flink programs can run in a variety of environments.
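A minimal sketch of that source → transformation → sink flow might look like this (the class name and sample data are illustrative):

// A minimal sketch of the source -> transformation -> sink flow described above.
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;

public class DataSetSkeleton {
    public static void main(String[] args) throws Exception {
        // 1. obtain the execution environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // 2. source: create a DataSet (from elements, a collection, a file, ...)
        // 3. transformation: apply operators such as map/filter/join
        // 4. sink: print to stdout (for batch jobs, print() also triggers execution)
        env.fromElements(1, 2, 3)
                .map(new MapFunction<Integer, Integer>() {
                    @Override
                    public Integer map(Integer value) throws Exception {
                        return value * 2;
                    }
                })
                .print();
    }
}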

Building a DataSource

How to build a DataSource:

// Create a DataSet from a collection
import org.apache.flink.api.java.ExecutionEnvironment;

import java.util.ArrayList;
import java.util.List;

public class DataSetSource {
    public static void main(String[] args) throws Exception {
        // Obtain the execution environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        fromCollection(env);
    }

    // Define a method that builds the data source
    public static void fromCollection(ExecutionEnvironment env) throws Exception {
        List<Integer> list = new ArrayList<>();
        for (int i = 1; i <= 10; i++) {
            list.add(i);
        }
        // Hand the collection to the environment and print the resulting DataSet
        env.fromCollection(list).print();
    }
}



// Create a DataSet from a file or from a directory.
// Reading a directory reads all files inside it; for batch processing the order does not matter.
public static void textFile(ExecutionEnvironment env) throws Exception {
    String filePath = "file:///Users/hbin/Desktop/workspace/flinkproject/hello.txt";
    env.readTextFile(filePath).print();
    System.out.println("------------ divider ------------");
    filePath = "file:///Users/hbin/Desktop/workspace/flinkproject/inputs";
    env.readTextFile(filePath).print();
}
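Note that readTextFile accepts any URI supported by Flink's FileSystem abstraction, so the same call also works with, for example, an hdfs:// path.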

Transformation

map

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;

import java.util.ArrayList;
import java.util.List;

public class DataSetTransformation {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        mapFunction(env);
    }

    public static void mapFunction(ExecutionEnvironment env) throws Exception {
        List<Integer> list = new ArrayList<Integer>();
        for (int i = 1; i <= 10; i++) {
            list.add(i);
        }
        DataSource<Integer> data = env.fromCollection(list);
        // Add 1 to every element
        data.map(new MapFunction<Integer, Integer>() {
            @Override
            public Integer map(Integer input) throws Exception {
                return input + 1;
            }
        }).print();
    }
}

filter

public static void filterFunction(ExecutionEnvironment env) throws Exception {
    List<Integer> list = new ArrayList<Integer>();
    for (int i = 1; i <= 10; i++) {
        list.add(i);
    }
    DataSource<Integer> data = env.fromCollection(list);
    data.map(new MapFunction<Integer, Integer>() {
        @Override
        public Integer map(Integer input) throws Exception {
            return input + 1;
        }
    }).filter(new FilterFunction<Integer>() {
        @Override
        public boolean filter(Integer input) throws Exception {
            // keep only elements greater than 5
            return input > 5;
        }
    }).print();
}

mapPartition: processes a whole partition per call instead of one element at a time, which reduces the number of database connections when writing data out (one connection per partition rather than one per element).

public static void mapPartitionFunction(ExecutionEnvironment env) throws Exception {
    List<String> list = new ArrayList<String>();
    for (int i = 1; i <= 10; i++) {
        list.add("Student" + i);
    }
    DataSource<String> data = env.fromCollection(list).setParallelism(6);
    data.mapPartition(new MapPartitionFunction<String, String>() {
        @Override
        public void mapPartition(Iterable<String> iterable, Collector<String> collector) throws Exception {
            // One connection per partition instead of one per element
            String connection = DBUtils.getConnection();
            System.out.println("connection=" + connection);
            DBUtils.returnConnection();
            // Forward the elements so the downstream print() has output
            for (String s : iterable) {
                collector.collect(s);
            }
        }
    }).print();
}
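DBUtils here is the author's own helper class; a hypothetical stand-in that makes the example compile could look like this (the connection is only simulated):

// Hypothetical stand-in for the author's DBUtils helper class; it only
// simulates borrowing and returning a pooled connection.
public class DBUtils {
    public static String getConnection() {
        return "a-connection-from-the-pool";
    }

    public static void returnConnection() {
        // hand the connection back to the pool (no-op in this sketch)
    }
}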

first

public static void firstFunction(ExecutionEnvironment env) throws Exception {
    List<Tuple2<Integer, String>> info = new ArrayList<Tuple2<Integer, String>>();
    info.add(new Tuple2<>(1, "Hadoop"));
    info.add(new Tuple2<>(1, "Spark"));
    info.add(new Tuple2<>(1, "Flink"));
    info.add(new Tuple2<>(2, "Java"));
    info.add(new Tuple2<>(2, "SpringBoot"));
    info.add(new Tuple2<>(3, "Linux"));
    info.add(new Tuple2<>(4, "Flutter"));

    DataSource<Tuple2<Integer, String>> data = env.fromCollection(info);
    data.first(3).print();                                              // print the first three records
    System.out.println("----------- divider -----------");
    data.groupBy(0).first(2).print();                                   // group by the first field, take two records per group
    System.out.println("----------- divider -----------");
    data.groupBy(0).sortGroup(1, Order.DESCENDING).first(2).print();    // group by the first field, sort within each group, take two records per group
}
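Note that without a defined order, first(n) returns an arbitrary n elements; only the last call, which combines groupBy with sortGroup, makes the selection deterministic.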

flatMap: takes one element and produces zero, one, or more elements.

public static void flatMapFunction(ExecutionEnvironment env) throws Exception {
    List<String> info = new ArrayList<String>();
    info.add("hadoop,spark");
    info.add("hadoop,flink");
    info.add("flink,flink");

    DataSource<String> data = env.fromCollection(info);

    data.flatMap(new FlatMapFunction<String, String>() {
        @Override
        public void flatMap(String input, Collector<String> collector) throws Exception {
            // Split each line into words
            String[] splits = input.split(",");
            for (String split : splits) {
                collector.collect(split);
            }
        }
    }).map(new MapFunction<String, Tuple2<String, Integer>>() {
        @Override
        public Tuple2<String, Integer> map(String s) throws Exception {
            // Attach a count of 1 to each word
            return new Tuple2<String, Integer>(s, 1);
        }
    }).groupBy(0).sum(1).print();   // group by the word and sum the counts
}
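With the three sample lines above, this word count prints something like the following (the order of the groups may vary):

(spark,1)
(hadoop,2)
(flink,3)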

distinct: removes duplicate elements.

public static void distinctFunction(ExecutionEnvironment env) throws Exception {
    List<String> info = new ArrayList<String>();
    info.add("hadoop,spark");
    info.add("hadoop,flink");
    info.add("flink,flink");

    DataSource<String> data = env.fromCollection(info);

    data.flatMap(new FlatMapFunction<String, String>() {
        @Override
        public void flatMap(String input, Collector<String> collector) throws Exception {
            // Split each line into words
            String[] splits = input.split(",");
            for (String split : splits) {
                collector.collect(split);
            }
        }
    }).distinct().print();
}

join (inner join)

public static void joinFunction(ExecutionEnvironment env) throws Exception {
    List<Tuple2<Integer, String>> info1 = new ArrayList<Tuple2<Integer, String>>();
    info1.add(new Tuple2<>(1, "PK哥"));
    info1.add(new Tuple2<>(2, "J哥"));
    info1.add(new Tuple2<>(3, "小队长"));
    info1.add(new Tuple2<>(5, "猪头户"));

    List<Tuple2<Integer, String>> info2 = new ArrayList<Tuple2<Integer, String>>();
    info2.add(new Tuple2<>(1, "Beijing"));
    info2.add(new Tuple2<>(2, "Shanghai"));
    info2.add(new Tuple2<>(3, "Chengdu"));
    info2.add(new Tuple2<>(4, "Hangzhou"));

    DataSource<Tuple2<Integer, String>> data1 = env.fromCollection(info1);
    DataSource<Tuple2<Integer, String>> data2 = env.fromCollection(info2);

    // where() selects the key of the left data set, equalTo() the key of the right one
    data1.join(data2).where(0).equalTo(0).with(new JoinFunction<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple3<Integer, String, String>>() {
        @Override
        public Tuple3<Integer, String, String> join(Tuple2<Integer, String> first, Tuple2<Integer, String> second) throws Exception {
            return new Tuple3<Integer, String, String>(first.f0, first.f1, second.f1);
        }
    }).print();
}

Outer join: remember to null-check the side that may have no matching element.

public static void outJoinFunction(ExecutionEnvironment env) throws Exception {
    List<Tuple2<Integer, String>> info1 = new ArrayList<Tuple2<Integer, String>>();
    info1.add(new Tuple2<>(1, "PK哥"));
    info1.add(new Tuple2<>(2, "J哥"));
    info1.add(new Tuple2<>(3, "小队长"));
    info1.add(new Tuple2<>(5, "猪头户"));

    List<Tuple2<Integer, String>> info2 = new ArrayList<Tuple2<Integer, String>>();
    info2.add(new Tuple2<>(1, "Beijing"));
    info2.add(new Tuple2<>(2, "Shanghai"));
    info2.add(new Tuple2<>(3, "Chengdu"));
    info2.add(new Tuple2<>(4, "Hangzhou"));

    DataSource<Tuple2<Integer, String>> data1 = env.fromCollection(info1);
    DataSource<Tuple2<Integer, String>> data2 = env.fromCollection(info2);

//        data1.leftOuterJoin(data2).where(0).equalTo(0).with(new JoinFunction<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple3<Integer, String, String>>() {
//            @Override
//            public Tuple3<Integer, String, String> join(Tuple2<Integer, String> first, Tuple2<Integer, String> second) throws Exception {
//                if (second == null) {        // left element without a match on the right
//                    return new Tuple3<Integer, String, String>(first.f0, first.f1, "-");
//                } else {
//                    return new Tuple3<Integer, String, String>(first.f0, first.f1, second.f1);
//                }
//            }
//        }).print();
//        data1.rightOuterJoin(data2).where(0).equalTo(0).with(new JoinFunction<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple3<Integer, String, String>>() {
//            @Override
//            public Tuple3<Integer, String, String> join(Tuple2<Integer, String> first, Tuple2<Integer, String> second) throws Exception {
//                if (first == null) {         // right element without a match on the left
//                    return new Tuple3<Integer, String, String>(second.f0, "-", second.f1);
//                } else {
//                    return new Tuple3<Integer, String, String>(second.f0, first.f1, second.f1);
//                }
//            }
//        }).print();
    data1.fullOuterJoin(data2).where(0).equalTo(0).with(new JoinFunction<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple3<Integer, String, String>>() {
        @Override
        public Tuple3<Integer, String, String> join(Tuple2<Integer, String> first, Tuple2<Integer, String> second) throws Exception {
            if (first == null) {
                return new Tuple3<Integer, String, String>(second.f0, "-", second.f1);
            } else if (second == null) {
                return new Tuple3<Integer, String, String>(first.f0, first.f1, "-");
            } else {
                return new Tuple3<Integer, String, String>(first.f0, first.f1, second.f1);
            }
        }
    }).print();
}

cross

public static void crossFunction(ExecutionEnvironment env) throws Exception {
    List<String> info1 = new ArrayList<String>();
    info1.add("Manchester United");
    info1.add("Manchester City");

    List<String> info2 = new ArrayList<String>();
    info2.add("3");
    info2.add("1");
    info2.add("0");

    DataSource<String> data1 = env.fromCollection(info1);
    DataSource<String> data2 = env.fromCollection(info2);

    data1.cross(data2).print();
}
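cross builds the Cartesian product of its two inputs, so this example prints 2 × 3 = 6 pairs, such as (Manchester United,3).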

Sink

Writing to a file

import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.core.fs.FileSystem;

import java.util.ArrayList;
import java.util.List;

public class DatasetSink {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        List<Integer> info = new ArrayList<Integer>();
        for (int i = 1; i <= 10; i++) {
            info.add(i);
        }

        String filePath = "file:///Users/hbin/Desktop/workspace/flinkproject/write.txt";

        DataSource<Integer> data = env.fromCollection(info);
        // NO_OVERWRITE makes the job fail if the target already exists
        data.writeAsText(filePath, FileSystem.WriteMode.NO_OVERWRITE);
        // Sinks are lazy; execute() actually runs the job
        env.execute("DatasetSink");
    }
}
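Note that when the sink runs with parallelism greater than 1, writeAsText creates a directory under filePath containing one file per parallel task; with the default configuration, forcing the sink to parallelism 1 yields a single file, e.g. (reusing data and filePath from the example above):

// Write a single output file instead of a directory of per-task files
data.writeAsText(filePath, FileSystem.WriteMode.OVERWRITE).setParallelism(1);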

Counters

import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.accumulators.LongCounter;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FileSystem;

public class Counter {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSource<String> data = env.fromElements("hadoop", "spark", "flink", "pyspark", "storm");

        DataSet<String> info = data.map(new RichMapFunction<String, String>() {

            LongCounter counter = new LongCounter();            // define an accumulator

            @Override
            public void open(Configuration parameters) throws Exception {
                super.open(parameters);
                getRuntimeContext().addAccumulator("ele-counts-Java", counter);  // register the accumulator
            }

            @Override
            public String map(String s) throws Exception {
                counter.add(1);                                 // increment the accumulator once per element
                return s;
            }
        });

        String filePath = "file:///Users/hbin/Desktop/workspace/flinkproject/write1.txt";
        info.writeAsText(filePath, FileSystem.WriteMode.OVERWRITE).setParallelism(3);
        JobExecutionResult jobResult = env.execute("Counter");

        long num = jobResult.getAccumulatorResult("ele-counts-Java");
        System.out.println("Counter=" + num);
    }
}
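Accumulator values from all parallel subtasks are merged by the system, so the job prints Counter=5 regardless of the parallelism; the merged result only becomes available through the JobExecutionResult returned by env.execute().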

Distributed Cache

Introduction

This works much like Hadoop's distributed cache: a local (or HDFS) file is made accessible to the parallel instances of user functions, which can use it to share external files such as dictionaries or machine-learned regression models.

import org.apache.commons.io.FileUtils;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.configuration.Configuration;

import java.io.File;
import java.util.ArrayList;
import java.util.List;

public class DistributedCache {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        String filePath = "file:///Users/hbin/Desktop/workspace/flinkproject/hello.txt";

        // step 1: register a local/HDFS file under a symbolic name
        env.registerCachedFile(filePath, "pk-java-dc");

        DataSource<String> data = env.fromElements("hadoop", "spark", "flink", "pyspark", "storm");

        data.map(new RichMapFunction<String, String>() {

            List<String> list = new ArrayList<String>();

            @Override
            public void open(Configuration parameters) throws Exception {
                super.open(parameters);

                // step 2: retrieve the cached file in open()
                File file = getRuntimeContext().getDistributedCache().getFile("pk-java-dc");

                List<String> lines = FileUtils.readLines(file);
                for (String line : lines) {
                    list.add(line);
                    System.out.println("line=" + line);
                }
            }

            @Override
            public String map(String s) throws Exception {
                return s;
            }
        }).print();
    }
}
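When the job is submitted, Flink ships the registered file to every TaskManager, so the getFile() call in open() reads a local copy rather than the original path.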

This post covered the main batch-processing APIs; now go try them out yourself. The source code is available on GitHub at the project link. Writing this takes effort, so a like and a star would be much appreciated.

Tags: Flink batch processing