
Flink DataStream API: Data Sources


1、Introduction

File-based

   readTextFile(path) reads a text file following the TextInputFormat rules, reading and returning it line by line (see the combined sketch after this list).

Socket-based

  socketTextStream reads data from a socket; elements can be split on a delimiter.

Collection-based

  fromCollection(Collection) creates a data stream from a Java Collection; all elements in the collection must be of the same type.

Custom sources

  addSource lets you read data from third-party data sources (custom implementations are shown in section 4 below).
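The following is a minimal, self-contained sketch of the three basic source types described above; the file path, host, port and collection contents are placeholders, not values from this article:

import java.util.Arrays;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class BasicSourcesExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // File-based: read a text file line by line, following TextInputFormat rules
        DataStream<String> fileStream = env.readTextFile("file:///tmp/input.txt");

        // Socket-based: read text from a socket, splitting elements on the newline delimiter
        DataStream<String> socketStream = env.socketTextStream("localhost", 9999, "\n");

        // Collection-based: all elements must be of the same type
        DataStream<Integer> collectionStream = env.fromCollection(Arrays.asList(1, 2, 3, 4, 5));

        fileStream.print();
        socketStream.print();
        collectionStream.print();

        env.execute("basic-sources-demo");
    }
}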

In addition, the system ships with a set of built-in connectors, each of which provides the corresponding source support (e.g. Kafka).

2、Built-in Connectors

Apache Kafka (source/sink)

Apache Cassandra (sink)

Elasticsearch (sink)

Hadoop FileSystem (sink)

RabbitMQ (source/sink)

Apache ActiveMQ (source/sink)

Redis (sink)
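As an example of wiring up one of these built-in connectors, the sketch below reads a Kafka topic as a source. It assumes the flink-connector-kafka dependency is on the classpath; the exact consumer class and the package of SimpleStringSchema vary with the Flink version, and the broker address, group id and topic name are placeholders:

import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class KafkaSourceExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder broker address
        props.setProperty("group.id", "flink-demo");              // placeholder consumer group

        // Attach the Kafka connector as a source; "demo-topic" is a placeholder topic name
        DataStream<String> stream = env.addSource(
                new FlinkKafkaConsumer<>("demo-topic", new SimpleStringSchema(), props));

        stream.print();
        env.execute("kafka-source-demo");
    }
}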

3、Source Fault-Tolerance Guarantees

Source         Guarantee         Notes
Kafka          exactly once      Kafka 0.10 or later is recommended
Collections    exactly once
Files          exactly once
Sockets        at most once
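A source guarantee such as Kafka's exactly-once only takes effect when checkpointing is enabled on the execution environment. A minimal sketch (the 5-second interval is just an example value):

import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointingExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Take a checkpoint every 5 seconds with exactly-once processing semantics
        env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);

        // ... add sources and transformations here, then call env.execute(...)
    }
}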

 

4、Custom Sources

Implementing a custom source with parallelism 1 (implement SourceFunction)

package com.flink.learn.custom;

import org.apache.flink.streaming.api.functions.source.SourceFunction;

public class NoParalleSource implements SourceFunction<Long> {
    private long count = 1;
    // volatile: cancel() is called from a different thread than run()
    private volatile boolean isRun = true;

    @Override
    public void run(SourceContext<Long> ctx) throws Exception {
        while (isRun) {
            ctx.collect(count++);
            Thread.sleep(1000);
        }
    }

    @Override
    public void cancel() {
        isRun = false;
    }
}

Implementing a parallelized custom source (implement ParallelSourceFunction)

import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;

public class ParalleSource implements ParallelSourceFunction<Long> {
    private long count = 1;
    // volatile: cancel() is called from a different thread than run()
    private volatile boolean isRun = true;

    @Override
    public void run(SourceContext<Long> ctx) throws Exception {
        while (isRun) {
            ctx.collect(count++);
            Thread.sleep(1000);
        }
    }

    @Override
    public void cancel() {
        isRun = false;
    }
}

Implementing a parallelized custom source (extend RichParallelSourceFunction; the open method is called during initialization, and only once)

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;

public class RichParalleSource extends RichParallelSourceFunction<Long> {
    private long count = 1;
    // volatile: cancel() is called from a different thread than run()
    private volatile boolean isRun = true;

    @Override
    public void run(SourceContext<Long> ctx) throws Exception {
        while (isRun) {
            ctx.collect(count++);
            Thread.sleep(1000);
        }
    }

    @Override
    public void cancel() {
        isRun = false;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        System.out.println("open.............");
        super.open(parameters);
    }

    @Override
    public void close() throws Exception {
        super.close();
    }
}

Testing the custom sources

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;

public class TestCustomSource {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<Long> text = env.addSource(new NoParalleSource());
        // DataStream<Long> text = env.addSource(new ParalleSource()).setParallelism(2);
        // DataStream<Long> text = env.addSource(new RichParalleSource()).setParallelism(2);
        SingleOutputStreamOperator<Long> num = text.map(new MapFunction<Long, Long>() {
            @Override
            public Long map(Long value) throws Exception {
                System.out.println("this out:" + value);
                return value;
            }
        });
        SingleOutputStreamOperator<Long> sum = num.timeWindowAll(Time.seconds(2)).sum(0);
        sum.print();
        String name = TestCustomSource.class.getSimpleName();
        env.execute(name);
    }
}

 
