flink source和sink
程序员文章站
2022-03-08 09:20:22
...
flink中的source作为整个stream中的入口,而sink作为整个stream的终点。
SourceFunction为所有flink中source的根接口,其定义了run()方法和cancel()方法。
在SourceFunction中的run()方法用以作为source向整个stream发出数据,并用以控制数据的进入。
在大部分的source中,会在run()方法中对数据的发出采用循环,而可以在cancel()方法中定义退出循环及退出数据发送的逻辑。
根据sourceFunction中给出的例子,一个标准的带有checkpointed source如下所示:
public class ExampleCountSource implements SourceFunction<Long>, CheckpointedFunction {
* private long count = 0L;
* private volatile boolean isRunning = true;
*
* private transient ListState<Long> checkpointedCount;
*
* public void run(SourceContext<T> ctx) {
* while (isRunning && count < 1000) {
* // this synchronized block ensures that state checkpointing,
* // internal state updates and emission of elements are an atomic operation
* synchronized (ctx.getCheckpointLock()) {
* ctx.collect(count);
* count++;
* }
* }
* }
*
* public void cancel() {
* isRunning = false;
* }
*
* public void initializeState(FunctionInitializationContext context) {
* this.checkpointedCount = context
* .getOperatorStateStore()
* .getListState(new ListStateDescriptor<>("count", Long.class));
*
* if (context.isRestored()) {
* for (Long count : this.checkpointedCount.get()) {
* this.count = count;
* }
* }
* }
*
* public void snapshotState(FunctionSnapshotContext context) {
* this.checkpointedCount.clear();
* this.checkpointedCount.add(count);
* }
* }
* }
sourceContext是在source中具体发送数据的上下文,一般在source中通过其collect()来具体完成数据的发送。
sourceContext的方法主要有collect(),collectWithTimestamp(),emitWatermark(),markAsTemporarilyidle()方法。
其中collect()用以发出数据,而collectWithTimestamp()方法在发送数据的同时带上指定 的时间戳,emitWatermark()用以确定在某一时间之前的数据已经全部发出,markAsTemporarilyidle()方法则用以标记该source将处于空闲状态,将不会再继续发送数据。
Sink是流的重点,根接口是sinkFunction。
其重要的方法是invoke()方法,用以实现结果数据的处理逻辑,在sink的最简单实现DiscardingSink中,invoke()方法没有任何实现,则代表对结果数据不进行任何处理,直接废弃。
推荐阅读
-
Mysql导入导出工具Mysqldump和Source命令用法详解
-
Flink 从0到1学习—— 分享四本 Flink 国外的书和二十多篇 Paper 论文
-
ApacheFlink高级特性与高级应用之Flink中参数传递和容错设定
-
Flink本地安装和创建Flink应用
-
深入了解 Flink 网络栈(二):监控、指标和处理背压 工作网络协议jvm活动apache
-
flink sink hudi: org.apache.hudi.org.apache.avro.InvalidAvroMagicException: Not an Avro data file
-
搭建Flink和Hudi本地开发环境
-
1.11.Flink DataSetAPI、DataSet API之Data Sources、DataSet API之Transformations、DataSet Sink部分详解
-
Flink1.11.2-pg-source&sink
-
flink的常用Source和Sink