Flink学习系列之三 DataStreamAPI之source
程序员文章站
2022-06-16 15:20:05
...
Flink API的抽象级别,如下图:
1 DataStream API之Data Sources
- source是程序的数据源输入,你可以通过StreamExecutionEnvironment.addSource(sourceFunction)来为你的程序添加一个source。
- flink提供了大量的已经实现好的source方法,你也可以自定义source 通过实现sourceFunction接口来自定义无并行度的source, 或者你也可以通过实现ParallelSourceFunction 接口 or 继承RichParallelSourceFunction 来自定义有并行度的source。
- 基于文件 readTextFile(path) 读取文本文件,文件遵循TextInputFormat 读取规则,逐行读取并返回。
- 基于socket socketTextStream 从socker中读取数据,元素可以通过一个分隔符切开。
- 基于集合 fromCollection(Collection) 通过java 的collection集合创建一个数据流,集合中的所有元素必须是相同类型的。 自定义输入
- addSource 可以实现读取第三方数据源的数据 系统内置提供了一批connectors,连接器会提供对应的source支持【kafka】
2 内置Connectors
- Apache Kafka (source/sink)
- Apache Cassandra (sink)
- Elasticsearch (sink)
- Hadoop FileSystem (sink)
- RabbitMQ (source/sink)
- Apache ActiveMQ (source/sink)
- Redis (sink)
基于不同连接器获取到的数据源,Flink都提供有容错机制;
3 Source 容错性保证
Source |
语义保证 |
备注 |
kafka |
exactly once(仅一次) |
建议使用0.10及以上 |
Collections |
exactly once |
|
Files |
exactly once |
|
Socktes |
at most once |
|
4 基于fromCollection(Collection)的source Demo
通过Java集合得到数据源,接着进行计算,代码如下:
package com.caozg.stream;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.ArrayList;
import java.util.List;
/**
*
* @ProjectName: FlinkLearning
* @Package: com.caozg.stream
* @ClassName: FromCollectionStream
* @Description: Collection datasource sink
* @Author: GoSaint
* @CreateDate: 19-10-30 下午9:57
* @UpdateDate: 19-10-30 下午9:57
* @Version: 1.0
*/
public class FromCollectionStream {
public static void main(String[] args) {
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
List<Integer> datasource=new ArrayList<Integer>();
datasource.add(10);
datasource.add(15);
datasource.add(20);
DataStreamSource<Integer> source = environment.fromCollection(datasource);
DataStream<Integer> res = source.map(new MapFunction<Integer, Integer>() {
public Integer map(Integer input) throws Exception {
return input + 1;
}
});
res.print();
try {
environment.execute("FromCollectionStream");
} catch (Exception e) {
e.printStackTrace();
}
}
}
结果如下:
1> 16
2> 21
4> 11
前面的1,2,4指的是线程代号,因为Stream默认是并行处理,如果设置并行度为1,那么就不会出现前面的数字。
5 自定义source
- 实现并行度为1的自定义source 实现SourceFunction 一般不需要实现容错性保证 处理好cancel方法(cancel应用的时候,这个方法会被调用)
- 实现并行化的自定义source ,实现ParallelSourceFunction 或者继承RichParallelSourceFunction
demo1: 实现并行度为1的自定义source
package com.caozg.stream;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
/**
*
* @ProjectName: FlinkLearning
* @Package: com.caozg.stream
* @ClassName: SelfdDefinitionSerialSourceFunction
* @Description: 自定义source,需要实现SourceFunction,并且指定泛型类型。这里设置串行执行
* @Author: GoSaint
* @CreateDate: 19-10-30 下午10:24
* @UpdateDate: 19-10-30 下午10:24
* @Version: 1.0
*/
public class SelfdDefinitionSerialSourceFunction implements SourceFunction<Long> {
private boolean ISRUNNING=true;
private Long COUNT=0L;
public void run(SourceContext<Long> sourceContext) throws Exception {
while (ISRUNNING){
sourceContext.collect(COUNT);
COUNT++;
Thread.sleep(1000);
}
}
public void cancel() {
ISRUNNING=false;
}
}
package com.caozg.stream;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
/**
* 实现自定义数据源的聚合操作
*
* @ProjectName: FlinkLearning
* @Package: com.caozg.stream
* @ClassName: DefinitionSerialSource
* @Author: GoSaint
* @CreateDate: 19-10-30 下午10:29
* @UpdateDate: 19-10-30 下午10:29
* @Version: 1.0
*/
public class DefinitionSerialSource {
public static void main(String[] args) {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<Long> text = env.addSource(new SelfdDefinitionSerialSourceFunction());
DataStream<Long> res = text.map(new MapFunction<Long, Long>() {
public Long map(Long out) throws Exception {
System.out.println("接受到的数据: " + out);
return out;
}//每2秒接受数据,然后sum求和
}).timeWindowAll(Time.seconds(2)).sum(0);
res.print().setParallelism(1);
try {
env.execute("DefinitionSerialSource");
} catch (Exception e) {
e.printStackTrace();
}
}
}
结果如下所示:
接受到的数据: 0
0
接受到的数据: 1
接受到的数据: 2
3
接受到的数据: 3
接受到的数据: 4
7
接受到的数据: 5
接受到的数据: 6
11
接受到的数据: 7
接受到的数据: 8
demo2: 实现多并行度的自定义source
实现并行化的自定义source ①实现ParallelSourceFunction ②继承RichParallelSourceFunction。这里我选择继承RichParallelSourceFunction,实现接口的写法和单线程的基本一致。
package com.caozg.stream;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
/**
*
* @ProjectName: FlinkLearning
* @Package: com.caozg.stream
* @ClassName: SelfdDefinitionParallelSourceFunction
* @Description: 自定义source,需要实现SourceFunction,并且指定泛型类型。这里设置并行执行
* @Author: GoSaint
* @CreateDate: 19-10-30 下午10:24
* @UpdateDate: 19-10-30 下午10:24
* @Version: 1.0
*/
public class SelfdDefinitionParallelSourceFunction extends RichParallelSourceFunction<Long> {
private boolean ISRUNNING=true;
private Long COUNT=0L;
public void run(SourceContext<Long> sourceContext) throws Exception {
while (ISRUNNING){
sourceContext.collect(COUNT);
COUNT++;
Thread.sleep(1000);
}
}
public void cancel() {
ISRUNNING=false;
}
/**
* 打开链接
* @param parameters
* @throws Exception
*/
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
}
/**
* 关闭链接
* @throws Exception
*/
@Override
public void close() throws Exception {
super.close();
}
}
package com.caozg.stream;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
public class DefinitionParallelSource {
public static void main(String[] args) {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<Long> text = env.addSource(new SelfdDefinitionParallelSourceFunction());
DataStream<Long> res = text.map(new MapFunction<Long, Long>() {
public Long map(Long out) throws Exception {
System.out.println("接受到的数据: " + out);
return out;
}//每2秒接受数据,然后sum求和
}).timeWindowAll(Time.seconds(2)).sum(0);
//这里不设置并行度,默认多线程执行
res.print();
try {
env.execute("DefinitionSerialSource");
} catch (Exception e) {
e.printStackTrace();
}
}
}
结果如下:
接受到的数据: 0
3> 0
接受到的数据: 1
接受到的数据: 1
接受到的数据: 2
接受到的数据: 2
接受到的数据: 2
4> 8
接受到的数据: 3
接受到的数据: 4
接受到的数据: 4
1> 19
接受到的数据: 5
接受到的数据: 5
接受到的数据: 6
2> 35
接受到的数据: 7
接受到的数据: 7