欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Flink学习系列之三 DataStreamAPI之source

程序员文章站 2022-06-16 15:20:05
...

Flink API的抽象级别,如下图:

Flink学习系列之三 DataStreamAPI之source1 DataStream API之Data Sources

  1. source是程序的数据源输入,你可以通过StreamExecutionEnvironment.addSource(sourceFunction)来为你的程序添加一个source。
  2. flink提供了大量的已经实现好的source方法,你也可以自定义source 通过实现sourceFunction接口来自定义无并行度的source, 或者你也可以通过实现ParallelSourceFunction 接口 or 继承RichParallelSourceFunction 来自定义有并行度的source。
  3. 基于文件 readTextFile(path) 读取文本文件,文件遵循TextInputFormat 读取规则,逐行读取并返回。
  4. 基于socket socketTextStream 从socker中读取数据,元素可以通过一个分隔符切开。
  5. 基于集合 fromCollection(Collection) 通过java 的collection集合创建一个数据流,集合中的所有元素必须是相同类型的。 自定义输入
  6. addSource 可以实现读取第三方数据源的数据 系统内置提供了一批connectors,连接器会提供对应的source支持【kafka】

2 内置Connectors

  1. Apache Kafka (source/sink)
  2. Apache Cassandra (sink)
  3. Elasticsearch (sink)
  4. Hadoop FileSystem (sink)
  5. RabbitMQ (source/sink)
  6. Apache ActiveMQ (source/sink)
  7. Redis (sink)

基于不同连接器获取到的数据源,Flink都提供有容错机制;

3 Source 容错性保证

Source

语义保证

备注

kafka

exactly once(仅一次)

建议使用0.10及以上

Collections

exactly once

 

Files

exactly once

 

Socktes

at most once

 

4 基于fromCollection(Collection)的source Demo

通过Java集合得到数据源,接着进行计算,代码如下:

package com.caozg.stream;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.ArrayList;
import java.util.List;

/**
 *
 * @ProjectName: FlinkLearning
 * @Package: com.caozg.stream
 * @ClassName: FromCollectionStream
 * @Description: Collection datasource sink
 * @Author: GoSaint
 * @CreateDate: 19-10-30 下午9:57
 * @UpdateDate: 19-10-30 下午9:57
 * @Version: 1.0
 */
public class FromCollectionStream {

    public static void main(String[] args) {
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        List<Integer> datasource=new ArrayList<Integer>();

        datasource.add(10);
        datasource.add(15);
        datasource.add(20);
        DataStreamSource<Integer> source = environment.fromCollection(datasource);

        DataStream<Integer> res = source.map(new MapFunction<Integer, Integer>() {
            public Integer map(Integer input) throws Exception {
                return input + 1;
            }
        });

        res.print();
        try {
            environment.execute("FromCollectionStream");
        } catch (Exception e) {
            e.printStackTrace();
        }

    }
}

结果如下:

1> 16
2> 21
4> 11

前面的1,2,4指的是线程代号,因为Stream默认是并行处理,如果设置并行度为1,那么就不会出现前面的数字。

5 自定义source

  1. 实现并行度为1的自定义source 实现SourceFunction 一般不需要实现容错性保证 处理好cancel方法(cancel应用的时候,这个方法会被调用)
  2. 实现并行化的自定义source ,实现ParallelSourceFunction 或者继承RichParallelSourceFunction

demo1: 实现并行度为1的自定义source

package com.caozg.stream;

import org.apache.flink.streaming.api.functions.source.SourceFunction;

/**
 *
 * @ProjectName: FlinkLearning
 * @Package: com.caozg.stream
 * @ClassName: SelfdDefinitionSerialSourceFunction
 * @Description: 自定义source,需要实现SourceFunction,并且指定泛型类型。这里设置串行执行
 * @Author: GoSaint
 * @CreateDate: 19-10-30 下午10:24
 * @UpdateDate: 19-10-30 下午10:24
 * @Version: 1.0
 */
public class SelfdDefinitionSerialSourceFunction implements SourceFunction<Long> {

    private boolean ISRUNNING=true;

    private Long COUNT=0L;

    public void run(SourceContext<Long> sourceContext) throws Exception {

        while (ISRUNNING){
            sourceContext.collect(COUNT);
            COUNT++;
            Thread.sleep(1000);
        }

    }

    public void cancel() {
        ISRUNNING=false;
    }
}
package com.caozg.stream;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;

/**
 * 实现自定义数据源的聚合操作
 *
 * @ProjectName: FlinkLearning
 * @Package: com.caozg.stream
 * @ClassName: DefinitionSerialSource
 * @Author: GoSaint
 * @CreateDate: 19-10-30 下午10:29
 * @UpdateDate: 19-10-30 下午10:29
 * @Version: 1.0
 */
public class DefinitionSerialSource {

    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<Long> text = env.addSource(new SelfdDefinitionSerialSourceFunction());
        DataStream<Long> res = text.map(new MapFunction<Long, Long>() {
            public Long map(Long out) throws Exception {
                System.out.println("接受到的数据: " + out);
                return out;
            }//每2秒接受数据,然后sum求和
        }).timeWindowAll(Time.seconds(2)).sum(0);
        res.print().setParallelism(1);
        try {
            env.execute("DefinitionSerialSource");
        } catch (Exception e) {
            e.printStackTrace();
        }


    }
}

结果如下所示:

接受到的数据: 0
0
接受到的数据: 1
接受到的数据: 2
3
接受到的数据: 3
接受到的数据: 4
7
接受到的数据: 5
接受到的数据: 6
11
接受到的数据: 7
接受到的数据: 8

demo2: 实现多并行度的自定义source

实现并行化的自定义source   ①实现ParallelSourceFunction ②继承RichParallelSourceFunction。这里我选择继承RichParallelSourceFunction,实现接口的写法和单线程的基本一致。

package com.caozg.stream;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;

/**
 *
 * @ProjectName: FlinkLearning
 * @Package: com.caozg.stream
 * @ClassName: SelfdDefinitionParallelSourceFunction
 * @Description: 自定义source,需要实现SourceFunction,并且指定泛型类型。这里设置并行执行
 * @Author: GoSaint
 * @CreateDate: 19-10-30 下午10:24
 * @UpdateDate: 19-10-30 下午10:24
 * @Version: 1.0
 */
public class SelfdDefinitionParallelSourceFunction extends RichParallelSourceFunction<Long> {


    private boolean ISRUNNING=true;

    private Long COUNT=0L;

    public void run(SourceContext<Long> sourceContext) throws Exception {

        while (ISRUNNING){
            sourceContext.collect(COUNT);
            COUNT++;
            Thread.sleep(1000);
        }

    }

    public void cancel() {
        ISRUNNING=false;
    }

    /**
     * 打开链接
     * @param parameters
     * @throws Exception
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
    }

    /**
     * 关闭链接
     * @throws Exception
     */
    @Override
    public void close() throws Exception {
        super.close();
    }
}
package com.caozg.stream;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;


public class DefinitionParallelSource {

    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<Long> text = env.addSource(new SelfdDefinitionParallelSourceFunction());
        DataStream<Long> res = text.map(new MapFunction<Long, Long>() {
            public Long map(Long out) throws Exception {
                System.out.println("接受到的数据: " + out);
                return out;
            }//每2秒接受数据,然后sum求和
        }).timeWindowAll(Time.seconds(2)).sum(0);
        //这里不设置并行度,默认多线程执行
        res.print();
        try {
            env.execute("DefinitionSerialSource");
        } catch (Exception e) {
            e.printStackTrace();
        }


    }
}

结果如下:

接受到的数据: 0
3> 0
接受到的数据: 1
接受到的数据: 1
接受到的数据: 2
接受到的数据: 2
接受到的数据: 2
4> 8
接受到的数据: 3
接受到的数据: 4
接受到的数据: 4
1> 19
接受到的数据: 5
接受到的数据: 5
接受到的数据: 6
2> 35
接受到的数据: 7
接受到的数据: 7