欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

第 10 节 DataStream之source讲解(java)

程序员文章站 2022-03-14 19:05:44
...

上篇:第 9 节 Flink scala shell代码调试


Flink API的抽象级别

第 10 节 DataStream之source讲解(java)
第 10 节 DataStream之source讲解(java)


1、DataStream API之Data Sources

  1. source是程序的数据源输入,你可以通过StreamExecutionEnvironment.addSource(sourceFunction)来为你的程序添加一个source。
  2. flink提供了大量的已经实现好的source方法,你也可以自定义source
    通过实现sourceFunction接口来自定义无并行度的source,
    或者你也可以通过实现ParallelSourceFunction 接口 or 继承RichParallelSourceFunction 来自定义有并行度的source。

2、DataStream API之Data Sources

  1. 基于文件
    readTextFile(path)
    读取文本文件,文件遵循TextInputFormat 读取规则,逐行读取并返回。
  2. 基于socket
    socketTextStream从socker中读取数据,元素可以通过一个分隔符切开。
  3. 基于集合
    fromCollection(Collection)
    通过java 的collection集合创建一个数据流,集合中的所有元素必须是相同类型的。
  4. 自定义输入
    addSource 可以实现读取第三方数据源的数据
    系统内置提供了一批connectors,连接器会提供对应的source支持【kafka】

3、内置Connectors

  1. Apache Kafka (source/sink)
  2. Apache Cassandra (sink)
  3. Elasticsearch (sink)
  4. Hadoop FileSystem (sink)
  5. RabbitMQ (source/sink)
  6. Apache ActiveMQ (source/sink)
  7. Redis (sink)

4、Source 容错性保证

Source 语义保证 备注
kafka exactly once(仅一次) 建议使用0.10及以上
Collections exactly once
Files exactly once
Socktes at most once

5、代码操作

(1)在pom文件依赖:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example.flink01</groupId>
    <artifactId>flink01</artifactId>
    <version>1.0-SNAPSHOT</version>
    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>1.6.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.11</artifactId>
            <version>1.6.1</version>
           <!-- //   <scope>provided</scope>-->
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.11</artifactId>
            <version>1.6.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_2.11</artifactId>
            <version>1.6.1</version>
        </dependency>

    </dependencies>

    <build>
        <plugins>
            <!-- 编译插件 -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.6.0</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>
            <!-- scala编译插件 -->
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.1.6</version>
                <configuration>
                    <scalaCompatVersion>2.11</scalaCompatVersion>
                    <scalaVersion>2.11.12</scalaVersion>
                    <encoding>UTF-8</encoding>
                </configuration>
                <executions>
                    <execution>
                        <id>compile-scala</id>
                        <phase>compile</phase>
                        <goals>
                            <goal>add-source</goal>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                    <execution>
                        <id>test-compile-scala</id>
                        <phase>test-compile</phase>
                        <goals>
                            <goal>add-source</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <!-- 打jar包插件(会包含所有依赖) -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>2.6</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                    <archive>
                        <manifest>
                            <!-- 可以设置jar包的入口类(可选) -->
                            <mainClass>xuwei.streaming.SocketWindowWordCountJava</mainClass>
                        </manifest>
                    </archive>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

(2)代码编写:

package xuwei.streaming;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.ArrayList;

/**
 * Collection作为数据源
 */
public class StreamingFromCollection {
    public static void main(String[] args)throws Exception {
        //获取flink的运行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        ArrayList<Integer> data = new ArrayList<>();
        data.add(10);
        data.add(15);
        data.add(20);

        //指定数据源
        DataStreamSource<Integer> collectionData = env.fromCollection(data);

        //通过map数据进行处理
        DataStream<Integer> num = collectionData.map(new MapFunction<Integer, Integer>() {
            @Override
            public Integer map(Integer value) throws Exception {
                return value + 1;
            }
        });
        //直接打印
        num.print();

        env.execute("StreamingFromCollection");

    }
}

(3)控制台打印信息:
第 10 节 DataStream之source讲解(java)

相关标签: Flink入门实战