Section 10: DataStream sources explained (Java)
程序员文章站
2022-03-14 19:05:44
Previous: Section 9, Debugging code in the Flink Scala shell
Abstraction levels of the Flink API
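1. DataStream API: Data Sources
- A source is where your program reads its input data. You attach one to your program with StreamExecutionEnvironment.addSource(sourceFunction).
- Flink ships with many ready-made sources, and you can also write your own:
implement the SourceFunction interface for a non-parallel source (parallelism 1),
or implement the ParallelSourceFunction interface or extend RichParallelSourceFunction for a parallel source.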
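To make the custom-source option concrete, here is a minimal sketch of a non-parallel source. The class name CountingSourceSketch and the limit of 5 are hypothetical, chosen only for illustration; the interface and its run/cancel methods are the ones from Flink's SourceFunction. The source is bounded on purpose so the job finishes by itself.

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

/** Hypothetical non-parallel source that emits 1..limit and then finishes. */
class CountingSourceSketch implements SourceFunction<Long> {

    private final long limit;
    // volatile because cancel() is called from a different thread than run()
    private volatile boolean running = true;

    CountingSourceSketch(long limit) {
        this.limit = limit;
    }

    @Override
    public void run(SourceContext<Long> ctx) throws Exception {
        long next = 1;
        // Emit until the limit is reached or the job is cancelled
        while (running && next <= limit) {
            ctx.collect(next);
            next++;
        }
    }

    @Override
    public void cancel() {
        running = false;
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // A plain SourceFunction always runs with parallelism 1
        env.addSource(new CountingSourceSketch(5)).print();
        env.execute("CountingSourceSketch");
    }
}
```

For a parallel source, the same run/cancel pair would go into a class that implements ParallelSourceFunction or extends RichParallelSourceFunction; every parallel instance then runs its own copy of run().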
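2. DataStream API: Data Sources
- File-based
readTextFile(path)
Reads a text file following the TextInputFormat rules: the file is read line by line and each line is returned as a string.
- Socket-based
socketTextStream
Reads data from a socket; elements are split on a delimiter.
- Collection-based
fromCollection(Collection)
Creates a data stream from a Java collection. All elements in the collection must be of the same type.
- Custom input
addSource lets you read data from third-party data sources.
Flink also bundles a set of connectors, and each connector provides the matching source support (for example, Kafka).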
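The built-in sources listed above can be tried out side by side. The following is a small sketch, not the author's code: the class name BuiltInSourcesSketch, the temporary file, and the sample values are assumptions made for the example. The socket variant is left commented out because it needs a server listening on the given host and port (localhost:9000 here is a placeholder).

```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;

class BuiltInSourcesSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // File-based: write a small temporary file so the example needs no external input
        Path file = Files.createTempFile("flink-source-demo", ".txt");
        Files.write(file, Arrays.asList("hello", "flink"));
        DataStream<String> lines = env.readTextFile(file.toUri().toString());
        lines.print();

        // Collection-based: all elements must have the same type
        DataStream<Integer> numbers = env.fromCollection(Arrays.asList(10, 15, 20));
        numbers.print();

        // Socket-based: uncomment only when something is listening on this host and port
        // DataStream<String> socketLines = env.socketTextStream("localhost", 9000, "\n");
        // socketLines.print();

        env.execute("BuiltInSourcesSketch");
    }
}
```

The order of the printed elements is not fixed, because the two sources run independently and print may run with a parallelism greater than 1.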
3. Built-in connectors
- Apache Kafka (source/sink)
- Apache Cassandra (sink)
- Elasticsearch (sink)
- Hadoop FileSystem (sink)
- RabbitMQ (source/sink)
- Apache ActiveMQ (source/sink)
- Redis (sink)
4. Source fault-tolerance guarantees
Source | Guarantee | Notes |
---|---|---|
Kafka | exactly once | Version 0.10 or later recommended |
Collections | exactly once | |
Files | exactly once | |
Sockets | at most once | |
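The exactly-once entries in the table rely on Flink's checkpointing mechanism, which is off by default. As a hedged sketch, checkpointing could be switched on as shown below; the class name CheckpointedSourceSketch and the 5000 ms interval are arbitrary choices for illustration, not values from the original article.

```java
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.Arrays;

class CheckpointedSourceSketch {

    /** Builds an environment that checkpoints every 5 seconds in exactly-once mode (interval is an assumption). */
    static StreamExecutionEnvironment configuredEnv() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(5000L, CheckpointingMode.EXACTLY_ONCE);
        return env;
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = configuredEnv();
        // A collection source is listed as exactly once in the table above
        env.fromCollection(Arrays.asList(10, 15, 20)).print();
        env.execute("CheckpointedSourceSketch");
    }
}
```

A socket source stays at most once even with checkpointing enabled, because data already read from the socket cannot be replayed after a failure.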
5. Code walkthrough
(1) Dependencies in the pom file:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>org.example.flink01</groupId>
    <artifactId>flink01</artifactId>
    <version>1.0-SNAPSHOT</version>
    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>1.6.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.11</artifactId>
            <version>1.6.1</version>
            <!-- <scope>provided</scope> -->
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.11</artifactId>
            <version>1.6.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_2.11</artifactId>
            <version>1.6.1</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <!-- Java compiler plugin -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.6.0</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>
            <!-- Scala compiler plugin -->
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.1.6</version>
                <configuration>
                    <scalaCompatVersion>2.11</scalaCompatVersion>
                    <scalaVersion>2.11.12</scalaVersion>
                    <encoding>UTF-8</encoding>
                </configuration>
                <executions>
                    <execution>
                        <id>compile-scala</id>
                        <phase>compile</phase>
                        <goals>
                            <goal>add-source</goal>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                    <execution>
                        <id>test-compile-scala</id>
                        <phase>test-compile</phase>
                        <goals>
                            <goal>add-source</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <!-- Jar packaging plugin (bundles all dependencies) -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>2.6</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                    <archive>
                        <manifest>
                            <!-- Entry class of the jar (optional) -->
                            <mainClass>xuwei.streaming.SocketWindowWordCountJava</mainClass>
                        </manifest>
                    </archive>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
(2) Write the code:
package xuwei.streaming;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.ArrayList;

/**
 * Uses a Collection as the data source.
 */
public class StreamingFromCollection {
    public static void main(String[] args) throws Exception {
        // Get the Flink execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        ArrayList<Integer> data = new ArrayList<>();
        data.add(10);
        data.add(15);
        data.add(20);

        // Specify the data source
        DataStreamSource<Integer> collectionData = env.fromCollection(data);

        // Process each element with map: add 1 to every value
        DataStream<Integer> num = collectionData.map(new MapFunction<Integer, Integer>() {
            @Override
            public Integer map(Integer value) throws Exception {
                return value + 1;
            }
        });

        // Print the result to the console
        num.print();

        env.execute("StreamingFromCollection");
    }
}
(3) Console output: